日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

原理+实践,Kafka MirrorMaker使用与性能调优全解析

發布時間:2025/4/5 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 原理+实践,Kafka MirrorMaker使用与性能调优全解析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

http://blog.csdn.net/zhanyuanlin/article/details/76695481

Kakfa MirrorMaker是Kafka 官方提供的跨數據中心的流數據同步方案。其實現原理,其實就是通過從Source Cluster消費消息然后將消息生產到Target Cluster,即普通的消息生產和消費。用戶只要通過簡單的consumer配置和producer配置,然后啟動Mirror,就可以實現準實時的數據同步。

1. Kafka MirrorMaker基本特性

Kafka Mirror的基本特性有:

  • 在Target Cluster沒有對應的Topic的時候,Kafka MirrorMaker會自動為我們在Target Cluster上創建一個一模一樣(Topic Name、分區數量、副本數量)一模一樣的topic。如果Target Cluster存在相同的Topic則不進行創建,并且,MirrorMaker運行Source Cluster和Target Cluster的Topic的分區數量和副本數量不同。
  • 同我們使用Kafka API創建KafkaConsumer一樣,Kafka MirrorMaker允許我們指定多個Topic。比如,TopicA|TopicB|TopicC。在這里,|其實是正則匹配符,MirrorMaker也兼容使用逗號進行分隔。
  • 多線程支持。MirrorMaker會在每一個線程上創建一個Consumer對象,如果性能允許,建議多創建一些線程
  • 多進程任意橫向擴展,前提是這些進程的consumerGroup相同。無論是多進程還是多線程,都是由Kafka ConsumerGroup的設計帶來的任意橫向擴展性,具體的分區分派,即具體的TopicPartition會分派給Group中的哪個Topic負責,是Kafka自動完成的,Consumer無需關心。?
    我們使用Kafka MirrorMaker完成遠程的AWS(Source Cluster)上的Kafka信息同步到公司的計算集群(Target Cluster)。由于我們的大數據集群只有一個統一的出口IP,因此,Kafka MirrorMaker部署在本地(Target Cluster),它負責從遠程的Source Cluster上的AWS Kafka 上拉取數據,然后生產到本地的Kafka。
  • Kafka MirrorMaker的官方文檔一直沒有更新,因此新版Kafka為MirrorMaker增加的一些參數、特性等在文檔上往往找不到,需要看Kafka MirrorMaker的源碼。Kafka MirrorMaker的主類位于kafka.tools.MirrorMaker,尤其是一些參數的解析邏輯和主要的執行流程,會比較有助于我們理解和運維Kafka MirrorMaker。


    2. 新舊Consumer API的使用問題

    從Kafka 0.9版本開始引入了new consumer API。相比于普通的old consumer api,new Conumser API有以下主要改變:

    • 統一了舊版本的High-Level和Low-Level Consumer API;-
    • new consumer API消除了對zookeeper的依賴,修改了ConsumerGroup的管理等等協議
    • new Consumer API完全使用Java實現,不再依賴Scala環境
    • 更好了安全認證只在new consumer中實現,在old consumer中沒有。

    Kakfa MirrorMaker同時提供了對新舊版本的Consumer API的支持。?
    默認是舊版API,當添加–new.consumer,MirrorMaker將使用新的Consumer進行消息消費:

    // Create consumersval mirrorMakerConsumers = if (!useNewConsumer) {//如果用戶沒有配置使用new consumer,則使用舊的consumerval customRebalanceListener = { val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt) if (customRebalanceListenerClass != null) { val rebalanceListenerArgs = options.valueOf(rebalanceListenerArgsOpt) if (rebalanceListenerArgs != null) { Some(CoreUtils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass, rebalanceListenerArgs)) } else { Some(CoreUtils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass)) } } else { None } } if (customRebalanceListener.exists(!_.isInstanceOf[ConsumerRebalanceListener])) throw new IllegalArgumentException("The rebalance listener should be an instance of kafka.consumer.ConsumerRebalanceListener") createOldConsumers(//創建舊的consumer numStreams, options.valueOf(consumerConfigOpt), customRebalanceListener, Option(options.valueOf(whitelistOpt)), Option(options.valueOf(blacklistOpt))) } else {//用戶指定使用new consumer val customRebalanceListener = { val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt) if (customRebalanceListenerClass != null) { val rebalanceListenerArgs = options.valueOf(rebalanceListenerArgsOpt) if (rebalanceListenerArgs != null) { Some(CoreUtils.createObject[org.apache.kafka.clients.consumer.ConsumerRebalanceListener](customRebalanceListenerClass, rebalanceListenerArgs)) } else { Some(CoreUtils.createObject[org.apache.kafka.clients.consumer.ConsumerRebalanceListener](customRebalanceListenerClass)) } } else { None } } if (customRebalanceListener.exists(!_.isInstanceOf[org.apache.kafka.clients.consumer.ConsumerRebalanceListener])) throw new IllegalArgumentException("The rebalance listener should be an instance of" + "org.apache.kafka.clients.consumer.ConsumerRebalanceListner") createNewConsumers(//創建new consumer numStreams, options.valueOf(consumerConfigOpt), customRebalanceListener, Option(options.valueOf(whitelistOpt))) }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    這是我啟動Kakfa MirrorMaker 的命令:

    nohup ./bin/kafka-mirror-maker.sh --new.consumer --consumer.config config/mirror-consumer.properties --num.streams 40 --producer.config config/mirror-producer.properties --whitelist 'ABTestMsg|AppColdStartMsg|BackPayMsg|WebMsg|GoldOpenMsg|BoCaiMsg' &
    • 1

    mirror-consumer.properties配置文件如下:

    #新版consumer擯棄了對zookeeper的依賴,使用bootstrap.servers告訴consumer kafka server的位置 bootstrap.servers=ip-188-33-33-31.eu-central-1.compute.internal:9092,ip-188-33-33-32.eu-central-1.compute.internal:9092,ip-188-33-33-33.eu-central-1.compute.internal:9092 #如果使用舊版Consumer,則使用zookeeper.connect #zookeeper.connect=ip-188-33-33-31.eu-central-1.compute.internal:2181,ip-188-33-33-32.eu-central-1.compute.internal:2181,ip-188-33-33-33.eu-central-1.compute.internal:2181 1.compute.internal:2181 #change the default 40000 to 50000 request.timeout.ms=50000 #hange default heartbeat interval from 3000 to 15000 heartbeat.interval.ms=30000 #change default session timeout from 30000 to 40000 session.timeout.ms=40000 #consumer group id group.id=africaBetMirrorGroupTest partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor #restrict the max poll records from 2147483647 to 200000 max.poll.records=20000 #set receive buffer from default 64kB to 512kb receive.buffer.bytes=524288 #set max amount of data per partition to override default 1048576 max.partition.fetch.bytes=5248576 #consumer timeout #consumer.timeout.ms=5000
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    mirror-producer.properties的配置文件如下:

    bootstrap.servers=10.120.241.146:9092,10.120.241.82:9092,10.120.241.110:9092 # name of the partitioner class for partitioning events; default partition spreads data randomly #partitioner.class= # specifies whether the messages are sent asynchronously (async) or synchronously (sync) producer.type=sync # specify the compression codec for all data generated: none, gzip, snappy, lz4. # the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively compression.codec=none # message encoder serializer.class=kafka.serializer.DefaultEncoder
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    同時,我使用kafka-consumer-groups.sh循環監控消費延遲:

    bin/kafka-consumer-groups.sh --bootstrap-server ip-188-33-33-31.eu-central-1.compute.internal:9092,ip-188-33-33-32.eu-central-1.compute.internal:9092,ip-188-33-33-33.eu-central-1.compute.internal:9092 --describe --group africaBetMirrorGroupTest --new-consumer
    • 1

    當我們使用new KafkaConsumer進行消息消費,要想通過kafka-consumer-groups.sh獲取整個group的offset、lag延遲信息,也必須加上–new-consumer,告知kafka-consumer-groups.sh,這個group的消費者使用的是new kafka consumer,即group中所有consumer的信息保存在了Kafka上的一個名字叫做__consumer_offsets的特殊topic上,而不是保存在zookeeper上。我在使用kafka-consumer-groups.sh的時候就不知道還需要添加–new-consumer,結果我啟動了MirrorMaker以后,感覺消息在消費,但是就是在zookeeper的/consumer/ids/上找不到group的任何信息。后來在stack overflow上問了別人才知道。

    3. 負載不均衡原因診斷以及問題解決

    在我的另外一篇博客《Kafka為Consumer分派分區:RangeAssignor和RoundRobinAssignor》中,介紹了Kafka內置的分區分派策略:RangeAssignor和RoundRobinAssignor。由于RangeAssignor是早期版本的Kafka的唯一的分區分派策略,因此,默認不配置的情況下,Kafka使用RangeAssignor進行分區分派,但是,在MirrorMaker的使用場景下,RoundRobinAssignor更有利于均勻的分區分派。甚至在KAFKA-3831中有人建議直接將MirrorMaker的默認分區分派策略改為RoundRobinAssignor。那么,它們到底有什么區別呢?我們先來看兩種策略下的分區分派結果。在我的實驗場景下,有6個topic:ABTestMsg|AppColdStartMsg|BackPayMsg|WebMsg|GoldOpenMsg|BoCaiMsg,每個topic有兩個分區。由于MirrorMaker所在的服務器性能良好,我設置--num.streams 40,即單臺MirrorMaker會用40個線程,創建40個獨立的Consumer進行消息消費,兩個MirrorMaker加起來80個線程,80個并行Consumer。由于總共只有6 * 2=12個TopicPartition,因此最多也只有12個Consumer會被分派到分區,其余Consumer空閑。?
    我們來看基于RangeAssignor分派策略的kafka-consumer-groups.sh 的結果:

    TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID ABTestMsg 0 780000 820038 49938 africaBetMirrorGroupTest-4-cf330e66-1319-4925-9605-46545df13453/114.113.198.126 africaBetMirrorGroupTest-0 ABTestMsg 1 774988 820038 55000 africaBetMirrorGroupTest-19-c77523e7-7b87-472b-9a26-cd902888944d/114.113.198.126 africaBetMirrorGroupTest-1 AppColdStartMsg 0 774000 820039 55938 africaBetMirrorGroupTest-19-674d8ad4-39d2-40cc-ae97-f4be9c1bb154/114.113.198.126 africaBetMirrorGroupTest-0 AppColdStartMsg 1 774100 820045 56038 africaBetMirrorGroupTest-15-91c67bf8-0c1c-42ac-97f0-5369794c2d1b/114.113.198.126 africaBetMirrorGroupTest-1 BackPayMsg 0 780000 820038 49938 africaBetMirrorGroupTest-4-cf330e66-1319-4925-9605-46545df13453/114.113.198.126 africaBetMirrorGroupTest-0 BackPayMsg 1 774988 820038 55000 africaBetMirrorGroupTest-19-c77523e7-7b87-472b-9a26-cd902888944d/114.113.198.126 africaBetMirrorGroupTest-1 WebMsg 0 774000 820039 55938 africaBetMirrorGroupTest-19-674d8ad4-39d2-40cc-ae97-f4be9c1bb154/114.113.198.126 africaBetMirrorGroupTest-0 WebMsg 1 774100 820045 56038 africaBetMirrorGroupTest-15-91c67bf8-0c1c-42ac-97f0-5369794c2d1b/114.113.198.126 africaBetMirrorGroupTest-1 GoldOpenMsg 0 780000 820038 49938 africaBetMirrorGroupTest-4-cf330e66-1319-4925-9605-46545df13453/114.113.198.126 africaBetMirrorGroupTest-0 GoldOpenMsg 1 774988 820038 55000 africaBetMirrorGroupTest-19-c77523e7-7b87-472b-9a26-cd902888944d/114.113.198.126 africaBetMirrorGroupTest-1 BoCaiMsg 0 774000 820039 55938 africaBetMirrorGroupTest-19-674d8ad4-39d2-40cc-ae97-f4be9c1bb154/114.113.198.126 africaBetMirrorGroupTest-0 BoCaiMsg 1 774100 820045 56038 africaBetMirrorGroupTest-15-91c67bf8-0c1c-42ac-97f0-5369794c2d1b/114.113.198.126 africaBetMirrorGroupTest-1 - - - - - africaBetMirrorGroupTest-

    轉載于:https://www.cnblogs.com/davidwang456/articles/8360067.html

    總結

    以上是生活随笔為你收集整理的原理+实践,Kafka MirrorMaker使用与性能调优全解析的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。