日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

(十三)RabbitMQ使用详解

發布時間:2023/12/3 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 (十三)RabbitMQ使用详解 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

RabbitMQ是基于AMQP的一款消息管理系統。AMQP(Advanced Message Queuing Protocol),是一個提供消息服務的應用層標準高級消息隊列協議,其中RabbitMQ就是基于這種協議的一種實現。

常見mq:

  • ActiveMQ:基于JMS
  • RabbitMQ:基于AMQP協議,erlang語言開發,穩定性好
  • RocketMQ:基于JMS,阿里巴巴產品,目前交由Apache基金會
  • Kafka:分布式消息系統,高吞吐量

Java Client

生產者和消費者都屬于客戶端, rabbitMQ的java客戶端如下

創建 maven 工程

?

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version> </dependency>

AMQP協議的回顧

1 消息模型

RabbitMq有5種常用的消息模型

1.1 基本消息模型

這是最簡單的消息模型,如下圖:

生產者將消息發送到隊列,消費者從隊列中獲取消息,隊列是存儲消息的緩沖區。

再演示代碼之前,我們先創建一個工程rabbitmq-demo,并編寫一個工具類,用于提供與mq服務創建連接

public class ConnectionUtil {/*** 建立與RabbitMQ的連接* @return* @throws Exception*/public static Connection getConnection() throws Exception {//定義連接工廠ConnectionFactory factory = new ConnectionFactory();//設置服務地址factory.setHost("192.168.18.130");//端口factory.setPort(5672);//設置賬號信息,用戶名、密碼、vhostfactory.setUsername("admin");factory.setPassword("admin");// 通過工程獲取連接Connection connection = factory.newConnection();return connection;} }

生產者發送消息

接下來是生產者發送消息,其過程包括:1.與mq服務建立連接,2.建立通道,3.聲明隊列(有相同隊列則不創建,沒有則創建),4.發送消息,代碼如下:

public class Send {private static final String QUEUE_NAME = "basic_queue";public static void main(String[] args) throws Exception {//消息發送端與mq服務創建連接Connection connection = ConnectionUtil.getConnection();//建立通道Channel channel = connection.createChannel();//聲明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "hello world";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("生產者已發送:" + message);channel.close();connection.close();} }

消費者接受消息

消費者在接收消息的過程需要經歷如下幾個步驟: 1.與mqfuwu建立連接,2.建立通道,3.聲明隊列,4,接收消息,代碼如下:

public class Consumer1 {private static final String QUEUE_NAME = "basic_queue";public static void main(String[] args) throws Exception {//消息消費者與mq服務建立連接Connection connection = ConnectionUtil.getConnection();//建立通道Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);DefaultConsumer consumer = new DefaultConsumer(channel) {// 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// body 即消息體String msg = new String(body);System.out.println("消費者1接收到消息:" + msg);}};channel.basicConsume(QUEUE_NAME, true, consumer);} }

消息的接收與消費使用都需要在一個匿名內部類DefaultConsumer中完成

注意:隊列需要提前聲明,如果未聲明就使用隊列,則會報錯。如果不清楚生產者和消費者誰先聲明,為了保證不報錯,生產者和消費者都聲明隊列,隊列的創建會保證冪等性,也就是說生產者和消費者都聲明同一個隊列,則只會創建一個隊列

1.2 Work Queues工作隊列模型

在基本消息模型中,一個生產者對應一個消費者,而實際生產過程中,往往消息生產會發送很多條消息,如果消費者只有一個的話效率就會很低,因此rabbitmq有另外一種消息模型,這種模型下,一個生產發送消息到隊列,允許有多個消費者接收消息,但是一條消息只會被一個消費者獲取。

生產者發送消息

與基本消息模型基本一致,這里測試循環發布20條消息:

public class Send {private static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 循環發布任務for (int i = 1; i <= 20; i++) {// 消息內容String message = "task .. " + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("生產者發送消息:" + message);Thread.sleep(500);}channel.close();connection.close();} }

消費者1

public class Consumer1 {private static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body);System.out.println("消費者1接收到消息:" + msg);try {Thread.sleep(50);//模擬消費耗時} catch (InterruptedException e) {e.printStackTrace();}}};channel.basicConsume(QUEUE_NAME, true, consumer);} }

消費者2

public class Consumer2 {private static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {String msg = new String(body);System.out.println("消費者2接收到消息:" + msg);try {Thread.sleep(50);//模擬消費耗時} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(QUEUE_NAME, true, consumer);} }

此時有兩個消費者監聽同一個隊列,當兩個消費者都工作時,生成者發送消息,就會按照負載均衡算法分配給不同消費者,如下圖:

1.3 訂閱模型

在之前的模型中,一條消息只能被一個消費者獲取,而在訂閱模式中,可以實現一條消息被多個消費者獲取。在這種模型下,消息傳遞過程中比之前多了一個exchange交換機,生產者不是直接發送消息到隊列,而是先發送給交換機,經由交換機分配到不同的隊列,而每個消費者都有自己的隊列:

解讀:

1、1個生產者,多個消費者

2、每一個消費者都有自己的一個隊列

3、生產者沒有將消息直接發送到隊列,而是發送到了交換機

4、每個隊列都要綁定到交換機

5、生產者發送的消息,經過交換機到達隊列,實現一個消息被多個消費者獲取的目的

X(exchange)交換機的類型有以下幾種:

Fanout:廣播,交換機將消息發送到所有與之綁定的隊列中去Direct:定向,交換機按照指定的Routing Key發送到匹配的隊列中去Topics:通配符,與Direct大致相同,不同在于Routing Key可以根據通配符進行匹配

注意:在發布訂閱模型中,生產者只負責發消息到交換機,至于消息該怎么發,以及發送到哪個隊列,生產者都不負責。一般由消費者創建隊列,并且綁定到交換機

訂閱模型之Fanout

在廣播模式下,消息發送的流程如下:

