日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

mysql增量同步kafka_MySQL数据实时增量同步到Kafka - Flume

發布時間:2024/10/8 数据库 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 mysql增量同步kafka_MySQL数据实时增量同步到Kafka - Flume 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

寫在前面的話

需求,將MySQL里的數據實時增量同步到Kafka。接到活兒的時候,第一個想法就是通過讀取MySQL的binlog日志,將數據寫到Kafka。不過對比了一些工具,例如:Canel,Databus,Puma等,這些都是需要部署server和client的。其中server端是由這些工具實現,配置了就可以讀binlog,而client端是需要我們動手編寫程序的,遠沒有達到我即插即用的期望和懶人的標準。

同步的格式

原作者的插件flume-ng-sql-source只支持csv的格式,如果開始同步之后,數據庫表需要增減字段,則會給開發者造成很大的困擾。所以我添加了一個分支版本,用來將數據以JSON的格式,同步到kafka,字段語義更加清晰。

將此jar包下載之后,和相應的數據庫驅動包,一起放到flume的lib目錄之下即可。

處理機制

flume-ng-sql-source在【status.file.name】文件中記錄讀取數據庫表的偏移量,進程重啟后,可以接著上次的進度,繼續增量讀表。

啟動說明

說明:啟動命令里的【YYYYMM=201711】,會傳入到flume.properties里面,替換${YYYYMM}

[test@localhost?~]$?YYYYMM=201711?bin/flume-ng?agent?-c?conf?-f?conf/flume.properties?-n?sync?&

-c:表示配置文件的目錄,在此我們配置了flume-env.sh,也在conf目錄下;

-f:指定配置文件,這個配置文件必須在全局選項的--conf參數定義的目錄下,就是說這個配置文件要在前面配置的conf目錄下面;

-n:表示要啟動的agent的名稱,也就是我們flume.properties配置文件里面,配置項的前綴,這里我們配的前綴是【sync】;

flume的配置說明

flume-env.sh

#?配置JVM堆內存和java運行參數,配置-DpropertiesImplementation參數是為了在flume.properties配置文件中使用環境變量

export?JAVA_OPTS="-Xms512m?-Xmx512m?-Dcom.sun.management.jmxremote?-DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties"

flume.properties

#?數據來源

sync.sources?=?s-1

#?數據通道

sync.channels?=?c-1

#?數據去處,這里配置了failover,根據下面的優先級配置,會先啟用k-1,k-1掛了后再啟用k-2

sync.sinks?=?k-1?k-2

#這個是配置failover的關鍵,需要有一個sink?group

sync.sinkgroups?=?g-1

sync.sinkgroups.g-1.sinks?=?k-1?k-2

#處理的類型是failover

sync.sinkgroups.g-1.processor.type?=?failover

#優先級,數字越大優先級越高,每個sink的優先級必須不相同

sync.sinkgroups.g-1.processor.priority.k-1?=?5

sync.sinkgroups.g-1.processor.priority.k-2?=?10

#設置為10秒,當然可以根據你的實際狀況更改成更快或者很慢

sync.sinkgroups.g-1.processor.maxpenalty?=?10000

##########?數據通道的定義

#?數據量不大,直接放內存。其實還可以放在JDBC,kafka或者磁盤文件等

sync.channels.c-1.type?=?memory

#?通道隊列的最大長度

sync.channels.c-1.capacity?=?100000

#?putList和takeList隊列的最大長度,sink從capacity中抓取batchsize個event,放到這個隊列。所以此參數最好比capacity小,比sink的batchsize大。

#?官方定義:The?maximum?number?of?events?the?channel?will?take?from?a?source?or?give?to?a?sink?per?transaction.

sync.channels.c-1.transactionCapacity?=?1000

sync.channels.c-1.byteCapacityBufferPercentage?=?20

###?默認值的默認值等于JVM可用的最大內存的80%,可以不配置

#?sync.channels.c-1.byteCapacity?=?800000

#########sql?source#################

#?source?s-1用到的通道,和sink的通道要保持一致,否則就GG了

sync.sources.s-1.channels=c-1

#########?For?each?one?of?the?sources,?the?type?is?defined

sync.sources.s-1.type?=?org.keedio.flume.source.SQLSource

sync.sources.s-1.hibernate.connection.url?=?jdbc:mysql://192.168.1.10/testdb?useSSL=false

#########?Hibernate?Database?connection?properties

sync.sources.s-1.hibernate.connection.user?=?test

sync.sources.s-1.hibernate.connection.password?=?123456

sync.sources.s-1.hibernate.connection.autocommit?=?true

sync.sources.s-1.hibernate.dialect?=?org.hibernate.dialect.MySQL5Dialect

sync.sources.s-1.hibernate.connection.driver_class?=?com.mysql.jdbc.Driver

sync.sources.s-1.run.query.delay=10000

sync.sources.s-1.status.file.path?=?/home/test/apache-flume-1.8.0-bin/status

#?用上${YYYYMM}環境變量,是因為我用的測試表示一個月表,每個月的數據會放到相應的表里。使用方式見上面的啟動說明

sync.sources.s-1.status.file.name?=?test_${YYYYMM}.status

########?Custom?query

sync.sources.s-1.start.from?=?0

sync.sources.s-1.custom.query?=?select?*?from?t_test_${YYYYMM}?where?id?>?$@$?order?by?id?asc

sync.sources.s-1.batch.size?=?100

sync.sources.s-1.max.rows?=?100

