elasticsearch date_MySQL数据实时增量同步到Elasticsearch
Mysql到Elasticsearch的數據同步,一般用ETL來實現,但性能并不理想,目前大部分的ETL是定時查詢Mysql數據庫有沒有新增數據或者修改數據,如果數據量小影響不大,但如果幾百萬上千萬的數據量性能就明顯的下降很多,本文是使用Go實現的go-mysql-transfer中間件來實時監控Mysql的Binlog日志,然后同步到Elasticsearch,從實時性、性能效果都不錯。
一、go-mysql-transfer
go-mysql-transfer是使用Go語言實現的MySQL數據庫實時增量同步工具。能夠實時監聽MySQL二進制日志(binlog)的變動,將變更內容形成指定格式的消息,發送到接收端。在數據庫和接收端之間形成一個高性能、低延遲的增量數據(Binlog)同步管道, 具有如下特點:
1、不依賴其它組件,一鍵部署
2、集成多種接收端,如:Redis、MongoDB、Elasticsearch、RabbitMQ、Kafka、RocketMQ,不需要再編寫客戶端,開箱即用
3、內置豐富的數據解析、消息生成規則;支持Lua腳本,以處理更復雜的數據邏輯
4、支持監控告警,集成Prometheus客戶端
5、高可用集群部署
6、數據同步失敗重試
7、全量數據初始化
詳情及安裝說明 請參見: MySQL Binlog 增量同步工具go-mysql-transfer實現詳解
項目開源地址:go-mysql-transfer
二、配置
# app.ymltarget: elasticsearch #目標類型#elasticsearch連接配置es_addrs: 127.0.0.1:9200 #連接地址,多個用逗號分隔es_version: 7 # Elasticsearch版本,支持6和7、默認為7#es_password: # 用戶名#es_version: # 密碼三、數據轉換規則
相關配置如下:
rule: - schema: eseap #數據庫名稱 table: t_user #表名稱 #order_by_column: id #排序字段,存量數據同步時不能為空 #column_lower_case: true #列名稱轉為小寫,默認為false #column_upper_case:false#列名稱轉為大寫,默認為false column_underscore_to_camel: true #列名稱下劃線轉駝峰,默認為false # 包含的列,多值逗號分隔,如:id,name,age,area_id 為空時表示包含全部列 #include_columns: ID,USER_NAME,PASSWORD #exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗號分隔,如:id,name,age,area_id 默認為空 #default_column_values: area_name=合肥 #默認的列-值,多個用逗號分隔,如:source=binlog,area_name=合肥 #date_formatter: yyyy-MM-dd #date類型格式化, 不填寫默認yyyy-MM-dd #datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp類型格式化,不填寫默認yyyy-MM-dd HH:mm:ss #Elasticsearch相關 es_index: user_index #Index名稱,可以為空,默認使用表(Table)名稱 #es_mappings: #索引映射,可以為空,為空時根據數據類型自行推導ES推導 # - # column: REMARK #數據庫列名稱 # field: remark #映射后的ES字段名稱 # type: text #ES字段類型 # analyzer: ik_smart #ES分詞器,type為text此項有意義 # #format: #日期格式,type為date此項有意義 # - # column: USER_NAME #數據庫列名稱 # field: account #映射后的ES字段名稱 # type: keyword #ES字段類型示例一
t_user表,數據如下:
自動創建的Mapping,如下:
同步到Elasticsearch的數據如下:
示例二
t_user表,同實例一
使用如下配置:
rule: - schema: eseap #數據庫名稱 table: t_user #表名稱 order_by_column: id #排序字段,存量數據同步時不能為空 column_lower_case: true #列名稱轉為小寫,默認為false #column_upper_case:false#列名稱轉為大寫,默認為false #column_underscore_to_camel: true #列名稱下劃線轉駝峰,默認為false # 包含的列,多值逗號分隔,如:id,name,age,area_id 為空時表示包含全部列 #include_columns: ID,USER_NAME,PASSWORD #exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗號分隔,如:id,name,age,area_id 默認為空 default_column_values: area_name=合肥 #默認的列-值,多個用逗號分隔,如:source=binlog,area_name=合肥 #date_formatter: yyyy-MM-dd #date類型格式化, 不填寫默認yyyy-MM-dd #datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp類型格式化,不填寫默認yyyy-MM-dd HH:mm:ss #Elasticsearch相關 es_index: user_index #Index名稱,可以為空,默認使用表(Table)名稱 es_mappings: #索引映射,可以為空,為空時根據數據類型自行推導ES推導 - column: REMARK #數據庫列名稱 field: remark #映射后的ES字段名稱 type: text #ES字段類型 analyzer: ik_smart #ES分詞器,type為text此項有意義 #format: #日期格式,type為date此項有意義 - column: USER_NAME #數據庫列名稱 field: account #映射后的ES字段名稱 type: keyword #ES字段類型es_mappings 定義索引的mappings(映射關系),不定義es_mappings則使用列類型自動創建索引的mappings(映射關系)。
自動創建的Mapping,如下:
同步到Elasticsearch的數據如下:
四、Lua腳本
使用Lua腳本可以實現更復雜的數據處理邏輯,go-mysql-transfer支持Lua5.1語法。
示例一
t_user表,數據如下:
引入Lua腳本:
#規則配置 rule: - schema: eseap #數據庫名稱 table: t_user #表名稱 order_by_column: id #排序字段,存量數據同步時不能為空 lua_file_path: lua/t_user_es.lua #lua腳本文件 es_index: user_index #Elasticsearch Index名稱,可以為空,默認使用表(Table)名稱 es_mappings: #索引映射,可以為空,為空時根據數據類型自行推導ES推導 - field: id #映射后的ES字段名稱 type: keyword #ES字段類型 - field: userName #映射后的ES字段名稱 type: keyword #ES字段類型 - field: password #映射后的ES字段名稱 type: keyword #ES字段類型 - field: createTime #映射后的ES字段名稱 type: date #ES字段類型 format: yyyy-MM-dd HH:mm:ss #日期格式,type為date此項有意義 - field: remark #映射后的ES字段名稱 type: text #ES字段類型 analyzer: ik_smart #ES分詞器,type為text此項有意義 - field: source #映射后的ES字段名稱 type: keyword #ES字段類型es_mappings 定義索引的mappings(映射關系),不定義es_mappings則根據字段的值自動創建mappings(映射關系)。根據es_mappings 生成的mappings如下:
user_index索引mappings
Lua腳本:
local ops = require("esOps") --加載elasticsearch操作模塊local row = ops.rawRow() --當前數據庫的一行數據,table類型,key為列名稱local action = ops.rawAction() --當前數據庫事件,包括:insert、update、deletelocal id = row["ID"] --獲取ID列的值local userName = row["USER_NAME"] --獲取USER_NAME列的值local password = row["PASSWORD"] --獲取USER_NAME列的值local createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值local remark = row["REMARK"] --獲取REMARK列的值local result = {} -- 定義一個table,作為結果集result["id"] = idresult["userName"] = userNameresult["password"] = passwordresult["createTime"] = createTimeresult["remark"] = remarkresult["source"] = "binlog" -- 數據來源if action == "insert" then -- 只監聽新增事件 ops.INSERT("t_user",id,result) -- 新增,參數1為index名稱,string類型;參數2為要插入的數據主鍵;參數3為要插入的數據,tablele類型或者json字符串end同步到Elasticsearch的數據如下:
示例二
t_user表,同實例一
引入Lua腳本:
schema: eseap #數據庫名稱 table: t_user #表名稱 lua_file_path: lua/t_user_es2.lua #lua腳本文件未明確定義index名稱、mappings,es會根據值自動創建一個名為t_user的index。
使用如下腳本:
local ops = require("esOps") --加載elasticsearch操作模塊local row = ops.rawRow() --當前數據庫的一行數據,table類型,key為列名稱local action = ops.rawAction() --當前數據庫事件,包括:insert、update、deletelocal id = row["ID"] --獲取ID列的值local userName = row["USER_NAME"] --獲取USER_NAME列的值local password = row["PASSWORD"] --獲取USER_NAME列的值local createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值local result = {} -- 定義一個table,作為結果集result["id"] = idresult["userName"] = userNameresult["password"] = passwordresult["createTime"] = createTimeresult["remark"] = remarkresult["source"] = "binlog" -- 數據來源if action == "insert" then -- 只監聽新增事件 ops.INSERT("t_user",id,result) -- 新增,參數1為index名稱,string類型;參數2為要插入的數據主鍵;參數3為要插入的數據,tablele類型或者json字符串end同步到Elasticsearch的數據如下:
esOps模塊提供的方法如下:
文章來源:https://www.jianshu.com/p/5a9b6c4f318c
總結
以上是生活随笔為你收集整理的elasticsearch date_MySQL数据实时增量同步到Elasticsearch的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 基础功能4-画loss
- 下一篇: linux cmake编译源码,linu