日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

SpringBoot2.x Nacos RocketMQ 事务消息

發布時間:2024/9/27 javascript 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 SpringBoot2.x Nacos RocketMQ 事务消息 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

需求背景:
現在有內容中心(content-center)和 用戶中心(user-center)2個微服務,請求內容中心,發送消息給用戶中心,完成為指定用戶添加積分操作。

文章目錄

          • 一、準備工作
            • 1. 版本對照
            • 2. 下載啟動RocketMQ
            • 3. 引入maven依賴
          • 二、內容中心(服務端)
            • 2.1. 表結構設計
            • 2.2. 配置MQ信息
            • 2.3. 控制層
            • 2.4. service層
            • 2.5. RocketMQ 事務消息監聽
          • 三、用戶中心(客戶端)
            • 3.1. 依賴
            • 3.2.配置
            • 3.3. 消息監聽
            • 開源項目:

一、準備工作
1. 版本對照
RocketMQ 版本RocketMQ控制臺版本RocketMQ starter版本
RocketMQ 4.8.0支持RocketMQ 4.8.02.2.0
2. 下載啟動RocketMQ

linux 環境 RocketMQ 4.8.0 安裝、部署控制臺
https://blog.csdn.net/weixin_40816738/article/details/116269833

windows下RocketMQ下載、安裝、部署、控制臺
https://blog.csdn.net/weixin_40816738/article/details/115734482

3. 引入maven依賴
<!--集成rocketmq--><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version></dependency>
二、內容中心(服務端)

消息發送端代碼編寫

2.1. 表結構設計

share分享表和rocketmq_transaction_logRocketMQ事務日志表2張表,
share

CREATE TABLE IF NOT EXISTS `share` (`id` INT NOT NULL AUTO_INCREMENT COMMENT 'id',`user_id` INT NOT NULL DEFAULT 0 COMMENT '發布人id',`title` VARCHAR(80) NOT NULL DEFAULT '' COMMENT '標題',`create_time` DATETIME NOT NULL COMMENT '創建時間',`update_time` DATETIME NOT NULL COMMENT '修改時間',`is_original` TINYINT(1) NOT NULL DEFAULT 0 COMMENT '是否原創 0:否 1:是',`author` VARCHAR(45) NOT NULL DEFAULT '' COMMENT '作者',`cover` VARCHAR(256) NOT NULL DEFAULT '' COMMENT '封面',`summary` VARCHAR(256) NOT NULL DEFAULT '' COMMENT '概要信息',`price` INT NOT NULL DEFAULT 0 COMMENT '價格(需要的積分)',`download_url` VARCHAR(256) NOT NULL DEFAULT '' COMMENT '下載地址',`buy_count` INT NOT NULL DEFAULT 0 COMMENT '下載數 ',`show_flag` TINYINT(1) NOT NULL DEFAULT 0 COMMENT '是否顯示 0:否 1:是',`audit_status` VARCHAR(10) NOT NULL DEFAULT 0 COMMENT '審核狀態 NOT_YET: 待審核 PASSED:審核通過 REJECTED:審核不通過',`reason` VARCHAR(200) NOT NULL DEFAULT '' COMMENT '審核不通過原因',PRIMARY KEY (`id`)) ENGINE = InnoDB COMMENT = '分享表';

rocketmq_transaction_logRocketMQ

-- ----------------------------------------------------- -- Table `rocketmq_transaction_log` -- ----------------------------------------------------- create table rocketmq_transaction_log (id int auto_increment comment 'id'primary key,transaction_Id varchar(45) not null comment '事務id',log varchar(45) not null comment '日志' )comment 'RocketMQ事務日志表';

具體詳情:見項目源碼

2.2. 配置MQ信息
  • 項目內部yml配置
server:port: 8003 spring:application:# 應用名稱name: ly-rockketmqprofiles:# 環境配置active: devcloud:nacos:discovery:# 服務注冊地址server-addr: nacos.server.com:8848config:# 配置中心地址server-addr: nacos.server.com:8848# 配置文件格式file-extension: yml# 共享配置shared-configs:- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
  • nacos服務端配置
