日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

跟Kafka学技术系列之时间轮

發(fā)布時間:2025/3/21 编程问答 15 豆豆
生活随笔 收集整理的這篇文章主要介紹了 跟Kafka学技术系列之时间轮 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

寫在前面

kafka是一個分布式消息中間件,其高可用高吞吐的特點是大數(shù)據(jù)領(lǐng)域首選的消息中間件,Kafka是分布式消息隊列的順序讀寫文件分段組織串聯(lián)起來思想的鼻祖,包括RocketMq這些消息隊列都是借鑒了Kafka早期的架構(gòu)和設(shè)計思路改造而來,所以在架構(gòu)設(shè)計層面,Kafka有非常多值得借鑒的地方。本文是作者介紹Kafka優(yōu)秀架構(gòu)設(shè)計文章中的一篇,文中的代碼和流程圖均是base on 0.10.2.0版本。

引出環(huán)形隊列和延遲隊列

從2個面試題說起,第1個問題,如果一臺機器上有10w個定時任務(wù),如何做到高效觸發(fā)?

具體場景是:

有一個APP實時消息通道系統(tǒng),對每個用戶會維護一個APP到服務(wù)器的TCP連接,用來實時收發(fā)消息,對這個TCP連接,有這樣一個需求:“如果連續(xù)30s沒有請求包(例如登錄,消息,keepalive包),服務(wù)端就要將這個用戶的狀態(tài)置為離線”。

其中,單機TCP同時在線量約在10w級別,keepalive請求包較分散大概30s一次,吞吐量約在3000qps。

怎么做?

常用方案使用time定時任務(wù),每秒掃描一次所有連接的集合Map<uid, last_packet_time>,把連接時間(每次有新的請求更新對應(yīng)連接的連接時間)比當前時間的差值大30s的連接找出來處理。

另一種方案,使用環(huán)形隊列法:

三個重要的數(shù)據(jù)結(jié)構(gòu):

1)30s超時,就創(chuàng)建一個index從0到30的環(huán)形隊列(本質(zhì)是個數(shù)組)

2)環(huán)上每一個slot是一個Set<uid>,任務(wù)集合

3)同時還有一個Map<uid, index>,記錄uid落在環(huán)上的哪個slot里

這樣當有某用戶uid有請求包到達時:

1)從Map結(jié)構(gòu)中,查找出這個uid存儲在哪一個slot里

2)從這個slot的Set結(jié)構(gòu)中,刪除這個uid

3)將uid重新加入到新的slot中,具體是哪一個slot呢 => Current Index指針所指向的上一個slot,因為這個slot,會被timer在30s之后掃描到

4)更新Map,這個uid對應(yīng)slot的index值

哪些元素會被超時掉呢?

Current Index每秒種移動一個slot,這個slot對應(yīng)的Set<uid>中所有uid都應(yīng)該被集體超時!如果最近30s有請求包來到,一定被放到Current Index的前一個slot了,Current Index所在的slot對應(yīng)Set中所有元素,都是最近30s沒有請求包來到的。

所以,當沒有超時時,Current Index掃到的每一個slot的Set中應(yīng)該都沒有元素。

兩種方案對比:

方案一每次都要輪詢所有數(shù)據(jù),而方案二使用環(huán)形隊列只需要輪詢這一刻需要過期的數(shù)據(jù),如果沒有數(shù)據(jù)過期則沒有數(shù)據(jù)要處理,并且是批量超時,并且由于是環(huán)形結(jié)構(gòu)更加節(jié)約空間,這很適合高性能場景。

第二個問題:在開發(fā)過程中有延遲一定時間的任務(wù)要執(zhí)行,怎么做?

如果不重復(fù)造輪子的話,我們的選擇當然是延遲隊列或者Timer。

延遲隊列和在Timer中增 加延時任務(wù)采用數(shù)組表示的最小堆的數(shù)據(jù)結(jié)構(gòu)實現(xiàn),每次放入新元素和移除隊首元素時間復(fù)雜度為O(nlog(n))。

