日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka key的作用_kafka系列(kafka端到端原理分析)

發布時間:2025/3/15 编程问答 18 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka key的作用_kafka系列(kafka端到端原理分析) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

  • Kafka 端到端源碼解析
    • Kafka的場景
    • Kafka概念
    • Topic 創建與刪除
      • Topic狀態流轉
      • 一些問題
      • Topic分區初始化選擇
    • kafka producer解析
      • 1. 發送流程
      • 2. 分區選擇策略?
      • 3. 攔截器有什么作用?
      • 4. 關鍵數據結構
      • 5. 參數配置
      • 6. ACK機制
      • 7.一些問題
    • Kafka網絡接收層
      • Kafka channel
      • 如何做限流的
    • Kafka內存管理
      • 堆內存
      • 堆外內存
    • kafka 存儲層解析
      • 消息格式
      • 消息索引
      • 一些問題
    • 副本管理
      • failover機制
    • kafka Consumer解析
      • 0.8.2版本客戶端
      • 0.10版本客戶端
      • 一些問題
    • zookeeper的作用
      • zookeeper在kafka中的作用

Kafka 端到端源碼解析

Kafka的場景

Kafka概念

  • Broker
  • Topic
  • Partition 邏輯上最小的單元
  • Offset
  • LogSegment 文件存儲最小的單元
  • Producer 生產者
  • Consumer 消費者
  • Zookeeper 提供分布式協調服務
  • Controller 集群中的master
  • ISR(In-Sync-Replica) Topic分區的副本狀態
  • 腦裂 集群中出現了雙主,對于kafka來說是雙controller
  • 羊群效應 當zookeeper上一個znode節點發生變化時,所有監聽該節點的客戶端都會發生相應的動作

Topic 創建與刪除

zk注冊,controller選舉具體的數據結構與流程

Topic狀態流轉

創建、在線、增加分區、下線、刪除

Topic 一些問題

  • topic分區數可不可以減少?如果可以,為什么?不可以
  • Kafka 目前有哪些內部topic?分別的作用是什么?__consumer_offset 用來保存用戶groupId對應的消費topic offset

Topic分區初始化選擇

按照broker數量均勻地分布在每個broker上

Kafka Producer解析

1. 發送流程

  • 第一步: 刷新元數據
  • 第二步: 序列化、選擇分區、注冊攔截器回調函數
  • 第三步: 往RecordAccmulator發送數據
  • 第四步:判斷batch是否滿了,滿了的話喚醒send后臺線程 有可能的異常:API版本不匹配;Buffer耗盡等
  • 第五步 : send后臺線程退出時,掃尾工作

2. 分區選擇策略?

  • 若該消息內無指定分區,則使用消息中指定的key哈希生成的分區
  • 若key為null,則按照輪詢的方式生成分區
  • 最后一種,若仍然不滿足需求,用戶還可以自己指定partition分區策略類,每條消息都按照這個策略進行
    因此,分區策略可以有四個級別:用戶自定義分區策略類、key哈希、輪詢、任一消息選擇任一分區,總的來說給用戶很大的自由度。

3. 攔截器有什么作用?

在每次消息處理成功后增加一個回調函數,一般用來記錄一些統計信息,為每條消息增加其他字段等等。

4. 關鍵數據結構

RecordAccmulator數據結構的作用

的內部是如何運作的?這是個線程安全的數據結構

ConcurrentHashMap《TopicPartition,Batch隊列》

Batch隊列需要保證線程安全

有一個緩沖池bufferPool,每次開始是已經有batch在發,如果不存在則開辟batchSize大小的空間;然后往Batch隊列的append數據,并且使得offset+1,然后會生成一個FutureRecordMetadata,用來表示batch是否滿

消息在如何在客戶端存儲的
MemoryRecord 定義了一條消息在內存中的存儲,

傳輸到socketChannel

