rocketmq原理_彻底看懂RocketMQ事务实现原理
面試中經常會問到比如RocketMQ的事務是如何實現的呢?學習框架,我們不僅要熟練使用,更要掌握設計及原理,才算熟悉一個框架。
1 RocketMQ 事務使用案例
public class CreateOrderService { @Autowired private OrderDao orderDao; @Autowired private ExecutorService executorService; private TransactionMQProducer producer; // 初始化transactionListener 和 producer @Init public void init() throws MQClientException { TransactionListener transactionListener = createTransactionListener(); producer = new TransactionMQProducer("myGroup"); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); producer.start(); } // 創建訂單服務的請求入口 @PUT @RequestMapping(...) public boolean createOrder(@RequestBody CreateOrderRequest request) { // 根據創建訂單請求創建一條消息 Message msg = createMessage(request); // 發送事務消息 SendResult sendResult = producer.sendMessageInTransaction(msg, request); // 返回:事務是否成功 return sendResult.getSendStatus() == SendStatus.SEND_OK; } private TransactionListener createTransactionListener() { return new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { CreateOrderRequest request = (CreateOrderRequest ) arg; try { // 執行本地事務創建訂單 orderDao.createOrderInDB(request); // 如果沒拋異常說明執行成功,提交事務消息 return LocalTransactionState.COMMIT_MESSAGE; } catch (Throwable t) { // 失敗則直接回滾事務消息 return LocalTransactionState.ROLLBACK_MESSAGE; } } // 反查本地事務 @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { // 從消息中獲得訂單ID String orderId = msg.getUserProperty("orderId"); // 去db查詢訂單號是否存在,若存在則提交事務 // 若不存在,可能是本地事務失敗了,也可能是本地事務還在執行,所以返回UNKNOW return orderDao.isOrderIdExistsInDB(orderId)? LocalTransactionState.COMMIT_MESSAGE: LocalTransactionState.UNKNOW; } }; }}如上案例展示了一個訂單創建服務,即往db插一條訂單記錄,并發一條創建訂單的消息,要求寫db和發消息倆個操作在一個事務內執行。
首先在init()方法中初始化了transactionListener和發生RocketMQ事務消息的變量producer。
createOrder()
真正提供創建訂單服務的方法,根據請求的參數創建一條消息,然后調用 producer發事務消息,并返回事務執行結果。createTransactionListener()
在init()方法中調用,構造實現RocketMQ的TransactionListener接口的匿名類,該接口需要實現如下兩個方法:executeLocalTransaction:執行本地事務,在這里我們直接把訂單數據插入到數據庫中,并返回本地事務的執行結果。
checkLocalTransaction:反查本地事務,上述流程中是在db中查詢訂單號是否存在,若存在則提交事務,若不存在,可能本地事務失敗了,也可能本地事務還在執行,所以返回UNKNOW
這樣便使用RocketMQ的事務簡單實現了一個創建訂單的分布式事務。
2 RocketMQ事務消息實現原理
2.1 Pro端如何發事務消息?
DefaultMQProducerImpl#sendMessageInTransaction
public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException { TransactionListener transactionListener = getCheckListener(); if (null == localTransactionExecuter && null == transactionListener) { throw new MQClientException("tranExecutor is null", null); } // ignore DelayTimeLevel parameter if (msg.getDelayTimeLevel() != 0) { MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL); } Validators.checkMessage(msg, this.defaultMQProducer); SendResult sendResult = null; // 給待發送消息添加屬性,表明是一個事務消息(即半消息) MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); // 像發送普通消息一樣,把這條事務消息發往Broker try { sendResult = this.send(msg); } catch (Exception e) { throw new MQClientException("send message Exception", e); } LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable localException = null; switch (sendResult.getSendStatus()) { // 事務消息發送成功 case SEND_OK: { try { if (sendResult.getTransactionId() != null) { msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); } String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (null != transactionId && !"".equals(transactionId)) { msg.setTransactionId(transactionId); } if (null != localTransactionExecuter) { localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); } else if (transactionListener != null) { log.debug("Used new transaction API"); // 開始執行本地事務 localTransactionState = transactionListener.executeLocalTransaction(msg, arg); } if (null == localTransactionState) { localTransactionState = LocalTransactionState.UNKNOW; } if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { log.info("executeLocalTransactionBranch return {}", localTransactionState); log.info(msg.toString()); } } catch (Throwable e) { log.info("executeLocalTransactionBranch exception", e); log.info(msg.toString()); localException = e; } } break; case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: case SLAVE_NOT_AVAILABLE: localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; break; default: break; } // 事務過程的最后,給Broker發送提交或回滾事務的RPC請求。 try { this.endTransaction(sendResult, localTransactionState, localException); } catch (Exception e) { log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); } TransactionSendResult transactionSendResult = new TransactionSendResult(); transactionSendResult.setSendStatus(sendResult.getSendStatus()); transactionSendResult.setMessageQueue(sendResult.getMessageQueue()); transactionSendResult.setMsgId(sendResult.getMsgId()); transactionSendResult.setQueueOffset(sendResult.getQueueOffset()); transactionSendResult.setTransactionId(sendResult.getTransactionId()); transactionSendResult.setLocalTransactionState(localTransactionState); return transactionSendResult;}有事務反查機制作兜底,該RPC請求即使失敗或丟失,也不會影響事務最終的結果。最后構建事務消息的發送結果,并返回。
2.2 Broker端如何處理事務消息?
SendMessageProcessor#asyncSendMessage
跟進去看看真正處理半消息的業務邏輯,這段處理邏輯在類
TransactionalMessageBridge
putHalfMessage
parseHalfMessageInner
RocketMQ并非將事務消息保存至消息中 client 指定的 queue,而是記錄了原始的 topic 和 queue 后,把這個事務消息保存在
特殊的內部 topic:RMQ_SYS_TRANS_HALF_TOPIC
序號為 0 的 queue
這套 topic 和 queue 對消費者不可見,因此里面的消息也永遠不會被消費。這就保證在事務提交成功之前,這個事務消息對 Consumer 是消費不到的。
2.3 Broker端如何事務反查?
在Broker的TransactionalMessageCheckService服務中啟動了一個定時器,定時從事務消息queue中讀出所有待反查的事務消息。
AbstractTransactionalMessageCheckListener#resolveHalfMsg
針對每個需要反查的半消息,Broker會給對應的Producer發一個要求執行事務狀態反查的RPC請求
AbstractTransactionalMessageCheckListener#sendCheckMessage
Broker2Client#checkProducerTransactionState根據RPC返回響應中的反查結果,來決定這個半消息是需要提交還是回滾,或者后續繼續來反查。
最后,提交或者回滾事務。首先把半消息標記為已處理
如果是提交事務,就把半消息從半消息隊列中復制到該消息真正的topic和queue中
如果是回滾事務,什么都不做
EndTransactionProcessor#processRequest
最后結束該事務。
3 總結
整體實現流程
RocketMQ是基于兩階段提交來實現的事務,把這些事務消息暫存在一個特殊的queue中,待事務提交后再移動到業務隊列中。最后,RocketMQ的事務適用于解決本地事務和發消息的數據一致性問題。
參考
https://juejin.im/post/6844904193526857742
總結
以上是生活随笔為你收集整理的rocketmq原理_彻底看懂RocketMQ事务实现原理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: android 16 登陆,那些年我们一
- 下一篇: c++ 指针拼接字符串_字符串拼接+和c