Elasticsearch: removing documents whose rows no longer exist in the database

Published: 2023/12/10

Resolving data inconsistency between MySQL and Elasticsearch

jdbc-input-plugin can only append: it writes new database rows to Elasticsearch incrementally. The database on the JDBC source side, however, frequently deletes or updates rows, so the database and the search index drift out of sync.

If you have a development team, you can of course write code that updates the search engine whenever a row is deleted or updated. If you do not have that option, try the approach below.

Take a table named article whose mtime column is defined with ON UPDATE CURRENT_TIMESTAMP, so mtime changes every time a row is updated.

mysql> desc article;
+-------------+---------------+------+-----+-----------------------------+-------+
| Field       | Type          | Null | Key | Default                     | Extra |
+-------------+---------------+------+-----+-----------------------------+-------+
| id          | int(11)       | NO   |     | 0                           |       |
| title       | mediumtext    | NO   |     | NULL                        |       |
| description | mediumtext    | YES  |     | NULL                        |       |
| author      | varchar(100)  | YES  |     | NULL                        |       |
| source      | varchar(100)  | YES  |     | NULL                        |       |
| content     | longtext      | YES  |     | NULL                        |       |
| status      | enum('Y','N') | NO   |     | 'N'                         |       |
| ctime       | timestamp     | NO   |     | CURRENT_TIMESTAMP           |       |
| mtime       | timestamp     | YES  |     | ON UPDATE CURRENT_TIMESTAMP |       |
+-------------+---------------+------+-----+-----------------------------+-------+
9 rows in set (0.00 sec)

Add an mtime-based query to the Logstash jdbc input so that updated rows are picked up as well:

jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"    # cron expression; here the query runs once per minute
    statement => "select * from article where mtime > :sql_last_value"
    use_column_value => true
    tracking_column => "mtime"
    tracking_column_type => "timestamp"
    record_last_run => true
    last_run_metadata_path => "/var/tmp/article-mtime.last"
}

Create a trash (recycle bin) table. It handles rows that are deleted from the database or disabled by setting status = 'N'.

CREATE TABLE `elasticsearch_trash` (
    `id` int(11) NOT NULL,
    `ctime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Create triggers on the article table:

DELIMITER $$

CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` BEFORE UPDATE ON `article` FOR EACH ROW
BEGIN
    -- When an article's status becomes 'N', its document must be removed from the search engine.
    -- INSERT IGNORE avoids a duplicate-key error when the id is already in the trash.
    IF NEW.status = 'N' THEN
        INSERT IGNORE INTO elasticsearch_trash(id) VALUES (OLD.id);
    END IF;
    -- When the status changes back to 'Y', remove the id from the trash;
    -- otherwise the re-indexed document would be deleted by mistake.
    IF NEW.status = 'Y' THEN
        DELETE FROM elasticsearch_trash WHERE id = OLD.id;
    END IF;
END$$

CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` BEFORE DELETE ON `article` FOR EACH ROW
BEGIN
    -- When an article is deleted, put its id into the search-engine trash.
    INSERT IGNORE INTO elasticsearch_trash(id) VALUES (OLD.id);
END$$

DELIMITER ;

Next, we need a simple shell script that runs once per minute, reads the rows in the elasticsearch_trash table, and uses curl to call the Elasticsearch RESTful API to delete the corresponding documents.
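The article does not include such a script, so the following is only a minimal sketch of what it might look like. It reuses the database name and credentials from the Logstash example above and the index and type names ("information", "article") from the Java example in this article. The Elasticsearch address http://localhost:9200 and the function names doc_url, is_purged and clean_trash are assumptions made for illustration. The /index/type/id URL layout matches the older Elasticsearch versions this article targets; newer versions address documents as /index/_doc/id.

```shell
#!/bin/sh
# Sketch: delete the documents listed in elasticsearch_trash from Elasticsearch.
# Assumed values (not from the article): the Elasticsearch address below.
ES_URL="${ES_URL:-http://localhost:9200}"
ES_INDEX="${ES_INDEX:-information}"
ES_TYPE="${ES_TYPE:-article}"

# Build the REST URL of the document with the given id.
doc_url() {
    printf '%s/%s/%s/%s' "$ES_URL" "$ES_INDEX" "$ES_TYPE" "$1"
}

# Succeeds when the HTTP status means the document is gone:
# 200 = deleted just now, 404 = was not in the index anyway.
is_purged() {
    case "$1" in
        200|404) return 0 ;;
        *) return 1 ;;
    esac
}

# Read every id in the trash table, delete it from Elasticsearch,
# and remove the trash row only when the delete was confirmed.
clean_trash() {
    mysql -N -B -u cms -ppassword cms -e 'SELECT id FROM elasticsearch_trash' |
    while read -r id; do
        status=$(curl -s -o /dev/null -w '%{http_code}' -X DELETE "$(doc_url "$id")")
        if is_purged "$status"; then
            mysql -u cms -ppassword cms -e "DELETE FROM elasticsearch_trash WHERE id = $id"
        fi
    done
}

# Only touch the database and Elasticsearch when called with "run".
if [ "${1:-}" = "run" ]; then
    clean_trash
fi
```

Saved under a hypothetical path such as /usr/local/bin/clean_trash.sh, it could be scheduled with a crontab entry like `* * * * * /usr/local/bin/clean_trash.sh run`. Rows stay in the trash table when Elasticsearch is unreachable or returns an error, so they are retried on the next run.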

You can also write a program for this. Here is a Spring Boot scheduled-task example.

Entity

package cn.netkiller.api.domain.elasticsearch;

import java.util.Date;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table
public class ElasticsearchTrash {

    @Id
    private int id;

    @Column(columnDefinition = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP")
    private Date ctime;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public Date getCtime() {
        return ctime;
    }

    public void setCtime(Date ctime) {
        this.ctime = ctime;
    }
}

Repository

package cn.netkiller.api.repository.elasticsearch;

import org.springframework.data.repository.CrudRepository;

// Import path aligned with the package declared in the entity class.
import cn.netkiller.api.domain.elasticsearch.ElasticsearchTrash;

public interface ElasticsearchTrashRepository extends CrudRepository<ElasticsearchTrash, Integer> {
}

Scheduled task

package cn.netkiller.api.schedule;

import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.rest.RestStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import cn.netkiller.api.domain.elasticsearch.ElasticsearchTrash;
import cn.netkiller.api.repository.elasticsearch.ElasticsearchTrashRepository;

@Component
public class ScheduledTasks {

    private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);

    @Autowired
    private TransportClient client;

    @Autowired
    private ElasticsearchTrashRepository elasticsearchTrashRepository;

    public ScheduledTasks() {
    }

    @Scheduled(fixedRate = 1000 * 60) // run the task every 60 seconds
    public void cleanTrash() {
        for (ElasticsearchTrash elasticsearchTrash : elasticsearchTrashRepository.findAll()) {
            // Delete the document from index "information", type "article".
            DeleteResponse response = client
                    .prepareDelete("information", "article", String.valueOf(elasticsearchTrash.getId()))
                    .get();
            RestStatus status = response.status();
            logger.info("delete {} {}", elasticsearchTrash.getId(), status.toString());
            // Remove the trash row once the document is deleted or was already absent.
            if (status == RestStatus.OK || status == RestStatus.NOT_FOUND) {
                elasticsearchTrashRepository.delete(elasticsearchTrash);
            }
        }
    }
}

Spring Boot main application

package cn.netkiller.api;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

Reposted from:

https://my.oschina.net/neochen/blog/1518679#comment-list
