日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

kafka源码分析(二)Metadata的数据结构与读取、更新策略

發布時間:2023/11/27 生活经验 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka源码分析(二)Metadata的数据结构与读取、更新策略 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

?

一、基本思路

?異步發送的基本思路就是:send的時候,KafkaProducer把消息放到本地的消息隊列RecordAccumulator,然后一個后臺線程Sender不斷循環,把消息發給Kafka集群。

要實現這個,還得有一個前提條件:就是KafkaProducer/Sender都需要獲取集群的配置信息Metadata。所謂Metadata,也就是在上一篇所講的,Topic/Partion與broker的映射關系:每一個Topic的每一個Partion,得知道其對應的broker列表是什么,其中leader是誰,follower是誰。

?

二、2個數據流

?

所以在上圖中,有2個數據流:?

Metadata流(A1,A2,A3):Sender從集群獲取信息,然后更新Metadata; KafkaProducer先讀取Metadata,然后把消息放入隊列。

?

消息流(B1, B2, B3)

從上圖可以看出,Metadata是多個producer線程讀,一個sender線程更新,因此它必須是線程安全的

?

三、Metadata的線程安全性

從下面代碼也可以看出,它的所有public方法都是synchronized:

 1 public final class Metadata {
 2   。。。
 3     public synchronized Cluster fetch() {
 4         return this.cluster;
 5     }
 6     public synchronized long timeToNextUpdate(long nowMs) {
 7        。。。
 8     }
 9     public synchronized int requestUpdate() {
10       。。。
11     }
12     。。。    
13

?

?

四、Metadata的數據結構

 1 public final class Metadata {
 2 ...
 3     private final long refreshBackoffMs;  //更新失敗的情況下,下1次更新的補償時間(這個變量在代碼中意義不是太大)
 4     private final long metadataExpireMs; //關鍵值:每隔多久,更新一次。缺省是600*1000,也就是10分種
 5     private int version;         //每更新成功1次,version遞增1。這個變量主要用于在while循環,wait的時候,作為循環判斷條件
 6     private long lastRefreshMs;  //上一次更新時間(也包含更新失敗的情況)
 7     private long lastSuccessfulRefreshMs; //上一次成功更新的時間(如果每次都成功的話,則2者相等。否則,lastSuccessulRefreshMs < lastRefreshMs)
 8     private Cluster cluster;   //集群配置信息
 9     private boolean needUpdate;  //是否強制刷新
10 11   ...
12 }

?

轉載于:https://www.cnblogs.com/zcjcsl/p/8746561.html

總結

以上是生活随笔為你收集整理的kafka源码分析(二)Metadata的数据结构与读取、更新策略的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。