日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

rocketmq原理_彻底看懂RocketMQ事务实现原理

發布時間:2024/9/27 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 rocketmq原理_彻底看懂RocketMQ事务实现原理 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

面試中經常會問到比如RocketMQ的事務是如何實現的呢?學習框架,我們不僅要熟練使用,更要掌握設計及原理,才算熟悉一個框架。

1 RocketMQ 事務使用案例

public class CreateOrderService { @Autowired private OrderDao orderDao; @Autowired private ExecutorService executorService; private TransactionMQProducer producer; // 初始化transactionListener 和 producer @Init public void init() throws MQClientException { TransactionListener transactionListener = createTransactionListener(); producer = new TransactionMQProducer("myGroup"); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); producer.start(); } // 創建訂單服務的請求入口 @PUT @RequestMapping(...) public boolean createOrder(@RequestBody CreateOrderRequest request) { // 根據創建訂單請求創建一條消息 Message msg = createMessage(request); // 發送事務消息 SendResult sendResult = producer.sendMessageInTransaction(msg, request); // 返回:事務是否成功 return sendResult.getSendStatus() == SendStatus.SEND_OK; } private TransactionListener createTransactionListener() { return new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { CreateOrderRequest request = (CreateOrderRequest ) arg; try { // 執行本地事務創建訂單 orderDao.createOrderInDB(request); // 如果沒拋異常說明執行成功,提交事務消息 return LocalTransactionState.COMMIT_MESSAGE; } catch (Throwable t) { // 失敗則直接回滾事務消息 return LocalTransactionState.ROLLBACK_MESSAGE; } } // 反查本地事務 @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { // 從消息中獲得訂單ID String orderId = msg.getUserProperty("orderId"); // 去db查詢訂單號是否存在,若存在則提交事務 // 若不存在,可能是本地事務失敗了,也可能是本地事務還在執行,所以返回UNKNOW return orderDao.isOrderIdExistsInDB(orderId)? LocalTransactionState.COMMIT_MESSAGE: LocalTransactionState.UNKNOW; } }; }}

如上案例展示了一個訂單創建服務,即往db插一條訂單記錄,并發一條創建訂單的消息,要求寫db和發消息倆個操作在一個事務內執行。
首先在init()方法中初始化了transactionListener和發生RocketMQ事務消息的變量producer。

  • createOrder()
    真正提供創建訂單服務的方法,根據請求的參數創建一條消息,然后調用 producer發事務消息,并返回事務執行結果。

  • createTransactionListener()
    在init()方法中調用,構造實現RocketMQ的TransactionListener接口的匿名類,該接口需要實現如下兩個方法:

    • executeLocalTransaction:執行本地事務,在這里我們直接把訂單數據插入到數據庫中,并返回本地事務的執行結果。

