django-celery使用
1.新進(jìn)一個django項(xiàng)目
- proj/- proj/__init__.py- proj/settings.py- proj/urls.py - manage.py2.在該項(xiàng)目創(chuàng)建一個proj / proj / celery.py模塊來定義Celery實(shí)例
from __future__ import absolute_importimport osfrom celery import Celery# set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings') #注意這里的projfrom django.conf import settings # noqa app = Celery('proj') #創(chuàng)建應(yīng)用,再這之前要先設(shè)置上面的os.environ.setdefault,設(shè)置意味著celery命令行程序?qū)⒅繢jango項(xiàng)目的位置 # 你可以在這里直接傳遞對象,但是使用字符串更好,因?yàn)楫?dāng)使用Windows或execv時,worker不必序列化對象app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) #打開自動發(fā)現(xiàn)apps里面的那些應(yīng)用有沒有包含tasks.py文件,
#那么Celery應(yīng)用就會自動去檢索創(chuàng)建的任務(wù)。比如你添加了一個任務(wù),在django中會實(shí)時地檢索出來。
#例如:這樣您就不必手動將各個模塊添加到CELERY_IMPORTS設(shè)置中
#- app1/ # - app1/tasks.py # - app1/models.py #- app2/ # - app2/tasks.py # - app2/models.py
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
?實(shí)例2
# -*- coding: utf-8 -*- """ celery 任務(wù)示例本地啟動celery命令: python manage.py celery worker --settings=settings 周期性任務(wù)還需要啟動celery調(diào)度命令:python manage.py celerybeat --settings=settings """ import datetimefrom celery import task from celery.schedules import crontab from celery.task import periodic_taskfrom common.log import logger@task() def async_task(x, y):"""定義一個 celery 異步任務(wù)"""logger.error(u"celery 定時任務(wù)執(zhí)行成功,執(zhí)行結(jié)果:{:0>2}:{:0>2}".format(x, y))return x + ydef execute_task():"""執(zhí)行 celery 異步任務(wù)調(diào)用celery任務(wù)方法:task.delay(arg1, arg2, kwarg1='x', kwarg2='y')task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})delay(): 簡便方法,類似調(diào)用普通函數(shù)apply_async(): 設(shè)置celery的額外執(zhí)行選項(xiàng)時必須使用該方法,如定時(eta)等詳見 :http://celery.readthedocs.org/en/latest/userguide/calling.html"""now = datetime.datetime.now()logger.error(u"celery 定時任務(wù)啟動,將在10s后執(zhí)行,當(dāng)前時間:{}".format(now))# 調(diào)用定時任務(wù)async_task.apply_async(args=[now.hour, now.minute], eta=now + datetime.timedelta(seconds=10))@periodic_task(run_every=crontab(minute='*', hour='*', day_of_week="*")) def get_time():"""celery 周期任務(wù)示例run_every=crontab(minute='*/5', hour='*', day_of_week="*"):每 5 分鐘執(zhí)行一次任務(wù)periodic_task:程序運(yùn)行時自動觸發(fā)周期任務(wù)"""execute_task()now = datetime.datetime.now()logger.error(u"celery 周期任務(wù)調(diào)用成功,當(dāng)前時間:{}".format(now))3.修改proj / proj /__init__.py
from __future__ import absolute_import #目的是拒絕隱士引入,celery.py和celery沖突。 #這將確保在Django啟動時始終導(dǎo)入應(yīng)用程序,以便@shared_task裝飾器使用 from .celery import app as celery_app # noqa4.在配置文件proj/settings.py里面添加
import djcelery djcelery.setup_loader() BROKER_URL = 'redis://127.0.0.1:6379/10'#BROKER_URL:broker是代理人,它負(fù)責(zé)分發(fā)任務(wù)給worker去執(zhí)行。我使用的是Redis作為broker CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/11'#不填就是默認(rèn)'djcelery.backends.database:DatabaseBackend' # CELERY_RESULT_BACKEND = 'redis://localhost:6379'CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = False #剛開始時區(qū)不準(zhǔn),一直是UTC時間,后來索性把utc禁用
CELERYD_CONCURRENCY = 10
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' # 定時任務(wù)
# 這是使用了django-celery默認(rèn)的數(shù)據(jù)庫調(diào)度模型,任務(wù)執(zhí)行周期都被存在你指定的orm數(shù)據(jù)庫中
CELERYD_MAX_TASKS_PER_CHILD = 1 # 每個worker最多執(zhí)行1個任務(wù)就會被銷毀,可防止內(nèi)存泄露
INSTALLED_APPS += ( 'djcelery')
?實(shí)例2
# 是否啟用celery任務(wù) IS_USE_CELERY = True # 本地開發(fā)的 celery 的消息隊(duì)列(RabbitMQ)信息 BROKER_URL_DEV = 'amqp://guest:guest@127.0.0.1:5672/' # TOCHANGE 調(diào)用celery任務(wù)的文件路徑, List of modules to import when celery starts. CELERY_IMPORTS = ('home_application.celery_tasks', ) # =============================================================================== # CELERY 配置 # =============================================================================== if IS_USE_CELERY:try:import djceleryINSTALLED_APPS += ('djcelery', # djcelery )djcelery.setup_loader()CELERY_ENABLE_UTC = FalseCELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"if "celery" in sys.argv:DEBUG = False# celery 的消息隊(duì)列(RabbitMQ)信息BROKER_URL = os.environ.get('BK_BROKER_URL', BROKER_URL_DEV)if RUN_MODE == 'DEVELOP':from celery.signals import worker_process_init@worker_process_init.connectdef configure_workers(*args, **kwargs):import djangodjango.setup()except:pass?
5.創(chuàng)建任務(wù) tasks,在某個應(yīng)用中demoapp/tasks.py
@task() 裝飾器說明
如果使用了多個裝飾器那么需要task裝飾器在最后即在最上面,一般情況使用的是從celeryapp中引入的app作為的裝飾器:app.task(),如果是django那種在app中定義的task則需要使用@shared_task
from __future__ import absolute_import from celery import shared_task@shared_task def add(x, y):#這里可以把x+y結(jié)果寫入數(shù)據(jù)庫return x + y@shared_task def mul(x, y):return x * y@shared_task def xsum(numbers):return sum(numbers)@task(ignore_result=True,max_retries=1,default_retry_delay=10) #創(chuàng)建測試用的task
def just_print():print "Print from celery task"
6.快速執(zhí)行
只需要找到方法,用方法加delay就可。
比如上面的文件:
from demoapp.tasks import add r = add.delay(3,5) # 執(zhí)行這一行就是再下發(fā)任務(wù)在后臺執(zhí)行了,結(jié)果到時候在取就可以了,使用技巧,一般會創(chuàng)建一個標(biāo)識健,存到數(shù)據(jù)庫,然后傳到我們要執(zhí)行的方法,在執(zhí)行方法里面寫上功能,把執(zhí)行結(jié)果保存到指定數(shù)據(jù)庫,等執(zhí)行成功后,就用我們之前的標(biāo)識鍵去查。
7.定時任務(wù)的使用(第一種方法-使用數(shù)據(jù)庫)
使用其實(shí)就是修改數(shù)據(jù)庫,修改后django-celery會實(shí)時推送到celery-beat里生效。所以只要再開發(fā)一個頁面去配置djcelery_periodictask及其它表就可以了。
?
djcelery提供了一些Model(定義在djcelery/models.py文件),數(shù)據(jù)庫模型如下,
| periodictask | 描述定時任務(wù)。重要字段有: name: 字符串,標(biāo)識符 task,字符串,任務(wù)函數(shù)/類所在的路徑,一般是celery_imports + function name。 interval:外鍵指向intervalschedule,表示每隔多少時間執(zhí)行 crontab:外鍵指向crontabschedule,表示在某一時刻執(zhí)行。 enabled:是否生效 expires:? 任務(wù)過期時間 | ? |
| intervalschedule | 表示時間間隔,有兩個參數(shù): every:正數(shù);period間隔單位。比如intervalschedule(every=2, period='day')表示每隔2天 | ? |
| crontabschedule | 表示某一時刻,有minute、hour、day_of-week、day_of_month、day_of_year它們的組合意義,參見cron時間表示法,比如0 0 * 10 * 表示每個月的10號凌晨。 | ? |
說明:
-
任務(wù)和定時任務(wù)的區(qū)別:定時任務(wù) = 任務(wù) +?intervalschedule/crontabschedule?。兩個定時任務(wù)可以執(zhí)行同一個任務(wù)。
-
任務(wù)沒有相應(yīng)的Model,用字符串表示,即periodictask模型的task字段
-
定時任務(wù)有相應(yīng)的Model即periodictask。
-
通過它提供的Model Query API來操作,同平常的數(shù)據(jù)庫查詢一樣。
8.定時任務(wù)示例
admin畢竟是給后臺管理人員使用的,它所有的參數(shù)都暴露給使用者了。下面是一個實(shí)際使用的例子。
?
需求:ajax實(shí)現(xiàn)月度定時任務(wù)monthly_reading_task的執(zhí)行和控制,即
每個月的某一天執(zhí)行該任務(wù);可以選擇開啟或者關(guān)閉該定時任務(wù);能夠選擇任務(wù)在哪一天(1-28日)執(zhí)行。
界面看起來是這樣的:
基本上就是Model的增刪改查。就不是通過admin來操作了。
查詢?nèi)蝿?wù)信息
def read(self, request, *args, **kwargs):try:task = celery_models.PeriodicTask.objects.get(name=self.TASK_NAME)if task.enabled:return {'enabled': True,'day_of_month': int(task.crontab.day_of_month),'last_run_at': task.last_run_at if task.last_run_at else '0'}else:return {'enabled': False}except celery_models.PeriodicTask.DoesNotExist:return {'enabled': False}?
更新日期
def create(self, request, *args, **kwargs):enabled = request.POST.get('enabled', None)if enabled not in [self.ENABLED_POST_VALUE, self.DISABLED_POST_VALUE]:return self.operate_fail('無效參數(shù)')if enabled == self.DISABLED_POST_VALUE:self.disable_task(self.TASK_NAME)return self.operate_success()else:try:day_of_month = int(request.POST.get('day_of_month', ''))if day_of_month > 28 or day_of_month < 1:return self.operate_fail('日期必須在1-28日之間')task, created = celery_models.PeriodicTask.objects.get_or_create(name="monthly_reading",task="mrs_app.my_celery.tasks.monthly_reading_task")if created:crontab = celery_models.CrontabSchedule.objects.create(day_of_month=day_of_month,hour=0,minute=0)crontab.save()task.crontab = crontabtask.enabled = Truetask.save()else:task.crontab.day_of_month = day_of_monthtask.crontab.save()task.enabled = Truetask.save()return self.operate_success()except ValueError:return self.operate_fail('抄表日不能為空')新建定時任務(wù)
def celery_get_tag(request):name = 'test'task = 'home_application.celery_tasks.async_task'task_args ={"x":1, "y":1}crontab_time = {'month_of_year':'*','day_of_month':'*','day_of_week':'*','hour':'*','minute':'*'}create_task(name, task , task_args, crontab_time)result = return_result(status=True, code=200, message="添加任務(wù)成功")return result#創(chuàng)建任務(wù) def create_task(name, task, task_args, crontab_time):'''name # 任務(wù)名字task # 執(zhí)行的任務(wù) "myapp.tasks.add"task_args # 任務(wù)參數(shù) {"x":1, "Y":1}crontab_time # 定時任務(wù)時間 格式:{'month_of_year': 9 # 月份'day_of_month': 5 # 日期'hour': 01 # 小時'minute':05 # 分鐘}'''# task任務(wù), created是否定時創(chuàng)建task, created = celery_models.PeriodicTask.objects.get_or_create(name=name,task=task)# 獲取 crontabcrontab = celery_models.CrontabSchedule.objects.filter(**crontab_time).first()if crontab is None:# 如果沒有就創(chuàng)建,有的話就繼續(xù)復(fù)用之前的crontabcrontab = celery_models.CrontabSchedule.objects.create(**crontab_time)task.crontab = crontab # 設(shè)置crontabtask.enabled = True # 開啟tasktask.kwargs = json.dumps(task_args) # 傳入task參數(shù)#expiration = timezone.now() + datetime.timedelta(day=1)#task.expires = expiration # 設(shè)置任務(wù)過期時間為現(xiàn)在時間的一天以后 task.save()result = return_result(status=True, code=200, message="添加任務(wù)成功")關(guān)閉定時
def disable_task(self, name):try:task = celery_models.PeriodicTask.objects.get(name=name)task.enabled = Falsetask.save()return Trueexcept celery_models.PeriodicTask.DoesNotExist:return True9.定時任務(wù)的使用(第二種方法-CELERYBEAT_SCHEDULE)
djcelery在初始化中主要完成兩件:
在settings.CELERY_IMPORTS定義下的模塊搜索所有任務(wù)。這個對數(shù)據(jù)庫沒有任何改變,只是用Admin添加定時任務(wù)時periodictask.task字段變成選擇框,列出了所有定義的任務(wù)。
從settings.CELERYBEAT_SCHEDULE創(chuàng)建定時任務(wù),這個會創(chuàng)建數(shù)據(jù)記錄,相當(dāng)于celery_models.PeriodicTask.objects.create(..)語句。
所以在settings文件中可添加配置信息,例如:
CELERYBEAT_SCHEDULE = {'add-every-3-minutes': {'task': 'mrs_app.my_celery.tasks.monthly_reading_task','schedule': timedelta(minutes=3)# 'schedule': crontab(minute=u'40', hour=u'17',), }, }| celerybeat_schedule | 定義任務(wù),上面注釋表示每隔3分鐘執(zhí)行monthly_reading_task任務(wù) |
??
schedule就是執(zhí)行計劃,可以用crontab格式,用這種配置,會自動更新數(shù)據(jù)庫。一般不用這種方法。
?10.啟動
啟動 python manage.py celery worker -l info
如果有定時任務(wù)的話,還需要啟動心跳
?另開一個cmd窗口 python manage.py celery beat ?(windows下-B選項(xiàng)不可用)
或者后臺:/home/python3/bin/python3 /project/manage.py celery worker --loglevel=info >/dev/null 2>&1 &/home/python3/bin/python3 /project/manage.py celery beat >> /project/celery.log 2>&1 &
(參考)官網(wǎng)和https://my.oschina.net/kinegratii/blog/292395
轉(zhuǎn)載于:https://www.cnblogs.com/CGCong/p/9391436.html
總結(jié)
以上是生活随笔為你收集整理的django-celery使用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 虚拟机安装CentOS,网络配置
- 下一篇: 线程、协成、IO模型