日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Celery的简单使用

發布時間:2025/3/21 编程问答 51 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Celery的简单使用 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

    • celery簡單使用
    • Celery執行異步任務
    • 多任務結構
    • Celery執行定時任務
    • 類似于contab的定時任務
    • Django中使用Celery

celery簡單使用

安裝celery

pip install celery消息中間件:RabbitMQ/Redisapp=Celery('任務名',backend='xxx',broker='xxx')

Celery執行異步任務

基本使用

創建項目celerytest

  • 創建py文件:celery_app_task.py
import celery import time # broker='redis://127.0.0.1:6379/2' 不加密碼 backend='redis://:123456@127.0.0.1:6379/1' broker='redis://:123456@127.0.0.1:6379/2' cel=celery.Celery('test',backend=backend,broker=broker) @cel.task def add(x,y):return x+y
  • 創建py文件:add_task.py,添加任務
from celery_app_task import add result = add.delay(4,5) print(result.id)
  • 創建py文件:run.py,執行任務,或者使用命令執行:celery worker -A celery_app_task -l info

注:windows下:celery worker -A celery_app_task -l info -P eventlet

from celery_app_task import cel if __name__ == '__main__':cel.worker_main()# cel.worker_main(argv=['--loglevel=info')
  • 創建py文件:result.py,查看任務執行結果
from celery.result import AsyncResult from celery_app_task import celasync = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel)if async.successful():result = async.get()print(result)# result.forget() # 將結果刪除 elif async.failed():print('執行失敗') elif async.status == 'PENDING':print('任務等待中被執行') elif async.status == 'RETRY':print('任務異常后正在重試') elif async.status == 'STARTED':print('任務已經開始被執行')

執行 add_task.py,添加任務,并獲取任務ID

執行 run.py ,或者執行命令:celery worker -A celery_app_task -l info

執行 result.py,檢查任務狀態并獲取結果

多任務結構

pro_cel├── celery_task# celery相關文件夾│ ├── celery.py # celery連接和配置相關文件,必須叫這個名字│ └── tasks1.py # 所有任務函數│ └── tasks2.py # 所有任務函數├── check_result.py # 檢查結果└── send_task.py # 觸發任務

celery.py

from celery import Celerycel = Celery('celery_demo',broker='redis://127.0.0.1:6379/1',backend='redis://127.0.0.1:6379/2',# 包含以下兩個任務文件,去相應的py文件中找任務,對多個任務做分類include=['celery_task.tasks1','celery_task.tasks2'])# 時區 cel.conf.timezone = 'Asia/Shanghai' # 是否使用UTC cel.conf.enable_utc = False

tasks1.py

import time from celery_task.celery import cel@cel.task def test_celery(res):time.sleep(5)return "test_celery任務結果:%s"%res

tasks2.py

import time from celery_task.celery import cel @cel.task def test_celery2(res):time.sleep(5)return "test_celery2任務結果:%s"%res

check_result.py

from celery.result import AsyncResult from celery_task.celery import celasync = AsyncResult(id="08eb2778-24e1-44e4-a54b-56990b3519ef", app=cel)if async.successful():result = async.get()print(result)# result.forget() # 將結果刪除,執行完成,結果不會自動刪除# async.revoke(terminate=True) # 無論現在是什么時候,都要終止# async.revoke(terminate=False) # 如果任務還沒有開始執行呢,那么就可以終止。 elif async.failed():print('執行失敗') elif async.status == 'PENDING':print('任務等待中被執行') elif async.status == 'RETRY':print('任務異常后正在重試') elif async.status == 'STARTED':print('任務已經開始被執行')

send_task.py

from celery_task.tasks1 import test_celery from celery_task.tasks2 import test_celery2# 立即告知celery去執行test_celery任務,并傳入一個參數 result = test_celery.delay('第一個的執行') print(result.id) result = test_celery2.delay('第二個的執行') print(result.id)

添加任務(執行send_task.py),開啟work:celery worker -A celery_task -l info -P eventlet,檢查任務執行結果(執行check_result.py)

Celery執行定時任務

設定時間讓celery執行一個任務

add_task.py

from celery_app_task import add from datetime import datetime# 方式一 # v1 = datetime(2019, 2, 13, 18, 19, 56) # print(v1) # v2 = datetime.utcfromtimestamp(v1.timestamp()) # print(v2) # result = add.apply_async(args=[1, 3], eta=v2) # print(result.id)# 方式二 ctime = datetime.now() # 默認用utc時間 utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) from datetime import timedelta time_delay = timedelta(seconds=10) task_time = utc_ctime + time_delay# 使用apply_async并設定時間 result = add.apply_async(args=[4, 3], eta=task_time) print(result.id)

類似于contab的定時任務

多任務結構中celery.py修改如下

from datetime import timedelta from celery import Celery from celery.schedules import crontabcel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=['celery_task.tasks1','celery_task.tasks2', ]) cel.conf.timezone = 'Asia/Shanghai' cel.conf.enable_utc = Falsecel.conf.beat_schedule = {# 名字隨意命名'add-every-10-seconds': {# 執行tasks1下的test_celery函數'task': 'celery_task.tasks1.test_celery',# 每隔2秒執行一次# 'schedule': 1.0,# 'schedule': crontab(minute="*/1"),'schedule': timedelta(seconds=2),# 傳遞參數'args': ('test',)},# 'add-every-12-seconds': {# 'task': 'celery_task.tasks1.test_celery',# 每年4月11號,8點42分執行# 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),# 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),# 'args': (16, 16)# }, }

啟動一個beat:celery beat -A celery_task -l info

啟動work執行:celery worker -A celery_task -l info -P eventlet

Django中使用Celery

  • 在項目目錄下創建celeryconfig.py
import djcelery djcelery.setup_loader() CELERY_IMPORTS=('app01.tasks', ) #有些情況可以防止死鎖 CELERYD_FORCE_EXECV=True # 設置并發worker數量 CELERYD_CONCURRENCY=4 #允許重試 CELERY_ACKS_LATE=True # 每個worker最多執行100個任務被銷毀,可以防止內存泄漏 CELERYD_MAX_TASKS_PER_CHILD=100 # 超時時間 CELERYD_TASK_TIME_LIMIT=12*30
  • 在app01目錄下創建tasks.py
from celery import task @task def add(a,b):with open('a.text', 'a', encoding='utf-8') as f:f.write('a')print(a+b)
  • 視圖函數views.py
from django.shortcuts import render,HttpResponse from app01.tasks import add from datetime import datetime def test(request):# result=add.delay(2,3)ctime = datetime.now()# 默認用utc時間utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())from datetime import timedeltatime_delay = timedelta(seconds=5)task_time = utc_ctime + time_delayresult = add.apply_async(args=[4, 3], eta=task_time)print(result.id)return HttpResponse('ok')
  • settings.py
#INSTALLED_APPS = [ # 'djcelery', # 'app01' #]from djagocele import celeryconfig BROKER_BACKEND='redis' BOOKER_URL='redis://127.0.0.1:6379/1' CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'

參考鏈接

總結

以上是生活随笔為你收集整理的Celery的简单使用的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。