5. 參數配置

  • batch.size指的是大小,不是消息數
  • ling.ms是每隔該時間就定時發送
  • maxFlightPerConnection=1保證了消息在單分區內的順序性
  • 6. ACK機制

    代表對于消息可靠性的容忍度
    Ack=1 代表leader返回ack即可 Ack=-1 代表所有副本返回ack Ack=0代表不需要返回

    7. Producer一些問題

    • kafka 分區器、序列化器、攔截器之間的處理順序?序列化器、分區器、 攔截器(發送完成后才會調用)
    • 如何保證topic消息順序性?全局消息順序性:采用一個topic partition 單分區順序性: maxFlightPerConnection=1
    • 性能調優問題?
    • 數據壓縮問題?
    • 數據冪等性?
      kafka 0.11版本之后提供了producer的冪等性
    • kafka 生產者客戶端用了幾個線程
      sender線程、producer主線程、

    Kafka網絡接收層

    Kafka channel

    如何做限流的?

    圖中展示了通用的限流算法

    server/ClientQuatoManager負責進行流量控制

    如何做數據安全的?

    Kafka內存管理

    堆外內存

    堆外內存主要用在kafka consumer中,一般為了提高I/O效率,都采用NIO的方式讀取文件,而讀取后的數據都保存在ByteBuffer數據結構中,ByteBuffer封裝了堆外內存的引用。 ByteBufferMessageSet 解讀

    kafka 存儲層解析

    存儲層是利用本地文件系統的文件來存儲的,首先每個topic對應N個分區,每個分區對應有三類文件(log文件、index文件與timeindex文件)。Log文件以每條二進制序列化后的消息為基本單位存儲消息,每條消息的基本格式如下表格,而log文件分為很多個logsegment,每個segment的大小是一樣的,例如1GB,三個文件的名字為文件中第一個消息的offset數值。

    消息格式(V1版本)

    filed | size | desciption

    ------ | ------ | ------ |

    offset | 8 B | 偏移量

    message size | 4 B | 消息大小

    crc32 | 4 B | crc校驗碼

    magic | 1B | Api的版本

    timestamp | 8 B | 消息時間戳

    attributes | 1 B | 屬性?

    key length | 4 B | key的長度

    key | | key的消息體

    value length | 4B | value長度

    value | | 消息體長度

    消息索引

  • 給定時間戳—>定位某個LogSegment—>定位offset—>定位消息位置?
    根據時間戳查找offset,先順序定位到LogSegment(找到第一個大于該時間戳的LogSegment),然后timeindex內部二分查找定位到offset
  • 給定offset—> 定位到某個LogSegment—>定位消息位置 ?
    根據offset,跳表中定位到LogSegment,然后index內部二分查找定位到offset位置,再順序搜索定位到文件位置
  • 刷盤策略

    kafka是異步刷盤的,有后臺線程專程將內存中的數據寫入到磁盤中 index 文件通過mmap從磁盤映射到用戶空間內存中,log文件則是普通的讀取文件。

    日志清理與Compaction

    流程與數據結構

    一些問題

    • 談談你對頁緩存、內核層、塊層、設備層的理解
      內核層 :操作系統中的內存數據與用戶態buffer進行相互拷貝
      pagecache : 文件讀到操作系統內存中,操作系統的內存管理系統會預讀
      塊層:管理設備I/O隊列,對I/O請求進行合并、排序等 設備層:通過DMA與內存直接交互,將數據寫到磁盤

    副本管理

    為什么用ISR,不用Raft之類的協議?借鑒了PacificA算法協議。 兩個重要的組件:配置管理(對應kafka ISR,leader epoch commited_point)
    HighWaterMark的作用:commited 消息度量;讀可見性== 參考

    Failover機制

    • 若unclean.leader.election.enable為true,再去replica中去找存活的broker。而ISR中的broker存在是這樣:只有當follower從leader拉取數據跟得上leader的數據速度時,才會在ISR中,否則,被剔除掉ISR列表中。
    • 若unclean.leader.election.enable為false,拋出異常

    為什么會有unclean.leader.election.enable這個參數呢?

    那么數據一致性是如何保證的呢,如何知道副本的狀態是可靠的?ISR就保存了kafka認為可靠的副本,它們具備這樣的條件:1 . 落后leader的消息條數在一定閾值內 2.或者落后在一定時間內; 但是,follower的復制狀態誰又能保證一定能跟得上leader呢?這樣,就存在著一種可能性,有可能ISR中只有leader,其他的副本都跟不上leader; 因此,這個時候,patition到底可用不可用?這就是一個權衡了,若只從ISR中獲取leader,保證了數據的可靠性,但partition就不可用了,若從replica中獲取,則可用性增強,但是數據可能存在丟失情況。 因此unclean.leader.election.enable這個參數設計為true,則保證了可用性,也就是CAP中的A P;設置為false,則保證了數據一致性,也就是CAP中的CP

    kafka Consumer解析

    推拉模型

    推 拉

    0.8.2版本客戶端

    0.10版本客戶端

    一些問題

    • kafka 如何做到不重復消費?
      現有的kafka可以做到寫冪等性(0.11版本之后),但是做不到消費冪等性。消費完后寫offset到zk失敗,這個狀態consumer客戶端是感知不到的,二者并沒有類似TCP的ack機制。因此下一次還是會從上次提交的offset繼續讀,就會出現重復消費。我個人覺得解決這個問題可以從兩個方向來考慮:應用端做消費冪等性處理,也即每條消息會有一個全局的key,應用端保存消費過消息的key,每次新消費一條數據,key做重復判斷,若重復,則丟棄這條數據。當然這會帶來額外的內存與查詢開銷。
      同樣,應用端也就是consumer端需要消息處理和offset提交這兩步是事務的,也即要么操作成功要么撤回恢復之前的狀態。這需要應用端有事務保障,但往往很多應用端是不支持事務的,比如kafka數據落盤hdfs,kafka數據消費完寫入本地文件等等。但官方給的kafka consumer-process-kafka 給出了一個不錯的參考的例子和思路。基本上遵循了分布式系統中的兩階段提交想法和思路,具體可以參見

    個人理解重復消費出現的概率并不會很高,在服務端改進會帶來很大的性能損耗,這可能是為什么大家都選擇不處理的重要原因吧。另外,本身系統與系統之間傳輸數據,很難做到消息的exactly once的。無論是kafka到存儲系統hdfs還是spark flink下游計算系統等。若數據傳輸都在一個系統之內,那相對好處理一些,比如kafka的事務,保證了consume-process-producer的事務場景,也就是從kafka消費處理完畢后再到kafka,這個可以做到exactly once。

    zookeeper的作用

    zookeeper在kafka中的作用

  • controller選舉,所有的broker在zk /controller下注冊臨時節點,任意一個搶先的broker注冊成功,則為controller
  • kafka consumer負載均衡
  • 集群節點存活狀態監測
  • topic創建觸發
  • broker上線、下線的通知
  • ISR配置變更
  • 總結

    以上是生活随笔為你收集整理的kafka key的作用_kafka系列(kafka端到端原理分析)的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。