日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

rocketmq 消费者不能调用其他服务_Spring Cloud Alibaba RocketMQ - 构建异步通信的微服务...

發布時間:2024/9/19 javascript 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 rocketmq 消费者不能调用其他服务_Spring Cloud Alibaba RocketMQ - 构建异步通信的微服务... 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

引入MQ后的架構演進

MQ的選擇

消息隊列對比參照表:

RocketMQ vs. ActiveMQ vs. Kafka:

參考至:

CentOS7上搭建RocketMQ

環境要求:

CentOS 7.2

64位JDK1.8+

4G+的可用磁盤空間

1、下載RocketMQ的二進制包,我這里使用的是4.5.1版本,下載地址如下:

使用wget命令下載:

[root@study-01 ~]# cd /usr/local/src

[root@study-01 /usr/local/src]# wget http://mirror.bit.edu.cn/apache/rocketmq/4.5.1/rocketmq-all-4.5.1-bin-release.zip

2、解壓下載好的壓縮包,并移動到合適的目錄下:

[root@study-01 /usr/local/src]# unzip rocketmq-all-4.5.1-bin-release.zip

[root@study-01 /usr/local/src]# mv rocketmq-all-4.5.1-bin-release /usr/local/rocketmq-4.5.1

注:若沒有安裝unzip命令則使用如下命令安裝:

yum install -y unzip

3、進入rocketmq的根目錄并查看是否包含如下目錄及文件:

[root@study-01 /usr/local/src]# cd /usr/local/rocketmq-4.5.1

[root@study-01 /usr/local/rocketmq-4.5.1]# ls

benchmark bin conf lib LICENSE NOTICE README.md

4、沒問題后,使用如下命令啟動Name Server:

[root@study-01 /usr/local/rocketmq-4.5.1]# nohup sh bin/mqnamesrv &

[1] 2448

[root@study-01 /usr/local/rocketmq-4.5.1]#

5、查看默認的9876端口是否被監聽,以驗證Name Server是否啟動成功:

[root@study-01 /usr/local/rocketmq-4.5.1]# netstat -lntp |grep java

tcp6 0 0 :::9876 :::* LISTEN 2454/java

[root@study-01 /usr/local/rocketmq-4.5.1]#

6、啟動Broker:

[root@study-01 /usr/local/rocketmq-4.5.1]# nohup sh bin/mqbroker -n localhost:9876 &

[2] 2485

[root@study-01 /usr/local/rocketmq-4.5.1]#

7、驗證Broker是否啟動成功,如果啟動成功,能看到類似如下的日志::

[root@study-01 /usr/local/rocketmq-4.5.1]# cat ~/logs/rocketmqlogs/broker.log |grep "boot success"

2019-08-04 01:27:38 INFO main - The broker[study-01, 192.168.190.129:10911] boot success. serializeType=JSON and name server is localhost:9876

[root@study-01 /usr/local/rocketmq-4.5.1]#

若想停止Name Server和Broker,則依次執行以下兩條命令即可:

[root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/mqshutdown broker

The mqbroker(2492) is running...

Send shutdown request to mqbroker(2492) OK # 輸出該信息說明停止成功

[root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/mqshutdown namesrv

The mqnamesrv(2454) is running...

Send shutdown request to mqnamesrv(2454) OK # 輸出該信息說明停止成功

[2]+ 退出 143 nohup sh bin/mqbroker -n localhost:9876

[root@study-01 /usr/local/rocketmq-4.5.1]#

驗證RocketMQ功能是否正常

1、驗證生產消息正常,執行如下命令:

[root@study-01 /usr/local/rocketmq-4.5.1]# export NAMESRV_ADDR=localhost:9876

[root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

正常的情況下,會看到一堆的類似于如下的輸出,這是生產消息后成功的result:

SendResult [sendStatus=SEND_OK, msgId=C0A8BE810A690D7163610FCC253B03E7, offsetMsgId=C0A8BE8100002A9F000000000002BDFE, messageQueue=MessageQueue [topic=TopicTest, brokerName=study-01, queueId=3], queueOffset=249]

2、驗證消費消息正常,執行如下命令:

[root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

正常的情況下,會看到一堆的類似于如下的輸出,這是消費的消息內容:

ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=3, storeSize=180, queueOffset=242, sysFlag=0, bornTimestamp=1564853837073, bornHost=/192.168.190.129:34708, storeTimestamp=1564853837074, storeHost=/192.168.190.129:10911, msgId=C0A8BE8100002A9F000000000002AA4E, commitLogOffset=174670, bodyCRC=911284903, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1564854006859, UNIQ_KEY=C0A8BE810A690D7163610FCC251103CB, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 55, 49], transactionId='null'}]]