  • 可以有多個消費者,每個消費者都有自己的隊列
  • 每個隊列都要與exchange綁定
  • 生產者發送消息到exchange
  • exchange將消息把消息發送到所有綁定的隊列中去
  • 消費者從各自的隊列中獲取消息
  • 生產者發送消息

    public class Send {private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明exchange,指定類型為fanoutchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);String message = "hello world";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println("生產者發送消息:" + message);channel.close();connection.close();} }

    消費者

    public class Consumer1 {private static final String QUEUE_NAME = "fanout_queue_1";private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//消費者聲明自己的隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 聲明exchange,指定類型為directchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//消費者將隊列與交換機進行綁定channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{String msg = new String(body);System.out.println("消費者1獲取到消息:" + msg);}});} }

    其他消費者只需修改QUEUE_NAME即可

    注意:exchange與隊列一樣都需要提前聲明,如果未聲明就使用交換機,則會報錯。如果不清楚生產者和消費者誰先聲明,為了保證不報錯,生產者和消費者都聲明交換機,同樣的,交換機的創建也會保證冪等性。

    訂閱模型之Direct

    在fanout模型中,生產者發布消息,所有消費者都可以獲取所有消息。在路由模式(Direct)中,可以實現不同的消息被不同的隊列消費,在Direct模式下,交換機不再將消息發送給所有綁定的隊列,而是根據Routing Key將消息發送到指定的隊列,隊列在與交換機綁定時會設定一個Routing Key,而生產者發送的消息時也需要攜帶一個Routing Key。

    如圖所示,消費者C1的隊列與交換機綁定時設置的Routing Key是“error”, 而C2的隊列與交換機綁定時設置的Routing Key包括三個:“info”,“error”,“warning”,假如生產者發送一條消息到交換機,并設置消息的Routing Key為“info”,那么交換機只會將消息發送給C2的隊列。

    生產者發送消息

    public class Send {private static final String EXCHANGE_NAME = "direct_exchange";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明exchange,指定類型為directchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String message = "新增一個訂單";//生產者發送消息時,設置消息的Routing Key:"insert"channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());System.out.println("生產者發送消息:" + message);channel.close();connection.close();} }

    消費者1

    public class Consumer1 {private static final String QUEUE_NAME = "direct_queue_1";private static final String EXCHANGE_NAME = "direct_exchange";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//消費者聲明自己的隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//聲明交換機channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//消費者將隊列與交換機進行綁定,并且設置Routing Key:"insert"channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{String msg = new String(body);System.out.println("消費者1獲取到消息:" + msg);}});} }

    其他消費者需要修改隊列名QUEUE_NAME和Routing Key,上述生成者發送的消息,消費者1是可以獲取到的

    發布訂閱之Topics

    Topic類型的Exchange與Direct相比,都是可以根據RoutingKey把消息路由到不同的隊列。只不過Topic類型Exchange可以讓隊列在綁定Routing key 的時候使用通配符

    Routingkey 一般都是有一個或多個單詞組成,多個單詞之間以”.”分割,例如: item.insert

    通配符規則:

    #:匹配一個或多個詞*:匹配不多不少恰好1個詞

    舉例:

