日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 >

RocketMQ:消息ACK机制源码解析

發(fā)布時(shí)間:2025/3/21 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RocketMQ:消息ACK机制源码解析 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

消息消費(fèi)進(jìn)度

概述

消費(fèi)者消費(fèi)消息過程中,為了避免消息的重復(fù)消費(fèi),應(yīng)將消息消費(fèi)進(jìn)度保存起來,當(dāng)其他消費(fèi)者再對消息進(jìn)行消費(fèi)時(shí),讀取已消費(fèi)的消息偏移量,對之后的消息進(jìn)行消費(fèi)即可。

消息模式分為兩種:

  • 集群模式:一條消息只能被一個(gè)消費(fèi)者消費(fèi)
  • 廣播模式:一條消息被所有消費(fèi)者都消費(fèi)一次

廣播模式下,消息被所有消費(fèi)者消費(fèi),因此消息消費(fèi)的進(jìn)度可以跟消費(fèi)端保存在一起,即本地保存。

集群模式下,消息只能被集群內(nèi)的一個(gè)消費(fèi)者消費(fèi),進(jìn)度不能保存在消費(fèi)端,否則會(huì)導(dǎo)致消息重復(fù)消費(fèi),因此集群模式下消息進(jìn)度集中保存在Broker中。

消息進(jìn)度存儲(chǔ)接口

OffsetStore

