日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RocketMQ源码分析之RocketMQ事务消息实现原下篇(事务提交或回滚)

發(fā)布時間:2025/4/5 编程问答 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RocketMQ源码分析之RocketMQ事务消息实现原下篇(事务提交或回滚) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

本文將重點分析RocketMQ Broker如何處理事務(wù)消息提交、回滾命令,根據(jù)前面的介紹,其入口EndTransactionProcessor#proce***equest:

OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) { // @1
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader); // @2
if (result.getResponseCode() == ResponseCode.SUCCESS) { // @3
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); // @4
if (res.getCode() == ResponseCode.SUCCESS) {
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage()); // @5
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp()); // @6
RemotingCommand sendResult = sendFinalMessage(msgInner); // @7
if (sendResult.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); // @8
}
return sendResult;
}
return res;br/>}
}
代碼@1:如果請求為提交事務(wù),進(jìn)入事務(wù)消息提交處理流程。
代碼@2:提交消息,別被這名字誤導(dǎo)了,該方法主要是根據(jù)commitLogOffset從commitlog文件中查找消息返回OperationResult實例:

private MessageExt prepareMessage :消息對象。
private int responseCode:查找結(jié)果。
private String responseRemark :錯誤提示。
代碼@3:如果成功查找到消息,則繼續(xù)處理,否則返回給客戶端,消息未找到錯誤信息。

代碼@4:驗證消息必要字段。
驗證消息的生產(chǎn)組與請求信息中的生產(chǎn)者組是否一致。
驗證消息的隊列偏移量(queueOffset)與請求信息中的偏移量是否一致。
驗證消息的commitLogOffset與請求信息中的CommitLogOffset是否一致。

代碼@5:調(diào)用endMessageTransaction方法,該方法主要的目的就是恢復(fù)事務(wù)消息的真實的主題、隊列,并設(shè)置事務(wù)ID。

代碼@6:設(shè)置消息的相關(guān)屬性,這一步應(yīng)該直接在endMessageTransaction中實現(xiàn)就好,統(tǒng)一恢復(fù)原消息的數(shù)量,特別關(guān)注的是取消了事務(wù)相關(guān)的系統(tǒng)標(biāo)記。

代碼@7:發(fā)送最終消息,其實現(xiàn)原理非常簡單,調(diào)用MessageStore將消息存儲在commitlog文件中,此時的消息,會被轉(zhuǎn)發(fā)到原消息主題對應(yīng)的消費(fèi)隊列,被消費(fèi)者消費(fèi)。

代碼@8:刪除預(yù)處理消息(prepare),其實是將消息存儲在主題為:RMQ_SYS_TRANS_OP_HALF_TOPIC的主題中,代表這些消息已經(jīng)被處理(提交或回滾)。

上述就是事務(wù)消息提交的流程,事務(wù)回滾類似,接下來大概分析一下事務(wù)消息回滾的流程。

EndTransactionProcessor#proce***equest
else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader); // @1
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); // @2
}
return res;br/>}
}
代碼@1:回滾消息,其實內(nèi)部就是根據(jù)commitlogOffset查找消息。
代碼@2:將消息存儲在RMQ_SYS_TRANS_OP_HALF_TOPIC中,代表該消息已被處理,與提交事務(wù)消息不同的是,提交事務(wù)消息會將消息恢復(fù)原主題與隊列,再次存儲在commitlog文件中。

事務(wù)消息在Broker服務(wù)端的提交回滾流程就介紹到這了。其核心實現(xiàn)就是根據(jù)commitlogOffset找到消息,如果是提交動作,就恢復(fù)原消息的主題與隊列,再次存入commitlog文件進(jìn)而轉(zhuǎn)到消息消費(fèi)隊列,供消費(fèi)者消費(fèi),然后將原預(yù)處理消息存入一個新的主題RMQ_SYS_TRANS_OP_HALF_TOPIC,代表該消息已被處理;回滾消息與提交事務(wù)消息不同的是,提交事務(wù)消息會將消息恢復(fù)原主題與隊列,再次存儲在commitlog文件中。

轉(zhuǎn)載于:https://blog.51cto.com/14031893/2340243

總結(jié)

以上是生活随笔為你收集整理的RocketMQ源码分析之RocketMQ事务消息实现原下篇(事务提交或回滚)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 亚洲色大成网站www www.97ai.com | 国产一级做a爰片久久毛片男男 | 国产成人综合欧美精品久久 | 久久精品视频日本 | 国产日韩欧美综合 | 综合久久久久综合 | www.com污| 日韩美女做爰高潮免费 | 无码少妇精品一区二区免费动态 | 免费一级淫片 | 亚洲精品无码永久在线观看 | 日韩精品视频免费在线观看 | 亚洲国产小视频 | 日韩中文字幕 | 国产精品人妻一区二区三区 | 免费av在线播放网址 | 美女流白浆视频 | 自拍偷拍欧美视频 | 69黄色片| 欧美精品v | 亚洲色图第一页 | 5个黑人躁我一个视频 | 中日韩在线观看视频 | 深夜视频一区二区三区 | 男人的天堂黄色 | 亚洲成人播放 | 欧美性猛交xxx乱大交3 | 激情五月色婷婷 | 黄色片视频免费在线观看 | 国产精品96久久久久久 | 国产尤物视频在线观看 | 国产精品国产三级国产三级人妇 | 看黄色一级 | 少妇无内裤下蹲露大唇视频 | 久久精品一区二区免费播放 | av在线手机版 | 男人的天堂一区 | 欧美黑人性受xxxx精品 | 免费91网站 | 国产激情一区二区三区四区 | 国产视频不卡一区 | 成人高清在线 | 久久久精品美女 | 国产成人免费看一级大黄 | 狠狠爱av| 日本免费在线视频 | 无码av免费毛片一区二区 | 亚洲成av人片在线观看无码 | 国产一线二线三线在线观看 | 婷婷色中文字幕 | 中文字幕一区二区在线播放 | 日本欧美视频 | 国产毛片自拍 | 在线成人 | 美女黄色片网站 | www.精品一区 | 欧美一区二区三区粗大 | 午夜电影天堂 | 91天堂在线观看 | 久久亚洲成人av | 在线观看欧美视频 | 澳门久久久 | 欧美a级黄色 | 久久91精品国产91久久小草 | 日本视频h | 91蝌蚪91密月 | 少妇一区二区三区 | 国产精品久久久久久久一区二区 | 亚洲一区 在线播放 | 黄色片a级片 | 美国美女黄色片 | 国产一区二区在线电影 | 精品久久久久久久中文字幕 | 亚洲国产丝袜 | 成人免费看av | 巨乳免费观看 | 性生活一级大片 | 国产伦精品一区二区三区高清版禁 | 91亚州| 无码人妻精品一区二区三 | 久久av影院| 99国产精品99久久久久久粉嫩 | 国产精品网友自拍 | 亚洲图区欧美 | 国产精品永久免费观看 | 免费看一级黄色片 | 亚洲丝袜在线观看 | 色综合av综合无码综合网站 | 亚洲精品久久久久久久久久久久久 | av第一页 | 99久久精品免费视频 | 午夜怡红院 | 97视频免费| 日日躁夜夜躁狠狠躁 | 欧美性视频播放 | 第一福利在线视频 | 亚洲伊人婷婷 | 小嫩嫩精品导航 | 亚洲天堂日韩av |