搭建RocketMQ控制臺

RocketMQ官方提供了一個基于Spring Boot開發的可視化控制臺,可以方便我們查看RocketMQ的運行情況以及提升運維效率。所以本小節將介紹一下如何搭建搭建RocketMQ的控制臺,由于我們使用的RocketMQ版本是4.5.1,所以需要對控制臺的源碼進行一些改動以適配RocketMQ的4.5.1版本。

1、首先需要下載源碼,有兩種方式,一是使用git克隆代碼倉庫,二是直接下載rocketmq-externals的zip包,我這里使用git方式,執行如下命令:

git clone https://github.com/apache/rocketmq-externals.git

2、修改控制臺代碼,使用IDE打開rocketmq-console項目,如下圖所示:

2.1、修改項目中的application.properties配置文件,我這里主要是修改了監聽端口和Name Server的連接地址,至于其他配置項有需要的話可按照說明自行修改:

# console的監聽端口,默認是8080

server.port=8011

# Name Server的連接地址;非必須,可以在啟動了console后,在控制臺導航欄 - 運維 - NameSvrAddrList一欄設置

rocketmq.config.namesrvAddr=192.168.190.129:9876

2.2、修改依賴,由于console項目默認使用的rocketmq版本是4.4.0,與我們這里使用的是4.5.1不完全兼容,所以需要修改一下依賴版本,找到這一行:

4.4.0

修改為:

4.5.1

2.3、修改代碼,由于修改了rocketmq的版本,會導致org.apache.rocketmq.console.service.impl.MessageServiceImpl#queryMessageByTopic方法編譯報錯,所以需要改動一下此處代碼 ,將:

@Override

public List queryMessageByTopic(String topic, final long begin, final long end) {

DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, null);

...

修改為:

@Override

public List queryMessageByTopic(String topic, final long begin, final long end) {

RPCHook rpcHook = null;

DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);

...

3、打包構建并啟動,打開idea的terminal,執行如下命令:

# 在rocketmq-console目錄下執行

mvn clean package -DskipTests

# 進入jar包存放目錄

cd target

# 啟動rocketmq console

java -jar rocketmq-console-ng-1.0.1.jar

4、使用瀏覽器訪問控制臺,我這里由于修改了端口,所以訪問地址是:http://localhost:8011,正常的情況下能看到如下界面:

不習慣英文的話可以在右上角切換語言:

由于控制臺是可視化界面并且支持中文,這里就不過多介紹了,可以參考官方的控制臺使用說明文檔:

RocketMQ術語與概念

我這里將基本的術語與概念簡單總結成了思維導圖:

官方文檔:

Spring消息編程模型 - 編寫生產者

在以上小節搭建完RocketMQ之后,我們來使用Spring的消息編程模型,編寫一個簡單的示例。首先需要在項目中添加相關依賴如下:

org.apache.rocketmq

rocketmq-spring-boot-starter

2.0.3

在配置文件中添加rocketmq相關的配置如下:

rocketmq:

name-server: 192.168.190.129:9876

producer:

# 小坑:必須指定group

group: test-group

編寫生產者的代碼,這里以Controller做示例,具體代碼如下:

package com.zj.node.contentcenter.controller.content;

import lombok.Data;

import lombok.RequiredArgsConstructor;

