日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

使用canal同步MySQL数据到Elasticsearch(ES)

發布時間:2024/9/30 数据库 56 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用canal同步MySQL数据到Elasticsearch(ES) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

  • 1、功能及使用場景
    • 1.1、功能介紹
    • 1.2、使用場景
  • 2、需求引入
  • 3、canal文件下載及準備
    • 3.1 下載文件
    • 3.2 準備文件
  • 4、deployer安裝及效果測試
    • 4.1、deployer 配置修改
      • 4.1.1 準備
      • 4.1.2 修改連接數據庫信息
    • 4.2 啟動deployer
    • 4.3 測試deployer效果
      • 4.3.1 在本地電腦新建普通maven工程
      • 4.3.2 新建ClientSimple類
  • 5、adapter安裝及效果測試
    • 5.1 修改配置
      • 5.1.1 修改啟動器配置: application.yml
      • 5.1.2 修改 conf/es/mytest_user.yml文件
    • 5.2 啟動adapter
    • 5.3 效果測試
  • 6、全量數據導入

1、功能及使用場景

1.1、功能介紹

canal是阿里巴巴開源的mysql數據傳輸組件,基于mysql binlog,提供了準確、實時的數據傳輸服務。有關binlog介紹,參見binlog介紹。
以下來自官方GitHub介紹。GitHub地址


canal [k?’n?l],譯意為水道/管道/溝渠,主要用途是基于 MySQL 數據庫增量日志解析,提供增量數據訂閱和消費

早期阿里巴巴因為杭州和美國雙機房部署,存在跨機房同步的業務需求,實現方式主要是基于業務 trigger 獲取增量變更。從 2010 年開始,業務逐步嘗試數據庫日志解析獲取增量變更進行同步,由此衍生出了大量的數據庫增量訂閱和消費業務。

基于日志增量訂閱和消費的業務包括

  • 數據庫鏡像
  • 數據庫實時備份
  • 索引構建和實時維護(拆分異構索引、倒排索引等)
  • 業務 cache 刷新
  • 帶業務邏輯的增量數據處理

當前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

1.2、使用場景

根據官方文檔,目標系統支持MySQL、kafka、elasticsearch、hbase、rocketMQ、pulsar等。

本文主要介紹MySQL同步到Elasticsearch的使用。

2、需求引入

整個過程用一個示例來介紹canal的安裝使用。

存在如下兩張表(一對多關系),用戶信息表和用戶權限表,需要將用戶信息以及權限同步到es

user_info(用戶信息表):

idnamerole_id
1張三1
2李四2
3王大錘2

role(權限表):

role_idrole_name
1管理員
2測試員

查詢sql:

SELECT a.id AS _id, a.name, a.role_id, b.role_name FROM user_info a LEFT JOIN role b ON b.role_id = a.role_id

查詢結果:

_idnamerole_idrole_name
3王大錘2測試員
1張三1管理員
2李四1管理員

對應的es索引結構:

{"user_index": {"mappings": {"_doc": {"properties": {"name": {"type": "text"},"role_id": {"type": "long"},"role_name": {"type": "text"}}}}} }

3、canal文件下載及準備

3.1 下載文件

官方GitHub下載地址:https://github.com/alibaba/canal/releases
下載canal服務端(canal.deployer-1.1.4.tar.gz)和客戶端(canal.adapter-1.1.4.tar.gz),如下圖。

下載完成如下

3.2 準備文件


命令運行完成后,進入adapter和deployer可以看到如下結構(忽略我本機的.DS_Store)
adapter:

deployer:

4、deployer安裝及效果測試

4.1、deployer 配置修改

4.1.1 準備

(針對阿里云 RDS for MySQL , 默認打開了 binlog , 并且賬號默認具有 binlog dump 權限 , 不需要任何權限或者 binlog 設置,可以直接跳過這一步)

  • 對于自建 MySQL , 需要先開啟 Binlog 寫入功能,配置 binlog-format 為 ROW 模式,my.cnf 中配置如下
    [mysqld]
    log-bin=mysql-bin # 開啟 binlog
    binlog-format=ROW # 選擇 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復

  • 授權 canal 鏈接 MySQL 賬號具有作為 MySQL slave 的權限, 如果已有賬戶可直接 grant,下面新建了canal賬號

CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;

4.1.2 修改連接數據庫信息

