Reposted from: https://blog.csdn.net/zz775854904/article/details/81092892
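MQ stands for Message Queue. A message queue is a method of application-to-application communication. Applications communicate by writing messages (application-specific data) to a queue and reading them from it, without needing a dedicated connection to link them. Messaging means that programs communicate by sending data in messages rather than by calling each other directly; direct calls are typical of techniques such as remote procedure calls. Queuing means that applications communicate through queues, which removes the requirement that the sending and receiving applications run at the same time.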
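The decoupling described above can be sketched in plain Java with an in-memory BlockingQueue. This is only an analogy and not RabbitMQ itself: the class name InMemoryQueueDemo and the message texts are made up for illustration. The producer finishes writing before the consumer starts reading, which mirrors the point that the two sides do not have to run at the same time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative analogy only: an in-process queue standing in for a broker.
final class InMemoryQueueDemo {

    // The producer only knows the queue, not who will read from it.
    static void produce(BlockingQueue<String> queue, int count) {
        for (int i = 1; i <= count; i++) {
            queue.add("message-" + i);
        }
    }

    // The consumer drains whatever is waiting, in arrival order.
    static List<String> consumeAll(BlockingQueue<String> queue) {
        List<String> received = new ArrayList<>();
        queue.drainTo(received);
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        produce(queue, 3);                    // the producer runs and finishes first
        List<String> got = consumeAll(queue); // the consumer runs later
        System.out.println(got);              // [message-1, message-2, message-3]
    }
}
```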
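Now to the main topic!
Before development you need to download RabbitMQ, and before installing RabbitMQ you need to install Erlang, because RabbitMQ is written in Erlang.
Once installation is done, create a Maven project and then start configuring it.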
<spring.version>3.2.8.RELEASE</spring.version>

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version>${spring.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-webmvc</artifactId>
    <version>${spring.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>${spring.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context-support</artifactId>
    <version>${spring.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-aop</artifactId>
    <version>${spring.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-aspects</artifactId>
    <version>${spring.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-tx</artifactId>
    <version>${spring.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jdbc</artifactId>
    <version>${spring.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-web</artifactId>
    <version>${spring.version}</version>
</dependency>
Since this is a Spring integration, the Spring dependencies above are needed, along with spring-rabbit:
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.3.5.RELEASE</version>
</dependency>

With the dependencies in place, we need to define the message producer and the message consumers.
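There are several exchange types. I only tested two here, defining two exchanges: one direct and one topic.
First, define the message producer. The configuration links each template to a connection-factory, and the templates are injected into the code.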
package com.chris.producer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.io.IOException;

@Service
public class MessageProducer {

    private Logger logger = LoggerFactory.getLogger(MessageProducer.class);

    @Resource(name="amqpTemplate")
    private AmqpTemplate amqpTemplate;

    @Resource(name="amqpTemplate2")
    private AmqpTemplate amqpTemplate2;

    public void sendMessage(Object message) throws IOException {
        logger.info("to send message:{}", message);
        // amqpTemplate publishes to the direct exchange "exchangeTest";
        // the first argument is the routing key.
        amqpTemplate.convertAndSend("queueTestKey", message);
        amqpTemplate.convertAndSend("queueTestChris", message);
        // amqpTemplate2 publishes to the topic exchange "exchangeTest2".
        amqpTemplate2.convertAndSend("wuxing.xxxx.wsdwd", message);
    }
}
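Next we define the message consumers. I defined three here; each listens on its own queue and receives the messages that match its binding.
The first consumer receives direct messages. Its exchange is exchangeTest and its routing key is queueTestKey.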
package com.chris.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class MessageConsumer implements MessageListener {

    private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);

    @Override
    public void onMessage(Message message) {
        logger.info("consumer receive message------->:{}", message);
    }
}
The second consumer also receives direct messages (to test that one exchange can deliver messages to more than one queue). Its exchange is exchangeTest and its routing key is queueTestChris.
package com.chris.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class ChrisConsumer implements MessageListener {

    private Logger logger = LoggerFactory.getLogger(ChrisConsumer.class);

    @Override
    public void onMessage(Message message) {
        logger.info("chris receive message------->:{}", message);
    }
}
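How a direct exchange picks a queue can be sketched as a toy model in plain Java. This is not how RabbitMQ is implemented, and the class DirectExchangeSketch is hypothetical; the queue names and keys are the ones used in this post's configuration file. The point is only that a direct exchange delivers on an exact routing-key match, so one exchange can feed several queues.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a direct exchange; not the RabbitMQ implementation.
final class DirectExchangeSketch {

    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    // A direct exchange delivers only on an exact routing-key match.
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptyList());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchangeTest = new DirectExchangeSketch();
        exchangeTest.bind("queueTest", "queueTestKey");
        exchangeTest.bind("queueChris", "queueTestChris");

        System.out.println(exchangeTest.route("queueTestKey"));   // [queueTest]
        System.out.println(exchangeTest.route("queueTestChris")); // [queueChris]
        System.out.println(exchangeTest.route("unknownKey"));     // []
    }
}
```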
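The third consumer receives topic messages. Its exchange is exchangeTest2 and its pattern is wuxing.*. What I read online says that .* matches exactly one word and .# matches one or more (strictly speaking, # matches zero or more words). I tried both, though, and both seemed to match one or more words, which puzzled me. A likely cause (not verified here): bindings are kept on the broker, so a wuxing.# binding created in an earlier run stays attached to the durable queue after the configuration is changed to wuxing.*, until that binding is removed.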
package com.chris.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class WuxingConsumer implements MessageListener {

    private Logger logger = LoggerFactory.getLogger(WuxingConsumer.class);

    @Override
    public void onMessage(Message message) {
        logger.info("wuxing receive message------->:{}", message);
    }
}
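The wildcard rules for topic patterns can be checked with a small matcher. The sketch below is a plain-Java re-implementation written for illustration (the class TopicMatcher is hypothetical and is not part of RabbitMQ or Spring AMQP). It assumes the documented AMQP semantics: keys are split into dot-separated words, * stands for exactly one word, and # stands for zero or more words.

```java
// Illustrative re-implementation of AMQP topic matching; not broker code.
final class TopicMatcher {

    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int pi, String[] k, int ki) {
        if (pi == p.length) {
            // Pattern exhausted: it matches only if the key is exhausted too.
            return ki == k.length;
        }
        if (p[pi].equals("#")) {
            // '#' may absorb zero words, or absorb one word and try again.
            return match(p, pi + 1, k, ki)
                    || (ki < k.length && match(p, pi, k, ki + 1));
        }
        if (ki == k.length) {
            return false;
        }
        // '*' accepts any single word; anything else must match literally.
        boolean wordMatches = p[pi].equals("*") || p[pi].equals(k[ki]);
        return wordMatches && match(p, pi + 1, k, ki + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("wuxing.*", "wuxing.xxxx"));       // true
        System.out.println(matches("wuxing.*", "wuxing.xxxx.wsdwd")); // false
        System.out.println(matches("wuxing.#", "wuxing.xxxx.wsdwd")); // true
        System.out.println(matches("wuxing.#", "wuxing"));            // true
    }
}
```

Under these rules the routing key wuxing.xxxx.wsdwd used by the producer is matched by wuxing.# but not by wuxing.*.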
Now for the key part: the configuration file that integrates RabbitMQ with Spring.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                           http://www.springframework.org/schema/rabbit
                           http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">

    <!-- First connection: direct exchange "exchangeTest" with two queues -->
    <rabbit:connection-factory id="connectionFactory" username="guest" password="guest" host="localhost" port="5672" />
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="exchangeTest"/>
    <rabbit:admin id="connectAdmin" connection-factory="connectionFactory"/>

    <rabbit:queue name="queueTest" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin"/>
    <rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false" declared-by="connectAdmin">
        <rabbit:bindings>
            <rabbit:binding queue="queueTest" key="queueTestKey"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>
    <bean id="messageReceiver" class="com.chris.consumer.MessageConsumer"></bean>
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="queueTest" ref="messageReceiver"/>
    </rabbit:listener-container>

    <rabbit:queue name="queueChris" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin"/>
    <rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false" declared-by="connectAdmin">
        <rabbit:bindings>
            <rabbit:binding queue="queueChris" key="queueTestChris"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>
    <bean id="receiverChris" class="com.chris.consumer.ChrisConsumer"></bean>
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="queueChris" ref="receiverChris"/>
    </rabbit:listener-container>

    <!-- Second connection: topic exchange "exchangeTest2" with one queue -->
    <rabbit:connection-factory id="connectionFactory2" username="guest" password="guest" host="localhost" port="5672"/>
    <rabbit:template id="amqpTemplate2" connection-factory="connectionFactory2" exchange="exchangeTest2"/>
    <rabbit:admin id="connectAdmin2" connection-factory="connectionFactory2"/>

    <rabbit:queue name="queueWuxing" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin2"/>
    <rabbit:topic-exchange name="exchangeTest2" durable="true" auto-delete="false" declared-by="connectAdmin2">
        <rabbit:bindings>
            <rabbit:binding queue="queueWuxing" pattern="wuxing.*"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <bean id="recieverWuxing" class="com.chris.consumer.WuxingConsumer"></bean>
    <rabbit:listener-container connection-factory="connectionFactory2" >
        <rabbit:listener queues="queueWuxing" ref="recieverWuxing"/>
    </rabbit:listener-container>
</beans>
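One problem took me a long time to figure out: how to define two exchanges. It kept failing at first, until I found an article from abroad that solved it.
When defining two exchanges, you need the declared-by attribute.
That attribute is only available once you reference the following schema declaration: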
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd
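Here is a rough explanation of the configuration in the file.
connection-factory connects to the RabbitMQ server.
template is linked to a connection-factory and specifies the exchange; a routing-key can also be set directly on it.
admin plays the role of an administrator that manages exchanges and queues.
queue and topic-exchange (or direct-exchange) define the queues and the exchanges. Use declared-by here to name the admin, which ties them to the corresponding connection-factory.
listener-container sets up the listeners for the consumers. (The rabbit configuration can in fact point at a specific method of a class, but I did not get that to work and am still experimenting.)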
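On the point about targeting a specific method: the rabbit:listener element has a method attribute, and Spring AMQP then wraps the referenced bean in an adapter that calls that method with the converted message body. The sketch below only illustrates the underlying idea of dispatching to a plain object by method name using reflection. It is not Spring AMQP's actual code, and the names PojoListenerSketch, PojoConsumer, handle and dispatch are all hypothetical.

```java
import java.lang.reflect.Method;

// Illustrates dispatch-by-method-name; not Spring AMQP's implementation.
final class PojoListenerSketch {

    // A plain consumer class with no messaging interfaces.
    public static final class PojoConsumer {
        String lastBody;

        public void handle(String body) {
            lastBody = body;
        }
    }

    // Look up a public method by name and call it with the payload.
    static void dispatch(Object delegate, String methodName, String payload) {
        try {
            Method method = delegate.getClass().getMethod(methodName, String.class);
            method.invoke(delegate, payload);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot invoke " + methodName, e);
        }
    }

    public static void main(String[] args) {
        PojoConsumer consumer = new PojoConsumer();
        dispatch(consumer, "handle", "Hello, I am amq sender num :100");
        System.out.println(consumer.lastBody); // Hello, I am amq sender num :100
    }
}
```

A mismatch between the configured method name or parameter type and the real method is one thing worth checking when this style of listener fails to start.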
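There is one more issue to watch out for.
Once an exchange has been declared with one type, you cannot simply switch that exchange to another type in the configuration. The exchange already exists on the broker with the old type, so the broker keeps reporting an error along the lines of: received 'direct' but current is 'topic'. To change the type, delete the existing exchange on the broker first, or use a new exchange name.
I lost quite a bit of time tracking this one down as well.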
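The rule behind that error can be modeled in a few lines. This is a toy registry written for illustration, not the RabbitMQ client API: the class ExchangeRegistrySketch and its methods are hypothetical, and the error text is only modeled on the message quoted above. Redeclaring an existing exchange with the same type is accepted, redeclaring it with a different type is rejected, and it works again after the old exchange is deleted.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the broker's exchange redeclaration check; not RabbitMQ code.
final class ExchangeRegistrySketch {

    // exchange name -> type it was first declared with
    private final Map<String, String> typesByName = new HashMap<>();

    void declare(String name, String type) {
        String current = typesByName.putIfAbsent(name, type);
        if (current != null && !current.equals(type)) {
            throw new IllegalStateException("inequivalent arg 'type' for exchange '"
                    + name + "': received '" + type + "' but current is '" + current + "'");
        }
    }

    void delete(String name) {
        typesByName.remove(name);
    }

    public static void main(String[] args) {
        ExchangeRegistrySketch broker = new ExchangeRegistrySketch();
        broker.declare("exchangeTest2", "topic");
        broker.declare("exchangeTest2", "topic");      // same type: accepted
        try {
            broker.declare("exchangeTest2", "direct"); // different type: rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        broker.delete("exchangeTest2");
        broker.declare("exchangeTest2", "direct");     // works after deletion
    }
}
```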
Next, here is the basic Spring configuration.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context-3.1.xsd">

    <import resource="classpath*:rabbitmq.xml" />

    <context:component-scan base-package="com.chris.consumer, com.chris.producer" />
    <context:annotation-config />
    <context:spring-configured />
</beans>
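Then comes the unit test class. It counts down from 100 to 1, sending one message per second, so you can watch the consumers receive the messages on the console.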
package com.chris;

import com.chris.producer.MessageProducer;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class MessageTest {

    private Logger logger = LoggerFactory.getLogger(MessageTest.class);

    private ApplicationContext context = null;

    @Before
    public void setUp() throws Exception {
        context = new ClassPathXmlApplicationContext("application.xml");
    }

    @Test
    public void should_send_a_amq_message() throws Exception {
        MessageProducer messageProducer = (MessageProducer) context.getBean("messageProducer");
        int a = 100;
        while (a > 0) {
            messageProducer.sendMessage("Hello, I am amq sender num :" + a--);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
The console output looks like this (only the key parts are shown; the rest of the configured logging is omitted):
2016-09-22 16:15:00,330 [main] INFO [com.chris.producer.MessageProducer] - to send message:Hello, I am amq sender num :100
2016-09-22 16:15:00,348 [main] DEBUG [org.springframework.amqp.rabbit.connection.CachingConnectionFactory] - Creating cached Rabbit Channel from AMQChannel(amqp://guest@127.0.0.1:5672/,3)
2016-09-22 16:15:00,348 [main] DEBUG [org.springframework.amqp.rabbit.core.RabbitTemplate] - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3)
2016-09-22 16:15:00,349 [main] DEBUG [org.springframework.amqp.rabbit.core.RabbitTemplate] - Publishing message on exchange [exchangeTest], routingKey = [queueTestKey]
2016-09-22 16:15:00,357 [main] DEBUG [org.springframework.amqp.rabbit.core.RabbitTemplate] - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp:
2016-09-22 16:15:00,358 [main] DEBUG [org.springframework.amqp.rabbit.core.RabbitTemplate] - Publishing message on exchange [exchangeTest], routingKey = [queueTestChris]
2016-09-22 16:15:00,368 [main] DEBUG [org.springframework.amqp.rabbit.connection.CachingConnectionFactory] - Creating cached Rabbit Channel from AMQChannel(amqp://guest@127.0.0.1:5672/,2)
2016-09-22 16:15:00,369 [main] DEBUG [org.springframework.amqp.rabbit.core.RabbitTemplate] - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2)
2016-09-22 16:15:00,369 [main] DEBUG [org.springframework.amqp.rabbit.core.RabbitTemplate] - Publishing message on exchange [exchangeTest2], routingKey = [wuxing.xxxx.wsdwd]
2016-09-22 16:15:00,370 [pool-1-thread-6] DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Storing delivery for Consumer: tags=[[amq.ctag-hyW85GZHk-AHLLFJUmNLDQ]], channel=Cached Rabbit Channel: AMQChannel(amqp:
2016-09-22 16:15:00,372 [SimpleAsyncTaskExecutor-1] DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Received message: (Body:'Hello, I am amq sender num :100'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchangeTest, receivedRoutingKey=queueTestKey, deliveryTag=1, messageCount=0])
2016-09-22 16:15:00,373 [SimpleAsyncTaskExecutor-1] INFO [com.chris.consumer.MessageConsumer] - consumer receive message------->:(Body:'Hello, I am amq sender num :100'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchangeTest, receivedRoutingKey=queueTestKey, deliveryTag=1, messageCount=0])
2016-09-22 16:15:00,374 [SimpleAsyncTaskExecutor-1] DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Retrieving delivery for Consumer: tags=[[amq.ctag-hyW85GZHk-AHLLFJUmNLDQ]], channel=Cached Rabbit Channel: AMQChannel(amqp:
2016-09-22 16:15:00,379 [pool-2-thread-4] DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Storing delivery for Consumer: tags=[[amq.ctag-T-c1red0T_HHyCFfpXLYIQ]], channel=Cached Rabbit Channel: AMQChannel(amqp:
2016-09-22 16:15:00,381 [SimpleAsyncTaskExecutor-1] DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Received message: (Body:'Hello, I am amq sender num :100'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchangeTest2, receivedRoutingKey=wuxing.xxxx.wsdwd, deliveryTag=1, messageCount=0])
2016-09-22 16:15:00,382 [SimpleAsyncTaskExecutor-1] INFO [com.chris.consumer.WuxingConsumer] - wuxing receive message------->:(Body:'Hello, I am amq sender num :100'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchangeTest2, receivedRoutingKey=wuxing.xxxx.wsdwd, deliveryTag=1, messageCount=0])
2016-09-22 16:15:00,383 [SimpleAsyncTaskExecutor-1] DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Retrieving delivery for Consumer: tags=[[amq.ctag-T-c1red0T_HHyCFfpXLYIQ]], channel=Cached Rabbit Channel: AMQChannel(amqp:
2016-09-22 16:15:00,396 [pool-1-thread-5] DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Storing delivery for Consumer: tags=[[amq.ctag-h5ERpaWrnqmkNhbfM7S8Ww]], channel=Cached Rabbit Channel: AMQChannel(amqp:
2016-09-22 16:15:00,397 [SimpleAsyncTaskExecutor-1] DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Received message: (Body:'Hello, I am amq sender num :100'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchangeTest, receivedRoutingKey=queueTestChris, deliveryTag=1, messageCount=0])
2016-09-22 16:15:00,398 [SimpleAsyncTaskExecutor-1] INFO [com.chris.consumer.ChrisConsumer] - chris receive message------->:(Body:'Hello, I am amq sender num :100'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchangeTest, receivedRoutingKey=queueTestChris, deliveryTag=1, messageCount=0])
We can see that the producer sent one message, and it was published three times, once per exchange and routing key:

1. on exchange [exchangeTest], routingKey = [queueTestKey]
2. on exchange [exchangeTest], routingKey = [queueTestChris]
3. on exchange [exchangeTest2], routingKey = [wuxing.xxxx.wsdwd]

The three consumers then each received their own message. That concludes the test.
Readers who are interested in the project are welcome to take its source code and play with it.
Reposted at: https://www.cnblogs.com/sharpest/p/10428953.html