python 消息队列如何接收处理_python使用消息队列RabbitMq(进阶)
importpika
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()#聲明queue
channel.queue_declare(queue='hello')#RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')print("[x] Sent 'Hello World!'")
connection.close()
發送
__author__ = 'hardy'
importpika
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()#You may ask why we declare the queue again ? we have already declared it in our previous code.#We could avoid that if we were sure that the queue already exists. For example if send.py program#was run before. But we're not yet sure which program to run first. In such cases it's a good#practice to repeat declaring the queue in both programs.
channel.queue_declare(queue='hello')defcallback(ch, method, properties, body):print("[x] Received %r" %body)
channel.basic_consume(callback,
queue='hello',
no_ack=True)print('[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
接收
消息隊列的發送端流程
1、連接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
2、聲明queue
channel.queue_declare(queue='hello')
隊列持久化
channel.queue_declare(queue='hello', durable=True)
3、發送消息
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
消息持久化(必須隊列持久化)
channel.basic_publish(exchange='',
routing_key="hello",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
4、關閉
connection.close()
消息隊列接收端流程
1、連接
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
2、聲明queue
channel.queue_declare(queue='hello')
3、創建回調函數(處理數據)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
4、設置
channel.basic_consume(callback,
queue='hello',
no_ack=True)
5、開始接收數據
channel.start_consuming()
6、確認消息被消費
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback,
queue='task_queue',
no_ack=True #no_ack=True消息不需要確認,默認no_ack=false,消息需要確認
)
總結
以上是生活随笔為你收集整理的python 消息队列如何接收处理_python使用消息队列RabbitMq(进阶)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mysql base64 乱码_PHP
- 下一篇: python多线程下载ts_基于Pyth