javascript
rocketmq 消息指定_SpringBoot 整合 RocketMQ 如何实现消息生产消费?
有時(shí)候我們?cè)谑褂孟㈥?duì)列的時(shí)候,往往需要能夠保證消息的順序消費(fèi),而RocketMQ是可以支持消息的順序消費(fèi)的。
RocketMQ在發(fā)送消息的時(shí)候,是將消息發(fā)送到不同的隊(duì)列中,然后消費(fèi)端從多個(gè)隊(duì)列中讀取消息進(jìn)行消費(fèi),很明顯,在這種全局模式下,是無(wú)法實(shí)現(xiàn)順序消費(fèi)的。
為了實(shí)現(xiàn)順序消費(fèi),我們需要把有順序的消息按照他的順序,將他們發(fā)送到同一個(gè)隊(duì)列中,這樣消費(fèi)端在消費(fèi)的時(shí)候,就保證了其順序。
但是順序消費(fèi)的性能肯定也相對(duì)差一些,因?yàn)橹荒苁褂靡粋€(gè)隊(duì)列。
一、在pom.xml中添加依賴(lài):
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.0</version> </dependency>二、在application.yml中配置RocketMQ地址:
server:port: 8888rocketmq:name-server: 127.0.0.1:9876producer:group: ${spring.application.name}sendMessageTimeout: 300000備注:官方下載RocketMQ,本地啟動(dòng)RocketMQ。
三、 一個(gè)簡(jiǎn)單的生產(chǎn)消費(fèi)案例:
生產(chǎn)者:向 stringTopic 的主題中發(fā)送一個(gè) Hello RecketMQ 的字符串。
@RestController@RequestMapping("/mq")public class ProducerController {@Resourceprivate RocketMQTemplate rocketMQTemplate;@RequestMapping("/sync/send1")public String syncSendString() {//發(fā)送一個(gè)同步消息,會(huì)返回值 ---發(fā)送到 stringTopic 主題SendResult sendResult = rocketMQTemplate.syncSend("stringTopic", "Hello RocketMQ");return sendResult.toString();}}消費(fèi)者:監(jiān)聽(tīng) stringTopic 主題。
@Service@RocketMQMessageListener(topic = "stringTopic", consumerGroup = "string_consumer")public class StringConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("消費(fèi)者接收消息:" + message);}}1、啟動(dòng)當(dāng)前服務(wù)。
2、用瀏覽器或者HTTP Client工具訪(fǎng)問(wèn):http://localhost:8888/mq/sync/send1
3、查看控制臺(tái)輸出:【消費(fèi)者接收消息:Hello RocketMQ】即表示消息消費(fèi)成功。
四、實(shí)現(xiàn)順序消費(fèi)
生產(chǎn)者: 生產(chǎn)多條消息,方便觀(guān)察順序。向 orderTopic 主題發(fā)送5條消息,內(nèi)容分別是 no1 no2 no3 no4 no5。第三個(gè)參數(shù)是order ,他的作用是會(huì)根據(jù)他的hash值計(jì)算發(fā)送到哪一個(gè)隊(duì)列。用同一個(gè)值order,那么他們的hash一樣??梢员WC發(fā)送到同一個(gè)隊(duì)列里。
@RestController@RequestMapping("/mq")public class ProducerController {@Resourceprivate RocketMQTemplate rocketMQTemplate;/**************驗(yàn)證RocketMQ順序消費(fèi)***************/@RequestMapping("/send/ordered")public String sendOrderedMsg(){/*** hashKey: 為了保證報(bào)到同一個(gè)隊(duì)列中,將消息發(fā)送到orderTopic主題上*/rocketMQTemplate.syncSendOrderly("orderTopic","no1","order");rocketMQTemplate.syncSendOrderly("orderTopic","no2","order");rocketMQTemplate.syncSendOrderly("orderTopic","no3","order");rocketMQTemplate.syncSendOrderly("orderTopic","no4","order");rocketMQTemplate.syncSendOrderly("orderTopic","no5","order");return "success";}}消費(fèi)者:消費(fèi)者在消費(fèi)的時(shí)候,默認(rèn)是異步多線(xiàn)程消費(fèi)的,所以無(wú)法保證順序,需要指定同步消費(fèi)。指定 consumeMode = ConsumeMode.ORDERLY。默認(rèn)值是 consumeMode = ConsumeMode.CONCURRENT。
@Service@RocketMQMessageListener(topic = "orderTopic", consumerGroup = "ordered-consumer", consumeMode = ConsumeMode.ORDERLY)public class OrderedConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("順序消費(fèi),收到消息:" + message);}}1、啟動(dòng)當(dāng)前服務(wù)。
2、用瀏覽器或者HTTP Client工具訪(fǎng)問(wèn):http://localhost:8888/mq/send/ordered
3、查看控制臺(tái)輸出:【順序打印:no1 no2 no3 no4 no5】即表示消息消費(fèi)成功。
Spring是一個(gè)JavaEE一站式的開(kāi)發(fā)框架。它提供的功能涵蓋了JavaEE程序中的表示層,服務(wù)層,持久層功能組件。這意味著,使用了Spring框架,一個(gè)框架就可以滿(mǎn)足整個(gè)JavaEE程序的開(kāi)發(fā)。
但Spring框架,更加強(qiáng)調(diào)的是它的輕量級(jí)(模塊的可插拔)!!也就是說(shuō),除了內(nèi)核以外模塊,如果你不想使用可以不用,它能夠整合任何第三方的框架。
所以,在現(xiàn)實(shí)開(kāi)發(fā)中,Spring主要用于整合其他框架。我也整理了一份關(guān)于spring的面試題,希望可以幫助到大家!
感謝你看到這里,我是程序員青秧,一枚小碼農(nóng),從事開(kāi)發(fā)六年了,每天都會(huì)分享java相關(guān)技術(shù)文章或行業(yè)資訊
歡迎大家關(guān)注我的專(zhuān)欄:
程序員青秧
里面不定期分享Java架構(gòu)技術(shù)知識(shí)點(diǎn)及解析,還會(huì)不斷更新BATJ面試專(zhuān)題,歡迎大家前來(lái)探討交流,如有好的文章也歡迎投稿。(注意專(zhuān)欄簡(jiǎn)介的介紹獲取最新一線(xiàn)大廠(chǎng)Java面試題總結(jié)資料!)
注意專(zhuān)欄簡(jiǎn)介的介紹獲取最新一線(xiàn)大廠(chǎng)Java面試題總結(jié)資料!
總結(jié)
以上是生活随笔為你收集整理的rocketmq 消息指定_SpringBoot 整合 RocketMQ 如何实现消息生产消费?的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 淘宝运费险怎么赔付
- 下一篇: html中.inner样式,JavaSc