rocketmq 消息 自定义_跟我学RocketMQ[1-4]之消息消费及支持spring
博客地址:朝·聞·道?www.wuwenliang.net
本文我將繼續(xù)講解如何使用DefaultMQPushConsumer對RocketMQ中的消息進行消費,同時在文章的第二部分將繼續(xù)帶領讀者朋友對DefaultMQPushConsumer進行薄封裝,讓我們在Spring中更容易對消息進行消費。DefaultMQPushConsumer不區(qū)分普通消息和事務消息,即我們能夠利用DefaultMQPushConsumer實現(xiàn)對普通消息和事務消息的消費。
通過DefaultMQProducer消費消息
首先,聲明一個DefaultMQPushConsumer客戶端,并通過構(gòu)造器初始化,構(gòu)造參數(shù)為消費者組。官方建議消費者組以“CID_”開頭。
DefaultMQPushConsumer consumer =
new DefaultMQPushConsumer("CID_SNOWALKER");
設置NameServer地址
defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
設置Consumer第一次啟動從隊列頭部開始消費
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
設置消費模式為集群方式,CLUSTERING模式下每條消息只會被一個Consumer消費一次,如果設置為BROADCASTING則為廣播模式,每個消費者都會將消息消費至少一次。一般我們使用的均為CLUSTERING模式。
defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
注冊消息監(jiān)聽器,這里需要實現(xiàn)MessageListenerConcurrently接口,并實現(xiàn)consumeMessage(List msgs, ConsumeConcurrentlyContext context) 方法,我這里的demo是lambda形式,實際上是一樣的。如果你不喜歡lambda形式,可以繼續(xù)使用匿名內(nèi)部類或者自行定義一個類實現(xiàn)該接口。
defaultMQPushConsumer.registerMessageListener(
(msgs, context) -> {
for (MessageExt msg : msgs) {
String realMessage = new String(msg.getBody());
LOGGER.info("當前消費線程名={}, 消息id={}, 收到消息為={}",
Thread.currentThread().getName(),
msg.getMsgId(),
realMessage);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
)
這里注意,當消費邏輯執(zhí)行成功,則返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,后續(xù)將不再對該消息進行消費。如果消費邏輯失敗,則需要設置為ConsumeConcurrentlyStatus.RECONSUME_LATER, RocketMQ會對消息進行重新推送,默認推送16次,目的是盡量保證消息消費成功,如果達到最大重試次數(shù),還是失敗則進入死信隊列,等待人工干預。
調(diào)用start()方法,啟動對隊列的監(jiān)聽,開始進行消息的消費。
defaultMQPushConsumer.start();
我們嘗試運行一下,這里我已經(jīng)有了對應的消費者,可以看下運行的日志:
2019-01-23 09:55:25.022 INFO 18784 --- [ublicExecutor_8] c.s.shield.job.publisher.DemoPublisher :
消息id=AC1E5356496018B4AAC2736D06CF0002, 發(fā)送結(jié)果=SEND_OK
2019-01-23 09:55:27.519 INFO 18784 --- [MessageThread_8] c.s.shield.job.consumer.DemoConsumer :
當前消費線程名=ConsumeMessageThread_8, 消息id=AC1E5356496018B4AAC2736D06CF0002, 收到消息為={"msgName":"rocketmq-simple-msg-test","topicName":"SNOWALKER_TEST","tagName":"SNOWALKER_TEST-TAG","clusterName":"localhost.localdomain","taskName":"測試消息簡單發(fā)送------第0次","threadSize":"10","threadName":"simple-msg-test-0"}
可以看到broker推送消息至消費端,并且被成功消費。
Spring框架整合DefaultMQPushConsumer
我們?nèi)匀换赟pring Boot v1.5.3.RELEASE, Spring v4.3.8.RELEASE 對DefaultMQPushConsumer進行整合,相關(guān)代碼已經(jīng)上傳至github
這里對核心代碼進行講解。
首先定義RocketMQPushConsumerAgent.java并將其聲明為spring的bean,作用域為prototype,即多例形式。
@Scope("prototype")
@Component
public class RocketMQPushConsumerAgent {
聲明消息監(jiān)聽器及消息消費者
private MessageListenerConcurrently messageListener;
private DefaultMQPushConsumer defaultMQPushConsumer;
init()方法為核心的初始化邏輯,在該方法中,初始化了DefaultMQPushConsumer,并設置NameServer地址、消費模式以及將外部實現(xiàn)的監(jiān)聽器設置給內(nèi)部的messageListener引用。
接著對消息主題進行訂閱,對該主題下所有的消息進行監(jiān)聽,這里有待優(yōu)化,后續(xù)將把消息的過濾表達式也暴露給調(diào)用者。
所有的配置參數(shù)均通過RocketMQConsumerConfig進行設置,保證接口的整潔性,RocketMQConsumerConfig將在附錄中進行簡單講解。
public RocketMQPushConsumerAgent init(RocketMQConsumerConfig consumerConfig, MessageListenerConcurrently messageListener) throws MQClientException {
defaultMQPushConsumer = new DefaultMQPushConsumer(consumerConfig.getConsumerGroup());
defaultMQPushConsumer.setNamesrvAddr(consumerConfig.getNameSrvAddr());
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 消費模式
if (consumerConfig.getMessageModel() != null) {
defaultMQPushConsumer.setMessageModel(consumerConfig.getMessageModel());
}
// 注冊監(jiān)聽器
this.messageListener = messageListener;
defaultMQPushConsumer.registerMessageListener(this.messageListener);
defaultMQPushConsumer.subscribe(consumerConfig.getTopic(), "*");
LOGGER.debug("com.shield.job.message.rocketmq.RocketMQConsumerAgent消費者客戶端組裝完成");
return this;
}
獨立的啟動方法
public void start() throws MQClientException {
this.defaultMQPushConsumer.start();
}
獨立的關(guān)閉方法
public void destroy() {
defaultMQPushConsumer.shutdown();
LOGGER.debug("com.shield.job.message.rocketmq.RocketMQConsumerAgent消費者客戶端[已關(guān)閉]");
}
為方便外部對消費者進行進一步的自定義設置,提供外部獲取defaultMQPushConsumer的接口。
public DefaultMQPushConsumer getConsumer() {
return defaultMQPushConsumer;
}
RocketMQPushConsumerAgent使用案例
仍然依據(jù)開頭的示例進行改造。
@Component
public class DemoConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(DemoConsumer.class);
使用@Resource(name = “rocketMQPushConsumerAgent”)或者直接@Autowired將自定義的消息消費者注入。
@Resource(name = "rocketMQPushConsumerAgent")
RocketMQPushConsumerAgent rocketMQConsumerAgent;
調(diào)用方需要實現(xiàn)一個返回值為void的方法,并標記為@PostConstruct,在該方法中進行rocketMQConsumerAgent的初始化。當spring在加載過程中,DemoConsumer初始化之前會調(diào)用該init()方法初始化rocketMQConsumerAgent。通過start()鏈式調(diào)用,啟動消息消費者,內(nèi)部是調(diào)用的defaultMQPushConsumer.start()方法。
@PostConstruct
void init() {
try {
rocketMQConsumerAgent.init(
new RocketMQConsumerConfig(
"snowalker-consumer-group",
"172.30.83.100:9876",
"SNOWALKER_TEST",
MessageModel.CLUSTERING),
(msgs, context) -> {
for (MessageExt msg : msgs) {
String realMessage = new String(msg.getBody());
LOGGER.info("當前消費線程名={}, 消息id={}, 收到消息為={}",
Thread.currentThread().getName(),
msg.getMsgId(),
realMessage);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
).start();
LOGGER.info("DemoConsumer 初始化RocketMQ簡單消息消費者完成");
} catch (MQClientException e) {
e.printStackTrace();
LOGGER.info("DemoConsumer 初始化RocketMQ簡單消息消費者失敗");
}
}
}
在init()方法中同時將消息監(jiān)聽器的實現(xiàn)邏輯注入,消費者會加載該接口的實現(xiàn)。
附錄:RocketMQConsumerConfig配置類
public class RocketMQConsumerConfig {
/**消費者組*/
private String consumerGroup;
/**nameServer地址*/
private String nameSrvAddr;
/**消息消費主題*/
private String topic;
private MessageModel messageModel;
public RocketMQConsumerConfig(String consumerGroup, String nameSrvAddr, String topic) {
Preconditions.checkNotNull(consumerGroup);
Preconditions.checkNotNull(nameSrvAddr);
Preconditions.checkNotNull(topic);
this.consumerGroup = consumerGroup;
this.nameSrvAddr = nameSrvAddr;
this.topic = topic;
}
public RocketMQConsumerConfig(String consumerGroup, String nameSrvAddr, String topic, MessageModel messageModel) {
Preconditions.checkNotNull(consumerGroup);
Preconditions.checkNotNull(nameSrvAddr);
Preconditions.checkNotNull(topic);
Preconditions.checkNotNull(messageModel);
this.consumerGroup = consumerGroup;
this.nameSrvAddr = nameSrvAddr;
this.topic = topic;
this.messageModel = messageModel;
}
public String getConsumerGroup() {
return consumerGroup;
}
public String getNameSrvAddr() {
return nameSrvAddr;
}
public String getTopic() {
return topic;
}
public MessageModel getMessageModel() {
return messageModel;
}
}
該配置類封裝了消費者客戶端初始化的必填參數(shù),目的是收攏初始化參數(shù),從而使初始化接口更加簡潔,符合開閉原則。
總結(jié)
以上是生活随笔為你收集整理的rocketmq 消息 自定义_跟我学RocketMQ[1-4]之消息消费及支持spring的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何进行模糊分页
- 下一篇: ApkTool2.34 打包经验