日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

Spring 整合 RocketMQ

發(fā)布時間:2024/4/13 javascript 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spring 整合 RocketMQ 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
1. 引入jar包<!-- RocketMQ --><dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-all</artifactId> <version>3.2.6</version> <type>pom</type> </dependency><dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.2.6</version> </dependency> 2.Spring bean 配置單例<bean id="myProducer" class="cn.zno.rocketmq.MyProducer"init-method="init" destroy-method="destroy"scope="singleton"><property name="producerGroup" value="MyProducerGroup" /><property name="namesrvAddr" value="127.0.0.1:9876" /></bean><bean class="cn.zno.rocketmq.MyConsumer" init-method="init" destroy-method="destroy"scope="singleton"><property name="consumerGroup" value="MyConsumerGroup" /><property name="namesrvAddr" value="127.0.0.1:9876" /></bean> 3. 自定義producer public class MyProducer {private final Logger logger = LoggerFactory.getLogger(MyProducer.class);private DefaultMQProducer defaultMQProducer;private String producerGroup;private String namesrvAddr;/*** Spring bean init-method*/public void init() throws MQClientException {// 參數(shù)信息logger.info("DefaultMQProducer initialize!");logger.info(producerGroup);logger.info(namesrvAddr);// 初始化defaultMQProducer = new DefaultMQProducer(producerGroup);defaultMQProducer.setNamesrvAddr(namesrvAddr);defaultMQProducer.setInstanceName(String.valueOf(System.currentTimeMillis()));defaultMQProducer.start();logger.info("DefaultMQProudcer start success!");}/*** Spring bean destroy-method*/public void destroy() {defaultMQProducer.shutdown();}public DefaultMQProducer getDefaultMQProducer() {return defaultMQProducer;}// ---------------setter -----------------public void setProducerGroup(String producerGroup) {this.producerGroup = producerGroup;}public void setNamesrvAddr(String namesrvAddr) {this.namesrvAddr = namesrvAddr;}4. 自定義consumer public class MyConsumer {private final Logger logger = LoggerFactory.getLogger(MyConsumer.class);private DefaultMQPushConsumer defaultMQPushConsumer;private String namesrvAddr;private String consumerGroup;/*** Spring bean init-method*/public void init() throws InterruptedException, MQClientException {// 參數(shù)信息logger.info("DefaultMQPushConsumer initialize!");logger.info(consumerGroup);logger.info(namesrvAddr);// 一個應用創(chuàng)建一個Consumer,由應用來維護此對象,可以設置為全局對象或者單例<br>// 注意:ConsumerGroupName需要由應用來保證唯一defaultMQPushConsumer = new DefaultMQPushConsumer(consumerGroup);defaultMQPushConsumer.setNamesrvAddr(namesrvAddr);defaultMQPushConsumer.setInstanceName(String.valueOf(System.currentTimeMillis()));// 訂閱指定MyTopic下tags等于MyTagdefaultMQPushConsumer.subscribe("MyTopic", "MyTag");// 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br>// 如果非第一次啟動,那么按照上次消費的位置繼續(xù)消費defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 設置為集群消費(區(qū)別于廣播消費) defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {// 默認msgs里只有一條消息,可以通過設置consumeMessageBatchMaxSize參數(shù)來批量接收消息@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {MessageExt msg = msgs.get(0);if (msg.getTopic().equals("MyTopic")) {// TODO 執(zhí)行Topic的消費邏輯if (msg.getTags() != null && msg.getTags().equals("MyTag")) {// TODO 執(zhí)行Tag的消費}}// 如果沒有return success ,consumer會重新消費該消息,直到return successreturn ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// Consumer對象在使用之前必須要調用start初始化,初始化一次即可<br>defaultMQPushConsumer.start();logger.info("DefaultMQPushConsumer start success!");}/*** Spring bean destroy-method*/public void destroy() {defaultMQPushConsumer.shutdown();}// ----------------- setter --------------------public void setNamesrvAddr(String namesrvAddr) {this.namesrvAddr = namesrvAddr;}public void setConsumerGroup(String consumerGroup) {this.consumerGroup = consumerGroup;}5. 發(fā)消息@Autowiredprivate MyProducer myProducer;public void sendMessage() {Message msg = new Message("MyTopic", "MyTag", (JSONObject.fromObject(someMessage)).getBytes());SendResult sendResult = null;try {sendResult = myProducer.getDefaultMQProducer().send(msg);} catch (MQClientException e) {logger.error(e.getMessage() + String.valueOf(sendResult));}// 當消息發(fā)送失敗時如何處理if (sendResult == null || sendResult.getSendStatus() != SendStatus.SEND_OK) {// TODO}}

?

總結

以上是生活随笔為你收集整理的Spring 整合 RocketMQ的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 毛片一级在线观看 | 日韩毛片中文字幕 | 男人爽女人下面动态图 | 丝袜 中出 制服 人妻 美腿 | 午夜黄色福利视频 | 亚洲一二三四在线 | 久久久三区 | 欧美 国产 精品 | 久久123| 九色影院| 国产日韩欧美一区二区东京热 | 欧美cccc极品丰满hd | a级一级片 | 岛国裸体写真hd在线 | 欧美三级午夜理伦三级老人 | 欧美日本一区二区 | 黑人一区二区三区四区五区 | 国产你懂的| 伊人狠狠干| 国产精品一区在线观看 | 亚洲美女激情视频 | 韩国福利一区 | 爱久久视频 | 影音先锋男人资源网站 | 天堂网站| 91蜜桃臀久久一区二区 | 婷婷久久五月 | 精品免费 | 日韩久久久久久久久 | 国产精品18久久久久久久久 | 国产日韩一区二区 | 狼人伊人干 | 97av视频| 婷婷网址 | 在线免费视频观看 | 国产精品五月天 | 国产精品美女久久 | 精品区在线观看 | 日韩欧美精品免费 | 美腿丝袜亚洲色图 | 日韩视频免费观看 | 欧美日韩国产成人在线 | 美女扒开腿让男人 | 亚洲精品免费观看 | 色戒电影未测减除版 | 超碰综合 | 性xxx18| 成人小视频免费在线观看 | 好爽…又高潮了毛片免费看 | 国产宾馆自拍 | 美女又黄又爽 | 可以直接观看的av | 超碰夫妻 | 美女扒开内裤让男人桶 | 欧美日韩aa | 综合人人| 欧美日韩视频在线观看一区 | 日韩二区视频 | 天堂在线| 国产三级做爰高清在线 | 绿色地狱在线观看 | 欧美永久精品 | 丰满女人又爽又紧又丰满 | 久久久久亚洲无码 | 台湾男男gay做爽爽的视频 | 国产精品zjzjzj在线观看 | 国内精品99 | 森泽佳奈中文字幕 | 国产一区二区激情 | 肉丝袜脚交视频一区二区 | 污视频网站在线播放 | 色 综合 欧美 亚洲 国产 | 天堂中文在线观看视频 | 国产91在线高潮白浆在线观看 | 色爱综合网 | 国产精品无码在线 | 日本一区二区不卡在线 | 久久国内精品视频 | 黄色男同视频 | 91免费观看视频 | 国产原创在线播放 | 亚洲成年人av | 精品人妻少妇一区二区 | 成人黄色在线观看视频 | av漫画在线观看 | 一级黄色片毛片 | 伊人影院在线视频 | 凹凸福利视频 | 一道本一区二区 | 国产喷潮 | 免费草逼视频 | 激情久| 超碰一区二区 | 六月色 | 88福利视频| 第九色 | 夜夜骑夜夜 | www.youjizz.com视频 | 欧美一区二区三区成人久久片 |