日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka控制器,复制与存储小结

發布時間:2023/12/3 编程问答 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka控制器,复制与存储小结 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

【README】

  • 1,本文主要總結kafka復制,存儲細節;
  • 2,本文的kafka集群版本是3.0.0, 有3個broker,分別是 centos201, centos202, centos203 對應的brokerid為 1, 2, 3 ;

【1】kafka內部原理

【1.1】broker-消息中心點

1)broker一個獨立的kafka服務器節點;也稱為發送消息的中心點

  • kafka使用zk維護集群成員關系;
  • 每個broker都有自己的id存儲在zk;broker啟動時,創建zk節點把自己id注冊到zk;

2)zk存儲的kafka集群信息的節點列表

# zk存儲的kafka集群信息的節點 [zk: localhost:2181(CONNECTED) 1] ls / [cluster,controller_epoch,controller,brokers,zookeeper,feature,admin,isr_change_notification,consumers,log_dir_event_notification,latest_producer_id_block,config]

查看zk中的 broker id

# 查看kafka brokerid 和 topic [zk: localhost:2181(CONNECTED) 2] ls /brokers [ids, topics, seqid] [zk: localhost:2181(CONNECTED) 3] ls /brokers/ids [1, 2, 3] [zk: localhost:2181(CONNECTED) 4] ls /brokers/topics [hello04, hello05, hello02, hello03, hello01, hello10, __consumer_offsets]

【1.2】控制器

1)控制器定義:集群里第一個啟動的broker通過在zk創建臨時節點 /controller 讓自己成為控制器;

其他broker也嘗試創建 controller 節點,若已存在,則報錯;其他 broker 會在控制器節點上創建 zk watch 對象,這樣非控制器節點可以收到控制器節點狀態變更的通知;(干貨——這種方式可以確保一個集群只能有一個控制器存在,防止腦裂問題

2)控制器選舉策略:一旦控制器被關閉或與zk斷開,其他broker通過watch對象就會收到控制器消失的通知,這些 非控制器broker 會競爭在 zk 上創建 controller節點,誰最先創建成功,誰就是集群控制器; 然后其他broker在控制器節點上創建 zk watch對象;

  • 2.1)每次控制器選舉后: 控制器紀元值(時代值)controller_epoch? 都會遞增;其他broker若收到控制器發出的包含舊 epoch 的消息,就會忽略;

3)控制器實驗

step1) 查看 控制器和控制器紀元

[zk: localhost:2181(CONNECTED) 5] get /controller_epoch 6[zk: localhost:2181(CONNECTED) 6] get /controller {"version":1,"brokerid":1,"timestamp":"1638692039821"}

顯然, epoch是6,控制器是broker1;

step2)停止掉 broker1;?

這個時候,broker2,3 會競爭選舉為控制器;我們再次查看控制器,發現控制器現在是broker2了;且 epoch自增為7;?

[zk: localhost:2181(CONNECTED) 7] get /controller_epoch 7[zk: localhost:2181(CONNECTED) 8] get /controller {"version":1,"brokerid":2,"timestamp":"1638733315396"}

4)控制器作用

  • 控制器負責在broker加入或離開時進行分區首領選舉;
  • 控制器使用 epoch 避免腦裂問題

【補充】腦裂指兩個節點同時認為自己是集群控制器;?

