簡介
MessageQueue用于解決跨進程、跨線程、跨應用、跨網絡的通信問題。
RabbitMQ使用erlang開發,在windows上使用時要先安裝erlang。
官方的示例比較容易理解,可以點這里去看看。
結構
生產者 ---> exchange ---> queue ---> 消費者。
生產者負責提供消息,exchange負責分發消息到指定queue,queue存儲消息(默認臨時,可設置持久化),消費者接收處理消息。
基本模型
流程:
建立到rabbitmq的連接建立通道聲明使用的隊列(生產者和消費者都要聲明,因為不能確定兩者誰先運行)生產/消費持續監聽/關閉連接python中使用pika模塊來處理與rabbitmq的連接。
# 生產者
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
r = channel.queue_declare(queue='name', exclusive=False, durable=False) # exclusive設置True是隨機生成一個queue名字并返回,durable設置True是隊列持久化
queue_name = r.method.queuechannel.basic_publish(exchange = '', # 使用默認分發器routing_key = queue_name,properties = pika.BasicProperties(delivery_mode = 2 # 這個參數用于設置消息持久化),body = [data]
)connection.close()# 消費者
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
r = channel.queue_declare(queue='name', exclusive=False, durable=False)
queue_name = r.method.queuedef callback(channel, method, properties, body):pass# channel.basic_ack(delivery_tag = method.delivery_tag) 在回調函數最后調用手工應答,表示消息處理完畢,queue可以刪除消息了channel.basic_consume(callback, # 這是個回調函數,接收生產者發來的bodyqueue = queue_name,no_ack = True # 設置True表示消息一經獲取就被從queue中刪除,如果這時消費者崩潰,則這條消息將永久丟失,所以去掉這個屬性,在回調函數中手工應答比較安全
)channel.basic_qos(prefetch_count = [num]) # 設置消費者的消費能力,數字越大,則說明該消費者能力越強,往往與設備性能成正比channel.start_consuming() # 阻塞模式獲取消息
# connection.process_data_events() 非阻塞模式獲取消息
發布訂閱模型
類似收音機廣播,訂閱者只要打開收音機就能收聽信息,但接收不到它打開之前的消息。
包括發布訂閱模型以及接下來的一些模型,都是通過exchange和routing_key這兩個屬性來控制的。直接用官網的源碼來做注釋。
流程:
發布者設置發布頻道收聽者配置頻道信息收聽者通過隨機queue綁定頻道接收消息# 發布者
#!/usr/bin/env python
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()# 創建一個命名exchange,并設置其type為fanout,表示廣播
channel.exchange_declare(exchange='logs',exchange_type='fanout')# 從命令行接收輸入
message = ' '.join(sys.argv[1:]) or "info: Hello World!"# 由于是廣播模式,任意消費者只要設置同樣的exchange,就能以任意queue來接收消息,所以這里routing_key置空
channel.basic_publish(exchange='logs',routing_key='',body=message)
print(" [x] Sent %r" % message)
connection.close()# 收聽者
#!/usr/bin/env python
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()# 這里使用同樣的exchange配置,就像調節收音機頻道
channel.exchange_declare(exchange='logs',exchange_type='fanout')# 在基礎模型中提到過,設置exclusive=True表示生成隨機的queue
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue# 生成了queue,還要將它與exchange進行綁定,這樣消息才能通過exchange進入queue
channel.queue_bind(exchange='logs',queue=queue_name)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r" % body)channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming()
路由/級別模型
將消息發送到指定的路由處,類似于logging模塊的分級日志消息。
主要利用channel.queue_bind(routing_key=[route])這個方法,來為queue增加路由。
流程:
生產者向指定路由發送消息消費者綁定路由根據路由接收到不同的消息# 生產者
#!/usr/bin/env python
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()# 同樣使用命名exchange,主要是type為direct
channel.exchange_declare(exchange='direct_logs',exchange_type='direct')# 將命令行輸入的路由作為接收消息的queue的屬性,只有匹配的才能接收到消息
severity = sys.argv[1] if len(sys.argv) > 2 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()# 消費者
#!/usr/bin/env python
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.exchange_declare(exchange='direct_logs',exchange_type='direct')result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue# 指定該消費者接收的消息路由
severities = sys.argv[1:]
if not severities:sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])sys.exit(1)# 對該消費者的queue綁定路由
for severity in severities:channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming()
細目模型/更細致的劃分
這個模型比前幾種更強大,但是原理與路由模型是相同的。
如果routing_key='#',它就相當于發布訂閱模式,向所有queue發送消息,如果routing_key值中不包含*,#,則相當于路由模型。
該模型主要是實現了模糊匹配。
# 生產者
#!/usr/bin/env python
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.exchange_declare(exchange='topic_logs',exchange_type='topic')routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',routing_key=routing_key,body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()# 消費者
#!/usr/bin/env python
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.exchange_declare(exchange='topic_logs',exchange_type='topic')result = channel.queue_declare(exclusive=True)
queue_name = result.method.queuebinding_keys = sys.argv[1:]
if not binding_keys:sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])sys.exit(1)for binding_key in binding_keys:channel.queue_bind(exchange='topic_logs',queue=queue_name,routing_key=binding_key)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming()
RPC模型
前面的幾種模型都只能是一端發消息,另一端接收,RPC模型實現的就是單端收發功能。
主要是通過兩個隊列實現,一個發,一個收。
流程:
客戶端發送消息到約定隊列,并且附帶返回隊列的名稱和驗證id服務器接到消息,將處理過的消息發送給指定隊列并附帶驗證id客戶端接到消息先驗證id,通過則處理消息# 服務器
#!/usr/bin/env python
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()channel.queue_declare(queue='rpc_queue')def fib(n):if n == 0:return 0elif n == 1:return 1else:return fib(n-1) + fib(n-2)def on_request(ch, method, props, body):n = int(body)print(" [.] fib(%s)" % n)response = fib(n)ch.basic_publish(exchange='',routing_key=props.reply_to,properties=pika.BasicProperties(correlation_id = \props.correlation_id),body=str(response))ch.basic_ack(delivery_tag = method.delivery_tag)channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')print(" [x] Awaiting RPC requests")
channel.start_consuming()# 客戶端
#!/usr/bin/env python
import pika
import uuidclass FibonacciRpcClient(object):def __init__(self):self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))self.channel = self.connection.channel()result = self.channel.queue_declare(exclusive=True)self.callback_queue = result.method.queueself.channel.basic_consume(self.on_response, no_ack=True,queue=self.callback_queue)def on_response(self, ch, method, props, body):if self.corr_id == props.correlation_id:self.response = bodydef call(self, n):self.response = Noneself.corr_id = str(uuid.uuid4())self.channel.basic_publish(exchange='',routing_key='rpc_queue',properties=pika.BasicProperties(reply_to = self.callback_queue,correlation_id = self.corr_id,),body=str(n))while self.response is None:self.connection.process_data_events()return int(self.response)fibonacci_rpc = FibonacciRpcClient()print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
轉載于:https://www.cnblogs.com/ikct2017/p/9434468.html
總結
以上是生活随笔為你收集整理的RabbitMQ/pika模块的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。