日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kafka(六)Kafka基本客户端命令操作

發(fā)布時間:2024/8/23 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka(六)Kafka基本客户端命令操作 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

轉(zhuǎn)載自:https://blog.51cto.com/littledevil/2147950

主題管理

創(chuàng)建主題

如果配置了auto.create.topics.enable=true(這也是默認值)這樣當生產(chǎn)者向一個沒有創(chuàng)建的主題發(fā)送消息就會自動創(chuàng)建,其分區(qū)數(shù)量和副本數(shù)量也是有默認配置來控制的。

# 我們這里創(chuàng)建一個3個分區(qū)每個分區(qū)有2個副本的主題 kafka-topics.sh --create --zookeeper 172.16.48.171:2181/kafka --replication-factor 2 --partitions 3 --topic KafkaTest
--create表示建立
--zookeeper

表示ZK地址,可以傳遞多個,用逗號分隔

--zookeeper IP:PORT,IP:PORT,IP:PORT/kafka

--replication-factor表示副本數(shù)量,這里的數(shù)量是包含Leader副本和Follower副本,副本數(shù)量不能超過代理數(shù)量
--partitions表示主題的分區(qū)數(shù)量,必須傳遞該參數(shù)。Kafka的生產(chǎn)者和消費者采用多線程并行對主題的消息進行處理,每個線程處理一個分區(qū),分區(qū)越多吞吐量就會越大,但是分區(qū)越多也意味著需要打開更多的文件句柄數(shù)量,這樣也會帶來一些開銷。
--topic表示主題名稱

在Zookeeper中可以看到如下信息

刪除主題

刪除有兩種方式手動和自動

  • 手動方式需要刪除各個節(jié)點日志路徑下的該主題所有分區(qū),并且刪除zookeeper上/brokers/topics和/config/topics下的對應(yīng)主題節(jié)點

  • 自動刪除就是通過腳本來完成,同時需要配置服務(wù)器配置文件中的delete.topic.enable=true,默認為false也就是說通過命令刪除主題只會刪除ZK中的節(jié)點,日志文件不會刪除需要手動清理,如果配置為true,則會自動刪除日志文件。

kafka-topics.sh --delete --zookeeper 172.16.48.171:2181/kafka --topic KafkaTest

下面的兩句話就是說該主題標記為刪除/admin/delete_topics節(jié)點下。實際數(shù)據(jù)沒有影響因為該參數(shù)沒有設(shè)置為true。

查看主題

# 列出所有主題 kafka-topics.sh --list --zookeeper 172.16.48.171:2181/kafka

下面是從ZK中看到的所有主題

# 查看所有主題信息 kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka

# 查看特定主題信息 kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka --topic BBB

Replicas:是AR列表,表示副本分布在哪些代理上,且該列表第一個元素就是Leader副本所在代理

ISR:該列表是顯示已經(jīng)同步的副本集合,這個列表的副本都是存活的

# 通過--describe 和 --under-replicated-partitions 可以查看正在同步的主題或者同步可能發(fā)生異常, # 也就是ISR列表長度小于AR列表,如果一切正常則不會返回任何東西,也可以通過 --tipic 指定具體主題 kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka --under-replicated-partitions # 查看哪些主題建立時使用了單獨的配置 kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka --topics-with-overrides

這里只有一個內(nèi)部主題__comsumer_offsets使用了非配置文件中的設(shè)置

?

配置管理

所謂配置就是參數(shù),比如修改主題的默認參數(shù)。

主題級別的

# 查看配置 kafka-configs.sh --describe --zookeeper 172.16.48.171:2181/kafka --entity-type topics --entity-name BB

這里顯示 Configs for topic 'BBB' are 表示它的配置有哪些,這里沒有表示沒有為該主題單獨設(shè)置配置,都是使用的默認配置。

# 增加一個配置 kafka-configs.sh --zookeeper 172.16.48.171:2181/kafka --entity-type topics --entity-name BBB --alter --add-config flush.messages=2

如果修改的話還是相同的命令,只是把值修改一下

# 刪除配置 kafka-configs.sh --zookeeper 172.16.48.171:2181/kafka --entity-type topics --entity-name BBB --alter --delete-config flush.messages

客戶端級別

這個主要是設(shè)置流控

# 設(shè)置指定消費者的流控 --entity-name 是客戶端在創(chuàng)建生產(chǎn)者或者消費者時是指定的client.id名稱 kafka-configs.sh --zookeeper 172.16.48.171:2181/kafka --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=1024' --entity-type clients --entity-name COMSUMER_NAME

下圖為ZK中對應(yīng)的信息

?

分區(qū)管理

分區(qū)平衡

Leader副本在集群中應(yīng)該是均衡分布,因為Leader副本對外提供讀寫服務(wù),盡可能不讓同一個主題的多個Leader副本在同一個代理上,但是隨著時間推移比如故障轉(zhuǎn)移等情況發(fā)送,Leader副本可能不均衡。有兩種方式設(shè)置自動平衡,自動和手動。

自動就是在配置文件中增加?auto.leader.rebalance.enable?=?true?如果該項為false,當某個節(jié)點故障恢復(fù)并重新上線后,它原來的Leader副本也不會轉(zhuǎn)移回來,只是一個Follower副本。

手動就是通過命令來執(zhí)行

kafka-preferred-replica-election.sh --zookeeper 172.16.48.171:2181/kafka

