javascript
Spring Boot笔记-接收RabbitMQ队列中的消息
目錄
?
?
基本概念
代碼與實例
?
基本概念
首先有個關(guān)鍵:此處實驗接收的數(shù)據(jù)類型為Order,這里要求發(fā)送和接收要一模一樣。
包括包名和類名都要一模一樣:
如下,consumerDemo
下面是productorDemo
這里,包名和類都一模一樣否則接收端監(jiān)聽會失敗!
?
在消費(接收訂閱)端要配置一些數(shù)據(jù):
spring.rabbitmq.listener.simple.concurrency=5 spring.rabbitmq.listener.simple.max-concurrency=10 spring.rabbitmq.listener.simple.acknowledge-mode=manual這里指的是目前并發(fā)為5個,最大并發(fā)數(shù)為10個,監(jiān)聽確認(rèn)為手動,也就是接收了數(shù)據(jù),要給RabbitMQ給一個反饋信息
如下
這里有2個注解,是簡單使用RabbitMQ的關(guān)鍵!
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "order-queue", durable = "true"),exchange = @Exchange(name = "order-exchange", type = "topic"),key = "order.#"))@RabbitHandlerpublic void onOrderMessage(@Payload Order order,@Headers Map<String, Object> headers,Channel channel //手工簽收需要rabbitMQ的通道) throws Exception{............ }這里如果沒有對應(yīng)的交換機和隊列,那么此處就會自動新建
?
代碼與實例
發(fā)送端不停的發(fā)送消息!接收端如下:
發(fā)送端關(guān)鍵代碼:
OderSender.java
package SpringBoot.demo.produce;import SpringBoot.demo.entity.Order; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;@Component public class OderSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(Order order) throws Exception{CorrelationData correlationData = new CorrelationData();correlationData.setId(order.getMessageId());rabbitTemplate.convertAndSend("order-exchange", //exchange"order.abcd", //routingKeyorder, //消息體correlationData); //correlationData消息唯一ID} }DemoApplicationTests.java
package SpringBoot.demo;import SpringBoot.demo.entity.Order; import SpringBoot.demo.produce.OderSender; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;import java.util.UUID;@RunWith(SpringRunner.class) @SpringBootTest public class DemoApplicationTests {@Autowiredprivate OderSender oderSender;@Testpublic void contextLoads() {}@Testpublic void testSend1()throws Exception{Order order = new Order();order.setId("20180618000000000003");order.setName("測試訂單3");order.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID().toString());oderSender.send(order);}}application.properties
server.port=8001 spring.rabbitmq.addresses=192.168.164.141:5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=15000server.servlet.context-path=/ spring.http.encoding.charset=UTF-8 spring.jackson.data-format=yyyy-MM-dd HH:mm:ss spring.jackson.time-zone=GMT+8 spring.jackson.default-property-inclusion=NON_NULL?
接收端關(guān)鍵如下:
OrderReceiver.java
package SpringBoot.demo.consumer;import SpringBoot.demo.entity.Order; import com.rabbitmq.client.Channel; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component;import java.util.Map;@Component public class OrderReceiver {@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "order-queue", durable = "true"),exchange = @Exchange(name = "order-exchange", type = "topic"),key = "order.#"))@RabbitHandlerpublic void onOrderMessage(@Payload Order order,@Headers Map<String, Object> headers,Channel channel //手工簽收需要rabbitMQ的通道) throws Exception{//消費者操作System.out.println("--------------收到消息,開始消費--------------");System.out.println("訂單ID:" + order.getId());//告訴RabbitMQ我已經(jīng)簽收了long deliveryTag = (long)headers.get(AmqpHeaders.DELIVERY_TAG);channel.basicAck(deliveryTag, false); //false為不支持批量簽收}}application.properties
server.port=8002 spring.rabbitmq.addresses=192.168.164.141:5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=15000server.servlet.context-path=/ spring.http.encoding.charset=UTF-8 spring.jackson.data-format=yyyy-MM-dd HH:mm:ss spring.jackson.time-zone=GMT+8 spring.jackson.default-property-inclusion=NON_NULL#配置關(guān)于consumer相關(guān)的 spring.rabbitmq.listener.simple.concurrency=5 spring.rabbitmq.listener.simple.max-concurrency=10 spring.rabbitmq.listener.simple.acknowledge-mode=manual#限流,同一時間只有一條消息消費 spring.rabbitmq.listener.simple.prefetch=1?
源碼下載地址:
https://github.com/fengfanchen/Java/tree/master/ProductorDemo
https://github.com/fengfanchen/Java/tree/master/consumerDemo
總結(jié)
以上是生活随笔為你收集整理的Spring Boot笔记-接收RabbitMQ队列中的消息的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Nginx文档阅读笔记-Reverse
- 下一篇: Spring Boot笔记-通过反射获取