日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

【RabbitMQ】8、RabbitMQ之mandatory和immediate

發布時間:2023/11/29 编程问答 45 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【RabbitMQ】8、RabbitMQ之mandatory和immediate 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1. 概述

mandatory和immediate是AMQP協議中basic.publish方法中的兩個標識位,它們都有當消息傳遞過程中不可達目的地時將消息返回給生產者的功能。對于剛開始接觸RabbitMQ的朋友特別容易被這兩個參數搞混,這里博主整理了寫資料,簡單講解下這兩個標識位。

mandatory?
當mandatory標志位設置為true時,如果exchange根據自身類型和消息routeKey無法找到一個符合條件的queue,那么會調用basic.return方法將消息返回給生產者(Basic.Return + Content-Header + Content-Body);當mandatory設置為false時,出現上述情形broker會直接將消息扔掉。

mandatory標志的作用:在消息沒有被路由到合適隊列情況下會將消息返還給消息發布者,同時我們測試了哪些情況下消息不會到達合適的隊列,測試1演示的是創建了exchange但是沒有為他綁定隊列導致的消息未到達合適隊列,測試3演示的是創建了exchange同時創建了queue,但是在將兩者綁定的時候,使用的bindingKey和消息發布者使用的rountingKey不一致導致的消息未到達合適隊列;

immediate?
當immediate標志位設置為true時,如果exchange在將消息路由到queue(s)時發現對于的queue上么有消費者,那么這條消息不會放入隊列中。當與消息routeKey關聯的所有queue(一個或者多個)都沒有消費者時,該消息會通過basic.return方法返還給生產者。

概括來說,mandatory標志告訴服務器至少將該消息route到一個隊列中,否則將消息返還給生產者;immediate標志告訴服務器如果該消息關聯的queue上有消費者,則馬上將消息投遞給它,如果所有queue都沒有消費者,直接把消息返還給生產者,不用將消息入隊列等待消費者了。


2. mandatory

在生產者通過channle的basicPublish方法發布消息時,通常有幾個參數需要設置,為此我們有必要了解清楚這些參數代表的具體含義及其作用,查看channel接口,會發現存在3個重載的basicPublish方法:

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException; void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException; void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;

mandatory和immediate上面已經解釋過了,其余的參數分別是:?
exchange:交換機名稱?
routingkey:路由鍵?
props:消息屬性字段,比如消息頭部信息等等?
body:消息主體部分

本節主要講述mandatory, 下面我們寫一個demo,在RabbitMQ broker中有:?
exchange : exchange.mandatory.test?
queue: queue.mandatory.test?
exchange路由到queue的routingkey是mandatory?
這里先不講當前的exchange綁定到queue中,即:

channel.basicPublish(exchangeName, "", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());

詳細代碼如下:

package com.vms.test.zzh.rabbitmq.self;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** * Created by hidden on 2017/2/7. */ public class RBmandatoryTest { public static final String ip = "10.198.197.73"; public static final int port = 5672; public static final String username = "root"; public static final String password = "root"; public static final String queueName = "queue.mandatory.test"; public static final String exchangeName = "exchange.mandatory.test"; public static final String routingKey = "mandatory"; public static final Boolean mandatory = true; public static final Boolean immediate = false; public static void main(String[] args) { try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(ip); factory.setPort(port); factory.setUsername(username); factory.setPassword(password); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.basicQos(1); channel.basicPublish(exchangeName, "", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes()); // channel.close(); // connection.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }

運行,之后通過wireshark抓包工具可以看到如下圖所示:?

這里可以看到最后執行了basic.return方法,將發布者發出的消息返回給了發布者,查看協議的arguments參數部分可以看到:reply-text字段值為NO_ROUTE,表示消息并沒有路由到合適的隊列中;

那么我們該怎么獲取到沒有被正確路由到合適隊列的消息呢?這時候可以通過為channel信道設置ReturnListener監聽器來實現,具體代碼(main函數部分):

try {ConnectionFactory factory = new ConnectionFactory();factory.setHost(ip);factory.setPort(port); factory.setUsername(username); factory.setPassword(password); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.basicQos(1); channel.basicPublish(exchangeName, "", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes()); channel.addReturnListener(new ReturnListener() { public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties basicProperties, byte[] body) throws IOException { String message = new String(body); System.out.println("Basic.return返回的結果是:"+message); } }); // channel.close(); // connection.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }

運行結果:

Basic.return返回的結果是:===mandatory===

下面我們來看一下,設置mandatory標志且exchange路由到queue中,代碼部分只需要將:

channel.basicPublish(exchangeName, "", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());

改為

channel.basicPublish(exchangeName, routingKey, mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());

即可。?
通過wireshark抓包如下:?

可以看到并不會有basic.return方法被調用。查看RabbitMQ管理界面發現消息已經到達了隊列。


3. immediate

在RabbitMQ3.0以后的版本里,去掉了immediate參數的支持,發送帶immediate標記的publish會返回如下錯誤:?
“{amqp_error,not_implemented,”immediate=true”,’basic.publish’}”

為什么移除immediate標記,參見如下版本變化描述:?
Removal of “immediate” flag?
What changed? We removed support for the rarely-used “immediate” flag on AMQP’s basic.publish.?
Why on earth did you do that? Support for “immediate” made many parts of the codebase more complex, particularly around mirrored queues. It also stood in the way of our being able to deliver substantial performance improvements in mirrored queues.?
What do I need to do? If you just want to be able to publish messages that will be dropped if they are not consumed immediately, you can publish to a queue with a TTL of 0.?
If you also need your publisher to be able to determine that this has happened, you can also use the DLX feature to route such messages to another queue, from which the publisher can consume them.?
這段解釋的大概意思是:immediate標記會影響鏡像隊列性能,增加代碼復雜性,并建議采用“TTL”和“DLX”等方式替代。

出處:https://yq.aliyun.com/articles/238349?spm=5176.8091938.0.0.EfSZh4

http://www.mamicode.com/info-detail-1673003.html?spm=5176.100239.blogcont238349.8.FAbhIg

轉載于:https://www.cnblogs.com/wangzhongqiu/p/7832796.html

總結

以上是生活随笔為你收集整理的【RabbitMQ】8、RabbitMQ之mandatory和immediate的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。