# MQ name-server地址 rocketmq:name-server: 127.0.0.1:9876producer:#必須指定groupgroup: test-group
2.3. 控制層
package com.gblfy.lyrocketmq.controller;import com.gblfy.common.dto.ShareAuditDTO; import com.gblfy.lyrocketmq.entity.Share; import com.gblfy.lyrocketmq.service.ShareService; import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*;@RestController @RequestMapping("/admin/shares") @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class ShareAdminController {private final ShareService shareService;@PutMapping("/audit/{id}")public Share auditById(@PathVariable Integer id, @RequestBody ShareAuditDTO auditDTO) {//TODO 認證授權return this.shareService.auditById(id, auditDTO);} }
2.4. service層
package com.gblfy.lyrocketmq.service;import com.gblfy.api.RemoteProductService; import com.gblfy.common.dto.ShareAuditDTO; import com.gblfy.common.dto.ShareDTO; import com.gblfy.common.dto.UserAddBonusMsgDTO; import com.gblfy.common.dto.UserDTO; import com.gblfy.common.enums.AuditStatusEnum; import com.gblfy.lyrocketmq.entity.RocketmqTransactionLog; import com.gblfy.lyrocketmq.entity.Share; import com.gblfy.lyrocketmq.mapper.RocketmqTransactionLogMapper; import com.gblfy.lyrocketmq.mapper.ShareMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional;import java.util.Objects; import java.util.UUID;@Slf4j @Service @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class ShareService {private final ShareMapper shareMapper;private final RemoteProductService userCenterFeignClient;private final RocketMQTemplate rocketMQTemplate;private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;public ShareDTO findById(Integer id) {Share share = this.shareMapper.selectByPrimaryKey(id);Integer userId = share.getUserId();UserDTO userDTO = this.userCenterFeignClient.findById(userId);ShareDTO shareDTO = new ShareDTO();BeanUtils.copyProperties(share, shareDTO);//設置發布人shareDTO.setWxNickname(userDTO.getWxNickname());return shareDTO;}public Share auditById(Integer id, ShareAuditDTO auditDTO) {// 1. 查詢share是否存在,不存在或者當前的audit_status != NOT_YET,那么拋異常Share share = this.shareMapper.selectByPrimaryKey(id);if (share == null) {throw new IllegalArgumentException("參數非法!該分享不存在!");}if (!Objects.equals("NOT_YET", share.getAuditStatus())) {throw new IllegalArgumentException("參數非法!該分享已審核通過或審核不通過!");}//----------------------------------------發送半消息----------------------------------------// 3. 如果是PASS,那么發送消息給rocketmq,讓用戶中心去消費,并為發布人添加積分if (AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())) {//消息idString transactionId = UUID.randomUUID().toString();this.rocketMQTemplate.sendMessageInTransaction("tx-add-bonus-group",MessageBuilder.withPayload(UserAddBonusMsgDTO.builder().userId(share.getUserId()).bonus(50).build()// Header有妙用).setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId).setHeader("share_id", id).build(),//arg有大用處auditDTO);} else {this.auditByIdInDB(id, auditDTO);}return share;}/*** 審批** @param id* @param auditDTO*/public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) {Share share = Share.builder().id(id).auditStatus(auditDTO.getAuditStatusEnum().toString()).reason(auditDTO.getReason()).build();this.shareMapper.updateByPrimaryKeySelective(share);}@Transactional(rollbackFor = Exception.class)public void auditByIdWithRoketMqlog(Integer id, ShareAuditDTO auditDTO, String transactionId) {this.auditByIdInDB(id, auditDTO);this.rocketmqTransactionLogMapper.insertSelective(RocketmqTransactionLog.builder().transactionId(transactionId).log("審核分享..").build());} }
2.5. RocketMQ 事務消息監聽
package com.gblfy.lyrocketmq.listener;import com.gblfy.common.dto.ShareAuditDTO; import com.gblfy.lyrocketmq.entity.RocketmqTransactionLog; import com.gblfy.lyrocketmq.mapper.RocketmqTransactionLogMapper; import com.gblfy.lyrocketmq.service.ShareService; import lombok.RequiredArgsConstructor; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders;@RocketMQTransactionListener @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {private final ShareService shareService;private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;/*** 執行本地事務** @param msg 消息header信息* @param arg 消息體* @return*/@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {MessageHeaders headers = msg.getHeaders();String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);Integer share_id = Integer.valueOf((String) headers.get("share_id"));try {this.shareService.auditByIdWithRoketMqlog(share_id, (ShareAuditDTO) arg, transactionId);return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {e.printStackTrace();return RocketMQLocalTransactionState.ROLLBACK;}}/*** 本地事務的檢查,檢查本地事務是否成功** @param msg* @return*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {MessageHeaders headers = msg.getHeaders();String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);RocketmqTransactionLog rocketmqTransactionLog = this.rocketmqTransactionLogMapper.selectOne(RocketmqTransactionLog.builder().transactionId(transactionId).build());if (rocketmqTransactionLog != null) {return RocketMQLocalTransactionState.COMMIT;}return RocketMQLocalTransactionState.ROLLBACK;} }

詳細見源碼:本文底部

三、用戶中心(客戶端)

消息消費端代碼編寫

3.1. 依賴
<!--集成rocketmq--><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version></dependency>
3.2.配置
  • 項目內部yml配置
server:port: 9000 spring:application:# 應用名稱name: ly-productprofiles:# 環境配置active: devcloud:nacos:discovery:# 服務注冊地址server-addr: nacos.server.com:8848config:# 配置中心地址server-addr: nacos.server.com:8848# 配置文件格式file-extension: yml# 共享配置shared-configs:- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
  • nacos服務端配置
rocketmq:name-server: 127.0.0.1:9876
3.3. 消息監聽
package com.gblfy.product.listenner;import com.gblfy.common.dto.UserAddBonusMsgDTO; import com.gblfy.product.entity.BonusEventLog; import com.gblfy.product.entity.User; import com.gblfy.product.mapper.BonusEventLogMapper; import com.gblfy.product.mapper.UserMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;import java.util.Date;@Slf4j @Service @RequiredArgsConstructor(onConstructor = @__(@Autowired)) @RocketMQMessageListener(topic = "tx-add-bonus-group", consumerGroup = "consumer-group") public class AddBonusListener implements RocketMQListener<UserAddBonusMsgDTO> {private final UserMapper userMapper;private final BonusEventLogMapper bonusEventLogMapper;@Overridepublic void onMessage(UserAddBonusMsgDTO message) {// 1. 為用戶添加積分Integer userId = message.getUserId();Integer bonus = message.getBonus();User user = this.userMapper.selectByPrimaryKey(userId);user.setBonus(user.getBonus() + bonus);this.userMapper.updateByPrimaryKeySelective(user);// 2.記錄日志到bonus_event_log表中this.bonusEventLogMapper.insert(BonusEventLog.builder().userId(userId).value(bonus).event("CONTRIBUTE").createTime(new Date()).description("投稿加積分...").build());log.info("積分添加完畢...");} }
開源項目:

https://gitee.com/gb_90/micro-service-parent

總結

以上是生活随笔為你收集整理的SpringBoot2.x Nacos RocketMQ 事务消息的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。