日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

rabbitmq direct 多个消费者_RabbitMQ实战应用技巧

發布時間:2025/3/15 编程问答 21 豆豆
生活随笔 收集整理的這篇文章主要介紹了 rabbitmq direct 多个消费者_RabbitMQ实战应用技巧 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1. RabbitMQ實戰應用技巧

1.1. 前言

由于項目原因,之后會和RabbitMQ比較多的打交道,所以讓我們來好好整理下RabbitMQ的應用實戰技巧,盡量避免日后的采坑

1.2. 概述

RabbitMQ有幾個重要的概念:虛擬主機,交換機,隊列和綁定

  • 虛擬主機:一個虛擬主機持有一組交換機、隊列和綁定,我們可以從虛擬主機層面的顆粒度進行權限控制
  • 交換機:Exchange用于轉發消息,它并不存儲消息,如果沒有Queue隊列綁定到Exchange,它會直接丟棄掉生產者發來的數據。
    交換機還有個關聯的重要概念:路由鍵,消息轉發到哪個隊列根據路由鍵決定
  • 綁定:就是綁定交換機和隊列,它是多對多的關系,也就是說多個交換機可以綁同一個隊列,也可以一個交換機綁多個隊列

1.3. 交換機

交換機有四種類型的模式Direct, topic, Headers and Fanout

1.3.1. Direct Exchage

Direct模式使用的是RabbitMQ的默認交換機,也是最簡單的模式,適合比較簡單的場景

如下圖所示,使用Direct模式,我們需要創建不同的隊列,而默認交換機則通過Routing key路由鍵的值來決定轉發到哪個隊列,可以看到,路由鍵綁定隊列是可以指定多個的

1.3.2. Topic Exchange

