日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

elasticsearch date_MySQL数据实时增量同步到Elasticsearch

發布時間:2025/3/15 数据库 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 elasticsearch date_MySQL数据实时增量同步到Elasticsearch 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Mysql到Elasticsearch的數據同步,一般用ETL來實現,但性能并不理想,目前大部分的ETL是定時查詢Mysql數據庫有沒有新增數據或者修改數據,如果數據量小影響不大,但如果幾百萬上千萬的數據量性能就明顯的下降很多,本文是使用Go實現的go-mysql-transfer中間件來實時監控Mysql的Binlog日志,然后同步到Elasticsearch,從實時性、性能效果都不錯。

一、go-mysql-transfer

go-mysql-transfer是使用Go語言實現的MySQL數據庫實時增量同步工具。能夠實時監聽MySQL二進制日志(binlog)的變動,將變更內容形成指定格式的消息,發送到接收端。在數據庫和接收端之間形成一個高性能、低延遲的增量數據(Binlog)同步管道, 具有如下特點:

1、不依賴其它組件,一鍵部署

2、集成多種接收端,如:Redis、MongoDB、Elasticsearch、RabbitMQ、Kafka、RocketMQ,不需要再編寫客戶端,開箱即用

3、內置豐富的數據解析、消息生成規則;支持Lua腳本,以處理更復雜的數據邏輯

4、支持監控告警,集成Prometheus客戶端

5、高可用集群部署

6、數據同步失敗重試

7、全量數據初始化

詳情及安裝說明 請參見: MySQL Binlog 增量同步工具go-mysql-transfer實現詳解

項目開源地址:go-mysql-transfer

二、配置

# app.ymltarget: elasticsearch #目標類型#elasticsearch連接配置es_addrs: 127.0.0.1:9200 #連接地址,多個用逗號分隔es_version: 7 # Elasticsearch版本,支持6和7、默認為7#es_password: # 用戶名#es_version: # 密碼

三、數據轉換規則

相關配置如下:

rule: - schema: eseap #數據庫名稱 table: t_user #表名稱 #order_by_column: id #排序字段,存量數據同步時不能為空 #column_lower_case: true #列名稱轉為小寫,默認為false #column_upper_case:false#列名稱轉為大寫,默認為false column_underscore_to_camel: true #列名稱下劃線轉駝峰,默認為false # 包含的列,多值逗號分隔,如:id,name,age,area_id 為空時表示包含全部列 #include_columns: ID,USER_NAME,PASSWORD #exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗號分隔,如:id,name,age,area_id 默認為空 #default_column_values: area_name=合肥 #默認的列-值,多個用逗號分隔,如:source=binlog,area_name=合肥 #date_formatter: yyyy-MM-dd #date類型格式化, 不填寫默認yyyy-MM-dd #datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp類型格式化,不填寫默認yyyy-MM-dd HH:mm:ss #Elasticsearch相關 es_index: user_index #Index名稱,可以為空,默認使用表(Table)名稱 #es_mappings: #索引映射,可以為空,為空時根據數據類型自行推導ES推導 # - # column: REMARK #數據庫列名稱 # field: remark #映射后的ES字段名稱 # type: text #ES字段類型 # analyzer: ik_smart #ES分詞器,type為text此項有意義 # #format: #日期格式,type為date此項有意義 # - # column: USER_NAME #數據庫列名稱 # field: account #映射后的ES字段名稱 # type: keyword #ES字段類型

示例一

t_user表,數據如下:

自動創建的Mapping,如下:

同步到Elasticsearch的數據如下:

示例二

t_user表,同實例一

使用如下配置:

rule: - schema: eseap #數據庫名稱 table: t_user #表名稱 order_by_column: id #排序字段,存量數據同步時不能為空 column_lower_case: true #列名稱轉為小寫,默認為false #column_upper_case:false#列名稱轉為大寫,默認為false #column_underscore_to_camel: true #列名稱下劃線轉駝峰,默認為false # 包含的列,多值逗號分隔,如:id,name,age,area_id 為空時表示包含全部列 #include_columns: ID,USER_NAME,PASSWORD #exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗號分隔,如:id,name,age,area_id 默認為空 default_column_values: area_name=合肥 #默認的列-值,多個用逗號分隔,如:source=binlog,area_name=合肥 #date_formatter: yyyy-MM-dd #date類型格式化, 不填寫默認yyyy-MM-dd #datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp類型格式化,不填寫默認yyyy-MM-dd HH:mm:ss #Elasticsearch相關 es_index: user_index #Index名稱,可以為空,默認使用表(Table)名稱 es_mappings: #索引映射,可以為空,為空時根據數據類型自行推導ES推導 - column: REMARK #數據庫列名稱 field: remark #映射后的ES字段名稱 type: text #ES字段類型 analyzer: ik_smart #ES分詞器,type為text此項有意義 #format: #日期格式,type為date此項有意義 - column: USER_NAME #數據庫列名稱 field: account #映射后的ES字段名稱 type: keyword #ES字段類型

