kafka源码分析(二)Metadata的数据结构与读取、更新策略
生活随笔
收集整理的這篇文章主要介紹了
kafka源码分析(二)Metadata的数据结构与读取、更新策略
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
?
一、基本思路
?異步發送的基本思路就是:send的時候,KafkaProducer把消息放到本地的消息隊列RecordAccumulator,然后一個后臺線程Sender不斷循環,把消息發給Kafka集群。
要實現這個,還得有一個前提條件:就是KafkaProducer/Sender都需要獲取集群的配置信息Metadata。所謂Metadata,也就是在上一篇所講的,Topic/Partion與broker的映射關系:每一個Topic的每一個Partion,得知道其對應的broker列表是什么,其中leader是誰,follower是誰。
?
二、2個數據流
?
所以在上圖中,有2個數據流:?
Metadata流(A1,A2,A3):Sender從集群獲取信息,然后更新Metadata; KafkaProducer先讀取Metadata,然后把消息放入隊列。
?
消息流(B1, B2, B3)
從上圖可以看出,Metadata是多個producer線程讀,一個sender線程更新,因此它必須是線程安全的
?
三、Metadata的線程安全性
從下面代碼也可以看出,它的所有public方法都是synchronized:
1 public final class Metadata {
2 。。。
3 public synchronized Cluster fetch() {
4 return this.cluster;
5 }
6 public synchronized long timeToNextUpdate(long nowMs) {
7 。。。
8 }
9 public synchronized int requestUpdate() {
10 。。。
11 }
12 。。。
13 } ?
?
四、Metadata的數據結構
1 public final class Metadata { 2 ... 3 private final long refreshBackoffMs; //更新失敗的情況下,下1次更新的補償時間(這個變量在代碼中意義不是太大) 4 private final long metadataExpireMs; //關鍵值:每隔多久,更新一次。缺省是600*1000,也就是10分種 5 private int version; //每更新成功1次,version遞增1。這個變量主要用于在while循環,wait的時候,作為循環判斷條件 6 private long lastRefreshMs; //上一次更新時間(也包含更新失敗的情況) 7 private long lastSuccessfulRefreshMs; //上一次成功更新的時間(如果每次都成功的話,則2者相等。否則,lastSuccessulRefreshMs < lastRefreshMs) 8 private Cluster cluster; //集群配置信息 9 private boolean needUpdate; //是否強制刷新 10 、 11 ... 12 }
?
轉載于:https://www.cnblogs.com/zcjcsl/p/8746561.html
總結
以上是生活随笔為你收集整理的kafka源码分析(二)Metadata的数据结构与读取、更新策略的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 天凤给什么特性??
- 下一篇: Apache 流框架 Flink,Spa