import org.apache.rocketmq.spring.core.RocketMQTemplate;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**

* 生產者

*

* @author 01

* @date 2019-08-03

**/

@RestController

@RequiredArgsConstructor

public class TestProducerController {

/**

* 用于發送消息到 RocketMQ 的api

*/

private final RocketMQTemplate rocketMQTemplate;

@GetMapping("/test-rocketmq/sendMsg")

public String testSendMsg() {

String topic = "test-topic";

// 發送消息

rocketMQTemplate.convertAndSend(topic, MyMessage.getInstance());

return "send message success";

}

}

@Data

class MyMessage {

private Integer id;

private String name;

private String status;

private Date createTime;

static MyMessage getInstance() {

MyMessage message = new Message();

message.id = 1;

message.name = "×××";

message.status = "default";

message.createTime = new Date();

return message;

}

}

編寫完成后,啟動項目,訪問該接口:

消息發送成功后,可以到RocketMQ的控制臺中進行查看:

消息體可以在消息詳情中查看,如下:

從生產者的代碼來看,可以說是十分的簡單了,只需要使用一個RocketMQTemplate就可以實現將對象轉換成消息體并發送消息。實際上除了RocketMQ外,其他的MQ也有對應的Template,如下:

RocketMQ:RocketMQTemplate

ActiveMQ/Artemis:JmsTemplate

RabbitMQ:AmqpTemplate

Kafka:KafkaTemplate

Spring消息編程模型 - 編寫消費者

在消費者項目中,也需要添加rocketmq的依賴:

org.apache.rocketmq

rocketmq-spring-boot-starter

2.0.3

同樣需要配置Name Server的連接地址:

rocketmq:

name-server: 192.168.190.129:9876

編寫消費者的代碼,具體代碼如下:

package com.zj.node.usercenter.rocketmq;

import com.alibaba.fastjson.JSON;

import lombok.Data;

import lombok.extern.slf4j.Slf4j;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;

import org.apache.rocketmq.spring.core.RocketMQListener;

import org.springframework.stereotype.Component;

import java.util.Date;

/**

* 消費者監聽器

*

* @author 01

* @date 2019-08-03

**/

@Slf4j

@Component

// topic需要和生產者的topic一致,consumerGroup屬性是必須指定的,內容可以隨意

@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer-group")

public class TestConsumerListener implements RocketMQListener {

/**

* 監聽到消息的時候就會調用該方法

*

* @param message 消息體

*/

@Override

public void onMessage(MyMessage message) {

log.info("從test-topic中監聽到消息");

log.info(JSON.toJSONString(message));

}

}

/**

* 消息體結構需要一致

*/

@Data

class MyMessage {

private Integer id;

private String name;

private String status;

private Date createTime;

}

編寫完成后啟動項目,由于之前我們已經往隊列里發送了消息,所以此時消費者項目一啟動,就可以監聽到消息并消費,控制臺就會輸出如下日志:

RocketMQ事務消息

眾所周知RocketMQ是支持事務消息的,這也是很多人選擇使用RocketMQ作為消息中間件的一大原因,也是RocketMQ的一大特定。RocketMQ事務消息的流程如下圖所示:

由于原圖是英文的,所以進行了一個大致的翻譯。如下:

簡單剖析一下流程:

1、生產者向MQ Server發送半消息,半消息是一種特殊的消息,這種消息會被存儲到MQ Server里,但是會標記為暫時不能投遞的狀態,所以此時消費者不會消費該消息

2、當半消息發送成功后,生產者就會去執行本地事務

3、生產者根據本地事務的執行結果,向MQ Server發送commit或rollback消息進行二次確認。如果MQ Server接收到的是commit則會將半消息標記為可投遞狀態,那么消費者就可以進行消費。反之,MQ Server接收到的是rollback則會將半消息丟棄掉,消費者就無法進行消費

4、若MQ Server未接收到二次確認的消息或生產者暫停了本地事務的執行,MQ Server則會定時(默認1分鐘)向生產者發送回查消息,檢查生產者的本地事務狀態。然后生產者會根據回查的本地事務執行結果向MQ Server再次發送commit或rollback消息

