loglevel python 不输出_Python 通过 Celery 框架实现分布式任务队列!
Celery 是一個(gè)簡(jiǎn)單、靈活且可靠的分布式消息處理系統(tǒng),主要用來(lái)作為任務(wù)隊(duì)列對(duì)海量消息數(shù)據(jù)進(jìn)行實(shí)時(shí)的處理,在多個(gè)程序線程或者主機(jī)之間傳遞和分發(fā)工作任務(wù)。同時(shí)也支持計(jì)劃任務(wù)等需求。
一、環(huán)境配置
Celery 框架自身并不對(duì)傳入的消息進(jìn)行存儲(chǔ),因此在使用前需要先安裝第三方的 Message Broker。如 RabbitMQ 和 Redis 等。
安裝 RabbitMQ
對(duì)于 Linux 系統(tǒng),執(zhí)行以下命令:
$ sudo apt-get install rabbitmq-server # 安裝 RabbitMQ $ sudo rabbitmqctl add_user myuser mypassword # 添加用戶 myuser/mypassword $ sudo rabbitmqctl add_vhost myvhost # 添加 vhost $ sudo rabbitmqctl set_user_tags myuser mytag $ sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*" # 為用戶 myuser 設(shè)置訪問(wèn) myvhost 的權(quán)限通過(guò) Docker 安裝的步驟如下:
$ docker pull rabbitmq:3.8-management # 拉取 docker 鏡像(包含 web 管理) # 啟動(dòng) rabbitmq 容器 $ docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=myvhost -e RABBITMQ_DEFAULT_USER=myuser -e RABBITMQ_DEFAULT_PASS=mypassword rabbitmq:3.8-management安裝 Redis
$ sudo apt-get install redis-server
安裝 Celery
$ pip install celery
二、創(chuàng)建 Celery 應(yīng)用
Celery 應(yīng)用是該框架所能提供的所有功能(如管理 tasks 和 workers 等)的入口,須確保它可以被其他模塊導(dǎo)入。
以下是一段簡(jiǎn)單的 Celery app 代碼 tasks.py :
# tasks.py from celery import Celeryapp = Celery('tasks',broker='pyamqp://myuser:mypassword@localhost:5672/myvhost',backend='redis://localhost:6379/0')@app.task def add(x, y):return x + y使用 RabbitMQ 作為 broker 接收和發(fā)送任務(wù)消息,使用 Redis 作為 backend 存儲(chǔ)計(jì)算結(jié)果。
運(yùn)行 Celery worker 服務(wù)
$ celery -A tasks worker --loglevel=info
$ celery -A tasks worker --loglevel=info-------------- celery@skitarniu-ubuntu18 v4.3.0 (rhubarb) ---- **** ----- --- * *** * -- Linux-4.15.0-60-generic-x86_64-with-debian-buster-sid 2019-11-01 07:21:34 -- * - **** --- - ** ---------- [config] - ** ---------- .> app: tasks:0x7f4f30b84a90 - ** ---------- .> transport: amqp://myuser:**@localhost:5672/myvhost - ** ---------- .> results: redis://localhost:6379/0 - *** --- * --- .> concurrency: 2 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ------------------- [queues].> celery exchange=celery(direct) key=celery[tasks]. tasks.add[2019-11-01 07:21:35,316: INFO/MainProcess] Connected to amqp://myuser:**@127.0.0.1:5672/myvhost [2019-11-01 07:21:35,367: INFO/MainProcess] mingle: searching for neighbors [2019-11-01 07:21:36,535: INFO/MainProcess] mingle: all alone [2019-11-01 07:21:36,782: INFO/MainProcess] celery@skitarniu-ubuntu18 ready.任務(wù)測(cè)試
進(jìn)入 Python Shell,執(zhí)行以下命令發(fā)布任務(wù)并獲取結(jié)果:
>>> from tasks import add >>> result = add.delay(4, 4) >>> result <AsyncResult: 6f435bc7-f194-469c-837f-54d77f880ace> >>> result.ready() True >>> result.get() 8 >>> result.traceback >>>delay() 方法用于發(fā)布任務(wù)消息,它是 apply_async() 方法的簡(jiǎn)寫(xiě),即以異步的方式將任務(wù)需求提交給前面啟動(dòng)好的 worker 去處理。 delay() 方法返回一個(gè) AsyncResult 對(duì)象。
result.ready() 方法可以用來(lái)檢查提交的任務(wù)是否已經(jīng)完成,返回布爾值。
result.get() 方法則用于獲取執(zhí)行完成后的結(jié)果。如任務(wù)未完成,則程序會(huì)一直等待直到有結(jié)果返回。因此該方法是 阻塞 的,并不常用。可以傳入 timeout 參數(shù)指定等待的時(shí)間上限。
如 result.get(timeout=1) ,嘗試獲取任務(wù)執(zhí)行后的結(jié)果,等待 1 秒。若 1 秒之后結(jié)果仍未返回,拋出 celery.exceptions.TimeoutError: The operation timed out. 異常。
如果任務(wù)執(zhí)行過(guò)程中有拋出異常,則使用 get() 方法獲取結(jié)果時(shí)會(huì)重新拋出該異常導(dǎo)致程序中斷。可以通過(guò)修改 propagate 參數(shù)避免此情況:
result.get(propagate=False)
result.traceback 則用于獲取任務(wù)的 traceback 信息。
三、Calling Tasks
Celery 定義了一些可供 task 實(shí)例調(diào)用的通用的 Calling API ,包括三個(gè)方法和一些標(biāo)準(zhǔn)的執(zhí)行選項(xiàng):
apply_async(args[, kwargs[, ...]]) delay(*args, **kwargs) calling (__call__)以下是一些常見(jiàn)的調(diào)用示例:
- T.delay(arg, kwarg=value)
- T.apply_async((arg,), {'kwarg': value})
- T.apply_async(countdown=10)
10 秒之后開(kāi)始執(zhí)行某個(gè)任務(wù) - T.apply_async(eta=now + timedelta(seconds=10))
10 秒之后開(kāi)始執(zhí)行某個(gè)任務(wù) - T.apply_async(countdown=60, expires=120)
預(yù)計(jì) 1 分鐘后開(kāi)始執(zhí)行,但 2 分鐘后還未執(zhí)行則失效 - T.apply_async(expires=now + timedelta(days=2))
2 天后失效
通過(guò) countdown 設(shè)置任務(wù)的延遲執(zhí)行:
>>> from tasks import add >>> result = add.apply_async((2, 3)) >>> result.get() 5 >>> delay_result = add.apply_async((2, 3), countdown=15) >>> delay_result.ready() False >>> delay_result.ready() False >>> delay_result.ready() False >>> delay_result.ready() True >>> delay_result.get() 5還可以通過(guò) eta (estimated time of arrival) 設(shè)置延遲執(zhí)行的時(shí)間:
>>> from datetime import datetime, timedelta >>> tomorrow = datetime.utcnow() + timedelta(days=1) >>> add.apply_async((2, 3), eta=tomorrow) <AsyncResult: c7dc6d7f-8b87-49d1-8077-73d7f046d709>此時(shí) worker 在命令行的日志輸出如下:
[2019-11-06 05:16:21,362: INFO/MainProcess] Received task: tasks.add[c7dc6d7f-8b87-49d1-8077-73d7f046d709] ETA:[2019-11-07 05:16:06.652736+00:00]四、計(jì)劃任務(wù)
Celery 允許像使用 crontab 那樣按計(jì)劃地定時(shí)執(zhí)行某個(gè)任務(wù)。參考代碼如下:
# tasks.py from celery import Celeryapp = Celery('tasks',broker='pyamqp://myuser:mypassword@localhost:5672/myvhost',backend='redis://localhost:6379/1')app.conf.beat_schedule = {'add-every-60-seconds': {'task': 'tasks.add','schedule': 60.0,'args': (16, 16)}, } app.conf.timezone = 'UTC'@app.task def add(x, y):print(x + y)運(yùn)行 celery -A tasks worker -B 啟動(dòng) worker 服務(wù)。
-B 選項(xiàng)表示 beat ,即 celery beat 服務(wù),負(fù)責(zé)執(zhí)行計(jì)劃任務(wù)。
輸出如下(每隔一分鐘執(zhí)行一次):
$ celery -A tasks worker -B ... [2019-11-06 05:41:34,057: WARNING/ForkPoolWorker-3] 32 [2019-11-06 05:42:33,998: WARNING/ForkPoolWorker-3] 32 [2019-11-06 05:43:34,056: WARNING/ForkPoolWorker-3] 32 [2019-11-06 05:44:34,105: WARNING/ForkPoolWorker-3] 32 [2019-11-06 05:45:34,157: WARNING/ForkPoolWorker-3] 32 ...同時(shí) Celery 也支持更復(fù)雜的 crontab 類(lèi)型的時(shí)間規(guī)劃:
from celery.schedules import crontabapp.conf.beat_schedule = {# Executes every Monday morning at 7:30 a.m.'add-every-monday-morning': {'task': 'tasks.add','schedule': crontab(hour=7, minute=30, day_of_week=1),'args': (16, 16),}, }Crontab 表達(dá)式支持的語(yǔ)法如下:
ExampleMeaningcrontab()每分鐘執(zhí)行一次crontab(minute=0, hour=0)每天半夜 0 點(diǎn)執(zhí)行crontab(minute=0, hour='*/3')每隔 3 小時(shí)執(zhí)行一次(從 0 時(shí)開(kāi)始)crontab(minute=0, hour='0,3,6,9,12,15,18,21')同上一條crontab(day_of_week='sunday')只在周日?qǐng)?zhí)行,每隔一分鐘執(zhí)行一次crontab(minute='*', hour='*', day_of_week='sun')同上一條crontab(minute='*/10', hour='3,17,22', day_of_week='thu,fri')只在周四、周五的 3、17、22 時(shí)執(zhí)行,每隔 10 分鐘執(zhí)行一次crontab(minute=0, hour='*/2,*/3')只在能被 2 或者 3 整除的整點(diǎn)執(zhí)行crontab(minute=0, hour='*/3,8-17')在能被 3 整除的整點(diǎn),和 8-17 點(diǎn)之間的整點(diǎn)執(zhí)行crontab(0, 0, day_of_month='2')在每個(gè)月的第二天的 0 時(shí)執(zhí)行crontab(0, 0, day_of_month='11', month_of_year='5')在每年的 5 月 11 號(hào) 0 點(diǎn)執(zhí)行
與50位技術(shù)專家面對(duì)面20年技術(shù)見(jiàn)證,附贈(zèng)技術(shù)全景圖總結(jié)
以上是生活随笔為你收集整理的loglevel python 不输出_Python 通过 Celery 框架实现分布式任务队列!的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: gb50268-2008给水排水管道施工
- 下一篇: python datetime time