5)zk的作用

  • kafka使用zk的臨時節點來選舉控制器;
  • zk在broker加入或退出集群時通知控制器;

  • 【1.3】復制

    復制功能是kafka架構的核心;在kafka 文檔里,kafka把自己描述為 一個分布式的,可分區的,可復制的提交日志服務;(kakfa的日志就是數據或消息);

    【1.3.1】副本

    1)數據存儲

    kafka使用主題來組織數據(邏輯);使用分區為單位來讀寫數據(物理);

    為什么說kakfa以分區為單位讀寫? 是因為我們創建帶有分區數和副本數的主題后, kakfa會創建以這個分區命名的文件夾,分區文件夾下存儲消息內容,索引文件等;


    ?2)主題,分區,副本關系

    • 1個主題對應多個分區;
    • 1個分區對應多個副本;
    • 1個副本對應多個分段文件;(分段存儲)?

    3)副本類型?

    • 3.1)首領副本:每個分區都有一個首領副本,消息讀寫首先會操作首領副本;
    • 3.2)跟隨者副本:首領副本以外的副本;它們不處理讀寫請求,唯一任務是從首領副本復制消息,與首領保持數據同步;如果首領發生崩潰,其中一個同步的跟隨者副本被提升為首領副本;

    補充1:跟隨者副本在成為不同步副本前的時間是通過 replica.lag.time.max.ms 來配置;

    補充2:跟隨者從首領副本復制消息時的請求,與消費者從首領副本消費消息時發出的請求是一樣的;


    【1.4】處理請求

    1)broker處理請求過程

    • step1)broker會在監聽端口上運行一個 Acceptor線程可以理解為服務器套接字 ServerSocket),這個線程會創建一個連接(類似ServerSocket.accept() 方法),把請求交給 Processor線程(網絡線程)去處理;
    • step2)Processor線程從客戶端獲取請求消息,把它放進請求隊列,然后從響應隊列獲取響應結果并發送給客戶端;
    • step3) 在請求被放入請求隊列后, IO線程會處理它們,并把處理結果放入 響應隊列;

    ?2)常見請求類型

    • 生產請求:生產者發送的請求,包含要寫入的消息;
    • 獲取請求:消費者或跟隨者副本所在broker需要從首領副本所在broker獲取消息而發送的請求;

    【注意】

  • 生產請求和獲取請求都必須發送給分區的首領副本,跟隨者副本不參與消息讀寫,僅做備份和支持集群高可用;
  • kafka客戶端要自己負責把生產請求和獲取請求發送到正確的broker上;否則broker會返回錯誤響應;
  • 3)客戶端怎么知道請求發送到哪里呢?

    3.1)客戶端在發送請求前,先發送元數據請求

    • 這種請求的響應結果包括 主題,主題分區,分區副本以及首領副本;

    3.2)客戶端會緩存這些元數據信息;

    • 獲取元數據信息后,會直接往對應的 broker發送請求和獲取請求;
    • 當然,客戶端需要定時刷新元數據緩存; 刷新時間間隔通過? metadata.max.age.ms 來配置;?


    【1.4.1】生產請求

    1)生產者acks有3個值;

    • acks=0 ; 生產者在發送消息后,默認發送成功;而不會等待服務器響應;
    • acks=1 ; 只要集群的首領節點收到消息,生產者就會收到發送成功的響應;而不管副本節點是否收到消息;
    • acks=all; 需要集群的首領節點和跟隨節點(副本節點)都收到消息后,生產者才會收到發送成功的響應;

    2)首領副本所在broker收到生產請求后,會對請求做一些驗證:

    • 發送數據的用戶是否有寫入權限;
    • acks的值是否合法; (只允許出現 0, 1, all);
    • 根據acks的值,進行副本復制策略;

    【1.4.2】獲取請求

    1)首領副本所在broker收到獲取請求后,會根據客戶端指定的請求偏移量從分區里讀取消息;

    2)kafka使用 零復制技術 向客戶端發送消息,即kafka直接把消息從文件發送到網絡通道,而不經過任何中間緩沖區;(干貨——這是kakfa與大部分數據庫不一樣的地方,其他數據庫在把數據發送到客戶端前,會把數據保存到本地緩存)

    • 零復制技術優點:避免了字節復制,也不需要管理內存緩沖區,從而獲取更好性能;?

    3)消費者客戶端只能讀取已經被寫入所有同步副本的消息,而不是所有消息

    • 因為還沒有被足夠多副本復制的消息被認為是不安全的;如果首領副本所在broker發送崩潰,另一副本成為新首領,那這些不安全的消息就會丟失;

    4)擴展 ISR, HW高水位

  • ISR, In-Sync-Replica set, 同步副本集合,即所有與首領副本保持同步的副本集合;
  • LEO,log end offset,日志末端偏移量 ,即副本寫入下一條消息的位移值;
  • HW高水位,High Watermark, 所有副本最小的LEO
  • 小結: 消費者只能看到已經復制所有副本的消息;

    5)在 Kafka 中,高水位的作用主要有 2 個。

  • 定義消息可見性,即用來標識分區下的哪些消息是可以被消費者消費的。
  • 幫助 Kafka 完成副本同步。
  • 6)下面這張圖展示了多個與高水位相關的 Kafka 術語 。

    我們假設這是某個分區 Leader副本的高水位圖。

    1)首先,請你注意圖中的“已提交消息”和“未提交消息”。在分區高水位以下的消息被認為是已提交消息,反之就是未提交消息。消費者只能消費已提交消息,即圖中位移小于 8 的所有消息。另外,需要關注的是,位移值等于高水位的消息也屬于未提交消息。也就是說,高水位上的消息是不能被消費者消費的

    2)圖中還有一個日志末端位移的概念,即 Log End Offset,簡寫是 LEO。
    它表示副本寫入下一條消息的位移值。注意,數字 15 所在的方框是虛線,這就說明,這個副本當前只有 15 條消息,位移值是從 0 到 14,下一條新消息的位移是 15。顯然,介于高水位和 LEO 之間的消息就屬于未提交消息。這也從側面告訴了我們一個重要的事實,那就是:同一個副本對象,其高水位值不會大于 LEO 值。

    【高水位小結】高水位和 LEO 是副本對象的兩個重要屬性

  • Kafka 所有副本都有對應的高水位和 LEO 值,而不僅僅是 Leader 副本。
  • Kafka 使用 Leader 副本的高水位來定義所在分區的高水位。

  • ?【1.4.3】其他請求

    • OffsetCommitRequest, 偏移量提交請求;
    • OffsetFetchRequest;
    • ListOffsetsRequest;

    【1.5】物理存儲

    1)kafka的基本存儲單元是分區; 分區會在所屬broker上的kafka數據根目錄下新建名為分區名的文件夾,如 hello04-2(主題為hello04的2號分區文件夾),kafka數據根目錄由 server.properties 中的 log.dirs 來指定;

    2)主題,分區,副本關系

    • 1個主題對應多個分區;
    • 1個分區對應多個副本;
    • 1個副本對應多個分段文件;(分段存儲)?


    【1.5.1】分區分配

    1)創建指定分區和副本數的topic來做實驗

    # 創建分區數3副本數2的主題 kafka-topics.sh --bootstrap-server centos201:9092 --create --topic hello11 --partitions 3 --replication-factor 2 # 副本數量必須小于等于broker數量,但分區數沒有這個限制;

    查看分區詳情

    [root@centos201 hello04-1]# kafka-topics.sh --bootstrap-server centos201:9092 \ --describe --topic hello11 Topic: hello11 TopicId: IliU_BDeS8ycreLufxCMMw PartitionCount: 3 ReplicationFactor: 2 Configs: segment.bytes=1024Topic: hello11 Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3Topic: hello11 Partition: 1 Leader: 3 Replicas: 3,1 Isr: 3,1Topic: hello11 Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2

    查看具體存儲數據的文件夾,以broker1為例;?

    根據topic詳情,我們知道 broker1 存儲了topic hello11的1號和2號分區; 且它是2號分區首領所在的broker

    進入 broker1的kafka數據根目錄,

    ?

    ?進入其中一個分區文件夾查看? hello11-1 ,如下:

    再查看分區文件夾前,我們先寫入10條消息; 指定topic hello11, 1號分區

    for (int i = 0; i < 10; i++) {Future<RecordMetadata> future = producer.send( new ProducerRecord<String, String>("hello11", 1,"", String.format("[%s] ", order++) + now + " > " + DataFactory.INSTANCE.genOneHundred()));try {System.out.println("[生產者] " + future.get().partition() + "-" + future.get().offset());} catch (Exception e) {e.printStackTrace();} }

    查看分區文件夾下的文件 ;

    2) kafka的分段存儲

    因為在一個大文件里查找和刪除消息很耗時;所以把一個分區分成若干片段進行存儲;默認情況下,一個片段存儲1g數據,為了實驗,這里我修改為 1k,可以在 server.properties文件中設置 log.segment.bytes=1024 來實現;

    3)kafka的稀疏索引

    • kafka并沒有對每條消息建立索引,那樣太大了,而是采用稀疏索引(稀疏存儲)的方式,即一條索引記錄指向一個消息范圍;

    例如: 索引值 1~100 指向 數據文件1.log中的消息1到消息100的消息范圍的起始地址;

    refer2 Apache Kafka ;

    當消費者指定消費某個offset記錄時, kafka集群通過二分查找從索引文件找出包含offset的索引值,通過索引值找到對應數據文件的起始地址,然后從起始地址開始順序讀取對應offset的消息;


    【1.5.2】文件格式

    1)kafka 使用零復制技術給消費者發送消息,避免了對生產者已經壓縮過的消息進行解壓和再壓縮;?

    2)普通消息與壓縮消息格式 ?

    ?可以看出,多個壓縮消息共用同一個消息頭,從而減少消息大小;

    【References】

  • kafka權威指南;
  • Apache Kafka
  • 總結

    以上是生活随笔為你收集整理的kafka控制器,复制与存储小结的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。