日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ 队列消息持久化

發布時間:2023/12/20 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ 队列消息持久化 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

參考鏈接:

https://www.cnblogs.com/Keep-Ambition/p/8044752.html

?假如消息隊列test里面還有消息等待消費者(consumers)去接收,但是這個時候服務器端宕機了,這個時候消息是否還在?

?1、隊列消息非持久化

服務端(producer):

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import?pika # 聲明一個socket 實例 connect?=?pika.BlockingConnection(pika.ConnectionParameters("localhost")) # 聲明一個管道 channel?=?connect.channel() # 聲明queue名稱為test channel.queue_declare(queue="test") #RabbitMQ的消息永遠不會被直接發送到隊列中,它總是需要經過一次交換 channel.basic_publish(exchange='', ??????????????????????routing_key="test", ??????????????????????body="hello word") print("Sent 'hello world'") connect.close()

?

客戶端(consumers):

+ View Code

上面的服務端和客戶端聲明queue的方式都是非持久的

channel.queue_declare(queue="test") 

①服務端先發送往test隊列里發送兩條消息

②通過運行--services.msc進入服務重新啟動RabbitMQ

?

③再次查看消息隊列queue中的消息數量

?

通過小實驗可以看出,非持久聲明的queue,在服務端宕機后,消息隊列queue和消息都不復存在了

?

2、隊列消息持久化:

①隊列持久化很簡單,只需要在服務端(produce)聲明queue的時候添加一個參數:

channel.queue_declare(queue='shuaigaogao', durable=True)? # durable=True 持久化

②僅僅持久化隊列是沒有意義的,還需要多消息進行持久化

channel.basic_publish(exchange="", ??????????????????????routing_key="shuaigaogao",? #queue的名字 ??????????????????????body="hello world",?? #body是要發送的內容 ??????????????????????properties=pika.BasicProperties(delivery_mode=2,) # make message persistent=>使消息持久化的特性 ??????????????????????)

③最后一步,在服務端隊列消息都持久化了之后需要在客戶端聲明queue的時候也持久化

1 channel.queue_declare(queue='shuaigaogao', durable=True)

這樣就算再傳遞消息過程中,服務端的發生宕機,消息和隊列也不會丟失

小結:

  • RabbitMQ在服務端沒有聲明隊列和消息持久化時,隊列和消息是存在內存中的,服務端宕機了,隊列和消息也不會保留。
  • 服務端聲明持久化,客戶端想接受消息的話,必須也要聲明queue時,也要聲明持久化,不然的話,客戶端執行會報錯。
  • ?以上兩句是整篇文章的重中之重!!!

    ?

    RabbitMQ 消息公平分發?

    如果Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,很可能出現,一個機器配置不高的消費者那里堆積了很多消息處理不完,同時配置高的消費者卻一直很輕松。為解決此問題,可以在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。

    ?

    channel.basic_qos(prefetch_count=1)

    通俗的講就是消費者有多大本事,就干多少活,消費者處理的越慢,其消息分配分發的就少,反之消費者消息處理的多,處理的快,就可以多向這個消費者分配一些消息。服務端給客戶端發消息的時候,先檢查一下,這個消費者現在還有多少消息,如果處理的消息超過1條,就不給這個消費者發送消息了

    ?

    隊列消息持久化+公平分發示列:

    服務端:

    import?pika # 聲明一個socket 實例 connect =?pika.BlockingConnection(pika.ConnectionParameters("localhost")) # 聲明一個管道 channel =?connect.channel() # 聲明queue名稱為test channel.queue_declare(queue="test", durable=True)? # 隊列持久化 #RabbitMQ的消息永遠不會被直接發送到隊列中,它總是需要經過一次交換 channel.basic_publish(exchange='', ??????????????????????routing_key="test", ??????????????????????body="hello word", ??????????????????????properties=pika.BasicProperties(delivery_mode=2,))? # 消息持久化 print("Sent 'hello world'") connect.close()

    ?

    客戶端:

    import?pika import?time # 聲明socket實例 connect =?pika.BlockingConnection(pika.ConnectionParameters("localhost")) # 聲明一個管道? 雖然在之前的produce代碼中聲明過一次管道, # 但是在不知道produce中的管道是否運行之前(如果未運行,consumers中也不聲明的話就會報錯), # 在consumers中也聲明一次是一種正確的做法 channel =?connect.channel() #聲明queue channel.queue_declare(queue="test", durable=True) #回調函數 def?callback(ch, method, properites, body): ????time.sleep(30) ????print("-----", ch, method, properites, body) ????print("Received %r"?%?body) ????ch.basic_ack(delivery_tag=method.delivery_tag)? # 手動確認收到消息,添加手動確認時,no_ack必須為False,不然就會報錯 channel.basic_qos(prefetch_count=1)? # 在消息消費之前加上消息處理配置 channel.basic_consume(callback, ??????????????????????queue="test", ??????????????????????no_ack=False) print("Waiting for messages") #這個start只要一啟動,就一直運行,它不止收一條,而是永遠收下去,沒有消息就在這邊卡住 channel.start_consuming()

    總結

    以上是生活随笔為你收集整理的RabbitMQ 队列消息持久化的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。