概念術語:

半消息(Half Message):暫時無法消費的消息,生產者將消息發送到了MQ Server,但這個消息會被標記為“暫不能投遞”狀態,先存儲起來;消費者不會消費這條消息

消息回查(Message Status Check):網絡斷開或生產者重啟可能導致丟失事務消息的第二次確認。當MQ Server發現消息長時間處于半消息狀態時,將向消息生產者發送請求,詢問該消息的最終狀態(提交或回滾)

消息三態:

Commit:提交事務消息,消費者可以消費此消息

Rollback:回滾事務消息,broker會刪除該消息,消費者不能消費

UNKNOWN:broker需要回查確認該消息的狀態

編碼實現RocketMQ事務消息

要想實現RocketMQ事務消息的話,需要按照流程圖編寫一些代碼。在開始編碼之前,先在數據庫中創建一張RocketMQ的事務日志表,用作于本地事務回查的依據,表結構如下:

然后再建一張表,作為事務方法操作的數據表,表結構如下:

接著開始寫代碼,首先定義一個service,里面有帶有事務注解的方法以及發送事務消息的方法。具體代碼如下:

package com.zj.node.contentcenter.service.test;

import com.zj.node.contentcenter.dao.content.NoticeMapper;

import com.zj.node.contentcenter.dao.log.RocketmqTransactionLogMapper;

import com.zj.node.contentcenter.domain.entity.content.Notice;

import com.zj.node.contentcenter.domain.entity.log.RocketmqTransactionLog;

import lombok.Data;

import lombok.RequiredArgsConstructor;

import org.apache.rocketmq.spring.core.RocketMQTemplate;

import org.apache.rocketmq.spring.support.RocketMQHeaders;

import org.springframework.messaging.support.MessageBuilder;

import org.springframework.stereotype.Service;

import org.springframework.transaction.annotation.Transactional;

import java.util.Date;

import java.util.UUID;

/**

* @author 01

* @date 2019-08-08

**/

@Service

@RequiredArgsConstructor

public class TestProducerService {

private final RocketMQTemplate rocketMQTemplate;

private final NoticeMapper noticeMapper;

private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

public String testSendMsg(Notice notice) {

// topic

String topic = "test-topic";

// 生產者所在的事務組

String txProducerGroup = "tx-test-producer-group";

// 生產事務id

String transactionId = UUID.randomUUID().toString();

// 發送半消息

rocketMQTemplate.sendMessageInTransaction(

txProducerGroup, topic,

// 消息體

MessageBuilder.withPayload("事務消息")

// header是消息的頭部分,可以用作傳參

.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)

.setHeader("notice_id", notice.getId())

.build(),

// 傳遞到executeLocalTransaction的參數

notice);

return "send message success";

}

@Transactional(rollbackFor = Exception.class)

public void updateNotice(Integer noticeId, Notice notice) {

Notice newNotice = new Notice();

newNotice.setId(noticeId);

newNotice.setContent(notice.getContent());

noticeMapper.updateByPrimaryKeySelective(newNotice);

}

@Transactional(rollbackFor = Exception.class)

public void updateNoticeWithRocketMQLog(Integer noticeId, Notice notice, String transactionId) {

updateNotice(noticeId, notice);

// 寫入事務日志

rocketmqTransactionLogMapper.insertSelective(

RocketmqTransactionLog.builder()

.transactionId(transactionId)

.log("updateNotice")

.build()

);

}

}

實現一個本地事務監聽器,用于執行事務方法及提供本地事務狀態的回查方法。具體代碼如下:

package com.zj.node.contentcenter.rocketmq;

import com.zj.node.contentcenter.dao.log.RocketmqTransactionLogMapper;

import com.zj.node.contentcenter.domain.entity.content.Notice;

import com.zj.node.contentcenter.domain.entity.log.RocketmqTransactionLog;

