日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

python 消息队列、异步分布式

發(fā)布時間:2024/4/13 python 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python 消息队列、异步分布式 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一.消息隊列

消息隊列:是在消息的傳輸過程中保存消息的容器。
消息隊列最經典的用法就是消費者和生成者之間通過消息管道來傳遞消息,消費者和生成者是不同的進程。生產者往管道中寫消息,消費者從管道中讀消息。
操作系統(tǒng)提供了很多機制來實現進程間的通信 ,multiprocessing模塊就提供了Queue和Pipe兩種方法來實現。

其中P指producer,即生產者;C指consumer,即消費者。中間的紅色表示消息隊列,實例中表現為HELLO隊列。

1.生產消費實例 Queue 單向進行,即生產者只進行發(fā)消息,消費者只進行收

#導入模塊 from multiprocessing import Queue from threading import Threadimport time#創(chuàng)建生產者 def producer(q):print("start producer")for i in range(10):q.put(i) #發(fā)消息time.sleep(0.5)print("end producer")#創(chuàng)建消費者,消費者一般是個死循環(huán),要一直監(jiān)聽是否有需要處理的信息。 def customer(q):print("start customer")while 1:data = q.get() #收消息print("customer has get value {0}".format(data))if __name__ == '__main__':q = Queue() #創(chuàng)建一個隊列pro = Thread(target=producer,args=(q,))cus = Thread(target=customer,args=(q,))pro.start()cus.start()

生產一個則消費一個。

2.通過Mutiprocess里面的Pipe來實現消息隊列:

Pipe方法返回(conn1, conn2)代表一個管道的兩個端。Pipe方法有duplex參數,如果duplex參數為True(默認值),那么這個管道是全雙工模式,也就是說conn1和conn2均可收發(fā)。duplex為False,conn1只負責接受消息,conn2只負責發(fā)送消息。

send和recv方法分別是發(fā)送和接受消息的方法。

close方法表示關閉管道,當消息接受結束以后,關閉管道。

from multiprocessing import Pipe, Process from threading import Threadimport timedef proc1(pipe):for i in range(10):print("send {0}".format(i))pipe.send(i)time.sleep(0.5)print("end proc1")def proc2(pipe):n =10while n:print("proc2 recv {0}".format(pipe.recv()))n -=1if __name__ == '__main__':(p1,p2) = Pipe(duplex=False)pr = Process(target=proc1,args=(p2,))cu = Process(target=proc2,args=(p1,))pr.start()cu.start() ''' send 0 proc2 recv 0 send 1 proc2 recv 1 send 2 proc2 recv 2 send 3 proc2 recv 3 send 4 proc2 recv 4 send 5 proc2 recv 5 send 6 proc2 recv 6 send 7 proc2 recv 7 send 8 proc2 recv 8 send 9 proc2 recv 9 end proc1 '''

3.Python提供了Queue模塊來專門實現消息隊列Queue對象

Queue對象實現一個fifo隊列(其他的還有l(wèi)ifo、priority隊列,這里不再介紹)。queue只有maxsize一個構造參數,用來指定隊列容量,指定為0的時候代表容量無限。主要有以下成員函數:

q = Queue(maxsize=0) #指定隊列大小,0表示無限 q.qsize() #返回當前隊列的空間 q.empty() #判斷當前隊列是否為空 q.full() #判斷當前隊列是否滿了 q.put() #發(fā)消息 q.get() #獲取消息 q.task_done() #接受消息的線程條用該函數來說明消息對應的任務是否已經完成 q.join() #等待隊列為空,再執(zhí)行別的操作

二.Celery異步分布式

Celery是一個python開發(fā)的異步分布式任務調度模塊。
Celery本身并不提供消息服務,使用第三方服務,也就是borker來傳遞任務,目前支持rebbimq,redis, 數據庫等。

`這里我們使用redis
連接url的格式為:
redis://:password@hostname:port/db_number
例如:
BROKER_URL = ‘redis://localhost:6379/0’

1.安裝celery
pip install celery
pip install redis

在服務器上安裝redis服務器,并啟動redis
第一個簡單的例子:在任意路徑下創(chuàng)建一個文件。

vim sixgod.py#/usr/bin/env #coding=utf-8 from celery import Celery,platforms platforms.C_FORCE_ROOT = Truebroker = "redis://localhost:6379/5" backend = "redis://localhost:6379/6" app = Celery("sixgod",broker=broker,backend=backend)@app.task def add(x,y):return x+y

2.啟動worker
#celery -A sixgod worker -l info

3.生產者

4.傳入信息

from sixgod import addre = add.delay(10,20)


5.獲取

re.result #獲取結果 re.ready) #是否處理 re.get #獲取結果re.id #獲取id from sixgod import addre = add.delay(100,200) print(re.id) #獲取id

運行:

823220ed-abff-45cb-a5f4-42c53c4d33e9

Celery異步分布式

Celery 是一個python開發(fā)的異步分布式任務調度模塊,是一個消息傳輸的中間件,可以理解為一個郵箱,每當應用程序調用celery的異步任務時,會向broker(任務隊列)傳遞消息,然后celery的worker從中取消息

Celery 用于存儲消息以及celery執(zhí)行的一些消息和結果

對于brokers,官方推薦是rabbitmq和redis

對于backend,也就是指數據庫,為了簡單一般使用redis

總結

以上是生活随笔為你收集整理的python 消息队列、异步分布式的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。