Integrating RocketMQ with Spring
1. Add the JAR dependencies

<!-- RocketMQ -->
<dependency>
    <groupId>com.alibaba.rocketmq</groupId>
    <artifactId>rocketmq-all</artifactId>
    <version>3.2.6</version>
    <type>pom</type>
</dependency>
<dependency>
    <groupId>com.alibaba.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>3.2.6</version>
</dependency>

2. Configure the Spring beans as singletons

<bean id="myProducer" class="cn.zno.rocketmq.MyProducer"
      init-method="init" destroy-method="destroy" scope="singleton">
    <property name="producerGroup" value="MyProducerGroup" />
    <property name="namesrvAddr" value="127.0.0.1:9876" />
</bean>
<bean class="cn.zno.rocketmq.MyConsumer"
      init-method="init" destroy-method="destroy" scope="singleton">
    <property name="consumerGroup" value="MyConsumerGroup" />
    <property name="namesrvAddr" value="127.0.0.1:9876" />
</bean>

3. Custom producer
package cn.zno.rocketmq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

public class MyProducer {

    private final Logger logger = LoggerFactory.getLogger(MyProducer.class);

    private DefaultMQProducer defaultMQProducer;
    private String producerGroup;
    private String namesrvAddr;

    /**
     * Spring bean init-method
     */
    public void init() throws MQClientException {
        // Log the configured parameters
        logger.info("DefaultMQProducer initialize!");
        logger.info(producerGroup);
        logger.info(namesrvAddr);

        // Initialize and start the producer
        defaultMQProducer = new DefaultMQProducer(producerGroup);
        defaultMQProducer.setNamesrvAddr(namesrvAddr);
        defaultMQProducer.setInstanceName(String.valueOf(System.currentTimeMillis()));
        defaultMQProducer.start();

        logger.info("DefaultMQProducer start success!");
    }

    /**
     * Spring bean destroy-method
     */
    public void destroy() {
        defaultMQProducer.shutdown();
    }

    public DefaultMQProducer getDefaultMQProducer() {
        return defaultMQProducer;
    }

    // --------------- setter -----------------

    public void setProducerGroup(String producerGroup) {
        this.producerGroup = producerGroup;
    }

    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }
}

4. Custom consumer
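package cn.zno.rocketmq;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;

public class MyConsumer {

    private final Logger logger = LoggerFactory.getLogger(MyConsumer.class);

    private DefaultMQPushConsumer defaultMQPushConsumer;
    private String namesrvAddr;
    private String consumerGroup;

    /**
     * Spring bean init-method
     */
    public void init() throws InterruptedException, MQClientException {
        // Log the configured parameters
        logger.info("DefaultMQPushConsumer initialize!");
        logger.info(consumerGroup);
        logger.info(namesrvAddr);

        // Each application creates one Consumer and maintains it itself;
        // it can be kept as a global object or a singleton.
        // Note: the application must ensure that ConsumerGroupName is unique.
        defaultMQPushConsumer = new DefaultMQPushConsumer(consumerGroup);
        defaultMQPushConsumer.setNamesrvAddr(namesrvAddr);
        defaultMQPushConsumer.setInstanceName(String.valueOf(System.currentTimeMillis()));

        // Subscribe to messages under MyTopic whose tag equals MyTag
        defaultMQPushConsumer.subscribe("MyTopic", "MyTag");

        // Choose whether the Consumer starts from the head or the tail of the queue
        // on its first start. On later starts it resumes from the last consumed position.
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // Use clustering consumption (as opposed to broadcasting consumption)
        defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);

        defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {

            // By default msgs holds a single message; set the consumeMessageBatchMaxSize
            // parameter to receive messages in batches.
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                MessageExt msg = msgs.get(0);
                if ("MyTopic".equals(msg.getTopic())) {
                    // TODO run the consumption logic for the topic
                    if ("MyTag".equals(msg.getTags())) {
                        // TODO run the consumption logic for the tag
                    }
                }
                // Unless CONSUME_SUCCESS is returned, the consumer will consume
                // the message again until it is.
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // The Consumer must be initialized with start() before use; call it only once.
        defaultMQPushConsumer.start();

        logger.info("DefaultMQPushConsumer start success!");
    }

    /**
     * Spring bean destroy-method
     */
    public void destroy() {
        defaultMQPushConsumer.shutdown();
    }

    // ----------------- setter --------------------

    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }

    public void setConsumerGroup(String consumerGroup) {
        this.consumerGroup = consumerGroup;
    }
}

5. Send a message

// Fragment of a Spring-managed class; logger and someMessage are assumed
// to be defined in the enclosing class.
@Autowired
private MyProducer myProducer;

public void sendMessage() {
    // JSONObject has no getBytes(); serialize to a JSON string first
    Message msg = new Message("MyTopic", "MyTag",
            JSONObject.fromObject(someMessage).toString().getBytes());
    SendResult sendResult = null;
    try {
        sendResult = myProducer.getDefaultMQProducer().send(msg);
    } catch (MQClientException | RemotingException | MQBrokerException e) {
        logger.error("Failed to send message", e);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can react to it
        Thread.currentThread().interrupt();
        logger.error("Interrupted while sending message", e);
    }
    // How to handle a failed send
    if (sendResult == null || sendResult.getSendStatus() != SendStatus.SEND_OK) {
        // TODO
    }
}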
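
The TODO in step 5 leaves open what to do when a send fails. One common option is to retry a bounded number of times with a growing pause between attempts. The sketch below is only an illustration of that idea and uses no RocketMQ classes: SendRetrySketch, its Status enum and sendWithRetry are hypothetical names, and the Callable stands in for a call such as myProducer.getDefaultMQProducer().send(msg) mapped to a status.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper, not part of RocketMQ: retries a send a bounded number of times. */
final class SendRetrySketch {

    /** Stand-in for the client's send status; only the two outcomes this sketch needs. */
    enum Status { SEND_OK, FAILED }

    /**
     * Calls sendOnce up to maxAttempts times and returns SEND_OK as soon as one call succeeds.
     * An exception or a non-OK status counts as a failed attempt. The pause between
     * attempts grows linearly: backoffMillis, 2 * backoffMillis, and so on.
     */
    static Status sendWithRetry(Callable<Status> sendOnce, int maxAttempts, long backoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (sendOnce.call() == Status.SEND_OK) {
                    return Status.SEND_OK;
                }
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Stop retrying and keep the interrupt flag for the caller
                    Thread.currentThread().interrupt();
                    return Status.FAILED;
                }
                // Any other exception is treated as a failed attempt; real code would log it
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMillis * attempt);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return Status.FAILED;
                }
            }
        }
        return Status.FAILED;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated sender: fails twice with an exception, then succeeds
        Status result = sendWithRetry(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new IllegalStateException("simulated broker error");
            }
            return Status.SEND_OK;
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts"); // prints "SEND_OK after 3 attempts"
    }
}
```

If every attempt fails, the caller still has to decide what happens to the message, for example persisting it for a later resend. That choice depends on the application and is left open here, as in the original TODO.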
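
Back in the listener from step 4: it reads only msgs.get(0), which is fine while the batch size is left at its default of one. If consumeMessageBatchMaxSize is raised, every message in the list needs handling. The sketch below shows one way to structure that, under the assumption that a failure anywhere in the batch should make the listener ask for redelivery. BatchConsumeSketch, Result and consumeAll are hypothetical names, and the Result enum only mirrors the CONSUME_SUCCESS and RECONSUME_LATER values of ConsumeConcurrentlyStatus so the example runs without the RocketMQ client.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical sketch, not RocketMQ code: handle every message in a batch. */
final class BatchConsumeSketch {

    /** Stand-in mirroring the two values of ConsumeConcurrentlyStatus. */
    enum Result { CONSUME_SUCCESS, RECONSUME_LATER }

    /**
     * Runs handler on each message in order. Returns RECONSUME_LATER at the first
     * message whose handler returns false or throws, otherwise CONSUME_SUCCESS.
     */
    static <M> Result consumeAll(List<M> msgs, Predicate<M> handler) {
        for (M msg : msgs) {
            boolean handled;
            try {
                handled = handler.test(msg);
            } catch (RuntimeException e) {
                // Treat an unexpected exception as a failed message; real code would log it
                handled = false;
            }
            if (!handled) {
                return Result.RECONSUME_LATER;
            }
        }
        return Result.CONSUME_SUCCESS;
    }

    public static void main(String[] args) {
        List<String> good = Arrays.asList("a", "b");
        List<String> mixed = Arrays.asList("a", "bad", "c");

        System.out.println(consumeAll(good, m -> true));               // prints CONSUME_SUCCESS
        System.out.println(consumeAll(mixed, m -> !"bad".equals(m)));  // prints RECONSUME_LATER
    }
}
```

Because a retry request may cause messages that were already handled to be delivered again, the per-message logic in such a design should tolerate duplicates.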