kafka控制器,复制与存储小结
【README】
- 1,本文主要總結kafka復制,存儲細節;
- 2,本文的kafka集群版本是3.0.0, 有3個broker,分別是 centos201, centos202, centos203 對應的brokerid為 1, 2, 3 ;
【1】kafka內部原理
【1.1】broker-消息中心點
1)broker:一個獨立的kafka服務器節點;也稱為發送消息的中心點;
- kafka使用zk維護集群成員關系;
- 每個broker都有自己的id存儲在zk;broker啟動時,創建zk節點把自己id注冊到zk;
2)zk存儲的kafka集群信息的節點列表
# zk存儲的kafka集群信息的節點 [zk: localhost:2181(CONNECTED) 1] ls / [cluster,controller_epoch,controller,brokers,zookeeper,feature,admin,isr_change_notification,consumers,log_dir_event_notification,latest_producer_id_block,config]查看zk中的 broker id
# 查看kafka brokerid 和 topic [zk: localhost:2181(CONNECTED) 2] ls /brokers [ids, topics, seqid] [zk: localhost:2181(CONNECTED) 3] ls /brokers/ids [1, 2, 3] [zk: localhost:2181(CONNECTED) 4] ls /brokers/topics [hello04, hello05, hello02, hello03, hello01, hello10, __consumer_offsets]【1.2】控制器
1)控制器定義:集群里第一個啟動的broker通過在zk創建臨時節點 /controller 讓自己成為控制器;
其他broker也嘗試創建 controller 節點,若已存在,則報錯;其他 broker 會在控制器節點上創建 zk watch 對象,這樣非控制器節點可以收到控制器節點狀態變更的通知;(干貨——這種方式可以確保一個集群只能有一個控制器存在,防止腦裂問題)
2)控制器選舉策略:一旦控制器被關閉或與zk斷開,其他broker通過watch對象就會收到控制器消失的通知,這些 非控制器broker 會競爭在 zk 上創建 controller節點,誰最先創建成功,誰就是集群控制器; 然后其他broker在控制器節點上創建 zk watch對象;
- 2.1)每次控制器選舉后: 控制器紀元值(時代值)controller_epoch? 都會遞增;其他broker若收到控制器發出的包含舊 epoch 的消息,就會忽略;
3)控制器實驗
step1) 查看 控制器和控制器紀元
[zk: localhost:2181(CONNECTED) 5] get /controller_epoch 6[zk: localhost:2181(CONNECTED) 6] get /controller {"version":1,"brokerid":1,"timestamp":"1638692039821"}顯然, epoch是6,控制器是broker1;
step2)停止掉 broker1;?
這個時候,broker2,3 會競爭選舉為控制器;我們再次查看控制器,發現控制器現在是broker2了;且 epoch自增為7;?
[zk: localhost:2181(CONNECTED) 7] get /controller_epoch 7[zk: localhost:2181(CONNECTED) 8] get /controller {"version":1,"brokerid":2,"timestamp":"1638733315396"}4)控制器作用
- 控制器負責在broker加入或離開時進行分區首領選舉;
- 控制器使用 epoch 避免腦裂問題;
【補充】腦裂指兩個節點同時認為自己是集群控制器;?
5)zk的作用
【1.3】復制
復制功能是kafka架構的核心;在kafka 文檔里,kafka把自己描述為 一個分布式的,可分區的,可復制的提交日志服務;(kakfa的日志就是數據或消息);
【1.3.1】副本
1)數據存儲
kafka使用主題來組織數據(邏輯);使用分區為單位來讀寫數據(物理);
為什么說kakfa以分區為單位讀寫? 是因為我們創建帶有分區數和副本數的主題后, kakfa會創建以這個分區命名的文件夾,分區文件夾下存儲消息內容,索引文件等;
?2)主題,分區,副本關系
- 1個主題對應多個分區;
- 1個分區對應多個副本;
- 1個副本對應多個分段文件;(分段存儲)?
3)副本類型?
- 3.1)首領副本:每個分區都有一個首領副本,消息讀寫首先會操作首領副本;
- 3.2)跟隨者副本:首領副本以外的副本;它們不處理讀寫請求,唯一任務是從首領副本復制消息,與首領保持數據同步;如果首領發生崩潰,其中一個同步的跟隨者副本被提升為首領副本;
補充1:跟隨者副本在成為不同步副本前的時間是通過 replica.lag.time.max.ms 來配置;
補充2:跟隨者從首領副本復制消息時的請求,與消費者從首領副本消費消息時發出的請求是一樣的;
【1.4】處理請求
1)broker處理請求過程
- step1)broker會在監聽端口上運行一個 Acceptor線程(可以理解為服務器套接字 ServerSocket),這個線程會創建一個連接(類似ServerSocket.accept() 方法),把請求交給 Processor線程(網絡線程)去處理;
- step2)Processor線程從客戶端獲取請求消息,把它放進請求隊列,然后從響應隊列獲取響應結果并發送給客戶端;
- step3) 在請求被放入請求隊列后, IO線程會處理它們,并把處理結果放入 響應隊列;
?2)常見請求類型
- 生產請求:生產者發送的請求,包含要寫入的消息;
- 獲取請求:消費者或跟隨者副本所在broker需要從首領副本所在broker獲取消息而發送的請求;
【注意】
3)客戶端怎么知道請求發送到哪里呢?
3.1)客戶端在發送請求前,先發送元數據請求;
- 這種請求的響應結果包括 主題,主題分區,分區副本以及首領副本;
3.2)客戶端會緩存這些元數據信息;
- 獲取元數據信息后,會直接往對應的 broker發送請求和獲取請求;
- 當然,客戶端需要定時刷新元數據緩存; 刷新時間間隔通過? metadata.max.age.ms 來配置;?
【1.4.1】生產請求
1)生產者acks有3個值;
- acks=0 ; 生產者在發送消息后,默認發送成功;而不會等待服務器響應;
- acks=1 ; 只要集群的首領節點收到消息,生產者就會收到發送成功的響應;而不管副本節點是否收到消息;
- acks=all; 需要集群的首領節點和跟隨節點(副本節點)都收到消息后,生產者才會收到發送成功的響應;
2)首領副本所在broker收到生產請求后,會對請求做一些驗證:
- 發送數據的用戶是否有寫入權限;
- acks的值是否合法; (只允許出現 0, 1, all);
- 根據acks的值,進行副本復制策略;
【1.4.2】獲取請求
1)首領副本所在broker收到獲取請求后,會根據客戶端指定的請求偏移量從分區里讀取消息;
2)kafka使用 零復制技術 向客戶端發送消息,即kafka直接把消息從文件發送到網絡通道,而不經過任何中間緩沖區;(干貨——這是kakfa與大部分數據庫不一樣的地方,其他數據庫在把數據發送到客戶端前,會把數據保存到本地緩存)
- 零復制技術優點:避免了字節復制,也不需要管理內存緩沖區,從而獲取更好性能;?
3)消費者客戶端只能讀取已經被寫入所有同步副本的消息,而不是所有消息
- 因為還沒有被足夠多副本復制的消息被認為是不安全的;如果首領副本所在broker發送崩潰,另一副本成為新首領,那這些不安全的消息就會丟失;
4)擴展 ISR, HW高水位
小結: 消費者只能看到已經復制所有副本的消息;
5)在 Kafka 中,高水位的作用主要有 2 個。
6)下面這張圖展示了多個與高水位相關的 Kafka 術語 。
我們假設這是某個分區 Leader副本的高水位圖。
1)首先,請你注意圖中的“已提交消息”和“未提交消息”。在分區高水位以下的消息被認為是已提交消息,反之就是未提交消息。消費者只能消費已提交消息,即圖中位移小于 8 的所有消息。另外,需要關注的是,位移值等于高水位的消息也屬于未提交消息。也就是說,高水位上的消息是不能被消費者消費的。
2)圖中還有一個日志末端位移的概念,即 Log End Offset,簡寫是 LEO。
它表示副本寫入下一條消息的位移值。注意,數字 15 所在的方框是虛線,這就說明,這個副本當前只有 15 條消息,位移值是從 0 到 14,下一條新消息的位移是 15。顯然,介于高水位和 LEO 之間的消息就屬于未提交消息。這也從側面告訴了我們一個重要的事實,那就是:同一個副本對象,其高水位值不會大于 LEO 值。
【高水位小結】高水位和 LEO 是副本對象的兩個重要屬性
?【1.4.3】其他請求
- OffsetCommitRequest, 偏移量提交請求;
- OffsetFetchRequest;
- ListOffsetsRequest;
【1.5】物理存儲
1)kafka的基本存儲單元是分區; 分區會在所屬broker上的kafka數據根目錄下新建名為分區名的文件夾,如 hello04-2(主題為hello04的2號分區文件夾),kafka數據根目錄由 server.properties 中的 log.dirs 來指定;
2)主題,分區,副本關系
- 1個主題對應多個分區;
- 1個分區對應多個副本;
- 1個副本對應多個分段文件;(分段存儲)?
【1.5.1】分區分配
1)創建指定分區和副本數的topic來做實驗
# 創建分區數3副本數2的主題 kafka-topics.sh --bootstrap-server centos201:9092 --create --topic hello11 --partitions 3 --replication-factor 2 # 副本數量必須小于等于broker數量,但分區數沒有這個限制;查看分區詳情
[root@centos201 hello04-1]# kafka-topics.sh --bootstrap-server centos201:9092 \ --describe --topic hello11 Topic: hello11 TopicId: IliU_BDeS8ycreLufxCMMw PartitionCount: 3 ReplicationFactor: 2 Configs: segment.bytes=1024Topic: hello11 Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3Topic: hello11 Partition: 1 Leader: 3 Replicas: 3,1 Isr: 3,1Topic: hello11 Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2查看具體存儲數據的文件夾,以broker1為例;?
根據topic詳情,我們知道 broker1 存儲了topic hello11的1號和2號分區; 且它是2號分區首領所在的broker ;
進入 broker1的kafka數據根目錄,
?
?進入其中一個分區文件夾查看? hello11-1 ,如下:
再查看分區文件夾前,我們先寫入10條消息; 指定topic hello11, 1號分區;
for (int i = 0; i < 10; i++) {Future<RecordMetadata> future = producer.send( new ProducerRecord<String, String>("hello11", 1,"", String.format("[%s] ", order++) + now + " > " + DataFactory.INSTANCE.genOneHundred()));try {System.out.println("[生產者] " + future.get().partition() + "-" + future.get().offset());} catch (Exception e) {e.printStackTrace();} }查看分區文件夾下的文件 ;
2) kafka的分段存儲;
因為在一個大文件里查找和刪除消息很耗時;所以把一個分區分成若干片段進行存儲;默認情況下,一個片段存儲1g數據,為了實驗,這里我修改為 1k,可以在 server.properties文件中設置 log.segment.bytes=1024 來實現;
3)kafka的稀疏索引
- kafka并沒有對每條消息建立索引,那樣太大了,而是采用稀疏索引(稀疏存儲)的方式,即一條索引記錄指向一個消息范圍;
例如: 索引值 1~100 指向 數據文件1.log中的消息1到消息100的消息范圍的起始地址;
refer2 Apache Kafka ;
當消費者指定消費某個offset記錄時, kafka集群通過二分查找從索引文件找出包含offset的索引值,通過索引值找到對應數據文件的起始地址,然后從起始地址開始順序讀取對應offset的消息;
【1.5.2】文件格式
1)kafka 使用零復制技術給消費者發送消息,避免了對生產者已經壓縮過的消息進行解壓和再壓縮;?
2)普通消息與壓縮消息格式 ?
?可以看出,多個壓縮消息共用同一個消息頭,從而減少消息大小;
【References】
總結
以上是生活随笔為你收集整理的kafka控制器,复制与存储小结的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 落地页网址怎么生成(落地页网址怎么生成的
- 下一篇: kafka可靠数据传递