日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

4.2.4 Kafka高级特性解析(物理存储、稳定性:事物,控制器,可靠性,一致性,_consumer_offsets、延时队列、自定义重试队列)

發(fā)布時(shí)間:2024/3/13 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 4.2.4 Kafka高级特性解析(物理存储、稳定性:事物,控制器,可靠性,一致性,_consumer_offsets、延时队列、自定义重试队列) 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

Kafka高級特性解析


文章目錄

  • Kafka高級特性解析
  • 2.5 物理存儲(chǔ)
    • 2.5.1 日志存儲(chǔ)概述
    • 2.5.2 日志存儲(chǔ)
      • 2.5.2.1 索引
        • 2.5.2.1.1 偏移量
        • 2.5.2.1.2 時(shí)間戳
      • 2.5.2.2 清理
        • 2.5.2.2.1 日志刪除
        • 2.5.2.2.2 日志壓縮策略
    • 2.5.3 磁盤存儲(chǔ)
      • 2.5.3.1 零拷貝
      • 2.5.3.2 頁緩存
      • 2.5.3.3 順序?qū)懭?/li>
  • 2.6 穩(wěn)定性
    • 2.6.1 事務(wù)
        • 2.6.1.1 冪等性
        • 2.6.1.2 事務(wù)操作
    • 2.6.2 控制器
      • 2.6.2.1 broker選舉
    • 2.6.3 可靠性保證
      • 2.6.3.1 失效副本
      • 2.6.3.2 副本復(fù)制
    • 2.6.4 一致性保證
      • **一、概念**
      • 二、Follower副本何時(shí)更新LEO
      • 三、Follower副本何時(shí)更新HW
      • 四、Leader副本何時(shí)更新LEO
      • 五、Leader副本何時(shí)更新HW值
      • 六、HW和LEO正常更新案例
      • 七、HW和LEO異常案例
      • 八、Leader Epoch使用
        • Kafka解決方案
        • 規(guī)避數(shù)據(jù)丟失
        • 規(guī)避數(shù)據(jù)不一致
    • 2.6.5 消息重復(fù)的場景及解決方案
      • 2.6.5.1 生產(chǎn)者階段重復(fù)場景
        • 2.6.5.1.1 根本原因
        • 2.6.5.1.2 重試過程
        • 2.6.5.1.3 可恢復(fù)異常說明
        • 2.6.5.1.4 記錄順序問題
      • 2.6.5.2 生產(chǎn)者發(fā)送重復(fù)解決方案
        • 2.6.5.2.1 啟動(dòng)kafka的冪等性
        • 2.6.5.2.2 ack=0,不重試。
      • 2.6.5.3 生產(chǎn)者和broke階段消息丟失場景
        • 2.6.5.3.1 ack=0,不重試
        • 2.6.5.3.2 ack=1,leader crash
        • 2.6.5.3.3 unclean.leader.election.enable 配置true
      • 2.6.5.4 解決生產(chǎn)者和broke階段消息丟失
        • 2.6.5.4.1 禁用unclean選舉,ack=all
        • 2.6.5.4.2 配置:min.insync.replicas > 1
        • 2.6.5.4.3 失敗的offset單獨(dú)記錄
      • 2.6.5.5 消費(fèi)者數(shù)據(jù)重復(fù)場景及解決方案
        • 2.6.5.5.1 根本原因
        • 2.6.5.5.2 場景
      • 2.6.5.6 解決方案
        • 2.6.5.6.1 取消自動(dòng)提交
        • 2.6.5.6.2 下游做冪等
    • 2.6.6 __consumer_offsets
  • 2.7 延時(shí)隊(duì)列
  • 2.8 重試隊(duì)列
    • 自定義實(shí)現(xiàn)步驟
    • 代碼實(shí)現(xiàn)


2.5 物理存儲(chǔ)

2.5.1 日志存儲(chǔ)概述

Kafka 消息是以主題為單位進(jìn)行歸類,各個(gè)主題之間是彼此獨(dú)立的,互不影響。
每個(gè)主題又可以分為一個(gè)或多個(gè)分區(qū)。
每個(gè)分區(qū)各自存在一個(gè)記錄消息數(shù)據(jù)的日志文件。

圖中,創(chuàng)建了一個(gè) tp_demo_01 主題,其存在6個(gè) Parition,對應(yīng)的每個(gè)Parition下存在一個(gè)[Topic-Parition] 命名的消息日志文件。在理想情況下,數(shù)據(jù)流量分?jǐn)偟礁鱾€(gè) Parition 中,實(shí)現(xiàn)了負(fù)載均衡的效果。在分區(qū)日志文件中,你會(huì)發(fā)現(xiàn)很多類型的文件,比如: .index、.timestamp、.log、.snapshot 等。

其中,文件名一致的文件集合就稱為 LogSement

