日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Celery参数详解、配置参数

發布時間:2025/3/15 编程问答 55 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Celery参数详解、配置参数 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

參數詳解

Celery--Worker

準備:

安裝

pip install celery?

easy_install celery?

使用Redis作為Broker時 ,需安裝 celery-with-redis, 一般使用rabbitmq作為Broker

開始:

使用

啟動一個worker

簡潔--celery -A proj.task worker --loglevel=info

解釋: -A 是指對應的應用程序, 其參數是項目中 Celery實例的位置,也即 celery_app = Celery()的位置。

????????????worker 是指這里要啟動其中的worker,此時,就啟動了一個worker

具體的參數還有很多:

????????可以使用celery worker --help 進行查看,如需查看celery的參數,可以celery --help 進行查看。

????????具體內容文末有詳細說明。

內部分析:

????????當啟動一個worker的時候,這個worker會與broker建立鏈接(tcp長鏈接),然后如果有數據傳輸,則會創建相應的channel, 這個連接可以有多個channel。然后,worker就會去borker的隊列里面取相應的task來進行消費了,這也是典型的消費者生產者模式。

????????其中,這個worker主要是有四部分組成的,task_pool, consumer, scheduler, mediator。其中,task_pool主要是用來存放的是一些worker,當啟動了一個worker,并且提供并發參數的時候,會將一些worker放在這里面。celery默認的并發方式是prefork,也就是多進程的方式,這里只是celery對multiprocessing.Pool進行了輕量的改造,然后給了一個新的名字叫做prefork,這個pool與多進程的進程池的區別就是這個task_pool只是存放一些運行的worker. consumer也就是消費者, 主要是從broker那里接受一些message,然后將message轉化為celery.worker.request.Request的一個實例。并且在適當的時候,會把這個請求包裝進Task中,Task就是用裝飾器app_celery.task()裝飾的函數所生成的類,所以可以在自定義的任務函數中使用這個請求參數,獲取一些關鍵的信息。此時,已經了解了task_pool和consumer。

????????接下來,這個worker具有兩套數據結構,這兩套數據結構是并行運行的,他們分別是 'ET時刻表' 、就緒隊列。

????????就緒隊列:那些 立刻就需要運行的task, 這些task到達worker的時候會被放到這個就緒隊列中等待consumer執行。

????????ETA:是那些有ETA參數,或是rate_limit參數。

????????未完,待續

附:

????????celery worker 的相關參數:

Usage: celery worker [options]Start worker instance.Examples::celery worker --app=proj -l infocelery worker -A proj -l info -Q hipri,lopricelery worker -A proj --concurrency=4celery worker -A proj --concurrency=1000 -P eventletcelery worker --autoscale=10,0Options:-A APP, --app=APP app instance to use (e.g. module.attr_name)-b BROKER, --broker=BROKERurl to broker. default is 'amqp://guest@localhost//'--loader=LOADER name of custom loader class to use.--config=CONFIG Name of the configuration module--workdir=WORKING_DIRECTORYOptional directory to change to after detaching.-C, --no-color-q, --quiet-c CONCURRENCY, --concurrency=CONCURRENCYNumber of child processes processing the queue. Thedefault is the number of CPUs available on yoursystem.-P POOL_CLS, --pool=POOL_CLSPool implementation: prefork (default), eventlet,gevent, solo or threads.--purge, --discard Purges all waiting tasks before the daemon is started.**WARNING**: This is unrecoverable, and the tasks willbe deleted from the messaging server.-l LOGLEVEL, --loglevel=LOGLEVELLogging level, choose between DEBUG, INFO, WARNING,ERROR, CRITICAL, or FATAL.-n HOSTNAME, --hostname=HOSTNAMESet custom hostname, e.g. 'w1.%h'. Expands: %h(hostname), %n (name) and %d, (domain).-B, --beat Also run the celery beat periodic task scheduler.Please note that there must only be one instance ofthis service.-s SCHEDULE_FILENAME, --schedule=SCHEDULE_FILENAMEPath to the schedule database if running with the -Boption. Defaults to celerybeat-schedule. The extension".db" may be appended to the filename. Applyoptimization profile. Supported: default, fair--scheduler=SCHEDULER_CLSScheduler class to use. Default iscelery.beat.PersistentScheduler-S STATE_DB, --statedb=STATE_DBPath to the state database. The extension '.db' may beappended to the filename. Default: None-E, --events Send events that can be captured by monitors likecelery events, celerymon, and others.--time-limit=TASK_TIME_LIMITEnables a hard time limit (in seconds int/float) fortasks.--soft-time-limit=TASK_SOFT_TIME_LIMITEnables a soft time limit (in seconds int/float) fortasks.--maxtasksperchild=MAX_TASKS_PER_CHILDMaximum number of tasks a pool worker can executebefore it's terminated and replaced by a new worker.-Q QUEUES, --queues=QUEUESList of queues to enable for this worker, separated bycomma. By default all configured queues are enabled.Example: -Q video,image-X EXCLUDE_QUEUES, --exclude-queues=EXCLUDE_QUEUES-I INCLUDE, --include=INCLUDEComma separated list of additional modules to import.Example: -I foo.tasks,bar.tasks--autoscale=AUTOSCALEEnable autoscaling by providing max_concurrency,min_concurrency. Example:: --autoscale=10,3 (alwayskeep 3 processes, but grow to 10 if necessary)--autoreload Enable autoreloading.--no-execv Don't do execv after multiprocessing child fork.--without-gossip Do not subscribe to other workers events.--without-mingle Do not synchronize with other workers at startup.--without-heartbeat Do not send event heartbeats.--heartbeat-interval=HEARTBEAT_INTERVALInterval in seconds at which to send worker heartbeat-O OPTIMIZATION-D, --detach-f LOGFILE, --logfile=LOGFILEPath to log file. If no logfile is specified, stderris used.--pidfile=PIDFILE Optional file used to store the process pid. Theprogram will not start if this file already exists andthe pid is still alive.--uid=UID User id, or user name of the user to run as afterdetaching.--gid=GID Group id, or group name of the main group to change toafter detaching.--umask=UMASK Effective umask (in octal) of the process afterdetaching. Inherits the umask of the parent processby default.--executable=EXECUTABLEExecutable to use for the detached process.--version show program's version number and exit-h, --help show this help message and exit

