第二百九十二节,RabbitMQ多设备消息队列-Python开发
RabbitMQ多設備消息隊列-Python開發
首先安裝Python開發連接RabbitMQ的API,pika模塊
pika模塊為第三方模塊
?
?對于RabbitMQ來說,生產和消費不再針對內存里的一個Queue對象,而是某臺服務器上的RabbitMQ Server實現的消息隊列。
?
?
生產者消費者一對一
不使用交換機
?
?
生產者主機
pika.PlainCredentials()設置RabbitMQ Server用戶名和密碼
ConnectionParameters()設置ip和端口
BlockingConnection()創建連接,自帶了socket邏輯部分
channel()獲取連接句柄
queue_declare(queue='隊列名稱')向RabbitMQ Server創建一個消息隊列,如果此隊列存在則不創建
basic_publish(exchange='交換機狀態',routing_key='隊列名稱',body='數據類容')向指定隊列里寫入數據
close()關閉連接
生產者執行了7次,可以看到hello隊列里有7條數據
?
消費者主機
callback(ch, method, properties, (body接收隊列里的數據))自定義獲取隊列數據后的回調函數
basic_consume(回調函數名稱,queue='隊列名稱',no_ack=True)在指定隊列里獲取數據,no_ack=True設置獲取到數據后,是否刪除隊列里對應的數據
start_consuming()等待獲取隊列數據
執行后循環獲取了7次,將hello隊列里的7條數據拿了出來
可以看到hello隊列里已經沒有了數據
?
?
?
?
保證數據不丟失介紹
保證消費者數據不丟失
當消費者主機從隊列里拿出數據時basic_consume()方法參數no_ack=True,表示拿出數據后立即刪除隊列里對應的數據,
如果消費者主機拿出數據,隊列也刪除了對應的數據,還沒來得及處理數據,消費者主機死機了,這樣數據就丟了
解決方法:
當消費者主機從隊列里拿出數據時basic_consume()方法參數no_ack=False,表示拿出數據后不刪除隊列里對應的數據
在消費者回調函數里處理數據,當數據處理完成后寫上ch.basic_ack(delivery_tag = method.delivery_tag),表示執行這串代碼后才刪除隊列里對應的數據
#!/usr/bin/env python # -*- coding:utf8 -*-import pika #導入連接操作RabbitMQ Server主機的模塊# ######################### 消費者 ######################### credentials = pika.PlainCredentials('guest', 'guest') #設置RabbitMQ Server用戶名和密碼 parameters = pika.ConnectionParameters('localhost', 5672,'/',credentials) #設置ip和端口 connection = pika.BlockingConnection(parameters) #創建連接,自帶了socket邏輯部分 channel = connection.channel() #獲取連接句柄 channel.queue_declare(queue='helloa',durable=True) #向RabbitMQ Server創建一個消息隊列,queue='hello'設置隊列名稱,如果此隊列存在則不創建def callback(ch, method, properties, body): #定義獲取隊列數據后的回調函數,body接收隊列里的數據內容print(" 你好 %r" % body)ch.basic_ack(delivery_tag=method.delivery_tag) #執行這串代碼后才刪除隊列里對應的數據#在指定隊列里獲取數據 channel.basic_consume(callback, #獲取到數據后執行回調函數queue='helloa', #指定獲取數據的隊列名稱no_ack=False) #設置獲取到數據后,是否刪除隊列里對應的數據,如果回調函數處理數據異常時會都丟失數據#如果要保證數據必須不丟失設置為False不刪除數據,當接執行回調函數里ch.basic_ack(delivery_tag=method.delivery_tag)時才刪除,但是效率不高 channel.start_consuming() #等待獲取隊列數據?
保證生產者數據不丟失
如果RabbitMQ Server主機隊列里有很多數據,此時RabbitMQ Server主機死機了,那么隊列里的數據也就丟了
解決方法:
當生產者向RabbitMQ Server主機隊列投遞數據時,數據同時也在RabbitMQ Server主機硬盤保存一份,那么即使死機重啟后數據也存在
basic_publish(properties=pika.BasicProperties(delivery_mode=2))投遞模式默認為1,修改成2表示投遞的數據在RabbitMQ Server主機硬盤上保存一份,當消費者操作后刪除隊列數據時,也跟隨刪除
queue_declare(durable=True)表示隊列里的數據開啟硬盤保存,注意:如果生產者設置了那么消費者也要設置
?
?
?
消息獲取順序
默認消息隊列里的數據是按照順序被消費者拿走,例如:消費者1 去隊列中獲取 奇數 序列的任務,消費者2去隊列中獲取 偶數 序列的任務。
誰來誰取,不再按照奇偶數排列
在消費者獲取隊列數據方法basic_consume()之前寫一個channel.basic_qos(prefetch_count=1)表示誰來誰取,不再按照奇偶數排列
#!/usr/bin/env python # -*- coding:utf8 -*-import pika #導入連接操作RabbitMQ Server主機的模塊# ######################### 消費者 ######################### credentials = pika.PlainCredentials('guest', 'guest') #設置RabbitMQ Server用戶名和密碼 parameters = pika.ConnectionParameters('localhost', 5672,'/',credentials) #設置ip和端口 connection = pika.BlockingConnection(parameters) #創建連接,自帶了socket邏輯部分 channel = connection.channel() #獲取連接句柄 channel.queue_declare(queue='helloa',durable=True) #向RabbitMQ Server創建一個消息隊列,queue='hello'設置隊列名稱,如果此隊列存在則不創建def callback(ch, method, properties, body): #定義獲取隊列數據后的回調函數,body接收隊列里的數據內容print(" 你好 %r" % body)ch.basic_ack(delivery_tag=method.delivery_tag) #執行這串代碼后才刪除隊列里對應的數據 channel.basic_qos(prefetch_count=1) #表示誰來誰取,不再按照奇偶數排列 #在指定隊列里獲取數據 channel.basic_consume(callback, #獲取到數據后執行回調函數queue='helloa', #指定獲取數據的隊列名稱no_ack=False) #設置獲取到數據后,是否刪除隊列里對應的數據,如果回調函數處理數據異常時會都丟失數據#如果要保證數據必須不丟失設置為False不刪除數據,當接執行回調函數里ch.basic_ack(delivery_tag=method.delivery_tag)時才刪除,但是效率不高 channel.start_consuming() #等待獲取隊列數據?
?
?
exchange交換機工作模型(fanout發布訂閱,direct關鍵字發送,topic模糊匹配)
fanout交換機,發布訂閱模式
發布者發布數據到交換機,交換機將數據分發到所有訂閱者創建的隊列里,每個訂閱者主機都獲取一份數據
列隊只能由訂閱者創建,訂閱者創建的列隊只要綁定了交換機都會獲取到,交換機分發的數據
發布訂閱和簡單的消息隊列區別在于,發布訂閱會將消息發送給所有的訂閱者,而消息隊列中的數據被消費一次便消失。所以,RabbitMQ實現發布和訂閱時,會為每一個訂閱者創建一個隊列,而發布者發布消息時,交換機會將消息放置在所有相關隊列中。
exchange type = fanout
?發布者
#!/usr/bin/env python # -*- coding:utf8 -*-import pika #導入連接操作RabbitMQ Server主機的模塊 import sys# ######################### 發布者 ######################### credentials = pika.PlainCredentials('guest', 'guest') #設置RabbitMQ Server用戶名和密碼 parameters = pika.ConnectionParameters('localhost', 5672,'/',credentials) #設置ip和端口 connection = pika.BlockingConnection(parameters) #創建連接,自帶了socket邏輯部分 channel = connection.channel() #獲取連接句柄 channel.exchange_declare(exchange='logs', #創建交換機,exchange='交換機名稱',type='fanout'交互機工作模式fanout為訂閱模式type='fanout')message = ' '.join(sys.argv[1:]) or "info: Hello World!" #設置向交換機發布的數據內容#向交換機發布內容 channel.basic_publish(exchange='logs', #設置要發布的交換機名稱routing_key='', #隊列名稱為空,因為數據是發布到交換機而不是隊列,所以為空body=message) #設置要發布的數據內容 print("數據內容以發布到交換機")connection.close() #關閉連接訂閱者
#!/usr/bin/env python # -*- coding:utf8 -*-import pika #導入連接操作RabbitMQ Server主機的模塊# ######################### 訂閱者 ######################### credentials = pika.PlainCredentials('guest', 'guest') #設置RabbitMQ Server用戶名和密碼 parameters = pika.ConnectionParameters('localhost', 5672,'/',credentials) #設置ip和端口 connection = pika.BlockingConnection(parameters) #創建連接,自帶了socket邏輯部分 channel = connection.channel() #獲取連接句柄 channel.exchange_declare(exchange='logs', #創建交換機,exchange='交換機名稱',type='fanout'交互機工作模式fanout為訂閱模式type='fanout')result = channel.queue_declare(exclusive=True) #創建專一訂閱消息隊列, queue_name = result.method.queue #隨機生成隊列名稱#將訂閱隊列綁定交換機 channel.queue_bind(exchange='logs', #exchange='要綁定的交換機名稱'queue=queue_name) #queue=要綁定交換機的隊列名稱def callback(ch, method, properties, body): #定義獲取隊列數據后的回調函數,body接收隊列里的數據內容print("%r" % body)#在指定隊列里獲取數據 channel.basic_consume(callback, #獲取到數據后執行回調函數queue=queue_name, #指定獲取數據的隊列名稱no_ack=True) #如果要保證數據必須不丟失設置為False不刪除數據,當接執行回調函數里ch.basic_ack(delivery_tag=method.delivery_tag)時才刪除,但是效率不高 channel.start_consuming() #等待獲取隊列數據#!/usr/bin/env python # -*- coding:utf8 -*-import pika #導入連接操作RabbitMQ Server主機的模塊# ######################### 訂閱者 ######################### credentials = pika.PlainCredentials('guest', 'guest') #設置RabbitMQ Server用戶名和密碼 parameters = pika.ConnectionParameters('localhost', 5672,'/',credentials) #設置ip和端口 connection = pika.BlockingConnection(parameters) #創建連接,自帶了socket邏輯部分 channel = connection.channel() #獲取連接句柄 channel.exchange_declare(exchange='logs', #創建交換機,exchange='交換機名稱',type='fanout'交互機工作模式fanout為訂閱模式type='fanout')result = channel.queue_declare(exclusive=True) #創建專一訂閱消息隊列, queue_name = result.method.queue #隨機生成隊列名稱#將訂閱隊列綁定交換機 channel.queue_bind(exchange='logs', #exchange='要綁定的交換機名稱'queue=queue_name) #queue=要綁定交換機的隊列名稱def callback(ch, method, properties, body): #定義獲取隊列數據后的回調函數,body接收隊列里的數據內容print("%r" % body)#在指定隊列里獲取數據 channel.basic_consume(callback, #獲取到數據后執行回調函數queue=queue_name, #指定獲取數據的隊列名稱no_ack=True) #如果要保證數據必須不丟失設置為False不刪除數據,當接執行回調函數里ch.basic_ack(delivery_tag=method.delivery_tag)時才刪除,但是效率不高 channel.start_consuming() #等待獲取隊列數據
?
?
?
direct交換機,關鍵字發送模式
exchange type = direct
也叫做完全匹配模式
RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,生產者設置關鍵字將數據發送到exchange交換機,exchange交換機根據 消費者列隊設置的關鍵字 判定應該將數據發送至指定隊列。
也就是說消費者列隊設置的關鍵字,和生產者發送數據設置的關鍵字,兩者只要是一樣關鍵字的,消費者主機都會獲取到一份數據
【重點】消費者可以設置多個關鍵字
?
生產者
#!/usr/bin/env python # -*- coding:utf8 -*-import pika #導入連接操作RabbitMQ Server主機的模塊 import sys# ######################### 生產者 ######################### credentials = pika.PlainCredentials('guest', 'guest') #設置RabbitMQ Server用戶名和密碼 parameters = pika.ConnectionParameters('localhost', 5672,'/',credentials) #設置ip和端口 connection = pika.BlockingConnection(parameters) #創建連接,自帶了socket邏輯部分 channel = connection.channel() #獲取連接句柄 channel.exchange_declare(exchange='logs', #創建交換機,exchange='交換機名稱',type='direct'交互機工作模式direct為關鍵字發送模式type='direct')message = ' '.join(sys.argv[1:]) or "info: Hello World!" #設置向交換機發布的數據內容#向交換機發布內容 channel.basic_publish(exchange='logs', #設置要發布的交換機名稱routing_key='severity', #設置信道關鍵字body=message) #設置要發布的數據內容 print("數據內容以發布到交換機")connection.close() #關閉連接消費者,設置一個信道關鍵字
#!/usr/bin/env python # -*- coding:utf8 -*-import pika #導入連接操作RabbitMQ Server主機的模塊# ######################### 消費者 ######################### credentials = pika.PlainCredentials('guest', 'guest') #設置RabbitMQ Server用戶名和密碼 parameters = pika.ConnectionParameters('localhost', 5672,'/',credentials) #設置ip和端口 connection = pika.BlockingConnection(parameters) #創建連接,自帶了socket邏輯部分 channel = connection.channel() #獲取連接句柄 channel.exchange_declare(exchange='logs', #創建交換機,exchange='交換機名稱',type='direct'交互機工作模式direct為關鍵字發送模式type='direct')result = channel.queue_declare(exclusive=True) #創建專一消息隊列, queue_name = result.method.queue #隨機生成隊列名稱#將隊列綁定交換機 channel.queue_bind(exchange='logs', #exchange='要綁定的交換機名稱'queue=queue_name, #queue=要綁定交換機的隊列名稱routing_key='severity',) #設置信道關鍵字def callback(ch, method, properties, body): #定義獲取隊列數據后的回調函數,body接收隊列里的數據內容print("%r" % body)#在指定隊列里獲取數據 channel.basic_consume(callback, #獲取到數據后執行回調函數queue=queue_name, #指定獲取數據的隊列名稱no_ack=True) #如果要保證數據必須不丟失設置為False不刪除數據,當接執行回調函數里ch.basic_ack(delivery_tag=method.delivery_tag)時才刪除,但是效率不高 channel.start_consuming() #等待獲取隊列數據消費者,設置多個信道關鍵字
#!/usr/bin/env python # -*- coding:utf8 -*-import pika #導入連接操作RabbitMQ Server主機的模塊# ######################### 消費者 ######################### credentials = pika.PlainCredentials('guest', 'guest') #設置RabbitMQ Server用戶名和密碼 parameters = pika.ConnectionParameters('localhost', 5672,'/',credentials) #設置ip和端口 connection = pika.BlockingConnection(parameters) #創建連接,自帶了socket邏輯部分 channel = connection.channel() #獲取連接句柄 channel.exchange_declare(exchange='logs', #創建交換機,exchange='交換機名稱',type='direct'交互機工作模式direct為關鍵字發送模式type='direct')result = channel.queue_declare(exclusive=True) #創建專一消息隊列, queue_name = result.method.queue #隨機生成隊列名稱#將隊列綁定交換機 channel.queue_bind(exchange='logs', #exchange='要綁定的交換機名稱'queue=queue_name, #queue=要綁定交換機的隊列名稱routing_key='severity',) #設置信道關鍵字 channel.queue_bind(exchange='logs', #exchange='要綁定的交換機名稱'queue=queue_name, #queue=要綁定交換機的隊列名稱routing_key='severity2',) #設置信道關鍵字 channel.queue_bind(exchange='logs', #exchange='要綁定的交換機名稱'queue=queue_name, #queue=要綁定交換機的隊列名稱routing_key='severity3',) #設置信道關鍵字def callback(ch, method, properties, body): #定義獲取隊列數據后的回調函數,body接收隊列里的數據內容print("%r" % body)#在指定隊列里獲取數據 channel.basic_consume(callback, #獲取到數據后執行回調函數queue=queue_name, #指定獲取數據的隊列名稱no_ack=True) #如果要保證數據必須不丟失設置為False不刪除數據,當接執行回調函數里ch.basic_ack(delivery_tag=method.delivery_tag)時才刪除,但是效率不高 channel.start_consuming() #等待獲取隊列數據?
?
?
?topic交換機,模糊匹配發送模式
exchange type = topic
在topic類型下,可以讓隊列綁定幾個模糊的關鍵字,之后發送者將數據發送到exchange交換機,exchange交換機將傳入的”列隊關鍵字“和 ”生產者關鍵字“進行匹配,匹配成功,則將數據發送到指定隊列。
- # 表示可以匹配 0 個 或 多個 單詞
- * ?表示只能匹配 一個 單詞
例如:
發送者關鍵字 生產者匹配 old.boy.python old.* -- 不匹配 old.boy.python old.# -- 匹配?
生產者
#!/usr/bin/env python # -*- coding:utf8 -*-import pika #導入連接操作RabbitMQ Server主機的模塊 import sys# ######################### 生產者 ######################### credentials = pika.PlainCredentials('guest', 'guest') #設置RabbitMQ Server用戶名和密碼 parameters = pika.ConnectionParameters('localhost', 5672,'/',credentials) #設置ip和端口 connection = pika.BlockingConnection(parameters) #創建連接,自帶了socket邏輯部分 channel = connection.channel() #獲取連接句柄 channel.exchange_declare(exchange='logs', #創建交換機,exchange='交換機名稱',type='topic'交互機工作模式topic為模糊匹配發送模式type='topic')routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' #設置發送關鍵字 message = ' '.join(sys.argv[2:]) or 'Hello World!' #設置發送內容#向交換機發送內容 channel.basic_publish(exchange='logs', #設置要發布的交換機名稱routing_key=routing_key, #設置信道關鍵字:anonymous.infobody=message) #設置要發布的數據內容:Hello World!print(" [x] Sent %r:%r" % (routing_key, message)) #打印出相關內容 connection.close() #關閉連接消費者
#!/usr/bin/env python # -*- coding:utf8 -*-import pika #導入連接操作RabbitMQ Server主機的模塊# ######################### 消費者 ######################### credentials = pika.PlainCredentials('guest', 'guest') #設置RabbitMQ Server用戶名和密碼 parameters = pika.ConnectionParameters('localhost', 5672,'/',credentials) #設置ip和端口 connection = pika.BlockingConnection(parameters) #創建連接,自帶了socket邏輯部分 channel = connection.channel() #獲取連接句柄 channel.exchange_declare(exchange='logs', #創建交換機,exchange='交換機名稱',type='topic'交互機工作模式topic為模糊匹配發送模式type='topic')result = channel.queue_declare(exclusive=True) #創建專一消息隊列, queue_name = result.method.queue #隨機生成隊列名稱#將隊列綁定交換機 channel.queue_bind(exchange='logs', #exchange='要綁定的交換機名稱'queue=queue_name, #queue=要綁定交換機的隊列名稱routing_key='anonymous.*',) #設置信道關鍵字匹配def callback(ch, method, properties, body): #定義獲取隊列數據后的回調函數,body接收隊列里的數據內容print("%r" % body)#在指定隊列里獲取數據 channel.basic_consume(callback, #獲取到數據后執行回調函數queue=queue_name, #指定獲取數據的隊列名稱no_ack=True) #如果要保證數據必須不丟失設置為False不刪除數據,當接執行回調函數里ch.basic_ack(delivery_tag=method.delivery_tag)時才刪除,但是效率不高 channel.start_consuming() #等待獲取隊列數據?
更多教程,可以查看官方教程
http://www.rabbitmq.com/getstarted.html
?
轉載于:https://www.cnblogs.com/adc8868/p/7064389.html
總結
以上是生活随笔為你收集整理的第二百九十二节,RabbitMQ多设备消息队列-Python开发的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 安装elasticsearch5.4.1
- 下一篇: Pycharm 创建 Django ad