RocketMQ 源码分析 事务消息
生活随笔
收集整理的這篇文章主要介紹了
RocketMQ 源码分析 事务消息
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
為什么80%的碼農都做不了架構師?>>> ??
1. 概述
必須必須必須?前置閱讀內容:
- 《事務消息(阿里云)》
2. 事務消息發送
2.1 Producer 發送事務消息
- 活動圖如下(結合?核心代碼?理解):
- 實現代碼如下:
2.2 Broker 處理結束事務請求
- ?查詢請求的消息,進行提交 / 回滾。實現代碼如下:
2.3 Broker 生成 ConsumeQueue
- ?事務消息,提交(COMMIT)后才生成?ConsumeQueue。
3. 事務消息回查
- 【事務消息回查】功能曾經開源過,目前(V4.0.0)暫未開源。如下是該功能的開源情況:
| 官方V3.0.4 ~ V3.1.4 | 基于 文件系統 實現 | 已開源 |
| 官方V3.1.5 ~ V4.0.0 | 基于 數據庫 實現 | 未完全開源 |
我們來看看兩種情況下是怎么實現的。
3.1 Broker 發起【事務消息回查】
3.1.1 官方V3.1.4:基于文件系統
倉庫地址:https://github.com/YunaiV/rocketmq-3.1.9/tree/release_3.1.4
相較于普通消息,【事務消息】多依賴如下三個組件:
- TransactionStateService?:事務狀態服務,負責對【事務消息】進行管理,包括存儲與更新事務消息狀態、回查事務消息狀態等等。
- TranStateTable?:【事務消息】狀態存儲。基于?MappedFileQueue?實現,默認存儲路徑為~/store/transaction/statetable,每條【事務消息】狀態存儲結構如下:
| 1 | offset | CommitLog 物理存儲位置 | Long | 8 |
| 2 | size | 消息長度 | Int | 4 |
| 3 | timestamp | 消息存儲時間,單位:秒 | Int | 4 |
| 4 | producerGroupHash | producerGroup 求 HashCode | Int | 4 |
| 5 | state | 事務狀態 | Int | 4 |
- TranRedoLog?:TranStateTable?重放日志,每次寫操作?TranStateTable?記錄重放日志。當Broker?異常關閉時,使用?TranRedoLog?恢復?TranStateTable。基于?ConsumeQueue?實現,Topic?為?TRANSACTION_REDOLOG_TOPIC_XXXX,默認存儲路徑為?~/store/transaction/redolog。
簡單手繪邏輯圖如下?:
3.1.1.1 存儲消息到 CommitLog
- 存儲【half消息】到?CommitLog?時,消息隊列位置(queueOffset)使用?TranStateTable?最大物理位置(可寫入物理位置)。這樣,消息可以索引到自己對應的?TranStateTable?的位置和記錄。
核心代碼如下:
1: // 【DefaultAppendMessageCallback.java】2: class DefaultAppendMessageCallback implements AppendMessageCallback {3: public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final Object msg) {4: // ... ...5: 6: // 事務消息需要特殊處理 7: final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());8: switch (tranType) {9: case MessageSysFlag.TransactionPreparedType: // 消息隊列位置(queueOffset)使用 TranStateTable 最大物理位置(可寫入物理位置) 10: queueOffset = CommitLog.this.defaultMessageStore.getTransactionStateService().getTranStateTableOffset().get(); 11: break; 12: case MessageSysFlag.TransactionRollbackType: 13: queueOffset = msgInner.getQueueOffset(); 14: break; 15: case MessageSysFlag.TransactionNotType: 16: case MessageSysFlag.TransactionCommitType: 17: default: 18: break; 19: } 20: 21: // ... ... 22: 23: switch (tranType) { 24: case MessageSysFlag.TransactionPreparedType: 25: // 更新 TranStateTable 最大物理位置(可寫入物理位置) 26: CommitLog.this.defaultMessageStore.getTransactionStateService().getTranStateTableOffset().incrementAndGet(); 27: break; 28: case MessageSysFlag.TransactionRollbackType: 29: break; 30: case MessageSysFlag.TransactionNotType: 31: case MessageSysFlag.TransactionCommitType: 32: // 更新下一次的ConsumeQueue信息 33: CommitLog.this.topicQueueTable.put(key, ++queueOffset); 34: break; 35: default: 36: break; 37: } 38: 39: // 返回結果 40: return result; 41: } 42: }3.1.1.2 寫【事務消息】狀態存儲(TranStateTable)
- 處理【Half消息】時,新增【事務消息】狀態存儲(TranStateTable)。
- 處理【Commit / Rollback消息】時,更新 【事務消息】狀態存儲(TranStateTable) COMMIT / ROLLBACK。
- 每次寫操作【事務消息】狀態存儲(TranStateTable),記錄重放日志(TranRedoLog)。
核心代碼如下:
1: // 【DispatchMessageService.java】2: private void doDispatch() {3: if (!this.requestsRead.isEmpty()) {4: for (DispatchRequest req : this.requestsRead) {5: 6: // ... ...7: 8: // 2、寫【事務消息】狀態存儲(TranStateTable)9: if (req.getProducerGroup() != null) {10: switch (tranType) {11: case MessageSysFlag.TransactionNotType:12: break;13: case MessageSysFlag.TransactionPreparedType:14: // 新增 【事務消息】狀態存儲(TranStateTable)15: DefaultMessageStore.this.getTransactionStateService().appendPreparedTransaction(16: req.getCommitLogOffset(), req.getMsgSize(), (int) (req.getStoreTimestamp() / 1000), req.getProducerGroup().hashCode());17: break;18: case MessageSysFlag.TransactionCommitType:19: case MessageSysFlag.TransactionRollbackType:20: // 更新 【事務消息】狀態存儲(TranStateTable) COMMIT / ROLLBACK21: DefaultMessageStore.this.getTransactionStateService().updateTransactionState(22: req.getTranStateTableOffset(), req.getPreparedTransactionOffset(), req.getProducerGroup().hashCode(), tranType);23: break;24: }25: }26: // 3、記錄 TranRedoLog27: switch (tranType) {28: case MessageSysFlag.TransactionNotType:29: break;30: case MessageSysFlag.TransactionPreparedType:31: // 記錄 TranRedoLog32: DefaultMessageStore.this.getTransactionStateService().getTranRedoLog().putMessagePostionInfoWrapper(33: req.getCommitLogOffset(), req.getMsgSize(), TransactionStateService.PreparedMessageTagsCode,34: req.getStoreTimestamp(), 0L);35: break;36: case MessageSysFlag.TransactionCommitType:37: case MessageSysFlag.TransactionRollbackType:38: // 記錄 TranRedoLog39: DefaultMessageStore.this.getTransactionStateService().getTranRedoLog().putMessagePostionInfoWrapper(40: req.getCommitLogOffset(), req.getMsgSize(), req.getPreparedTransactionOffset(),41: req.getStoreTimestamp(), 0L);42: break;43: }44: }45: 46: // ...省略代碼47: }48: }49: // ??????【TransactionStateService.java】50: /**51: * 新增事務狀態52: *53: * @param clOffset commitLog 物理位置54: * @param size 消息長度55: * @param timestamp 消息存儲時間56: * @param groupHashCode groupHashCode57: * @return 是否成功58: */59: public boolean appendPreparedTransaction(//60: final long clOffset,//61: final int size,//62: final int timestamp,//63: final int groupHashCode//64: ) {65: MapedFile mapedFile = this.tranStateTable.getLastMapedFile();66: if (null == mapedFile) {67: log.error("appendPreparedTransaction: create mapedfile error.");68: return false;69: }70: 71: // 首次創建,加入定時任務中72: if (0 == mapedFile.getWrotePostion()) {73: this.addTimerTask(mapedFile);74: }75: 76: this.byteBufferAppend.position(0);77: this.byteBufferAppend.limit(TSStoreUnitSize);78: 79: // Commit Log Offset80: this.byteBufferAppend.putLong(clOffset);81: // Message Size82: this.byteBufferAppend.putInt(size);83: // Timestamp84: this.byteBufferAppend.putInt(timestamp);85: // Producer Group Hashcode86: this.byteBufferAppend.putInt(groupHashCode);87: // Transaction State88: this.byteBufferAppend.putInt(MessageSysFlag.TransactionPreparedType);89: 90: return mapedFile.appendMessage(this.byteBufferAppend.array());91: }92: 93: /**94: * 更新事務狀態95: *96: * @param tsOffset tranStateTable 物理位置97: * @param clOffset commitLog 物理位置98: * @param groupHashCode groupHashCode99: * @param state 事務狀態 100: * @return 是否成功 101: */ 102: public boolean updateTransactionState( 103: final long tsOffset, 104: final long clOffset, 105: final int groupHashCode, 106: final int state) { 107: SelectMapedBufferResult selectMapedBufferResult = this.findTransactionBuffer(tsOffset); 108: if (selectMapedBufferResult != null) { 109: try { 110: // 校驗是否能夠更新 111: // .... ... 112: 113: // 更新事務狀態 114: selectMapedBufferResult.getByteBuffer().putInt(TS_STATE_POS, state); 115: } 116: catch (Exception e) { 117: log.error("updateTransactionState exception", e); 118: } 119: finally { 120: selectMapedBufferResult.release(); 121: } 122: } 123: 124: return false; 125: }3.1.1.3 【事務消息】回查
- TranStateTable?每個?MappedFile?都對應一個?Timer。Timer?固定周期(默認:60s)遍歷MappedFile,查找【half消息】,向?Producer?發起【事務消息】回查請求。【事務消息】回查結果的邏輯不在此處進行,在?CommitLog dispatch時執行。
實現代碼如下:
1: // 【TransactionStateService.java】2: /**3: * 初始化定時任務4: */5: private void initTimerTask() {6: //7: final List<MapedFile> mapedFiles = this.tranStateTable.getMapedFiles();8: for (MapedFile mf : mapedFiles) {9: this.addTimerTask(mf);10: }11: }12: 13: /**14: * 每個文件初始化定時任務15: * @param mf 文件16: */17: private void addTimerTask(final MapedFile mf) {18: this.timer.scheduleAtFixedRate(new TimerTask() {19: private final MapedFile mapedFile = mf;20: private final TransactionCheckExecuter transactionCheckExecuter = TransactionStateService.this.defaultMessageStore.getTransactionCheckExecuter();21: private final long checkTransactionMessageAtleastInterval = TransactionStateService.this.defaultMessageStore.getMessageStoreConfig()22: .getCheckTransactionMessageAtleastInterval();23: private final boolean slave = TransactionStateService.this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE;24: 25: @Override26: public void run() {27: // Slave不需要回查事務狀態28: if (slave) {29: return;30: }31: // Check功能是否開啟32: if (!TransactionStateService.this.defaultMessageStore.getMessageStoreConfig()33: .isCheckTransactionMessageEnable()) {34: return;35: }36: 37: try {38: SelectMapedBufferResult selectMapedBufferResult = mapedFile.selectMapedBuffer(0);39: if (selectMapedBufferResult != null) {40: long preparedMessageCountInThisMapedFile = 0; // 回查的【half消息】數量41: int i = 0;42: try {43: // 循環每條【事務消息】狀態,對【half消息】進行回查44: for (; i < selectMapedBufferResult.getSize(); i += TSStoreUnitSize) {45: selectMapedBufferResult.getByteBuffer().position(i);46: 47: // Commit Log Offset48: long clOffset = selectMapedBufferResult.getByteBuffer().getLong();49: // Message Size50: int msgSize = selectMapedBufferResult.getByteBuffer().getInt();51: // Timestamp52: int timestamp = selectMapedBufferResult.getByteBuffer().getInt();53: // Producer Group Hashcode54: int groupHashCode = selectMapedBufferResult.getByteBuffer().getInt();55: // Transaction State56: int tranType = selectMapedBufferResult.getByteBuffer().getInt();57: 58: // 已經提交或者回滾的消息跳過59: if (tranType != MessageSysFlag.TransactionPreparedType) {60: continue;61: }62: 63: // 遇到時間不符合最小輪詢間隔,終止64: long timestampLong = timestamp * 1000;65: long diff = System.currentTimeMillis() - timestampLong;66: if (diff < checkTransactionMessageAtleastInterval) {67: break;68: }69: 70: preparedMessageCountInThisMapedFile++;71: 72: // 回查Producer73: try {74: this.transactionCheckExecuter.gotoCheck(groupHashCode, getTranStateOffset(i), clOffset, msgSize);75: } catch (Exception e) {76: tranlog.warn("gotoCheck Exception", e);77: }78: }79: 80: // 無回查的【half消息】數量,且遍歷完,則終止定時任務81: if (0 == preparedMessageCountInThisMapedFile //82: && i == mapedFile.getFileSize()) {83: tranlog.info("remove the transaction timer task, because no prepared message in this mapedfile[{}]", mapedFile.getFileName());84: this.cancel();85: }86: } finally {87: selectMapedBufferResult.release();88: }89: 90: tranlog.info("the transaction timer task execute over in this period, {} Prepared Message: {} Check Progress: {}/{}", mapedFile.getFileName(),//91: preparedMessageCountInThisMapedFile, i / TSStoreUnitSize, mapedFile.getFileSize() / TSStoreUnitSize);92: } else if (mapedFile.isFull()) {93: tranlog.info("the mapedfile[{}] maybe deleted, cancel check transaction timer task", mapedFile.getFileName());94: this.cancel();95: return;96: }97: } catch (Exception e) {98: log.error("check transaction timer task Exception", e);99: } 100: } 101: 102: 103: private long getTranStateOffset(final long currentIndex) { 104: long offset = (this.mapedFile.getFileFromOffset() + currentIndex) / TransactionStateService.TSStoreUnitSize; 105: return offset; 106: } 107: }, 1000 * 60, this.defaultMessageStore.getMessageStoreConfig().getCheckTransactionMessageTimerInterval()); 108: } 109: 110: // 【DefaultTransactionCheckExecuter.java】 111: @Override 112: public void gotoCheck(int producerGroupHashCode, long tranStateTableOffset, long commitLogOffset, 113: int msgSize) { 114: // 第一步、查詢Producer 115: final ClientChannelInfo clientChannelInfo = this.brokerController.getProducerManager().pickProducerChannelRandomly(producerGroupHashCode); 116: if (null == clientChannelInfo) { 117: log.warn("check a producer transaction state, but not find any channel of this group[{}]", producerGroupHashCode); 118: return; 119: } 120: 121: // 第二步、查詢消息 122: SelectMapedBufferResult selectMapedBufferResult = this.brokerController.getMessageStore().selectOneMessageByOffset(commitLogOffset, msgSize); 123: if (null == selectMapedBufferResult) { 124: log.warn("check a producer transaction state, but not find message by commitLogOffset: {}, msgSize: ", commitLogOffset, msgSize); 125: return; 126: } 127: 128: // 第三步、向Producer發起請求 129: final CheckTransactionStateRequestHeader requestHeader = new CheckTransactionStateRequestHeader(); 130: requestHeader.setCommitLogOffset(commitLogOffset); 131: requestHeader.setTranStateTableOffset(tranStateTableOffset); 132: this.brokerController.getBroker2Client().checkProducerTransactionState(clientChannelInfo.getChannel(), requestHeader, selectMapedBufferResult); 133: }3.1.1.4 初始化【事務消息】狀態存儲(TranStateTable)
- 根據最后 Broker 關閉是否正常,會有不同的初始化方式。
核心代碼如下:
?
1: // 【TransactionStateService.java】2: /**3: * 初始化 TranRedoLog4: * @param lastExitOK 是否正常退出5: */6: public void recoverStateTable(final boolean lastExitOK) {7: if (lastExitOK) {8: this.recoverStateTableNormal();9: } else {10: // 第一步,刪除State Table11: this.tranStateTable.destroy();12: // 第二步,通過RedoLog全量恢復StateTable13: this.recreateStateTable();14: }15: }16: 17: /**18: * 掃描 TranRedoLog 重建 StateTable19: */20: private void recreateStateTable() {21: this.tranStateTable = new MapedFileQueue(StorePathConfigHelper.getTranStateTableStorePath(defaultMessageStore22: .getMessageStoreConfig().getStorePathRootDir()), defaultMessageStore23: .getMessageStoreConfig().getTranStateTableMapedFileSize(), null);24: 25: final TreeSet<Long> preparedItemSet = new TreeSet<Long>();26: 27: // 第一步,從頭掃描RedoLog28: final long minOffset = this.tranRedoLog.getMinOffsetInQuque();29: long processOffset = minOffset;30: while (true) {31: SelectMapedBufferResult bufferConsumeQueue = this.tranRedoLog.getIndexBuffer(processOffset);32: if (bufferConsumeQueue != null) {33: try {34: long i = 0;35: for (; i < bufferConsumeQueue.getSize(); i += ConsumeQueue.CQStoreUnitSize) {36: long offsetMsg = bufferConsumeQueue.getByteBuffer().getLong();37: int sizeMsg = bufferConsumeQueue.getByteBuffer().getInt();38: long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();39: 40: if (TransactionStateService.PreparedMessageTagsCode == tagsCode) { // Prepared41: preparedItemSet.add(offsetMsg);42: } else { // Commit/Rollback43: preparedItemSet.remove(tagsCode);44: }45: }46: 47: processOffset += i;48: } finally { // 必須釋放資源49: bufferConsumeQueue.release();50: }51: } else {52: break;53: }54: }55: log.info("scan transaction redolog over, End offset: {}, Prepared Transaction Count: {}", processOffset, preparedItemSet.size());56: 57: // 第二步,重建StateTable58: Iterator<Long> it = preparedItemSet.iterator();59: while (it.hasNext()) {60: Long offset = it.next();61: MessageExt msgExt = this.defaultMessageStore.lookMessageByOffset(offset);62: if (msgExt != null) {63: this.appendPreparedTransaction(msgExt.getCommitLogOffset(), msgExt.getStoreSize(),64: (int) (msgExt.getStoreTimestamp() / 1000),65: msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP).hashCode());66: this.tranStateTableOffset.incrementAndGet();67: }68: }69: }70: 71: /**72: * 加載(解析)TranStateTable 的 MappedFile73: * 1. 清理多余 MappedFile,設置最后一個 MappedFile的寫入位置(position74: * 2. 設置 TanStateTable 最大物理位置(可寫入位置)75: */76: private void recoverStateTableNormal() {77: final List<MapedFile> mapedFiles = this.tranStateTable.getMapedFiles();78: if (!mapedFiles.isEmpty()) {79: // 從倒數第三個文件開始恢復80: int index = mapedFiles.size() - 3;81: if (index < 0) {82: index = 0;83: }84: 85: int mapedFileSizeLogics = this.tranStateTable.getMapedFileSize();86: MapedFile mapedFile = mapedFiles.get(index);87: ByteBuffer byteBuffer = mapedFile.sliceByteBuffer();88: long processOffset = mapedFile.getFileFromOffset();89: long mapedFileOffset = 0;90: while (true) {91: for (int i = 0; i < mapedFileSizeLogics; i += TSStoreUnitSize) {92: 93: final long clOffset_read = byteBuffer.getLong();94: final int size_read = byteBuffer.getInt();95: final int timestamp_read = byteBuffer.getInt();96: final int groupHashCode_read = byteBuffer.getInt();97: final int state_read = byteBuffer.getInt();98: 99: boolean stateOK = false; 100: switch (state_read) { 101: case MessageSysFlag.TransactionPreparedType: 102: case MessageSysFlag.TransactionCommitType: 103: case MessageSysFlag.TransactionRollbackType: 104: stateOK = true; 105: break; 106: default: 107: break; 108: } 109: 110: // 說明當前存儲單元有效 111: if (clOffset_read >= 0 && size_read > 0 && stateOK) { 112: mapedFileOffset = i + TSStoreUnitSize; 113: } else { 114: log.info("recover current transaction state table file over, " + mapedFile.getFileName() + " " 115: + clOffset_read + " " + size_read + " " + timestamp_read); 116: break; 117: } 118: } 119: 120: // 走到文件末尾,切換至下一個文件 121: if (mapedFileOffset == mapedFileSizeLogics) { 122: index++; 123: if (index >= mapedFiles.size()) { // 循環while結束 124: log.info("recover last transaction state table file over, last maped file " + mapedFile.getFileName()); 125: break; 126: } else { // 切換下一個文件 127: mapedFile = mapedFiles.get(index); 128: byteBuffer = mapedFile.sliceByteBuffer(); 129: processOffset = mapedFile.getFileFromOffset(); 130: mapedFileOffset = 0; 131: log.info("recover next transaction state table file, " + mapedFile.getFileName()); 132: } 133: } else { 134: log.info("recover current transaction state table queue over " + mapedFile.getFileName() + " " + (processOffset + mapedFileOffset)); 135: break; 136: } 137: } 138: 139: // 清理多余 MappedFile,設置最后一個 MappedFile的寫入位置(position 140: processOffset += mapedFileOffset; 141: this.tranStateTable.truncateDirtyFiles(processOffset); 142: 143: // 設置 TanStateTable 最大物理位置(可寫入位置) 144: this.tranStateTableOffset.set(this.tranStateTable.getMaxOffset() / TSStoreUnitSize); 145: log.info("recover normal over, transaction state table max offset: {}", this.tranStateTableOffset.get()); 146: } 147: }3.1.1.5 補充
- 為什么 V3.1.5 開始,使用 數據庫 實現【事務狀態】的存儲?如下是來自官方文檔的說明,可能是一部分原因:
RocketMQ 這種實現事務方式,沒有通過 KV 存儲做,而是通過 Offset 方式,存在一個顯著缺陷,即通過 Offset 更改數據,會令系統的臟頁過多,需要特別關注。
3.1.2 官方V4.0.0:基于數據庫
倉庫地址:https://github.com/apache/incubator-rocketmq
官方V4.0.0 暫時未完全開源【事務消息回查】功能,So 我們需要進行一些猜想,可能不一定正確?。
?我們來對比【官方V3.1.4:基于文件】的實現。
- TransactionRecord :記錄每條【事務消息】。類似?TranStateTable。
| offset | offset | ? |
| producerGroupHash | producerGroup | ? |
| size | 無 | 非必須字段:【事務消息】回查時,使用 offset 讀取 CommitLog 獲得。 |
| timestamp | 無 | 非必須字段:【事務消息】回查時,使用 offset 讀取 CommitLog 獲得。 |
| state | 無 | 非必須字段: 事務開始,增加記錄;事務結束,刪除記錄。 |
另外,數據庫本身保證了數據存儲的可靠性,無需?TranRedoLog。
簡單手繪邏輯圖如下?:
3.2 Producer 接收【事務消息回查】
- 順序圖如下:
- 核心代碼如下:
?
轉載于:https://my.oschina.net/oosc/blog/1795277
總結
以上是生活随笔為你收集整理的RocketMQ 源码分析 事务消息的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何关闭linux端口
- 下一篇: DDL语言的学习