    audit.#:能夠匹配audit.irs.corporate 或者 audit.irsaudit.*:只能匹配audit.irs

    Topics生產者代碼與Direct大致相同,只不過子聲明交換機時,將類型設為BuiltinExchangeType.TOPIC(topic),

    消費者代碼也與Direct大致相同,也是在聲明交換機時設置類型為topic,代碼不再演示

    Spring AMQP

    Spring AMQP是對AMQP的一種封裝,目的是能夠讓我們更簡便的使用消息隊列,下面介紹一下Spring AMQP在Spring boot中的使用方法

    依賴和配置

    添加AMQP的啟動器:

    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>

    在application.yml中添加RabbitMQ的地址:

    spring:rabbitmq:host: 192.168.18.130username: adminpassword: admin

    消費者

    消費者需要定義一個類,類中定義監聽隊列的方法

    @Component public class Listener {@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "spring.test.queue", durable = "false"),exchange = @Exchange(value = "spring.test.exchange", type = ExchangeTypes.DIRECT),key = "insert"))public void listen(String msg){System.out.println("消費者接受到消息:" + msg);} }

    注解:

    @Component:保證監聽類被spring掃描到

    @RabbitListener:

    @RabbitListener包含很多內容,在發布訂閱模式中,我們可以使用其中的“QueueBinding[] bindings”,其中QueueBinding底層如下:

    其中Queue表示隊列,Exchange表示交換機,key表示Routing Key

    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "spring.test.queue", durable = "false"),exchange = @Exchange(value = "spring.test.exchange", type = ExchangeTypes.DIRECT),key = "insert"))

    @Queue會創建隊列

    @Exchange會創建交換機

    @QueueBinding會綁定隊列和交換機

    生產者發送消息

    可以通過注解引入AmqpTemplate:

    @RunWith(SpringRunner.class) @SpringBootTest public class SpringAMQPTest {@Resourceprivate AmqpTemplate template;@Testpublic void testSendMsg() throws InterruptedException {String message = "hello spring";template.convertAndSend("spring.test.exchange", "insert", message);System.out.println("生產者發送消息:" + message);Thread.sleep(10000);//等待10s,讓測試方法延遲結束,防止消費者未來得及獲取消息} }

    RabbitMQ如何防止消息丟失

    1. 消息確認機制(ACK)

    RabbitMQ有一個ACK機制,消費者在接收到消息后會向mq服務發送回執ACK,告知消息已被接收。這種ACK分為兩種情況:

    • 自動ACK:消息一旦被接收,消費者會自動發送ACK
    • 手動ACK:消息接收后,不會自動發送ACK,而是需要手動發送ACK

    如果消費者沒有發送ACK,則消息會一直保留在隊列中,等待下次接收。但這里存在一個問題,就是一旦消費者發送了ACK,如果消費者后面宕機,則消息會丟失。因此自動ACK不能保證消費者在接收到消息之后能夠正常完成業務功能,因此需要在消息被充分利用之后,手動ACK確認

    自動ACK,basicConsume方法中將autoAck參數設為true即可:

    手動ack,在匿名內部類中,手動發送ACK:

    當然,如果設置了手動ack,但又不手動發送ACK確認,消息會一直停留在隊列中,可能造成消息的重復獲取

    2. 持久化

    消息確認機制(ACK)能夠保證消費者不丟失消息,但假如消費者在獲取消息之前mq服務宕機,則消息也會丟失,因此要保證消息在服務端不丟失,則需要將消息進行持久化。隊列、交換機、消息都要持久化。

    隊列持久化

    exchange持久化

    消息持久化

    3. 生產者確認

    生成者在發送消息過程中也可能出現錯誤或者網絡延遲燈故障,導致消息未成功發送到交換機或者隊列,或重復發送消息,為了解決這個問題,rabbitmq中有多個解決辦法:

    事務:

    用事務將消息發送代碼包圍起來:

    Confirm模式:

    如下所示,在發送代碼前執行channel.confirmSelect(),如果消息未正常發送,就會進入if代碼塊,可以進行重發也可以對失敗消息進行記錄

    異步confirm方法:

    顧名思義,就是生產者發送消息后不用等待服務端回饋發送狀態,可以繼續執行后面的代碼,對于失敗消息重發進行異步處理:

    Spring AMQP中添加配置:

    生產者確認機制,確保消息正確發送,如果發送失敗會有錯誤回執,從而觸發重試

    spring:rabbitmq:publisher-confirms: true

    ?

    總結

    以上是生活随笔為你收集整理的(十三)RabbitMQ使用详解的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。