RocketMq Consumer 最佳实践
RocketMq Consumer 最佳實(shí)踐
翻譯自RocketMQ官方文檔
Consumer Group and Subscriptions 消費(fèi)集群和訂閱
第一件你應(yīng)該關(guān)心的是不同的消費(fèi)集群能獨(dú)立的消費(fèi)相同的topic,每一個(gè)都有自己的消費(fèi)偏移量,需要確保在相同群組的consumer訂閱了相同的的topic。
MessageListener
-
Orderly
為了確保消費(fèi)時(shí)按照順序,這種consumer會(huì)鎖住每一個(gè)MessageQueue。這會(huì)造成性能上的丟失,但是也是很有用的當(dāng)你對(duì)順序消息比較關(guān)心。不建議拋出異常,你可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT來(lái)代替拋出異常。 -
Concurrently
這種consumer會(huì)并發(fā)的消費(fèi),為了高性能時(shí)可以使用這種。不建議拋出異常,你可以返回ConsumeConcurrentlyStatus.RECONSUME_LATER來(lái)代替拋出異常。 -
Consume Status
對(duì)于MessageListenerConcurrently,你可以返回RECONSUME_LATER來(lái)告訴consumer你現(xiàn)在沒有正確消費(fèi)并且希望重新消費(fèi)。你可以繼續(xù)消費(fèi)其他消息。
對(duì)于MessageListenerOrderly,因?yàn)槟汴P(guān)心順序,你不能跳到下一條消息,但是你可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT告訴consumer等一會(huì)。 -
Blocking
建議不要阻塞Listener,因?yàn)檫@會(huì)鎖住線程池,并且通常可能會(huì)導(dǎo)致消費(fèi)進(jìn)程停下。
Thread Number
consumer使用一個(gè) ThreadPoolExecutor 來(lái)執(zhí)行內(nèi)部消費(fèi),你可以改變這個(gè)ThreadPoolExecutor通過(guò)兩個(gè)方法setConsumeThreadMin or setConsumeThreadMax.
ConsumeFromWhere
當(dāng)創(chuàng)建一個(gè)新的consumer集群,需要決定是否需要消費(fèi)歷史仍在Broker存在的消息。CONSUME_FROM_LAST_OFFSET將忽略所有的歷史消息,并消費(fèi)從那之后的消息。CONSUME_FROM_FIRST_OFFSET會(huì)消費(fèi)所有存在于Broker的Message,你還可以選擇使用CONSUME_FROM_TIMESTAMP來(lái)選擇一個(gè)時(shí)間戳去決定消費(fèi)從哪個(gè)時(shí)間戳之后的消息。
Duplication
很多情況可能會(huì)導(dǎo)致重復(fù),比如
- Producer重復(fù)發(fā)送(在FLUSH_SLAVE_TIMEOUT的場(chǎng)景)
- Consumer 關(guān)閉沒有及時(shí)更新Broker。
需要在消費(fèi)端做去重,比如在唯一主鍵。
總結(jié)
以上是生活随笔為你收集整理的RocketMq Consumer 最佳实践的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RocketMq Producer最佳实
- 下一篇: RocketMq namesvr 最佳实