RabbitMQ 入门系列(3)— 生产者消费者 Python 代码实现
生產者消費者代碼示例
上一章節中對消息通信概念做了詳細的說明,本章節我們對 RabbitMQ 生產者和消費者代碼分別做一示例說明。
1. 生產者代碼
#!/usr/bin/env python
# coding=utf-8# producerimport pika# 指定遠程 rabbitmq 的用戶名密碼并創建憑證
credentials = pika.PlainCredentials(username="guest", password="guest")# 1. 創建 connect 連接
connect = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, virtual_host='/', credentials=credentials))# 2. 在 connect 上創建一個 channel
channel = connect.channel()# 3. 在 channel 上聲明交換器 exchange
channel.exchange_declare(exchange='hello', exchange_type='direct', passive=False, durable=True, auto_delete=False)# 4. 聲明一個隊列,生產者和消費者都要聲明一個相同的隊列,用來防止萬一某一方掛了,另一方能正常運行
channel.queue_declare(queue='hello')# 5. 通過鍵 'world' 將隊列和交換器綁定
channel.queue_bind(queue='hello', exchange='hello', routing_key='world')# 6. 創建純文本消息
msg_props = pika.BasicProperties()
msg_props.content_type = 'text/plain'# 7. 將消息發送到 RabbitMQ
message = 'quit'
channel.basic_publish(exchange='hello', routing_key='world', properties=msg_props, body=message)# 8. 關閉通道
channel.close()# 9. 當生產者發送完消息后,可選擇關閉連接
connect.close()
exchange_declare 方法參數詳解:
- exchange : 交換器的名稱;
- exchange_type : 交換器的類型,常見的如 fanout、direct 、topic;
- durable:設置是否持久化。
durable 設置為 true 表示持久化, 反之是非持久化。持久化可以將交換器存盤,在服務器重啟的時候不會丟失相關信息;
- autoDelete : 設置是否自動刪除。
autoDelete 設置為true 則表示自動刪除。自動刪除的前提是至少有一個隊列或者交換器與這個交換器綁定, 之后所有與這個交換器綁定的隊列或者交換器都與此解綁。注意不能錯誤地把這個參數理解為: “當與此交換器連接的客戶端都斷開時, RabbitMQ 會自動刪除本交換器”
- internal:設置是否是內置的。
如果設置為true,則表示是內置的交換器,客戶端程序無法直接發送消息到這個交換器中,只能通過交換器路由到交換器這種方式。
queue_declare 方法參數詳解:
-
queue : 隊列的名稱;
-
durable: 設置是否持久化。為 true 則設置隊列為持久化。持久化的隊列會存盤,在服務器重啟的時候可以保證不丟失相關信息;
-
exclusive : 設置是否排他。為true 則設置隊列為排他的。如果一個隊列被聲明為排他隊列,該隊列僅對首次聲明它的連接可見,并在連接斷開時自動刪除。這里需要注意三點:
- 排他隊列是基于連接( Connection) 可見的,同一個連接的不同信道 (Channel) 是可以同時訪問同一連接創建的排他隊列;
- "首次"是指如果一個連接己經聲明了一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同:
- 即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除,這種隊列適用于一個客戶端同時發送和讀取消息的應用場景;
-
autoDelete: 設置是否自動刪除。
為true 則設置隊列為自動刪除。自動刪除的前提是:至少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,才會自動刪除。不能把這個參數錯誤地理解為:“當連接到此隊列的所有客戶端斷開時,這個隊列自動刪除”,因為生產者客戶端創建這個隊列,或者沒有消費者客戶端與這個隊列連接時,都不會自動刪除這個隊列。
2. 消費者代碼
#!/usr/bin/env python
# coding=utf-8# consumerimport pika# 指定遠程 rabbitmq 的用戶名密碼并創建憑證
credentials = pika.PlainCredentials(username="guest", password="guest")# 1. 創建 connect 連接
connect = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, virtual_host='/', credentials=credentials))# 2. 在 connect 上創建一個 channel
channel = connect.channel()# 3. 在 channel 上聲明交換器 exchange
channel.exchange_declare(exchange='hello', exchange_type='direct', passive=False, durable=True,auto_delete=False)# 4. 聲明一個隊列,生產者和消費者都要聲明一個相同的隊列,用來防止萬一某一方掛了,另一方能正常運行
channel.queue_declare(queue='hello')# 5. 通過鍵 'world' 將隊列和交換器綁定
channel.queue_bind(queue='hello', exchange='hello', routing_key='world')# 6. 定義一個回調函數,用來接收生產者發送的消息
'''在 Python3 中,bytes 和 str 的互相轉換方式是
str.encode('utf-8')
bytes.decode('utf-8')'''def callback(channel, method, properties, body):# 消息確認channel.basic_ack(delivery_tag=method.delivery_tag)if body.decode('utf-8') == "quit":# 停止消費,并退出channel.basic_cancel(consumer_tag='hello-consumer')channel.close()connect.close()else:print("msg is {}".format(body))print("msg type {}".format(type(body)))print("msg eval after type {}".format(type(eval(body))))# 7. 訂閱消費者
channel.basic_consume(callback, queue='hello', no_ack=False)# 8. 開始循環取消息
channel.start_consuming()
queue_bind 方法參數詳解:
- queue: 隊列名稱;
- exchange: 交換器的名稱;
- routingKey: 用來綁定隊列和交換器的路由鍵
3. 消費消息
RabbitMQ 的消費模式分兩種: 推 (Push) 模式和拉 (Pull) 模式。推模式采用 basic_consume 進行消費,而拉模式則是調用 basic_get 進行消費。
basic_get 可以單條地獲取消息,獲取消息完成后就關閉連接,通常不使用該方法。
basic_consume 參數說明:
- callback : 設置消費者的回調函數。用來處理 RabbitMQ 推送過來的消息;
- queue : 隊列的名稱;
- autoAck : 設置是否自動確認。建議設成false ,即不自動確認;
- consumerTag: 消費者標簽,用來區分多個消費者;
- noLocal : 設置為true 則表示不能將同一個Connectio口中生產者發送的消息傳送給這個Connection 中的消費者;
- exclusive : 設置是否排他;
4. 消費端的確認和拒絕
為了保證消息從隊列可靠地達到消費者, RabbitMQ 提供了消息確認機制( message acknowledgement) 。
消費者在訂閱隊列時,可以指定autoAck 參數,當autoAck 等于 false時, RabbitMQ 會等待消費者顯式地回復確認信號后才從內存(或者磁盤)中移去消息(實質上是先打上刪除標記,之后再刪除) 。
當 autoAck 等于 true 時, RabbitMQ 會自動把發送出去的消息置為確認,然后從內存(或者磁盤)中刪除,而不管消費者是否真正地消費到了這些消息。
采用消息確認機制后,只要設置 autoAck 參數為false,消費者就有足夠的時間處理消息(任務),不用擔心處理消息過程中消費者進程掛掉后消息丟失的問題, 因為 RabbitMQ 會一直等待持有消息直到消費者顯式調用 Basic.Ack 命令為止。
當 autoAck 參數置為 false ,對于 RabbitMQ 服務端而言,隊列中的消息分成了兩個部分:
-
等待投遞給消費者的消息:
-
己經投遞給消費者,但是還沒有收到消費者確認信號的消息。
如果 RabbitMQ 一直沒有收到消費者的確認信號,并且消費此消息的消費者己經斷開連接,則 RabbitMQ 會安排該消息重新進入隊列,等待投遞給下一個消費者,當然也有可能還是原來的那個消費者。
RabbitMQ 不會為未確認的消息設置過期時間,它判斷此消息是否需要重新投遞給消費者的唯一依據是消費該消息的消費者連接是否己經斷開,這么設計的原因是 RabbitMQ 允許消費者消費一條消息的時間可以很久很久。
在消費者接收到消息后,如果想明確拒絕當前的消息而不是確認,那么應該怎么做呢?
RabbitMQ 在2.0.0 版本開始引入了Basic.Reject 這個命令,Basic.Reject 命令一次只能拒絕一條消息,如果想要批量拒絕消息,則可以使用 Basic.Nack 這個命令。消費者客戶端可以調用channel.basicNack 方法來實現,
5. 關閉連接
在應用程序使用完之后,需要關閉連接,釋放資源:
channel.close()
conn.close()
顯式地關閉 channel 是個好習慣,但這不是必須的,在 connection 關閉的時候,channel 也會自動關閉。
總結
以上是生活随笔為你收集整理的RabbitMQ 入门系列(3)— 生产者消费者 Python 代码实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RabbitMQ 入门系列(2)— 生产
- 下一篇: RabbitMQ 入门系列(4)— Ra