分區(qū)遷移

當下線一個節(jié)點需要將該節(jié)點上的分區(qū)副本遷移到其他可用節(jié)點上,Kafka并不會自動進行分區(qū)遷移,如果不遷移就會導(dǎo)致某些主題數(shù)據(jù)丟失和不可用的情況。當增加新節(jié)點時,只有新創(chuàng)建的主題才會分配到新節(jié)點上,之前的主題分區(qū)不會自動分配到新節(jié)點上,因為老的分區(qū)在創(chuàng)建時AR列表中沒有這個新節(jié)點。

上面2個主題,每個主題3個分區(qū),每個分區(qū)3個副本,我們假設(shè)現(xiàn)在代理2要下線,所以我們要把代理2上的這兩個主題的分區(qū)數(shù)據(jù)遷移出來。

# 1. 在KAFKA目錄的config目錄中建立topics-to-move.json文件 {"topics":[{"topic":"AAA"},{"topic":"BBB"}],"version":1 }

?

# 2. 生成分區(qū)分配方案,只是生成一個方案信息然后輸出 kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --topics-to-move-json-file ./topics-to-move.json --broker-list "1,2" --generate

這個命令的原理是從zookeeper中讀取主題元數(shù)據(jù)信息及制定的有效代理,根據(jù)分區(qū)副本分配算法重新計算指定主題的分區(qū)副本分配方案。把【Proposed partition reassignment configuration】下面的分區(qū)方案保存到一個JSON文件中,partitions-reassignment.json 文件名無所謂。

# 3. 執(zhí)行方案 kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./partitions-reassignment.json --execute

# 4. 查看進度 kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./partitions-reassignment.json --verify

查看結(jié)果,這里已經(jīng)沒有代理0了。

集群擴容

上面演示了節(jié)點下線的數(shù)據(jù)遷移,這里演示一下集群擴容的數(shù)據(jù)遷移。我們還是用上面兩個主題,假設(shè)代理0又重新上線了。其實擴容就是上面的反向操作

# 1. 建立JSON文件 # 該文件和之前的相同

?

# 2. 生成方案并保存到一個JSON文件中 kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --topics-to-move-json-file ./topics-to-move.json --broker-list "0,1,2" --generate

# 3. 數(shù)據(jù)遷移,這里通過--throttle做一個限流操作,如果數(shù)據(jù)過大會把網(wǎng)絡(luò)堵塞。 kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./partitions-reassignment.json --execute --throttle 1024

查看進度和結(jié)果

增加分區(qū)

通常在需要提供吞吐量的時候我們會增加分區(qū),然后如果代理數(shù)量不擴大,同時生產(chǎn)者和消費者線程不增大,你擴展了分區(qū)也沒有用。

kafka-topics.sh --alter --zookeeper 172.16.48.171:2181/kafka --partitions 3 --topic KafkaTest03

增加副本

集群規(guī)模擴大并且想對所有主題或者指定主題提高可用性,那么可以增加原有主題的副本數(shù)量

上面是3個分區(qū),每個分區(qū)1個副本,我們現(xiàn)在把每個分區(qū)擴展為3個副本

# 1. 創(chuàng)建JSON文件 replica-extends.json {"version": 1,"partitions": [{"topic": "KafkaTest04","partition": 0,"replicas": [0,1,2]},{"topic": "KafkaTest04","partition": 1,"replicas": [0,1,2]},{"topic": "KafkaTest04","partition": 2,"replicas": [0,1,2]}] } # 2. 執(zhí)行分區(qū)副本重新分配命令 kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./replica-extends.json --execute

查看狀態(tài)

查看結(jié)果

?

鏡像操作

Kafka有一個鏡像工具kafka-mirror-maker.sh,用于將一個集群數(shù)據(jù)同步到另外一個集群中,這個非常有用,比如機房搬遷就需要進行數(shù)據(jù)同步。該工具的本質(zhì)就是創(chuàng)建一個消費者,在源集群中需要遷移的主題消費數(shù)據(jù),然后創(chuàng)建一個生產(chǎn)者,將消費的數(shù)據(jù)寫入到目標集群中。

首先創(chuàng)建消費者配置文件mirror-consumer.properties(文件路徑和名稱是自定義的)

# 源kafka集群代理地址列表 bootstrap.servers=IP1:9092,IP2:9092,IP3:9092 # 消費者組名 group.id=mirror

其次創(chuàng)建生產(chǎn)者配置文件mirror-producer.properties(文件路徑和名稱是自定義的)

# 目標kafka集群地址列表 bootstrap.servers=IP1:9092,IP2:9092,IP3:9092

運行鏡像命令

# 通過 --whitelist 指定需要鏡像的主題,通過 --blacklist 指定不需要鏡像的主題 kafka-mirror-maker.sh --consumer.config PATH/mirror-consumer.properties --producer.config PATH/mirror-producer.properties --whitelist TOPIC

由于鏡像操作是啟動一個生產(chǎn)者和消費者,所以數(shù)據(jù)同步完成后這個生產(chǎn)者和消費者并不會關(guān)閉,它會依然等待新數(shù)據(jù),所以同步完成以后你需要自己查看,確認完成了則關(guān)閉生產(chǎn)者和消費者。

總結(jié)

以上是生活随笔為你收集整理的Kafka(六)Kafka基本客户端命令操作的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。