import com.zj.node.contentcenter.service.test.TestProducerService;

import lombok.RequiredArgsConstructor;

import lombok.extern.slf4j.Slf4j;

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;

import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;

import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;

import org.apache.rocketmq.spring.support.RocketMQHeaders;

import org.springframework.messaging.Message;

import org.springframework.messaging.MessageHeaders;

/**

* 本地事務監聽器

*

* @author 01

* @date 2019-08-08

**/

@Slf4j

@RequiredArgsConstructor

// 這里的txProducerGroup需要與sendMessageInTransaction里設置的一致

@RocketMQTransactionListener(txProducerGroup = "tx-test-producer-group")

public class TestTransactionListener implements RocketMQLocalTransactionListener {

private final TestProducerService service;

private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

/**

* 用于執行本地事務的方法

*/

@Override

public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {

MessageHeaders headers = msg.getHeaders();

String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);

log.info("執行本地事務方法. 事務id: {}", transactionId);

// header里拿出來的都是String類型

Integer noticeId = Integer.parseInt((String) headers.get("notice_id"));

try {

// 執行帶有事務注解的方法

service.updateNoticeWithRocketMQLog(noticeId, (Notice) arg, transactionId);

// 正常執行,向MQ Server發送commit消息

return RocketMQLocalTransactionState.COMMIT;

} catch (Exception e) {

log.error("本地事務方法發生異常,消息將被回滾", e);

// 發生異常向MQ Server發送rollback消息

return RocketMQLocalTransactionState.ROLLBACK;

}

}

/**

* 用于回查本地事務的執行結果

*/

@Override

public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {

MessageHeaders headers = msg.getHeaders();

String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);

log.warn("回查本地事務狀態. 事務id: {}", transactionId);

// 按事務id查詢日志數據

RocketmqTransactionLog transactionLog = rocketmqTransactionLogMapper.selectOne(

RocketmqTransactionLog.builder()

.transactionId(transactionId)

.build()

);

// 如果能按事務id查詢出來數據表示本地事務執行成功,沒有數據則表示本地事務執行失敗

if (transactionLog == null) {

log.warn("本地事務執行失敗,事務日志不存在,消息將被回滾. 事務id: {}", transactionId);

return RocketMQLocalTransactionState.ROLLBACK;

}

return RocketMQLocalTransactionState.COMMIT;

}

}

簡單說明一下這些方法的執行流程:

首先調用TestProducerService.testSendMsg向MQ Server發送半消息,從代碼也可以看到該方法里不會執行本地事務方法。當MQ Server接收半消息成功后,會告訴生產者接收成功,接著就會執行本地事務監聽器里的executeLocalTransaction方法,該方法里會調用TestProducerService里帶有事務注解的方法updateNoticeWithRocketMQLog,并在事務方法執行完畢后返回本地事務狀態給MQ Server。若executeLocalTransaction方法返回的事務狀態是UNKNOWN或者該方法出于某種原因沒有被執行完畢,那么MQ Server就接收不到二次確認消息,默認會在一分鐘后向生產者發送回查消息,生產者接收到回查消息的話就會執行本地事務監聽器里的checkLocalTransaction方法,通過事務日志記錄表的數據來確認該事務狀態并返回。

RocketMQ日志相關的坑

由于rocketmq有自己內部的日志體系,所以默認不會使用Slf4j。體現到executeLocalTransaction方法的話,就是如果該方法的執行過程中拋出了異常的話,異常信息不會被打印到控制臺,而是輸出到rocketmq_client.log日志文件中。相關源碼:org.apache.rocketmq.client.log.ClientLogger

如果希望rocketmq的日志輸出到控制臺的話,需要在啟動類的main方法中增加如下代碼:

// 讓rocketmq使用slf4j日志

System.setProperty(ClientLogger.CLIENT_LOG_USESLF4J, "true");

總結

以上是生活随笔為你收集整理的rocketmq 消费者不能调用其他服务_Spring Cloud Alibaba RocketMQ - 构建异步通信的微服务...的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。