日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

两个service事务统一_RocketMQ进阶 - 事务消息

發布時間:2025/3/21 编程问答 20 豆豆
生活随笔 收集整理的這篇文章主要介紹了 两个service事务统一_RocketMQ进阶 - 事务消息 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

分布式消息選型的時候是否支持事務消息是一個很重要的考量點,而目前只有RocketMQ對事務消息支持的最好。今天我們來嘮嘮如何實現RocketMQ的事務消息!

Apache RocketMQ在4.3.0版中已經支持分布式事務消息,這里RocketMQ采用了2PC的思想來實現了提交事務消息,同時增加一個補償邏輯來處理二階段超時或者失敗的消息,如下圖所示。

RocketMQ事務流程概要

RocketMQ實現事務消息主要分為兩個階段:正常事務的發送及提交、事務信息的補償流程
整體流程為:

  • 正常事務發送與提交階段
  • 生產者發送一個半消息給MQServer(半消息是指消費者暫時不能消費的消息)
  • 服務端響應消息寫入結果,半消息發送成功
  • 開始執行本地事務
  • 根據本地事務的執行狀態執行Commit或者Rollback操作
    • 事務信息的補償流程
  • 如果MQServer長時間沒收到本地事務的執行狀態會向生產者發起一個確認回查的操作請求
  • 生產者收到確認回查請求后,檢查本地事務的執行狀態
  • 根據檢查后的結果執行Commit或者Rollback操作
    補償階段主要是用于解決生產者在發送Commit或者Rollback操作時發生超時或失敗的情況。
  • RocketMQ事務流程關鍵

  • 事務消息在一階段對用戶不可見
    事務消息相對普通消息最大的特點就是一階段發送的消息對用戶是不可見的,也就是說消費者不能直接消費。這里RocketMQ的實現方法是原消息的主題與消息消費隊列,然后把主題改成 RMQ_SYS_TRANS_HALF_TOPIC ,這樣由于消費者沒有訂閱這個主題,所以不會被消費。
  • 如何處理第二階段的失敗消息?在本地事務執行完成后會向MQServer發送Commit或Rollback操作,此時如果在發送消息的時候生產者出故障了,那么要保證這條消息最終被消費,MQServer會像服務端發送回查請求,確認本地事務的執行狀態。當然了rocketmq并不會無休止的的信息事務狀態回查,默認回查15次,如果15次回查還是無法得知事務狀態,RocketMQ默認回滾該消息。
  • 消息狀態
    事務消息有三種狀態:
    • TransactionStatus.CommitTransaction:提交事務消息,消費者可以消費此消息
    • TransactionStatus.RollbackTransaction:回滾事務,它代表該消息將被刪除,不允許被消費。
    • TransactionStatus.Unknown :中間狀態,它代表需要檢查消息隊列來確定狀態。

    代碼實現

    首先假設我們有這樣一個需求:

    用戶請求訂單微服務 order-service 接口刪除訂單(退貨),刪除訂單后需要發送消息給用戶服務 account-service,用戶微服務收到消息后會給用戶賬戶增加余額。

    這個需求跟錢相關,肯定要保證消息的事務性,接下來我們根據上面的原理實現整個流程。

    基礎配置

    生產者order-servcie和account-service都要引入RocketMQ相關依賴,增加RocketMQ的相關配置

    • 引入組件
    <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId> </dependency>
    • 添加配置
    # within rocketmq rocketmq:name-server: xxx.xx.x.xx:9876; xxx.xx.x.xx:9876producer:group: cloud-group

    發送半消息

    order-service在執行刪除訂單操作時發送一條半消息給MQServer,發送半消息主要是使用 rocketMQTemplate.sendMessageInTransaction() 方法,發送事務消息。

    @Override public void delete(String orderNo) {Order order = orderMapper.selectByNo(orderNo);//如果訂單存在且狀態為有效,進行業務處理if (order != null && CloudConstant.VALID_STATUS.equals(order.getStatus())) {String transactionId = UUID.randomUUID().toString();//如果可以刪除訂單則發送消息給rocketmq,讓用戶中心消費消息rocketMQTemplate.sendMessageInTransaction("add-amount",MessageBuilder.withPayload(UserAddMoneyDTO.builder().userCode(order.getAccountCode()).amount(order.getAmount()).build()).setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId).setHeader("order_id",order.getId()).build(),order);} }

    首先先校驗一下訂單狀態,然后發送消息給MQServer,這個邏輯大家都看得懂,主要是關注 sendMessageInTransaction() 方法,源碼如下:

    public TransactionSendResult sendMessageInTransaction(String destination, Message<?> message, Object arg) throws MessagingException {try {if (((TransactionMQProducer)this.producer).getTransactionListener() == null) {throw new IllegalStateException("The rocketMQTemplate does not exist TransactionListener");} else {org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);return this.producer.sendMessageInTransaction(rocketMsg, arg);}} catch (MQClientException var5) {throw RocketMQUtil.convert(var5);} }

    該方法有三個參數:

    • destination:目的地(主題),這里發送給 add-amount 這個主題
    • message:發送給消費者的消息體,需要使用 MessageBuilder.withPayload() 來構建消息
    • arg:參數

    注意,這里我們生成了一個transactionId,并放在header中跟消息一起發送(這里實際也可以構造成一個對象,放在arg里進行發送),作用后面再講!

    執行本地事務與回查

    MQServer收到半消息后會告訴生產者order-service確認收到半消息,這時候order-service需要執行本地事務,執行完本地事務后再告訴MQServer本地事務的執行狀態,確認消息究竟是Commit還是Rollback。如果在告訴MQServer本地執行狀態的時候出異常了還需要讓MQServer能夠回查到,怎么實現這一些列操作呢?

    RocketMQ提供了 RocketMQLocalTransactionListener 接口,本地事務監聽器,這個接口類的實現如下:

    第一個方法 executeLocalTransaction 為執行本地事務;第二個方法 checkLocalTransaction 為檢查本地事務的執行狀態,也就是回查動作。有了這個接口類我們的執行邏輯清楚了,但是還有個問題:本地事務已經執行完成了,怎么去回查本地事務的執行結果呢?

    我們可以在執行本地事務的時候同時生成一個事務日志,讓本地事務與日志事務在同一個方法中,同時添加 @Transactional 注解,保證兩個操作事務是一個原子操作。這樣如果事務日志表中有這個本地事務的信息,那就代表本地事務執行成功,需要Commit,相反如果沒有對應的事務日志,則表示沒執行成功,需要Rollback

    思路既然理順了,咱們就開擼。

    • 首先創建一個日志表

    很簡單的三個字段,主要是這個事務id,需要根據這個事務id回查事務,還記得我們在發送半消息時生成的事務id嗎,就是干這個用的!

    • 在生產者編寫方法實現 RocketMQLocalTransactionListener
    @Slf4j @RocketMQTransactionListener @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class AddUserAmountListener implements RocketMQLocalTransactionListener {private final OrderService orderService;private final RocketMqTransactionLogMapper rocketMqTransactionLogMapper;/*** 執行本地事務*/@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {log.info("執行本地事務");MessageHeaders headers = message.getHeaders();//獲取事務IDString transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);Integer orderId = Integer.valueOf((String)headers.get("order_id"));log.info("transactionId is {}, orderId is {}",transactionId,orderId);try{//執行本地事務,并記錄日志orderService.changeStatuswithRocketMqLog(orderId, CloudConstant.INVALID_STATUS,transactionId);//執行成功,可以提交事務return RocketMQLocalTransactionState.COMMIT;}catch (Exception e){return RocketMQLocalTransactionState.ROLLBACK;}}/*** 本地事務的檢查,檢查本地事務是否成功*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {MessageHeaders headers = message.getHeaders();//獲取事務IDString transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);log.info("檢查本地事務,事務ID:{}",transactionId);//根據事務id從日志表檢索QueryWrapper<RocketmqTransactionLog> queryWrapper = new QueryWrapper<>();queryWrapper.eq("transaction_id",transactionId);RocketmqTransactionLog rocketmqTransactionLog = rocketMqTransactionLogMapper.selectOne(queryWrapper);if(null != rocketmqTransactionLog){return RocketMQLocalTransactionState.COMMIT;}return RocketMQLocalTransactionState.ROLLBACK;} }
    • 執行本地事務的方法
    @Transactional(rollbackFor = RuntimeException.class) @Override public void changeStatuswithRocketMqLog(Integer id,String status,String transactionId){//將訂單狀態置位無效orderMapper.changeStatus(id,status);//插入事務表rocketMqTransactionLogMapper.insert(RocketmqTransactionLog.builder().transactionId(transactionId).log("執行刪除訂單操作").build()); }

    這一塊的代碼邏輯都是在生產端,即Order-Server,大家不要搞錯了

    消費消息

    Rollback的消息MQServer會給我們處理,我們只要關注Commit狀態時消費端可以正常消費即可。在 account-service監聽消息,如果收到消息則給用戶賬戶增加余額。

    @Slf4j @Service @RocketMQMessageListener(topic = "add-amount",consumerGroup = "cloud-group") @RequiredArgsConstructor(onConstructor = @__(@Autowired) ) public class AddUserAmountListener implements RocketMQListener<UserAddMoneyDTO> {private final AccountMapper accountMapper;/*** 收到消息的業務邏輯*/@Overridepublic void onMessage(UserAddMoneyDTO userAddMoneyDTO) {log.info("received message: {}",userAddMoneyDTO);accountMapper.increaseAmount(userAddMoneyDTO.getUserCode(),userAddMoneyDTO.getAmount());log.info("add money success");} }

    測試

    訂單表有這樣一條記錄,用戶為jianzh5,amount為200

    用戶表的記錄,執行完成后jianzh5的賬戶應該變成250

    • 調用刪除訂單接口,刪除訂單

    • 發送半消息

    • 執行本地事務,并生成事務日志

    • 模擬異常情況
      在發送Commit消息的時候我們用命令殺掉進程 taskkill /pid 19748 -t -f,模擬異常!

    • 重新啟動order-service,查看是否會執行回查動作

    MQServer進行回查,檢查事務日志,判斷是否可以提交事務

    • 消費者消費事務消息,保證事務的一致性

    小結

    使用RocketMQ實現事務消息的過程還是很復雜的,需要好好理解開頭的那張圖,只有理解了事務消息的交互過程才能編寫相應的代碼!

    如果本文對你有幫助,別忘記給我個三連:點贊,轉發,評論。咱們下期見!

    收藏 等于白嫖,點贊 才是真情!

    這里為大家準備了一份小小的禮物,關注公眾號 JAVA日知錄,輸入如下代碼,即可獲得百度網盤地址,無套路領取!
    001:《程序員必讀書籍》
    002:《從無到有搭建中小型互聯網公司后臺服務架構與運維架構》
    003:《互聯網企業高并發解決方案》
    004:《互聯網架構教學視頻》
    006:《SpringBoot實現點餐系統》
    007:《SpringSecurity實戰視頻》
    008:《Hadoop實戰教學視頻》
    009:《騰訊2019Techo開發者大會PPT》

    010: 微信交流群

    http://weixin.qq.com/r/_ElxaTTECGl3rXAK9xzq (二維碼自動識別)

    總結

    以上是生活随笔為你收集整理的两个service事务统一_RocketMQ进阶 - 事务消息的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。