日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ消息可靠性分析和应用

發(fā)布時(shí)間:2024/4/13 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ消息可靠性分析和应用 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>

RabbitMQ流程簡(jiǎn)介(帶Exchange)

? ? ? ?RabbitMQ使用一些機(jī)制來(lái)保證可靠性,如持久化、消費(fèi)確認(rèn)及發(fā)布確認(rèn)等。

? ? ? ?先看以下這個(gè)圖:

?

? ? ? ?P為生產(chǎn)者,X為中轉(zhuǎn)站(Exchange),紅色部分為消息隊(duì)列,C1、C2為消費(fèi)者。

? ? ? ?整個(gè)流程分成三部分:第一,生產(chǎn)者生產(chǎn)消息,發(fā)送到中轉(zhuǎn)站;第二,中轉(zhuǎn)站按定義的規(guī)則轉(zhuǎn)發(fā)消息到消息隊(duì)列;第三,消費(fèi)者從消息隊(duì)列獲取消息進(jìn)行消費(fèi)(處理)。

RabbitMQ消息可靠性分析和應(yīng)用

? ? ? ?應(yīng)用代碼均使用C#客戶端代碼實(shí)現(xiàn)。

一、發(fā)布確認(rèn)

? ? ? ?生產(chǎn)者生產(chǎn)消息,發(fā)送到中轉(zhuǎn)站的過(guò)程中,可能會(huì)因?yàn)榫W(wǎng)絡(luò)丟包、網(wǎng)絡(luò)故障等問(wèn)題造成消息丟失。為了確保生產(chǎn)者發(fā)送的消息不會(huì)丟失,RabbitMQ提供了發(fā)布確認(rèn)(Publisher Confirms)機(jī)制,從而提高消息的可靠性(注意:發(fā)布確認(rèn)機(jī)制不能和事務(wù)機(jī)制一起使用)。

? ? ? ?單條消息發(fā)布確認(rèn):

1

2

3

4

5

6

7

8

9

10

channel.ConfirmSelect();//發(fā)布確認(rèn)機(jī)制

string?message =?"msg";

var?body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(

????????exchange:?"MarkTopicChange",

????????routingKey:?"MarkRouteKey.one",

????????basicProperties:?null,

????????body: body

????????);

bool?isPublished = channel.WaitForConfirms();//通道(channel)里消息發(fā)送成功返回true

? ? ? ?使用channel.ConfirmSelect,一旦信道進(jìn)入確認(rèn)模式,所有在該信道上面發(fā)布的消息都會(huì)被指派一個(gè)唯一的ID(從1開(kāi)始)。消息被投遞到所有匹配的隊(duì)列之后,RabbitMQ就會(huì)發(fā)送(Basic.Ack)給生產(chǎn)者(包含消息的唯一ID),生產(chǎn)者從而知道消息發(fā)送成功。

? ? ? ?多條消息發(fā)布確認(rèn):

1

2

3

4

5

6

7

8

9

10

11

12

13

channel.ConfirmSelect();//發(fā)布確認(rèn)機(jī)制

foreach?(var?itemMsg?in?lstMsg)

{

????byte[] sendBytes = Encoding.UTF8.GetBytes(itemMsg);

????//發(fā)布消息

????channel.BasicPublish(

????????exchange:?"MarkTopicChange",

????????routingKey:?"MarkRouteKey.one",

????????basicProperties:?null,

????????body: sendBytes

????????);

}

bool?isAllPublished = channel.WaitForConfirms();//通道(channel)里所有消息均發(fā)送才返回true 

? ? ? ?注意:多消息發(fā)布確認(rèn)機(jī)制情況下,倘若要發(fā)送100條消息,發(fā)送90條后,突然網(wǎng)絡(luò)故障,后面的消息發(fā)送失敗了,那么isAllPublished返回的是false,而前面90條消息已經(jīng)發(fā)送到消息隊(duì)列了。我們還不知道哪些消息是發(fā)送失敗的,所以很多條消息發(fā)布確認(rèn),建議分幾次發(fā)送或多通道發(fā)送。

? ? ? ?此外,需要確保在中轉(zhuǎn)站(Exchange)的消息可以順利到達(dá)消息隊(duì)列。

? ? ? ?(1)首先需要定義匹配的Exchange和Queue,根據(jù)Exchange的類型和routingKey確定轉(zhuǎn)發(fā)的關(guān)系。

? ? ? ?(2)設(shè)置BasicPublish方法中mandatory參數(shù)為true,然后監(jiān)聽(tīng)Exchange中沒(méi)有匹配的隊(duì)列的消息,然后進(jìn)行相操作。

? ? ? ?(3)確保消息隊(duì)列有足夠內(nèi)存存儲(chǔ)消息。

? ? ? ?RabbitMQ默認(rèn)配置vm_memory_high_watermark為0.4。意思是控制消息占40%內(nèi)存左右。vm_memory_high_watermark_paging_ratio為0.5,當(dāng)消息占用內(nèi)存超過(guò)50%,RabbitMQ會(huì)把消息轉(zhuǎn)移到磁盤上以釋放內(nèi)存。當(dāng)磁盤剩余空間小于閥值disk_free_limit(默認(rèn)為50M),所有生產(chǎn)者阻塞,避免充滿磁盤,導(dǎo)致所有的寫操作失敗。

? ? ? ?RabbitMQ配置文件一般在%APPDATA%\RabbitMQ\rabbitmq.config.

