日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

rocketmq 消息 自定义_跟我学RocketMQ[1-4]之消息消费及支持spring

發(fā)布時間:2024/7/23 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 rocketmq 消息 自定义_跟我学RocketMQ[1-4]之消息消费及支持spring 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

博客地址:朝·聞·道?www.wuwenliang.net

本文我將繼續(xù)講解如何使用DefaultMQPushConsumer對RocketMQ中的消息進行消費,同時在文章的第二部分將繼續(xù)帶領讀者朋友對DefaultMQPushConsumer進行薄封裝,讓我們在Spring中更容易對消息進行消費。DefaultMQPushConsumer不區(qū)分普通消息和事務消息,即我們能夠利用DefaultMQPushConsumer實現(xiàn)對普通消息和事務消息的消費。

通過DefaultMQProducer消費消息

首先,聲明一個DefaultMQPushConsumer客戶端,并通過構(gòu)造器初始化,構(gòu)造參數(shù)為消費者組。官方建議消費者組以“CID_”開頭。

DefaultMQPushConsumer consumer =

new DefaultMQPushConsumer("CID_SNOWALKER");

設置NameServer地址

defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");

設置Consumer第一次啟動從隊列頭部開始消費

defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

設置消費模式為集群方式,CLUSTERING模式下每條消息只會被一個Consumer消費一次,如果設置為BROADCASTING則為廣播模式,每個消費者都會將消息消費至少一次。一般我們使用的均為CLUSTERING模式。

defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);

注冊消息監(jiān)聽器,這里需要實現(xiàn)MessageListenerConcurrently接口,并實現(xiàn)consumeMessage(List msgs, ConsumeConcurrentlyContext context) 方法,我這里的demo是lambda形式,實際上是一樣的。如果你不喜歡lambda形式,可以繼續(xù)使用匿名內(nèi)部類或者自行定義一個類實現(xiàn)該接口。

