kafka的offset笔记
版本
這個看起來有點多此一舉,
我一開始也是這麼想的。
後來經過測試發現,新版本的kafka已經不再兼容老版本的kafka中的命令了,所以本篇記錄是爲了針對新版本的kafka的相關操作的。
| 組件 | 版本 |
| Zookeeper | 3.6.0 |
| Kafka | 2.5.0 |
?
概念
①生產者Offset
生產者寫入topic的各個partition時,有多少個partition就有多少個offset
②消費者Offset
這是某一個分區的offset情況,我們已經知道生產者寫入的offset是最新最大的值也就是12,
而當Consumer A進行消費時,他從0開始消費,一直消費到了9,他的offset就記錄在了9,
Consumer B就紀錄在了11。
等下一次他們再來消費時,他們可以選擇接著上一次的位置消費,當然也可以選擇從頭消費,或者跳到最近的記錄并從“現在”開始消費。
此時,每個partition有多少消費組,那就有多少個offset
?
消費者組
費者組的概念其實并不影響對offset的理解,上面的情況Consumer A,Consumer B如果是同組就不能同時消費一個分區的消息,不同組的消費者可以同時消費一個分區的消息。
還有一種offset的說法,就是consumer消費未提交時,本地是有另外一個offset的,這個offset不一定與集群中記錄的offset一致。
所以,kafka每一個topic分區和生產者,消費者不同,是有多個offset的。
?
?
概念總結如下:
offset是指某一個分區的偏移量。
topic partition offset 這三個唯一確定一條消息。
生產者的offset其實就是最新的offset。
消費者的offset是他自己維護的,他可以選擇分區最開始,最新,也可以記住他消費到哪了。
消費者組是為了不同組的消費者可以同時消費一個分區的消息。
?
關於Group這個概念
首先注意,producer沒有group的說法,
kafka提到group一定是consumer這邊。
?
Group-id位置
| 具體文件 | 變量 |
| $KAFKA/config/consumer.properties | group.id |
| $KAFKA/config/connect-distributed.properties | group.id |
如果在新建topic的時候,不特別指定,那麼默認使用的是consumer.properties裏面的group.id
?
__consumer_offsets的哪個partition保存了consumer group的位移信息
查看$KAFKA/config/consumer.properties
得到group.id是test-consumer-group,
填入下面的代碼並運行
public class kafka_hash {public static void main(String args[]){System.out.println(Math.abs("test-consumer-group".hashCode()) % 50);}}實驗結果爲31
也就是說__consumer_offsets的partition 31保存了consumer group的位移信息
記住31這個數字,後面會用到
操作
操作 | 具體命令 |
| 創建topic | $KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --create --topic mytopic --replication-factor 3 --partitions 3 |
| 查看topic列表 | $KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181 |
| 生產數據 | $KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic mytopic |
| 存放在各個partition的offset終點 | $KAFKA/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list Desktop:9091,Laptop:9092,Laptop:9093 --topic mytopic --time -1 |
| 存放在各個partition的offset起點 | $KAFKA/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list Desktop:9091,Laptop:9092,Laptop:9093 --topic mytopic --time -2 |
| 查詢__consumer_offsets topic所有內容 | $KAFKA/bin/kafka-console-consumer.sh --consumer.config $KAFKA/config/consumer.properties \ --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \ --bootstrap-server Desktop:9091 --topic __consumer_offsets --from-beginning |
| 查詢__consumer_offsets的partition 31包含的關於mytopic的offset信息 | $KAFKA/bin/kafka-console-consumer.sh \
--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
--bootstrap-server Desktop:9091 --topic __consumer_offsets --partition 31 --from-beginning ? |
上面的最後一句命令會得到:
[test-consumer-group,mytopic,1]::OffsetAndMetadata(offset=3, leaderEpoch=Optional[0], metadata=, commitTimestamp=1611494686173, expireTimestamp=None)
[test-consumer-group,mytopic,0]::OffsetAndMetadata(offset=4, leaderEpoch=Optional[0], metadata=, commitTimestamp=1611494686173, expireTimestamp=None)
[test-consumer-group,mytopic,2]::OffsetAndMetadata(offset=4, leaderEpoch=Optional[0], metadata=, commitTimestamp=1611494691174, expireTimestamp=None)
[test-consumer-group,mytopic,1]::OffsetAndMetadata(offset=3, leaderEpoch=Optional[0], metadata=, commitTimestamp=1611494691174, expireTimestamp=None)
?
上述表格中produce端需要的數據如下:
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "561558", "item_id":"3611281", "category_id": "965809", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "894923", "item_id":"3076029", "category_id": "1879194", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "834377", "item_id":"4541270", "category_id": "3738615", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "315321", "item_id":"942195", "category_id": "4339722", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "625915", "item_id":"1162383", "category_id": "570735", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "578814", "item_id":"176722", "category_id": "982926", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "873335", "item_id":"1256540", "category_id": "1451783", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "429984", "item_id":"4625350", "category_id": "2355072", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "866796", "item_id":"534083", "category_id": "4203730", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "937166", "item_id":"321683", "category_id": "2355072", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}?
其他
另外,根據[5],在同一個consumer group中,對於同一個topic而言,只能有一個consumer消費到其中特定的一條數據。
[6]中提到了Coordinator
Reference:
[1]Kafka到底有幾個Offset?——Kafka核心之偏移量機制
[2]自己維護kafka_offset中的坑
[3]kafka查詢最新producer offset的命令
[4]Kafka 如何讀取offset topic內容 (__consumer_offsets)
[5]多個consumer使用同一個group.id消費同一個topic
[6]Kafka消費組(consumer group)
總結
以上是生活随笔為你收集整理的kafka的offset笔记的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: i春秋官网4.0上线啦 文末有福利
- 下一篇: GraphQL快速入门教程