vi conf/example/instance.properties
修改如下標紅信息

4.2 啟動deployer

在deployer目錄運行啟動腳本

sh bin/startup.sh

  • 查看 server 日志
    tail -f logs/canal/canal.log
  • 查看 instance 的日志
    tail -f logs/example/example.log

看到如上日志,標志啟動成功

4.3 測試deployer效果

(4.3 步驟可跳過,主要為驗證deployer端效果)

4.3.1 在本地電腦新建普通maven工程


pom文件,依賴如下pom

<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version> </dependency>

4.3.2 新建ClientSimple類

粘貼測試代碼(下面鏈接頁面上的ClientSimple代碼)
https://github.com/alibaba/canal/wiki/ClientExample

將圈紅ip改為部署adapter的ip,然后直接啟動此main方法

啟動完成看到如下日志:

日志會循環打印count
此時觸發數據庫變更

user_info表變更前

user_info表變更后,新加了一條名叫邏輯的記錄

可在日志處觀察到變更信息,標志著deployer監聽mysql binlog變更成功

5、adapter安裝及效果測試

canal adapter 的 Elastic Search 版本支持6.x.x以上

官方文檔地址
https://github.com/alibaba/canal/wiki/Sync-ES

5.1 修改配置

5.1.1 修改啟動器配置: application.yml

進adapter/conf目錄
vi application.yml
修改如下配置,注意縮進格式,yml文件嚴格縮進格式,格式錯誤會引起啟動失敗問題。

mode使用rest模式,測試使用transport會出問題,目前沒找到原因,下圖mode還沒更改

5.1.2 修改 conf/es/mytest_user.yml文件

adapter將會自動加載 conf/es 下的所有.yml結尾的配置文件
不需要的文件可刪除,只配置需要的yml文件
修改如下配置,esMapping信息,包括 index,type,sql,其中sql盡量保證不要換行,在文本編輯器中編輯成一行,在粘貼進去,否則可能會出問題(還是由于yml格式問題)
此處的sql的字段別名即是es字段名,不寫別名,默認原名即是es字段名,有關詳細說明,可參見官方文檔https://github.com/alibaba/canal/wiki/Sync-ES
etlCondition 可以注釋掉,我們默認任何條件都同步

5.2 啟動adapter

啟動命令 sh bin/startup.sh
查看日志:
tail -f logs/adapter/adapter.log
觀察如下日志,即表示啟動成功

5.3 效果測試

canal是一個MySQL增量訂閱組件,所以不支持數據的初始化
我們需要在數據庫觸發變更,才能將數據同步到es
變更前:
es數據->在kibana查詢對應數據,可以看到右側數據為空

mysql數據→使用查詢sql,查詢到數據如下


下面,我們進行數據變更:

比如將張三名字變成張六,MySQL數據如下:

此時,es數據在MySQL數據變更同時,es數據相應變更,如下,可以觀察到,數據變更已經成功從MySQL同步到elasticsearch

需要注意的是,由于我們只變更了user_info表,所以此處只同步了user_info表的name和id字段,role_name字段,并沒有同步,只有role_name字段變更時,才會被同步,所以實際使用時,要先做好數據初始化工作。


同時,可在logs/adapter/adapter.log觀察到數據變更日志


至此,全部操作完成

6、全量數據導入

由于canal只支持增量導入,所以官方adapter提供了全量導入手動觸發的功能,

canal全表同步(etl功能,手動觸發)
參見源碼:

/*** ETL curl http://127.0.0.1:8081/etl/hbase/mytest_person2.yml -X POST** @param type 類型 hbase, es* @param task 任務名對應配置文件名 mytest_person2.yml* @param params etl where條件參數, 為空全部導入*/ @PostMapping("/etl/{type}/{task}") public EtlResult etl(@PathVariable String type, @PathVariable String task,@RequestParam(name = "params", required = false) String params) {return etl(type, null, task, params); }

只需要發送http命令即可

例:如下,ip和端口是部署canal adapter的IP和端口,然后類型選es,后面在跟上對應的yml配置文件即可

curl http://127.0.0.1:8081/etl/es/mytest_person2.yml -X POST

導入成功提示

與50位技術專家面對面20年技術見證,附贈技術全景圖

總結

以上是生活随笔為你收集整理的使用canal同步MySQL数据到Elasticsearch(ES)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。