RabbitMQ消息可靠性分析和应用
2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>
RabbitMQ流程簡(jiǎn)介(帶Exchange)
? ? ? ?RabbitMQ使用一些機(jī)制來(lái)保證可靠性,如持久化、消費(fèi)確認(rèn)及發(fā)布確認(rèn)等。
? ? ? ?先看以下這個(gè)圖:
?
? ? ? ?P為生產(chǎn)者,X為中轉(zhuǎn)站(Exchange),紅色部分為消息隊(duì)列,C1、C2為消費(fèi)者。
? ? ? ?整個(gè)流程分成三部分:第一,生產(chǎn)者生產(chǎn)消息,發(fā)送到中轉(zhuǎn)站;第二,中轉(zhuǎn)站按定義的規(guī)則轉(zhuǎn)發(fā)消息到消息隊(duì)列;第三,消費(fèi)者從消息隊(duì)列獲取消息進(jìn)行消費(fèi)(處理)。
RabbitMQ消息可靠性分析和應(yīng)用
? ? ? ?應(yīng)用代碼均使用C#客戶端代碼實(shí)現(xiàn)。
一、發(fā)布確認(rèn)
? ? ? ?生產(chǎn)者生產(chǎn)消息,發(fā)送到中轉(zhuǎn)站的過(guò)程中,可能會(huì)因?yàn)榫W(wǎng)絡(luò)丟包、網(wǎng)絡(luò)故障等問(wèn)題造成消息丟失。為了確保生產(chǎn)者發(fā)送的消息不會(huì)丟失,RabbitMQ提供了發(fā)布確認(rèn)(Publisher Confirms)機(jī)制,從而提高消息的可靠性(注意:發(fā)布確認(rèn)機(jī)制不能和事務(wù)機(jī)制一起使用)。
? ? ? ?單條消息發(fā)布確認(rèn):
| 1 2 3 4 5 6 7 8 9 10 | channel.ConfirmSelect();//發(fā)布確認(rèn)機(jī)制 string?message =?"msg"; var?body = Encoding.UTF8.GetBytes(message); channel.BasicPublish( ????????exchange:?"MarkTopicChange", ????????routingKey:?"MarkRouteKey.one", ????????basicProperties:?null, ????????body: body ????????); bool?isPublished = channel.WaitForConfirms();//通道(channel)里消息發(fā)送成功返回true |
? ? ? ?使用channel.ConfirmSelect,一旦信道進(jìn)入確認(rèn)模式,所有在該信道上面發(fā)布的消息都會(huì)被指派一個(gè)唯一的ID(從1開(kāi)始)。消息被投遞到所有匹配的隊(duì)列之后,RabbitMQ就會(huì)發(fā)送(Basic.Ack)給生產(chǎn)者(包含消息的唯一ID),生產(chǎn)者從而知道消息發(fā)送成功。
? ? ? ?多條消息發(fā)布確認(rèn):
| 1 2 3 4 5 6 7 8 9 10 11 12 13 | channel.ConfirmSelect();//發(fā)布確認(rèn)機(jī)制 foreach?(var?itemMsg?in?lstMsg) { ????byte[] sendBytes = Encoding.UTF8.GetBytes(itemMsg); ????//發(fā)布消息 ????channel.BasicPublish( ????????exchange:?"MarkTopicChange", ????????routingKey:?"MarkRouteKey.one", ????????basicProperties:?null, ????????body: sendBytes ????????); } bool?isAllPublished = channel.WaitForConfirms();//通道(channel)里所有消息均發(fā)送才返回true |
? ? ? ?注意:多消息發(fā)布確認(rèn)機(jī)制情況下,倘若要發(fā)送100條消息,發(fā)送90條后,突然網(wǎng)絡(luò)故障,后面的消息發(fā)送失敗了,那么isAllPublished返回的是false,而前面90條消息已經(jīng)發(fā)送到消息隊(duì)列了。我們還不知道哪些消息是發(fā)送失敗的,所以很多條消息發(fā)布確認(rèn),建議分幾次發(fā)送或多通道發(fā)送。
? ? ? ?此外,需要確保在中轉(zhuǎn)站(Exchange)的消息可以順利到達(dá)消息隊(duì)列。
? ? ? ?(1)首先需要定義匹配的Exchange和Queue,根據(jù)Exchange的類型和routingKey確定轉(zhuǎn)發(fā)的關(guān)系。
? ? ? ?(2)設(shè)置BasicPublish方法中mandatory參數(shù)為true,然后監(jiān)聽(tīng)Exchange中沒(méi)有匹配的隊(duì)列的消息,然后進(jìn)行相操作。
? ? ? ?(3)確保消息隊(duì)列有足夠內(nèi)存存儲(chǔ)消息。
? ? ? ?RabbitMQ默認(rèn)配置vm_memory_high_watermark為0.4。意思是控制消息占40%內(nèi)存左右。vm_memory_high_watermark_paging_ratio為0.5,當(dāng)消息占用內(nèi)存超過(guò)50%,RabbitMQ會(huì)把消息轉(zhuǎn)移到磁盤上以釋放內(nèi)存。當(dāng)磁盤剩余空間小于閥值disk_free_limit(默認(rèn)為50M),所有生產(chǎn)者阻塞,避免充滿磁盤,導(dǎo)致所有的寫操作失敗。
? ? ? ?RabbitMQ配置文件一般在%APPDATA%\RabbitMQ\rabbitmq.config.
? ? ? ?%APPDATA% 一般為 C:\Users\%USERNAME%\AppData\Roaming(Windows環(huán)境)
二、持久化
? ? ? ?消息存放到消息隊(duì)列后,在不配置消息持久化的情況下,若服務(wù)器重啟、關(guān)閉或宕機(jī)等,消息都會(huì)丟失。配置持久化可以有效提高消息的可靠性。持久化需要同時(shí)配置消息持久化和隊(duì)列持久化。單配置消息持久化,隊(duì)列消失了,消息沒(méi)有地方存放;單配置隊(duì)列持久化,隊(duì)列還在,消息沒(méi)了。
? ? ? ?隊(duì)列持久化在定義隊(duì)列時(shí)候配置
| 1 2 3 4 5 6 7 8 | //定義隊(duì)列 channel.QueueDeclare( ????queue:?"Mark_Queue",?//隊(duì)列名稱 ????durable:?true,?//隊(duì)列磁盤持久化?????????????????? ????exclusive:?false,//是否排他的,false。如果一個(gè)隊(duì)列聲明為排他隊(duì)列,該隊(duì)列首次聲明它的連接可見(jiàn),并在連接斷開(kāi)時(shí)自動(dòng)刪除 ????autoDelete:?false,//是否自動(dòng)刪除,一般設(shè)成false ????arguments:?null ????); |
消息持久化在發(fā)布消息時(shí)候配置
| 1 2 3 4 5 6 7 8 9 10 | //消息持久化,把DeliveryMode設(shè)成2 IBasicProperties properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; ????//發(fā)布消息 ????channel.BasicPublish( ????????exchange:?"MarkTopicChange", ????????routingKey:?"MarkRouteKey.one", ????????basicProperties: properties, ????????body: sendBytes ????????); |
? ? ? ?如何配置了事務(wù)機(jī)制或發(fā)布確認(rèn)(publisher confirm)機(jī)制,服務(wù)端的返回Basic.Ack是在消息落盤之后執(zhí)行的,進(jìn)一步的提高了消息的可靠性。
? ? ? ?為了防止磁盤損壞帶來(lái)的消息丟失,可以配置鏡像隊(duì)列,這里不作介紹。
三、消費(fèi)確認(rèn)
? ? ? ?為了確保消息被消費(fèi)者消費(fèi),RabbitMQ提供消費(fèi)確認(rèn)模式(consumer Acknowledgements)。自動(dòng)確認(rèn)模式,當(dāng)消費(fèi)者成功接收到消息后,自動(dòng)通知RabbitMQ,把消息隊(duì)列中相應(yīng)消息刪除。這很大程度上滿足不了我們,假如消費(fèi)者接收到消息后,服務(wù)器宕機(jī),消息還沒(méi)處理完成,這樣就會(huì)造成消息丟失。手動(dòng)確認(rèn)模式,當(dāng)消費(fèi)者成功處理完消息后,手動(dòng)發(fā)消息通知RabbitMQ,把消息隊(duì)列中相應(yīng)消息刪除。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | consumer.Received += (model, ea) => { ????var?body = ea.Body; ????var?message = Encoding.UTF8.GetString(body); ????var?routingKey = ea.RoutingKey; ????Console.WriteLine(" [x] Received '{0}':'{1}'", ??????????????????????routingKey, ??????????????????????message); ? //確認(rèn)該消息已被消費(fèi),發(fā)刪除消息給RabbitMQ,把消息隊(duì)列中的消息刪除 channel.BasicAck(ea.DeliveryTag,?false); //消費(fèi)消息失敗,拒絕此消息,重回隊(duì)列,讓它可以繼續(xù)發(fā)送到其他消費(fèi)者 //channel.BasicReject(ea.DeliveryTag, true); //消費(fèi)消息失敗,拒絕多條消息,重回隊(duì)列,讓它們可以繼續(xù)發(fā)送到其他消費(fèi)者 //channel.BasicNack(ea.DeliveryTag, true, true); }; //手動(dòng)確認(rèn)消息,把a(bǔ)utoAck設(shè)成false channel.BasicConsume(queue:?"Mark_Queue", ?????????????????????autoAck:?false, ?????????????????????consumer: consumer); |
? ? ? ?這里值得注意的是,消息處理完成后,一定要把處理完成的消息發(fā)送到RabbitMQ(channel.BasicAck(ea.DeliveryTag, false)),不然RabbitMQ會(huì)一直等待,從而造成內(nèi)存泄露。若處理消息過(guò)程中發(fā)生異常,可以使用channel.BasicReject(ea.DeliveryTag, true)來(lái)拒絕此消息,讓它重回隊(duì)列。若RabbitMQ收不到消費(fèi)者任何確認(rèn)消息的信號(hào)(包括確認(rèn)信號(hào),拒絕信號(hào)燈),直到此消費(fèi)者斷開(kāi)連接,消息才能重回隊(duì)列,繼續(xù)發(fā)送到其他消費(fèi)者。
? ? ? ?提醒一下,假如消費(fèi)者消費(fèi)消息的方法不支持并發(fā)(取決于需求),可以限制消費(fèi)者每次只接收一條消息。
| 1 | channel.BasicQos(0, 1,?false); |
轉(zhuǎn)載于:https://my.oschina.net/u/4052893/blog/3006677
總結(jié)
以上是生活随笔為你收集整理的RabbitMQ消息可靠性分析和应用的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 我的VS CODE
- 下一篇: 2018年澳门就业情况理想 最新失业率维