MySQL到Elasticsearch数据同步
一、使用logstash進(jìn)行同步
下載logstash,地址:https://www.elastic.co/downloads/logstash,我這里使用的7.11.1版本,解壓后文件如下:
其中mysql文件夾是自己導(dǎo)的,因?yàn)槭褂玫氖莔ysql8.0以及mysql-connector-java-8.0.20這個(gè)jar包
到導(dǎo)入的ealsticsearch的mysql表的結(jié)構(gòu)如下:
bin文件夾下創(chuàng)建Logstash配置文件,文件名為mysql.conf
內(nèi)容配置如下:
更多詳細(xì)的配置:
- jdbc_driver_library: jdbc驅(qū)動(dòng)的路徑,在上一步中已經(jīng)下載
- jdbc_driver_class: 驅(qū)動(dòng)類(lèi)的名字,mysql填com.mysql.jdbc.Driver
- jdbc_connection_string: mysql 地址
- jdbc_user: mysql 用戶
- jdbc_password: mysql密碼
- schedule: 執(zhí)行sql時(shí)機(jī),類(lèi)似 crontab 的調(diào)度,上面配置表示每分鐘刷新一次。
- statement: 要執(zhí)行的sql,以 “:” 開(kāi)頭是定義的變量,可以通過(guò)parameters 來(lái)設(shè)置變量,這里的sql_last_value是內(nèi)置的變量,表示上一次sql執(zhí)行中> -> update_time的值
- statement_filepath:和上面statement參數(shù)二選一,存放需要執(zhí)行的SQL語(yǔ)句的文件位置,適用于多個(gè)sql語(yǔ)句的場(chǎng)景。
- use_column_value: 使用遞增列的值
- tracking_column_type: 遞增字段的類(lèi)型,numeric表示數(shù)值類(lèi)型,
- timestamp 表示時(shí)間戳類(lèi)型
- tracking_column: 遞增字段的名稱(chēng),這里使用updatetime這一列,這列的類(lèi)型是timestamp
- last_run_metadata_path: 同步點(diǎn)文件,這個(gè)文件記錄了上次的同步點(diǎn),重啟時(shí)會(huì)讀取這個(gè)文件,這個(gè)文件可以手動(dòng)修改
- index: 導(dǎo)入到es中的index名,這里我直接設(shè)置成了mysql表的名字
- document_id: 導(dǎo)入到es中的文檔id,這個(gè)需要設(shè)置成主鍵,否則同一條記錄更新后在es中會(huì)出現(xiàn)兩條記錄,%{id} 表示引用mysql表中id字段的值
es新建poem索引:(注意跟mysql的數(shù)據(jù)段名保持一致)
#創(chuàng)建索引 PUT /poem {"settings": {"number_of_shards": 1,"number_of_replicas": 0}, "mappings":{"properties":{"id":{"type":"text"},"name":{"type":"text"},"author":{"type":"text"},"type":{"type": "text"},"content":{"type":"text","analyzer": "ik_max_word","search_analyzer": "ik_max_word"},"href":{"type": "text"},"authordes":{"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_max_word"},"origin":{"type":"text","analyzer": "ik_max_word","search_analyzer": "ik_max_word"},"categoryId":{"type": "text"}}} }進(jìn)入bin目錄,啟動(dòng)logstash服務(wù),開(kāi)始同步mysql數(shù)據(jù)到es
二、使用阿里云開(kāi)源工具Canal
canal是阿里巴巴旗下的一款開(kāi)源項(xiàng)目,純Java開(kāi)發(fā)。基于數(shù)據(jù)庫(kù)增量日志解析,提供增量數(shù)據(jù)訂閱&消費(fèi),目前主要支持了MySQL(也支持mariaDB)。
當(dāng)前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
canal模擬mysql slave的交互協(xié)議,偽裝自己為mysql slave,向mysql master發(fā)送dump協(xié)議,mysql master收到dump請(qǐng)求,開(kāi)始推送binary log給slave(也就是canal)
canal解析binary log對(duì)象(原始為byte流)
簡(jiǎn)單來(lái)說(shuō),Canal 會(huì)將自己偽裝成 MySQL 從節(jié)點(diǎn)(Slave),并從主節(jié)點(diǎn)(Master)獲取 Binlog,解析和貯存后供下游消費(fèi)端使用。Canal 包含兩個(gè)組成部分:服務(wù)端和客戶端。服務(wù)端負(fù)責(zé)連接至不同的 MySQL 實(shí)例,并為每個(gè)實(shí)例維護(hù)一個(gè)事件消息隊(duì)列;客戶端則可以訂閱這些隊(duì)列中的數(shù)據(jù)變更事件,處理并存儲(chǔ)到數(shù)據(jù)倉(cāng)庫(kù)中。下面我們來(lái)看如何快速搭建起一個(gè) Canal 服務(wù)。
MySQL 默認(rèn)沒(méi)有開(kāi)啟 Binlog,因此我們需要對(duì) my.cnf 文件做以下修改:
server-id = 1 log_bin = /path/to/mysql-bin.log binlog_format = ROW注意 binlog_format 必須設(shè)置為 ROW, 因?yàn)樵?STATEMENT 或 MIXED 模式下, Binlog 只會(huì)記錄和傳輸
SQL 語(yǔ)句(以減少日志大小),而不包含具體數(shù)據(jù),我們也就無(wú)法保存了。
從節(jié)點(diǎn)通過(guò)一個(gè)專(zhuān)門(mén)的賬號(hào)連接主節(jié)點(diǎn),這個(gè)賬號(hào)需要擁有全局的 REPLICATION 權(quán)限。我們可以使用 GRANT 命令創(chuàng)建這樣的賬號(hào):
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';從 GitHub 項(xiàng)目發(fā)布頁(yè)中下載 Canal 服務(wù)端代碼(https://github.com/alibaba/canal/releases )
配置文件在 conf 文件夾下,有以下目錄結(jié)構(gòu):
canal.deployer/conf/canal.properties
canal.deployer/conf/instanceA/instance.properties
canal.deployer/conf/instanceB/instance.properties
conf/canal.properties 是主配置文件,如其中的 canal.port 用以指定服務(wù)端監(jiān)聽(tīng)的端口。instanceA/instance.properties 則是各個(gè)實(shí)例的配置文件,主要的配置項(xiàng)有:
# slaveId 不能與 my.cnf 中的 server-id 項(xiàng)重復(fù) canal.instance.mysql.slaveId = 1234 canal.instance.master.address = 127.0.0.1:3306 canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.instance.connectionCharset = UTF-8 # 訂閱實(shí)例中所有的數(shù)據(jù)庫(kù)和表 canal.instance.filter.regex = .*\\..*從服務(wù)端消費(fèi)變更消息時(shí),我們需要?jiǎng)?chuàng)建一個(gè) Canal 客戶端,指定需要訂閱的數(shù)據(jù)庫(kù)和表,并開(kāi)啟輪詢(xún)。
參考文章
總結(jié)
以上是生活随笔為你收集整理的MySQL到Elasticsearch数据同步的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 服务器 Font family [‘sa
- 下一篇: python拼接mysql时遇到unsu