kafka创建topic命令_0748-5.14.4-Kafka的扩容和缩容
?文檔編寫目的
在Kafka集群資源使用已超出系統配置的資源時,或者有大量資源閑置造成資源浪費的時候,需要分別通過擴容Kafka和縮容Kafka來進行調整。本篇文章Fayson主要介紹如何進行Kafka的擴容和縮容,以及變更后的Kafka集群如何進行負載均衡的操作。
- 測試環境:
1.Redhat7.2
2.采用root用戶操作
3.CM為5.16.2,CDH為5.14.4
4.Kafka版本為0.10.2
5.集群啟用了Kerberos,Kafka未啟用Kerberos和Sentry
Kafka集群的擴容
2.1 當前Kafka集群狀態
集群中有3個kafka broker
有2個topic,分別為test和test1,情況如下
2.2 擴容前準備
新擴容的機器要先加入集群中,通過CM管理,按照下面的步驟進行操作
1.修改新添加的機器的hostname
[root@hadoop6?~]#?hostnamectl?set-hostname?cdh04.hadoop.com2.修改/etc/hosts文件并同步到所有節點,這里用腳本來實現。
192.168.0.204?cdh01.hadoop.com?cdh01192.168.0.205?cdh02.hadoop.com?cdh02192.168.0.206?cdh03.hadoop.com?cdh03192.168.0.195?cdh04.hadoop.com?cdh043.新添加的節點關閉防火墻,設置開機自動關閉
[root@cdh04?~]#?systemctl?stop?firewalld[root@cdh04?~]#?systemctl?disable?firewalld4.新添加的節點禁用SELinux,并修改修改/etc/selinux/config
[root@cdh04?~]#?setenforce?0setenforce:?SELinux?is?disabled[root@cdh04?~]#?vim?/etc/selinux/config5.新添加的節點關閉透明大頁面,并且在/etc/rc.d/rc.local 里面加入腳本,設置自動開機關閉。
[root@cdh04?~]#?echo?never?>?/sys/kernel/mm/transparent_hugepage/defrag[root@cdh04?~]#?echo?never?>?/sys/kernel/mm/transparent_hugepage/enabled[root@cdh04?~]#?vim?/etc/rc.d/rc.localif?test?-f?/sys/kernel/mm/transparent_hugepage/defragthen?echo?never?>?/sys/kernel/mm/transparent_hugepage/defragfiif?test?-f?/sys/kernel/mm/transparent_hugepage/enabledthen?echo?never?>?/sys/kernel/mm/transparent_hugepage/enabledfi[root@cdh04?~]#?chmod?+x?/etc/rc.d/rc.local6.新添加的節點設置SWAP,并且設置開機自動更改
[root@cdh04?~]#?sysctl?vm.swappiness=1vm.swappiness?=?1[root@cdh04?~]#?echo?vm.swappiness?=?1??>>?/etc/sysctl.conf7.新添加的節點配置時鐘同步,先在所有服務器卸載chrony,再安裝ntp,再修改配置把時鐘同步跟其他kafka broker節點保持一致
[root@cdh04?~]#?yum?-y?remove?chrony[root@cdh04?~]#?yum?-y?install?ntpvim?/etc/ntp.conf啟動ntp服務,并設置開機啟動
[root@cdh04?java]#?systemctl?start?ntpd[root@cdh04 java]# systemctl enable ntpd8.新添加的節點安裝kerberos客戶端,并進行配置
[root@cdh04?~]#?yum?-y?install?krb5-libs?krb5-workstation從其他節點拷貝/etc/krb5.conf文件到新節點
9.從其他節點拷貝集群使用的JDK到新節點/usr/java目錄下
[root@cdh01?java]#?scp?-r?jdk1.8.0_131/?cdh04:/usr/java/jdk1.8.0_131/10.在CM編輯主機模板kafka,只添加kafka broker角色
2.3 擴容Kafka
1.添加新的節點到集群,從CM主頁點擊Add Hosts,如下圖所示
2.點擊繼續
3.搜索主機,輸入主機名,點擊搜索并繼續
4.輸入自定義存儲庫地址,并繼續
5.點擊安裝JDK并繼續
6.輸入主機密碼并繼續
7.正在進行安裝,安裝成功后點擊繼續
激活完成,點擊繼續
8.點擊繼續,進入主機檢查,添加主機完成
9.應用主機模板kafka,擴容完成
擴容完成
擴容后平衡
在擴容完成后,可以通過自帶的命令來生成topic的平衡策略和執行平衡的操作。
3.1 生成平衡策略
1.查詢當前創建的topic
[root@cdh01?~]#?kafka-topics?--list?--zookeeper?cdh01.hadoop.com:21812.按照查詢到的topic來創建文件topics-to-move.json 格式如下
{"topics":?[{"topic":?"test"},{"topic":?"test1"}????????????],"version":1}3.在CM查詢kafka broker的ID
4.用命令生成遷移方案,下面畫紅框的就是生成的平衡方案
kafka-reassign-partitions?--zookeeper?cdh01.hadoop.com:2181?--topics-to-move-json-file?topics-to-move.json??--broker-list?"121,124,125,126"?--generate5.把Proposed partition reassignment configuration下的內容復制保存為json文件newkafka.json,平衡方案生成完成。
3.2 執行平衡策略
1.執行下面的命令來進行平衡
[root@cdh01?~]#?kafka-reassign-partitions?--zookeeper?cdh01.hadoop.com:2181?--reassignment-json-file?newkafka.json?--execute2.用下面命令來進行查看執行的進度
kafka-reassign-partitions?--zookeeper?cdh01.hadoop.com:2181?--reassignment-json-file?newkafka.json?--verify從結果可以看到已經執行完成。,并且在新添加的節點上也可以看到有topic的副本存過來了。
縮容前的準備
1.編輯之前創建的topics-to-move.json文件,添加上系統自動生成的__consumer_offsets
[root@cdh01?~]#?vim?topics-to-move.json?{"topics":?[{"topic":?"test"},{"topic":?"__consumer_offsets"},{"topic":?"test1"}????????????],"version":1}2.再次使用命令生成遷移計劃,這里只選取121,124,126這三個broker,然后把生成計劃中的126替換成125進行保存,這樣就把126上的數據全部遷移到了125上。
kafka-reassign-partitions?--zookeeper?cdh01.hadoop.com:2181?--topics-to-move-json-file?topics-to-move.json??--broker-list?"121,124,126"?--generate3.執行遷移命令,進行遷移
kafka-reassign-partitions?--zookeeper?cdh01.hadoop.com:2181?--reassignment-json-file?newkafka.json?--execute4.進行查詢,遷移完成
kafka-reassign-partitions?--zookeeper?cdh01.hadoop.com:2181?--reassignment-json-file?newkafka.json?--verify5.在要刪除的broker上也可以看到,topic數據已經遷移走
Kafka集群的縮容
在完成上訴縮容前的準備后,現在可以進行kafka集群的縮容。
1.從CM進入Kafka的實例界面
2.勾選要刪除的broker,先停止該broker
3.停止完成后,進行刪除
刪除完成。
總結
1.Kafka集群的擴容和縮容可以通過CM來進行添加broker和刪除broker來進行。
2.在Kafka集群擴容后,已有topic的partition不會自動均衡到新的磁盤上。可以通過kafka-reassign-partitions命令來進行數據平衡,先用命令生成平衡方案,再執行。也可以手動編輯遷移方案來進行執行。
3.新建topic的partition, 會以磁盤為單位,按照partition數量最少的來落盤。
4.在Kafka縮容前,需要把要刪除的broker上的topic數據遷出,也可以通過kafka-reassign-partitions來進行遷移,手動編輯遷移方案,再通過命令執行即可。
5.kafka-reassign-partitions這個命令,只有指定了broker id上的topic才會參與partition的reassign。根據我們的需求,可以手動來編寫和修改遷移計劃。
總結
以上是生活随笔為你收集整理的kafka创建topic命令_0748-5.14.4-Kafka的扩容和缩容的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 个人日常理财知识,有哪些实用的理财小常识
- 下一篇: 服务器配置和性能,服务器性能配置(硬件)