LogSegment

  • 分區(qū)日志文件中包含很多的 LogSegment
  • Kafka 日志追加是順序?qū)懭氲?/li>
  • LogSegment 可以減小日志文件的大小
  • 進(jìn)行日志刪除的時(shí)候和數(shù)據(jù)查找的時(shí)候可以快速定位。
  • ActiveLogSegment 是活躍的日志分段,擁有文件擁有寫入權(quán)限,其余的 LogSegment 只有只讀的權(quán)限。
  • 日志文件存在多種后綴文件,重點(diǎn)需要關(guān)注 .index、.timestamp、.log 三種類型。
    類別作用

    每個(gè) LogSegment 都有一個(gè)基準(zhǔn)偏移量,表示當(dāng)前 LogSegment 中第一條消息的 offset。
    偏移量是一個(gè) 64 位的長整形數(shù),固定是20位數(shù)字,長度未達(dá)到,用 0 進(jìn)行填補(bǔ),索引文件和日志文件都由該作為文件名命名規(guī)則(00000000000000000000.index、00000000000000000000.timestamp、00000000000000000000.log)。
    如果日志文件名為 00000000000000000121.log ,則當(dāng)前日志文件的一條數(shù)據(jù)偏移量就是121(偏移量從 0 開始)。

    日志與索引文件

    配置項(xiàng)默認(rèn)值說明
    偏移量索引文件用于記錄消息偏移量與物理地址之間的映射關(guān)系。

    時(shí)間戳索引文件則根據(jù)時(shí)間戳查找對應(yīng)的偏移量。

    Kafka 中的索引文件是以稀疏索引的方式構(gòu)造消息的索引,并不保證每一個(gè)消息在索引文件中都有對應(yīng)的索引項(xiàng)。

    每當(dāng)寫入一定量的消息時(shí),偏移量索引文件和時(shí)間戳索引文件分別增加一個(gè)偏移量索引項(xiàng)和時(shí)間戳索引項(xiàng)。

    通過修改 log.index.interval.bytes 的值,改變索引項(xiàng)的密度。

    切分文件
    當(dāng)滿足如下幾個(gè)條件中的其中之一,就會(huì)觸發(fā)文件的切分:

  • 當(dāng)前日志分段文件的大小超過了 broker 端參數(shù) log.segment.bytes 配置的值。
    log.segment.bytes 參數(shù)的默認(rèn)值為 1073741824,即 1GB。
  • 當(dāng)前日志分段中消息的最大時(shí)間戳與當(dāng)前系統(tǒng)的時(shí)間戳的差值大于 log.roll.ms 或 log.roll.hours 參數(shù)配置的值。如果同時(shí)配置了log.roll.ms 和 log.roll.hours 參數(shù),那么 log.roll.ms 的優(yōu)先級高。默認(rèn)情況下,只配置了 log.roll.hours 參數(shù),其值為168,即 7 天。
  • 偏移量索引文件或時(shí)間戳索引文件的大小達(dá)到 broker 端參數(shù) log.index.size.max.bytes配置的值。 log.index.size.max.bytes 的默認(rèn)值為 10485760,即 10MB。
  • 追加的消息的偏移量與當(dāng)前日志分段的偏移量之間的差值大于 Integer.MAX_VALUE ,即要追加的消息的偏移量不能轉(zhuǎn)變?yōu)橄鄬ζ屏俊?/li>

    為什么是 Integer.MAX_VALUE ?
    1024 * 1024 * 1024=1073741824
    在偏移量索引文件中,每個(gè)索引項(xiàng)共占用 8 個(gè)字節(jié),并分為兩部分。

    相對偏移量和物理地址。
    相對偏移量:表示消息相對與基準(zhǔn)偏移量的偏移量,占 4 個(gè)字節(jié)
    物理地址:消息在日志分段文件中對應(yīng)的物理位置,也占 4 個(gè)字節(jié)
    4 個(gè)字節(jié)剛好對應(yīng) Integer.MAX_VALUE ,如果大于Integer.MAX_VALUE ,則不能用 4 個(gè)字節(jié)進(jìn)行表示了。

    索引文件切分過程
    索引文件會(huì)根據(jù) log.index.size.max.bytes 值進(jìn)行預(yù)先分配空間,即文件創(chuàng)建的時(shí)候就是最大值
    當(dāng)真正的進(jìn)行索引文件切分的時(shí)候,才會(huì)將其裁剪到實(shí)際數(shù)據(jù)大小的文件。
    這一點(diǎn)是跟日志文件有所區(qū)別的地方。其意義降低了代碼邏輯的復(fù)雜性。

    2.5.2 日志存儲(chǔ)

    2.5.2.1 索引

    偏移量索引文件用于記錄消息偏移量與物理地址之間的映射關(guān)系。時(shí)間戳索引文件則根據(jù)時(shí)間戳查找對應(yīng)的偏移量。

    文件:
    查看一個(gè)topic分區(qū)目錄下的內(nèi)容,發(fā)現(xiàn)有l(wèi)og、index和timeindex三個(gè)文件:

  • log文件名是以文件中第一條message的offset來命名的,實(shí)際offset長度是64位,但是這里只使用了20位,應(yīng)付生產(chǎn)是足夠的。
  • 一組index+log+timeindex文件的名字是一樣的,并且log文件默認(rèn)寫滿1G后,會(huì)進(jìn)行l(wèi)ogrolling形成一個(gè)新的組合來記錄消息,這個(gè)是通過broker端 log.segment.bytes =1073741824指定的。
  • index和timeindex在剛使用時(shí)會(huì)分配10M的大小,當(dāng)進(jìn)行 log rolling 后,它會(huì)修剪為實(shí)際的大小。

    1、創(chuàng)建主題:
  • [root@linux121 ~]# kafka-topics.sh --zookeeper linux121:2181/myKafka --create --topic tp_demo_05 --partitions 1 --replication-factor 1 --config segment.bytes=104857600

    2、創(chuàng)建消息文件:

    [root@linux121 ~]# for i in `seq 10000000`; do echo "hello lagou $i" >> nmm.txt; done


    3、將文本消息生產(chǎn)到主題中:

    [root@linux121 ~]# kafka-console-producer.sh --broker-list linux121:9092 --topic tp_demo_05 <nmm.txt

    4、查看存儲(chǔ)文件:

    [root@linux121 ~]# cd /var/lagou/kafka/kafka-logs/cd tp_demo_05-0 [root@linux121 ~]# ll


    如果想查看這些文件,可以使用kafka提供的shell來完成,幾個(gè)關(guān)鍵信息如下:
    (1)offset是逐漸增加的整數(shù),每個(gè)offset對應(yīng)一個(gè)消息的偏移量。
    (2)position:消息批字節(jié)數(shù),用于計(jì)算物理地址。
    (3)CreateTime:時(shí)間戳。
    (4)magic:2代表這個(gè)消息類型是V2,如果是0則代表是V0類型,1代表V1類型。
    (5)compresscodec:None說明沒有指定壓縮類型,kafka目前提供了4種可選擇,0-None、1-
    GZIP、2-snappy、3-lz4。 (6)crc:對所有字段進(jìn)行校驗(yàn)后的crc值。

    [root@linux121 tp_demo_05-0]# kafka-run-class.sh kafka.tools.DumpLogSegments - -files 00000000000000000000.log --print-data-log | head


    關(guān)于消息偏移量:
    一、消息存儲(chǔ)

  • 消息內(nèi)容保存在log日志文件中。
  • 消息封裝為Record,追加到log日志文件末尾,采用的是順序?qū)懩J健?/li>
  • 一個(gè)topic的不同分區(qū),可認(rèn)為是queue,順序?qū)懭虢邮盏降南ⅰ?br />
    消費(fèi)者有offset。下圖中,消費(fèi)者A消費(fèi)的offset是9,消費(fèi)者B消費(fèi)的offset是11,不同的消費(fèi)者offset是交給一個(gè)內(nèi)部公共topic來記錄的。

    (3)時(shí)間戳索引文件,它的作用是可以讓用戶查詢某個(gè)時(shí)間段內(nèi)的消息,它一條數(shù)據(jù)的結(jié)構(gòu)是時(shí)間戳(8byte)+相對offset(4byte),如果要使用這個(gè)索引文件,首先需要通過時(shí)間范圍,找到對應(yīng)的相對offset,然后再去對應(yīng)的index文件找到position信息,然后才能遍歷log文件,它也是需要使用上面說的index文件的。
    但是由于producer生產(chǎn)消息可以指定消息的時(shí)間戳,這可能將導(dǎo)致消息的時(shí)間戳不一定有先后順序,因此盡量不要生產(chǎn)消息時(shí)指定時(shí)間戳。
  • 2.5.2.1.1 偏移量

  • 位置索引保存在index文件中
  • log日志默認(rèn)每寫入4K(log.index.interval.bytes設(shè)定的),會(huì)寫入一條索引信息到index文件中,因此索引文件是稀疏索引,它不會(huì)為每條日志都建立索引信息。
  • log文件中的日志,是順序?qū)懭氲?#xff0c;由message+實(shí)際offset+position組成
  • 索引文件的數(shù)據(jù)結(jié)構(gòu)則是由相對offset(4byte)+position(4byte)組成,由于保存的是相對第一個(gè)消息的相對offset,只需要4byte就可以了,可以節(jié)省空間,在實(shí)際查找后還需要計(jì)算回實(shí)際的offset,這對用戶是透明的。
  • 稀疏索引,索引密度不高,但是offset有序,二分查找的時(shí)間復(fù)雜度為O(lgN),如果從頭遍歷時(shí)間復(fù)雜度是O(N)。
    示意圖如下:

    偏移量索引由相對偏移量和物理地址組成。

    可以通過如下命令解析 .index 文件

    kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.index --print-data-log | head

    注意:offset 與 position 沒有直接關(guān)系,因?yàn)闀?huì)刪除數(shù)據(jù)和清理日志。

    [root@node1 tp_demo_05-0]# kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000003925423.log --print-data-log | head


    在偏移量索引文件中,索引數(shù)據(jù)都是順序記錄 offset ,但時(shí)間戳索引文件中每個(gè)追加的索引時(shí)間戳必須大于之前追加的索引項(xiàng),否則不予追加。在 Kafka 0.11.0.0 以后,消息元數(shù)據(jù)中存在若干的時(shí)間戳信息。如果 broker 端參數(shù) log.message.timestamp.type 設(shè)置為LogAppendTIme ,那么時(shí)間戳必定能保持單調(diào)增長。反之如果是CreateTime 則無法保證順序。
    注意:timestamp文件中的 offset 與 index 文件中的 relativeOffset 不是一一對應(yīng)的。因?yàn)閿?shù)據(jù)的寫入是各自追加。

    思考:如何查看偏移量為23的消息?
    Kafka 中存在一個(gè) ConcurrentSkipListMap 來保存在每個(gè)日志分段,通過跳躍表方式,定位到在00000000000000000000.index ,通過二分法在偏移量索引文件中找到不大于 23 的最大索引項(xiàng),即offset 20 那欄,然后從日志分段文件中的物理位置為320 開始順序查找偏移量為 23 的消息。

    2.5.2.1.2 時(shí)間戳

    在偏移量索引文件中,索引數(shù)據(jù)都是順序記錄 offset ,但時(shí)間戳索引文件中每個(gè)追加的索引時(shí)間戳必須大于之前追加的索引項(xiàng),否則不予追加。在 Kafka 0.11.0.0 以后,消息信息中存在若干的時(shí)間戳信息。如果 broker 端參數(shù) log.message.timestamp.type 設(shè)置為LogAppendTIme ,那么時(shí)間戳必定能保持單調(diào)增長。反之如果是 CreateTime 則無法保證順序。

    通過時(shí)間戳方式進(jìn)行查找消息,需要通過查找時(shí)間戳索引和偏移量索引兩個(gè)文件

    時(shí)間戳索引索引格式:前八個(gè)字節(jié)表示時(shí)間戳,后四個(gè)字節(jié)表示偏移量


    思考:查找時(shí)間戳為 1557554753430 開始的消息?

  • 查找該時(shí)間戳應(yīng)該在哪個(gè)日志分段中。將1557554753430和每個(gè)日志分段中最大時(shí)間戳largestTimeStamp逐一對比,直到找到不小于1557554753430所對應(yīng)的日志分段。日志分段中的largestTimeStamp的計(jì)算是:先查詢該日志分段所對應(yīng)時(shí)間戳索引文件,找到最后一條索引項(xiàng),若最后一條索引項(xiàng)的時(shí)間戳字段值大于0,則取該值,否則取該日志分段的最近修改時(shí)間。
  • 查找該日志分段的偏移量索引文件,查找該偏移量對應(yīng)的物理地址。
  • 日志文件中從 320 的物理位置開始查找不小于 1557554753430 數(shù)據(jù)。
    注意:timestamp文件中的 offset 與 index 文件中的 relativeOffset 不是一一對應(yīng)的,因?yàn)閿?shù)據(jù)的寫入是各自追加。
  • 2.5.2.2 清理

    Kafka 提供兩種日志清理策略:
    日志刪除:按照一定的刪除策略,將不滿足條件的數(shù)據(jù)進(jìn)行數(shù)據(jù)刪除
    日志壓縮:針對每個(gè)消息的 Key 進(jìn)行整合,對于有相同 Key 的不同 Value 值,只保留最后一個(gè)版本。
    Kafka 提供 log.cleanup.policy 參數(shù)進(jìn)行相應(yīng)配置,默認(rèn)值: delete ,還可以選擇compact 。
    主題級別的配置項(xiàng)是 cleanup.policy 。

    2.5.2.2.1 日志刪除

    基于時(shí)間
    日志刪除任務(wù)會(huì)根據(jù) log.retention.hours/log.retention.minutes/log.retention.ms 設(shè)
    定日志保留的時(shí)間節(jié)點(diǎn)。如果超過該設(shè)定值,就需要進(jìn)行刪除。默認(rèn)是 7 天, log.retention.ms 優(yōu)先級最高。

    Kafka 依據(jù)日志分段中最大的時(shí)間戳進(jìn)行定位。

    首先要查詢該日志分段所對應(yīng)的時(shí)間戳索引文件,查找時(shí)間戳索引文件中最后一條索引項(xiàng),若最后一條索引項(xiàng)的時(shí)間戳字段值大于 0,則取該值,否則取最近修改時(shí)間。

    為什么不直接選最近修改時(shí)間呢?
    因?yàn)槿罩疚募梢杂幸鉄o意的被修改,并不能真實(shí)的反應(yīng)日志分段的最大時(shí)間信息。

    刪除過程

  • 從日志對象中所維護(hù)日志分段的跳躍表中移除待刪除的日志分段,保證沒有線程對這些日志分段進(jìn)行讀取操作。
  • 這些日志分段所有文件添加 上 .delete 后綴。
  • 交由一個(gè)以 “delete-file” 命名的延遲任務(wù)來刪除這些 .delete 為后綴的文件。延遲執(zhí)行時(shí)間可以通過 file.delete.delay.ms 進(jìn)行設(shè)置
  • 如果活躍的日志分段中也存在需要?jiǎng)h除的數(shù)據(jù)時(shí)?
    Kafka 會(huì)先切分出一個(gè)新的日志分段作為活躍日志分段,該日志分段不刪除,刪除原來的日志分段。
    先騰出地方,再刪除。

    基于日志大小
    日志刪除任務(wù)會(huì)檢查當(dāng)前日志的大小是否超過設(shè)定值。設(shè)定項(xiàng)為 log.retention.bytes ,單個(gè)日志分段的大小由 log.segment.bytes 進(jìn)行設(shè)定。
    刪除過程

  • 計(jì)算需要被刪除的日志總大小 (當(dāng)前日志文件大小(所有分段)減去retention值)。
  • 從日志文件第一個(gè) LogSegment 開始查找可刪除的日志分段的文件集合。
  • 執(zhí)行刪除。
  • 基于偏移量
    根據(jù)日志分段的下一個(gè)日志分段的起始偏移量是否大于等于日志文件的起始偏移量,若是,則可以刪除此日志分段。
    注意:日志文件的起始偏移量并不一定等于第一個(gè)日志分段的基準(zhǔn)偏移量,存在數(shù)據(jù)刪除,可能與之相等的那條數(shù)據(jù)已經(jīng)被刪除了。

    刪除過程

  • 從頭開始遍歷每個(gè)日志分段,日志分段1的下一個(gè)日志分段的起始偏移量為21,小于logStartOffset,將日志分段1加入到刪除隊(duì)列中
  • 日志分段 2 的下一個(gè)日志分段的起始偏移量為35,小于 logStartOffset,將 日志分段 2 加入到刪除隊(duì)列中
  • 日志分段 3 的下一個(gè)日志分段的起始偏移量為57,小于logStartOffset,將日志分段3加入刪除集合中
  • 日志分段4的下一個(gè)日志分段的其實(shí)偏移量為71,大于logStartOffset,則不進(jìn)行刪除。刪除過程
  • 2.5.2.2.2 日志壓縮策略

  • 概念
    日志壓縮是Kafka的一種機(jī)制,可以提供較為細(xì)粒度的記錄保留,而不是基于粗粒度的基于時(shí)間的
    保留。
    對于具有相同的Key,而數(shù)據(jù)不同,只保留最后一條數(shù)據(jù),前面的數(shù)據(jù)在合適的情況下刪除。
  • 應(yīng)用場景
    日志壓縮特性,就實(shí)時(shí)計(jì)算來說,可以在異常容災(zāi)方面有很好的應(yīng)用途徑。比如,我們在Spark、Flink中做實(shí)時(shí)計(jì)算時(shí),需要長期在內(nèi)存里面維護(hù)一些數(shù)據(jù),這些數(shù)據(jù)可能是通過聚合了一天或者一周的
    日志得到的,這些數(shù)據(jù)一旦由于異常因素(內(nèi)存、網(wǎng)絡(luò)、磁盤等)崩潰了,從頭開始計(jì)算需要很長的時(shí)間。一個(gè)比較有效可行的方式就是定時(shí)將內(nèi)存里的數(shù)據(jù)備份到外部存儲(chǔ)介質(zhì)中,當(dāng)崩潰出現(xiàn)時(shí),再從外
    部存儲(chǔ)介質(zhì)中恢復(fù)并繼續(xù)計(jì)算。
  • 使用日志壓縮來替代這些外部存儲(chǔ)有哪些優(yōu)勢及好處呢?這里為大家
    列舉并總結(jié)了幾點(diǎn):

    • Kafka即是數(shù)據(jù)源又是存儲(chǔ)工具,可以簡化技術(shù)棧,降低維護(hù)成本
    • 使用外部存儲(chǔ)介質(zhì)的話,需要將存儲(chǔ)的Key記錄下來,恢復(fù)的時(shí)候再使用這些Key將數(shù)據(jù)取回,實(shí)現(xiàn)起來有一定的工程難度和復(fù)雜度。使用Kafka的日志壓縮特性,只需要把數(shù)據(jù)寫進(jìn)Kafka,等異常出現(xiàn)恢復(fù)任務(wù)時(shí)再讀回到內(nèi)存就可以了
    • Kafka對于磁盤的讀寫做了大量的優(yōu)化工作,比如磁盤順序讀寫。相對于外部存儲(chǔ)介質(zhì)沒有索引查詢等工作量的負(fù)擔(dān),可以實(shí)現(xiàn)高性能。同時(shí),Kafka的日志壓縮機(jī)制可以充分利用廉價(jià)的磁盤,不用依賴昂貴的內(nèi)存來處理,在性能相似的情況下,實(shí)現(xiàn)非常高的性價(jià)比(這個(gè)觀點(diǎn)僅僅針對于異常處理和容災(zāi)的場景來說)

    3 日志壓縮方式的實(shí)現(xiàn)細(xì)節(jié)
    主題的 cleanup.policy 需要設(shè)置為compact。
    Kafka的后臺(tái)線程會(huì)定時(shí)將Topic遍歷兩次:

  • 記錄每個(gè)key的hash值最后一次出現(xiàn)的偏移量
  • 第二次檢查每個(gè)offset對應(yīng)的Key是否在后面的日志中出現(xiàn)過,如果出現(xiàn)了就刪除對應(yīng)的日志。
  • 日志壓縮允許刪除,除最后一個(gè)key之外,刪除先前出現(xiàn)的所有該key對應(yīng)的記錄。在一段時(shí)間后從日志中清理,以釋放空間。

    注意:日志壓縮與key有關(guān),確保每個(gè)消息的key不為null。

    壓縮是在Kafka后臺(tái)通過定時(shí)重新打開Segment來完成的,Segment的壓縮細(xì)節(jié)如下圖所示:

    日志壓縮可以確保:

  • 任何保持在日志頭部以內(nèi)的使用者都將看到所寫的每條消息,這些消息將具有順序偏移量。可以使用Topic的min.compaction.lag.ms屬性來保證消息在被壓縮之前必須經(jīng)過的最短時(shí)間。也就是說,它為每個(gè)消息在(未壓縮)頭部停留的時(shí)間提供了一個(gè)下限。可以使用Topic的max.compaction.lag.ms屬性來保證從收到消息到消息符合壓縮條件之間的最大延時(shí)
    • 消息始終保持順序,壓縮永遠(yuǎn)不會(huì)重新排序消息,只是刪除一些而已
    • 消息的偏移量永遠(yuǎn)不會(huì)改變,它是日志中位置的永久標(biāo)識(shí)符
    • 從日志開始的任何使用者將至少看到所有記錄的最終狀態(tài),按記錄的順序?qū)懭搿A硗?#xff0c;如果使用者在比Topic的log.cleaner.delete.retention.ms短的時(shí)間內(nèi)到達(dá)日志的頭部,則會(huì)看到已刪除記錄的所有delete標(biāo)記。保留時(shí)間默認(rèn)是24小時(shí)。

    默認(rèn)情況下,啟動(dòng)日志清理器,若需要啟動(dòng)特定Topic的日志清理,請?zhí)砑犹囟ǖ膶傩浴E渲萌罩厩謇砥?#xff0c;這里為大家總結(jié)了以下幾點(diǎn):
    1) log.cleanup.policy 設(shè)置為 compact ,Broker的配置,影響集群中所有的Topic。
    2) log.cleaner.min.compaction.lag.ms ,用于防止對更新超過最小消息進(jìn)行壓縮,如果沒有設(shè)置,除最后一個(gè)Segment之外,所有Segment都有資格進(jìn)行壓縮

    • log.cleaner.max.compaction.lag.ms ,用于防止低生產(chǎn)速率的日志在無限制的時(shí)間內(nèi)不壓縮。

    Kafka的日志壓縮原理并不復(fù)雜,就是定時(shí)把所有的日志讀取兩遍,寫一遍,而CPU的速度超過磁盤完全不是問題,只要日志的量對應(yīng)的讀取兩遍和寫入一遍的時(shí)間在可接受的范圍內(nèi),那么它的性能就是可以接受的。

    2.5.3 磁盤存儲(chǔ)

    2.5.3.1 零拷貝

    kafka高性能,是多方面協(xié)同的結(jié)果,包括宏觀架構(gòu)、分布式partition存儲(chǔ)、ISR數(shù)據(jù)同步、以及“無所不用其極”的高效利用磁盤/操作系統(tǒng)特性。
    零拷貝并不是不需要拷貝,而是減少不必要的拷貝次數(shù)。通常是說在IO讀寫過程中。
    nginx的高性能也有零拷貝的身影。

    傳統(tǒng)IO
    比如:讀取文件,socket發(fā)送
    傳統(tǒng)方式實(shí)現(xiàn):先讀取、再發(fā)送,實(shí)際經(jīng)過1~4四次copy。

    buffer = File.read Socket.send(buffer)

    1、第一次:將磁盤文件,讀取到操作系統(tǒng)內(nèi)核緩沖區(qū);
    2、第二次:將內(nèi)核緩沖區(qū)的數(shù)據(jù),copy到application應(yīng)用程序的buffer;
    3、第三步:將application應(yīng)用程序buffer中的數(shù)據(jù),copy到socket網(wǎng)絡(luò)發(fā)送緩沖區(qū)(屬于操作系統(tǒng)內(nèi)核的緩沖區(qū));
    4、第四次:將socket buffer的數(shù)據(jù),copy到網(wǎng)絡(luò)協(xié)議棧,由網(wǎng)卡進(jìn)行網(wǎng)絡(luò)傳輸。

    實(shí)際IO讀寫,需要進(jìn)行IO中斷,需要CPU響應(yīng)中斷(內(nèi)核態(tài)到用戶態(tài)轉(zhuǎn)換),盡管引入**DMA(DirectMemory Access,直接存儲(chǔ)器訪問)**來接管CPU的中斷請求,但四次copy是存在“不必要的拷貝”的。
    實(shí)際上并不需要第二個(gè)和第三個(gè)數(shù)據(jù)副本。數(shù)據(jù)可以直接從讀緩沖區(qū)傳輸?shù)教捉幼志彌_區(qū)。

    kafka的兩個(gè)過程:
    1、網(wǎng)絡(luò)數(shù)據(jù)持久化到磁盤 (Producer 到 Broker)
    2、磁盤文件通過網(wǎng)絡(luò)發(fā)送(Broker 到 Consumer)

    數(shù)據(jù)落盤通常都是非實(shí)時(shí)的,Kafka的數(shù)據(jù)并不是實(shí)時(shí)的寫入硬盤,它充分利用了現(xiàn)代操作系統(tǒng)分頁存儲(chǔ)來利用內(nèi)存提高I/O效率。

    磁盤文件通過網(wǎng)絡(luò)發(fā)送(Broker 到 Consumer)

    磁盤數(shù)據(jù)通過**DMA(Direct Memory Access,直接存儲(chǔ)器訪問)**拷貝到內(nèi)核態(tài) Buffer

    直接通過 DMA 拷貝到 NIC Buffer(socket buffer),無需 CPU 拷貝。

    除了減少數(shù)據(jù)拷貝外,整個(gè)讀文件 ==> 網(wǎng)絡(luò)發(fā)送由一個(gè) sendfile 調(diào)用完成,整個(gè)過程只有兩次上下文切換,因此大大提高了性能。

    Java NIO對sendfile的支持就是FileChannel.transferTo()/transferFrom()。

    fileChannel.transferTo( position, count, socketChannel);

    把磁盤文件讀取OS內(nèi)核緩沖區(qū)后的fileChannel,直接轉(zhuǎn)給socketChannel發(fā)送;底層就是sendfile。消費(fèi)者從broker讀取數(shù)據(jù),就是由此實(shí)現(xiàn)。

    具體來看,Kafka 的數(shù)據(jù)傳輸通過 TransportLayer 來完成,其子類 PlaintextTransportLayer 通過Java NIO 的 FileChannel 的 transferTo 和 transferFrom 方法實(shí)現(xiàn)零拷貝。

    注: transferTo 和 transferFrom 并不保證一定能使用零拷貝,需要操作系統(tǒng)支持。
    Linux 2.4+ 內(nèi)核通過 sendfile 系統(tǒng)調(diào)用,提供了零拷貝。

    2.5.3.2 頁緩存

    頁緩存是操作系統(tǒng)實(shí)現(xiàn)的一種主要的磁盤緩存,以此用來減少對磁盤 I/O 的操作。

    具體來說,就是把磁盤中的數(shù)據(jù)緩存到內(nèi)存中,把對磁盤的訪問變?yōu)閷?nèi)存的訪問。

    Kafka接收來自socket buffer的網(wǎng)絡(luò)數(shù)據(jù),應(yīng)用進(jìn)程不需要中間處理、直接進(jìn)行持久化時(shí)。可以使用mmap內(nèi)存文件映射。

    Memory Mapped Files

    簡稱mmap,簡單描述其作用就是:將磁盤文件映射到內(nèi)存, 用戶通過修改內(nèi)存就能修改磁盤文件。

    它的工作原理是直接利用操作系統(tǒng)的Page來實(shí)現(xiàn)磁盤文件到物理內(nèi)存的直接映射。完成映射之后你對物理內(nèi)存的操作會(huì)被同步到硬盤上(操作系統(tǒng)在適當(dāng)?shù)臅r(shí)候)。

    通過mmap,進(jìn)程像讀寫硬盤一樣讀寫內(nèi)存(當(dāng)然是虛擬機(jī)內(nèi)存)。使用這種方式可以獲取很大的I/O提升,省去了用戶空間到內(nèi)核空間復(fù)制的開銷。

    mmap也有一個(gè)很明顯的缺陷:不可靠,寫到mmap中的數(shù)據(jù)并沒有被真正的寫到硬盤,操作系統(tǒng)會(huì)在程序主動(dòng)調(diào)用flush的時(shí)候才把數(shù)據(jù)真正的寫到硬盤。

    Kafka提供了一個(gè)參數(shù) producer.type 來控制是不是主動(dòng)flush;

    如果Kafka寫入到mmap之后就立即flush然后再返回Producer叫同步(sync);

    寫入mmap之后立即返回Producer不調(diào)用flush叫異步(async)。

    Java NIO對文件映射的支持
    Java NIO,提供了一個(gè)MappedByteBuffer 類可以用來實(shí)現(xiàn)內(nèi)存映射。
    MappedByteBuffer只能通過調(diào)用FileChannel的map()取得,再?zèng)]有其他方式。
    FileChannel.map()是抽象方法,具體實(shí)現(xiàn)是在FileChannelImpl.map()可自行查看JDK源碼,其map0()方法就是調(diào)用了Linux內(nèi)核的mmap的API。


    使用 MappedByteBuffer類要注意的是

    • mmap的文件映射,在full gc時(shí)才會(huì)進(jìn)行釋放。當(dāng)close時(shí),需要手動(dòng)清除內(nèi)存映射文件,可以反射調(diào)用sun.misc.Cleaner方法。

    當(dāng)一個(gè)進(jìn)程準(zhǔn)備讀取磁盤上的文件內(nèi)容時(shí):

  • 操作系統(tǒng)會(huì)先查看待讀取的數(shù)據(jù)所在的頁 (page)是否在頁緩存(pagecache)中,如果存在(命 中)則直接返回?cái)?shù)據(jù),從而避免了對物理磁盤的 I/O 操作;
  • 如果沒有命中,則操作系統(tǒng)會(huì)向磁盤發(fā)起讀取請求并將讀取的數(shù)據(jù)頁存入頁緩存,之后再將數(shù)
    據(jù)返回給進(jìn)程。
  • 如果一個(gè)進(jìn)程需要將數(shù)據(jù)寫入磁盤:

  • 操作系統(tǒng)也會(huì)檢測數(shù)據(jù)對應(yīng)的頁是否在頁緩存中,如果不存在,則會(huì)先在頁緩存中添加相應(yīng)的
    頁,最后將數(shù)據(jù)寫入對應(yīng)的頁。
  • 被修改過后的頁也就變成了臟頁,操作系統(tǒng)會(huì)在合適的時(shí)間把臟頁中的數(shù)據(jù)寫入磁盤,以保持
    數(shù)據(jù)的一致性。
  • 對一個(gè)進(jìn)程而言,它會(huì)在進(jìn)程內(nèi)部緩存處理所需的數(shù)據(jù),然而這些數(shù)據(jù)有可能還緩存在操作系統(tǒng)的頁緩存中,因此同一份數(shù)據(jù)有可能被緩存了兩次。并且,除非使用Direct I/O的方式, 否則頁緩存很難
    被禁止。

    當(dāng)使用頁緩存的時(shí)候,即使Kafka服務(wù)重啟, 頁緩存還是會(huì)保持有效,然而進(jìn)程內(nèi)的緩存卻需要重建。這樣也極大地簡化了代碼邏輯,因?yàn)榫S護(hù)頁緩存和文件之間的一致性交由操作系統(tǒng)來負(fù)責(zé),這樣會(huì)
    比進(jìn)程內(nèi)維護(hù)更加安全有效。

    Kafka中大量使用了頁緩存,這是 Kafka 實(shí)現(xiàn)高吞吐的重要因素之一。
    消息先被寫入頁緩存,由操作系統(tǒng)負(fù)責(zé)刷盤任務(wù)。

    2.5.3.3 順序?qū)懭?/h3>

    操作系統(tǒng)可以針對線性讀寫做深層次的優(yōu)化,比如預(yù)讀(read-ahead,提前將一個(gè)比較大的磁盤塊讀入內(nèi)存) 和后寫(write-behind,將很多小的邏輯寫操作合并起來組成一個(gè)大的物理寫操作)技術(shù)。

    Kafka 在設(shè)計(jì)時(shí)采用了文件追加的方式來寫入消息,即只能在日志文件的尾部追加新的消 息,并且也不允許修改已寫入的消息,這種方式屬于典型的順序?qū)懕P的操作,所以就算 Kafka 使用磁盤作為存儲(chǔ)
    介質(zhì),也能承載非常大的吞吐量。

    mmap和sendfile:

  • Linux內(nèi)核提供、實(shí)現(xiàn)零拷貝的API;
  • sendfile 是將讀到內(nèi)核空間的數(shù)據(jù),轉(zhuǎn)到socket buffer,進(jìn)行網(wǎng)絡(luò)發(fā)送;
  • mmap將磁盤文件映射到內(nèi)存,支持讀和寫,對內(nèi)存的操作會(huì)反映在磁盤文件上。
  • RocketMQ 在消費(fèi)消息時(shí),使用了 mmap。kafka 使用了 sendFile。
  • Kafka速度快是因?yàn)?#xff1a;

  • partition順序讀寫,充分利用磁盤特性,這是基礎(chǔ);
  • Producer生產(chǎn)的數(shù)據(jù)持久化到broker,采用mmap文件映射,實(shí)現(xiàn)順序的快速寫入;
  • Customer從broker讀取數(shù)據(jù),采用sendfile,將磁盤文件讀到OS內(nèi)核緩沖區(qū)后,直接轉(zhuǎn)到socket buffer進(jìn)行網(wǎng)絡(luò)發(fā)送。
  • 2.6 穩(wěn)定性

    2.6.1 事務(wù)

    一、事務(wù)場景

  • 如producer發(fā)的多條消息組成一個(gè)事務(wù)這些消息需要對consumer同時(shí)可見或者同時(shí)不可見。
  • producer可能會(huì)給多個(gè)topic,多個(gè)partition發(fā)消息,這些消息也需要能放在一個(gè)事務(wù)里面,
    這就形成了一個(gè)典型的分布式事務(wù)。
  • kafka的應(yīng)用場景經(jīng)常是應(yīng)用先消費(fèi)一個(gè)topic,然后做處理再發(fā)到另一個(gè)topic,這個(gè)consume-transform-produce過程需要放到一個(gè)事務(wù)里面,比如在消息處理或者發(fā)送的過程中如果失敗了,消費(fèi)偏移量也不能提交。
  • producer或者producer所在的應(yīng)用可能會(huì)掛掉,新的producer啟動(dòng)以后需要知道怎么處理之
    前未完成的事務(wù) 。
  • 在一個(gè)原子操作中,根據(jù)包含的操作類型,可以分為三種情況,前兩種情況是事務(wù)引入的場景,最后一種沒用。
    1) 只有Producer生產(chǎn)消息;
    2)消費(fèi)消息和生產(chǎn)消息并存,這個(gè)是事務(wù)場景中最常用的情況,就是我們常說的consume-transform-produce 模式
    3) 只有consumer消費(fèi)消息,這種操作其實(shí)沒有什么意義,跟使用手動(dòng)提交效果一樣,而且也不是事務(wù)屬性引入的目的,所以一般不會(huì)使用這種情況
  • 二、幾個(gè)關(guān)鍵概念和推導(dǎo)

  • 因?yàn)閜roducer發(fā)送消息可能是分布式事務(wù),所以引入了常用的2PC,所以有事務(wù)協(xié)調(diào)者(Transaction Coordinator)。Transaction Coordinator和之前為了解決腦裂和驚群問題引入的Group Coordinator在選舉上類似。
  • 事務(wù)管理中事務(wù)日志是必不可少的,kafka使用一個(gè)內(nèi)部topic來保存事務(wù)日志,這個(gè)設(shè)計(jì)和之前使用內(nèi)部topic保存偏移量的設(shè)計(jì)保持一致。事務(wù)日志是Transaction Coordinator管理的狀態(tài)的持久化,因?yàn)椴恍枰厮菔聞?wù)的歷史狀態(tài),所以事務(wù)日志只用保存最近的事務(wù)狀態(tài)。
    __transaction_state
  • 因?yàn)槭聞?wù)存在commit和abort兩種操作,而客戶端又有read committed和readuncommitted兩種隔離級別,所以消息隊(duì)列必須能標(biāo)識(shí)事務(wù)狀態(tài),這個(gè)被稱作Control
    Message。
  • producer掛掉重啟或者漂移到其它機(jī)器需要能關(guān)聯(lián)的之前的未完成事務(wù)所以需要有一個(gè)唯一標(biāo)識(shí)符來進(jìn)行關(guān)聯(lián),這個(gè)就是TransactionalId,一個(gè)producer掛了,另一個(gè)有相同
    TransactionalId的producer能夠接著處理這個(gè)事務(wù)未完成的狀態(tài)。kafka目前沒有引入全局序,所以也沒有transaction id,這個(gè)TransactionalId是用戶提前配置的。
  • TransactionalId能關(guān)聯(lián)producer,也需要避免兩個(gè)使用相同TransactionalId的producer同時(shí)存在,所以引入了producer epoch來保證對應(yīng)一個(gè)TransactionalId只有一個(gè)活躍的producer
  • 三、事務(wù)語義
    2.1. 多分區(qū)原子寫入
    事務(wù)能夠保證Kafka topic下每個(gè)分區(qū)的原子寫入。事務(wù)中所有的消息都將被成功寫入或者丟棄。

    首先,我們來考慮一下原子 讀取-處理-寫入 周期是什么意思。簡而言之,這意味著如果某個(gè)應(yīng)用程序在某個(gè)topic tp0的偏移量X處讀取到了消息A,并且在對消息A進(jìn)行了一些處理(如B = F(A))之后
    將消息B寫入topic tp1,則只有當(dāng)消息A和B被認(rèn)為被成功地消費(fèi)并一起發(fā)布,或者完全不發(fā)布時(shí),整個(gè)讀取過程寫入操作是原子的。

    現(xiàn)在,只有當(dāng)消息A的偏移量X被標(biāo)記為已消費(fèi),消息A才從topic tp0消費(fèi),消費(fèi)到的數(shù)據(jù)偏移量(record offset)將被標(biāo)記為提交偏移量(Committing offset)。在Kafka中,我們通過寫入一個(gè)名為
    offsets topic的內(nèi)部Kafka topic來記錄offset commit。消息僅在其offset被提交給offsets topic時(shí)才被認(rèn)為成功消費(fèi)。

    由于offset commit只是對Kafkatopic的另一次寫入,并且由于消息僅在提交偏移量時(shí)被視為成功消費(fèi),所以跨多個(gè)主題和分區(qū)的原子寫入也啟用原子 讀取-處理-寫入 循環(huán):提交偏移量X到offset topic和消息B到tp1的寫入將是單個(gè)事務(wù)的一部分,所以整個(gè)步驟都是原子的。

    2.2. 粉碎“僵尸實(shí)例”
    我們通過為每個(gè)事務(wù)Producer分配一個(gè)稱為transactional.id的唯一標(biāo)識(shí)符來解決僵尸實(shí)例的問題。在進(jìn)程重新啟動(dòng)時(shí)能夠識(shí)別相同的Producer實(shí)例。

    API要求事務(wù)性Producer的第一個(gè)操作應(yīng)該是在Kafka集群中顯示注冊transactional.id。 當(dāng)注冊的時(shí)候,Kafka broker用給定的transactional.id檢查打開的事務(wù)并且完成處理。 Kafka也增加了一個(gè)與transactional.id相關(guān)的epoch。Epoch存儲(chǔ)每個(gè)transactional.id內(nèi)部元數(shù)據(jù)。

    一旦epoch被觸發(fā),任何具有相同的transactional.id和舊的epoch的生產(chǎn)者被視為僵尸,Kafka拒絕來自這些生產(chǎn)者的后續(xù)事務(wù)性寫入。

    簡而言之:Kafka可以保證Consumer最終只能消費(fèi)非事務(wù)性消息或已提交事務(wù)性消息。它將保留來自未完成事務(wù)的消息,并過濾掉已中止事務(wù)的消息。

    2.3 事務(wù)消息定義
    生產(chǎn)者可以顯式地發(fā)起事務(wù)會(huì)話,在這些會(huì)話中發(fā)送(事務(wù))消息,并提交或中止事務(wù)。有如下要求:

  • 原子性:消費(fèi)者的應(yīng)用程序不應(yīng)暴露于未提交事務(wù)的消息中。
  • 持久性:Broker不能丟失任何已提交的事務(wù)。
  • 排序:事務(wù)消費(fèi)者應(yīng)在每個(gè)分區(qū)中以原始順序查看事務(wù)消息。
  • 交織:每個(gè)分區(qū)都應(yīng)該能夠接收來自事務(wù)性生產(chǎn)者和非事務(wù)生產(chǎn)者的消息
  • 事務(wù)中不應(yīng)有重復(fù)的消息。
  • 如果允許事務(wù)性和非事務(wù)性消息的交織,則非事務(wù)性和事務(wù)性消息的相對順序?qū)⒒诟郊?#xff08;對于非事務(wù)性消息)和最終提交(對于事務(wù)性消息)的相對順序。

    在上圖中,分區(qū)p0和p1接收事務(wù)X1和X2的消息,以及非事務(wù)性消息。時(shí)間線是消息到達(dá)Broker的時(shí)間。由于首先提交了X2,所以每個(gè)分區(qū)都將在X1之前公開來自X2的消息。由于非事務(wù)性消息在X1和X2的提交之前到達(dá),因此這些消息將在來自任一事務(wù)的消息之前公開。

    四、事務(wù)配置
    1、創(chuàng)建消費(fèi)者代碼,需要:

    • 將配置中的自動(dòng)提交屬性(auto.commit)進(jìn)行關(guān)閉
    • 而且在代碼里面也不能使用手動(dòng)提交commitSync( )或者commitAsync( )
    • 設(shè)置isolation.level:READ_COMMITTED或READ_UNCOMMITTED

    2、創(chuàng)建生成者,代碼如下,需要:

    • 配置transactional.id屬性
    • 配置enable.idempotence屬性

    五、事務(wù)概覽
    生產(chǎn)者將表示事務(wù)開始/結(jié)束/中止?fàn)顟B(tài)的事務(wù)控制消息發(fā)送給使用多階段協(xié)議管理事務(wù)的高可用事務(wù)協(xié)調(diào)器。生產(chǎn)者將事務(wù)控制記錄(開始/結(jié)束/中止)發(fā)送到事務(wù)協(xié)調(diào)器,并將事務(wù)的消息直接發(fā)送到目標(biāo)數(shù)據(jù)分區(qū)。消費(fèi)者需要了解事務(wù)并緩沖每個(gè)待處理的事務(wù),直到它們到達(dá)其相應(yīng)的結(jié)束(提交/中止)記錄為止。

    • 事務(wù)組
    • 事務(wù)組中的生產(chǎn)者
    • 事務(wù)組的事務(wù)協(xié)調(diào)器
    • Leader brokers(事務(wù)數(shù)據(jù)所在分區(qū)的Broker)
    • 事務(wù)的消費(fèi)者

    六、事務(wù)組
    事務(wù)組用于映射到特定的事務(wù)協(xié)調(diào)器(基于日志分區(qū)數(shù)字的哈希)。該組中的生產(chǎn)者需要配置為該組事務(wù)生產(chǎn)者。由于來自這些生產(chǎn)者的所有事務(wù)都通過此協(xié)調(diào)器進(jìn)行,因此我們可以在這些事務(wù)生產(chǎn)者之間實(shí)現(xiàn)嚴(yán)格的有序

    七、生產(chǎn)者ID和事務(wù)組狀態(tài)
    事務(wù)生產(chǎn)者需要兩個(gè)新參數(shù):生產(chǎn)者ID和生產(chǎn)組。

    需要將生產(chǎn)者的輸入狀態(tài)與上一個(gè)已提交的事務(wù)相關(guān)聯(lián)。這使事務(wù)生產(chǎn)者能夠重試事務(wù)(通過為該事務(wù)重新創(chuàng)建輸入狀態(tài);在我們的用例中通常是偏移量的向量)。

    可以使用消費(fèi)者偏移量管理機(jī)制來管理這些狀態(tài)。消費(fèi)者偏移量管理器將每個(gè)鍵(consumergroup-topic-partition )與該分區(qū)的最后一個(gè)檢查點(diǎn)偏移量和元數(shù)據(jù)相關(guān)聯(lián)。在事務(wù)生產(chǎn)者中,我們保存消費(fèi)者的偏移量,該偏移量與事務(wù)的提交點(diǎn)關(guān)聯(lián)。此偏移提交記錄(在__consumer_offsets 主題中)應(yīng)作為事務(wù)的一部分寫入。即,存儲(chǔ)消費(fèi)組偏移量的__consumer_offsets 主題分區(qū)將需要參與事務(wù)。因此,假定生產(chǎn)者在事務(wù)中間失敗(事務(wù)協(xié)調(diào)器隨后到期);當(dāng)生產(chǎn)者恢復(fù)時(shí),它可以發(fā)出偏移量獲取請求,以恢復(fù)與最后提交的事務(wù)相關(guān)聯(lián)的輸入偏移量,并從該點(diǎn)恢復(fù)事務(wù)處理。

    為了支持此功能,我們需要對偏移量管理器和壓縮的 __consumer_offsets 主題進(jìn)行一些增強(qiáng)。

    首先,壓縮的主題現(xiàn)在還將包含事務(wù)控制記錄。我們將需要為這些控制記錄提出剔除策略。

    其次,偏移量管理器需要具有事務(wù)意識(shí);特別是,如果組與待處理的事務(wù)相關(guān)聯(lián),則偏移量提
    取請求應(yīng)返回錯(cuò)誤。

    八、事務(wù)協(xié)調(diào)器

    需要確保無論是什么樣的保留策略(日志分區(qū)的刪除還是壓縮),都不能刪除包含事務(wù)HW的日志分段。

    九、事務(wù)流程

    初始階段 (圖中步驟A)

  • Producer:計(jì)算哪個(gè)Broker作為事務(wù)協(xié)調(diào)器。
  • Producer:向事務(wù)協(xié)調(diào)器發(fā)送BeginTransaction(producerId, generation, partitions… )請
    求,當(dāng)然也可以發(fā)送另一個(gè)包含事務(wù)過期時(shí)間的。如果生產(chǎn)者需要將消費(fèi)者狀態(tài)作為事務(wù)的一部分提交事務(wù),則需要在BeginTransaction中包含對應(yīng)的 __consumer_offsets 主題分區(qū)信息。
  • Broker:生成事務(wù)ID
  • Coordinator:向事務(wù)協(xié)調(diào)主題追加BEGIN(TxId, producerId, generation, partitions…)消息,然后發(fā)送響應(yīng)給生產(chǎn)者。
  • Producer:讀取響應(yīng)(包含了事務(wù)ID:TxId) 6. Coordinator (and followers):在內(nèi)存更新當(dāng)前事務(wù)的待確認(rèn)事務(wù)狀態(tài)和數(shù)據(jù)分區(qū)信息。
  • 發(fā)送階段
    (圖中步驟2)
    Producer:發(fā)送事務(wù)消息給主題Leader分區(qū)所在的Broker。每個(gè)消息需要包含TxId和TxCtl字段。
    TxCtl僅用于標(biāo)記事務(wù)的最終狀態(tài)(提交還是中止)。生產(chǎn)者請求也封裝了生產(chǎn)者ID,但是不追加到日志中。

    結(jié)束階段 (生產(chǎn)者準(zhǔn)備提交事務(wù))
    (圖中步驟3、4、5。)

    十、事務(wù)的中止
    當(dāng)事務(wù)生產(chǎn)者發(fā)送業(yè)務(wù)消息的時(shí)候如果發(fā)生異常,可以中止該事務(wù)。如果事務(wù)提交超時(shí),事務(wù)協(xié)調(diào)器也會(huì)中止當(dāng)前事務(wù)。

    十一、基本事務(wù)流程的失敗

    十二、主題的壓縮
    壓縮主題在壓縮過程中會(huì)丟棄具有相同鍵的早期記錄。如果這些記錄是事務(wù)的一部分,這合法嗎?
    這可能有點(diǎn)怪異,但可能不會(huì)太有害,因?yàn)樵谥黝}中使用壓縮策略的理由是保留關(guān)鍵數(shù)據(jù)的最新更新。

    如果該應(yīng)用程序正在(例如)更新某些表,并且事務(wù)中的消息對應(yīng)于不同的鍵,則這種情況可能導(dǎo)致數(shù)據(jù)庫視圖不一致。

    十三、事務(wù)相關(guān)配置
    1、Broker configs

    2、Producer configs

    3、Consumer configs

    2.6.1.1 冪等性

    Kafka在引入冪等性之前,Producer向Broker發(fā)送消息,然后Broker將消息追加到消息流中后給Producer返回Ack信號值。實(shí)現(xiàn)流程如下:

    生產(chǎn)中,會(huì)出現(xiàn)各種不確定的因素,比如在Producer在發(fā)送給Broker的時(shí)候出現(xiàn)網(wǎng)絡(luò)異常。比如以下這種異常情況的出現(xiàn):

    上圖這種情況,當(dāng)Producer第一次發(fā)送消息給Broker時(shí),Broker將消息(x2,y2)追加到了消息流中,但是在返回Ack信號給Producer時(shí)失敗了(比如網(wǎng)絡(luò)異常) 。此時(shí),Producer端觸發(fā)重試機(jī)制,將消息(x2,y2)重新發(fā)送給Broker,Broker接收到消息后,再次將該消息追加到消息流中,然后成功返回Ack信號給Producer。這樣下來,消息流中就被重復(fù)追加了兩條相同的(x2,y2)的消息。

    冪等性
    保證在消息重發(fā)的時(shí)候,消費(fèi)者不會(huì)重復(fù)處理。即使在消費(fèi)者收到重復(fù)消息的時(shí)候,重復(fù)處理,也要保證最終結(jié)果的一致性。
    所謂冪等性,數(shù)學(xué)概念就是: f(f(x)) = f(x) 。f函數(shù)表示對消息的處理。
    比如,銀行轉(zhuǎn)賬,如果失敗,需要重試。不管重試多少次,都要保證最終結(jié)果一定是一致的。

    冪等性實(shí)現(xiàn)
    添加唯一ID,類似于數(shù)據(jù)庫的主鍵,用于唯一標(biāo)記一個(gè)消息。
    Kafka為了實(shí)現(xiàn)冪等性,它在底層設(shè)計(jì)架構(gòu)中引入了ProducerID和SequenceNumber。

    • ProducerID:在每個(gè)新的Producer初始化時(shí),會(huì)被分配一個(gè)唯一的ProducerID,這個(gè)ProducerID對客戶端使用者是不可見的。
    • SequenceNumber:對于每個(gè)ProducerID,Producer發(fā)送數(shù)據(jù)的每個(gè)Topic和Partition都對應(yīng)一個(gè)從0開始單調(diào)遞增的SequenceNumber值。


    同樣,這是一種理想狀態(tài)下的發(fā)送流程。實(shí)際情況下,會(huì)有很多不確定的因素,比如Broker在發(fā)送Ack信號給Producer時(shí)出現(xiàn)網(wǎng)絡(luò)異常,導(dǎo)致發(fā)送失敗。異常情況如下圖所示:

    當(dāng)Producer發(fā)送消息(x2,y2)給Broker時(shí),Broker接收到消息并將其追加到消息流中。此時(shí),Broker返回Ack信號給Producer時(shí),發(fā)生異常導(dǎo)致Producer接收Ack信號失敗。對于Producer來說,會(huì)觸發(fā)重試機(jī)制,將消息(x2,y2)再次發(fā)送,但是,由于引入了冪等性,在每條消息中附帶了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber發(fā)送給Broker,而之前Broker緩存過之前發(fā)送的相同的消息,那么在消息流中的消息就只有一條(x2,y2),不會(huì)出現(xiàn)重復(fù)發(fā)送的情況。

    客戶端在生成Producer時(shí),會(huì)實(shí)例化如下代碼:

    // 實(shí)例化一個(gè)Producer對象 Producer<String, String> producer = new KafkaProducer<>(props);

    在org.apache.kafka.clients.producer.internals.Sender類中,在run()中有一個(gè)maybeWaitForPid()方法,用來生成一個(gè)ProducerID,實(shí)現(xiàn)代碼如下:

    private void maybeWaitForPid() {if (transactionState == null)return;while (!transactionState.hasPid()) {try {Node node = awaitLeastLoadedNodeReady(requestTimeout);if (node != null) {ClientResponse response = sendAndAwaitInitPidRequest(node);if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) {InitPidResponse initPidResponse = (InitPidResponse) response.responseBody();// 設(shè)置pid和epochtransactionState.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());} else {log.error("Received an unexpected response type for an InitPidRequest from { }." +"We will back off and try again.", node);}} else {log.debug("Could not find an available broker to send InitPidRequest to." +"We will back off and try again.");}} catch (Exception e) {log.warn("Received an exception while trying to get a pid. Will back off and retry.", e);}log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs);time.sleep(retryBackoffMs);metadata.requestUpdate();} }

    2.6.1.2 事務(wù)操作

    在Kafka事務(wù)中,一個(gè)原子性操作,根據(jù)操作類型可以分為3種情況。情況如下:

    • 只有Producer生產(chǎn)消息,這種場景需要事務(wù)的介入;
    • 消費(fèi)消息和生產(chǎn)消息并存,比如Consumer&Producer模式,這種場景是一般Kafka項(xiàng)目中比較常見的模式,需要事務(wù)介入;
    • 只有Consumer消費(fèi)消息,這種操作在實(shí)際項(xiàng)目中意義不大,和手動(dòng)Commit Offsets的結(jié)果一樣,而且這種場景不是事務(wù)的引入目的。
    // 初始化事務(wù),需要注意確保transation.id屬性被分配 void initTransactions();// 開啟事務(wù) void beginTransaction() throws ProducerFencedException;// 為Consumer提供的在事務(wù)內(nèi)Commit Offsets的操作 void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throwsProducerFencedException;// 提交事務(wù) void commitTransaction() throws ProducerFencedException;// 放棄事務(wù),類似于回滾事務(wù)的操作 void abortTransaction() throws ProducerFencedException;

    案例1:單個(gè)Producer,使用事務(wù)保證消息的僅一次發(fā)送:

    package com.lagou.kafka.demo.producer;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer;import java.util.HashMap; import java.util.Map;public class MyTransactionalProducer {public static void main(String[] args) {Map<String, Object> configs = new HashMap<>();configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 提供生產(chǎn)者client.idconfigs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer");// 設(shè)置事務(wù)IDconfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my_tx_id_1");// 需要ISR全體確認(rèn)消息configs.put(ProducerConfig.ACKS_CONFIG, "all");KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);// 初始化事務(wù)producer.initTransactions();try {// 開啟事務(wù)producer.beginTransaction();// 發(fā)送事務(wù)消息producer.send(new ProducerRecord<>("tp_tx_01", "txkey1", "tx_msg_4"));producer.send(new ProducerRecord<>("tp_tx_01", "txkey2", "tx_msg_5"));producer.send(new ProducerRecord<>("tp_tx_01", "txkey3", "tx_msg_6"));// 人為制造異常int i = 1 / 0;// 提交事務(wù)producer.commitTransaction();} catch (Exception e) {e.printStackTrace();// 事務(wù)回滾producer.abortTransaction();} finally {// 關(guān)閉生產(chǎn)者producer.close();}} }

    正常運(yùn)行時(shí), 一次全都顯示,如果有異常則會(huì)回滾, 不發(fā)送消息

    案例2:在 消費(fèi)-轉(zhuǎn)換-生產(chǎn) 模式,使用事務(wù)保證僅一次發(fā)送。

    package com.lagou.kafka.demo;import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer;import java.util.Collections; import java.util.HashMap; import java.util.Map;public class MyTransactional {public static KafkaProducer<String, String> getProducer() {Map<String, Object> configs = new HashMap<>();configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 設(shè)置client.idconfigs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer_01");// 設(shè)置事務(wù)id, 必須設(shè)置configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx_id_02");// 需要所有的ISR副本確認(rèn)configs.put(ProducerConfig.ACKS_CONFIG, "all");// 啟用冪等性, 通過pid和sequenceconfigs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);return producer;}public static KafkaConsumer<String, String> getConsumer(String consumerGroupId) {Map<String, Object> configs = new HashMap<>();configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// 設(shè)置消費(fèi)組IDconfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_grp_02");// 不啟用消費(fèi)者偏移量的自動(dòng)確認(rèn),也不要手動(dòng)確認(rèn)configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer_client_02");// 如果不存在這個(gè)偏移量, 就自動(dòng)讀最早的configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 只讀取已提交的消息 // configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);return consumer;}public static void main(String[] args) {// 設(shè)置消費(fèi)組idString consumerGroupId = "consumer_grp_id_101";KafkaProducer<String, String> producer = getProducer();KafkaConsumer<String, String> consumer = getConsumer(consumerGroupId);// 事務(wù)的初始化producer.initTransactions();//訂閱主題consumer.subscribe(Collections.singleton("tp_tx_01"));final ConsumerRecords<String, String> records = consumer.poll(1_000);// 開啟事務(wù)producer.beginTransaction();try {Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();for (ConsumerRecord<String, String> record : records) {System.out.println(record);producer.send(new ProducerRecord<String, String>("tp_tx_out_01", record.key(), record.value()));offsets.put(new TopicPartition(record.topic(), record.partition()),// 下面要 +1, 偏移量表示下一條要消費(fèi)的消息new OffsetAndMetadata(record.offset() + 1));}// 將該消息的偏移量提交作為事務(wù)的一部分,隨事務(wù)一起提交和回滾(不手動(dòng)或自動(dòng)提交消費(fèi)偏移量)producer.sendOffsetsToTransaction(offsets, consumerGroupId);// int i = 1 / 0;// 提交事務(wù)producer.commitTransaction();} catch (Exception e) {e.printStackTrace();// 回滾事務(wù)producer.abortTransaction();} finally {// 關(guān)閉資源producer.close();consumer.close();}} }

    有異常時(shí), 運(yùn)行之后消費(fèi)者不會(huì)消費(fèi)消息, 同時(shí), 再次運(yùn)行時(shí), 還可以從之前的偏移量繼續(xù)消費(fèi)消息

    2.6.2 控制器

    Kafka集群包含若干個(gè)broker,broker.id指定broker的編號,編號不要重復(fù)。
    Kafka集群上創(chuàng)建的主題,包含若干個(gè)分區(qū)。
    每個(gè)分區(qū)包含若干個(gè)副本,副本因子包括了Follower副本和Leader副本。
    副本又分為ISR(同步副本分區(qū))和OSR(非同步副本分區(qū))。

    控制器就是一個(gè)broker。
    控制器除了一般broker的功能,還負(fù)責(zé)Leader分區(qū)的選舉。

    2.6.2.1 broker選舉

    集群里第一個(gè)啟動(dòng)的broker在Zookeeper中創(chuàng)建臨時(shí)節(jié)點(diǎn) /controller 。
    其他broker在該控制器節(jié)點(diǎn)創(chuàng)建Zookeeper watch對象,使用Zookeeper的監(jiān)聽機(jī)制接收該節(jié)點(diǎn)的變更。
    即:Kafka通過Zookeeper的分布式鎖特性選舉集群控制器。
    下圖中,節(jié)點(diǎn) /myKafka/controller 是一個(gè)zookeeper臨時(shí)節(jié)點(diǎn),其中 “brokerid”:0 ,表示當(dāng)前控制器是broker.id為0的broker。

    每個(gè)新選出的控制器通過 Zookeeper 的條件遞增操作獲得一個(gè)全新的、數(shù)值更大的 controller epoch。其他 broker 在知道當(dāng)前 controller epoch 后,如果收到由控制器發(fā)出的包含較舊epoch 的消息,就會(huì)忽略它們,以防止“腦裂”

    比如當(dāng)一個(gè)Leader副本分區(qū)所在的broker宕機(jī),需要選舉新的Leader副本分區(qū),有可能兩個(gè)具有不同紀(jì)元數(shù)字的控制器都選舉了新的Leader副本分區(qū),如果選舉出來的Leader副本分區(qū)不一樣,聽誰的?腦裂了。有了紀(jì)元數(shù)字,直接使用紀(jì)元數(shù)字最新的控制器結(jié)果。


    集群控制器負(fù)責(zé)監(jiān)聽 ids 節(jié)點(diǎn),一旦節(jié)點(diǎn)子節(jié)點(diǎn)發(fā)送變化,集群控制器得到通知。


    結(jié)論:

  • Kafka 使用 Zookeeper 的分布式鎖選舉控制器,并在節(jié)點(diǎn)加入集群或退出集群時(shí)通知控制器。
  • 控制器負(fù)責(zé)在節(jié)點(diǎn)加入或離開集群時(shí)進(jìn)行分區(qū)Leader選舉。
  • 控制器使用epoch 來避免“腦裂”。“腦裂”是指兩個(gè)節(jié)點(diǎn)同時(shí)認(rèn)為自己是當(dāng)前的控制器。
  • 2.6.3 可靠性保證

    概念

  • 創(chuàng)建Topic的時(shí)候可以指定 --replication-factor 3 ,表示分區(qū)的副本數(shù),不要超過broker的數(shù)量。
  • Leader是負(fù)責(zé)讀寫的節(jié)點(diǎn),而其他副本則是Follower。Producer只把消息發(fā)送到Leader,Follower定期地到Leader上Pull數(shù)據(jù)。
  • ISR是Leader負(fù)責(zé)維護(hù)的與其保持同步的Replica列表,即當(dāng)前活躍的副本列表。如果一個(gè)Follow落后太多,Leader會(huì)將它從ISR中移除。落后太多意思是該Follow復(fù)制的消息Follow長
    時(shí)間沒有向Leader發(fā)送fetch請求(參數(shù):replica.lag.time.max.ms 默認(rèn)值:10000)。 4. 為了保證可靠性,可以設(shè)置 acks=all 。Follower收到消息后,會(huì)像Leader發(fā)送ACK。一旦Leader收到了ISR中所有Replica的ACK,Leader就commit,那么Leader就向Producer發(fā)送ACK。
  • 副本的分配:
    當(dāng)某個(gè)topic的 --replication-factor 為N(N>1)時(shí),每個(gè)Partition都有N個(gè)副本,稱作replica。原則上是將replica均勻的分配到整個(gè)集群上。不僅如此,partition的分配也同樣需要均勻分配,為了更好的負(fù)載均衡。

    副本分配的三個(gè)目標(biāo):

  • 均衡地將副本分散于各個(gè)broker上
  • 對于某個(gè)broker上分配的分區(qū),它的其他副本在其他broker上
  • 如果所有的broker都有機(jī)架信息,盡量將分區(qū)的各個(gè)副本分配到不同機(jī)架上的broker。
  • 在不考慮機(jī)架信息的情況下:

  • 第一個(gè)副本分區(qū)通過輪詢的方式挑選一個(gè)broker,進(jìn)行分配。該輪詢從broker列表的隨機(jī)位置進(jìn)行輪詢。
  • 其余副本通過增加偏移進(jìn)行分配。
  • 2.6.3.1 失效副本

    失效副本的判定
    replica.lag.time.max.ms 默認(rèn)大小為10000。

    失效副本的分區(qū)個(gè)數(shù)是用于衡量Kafka性能指標(biāo)的重要部分。Kafka本身提供了一個(gè)相關(guān)的指標(biāo),即UnderReplicatedPartitions,這個(gè)可以通過JMX訪問:

    kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions 1

    取值范圍是大于等于0的整數(shù)。注意:如果Kafka集群正在做分區(qū)遷移(kafka-reassign-partitions.sh)的時(shí)候,這個(gè)值也會(huì)大于0。

    2.6.3.2 副本復(fù)制

    日志復(fù)制算法(log replication algorithm)必須提供的基本保證是,如果它告訴客戶端消息已被提交,而當(dāng)前Leader出現(xiàn)故障,新選出的Leader也必須具有該消息。在出現(xiàn)故障時(shí),Kafka會(huì)從掛掉Leader的ISR里面選擇一個(gè)Follower作為這個(gè)分區(qū)新的Leader。

    每個(gè)分區(qū)的 leader 會(huì)維護(hù)一個(gè)in-sync replica(同步副本列表,又稱 ISR)。當(dāng)Producer向broker發(fā)送消息,消息先寫入到對應(yīng)Leader分區(qū),然后復(fù)制到這個(gè)分區(qū)的所有副本中。ACKS=ALL時(shí),只有將消息成功復(fù)制到所有同步副本(ISR)后,這條消息才算被提交。


    當(dāng)副本落后于 leader 分區(qū)時(shí),這個(gè)副本被認(rèn)為是不同步或滯后的。在 Kafka中,副本的滯后于Leader是根據(jù) replica.lag.time.max.ms 來衡量。

    如何確認(rèn)某個(gè)副本處于滯后狀態(tài)
    通過 replica.lag.time.max.ms 來檢測卡住副本(Stuck replica)在所有情況下都能很好地工作。它跟蹤 follower 副本沒有向 leader 發(fā)送獲取請求的時(shí)間,通過這個(gè)可以推斷 follower 是否正常。

    另一方面,使用消息數(shù)量檢測不同步慢副本(Slow replica)的模型只有在為單個(gè)主題或具有同類流量模式的多個(gè)主題設(shè)置這些參數(shù)時(shí)才能很好地工作,但我們發(fā)現(xiàn)它不能擴(kuò)展到生產(chǎn)集群中所有主題。

    2.6.4 一致性保證

    一、概念

    1. 水位標(biāo)記
    水位或水印(watermark)一詞,表示位置信息,即位移(offset)。Kafka源碼中使用的名字是高水位,HW(high watermark)。

    2. 副本角色
    Kafka分區(qū)使用多個(gè)副本(replica)提供高可用。

    3. LEO和HW
    每個(gè)分區(qū)副本對象都有兩個(gè)重要的屬性:LEO和HW。

    • LEO:即日志末端位移(log end offset),記錄了該副本日志中下一條消息的位移值。如果LEO=10,那么表示該副本保存了10條消息,位移值范圍是[0, 9]。另外,Leader LEOFollower LEO的更新是有區(qū)別的。
    • HW:即上面提到的水位值。對于同一個(gè)副本對象而言,其HW值不會(huì)大于LEO值。小于等于HW值的所有消息都被認(rèn)為是“已備份”的(replicated)。Leader副本和Follower副本的HW更新不同。

    二、Follower副本何時(shí)更新LEO

    三、Follower副本何時(shí)更新HW

    Follower更新HW發(fā)生在其更新LEO之后,一旦Follower向Log寫完數(shù)據(jù),嘗試更新自己的HW值。

    比較當(dāng)前LEO值與FETCH響應(yīng)中Leader的HW值,取兩者的小者作為新的HW值。

    即:如果Follower的LEO大于Leader的HW,Follower HW值不會(huì)大于Leader的HW值。

    四、Leader副本何時(shí)更新LEO

    和Follower更新LEO相同,Leader寫Log時(shí)自動(dòng)更新自己的LEO值。

    五、Leader副本何時(shí)更新HW值

    Leader的HW值就是分區(qū)HW值,直接影響分區(qū)數(shù)據(jù)對消費(fèi)者的可見性 。

    Leader如何更新自己的HW值?Leader broker上保存了一套Follower副本的LEO以及自己的LEO。
    當(dāng)嘗試確定分區(qū)HW時(shí),它會(huì)選出所有滿足條件的副本,比較它們的LEO(包括Leader的LEO),并選擇最小的LEO值作為HW值
    需要滿足的條件,(二選一):

  • 處于ISR中
  • 副本LEO落后于Leader LEO的時(shí)長不大于 replica.lag.time.max.ms 參數(shù)值(默認(rèn)是10s)
  • 如果Kafka只判斷第一個(gè)條件的話,確定分區(qū)HW值時(shí)就不會(huì)考慮這些未在ISR中的副本,但這些副本已經(jīng)具備了“立刻進(jìn)入ISR”的資格,因此就可能出現(xiàn)分區(qū)HW值越過ISR中副本LEO的情況——不允許。因?yàn)榉謪^(qū)HW定義就是ISR中所有副本LEO的最小值。

    六、HW和LEO正常更新案例

    我們假設(shè)有一個(gè)topic,單分區(qū),副本因子是2,即一個(gè)Leader副本和一個(gè)Follower副本。我們看下當(dāng)producer發(fā)送一條消息時(shí),broker端的副本到底會(huì)發(fā)生什么事情以及分區(qū)HW是如何被更新的。

    1. 初始狀態(tài)

    2. Follower發(fā)送FETCH請求在Leader處理完P(guān)RODUCE請求之后
    producer給該topic分區(qū)發(fā)送了一條消息
    此時(shí)的狀態(tài)如下圖所示:


    PRODUCE請求處理完成后各值如下,Leader端的HW值依然是0,而LEO是1,Remote LEO也是0。

    假設(shè)此時(shí)follower發(fā)送了FETCH請求,則狀態(tài)變更如下:


    而Follower副本接收到FETCH Response后依次執(zhí)行下列操作:

  • 寫入本地Log,同時(shí)更新Follower自己管理的 LEO為1
  • 更新Follower HW:比較本地LEO和 FETCH Response 中的當(dāng)前Leader HW值,取較小者,Follower HW = 0
  • 此時(shí),第一輪FETCH RPC結(jié)束,我們會(huì)發(fā)現(xiàn)雖然Leader和Follower都已經(jīng)在Log中保存了這條消息,但分區(qū)HW值尚未被更新,仍為0。

    Follower第二輪FETCH
    分區(qū)HW是在第二輪FETCH RPC中被更新的,如下圖所示:


    同樣地,Follower副本接收到FETCH response后依次執(zhí)行下列操作:

  • 寫入本地Log,當(dāng)然沒東西可寫,Follower LEO也不會(huì)變化,依然是1。
  • 更新Follower HW:比較本地LEO和當(dāng)前LeaderHW取小者。由于都是1,故更新follower HW
    = 1 。

    此時(shí)消息已經(jīng)成功地被復(fù)制到Leader和Follower的Log中且分區(qū)HW是1,表明消費(fèi)者能夠消費(fèi)offset = 0的消息。
  • 七、HW和LEO異常案例

    Kafka使用HW值來決定副本備份的進(jìn)度,而HW值的更新通常需要額外一輪FETCH RPC才能完成。
    但這種設(shè)計(jì)是有問題的,可能引起的問題包括:

  • 備份數(shù)據(jù)丟失
  • 備份數(shù)據(jù)不一致
  • 數(shù)據(jù)丟失
    使用HW值來確定備份進(jìn)度時(shí)其值的更新是在下一輪RPC中完成的。如果Follower副本在標(biāo)記上方的的第一步與第二步之間發(fā)生崩潰,那么就有可能造成數(shù)據(jù)的丟失。

    上圖中有兩個(gè)副本:A和B。開始狀態(tài)是A是Leader。
    假設(shè)生產(chǎn)者 min.insync.replicas 為1,那么當(dāng)生產(chǎn)者發(fā)送兩條消息給A后,A寫入Log,此時(shí)Kafka會(huì)通知生產(chǎn)者這兩條消息寫入成功。

    但是在broker端,Leader和Follower的Log雖都寫入了2條消息且分區(qū)HW已經(jīng)被更新到2,但Follower HW尚未被更新還是1,也就是上面標(biāo)記的第二步尚未執(zhí)行,表中最后一條未執(zhí)行。

    倘若此時(shí)副本B所在的broker宕機(jī),那么重啟后B會(huì)自動(dòng)把LEO調(diào)整到之前的HW值1,故副本B會(huì)做日志截?cái)?log truncation),將offset = 1的那條消息從log中刪除,并調(diào)整LEO = 1。此時(shí)follower副本底層log中就只有一條消息,即offset = 0的消息!

    B重啟之后需要給A發(fā)FETCH請求,但若A所在broker機(jī)器在此時(shí)宕機(jī),那么Kafka會(huì)令B成為新的Leader,而當(dāng)A重啟回來后也會(huì)執(zhí)行日志截?cái)?#xff0c;將HW調(diào)整回1。這樣,offset=1的消息就從兩個(gè)副本的log中被刪除,也就是說這條已經(jīng)被生產(chǎn)者認(rèn)為發(fā)送成功的數(shù)據(jù)丟失。

    丟失數(shù)據(jù)的前提是 min.insync.replicas=1 時(shí),一旦消息被寫入Leader端Log即被認(rèn)為是committed 。延遲一輪 FETCH RPC 更新HW值的設(shè)計(jì)使follower HW值是異步延遲更新,若在這個(gè)過程中Leader發(fā)生變更,那么成為新Leader的Follower的HW值就有可能是過期的,導(dǎo)致生產(chǎn)者本是成功提交的消息被刪除。

    Leader和Follower數(shù)據(jù)離散
    除了可能造成的數(shù)據(jù)丟失以外,該設(shè)計(jì)還會(huì)造成Leader的Log和Follower的Log數(shù)據(jù)不一致。

    如Leader端記錄序列:m1,m2,m3,m4,m5,…;Follower端序列可能是m1,m3,m4,m5,…。
    看圖:

    八、Leader Epoch使用

    Kafka解決方案

    造成上述兩個(gè)問題的根本原因在于

  • HW值被用于衡量副本備份的成功與否。
  • 在出現(xiàn)失敗重啟時(shí)作為日志截?cái)嗟囊罁?jù)。
  • 但HW值的更新是異步延遲的,特別是需要額外的FETCH請求處理流程才能更新,故這中間發(fā)生的任何崩潰都可能導(dǎo)致HW值的過期。

    Kafka從0.11引入了 leader epoch 來取代HW值。Leader端使用內(nèi)存保存Leader的epoch信息,即使出現(xiàn)上面的兩個(gè)場景也能規(guī)避這些問題。

    所謂Leader epoch實(shí)際上是一對值:<epoch, offset>:

  • epoch表示Leader的版本號,從0開始,Leader變更過1次,epoch+1
  • offset對應(yīng)于該epoch版本的Leader寫入第一條消息的offset。因此假設(shè)有兩對值:
  • <0, 0> <1, 120>

    則表示第一個(gè)Leader從位移0開始寫入消息;共寫了120條[0, 119];而第二個(gè)Leader版本號是1,從位移120處開始寫入消息。

  • Leader broker中會(huì)保存這樣的一個(gè)緩存,并定期地寫入到一個(gè) checkpoint 文件中。
  • 當(dāng)Leader寫Log時(shí)它會(huì)嘗試更新整個(gè)緩存:如果這個(gè)Leader首次寫消息,則會(huì)在緩存中增加一個(gè)條目;否則就不做更新。
  • 每次副本變?yōu)長eader時(shí)會(huì)查詢這部分緩存,獲取出對應(yīng)Leader版本的位移,則不會(huì)發(fā)生數(shù)據(jù)不一致和丟失的情況。
  • 規(guī)避數(shù)據(jù)丟失

    規(guī)避數(shù)據(jù)不一致


    依靠Leader epoch的信息可以有效地規(guī)避數(shù)據(jù)不一致的問題。
    對于使用 unclean.leader.election.enable = true 設(shè)置的群集,該方案不能保證消息的一致性

    2.6.5 消息重復(fù)的場景及解決方案

    消息重復(fù)和丟失是kafka中很常見的問題,主要發(fā)生在以下三個(gè)階段:

  • 生產(chǎn)者階段
  • broke階段
  • 消費(fèi)者階段
  • 2.6.5.1 生產(chǎn)者階段重復(fù)場景

    2.6.5.1.1 根本原因

    生產(chǎn)發(fā)送的消息沒有收到正確的broke響應(yīng),導(dǎo)致生產(chǎn)者重試。

    生產(chǎn)者發(fā)出一條消息,broke落盤以后因?yàn)榫W(wǎng)絡(luò)等種種原因發(fā)送端得到一個(gè)發(fā)送失敗的響應(yīng)或者網(wǎng)絡(luò)中斷,然后生產(chǎn)者收到一個(gè)可恢復(fù)的Exception重試消息導(dǎo)致消息重復(fù)。

    2.6.5.1.2 重試過程


    說明:

  • new KafkaProducer()后創(chuàng)建一個(gè)后臺(tái)線程KafkaThread掃描RecordAccumulator中是否有消息;
  • 調(diào)用KafkaProducer.send()發(fā)送消息,實(shí)際上只是把消息保存到RecordAccumulator中;
  • 后臺(tái)線程KafkaThread掃描到RecordAccumulator中有消息后,將消息發(fā)送到kafka集群;
  • 如果發(fā)送成功,那么返回成功;
  • 如果發(fā)送失敗,那么判斷是否允許重試。如果不允許重試,那么返回失敗的結(jié)果;如果允許重試,把消息再保存到RecordAccumulator中,等待后臺(tái)線程KafkaThread掃描再次發(fā)送;
  • 2.6.5.1.3 可恢復(fù)異常說明

    異常是RetriableException類型或者TransactionManager允許重試;RetriableException類繼承關(guān)系如下:

    2.6.5.1.4 記錄順序問題

    如果設(shè)置 max.in.flight.requests.per.connection 大于1(默認(rèn)5,單個(gè)連接上發(fā)送的未確認(rèn)請求的最大數(shù)量,表示上一個(gè)發(fā)出的請求沒有確認(rèn)下一個(gè)請求又發(fā)出了)。大于1可能會(huì)改變記錄的順序,因?yàn)槿绻麑蓚€(gè)batch發(fā)送到單個(gè)分區(qū),第一個(gè)batch處理失敗并重試,但是第二個(gè)batch處理成功,那么第二個(gè)batch處理中的記錄可能先出現(xiàn)被消費(fèi)。

    設(shè)置 max.in.flight.requests.per.connection 為1,可能會(huì)影響吞吐量,可以解決單個(gè)生產(chǎn)者發(fā)送順序問題。如果多個(gè)生產(chǎn)者,生產(chǎn)者1先發(fā)送一個(gè)請求,生產(chǎn)者2后發(fā)送請求,此時(shí)生產(chǎn)者1返回可恢復(fù)異常,重試一定次數(shù)成功了。雖然生產(chǎn)者1先發(fā)送消息,但生產(chǎn)者2發(fā)送的消息會(huì)被先消費(fèi)。

    2.6.5.2 生產(chǎn)者發(fā)送重復(fù)解決方案

    2.6.5.2.1 啟動(dòng)kafka的冪等性

    要啟動(dòng)kafka的冪等性,設(shè)置: enable.idempotence=true ,以及 ack=all 以及 retries > 1 。

    2.6.5.2.2 ack=0,不重試。

    可能會(huì)丟消息,適用于吞吐量指標(biāo)重要性高于數(shù)據(jù)丟失,例如:日志收集。

    2.6.5.3 生產(chǎn)者和broke階段消息丟失場景

    2.6.5.3.1 ack=0,不重試

    生產(chǎn)者發(fā)送消息完,不管結(jié)果了,如果發(fā)送失敗也就丟失了。

    2.6.5.3.2 ack=1,leader crash

    生產(chǎn)者發(fā)送消息完,只等待Leader寫入成功就返回了,Leader分區(qū)丟失了,此時(shí)Follower沒來及同步,消息丟失。

    2.6.5.3.3 unclean.leader.election.enable 配置true

    允許選舉ISR以外的副本作為leader,會(huì)導(dǎo)致數(shù)據(jù)丟失,默認(rèn)為false。生產(chǎn)者發(fā)送異步消息,只等待Lead寫入成功就返回,Leader分區(qū)丟失,此時(shí)ISR中沒有Follower,Leader從OSR中選舉,因?yàn)镺SR中本來落后于Leader造成消息丟失。

    2.6.5.4 解決生產(chǎn)者和broke階段消息丟失

    2.6.5.4.1 禁用unclean選舉,ack=all

    ack=all / -1,tries > 1,unclean.leader.election.enable : false

    生產(chǎn)者發(fā)完消息,等待Follower同步完再返回,如果異常則重試。副本的數(shù)量可能影響吞吐量,不超過5個(gè),一般三個(gè)。

    不允許unclean Leader選舉。

    2.6.5.4.2 配置:min.insync.replicas > 1

    當(dāng)生產(chǎn)者將 acks 設(shè)置為 all (或 -1 )時(shí), min.insync.replicas>1 。指定確認(rèn)消息寫成功需要的最小副本數(shù)量。達(dá)不到這個(gè)最小值,生產(chǎn)者將引發(fā)一個(gè)異常(要么是NotEnoughReplicas,要么是NotEnoughReplicasAfterAppend)。

    當(dāng)一起使用時(shí), min.insync.replicas 和 ack 允許執(zhí)行更大的持久性保證。一個(gè)典型的場景是創(chuàng)建一個(gè)復(fù)制因子為3的主題,設(shè)置min.insync復(fù)制到2個(gè),用 all 配置發(fā)送。將確保如果大多數(shù)副本沒有
    收到寫操作,則生產(chǎn)者將引發(fā)異常。

    2.6.5.4.3 失敗的offset單獨(dú)記錄

    生產(chǎn)者發(fā)送消息,會(huì)自動(dòng)重試,遇到不可恢復(fù)異常會(huì)拋出,這時(shí)可以捕獲異常記錄到數(shù)據(jù)庫或緩存,進(jìn)行單獨(dú)處理。

    2.6.5.5 消費(fèi)者數(shù)據(jù)重復(fù)場景及解決方案

    2.6.5.5.1 根本原因

    數(shù)據(jù)消費(fèi)完沒有及時(shí)提交offset到broker。

    2.6.5.5.2 場景

    消息消費(fèi)端在消費(fèi)過程中掛掉沒有及時(shí)提交offset到broke,另一個(gè)消費(fèi)端啟動(dòng)拿之前記錄的offset開始消費(fèi),由于offset的滯后性可能會(huì)導(dǎo)致新啟動(dòng)的客戶端有少量重復(fù)消費(fèi)。

    2.6.5.6 解決方案

    2.6.5.6.1 取消自動(dòng)提交

    每次消費(fèi)完或者程序退出時(shí)手動(dòng)提交。這可能也沒法保證一條重復(fù)。

    2.6.5.6.2 下游做冪等

    一般是讓下游做冪等或者盡量每消費(fèi)一條消息都記錄offset,對于少數(shù)嚴(yán)格的場景可能需要把offset或唯一ID(例如訂單ID)和下游狀態(tài)更新放在同一個(gè)數(shù)據(jù)庫里面做事務(wù)來保證精確的一次更新或者在下游數(shù)據(jù)表里面同時(shí)記錄消費(fèi)offset,然后更新下游數(shù)據(jù)的時(shí)候用消費(fèi)位移做樂觀鎖拒絕舊位移的數(shù)據(jù)更新。

    2.6.6 __consumer_offsets

    Zookeeper不適合大批量的頻繁寫入操作。
    Kafka 1.0.2將consumer的位移信息保存在Kafka內(nèi)部的topic中,即__consumer_offsets主題,并且默認(rèn)提供了kafka_consumer_groups.sh腳本供用戶查看consumer信息。

  • 創(chuàng)建topic “tp_test_01”
  • [root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --topic tp_test_01 --partitions 5 --replication-factor 1
  • 使用kafka-console-producer.sh腳本生產(chǎn)消息
  • [root@node1 ~]# for i in `seq 100`; do echo "hello lagou $i" >> messages.txt; done [root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic tp_test_01 < messages.txt

    由于默認(rèn)沒有指定key,所以根據(jù)round-robin方式,消息分布到不同的分區(qū)上。 (本例中生產(chǎn)了100條消息)

  • 驗(yàn)證消息生產(chǎn)成功
  • [root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic tp_test_01 < messages.txt >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> [root@node1 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list node1:9092 --topic tp_test_01 --time -1 #顯示結(jié)果 tp_test_01:2:20 tp_test_01:4:20 tp_test_01:1:20 tp_test_01:3:20 tp_test_01:0:20 [root@node1 ~]#

    結(jié)果輸出表明100條消息全部生產(chǎn)成功!

  • 創(chuàng)建一個(gè)console consumer group
  • [root@node1 ~]#kafka-console-consumer.sh --bootstrap-server node1:9092 --topic tp_test_01 --from-beginning
  • 獲取該consumer group的group id(后面需要根據(jù)該id查詢它的位移信息)
  • [root@node1 ~]# kafka-consumer-groups.sh --bootstrap-server node1:9092 --list

    輸出: console-consumer-49366 (記住這個(gè)id!)

  • 查詢__consumer_offsets topic所有內(nèi)容
    注意:運(yùn)行下面命令前先要在consumer.properties中設(shè)置exclude.internal.topics=false
  • [root@node1 ~]# kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server node1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config /opt/lagou/kafka/config/consumer.properties --from-beginning

    默認(rèn)情況下__consumer_offsets有50個(gè)分區(qū),如果你的系統(tǒng)中consumer group也很多的話,那么這個(gè)命令的輸出結(jié)果會(huì)很多。

  • 計(jì)算指定consumer group在__consumer_offsets topic中分區(qū)信息
    這時(shí)候就用到了第5步獲取的group.id(本例中是console-consumer-49366)。Kafka會(huì)使用下面公式計(jì)算該group位移保存在__consumer_offsets的哪個(gè)分區(qū)上:
  • Math.abs(groupID.hashCode()) % numPartitions


    對應(yīng)的分區(qū)=Math.abs(“console-consumer-49366”.hashCode()) % 50 = 19,即__consumer_offsets的分區(qū)19保存了這個(gè)consumer group的位移信息。

  • 獲取指定consumer group的位移信息
  • [root@node1 ~]# kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 19 --broker-list node1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

    下面是輸出結(jié)果:

    上圖可見,該consumer group果然保存在分區(qū)11上,且位移信息都是對的(這里的位移信息是已消費(fèi)的位移,嚴(yán)格來說不是第3步中的位移。由于我的consumer已經(jīng)消費(fèi)完了所有的消息,所以這里的位移與第3步中的位移相同)。另外,可以看到__consumer_offsets topic的每一日志項(xiàng)的格式都是:
    [Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]。

    2.7 延時(shí)隊(duì)列

    兩個(gè)follower副本都已經(jīng)拉取到了leader副本的最新位置,此時(shí)又向leader副本發(fā)送拉取請求,而leader副本并沒有新的消息寫入,那么此時(shí)leader副本該如何處理呢?可以直接返回空的拉取結(jié)果給follower副本,不過在leader副本一直沒有新消息寫入的情況下,follower副本會(huì)一直發(fā)送拉取請求,并且總收到空的拉取結(jié)果,消耗資源。

    Kafka在處理拉取請求時(shí),會(huì)先讀取一次日志文件,如果收集不到足夠多(fetchMinBytes,由參數(shù)fetch.min.bytes配置,默認(rèn)值為1)的消息,那么就會(huì)創(chuàng)建一個(gè)延時(shí)拉取操作(DelayedFetch)以等待
    拉取到足夠數(shù)量的消息。當(dāng)延時(shí)拉取操作執(zhí)行時(shí),會(huì)再讀取一次日志文件,然后將拉取結(jié)果返回給follower副本。

    延遲操作不只是拉取消息時(shí)的特有操作,在Kafka中有多種延時(shí)操作,比如延時(shí)數(shù)據(jù)刪除、延時(shí)生產(chǎn)等。

    對于延時(shí)生產(chǎn)(消息)而言,如果在使用生產(chǎn)者客戶端發(fā)送消息的時(shí)候?qū)cks參數(shù)設(shè)置為-1,那么就意味著需要等待ISR集合中的所有副本都確認(rèn)收到消息之后才能正確地收到響應(yīng)的結(jié)果,或者捕獲超時(shí)異常。

    假設(shè)某個(gè)分區(qū)有3個(gè)副本:leader、follower1和follower2,它們都在分區(qū)的ISR集合中。不考慮ISR變動(dòng)的情況,Kafka在收到客戶端的生產(chǎn)請求后,將消息3和消息4寫入leader副本的本地日志文件。

    由于客戶端設(shè)置了acks為-1,那么需要等到follower1和follower2兩個(gè)副本都收到消息3和消息4后才能告知客戶端正確地接收了所發(fā)送的消息。如果在一定的時(shí)間內(nèi),follower1副本或follower2副本沒能夠完全拉取到消息3和消息4,那么就需要返回超時(shí)異常給客戶端。生產(chǎn)請求的超時(shí)時(shí)間由參數(shù)request.timeout.ms配置,默認(rèn)值為30000,即30s。

    那么這里等待消息3和消息4寫入follower1副本和follower2副本,并返回相應(yīng)的響應(yīng)結(jié)果給客戶端的動(dòng)作是由誰來執(zhí)行的呢?在將消息寫入leader副本的本地日志文件之后,Kafka會(huì)創(chuàng)建一個(gè)延時(shí)的生
    產(chǎn)操作(DelayedProduce),用來處理消息正常寫入所有副本或超時(shí)的情況,以返回相應(yīng)的響應(yīng)結(jié)果給客戶端。

    延時(shí)操作需要延時(shí)返回響應(yīng)的結(jié)果,首先它必須有一個(gè)超時(shí)時(shí)間(delayMs),如果在這個(gè)超時(shí)時(shí)間內(nèi)沒有完成既定的任務(wù),那么就需要強(qiáng)制完成以返回響應(yīng)結(jié)果給客戶端。其次,延時(shí)操作不同于定時(shí)
    操作,定時(shí)操作是指在特定時(shí)間之后執(zhí)行的操作,而延時(shí)操作可以在所設(shè)定的超時(shí)時(shí)間之前完成,所以延時(shí)操作能夠支持外部事件的觸發(fā)。

    就延時(shí)生產(chǎn)操作而言,它的外部事件是所要寫入消息的某個(gè)分區(qū)的HW(高水位)發(fā)生增長。也就是說,隨著follower副本不斷地與leader副本進(jìn)行消息同步,進(jìn)而促使HW進(jìn)一步增長,HW每增長一次
    都會(huì)檢測是否能夠完成此次延時(shí)生產(chǎn)操作,如果可以就執(zhí)行以此返回響應(yīng)結(jié)果給客戶端;如果在超時(shí)時(shí)間內(nèi)始終無法完成,則強(qiáng)制執(zhí)行。

    延時(shí)拉取操作,是由超時(shí)觸發(fā)或外部事件觸發(fā)而被執(zhí)行的。超時(shí)觸發(fā)很好理解,就是等到超時(shí)時(shí)間之后觸發(fā)第二次讀取日志文件的操作。外部事件觸發(fā)就稍復(fù)雜了一些,因?yàn)槔≌埱蟛粏螁斡蒮ollower副本發(fā)起,也可以由消費(fèi)者客戶端發(fā)起,兩種情況所對應(yīng)的外部事件也是不同的。如果是follower副本的延時(shí)拉取,它的外部事件就是消息追加到了leader副本的本地日志文件中;如果是消費(fèi)者客戶端的延
    時(shí)拉取,它的外部事件可以簡單地理解為HW的增長。

    時(shí)間輪實(shí)現(xiàn)延時(shí)隊(duì)列。
    TimeWheel。size,每個(gè)單元格的時(shí)間,每個(gè)單元格都代表一個(gè)時(shí)間,size*每個(gè)單元格的時(shí)間就是一個(gè)周期。

    2.8 重試隊(duì)列

    kafka沒有重試機(jī)制不支持消息重試,也沒有死信隊(duì)列,因此使用kafka做消息隊(duì)列時(shí),需要自己實(shí)現(xiàn)消息重試的功能。

    自定義實(shí)現(xiàn)步驟

    創(chuàng)建新的kafka主題作為重試隊(duì)列:

  • 創(chuàng)建一個(gè)topic作為重試topic,用于接收等待重試的消息。
  • 普通topic消費(fèi)者設(shè)置待重試消息的下一個(gè)重試topic。
  • 從重試topic獲取待重試消息儲(chǔ)存到redis的zset中,并以下一次消費(fèi)時(shí)間排序
  • 定時(shí)任務(wù)從redis獲取到達(dá)消費(fèi)事件的消息,并把消息發(fā)送到對應(yīng)的topic
  • 同一個(gè)消息重試次數(shù)過多則不再重試
  • 代碼實(shí)現(xiàn)

  • 新建springboot項(xiàng)目
    版本:2.2.8
    dependencies選擇如下三個(gè):

    pom.xml
  • <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.8.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.lagou.kafka.demo</groupId><artifactId>demo-retryqueue</artifactId><version>0.0.1-SNAPSHOT</version><name>demo-retryqueue</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.73</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
  • 添加application.properties
  • # bootstrap.servers spring.kafka.bootstrap-servers=node1:9092 # key序列化器 spring.kafka.producer.key- serializer=org.apache.kafka.common.serialization.StringSerializer # value序列化器 spring.kafka.producer.value- serializer=org.apache.kafka.common.serialization.StringSerializer # 消費(fèi)組id:group.id spring.kafka.consumer.group-id=retryGroup # key反序列化器 spring.kafka.consumer.key- deserializer=org.apache.kafka.common.serialization.StringDeserializer # value反序列化器 spring.kafka.consumer.value- deserializer=org.apache.kafka.common.serialization.StringDeserializer # redis數(shù)據(jù)庫編號 spring.redis.database=0 # redis主機(jī)地址 spring.redis.host=node1 # redis端口 spring.redis.port=6379 # Redis服務(wù)器連接密碼(默認(rèn)為空) spring.redis.password= # 連接池最大連接數(shù)(使用負(fù)值表示沒有限制) spring.redis.jedis.pool.max-active=20 # 連接池最大阻塞等待時(shí)間(使用負(fù)值表示沒有限制) spring.redis.jedis.pool.max-wait=-1 # 連接池中的最大空閑連接 spring.redis.jedis.pool.max-idle=10 # 連接池中的最小空閑連接 spring.redis.jedis.pool.min-idle=0 # 連接超時(shí)時(shí)間(毫秒) spring.redis.timeout=1000 # Kafka主題名稱 spring.kafka.topics.test=tp_demo_retry_01 # 重試隊(duì)列 spring.kafka.topics.retry=tp_demo_retry_02
  • RetryqueueApplication.java
  • package com.lagou.kafka.demo;import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class RetryqueueApplication {public static void main(String[] args) {SpringApplication.run(RetryqueueApplication.class, args);}}
  • AppConfig.java
  • package com.lagou.kafka.demo.config;import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate;// 配置redis template @Configuration public class AppConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();// 配置連接工廠template.setConnectionFactory(factory);return template;}}
  • KafkaController.java
  • package com.lagou.kafka.demo.controller;import com.lagou.kafka.demo.service.KafkaService; import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;import java.util.concurrent.ExecutionException;@RestController public class RetryController {@Autowiredprivate KafkaService kafkaService;@Value("${spring.kafka.topics.test}")private String topic;@RequestMapping("/send/{message}")public String sendMessage(@PathVariable String message) throws ExecutionException, InterruptedException {ProducerRecord<String, String> record = new ProducerRecord<>(topic,message);// 向業(yè)務(wù)主題發(fā)送消息String result = kafkaService.sendMessage(record);return result;}}
  • KafkaService.java
  • package com.lagou.kafka.demo.service;import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service;import java.util.concurrent.ExecutionException;@Service public class KafkaService {private Logger log = LoggerFactory.getLogger(KafkaService.class);// 標(biāo)紅不管@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public String sendMessage(ProducerRecord<String, String> record) throws ExecutionException, InterruptedException {SendResult<String, String> result = this.kafkaTemplate.send(record).get();RecordMetadata metadata = result.getRecordMetadata();String returnResult = metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset();log.info("發(fā)送消息成功:" + returnResult);return returnResult;}}
  • ConsumerListener.java
  • package com.lagou.kafka.demo.listener;import com.lagou.kafka.demo.service.RetryService; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component;@Component public class ConsumerListener {private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);@Autowiredprivate RetryService kafkaRetryService;private static int index = 0;// 拉取下面主題的消息@KafkaListener(topics = "${spring.kafka.topics.test}", groupId = "${spring.kafka.consumer.group-id}")public void consume(ConsumerRecord<String, String> record) {try {// 業(yè)務(wù)處理log.info("消費(fèi)的消息:" + record);index++;if (index % 2 == 0) {throw new Exception("該重發(fā)了");}} catch (Exception e) {log.error(e.getMessage());// 消息重試,實(shí)際上先將消息放到redis, 再從redis放到消息隊(duì)列中kafkaRetryService.consumerLater(record);}}}
  • KafkaRetryService.java
  • package com.lagou.kafka.demo.service;import com.alibaba.fastjson.JSON; import com.lagou.kafka.demo.entity.RetryRecord; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Header; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service;import java.nio.ByteBuffer; import java.util.Calendar; import java.util.Date;@Service public class RetryService {private static final Logger log = LoggerFactory.getLogger(RetryService.class);/*** 消息消費(fèi)失敗后下一次消費(fèi)的延遲時(shí)間(秒)* 第一次重試延遲10秒;第 二次延遲30秒,第三次延遲1分鐘...*/private static final int[] RETRY_INTERVAL_SECONDS = {10, 30, 1*60, 2*60, 5*60, 10*60, 30*60, 1*60*60, 2*60*60};/*** 重試topic*/@Value("${spring.kafka.topics.retry}")private String retryTopic;@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void consumerLater(ConsumerRecord<String, String> record){// 獲取消息的已重試次數(shù)int retryTimes = getRetryTimes(record);Date nextConsumerTime = getNextConsumerTime(retryTimes);// 如果達(dá)到重試次數(shù),則不再重試if(nextConsumerTime == null) {return;}// 組織消息RetryRecord retryRecord = new RetryRecord();retryRecord.setNextTime(nextConsumerTime.getTime());retryRecord.setTopic(record.topic());retryRecord.setRetryTimes(retryTimes);retryRecord.setKey(record.key());retryRecord.setValue(record.value());// 轉(zhuǎn)換為字符串String value = JSON.toJSONString(retryRecord);// 發(fā)送到重試隊(duì)列kafkaTemplate.send(retryTopic, null, value);}/*** 獲取消息的已重試次數(shù)*/private int getRetryTimes(ConsumerRecord record){int retryTimes = -1;for(Header header : record.headers()){if(RetryRecord.KEY_RETRY_TIMES.equals(header.key())){ByteBuffer buffer = ByteBuffer.wrap(header.value());retryTimes = buffer.getInt();}}retryTimes++;return retryTimes;}/*** 獲取待重試消息的下一次消費(fèi)時(shí)間*/private Date getNextConsumerTime(int retryTimes){// 重試次數(shù)超過上限,不再重試if(RETRY_INTERVAL_SECONDS.length < retryTimes) {return null;}Calendar calendar = Calendar.getInstance();calendar.add(Calendar.SECOND, RETRY_INTERVAL_SECONDS[retryTimes]);return calendar.getTime();} }
  • RetryListener.java
  • package com.lagou.kafka.demo.listener;import com.alibaba.fastjson.JSON; import com.lagou.kafka.demo.entity.RetryRecord; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.ZSetOperations; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component;import java.util.Set; import java.util.UUID;@Component // 下面注解表示開啟調(diào)度 @EnableScheduling public class RetryListener {private Logger log = LoggerFactory.getLogger(RetryListener.class);private static final String RETRY_KEY_ZSET = "_retry_key"; // 時(shí)間private static final String RETRY_VALUE_MAP = "_retry_value"; // 消息@Autowiredprivate RedisTemplate<String,Object> redisTemplate;@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;//@Value("${spring.kafka.topics.test}")private String bizTopic;@KafkaListener(topics = "${spring.kafka.topics.retry}") // 只要有消息就取出來 // public void consume(List<ConsumerRecord<String, String>> list) { // for(ConsumerRecord<String, String> record : list){public void consume(ConsumerRecord<String, String> record) {System.out.println("需要重試的消息:" + record);RetryRecord retryRecord = JSON.parseObject(record.value(), RetryRecord.class);/*** 防止待重試消息太多撐爆redis,可以將待重試消息按下一次重試時(shí)間分開存儲(chǔ)放到不同介質(zhì)* 例如下一次重試時(shí)間在半小時(shí)以后的消息儲(chǔ)存到mysql,并定時(shí)從mysql讀取即將重試的消息儲(chǔ)儲(chǔ)存到redis*/// 通過redis的zset進(jìn)行時(shí)間排序String key = UUID.randomUUID().toString();redisTemplate.opsForHash().put(RETRY_VALUE_MAP, key, record.value());redisTemplate.opsForZSet().add(RETRY_KEY_ZSET, key, retryRecord.getNextTime());} // }/*** 定時(shí)任務(wù)從redis讀取到達(dá)重試時(shí)間的消息,發(fā)送到對應(yīng)的topic*/ // @Scheduled(cron="2 * * * * *")@Scheduled(fixedDelay = 2000)public void retryFromRedis() {log.warn("retryFromRedis----begin");long currentTime = System.currentTimeMillis();// 根據(jù)時(shí)間倒序獲取Set<ZSetOperations.TypedTuple<Object>> typedTuples =redisTemplate.opsForZSet().reverseRangeByScoreWithScores(RETRY_KEY_ZSET, 0, currentTime);// 移除取出的消息redisTemplate.opsForZSet().removeRangeByScore(RETRY_KEY_ZSET, 0, currentTime);for(ZSetOperations.TypedTuple<Object> tuple : typedTuples){String key = tuple.getValue().toString();String value = redisTemplate.opsForHash().get(RETRY_VALUE_MAP, key).toString();redisTemplate.opsForHash().delete(RETRY_VALUE_MAP, key);RetryRecord retryRecord = JSON.parseObject(value, RetryRecord.class);ProducerRecord record = retryRecord.parse();ProducerRecord recordReal = new ProducerRecord(bizTopic,record.partition(),record.timestamp(),record.key(),record.value(),record.headers());kafkaTemplate.send(recordReal);}// todo 發(fā)生異常將發(fā)送失敗的消息重新發(fā)送到redis} }
  • RetryRecord.java
  • package com.lagou.kafka.demo.entity;import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader;import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List;public class RetryRecord {public static final String KEY_RETRY_TIMES = "retryTimes";private String key;private String value;private Integer retryTimes;private String topic;private Long nextTime;public RetryRecord() {}public String getKey() {return key;}public void setKey(String key) {this.key = key;}public String getValue() {return value;}public void setValue(String value) {this.value = value;}public Integer getRetryTimes() {return retryTimes;}public void setRetryTimes(Integer retryTimes) {this.retryTimes = retryTimes;}public String getTopic() {return topic;}public void setTopic(String topic) {this.topic = topic;}public Long getNextTime() {return nextTime;}public void setNextTime(Long nextTime) {this.nextTime = nextTime;}public ProducerRecord parse() { // 解析成ProducerRecordInteger partition = null;Long timestamp = System.currentTimeMillis();List<Header> headers = new ArrayList<>();ByteBuffer retryTimesBuffer = ByteBuffer.allocate(4);retryTimesBuffer.putInt(retryTimes);retryTimesBuffer.flip();headers.add(new RecordHeader(RetryRecord.KEY_RETRY_TIMES, retryTimesBuffer));ProducerRecord sendRecord = new ProducerRecord(topic, partition, timestamp, key, value, headers);return sendRecord;} }

    總結(jié)

    以上是生活随笔為你收集整理的4.2.4 Kafka高级特性解析(物理存储、稳定性:事物,控制器,可靠性,一致性,_consumer_offsets、延时队列、自定义重试队列)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。