6.3.3 延迟缓存
副本管理器針對生產(chǎn)請求和l拉取請求都有一個全局的延遲緩存,生產(chǎn)請求對應(yīng)延遲緩存中存儲了延遲的生產(chǎn)(DelayedProduce),拉取請求對應(yīng)延遲緩存中存儲了延遲的拉取(DelayedFetch)。Kafka的延遲緩存數(shù)據(jù)結(jié)構(gòu)(DelayedOperatlonPurgatory)和上一節(jié)的Purgatory類似。下面的代碼片段以延遲的生產(chǎn)和拉取為例,列舉了副本管理器中,與延遲緩存、延遲操作相關(guān)的方法:
延遲緩存除了管理延遲操作,還要從分區(qū)角度嘗試完成延遲的操作,延遲緩存主要有下面兩個方法。
- tryCompleteElseWatch()方法。嘗試完成延遲的操作,如果不能完成,將延遲操作加入延遲緩存中。一旦將延遲操作加入延遲緩存的監(jiān)控,延遲操作的每個分區(qū)都會監(jiān)視該延遲操作。
- checkAndComplete()方法。它的參數(shù)不是延遲操作對象,而是延遲緩存的鍵(分區(qū))。外部事件調(diào)用該方法,根據(jù)指定的鍵(分區(qū)),嘗試完成延遲緩存中的延遲操作。
注意:本章分析的延遲生產(chǎn)和延遲拉取,在豆豆遲緩存中的鍵都是分區(qū),但延遲緩存的鍵并不一定就是分區(qū)。比如上一幸延遲的加入和延遲的心跳,在延遲緩存中的鍵分別是消費(fèi)組和消費(fèi)者編號,而不是分區(qū)。
服務(wù)端創(chuàng)建的延遲操作有多個分區(qū),在加入到延遲緩存時,每個分區(qū)都對應(yīng)相同的延遲操作。服務(wù)端在剛創(chuàng)建延遲操作時,因?yàn)闆]有滿足條件,所以才會創(chuàng)建延遲的操作。以6.3.2節(jié)“4.延遲生產(chǎn)的示例”為例,服務(wù)端處理生產(chǎn)請求,將消息集寫到分區(qū)[凹,凹,P3],并創(chuàng)建了延遲的生產(chǎn)。服務(wù)端將延遲的生產(chǎn)加入到延遲緩存中,正常的結(jié)果是[Pl->DelayedProduce,PZ->DelayedProduce,P3->DelayedProduce]。但如果在加入的過程中,延遲的生產(chǎn)滿足了條件,即3個分區(qū)的備份副本都同步了主副本的消息,那么服務(wù)端就不需要再監(jiān)控這個延遲的操作了。比如服務(wù)端將Pl->DelayedProduce加入延遲緩再后,延遲的生產(chǎn)可以完成,那么剩下的[PZ->DelayedProduce,P3->DelayedProduce]就不會被1111入延遲緩存了。
延遲緩存的tryCoMpleteElseWatch()方法將延遲操作加入延遲緩存之前,會先嘗試一次完成延遲的操作。如果不能完成,才會調(diào)用watchForOperation()方法將延遲操作加入到分區(qū)對應(yīng)的監(jiān)視器(Watchers)。在這之后,還會再次嘗試一次完成延遲的操作,如果還不能完成,才會將延遲操作加入定時幫(Timer)。相關(guān)代碼如下:
延遲操作不僅存在于延遲緩存中,還會被定時器監(jiān)控。延遲操作在延遲緩存中的生命周期分別與外部事件、定時持有關(guān)。下面兩點(diǎn)解釋了延遲操作在延遲緩存中的生命周期。
- 將延遲操作加入延遲緩存,目的是讓外部事件有機(jī)會嘗試完成延遲的操作。當(dāng)滿足條件,可以完成延遲操作H才,服務(wù)端才會返回響應(yīng)結(jié)果給客戶端,并將延遲操作從延遲緩存中刪除。
- 將延遲操作力[I入定時器,目的是在延遲操作超時后,服務(wù)端可以強(qiáng)制返回響應(yīng)結(jié)果給客戶端。注意:延遲緩存的作用是:外部事件可以根據(jù)分區(qū),嘗試完成監(jiān)視器的所有延遲操作。定時器的作用是:在延遲操作超時后,強(qiáng)制完成延遲的操作。兩者都保存了延遲操作,但前者有分區(qū),后者沒有分區(qū)。被定時器監(jiān)控的延遲操作,并不需要分區(qū),因?yàn)槎〞r器與分區(qū)無關(guān)。
延遲緩’存的每個鍵都有一個監(jiān)視器,它管理了鏈表結(jié)構(gòu)的延遲操作。外部事件發(fā)生時,會給定一個鍵,然后i用用這個健對應(yīng)監(jiān)視器的tryCompleteWatched()方法,嘗試完成監(jiān)視器中所有的延遲操作。監(jiān)視器嘗i式完成所有延遲操作的過程中,會調(diào)用每個延遲操作的tryComplete()方法,判斷能否完成延遲的操作。如果某個延遲操作能夠完成,貝lj將對應(yīng)的延遲操作從鏈表中移除。相關(guān)代碼如下:
6.3.2節(jié)提到,外部事件根據(jù)指定分區(qū)嘗試完成延遲的操作。如果延遲操作可以完成,只會從延遲緩存中刪除這個分區(qū)中已經(jīng)完成的延遲操作,并不會刪除其他分區(qū)中已經(jīng)完成的延遲操作。監(jiān)視器的purgeC0111pleted()方法會清理所有已經(jīng)完成的延遲操作,這個方法會被清理線程調(diào)用。
如圖6-64所示,以6.3.2節(jié)的延遲拉取為例,外部事件嘗試完成分區(qū)凹的延遲操作,可以完成DelayedFetch2和DelayedFetch4,它們會立即從延遲緩存中刪除。另外,定時的清理線程會檢查所有的監(jiān)視器,在檢查到DelayedFetch2和DelayedFetch4時,才會將其從分區(qū)P2和分區(qū)內(nèi)的監(jiān)視器中移除。
下面對比了監(jiān)視器嘗試完成延遲的操作、清理已完成的延遲操作兩個方法的不同點(diǎn)。
- 嘗試完成時會先判斷延遲操作是否已經(jīng)完成,如果沒有,則調(diào)用每個延遲操作的tryComplete()方法。這兩者的返回值只要是true,就會刪除當(dāng)前的延遲操作。
- 清理已完成的延遲操作,并不會調(diào)用延遲操作的tryComplete()方法,而是直接判斷延遲操作是否已經(jīng)完成,如果是,則從監(jiān)視器中刪除當(dāng)前的延遲操作。
清理線程的作用是清理所有監(jiān)視器中已經(jīng)完成的延遲操作。它作為延遲緩存的內(nèi)部類,需要訪問延遲緩存的watchersForKey成員變量才能正常地展開工作。另外,清理器每次運(yùn)行時都會增加定時器的時鐘。下面列汁1了清理器與延遲緩存、定時器相關(guān)的代碼:
延遲緩存的tryCompleteElseWatch()方法在將延遲操作加入指定鍵的監(jiān)視器后,會增加esti~時edTotalOperations計(jì)數(shù)器,并往定時器的延遲隊(duì)列中添加延遲的操作。清理線程的運(yùn)行方法根據(jù)計(jì)數(shù)器的值減去定時器的大小(delayed變量),正常來看這個差距會等于零。
但實(shí)際上,清理器在運(yùn)行時會先調(diào)用定時器的advanceClock()方法,將定時器的時鐘往前移動一次。定時器在運(yùn)行時,如果延遲的操作超時了,就會將延遲操作從定時器的延遲隊(duì)列中移除。一旦延遲操作從定時器中刪除,定時器的大小就會減少,那么計(jì)數(shù)器減去定時器的大小就會大于零。最后,清理線程就會滿足“差距大于purgeinterval這個條件”,開始清理延遲緩存中所有的監(jiān)視器。
Kafka服務(wù)端創(chuàng)建的延遲操作(DelayedOperation)會作為一個定時任務(wù)(TifilerTask),加入定時器(T"imer)的延遲隊(duì)列(DelayQueue)。當(dāng)延遲操作超時后,定時器會將延遲操作從延遲隊(duì)列中彈:-H,并調(diào)用延遲操作的運(yùn)行方法,強(qiáng)制完成延遲的操作。
定時器使用“延遲隊(duì)列”管理服務(wù)端創(chuàng)建的所有延遲操作,延遲隊(duì)列的每個元素是定時任務(wù)列表(TimerTaskli.st),一個定時任務(wù)列表可以存放多個定時任務(wù)條目(Ti.merTaskEntry)。服務(wù)端創(chuàng)建的延遲操作對象,會先包裝成定時任務(wù)條目,然后才會加入延遲隊(duì)列指定的一個定時任務(wù)列表。“延遲隊(duì)列”是定時器中保存“定時任務(wù)列表”的全局?jǐn)?shù)據(jù)結(jié)構(gòu),但服務(wù)端創(chuàng)建的“延遲操作”不是直接加入“定時任務(wù)列表”,而是加入到“時間輪”(Ti.mi.ngWheel),延遲隊(duì)列和時間輪之間的關(guān)系如下。
列中。 (2)超時的定時任務(wù)列表會被延遲隊(duì)列的poll()方法彈:-H。定時任務(wù)列表超時并不一定表示定時任務(wù)超時,將定時任務(wù)重新加入時間輪,如果加入失敗,說明定時任務(wù)的確超時,通過錢程池執(zhí)行任務(wù)。 (3)執(zhí)行延遲操作對應(yīng)的定時任務(wù),只在定時器的addTi.merTaskEntry()方法f~I調(diào)用。所以在advanceClock()方法中將定時任務(wù)列表從延遲隊(duì)列中彈出后,調(diào)用定時任務(wù)列表的flush()方法將所有的定時任務(wù)重新加入時間輪,這樣才有機(jī)會執(zhí)行超時的定時任務(wù)。 (4)延遲隊(duì)列的poll()方法只會彈出超時的定時任務(wù)列表,隊(duì)列中的每個元素按照超時時間排序,如果第一個定時任務(wù)列表都沒有過期,那么其他定時任務(wù)歹lj表也一定不會超時。假設(shè)調(diào)用advanceClock()方法時,第一次調(diào)用延遲隊(duì)列的poll()方法會彈II\一個超時的定時任務(wù)列表,第二次調(diào)用延遲隊(duì)列沒有參數(shù)的poll()方法沒有超時的定時任務(wù)列表,就不會再彈州定時任務(wù)列表了。定時器的相關(guān)代碼如下:
注意:延遲操作本身的失效時間(expirationMs)等于客戶端請求設(shè)直的延遲時間(delayMs)加上當(dāng)前時間,它是一個絕對的時間戳。比如客戶端請求設(shè)置的延這時間是10秒,當(dāng)前時間是2017-1-110:00:00,那么延遲操作的失效時間等于2017-1-110:00:10。Java的延遲隊(duì)列是一個基于時間的優(yōu)先級隊(duì)列,延遲隊(duì)列的元素(即每個定時任務(wù)列表)都有一個失效時間,這個失效時間也是一個絕對的時間截。不過,定時任務(wù)列表在實(shí)現(xiàn)Delayed接- 的getDelay()方法,則妥將絕對的失效時間減去當(dāng)前時間,表示定時任務(wù)列表在多長時間之后會過期。當(dāng)getDelay()方法返回值小于等于零時,就表示定時任務(wù)列表已經(jīng)過期,需妥立即衫t-1于。
時間輪類似于一個環(huán)形緩沖區(qū),不同的是,加入環(huán)形緩沖區(qū)的數(shù)據(jù)只能順序加入,而加入時間輪的數(shù)據(jù)可以不按順序加入。并且,如果當(dāng)前時間輪放不下加入的數(shù)據(jù)時,它會創(chuàng)建一個更高層的時間輪。第一層時間輪的tickMs=l表示一格的長度是l毫秒,wheelSize=20表示一共20格,它的范圍是20毫秒。第二層時間輪的tickMs=20表示一格的長度是20毫秒,它的范圍是400毫秒。如圖6-65所示,假設(shè)有5個定時任務(wù),它們的超時時間分別是[8,8,25,3日,35]。前2個定時任務(wù)會加入到第一個時間輪的第八個桶,后3個定時任務(wù)會加入到第二個時間輪的第一個桶中。
定時器只持有第一層時間輪的引用,并不會持有其他更高層的時間輪。比如上面的示例中,第一層時間輪會持有第二層時間輪的引用,如果還有第三層時間輪,則第二層時間輪會持有第三層時間輪的引用定時器將定時任務(wù)加入當(dāng)前時間輪,要判斷定時任務(wù)的失效時間是再在當(dāng)前時間輪的范圍內(nèi)。如果不在當(dāng)前時間輪的范圍內(nèi),則要將定時任務(wù)上升到更高一層的時間輪中。相關(guān)代碼如下:
以前面5個定時任務(wù)為例來分析層級時間輪的工作方式。如圖6-66所示,當(dāng)前時間為8毫秒時,第一層時間輪的buckets定時任務(wù)列表超時,會被延遲隊(duì)列彈出。在將定時任務(wù)列表中的定時任務(wù)重新加入第一層時間輪時,由于定時任務(wù)的失效時間小于當(dāng)前時間加上tickMs=lr怡,所以加入失敗。
如圖6-67所示,當(dāng)前時間為20毫秒時,第二層時間輪的bucketl定時任務(wù)列表超時,也會被延遲隊(duì)列彈:1:\。不同的是:在將定時任務(wù)列表中的定時任務(wù)重新加入第一層時間輪時,3個定時任務(wù)都還沒有失效。并且,它們都在第一層時間輪的范圍內(nèi),所以允許重新加入定時器的第一層時間輪中。
如圖6-68所示,最終第二層時間輪bucketl定時任務(wù)列表的3個定時任務(wù)都被降級后,加入到第一層時間輪3個不同的定時任務(wù)列表中,分別是[bucket5,bucket10,bucket10]。后續(xù)這3個定時任務(wù)的執(zhí)行和圖6-66類似,一旦超時被延遲隊(duì)列彈出,再次加入定時器就會失敗,并且會立即執(zhí)行定時任務(wù),強(qiáng)制完成延遲的操作。
本節(jié)分析了延遲操作在延遲緩存和定時器中的生命周期,外部事件嘗試完成延遲緩存中的延遲操作,定時器會在延遲操作失效后強(qiáng)制完成延遲操作。清理器會定期地刪除延遲緩存中已經(jīng)完成的延遲操作。
6.4 小結(jié)
本章主要分析了日志存儲、日志管理、副本管理器的具體實(shí)現(xiàn)。下面分別總結(jié)這3個知識點(diǎn)的一些要點(diǎn)。日志存儲會將消息集寫到底層的日志文件,它的主要概念有以下幾點(diǎn)。
- 一個日志(Log)有多個日志分段(LogSegment)。每個日志分段由數(shù)據(jù)文件(FileMessageSet)和索引文件(Offsetindex)組成。
- 偏移量是消息最重要的組成部分。每條消息寫入底層數(shù)據(jù)文件,者IS會有一個遞增的偏移量。
- 索引文件保存了消息偏移量到物理位置的映射關(guān)系,但并不是保存數(shù)據(jù)文件的所有消息,而是間隔一定數(shù)量的消息才保存一條映射關(guān)系。索引文件保存的偏移量是相對偏移鹽,數(shù)據(jù)文件中每條消息的偏移量是分區(qū)級別的絕對偏移量。
- 存儲索引文件的條目時,將絕對偏移量減去日志分段的基準(zhǔn)偏移鹽。查詢索引文件返回的相對偏移量要加上基準(zhǔn)偏移量,才能用于查詢數(shù)據(jù)文件。
- 客戶端每次讀取數(shù)據(jù)文件,服務(wù)端都會創(chuàng)建一個文件視圖,文件視圖和底層數(shù)據(jù)文件共用一個文件通道,但擁有不同的開始位置和結(jié)束位置。
- 服務(wù)端返回文件視圖給客戶端,采用零拷貝技術(shù),將底層文件通道的數(shù)據(jù)直接傳輸?shù)骄W(wǎng)絡(luò)通道。
日志管理器(LogMa_!1ager)管理了服務(wù)端的所有日志,除了上面對日志的追加和讀取操作外,日志管理還有下面幾個后臺管理的線程類。
- 定時將數(shù)據(jù)文件寫到磁盤上、定時將恢復(fù)點(diǎn)寫入檢查點(diǎn)文件。
- 日志清理線程根據(jù)日志的大小和時間清理最舊的日志分段。
- 日志壓縮線程將相同鍵的不同消息進(jìn)行壓縮,壓縮線程將日志按照清理點(diǎn)分成頭部和尾部。
副本管理然(ReplicaManager)保存了服務(wù)端的所有分區(qū),并處理客戶端發(fā)送的讀寫請求。
- 副本管理器處理讀寫請求,會先操作分區(qū)的主副本。appendMessages()方法會將消息集寫入主副本的本地臼志,fetchMessage()方法會從主副本的本地日志讀取消息集。
- 每個分區(qū)都有一個主副本和多個備份副本,只有本地副本才有日志對象。副本有兩個重要的位置信息:LEO表示副本的最新偏移量,HW表示副本的最高水位。
- 生產(chǎn)請求的應(yīng)答值(acks)需要服務(wù)端創(chuàng)建延遲的生產(chǎn)(DelayedProduce),拉取請求的最少字節(jié)數(shù)(fetchMinBytes)需要服務(wù)端創(chuàng)建延遲的拉取(DelayedFetch)。
- 延遲緩存會記錄分區(qū)到延遲操作的映射關(guān)系,外部事件會根據(jù)分區(qū)嘗試完成延遲的操作。
- 延遲緩存有監(jiān)視器、清理器、定時器協(xié)調(diào)完成延遲的操作。
在0.8版本以前,Kafka并沒有日志復(fù)制的特性,因此一旦消息代理節(jié)點(diǎn)掛掉,這個節(jié)點(diǎn)上的數(shù)據(jù)就會丟失。在0.8版本以后,Kafka提供了日志的副本機(jī)制,雖然只有主副本可以響應(yīng)數(shù)據(jù)的讀寫請求,但是備份副本會向主副本中及時地同步數(shù)據(jù)。這樣,當(dāng)主副本掛掉后,備份副本就可以選舉出新的主副本,并繼續(xù)響應(yīng)客戶端的讀寫請求。下一章我們來分析服務(wù)端如何實(shí)現(xiàn)副本的復(fù)制特性。
總結(jié)
以上是生活随笔為你收集整理的6.3.3 延迟缓存的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: VM技术(一)NES模拟器VM综述
- 下一篇: BetterAndBetter--Mac