時間輪

方案二所采用的環(huán)形隊列,就是時間輪的底層數(shù)據(jù)結(jié)構(gòu),它能夠讓需要處理的數(shù)據(jù)(任務(wù)的抽象)集中,在Kafka中存在大量的延遲操作,比如延遲生產(chǎn)、延遲拉取以及延遲刪除等。Kafka并沒有使用JDK自帶的Timer或者DelayQueue來實現(xiàn)延遲的功能,而是基于時間輪自定義了一個用于實現(xiàn)延遲功能的定時器(SystemTimer)。JDK的Timer和DelayQueue插入和刪除操作的平均時間復(fù)雜度為O(nlog(n)),并不能滿足Kafka的高性能要求,而基于時間輪可以將插入和刪除操作的時間復(fù)雜度都降為O(1)。時間輪的應(yīng)用并非Kafka獨有,其應(yīng)用場景還有很多,在Netty、Akka、Quartz、Zookeeper等組件中都存在時間輪的蹤影。

時間輪的數(shù)據(jù)結(jié)構(gòu)

參考下圖,Kafka中的時間輪(TimingWheel)是一個存儲定時任務(wù)的環(huán)形隊列,底層采用數(shù)組實現(xiàn),數(shù)組中的每個元素可以存放一個定時任務(wù)列表(TimerTaskList)。TimerTaskList是一個環(huán)形的雙向鏈表,鏈表中的每一項表示的都是定時任務(wù)項(TimerTaskEntry),其中封裝了真正的定時任務(wù)TimerTask。在Kafka源碼中對這個TimeTaskList是用一個名稱為buckets的數(shù)組表示的,所以后面介紹中可能TimerTaskList也會被稱為bucket。

時間輪相關(guān)名詞解釋

tickMs:時間輪由多個時間格組成,每個時間格就是tickMs,它代表當前時間輪的基本時間跨度。

wheelSize:代表每一層時間輪的格數(shù)

interval:當前時間輪的總體時間跨度,interval=tickMs × wheelSize

startMs:構(gòu)造當層時間輪時候的當前時間,第一層的時間輪的startMs是TimeUnit.NANOSECONDS.toMillis(nanoseconds()),上層時間輪的startMs為下層時間輪的currentTime。

currentTime:表示時間輪當前所處的時間,currentTime是tickMs的整數(shù)倍(通過currentTime=startMs - (startMs % tickMs來保正currentTime一定是tickMs的整數(shù)倍),這個運算類比鐘表中分鐘里65秒分針指針指向的還是1分鐘)。currentTime可以將整個時間輪劃分為到期部分和未到期部分,currentTime當前指向的時間格也屬于到期部分,表示剛好到期,需要處理此時間格所對應(yīng)的TimerTaskList的所有任務(wù)。

時間輪中的任務(wù)存放

若時間輪的tickMs=1ms,wheelSize=20,那么可以計算得出interval為20ms。初始情況下表盤指針currentTime指向時間格0,此時有一個定時為2ms的任務(wù)插入進來會存放到時間格為2的TimerTaskList中。隨著時間的不斷推移,指針currentTime不斷向前推進,過了2ms之后,當?shù)竭_時間格2時,就需要將時間格2所對應(yīng)的TimeTaskList中的任務(wù)做相應(yīng)的到期操作。此時若又有一個定時為8ms的任務(wù)插入進來,則會存放到時間格10中,currentTime再過8ms后會指向時間格10。如果同時有一個定時為19ms的任務(wù)插入進來怎么辦?新來的TimerTaskEntry會復(fù)用原來的TimerTaskList,所以它會插入到原本已經(jīng)到期的時間格1中。總之,整個時間輪的總體跨度是不變的,隨著指針currentTime的不斷推進,當前時間輪所能處理的時間段也在不斷后移,總體時間范圍在currentTime和currentTime+interval之間。

時間輪的升降級

