kafka 延时消息处理
? ? 你一定遇到過(guò)這種情況,接收到消息時(shí)并不符合馬上處理的條件(例如頻率限制),但是又不能丟掉,于是先存起來(lái),過(guò)一陣子再來(lái)處理。系統(tǒng)應(yīng)該怎么設(shè)計(jì)呢?可能你會(huì)想到數(shù)據(jù)庫(kù),用一個(gè)字段來(lái)標(biāo)記執(zhí)行的狀態(tài),或者設(shè)置一個(gè)等待的時(shí)間戳,不管是哪種都需要反復(fù)地從數(shù)據(jù)庫(kù)存取,還要考慮出異常情況狀態(tài)的維護(hù)。
? ? 作為一款優(yōu)秀的消息處理服務(wù),kafka 具有完善的事務(wù)管理,狀態(tài)管理和災(zāi)難恢復(fù)功能。只要我們稍加變通一下,kafka 也能作為延遲消息處理的解決方案,而且實(shí)現(xiàn)上比用數(shù)據(jù)庫(kù)簡(jiǎn)單得多。
? ? 以下代碼均在 spring-boot 2.0.5 和 spring-kafka 2.1.10 中測(cè)試通過(guò)。建議事先閱讀文檔?https://docs.spring.io/spring-kafka/docs/2.5.4.RELEASE/reference/html/#receiving-messages?以便能很好地理解以下內(nèi)容。
設(shè)計(jì)思路
設(shè)計(jì) 2 個(gè)隊(duì)列(topic),一個(gè)收到消息馬上執(zhí)行,另一個(gè)用來(lái)接收需延遲處理的消息。話句話說(shuō),接收延遲消息的隊(duì)列直到消息可執(zhí)行之前一直在 block 狀態(tài),所以有局限性,定時(shí)不能非常精確,并且任務(wù)執(zhí)行次序與加進(jìn)來(lái)的次序是一致的。spring-boot 的配置
application.yml ————————————————————spring:## kafkakafka:bootstrap-servers: 127.0.0.1:9092consumer:group-id: myGroupauto-offset-reset: earliestenable-auto-commit: falseproperties:max:poll:interval:# 設(shè)置時(shí)間必須比延遲處理的時(shí)間大,不然會(huì)報(bào)錯(cuò)ms: 1200000listener:# 把提交模式改為手動(dòng)ack-mode: MANUAL kafka 默認(rèn)的消費(fèi)模式是自動(dòng)提交,意思是,當(dāng)?MessageListener 收到消息,執(zhí)行處理方法后自動(dòng)提交已完成狀態(tài),該消息就從隊(duì)列里移除了。配置 ack-mode: MANUAL 改為手動(dòng)提交后,我們就可以根據(jù)需要保留數(shù)據(jù)在消息隊(duì)列,以便以后再處理。 max.poll.interval.ms 設(shè)小了可能會(huì)收到下面的錯(cuò)誤: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. 請(qǐng)務(wù)必設(shè)置一個(gè)比等待執(zhí)行時(shí)間更長(zhǎng)的時(shí)間。發(fā)送消息
@Autowired private KafkaTemplate kafkaTemplate;public void myAction(){// 定義 data// 任務(wù)推送到 KafkakafkaTemplate.send(“myJob", data.toString()); }該部分沒(méi)有特別的地方,跟普通的消息消息發(fā)送一樣。
接收消息
定義兩個(gè) topic:myJob 和 myJob-delay @SpringBootApplication @ServletComponentScan public class Application {@KafkaListener(topics = “myJob”)@SendTo(“myJob-delay")public String onMessage(ConsumerRecord<?, ?> cr, Acknowledgment ack) {String json = (String) cr.value();JSONObject data = JSON.parseObject(json);if (/* 需要延遲處理 */){// 提交ack.acknowledge();// 發(fā)送到 @SendTodata.put("until", System.currentTimeMillis() + msToDelay);return data.toString();}// 正常處理// do real work// 提交ack.acknowledge();return null;}@KafkaListener(topics = “myJob-delay")@SendTo(“myJob")public String delayMessage(ConsumerRecord<?, ?> cr, Acknowledgment ack){String json = (String) cr.value();JSONObject data = JSON.parseObject(json);Long until = data.getLong("until");// 阻塞直到 untilwhile (System.currentTimeMillis() < until){Thread.sleep( Math.max(0, until - System.currentTimeMillis()) );}// 提交ack.acknowledge();// 轉(zhuǎn)移到 @SendToreturn json;} }代碼很簡(jiǎn)單,不用解釋也能看明白。稍微提一下幾個(gè)重要的地方。
@KafkaListener 的方法參數(shù)里有?Acknowledgment ack,這是AckMode.MANUAL 模式下必須要添加的參數(shù)。
ack.acknowledge() 用來(lái)標(biāo)記一條消息已經(jīng)消費(fèi)完成,即將從消息隊(duì)列里移除。執(zhí)行之前消息會(huì)一直保留在隊(duì)列中,即時(shí)宕機(jī)重啟后也能恢復(fù)。
@SendTo 用來(lái)在隊(duì)列(topic)間轉(zhuǎn)移消息,只要 return 非 null 的數(shù)據(jù)。以上代碼中,當(dāng)需要延遲處理時(shí),消息從 myJob 轉(zhuǎn)移到 myJob-delay;而當(dāng)條件滿足時(shí),消息又從 myJob-delay 轉(zhuǎn)移到了 myJob。
自從 spring-kafka 2.2.4 版本之后,可以在方法上定義?max.poll.interval.ms ,更加靈活了。例如
@KafkaListener(topics = "myTopic", groupId = "group", properties = { "max.poll.interval.ms:60000”, ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100”} )? ? 以上是延遲消息處理的簡(jiǎn)單實(shí)現(xiàn),適合延時(shí)要求不那么高的場(chǎng)合。朋友們想一下,假如延時(shí)比較復(fù)雜,執(zhí)行的次序也不一定跟消息到達(dá)的次序一致,系統(tǒng)又該怎樣設(shè)計(jì)呢?
假如這篇文章對(duì)你有所幫助, 請(qǐng)關(guān)注我公眾號(hào), 發(fā)現(xiàn)更多有用的文章
?
總結(jié)
以上是生活随笔為你收集整理的kafka 延时消息处理的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 【Linux】僵尸进程(Z状态)和孤儿进
- 下一篇: 《魔兽世界》中的小背景