python 任务调度 celery_python任务调度模块celery(二)
關(guān)于celery的的基礎(chǔ)介紹及安裝使用參見python任務(wù)調(diào)度模塊celery。
多worker和多隊列
首先是多worker和多隊列的原理及流程圖。
一般情況下對于多worker和多隊列的配置文件單獨寫在一個配置文件,方便管理和配置。
定義任務(wù)列表
multique.py1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16from celery import Celery
app = Celery()
app.config_from_object("celeryconfig")
def (x, y):
return x*y
def taskB(x, y, z):
return x+y+z
def add(x, y):
return x+y
配置文件
celeryconfig.py1
2
3
4
5
6
7
8
9
10
11
12
13
14
15from kombu import Queue, Exchange
BROKER_URL = "redis://118.24.18.158:6380/1"
CELERY_RESULT_BACKEND = "redis://118.24.18.158:6380/2"
CELERY_QUEUES = {
Queue("default", Exchange("default"), routing_key="default"),
Queue("for_task_A", Exchange("for_task_A"), routing_key="for_task_A"),
Queue("for_task_B", Exchange("for_task_B"), routing_key="for_task_B")
}
CELERY_ROUTES = {
"multique.taskA": {"queue": "for_task_A", "routing_key": "for_task_A"},
"multique.taskB": {"queue": "for_task_B", "routing_key": "for_task_B"}
}
啟動celery worker監(jiān)聽1
2celery -A multique worker -l=info -n workerA.%h -Q for_task_A
celery -A multique worker -l=info -n workerB.%h -Q for_task_B
調(diào)用任務(wù)
multicelery.py1
2
3
4
5
6
7
8
9
10
11import time
from queue1.multique import *
re1 = taskA.delay(10, 20)
re2 = taskB.delay(100, 200, 300)
re3 = add.delay(1000, 2000)
time.sleep(2)
print(re1.result)
print(re2.result) #輸出結(jié)果:600
print(re3.status) #輸出結(jié)果:PENDING
print(re3.result) #輸出結(jié)果:None
我們看到狀態(tài)是PENDING,表示沒有執(zhí)行,這個是因為沒有celeryconfig.py文件中指定改route到哪一個Queue中,所以會被發(fā)動到默認的名字celery的Queue中,但是我們還沒有啟動worker執(zhí)行celery中的任務(wù)。下面,我們來啟動一個worker來執(zhí)行celery隊列中的任務(wù)。1celery -A multique worker -l info -n worker.%h -Q celery
再次調(diào)用任務(wù),狀態(tài)應(yīng)該為SUCCESS,結(jié)果為3000。
celery定時任務(wù)
celery定時任務(wù),Celery Beat進程通過讀取配置文件的內(nèi)容,周期性的將定時任務(wù)發(fā)往任務(wù)隊列。
以上面多worker的異步任務(wù)為例,配置文件celeryconfig.py 中添加CELERYBEAT_SCHEDULE變量,添加內(nèi)容如下1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18CELERY_TIMEZONE = 'UTC' #指定時區(qū),默認為UTC
CELERYBEAT_SCHEDULE = {
'taskA_schedule': {
'task': 'multique.taskA',
'schedule': 2, #每2s執(zhí)行一次
'args': (5, 6) #傳遞函數(shù)參數(shù)
},
'taskB_scheduler': {
'task': "multique.taskB",
"schedule": 10,
"args":(10, 20, 30)
},
'add_schedule': {
"task": "multique.add",
"schedule": 5,
"args": (1, 2)
}
}
參數(shù)說明task
指定任務(wù)的名字
schedule
設(shè)定任務(wù)的調(diào)度方式(設(shè)定任務(wù)如何重復(fù)執(zhí)行),可以是一個表示秒的整數(shù),也可以是一個 timedelta 對象,或者是一個 crontab 對象
args
任務(wù)的參數(shù)列表
kwargs
任務(wù)的參數(shù)字典
options
所有 apply_async 所支持的參數(shù)
啟動celery worker進程在項目根目錄執(zhí)行命令1celery -A celeryapp worker -l=info #celeryapp為項目文件所在的package名稱
啟動celery beat進程
啟動Celery Beat進程,定時將任務(wù)發(fā)送到Broker,在項目根目錄執(zhí)行下面命令1celery beat -A celeryapp
之后在啟動的worker窗口可以看到任務(wù)定時執(zhí)行的情況。
啟動worker和beat進程也可以放在同一個命令中執(zhí)行1celery -B -A celeryapp worker --loglevel=info
更多celery定時任務(wù)相關(guān)內(nèi)容點擊Periodic Tasks查看官方介紹。
總結(jié)
以上是生活随笔為你收集整理的python 任务调度 celery_python任务调度模块celery(二)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 不刷新页面的tab_SwiftUI小技巧
- 下一篇: 时间控件_Selenium时间控件的处理