rabbitmq的基本使用
現(xiàn)在微服務(wù)盛行, 我們通常會(huì)進(jìn)行解耦, 這時(shí)候就需要異步的消息隊(duì)列來幫助各個(gè)服務(wù)之間解耦
rabbitmq的基本概念介紹
rabbitmq的基本概念有消息producer(消息生產(chǎn)者)、exchange(交換機(jī))、queue(隊(duì)列)、consumer(消費(fèi)者)、routingKey
(圖中的P是producer, 即消息生產(chǎn)者, 中簡(jiǎn)的Server是Exchange(交換機(jī)) 和 Queue(隊(duì)列))
- Queue(隊(duì)列)
queue是存放消息的隊(duì)列, 實(shí)際上就是一個(gè)存放消息數(shù)據(jù)結(jié)構(gòu)為隊(duì)列的一個(gè)容器
- exchange(交換機(jī))
我們可能會(huì)簡(jiǎn)單的以為發(fā)送者會(huì)把消息發(fā)送到隊(duì)列中, 然后消費(fèi)者對(duì)隊(duì)列進(jìn)行監(jiān)聽。事實(shí)上, 消息發(fā)送者永遠(yuǎn)不會(huì)將消息直接發(fā)送到隊(duì)列中, 而是將消息發(fā)送到exhang中, 再由exchange通過一定的路由規(guī)則路由到對(duì)應(yīng)的消息隊(duì)列中。
交換機(jī)有四種類型:
- routingKey
在上面介紹exchange中說到消息通過一定的路由規(guī)則路由到對(duì)應(yīng)的隊(duì)列中, routingKey就是起著這樣的一個(gè)作用,通常我們發(fā)送消息到exchane中的時(shí)候會(huì)攜帶一個(gè)routingKey, 而這個(gè)routingKey就是exchange和queue綁定的一個(gè)規(guī)則, 由此便可以將消息從exchange再發(fā)送到對(duì)應(yīng)的queue上
參考文章https://segmentfault.com/a/11...
SpringBoot中使用rabbitmq
首先添加以下依賴:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>配置項(xiàng)如下:
spring:rabbitmq:port: 5672password: guestusername: guesthost: localhostlistener:simple:acknowledge-mode: manualconcurrency: 1max-concurrency: 1retry:enabled: true在瀏覽器輸入http://localhost:15672/, 在mq上我們新建了一個(gè)名為exchange1, routingKey為exhcange1-queue1的exchange, 并且映射到名為queue1的隊(duì)列,
發(fā)送消息代碼Prodcuer:
public class Sender{@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(Object object) {CorrelationData correlationData = new CorrelationData();correlationData.setId("exchange1-queue1-id");String message = "hello world";rabbitTemplate.convertAndSend("exchange1", "exchange1-queue1", "helloworld", new CorrelationData());} }在上面的代碼中我們發(fā)送了一個(gè)消息到名為"exchange1", routingKey為"exchange1-queue1"的消息。我們啟動(dòng)rabbitmq。
發(fā)送后可以在mq上看到如下圖已經(jīng)成功發(fā)送了。
接下來貼上接受消息的receiver代碼:
@Component public class Receiver {@RabbitListener(queues = "queue1")public void receive(Message message, Channel channel) {try {System.out.println(message.getBody());} catch (Exception e) {e.printStackTrace();}} }接受消息后再看mq上如下圖:
可以看出queue1上Ready一欄是0,但是Unacked一欄和Total一欄依然有消息, 這是因?yàn)槲覀冊(cè)倥渲梦募性O(shè)置的是手動(dòng)的ack,這時(shí)候代碼中沒有進(jìn)行ack, mq會(huì)認(rèn)為消費(fèi)者沒有成功消費(fèi)掉這條消息, 這時(shí)候就需要在配置文件中設(shè)置成自動(dòng)ack, 或者在代碼中手動(dòng)進(jìn)行ack,在消費(fèi)者后添加如下代碼:
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);總結(jié)
以上是生活随笔為你收集整理的rabbitmq的基本使用的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: (四)Java B2B2C o2o多用户
- 下一篇: D3可视化:(1)初次见面,SVG与D3