(十三)RabbitMQ使用详解
RabbitMQ是基于AMQP的一款消息管理系統。AMQP(Advanced Message Queuing Protocol),是一個提供消息服務的應用層標準高級消息隊列協議,其中RabbitMQ就是基于這種協議的一種實現。
常見mq:
- ActiveMQ:基于JMS
- RabbitMQ:基于AMQP協議,erlang語言開發,穩定性好
- RocketMQ:基于JMS,阿里巴巴產品,目前交由Apache基金會
- Kafka:分布式消息系統,高吞吐量
Java Client
生產者和消費者都屬于客戶端, rabbitMQ的java客戶端如下
創建 maven 工程
?
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version> </dependency>AMQP協議的回顧
1 消息模型
RabbitMq有5種常用的消息模型
1.1 基本消息模型
這是最簡單的消息模型,如下圖:
生產者將消息發送到隊列,消費者從隊列中獲取消息,隊列是存儲消息的緩沖區。
再演示代碼之前,我們先創建一個工程rabbitmq-demo,并編寫一個工具類,用于提供與mq服務創建連接
public class ConnectionUtil {/*** 建立與RabbitMQ的連接* @return* @throws Exception*/public static Connection getConnection() throws Exception {//定義連接工廠ConnectionFactory factory = new ConnectionFactory();//設置服務地址factory.setHost("192.168.18.130");//端口factory.setPort(5672);//設置賬號信息,用戶名、密碼、vhostfactory.setUsername("admin");factory.setPassword("admin");// 通過工程獲取連接Connection connection = factory.newConnection();return connection;} }生產者發送消息
接下來是生產者發送消息,其過程包括:1.與mq服務建立連接,2.建立通道,3.聲明隊列(有相同隊列則不創建,沒有則創建),4.發送消息,代碼如下:
public class Send {private static final String QUEUE_NAME = "basic_queue";public static void main(String[] args) throws Exception {//消息發送端與mq服務創建連接Connection connection = ConnectionUtil.getConnection();//建立通道Channel channel = connection.createChannel();//聲明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "hello world";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("生產者已發送:" + message);channel.close();connection.close();} }消費者接受消息
消費者在接收消息的過程需要經歷如下幾個步驟: 1.與mqfuwu建立連接,2.建立通道,3.聲明隊列,4,接收消息,代碼如下:
public class Consumer1 {private static final String QUEUE_NAME = "basic_queue";public static void main(String[] args) throws Exception {//消息消費者與mq服務建立連接Connection connection = ConnectionUtil.getConnection();//建立通道Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);DefaultConsumer consumer = new DefaultConsumer(channel) {// 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// body 即消息體String msg = new String(body);System.out.println("消費者1接收到消息:" + msg);}};channel.basicConsume(QUEUE_NAME, true, consumer);} }消息的接收與消費使用都需要在一個匿名內部類DefaultConsumer中完成
注意:隊列需要提前聲明,如果未聲明就使用隊列,則會報錯。如果不清楚生產者和消費者誰先聲明,為了保證不報錯,生產者和消費者都聲明隊列,隊列的創建會保證冪等性,也就是說生產者和消費者都聲明同一個隊列,則只會創建一個隊列
1.2 Work Queues工作隊列模型
在基本消息模型中,一個生產者對應一個消費者,而實際生產過程中,往往消息生產會發送很多條消息,如果消費者只有一個的話效率就會很低,因此rabbitmq有另外一種消息模型,這種模型下,一個生產發送消息到隊列,允許有多個消費者接收消息,但是一條消息只會被一個消費者獲取。
生產者發送消息
與基本消息模型基本一致,這里測試循環發布20條消息:
public class Send {private static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 循環發布任務for (int i = 1; i <= 20; i++) {// 消息內容String message = "task .. " + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("生產者發送消息:" + message);Thread.sleep(500);}channel.close();connection.close();} }消費者1
public class Consumer1 {private static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body);System.out.println("消費者1接收到消息:" + msg);try {Thread.sleep(50);//模擬消費耗時} catch (InterruptedException e) {e.printStackTrace();}}};channel.basicConsume(QUEUE_NAME, true, consumer);} }消費者2
public class Consumer2 {private static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {String msg = new String(body);System.out.println("消費者2接收到消息:" + msg);try {Thread.sleep(50);//模擬消費耗時} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(QUEUE_NAME, true, consumer);} }此時有兩個消費者監聽同一個隊列,當兩個消費者都工作時,生成者發送消息,就會按照負載均衡算法分配給不同消費者,如下圖:
1.3 訂閱模型
在之前的模型中,一條消息只能被一個消費者獲取,而在訂閱模式中,可以實現一條消息被多個消費者獲取。在這種模型下,消息傳遞過程中比之前多了一個exchange交換機,生產者不是直接發送消息到隊列,而是先發送給交換機,經由交換機分配到不同的隊列,而每個消費者都有自己的隊列:
解讀:
1、1個生產者,多個消費者
2、每一個消費者都有自己的一個隊列
3、生產者沒有將消息直接發送到隊列,而是發送到了交換機
4、每個隊列都要綁定到交換機
5、生產者發送的消息,經過交換機到達隊列,實現一個消息被多個消費者獲取的目的
X(exchange)交換機的類型有以下幾種:
Fanout:廣播,交換機將消息發送到所有與之綁定的隊列中去Direct:定向,交換機按照指定的Routing Key發送到匹配的隊列中去Topics:通配符,與Direct大致相同,不同在于Routing Key可以根據通配符進行匹配注意:在發布訂閱模型中,生產者只負責發消息到交換機,至于消息該怎么發,以及發送到哪個隊列,生產者都不負責。一般由消費者創建隊列,并且綁定到交換機
訂閱模型之Fanout
在廣播模式下,消息發送的流程如下:
生產者發送消息
public class Send {private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明exchange,指定類型為fanoutchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);String message = "hello world";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println("生產者發送消息:" + message);channel.close();connection.close();} }消費者
public class Consumer1 {private static final String QUEUE_NAME = "fanout_queue_1";private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//消費者聲明自己的隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 聲明exchange,指定類型為directchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//消費者將隊列與交換機進行綁定channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{String msg = new String(body);System.out.println("消費者1獲取到消息:" + msg);}});} }其他消費者只需修改QUEUE_NAME即可
注意:exchange與隊列一樣都需要提前聲明,如果未聲明就使用交換機,則會報錯。如果不清楚生產者和消費者誰先聲明,為了保證不報錯,生產者和消費者都聲明交換機,同樣的,交換機的創建也會保證冪等性。
訂閱模型之Direct
在fanout模型中,生產者發布消息,所有消費者都可以獲取所有消息。在路由模式(Direct)中,可以實現不同的消息被不同的隊列消費,在Direct模式下,交換機不再將消息發送給所有綁定的隊列,而是根據Routing Key將消息發送到指定的隊列,隊列在與交換機綁定時會設定一個Routing Key,而生產者發送的消息時也需要攜帶一個Routing Key。
如圖所示,消費者C1的隊列與交換機綁定時設置的Routing Key是“error”, 而C2的隊列與交換機綁定時設置的Routing Key包括三個:“info”,“error”,“warning”,假如生產者發送一條消息到交換機,并設置消息的Routing Key為“info”,那么交換機只會將消息發送給C2的隊列。
生產者發送消息
public class Send {private static final String EXCHANGE_NAME = "direct_exchange";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明exchange,指定類型為directchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String message = "新增一個訂單";//生產者發送消息時,設置消息的Routing Key:"insert"channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());System.out.println("生產者發送消息:" + message);channel.close();connection.close();} }消費者1
public class Consumer1 {private static final String QUEUE_NAME = "direct_queue_1";private static final String EXCHANGE_NAME = "direct_exchange";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//消費者聲明自己的隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//聲明交換機channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//消費者將隊列與交換機進行綁定,并且設置Routing Key:"insert"channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{String msg = new String(body);System.out.println("消費者1獲取到消息:" + msg);}});} }其他消費者需要修改隊列名QUEUE_NAME和Routing Key,上述生成者發送的消息,消費者1是可以獲取到的
發布訂閱之Topics
Topic類型的Exchange與Direct相比,都是可以根據RoutingKey把消息路由到不同的隊列。只不過Topic類型Exchange可以讓隊列在綁定Routing key 的時候使用通配符
Routingkey 一般都是有一個或多個單詞組成,多個單詞之間以”.”分割,例如: item.insert
通配符規則:
#:匹配一個或多個詞*:匹配不多不少恰好1個詞舉例:
audit.#:能夠匹配audit.irs.corporate 或者 audit.irsaudit.*:只能匹配audit.irsTopics生產者代碼與Direct大致相同,只不過子聲明交換機時,將類型設為BuiltinExchangeType.TOPIC(topic),
消費者代碼也與Direct大致相同,也是在聲明交換機時設置類型為topic,代碼不再演示
Spring AMQP
Spring AMQP是對AMQP的一種封裝,目的是能夠讓我們更簡便的使用消息隊列,下面介紹一下Spring AMQP在Spring boot中的使用方法
依賴和配置
添加AMQP的啟動器:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>在application.yml中添加RabbitMQ的地址:
spring:rabbitmq:host: 192.168.18.130username: adminpassword: admin消費者
消費者需要定義一個類,類中定義監聽隊列的方法
@Component public class Listener {@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "spring.test.queue", durable = "false"),exchange = @Exchange(value = "spring.test.exchange", type = ExchangeTypes.DIRECT),key = "insert"))public void listen(String msg){System.out.println("消費者接受到消息:" + msg);} }注解:
@Component:保證監聽類被spring掃描到
@RabbitListener:
@RabbitListener包含很多內容,在發布訂閱模式中,我們可以使用其中的“QueueBinding[] bindings”,其中QueueBinding底層如下:
其中Queue表示隊列,Exchange表示交換機,key表示Routing Key
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "spring.test.queue", durable = "false"),exchange = @Exchange(value = "spring.test.exchange", type = ExchangeTypes.DIRECT),key = "insert"))@Queue會創建隊列
@Exchange會創建交換機
@QueueBinding會綁定隊列和交換機
生產者發送消息
可以通過注解引入AmqpTemplate:
@RunWith(SpringRunner.class) @SpringBootTest public class SpringAMQPTest {@Resourceprivate AmqpTemplate template;@Testpublic void testSendMsg() throws InterruptedException {String message = "hello spring";template.convertAndSend("spring.test.exchange", "insert", message);System.out.println("生產者發送消息:" + message);Thread.sleep(10000);//等待10s,讓測試方法延遲結束,防止消費者未來得及獲取消息} }RabbitMQ如何防止消息丟失
1. 消息確認機制(ACK)
RabbitMQ有一個ACK機制,消費者在接收到消息后會向mq服務發送回執ACK,告知消息已被接收。這種ACK分為兩種情況:
- 自動ACK:消息一旦被接收,消費者會自動發送ACK
- 手動ACK:消息接收后,不會自動發送ACK,而是需要手動發送ACK
如果消費者沒有發送ACK,則消息會一直保留在隊列中,等待下次接收。但這里存在一個問題,就是一旦消費者發送了ACK,如果消費者后面宕機,則消息會丟失。因此自動ACK不能保證消費者在接收到消息之后能夠正常完成業務功能,因此需要在消息被充分利用之后,手動ACK確認
自動ACK,basicConsume方法中將autoAck參數設為true即可:
手動ack,在匿名內部類中,手動發送ACK:
當然,如果設置了手動ack,但又不手動發送ACK確認,消息會一直停留在隊列中,可能造成消息的重復獲取
2. 持久化
消息確認機制(ACK)能夠保證消費者不丟失消息,但假如消費者在獲取消息之前mq服務宕機,則消息也會丟失,因此要保證消息在服務端不丟失,則需要將消息進行持久化。隊列、交換機、消息都要持久化。
隊列持久化
exchange持久化
消息持久化
3. 生產者確認
生成者在發送消息過程中也可能出現錯誤或者網絡延遲燈故障,導致消息未成功發送到交換機或者隊列,或重復發送消息,為了解決這個問題,rabbitmq中有多個解決辦法:
事務:
用事務將消息發送代碼包圍起來:
Confirm模式:
如下所示,在發送代碼前執行channel.confirmSelect(),如果消息未正常發送,就會進入if代碼塊,可以進行重發也可以對失敗消息進行記錄
異步confirm方法:
顧名思義,就是生產者發送消息后不用等待服務端回饋發送狀態,可以繼續執行后面的代碼,對于失敗消息重發進行異步處理:
Spring AMQP中添加配置:
生產者確認機制,確保消息正確發送,如果發送失敗會有錯誤回執,從而觸發重試
spring:rabbitmq:publisher-confirms: true?
總結
以上是生活随笔為你收集整理的(十三)RabbitMQ使用详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: (十二)C3P0连接池使用教程
- 下一篇: (十四)消息中间件MQ详解及四大MQ比较