日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

kafka的offset笔记

發布時間:2023/12/31 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka的offset笔记 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

版本

這個看起來有點多此一舉,

我一開始也是這麼想的。

後來經過測試發現,新版本的kafka已經不再兼容老版本的kafka中的命令了,所以本篇記錄是爲了針對新版本的kafka的相關操作的。

組件版本
Zookeeper3.6.0
Kafka2.5.0

?

概念

①生產者Offset

生產者寫入topic的各個partition時,有多少個partition就有多少個offset

②消費者Offset

這是某一個分區的offset情況,我們已經知道生產者寫入的offset是最新最大的值也就是12,

而當Consumer A進行消費時,他從0開始消費,一直消費到了9,他的offset就記錄在了9,

Consumer B就紀錄在了11。

等下一次他們再來消費時,他們可以選擇接著上一次的位置消費,當然也可以選擇從頭消費,或者跳到最近的記錄并從“現在”開始消費。

此時,每個partition有多少消費組,那就有多少個offset

?

消費者組

費者組的概念其實并不影響對offset的理解,上面的情況Consumer A,Consumer B如果是同組就不能同時消費一個分區的消息,不同組的消費者可以同時消費一個分區的消息。

還有一種offset的說法,就是consumer消費未提交時,本地是有另外一個offset的,這個offset不一定與集群中記錄的offset一致。

所以,kafka每一個topic分區和生產者,消費者不同,是有多個offset的。

?

?

概念總結如下:

offset是指某一個分區的偏移量。

topic partition offset 這三個唯一確定一條消息。

生產者的offset其實就是最新的offset。

消費者的offset是他自己維護的,他可以選擇分區最開始,最新,也可以記住他消費到哪了。

消費者組是為了不同組的消費者可以同時消費一個分區的消息。

?

關於Group這個概念

首先注意,producer沒有group的說法,

kafka提到group一定是consumer這邊。

?

Group-id位置

具體文件變量
$KAFKA/config/consumer.propertiesgroup.id
$KAFKA/config/connect-distributed.propertiesgroup.id

如果在新建topic的時候,不特別指定,那麼默認使用的是consumer.properties裏面的group.id

?

__consumer_offsets的哪個partition保存了consumer group的位移信息

查看$KAFKA/config/consumer.properties

得到group.id是test-consumer-group,

填入下面的代碼並運行

public class kafka_hash {public static void main(String args[]){System.out.println(Math.abs("test-consumer-group".hashCode()) % 50);}}

實驗結果爲31

也就是說__consumer_offsets的partition 31保存了consumer group的位移信息

記住31這個數字,後面會用到

操作

操作

具體命令

創建topic$KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --create --topic mytopic --replication-factor 3 --partitions 3
查看topic列表$KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181
生產數據$KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic mytopic
存放在各個partition的offset終點$KAFKA/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list Desktop:9091,Laptop:9092,Laptop:9093 --topic mytopic --time -1
存放在各個partition的offset起點

$KAFKA/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list Desktop:9091,Laptop:9092,Laptop:9093 --topic mytopic --time -2

查詢__consumer_offsets topic所有內容$KAFKA/bin/kafka-console-consumer.sh --consumer.config $KAFKA/config/consumer.properties \
--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
--bootstrap-server Desktop:9091 --topic __consumer_offsets --from-beginning
查詢__consumer_offsets的partition 31包含的關於mytopic的offset信息 $KAFKA/bin/kafka-console-consumer.sh \ --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \ --bootstrap-server Desktop:9091 --topic __consumer_offsets --partition 31 --from-beginning

?

上面的最後一句命令會得到:

[test-consumer-group,mytopic,1]::OffsetAndMetadata(offset=3, leaderEpoch=Optional[0], metadata=, commitTimestamp=1611494686173, expireTimestamp=None)
[test-consumer-group,mytopic,0]::OffsetAndMetadata(offset=4, leaderEpoch=Optional[0], metadata=, commitTimestamp=1611494686173, expireTimestamp=None)
[test-consumer-group,mytopic,2]::OffsetAndMetadata(offset=4, leaderEpoch=Optional[0], metadata=, commitTimestamp=1611494691174, expireTimestamp=None)
[test-consumer-group,mytopic,1]::OffsetAndMetadata(offset=3, leaderEpoch=Optional[0], metadata=, commitTimestamp=1611494691174, expireTimestamp=None)
?

上述表格中produce端需要的數據如下:

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "561558", "item_id":"3611281", "category_id": "965809", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "894923", "item_id":"3076029", "category_id": "1879194", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "834377", "item_id":"4541270", "category_id": "3738615", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "315321", "item_id":"942195", "category_id": "4339722", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "625915", "item_id":"1162383", "category_id": "570735", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "578814", "item_id":"176722", "category_id": "982926", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "873335", "item_id":"1256540", "category_id": "1451783", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "429984", "item_id":"4625350", "category_id": "2355072", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "866796", "item_id":"534083", "category_id": "4203730", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "937166", "item_id":"321683", "category_id": "2355072", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

?

其他

另外,根據[5],在同一個consumer group中,對於同一個topic而言,只能有一個consumer消費到其中特定的一條數據。

[6]中提到了Coordinator

Reference:

[1]Kafka到底有幾個Offset?——Kafka核心之偏移量機制
[2]自己維護kafka_offset中的坑

[3]kafka查詢最新producer offset的命令

[4]Kafka 如何讀取offset topic內容 (__consumer_offsets)

[5]多個consumer使用同一個group.id消費同一個topic

[6]Kafka消費組(consumer group)

總結

以上是生活随笔為你收集整理的kafka的offset笔记的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。