kafka 重新分配节点_Kafka扩容节点和分区迁移
背景
最近工作中碰到Kafka 節(jié)點(diǎn)的網(wǎng)卡成為了性能瓶頸,為了提高整個(gè)消息隊(duì)列的輸出吞吐量,需要將數(shù)據(jù)量大的Topic 遷移到新的Kafka節(jié)點(diǎn)上。
操作步驟
1. 新建Kafka 節(jié)點(diǎn)
通過CDH 管理界面在新機(jī)器上安裝Kafka 服務(wù),并得到相應(yīng)的Kafka broker id。(假設(shè)為140, 141)
2. 創(chuàng)建要遷移的Topic 列表
查看所有的Topic
$ cd /opt/cloudera/parcels/KAFKA/bin
$ ./kafka-topics --describe --zookeeper 10.1.1.50:2181/kafka
如果要?jiǎng)h除某些不用的Topic,可運(yùn)行命令
$ ./kafka-run-class kafka.admin.TopicCommand --delete --topic test_p1_r1 --zookeeper 10.1.1.50:2181/kafka
新建文件topics-to-move.json,包含要遷移到Topic 列表。這里只遷移了一個(gè)Topic,也可以是多個(gè)Topic。
{
"topics": [{"topic": "sdk_counters"}],
"version": 1
}
3. 生成Topic 分區(qū)分配表
使用kafka-reassign-partitions 工具生成分區(qū)分配表,其中需要指定topics-to-move.json 文件和遷移目標(biāo)節(jié)點(diǎn)的broker id
$ ./kafka-reassign-partitions --zookeeper 10.1.1.50:2181/kafka --topics-to-move-json-file ~/kafka/topics-to-move.json --broker-list "140,141" --generate
將生成以下結(jié)果
Current partition replica assignment
{"version":1,"partitions":[{"topic":"sdk_counters","partition":1,"replicas":[61,62]},{"topic":"sdk_counters","partition":0,"replicas":[62,61]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"sdk_counters","partition":1,"replicas":[140,141]},{"topic":"sdk_counters","partition":0,"replicas":[141,140]}]}
將Current partition replica assignment 的內(nèi)容保存到rollback-cluster-reassignment.json,用于回滾操作。將Proposed partition reassignment configuration 的內(nèi)容保存到expand-cluster-reassignment.json,用于執(zhí)行遷移操作。
在這里也可以手工編輯expand-cluster-reassignment.json 文件更改replica 和partition 配置。也可以在遷移之前更改Topic 的分區(qū)數(shù) (6)。
$ ./kafka-topics --zookeeper 10.1.1.50:2181/kafka --alter --topic sdk_counters --partitions 6
4. 執(zhí)行遷移操作
$ ./kafka-reassign-partitions --zookeeper 10.1.1.50:2181/kafka --reassignment-json-file ~/kafka/expand-cluster-reassignment.json --execute
遷移操作會(huì)將指定Topic 的數(shù)據(jù)文件移動(dòng)到新的節(jié)點(diǎn)目錄下,這個(gè)過程可能需要等待很長(zhǎng)時(shí)間,視Topic 的數(shù)據(jù)量而定??梢赃\(yùn)行以下命令查看執(zhí)行狀態(tài)。
$ ./kafka-reassign-partitions --zookeeper 10.1.1.50:2181/kafka --reassignment-json-file ~/kafka/expand-cluster-reassignment.json --verify
狀態(tài)有兩種,in progress 表示正在遷移,completed successlly 表示已經(jīng)成功完成遷移。在此過程中,可以在各個(gè)Kafka 的節(jié)點(diǎn)上使用iftop 工具實(shí)時(shí)監(jiān)控網(wǎng)絡(luò)帶寬。可以觀察到遷移的source 節(jié)點(diǎn)使用了大量的輸出帶寬,遷移的target 節(jié)點(diǎn)使用了大量的輸入帶寬。由于在遷移過程中,會(huì)占用大量的網(wǎng)卡帶寬進(jìn)行數(shù)據(jù)傳輸,可能會(huì)影響到其他Topic 和應(yīng)用程序的帶寬使用。
遷移完成后,原先的節(jié)點(diǎn)下將不存在該Topic 的數(shù)據(jù)文件。
優(yōu)化
減少遷移的數(shù)據(jù)量
如果要遷移的Topic 有大量數(shù)據(jù)(Topic 默認(rèn)保留1天的數(shù)據(jù)),可以在遷移之前臨時(shí)動(dòng)態(tài)地調(diào)整retention.ms 來減少數(shù)據(jù)量,Kafka 會(huì)主動(dòng)purge 掉1個(gè)小時(shí)之前的數(shù)據(jù)。
$ ./kafka-topics --zookeeper 10.1.1.50:2181/kafka --alter --topic sdk_counters --config retention.ms=3600000
在遷移完成后,恢復(fù)原先設(shè)置
$ ./kafka-topics --zookeeper 10.1.1.50:2181/kafka --alter --topic sdk_counters --config retention.ms=86400000
在遷移過程中,不會(huì)影響應(yīng)用程序?qū)慘afka,在遷移完成后需要查看應(yīng)用程序是否運(yùn)行正常。
在已有的Topic 上增加分區(qū)
如果使用kafka-topics 動(dòng)態(tài)地增加partition 數(shù)目,則新增的partition 可能會(huì)出現(xiàn)在遷移之前的機(jī)器上。這時(shí)可以使用kafka-reassign-partitions 工具并手動(dòng)更改分區(qū)分配表以保證所有的分區(qū)都在遷移后的機(jī)器上。注意要保持舊的分區(qū)中的節(jié)點(diǎn)分配和replica 和之前相同,否則會(huì)導(dǎo)致Kafka 對(duì)舊分區(qū)的重新遷移,增加了遷移時(shí)間,并且可能導(dǎo)致正在運(yùn)行的程序因?yàn)榉謪^(qū)失效而出錯(cuò)。
重新指定partition leader
有時(shí)候由于節(jié)點(diǎn)down 了,partition 的leader 可能不是我們prefer 的,這時(shí),可以通過kafka-preferred-replica-election 工具將replica 中的第一個(gè)節(jié)點(diǎn)作為該分區(qū)的leader。
手動(dòng)編輯topicPartitionList.json 文件,指定要重新分配leader 的分區(qū)。
{"partitions":[{"topic":"sdk_counters","partition":5}]}
執(zhí)行命令
$ ./kafka-preferred-replica-election --zookeeper 10.1.1.50:2181/kafka -path-to-json-file ~/kafka/topicPartitionList.json
中斷遷移任務(wù)
一旦啟動(dòng)reassign 腳本,則無法停止遷移任務(wù)。如果需要強(qiáng)制停止,可以通過zookeeper 進(jìn)行修改。
$ zookeeper-client -server 10.1.1.50:2181/kafka
[zk] delete /admin/reassign_partitions
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)總結(jié)
以上是生活随笔為你收集整理的kafka 重新分配节点_Kafka扩容节点和分区迁移的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 打开python环境_windows下切
- 下一篇: gsonformat插件_裂墙推荐!In