?

常用配置介紹

設置時區
CELERY_TIMEZONE = 'Asia/Shanghai'
啟動時區設置
CELERY_ENABLE_UTC = True


限制任務的執行頻率
下面這個就是限制tasks模塊下的add函數,每秒鐘只能執行10次
CELERY_ANNOTATIONS = {'tasks.add':{'rate_limit':'10/s'}}
或者限制所有的任務的刷新頻率
CELERY_ANNOTATIONS = {'*':{'rate_limit':'10/s'}}
也可以設置如果任務執行失敗后調用的函數

def my_on_failure(self,exc,task_id,args,kwargs,einfo):print('task failed')CELERY_ANNOTATIONS = {'*':{'on_failure':my_on_failure}}

?

并發的worker數量,也是命令行-c指定的數目
事實上并不是worker數量越多越好,保證任務不堆積,加上一些新增任務的預留就可以了
CELERYD_CONCURRENCY = 20

?

celery worker每次去redis取任務的數量,默認值就是4
CELERYD_PREFETCH_MULTIPLIER = 4

?

每個worker執行了多少次任務后就會死掉,建議數量大一些
CELERYD_MAX_TASKS_PER_CHILD = 200

?

使用redis作為任務隊列
組成:?db+scheme://user:password@host:port/dbname
BROKER_URL = 'redis://127.0.0.1:6379/0'

?

celery任務執行結果的超時時間
CELERY_TASK_RESULT_EXPIRES = 1200
單個任務的運行時間限制,否則會被殺死
CELERYD_TASK_TIME_LIMIT = 60

?

使用redis存儲任務執行結果,默認不使用
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'

?

將任務結果使用'pickle'序列化成'json'格式
任務序列化方式
CELERY_TASK_SERIALIZER = 'pickle'
任務執行結果序列化方式
CELERY_RESULT_SERIALIZER = 'json'
也可以直接在Celery對象中設置序列化方式
app = Celery('tasks', broker='...', task_serializer='yaml')


關閉限速
CELERY_DISABLE_RATE_LIMITS = True

?

一份比較常用的配置文件:

在celery4.x以后,就是BROKER_URL,如果是以前,需要寫成CELERY_BROKER_URL
BROKER_URL = 'redis://127.0.0.1:6379/0'
指定結果的接收地址
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
?

指定任務序列化方式
CELERY_TASK_SERIALIZER = 'msgpack'
指定結果序列化方式
CELERY_RESULT_SERIALIZER = 'msgpack'
指定任務接受的序列化類型.
CELERY_ACCEPT_CONTENT = ['msgpack']
?

任務過期時間,celery任務執行結果的超時時間
CELERY_TASK_RESULT_EXPIRES = 24 * 60 * 60
?

任務發送完成是否需要確認,對性能會稍有影響
CELERY_ACKS_LATE = True
?

壓縮方案選擇,可以是zlib, bzip2,默認是發送沒有壓縮的數據
CELERY_MESSAGE_COMPRESSION = 'zlib'
?

規定完成任務的時間
在5s內完成任務,否則執行該任務的worker將被殺死,任務移交給父進程
CELERYD_TASK_TIME_LIMIT = 5
?

celery worker的并發數,默認是服務器的內核數目,也是命令行-c參數指定的數目
CELERYD_CONCURRENCY = 4
?

celery worker 每次去BROKER中預取任務的數量
CELERYD_PREFETCH_MULTIPLIER = 4
?

每個worker執行了多少任務就會死掉,默認是無限的
CELERYD_MAX_TASKS_PER_CHILD = 40
?

設置默認的隊列名稱,如果一個消息不符合其他的隊列就會放在默認隊列里面,如果什么都不設置的話,數據都會發送到默認的隊列中
CELERY_DEFAULT_QUEUE = "default"
隊列的詳細設置

CELERY_QUEUES = {"default": { # 這是上面指定的默認隊列"exchange": "default","exchange_type": "direct","routing_key": "default"},"topicqueue": { # 這是一個topic隊列 凡是topictest開頭的routing key都會被放到這個隊列"routing_key": "topic.#","exchange": "topic_exchange","exchange_type": "topic",},"task_eeg": { # 設置扇形交換機"exchange": "tasks","exchange_type": "fanout","binding_key": "tasks",},

或者配置成下面兩種方式:

# 配置隊列(settings.py) CELERY_QUEUES = (Queue('default', Exchange('default'), routing_key='default'),Queue('for_task_collect', Exchange('for_task_collect'), routing_key='for_task_collect'),Queue('for_task_compute', Exchange('for_task_compute'), routing_key='for_task_compute'), ) # 路由(哪個任務放入哪個隊列) CELERY_ROUTES = {'umonitor.tasks.multiple_thread_metric_collector': {'queue': 'for_task_collect', 'routing_key': 'for_task_collect'},'compute.tasks.multiple_thread_metric_aggregate': {'queue': 'for_task_compute', 'routing_key': 'for_task_compute'},'compute.tasks.test': {'queue': 'for_task_compute','routing_key': 'for_task_compute'}, }

?

總結

以上是生活随笔為你收集整理的Celery参数详解、配置参数的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。