celery介紹
Celery 是一個強大的分布式任務隊列,它可以讓任務的執(zhí)行完全脫離主程序,甚至可以被分配到其他主機上運行。我們通常使用它來實現(xiàn)異步任務( async task )和定時任務( crontab )。 異步任務比如是發(fā)送郵件、或者文件上傳, 圖像處理等等一些比較耗時的操作 ,定時任務是需要在特定時間執(zhí)行的任務。它的架構(gòu)組成如下圖:
任務隊列
任務隊列是一種跨線程、跨機器工作的一種機制.
任務隊列中包含稱作任務的工作單元。有專門的工作進程持續(xù)不斷的監(jiān)視任務隊列,并從中獲得新的任務并處理.
任務模塊
包含異步任務和定時任務。其中,異步任務通常在業(yè)務邏輯中被觸發(fā)并發(fā)往任務隊列,而定時任務由 Celery Beat 進程周期性地將任務發(fā)往任務隊列。
消息中間件 Broker
Broker ,即為任務調(diào)度隊列,接收任務生產(chǎn)者發(fā)來的消息(即任務),將任務存入隊列。 Celery 本身不提供隊列服務,官方推薦使用 RabbitMQ 和 Redis 等。
任務執(zhí)行單元 Worker
Worker 是執(zhí)行任務的處理單元,它實時監(jiān)控消息隊列,獲取隊列中調(diào)度的任務,并執(zhí)行它。
任務結(jié)果存儲 Backend
Backend 用于存儲任務的執(zhí)行結(jié)果,以供查詢。同消息中間件一樣,存儲也可使用 RabbitMQ, Redis 和 MongoDB 等。
使用 Celery 實現(xiàn)異步任務的步驟:
(1) 創(chuàng)建一個 Celery 實例
(2) 啟動 Celery Worker ,通過delay() 或 apply_async()(delay 方法封裝了 apply_async, apply_async支持更多的參數(shù) ) 將任務發(fā)布到broker
(3) 應用程序調(diào)用異步任務
(4)存儲結(jié)果 (發(fā)布的任務需要return才會有結(jié)果,否則為空)
Celery Beat:任務調(diào)度器,Beat進程會讀取配置文件的內(nèi)容,周期性地將配置中到期需要執(zhí)行的任務發(fā)送給任務隊列
使用 Celery 實現(xiàn)定時任務的步驟:
(1) 創(chuàng)建一個 Celery 實例
(2) 配置文件中配置任務 ,發(fā)布任務 celery A xxx beat
(3) 啟動 Celery Worker
(4) 存儲結(jié)果
celery定時任務簡單使用
以下是使用celery實現(xiàn)一個定時任務的demo,能夠良好的定時執(zhí)行。
目錄結(jié)構(gòu)如下
shylin@shylin:~/Desktop$ tree celery_task
celery_task
├── celeryconfig.py ? ?# celeryconfig配置文件
├── celeryconfig.pyc
├── celery.py ? # celery對象
├── celery.pyc
├── epp_scripts ? # 任務函數(shù)
│ ? ├── __init__.py
│ ? ├── __init__.pyc
│ ? ├── test1.py
│ ? ├── test1.pyc
│ ? ├── test2.py
│ ? └── test2.pyc
├── __init__.py
└── __init__.pyc
celery配置文件 celeryconfig.py
from __future__ import absolute_import # 拒絕隱式引入,因為celery.py的名字和celery的包名沖突,需要使用這條語句讓程序正確地運行
from celery.schedules import crontabbroker_url = "redis://127.0.0.1:6379/5" ?
result_backend = "redis://127.0.0.1:6379/6"broker_url = "redis://127.0.0.1:6379/2" ? # 使用redis存儲任務隊列
result_backend = "redis://127.0.0.1:6379/6" ?# 使用redis存儲結(jié)果task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = "Asia/Shanghai" ?# 時區(qū)設置
worker_hijack_root_logger = False ?# celery默認開啟自己的日志,可關閉自定義日志,不關閉自定義日志輸出為空
result_expires = 60 * 60 * 24 ?# 存儲結(jié)果過期時間(默認1天)# 導入任務所在文件
imports = ["celery_task.epp_scripts.test1", ?# 導入py文件"celery_task.epp_scripts.test2",
]# 需要執(zhí)行任務的配置
beat_schedule = {"test1": {"task": "celery_task.epp_scripts.test1.celery_run", ?#執(zhí)行的函數(shù)"schedule": crontab(minute="*/1"), ? # every minute 每分鐘執(zhí)行?"args": () ?# # 任務函數(shù)參數(shù)},"test2": {"task": "celery_task.epp_scripts.test2.celery_run","schedule": crontab(minute=0, hour="*/1"), ? # every minute 每小時執(zhí)行"args": ()},}"schedule": crontab()與crontab的語法基本一致
"schedule": crontab(minute="*/10", ?# 每十分鐘執(zhí)行
"schedule": crontab(minute="*/1"), ? # 每分鐘執(zhí)行
"schedule": crontab(minute=0, hour="*/1"), ? ?# 每小時執(zhí)行
celery初始化文件
# coding:utf-8
from __future__ import absolute_import # 拒絕隱式引入,因為celery.py的名字和celery的包名沖突,需要使用這條語句讓程序正確地運行
from celery import Celery# 創(chuàng)建celery應用對象
app = Celery("celery_demo")# 導入celery的配置信息
app.config_from_object("celery_task.celeryconfig")
任務函數(shù)(epp_scripts目錄下)
# test1.py
from celery_task.celery import appdef test11():print("test11----------------")def test22():print("test22--------------")test11()@app.task
def celery_run():test11()test22()if __name__ == '__main__':celery_run()
------------------------------------------------------------
# test2.py
from celery_task.celery import appdef test33():print("test33----------------")# print("------"*50)def test44():print("test44--------------")# print("------" * 50)test33()@app.task
def celery_run():test33()test44()if __name__ == '__main__':celery_run()
發(fā)布任務
# 在celery_task同級目錄下執(zhí)行
shylin@shylin:~/Desktop$ celery -A celery_task beat
celery beat v4.2.0 (windowlicker) is starting.
__ ? ?- ? ?... __ ? - ? ? ? ?_
LocalTime -> 2018-06-29 09:42:02
Configuration ->. broker -> redis://127.0.0.1:6379/5. loader -> celery.loaders.app.AppLoader. scheduler -> celery.beat.PersistentScheduler. db -> celerybeat-schedule. logfile -> [stderr]@%WARNING. maxinterval -> 5.00 minutes (300s)
執(zhí)行任務
# 在celery_task同級目錄下執(zhí)行
shylin@shylin:~/Desktop$ celery -A celery_task worker --loglevel=info-------------- celery@shylin v4.2.0 (windowlicker)
---- **** -----?
--- * *** ?* -- Linux-4.15.0-23-generic-x86_64-with-Ubuntu-18.04-bionic 2018-06-29 12:06:53
-- * - **** ---?
- ** ---------- [config]
- ** ---------- .> app: ? ? ? ? belletone:0x7f5b876f1a10
- ** ---------- .> transport: ? redis://127.0.0.1:6379/5
- ** ---------- .> results: ? ? redis://127.0.0.1:6379/6
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----?-------------- [queues].> celery ? ? ? ? ? exchange=celery(direct) key=celery[tasks]. celery_task.epp_scripts.test1.celery_run. celery_task.epp_scripts.test2.celery_run[2018-06-29 12:06:54,107: INFO/MainProcess] Connected to redis://127.0.0.1:6379/5
[2018-06-29 12:06:54,116: INFO/MainProcess] mingle: searching for neighbors
[2018-06-29 12:06:55,143: INFO/MainProcess] mingle: all alone
[2018-06-29 12:06:55,161: INFO/MainProcess] celery@shylin ready.
[2018-06-29 12:07:00,073: INFO/MainProcess] Received task: celery_task.epp_scripts.test2.celery_run[f4522425-b744-4f1a-8c6c-eb37ab99842b] ?
[2018-06-29 12:07:00,075: INFO/MainProcess] Received task: celery_task.epp_scripts.test1.celery_run[3e00aa9c-0947-49b9-8ee4-cc75d6dc37ab] ?
[2018-06-29 12:07:00,078: WARNING/ForkPoolWorker-6] test33----------------
[2018-06-29 12:07:00,079: WARNING/ForkPoolWorker-6] test44--------------
[2018-06-29 12:07:00,079: WARNING/ForkPoolWorker-6] test33----------------
[2018-06-29 12:07:00,079: WARNING/ForkPoolWorker-4] test11----------------
[2018-06-29 12:07:00,081: WARNING/ForkPoolWorker-4] test22--------------
[2018-06-29 12:07:00,081: WARNING/ForkPoolWorker-4] test11----------------
[2018-06-29 12:07:00,094: INFO/ForkPoolWorker-6] Task celery_task.epp_scripts.test2.celery_run[f4522425-b744-4f1a-8c6c-eb37ab99842b] succeeded in 0.0169868329995s: None
[2018-06-29 12:07:00,094: INFO/ForkPoolWorker-4] Task celery_task.epp_scripts.test1.celery_run[3e00aa9c-0947-49b9-8ee4-cc75d6dc37ab] succeeded in 0.0161407030009s: None
celery相關命令
# 在celery_task同級目錄下執(zhí)行 ? celery worker/beat xxx
celery -A celery_task beat ?# 發(fā)布任務
celery -A celery_task worker --loglevel=info ?# 執(zhí)行任務
celery -B -A celery_task worker --loglevel=info ?# 合并成一條/home/shylin/.virtualenvs/belle/bin/celery -B -A /home/shylin/Desktop/sky_server worker --loglevel=infocommand= /usr/local/thirdparty/sky_server_env/bin/celery ?-B -A celery_task worker
directory=/usr/local/cloud ? # celery_task work不確定是否可行?
# 注意修改broker路徑
# celery_task放在 /usr/local/cloud/
python -m celeryconfig # 檢查配置文件nohup /usr/local/thirdparty/sky_server_env/bin/celery ?-B -A celery_task worker -l info --workdir=/usr/local/cloud/ & ? ? # 啟動命令
定時方式
from celery.schedules import crontab
from datetime import timedelta
......方式一:"schedule": timedelta(seconds=30), # hours=xx,minutes=xx 每小時/每分鐘 ?(此項可以精確到秒)方式二:"schedule": crontab(minute="*/10"), ? # every 10 minutes ?
# 后臺啟動 celery worker進程?
celery multi start work_1 -A appcelery ?
# work_1 為woker的名稱,可以用來進行對該進程進行管理# 多進程相關
celery multi stop WOERNAME ? ? ?# 停止worker進程,有的時候這樣無法停止進程,就需要加上-A 項目名,才可以刪掉
celery multi restart WORKNAME ? ? ? ?# 重啟worker進程# 查看進程數(shù)
celery status -A celery_task ? ? ? # 查看該項目運行的進程數(shù) ? celery_task同級目錄下執(zhí)行完畢后會在當前目錄下產(chǎn)生一個二進制文件,celerybeat-schedule 。
該文件用于存放上次執(zhí)行結(jié)果:1、如果存在celerybeat-schedule文件,那么讀取后根據(jù)上一次執(zhí)行的時間,繼續(xù)執(zhí)行。2、如果不存在celerybeat-schedule文件,那么會立即執(zhí)行一次。3、如果存在celerybeat-schedule文件,讀取后,發(fā)現(xiàn)間隔時間已過,那么會立即執(zhí)行。
————————————————
版權聲明:本文為CSDN博主「Shyllin」的原創(chuàng)文章,遵循 CC 4.0 BY-SA 版權協(xié)議,轉(zhuǎn)載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/Shyllin/article/details/80940643
總結(jié)
以上是生活随笔為你收集整理的celery定时任务简单使用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。