日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程语言 > python >内容正文

python

python运维开发之第十一天(RabbitMQ,redis)

發(fā)布時(shí)間:2025/7/25 python 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python运维开发之第十一天(RabbitMQ,redis) 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

一、RabbitMQ

python的Queue與RabbitMQ之間的理解:

python的進(jìn)程或線程Queue只能python自己用。RabbitMQ隊(duì)列多個(gè)應(yīng)用之間共享隊(duì)列,互相通信。

1、簡(jiǎn)單的實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者

  生產(chǎn)者

  (1)建立socket連接;(2)聲明一個(gè)管道;(3)聲明隊(duì)列(queue);(4)通過(guò)管道發(fā)消息;(5)routing_key(queue名字);(6)body(內(nèi)容)

  消費(fèi)者

  (1)建立連接;(2)聲明管道;(3)聲明隊(duì)列;(4)消費(fèi)者聲明隊(duì)列(防止生產(chǎn)者后啟動(dòng),消費(fèi)者報(bào)錯(cuò));(5)消費(fèi)消息;(6)callback如果收到消息就調(diào)用函數(shù)處理消息 queue隊(duì)列名字;

#!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : Willpower-chen # @blog: http://www.cnblogs.com/willpower-chen/import pika #建立socket連接 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #聲明一個(gè)管道 channel = connection.channel() #聲明一個(gè)隊(duì)列 channel.queue_declare(queue='hello') #通過(guò)管道發(fā)消息,routing_key 隊(duì)列queue名字 ,body發(fā)送內(nèi)容 channel.basic_publish(exchange='',routing_key='hello',body='Hello World! 1 2') print("[x] send 'Hello World! 1 2 '") connection.close() producer #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : Willpower-chen # @blog: http://www.cnblogs.com/willpower-chen/import pika,time #建立連接 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #聲明一個(gè)管道 channel = connection.channel() #聲明隊(duì)列,防止生產(chǎn)者(發(fā)送端)沒(méi)開(kāi)啟,消費(fèi)者端報(bào)錯(cuò) channel.queue_declare(queue='hello') #ch管道的內(nèi)存對(duì)象地址,如果收到消息就調(diào)用函數(shù)callback,處理消息 def callbak(ch,method,properties,body):print("[x] Received %r " % body)# time.sleep(30) #消費(fèi)消息 channel.basic_consume(callbak,queue='hello',no_ack=True #消息有沒(méi)處理,都不給生產(chǎn)者發(fā)確認(rèn)消息 ) print('[*] Waitting for messages TO exit press ctrl+c') channel.start_consuming() #開(kāi)始 consumer

?2、消費(fèi)者對(duì)生產(chǎn)者,可以1對(duì)多,而且默認(rèn)是輪詢(xún)機(jī)制

no_ack=True如果注釋掉的話,消費(fèi)者端不給服務(wù)器端確認(rèn)收到消息,服務(wù)器端就不會(huì)把要發(fā)的消息從隊(duì)列里清除

如下圖注釋了no_ack,加了一個(gè)時(shí)間,

? ? ?

開(kāi)啟三個(gè)消費(fèi)者,一個(gè)生產(chǎn)者,生產(chǎn)者只send一次數(shù)據(jù),挨個(gè)停止consumer,會(huì)發(fā)現(xiàn)同一條消息會(huì)被重新發(fā)給下一個(gè)consumer,直到producer收到consumer的確認(rèn)收到的消息

?

3、隊(duì)列查詢(xún)

清除隊(duì)列消息

?

4、消息持久化

(1)durable只是隊(duì)列持久化

channel.queue_declare(queue='hello',durable=True)

生產(chǎn)者和消費(fèi)者都需要添加durable=True

(2)要實(shí)現(xiàn)消息持久化,還需要

5、消息(1對(duì)多)實(shí)現(xiàn)權(quán)重功能

消費(fèi)者端添加在消費(fèi)消息之前

channel.basic_qos(prefetch_count=1)

?

6、廣播消息fanout(純廣播)訂閱發(fā)布

#!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : Willpower-chen # @blog: http://www.cnblogs.com/willpower-chen/import pika import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel()channel.exchange_declare(exchange='logs',type='fanout') #message = ' '.join(sys.argv[1:]) or "info: Hello World!" message = "info: Hello World!2"channel.basic_publish(exchange='logs',routing_key='',body=message) print(" [x] Sent %r" % message)connection.close() fanout_producer #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : Willpower-chen # @blog: http://www.cnblogs.com/willpower-chen/ import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel()channel.exchange_declare(exchange='logs',type='fanout') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue print("random queuename",queue_name)channel.queue_bind(exchange='logs',queue=queue_name)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r" % body)channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming() fanout_consumer

7、direct廣播模式(有選擇性的發(fā)送接收消息)

import pika import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel()channel.exchange_declare(exchange='direct_logs',type='direct')severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close() direct_producer #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : Willpower-chen # @blog: http://www.cnblogs.com/willpower-chen/ import pika import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel()channel.exchange_declare(exchange='direct_logs',type='direct')result = channel.queue_declare(exclusive=True) queue_name = result.method.queueseverities = sys.argv[1:] if not severities:sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])sys.exit(1)for severity in severities:channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)print(severities) print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming() direct_consumer

8、更細(xì)致的消息判斷 type = topic

#!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : Willpower-chen # @blog: http://www.cnblogs.com/willpower-chen/import pika import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel()channel.exchange_declare(exchange='topic_logs',type='topic')routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs',routing_key=routing_key,body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close() topic_producer #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : Willpower-chen # @blog: http://www.cnblogs.com/willpower-chen/import pika import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel()channel.exchange_declare(exchange='topic_logs',type='topic')result = channel.queue_declare(exclusive=True) queue_name = result.method.queuebinding_keys = sys.argv[1:] if not binding_keys:sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])sys.exit(1)for binding_key in binding_keys:channel.queue_bind(exchange='topic_logs',queue=queue_name,routing_key=binding_key)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming() topic_consumer

?

?

?

?

?

??

轉(zhuǎn)載于:https://www.cnblogs.com/willpower-chen/p/5977633.html

總結(jié)

以上是生活随笔為你收集整理的python运维开发之第十一天(RabbitMQ,redis)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。