python elif可以单独使用_Celery在python中的单独使用
簡單使用:
1.目錄結(jié)構(gòu)
-app_task.py
-worker.py
-result.py
2.在需要進(jìn)行異步執(zhí)行的文件app_task.py中導(dǎo)入celery,并實(shí)例化出一個(gè)對(duì)象,傳入消息中間和數(shù)據(jù)存儲(chǔ)配置參數(shù)
broker = 'redis://127.0.0.1:6379/1' #使用redis第一個(gè)庫
backend = 'redis://127.0.0.1:6379/2' #使用redis第二個(gè)庫
cel = celery.Celery('test',broker=broker,backend=backend)
3.在需要進(jìn)行異步執(zhí)行的函數(shù)上添加裝飾器
@cle.taskdefadd(x,y):return x+y
4.新建文件worker.py用來添加任務(wù)
from app_task importadd
result= add.delay(4,2)print(result) #此結(jié)果不是add執(zhí)行結(jié)果,而是放入消息隊(duì)列中的一個(gè)任務(wù)id
5.新建文件result.py用來接收結(jié)果
from celery.result importAsyncResultfrom app_task importcel#2e76b24d-364f-46b7-a0eb-f69f663dfb0d
async1 = AsyncResult(id="56db7832-d0f4-4b9b-ba92-4fb08d0f9592", app=cel) #id處放置消息隊(duì)列中任務(wù)的id
if async1.successful(): #判斷執(zhí)行任務(wù)成功
result =async1.get()print(result)#result.forget() # 將結(jié)果刪除
elifasync1.failed():print('執(zhí)行失敗')elif async1.status == 'PENDING':print('任務(wù)等待中被執(zhí)行')elif async1.status == 'RETRY':print('任務(wù)異常后正在重試')elif async1.status == 'STARTED':print('任務(wù)已經(jīng)開始被執(zhí)行')
6.添加任務(wù)+啟動(dòng)工人+查看結(jié)果
1.運(yùn)行worker.py
2.終端中輸入命令:celery worker -A app_task -l info -P eventlet # 其中app_task為配置參數(shù)的文件名
3.運(yùn)行result.py查看結(jié)果
多任務(wù)使用:
1.目錄結(jié)構(gòu),將簡單使用中的app_task改為一個(gè)celery_task文件夾,其中放至少兩個(gè)文件:celery.py(名字固定)/task1.py(方具體任務(wù)的文件)/根據(jù)需求放置其他任務(wù)文件
-celery_task
-celery.py # 配置參數(shù)
-task1.py # 任務(wù)文件,根據(jù)需求添加
-task2.py
...
-worker.py # 添加任務(wù)
-result.py # 查看結(jié)果
2.在任務(wù)文件task1.py中導(dǎo)入cel,并以裝飾器形式加在函數(shù)上
from .celery importcel
@cel.taskdefmul(x,y):return x*y
3.celery.py中配置參數(shù)
from celery importCelery
cel=Celery('celery_demo',
backend= 'redis://127.0.0.1:6379/1',
broker= 'redis://127.0.0.1:6379/2',
include=['celery_task.task1','celery_task.task2']
)
cel.conf.timezone= 'Asia/Shanghai'cel.conf.enable_utc= False
4.worker.py和簡單使用中的相同
5.result.py與簡單使用中的相同
6.添加任務(wù)+啟動(dòng)工人+查看結(jié)果
1.運(yùn)行worker.py
2.終端中輸入命令:celery worker -A celery_task -l info -P eventlet? ? # 其中celery_task為文件夾名
3.運(yùn)行result.py查看結(jié)果
延時(shí)任務(wù):
1.目錄結(jié)構(gòu)
-celery_task
-celery.py
-task1.py
-task2.py
...
-worker.py
-result.py
2.task1.py與多任務(wù)相同
3.celery.py與多任務(wù)相同
4.worker.py中將傳入的時(shí)間改為指定時(shí)間
ctime = datetime.now() #當(dāng)前時(shí)間
utc_time = datetime.utcfromtimestamp(ctime.timestamp()) #轉(zhuǎn)成本地時(shí)間
time_delta = timedelta(seconds=30) #設(shè)置延時(shí)30s
task_time = utc_time+time_delta #設(shè)定時(shí)間點(diǎn)為30s后
ret1 = add.apply_async(args=[2,3],eta=task_time)
5.result.py與簡單使用中的相同
6.與多任務(wù)相同
定時(shí)任務(wù):
1.目錄結(jié)構(gòu),去掉worker.py,添加任務(wù)變?yōu)樵诮K端輸入命令
-celery.py
-celery.py
-task1.py
-task2.py
...
-result.py
2.task1.py與多任務(wù)相同
3.celery.py中與定時(shí)任務(wù)相比,添加下面參數(shù)
1.設(shè)定時(shí)間間隔,循環(huán)執(zhí)行
cel.conf.beat_schedule ={'add-every-10-seconds':{ #名字隨意起
'task': 'celery_task.task1.mul', #指定要執(zhí)行的任務(wù)
'schedule': timedelta(seconds=2), #指定時(shí)間間隔
'args': (2,3) #傳入?yún)?shù)
}
}
2.指定年月日具體時(shí)間,循環(huán)執(zhí)行
from celery.schedules importcrontab
cel.conf.beat_schedule={'add-every-10-seconds':{'task': 'celery_task.task1.mul','schedule': crontab(minute=28,hour=2,day_of_month=23,month_of_year=6),'args': (6,3)
}
}
4.沒有worker.py
5.result.py與簡單使用中的相同
6.添加任務(wù)+啟動(dòng)工人+查看結(jié)果
1.啟動(dòng)一個(gè)終端,輸入:celery beat -A celery_task -l info # celery_task為文件夾名
2.再啟動(dòng)一個(gè)終端,輸入:celery worker -A celery_task -l info -P eventlet
3.運(yùn)行result.py查看結(jié)果
總結(jié)
以上是生活随笔為你收集整理的python elif可以单独使用_Celery在python中的单独使用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 服务器采购框架合同协议书范本,手写一个满
- 下一篇: python模型的属性是什么_pytho