如果此時有個定時為350ms的任務(wù)該如何處理?直接擴充wheelSize的大小么?Kafka中不乏幾萬甚至幾十萬毫秒的定時任務(wù),這個wheelSize的擴充沒有底線,就算將所有的定時任務(wù)的到期時間都設(shè)定一個上限,比如100萬毫秒,那么這個wheelSize為100萬毫秒的時間輪不僅占用很大的內(nèi)存空間,而且效率也會拉低。Kafka為此引入了層級時間輪的概念,當任務(wù)的到期時間超過了當前時間輪所表示的時間范圍時,就會嘗試添加到上層時間輪中

參考上圖,復(fù)用之前的案例,第一層的時間輪tickMs=1ms, wheelSize=20, interval=20ms。第二層的時間輪的tickMs為第一層時間輪的interval,即為20ms。每一層時間輪的wheelSize是固定的,都是20,那么第二層的時間輪的總體時間跨度interval為400ms。以此類推,這個400ms也是第三層的tickMs的大小,第三層的時間輪的總體時間跨度為8000ms。

剛才提到的350ms的任務(wù),不會插入到第一層時間輪,會插入到interval=20*20的第二層時間輪中,具體插入到時間輪的哪個bucket呢?先用350/tickMs(20)=virtualId(17),然后virtualId(17) %wheelSize (20) = 17,所以350會放在第17個bucket。如果此時有一個450ms后執(zhí)行的任務(wù),那么會放在第三層時間輪中,按照剛才的計算公式,會放在第0個bucket。第0個bucket里會包含

[400,800)ms的任務(wù)。隨著時間流逝,當時間過去了400ms,那么450ms后就要執(zhí)行的任務(wù)還剩下50ms的時間才能執(zhí)行,此時有一個時間輪降級的操作,將50ms任務(wù)重新提交到層級時間輪中,那么此時50ms的任務(wù)根據(jù)公式會放入第二個時間輪的第2個bucket中,此bucket的時間范圍為[40,60)ms,然后再經(jīng)過40ms,這個50ms的任務(wù)又會被監(jiān)控到,此時距離任務(wù)執(zhí)行還有10ms,同樣將10ms的任務(wù)提交到層級時間輪,此時會加入到第一層時間輪的第10個bucket,所以再經(jīng)過10ms后,此任務(wù)到期,最終執(zhí)行。

整個時間輪的升級降級操作是不是很類似于我們的時鐘? 第一層時間輪tickMs=1s, wheelSize=60,interval=1min,此為秒鐘;第二層tickMs=1min,wheelSize=60,interval=1hour,此為分鐘;第三層tickMs=1hour,wheelSize為12,interval為12hours,此為時鐘。而鐘表的指針就對應(yīng)程序中的currentTime,這個后面分析代碼時候會講到(對這個的理解也是時間輪理解的重點和難點)。

Kafka中任務(wù)添加和驅(qū)動時間輪滾動的核心流程:

重點代碼介紹

這是往SystenTimer中添加一個任務(wù)

//在Systemtimer中添加一個任務(wù),任務(wù)被包裝為一個TimerTaskEntry private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = { //先判斷是否可以添加進時間輪中,如果不可以添加進去代表任務(wù)已經(jīng)過期或者任務(wù)被取消,注意這里的timingWheel持有上一層時間輪的引用,所以可能存在遞歸調(diào)用if (!timingWheel.add(timerTaskEntry)) {// Already expired or cancelledif (!timerTaskEntry.cancelled)//過期任務(wù)直接線程池異步執(zhí)行掉taskExecutor.submit(timerTaskEntry.timerTask)} } 復(fù)制

timingWheel添加任務(wù),遞歸添加直到添加該任務(wù)進合適的時間輪的bucket中

