日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

日志文件和mysql同步到kafka_logstash_output_kafka:Mysql 同步 Kafka 深入详解

發(fā)布時間:2025/3/11 数据库 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 日志文件和mysql同步到kafka_logstash_output_kafka:Mysql 同步 Kafka 深入详解 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

0、題記

實際業(yè)務(wù)場景中,會遇到基礎(chǔ)數(shù)據(jù)存在 Mysql 中,實時寫入數(shù)據(jù)量比較大的情景。遷移至kafka是一種比較好的業(yè)務(wù)選型方案。

而mysql寫入kafka的選型方案有:

方案一:logstash_output_kafka 插件。

方案二:kafka_connector。

方案三:debezium 插件。

方案四:flume。

方案五:其他類似方案。

其中:debezium和flume是基于 mysql binlog 實現(xiàn)的。

如果需要同步歷史全量數(shù)據(jù)+實時更新數(shù)據(jù),建議使用logstash。

1、logstash同步原理

常用的logstash的插件是:logstash_input_jdbc實現(xiàn)關(guān)系型 數(shù)據(jù)庫 到Elasticsearch等的同步。

實際上, 核心logstash的同步原理的掌握 ,有助于大家理解類似的各種庫之間的同步。

logstash 核心原理 :輸入生成事件,過濾器修改它們,輸出將它們發(fā)送到其他地方。

logstash核心三部分組成:input、filter、output。

input?{?}

filter?{?}

output?{?}

1.1 input輸入

包含但遠不限于:

jdbc:關(guān)系型數(shù)據(jù)庫:mysql、 oracle 等。

file:從文件系統(tǒng)上的文件讀取。

syslog:在已知端口514上偵聽syslog消息。

redis:redis消息。beats:處理 Beats發(fā)送的事件。

kafka:kafka實時數(shù)據(jù)流。

1.2 filter過濾器

過濾器是Logstash管道中的中間處理設(shè)備。您可以將過濾器與條件組合,以便在事件滿足特定條件時對其執(zhí)行操作。

可以把它比作數(shù)據(jù)處理的 ETL 環(huán)節(jié)。

一些有用的過濾包括:

grok:解析并構(gòu)造任意文本。 Grok是目前Logstash中將非結(jié)構(gòu)化日志數(shù)據(jù)解析為結(jié)構(gòu)化和可查詢內(nèi)容的最佳方式 。有了內(nèi)置于Logstash的120種模式,您很可能會找到滿足您需求的模式!

mutate:對事件字段執(zhí)行常規(guī)轉(zhuǎn)換。您可以重命名,刪除,替換和修改事件中的字段。

drop:完全刪除事件,例如調(diào)試事件。

clone:制作事件的副本,可能添加或刪除字段。

geoip:添加有關(guān)IP地址的地理位置的信息。

1.3 output輸出

輸出是Logstash管道的最后階段。一些常用的輸出包括:

elasticsearch:將事件數(shù)據(jù)發(fā)送到Elasticsearch。

file:將事件數(shù)據(jù)寫入磁盤上的文件。

kafka:將事件寫入Kafka。

詳細的filter demo參考:http://t.cn/EaAt4zP

2、同步Mysql到kafka配置參考

input?{

jdbc?{

jdbc_connection_string?=>?"jdbc:mysql://192.168.1.12:3306/news_base"

jdbc_user?=>?"root"

jdbc_password?=>?"xxxxxxx"

jdbc_driver_library?=>?"/home/logstash-6.4.0/lib/mysql-connector-java-5.1.47.jar"

jdbc_driver_class?=>?"com.mysql.jdbc.Driver"

#schedule?=>?"*?*?*?*?*"

statement?=>?"SELECT?*?from?news_info?WHERE?id?>?:sql_last_value??order?by?id"

use_column_value?=>?true

tracking_column?=>?"id"???????? ??????tracking_column_type?=>?"numeric"

record_last_run?=>?true

last_run_metadata_path?=>?"/home/logstash-6.4.0/sync_data/news_last_run"???? ????}

}

filter?{

ruby{

code?=>?"event.set('gather_time_unix',event.get('gather_time').to_i*1000)"

}

ruby{

code?=>?"event.set('publish_time_unix',event.get('publish_time').to_i*1000)"

}

mutate?{

remove_field?=>?[?"@version"?]

remove_field?=>?[?"@timestamp"?]

remove_field?=>?[?"gather_time"?]

remove_field?=>?[?"publish_time"?]

}

}

output?{

kafka?{

bootstrap_servers?=>?"192.168.1.13:9092"

codec?=>?json_lines

topic_id?=>?"mytopic"

}

file?{

codec?=>?json_lines

path?=>?"/tmp/output_a.log"

}

}

以上內(nèi)容不復(fù)雜,不做細講。

注意:

Mysql借助logstash同步后,日期類型格式:“2019-04-20 13:55:53”已經(jīng)被識別為日期格式。

code => "event.set('gather_time_unix',event.get('gather_time').to_i*1000)",

是將Mysql中的時間格式轉(zhuǎn)化為時間戳格式。

3、坑總結(jié)

3.1 坑1字段大小寫問題

from星友:使用logstash同步mysql數(shù)據(jù)的,因為在jdbc.conf里面沒有添加 lowercase_column_names

=> "false" ?這個屬性,所以logstash默認把查詢結(jié)果的列明改為了小寫,同步進了es,所以就導(dǎo)致es里面看到的字段名稱全是小寫。

最后總結(jié):es是支持大寫字段名稱的,問題出在logstash沒用好,需要在同步配置中加上 lowercase_column_names => "false" ?。記錄下來希望可以幫到更多人。

3.2 同步到ES中的數(shù)據(jù)會不會重復(fù)?

想將關(guān)系數(shù)據(jù)庫的數(shù)據(jù)同步至ES中,如果在集群的多臺 服務(wù)器 上同時啟動logstash。

解讀:實際項目中就是沒用隨機id ?使用指定id作為es的_id ,指定id可以是url的md5.這樣相同數(shù)據(jù)就會走更新覆蓋以前數(shù)據(jù)

3.3 相同配置logstash,升級6.3之后不能同步數(shù)據(jù)。

解讀:高版本基于時間增量有優(yōu)化。

tracking_column_type => "timestamp" 應(yīng)該是需要指定標(biāo)識為時間類型,默認為數(shù)字類型numeric

3.4 ETL字段統(tǒng)一在哪處理?

解讀:可以logstash同步mysql的時候sql查詢階段處理,如: select a_value as avalue*** 。

或者filter階段處理, mutate rename 處理。

mutate?{

rename?=>?["shortHostname",?"hostname"?]

}

或者kafka階段借助kafka stream處理。

4、小結(jié)

相關(guān)配置和同步都不復(fù)雜,復(fù)雜點往往在于filter階段的解析還有l(wèi)ogstash性能問題。

需要結(jié)合實際業(yè)務(wù)場景做深入的研究和性能分析。

有問題,歡迎留言討論。

推薦閱讀:

3、 一張圖理清楚關(guān)系型數(shù)據(jù)庫與Elasticsearch同步?http://t.cn/EaAceD3

4、新的實現(xiàn):http://t.cn/EaAt60O

5、mysql2mysql:?http://t.cn/EaAtK7r 6、推薦開源實現(xiàn): http://t.cn/EaAtjqN

加入星球,更短時間更快習(xí)得更多干貨!

創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎

總結(jié)

以上是生活随笔為你收集整理的日志文件和mysql同步到kafka_logstash_output_kafka:Mysql 同步 Kafka 深入详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。