sync.sources.s-1.hibernate.connection.provider_class?=?org.hibernate.connection.C3P0ConnectionProvider

sync.sources.s-1.hibernate.c3p0.min_size=5

sync.sources.s-1.hibernate.c3p0.max_size=20

#########?sinks?1

#?sink?k-1用到的通道,和source的通道要保持一致,否則取不到數據

sync.sinks.k-1.channel?=?c-1

sync.sinks.k-1.type?=?org.apache.flume.sink.kafka.KafkaSink

sync.sinks.k-1.kafka.topic?=?sync-test

sync.sinks.k-1.kafka.bootstrap.servers?=?localhost:9092

sync.sinks.k-1.kafka.producer.acks?=?1

#?每批次處理的event數量

sync.sinks.k-1.kafka.flumeBatchSize??=?100

#########?sinks?2

#?sink?k-2用到的通道,和source的通道要保持一致,否則取不到數據

sync.sinks.k-2.channel?=?c-1

sync.sinks.k-2.type?=?org.apache.flume.sink.kafka.KafkaSink

sync.sinks.k-2.kafka.topic?=?sync-test

sync.sinks.k-2.kafka.bootstrap.servers?=?localhost:9092

sync.sinks.k-2.kafka.producer.acks?=?1

sync.sinks.k-2.kafka.flumeBatchSize??=?100

flume各部分參數含義

batchData的大小見參數:batchSize

PutList和TakeList的大小見參數:transactionCapactiy

Channel總容量大小見參數:capacity

問題記錄

異常:Exception in thread "PollableSourceRunner-SQLSource-src-1" java.lang.AbstractMethodError: org.keedio.flume.source.SQLSource.getMaxBackOffSleepInterval()J

分析:由于我用的是flume1.8,而flume-ng-sql-1.4.3插件對應的flume-ng-core版本是1.5.2,1.8版本里的PollableSource接口多了兩個方法 getBackOffSleepIncrement(); getMaxBackOffSleepInterval();在失敗補償暫停線程處理時,需要用到這個方法。

解決方法:更新flume-ng-sql-1.4.3里依賴的flume-ng-core版本為1.8.0,并在源代碼【SQLSource.java】里添加這兩個方法即可。

@Override

public?long?getBackOffSleepIncrement()?{

return?1000;

}

@Override

public?long?getMaxBackOffSleepInterval()?{

return?5000;

}

總結

以上是生活随笔為你收集整理的mysql增量同步kafka_MySQL数据实时增量同步到Kafka - Flume的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 国产性生活视频 | 爱爱爱爱网| 少妇粉嫩小泬白浆流出 | 日本不卡一区二区三区在线观看 | 午夜在线免费观看 | 免费在线观看av片 | 少妇人妻真实偷人精品视频 | 欧美性受xxx黑人xyx性爽 | 麻豆黄色片 | 成年人免费看视频 | 最好看的2019中文大全在线观看 | 久久新视频 | 91在线网址 | 欧美精品hd | 黄色在线视频网站 | 波多野结衣家庭主妇 | 99国产在线观看 | 重口另类 | 9l视频自拍九色9l视频成人 | 久久精品视频免费播放 | 香港黄色网址 | 熟女少妇精品一区二区 | 好吊视频一二三区 | 黄色网址在线免费看 | 国产精彩视频在线观看 | 九九久久综合 | 久久这里只有精品23 | 成人欧美一区二区 | 久久发布国产伦子伦精品 | 亚洲综合视频网站 | 久久久精品一区二区涩爱 | 欧美成人午夜精品免费 | 欧美亚色| 亚洲AV午夜成人片 | 日韩毛片在线播放 | 国产一区二区三区在线视频 | 九九热久久免费视频 | 久久久久亚洲av成人毛片韩 | 国产成人精品一区二区三 | 性激烈视频在线观看 | 免费久久久久 | 婷婷天天| 日本视频www色 | 色网在线免费观看 | 久章草影院 | 国产人妻人伦精品1国产丝袜 | 天堂av2024| 污视频免费在线观看 | 欧美怡红院视频 | 日本大尺度吃奶做爰视频 | 欧美成人自拍视频 | 免费一级欧美 | 好色先生视频污 | 91岛国| 激情内射人妻1区2区3区 | 四色在线| jiuse九色 | 日韩毛片免费观看 | 国产一在线观看 | 午夜激情在线 | 免费观看已满十八岁 | 视频一区二区在线播放 | 亚洲激情综合网 | 亚洲图片一区 | 久久高清内射无套 | 亚洲欧美综合一区二区 | 国产欧美日韩高清 | 日韩在线视频精品 | 亚洲国产精选 | a网址| 黄色av网站在线免费观看 | 欧美视频在线一区 | 九色福利视频 | 丰满少妇aaaaaa爰片毛片 | 高清国产在线 | 538任你躁在线精品免费 | 色婷婷热久久 | 成人久久视频 | 欧美日韩一区电影 | 中文字幕亚洲欧美 | 深夜视频在线看 | 午夜日韩精品 | 精品国产96亚洲一区二区三区 | 国产精品福利电影 | 日韩欧美高清在线观看 | 新亚洲天堂| 欧美男女激情 | 黄色在线免费看 | 波多野结衣高清在线 | 国产成人区 | 乳色吐息在线观看 | av在线播放器 | 国产欧美在线精品日韩 | 国产中文字幕在线播放 | 亚洲精品中文字幕在线播放 | 色哟哟中文字幕 | 97麻豆| 免费色站| 亚洲成熟丰满熟妇高潮xxxxx |