4.2.4 Kafka高级特性解析(物理存储、稳定性:事物,控制器,可靠性,一致性,_consumer_offsets、延时队列、自定义重试队列)
Kafka高級特性解析
文章目錄
- Kafka高級特性解析
- 2.5 物理存儲(chǔ)
- 2.5.1 日志存儲(chǔ)概述
- 2.5.2 日志存儲(chǔ)
- 2.5.2.1 索引
- 2.5.2.1.1 偏移量
- 2.5.2.1.2 時(shí)間戳
- 2.5.2.2 清理
- 2.5.2.2.1 日志刪除
- 2.5.2.2.2 日志壓縮策略
- 2.5.3 磁盤存儲(chǔ)
- 2.5.3.1 零拷貝
- 2.5.3.2 頁緩存
- 2.5.3.3 順序?qū)懭?/li>
- 2.6 穩(wěn)定性
- 2.6.1 事務(wù)
- 2.6.1.1 冪等性
- 2.6.1.2 事務(wù)操作
- 2.6.2 控制器
- 2.6.2.1 broker選舉
- 2.6.3 可靠性保證
- 2.6.3.1 失效副本
- 2.6.3.2 副本復(fù)制
- 2.6.4 一致性保證
- **一、概念**
- 二、Follower副本何時(shí)更新LEO
- 三、Follower副本何時(shí)更新HW
- 四、Leader副本何時(shí)更新LEO
- 五、Leader副本何時(shí)更新HW值
- 六、HW和LEO正常更新案例
- 七、HW和LEO異常案例
- 八、Leader Epoch使用
- Kafka解決方案
- 規(guī)避數(shù)據(jù)丟失
- 規(guī)避數(shù)據(jù)不一致
- 2.6.5 消息重復(fù)的場景及解決方案
- 2.6.5.1 生產(chǎn)者階段重復(fù)場景
- 2.6.5.1.1 根本原因
- 2.6.5.1.2 重試過程
- 2.6.5.1.3 可恢復(fù)異常說明
- 2.6.5.1.4 記錄順序問題
- 2.6.5.2 生產(chǎn)者發(fā)送重復(fù)解決方案
- 2.6.5.2.1 啟動(dòng)kafka的冪等性
- 2.6.5.2.2 ack=0,不重試。
- 2.6.5.3 生產(chǎn)者和broke階段消息丟失場景
- 2.6.5.3.1 ack=0,不重試
- 2.6.5.3.2 ack=1,leader crash
- 2.6.5.3.3 unclean.leader.election.enable 配置true
- 2.6.5.4 解決生產(chǎn)者和broke階段消息丟失
- 2.6.5.4.1 禁用unclean選舉,ack=all
- 2.6.5.4.2 配置:min.insync.replicas > 1
- 2.6.5.4.3 失敗的offset單獨(dú)記錄
- 2.6.5.5 消費(fèi)者數(shù)據(jù)重復(fù)場景及解決方案
- 2.6.5.5.1 根本原因
- 2.6.5.5.2 場景
- 2.6.5.6 解決方案
- 2.6.5.6.1 取消自動(dòng)提交
- 2.6.5.6.2 下游做冪等
- 2.6.6 __consumer_offsets
- 2.7 延時(shí)隊(duì)列
- 2.8 重試隊(duì)列
- 自定義實(shí)現(xiàn)步驟
- 代碼實(shí)現(xiàn)
2.5 物理存儲(chǔ)
2.5.1 日志存儲(chǔ)概述
Kafka 消息是以主題為單位進(jìn)行歸類,各個(gè)主題之間是彼此獨(dú)立的,互不影響。
每個(gè)主題又可以分為一個(gè)或多個(gè)分區(qū)。
每個(gè)分區(qū)各自存在一個(gè)記錄消息數(shù)據(jù)的日志文件。
圖中,創(chuàng)建了一個(gè) tp_demo_01 主題,其存在6個(gè) Parition,對應(yīng)的每個(gè)Parition下存在一個(gè)[Topic-Parition] 命名的消息日志文件。在理想情況下,數(shù)據(jù)流量分?jǐn)偟礁鱾€(gè) Parition 中,實(shí)現(xiàn)了負(fù)載均衡的效果。在分區(qū)日志文件中,你會(huì)發(fā)現(xiàn)很多類型的文件,比如: .index、.timestamp、.log、.snapshot 等。
其中,文件名一致的文件集合就稱為 LogSement。
LogSegment
日志文件存在多種后綴文件,重點(diǎn)需要關(guān)注 .index、.timestamp、.log 三種類型。
類別作用
每個(gè) LogSegment 都有一個(gè)基準(zhǔn)偏移量,表示當(dāng)前 LogSegment 中第一條消息的 offset。
偏移量是一個(gè) 64 位的長整形數(shù),固定是20位數(shù)字,長度未達(dá)到,用 0 進(jìn)行填補(bǔ),索引文件和日志文件都由該作為文件名命名規(guī)則(00000000000000000000.index、00000000000000000000.timestamp、00000000000000000000.log)。
如果日志文件名為 00000000000000000121.log ,則當(dāng)前日志文件的一條數(shù)據(jù)偏移量就是121(偏移量從 0 開始)。
日志與索引文件
配置項(xiàng)默認(rèn)值說明
偏移量索引文件用于記錄消息偏移量與物理地址之間的映射關(guān)系。
時(shí)間戳索引文件則根據(jù)時(shí)間戳查找對應(yīng)的偏移量。
Kafka 中的索引文件是以稀疏索引的方式構(gòu)造消息的索引,并不保證每一個(gè)消息在索引文件中都有對應(yīng)的索引項(xiàng)。
每當(dāng)寫入一定量的消息時(shí),偏移量索引文件和時(shí)間戳索引文件分別增加一個(gè)偏移量索引項(xiàng)和時(shí)間戳索引項(xiàng)。
通過修改 log.index.interval.bytes 的值,改變索引項(xiàng)的密度。
切分文件
當(dāng)滿足如下幾個(gè)條件中的其中之一,就會(huì)觸發(fā)文件的切分:
log.segment.bytes 參數(shù)的默認(rèn)值為 1073741824,即 1GB。
為什么是 Integer.MAX_VALUE ?
1024 * 1024 * 1024=1073741824
在偏移量索引文件中,每個(gè)索引項(xiàng)共占用 8 個(gè)字節(jié),并分為兩部分。
相對偏移量和物理地址。
相對偏移量:表示消息相對與基準(zhǔn)偏移量的偏移量,占 4 個(gè)字節(jié)
物理地址:消息在日志分段文件中對應(yīng)的物理位置,也占 4 個(gè)字節(jié)
4 個(gè)字節(jié)剛好對應(yīng) Integer.MAX_VALUE ,如果大于Integer.MAX_VALUE ,則不能用 4 個(gè)字節(jié)進(jìn)行表示了。
索引文件切分過程
索引文件會(huì)根據(jù) log.index.size.max.bytes 值進(jìn)行預(yù)先分配空間,即文件創(chuàng)建的時(shí)候就是最大值
當(dāng)真正的進(jìn)行索引文件切分的時(shí)候,才會(huì)將其裁剪到實(shí)際數(shù)據(jù)大小的文件。
這一點(diǎn)是跟日志文件有所區(qū)別的地方。其意義降低了代碼邏輯的復(fù)雜性。
2.5.2 日志存儲(chǔ)
2.5.2.1 索引
偏移量索引文件用于記錄消息偏移量與物理地址之間的映射關(guān)系。時(shí)間戳索引文件則根據(jù)時(shí)間戳查找對應(yīng)的偏移量。
文件:
查看一個(gè)topic分區(qū)目錄下的內(nèi)容,發(fā)現(xiàn)有l(wèi)og、index和timeindex三個(gè)文件:
1、創(chuàng)建主題:
2、創(chuàng)建消息文件:
[root@linux121 ~]# for i in `seq 10000000`; do echo "hello lagou $i" >> nmm.txt; done
3、將文本消息生產(chǎn)到主題中:
4、查看存儲(chǔ)文件:
[root@linux121 ~]# cd /var/lagou/kafka/kafka-logs/cd tp_demo_05-0 [root@linux121 ~]# ll
如果想查看這些文件,可以使用kafka提供的shell來完成,幾個(gè)關(guān)鍵信息如下:
(1)offset是逐漸增加的整數(shù),每個(gè)offset對應(yīng)一個(gè)消息的偏移量。
(2)position:消息批字節(jié)數(shù),用于計(jì)算物理地址。
(3)CreateTime:時(shí)間戳。
(4)magic:2代表這個(gè)消息類型是V2,如果是0則代表是V0類型,1代表V1類型。
(5)compresscodec:None說明沒有指定壓縮類型,kafka目前提供了4種可選擇,0-None、1-
GZIP、2-snappy、3-lz4。 (6)crc:對所有字段進(jìn)行校驗(yàn)后的crc值。
關(guān)于消息偏移量:
一、消息存儲(chǔ)
消費(fèi)者有offset。下圖中,消費(fèi)者A消費(fèi)的offset是9,消費(fèi)者B消費(fèi)的offset是11,不同的消費(fèi)者offset是交給一個(gè)內(nèi)部公共topic來記錄的。
(3)時(shí)間戳索引文件,它的作用是可以讓用戶查詢某個(gè)時(shí)間段內(nèi)的消息,它一條數(shù)據(jù)的結(jié)構(gòu)是時(shí)間戳(8byte)+相對offset(4byte),如果要使用這個(gè)索引文件,首先需要通過時(shí)間范圍,找到對應(yīng)的相對offset,然后再去對應(yīng)的index文件找到position信息,然后才能遍歷log文件,它也是需要使用上面說的index文件的。
但是由于producer生產(chǎn)消息可以指定消息的時(shí)間戳,這可能將導(dǎo)致消息的時(shí)間戳不一定有先后順序,因此盡量不要生產(chǎn)消息時(shí)指定時(shí)間戳。
2.5.2.1.1 偏移量
稀疏索引,索引密度不高,但是offset有序,二分查找的時(shí)間復(fù)雜度為O(lgN),如果從頭遍歷時(shí)間復(fù)雜度是O(N)。
示意圖如下:
偏移量索引由相對偏移量和物理地址組成。
可以通過如下命令解析 .index 文件
注意:offset 與 position 沒有直接關(guān)系,因?yàn)闀?huì)刪除數(shù)據(jù)和清理日志。
在偏移量索引文件中,索引數(shù)據(jù)都是順序記錄 offset ,但時(shí)間戳索引文件中每個(gè)追加的索引時(shí)間戳必須大于之前追加的索引項(xiàng),否則不予追加。在 Kafka 0.11.0.0 以后,消息元數(shù)據(jù)中存在若干的時(shí)間戳信息。如果 broker 端參數(shù) log.message.timestamp.type 設(shè)置為LogAppendTIme ,那么時(shí)間戳必定能保持單調(diào)增長。反之如果是CreateTime 則無法保證順序。
注意:timestamp文件中的 offset 與 index 文件中的 relativeOffset 不是一一對應(yīng)的。因?yàn)閿?shù)據(jù)的寫入是各自追加。
思考:如何查看偏移量為23的消息?
Kafka 中存在一個(gè) ConcurrentSkipListMap 來保存在每個(gè)日志分段,通過跳躍表方式,定位到在00000000000000000000.index ,通過二分法在偏移量索引文件中找到不大于 23 的最大索引項(xiàng),即offset 20 那欄,然后從日志分段文件中的物理位置為320 開始順序查找偏移量為 23 的消息。
2.5.2.1.2 時(shí)間戳
在偏移量索引文件中,索引數(shù)據(jù)都是順序記錄 offset ,但時(shí)間戳索引文件中每個(gè)追加的索引時(shí)間戳必須大于之前追加的索引項(xiàng),否則不予追加。在 Kafka 0.11.0.0 以后,消息信息中存在若干的時(shí)間戳信息。如果 broker 端參數(shù) log.message.timestamp.type 設(shè)置為LogAppendTIme ,那么時(shí)間戳必定能保持單調(diào)增長。反之如果是 CreateTime 則無法保證順序。
通過時(shí)間戳方式進(jìn)行查找消息,需要通過查找時(shí)間戳索引和偏移量索引兩個(gè)文件。
時(shí)間戳索引索引格式:前八個(gè)字節(jié)表示時(shí)間戳,后四個(gè)字節(jié)表示偏移量。
思考:查找時(shí)間戳為 1557554753430 開始的消息?
注意:timestamp文件中的 offset 與 index 文件中的 relativeOffset 不是一一對應(yīng)的,因?yàn)閿?shù)據(jù)的寫入是各自追加。
2.5.2.2 清理
Kafka 提供兩種日志清理策略:
日志刪除:按照一定的刪除策略,將不滿足條件的數(shù)據(jù)進(jìn)行數(shù)據(jù)刪除
日志壓縮:針對每個(gè)消息的 Key 進(jìn)行整合,對于有相同 Key 的不同 Value 值,只保留最后一個(gè)版本。
Kafka 提供 log.cleanup.policy 參數(shù)進(jìn)行相應(yīng)配置,默認(rèn)值: delete ,還可以選擇compact 。
主題級別的配置項(xiàng)是 cleanup.policy 。
2.5.2.2.1 日志刪除
基于時(shí)間
日志刪除任務(wù)會(huì)根據(jù) log.retention.hours/log.retention.minutes/log.retention.ms 設(shè)
定日志保留的時(shí)間節(jié)點(diǎn)。如果超過該設(shè)定值,就需要進(jìn)行刪除。默認(rèn)是 7 天, log.retention.ms 優(yōu)先級最高。
Kafka 依據(jù)日志分段中最大的時(shí)間戳進(jìn)行定位。
首先要查詢該日志分段所對應(yīng)的時(shí)間戳索引文件,查找時(shí)間戳索引文件中最后一條索引項(xiàng),若最后一條索引項(xiàng)的時(shí)間戳字段值大于 0,則取該值,否則取最近修改時(shí)間。
為什么不直接選最近修改時(shí)間呢?
因?yàn)槿罩疚募梢杂幸鉄o意的被修改,并不能真實(shí)的反應(yīng)日志分段的最大時(shí)間信息。
刪除過程
如果活躍的日志分段中也存在需要?jiǎng)h除的數(shù)據(jù)時(shí)?
Kafka 會(huì)先切分出一個(gè)新的日志分段作為活躍日志分段,該日志分段不刪除,刪除原來的日志分段。
先騰出地方,再刪除。
基于日志大小
日志刪除任務(wù)會(huì)檢查當(dāng)前日志的大小是否超過設(shè)定值。設(shè)定項(xiàng)為 log.retention.bytes ,單個(gè)日志分段的大小由 log.segment.bytes 進(jìn)行設(shè)定。
刪除過程
基于偏移量
根據(jù)日志分段的下一個(gè)日志分段的起始偏移量是否大于等于日志文件的起始偏移量,若是,則可以刪除此日志分段。
注意:日志文件的起始偏移量并不一定等于第一個(gè)日志分段的基準(zhǔn)偏移量,存在數(shù)據(jù)刪除,可能與之相等的那條數(shù)據(jù)已經(jīng)被刪除了。
刪除過程
2.5.2.2.2 日志壓縮策略
日志壓縮是Kafka的一種機(jī)制,可以提供較為細(xì)粒度的記錄保留,而不是基于粗粒度的基于時(shí)間的
保留。
對于具有相同的Key,而數(shù)據(jù)不同,只保留最后一條數(shù)據(jù),前面的數(shù)據(jù)在合適的情況下刪除。
日志壓縮特性,就實(shí)時(shí)計(jì)算來說,可以在異常容災(zāi)方面有很好的應(yīng)用途徑。比如,我們在Spark、Flink中做實(shí)時(shí)計(jì)算時(shí),需要長期在內(nèi)存里面維護(hù)一些數(shù)據(jù),這些數(shù)據(jù)可能是通過聚合了一天或者一周的
日志得到的,這些數(shù)據(jù)一旦由于異常因素(內(nèi)存、網(wǎng)絡(luò)、磁盤等)崩潰了,從頭開始計(jì)算需要很長的時(shí)間。一個(gè)比較有效可行的方式就是定時(shí)將內(nèi)存里的數(shù)據(jù)備份到外部存儲(chǔ)介質(zhì)中,當(dāng)崩潰出現(xiàn)時(shí),再從外
部存儲(chǔ)介質(zhì)中恢復(fù)并繼續(xù)計(jì)算。
使用日志壓縮來替代這些外部存儲(chǔ)有哪些優(yōu)勢及好處呢?這里為大家
列舉并總結(jié)了幾點(diǎn):
- Kafka即是數(shù)據(jù)源又是存儲(chǔ)工具,可以簡化技術(shù)棧,降低維護(hù)成本
- 使用外部存儲(chǔ)介質(zhì)的話,需要將存儲(chǔ)的Key記錄下來,恢復(fù)的時(shí)候再使用這些Key將數(shù)據(jù)取回,實(shí)現(xiàn)起來有一定的工程難度和復(fù)雜度。使用Kafka的日志壓縮特性,只需要把數(shù)據(jù)寫進(jìn)Kafka,等異常出現(xiàn)恢復(fù)任務(wù)時(shí)再讀回到內(nèi)存就可以了
- Kafka對于磁盤的讀寫做了大量的優(yōu)化工作,比如磁盤順序讀寫。相對于外部存儲(chǔ)介質(zhì)沒有索引查詢等工作量的負(fù)擔(dān),可以實(shí)現(xiàn)高性能。同時(shí),Kafka的日志壓縮機(jī)制可以充分利用廉價(jià)的磁盤,不用依賴昂貴的內(nèi)存來處理,在性能相似的情況下,實(shí)現(xiàn)非常高的性價(jià)比(這個(gè)觀點(diǎn)僅僅針對于異常處理和容災(zāi)的場景來說)
3 日志壓縮方式的實(shí)現(xiàn)細(xì)節(jié)
主題的 cleanup.policy 需要設(shè)置為compact。
Kafka的后臺(tái)線程會(huì)定時(shí)將Topic遍歷兩次:
日志壓縮允許刪除,除最后一個(gè)key之外,刪除先前出現(xiàn)的所有該key對應(yīng)的記錄。在一段時(shí)間后從日志中清理,以釋放空間。
注意:日志壓縮與key有關(guān),確保每個(gè)消息的key不為null。
壓縮是在Kafka后臺(tái)通過定時(shí)重新打開Segment來完成的,Segment的壓縮細(xì)節(jié)如下圖所示:
日志壓縮可以確保:
- 消息始終保持順序,壓縮永遠(yuǎn)不會(huì)重新排序消息,只是刪除一些而已
- 消息的偏移量永遠(yuǎn)不會(huì)改變,它是日志中位置的永久標(biāo)識(shí)符
- 從日志開始的任何使用者將至少看到所有記錄的最終狀態(tài),按記錄的順序?qū)懭搿A硗?#xff0c;如果使用者在比Topic的log.cleaner.delete.retention.ms短的時(shí)間內(nèi)到達(dá)日志的頭部,則會(huì)看到已刪除記錄的所有delete標(biāo)記。保留時(shí)間默認(rèn)是24小時(shí)。
默認(rèn)情況下,啟動(dòng)日志清理器,若需要啟動(dòng)特定Topic的日志清理,請?zhí)砑犹囟ǖ膶傩浴E渲萌罩厩謇砥?#xff0c;這里為大家總結(jié)了以下幾點(diǎn):
1) log.cleanup.policy 設(shè)置為 compact ,Broker的配置,影響集群中所有的Topic。
2) log.cleaner.min.compaction.lag.ms ,用于防止對更新超過最小消息進(jìn)行壓縮,如果沒有設(shè)置,除最后一個(gè)Segment之外,所有Segment都有資格進(jìn)行壓縮
- log.cleaner.max.compaction.lag.ms ,用于防止低生產(chǎn)速率的日志在無限制的時(shí)間內(nèi)不壓縮。
Kafka的日志壓縮原理并不復(fù)雜,就是定時(shí)把所有的日志讀取兩遍,寫一遍,而CPU的速度超過磁盤完全不是問題,只要日志的量對應(yīng)的讀取兩遍和寫入一遍的時(shí)間在可接受的范圍內(nèi),那么它的性能就是可以接受的。
2.5.3 磁盤存儲(chǔ)
2.5.3.1 零拷貝
kafka高性能,是多方面協(xié)同的結(jié)果,包括宏觀架構(gòu)、分布式partition存儲(chǔ)、ISR數(shù)據(jù)同步、以及“無所不用其極”的高效利用磁盤/操作系統(tǒng)特性。
零拷貝并不是不需要拷貝,而是減少不必要的拷貝次數(shù)。通常是說在IO讀寫過程中。
nginx的高性能也有零拷貝的身影。
傳統(tǒng)IO
比如:讀取文件,socket發(fā)送
傳統(tǒng)方式實(shí)現(xiàn):先讀取、再發(fā)送,實(shí)際經(jīng)過1~4四次copy。
1、第一次:將磁盤文件,讀取到操作系統(tǒng)內(nèi)核緩沖區(qū);
2、第二次:將內(nèi)核緩沖區(qū)的數(shù)據(jù),copy到application應(yīng)用程序的buffer;
3、第三步:將application應(yīng)用程序buffer中的數(shù)據(jù),copy到socket網(wǎng)絡(luò)發(fā)送緩沖區(qū)(屬于操作系統(tǒng)內(nèi)核的緩沖區(qū));
4、第四次:將socket buffer的數(shù)據(jù),copy到網(wǎng)絡(luò)協(xié)議棧,由網(wǎng)卡進(jìn)行網(wǎng)絡(luò)傳輸。
實(shí)際IO讀寫,需要進(jìn)行IO中斷,需要CPU響應(yīng)中斷(內(nèi)核態(tài)到用戶態(tài)轉(zhuǎn)換),盡管引入**DMA(DirectMemory Access,直接存儲(chǔ)器訪問)**來接管CPU的中斷請求,但四次copy是存在“不必要的拷貝”的。
實(shí)際上并不需要第二個(gè)和第三個(gè)數(shù)據(jù)副本。數(shù)據(jù)可以直接從讀緩沖區(qū)傳輸?shù)教捉幼志彌_區(qū)。
kafka的兩個(gè)過程:
1、網(wǎng)絡(luò)數(shù)據(jù)持久化到磁盤 (Producer 到 Broker)
2、磁盤文件通過網(wǎng)絡(luò)發(fā)送(Broker 到 Consumer)
數(shù)據(jù)落盤通常都是非實(shí)時(shí)的,Kafka的數(shù)據(jù)并不是實(shí)時(shí)的寫入硬盤,它充分利用了現(xiàn)代操作系統(tǒng)分頁存儲(chǔ)來利用內(nèi)存提高I/O效率。
磁盤文件通過網(wǎng)絡(luò)發(fā)送(Broker 到 Consumer)
磁盤數(shù)據(jù)通過**DMA(Direct Memory Access,直接存儲(chǔ)器訪問)**拷貝到內(nèi)核態(tài) Buffer
直接通過 DMA 拷貝到 NIC Buffer(socket buffer),無需 CPU 拷貝。
除了減少數(shù)據(jù)拷貝外,整個(gè)讀文件 ==> 網(wǎng)絡(luò)發(fā)送由一個(gè) sendfile 調(diào)用完成,整個(gè)過程只有兩次上下文切換,因此大大提高了性能。
Java NIO對sendfile的支持就是FileChannel.transferTo()/transferFrom()。
fileChannel.transferTo( position, count, socketChannel);
把磁盤文件讀取OS內(nèi)核緩沖區(qū)后的fileChannel,直接轉(zhuǎn)給socketChannel發(fā)送;底層就是sendfile。消費(fèi)者從broker讀取數(shù)據(jù),就是由此實(shí)現(xiàn)。
具體來看,Kafka 的數(shù)據(jù)傳輸通過 TransportLayer 來完成,其子類 PlaintextTransportLayer 通過Java NIO 的 FileChannel 的 transferTo 和 transferFrom 方法實(shí)現(xiàn)零拷貝。
注: transferTo 和 transferFrom 并不保證一定能使用零拷貝,需要操作系統(tǒng)支持。
Linux 2.4+ 內(nèi)核通過 sendfile 系統(tǒng)調(diào)用,提供了零拷貝。
2.5.3.2 頁緩存
頁緩存是操作系統(tǒng)實(shí)現(xiàn)的一種主要的磁盤緩存,以此用來減少對磁盤 I/O 的操作。
具體來說,就是把磁盤中的數(shù)據(jù)緩存到內(nèi)存中,把對磁盤的訪問變?yōu)閷?nèi)存的訪問。
Kafka接收來自socket buffer的網(wǎng)絡(luò)數(shù)據(jù),應(yīng)用進(jìn)程不需要中間處理、直接進(jìn)行持久化時(shí)。可以使用mmap內(nèi)存文件映射。
Memory Mapped Files
簡稱mmap,簡單描述其作用就是:將磁盤文件映射到內(nèi)存, 用戶通過修改內(nèi)存就能修改磁盤文件。
它的工作原理是直接利用操作系統(tǒng)的Page來實(shí)現(xiàn)磁盤文件到物理內(nèi)存的直接映射。完成映射之后你對物理內(nèi)存的操作會(huì)被同步到硬盤上(操作系統(tǒng)在適當(dāng)?shù)臅r(shí)候)。
通過mmap,進(jìn)程像讀寫硬盤一樣讀寫內(nèi)存(當(dāng)然是虛擬機(jī)內(nèi)存)。使用這種方式可以獲取很大的I/O提升,省去了用戶空間到內(nèi)核空間復(fù)制的開銷。
mmap也有一個(gè)很明顯的缺陷:不可靠,寫到mmap中的數(shù)據(jù)并沒有被真正的寫到硬盤,操作系統(tǒng)會(huì)在程序主動(dòng)調(diào)用flush的時(shí)候才把數(shù)據(jù)真正的寫到硬盤。
Kafka提供了一個(gè)參數(shù) producer.type 來控制是不是主動(dòng)flush;
如果Kafka寫入到mmap之后就立即flush然后再返回Producer叫同步(sync);
寫入mmap之后立即返回Producer不調(diào)用flush叫異步(async)。
Java NIO對文件映射的支持
Java NIO,提供了一個(gè)MappedByteBuffer 類可以用來實(shí)現(xiàn)內(nèi)存映射。
MappedByteBuffer只能通過調(diào)用FileChannel的map()取得,再?zèng)]有其他方式。
FileChannel.map()是抽象方法,具體實(shí)現(xiàn)是在FileChannelImpl.map()可自行查看JDK源碼,其map0()方法就是調(diào)用了Linux內(nèi)核的mmap的API。
使用 MappedByteBuffer類要注意的是
- mmap的文件映射,在full gc時(shí)才會(huì)進(jìn)行釋放。當(dāng)close時(shí),需要手動(dòng)清除內(nèi)存映射文件,可以反射調(diào)用sun.misc.Cleaner方法。
當(dāng)一個(gè)進(jìn)程準(zhǔn)備讀取磁盤上的文件內(nèi)容時(shí):
據(jù)返回給進(jìn)程。
如果一個(gè)進(jìn)程需要將數(shù)據(jù)寫入磁盤:
頁,最后將數(shù)據(jù)寫入對應(yīng)的頁。
數(shù)據(jù)的一致性。
對一個(gè)進(jìn)程而言,它會(huì)在進(jìn)程內(nèi)部緩存處理所需的數(shù)據(jù),然而這些數(shù)據(jù)有可能還緩存在操作系統(tǒng)的頁緩存中,因此同一份數(shù)據(jù)有可能被緩存了兩次。并且,除非使用Direct I/O的方式, 否則頁緩存很難
被禁止。
當(dāng)使用頁緩存的時(shí)候,即使Kafka服務(wù)重啟, 頁緩存還是會(huì)保持有效,然而進(jìn)程內(nèi)的緩存卻需要重建。這樣也極大地簡化了代碼邏輯,因?yàn)榫S護(hù)頁緩存和文件之間的一致性交由操作系統(tǒng)來負(fù)責(zé),這樣會(huì)
比進(jìn)程內(nèi)維護(hù)更加安全有效。
Kafka中大量使用了頁緩存,這是 Kafka 實(shí)現(xiàn)高吞吐的重要因素之一。
消息先被寫入頁緩存,由操作系統(tǒng)負(fù)責(zé)刷盤任務(wù)。
2.5.3.3 順序?qū)懭?/h3>
操作系統(tǒng)可以針對線性讀寫做深層次的優(yōu)化,比如預(yù)讀(read-ahead,提前將一個(gè)比較大的磁盤塊讀入內(nèi)存) 和后寫(write-behind,將很多小的邏輯寫操作合并起來組成一個(gè)大的物理寫操作)技術(shù)。
Kafka 在設(shè)計(jì)時(shí)采用了文件追加的方式來寫入消息,即只能在日志文件的尾部追加新的消 息,并且也不允許修改已寫入的消息,這種方式屬于典型的順序?qū)懕P的操作,所以就算 Kafka 使用磁盤作為存儲(chǔ)
介質(zhì),也能承載非常大的吞吐量。
mmap和sendfile:
Kafka速度快是因?yàn)?#xff1a;
2.6 穩(wěn)定性
2.6.1 事務(wù)
一、事務(wù)場景
這就形成了一個(gè)典型的分布式事務(wù)。
前未完成的事務(wù) 。
1) 只有Producer生產(chǎn)消息;
2)消費(fèi)消息和生產(chǎn)消息并存,這個(gè)是事務(wù)場景中最常用的情況,就是我們常說的consume-transform-produce 模式
3) 只有consumer消費(fèi)消息,這種操作其實(shí)沒有什么意義,跟使用手動(dòng)提交效果一樣,而且也不是事務(wù)屬性引入的目的,所以一般不會(huì)使用這種情況
二、幾個(gè)關(guān)鍵概念和推導(dǎo)
__transaction_state
Message。
TransactionalId的producer能夠接著處理這個(gè)事務(wù)未完成的狀態(tài)。kafka目前沒有引入全局序,所以也沒有transaction id,這個(gè)TransactionalId是用戶提前配置的。
三、事務(wù)語義
2.1. 多分區(qū)原子寫入
事務(wù)能夠保證Kafka topic下每個(gè)分區(qū)的原子寫入。事務(wù)中所有的消息都將被成功寫入或者丟棄。
首先,我們來考慮一下原子 讀取-處理-寫入 周期是什么意思。簡而言之,這意味著如果某個(gè)應(yīng)用程序在某個(gè)topic tp0的偏移量X處讀取到了消息A,并且在對消息A進(jìn)行了一些處理(如B = F(A))之后
將消息B寫入topic tp1,則只有當(dāng)消息A和B被認(rèn)為被成功地消費(fèi)并一起發(fā)布,或者完全不發(fā)布時(shí),整個(gè)讀取過程寫入操作是原子的。
現(xiàn)在,只有當(dāng)消息A的偏移量X被標(biāo)記為已消費(fèi),消息A才從topic tp0消費(fèi),消費(fèi)到的數(shù)據(jù)偏移量(record offset)將被標(biāo)記為提交偏移量(Committing offset)。在Kafka中,我們通過寫入一個(gè)名為
offsets topic的內(nèi)部Kafka topic來記錄offset commit。消息僅在其offset被提交給offsets topic時(shí)才被認(rèn)為成功消費(fèi)。
由于offset commit只是對Kafkatopic的另一次寫入,并且由于消息僅在提交偏移量時(shí)被視為成功消費(fèi),所以跨多個(gè)主題和分區(qū)的原子寫入也啟用原子 讀取-處理-寫入 循環(huán):提交偏移量X到offset topic和消息B到tp1的寫入將是單個(gè)事務(wù)的一部分,所以整個(gè)步驟都是原子的。
2.2. 粉碎“僵尸實(shí)例”
我們通過為每個(gè)事務(wù)Producer分配一個(gè)稱為transactional.id的唯一標(biāo)識(shí)符來解決僵尸實(shí)例的問題。在進(jìn)程重新啟動(dòng)時(shí)能夠識(shí)別相同的Producer實(shí)例。
API要求事務(wù)性Producer的第一個(gè)操作應(yīng)該是在Kafka集群中顯示注冊transactional.id。 當(dāng)注冊的時(shí)候,Kafka broker用給定的transactional.id檢查打開的事務(wù)并且完成處理。 Kafka也增加了一個(gè)與transactional.id相關(guān)的epoch。Epoch存儲(chǔ)每個(gè)transactional.id內(nèi)部元數(shù)據(jù)。
一旦epoch被觸發(fā),任何具有相同的transactional.id和舊的epoch的生產(chǎn)者被視為僵尸,Kafka拒絕來自這些生產(chǎn)者的后續(xù)事務(wù)性寫入。
簡而言之:Kafka可以保證Consumer最終只能消費(fèi)非事務(wù)性消息或已提交事務(wù)性消息。它將保留來自未完成事務(wù)的消息,并過濾掉已中止事務(wù)的消息。
2.3 事務(wù)消息定義
生產(chǎn)者可以顯式地發(fā)起事務(wù)會(huì)話,在這些會(huì)話中發(fā)送(事務(wù))消息,并提交或中止事務(wù)。有如下要求:
如果允許事務(wù)性和非事務(wù)性消息的交織,則非事務(wù)性和事務(wù)性消息的相對順序?qū)⒒诟郊?#xff08;對于非事務(wù)性消息)和最終提交(對于事務(wù)性消息)的相對順序。
在上圖中,分區(qū)p0和p1接收事務(wù)X1和X2的消息,以及非事務(wù)性消息。時(shí)間線是消息到達(dá)Broker的時(shí)間。由于首先提交了X2,所以每個(gè)分區(qū)都將在X1之前公開來自X2的消息。由于非事務(wù)性消息在X1和X2的提交之前到達(dá),因此這些消息將在來自任一事務(wù)的消息之前公開。
四、事務(wù)配置
1、創(chuàng)建消費(fèi)者代碼,需要:
- 將配置中的自動(dòng)提交屬性(auto.commit)進(jìn)行關(guān)閉
- 而且在代碼里面也不能使用手動(dòng)提交commitSync( )或者commitAsync( )
- 設(shè)置isolation.level:READ_COMMITTED或READ_UNCOMMITTED
2、創(chuàng)建生成者,代碼如下,需要:
- 配置transactional.id屬性
- 配置enable.idempotence屬性
五、事務(wù)概覽
生產(chǎn)者將表示事務(wù)開始/結(jié)束/中止?fàn)顟B(tài)的事務(wù)控制消息發(fā)送給使用多階段協(xié)議管理事務(wù)的高可用事務(wù)協(xié)調(diào)器。生產(chǎn)者將事務(wù)控制記錄(開始/結(jié)束/中止)發(fā)送到事務(wù)協(xié)調(diào)器,并將事務(wù)的消息直接發(fā)送到目標(biāo)數(shù)據(jù)分區(qū)。消費(fèi)者需要了解事務(wù)并緩沖每個(gè)待處理的事務(wù),直到它們到達(dá)其相應(yīng)的結(jié)束(提交/中止)記錄為止。
- 事務(wù)組
- 事務(wù)組中的生產(chǎn)者
- 事務(wù)組的事務(wù)協(xié)調(diào)器
- Leader brokers(事務(wù)數(shù)據(jù)所在分區(qū)的Broker)
- 事務(wù)的消費(fèi)者
六、事務(wù)組
事務(wù)組用于映射到特定的事務(wù)協(xié)調(diào)器(基于日志分區(qū)數(shù)字的哈希)。該組中的生產(chǎn)者需要配置為該組事務(wù)生產(chǎn)者。由于來自這些生產(chǎn)者的所有事務(wù)都通過此協(xié)調(diào)器進(jìn)行,因此我們可以在這些事務(wù)生產(chǎn)者之間實(shí)現(xiàn)嚴(yán)格的有序
七、生產(chǎn)者ID和事務(wù)組狀態(tài)
事務(wù)生產(chǎn)者需要兩個(gè)新參數(shù):生產(chǎn)者ID和生產(chǎn)組。
需要將生產(chǎn)者的輸入狀態(tài)與上一個(gè)已提交的事務(wù)相關(guān)聯(lián)。這使事務(wù)生產(chǎn)者能夠重試事務(wù)(通過為該事務(wù)重新創(chuàng)建輸入狀態(tài);在我們的用例中通常是偏移量的向量)。
可以使用消費(fèi)者偏移量管理機(jī)制來管理這些狀態(tài)。消費(fèi)者偏移量管理器將每個(gè)鍵(consumergroup-topic-partition )與該分區(qū)的最后一個(gè)檢查點(diǎn)偏移量和元數(shù)據(jù)相關(guān)聯(lián)。在事務(wù)生產(chǎn)者中,我們保存消費(fèi)者的偏移量,該偏移量與事務(wù)的提交點(diǎn)關(guān)聯(lián)。此偏移提交記錄(在__consumer_offsets 主題中)應(yīng)作為事務(wù)的一部分寫入。即,存儲(chǔ)消費(fèi)組偏移量的__consumer_offsets 主題分區(qū)將需要參與事務(wù)。因此,假定生產(chǎn)者在事務(wù)中間失敗(事務(wù)協(xié)調(diào)器隨后到期);當(dāng)生產(chǎn)者恢復(fù)時(shí),它可以發(fā)出偏移量獲取請求,以恢復(fù)與最后提交的事務(wù)相關(guān)聯(lián)的輸入偏移量,并從該點(diǎn)恢復(fù)事務(wù)處理。
為了支持此功能,我們需要對偏移量管理器和壓縮的 __consumer_offsets 主題進(jìn)行一些增強(qiáng)。
首先,壓縮的主題現(xiàn)在還將包含事務(wù)控制記錄。我們將需要為這些控制記錄提出剔除策略。
其次,偏移量管理器需要具有事務(wù)意識(shí);特別是,如果組與待處理的事務(wù)相關(guān)聯(lián),則偏移量提
取請求應(yīng)返回錯(cuò)誤。
八、事務(wù)協(xié)調(diào)器
需要確保無論是什么樣的保留策略(日志分區(qū)的刪除還是壓縮),都不能刪除包含事務(wù)HW的日志分段。
九、事務(wù)流程
初始階段 (圖中步驟A)
求,當(dāng)然也可以發(fā)送另一個(gè)包含事務(wù)過期時(shí)間的。如果生產(chǎn)者需要將消費(fèi)者狀態(tài)作為事務(wù)的一部分提交事務(wù),則需要在BeginTransaction中包含對應(yīng)的 __consumer_offsets 主題分區(qū)信息。
發(fā)送階段
(圖中步驟2)
Producer:發(fā)送事務(wù)消息給主題Leader分區(qū)所在的Broker。每個(gè)消息需要包含TxId和TxCtl字段。
TxCtl僅用于標(biāo)記事務(wù)的最終狀態(tài)(提交還是中止)。生產(chǎn)者請求也封裝了生產(chǎn)者ID,但是不追加到日志中。
結(jié)束階段 (生產(chǎn)者準(zhǔn)備提交事務(wù))
(圖中步驟3、4、5。)
十、事務(wù)的中止
當(dāng)事務(wù)生產(chǎn)者發(fā)送業(yè)務(wù)消息的時(shí)候如果發(fā)生異常,可以中止該事務(wù)。如果事務(wù)提交超時(shí),事務(wù)協(xié)調(diào)器也會(huì)中止當(dāng)前事務(wù)。
十一、基本事務(wù)流程的失敗
十二、主題的壓縮
壓縮主題在壓縮過程中會(huì)丟棄具有相同鍵的早期記錄。如果這些記錄是事務(wù)的一部分,這合法嗎?
這可能有點(diǎn)怪異,但可能不會(huì)太有害,因?yàn)樵谥黝}中使用壓縮策略的理由是保留關(guān)鍵數(shù)據(jù)的最新更新。
如果該應(yīng)用程序正在(例如)更新某些表,并且事務(wù)中的消息對應(yīng)于不同的鍵,則這種情況可能導(dǎo)致數(shù)據(jù)庫視圖不一致。
十三、事務(wù)相關(guān)配置
1、Broker configs
2、Producer configs
3、Consumer configs
2.6.1.1 冪等性
Kafka在引入冪等性之前,Producer向Broker發(fā)送消息,然后Broker將消息追加到消息流中后給Producer返回Ack信號值。實(shí)現(xiàn)流程如下:
生產(chǎn)中,會(huì)出現(xiàn)各種不確定的因素,比如在Producer在發(fā)送給Broker的時(shí)候出現(xiàn)網(wǎng)絡(luò)異常。比如以下這種異常情況的出現(xiàn):
上圖這種情況,當(dāng)Producer第一次發(fā)送消息給Broker時(shí),Broker將消息(x2,y2)追加到了消息流中,但是在返回Ack信號給Producer時(shí)失敗了(比如網(wǎng)絡(luò)異常) 。此時(shí),Producer端觸發(fā)重試機(jī)制,將消息(x2,y2)重新發(fā)送給Broker,Broker接收到消息后,再次將該消息追加到消息流中,然后成功返回Ack信號給Producer。這樣下來,消息流中就被重復(fù)追加了兩條相同的(x2,y2)的消息。
冪等性
保證在消息重發(fā)的時(shí)候,消費(fèi)者不會(huì)重復(fù)處理。即使在消費(fèi)者收到重復(fù)消息的時(shí)候,重復(fù)處理,也要保證最終結(jié)果的一致性。
所謂冪等性,數(shù)學(xué)概念就是: f(f(x)) = f(x) 。f函數(shù)表示對消息的處理。
比如,銀行轉(zhuǎn)賬,如果失敗,需要重試。不管重試多少次,都要保證最終結(jié)果一定是一致的。
冪等性實(shí)現(xiàn)
添加唯一ID,類似于數(shù)據(jù)庫的主鍵,用于唯一標(biāo)記一個(gè)消息。
Kafka為了實(shí)現(xiàn)冪等性,它在底層設(shè)計(jì)架構(gòu)中引入了ProducerID和SequenceNumber。
- ProducerID:在每個(gè)新的Producer初始化時(shí),會(huì)被分配一個(gè)唯一的ProducerID,這個(gè)ProducerID對客戶端使用者是不可見的。
- SequenceNumber:對于每個(gè)ProducerID,Producer發(fā)送數(shù)據(jù)的每個(gè)Topic和Partition都對應(yīng)一個(gè)從0開始單調(diào)遞增的SequenceNumber值。
同樣,這是一種理想狀態(tài)下的發(fā)送流程。實(shí)際情況下,會(huì)有很多不確定的因素,比如Broker在發(fā)送Ack信號給Producer時(shí)出現(xiàn)網(wǎng)絡(luò)異常,導(dǎo)致發(fā)送失敗。異常情況如下圖所示:
當(dāng)Producer發(fā)送消息(x2,y2)給Broker時(shí),Broker接收到消息并將其追加到消息流中。此時(shí),Broker返回Ack信號給Producer時(shí),發(fā)生異常導(dǎo)致Producer接收Ack信號失敗。對于Producer來說,會(huì)觸發(fā)重試機(jī)制,將消息(x2,y2)再次發(fā)送,但是,由于引入了冪等性,在每條消息中附帶了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber發(fā)送給Broker,而之前Broker緩存過之前發(fā)送的相同的消息,那么在消息流中的消息就只有一條(x2,y2),不會(huì)出現(xiàn)重復(fù)發(fā)送的情況。
客戶端在生成Producer時(shí),會(huì)實(shí)例化如下代碼:
// 實(shí)例化一個(gè)Producer對象 Producer<String, String> producer = new KafkaProducer<>(props);在org.apache.kafka.clients.producer.internals.Sender類中,在run()中有一個(gè)maybeWaitForPid()方法,用來生成一個(gè)ProducerID,實(shí)現(xiàn)代碼如下:
private void maybeWaitForPid() {if (transactionState == null)return;while (!transactionState.hasPid()) {try {Node node = awaitLeastLoadedNodeReady(requestTimeout);if (node != null) {ClientResponse response = sendAndAwaitInitPidRequest(node);if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) {InitPidResponse initPidResponse = (InitPidResponse) response.responseBody();// 設(shè)置pid和epochtransactionState.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());} else {log.error("Received an unexpected response type for an InitPidRequest from { }." +"We will back off and try again.", node);}} else {log.debug("Could not find an available broker to send InitPidRequest to." +"We will back off and try again.");}} catch (Exception e) {log.warn("Received an exception while trying to get a pid. Will back off and retry.", e);}log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs);time.sleep(retryBackoffMs);metadata.requestUpdate();} }2.6.1.2 事務(wù)操作
在Kafka事務(wù)中,一個(gè)原子性操作,根據(jù)操作類型可以分為3種情況。情況如下:
- 只有Producer生產(chǎn)消息,這種場景需要事務(wù)的介入;
- 消費(fèi)消息和生產(chǎn)消息并存,比如Consumer&Producer模式,這種場景是一般Kafka項(xiàng)目中比較常見的模式,需要事務(wù)介入;
- 只有Consumer消費(fèi)消息,這種操作在實(shí)際項(xiàng)目中意義不大,和手動(dòng)Commit Offsets的結(jié)果一樣,而且這種場景不是事務(wù)的引入目的。
案例1:單個(gè)Producer,使用事務(wù)保證消息的僅一次發(fā)送:
package com.lagou.kafka.demo.producer;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer;import java.util.HashMap; import java.util.Map;public class MyTransactionalProducer {public static void main(String[] args) {Map<String, Object> configs = new HashMap<>();configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 提供生產(chǎn)者client.idconfigs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer");// 設(shè)置事務(wù)IDconfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my_tx_id_1");// 需要ISR全體確認(rèn)消息configs.put(ProducerConfig.ACKS_CONFIG, "all");KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);// 初始化事務(wù)producer.initTransactions();try {// 開啟事務(wù)producer.beginTransaction();// 發(fā)送事務(wù)消息producer.send(new ProducerRecord<>("tp_tx_01", "txkey1", "tx_msg_4"));producer.send(new ProducerRecord<>("tp_tx_01", "txkey2", "tx_msg_5"));producer.send(new ProducerRecord<>("tp_tx_01", "txkey3", "tx_msg_6"));// 人為制造異常int i = 1 / 0;// 提交事務(wù)producer.commitTransaction();} catch (Exception e) {e.printStackTrace();// 事務(wù)回滾producer.abortTransaction();} finally {// 關(guān)閉生產(chǎn)者producer.close();}} }正常運(yùn)行時(shí), 一次全都顯示,如果有異常則會(huì)回滾, 不發(fā)送消息
案例2:在 消費(fèi)-轉(zhuǎn)換-生產(chǎn) 模式,使用事務(wù)保證僅一次發(fā)送。
package com.lagou.kafka.demo;import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer;import java.util.Collections; import java.util.HashMap; import java.util.Map;public class MyTransactional {public static KafkaProducer<String, String> getProducer() {Map<String, Object> configs = new HashMap<>();configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 設(shè)置client.idconfigs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer_01");// 設(shè)置事務(wù)id, 必須設(shè)置configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx_id_02");// 需要所有的ISR副本確認(rèn)configs.put(ProducerConfig.ACKS_CONFIG, "all");// 啟用冪等性, 通過pid和sequenceconfigs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);return producer;}public static KafkaConsumer<String, String> getConsumer(String consumerGroupId) {Map<String, Object> configs = new HashMap<>();configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// 設(shè)置消費(fèi)組IDconfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_grp_02");// 不啟用消費(fèi)者偏移量的自動(dòng)確認(rèn),也不要手動(dòng)確認(rèn)configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer_client_02");// 如果不存在這個(gè)偏移量, 就自動(dòng)讀最早的configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 只讀取已提交的消息 // configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);return consumer;}public static void main(String[] args) {// 設(shè)置消費(fèi)組idString consumerGroupId = "consumer_grp_id_101";KafkaProducer<String, String> producer = getProducer();KafkaConsumer<String, String> consumer = getConsumer(consumerGroupId);// 事務(wù)的初始化producer.initTransactions();//訂閱主題consumer.subscribe(Collections.singleton("tp_tx_01"));final ConsumerRecords<String, String> records = consumer.poll(1_000);// 開啟事務(wù)producer.beginTransaction();try {Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();for (ConsumerRecord<String, String> record : records) {System.out.println(record);producer.send(new ProducerRecord<String, String>("tp_tx_out_01", record.key(), record.value()));offsets.put(new TopicPartition(record.topic(), record.partition()),// 下面要 +1, 偏移量表示下一條要消費(fèi)的消息new OffsetAndMetadata(record.offset() + 1));}// 將該消息的偏移量提交作為事務(wù)的一部分,隨事務(wù)一起提交和回滾(不手動(dòng)或自動(dòng)提交消費(fèi)偏移量)producer.sendOffsetsToTransaction(offsets, consumerGroupId);// int i = 1 / 0;// 提交事務(wù)producer.commitTransaction();} catch (Exception e) {e.printStackTrace();// 回滾事務(wù)producer.abortTransaction();} finally {// 關(guān)閉資源producer.close();consumer.close();}} }有異常時(shí), 運(yùn)行之后消費(fèi)者不會(huì)消費(fèi)消息, 同時(shí), 再次運(yùn)行時(shí), 還可以從之前的偏移量繼續(xù)消費(fèi)消息
2.6.2 控制器
Kafka集群包含若干個(gè)broker,broker.id指定broker的編號,編號不要重復(fù)。
Kafka集群上創(chuàng)建的主題,包含若干個(gè)分區(qū)。
每個(gè)分區(qū)包含若干個(gè)副本,副本因子包括了Follower副本和Leader副本。
副本又分為ISR(同步副本分區(qū))和OSR(非同步副本分區(qū))。
控制器就是一個(gè)broker。
控制器除了一般broker的功能,還負(fù)責(zé)Leader分區(qū)的選舉。
2.6.2.1 broker選舉
集群里第一個(gè)啟動(dòng)的broker在Zookeeper中創(chuàng)建臨時(shí)節(jié)點(diǎn) /controller 。
其他broker在該控制器節(jié)點(diǎn)創(chuàng)建Zookeeper watch對象,使用Zookeeper的監(jiān)聽機(jī)制接收該節(jié)點(diǎn)的變更。
即:Kafka通過Zookeeper的分布式鎖特性選舉集群控制器。
下圖中,節(jié)點(diǎn) /myKafka/controller 是一個(gè)zookeeper臨時(shí)節(jié)點(diǎn),其中 “brokerid”:0 ,表示當(dāng)前控制器是broker.id為0的broker。
每個(gè)新選出的控制器通過 Zookeeper 的條件遞增操作獲得一個(gè)全新的、數(shù)值更大的 controller epoch。其他 broker 在知道當(dāng)前 controller epoch 后,如果收到由控制器發(fā)出的包含較舊epoch 的消息,就會(huì)忽略它們,以防止“腦裂”。
比如當(dāng)一個(gè)Leader副本分區(qū)所在的broker宕機(jī),需要選舉新的Leader副本分區(qū),有可能兩個(gè)具有不同紀(jì)元數(shù)字的控制器都選舉了新的Leader副本分區(qū),如果選舉出來的Leader副本分區(qū)不一樣,聽誰的?腦裂了。有了紀(jì)元數(shù)字,直接使用紀(jì)元數(shù)字最新的控制器結(jié)果。
集群控制器負(fù)責(zé)監(jiān)聽 ids 節(jié)點(diǎn),一旦節(jié)點(diǎn)子節(jié)點(diǎn)發(fā)送變化,集群控制器得到通知。
結(jié)論:
2.6.3 可靠性保證
概念
時(shí)間沒有向Leader發(fā)送fetch請求(參數(shù):replica.lag.time.max.ms 默認(rèn)值:10000)。 4. 為了保證可靠性,可以設(shè)置 acks=all 。Follower收到消息后,會(huì)像Leader發(fā)送ACK。一旦Leader收到了ISR中所有Replica的ACK,Leader就commit,那么Leader就向Producer發(fā)送ACK。
副本的分配:
當(dāng)某個(gè)topic的 --replication-factor 為N(N>1)時(shí),每個(gè)Partition都有N個(gè)副本,稱作replica。原則上是將replica均勻的分配到整個(gè)集群上。不僅如此,partition的分配也同樣需要均勻分配,為了更好的負(fù)載均衡。
副本分配的三個(gè)目標(biāo):
在不考慮機(jī)架信息的情況下:
2.6.3.1 失效副本
失效副本的判定
replica.lag.time.max.ms 默認(rèn)大小為10000。
失效副本的分區(qū)個(gè)數(shù)是用于衡量Kafka性能指標(biāo)的重要部分。Kafka本身提供了一個(gè)相關(guān)的指標(biāo),即UnderReplicatedPartitions,這個(gè)可以通過JMX訪問:
取值范圍是大于等于0的整數(shù)。注意:如果Kafka集群正在做分區(qū)遷移(kafka-reassign-partitions.sh)的時(shí)候,這個(gè)值也會(huì)大于0。
2.6.3.2 副本復(fù)制
日志復(fù)制算法(log replication algorithm)必須提供的基本保證是,如果它告訴客戶端消息已被提交,而當(dāng)前Leader出現(xiàn)故障,新選出的Leader也必須具有該消息。在出現(xiàn)故障時(shí),Kafka會(huì)從掛掉Leader的ISR里面選擇一個(gè)Follower作為這個(gè)分區(qū)新的Leader。
每個(gè)分區(qū)的 leader 會(huì)維護(hù)一個(gè)in-sync replica(同步副本列表,又稱 ISR)。當(dāng)Producer向broker發(fā)送消息,消息先寫入到對應(yīng)Leader分區(qū),然后復(fù)制到這個(gè)分區(qū)的所有副本中。ACKS=ALL時(shí),只有將消息成功復(fù)制到所有同步副本(ISR)后,這條消息才算被提交。
當(dāng)副本落后于 leader 分區(qū)時(shí),這個(gè)副本被認(rèn)為是不同步或滯后的。在 Kafka中,副本的滯后于Leader是根據(jù) replica.lag.time.max.ms 來衡量。
如何確認(rèn)某個(gè)副本處于滯后狀態(tài)
通過 replica.lag.time.max.ms 來檢測卡住副本(Stuck replica)在所有情況下都能很好地工作。它跟蹤 follower 副本沒有向 leader 發(fā)送獲取請求的時(shí)間,通過這個(gè)可以推斷 follower 是否正常。
另一方面,使用消息數(shù)量檢測不同步慢副本(Slow replica)的模型只有在為單個(gè)主題或具有同類流量模式的多個(gè)主題設(shè)置這些參數(shù)時(shí)才能很好地工作,但我們發(fā)現(xiàn)它不能擴(kuò)展到生產(chǎn)集群中所有主題。
2.6.4 一致性保證
一、概念
1. 水位標(biāo)記
水位或水印(watermark)一詞,表示位置信息,即位移(offset)。Kafka源碼中使用的名字是高水位,HW(high watermark)。
2. 副本角色
Kafka分區(qū)使用多個(gè)副本(replica)提供高可用。
3. LEO和HW
每個(gè)分區(qū)副本對象都有兩個(gè)重要的屬性:LEO和HW。
- LEO:即日志末端位移(log end offset),記錄了該副本日志中下一條消息的位移值。如果LEO=10,那么表示該副本保存了10條消息,位移值范圍是[0, 9]。另外,Leader LEOFollower LEO的更新是有區(qū)別的。
- HW:即上面提到的水位值。對于同一個(gè)副本對象而言,其HW值不會(huì)大于LEO值。小于等于HW值的所有消息都被認(rèn)為是“已備份”的(replicated)。Leader副本和Follower副本的HW更新不同。
二、Follower副本何時(shí)更新LEO
三、Follower副本何時(shí)更新HW
Follower更新HW發(fā)生在其更新LEO之后,一旦Follower向Log寫完數(shù)據(jù),嘗試更新自己的HW值。
比較當(dāng)前LEO值與FETCH響應(yīng)中Leader的HW值,取兩者的小者作為新的HW值。
即:如果Follower的LEO大于Leader的HW,Follower HW值不會(huì)大于Leader的HW值。
四、Leader副本何時(shí)更新LEO
和Follower更新LEO相同,Leader寫Log時(shí)自動(dòng)更新自己的LEO值。
五、Leader副本何時(shí)更新HW值
Leader的HW值就是分區(qū)HW值,直接影響分區(qū)數(shù)據(jù)對消費(fèi)者的可見性 。
Leader如何更新自己的HW值?Leader broker上保存了一套Follower副本的LEO以及自己的LEO。
當(dāng)嘗試確定分區(qū)HW時(shí),它會(huì)選出所有滿足條件的副本,比較它們的LEO(包括Leader的LEO),并選擇最小的LEO值作為HW值。
需要滿足的條件,(二選一):
如果Kafka只判斷第一個(gè)條件的話,確定分區(qū)HW值時(shí)就不會(huì)考慮這些未在ISR中的副本,但這些副本已經(jīng)具備了“立刻進(jìn)入ISR”的資格,因此就可能出現(xiàn)分區(qū)HW值越過ISR中副本LEO的情況——不允許。因?yàn)榉謪^(qū)HW定義就是ISR中所有副本LEO的最小值。
六、HW和LEO正常更新案例
我們假設(shè)有一個(gè)topic,單分區(qū),副本因子是2,即一個(gè)Leader副本和一個(gè)Follower副本。我們看下當(dāng)producer發(fā)送一條消息時(shí),broker端的副本到底會(huì)發(fā)生什么事情以及分區(qū)HW是如何被更新的。
1. 初始狀態(tài)
2. Follower發(fā)送FETCH請求在Leader處理完P(guān)RODUCE請求之后
producer給該topic分區(qū)發(fā)送了一條消息
此時(shí)的狀態(tài)如下圖所示:
PRODUCE請求處理完成后各值如下,Leader端的HW值依然是0,而LEO是1,Remote LEO也是0。
假設(shè)此時(shí)follower發(fā)送了FETCH請求,則狀態(tài)變更如下:
而Follower副本接收到FETCH Response后依次執(zhí)行下列操作:
此時(shí),第一輪FETCH RPC結(jié)束,我們會(huì)發(fā)現(xiàn)雖然Leader和Follower都已經(jīng)在Log中保存了這條消息,但分區(qū)HW值尚未被更新,仍為0。
Follower第二輪FETCH
分區(qū)HW是在第二輪FETCH RPC中被更新的,如下圖所示:
同樣地,Follower副本接收到FETCH response后依次執(zhí)行下列操作:
= 1 。
此時(shí)消息已經(jīng)成功地被復(fù)制到Leader和Follower的Log中且分區(qū)HW是1,表明消費(fèi)者能夠消費(fèi)offset = 0的消息。
七、HW和LEO異常案例
Kafka使用HW值來決定副本備份的進(jìn)度,而HW值的更新通常需要額外一輪FETCH RPC才能完成。
但這種設(shè)計(jì)是有問題的,可能引起的問題包括:
數(shù)據(jù)丟失
使用HW值來確定備份進(jìn)度時(shí)其值的更新是在下一輪RPC中完成的。如果Follower副本在標(biāo)記上方的的第一步與第二步之間發(fā)生崩潰,那么就有可能造成數(shù)據(jù)的丟失。
上圖中有兩個(gè)副本:A和B。開始狀態(tài)是A是Leader。
假設(shè)生產(chǎn)者 min.insync.replicas 為1,那么當(dāng)生產(chǎn)者發(fā)送兩條消息給A后,A寫入Log,此時(shí)Kafka會(huì)通知生產(chǎn)者這兩條消息寫入成功。
但是在broker端,Leader和Follower的Log雖都寫入了2條消息且分區(qū)HW已經(jīng)被更新到2,但Follower HW尚未被更新還是1,也就是上面標(biāo)記的第二步尚未執(zhí)行,表中最后一條未執(zhí)行。
倘若此時(shí)副本B所在的broker宕機(jī),那么重啟后B會(huì)自動(dòng)把LEO調(diào)整到之前的HW值1,故副本B會(huì)做日志截?cái)?log truncation),將offset = 1的那條消息從log中刪除,并調(diào)整LEO = 1。此時(shí)follower副本底層log中就只有一條消息,即offset = 0的消息!
B重啟之后需要給A發(fā)FETCH請求,但若A所在broker機(jī)器在此時(shí)宕機(jī),那么Kafka會(huì)令B成為新的Leader,而當(dāng)A重啟回來后也會(huì)執(zhí)行日志截?cái)?#xff0c;將HW調(diào)整回1。這樣,offset=1的消息就從兩個(gè)副本的log中被刪除,也就是說這條已經(jīng)被生產(chǎn)者認(rèn)為發(fā)送成功的數(shù)據(jù)丟失。
丟失數(shù)據(jù)的前提是 min.insync.replicas=1 時(shí),一旦消息被寫入Leader端Log即被認(rèn)為是committed 。延遲一輪 FETCH RPC 更新HW值的設(shè)計(jì)使follower HW值是異步延遲更新,若在這個(gè)過程中Leader發(fā)生變更,那么成為新Leader的Follower的HW值就有可能是過期的,導(dǎo)致生產(chǎn)者本是成功提交的消息被刪除。
Leader和Follower數(shù)據(jù)離散
除了可能造成的數(shù)據(jù)丟失以外,該設(shè)計(jì)還會(huì)造成Leader的Log和Follower的Log數(shù)據(jù)不一致。
如Leader端記錄序列:m1,m2,m3,m4,m5,…;Follower端序列可能是m1,m3,m4,m5,…。
看圖:
八、Leader Epoch使用
Kafka解決方案
造成上述兩個(gè)問題的根本原因在于
但HW值的更新是異步延遲的,特別是需要額外的FETCH請求處理流程才能更新,故這中間發(fā)生的任何崩潰都可能導(dǎo)致HW值的過期。
Kafka從0.11引入了 leader epoch 來取代HW值。Leader端使用內(nèi)存保存Leader的epoch信息,即使出現(xiàn)上面的兩個(gè)場景也能規(guī)避這些問題。
所謂Leader epoch實(shí)際上是一對值:<epoch, offset>:
則表示第一個(gè)Leader從位移0開始寫入消息;共寫了120條[0, 119];而第二個(gè)Leader版本號是1,從位移120處開始寫入消息。
規(guī)避數(shù)據(jù)丟失
規(guī)避數(shù)據(jù)不一致
依靠Leader epoch的信息可以有效地規(guī)避數(shù)據(jù)不一致的問題。
對于使用 unclean.leader.election.enable = true 設(shè)置的群集,該方案不能保證消息的一致性
2.6.5 消息重復(fù)的場景及解決方案
消息重復(fù)和丟失是kafka中很常見的問題,主要發(fā)生在以下三個(gè)階段:
2.6.5.1 生產(chǎn)者階段重復(fù)場景
2.6.5.1.1 根本原因
生產(chǎn)發(fā)送的消息沒有收到正確的broke響應(yīng),導(dǎo)致生產(chǎn)者重試。
生產(chǎn)者發(fā)出一條消息,broke落盤以后因?yàn)榫W(wǎng)絡(luò)等種種原因發(fā)送端得到一個(gè)發(fā)送失敗的響應(yīng)或者網(wǎng)絡(luò)中斷,然后生產(chǎn)者收到一個(gè)可恢復(fù)的Exception重試消息導(dǎo)致消息重復(fù)。
2.6.5.1.2 重試過程
說明:
2.6.5.1.3 可恢復(fù)異常說明
異常是RetriableException類型或者TransactionManager允許重試;RetriableException類繼承關(guān)系如下:
2.6.5.1.4 記錄順序問題
如果設(shè)置 max.in.flight.requests.per.connection 大于1(默認(rèn)5,單個(gè)連接上發(fā)送的未確認(rèn)請求的最大數(shù)量,表示上一個(gè)發(fā)出的請求沒有確認(rèn)下一個(gè)請求又發(fā)出了)。大于1可能會(huì)改變記錄的順序,因?yàn)槿绻麑蓚€(gè)batch發(fā)送到單個(gè)分區(qū),第一個(gè)batch處理失敗并重試,但是第二個(gè)batch處理成功,那么第二個(gè)batch處理中的記錄可能先出現(xiàn)被消費(fèi)。
設(shè)置 max.in.flight.requests.per.connection 為1,可能會(huì)影響吞吐量,可以解決單個(gè)生產(chǎn)者發(fā)送順序問題。如果多個(gè)生產(chǎn)者,生產(chǎn)者1先發(fā)送一個(gè)請求,生產(chǎn)者2后發(fā)送請求,此時(shí)生產(chǎn)者1返回可恢復(fù)異常,重試一定次數(shù)成功了。雖然生產(chǎn)者1先發(fā)送消息,但生產(chǎn)者2發(fā)送的消息會(huì)被先消費(fèi)。
2.6.5.2 生產(chǎn)者發(fā)送重復(fù)解決方案
2.6.5.2.1 啟動(dòng)kafka的冪等性
要啟動(dòng)kafka的冪等性,設(shè)置: enable.idempotence=true ,以及 ack=all 以及 retries > 1 。
2.6.5.2.2 ack=0,不重試。
可能會(huì)丟消息,適用于吞吐量指標(biāo)重要性高于數(shù)據(jù)丟失,例如:日志收集。
2.6.5.3 生產(chǎn)者和broke階段消息丟失場景
2.6.5.3.1 ack=0,不重試
生產(chǎn)者發(fā)送消息完,不管結(jié)果了,如果發(fā)送失敗也就丟失了。
2.6.5.3.2 ack=1,leader crash
生產(chǎn)者發(fā)送消息完,只等待Leader寫入成功就返回了,Leader分區(qū)丟失了,此時(shí)Follower沒來及同步,消息丟失。
2.6.5.3.3 unclean.leader.election.enable 配置true
允許選舉ISR以外的副本作為leader,會(huì)導(dǎo)致數(shù)據(jù)丟失,默認(rèn)為false。生產(chǎn)者發(fā)送異步消息,只等待Lead寫入成功就返回,Leader分區(qū)丟失,此時(shí)ISR中沒有Follower,Leader從OSR中選舉,因?yàn)镺SR中本來落后于Leader造成消息丟失。
2.6.5.4 解決生產(chǎn)者和broke階段消息丟失
2.6.5.4.1 禁用unclean選舉,ack=all
ack=all / -1,tries > 1,unclean.leader.election.enable : false
生產(chǎn)者發(fā)完消息,等待Follower同步完再返回,如果異常則重試。副本的數(shù)量可能影響吞吐量,不超過5個(gè),一般三個(gè)。
不允許unclean Leader選舉。
2.6.5.4.2 配置:min.insync.replicas > 1
當(dāng)生產(chǎn)者將 acks 設(shè)置為 all (或 -1 )時(shí), min.insync.replicas>1 。指定確認(rèn)消息寫成功需要的最小副本數(shù)量。達(dá)不到這個(gè)最小值,生產(chǎn)者將引發(fā)一個(gè)異常(要么是NotEnoughReplicas,要么是NotEnoughReplicasAfterAppend)。
當(dāng)一起使用時(shí), min.insync.replicas 和 ack 允許執(zhí)行更大的持久性保證。一個(gè)典型的場景是創(chuàng)建一個(gè)復(fù)制因子為3的主題,設(shè)置min.insync復(fù)制到2個(gè),用 all 配置發(fā)送。將確保如果大多數(shù)副本沒有
收到寫操作,則生產(chǎn)者將引發(fā)異常。
2.6.5.4.3 失敗的offset單獨(dú)記錄
生產(chǎn)者發(fā)送消息,會(huì)自動(dòng)重試,遇到不可恢復(fù)異常會(huì)拋出,這時(shí)可以捕獲異常記錄到數(shù)據(jù)庫或緩存,進(jìn)行單獨(dú)處理。
2.6.5.5 消費(fèi)者數(shù)據(jù)重復(fù)場景及解決方案
2.6.5.5.1 根本原因
數(shù)據(jù)消費(fèi)完沒有及時(shí)提交offset到broker。
2.6.5.5.2 場景
消息消費(fèi)端在消費(fèi)過程中掛掉沒有及時(shí)提交offset到broke,另一個(gè)消費(fèi)端啟動(dòng)拿之前記錄的offset開始消費(fèi),由于offset的滯后性可能會(huì)導(dǎo)致新啟動(dòng)的客戶端有少量重復(fù)消費(fèi)。
2.6.5.6 解決方案
2.6.5.6.1 取消自動(dòng)提交
每次消費(fèi)完或者程序退出時(shí)手動(dòng)提交。這可能也沒法保證一條重復(fù)。
2.6.5.6.2 下游做冪等
一般是讓下游做冪等或者盡量每消費(fèi)一條消息都記錄offset,對于少數(shù)嚴(yán)格的場景可能需要把offset或唯一ID(例如訂單ID)和下游狀態(tài)更新放在同一個(gè)數(shù)據(jù)庫里面做事務(wù)來保證精確的一次更新或者在下游數(shù)據(jù)表里面同時(shí)記錄消費(fèi)offset,然后更新下游數(shù)據(jù)的時(shí)候用消費(fèi)位移做樂觀鎖拒絕舊位移的數(shù)據(jù)更新。
2.6.6 __consumer_offsets
Zookeeper不適合大批量的頻繁寫入操作。
Kafka 1.0.2將consumer的位移信息保存在Kafka內(nèi)部的topic中,即__consumer_offsets主題,并且默認(rèn)提供了kafka_consumer_groups.sh腳本供用戶查看consumer信息。
由于默認(rèn)沒有指定key,所以根據(jù)round-robin方式,消息分布到不同的分區(qū)上。 (本例中生產(chǎn)了100條消息)
結(jié)果輸出表明100條消息全部生產(chǎn)成功!
輸出: console-consumer-49366 (記住這個(gè)id!)
注意:運(yùn)行下面命令前先要在consumer.properties中設(shè)置exclude.internal.topics=false
默認(rèn)情況下__consumer_offsets有50個(gè)分區(qū),如果你的系統(tǒng)中consumer group也很多的話,那么這個(gè)命令的輸出結(jié)果會(huì)很多。
這時(shí)候就用到了第5步獲取的group.id(本例中是console-consumer-49366)。Kafka會(huì)使用下面公式計(jì)算該group位移保存在__consumer_offsets的哪個(gè)分區(qū)上:
對應(yīng)的分區(qū)=Math.abs(“console-consumer-49366”.hashCode()) % 50 = 19,即__consumer_offsets的分區(qū)19保存了這個(gè)consumer group的位移信息。
下面是輸出結(jié)果:
上圖可見,該consumer group果然保存在分區(qū)11上,且位移信息都是對的(這里的位移信息是已消費(fèi)的位移,嚴(yán)格來說不是第3步中的位移。由于我的consumer已經(jīng)消費(fèi)完了所有的消息,所以這里的位移與第3步中的位移相同)。另外,可以看到__consumer_offsets topic的每一日志項(xiàng)的格式都是:
[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]。
2.7 延時(shí)隊(duì)列
兩個(gè)follower副本都已經(jīng)拉取到了leader副本的最新位置,此時(shí)又向leader副本發(fā)送拉取請求,而leader副本并沒有新的消息寫入,那么此時(shí)leader副本該如何處理呢?可以直接返回空的拉取結(jié)果給follower副本,不過在leader副本一直沒有新消息寫入的情況下,follower副本會(huì)一直發(fā)送拉取請求,并且總收到空的拉取結(jié)果,消耗資源。
Kafka在處理拉取請求時(shí),會(huì)先讀取一次日志文件,如果收集不到足夠多(fetchMinBytes,由參數(shù)fetch.min.bytes配置,默認(rèn)值為1)的消息,那么就會(huì)創(chuàng)建一個(gè)延時(shí)拉取操作(DelayedFetch)以等待
拉取到足夠數(shù)量的消息。當(dāng)延時(shí)拉取操作執(zhí)行時(shí),會(huì)再讀取一次日志文件,然后將拉取結(jié)果返回給follower副本。
延遲操作不只是拉取消息時(shí)的特有操作,在Kafka中有多種延時(shí)操作,比如延時(shí)數(shù)據(jù)刪除、延時(shí)生產(chǎn)等。
對于延時(shí)生產(chǎn)(消息)而言,如果在使用生產(chǎn)者客戶端發(fā)送消息的時(shí)候?qū)cks參數(shù)設(shè)置為-1,那么就意味著需要等待ISR集合中的所有副本都確認(rèn)收到消息之后才能正確地收到響應(yīng)的結(jié)果,或者捕獲超時(shí)異常。
假設(shè)某個(gè)分區(qū)有3個(gè)副本:leader、follower1和follower2,它們都在分區(qū)的ISR集合中。不考慮ISR變動(dòng)的情況,Kafka在收到客戶端的生產(chǎn)請求后,將消息3和消息4寫入leader副本的本地日志文件。
由于客戶端設(shè)置了acks為-1,那么需要等到follower1和follower2兩個(gè)副本都收到消息3和消息4后才能告知客戶端正確地接收了所發(fā)送的消息。如果在一定的時(shí)間內(nèi),follower1副本或follower2副本沒能夠完全拉取到消息3和消息4,那么就需要返回超時(shí)異常給客戶端。生產(chǎn)請求的超時(shí)時(shí)間由參數(shù)request.timeout.ms配置,默認(rèn)值為30000,即30s。
那么這里等待消息3和消息4寫入follower1副本和follower2副本,并返回相應(yīng)的響應(yīng)結(jié)果給客戶端的動(dòng)作是由誰來執(zhí)行的呢?在將消息寫入leader副本的本地日志文件之后,Kafka會(huì)創(chuàng)建一個(gè)延時(shí)的生
產(chǎn)操作(DelayedProduce),用來處理消息正常寫入所有副本或超時(shí)的情況,以返回相應(yīng)的響應(yīng)結(jié)果給客戶端。
延時(shí)操作需要延時(shí)返回響應(yīng)的結(jié)果,首先它必須有一個(gè)超時(shí)時(shí)間(delayMs),如果在這個(gè)超時(shí)時(shí)間內(nèi)沒有完成既定的任務(wù),那么就需要強(qiáng)制完成以返回響應(yīng)結(jié)果給客戶端。其次,延時(shí)操作不同于定時(shí)
操作,定時(shí)操作是指在特定時(shí)間之后執(zhí)行的操作,而延時(shí)操作可以在所設(shè)定的超時(shí)時(shí)間之前完成,所以延時(shí)操作能夠支持外部事件的觸發(fā)。
就延時(shí)生產(chǎn)操作而言,它的外部事件是所要寫入消息的某個(gè)分區(qū)的HW(高水位)發(fā)生增長。也就是說,隨著follower副本不斷地與leader副本進(jìn)行消息同步,進(jìn)而促使HW進(jìn)一步增長,HW每增長一次
都會(huì)檢測是否能夠完成此次延時(shí)生產(chǎn)操作,如果可以就執(zhí)行以此返回響應(yīng)結(jié)果給客戶端;如果在超時(shí)時(shí)間內(nèi)始終無法完成,則強(qiáng)制執(zhí)行。
延時(shí)拉取操作,是由超時(shí)觸發(fā)或外部事件觸發(fā)而被執(zhí)行的。超時(shí)觸發(fā)很好理解,就是等到超時(shí)時(shí)間之后觸發(fā)第二次讀取日志文件的操作。外部事件觸發(fā)就稍復(fù)雜了一些,因?yàn)槔≌埱蟛粏螁斡蒮ollower副本發(fā)起,也可以由消費(fèi)者客戶端發(fā)起,兩種情況所對應(yīng)的外部事件也是不同的。如果是follower副本的延時(shí)拉取,它的外部事件就是消息追加到了leader副本的本地日志文件中;如果是消費(fèi)者客戶端的延
時(shí)拉取,它的外部事件可以簡單地理解為HW的增長。
時(shí)間輪實(shí)現(xiàn)延時(shí)隊(duì)列。
TimeWheel。size,每個(gè)單元格的時(shí)間,每個(gè)單元格都代表一個(gè)時(shí)間,size*每個(gè)單元格的時(shí)間就是一個(gè)周期。
2.8 重試隊(duì)列
kafka沒有重試機(jī)制不支持消息重試,也沒有死信隊(duì)列,因此使用kafka做消息隊(duì)列時(shí),需要自己實(shí)現(xiàn)消息重試的功能。
自定義實(shí)現(xiàn)步驟
創(chuàng)建新的kafka主題作為重試隊(duì)列:
代碼實(shí)現(xiàn)
版本:2.2.8
dependencies選擇如下三個(gè):
pom.xml
總結(jié)
以上是生活随笔為你收集整理的4.2.4 Kafka高级特性解析(物理存储、稳定性:事物,控制器,可靠性,一致性,_consumer_offsets、延时队列、自定义重试队列)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 职场语录:新人,没人会告诉你的职场潜规则
- 下一篇: 工业物联网·能耗监控智慧空调接入华为云解