queuedeclare参数说明_MQ 学习笔记之RabbitMQ
組件
一、 Connection (連接)
RabbitMQ 的 socket 的長鏈接,它封裝了 socket 協議相關部分邏輯
二、 Channel (信道)
建立在 Connection 連接之上的一種輕量級的連接,我們大部分的業務操作是在 Channel 這個接口中完成的,包括交換機的聲明 exchangeDeclare、
定義隊列的聲明 queueDeclare、
隊列的綁定 queueBind、
發布消息 basicPublish、
消費消息 basicConsume 等。
如果把 Connection 比作一條光纖電纜的話,那么 Channel 信道就比作成光纖電纜中的其中一束光纖。一個 Connection 上可以創建任意數量的 Channel。
Channel 中的方法聲明交換器
channel.exchangeDeclare(
exchange : "ExchangeName",//交換器的名字
type: "direct",// 模式
durable: false, // 持久化
autoDelete: false, //自動刪除
arguments:null 參數
);聲明隊列
channel.queueDeclare(
queue: 隊列的名稱,
durable: flase //是否持久化,false: 隊列在內存中,服務器掛掉后,隊列就沒了;true: 服務器重啟后,隊列將會重新生成。注意:只是隊列持久化,不代表隊列中的消息持久化!!!!
exclusive: 是否專屬,專屬的范圍針對的是連接,一個連接下面的多個信道是可見的。對于其他連接是不可見的。連接斷開后,該隊列會被刪除。
autoDelete: 是否自動刪除,
arguments: 隊列參數
);發送消息
channel.basicPublish(
exchange: "name", //交換機名稱
routingKey: "key", //路由鍵
basicProperties: null, //該條消息的配置
body: Encoding.Default.GetBytes(msg) //消息字節數組
);接收消息
channel.BasicGet(
queue: QueueName, //隊列名稱
autoAck: true //是否自動確認
);消費者設置
channel.basicConsume(
queue:QUEUE_NAME,
autoAck:true, //是否自動ack,如果不自動ack,需要使用 channel.ack、channel.nack、channel.basicReject
consumer // 消費者
);設置只處理一條消息
channel.basicQos(
0, // prefetchSize:0
1, prefetchCount:1 , 告訴 RabbitMQ, 不要同時給一個消費者推送多于 1 條消息,即一旦有 1 個消息還沒有 ack (確認),則該消費者將 block 掉,直到有消息確認
false //global:true\false 是否將上面設置應用于 channel,簡單點說,就是上面限制是 channel 級別的還是 consumer 級別
);自動應答
channel.basicAck(
e.DeliveryTag, // deliveryTag : e.DeliveryTag, 該消息的標記,ulong 類型.
false //multiple:是否批量.true: 將一次性確認所有小于 deliveryTag 的消息.
);
三、 Exchange (交換器)
1. RoutingKey
用自己的話說,這個 RoutingKey 就是表示生產者在發送消息時,交換器需要根據傳入的 key 去根據隊列和交換器綁定時用的 bindingKey 去匹配,這個 RoutingKey 生產者告訴交換器根據什么去分發;fanout 的 routingKey 失去作用,交換器會把所有的消息發送到綁定的隊列中;
2. Exchange 的創建direct direct 可以使用默認的 Exchange,不需要聲明,但需要指定消息發送到那個隊列,路由 key 就是隊列的名稱
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());fanout
fanout 的 routing key 是無效的,會把消息發送到所有的綁定隊列
channel.exchangeDeclare(EXCHANGE_NAME, "fanout")
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());topic
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
注意:使用 fanout 和 topic 需要先聲明 交換器類型;需要將 exchange 個 queue 綁定;
四、 Queue (隊列)
1. 隊列的聲明
channel.queueDeclare(
queue: QueueName, //隊列名稱
durable: false, //隊列是否持久化.false:隊列在內存中,服務器掛掉后,隊列就沒了;true:服務器重啟后,隊列將會重新生成.注意:只是隊列持久化,不代表隊列中的消息持久化!!!!
exclusive: false, //隊列是否專屬,專屬的范圍針對的是連接,也就是說,一個連接下面的多個信道是可見的.對于其他連接是不可見的.連接斷開后,該隊列會被刪除.注意,不是信道斷開,是連接斷開.并且,就算設置成了持久化,也會刪除.
autoDelete: true, //如果所有消費者都斷開連接了,是否自動刪除.如果還沒有消費者從該隊列獲取過消息或者監聽該隊列,那么該隊列不會刪除.只有在有消費者從該隊列獲取過消息后,該隊列才有可能自動刪除(當所有消費者都斷開連接,不管消息是否獲取完)
arguments: null //隊列的配置
);
2. 隊列詳細參數(arguments)消息生存時間 TTL 消息的存活時間,過了這個時間就會根據配置,到死信隊列中
隊列生存時間 Exp
隊列在指定的時間內沒有被使用 (訪問) 就會被刪除
如果有消費者訂閱,則不會被刪除
隊列最大消息數量 lim 隊列可以容納的消息的最大條數。超出后根據溢出策略進行處理
隊列消息最大容量 limB 隊列可以容納的消息的最大字節數,超出后同消息最大數
溢出策略 Ovfl 隊列中的消息溢出時,如何處理這些消息。要么丟棄隊列頭部的消息,要么拒絕接收后面生產者發送過來的所有消息,默認是丟棄之前的消息
死信隊列交換器 DXL (dead-letter-exchange) 死信交換機的名稱,當隊列中的消息的生存期到了,或者因長度限制被丟棄時,消息會被推送到 (綁定到) 這臺交換機 (的隊列中), 而不是直接丟掉.
私信隊列 routingKey DLK (dead-letter-routing-key) 死信隊列的 routingKey
優先級 Pri 設置該隊列中的消息的優先級最大值。發布消息的時候,可以指定消息的優先級,優先級高的先被消費注意 一旦聲明隊列參數創建,不可以修改;只能刪除重新創建了
3. binding key
簡單的理解為,交換器需要根據 key 值來給我發送消息,根據 bindingKey 的規則去匹配 RoutingKey,匹配上了就發送到隊列,僅對 direct 和 topic 模式有效;
五、消息 (measage)
channel.basicPublish(
exchange: "test_exchange",
routingKey: "",
mandatory: false,
basicProperties: null,
body: Encoding.Default.GetBytes(msg)
);exchange: 交換機名稱
routingKey: 路由 key若模式為 fanout 模式,就會忽略 key,所有綁定的隊列都會收到消息
該值僅對 direct 和 topic 模式有效mandatory: 消息處理當為 true 時,exchange 根據模式和 routKey 無法找到 queue 時,將消息返回給生產者;
當為 false 時, broker 直接丟棄消息basicProperties: 消息的基本屬性content_type 消息內容的類型,如 "application/json"
content_encoding 消息內容的編碼格式
priority 消息的優先級,上面文章已經講過了.
correlation_id 用于將 RPC 響應與請求相關聯.
reply_to 回調隊列
expiration 消息過期時間,單位毫秒。該參數值優先級 > 隊列參數設置中的消息生存期
message_id 消息 id
timestamp 消息的時間戳 ...body : 消息體
訂閱模式
1. Direct這個模式一對一,消息會根據 bingding key 精確匹配
2. fanout消息是一對多,一個消息可以發送到交換器下所有的隊列中
可以不傳入 routingKey 聲明是無效的,會把所有的消息發送到綁定到交換器上所有的隊列上;
3. Topic這個模式是一對多,消息根據 bingding key 匹配,分發到多個隊列中;
消息確認機制
發布者確認使用事務的方式 發送消息使用事務
confirms同步; 發布消息后,發送者等待確認結果
異步; 發布消息后時,提供回調接口,失敗后服務端會回調接口,這個了發送者需要將消息存儲下來,防止丟失;
消費者確認自動確認 (autoAck : true) 自動確認模式中,消息在發送到消費者后即被認為 "成功消費". 這種模式可以降低吞吐量(只要消費者可以跟上), 以降低交付和消費者處理的安全性 這種事不安全的,可能會丟失消息
手動確認 (autoAck : false)肯定確認 BasicAck 消息消費成功,隊列可以刪除消息
否定確認 BasicNack/BasicReject
2.1 requeue: false 隊列直接丟棄消息 2.2 requeue: true 重新入對,再次發送
消費者確認根據消息的吞吐量和消息的安全綜合考慮, 如果手動應答,消費者就需要做好冪等性的處理,因為網絡及其他情可能會導致消息多次發送;
遠程調用過程
RPC 調用過程當客戶端啟動時,創建一個匿名的獨占回調隊列.(匿名最好,當然也可以不是匿名的)
對于 RPC 請求,客戶端發送帶有兩個屬性的消息: ReplyTo(設置為回調隊列)和 CorrelationId(設置為每個請求的唯一值)。
請求被發送到隊列
RPC worker(正在等待該隊列上的請求。當請求出現時,它會執行函數并使用 ReplyTo 屬性中的隊列將結果返回給客戶端。
客戶端等待回調隊列上的數據。出現消息時,它會檢查 CorrelationId 屬性。如果它與請求中的值匹配,則將響應返回給應用程序。
總結
以上是生活随笔為你收集整理的queuedeclare参数说明_MQ 学习笔记之RabbitMQ的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ID3DXMesh的数据导出和导入
- 下一篇: 浪潮服务器nf5280m2安装系统,破茧