日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

Celery-分布式任务队列

發布時間:2025/7/14 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Celery-分布式任务队列 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、介紹

官方文檔:http://docs.celeryproject.org/en/latest/index.html

pip3 install celery

Celery是一個專注于實時處理和任務調度的分布式任務隊列,通過它可以輕松的實現任務的異步處理。

使用Celery的常見場景:

  • Web應用。當用戶觸發的一個操作需要較長時間才能執行完成時,可以把它作為任務交給Celery去異步執行,執行完再返回給用戶。這段時間用戶不需要等待,提高了網站的整體吞吐量和響應時間。
  • 定時任務。生產環境經常會跑一些定時任務。假如你有上千臺的服務器、上千種任務,定時任務的管理很困難,Celery可以幫助我們快速在不同的機器設定不同種任務。
  • 同步完成的附加工作都可以異步完成。比如發送短信/郵件、推送消息、清理/設置緩存等。

Celery包含如下組件:

  • Celery Beat:任務調度器,Beat進程會讀取配置文件的內容,周期性地將配置中到期需要執行的任務發送給任務隊列。
  • Celery Worker:執行任務的消費者,通常會在多臺服務器運行多個消費者來提高執行效率。
  • Broker:消息代理,或者叫作消息中間件,接受任務生產者發送過來的任務消息,存進隊列再按序分發給任務消費方(通常是消息隊列或者數據庫)。
  • Producer:調用了Celery提供的API、函數或者裝飾器而產生任務并交給任務隊列處理的都是任務生產者。
  • Result Backend:任務處理完后保存狀態信息和結果,以供查詢。

二、簡單示例

創建一個tasks.py:

from celery import Celeryapp = Celery("tasks", broker="amqp://pd:123456@localhost:5672//",backend="redis://:123456@localhost:6379/0")@app.task def add(x, y):return x+y

啟動Celery Worker來開始監聽并執行任務:

celery -A tasks worker -l info

更多有關命令:

celery worker --help

再打開一個終端, 進行命令行模式,調用任務:

>>> from tasks import add >>> relt = add.delay(10, 10) >>> relt.ready() # 檢查任務是否已經完成 True >>> relt.get() # 獲取任務結果,可設置timeout超時 20 >>> relt <AsyncResult: 470d5f45-42eb-4b0c-bd38-06b85fa5599b> >>> relt.id '470d5f45-42eb-4b0c-bd38-06b85fa5599b' >>> relt.result 20 >>> relt.status 'SUCCESS' from celery import Celery from celery.result import AsyncResultapp = Celery("tasks", broker="amqp://pd:123456@localhost:5672/pdvhost",backend="redis://:123456@localhost:6379/0")result = AsyncResult(id="470d5f45-42eb-4b0c-bd38-06b85fa5599b", app=app) print(result.get()) # 20 View Code

三、配置

官方文檔,配置:http://docs.celeryproject.org/en/latest/userguide/configuration.html#configuration

像上面簡單示例中,要想添加配置,則可以直接在應用程序設置配置:

app.conf.task_serializer = "json"

如果您一次配置多個設置,則:

app.conf.update(task_serializer="json",accept_content=["json"],result_serializer="json",timezone="Europe/Oslo",enable_utc=True, )

對于大型項目,建議使用專用配置模塊。因為項目復雜,最好做到程序的解耦,所以將配置保存在集中位置是一個非常好的選擇,一般默認 celeryconfig.py 模塊是用來保存配置的,你也可以使用自己定義的名字,然后通過調用 app.config_from_object() 方法告訴 Celery 實例使用配置模塊:

app.config_from_object("celeryconfig") # 或者 from . import celeryconfig app.config_from_object(celeryconfig)

四、在項目中使用Celery

項目布局:

