python 消息队列、异步分布式
一.消息隊列
消息隊列:是在消息的傳輸過程中保存消息的容器。
消息隊列最經典的用法就是消費者和生成者之間通過消息管道來傳遞消息,消費者和生成者是不同的進程。生產者往管道中寫消息,消費者從管道中讀消息。
操作系統(tǒng)提供了很多機制來實現進程間的通信 ,multiprocessing模塊就提供了Queue和Pipe兩種方法來實現。
其中P指producer,即生產者;C指consumer,即消費者。中間的紅色表示消息隊列,實例中表現為HELLO隊列。
1.生產消費實例 Queue 單向進行,即生產者只進行發(fā)消息,消費者只進行收
#導入模塊 from multiprocessing import Queue from threading import Threadimport time#創(chuàng)建生產者 def producer(q):print("start producer")for i in range(10):q.put(i) #發(fā)消息time.sleep(0.5)print("end producer")#創(chuàng)建消費者,消費者一般是個死循環(huán),要一直監(jiān)聽是否有需要處理的信息。 def customer(q):print("start customer")while 1:data = q.get() #收消息print("customer has get value {0}".format(data))if __name__ == '__main__':q = Queue() #創(chuàng)建一個隊列pro = Thread(target=producer,args=(q,))cus = Thread(target=customer,args=(q,))pro.start()cus.start()生產一個則消費一個。
2.通過Mutiprocess里面的Pipe來實現消息隊列:
Pipe方法返回(conn1, conn2)代表一個管道的兩個端。Pipe方法有duplex參數,如果duplex參數為True(默認值),那么這個管道是全雙工模式,也就是說conn1和conn2均可收發(fā)。duplex為False,conn1只負責接受消息,conn2只負責發(fā)送消息。
send和recv方法分別是發(fā)送和接受消息的方法。
close方法表示關閉管道,當消息接受結束以后,關閉管道。
from multiprocessing import Pipe, Process from threading import Threadimport timedef proc1(pipe):for i in range(10):print("send {0}".format(i))pipe.send(i)time.sleep(0.5)print("end proc1")def proc2(pipe):n =10while n:print("proc2 recv {0}".format(pipe.recv()))n -=1if __name__ == '__main__':(p1,p2) = Pipe(duplex=False)pr = Process(target=proc1,args=(p2,))cu = Process(target=proc2,args=(p1,))pr.start()cu.start() ''' send 0 proc2 recv 0 send 1 proc2 recv 1 send 2 proc2 recv 2 send 3 proc2 recv 3 send 4 proc2 recv 4 send 5 proc2 recv 5 send 6 proc2 recv 6 send 7 proc2 recv 7 send 8 proc2 recv 8 send 9 proc2 recv 9 end proc1 '''3.Python提供了Queue模塊來專門實現消息隊列Queue對象
Queue對象實現一個fifo隊列(其他的還有l(wèi)ifo、priority隊列,這里不再介紹)。queue只有maxsize一個構造參數,用來指定隊列容量,指定為0的時候代表容量無限。主要有以下成員函數:
q = Queue(maxsize=0) #指定隊列大小,0表示無限 q.qsize() #返回當前隊列的空間 q.empty() #判斷當前隊列是否為空 q.full() #判斷當前隊列是否滿了 q.put() #發(fā)消息 q.get() #獲取消息 q.task_done() #接受消息的線程條用該函數來說明消息對應的任務是否已經完成 q.join() #等待隊列為空,再執(zhí)行別的操作二.Celery異步分布式
Celery是一個python開發(fā)的異步分布式任務調度模塊。
Celery本身并不提供消息服務,使用第三方服務,也就是borker來傳遞任務,目前支持rebbimq,redis, 數據庫等。
`這里我們使用redis
連接url的格式為:
redis://:password@hostname:port/db_number
例如:
BROKER_URL = ‘redis://localhost:6379/0’
1.安裝celery
pip install celery
pip install redis
在服務器上安裝redis服務器,并啟動redis
第一個簡單的例子:在任意路徑下創(chuàng)建一個文件。
2.啟動worker
#celery -A sixgod worker -l info
3.生產者
4.傳入信息
from sixgod import addre = add.delay(10,20)
5.獲取
運行:
823220ed-abff-45cb-a5f4-42c53c4d33e9Celery異步分布式
Celery 是一個python開發(fā)的異步分布式任務調度模塊,是一個消息傳輸的中間件,可以理解為一個郵箱,每當應用程序調用celery的異步任務時,會向broker(任務隊列)傳遞消息,然后celery的worker從中取消息
Celery 用于存儲消息以及celery執(zhí)行的一些消息和結果
對于brokers,官方推薦是rabbitmq和redis
對于backend,也就是指數據庫,為了簡單一般使用redis
總結
以上是生活随笔為你收集整理的python 消息队列、异步分布式的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python的f-string 格式化字
- 下一篇: Python MongoDB--PyMo