def add(timerTaskEntry: TimerTaskEntry): Boolean = {val expiration = timerTaskEntry.expirationMs//任務(wù)取消if (timerTaskEntry.cancelled) {// Cancelledfalse} else if (expiration < currentTime + tickMs) {// 任務(wù)過期后會被執(zhí)行false} else if (expiration < currentTime + interval) {//任務(wù)過期時間比當前時間輪時間加周期小說明任務(wù)過期時間在本時間輪周期內(nèi)val virtualId = expiration / tickMs//找到任務(wù)對應(yīng)本時間輪的bucketval bucket = buckets((virtualId % wheelSize.toLong).toInt)bucket.add(timerTaskEntry)// Set the bucket expiration time//只有本bucket內(nèi)的任務(wù)都過期后才會bucket.setExpiration返回true此時將bucket放入延遲隊列if (bucket.setExpiration(virtualId * tickMs)) {//bucket是一個TimerTaskList,它實現(xiàn)了java.util.concurrent.Delayed接口,里面是一個多任務(wù)組成的鏈表,圖2有說明queue.offer(bucket)}true} else {// Out of the interval. Put it into the parent timer//任務(wù)的過期時間不在本時間輪周期內(nèi)說明需要升級時間輪,如果不存在則構(gòu)造上一層時間輪,繼續(xù)用上一層時間輪添加任務(wù)if (overflowWheel == null) addOverflowWheel()overflowWheel.add(timerTaskEntry)} }復(fù)制

在本層級時間輪里添加上一層時間輪里的過程,注意的是在下一層時間輪的interval為上一層時間輪的tickMs

private[this] def addOverflowWheel(): Unit = {synchronized {if (overflowWheel == null) {overflowWheel = new TimingWheel(tickMs = interval,wheelSize = wheelSize,startMs = currentTime,taskCounter = taskCounter,queue)}} } 復(fù)制

驅(qū)動時間輪滾動過程:

//注意這里會存在一個遞歸,一直驅(qū)動時間輪的指針滾動直到時間不足于驅(qū)動上層的時間輪滾動。 def advanceClock(timeMs: Long): Unit = {if (timeMs >= currentTime + tickMs) {//把當前時間打平為時間輪tickMs的整數(shù)倍currentTime = timeMs - (timeMs % tickMs)// Try to advance the clock of the overflow wheel if present//驅(qū)動上層時間輪,這里的傳給上層的currentTime時間是本層時間輪打平過的,但是在上層時間輪還是會繼續(xù)打平if (overflowWheel != null) overflowWheel.advanceClock(currentTime)} } 復(fù)制

這里是驅(qū)動源代碼:

//循環(huán)bucket里面的任務(wù)列表,一個個重新添加進時間輪,對符合條件的時間輪進行升降級或者執(zhí)行任務(wù) private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry)/** Advances the clock if there is an expired bucket. If there isn't any expired bucket when called,* waits up to timeoutMs before giving up.*/ def advanceClock(timeoutMs: Long): Boolean = {var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)if (bucket != null) {writeLock.lock()try {while (bucket != null) {//驅(qū)動時間輪timingWheel.advanceClock(bucket.getExpiration())//循環(huán)buckek也就是任務(wù)列表,任務(wù)列表一個個繼續(xù)添加進時間輪以此來升級或者降級時間輪,把過期任務(wù)找出來執(zhí)行bucket.flush(reinsert)//循環(huán)//這里就是從延遲隊列取出bucket,bucket是有延遲時間的,取出代表該bucket過期,我們通過bucket能取到bucket包含的任務(wù)列表bucket = delayQueue.poll()}} finally {writeLock.unlock()}true} else {false} } 復(fù)制

總結(jié)

kafka的延遲隊列使用時間輪實現(xiàn),能夠支持大量任務(wù)的高效觸發(fā),但是在kafka延遲隊列實現(xiàn)方案里還是看到了delayQueue的影子,使用delayQueue是對時間輪里面的bucket放入延遲隊列,以此來推動時間輪滾動,但是基于將插入和刪除操作則放入時間輪中,將這些操作的時間復(fù)雜度都降為O(1),提升效率。Kafka對性能的極致追求讓它把最合適的組件放在最適合的位置。

來源:https://www.tuicool.com/articles/fe2quav

總結(jié)

以上是生活随笔為你收集整理的跟Kafka学技术系列之时间轮的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。