方案選擇:

  • RabbitMQ作為消息代理。不選Redis是因為如果Redis發生意外,會造成數據丟失等后果。
  • Msgpack做序列化。Msgpack是一個二進制的類json的序列化方案,它比json的數據結構更小,傳輸更快。
  • Redis做結果存儲。
  • pip3 install msgpack ########## celeryapp.py ########## from celery import Celery from . import celeryconfigapp = Celery("proj.celeryapp", include=["proj.tasks"]) app.config_from_object(celeryconfig)if __name__ == "__main__":app.start()########## tasks.py ########## from .celeryapp import app@app.task def add(x, y):return x+y@app.task def mul(x, y):return x*y########## celeryconfig.py ########## # 使用RabbitMQ作為消息代理 broker_url = "amqp://pd:123456@114.116.50.214:5672//" # # 把任務結果存在了Redis result_backend = "redis://:123456@114.116.50.214:6379/0" # 任務序列化和反序列化使用msgpack方案 task_serializer = "msgpack" # 讀取任務結果一般性能要求不高,所以使用了可讀性更好的json result_serializer = "json" # 任務過期時間 result_expires = 60*60*24 # 指定接受的內容類型 accept_content = ["json", "msgpack"] 代碼示例

    五、在后臺運行worker

    在生產中,我們需要在后臺運行worker,官方文檔daemonization教程中有詳細描述。

    守護程序腳本使用celery multi命令在后臺啟動一個或多個worker:

    # 啟動worker后臺運行 celery multi start w1 -A proj.celeryapp -l info celery multi start w2 -A proj.celeryapp -l info PS:如果使用的是默認的celery.py,那么直接proj即可# 重啟 celery multi restart w1 -A proj -l info# 停止 celery multi stop w1 -A proj -l info# 確保退出之前完成所有當前正在執行的任務 celery multi stopwait w1 -A proj -l info

    默認情況下,它會在當前目錄下創建的pid和日志文件,為了防止多個worker在彼此之上啟動,最好將這些文件放在專用目錄中:

    mkdir /var/run/celery mkdir /var/log/celery celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid --logfile=/var/log/celery/%n%I.log

    六、指定隊列傳送任務

    官方文檔:https://celery.readthedocs.io/en/latest/userguide/routing.html#guide-routing

    在 celeryconfig.py 中加入以下配置:

    # 路由鍵以 task. 開頭的消息都進default隊列 # 路由鍵以 web. 開頭的消息都進web_tasks隊列 task_queues = (Queue("default", routing_key="task.#"),Queue("web_tasks", routing_key="web.#"), ) # 默認的交換機名字為tasks task_default_exchange = "tasks" # 設置默認交換類型為topic task_default_exchange_type = "topic" # 默認的路由鍵是 task.default task_default_routing_key = "task.default" # 要將任務路由到web_tasks隊列,可以在task_routes設置中添加條目 task_routes = {# tasks.add的消息會進入web_tasks隊列"proj.tasks.add": {"queue": "web_tasks","routing_key": "web.add",}, }

    其他代碼與上面 四 中的相同。

    啟動worker,指定該worker工作于哪個隊列:

    # 該worker只會執行web_tasks隊列中的任務 celery -A proj.celeryapp worker -Q web_tasks -l info

    七、定時任務

    官方文檔:https://celery.readthedocs.io/en/latest/userguide/periodic-tasks.html

    Celery支持定時任務,設定好任務的執行時間,Celery就會定時自動幫你執行, 這個定時任務模塊叫 celery beat。

    函數版tasks.py:

    from celery import Celery from celery.schedules import crontabapp = Celery("tasks", broker="amqp://pd:123456@localhost:5672//", backend="redis://:123456@localhost:6379/0") app.conf.timezone = "Asia/Shanghai"@app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs):# 每5秒執行一次 test("Hello")sender.add_periodic_task(5.0, test.s("Hello"), name="every-5s")# 每10秒執行一次 test("World")sender.add_periodic_task(10.0, test.s("World"), name="every-10s", expires=5)# 每周一早上 7:30 執行一次 test("Happy Mondays!") sender.add_periodic_task(crontab(hour=7, minute=30, day_of_week=1),test.s("Happy Mondays!"),)@app.task def test(arg):print(arg) View Code celery -A tasks worker -l info celery -A tasks beat -l info

    配置版:

    ########## celeryapp.py ########## from celery import Celery from . import celeryconfigapp = Celery("proj.celeryapp", include=["proj.tasks"]) app.config_from_object(celeryconfig)if __name__ == "__main__":app.start()########## celeryconfig.py ########## broker_url = "amqp://pd:123456@114.116.50.214:5672//" result_backend = "redis://:123456@114.116.50.214:6379/0" task_serializer = "msgpack" result_serializer = "json" result_expires = 60*60*24 accept_content = ["json", "msgpack"] timezone = "Asia/Shanghai"from celery.schedules import crontab beat_schedule = {"every-10s": {"task": "proj.tasks.add","schedule": 10.0,"args": (10, 10)},"every-monday-morning-7:30": {"task": "proj.tasks.mul","schedule": crontab(hour=7, minute=30, day_of_week=1),"args": (10, 10)} }########## tasks.py ########## from .celeryapp import app@app.task def add(x, y):return x+y@app.task def mul(x, y):return x*y View Code celery -A proj.celeryapp worker -l info celery -A proj.celeryapp beat -l info 

    八、在Django中使用celery

    發布任務

    https://celery.readthedocs.io/en/latest/django/first-steps-with-django.html#extensions

    項目布局:

    import os from celery import Celeryos.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings") app = Celery("mysite") app.config_from_object("django.conf:settings", namespace="CELERY") app.autodiscover_tasks()@app.task(bind=True) def debug_task(self):print("Request: {0!r}".format(self.request)) celeryapp.py from .celeryapp import app as celery_app __all__ = ["celery_app"] __init__.py

    settings.py,更多設置參考:https://celery.readthedocs.io/en/latest/userguide/configuration.html

    #for celery CELERY_BROKER_URL = "amqp://pd:123456@114.116.50.214:5672//" CELERY_RESULT_BACKEND = "redis://:123456@114.116.50.214:6379/0"

    在app里的tasks.py里編寫任務:

    from celery import shared_task@shared_task def add(x, y):return x+y@shared_task def mul(x, y):return x*y

    在views里調用celery task:

    from django.shortcuts import HttpResponse from app01 import tasksdef test(request):result = tasks.add.delay(100, 100)return HttpResponse(result.get())

    定時任務

    https://celery.readthedocs.io/en/latest/userguide/periodic-tasks.html#using-custom-scheduler-classes

    1、安裝?django-celery-beat

    pip3 install django-celery-beat

    2、在settings.py中設置

    INSTALLED_APPS = [...,'django_celery_beat', ]

    3、進行數據庫遷移,以便創建定時任務所需的表

    python3 manage.py migrate

    4、開始監測定時任務

    celery -A mysite.celeryapp beat -l info -S django

    5、在django-admin界面設置定時任務

    ?

    轉載于:https://www.cnblogs.com/believepd/p/10643392.html

    總結

    以上是生活随笔為你收集整理的Celery-分布式任务队列的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。