日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka 延迟队列

發(fā)布時間:2023/12/20 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka 延迟队列 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

最近在看Kafka延遲隊列的實現(xiàn)方式,發(fā)現(xiàn)大部分講的都很片面,都是時間輪相關(guān)的東西,搞得一知半解的,最終根據(jù)自己的理解,設計了一套延遲隊列,和大家一起討論一下,服務流程如下

?如圖所示,所有的消息進來之后,都會被分配到delay隊列中,然后delay隊列消費消息滿足時間要求后再發(fā)送到業(yè)務隊列中,這樣做的目的是避免消息阻塞,如果我們沒有delay隊列,所有消息都在業(yè)務隊列中,那必然會產(chǎn)生一定的堆積,因為這個隊列本身要做的事情太多,delay隊列就是為了分擔他的壓力

這里說下為什么有三個delay隊列,我這里其實是想根據(jù)業(yè)務劃分優(yōu)先級,也是為了可以減少消息的延遲,將數(shù)據(jù)做了歸類,比如延遲1分鐘左右的數(shù)據(jù),放在高優(yōu)先級的隊列中, 10分鐘延遲的放在中優(yōu)先級,2小時延遲的放在低優(yōu)先級

具體的操作方式:每個delay隊列中的消息,不僅要存當前消息的內(nèi)容,還要存下一個要消費的消息位置(offet),這樣就可以避免我們以 O(n)的復雜度去遍歷隊列,檢查要執(zhí)行的隊列數(shù)據(jù),另外考慮到有新增數(shù)據(jù)插隊的情況,需要在緩存中也維護一份當前最優(yōu)先的offet值,方便我們做插隊處理

kafka中可以修改offet值,通過seek() 函數(shù)即可

這里還要考慮一個問題,就是當最優(yōu)先的隊列數(shù)據(jù)還有1小時才要執(zhí)行,那我們怎么處理,是sleep嗎?如果sleep太久的話,當程序代碼不能在max.poll.interval.ms配置的期望時間內(nèi)處理這些消息的話,kafka就會認為這個消費者已經(jīng)掛了,會進行rebalance,同時你這個消費者就無法再拉取到任何消息了,Kafka本身提供很優(yōu)雅的解決方案,pause() 方法可以暫定消費,resume() 方法可以恢復消費,這樣就不會出現(xiàn)異常了

總結(jié)

以上是生活随笔為你收集整理的kafka 延迟队列的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。