日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 运维知识 > 数据库 >内容正文

数据库

MySQL到Elasticsearch数据同步

發(fā)布時(shí)間:2025/3/15 数据库 46 豆豆
生活随笔 收集整理的這篇文章主要介紹了 MySQL到Elasticsearch数据同步 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

一、使用logstash進(jìn)行同步

下載logstash,地址:https://www.elastic.co/downloads/logstash,我這里使用的7.11.1版本,解壓后文件如下:

其中mysql文件夾是自己導(dǎo)的,因?yàn)槭褂玫氖莔ysql8.0以及mysql-connector-java-8.0.20這個(gè)jar包

到導(dǎo)入的ealsticsearch的mysql表的結(jié)構(gòu)如下:

bin文件夾下創(chuàng)建Logstash配置文件,文件名為mysql.conf

內(nèi)容配置如下:

input{ stdin{} jdbc{ jdbc_connection_string => "jdbc:mysql://xxx.xxx.xxx.xxx:3306/ELASTICSEARCH?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC" jdbc_user => "xxx" jdbc_password => "xxxxxx" jdbc_driver_library => "D:\logstash-7.11.1\mysql\mysql-connector-java-8.0.20.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" statement => "SELECT * FROM t_poem" schedule => "* * * * *" } } output { stdout { codec => json_lines } elasticsearch { hosts => "http://xxx.xxx.xxx.xxx:9200" #如果elasticsearsh有配置用戶密碼的話,配下以下 user => xxx password => xxxxxxxxx #事先在es中建立的索引 index => "poem" #使用mysql數(shù)據(jù)中的id作為es的id,注意,id只是名稱(chēng),如果數(shù)據(jù)庫(kù)中的名稱(chēng)不是id,那么這里也需要修改 document_id => "%{id}" } }

更多詳細(xì)的配置:

  • jdbc_driver_library: jdbc驅(qū)動(dòng)的路徑,在上一步中已經(jīng)下載
  • jdbc_driver_class: 驅(qū)動(dòng)類(lèi)的名字,mysql填com.mysql.jdbc.Driver
  • jdbc_connection_string: mysql 地址
  • jdbc_user: mysql 用戶
  • jdbc_password: mysql密碼
  • schedule: 執(zhí)行sql時(shí)機(jī),類(lèi)似 crontab 的調(diào)度,上面配置表示每分鐘刷新一次。
  • statement: 要執(zhí)行的sql,以 “:” 開(kāi)頭是定義的變量,可以通過(guò)parameters 來(lái)設(shè)置變量,這里的sql_last_value是內(nèi)置的變量,表示上一次sql執(zhí)行中> -> update_time的值
  • statement_filepath:和上面statement參數(shù)二選一,存放需要執(zhí)行的SQL語(yǔ)句的文件位置,適用于多個(gè)sql語(yǔ)句的場(chǎng)景。
  • use_column_value: 使用遞增列的值
  • tracking_column_type: 遞增字段的類(lèi)型,numeric表示數(shù)值類(lèi)型,
  • timestamp 表示時(shí)間戳類(lèi)型
  • tracking_column: 遞增字段的名稱(chēng),這里使用updatetime這一列,這列的類(lèi)型是timestamp
  • last_run_metadata_path: 同步點(diǎn)文件,這個(gè)文件記錄了上次的同步點(diǎn),重啟時(shí)會(huì)讀取這個(gè)文件,這個(gè)文件可以手動(dòng)修改
  • index: 導(dǎo)入到es中的index名,這里我直接設(shè)置成了mysql表的名字
  • document_id: 導(dǎo)入到es中的文檔id,這個(gè)需要設(shè)置成主鍵,否則同一條記錄更新后在es中會(huì)出現(xiàn)兩條記錄,%{id} 表示引用mysql表中id字段的值

es新建poem索引:(注意跟mysql的數(shù)據(jù)段名保持一致)

#創(chuàng)建索引 PUT /poem {"settings": {"number_of_shards": 1,"number_of_replicas": 0}, "mappings":{"properties":{"id":{"type":"text"},"name":{"type":"text"},"author":{"type":"text"},"type":{"type": "text"},"content":{"type":"text","analyzer": "ik_max_word","search_analyzer": "ik_max_word"},"href":{"type": "text"},"authordes":{"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_max_word"},"origin":{"type":"text","analyzer": "ik_max_word","search_analyzer": "ik_max_word"},"categoryId":{"type": "text"}}} }

進(jìn)入bin目錄,啟動(dòng)logstash服務(wù),開(kāi)始同步mysql數(shù)據(jù)到es

二、使用阿里云開(kāi)源工具Canal

canal是阿里巴巴旗下的一款開(kāi)源項(xiàng)目,純Java開(kāi)發(fā)。基于數(shù)據(jù)庫(kù)增量日志解析,提供增量數(shù)據(jù)訂閱&消費(fèi),目前主要支持了MySQL(也支持mariaDB)。
當(dāng)前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

  • canal工作原理
  • canal模擬mysql slave的交互協(xié)議,偽裝自己為mysql slave,向mysql master發(fā)送dump協(xié)議,mysql master收到dump請(qǐng)求,開(kāi)始推送binary log給slave(也就是canal)
    canal解析binary log對(duì)象(原始為byte流)

  • Canal 的組成部分
  • 簡(jiǎn)單來(lái)說(shuō),Canal 會(huì)將自己偽裝成 MySQL 從節(jié)點(diǎn)(Slave),并從主節(jié)點(diǎn)(Master)獲取 Binlog,解析和貯存后供下游消費(fèi)端使用。Canal 包含兩個(gè)組成部分:服務(wù)端和客戶端。服務(wù)端負(fù)責(zé)連接至不同的 MySQL 實(shí)例,并為每個(gè)實(shí)例維護(hù)一個(gè)事件消息隊(duì)列;客戶端則可以訂閱這些隊(duì)列中的數(shù)據(jù)變更事件,處理并存儲(chǔ)到數(shù)據(jù)倉(cāng)庫(kù)中。下面我們來(lái)看如何快速搭建起一個(gè) Canal 服務(wù)。

  • 配置 MySQL 主節(jié)點(diǎn)
  • MySQL 默認(rèn)沒(méi)有開(kāi)啟 Binlog,因此我們需要對(duì) my.cnf 文件做以下修改:

    server-id = 1 log_bin = /path/to/mysql-bin.log binlog_format = ROW

    注意 binlog_format 必須設(shè)置為 ROW, 因?yàn)樵?STATEMENT 或 MIXED 模式下, Binlog 只會(huì)記錄和傳輸
    SQL 語(yǔ)句(以減少日志大小),而不包含具體數(shù)據(jù),我們也就無(wú)法保存了。

    從節(jié)點(diǎn)通過(guò)一個(gè)專(zhuān)門(mén)的賬號(hào)連接主節(jié)點(diǎn),這個(gè)賬號(hào)需要擁有全局的 REPLICATION 權(quán)限。我們可以使用 GRANT 命令創(chuàng)建這樣的賬號(hào):

    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
  • 啟動(dòng) Canal 服務(wù)端
    從 GitHub 項(xiàng)目發(fā)布頁(yè)中下載 Canal 服務(wù)端代碼(https://github.com/alibaba/canal/releases )
    配置文件在 conf 文件夾下,有以下目錄結(jié)構(gòu):
  • canal.deployer/conf/canal.properties
    canal.deployer/conf/instanceA/instance.properties
    canal.deployer/conf/instanceB/instance.properties

    conf/canal.properties 是主配置文件,如其中的 canal.port 用以指定服務(wù)端監(jiān)聽(tīng)的端口。instanceA/instance.properties 則是各個(gè)實(shí)例的配置文件,主要的配置項(xiàng)有:

    # slaveId 不能與 my.cnf 中的 server-id 項(xiàng)重復(fù) canal.instance.mysql.slaveId = 1234 canal.instance.master.address = 127.0.0.1:3306 canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.instance.connectionCharset = UTF-8 # 訂閱實(shí)例中所有的數(shù)據(jù)庫(kù)和表 canal.instance.filter.regex = .*\\..*
  • 編寫(xiě) Canal 客戶端
  • 從服務(wù)端消費(fèi)變更消息時(shí),我們需要?jiǎng)?chuàng)建一個(gè) Canal 客戶端,指定需要訂閱的數(shù)據(jù)庫(kù)和表,并開(kāi)啟輪詢(xún)。

    參考文章

    總結(jié)

    以上是生活随笔為你收集整理的MySQL到Elasticsearch数据同步的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。