public interface OffsetStore {//加載消息消費(fèi)進(jìn)度void load() throws MQClientException;//更新消費(fèi)進(jìn)度并保存在內(nèi)存中void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly);//從本地存儲(chǔ)中獲取消息消費(fèi)進(jìn)度long readOffset(final MessageQueue mq, final ReadOffsetType type);//保存所有消息消費(fèi)進(jìn)度-本地/遠(yuǎn)程void persistAll(final Set<MessageQueue> mqs);void persist(final MessageQueue mq);//移除偏移量void removeOffset(MessageQueue mq);//根據(jù)Topic克隆一份消息隊(duì)列消費(fèi)進(jìn)度緩存表Map<MessageQueue, Long> cloneOffsetTable(String topic);//更新消費(fèi)進(jìn)度到Brokervoid updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,MQBrokerException, InterruptedException, MQClientException; }

DefaultMQPushConsumerImpl#start

switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING://廣播模式下 將消息消費(fèi)進(jìn)度存儲(chǔ)到本地this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;case CLUSTERING://集群模式下 將消息消費(fèi)的進(jìn)度存儲(chǔ)到遠(yuǎn)端Broker中this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break; } this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);

如上所示,根據(jù)消息消費(fèi)模式的不同,會(huì)創(chuàng)建不同的OffsetStore對象。


廣播模式消費(fèi)進(jìn)度存儲(chǔ)(LocalFileOffsetStore)

public class LocalFileOffsetStore implements OffsetStore {//存儲(chǔ)目錄//消費(fèi)者啟動(dòng)時(shí)-可以通過"-D rocketmq.client.localOffsetStoreDir=路徑"來指定public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty("rocketmq.client.localOffsetStoreDir",System.getProperty("user.home") + File.separator + ".rocketmq_offsets");private final static InternalLogger log = ClientLogger.getLog();//MQ客戶端private final MQClientInstance mQClientFactory;//消費(fèi)組名private final String groupName;//存儲(chǔ)路徑private final String storePath;//以MessageQueue為鍵-消費(fèi)偏移量為值的緩存表private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =new ConcurrentHashMap<MessageQueue, AtomicLong>(); }

構(gòu)造函數(shù)

public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) {this.mQClientFactory = mQClientFactory;this.groupName = groupName;//json格式存儲(chǔ)this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator +this.mQClientFactory.getClientId() + File.separator +this.groupName + File.separator +"offsets.json"; }

LocalFileOffsetStore#load

public void load() throws MQClientException {//從本地磁盤中進(jìn)行讀取json文件-并進(jìn)行序列化封裝轉(zhuǎn)化為mapOffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {//存入緩存表offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);log.info("load consumer's offset, {} {} {}",this.groupName,mq,offset.get());}} }public class OffsetSerializeWrapper extends RemotingSerializable {private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =new ConcurrentHashMap<MessageQueue, AtomicLong>(); }

LocalFileOffsetStore#persistAll

public void persistAll(Set<MessageQueue> mqs) {if (null == mqs || mqs.isEmpty()) {return;}OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {if (mqs.contains(entry.getKey())) {AtomicLong offset = entry.getValue();//填充<消息隊(duì)列-消費(fèi)偏移量>緩存表offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);}}//轉(zhuǎn)為json格式String jsonString = offsetSerializeWrapper.toJson(true);if (jsonString != null) {try {//jsonString->file->保存到storePathMixAll.string2File(jsonString, this.storePath);} catch (IOException e) {log.error("persistAll consumer offset Exception, " + this.storePath, e);}} }

persistAll()的入口是MQClientInstance#startScheduledTask

MQClientInstance#startScheduledTask

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.persistAllConsumerOffset();} catch (Exception e) {log.error("ScheduledTask persistAllConsumerOffset exception", e);}} }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

消費(fèi)端啟動(dòng)后延遲10s開啟該定時(shí)任務(wù),每隔5s進(jìn)行一次持久化。

MQClientInstance#persistAllConsumerOffset

Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) {Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();impl.persistConsumerOffset(); }

DefaultMQPushConsumerImpl#persistConsumerOffset

try {this.makeSureStateOK();Set<MessageQueue> mqs = new HashSet<MessageQueue>();//獲取重負(fù)載分配好的消息隊(duì)列Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();mqs.addAll(allocateMq);//當(dāng)前是LocalFileOffsetStore.persistAllthis.offsetStore.persistAll(mqs); } catch (Exception e) {log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e); }

集群模式消費(fèi)進(jìn)度存儲(chǔ)(RemoteBrokerOffsetStore)

RemoteBrokerOffsetStore

private final static InternalLogger log = ClientLogger.getLog(); //MQ客戶端實(shí)例-該實(shí)例被同一個(gè)JVM下的消費(fèi)者和生產(chǎn)者共用 private final MQClientInstance mQClientFactory; //消費(fèi)組名 private final String groupName; //以消息隊(duì)列為鍵-消費(fèi)偏移量為值的緩存表 private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =new ConcurrentHashMap<MessageQueue, AtomicLong>();

RemoteBrokerOffsetStore#persistAll

for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {MessageQueue mq = entry.getKey();AtomicLong offset = entry.getValue();if (offset != null) {if (mqs.contains(mq)) {try {//更新消費(fèi)偏移量到Brokerthis.updateConsumeOffsetToBroker(mq, offset.get());log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",this.groupName,this.mQClientFactory.getClientId(),mq,offset.get());} catch (Exception e) {log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);}} } }

RemoteBrokerOffsetStore#updateConsumeOffsetToBroker

同步更新消息消費(fèi)偏移量,如Master關(guān)閉,則更新到Slave。

//從MQ客戶端中根據(jù)BrokerName獲取消息隊(duì)列對應(yīng)的Broker FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); if (null == findBrokerResult) {//根據(jù)Topic從NameServer更新路由信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());//重新查找findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); }if (findBrokerResult != null) {//封裝消息消費(fèi)隊(duì)列更新請求頭UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();requestHeader.setTopic(mq.getTopic()); //主題信息requestHeader.setConsumerGroup(this.groupName); //消費(fèi)者組requestHeader.setQueueId(mq.getQueueId()); //隊(duì)列IDrequestHeader.setCommitOffset(offset); //消費(fèi)偏移量if (isOneway) {//Oneway->根據(jù)Broker地址->發(fā)送請求將消費(fèi)偏移量保存到Broker-超時(shí)時(shí)間默認(rèn)為5sthis.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);} else {this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);} } else {throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); }

RemoteBrokerOffsetStore#updateOffset

if (mq != null) {//從緩存中獲取消息隊(duì)列對應(yīng)的偏移量AtomicLong offsetOld = this.offsetTable.get(mq);//為空if (null == offsetOld) {//將存入的offset存入內(nèi)存中offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));}//不為空->根據(jù)increaseOnly更新原先的offsetOldif (null != offsetOld) {if (increaseOnly) {MixAll.compareAndIncreaseOnly(offsetOld, offset);} else {offsetOld.set(offset);}} }

RemoteBrokerOffsetStore#readOffset

public long readOffset(final MessageQueue mq, //消息隊(duì)列final ReadOffsetType type) { //讀取偏移量類型if (mq != null) {switch (type) {case MEMORY_FIRST_THEN_STORE://從內(nèi)存中讀取 case READ_FROM_MEMORY: {AtomicLong offset = this.offsetTable.get(mq);if (offset != null) {return offset.get();} else if (ReadOffsetType.READ_FROM_MEMORY == type) {return -1;}}//從Broker中讀取 case READ_FROM_STORE: {try {//從Broker中獲取消費(fèi)偏移量long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);AtomicLong offset = new AtomicLong(brokerOffset);//更新至內(nèi)存中(map)this.updateOffset(mq, offset.get(), false);return brokerOffset;}// No offset in brokercatch (MQBrokerException e) {return -1;}//Other exceptionscatch (Exception e) {log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);return -2;}}default:break;}}

RemoteBrokerOffsetStore#fetchConsumeOffsetFromBroker

//從MQ客戶端實(shí)例中獲取Boker信息 FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); if (null == findBrokerResult) {//從NameServer中更新Topic的路由信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());//重新獲取BrokerfindBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); }if (findBrokerResult != null) {//封裝查詢消費(fèi)進(jìn)度請求頭QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();requestHeader.setTopic(mq.getTopic());requestHeader.setConsumerGroup(this.groupName);requestHeader.setQueueId(mq.getQueueId());//帶上請求頭調(diào)用MQClientAPI到Broker中獲取return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); } else {throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); }

本文僅作為個(gè)人學(xué)習(xí)使用,如有不足或錯(cuò)誤請指正!

總結(jié)

以上是生活随笔為你收集整理的RocketMQ:消息ACK机制源码解析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。