    • checkLocalTransaction:反查本地事務,上述流程中是在db中查詢訂單號是否存在,若存在則提交事務,若不存在,可能本地事務失敗了,也可能本地事務還在執行,所以返回UNKNOW

這樣便使用RocketMQ的事務簡單實現了一個創建訂單的分布式事務。

2 RocketMQ事務消息實現原理

2.1 Pro端如何發事務消息?

DefaultMQProducerImpl#sendMessageInTransaction

public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException { TransactionListener transactionListener = getCheckListener(); if (null == localTransactionExecuter && null == transactionListener) { throw new MQClientException("tranExecutor is null", null); } // ignore DelayTimeLevel parameter if (msg.getDelayTimeLevel() != 0) { MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL); } Validators.checkMessage(msg, this.defaultMQProducer); SendResult sendResult = null; // 給待發送消息添加屬性,表明是一個事務消息(即半消息) MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); // 像發送普通消息一樣,把這條事務消息發往Broker try { sendResult = this.send(msg); } catch (Exception e) { throw new MQClientException("send message Exception", e); } LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable localException = null; switch (sendResult.getSendStatus()) { // 事務消息發送成功 case SEND_OK: { try { if (sendResult.getTransactionId() != null) { msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); } String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (null != transactionId && !"".equals(transactionId)) { msg.setTransactionId(transactionId); } if (null != localTransactionExecuter) { localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); } else if (transactionListener != null) { log.debug("Used new transaction API"); // 開始執行本地事務 localTransactionState = transactionListener.executeLocalTransaction(msg, arg); } if (null == localTransactionState) { localTransactionState = LocalTransactionState.UNKNOW; } if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { log.info("executeLocalTransactionBranch return {}", localTransactionState); log.info(msg.toString()); } } catch (Throwable e) { log.info("executeLocalTransactionBranch exception", e); log.info(msg.toString()); localException = e; } } break; case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: case SLAVE_NOT_AVAILABLE: localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; break; default: break; } // 事務過程的最后,給Broker發送提交或回滾事務的RPC請求。 try { this.endTransaction(sendResult, localTransactionState, localException); } catch (Exception e) { log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); } TransactionSendResult transactionSendResult = new TransactionSendResult(); transactionSendResult.setSendStatus(sendResult.getSendStatus()); transactionSendResult.setMessageQueue(sendResult.getMessageQueue()); transactionSendResult.setMsgId(sendResult.getMsgId()); transactionSendResult.setQueueOffset(sendResult.getQueueOffset()); transactionSendResult.setTransactionId(sendResult.getTransactionId()); transactionSendResult.setLocalTransactionState(localTransactionState); return transactionSendResult;}

有事務反查機制作兜底,該RPC請求即使失敗或丟失,也不會影響事務最終的結果。最后構建事務消息的發送結果,并返回。

2.2 Broker端如何處理事務消息?

SendMessageProcessor#asyncSendMessage

跟進去看看真正處理半消息的業務邏輯,這段處理邏輯在類

TransactionalMessageBridge

  • putHalfMessage

  • parseHalfMessageInner

    RocketMQ并非將事務消息保存至消息中 client 指定的 queue,而是記錄了原始的 topic 和 queue 后,把這個事務消息保存在

設計思想
  • 特殊的內部 topic:RMQ_SYS_TRANS_HALF_TOPIC

  • 序號為 0 的 queue

這套 topic 和 queue 對消費者不可見,因此里面的消息也永遠不會被消費。這就保證在事務提交成功之前,這個事務消息對 Consumer 是消費不到的。

2.3 Broker端如何事務反查?

在Broker的TransactionalMessageCheckService服務中啟動了一個定時器,定時從事務消息queue中讀出所有待反查的事務消息。

AbstractTransactionalMessageCheckListener#resolveHalfMsg

  • 針對每個需要反查的半消息,Broker會給對應的Producer發一個要求執行事務狀態反查的RPC請求

  • AbstractTransactionalMessageCheckListener#sendCheckMessage

  • Broker2Client#checkProducerTransactionState根據RPC返回響應中的反查結果,來決定這個半消息是需要提交還是回滾,或者后續繼續來反查。

最后,提交或者回滾事務。首先把半消息標記為已處理

  • 如果是提交事務,就把半消息從半消息隊列中復制到該消息真正的topic和queue中

  • 如果是回滾事務,什么都不做

  • EndTransactionProcessor#processRequest

    最后結束該事務。

    3 總結

    • 整體實現流程

    RocketMQ是基于兩階段提交來實現的事務,把這些事務消息暫存在一個特殊的queue中,待事務提交后再移動到業務隊列中。最后,RocketMQ的事務適用于解決本地事務和發消息的數據一致性問題。

    參考

    • https://juejin.im/post/6844904193526857742

    點個在看支持我吧,轉發就更好了

    總結

    以上是生活随笔為你收集整理的rocketmq原理_彻底看懂RocketMQ事务实现原理的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 亚洲4438 | 国产精品11 | 国产午夜伦鲁鲁 | chien国产乱露脸对白 | 无人在线观看高清视频 | 狠狠摸狠狠操 | 国产精品无码免费播放 | 亚洲激情av在线 | 精品久久影视 | 性猛交xxxx乱大交3 | 日韩欧美黄色片 | 麻豆免费在线观看视频 | 天天操夜夜操狠狠操 | 亚洲毛片一区二区三区 | 亚洲成人aa | 一区二区三区四区亚洲 | 中出视频在线观看 | 波多野吉衣一区二区 | 国产对白羞辱绿帽vk | 国产中文字幕在线 | 色女生影院| 欧美视频一二三区 | 福利视频网站 | 激情噜噜 | 久久久久久国产精品三级玉女聊斋 | 国产又黄又猛的视频 | 天天综合入口 | 噜噜噜噜私人影院 | 红桃视频国产 | 中文一区二区在线 | 女人高潮特级毛片 | 老子影院午夜伦不卡大全 | 黄色激情四射 | 国产视频首页 | 高潮流白浆在线观看 | 色噜噜狠狠一区二区三区果冻 | 国产喷水视频 | 日本在线一本 | 开心六月婷婷 | 国产第9页 | 爱爱福利社 | 欧美激情一区二区三区四区 | 丝袜制服一区 | 亚洲色偷精品一区二区三区 | 久久蜜臀| 欧美一级性| 老妇裸体性激交老太视频 | 偷偷在线观看免费高清av | 日韩免费在线视频 | 精品一卡二卡三卡 | 男人的网址 | 国产二级视频 | 激情图片网站 | 久久成| 久久精品3| 色亚洲成人 | 久久yy| 天天操天天操天天操天天操 | 亚洲国产日韩一区二区 | 久色亚洲| 自拍色图| 亚洲嫩草影院 | 国产精品视频网 | 狠狠干91| 国精产品一区二区三区 | 使劲插视频 | 一级片成人 | 成人免费91 | 国产在线观看精品 | 操操日| 一级全黄裸体免费视频 | 优优色综合 | 男女涩涩视频 | 亚洲日本在线观看视频 | 一级不卡毛片 | 懂色av一区二区三区蜜臀 | 日本黄色播放器 | 狠狠cao日日穞夜夜穞av | 九色视频国产 | 国产极品在线观看 | 午夜影片 | 精品人妻aV中文字幕乱码色欲 | 国产免费自拍 | 奇米影视777在线观看 | 亚洲丝袜一区 | 国产香蕉在线 | 综合色婷婷 | 国产一区二区免费在线观看 | 欧美1234区 | 色涩网站 | 午夜激情一区 | 中文字幕人妻熟女在线 | 91精品国产综合久久精品图片 | 亚洲欧美另类中文字幕 | 在线一区av | 一区二区三区av夏目彩春 | 亚洲精品国产精品乱码不66 | 成人入口 | 一区二区福利 |