defaultMQPushConsumer.registerMessageListener(

(msgs, context) -> {

for (MessageExt msg : msgs) {

String realMessage = new String(msg.getBody());

LOGGER.info("當前消費線程名={}, 消息id={}, 收到消息為={}",

Thread.currentThread().getName(),

msg.getMsgId(),

realMessage);

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

)

這里注意,當消費邏輯執(zhí)行成功,則返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,后續(xù)將不再對該消息進行消費。如果消費邏輯失敗,則需要設置為ConsumeConcurrentlyStatus.RECONSUME_LATER, RocketMQ會對消息進行重新推送,默認推送16次,目的是盡量保證消息消費成功,如果達到最大重試次數(shù),還是失敗則進入死信隊列,等待人工干預。

調(diào)用start()方法,啟動對隊列的監(jiān)聽,開始進行消息的消費。

defaultMQPushConsumer.start();

我們嘗試運行一下,這里我已經(jīng)有了對應的消費者,可以看下運行的日志:

2019-01-23 09:55:25.022 INFO 18784 --- [ublicExecutor_8] c.s.shield.job.publisher.DemoPublisher :

消息id=AC1E5356496018B4AAC2736D06CF0002, 發(fā)送結(jié)果=SEND_OK

2019-01-23 09:55:27.519 INFO 18784 --- [MessageThread_8] c.s.shield.job.consumer.DemoConsumer :

當前消費線程名=ConsumeMessageThread_8, 消息id=AC1E5356496018B4AAC2736D06CF0002, 收到消息為={"msgName":"rocketmq-simple-msg-test","topicName":"SNOWALKER_TEST","tagName":"SNOWALKER_TEST-TAG","clusterName":"localhost.localdomain","taskName":"測試消息簡單發(fā)送------第0次","threadSize":"10","threadName":"simple-msg-test-0"}

可以看到broker推送消息至消費端,并且被成功消費。

Spring框架整合DefaultMQPushConsumer

我們?nèi)匀换赟pring Boot v1.5.3.RELEASE, Spring v4.3.8.RELEASE 對DefaultMQPushConsumer進行整合,相關(guān)代碼已經(jīng)上傳至github

這里對核心代碼進行講解。

首先定義RocketMQPushConsumerAgent.java并將其聲明為spring的bean,作用域為prototype,即多例形式。

@Scope("prototype")

@Component

public class RocketMQPushConsumerAgent {

聲明消息監(jiān)聽器及消息消費者

private MessageListenerConcurrently messageListener;

private DefaultMQPushConsumer defaultMQPushConsumer;

init()方法為核心的初始化邏輯,在該方法中,初始化了DefaultMQPushConsumer,并設置NameServer地址、消費模式以及將外部實現(xiàn)的監(jiān)聽器設置給內(nèi)部的messageListener引用。

接著對消息主題進行訂閱,對該主題下所有的消息進行監(jiān)聽,這里有待優(yōu)化,后續(xù)將把消息的過濾表達式也暴露給調(diào)用者。

所有的配置參數(shù)均通過RocketMQConsumerConfig進行設置,保證接口的整潔性,RocketMQConsumerConfig將在附錄中進行簡單講解。

public RocketMQPushConsumerAgent init(RocketMQConsumerConfig consumerConfig, MessageListenerConcurrently messageListener) throws MQClientException {

defaultMQPushConsumer = new DefaultMQPushConsumer(consumerConfig.getConsumerGroup());

defaultMQPushConsumer.setNamesrvAddr(consumerConfig.getNameSrvAddr());

defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

// 消費模式

if (consumerConfig.getMessageModel() != null) {

defaultMQPushConsumer.setMessageModel(consumerConfig.getMessageModel());

}

// 注冊監(jiān)聽器

this.messageListener = messageListener;

defaultMQPushConsumer.registerMessageListener(this.messageListener);

defaultMQPushConsumer.subscribe(consumerConfig.getTopic(), "*");

LOGGER.debug("com.shield.job.message.rocketmq.RocketMQConsumerAgent消費者客戶端組裝完成");

return this;

}

獨立的啟動方法

public void start() throws MQClientException {

this.defaultMQPushConsumer.start();

}

獨立的關(guān)閉方法

public void destroy() {

defaultMQPushConsumer.shutdown();

LOGGER.debug("com.shield.job.message.rocketmq.RocketMQConsumerAgent消費者客戶端[已關(guān)閉]");

}

為方便外部對消費者進行進一步的自定義設置,提供外部獲取defaultMQPushConsumer的接口。

public DefaultMQPushConsumer getConsumer() {

return defaultMQPushConsumer;

}

RocketMQPushConsumerAgent使用案例

仍然依據(jù)開頭的示例進行改造。

@Component

public class DemoConsumer {

private static final Logger LOGGER = LoggerFactory.getLogger(DemoConsumer.class);

使用@Resource(name = “rocketMQPushConsumerAgent”)或者直接@Autowired將自定義的消息消費者注入。

@Resource(name = "rocketMQPushConsumerAgent")

RocketMQPushConsumerAgent rocketMQConsumerAgent;

調(diào)用方需要實現(xiàn)一個返回值為void的方法,并標記為@PostConstruct,在該方法中進行rocketMQConsumerAgent的初始化。當spring在加載過程中,DemoConsumer初始化之前會調(diào)用該init()方法初始化rocketMQConsumerAgent。通過start()鏈式調(diào)用,啟動消息消費者,內(nèi)部是調(diào)用的defaultMQPushConsumer.start()方法。

@PostConstruct

void init() {

try {

rocketMQConsumerAgent.init(

new RocketMQConsumerConfig(

"snowalker-consumer-group",

"172.30.83.100:9876",

"SNOWALKER_TEST",

MessageModel.CLUSTERING),

(msgs, context) -> {

for (MessageExt msg : msgs) {

String realMessage = new String(msg.getBody());

LOGGER.info("當前消費線程名={}, 消息id={}, 收到消息為={}",

Thread.currentThread().getName(),

msg.getMsgId(),

realMessage);

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

).start();

LOGGER.info("DemoConsumer 初始化RocketMQ簡單消息消費者完成");

} catch (MQClientException e) {

e.printStackTrace();

LOGGER.info("DemoConsumer 初始化RocketMQ簡單消息消費者失敗");

}

}

}

在init()方法中同時將消息監(jiān)聽器的實現(xiàn)邏輯注入,消費者會加載該接口的實現(xiàn)。

附錄:RocketMQConsumerConfig配置類

public class RocketMQConsumerConfig {

/**消費者組*/

private String consumerGroup;

/**nameServer地址*/

private String nameSrvAddr;

/**消息消費主題*/

private String topic;

private MessageModel messageModel;

public RocketMQConsumerConfig(String consumerGroup, String nameSrvAddr, String topic) {

Preconditions.checkNotNull(consumerGroup);

Preconditions.checkNotNull(nameSrvAddr);

Preconditions.checkNotNull(topic);

this.consumerGroup = consumerGroup;

this.nameSrvAddr = nameSrvAddr;

this.topic = topic;

}

public RocketMQConsumerConfig(String consumerGroup, String nameSrvAddr, String topic, MessageModel messageModel) {

Preconditions.checkNotNull(consumerGroup);

Preconditions.checkNotNull(nameSrvAddr);

Preconditions.checkNotNull(topic);

Preconditions.checkNotNull(messageModel);

this.consumerGroup = consumerGroup;

this.nameSrvAddr = nameSrvAddr;

this.topic = topic;

this.messageModel = messageModel;

}

public String getConsumerGroup() {

return consumerGroup;

}

public String getNameSrvAddr() {

return nameSrvAddr;

}

public String getTopic() {

return topic;

}

public MessageModel getMessageModel() {

return messageModel;

}

}

該配置類封裝了消費者客戶端初始化的必填參數(shù),目的是收攏初始化參數(shù),從而使初始化接口更加簡潔,符合開閉原則。

總結(jié)

以上是生活随笔為你收集整理的rocketmq 消息 自定义_跟我学RocketMQ[1-4]之消息消费及支持spring的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 国产午夜无码视频在线观看 | 国产精品久久久久久久久久直播 | 春色网站 | 性色av蜜臀av浪潮av老女人 | 日本不卡一二三 | 国产精品高清在线观看 | 丰满少妇被猛烈进入高清播放 | 中文字幕资源站 | 欧美一级做a爰片免费视频 成人激情在线观看 | 国产九色在线播放九色 | 国产精品又黄又爽又色无遮挡 | 国产亚洲精品aaaaaaa片 | 羞羞漫画在线播放 | 宝贝乖h调教灌尿穿环 | 自拍偷拍精品 | 国内毛片毛片毛片毛片毛片 | 国产深夜福利在线 | 精品无码av一区二区三区四区 | 奇米影视777第四色 2019中文字幕在线免费观看 | 爱乃なみ加勒比在线播放 | 欧美精品成人一区二区三区四区 | 国产av一区二区三区最新精品 | 国产h在线观看 | 肉丝超薄少妇一区二区三区 | 国产九色视频 | 欧美韩一区 | www.嫩草.com | 欧美精品二区三区四区免费看视频 | 美女视频毛片 | 国产主播福利在线 | 青青操网 | 色婷在线 | 丁香花免费高清完整在线播放 | 色诱av | 女同互舔视频 | 都市激情自拍偷拍 | 国产探花精品一区二区 | 美女网站av | 欧美精品四区 | 天天射影院 | 夜夜嗨老熟女av一区二区三区 | 日韩va在线观看 | 99久99| 好大好爽好舒服 | 欧美日韩免费在线 | 成人国产免费视频 | 国产suv精品一区 | 欧美老肥妇做爰bbww | 日本三级中文字幕 | 捆绑无遮挡打光屁股调教女仆 | 国产任你操| 日本色中色 | 精品国产一区二区三 | 国产一区二区三区自拍 | 国产免费一区二区三区 | 精品国自产拍在线观看 | 涩涩综合 | 亚洲熟妇中文字幕五十中出 | 麻豆网站| 神马久久久久久 | 国产女主播在线观看 | 亚洲情侣av | 午夜视频在线观看视频 | 国产欧美一区二区三区免费看 | 欧美亚洲日本 | 日韩在线视频一区 | 桃色一区二区 | 亚洲精选一区二区三区 | 日本少妇激情 | 波多野结衣一区二区三区中文字幕 | 国产奶水涨喷在线播放 | 国产一区二区视频免费 | 成人性做爰aaa片免费看不忠 | 91亚洲在线| 欧美激情性生活 | 丰满熟妇人妻av无码区 | 一区二区三区视频网站 | 久久婷婷av| 亚洲av电影一区 | 97视频免费在线观看 | 日韩 中文字幕 | 艳情五月| 谁有免费黄色网址 | 又大又硬又爽免费视频 | 波多野结衣在线视频播放 | 99热免费精品 | 国产精品欧美一区二区 | 女人叫床很黄很污句子 | 天天天操 | 欧美干干干 | 久久久久国色av免费观看性色 | 国产真人做爰毛片视频直播 | 一本大道久久精品 | 美女福利视频网 | 成人自拍视频在线观看 | 韩国电影一区二区三区 | 少妇aa | 一级特黄欧美 | 一边吃奶一边摸做爽视频 |