

RocketMQ: Message Storage Mechanism in Depth, with Source Code Analysis

Published: 2025/3/21 · 豆豆

Contents

    • Message Storage Mechanism
      • 1.Introduction
      • 2.Core Storage Class: DefaultMessageStore
      • 3.Message Storage Flow
      • 4.Message Storage Files
      • 5.Memory Mapping of Storage Files
        • 5.1.MappedFileQueue
        • 5.2.MappedFile
            • 5.2.1.commit
            • 5.2.2.flush
        • 5.3.TransientStorePool
      • 6.Disk Flush Mechanism
          • 6.1.Synchronous Flush
          • 6.2.Asynchronous Flush

Message Storage Mechanism

1.Introduction

This article explains how the Broker persists messages after it receives them from a producer.

2.Core Storage Class: DefaultMessageStore

private final MessageStoreConfig messageStoreConfig; // message store configuration
private final CommitLog commitLog; // CommitLog implementation; messages are stored in the CommitLog
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable; // ConsumeQueue cache, grouped by topic
private final FlushConsumeQueueService flushConsumeQueueService; // thread that flushes ConsumeQueue files
private final CleanCommitLogService cleanCommitLogService; // deletes expired CommitLog files
private final CleanConsumeQueueService cleanConsumeQueueService; // deletes expired ConsumeQueue files
private final IndexService indexService; // index service
private final AllocateMappedFileService allocateMappedFileService; // allocates MappedFiles (memory-mapped CommitLog and ConsumeQueue files)
private final ReputMessageService reputMessageService; // dispatches CommitLog entries to build ConsumeQueue and IndexFile files
private final HAService haService; // master/slave replication
private final ScheduleMessageService scheduleMessageService; // scheduled (delayed) message service
private final StoreStatsService storeStatsService; // storage statistics
private final MessageArrivingListener messageArrivingListener; // message-arrival listener used by long-polling pull requests
private final TransientStorePool transientStorePool; // off-heap buffer pool
private final BrokerStatsManager brokerStatsManager; // Broker statistics manager
private final BrokerConfig brokerConfig; // Broker configuration
private StoreCheckpoint storeCheckpoint; // flush checkpoint
private final LinkedList<CommitLogDispatcher> dispatcherList; // dispatchers fed with CommitLog entries

These fields form the core of message storage; it is worth understanding what each one does.

3.Message Storage Flow

[Figure: message storage sequence diagram]

Entry point for message storage: DefaultMessageStore#putMessage (the code below is from the batch variant, asyncPutMessages)

// Reject the write if the store is shut down, the Broker is a slave,
// the store is not writable, or the OS page cache is busy
PutMessageStatus checkStoreStatus = this.checkStoreStatus();
if (checkStoreStatus != PutMessageStatus.PUT_OK) {
    return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
}
// Check that the topic and message body lengths are valid
PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch);
if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
    return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
}
// Record when the write started
long beginTime = this.getSystemClock().now();
// Write the messages
CompletableFuture<PutMessageResult> resultFuture = this.commitLog.asyncPutMessages(messageExtBatch);
resultFuture.thenAccept((result) -> {
    long elapsedTime = this.getSystemClock().now() - beginTime;
    if (elapsedTime > 500) {
        log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length);
    }
    // Record statistics
    this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
    // Storage failed
    if (null == result || !result.isOk()) {
        // Increment the failed-put counter
        this.storeStatsService.getPutMessageFailedTimes().add(1);
    }
});
return resultFuture;

DefaultMessageStore#checkStoreStatus

// The store has been shut down
if (this.shutdown) {
    log.warn("message store has shutdown, so putMessage is forbidden");
    return PutMessageStatus.SERVICE_NOT_AVAILABLE;
}
// A slave Broker does not accept writes
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
    long value = this.printTimes.getAndIncrement();
    if ((value % 50000) == 0) {
        log.warn("broke role is slave, so putMessage is forbidden");
    }
    return PutMessageStatus.SERVICE_NOT_AVAILABLE;
}
// Not writable: disk full, logic queue write error, index file write error, etc.
if (!this.runningFlags.isWriteable()) {
    long value = this.printTimes.getAndIncrement();
    if ((value % 50000) == 0) {
        log.warn("the message store is not writable. It may be caused by one of the following reasons: " +
            "the broker's disk is full, write to logic queue error, write to index file error, etc");
    }
    return PutMessageStatus.SERVICE_NOT_AVAILABLE;
} else {
    this.printTimes.set(0);
}
// The OS page cache is busy
if (this.isOSPageCacheBusy()) {
    return PutMessageStatus.OS_PAGECACHE_BUSY;
}
return PutMessageStatus.PUT_OK;

CommitLog#asyncPutMessages

// Record the store time
messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
AppendMessageResult result;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
// Only non-transactional messages are allowed in a batch
if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
}
// ....
// Get the last MappedFile (the memory-mapped file currently being written)
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
// Appends are serialized by a lock
putMessageLock.lock();
try {
    long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
    this.beginTimeInLock = beginLockTimestamp;
    // Set the store time inside the lock so that store timestamps are globally ordered
    messageExtBatch.setStoreTimestamp(beginLockTimestamp);
    // No MappedFile yet, or the current one is full: create a new one
    if (null == mappedFile || mappedFile.isFull()) {
        mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
    }
    // Creation failed: return immediately
    if (null == mappedFile) {
        log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
        beginTimeInLock = 0;
        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
    }
    // !!! Append the messages to the MappedFile !!!
    result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
    // Handle the append result
    switch (result.getStatus()) {
        case PUT_OK:
            break;
        case END_OF_FILE:
            unlockMappedFile = mappedFile;
            // Create a new file, re-write the message
            mappedFile = this.mappedFileQueue.getLastMappedFile(0);
            if (null == mappedFile) {
                // XXX: warn and notify me
                log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
                beginTimeInLock = 0;
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
            }
            result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
            break;
        case MESSAGE_SIZE_EXCEEDED:
        case PROPERTIES_SIZE_EXCEEDED:
            beginTimeInLock = 0;
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
        case UNKNOWN_ERROR:
        default:
            beginTimeInLock = 0;
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
    }
    elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
    beginTimeInLock = 0;
} finally {
    putMessageLock.unlock();
}
if (elapsedTimeInLock > 500) {
    log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, messageExtBatch.getBody().length, result);
}
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
    this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).add(result.getMsgNum());
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).add(result.getWroteBytes());
// Flush according to the configured flush policy
CompletableFuture<PutMessageStatus> flushOKFuture = submitFlushRequest(result, messageExtBatch);
// Master/slave replication
CompletableFuture<PutMessageStatus> replicaOKFuture = submitReplicaRequest(result, messageExtBatch);
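The END_OF_FILE branch above is what makes CommitLog files roll over. The toy model below is a sketch under simplifying assumptions: RollingLogSketch is a hypothetical class, each file is reduced to its starting offset and write position, and no bytes are actually stored. It reproduces only the rule from CommitLog#doAppend (shown later) that a message needs msgLen + END_FILE_MIN_BLANK_LENGTH (8 bytes) of free space; otherwise the tail of the file is declared written and the message is retried in a new file.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical class) of the END_OF_FILE roll-over in CommitLog#asyncPutMessages.
// Each file is a {fileFromOffset, wrotePosition} pair; no message bytes are stored.
final class RollingLogSketch {
    // A blank end-of-file record needs TOTALSIZE (4 bytes) + MAGICCODE (4 bytes)
    static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;

    private final int fileSize;
    private final List<long[]> files = new ArrayList<>();

    RollingLogSketch(int fileSize) {
        this.fileSize = fileSize;
    }

    // Appends a message of msgLen bytes and returns its global physical offset
    long append(int msgLen) {
        if (msgLen + END_FILE_MIN_BLANK_LENGTH > fileSize) {
            // The real code rejects oversized messages earlier (MESSAGE_SIZE_EXCEEDED)
            throw new IllegalArgumentException("message larger than a file");
        }
        long[] last = files.isEmpty() ? null : files.get(files.size() - 1);
        // No file yet, or the last one is full: create a new file
        if (last == null || last[1] == fileSize) {
            last = newFile();
        }
        int maxBlank = fileSize - (int) last[1];
        if (msgLen + END_FILE_MIN_BLANK_LENGTH > maxBlank) {
            // END_OF_FILE: declare the rest of the file written, then retry in a new file
            last[1] = fileSize;
            last = newFile();
        }
        long wroteOffset = last[0] + last[1];
        last[1] += msgLen;
        return wroteOffset;
    }

    int fileCount() {
        return files.size();
    }

    private long[] newFile() {
        // Each new file starts where the previous one ends
        long from = files.isEmpty() ? 0 : files.get(files.size() - 1)[0] + fileSize;
        long[] file = {from, 0};
        files.add(file);
        return file;
    }

    public static void main(String[] args) {
        RollingLogSketch log = new RollingLogSketch(100);
        System.out.println(log.append(50));  // 0
        System.out.println(log.append(40));  // 50: 40 + 8 fits into the 50 free bytes
        System.out.println(log.append(20));  // 100: 20 + 8 does not fit into 10 bytes, so roll over
        System.out.println(log.fileCount()); // 2
    }
}
```

The third message does not fit into the 10 bytes left in the first file, so it starts at offset 100, the first byte of the second file.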

MappedFile#appendMessagesInner

assert messageExt != null;
assert cb != null;
// Current write position
int currentPos = this.wrotePosition.get();
// Append only while the write position is still inside the file
if (currentPos < this.fileSize) {
    // Target buffer: the off-heap writeBuffer if the transient store pool is enabled, otherwise the mapped buffer
    ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
    byteBuffer.position(currentPos);
    AppendMessageResult result;
    // Single message vs. batch
    if (messageExt instanceof MessageExtBrokerInner) {
        // Single message: the callback CommitLog#doAppend serializes it into the buffer (memory only, not yet on disk)
        result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
            (MessageExtBrokerInner) messageExt, putMessageContext);
    } else if (messageExt instanceof MessageExtBatch) {
        // Batch message
        result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
            (MessageExtBatch) messageExt, putMessageContext);
    } else {
        // Unknown message type: return an error result
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }
    // Advance the write pointer
    this.wrotePosition.addAndGet(result.getWroteBytes());
    // Update the store timestamp
    this.storeTimestamp = result.getStoreTimestamp();
    return result;
}
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);

CommitLog#doAppend

public AppendMessageResult doAppend(final long fileFromOffset,            // global offset of the first byte of this file
                                    final ByteBuffer byteBuffer,          // NIO buffer to write into
                                    final int maxBlank,                   // bytes still free in this file
                                    final MessageExtBrokerInner msgInner, // the message
                                    PutMessageContext putMessageContext) {
    // Global physical offset at which this message is written
    long wroteOffset = fileFromOffset + byteBuffer.position();
    // Build the msgId lazily
    Supplier<String> msgIdSupplier = () -> {
        // System flag
        int sysflag = msgInner.getSysFlag();
        // msgId = store IP + port + CommitLog offset: 16 bytes for IPv4, 28 bytes for IPv6
        int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
        ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
        // IP address followed by a 4-byte port (IP and port take 4 bytes each for IPv4)
        MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
        // Reset the buffer; the offset below is written at an absolute index
        msgIdBuffer.clear();
        // The last 8 bytes hold the CommitLog offset (wroteOffset)
        msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
        return UtilAll.bytes2string(msgIdBuffer.array());
    };
    // Key that identifies this topic's queue
    String key = putMessageContext.getTopicQueueTableKey();
    // Look up the next logical offset of that queue
    Long queueOffset = CommitLog.this.topicQueueTable.get(key);
    if (null == queueOffset) {
        queueOffset = 0L;
        CommitLog.this.topicQueueTable.put(key, queueOffset);
    }
    // Transaction messages that require special handling
    final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
    switch (tranType) {
        // Prepared and Rollback message is not consumed, will not enter the
        // consumer queue
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            queueOffset = 0L;
            break;
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
        default:
            break;
    }
    ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
    // Total length of the encoded message
    final int msgLen = preEncodeBuffer.getInt(0);
    // Determines whether there is sufficient free space
    // If not, write a blank end-of-file record and return END_OF_FILE so the caller rolls over to a new CommitLog file
    if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
        this.msgStoreItemMemory.clear();
        // 1 TOTALSIZE
        this.msgStoreItemMemory.putInt(maxBlank);
        // 2 MAGICCODE
        this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
        // 3 The remaining space may be any value
        // Here the length of the specially set maxBlank
        final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
        byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
        return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset,
            maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */
            msgIdSupplier, msgInner.getStoreTimestamp(),
            queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    }
    int pos = 4 + 4 + 4 + 4 + 4;
    // 6 QUEUEOFFSET
    preEncodeBuffer.putLong(pos, queueOffset);
    pos += 8;
    // 7 PHYSICALOFFSET
    preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());
    int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
    // 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
    pos += 8 + 4 + 8 + ipLen;
    // refresh store time stamp in lock
    preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
    // Write messages to the queue buffer
    byteBuffer.put(preEncodeBuffer);
    msgInner.setEncodedBuff(null);
    // Build the AppendMessageResult
    AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
        msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    switch (tranType) {
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            break;
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
            // The next update ConsumeQueue information
            CommitLog.this.topicQueueTable.put(key, ++queueOffset);
            break;
        default:
            break;
    }
    return result;
}
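To make the msgId layout concrete, here is a sketch that builds the same bytes for an IPv4 store host: a 4-byte IP, a 4-byte port and the 8-byte CommitLog offset, rendered as uppercase hex. MsgIdSketch is hypothetical, and the uppercase rendering is an assumption about UtilAll.bytes2string. The offsetOf helper illustrates why such an ID is enough to locate a message by its physical offset.

```java
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;

// Sketch of the msgId layout built in CommitLog#doAppend for an IPv4 store host:
// 4-byte IP + 4-byte port + 8-byte CommitLog offset, printed as uppercase hex.
// MsgIdSketch is a hypothetical helper, not a RocketMQ class.
final class MsgIdSketch {
    static String msgId(InetSocketAddress storeHost, long wroteOffset) {
        byte[] ip = storeHost.getAddress().getAddress(); // 4 bytes for IPv4, 16 for IPv6
        ByteBuffer buf = ByteBuffer.allocate(ip.length + 4 + 8); // big-endian by default
        buf.put(ip);
        buf.putInt(storeHost.getPort());
        buf.putLong(wroteOffset);
        StringBuilder sb = new StringBuilder();
        for (byte b : buf.array()) {
            sb.append(String.format("%02X", b & 0xFF));
        }
        return sb.toString();
    }

    // Recovers the CommitLog offset from the last 16 hex characters (8 bytes)
    static long offsetOf(String msgId) {
        return Long.parseUnsignedLong(msgId.substring(msgId.length() - 16), 16);
    }

    public static void main(String[] args) {
        String id = msgId(new InetSocketAddress("127.0.0.1", 10911), 4096);
        System.out.println(id);           // 7F00000100002A9F0000000000001000
        System.out.println(offsetOf(id)); // 4096
    }
}
```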

AppendMessageResult

public class AppendMessageResult {
    private AppendMessageStatus status;     // append status
    private long wroteOffset;               // global physical offset where the message was written
    private int wroteBytes;                 // number of bytes written
    private String msgId;                   // message ID
    private Supplier<String> msgIdSupplier; // computes the message ID lazily
    private long storeTimestamp;            // store timestamp
    private long logicsOffset;              // logical offset in the consume queue
    private long pagecacheRT = 0;           // time spent writing to the page cache (ms)
}

The write result is then returned to CommitLog#asyncPutMessages:

result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
switch (result.getStatus()) {
    case PUT_OK:
        break;
    // ... other cases omitted
}
// Release the lock
putMessageLock.unlock();
// Storage statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).add(result.getMsgNum());
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).add(result.getWroteBytes());
// Flush according to the configured flush policy
CompletableFuture<PutMessageStatus> flushOKFuture = submitFlushRequest(result, messageExtBatch);
// Master/slave replication
CompletableFuture<PutMessageStatus> replicaOKFuture = submitReplicaRequest(result, messageExtBatch);

4.Message Storage Files

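  • commitlog: CommitLog (message) storage directory
  • config: configuration data (topics, consumer offsets, subscription groups, etc.)
  • consumequeue: consume queue storage directory
  • index: message index file storage directory
  • abort: marker file created at startup and deleted on a clean shutdown; if it exists when the Broker starts, the previous shutdown was abnormal
  • checkpoint: checkpoint file recording the timestamps of the last flush of the commitlog, consumequeue and index files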

5.Memory Mapping of Storage Files

RocketMQ uses memory-mapped files to improve I/O performance. Every CommitLog, ConsumeQueue and IndexFile file has a fixed length.

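When a file is full, a new file is created, and each file is named after the global physical offset of its first message. For example, with the default CommitLog file size of 1 GB, the first file is named 00000000000000000000 and the second 00000000001073741824.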
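A small sketch of the naming rule, assuming the default 1 GB CommitLog file size (1024 * 1024 * 1024 bytes) and a 20-digit, zero-padded name. CommitLogNaming and its methods are hypothetical helpers, not RocketMQ APIs. The sketch also shows how a global offset splits into a file and a position inside that file, which is the arithmetic the lookups in MappedFileQueue rely on.

```java
// Sketch: map a global physical offset to a CommitLog file name and a position inside it.
// Assumes the default 1 GB CommitLog file size; CommitLogNaming is a hypothetical helper.
final class CommitLogNaming {
    static final long FILE_SIZE = 1024L * 1024 * 1024; // assumed mappedFileSizeCommitLog (1 GB)

    // A file is named after its starting offset, zero-padded to 20 digits
    static String fileName(long fileFromOffset) {
        return String.format("%020d", fileFromOffset);
    }

    // Starting offset of the file that contains the given physical offset
    static long fileFromOffset(long physicalOffset) {
        return physicalOffset - physicalOffset % FILE_SIZE;
    }

    public static void main(String[] args) {
        long offset = 1_500_000_000L;
        long start = fileFromOffset(offset);
        System.out.println(fileName(start)); // 00000000001073741824
        System.out.println(offset - start);  // 426258176 (position within that file)
    }
}
```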

5.1.MappedFileQueue

// Storage directory
private final String storePath;
// Size of each file
protected final int mappedFileSize;
// The MappedFiles in this queue
protected final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
// Service thread that allocates MappedFiles
private final AllocateMappedFileService allocateMappedFileService;
// Flush pointer: global offset flushed to disk so far
protected long flushedWhere = 0;
// Commit pointer: global offset committed so far
private long committedWhere = 0;

Get the MappedFile by store time

public MappedFile getMappedFileByTime(final long timestamp) {
    // Snapshot of the mapped files
    Object[] mfs = this.copyMappedFiles(0);
    if (null == mfs) {
        return null;
    }
    // Iterate over the files in order
    for (int i = 0; i < mfs.length; i++) {
        MappedFile mappedFile = (MappedFile) mfs[i];
        // Return the first file last modified at or after the given timestamp
        if (mappedFile.getLastModifiedTimestamp() >= timestamp) {
            return mappedFile;
        }
    }
    // None found: return the last file
    return (MappedFile) mfs[mfs.length - 1];
}

Find the MappedFile by physical offset

public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
    try {
        // First and last mapped files
        MappedFile firstMappedFile = this.getFirstMappedFile();
        MappedFile lastMappedFile = this.getLastMappedFile();
        // Proceed only if both exist
        if (firstMappedFile != null && lastMappedFile != null) {
            if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
                LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
                    offset,
                    firstMappedFile.getFileFromOffset(),
                    lastMappedFile.getFileFromOffset() + this.mappedFileSize,
                    this.mappedFileSize,
                    this.mappedFiles.size());
            } else {
                // Index of the target file; older files may have been deleted, so subtract the first file's slot
                int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
                MappedFile targetFile = null;
                try {
                    // Look up the target file by index
                    targetFile = this.mappedFiles.get(index);
                } catch (Exception ignored) {
                }
                // Check that the file exists and actually contains the offset
                if (targetFile != null && offset >= targetFile.getFileFromOffset()
                    && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
                    return targetFile;
                }
                // Fall back to a linear scan
                for (MappedFile tmpMappedFile : this.mappedFiles) {
                    if (offset >= tmpMappedFile.getFileFromOffset()
                        && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
                        return tmpMappedFile;
                    }
                }
            }
            // Not found: optionally return the first file
            if (returnFirstOnNotFound) {
                return firstMappedFile;
            }
        }
    } catch (Exception e) {
        log.error("findMappedFileByOffset Exception", e);
    }
    return null;
}
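The index computation in findMappedFileByOffset can be isolated as below. FileIndexLookup is a hypothetical helper that represents each file only by its starting offset; where the real method falls back to a linear scan, this sketch simply returns -1. Subtracting first / mappedFileSize matters because expired files at the head of the queue are deleted, so list index 0 is not necessarily the file that starts at offset 0. List.of requires Java 9 or later.

```java
import java.util.List;

// Sketch of the index arithmetic in MappedFileQueue#findMappedFileByOffset.
// Each file is represented only by its starting offset; FileIndexLookup is hypothetical.
final class FileIndexLookup {
    // Returns the list index of the file that contains `offset`, or -1 if none does
    static int indexOf(List<Long> fileFromOffsets, long mappedFileSize, long offset) {
        if (fileFromOffsets.isEmpty()) {
            return -1;
        }
        long first = fileFromOffsets.get(0);
        long last = fileFromOffsets.get(fileFromOffsets.size() - 1);
        if (offset < first || offset >= last + mappedFileSize) {
            return -1; // "Offset not matched"
        }
        // Older files may have been deleted, so subtract the first file's slot number
        int index = (int) ((offset / mappedFileSize) - (first / mappedFileSize));
        if (index < 0 || index >= fileFromOffsets.size()) {
            return -1; // the real code falls back to a linear scan here
        }
        long start = fileFromOffsets.get(index);
        // Verify that the computed file really covers the offset, as the real code does
        return (offset >= start && offset < start + mappedFileSize) ? index : -1;
    }

    public static void main(String[] args) {
        // 1000-byte files; the files starting at 0, 1000 and 2000 were already deleted
        List<Long> files = List.of(3000L, 4000L, 5000L);
        System.out.println(indexOf(files, 1000, 4500)); // 1
        System.out.println(indexOf(files, 1000, 2999)); // -1
    }
}
```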

Get the minimum offset of the stored files

public long getMinOffset() {
    if (!this.mappedFiles.isEmpty()) {
        try {
            return this.mappedFiles.get(0).getFileFromOffset();
        } catch (IndexOutOfBoundsException e) {
            //continue;
        } catch (Exception e) {
            log.error("getMinOffset has exception.", e);
        }
    }
    return -1;
}

Get the maximum (readable) offset of the stored files

public long getMaxOffset() {
    // Last mapped file
    MappedFile mappedFile = getLastMappedFile();
    if (mappedFile != null) {
        return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();
    }
    return 0;
}

Get the current write pointer of the stored files

public long getMaxWrotePosition() {
    MappedFile mappedFile = getLastMappedFile();
    if (mappedFile != null) {
        return mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();
    }
    return 0;
}

5.2.MappedFile

// OS page size used for flushing, 4 KB by default
public static final int OS_PAGE_SIZE = 1024 * 4;
// Total virtual memory mapped by MappedFiles in this JVM
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
// Number of MappedFile objects in this JVM
private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
// Write pointer of this file
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
// Commit pointer of this file
protected final AtomicInteger committedPosition = new AtomicInteger(0);
// Flush pointer
private final AtomicInteger flushedPosition = new AtomicInteger(0);
// File size
protected int fileSize;
// File channel
protected FileChannel fileChannel;
/**
 * Off-heap ByteBuffer.
 * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
 */
protected ByteBuffer writeBuffer = null;
// Off-heap buffer pool
protected TransientStorePool transientStorePool = null;
// File name
private String fileName;
// Global offset of the first byte of this file (also its file name)
private long fileFromOffset;
// Physical file
private File file;
// Memory-mapped buffer of the file
private MappedByteBuffer mappedByteBuffer;
// Store timestamp
private volatile long storeTimestamp = 0;
// Whether this is the first file created in the queue
private boolean firstCreateInQueue = false;

MappedFile initialization

private void init(final String fileName, final int fileSize) throws IOException {
    this.fileName = fileName;
    this.fileSize = fileSize;
    this.file = new File(fileName);
    this.fileFromOffset = Long.parseLong(this.file.getName());
    boolean ok = false;
    // Make sure the parent directory exists
    ensureDirOK(this.file.getParent());
    try {
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
        TOTAL_MAPPED_FILES.incrementAndGet();
        ok = true;
    } catch (FileNotFoundException e) {
        log.error("Failed to create file " + this.fileName, e);
        throw e;
    } catch (IOException e) {
        log.error("Failed to map file " + this.fileName, e);
        throw e;
    } finally {
        if (!ok && this.fileChannel != null) {
            this.fileChannel.close();
        }
    }
}

Note that the message store setting transientStorePoolEnable (in MessageStoreConfig) also changes how MappedFile works: when it is true, data is first written to off-heap memory, then a commit thread copies it into the file's FileChannel, and finally a flush thread forces it to disk.

Initialization with transientStorePoolEnable turned on:

public void init(final String fileName, final int fileSize,
    final TransientStorePool transientStorePool) throws IOException {
    init(fileName, fileSize);
    // Borrow an off-heap write buffer from the pool
    this.writeBuffer = transientStorePool.borrowBuffer();
    this.transientStorePool = transientStorePool;
}

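The write, commit and flush pointers are easier to follow in a runnable model. PooledFileSketch is a hypothetical, simplified MappedFile with transientStorePoolEnable on: a direct ByteBuffer stands in for the buffer borrowed from TransientStorePool, there is no memory mapping or mlock, and the three methods only mimic appendMessagesInner, commit0 and flush as described in this section.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Simplified model of a MappedFile with transientStorePoolEnable on:
// append -> writeBuffer (off-heap), commit -> FileChannel, flush -> force to disk.
// PooledFileSketch is hypothetical; the real class also maps the file and uses a pooled, locked buffer.
final class PooledFileSketch {
    private final ByteBuffer writeBuffer;
    private final FileChannel fileChannel;
    int wrotePosition, committedPosition, flushedPosition;

    PooledFileSketch(FileChannel fileChannel, int fileSize) {
        this.fileChannel = fileChannel;
        this.writeBuffer = ByteBuffer.allocateDirect(fileSize); // stands in for transientStorePool.borrowBuffer()
    }

    // Append: the data only reaches off-heap memory
    void append(byte[] data) {
        ByteBuffer slice = writeBuffer.slice();
        slice.position(wrotePosition);
        slice.put(data);
        wrotePosition += data.length;
    }

    // Commit: copy [committedPosition, wrotePosition) into the FileChannel
    void commit() throws IOException {
        if (wrotePosition <= committedPosition) {
            return;
        }
        ByteBuffer slice = writeBuffer.slice();
        slice.position(committedPosition);
        slice.limit(wrotePosition);
        fileChannel.position(committedPosition);
        while (slice.hasRemaining()) {
            fileChannel.write(slice);
        }
        committedPosition = wrotePosition;
    }

    // Flush: only committed data can be flushed (getReadPosition() == committedPosition in this mode)
    void flush() throws IOException {
        int readPosition = committedPosition;
        fileChannel.force(false);
        flushedPosition = readPosition;
    }

    public static void main(String[] args) throws IOException {
        Path path = Files.createTempFile("commitlog", ".tmp");
        try (FileChannel ch = FileChannel.open(path, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            PooledFileSketch file = new PooledFileSketch(ch, 1024);
            file.append("hello".getBytes());
            System.out.println(file.wrotePosition + " " + file.committedPosition + " " + file.flushedPosition); // 5 0 0
            file.commit();
            file.flush();
            System.out.println(file.wrotePosition + " " + file.committedPosition + " " + file.flushedPosition); // 5 5 5
        } finally {
            Files.deleteIfExists(path);
        }
    }
}
```

After append, only wrotePosition has moved; commit copies the pending bytes into the FileChannel and advances committedPosition; flush forces the channel and sets flushedPosition to the committed position, because only committed data is readable in this mode.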
5.2.1.commit

The commit-and-flush call chain is roughly:

DefaultMessageStore#flush→CommitLog→MappedFileQueue→MappedFile

// DefaultMessageStore
public long flush() {
    return this.commitLog.flush();
}
// CommitLog
public long flush() {
    //----------↓-----------
    this.mappedFileQueue.commit(0);
    this.mappedFileQueue.flush(0);
    return this.mappedFileQueue.getFlushedWhere();
}
// MappedFileQueue
public boolean commit(final int commitLeastPages) {
    boolean result = true;
    MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
    if (mappedFile != null) {
        //----------↓-----------
        int offset = mappedFile.commit(commitLeastPages);
        long where = mappedFile.getFileFromOffset() + offset;
        result = where == this.committedWhere;
        this.committedWhere = where;
    }
    return result;
}

Finally, MappedFile commits the data:

MappedFile#commit

public int commit(final int commitLeastPages) {
    // writeBuffer == null means transientStorePoolEnable is off, so nothing needs to be committed to the FileChannel.
    // Treat wrotePosition as committedPosition and return it; the data goes straight to flush.
    if (writeBuffer == null) {
        return this.wrotePosition.get();
    }
    // Commit only if at least commitLeastPages pages are pending
    if (this.isAbleToCommit(commitLeastPages)) {
        // Make sure the MappedFile has not been destroyed:
        // hold() -> isAvailable() -> MappedFile.available (inherited from ReferenceResource);
        // see shutdown() below for how a file is destroyed
        if (this.hold()) {
            //--↓--
            commit0();
            this.release();
        } else {
            log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
        }
    }
    // All dirty data has been committed to FileChannel.
    // Once the whole file has been committed, return the write buffer to the pool
    if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
        this.transientStorePool.returnBuffer(writeBuffer);
        this.writeBuffer = null;
    }
    return this.committedPosition.get();
}

MappedFile#isAbleToCommit

// Committed position
int flush = this.committedPosition.get();
// Write position
int write = this.wrotePosition.get();
// The file is full: always commit
if (this.isFull()) {
    return true;
}
if (commitLeastPages > 0) {
    // Commit only when at least commitLeastPages pages are pending
    return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;
}
return write > flush;
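The threshold counts page boundaries crossed rather than bytes, which is easy to miss. Here is the same condition as a pure function; CommitThreshold is hypothetical and takes the positions as arguments instead of reading MappedFile fields. MappedFile#isAbleToFlush appears to follow the same pattern with flushedPosition and getReadPosition().

```java
// Sketch of the commit threshold in MappedFile#isAbleToCommit as a pure function.
// CommitThreshold is hypothetical; positions are byte offsets within one file.
final class CommitThreshold {
    static final int OS_PAGE_SIZE = 1024 * 4;

    static boolean isAbleToCommit(int committedPosition, int wrotePosition, int fileSize, int commitLeastPages) {
        // A full file is always committed
        if (wrotePosition == fileSize) {
            return true;
        }
        if (commitLeastPages > 0) {
            // Counts page boundaries crossed since the last commit, not raw bytes
            return (wrotePosition / OS_PAGE_SIZE) - (committedPosition / OS_PAGE_SIZE) >= commitLeastPages;
        }
        // commitLeastPages == 0: commit anything that is pending
        return wrotePosition > committedPosition;
    }

    public static void main(String[] args) {
        int fileSize = 1024 * 1024;
        System.out.println(isAbleToCommit(0, 10_000, fileSize, 4));    // false: only 2 full pages
        System.out.println(isAbleToCommit(0, 16_384, fileSize, 4));    // true: 4 pages
        System.out.println(isAbleToCommit(4_095, 4_097, fileSize, 1)); // true: only 2 bytes, but a page boundary was crossed
        System.out.println(isAbleToCommit(0, 1, fileSize, 0));         // true: thorough commit
    }
}
```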

MappedFile#commit0

// Write position
int writePos = this.wrotePosition.get();
// Last committed position
int lastCommittedPosition = this.committedPosition.get();
// Commit only if there is new data past the last commit
if (writePos - lastCommittedPosition > 0) {
    try {
        // A view that shares the writeBuffer's content
        ByteBuffer byteBuffer = writeBuffer.slice();
        // Start from the last committed position
        byteBuffer.position(lastCommittedPosition);
        // Stop at the write position
        byteBuffer.limit(writePos);
        // Move the FileChannel to the last committed position
        this.fileChannel.position(lastCommittedPosition);
        // Copy bytes [lastCommittedPosition, writePos) into the FileChannel
        this.fileChannel.write(byteBuffer);
        // Advance the commit pointer to writePos so the same data is never committed twice
        this.committedPosition.set(writePos);
    } catch (Throwable e) {
        log.error("Error occurred when commit data to FileChannel.", e);
    }
}

5.2.2.flush

Flushing persists in-memory data to disk by calling force() on the MappedByteBuffer or the FileChannel, so afterwards flushedPosition should equal the write pointer of the data held in the MappedByteBuffer:

  • If writeBuffer is not null, flushedPosition should equal the last commit pointer, because the data committed last time is exactly the data that has entered the MappedByteBuffer.
  • If writeBuffer is null, data goes directly into the MappedByteBuffer and wrotePosition is the pointer within it, so flushedPosition is set to wrotePosition.

Once data has been committed to the fileChannel, flushing proceeds as follows:

CommitLog#flush→MappedFileQueue#flush→MappedFile#flush

MappedFile#flush

// Flush only if the flush condition is met
if (this.isAbleToFlush(flushLeastPages)) {
    // Take a reference so the file cannot be destroyed during the flush
    if (this.hold()) {
        // Readable position
        int value = getReadPosition();
        try {
            // TransientStorePool enabled -> data was written through fileChannel
            // TransientStorePool disabled -> data was written into mappedByteBuffer
            //We only append data to fileChannel or mappedByteBuffer, never both.
            // Data committed from writeBuffer to fileChannel -> force the channel
            if (writeBuffer != null || this.fileChannel.position() != 0) {
                this.fileChannel.force(false);
            }
            // Data written straight into mappedByteBuffer -> force the mapping
            else {
                this.mappedByteBuffer.force();
            }
        } catch (Throwable e) {
            log.error("Error occurred when force data to disk.", e);
        }
        // Update the flush position
        this.flushedPosition.set(value);
        this.release();
    } else {
        log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
        this.flushedPosition.set(getReadPosition());
    }
}
return this.getFlushedPosition();

MappedFile#getReadPosition

/**
 * Returns the maximum readable position of this file.
 * @return The max position which have valid data
 */
public int getReadPosition() {
    // Without writeBuffer, return the write pointer; otherwise return the last commit pointer.
    // Only committed data (already in the MappedByteBuffer or FileChannel) is safe to read.
    return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
}
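For comparison, here is a sketch of the default path with transientStorePoolEnable off: data goes straight into the MappedByteBuffer, getReadPosition() is simply wrotePosition, and flushing calls mappedByteBuffer.force(). MappedOnlySketch is hypothetical; it maps a file with FileChannel.map(READ_WRITE, 0, fileSize) as MappedFile#init does, but keeps the positions as local variables.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Sketch of the default path (transientStorePoolEnable off): writes go straight into the
// MappedByteBuffer, so the readable position is wrotePosition and flush calls force() on the mapping.
// MappedOnlySketch is hypothetical.
final class MappedOnlySketch {
    // Returns {wrotePosition, flushedPosition} after one append and one flush
    static int[] writeAndFlush(Path path, int fileSize, byte[] data) throws IOException {
        try (FileChannel ch = FileChannel.open(path, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            MappedByteBuffer mapped = ch.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
            int wrotePosition = 0;
            // Append: the data lands in the page cache through the mapping
            mapped.position(wrotePosition);
            mapped.put(data);
            wrotePosition += data.length;
            // Flush: with no writeBuffer, the readable position is the write position
            int readPosition = wrotePosition;
            mapped.force();
            int flushedPosition = readPosition;
            return new int[] {wrotePosition, flushedPosition};
        }
    }

    public static void main(String[] args) throws IOException {
        Path path = Files.createTempFile("commitlog", ".tmp");
        path.toFile().deleteOnExit();
        int[] positions = writeAndFlush(path, 4096, "hello".getBytes());
        System.out.println(positions[0] + " " + positions[1]); // 5 5
    }
}
```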

MappedFile#shutdown

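A MappedFile is destroyed through MappedFile#destroy(long intervalForcibly), which calls ReferenceResource#shutdown(intervalForcibly) shown below. intervalForcibly is the longest time a file that is still referenced may delay its destruction; once it has elapsed, the file is released forcibly.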

if (this.available) {
    // Mark the MappedFile as unavailable
    this.available = false;
    // Record when shutdown was first requested
    this.firstShutdownTimestamp = System.currentTimeMillis();
    // Drop the file's own reference
    this.release();
} else if (this.getRefCount() > 0) {
    // Still referenced after intervalForcibly: force the reference count negative and release
    if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
        this.refCount.set(-1000 - this.getRefCount());
        this.release();
    }
}
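The shutdown logic only makes sense together with hold() and release(). The following sketch is modelled on ReferenceResource with the cleanup step reduced to a counter; RefCountedFile is hypothetical and does not unmap or delete anything. A file starts with one reference of its own; shutdown drops it, and cleanup runs when the last holder releases or, after intervalForcibly, when the count is forced negative.

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the reference counting behind hold()/release()/shutdown(intervalForcibly),
// modelled on RocketMQ's ReferenceResource. RefCountedFile is hypothetical; cleanup()
// only counts calls instead of unmapping and deleting a file.
class RefCountedFile {
    protected final AtomicLong refCount = new AtomicLong(1); // the file's own initial reference
    protected volatile boolean available = true;
    protected volatile boolean cleanupOver = false;
    private volatile long firstShutdownTimestamp = 0;
    int cleanupCalls = 0;

    // Take a reference; fails once shutdown has started
    synchronized boolean hold() {
        if (available) {
            if (refCount.getAndIncrement() > 0) {
                return true;
            }
            refCount.getAndDecrement();
        }
        return false;
    }

    // Drop a reference; the last release triggers cleanup
    void release() {
        long value = refCount.decrementAndGet();
        if (value > 0) {
            return;
        }
        synchronized (this) {
            cleanupOver = cleanup(value);
        }
    }

    void shutdown(long intervalForcibly) {
        if (available) {
            available = false;
            firstShutdownTimestamp = System.currentTimeMillis();
            release();
        } else if (refCount.get() > 0) {
            // Still referenced after intervalForcibly ms: force the count negative and release
            if (System.currentTimeMillis() - firstShutdownTimestamp >= intervalForcibly) {
                refCount.set(-1000 - refCount.get());
                release();
            }
        }
    }

    boolean cleanup(long currentRef) {
        cleanupCalls++;
        return true;
    }

    public static void main(String[] args) {
        RefCountedFile f = new RefCountedFile();
        System.out.println(f.hold());      // true: a reader holds the file
        f.shutdown(60_000);                // marks it unavailable and drops the initial reference
        System.out.println(f.hold());      // false: no new references after shutdown
        System.out.println(f.cleanupOver); // false: the reader still holds it
        f.release();                       // the reader finishes
        System.out.println(f.cleanupOver); // true
    }
}
```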

5.3.TransientStorePool

A pool for holding data for a short time. RocketMQ allocates separate off-heap ByteBuffers to store data temporarily: data is first written into such a buffer, and a commit thread then copies it into the memory mapping of the target physical file. The main reason for this mechanism is memory locking: the off-heap memory stays locked in RAM so that the OS cannot swap it out to disk.

private final int poolSize;                       // number of buffers in availableBuffers
private final int fileSize;                       // size of each ByteBuffer
private final Deque<ByteBuffer> availableBuffers; // deque holding the available buffers
private final MessageStoreConfig storeConfig;     // message store configuration

Initialization:

public void init() {
    // Allocate poolSize off-heap buffers
    for (int i = 0; i < poolSize; i++) {
        // Allocate direct memory
        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
        // Native address of the buffer
        final long address = ((DirectBuffer) byteBuffer).address();
        Pointer pointer = new Pointer(address);
        // Lock this memory through the JNA (com.sun.jna) library so it is never swapped out, which keeps writes fast
        LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
        availableBuffers.offer(byteBuffer);
    }
}
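A sketch of the pool's borrow/return cycle. SimpleTransientStorePool is hypothetical: it omits the mlock call, which needs native access through JNA, and keeps the rest of the lifecycle described in this article, where buffers are pre-allocated, borrowed in MappedFile#init, and returned from MappedFile#commit once the whole file has been committed. Because the pool has a fixed size, borrowBuffer in this sketch returns null when every buffer is in use.

```java
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;

// Sketch of TransientStorePool's borrow/return cycle. SimpleTransientStorePool is hypothetical
// and skips the mlock call that the real pool makes through JNA.
final class SimpleTransientStorePool {
    private final int poolSize;
    private final int fileSize;
    private final Deque<ByteBuffer> availableBuffers = new ConcurrentLinkedDeque<>();

    SimpleTransientStorePool(int poolSize, int fileSize) {
        this.poolSize = poolSize;
        this.fileSize = fileSize;
    }

    // Pre-allocate poolSize off-heap buffers, one per file being written
    void init() {
        for (int i = 0; i < poolSize; i++) {
            availableBuffers.offer(ByteBuffer.allocateDirect(fileSize));
        }
    }

    // Used when a MappedFile is initialized; null when the pool is exhausted
    ByteBuffer borrowBuffer() {
        return availableBuffers.pollFirst();
    }

    // Used once a MappedFile has been fully committed; resets the buffer for reuse
    void returnBuffer(ByteBuffer byteBuffer) {
        byteBuffer.position(0);
        byteBuffer.limit(fileSize);
        availableBuffers.offerFirst(byteBuffer);
    }

    int availableBufferNums() {
        return availableBuffers.size();
    }

    public static void main(String[] args) {
        SimpleTransientStorePool pool = new SimpleTransientStorePool(2, 1024);
        pool.init();
        ByteBuffer a = pool.borrowBuffer();
        ByteBuffer b = pool.borrowBuffer();
        System.out.println(b.capacity());               // 1024
        System.out.println(pool.borrowBuffer());        // null: both buffers are in use
        pool.returnBuffer(a);
        System.out.println(pool.availableBufferNums()); // 1
    }
}
```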

6.Disk Flush Mechanism

6.1.Synchronous Flush

CommitLog#submitFlushRequest

// Synchronous flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
    // Service thread that flushes the CommitLog
    final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
    // The producer waits for the store result
    if (messageExt.isWaitStoreMsgOK()) {
        // Build a flush request: it is satisfied once the flush pointer reaches wroteOffset + wroteBytes
        GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
            this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
        // Hand the request to the flush service thread
        //--------↓--------
        service.putRequest(request);
        // Future that completes when the flush finishes
        return request.future();
    } else {
        // Only wake up the flush thread
        service.wakeup();
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }
} else {
    // Asynchronous flush
    ....
}

GroupCommitRequest

public static class GroupCommitRequest {
    private final long nextOffset;
    private CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>();
    private final long startTimestamp = System.currentTimeMillis();
    private long timeoutMillis = Long.MAX_VALUE;
}

GroupCommitService

class GroupCommitService extends FlushCommitLogService {
    // Incoming requests (write list) and requests being processed (read list)
    private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();
    private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>();
    // Spin lock protecting the lists above
    private final PutMessageSpinLock lock = new PutMessageSpinLock();
}

GroupCommitService#putRequest

// Take the spin lock
lock.lock();
try {
    // Add the request to the write list
    this.requestsWrite.add(request);
} finally {
    lock.unlock();
}
// Wake up the service thread
this.wakeup();

GroupCommitService#run

CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
    try {
        // Wait up to 10 ms, or until woken up
        this.waitForRunning(10);
        // Process the pending flush requests
        this.doCommit();
    } catch (Exception e) {
        CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
    }
}
// Under normal circumstances shutdown, wait for the arrival of the
// request, and then flush
try {
    Thread.sleep(10);
} catch (InterruptedException e) {
    CommitLog.log.warn("GroupCommitService Exception, ", e);
}
synchronized (this) {
    this.swapRequests();
}
this.doCommit();
CommitLog.log.info(this.getServiceName() + " service end");

GroupCommitService#doCommit

if (!this.requestsRead.isEmpty()) {
    // Iterate over requestsRead
    for (GroupCommitRequest req : this.requestsRead) {
        // A request is satisfied once the flush pointer has reached its nextOffset.
        // The message may continue in the next file, so flush at most twice.
        boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
        for (int i = 0; i < 2 && !flushOK; i++) {
            CommitLog.this.mappedFileQueue.flush(0);
            flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
        }
        // Wake up the waiting producer
        req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
    }
    // Update the store checkpoint
    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
    if (storeTimestamp > 0) {
        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
    }
    // Clear the processed requests
    this.requestsRead = new LinkedList<>();
} else {
    // Some messages do not wait for the flush result, so this branch is reached and flushes anyway
    CommitLog.this.mappedFileQueue.flush(0);
}

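The point of the two lists is that producers keep adding requests to requestsWrite while the service thread works through requestsRead, and a single flush can satisfy many requests (group commit). In the RocketMQ 4.x source the swap appears to happen in onWaitEnd(), which waitForRunning calls when it returns. The sketch below is hypothetical: GroupCommitSketch uses synchronized methods instead of PutMessageSpinLock, completes futures with a Boolean (true for PUT_OK, false for FLUSH_DISK_TIMEOUT), and flushing just advances flushedWhere to wroteWhere.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch of GroupCommitService's two request lists and its doCommit loop body.
// GroupCommitSketch is hypothetical; "flushing" only advances flushedWhere.
final class GroupCommitSketch {
    static final class Request {
        final long nextOffset; // wroteOffset + wroteBytes of the message
        final CompletableFuture<Boolean> future = new CompletableFuture<>();

        Request(long nextOffset) {
            this.nextOffset = nextOffset;
        }
    }

    private List<Request> requestsWrite = new ArrayList<>();
    private List<Request> requestsRead = new ArrayList<>();
    private long wroteWhere = 0;   // bytes appended to the log
    private long flushedWhere = 0; // bytes known to be on disk

    void append(long bytes) {
        wroteWhere += bytes;
    }

    synchronized void putRequest(Request request) {
        requestsWrite.add(request);
    }

    private synchronized void swapRequests() {
        List<Request> tmp = requestsWrite;
        requestsWrite = requestsRead;
        requestsRead = tmp;
    }

    private void flush() {
        flushedWhere = wroteWhere;
    }

    // One iteration of the service loop, after waitForRunning returns
    void doCommit() {
        swapRequests();
        if (requestsRead.isEmpty()) {
            flush(); // no waiting producers: flush anyway, as the real doCommit does
            return;
        }
        for (Request req : requestsRead) {
            boolean flushOK = flushedWhere >= req.nextOffset;
            // At most two flushes, because the message may end in the next file
            for (int i = 0; i < 2 && !flushOK; i++) {
                flush();
                flushOK = flushedWhere >= req.nextOffset;
            }
            req.future.complete(flushOK); // true ~ PUT_OK, false ~ FLUSH_DISK_TIMEOUT
        }
        requestsRead = new ArrayList<>();
    }

    public static void main(String[] args) {
        GroupCommitSketch service = new GroupCommitSketch();
        service.append(100);
        Request r1 = new Request(60);
        Request r2 = new Request(100);
        service.putRequest(r1);
        service.putRequest(r2);
        service.doCommit(); // one flush satisfies both requests
        System.out.println(r1.future.join() + " " + r2.future.join()); // true true
    }
}
```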
6.2.Asynchronous Flush

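Once a message has been appended to memory, the result is returned to the producer immediately. If transientStorePoolEnable is on, RocketMQ allocates a separate off-heap buffer of the same size as the target physical file (the CommitLog) and locks that memory so it is never swapped out to virtual memory. Messages are first appended to the off-heap buffer, then committed to the physical file's memory mapping, and then flushed to disk. If transientStorePoolEnable is off, messages are appended directly to the physical file's memory mapping and then flushed to disk.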

Asynchronous flush steps with transientStorePoolEnable on:

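  • Append the message directly to the off-heap ByteBuffer.
  • The CommitRealTimeService thread commits the messages in the ByteBuffer to the fileChannel every 200 ms.
  • After a successful commit, committedPosition moves forward.
  • The FlushRealTimeService thread flushes the fileChannel data to disk every 500 ms.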

// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
    ...
}
// Asynchronous flush
else {
    // TransientStorePool disabled: wake up the flush thread (flushCommitLogService)
    if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        flushCommitLogService.wakeup();
    } else {
        // TransientStorePool enabled: wake up the commit thread (commitLogService)
        commitLogService.wakeup();
    }
    return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}

CommitRealTimeService#run

How the commit thread works:

while (!this.isStopped()) {
    // Interval between runs: 200 ms
    int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
    // Minimum number of pages per commit: 4
    int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
    // Maximum interval between two commits: 200 ms
    int commitDataThoroughInterval =
        CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
    // If more than commitDataThoroughInterval has passed since the last commit, ignore commitDataLeastPages and commit everything pending
    long begin = System.currentTimeMillis();
    if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
        this.lastCommitTimestamp = begin;
        // Drop the page-count requirement
        commitDataLeastPages = 0;
    }
    try {
        // Commit pending data to the physical file's memory mapping and get the result
        boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
        long end = System.currentTimeMillis();
        // Some data was committed
        if (!result) {
            this.lastCommitTimestamp = end; // result = false means some data committed.
            //now wake up flush thread.
            // Wake up the flush thread FlushRealTimeService (a subclass of FlushCommitLogService)
            flushCommitLogService.wakeup();
        }
        if (end - begin > 500) {
            log.info("Commit data to file costs {} ms", end - begin);
        }
        this.waitForRunning(interval);
    } catch (Throwable e) {
        CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
    }
}

FlushRealTimeService#run (a subclass of FlushCommitLogService)

How the flush thread works:

// Loop until the thread is stopped
while (!this.isStopped()) {
    // Interval between runs: 500 ms
    int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
    // Minimum number of pages per flush: 4
    int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
    // Maximum interval between two flushes: 10 s
    int flushPhysicQueueThoroughInterval =
        CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
    boolean printFlushProgress = false;
    // Print flush progress
    long currentTimeMillis = System.currentTimeMillis();
    // If the last flush was more than flushPhysicQueueThoroughInterval ago, ignore flushPhysicQueueLeastPages (set it to 0) and flush everything
    if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
        this.lastFlushTimestamp = currentTimeMillis;
        flushPhysicQueueLeastPages = 0;
        printFlushProgress = (printTimes++ % 10) == 0;
    }
    try {
        if (flushCommitLogTimed) {
            // Fixed sleep between runs: 500 ms
            Thread.sleep(interval);
        } else {
            this.waitForRunning(interval);
        }
        if (printFlushProgress) {
            this.printFlushProgress();
        }
        long begin = System.currentTimeMillis();
        // Flush to disk
        CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
        // Update the timestamp in the store checkpoint
        long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
        if (storeTimestamp > 0) {
            CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
        }
        long past = System.currentTimeMillis() - begin;
        if (past > 500) {
            log.info("Flush data to disk costs {} ms", past);
        }
    } catch (Throwable e) {
        CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        this.printFlushProgress();
    }
}
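Both service loops share one rule: normally pass the least-pages threshold, but pass 0 once the thorough interval has elapsed so that everything pending is written out. ThoroughIntervalPolicy is a hypothetical class that isolates this rule with time passed in explicitly; it leaves out the commit service's extra step of resetting lastCommitTimestamp whenever a commit actually wrote data.

```java
// Sketch of the "least pages unless the thorough interval has passed" rule shared by
// CommitRealTimeService and FlushRealTimeService. ThoroughIntervalPolicy is hypothetical;
// time is passed in explicitly so the behaviour is deterministic.
final class ThoroughIntervalPolicy {
    private final int leastPages;
    private final long thoroughIntervalMs;
    private long lastTimestamp;

    ThoroughIntervalPolicy(int leastPages, long thoroughIntervalMs, long startMs) {
        this.leastPages = leastPages;
        this.thoroughIntervalMs = thoroughIntervalMs;
        this.lastTimestamp = startMs;
    }

    // Returns the leastPages argument to pass to commit()/flush() at time nowMs
    int leastPagesAt(long nowMs) {
        if (nowMs >= lastTimestamp + thoroughIntervalMs) {
            lastTimestamp = nowMs;
            return 0; // too long since the last thorough pass: write out everything pending
        }
        return leastPages;
    }

    public static void main(String[] args) {
        // Flush defaults quoted above: 4 pages, 10 s thorough interval, one run every 500 ms
        ThoroughIntervalPolicy flush = new ThoroughIntervalPolicy(4, 10_000, 0);
        for (long t = 500; t <= 20_000; t += 500) {
            if (flush.leastPagesAt(t) == 0) {
                System.out.println("thorough flush at " + t + " ms");
            }
        }
    }
}
```

With these values, main reports a thorough flush at 10000 ms and at 20000 ms; every other run passes a threshold of 4 pages.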

This article is for personal study only; if you spot any shortcomings or errors, please point them out!