? ? ? ?%APPDATA% 一般為 C:\Users\%USERNAME%\AppData\Roaming(Windows環(huán)境)

二、持久化

? ? ? ?消息存放到消息隊(duì)列后,在不配置消息持久化的情況下,若服務(wù)器重啟、關(guān)閉或宕機(jī)等,消息都會(huì)丟失。配置持久化可以有效提高消息的可靠性。持久化需要同時(shí)配置消息持久化和隊(duì)列持久化。單配置消息持久化,隊(duì)列消失了,消息沒(méi)有地方存放;單配置隊(duì)列持久化,隊(duì)列還在,消息沒(méi)了。

? ? ? ?隊(duì)列持久化在定義隊(duì)列時(shí)候配置

1

2

3

4

5

6

7

8

//定義隊(duì)列

channel.QueueDeclare(

????queue:?"Mark_Queue",?//隊(duì)列名稱

????durable:?true,?//隊(duì)列磁盤持久化??????????????????

????exclusive:?false,//是否排他的,false。如果一個(gè)隊(duì)列聲明為排他隊(duì)列,該隊(duì)列首次聲明它的連接可見(jiàn),并在連接斷開(kāi)時(shí)自動(dòng)刪除

????autoDelete:?false,//是否自動(dòng)刪除,一般設(shè)成false

????arguments:?null

????);

  消息持久化在發(fā)布消息時(shí)候配置

1

2

3

4

5

6

7

8

9

10

//消息持久化,把DeliveryMode設(shè)成2

IBasicProperties properties = channel.CreateBasicProperties();

properties.DeliveryMode = 2;

????//發(fā)布消息

????channel.BasicPublish(

????????exchange:?"MarkTopicChange",

????????routingKey:?"MarkRouteKey.one",

????????basicProperties: properties,

????????body: sendBytes

????????);

? ? ? ?如何配置了事務(wù)機(jī)制或發(fā)布確認(rèn)(publisher confirm)機(jī)制,服務(wù)端的返回Basic.Ack是在消息落盤之后執(zhí)行的,進(jìn)一步的提高了消息的可靠性。

? ? ? ?為了防止磁盤損壞帶來(lái)的消息丟失,可以配置鏡像隊(duì)列,這里不作介紹。

三、消費(fèi)確認(rèn)

? ? ? ?為了確保消息被消費(fèi)者消費(fèi),RabbitMQ提供消費(fèi)確認(rèn)模式(consumer Acknowledgements)。自動(dòng)確認(rèn)模式,當(dāng)消費(fèi)者成功接收到消息后,自動(dòng)通知RabbitMQ,把消息隊(duì)列中相應(yīng)消息刪除。這很大程度上滿足不了我們,假如消費(fèi)者接收到消息后,服務(wù)器宕機(jī),消息還沒(méi)處理完成,這樣就會(huì)造成消息丟失。手動(dòng)確認(rèn)模式,當(dāng)消費(fèi)者成功處理完消息后,手動(dòng)發(fā)消息通知RabbitMQ,把消息隊(duì)列中相應(yīng)消息刪除。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

consumer.Received += (model, ea) =>

{

????var?body = ea.Body;

????var?message = Encoding.UTF8.GetString(body);

????var?routingKey = ea.RoutingKey;

????Console.WriteLine(" [x] Received '{0}':'{1}'",

??????????????????????routingKey,

??????????????????????message);

?

//確認(rèn)該消息已被消費(fèi),發(fā)刪除消息給RabbitMQ,把消息隊(duì)列中的消息刪除

channel.BasicAck(ea.DeliveryTag,?false);

//消費(fèi)消息失敗,拒絕此消息,重回隊(duì)列,讓它可以繼續(xù)發(fā)送到其他消費(fèi)者

//channel.BasicReject(ea.DeliveryTag, true);

//消費(fèi)消息失敗,拒絕多條消息,重回隊(duì)列,讓它們可以繼續(xù)發(fā)送到其他消費(fèi)者

//channel.BasicNack(ea.DeliveryTag, true, true);

};

//手動(dòng)確認(rèn)消息,把a(bǔ)utoAck設(shè)成false

channel.BasicConsume(queue:?"Mark_Queue",

?????????????????????autoAck:?false,

?????????????????????consumer: consumer);

? ? ? ?這里值得注意的是,消息處理完成后,一定要把處理完成的消息發(fā)送到RabbitMQ(channel.BasicAck(ea.DeliveryTag, false)),不然RabbitMQ會(huì)一直等待,從而造成內(nèi)存泄露。若處理消息過(guò)程中發(fā)生異常,可以使用channel.BasicReject(ea.DeliveryTag, true)來(lái)拒絕此消息,讓它重回隊(duì)列。若RabbitMQ收不到消費(fèi)者任何確認(rèn)消息的信號(hào)(包括確認(rèn)信號(hào),拒絕信號(hào)燈),直到此消費(fèi)者斷開(kāi)連接,消息才能重回隊(duì)列,繼續(xù)發(fā)送到其他消費(fèi)者。

? ? ? ?提醒一下,假如消費(fèi)者消費(fèi)消息的方法不支持并發(fā)(取決于需求),可以限制消費(fèi)者每次只接收一條消息。

1

channel.BasicQos(0, 1,?false);

轉(zhuǎn)載于:https://my.oschina.net/u/4052893/blog/3006677

總結(jié)

以上是生活随笔為你收集整理的RabbitMQ消息可靠性分析和应用的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。