日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ/pika模块

發布時間:2023/12/10 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ/pika模块 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

簡介

MessageQueue用于解決跨進程、跨線程、跨應用、跨網絡的通信問題。

RabbitMQ使用erlang開發,在windows上使用時要先安裝erlang。

官方的示例比較容易理解,可以點這里去看看。

結構

生產者 ---> exchange ---> queue ---> 消費者。

生產者負責提供消息,exchange負責分發消息到指定queue,queue存儲消息(默認臨時,可設置持久化),消費者接收處理消息。

基本模型

流程:

  • 建立到rabbitmq的連接
  • 建立通道
  • 聲明使用的隊列(生產者和消費者都要聲明,因為不能確定兩者誰先運行)
  • 生產/消費
  • 持續監聽/關閉連接
  • python中使用pika模塊來處理與rabbitmq的連接。

    # 生產者 import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost') ) channel = connection.channel() r = channel.queue_declare(queue='name', exclusive=False, durable=False) # exclusive設置True是隨機生成一個queue名字并返回,durable設置True是隊列持久化 queue_name = r.method.queuechannel.basic_publish(exchange = '', # 使用默認分發器routing_key = queue_name,properties = pika.BasicProperties(delivery_mode = 2 # 這個參數用于設置消息持久化),body = [data] )connection.close()# 消費者 import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost') ) channel = connection.channel() r = channel.queue_declare(queue='name', exclusive=False, durable=False) queue_name = r.method.queuedef callback(channel, method, properties, body):pass# channel.basic_ack(delivery_tag = method.delivery_tag) 在回調函數最后調用手工應答,表示消息處理完畢,queue可以刪除消息了channel.basic_consume(callback, # 這是個回調函數,接收生產者發來的bodyqueue = queue_name,no_ack = True # 設置True表示消息一經獲取就被從queue中刪除,如果這時消費者崩潰,則這條消息將永久丟失,所以去掉這個屬性,在回調函數中手工應答比較安全 )channel.basic_qos(prefetch_count = [num]) # 設置消費者的消費能力,數字越大,則說明該消費者能力越強,往往與設備性能成正比channel.start_consuming() # 阻塞模式獲取消息 # connection.process_data_events() 非阻塞模式獲取消息

    發布訂閱模型

    類似收音機廣播,訂閱者只要打開收音機就能收聽信息,但接收不到它打開之前的消息。

    包括發布訂閱模型以及接下來的一些模型,都是通過exchange和routing_key這兩個屬性來控制的。直接用官網的源碼來做注釋。

    流程:

  • 發布者設置發布頻道
  • 收聽者配置頻道信息
  • 收聽者通過隨機queue綁定頻道接收消息
  • # 發布者 #!/usr/bin/env python import pika import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel()# 創建一個命名exchange,并設置其type為fanout,表示廣播 channel.exchange_declare(exchange='logs',exchange_type='fanout')# 從命令行接收輸入 message = ' '.join(sys.argv[1:]) or "info: Hello World!"# 由于是廣播模式,任意消費者只要設置同樣的exchange,就能以任意queue來接收消息,所以這里routing_key置空 channel.basic_publish(exchange='logs',routing_key='',body=message) print(" [x] Sent %r" % message) connection.close()# 收聽者 #!/usr/bin/env python import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel()# 這里使用同樣的exchange配置,就像調節收音機頻道 channel.exchange_declare(exchange='logs',exchange_type='fanout')# 在基礎模型中提到過,設置exclusive=True表示生成隨機的queue result = channel.queue_declare(exclusive=True) queue_name = result.method.queue# 生成了queue,還要將它與exchange進行綁定,這樣消息才能通過exchange進入queue channel.queue_bind(exchange='logs',queue=queue_name)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r" % body)channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming()

    路由/級別模型

    將消息發送到指定的路由處,類似于logging模塊的分級日志消息。

    主要利用channel.queue_bind(routing_key=[route])這個方法,來為queue增加路由。

    流程:

  • 生產者向指定路由發送消息
  • 消費者綁定路由
  • 根據路由接收到不同的消息
  • # 生產者 #!/usr/bin/env python import pika import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel()# 同樣使用命名exchange,主要是type為direct channel.exchange_declare(exchange='direct_logs',exchange_type='direct')# 將命令行輸入的路由作為接收消息的queue的屬性,只有匹配的才能接收到消息 severity = sys.argv[1] if len(sys.argv) > 2 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs',routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()# 消費者 #!/usr/bin/env python import pika import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel()channel.exchange_declare(exchange='direct_logs',exchange_type='direct')result = channel.queue_declare(exclusive=True) queue_name = result.method.queue# 指定該消費者接收的消息路由 severities = sys.argv[1:] if not severities:sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])sys.exit(1)# 對該消費者的queue綁定路由 for severity in severities:channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming()

    細目模型/更細致的劃分

    這個模型比前幾種更強大,但是原理與路由模型是相同的。

    如果routing_key='#',它就相當于發布訂閱模式,向所有queue發送消息,如果routing_key值中不包含*,#,則相當于路由模型。

    該模型主要是實現了模糊匹配。

    # 生產者 #!/usr/bin/env python import pika import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel()channel.exchange_declare(exchange='topic_logs',exchange_type='topic')routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs',routing_key=routing_key,body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()# 消費者 #!/usr/bin/env python import pika import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel()channel.exchange_declare(exchange='topic_logs',exchange_type='topic')result = channel.queue_declare(exclusive=True) queue_name = result.method.queuebinding_keys = sys.argv[1:] if not binding_keys:sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])sys.exit(1)for binding_key in binding_keys:channel.queue_bind(exchange='topic_logs',queue=queue_name,routing_key=binding_key)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming()

    RPC模型

    前面的幾種模型都只能是一端發消息,另一端接收,RPC模型實現的就是單端收發功能。

    主要是通過兩個隊列實現,一個發,一個收。

    流程:

  • 客戶端發送消息到約定隊列,并且附帶返回隊列的名稱和驗證id
  • 服務器接到消息,將處理過的消息發送給指定隊列并附帶驗證id
  • 客戶端接到消息先驗證id,通過則處理消息
  • # 服務器 #!/usr/bin/env python import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()channel.queue_declare(queue='rpc_queue')def fib(n):if n == 0:return 0elif n == 1:return 1else:return fib(n-1) + fib(n-2)def on_request(ch, method, props, body):n = int(body)print(" [.] fib(%s)" % n)response = fib(n)ch.basic_publish(exchange='',routing_key=props.reply_to,properties=pika.BasicProperties(correlation_id = \props.correlation_id),body=str(response))ch.basic_ack(delivery_tag = method.delivery_tag)channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue='rpc_queue')print(" [x] Awaiting RPC requests") channel.start_consuming()# 客戶端 #!/usr/bin/env python import pika import uuidclass FibonacciRpcClient(object):def __init__(self):self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))self.channel = self.connection.channel()result = self.channel.queue_declare(exclusive=True)self.callback_queue = result.method.queueself.channel.basic_consume(self.on_response, no_ack=True,queue=self.callback_queue)def on_response(self, ch, method, props, body):if self.corr_id == props.correlation_id:self.response = bodydef call(self, n):self.response = Noneself.corr_id = str(uuid.uuid4())self.channel.basic_publish(exchange='',routing_key='rpc_queue',properties=pika.BasicProperties(reply_to = self.callback_queue,correlation_id = self.corr_id,),body=str(n))while self.response is None:self.connection.process_data_events()return int(self.response)fibonacci_rpc = FibonacciRpcClient()print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) print(" [.] Got %r" % response)

    轉載于:https://www.cnblogs.com/ikct2017/p/9434468.html

    總結

    以上是生活随笔為你收集整理的RabbitMQ/pika模块的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。