es_mappings 定義索引的mappings(映射關系),不定義es_mappings則使用列類型自動創建索引的mappings(映射關系)。

自動創建的Mapping,如下:

同步到Elasticsearch的數據如下:

四、Lua腳本

使用Lua腳本可以實現更復雜的數據處理邏輯,go-mysql-transfer支持Lua5.1語法。

示例一

t_user表,數據如下:

引入Lua腳本:

#規則配置 rule: - schema: eseap #數據庫名稱 table: t_user #表名稱 order_by_column: id #排序字段,存量數據同步時不能為空 lua_file_path: lua/t_user_es.lua #lua腳本文件 es_index: user_index #Elasticsearch Index名稱,可以為空,默認使用表(Table)名稱 es_mappings: #索引映射,可以為空,為空時根據數據類型自行推導ES推導 - field: id #映射后的ES字段名稱 type: keyword #ES字段類型 - field: userName #映射后的ES字段名稱 type: keyword #ES字段類型 - field: password #映射后的ES字段名稱 type: keyword #ES字段類型 - field: createTime #映射后的ES字段名稱 type: date #ES字段類型 format: yyyy-MM-dd HH:mm:ss #日期格式,type為date此項有意義 - field: remark #映射后的ES字段名稱 type: text #ES字段類型 analyzer: ik_smart #ES分詞器,type為text此項有意義 - field: source #映射后的ES字段名稱 type: keyword #ES字段類型

es_mappings 定義索引的mappings(映射關系),不定義es_mappings則根據字段的值自動創建mappings(映射關系)。根據es_mappings 生成的mappings如下:

user_index索引mappings

Lua腳本:

local ops = require("esOps") --加載elasticsearch操作模塊local row = ops.rawRow() --當前數據庫的一行數據,table類型,key為列名稱local action = ops.rawAction() --當前數據庫事件,包括:insert、update、deletelocal id = row["ID"] --獲取ID列的值local userName = row["USER_NAME"] --獲取USER_NAME列的值local password = row["PASSWORD"] --獲取USER_NAME列的值local createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值local remark = row["REMARK"] --獲取REMARK列的值local result = {} -- 定義一個table,作為結果集result["id"] = idresult["userName"] = userNameresult["password"] = passwordresult["createTime"] = createTimeresult["remark"] = remarkresult["source"] = "binlog" -- 數據來源if action == "insert" then -- 只監聽新增事件 ops.INSERT("t_user",id,result) -- 新增,參數1為index名稱,string類型;參數2為要插入的數據主鍵;參數3為要插入的數據,tablele類型或者json字符串end

同步到Elasticsearch的數據如下:

示例二

t_user表,同實例一

引入Lua腳本:

schema: eseap #數據庫名稱 table: t_user #表名稱 lua_file_path: lua/t_user_es2.lua #lua腳本文件

未明確定義index名稱、mappings,es會根據值自動創建一個名為t_user的index。

使用如下腳本:

local ops = require("esOps") --加載elasticsearch操作模塊local row = ops.rawRow() --當前數據庫的一行數據,table類型,key為列名稱local action = ops.rawAction() --當前數據庫事件,包括:insert、update、deletelocal id = row["ID"] --獲取ID列的值local userName = row["USER_NAME"] --獲取USER_NAME列的值local password = row["PASSWORD"] --獲取USER_NAME列的值local createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值local result = {} -- 定義一個table,作為結果集result["id"] = idresult["userName"] = userNameresult["password"] = passwordresult["createTime"] = createTimeresult["remark"] = remarkresult["source"] = "binlog" -- 數據來源if action == "insert" then -- 只監聽新增事件 ops.INSERT("t_user",id,result) -- 新增,參數1為index名稱,string類型;參數2為要插入的數據主鍵;參數3為要插入的數據,tablele類型或者json字符串end

同步到Elasticsearch的數據如下:

esOps模塊提供的方法如下:

  • INSERT: 插入操作,如:ops.INSERT(index,id,result)。參數index為索引名稱,字符串類型;參數index為要插入數據的主鍵;參數result為要插入的數據,可以為table類型或者json字符串
  • UPDATE: 修改操作,如:ops.UPDATE(index,id,result)。參數index為索引名稱,字符串類型;參數index為要修改數據的主鍵;參數result為要修改的數據,可以為table類型或者json字符串
  • DELETE: 刪除操作,如:ops.DELETE(index,id)。參數index為索引名稱,字符串類型;參數id為要刪除的數據主鍵,類型不限;

  • 文章來源:https://www.jianshu.com/p/5a9b6c4f318c

    總結

    以上是生活随笔為你收集整理的elasticsearch date_MySQL数据实时增量同步到Elasticsearch的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。