Topic模式主要是根據通配符匹配,也就類似于模糊匹配,當這種匹配模式和路由鍵匹配后交換機就能轉發消息到指定隊列

  • 路由鍵為一串字符串,由句號(.)隔開,比如a.b.c
  • (*)代表指定位置一個單詞,(#)代表零個或者多個單詞,比如a.*.b.#,表示a和b中間隨意填個單詞,b后面可以跟n個單詞,比如a.x.b.c.d.e

Topic模式和Direct模式的區別在于交換機需要自己指定,路由鍵支持模糊匹配,例如:

rabbitTemplate.convertAndSend("topicExchange","a.x.b.d", " hello world!");

1.3.3. Headers Exchage

Headers也是根據規則匹配,但它不是根據路由鍵了,headers有個自定義匹配規則,它將匹配鍵值設在了消息的headers屬性上,當這些鍵值對有一對或者全部匹配時,消息才會被投遞到對應隊列,這種模式效率相對較低,一般不推薦使用

1.3.4. Fanout Exchange

Fanout即為大名鼎鼎的廣播模式了,它不需要管路由鍵,會把消息發給綁定它的全部隊列,就算配置了路由鍵也會被忽略

1.4. 復雜情況

  • 首先我們Direct模式,一個生產者一個消費者的情況,也就對應了一個發送者和一個隊列A接收,這是沒有疑問的,發送什么接收什么
  • 當Direct模式,一個生產者發消息,開啟多個消費者也就是多個相同queue,此時消息由多個消費者均勻分攤,不會重復消費(前提ack正常)
  • 當Topic模式,一個交換機綁定兩個隊列,路由鍵有重疊關系,如下代碼,此時指定路由鍵topic.message發送消息,隊列queueMessage和queueMessages都能接收到相同消息,也就是說,topic模式可以實現類似于廣播模式的形式,甚至更加靈活,它能否轉發到消息由路由鍵決定。
  • 相比于Fanout模式,我們如果要對消費者隊列分組發送,我們需要指定不同的路由鍵;而Fanout模式則需要指定不同的交換機和隊列綁定,實際使用結合實際情況
  • @Configuration public class TopicRabbitConfig {final static String message = "topic.message";final static String messages = "topic.messages";@Beanpublic Queue queueMessage() {return new Queue(TopicRabbitConfig.message);}@Beanpublic Queue queueMessages() {return new Queue(TopicRabbitConfig.messages);}@BeanTopicExchange exchange() {return new TopicExchange("exchange");}@BeanBinding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");}@BeanBinding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");} }

    1.5. springboot配置

    我們的常用配置如下

    spring.rabbitmq.addresses=localhost:5672 spring.rabbitmq.username=user spring.rabbitmq.password=123456 spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=1000 ##設置監聽限制:最大10,默認5 spring.rabbitmq.listener.simple.concurrency=5 spring.rabbitmq.listener.simple.max-concurrency=10 spring.rabbitmq.publisher-confirms=true spring.rabbitmq.publisher-returns=true spring.rabbitmq.template.mandatory=true spring.rabbitmq.listener.simple.acknowledge-mode=manual

    其中最后四條配置需要著重解釋:

    • spring.rabbitmq.publisher-confirms為true,表示生產者消息發出后,MQ的broker接收到了消息,發送回執表示確認接收,不設置則可能導致消息丟失
    • spring.rabbitmq.publisher-returns為true,表示當消息不能到達MQ的Broker端,,則使用監聽器對不可達的消息做后續處理,這種一般是路由鍵沒配好,或MQ宕機才可能發生
    • spring.rabbitmq.template.mandatory當上面兩個為true時,這個一定要配true,否則上面兩個不起作用
    • spring.rabbitmq.listener.simple.acknowledge-mode這個為manual表示手工確認,實際生產應該設為手工,才能保證你的業務是處理完成的,注意業務的冪等性,可重復調用,手工確認代碼如下例子
    @Component public class RabbitReceiver {@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue-1", durable="true"),exchange = @Exchange(value = "exchange-1", durable="true", type= "topic", ignoreDeclarationExceptions = "true"),key = "springboot.*"))@RabbitHandlerpublic void onMessage(Message message, Channel channel) throws Exception {System.err.println("--------------------------------------");System.err.println("消費端Payload: " + message.getPayload());Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);//手工ACK,獲取deliveryTagchannel.basicAck(deliveryTag, false);} }

    1.6. 隊列屬性

  • queue : 隊列的名字
  • durable : 為true表示隊列中數據持久化到磁盤,可以防止mq宕機重啟數據丟失
  • exclusive : 為true表示排他性,只允許一個當前連接訪問該隊列,當前已連接就不允許新的連接進入否則報錯,當連接斷開當前隊列會銷毀
  • autoDelete : 為true表示自動刪除,當沒有Connection連接到隊列的時候,會自動刪除
  • arguments : 這個參數用來添加一些額外參數的,如下圖片
    • 比如添加x-message-ttl為5000,則表示消息超過5秒沒被處理就會超時過期;
    • x-expires設置120000表示隊列在2分鐘內沒被消費則被刪除;
    • x-max-length,x-max-length-bytes表示傳送數據的最大長度和字節數
    • x-dead-letter-exchange,x-dead-letter-routing-key表示死信交換機和死信路由,放在需要過期或處理失敗的隊列屬性中,這些數據會轉發到死信隊列存儲起來,創建普通的交換機和隊列綁定,把交換機名填到x-dead-letter-exchange的值,填寫路由鍵要符合死信隊列的路由鍵
    • x-max-priority,表示設置優先級,范圍為0~255,只有當消息堆積的時候,這個優先級才有意義,數字越大優先級越高
    • x-queue-mode當為lazy,表示惰性隊列,3.6.0之后才被引入的概念,相比默認的模式,惰性隊列模式會將生產者產生的消息直接存到磁盤中,這當然會增加IO開銷,但適合應對大量消息堆積的情況;因為當大量消息堆積時,內存也不夠存放,會將消息轉存到磁盤,這個過程也是比較耗時且過程中不能接收新的消息。如果需要將普通隊列轉換成惰性隊列需要將原來的隊列刪除,重新創建個惰性隊列綁定。

    1.7. 交換機屬性

  • exchange : 交換機名稱
  • type : 交換機類型
  • durable : 持久化,同隊列
  • autoDelete : 是否自動刪除,同隊列
  • internal : 若為true,表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定。
  • arguments : 額外參數,目前只有個alternate-exchange,表示當生產者發送消息到這個交換機,路由不到該交換機的隊列,則會嘗試這個參數指定的交換機進行路由,若路由鍵匹配,則路由到alternate-exchange指定的隊列,相當于轉發了,剛好和上一個參數internal配合,若不想本交換機起到路由隊列的作用,可以設置internal為true,把消息都轉發到alternate-exchange指定的交換機,由該交換機來路由指定隊列,
    • 如下圖:exchange0設置了alternate-exchange交換機為exchange1,生產者發送數據到exchange0路由鍵為test1,在exchange0路由不到,則轉發到exchange1判斷路由符合,發送到隊列queue1

    1.8. 磁盤和內存

    在RabbitMQ的管理界面,當我們集群部署時可以看到Nodes節點中Info字段可能為disc也可能ram,表示了磁盤存儲或內存儲存。事實上,在集群部署的時候,我們至少要一個磁盤儲存,它代表了將交換機,隊列,綁定,用戶等元數據持久化保存到磁盤,一遍重啟RabbitMQ也能恢復到原先的狀態,當只有一個節點時,必定是磁盤存儲;而內存儲存也有它的優勢,就是效率更高速度更快

    1.9. 報錯案例

    • 當報下列錯誤,表示你一定存在排他性隊列,也就是設置了exclusive屬性的隊列,由于同一個連接創建的不同通道可以訪問同一個隊列,此時由于這個排他屬性會得到資源被鎖定錯誤,也就是下列的錯誤。
    • 由此我們可以知道,若你把隊列設置成了exclusive屬性的,那么就別創建新的連接去訪問同一個隊列
    ESOURCE_LOCKED - cannot obtain exclusive access to locked queue xxxxxx

    總結

    以上是生活随笔為你收集整理的rabbitmq direct 多个消费者_RabbitMQ实战应用技巧的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。