RocketMQ:消息ACK机制源码解析
消息消費(fèi)進(jìn)度
概述
消費(fèi)者消費(fèi)消息過程中,為了避免消息的重復(fù)消費(fèi),應(yīng)將消息消費(fèi)進(jìn)度保存起來,當(dāng)其他消費(fèi)者再對消息進(jìn)行消費(fèi)時(shí),讀取已消費(fèi)的消息偏移量,對之后的消息進(jìn)行消費(fèi)即可。
消息模式分為兩種:
- 集群模式:一條消息只能被一個(gè)消費(fèi)者消費(fèi)
- 廣播模式:一條消息被所有消費(fèi)者都消費(fèi)一次
廣播模式下,消息被所有消費(fèi)者消費(fèi),因此消息消費(fèi)的進(jìn)度可以跟消費(fèi)端保存在一起,即本地保存。
集群模式下,消息只能被集群內(nèi)的一個(gè)消費(fèi)者消費(fèi),進(jìn)度不能保存在消費(fèi)端,否則會(huì)導(dǎo)致消息重復(fù)消費(fèi),因此集群模式下消息進(jìn)度集中保存在Broker中。
消息進(jìn)度存儲(chǔ)接口
OffsetStore
public interface OffsetStore {//加載消息消費(fèi)進(jìn)度void load() throws MQClientException;//更新消費(fèi)進(jìn)度并保存在內(nèi)存中void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly);//從本地存儲(chǔ)中獲取消息消費(fèi)進(jìn)度long readOffset(final MessageQueue mq, final ReadOffsetType type);//保存所有消息消費(fèi)進(jìn)度-本地/遠(yuǎn)程void persistAll(final Set<MessageQueue> mqs);void persist(final MessageQueue mq);//移除偏移量void removeOffset(MessageQueue mq);//根據(jù)Topic克隆一份消息隊(duì)列消費(fèi)進(jìn)度緩存表Map<MessageQueue, Long> cloneOffsetTable(String topic);//更新消費(fèi)進(jìn)度到Brokervoid updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,MQBrokerException, InterruptedException, MQClientException; }DefaultMQPushConsumerImpl#start
switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING://廣播模式下 將消息消費(fèi)進(jìn)度存儲(chǔ)到本地this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;case CLUSTERING://集群模式下 將消息消費(fèi)的進(jìn)度存儲(chǔ)到遠(yuǎn)端Broker中this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break; } this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);如上所示,根據(jù)消息消費(fèi)模式的不同,會(huì)創(chuàng)建不同的OffsetStore對象。
廣播模式消費(fèi)進(jìn)度存儲(chǔ)(LocalFileOffsetStore)
public class LocalFileOffsetStore implements OffsetStore {//存儲(chǔ)目錄//消費(fèi)者啟動(dòng)時(shí)-可以通過"-D rocketmq.client.localOffsetStoreDir=路徑"來指定public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty("rocketmq.client.localOffsetStoreDir",System.getProperty("user.home") + File.separator + ".rocketmq_offsets");private final static InternalLogger log = ClientLogger.getLog();//MQ客戶端private final MQClientInstance mQClientFactory;//消費(fèi)組名private final String groupName;//存儲(chǔ)路徑private final String storePath;//以MessageQueue為鍵-消費(fèi)偏移量為值的緩存表private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =new ConcurrentHashMap<MessageQueue, AtomicLong>(); }構(gòu)造函數(shù)
public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) {this.mQClientFactory = mQClientFactory;this.groupName = groupName;//json格式存儲(chǔ)this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator +this.mQClientFactory.getClientId() + File.separator +this.groupName + File.separator +"offsets.json"; }LocalFileOffsetStore#load
public void load() throws MQClientException {//從本地磁盤中進(jìn)行讀取json文件-并進(jìn)行序列化封裝轉(zhuǎn)化為mapOffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {//存入緩存表offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);log.info("load consumer's offset, {} {} {}",this.groupName,mq,offset.get());}} }public class OffsetSerializeWrapper extends RemotingSerializable {private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =new ConcurrentHashMap<MessageQueue, AtomicLong>(); }LocalFileOffsetStore#persistAll
public void persistAll(Set<MessageQueue> mqs) {if (null == mqs || mqs.isEmpty()) {return;}OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {if (mqs.contains(entry.getKey())) {AtomicLong offset = entry.getValue();//填充<消息隊(duì)列-消費(fèi)偏移量>緩存表offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);}}//轉(zhuǎn)為json格式String jsonString = offsetSerializeWrapper.toJson(true);if (jsonString != null) {try {//jsonString->file->保存到storePathMixAll.string2File(jsonString, this.storePath);} catch (IOException e) {log.error("persistAll consumer offset Exception, " + this.storePath, e);}} }persistAll()的入口是MQClientInstance#startScheduledTask
MQClientInstance#startScheduledTask
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.persistAllConsumerOffset();} catch (Exception e) {log.error("ScheduledTask persistAllConsumerOffset exception", e);}} }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);消費(fèi)端啟動(dòng)后延遲10s開啟該定時(shí)任務(wù),每隔5s進(jìn)行一次持久化。
MQClientInstance#persistAllConsumerOffset
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) {Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();impl.persistConsumerOffset(); }DefaultMQPushConsumerImpl#persistConsumerOffset
try {this.makeSureStateOK();Set<MessageQueue> mqs = new HashSet<MessageQueue>();//獲取重負(fù)載分配好的消息隊(duì)列Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();mqs.addAll(allocateMq);//當(dāng)前是LocalFileOffsetStore.persistAllthis.offsetStore.persistAll(mqs); } catch (Exception e) {log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e); }集群模式消費(fèi)進(jìn)度存儲(chǔ)(RemoteBrokerOffsetStore)
RemoteBrokerOffsetStore
private final static InternalLogger log = ClientLogger.getLog(); //MQ客戶端實(shí)例-該實(shí)例被同一個(gè)JVM下的消費(fèi)者和生產(chǎn)者共用 private final MQClientInstance mQClientFactory; //消費(fèi)組名 private final String groupName; //以消息隊(duì)列為鍵-消費(fèi)偏移量為值的緩存表 private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =new ConcurrentHashMap<MessageQueue, AtomicLong>();RemoteBrokerOffsetStore#persistAll
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {MessageQueue mq = entry.getKey();AtomicLong offset = entry.getValue();if (offset != null) {if (mqs.contains(mq)) {try {//更新消費(fèi)偏移量到Brokerthis.updateConsumeOffsetToBroker(mq, offset.get());log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",this.groupName,this.mQClientFactory.getClientId(),mq,offset.get());} catch (Exception e) {log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);}} } }RemoteBrokerOffsetStore#updateConsumeOffsetToBroker
同步更新消息消費(fèi)偏移量,如Master關(guān)閉,則更新到Slave。
//從MQ客戶端中根據(jù)BrokerName獲取消息隊(duì)列對應(yīng)的Broker FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); if (null == findBrokerResult) {//根據(jù)Topic從NameServer更新路由信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());//重新查找findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); }if (findBrokerResult != null) {//封裝消息消費(fèi)隊(duì)列更新請求頭UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();requestHeader.setTopic(mq.getTopic()); //主題信息requestHeader.setConsumerGroup(this.groupName); //消費(fèi)者組requestHeader.setQueueId(mq.getQueueId()); //隊(duì)列IDrequestHeader.setCommitOffset(offset); //消費(fèi)偏移量if (isOneway) {//Oneway->根據(jù)Broker地址->發(fā)送請求將消費(fèi)偏移量保存到Broker-超時(shí)時(shí)間默認(rèn)為5sthis.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);} else {this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);} } else {throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); }RemoteBrokerOffsetStore#updateOffset
if (mq != null) {//從緩存中獲取消息隊(duì)列對應(yīng)的偏移量AtomicLong offsetOld = this.offsetTable.get(mq);//為空if (null == offsetOld) {//將存入的offset存入內(nèi)存中offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));}//不為空->根據(jù)increaseOnly更新原先的offsetOldif (null != offsetOld) {if (increaseOnly) {MixAll.compareAndIncreaseOnly(offsetOld, offset);} else {offsetOld.set(offset);}} }RemoteBrokerOffsetStore#readOffset
public long readOffset(final MessageQueue mq, //消息隊(duì)列final ReadOffsetType type) { //讀取偏移量類型if (mq != null) {switch (type) {case MEMORY_FIRST_THEN_STORE://從內(nèi)存中讀取 case READ_FROM_MEMORY: {AtomicLong offset = this.offsetTable.get(mq);if (offset != null) {return offset.get();} else if (ReadOffsetType.READ_FROM_MEMORY == type) {return -1;}}//從Broker中讀取 case READ_FROM_STORE: {try {//從Broker中獲取消費(fèi)偏移量long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);AtomicLong offset = new AtomicLong(brokerOffset);//更新至內(nèi)存中(map)this.updateOffset(mq, offset.get(), false);return brokerOffset;}// No offset in brokercatch (MQBrokerException e) {return -1;}//Other exceptionscatch (Exception e) {log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);return -2;}}default:break;}}RemoteBrokerOffsetStore#fetchConsumeOffsetFromBroker
//從MQ客戶端實(shí)例中獲取Boker信息 FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); if (null == findBrokerResult) {//從NameServer中更新Topic的路由信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());//重新獲取BrokerfindBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); }if (findBrokerResult != null) {//封裝查詢消費(fèi)進(jìn)度請求頭QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();requestHeader.setTopic(mq.getTopic());requestHeader.setConsumerGroup(this.groupName);requestHeader.setQueueId(mq.getQueueId());//帶上請求頭調(diào)用MQClientAPI到Broker中獲取return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); } else {throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); }本文僅作為個(gè)人學(xué)習(xí)使用,如有不足或錯(cuò)誤請指正!
總結(jié)
以上是生活随笔為你收集整理的RocketMQ:消息ACK机制源码解析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RocketMQ:消费端的消息消息队列负
- 下一篇: RocketMQ:消息存储机制详解与源码