说说 RabbiMQ 的应答模式
RabbiMQ 我們都很熟悉了,是很常用的一個開源消息隊列。搞懂 RabbiMQ 的應(yīng)答模式對我們排查錯誤很有幫助,也能避免一些坑。本文說說 RabbiMQ 的應(yīng)答模式。
生產(chǎn)者發(fā)出一條消息給 RabbiMQ ,RabbiMQ 將消息推送給消費者,消費者處理完消息后告訴 RabbiMQ,我已經(jīng)接收到消息并處理了,RabbiMQ 收到通知后會將消息從隊列中刪除。消費者通知 MQ 的這個過程就是消息的應(yīng)答。在 RabbiMQ 中有兩種應(yīng)答模式:自動應(yīng)答和手動應(yīng)答。
版本
dotNET Core :3.1
RabbitMQ:3.8.2
RabbitMQ.Client:6.2.1
自動應(yīng)答
當 RabbiMQ 開啟了消息的自動應(yīng)答,一旦 RabbiMQ 將消息分發(fā)給了消費者,就會將消息從內(nèi)存中刪除。這種情況下,如果正在執(zhí)行的消費者掛掉,就會丟失正在處理的消息。
生產(chǎn)者代碼
static?void?Main(string[]?args) {ConnectionFactory?factory?=?new?ConnectionFactory{UserName?=?"oec2003",Password?=?"000000",HostName?=?"10.211.55.6"};using?(var?connection?=?factory.CreateConnection())using?(var?channel?=?connection.CreateModel()){Console.WriteLine("RabbitMQ連接成功,請輸入消息,輸入exit退出");channel.QueueDeclare("oec2003",?false,?false,?false,?null);string?input;do{input?=?Console.ReadLine();var?body?=?Encoding.UTF8.GetBytes(input);channel.BasicPublish("",?"oec2003",?null,?body);}while?(input.Trim().ToLower()?!=?"exit");} }消費者代碼
static?void?Main(string[]?args) {ConnectionFactory?factory?=?new?ConnectionFactory{UserName?=?"oec2003",Password?=?"000000",HostName?=?"10.211.55.6"};using?(var?connection?=?factory.CreateConnection())using?(var?channel?=?connection.CreateModel()){Console.WriteLine("消費者開始監(jiān)聽......");channel.QueueDeclare("oec2003",?false,?false,?false,?null);EventingBasicConsumer?consumer?=?new?EventingBasicConsumer(channel);consumer.Received?+=?(ch,?ea)?=>{string?message?=?Encoding.Default.GetString(ea.Body.ToArray());Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd?HH:mm:ss")}:收到消息:?{message}");System.Threading.Thread.Sleep(10000);};channel.BasicConsume("oec2003",?true,?consumer);Console.ReadKey();} }channel.BasicConsume 方法的第二個參數(shù)設(shè)置為 true 表示自動應(yīng)答;
開啟自動應(yīng)答后,消息是生產(chǎn)者發(fā)布后,當有消費者連接上后,所有的消息都會被自動確認,并且從內(nèi)存中刪除,這時如果消費者進程掛掉,沒有處理的消息會丟失,正在處理中的消息也不會被重新投遞;
自動應(yīng)答的好處是消息隊列不會處于堵塞狀態(tài),但代價有點大,生產(chǎn)環(huán)境中還是不建議使用。
手動應(yīng)答
手動應(yīng)答,當消費者接收到消息處理完后,需要發(fā)送一個回執(zhí),告訴 RabbiMQ 服務(wù)端,這時 RabbiMQ 才會將該消息刪除。
生產(chǎn)者的代碼和上面的一樣,消費者代碼需要做相關(guān)調(diào)整,如下:
static?void?Main(string[]?args) {ConnectionFactory?factory?=?new?ConnectionFactory{UserName?=?"oec2003",Password?=?"000000",HostName?=?"10.211.55.6"};using?(var?connection?=?factory.CreateConnection())using?(var?channel?=?connection.CreateModel()){Console.WriteLine("消費者開始監(jiān)聽......");EventingBasicConsumer?consumer?=?new?EventingBasicConsumer(channel);consumer.Received?+=?(ch,?ea)?=>{string?message?=?Encoding.Default.GetString(ea.Body.ToArray());Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd?HH:mm:ss")}:收到消息:?{message}");channel.BasicAck(ea.DeliveryTag,?false);};channel.BasicConsume("oec2003",?false,?consumer);Console.ReadKey();} }channel.BasicConsume 方法的第二個參數(shù)設(shè)置為 false ,表示手動應(yīng)答模式;
在處理完消息后調(diào)用 channel.BasicAck(ea.DeliveryTag, false); 來進行應(yīng)答,告訴 RabbiMQ 消息已經(jīng)收到,RabbiMQ 收到這個回執(zhí)后,才會刪除消息。
可能遇到的問題
流量控制問題
在手動模式下,生產(chǎn)者發(fā)送消息后消息會從 Ready 進入到 Unacked 中,當消費者進行應(yīng)答之后消息從 Unacked 中刪除。
如果消息的產(chǎn)生速度遠遠大于消費者的處理速度,這時消息就會都在消費者處進行積壓了。我們會看到 Unacked 中的數(shù)量會越來越大,這樣消費者的壓力就會越來越大,這時就需要使用 Qos 來進行限流。
Qos
在消費者中使用 channel.BasicQos(0, 2, false); 來進行 Qos 的設(shè)置,如下圖:
BasicQos 方法有三個參數(shù):
prefetchSize:批量獲取消息的總大小,0為不限制;
prefetchCount:每次處理消息的個數(shù),比如 prefetchCount 設(shè)置為 2 ,那么處于 Unacked 狀態(tài)的消息最多就 2 條,當其中一條進行了得到了應(yīng)答后,才會從 Ready 中轉(zhuǎn)入一條到 Unacked
global:設(shè)置為 true 表示對 channel 進行控制,否則對每個消費者進行限制,一個 channel 可以有多個消費者
為什么使用 Qos :
提高服務(wù)穩(wěn)定性,因為有 prefetchCount 參數(shù)的控制,不會有海量的數(shù)據(jù)涌進來導致消費者服務(wù)掛掉;
提高吞吐量,當隊列有多個消費者時,每個消費者的能力不一樣,我們可以通過 prefetchCount 參數(shù)來合理安排每個消費者的處理能力,不會出現(xiàn)有的空閑,有的積壓。
prefetchCount 是一個非常關(guān)鍵的參數(shù),當消費者處理消息時,出現(xiàn)一些異常情況,導致無法進行 Ack 應(yīng)答,沒有應(yīng)答的數(shù)量大于等于 prefetchCount 時,隊列就會發(fā)生堵塞。所以我們一定要確保消息的處理能夠被異常捕獲,并在 finally 中進行 Ack 應(yīng)答,代碼如下:
try {string?message?=?Encoding.Default.GetString(ea.Body.ToArray());Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd?HH:mm:ss")}:收到消息:?{message}");if?(message?==?"error"){throw?new?Exception("mq?error");}else?if?(message?==?"sleep"){System.Threading.Thread.Sleep(60000);} } catch?(Exception) {//處理異常 } finally {channel.BasicAck(ea.DeliveryTag,?false); }一旦隊列堵塞了,一種處理方式就是斷掉客戶端,這樣,處在 Unacked 中的消息會重新回到 Ready 中,會重新進行投遞進行消費。
總結(jié)
1、自動應(yīng)答模式需要慎用,特別是生產(chǎn)環(huán)境;
2、不開啟 Qos ,消費者可能會面臨很大壓力,但消息不會堵塞(測試過 500 個未進行 Ack 沒有造成堵塞),現(xiàn)在不確定在沒有 Qos 的情況下,有沒有默認的最大 prefetchCount ;
3、開啟 Qos ,prefetchCount 的值很關(guān)鍵,并且需要做好異常處理,防止堵塞。
希望本文對您有所幫助!
總結(jié)
以上是生活随笔為你收集整理的说说 RabbiMQ 的应答模式的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: OpenTelemetry - 云原生下
- 下一篇: 微软2020开源回顾:止不住的挨骂,停不