日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程语言 > python >内容正文

python

python分布式定时任务_Python 定时任务框架 APScheduler 详解

發(fā)布時(shí)間:2023/12/20 python 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python分布式定时任务_Python 定时任务框架 APScheduler 详解 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

APScheduler

最近想寫個(gè)任務(wù)調(diào)度程序,于是研究了下 Python 中的任務(wù)調(diào)度工具,比較有名的是:Celery,RQ,APScheduler。

Celery:非常強(qiáng)大的分布式任務(wù)調(diào)度框架

RQ:基于Redis的作業(yè)隊(duì)列工具

APScheduler:一款強(qiáng)大的任務(wù)調(diào)度工具

RQ 參考 Celery,據(jù)說(shuō)要比 Celery 輕量級(jí)。在我看來(lái) Celery 和 RQ 太重量級(jí)了,需要單獨(dú)啟動(dòng)進(jìn)程,并且依賴第三方數(shù)據(jù)庫(kù)或者緩存,適合嵌入到較大型的 python 項(xiàng)目中。其次是 Celery 和 RQ 目前的最新版本都不支持動(dòng)態(tài)的添加定時(shí)任務(wù)(celery 官方不支持,可以使用第三方的 redisbeat 或者 redbeat 實(shí)現(xiàn)),所以對(duì)于一般的項(xiàng)目推薦用 APScheduler,簡(jiǎn)單高效。

Apscheduler是一個(gè)基于Quartz的python定時(shí)任務(wù)框架,相關(guān)的 api 接口調(diào)用起來(lái)比較方便,目前其提供了基于日期、固定時(shí)間間隔以及corntab類型的任務(wù),并且可持久化任務(wù);同時(shí)它提供了多種不同的調(diào)用器,方便開(kāi)發(fā)者根據(jù)自己的需求進(jìn)行使用,也方便與數(shù)據(jù)庫(kù)等第三方的外部持久化儲(chǔ)存機(jī)制進(jìn)行協(xié)同工作,非常強(qiáng)大。

安裝

最簡(jiǎn)單的方法是使用 pip 安裝:

$ pip install apscheduler

$ python setup.py install

目前版本:3.6.3

基本概念

APScheduler 具有四種組件:

triggers(觸發(fā)器)

jobstores (job 存儲(chǔ))

executors (執(zhí)行器)

schedulers (調(diào)度器)

triggers:觸發(fā)器管理著 job 的調(diào)度方式。

jobstores: 用于 job 數(shù)據(jù)的持久化。默認(rèn) job 存儲(chǔ)在內(nèi)存中,還可以存儲(chǔ)在各種數(shù)據(jù)庫(kù)中。除了內(nèi)存方式不需要序列化之外(一個(gè)例外是使用 ProcessPoolExecutor),其余都需要 job 函數(shù)參數(shù)可序列化。另外多個(gè)調(diào)度器之間絕對(duì)不能共享 job 存儲(chǔ)(APScheduler 原作者的意思是不支持分布式,但是我們可以通過(guò)重寫部分函數(shù)實(shí)現(xiàn),具體方法后面再介紹)。

executors:負(fù)責(zé)處理 job。通常使用線程池(默認(rèn))或者進(jìn)程池來(lái)運(yùn)行 job。當(dāng) job 完成時(shí),會(huì)通知調(diào)度器并發(fā)出合適的事件。

schedulers: 將 job 與以上組件綁定在一起。通常在程序中僅運(yùn)行一個(gè)調(diào)度器,并且不直接處理 jobstores ,executors 或 triggers,而是通過(guò)調(diào)度器提供的接口,比如添加,修改和刪除 job。

選擇正確的調(diào)度器,job 存儲(chǔ),執(zhí)行器和觸發(fā)器

調(diào)度器的選擇主要取決于編程環(huán)境以及 APScheduler 的用途。主要有以下幾種跳度器:

apscheduler.schedulers.blocking.BlockingScheduler:當(dāng)調(diào)度器是程序中唯一運(yùn)行的東西時(shí)使用,阻塞式。

apscheduler.schedulers.background.BackgroundScheduler:當(dāng)調(diào)度器需要后臺(tái)運(yùn)行時(shí)使用。

apscheduler.schedulers.asyncio.AsyncIOScheduler:當(dāng)程序使用 asyncio 框架時(shí)使用。

apscheduler.schedulers.gevent.GeventScheduler:當(dāng)程序使用 gevent 框架時(shí)使用。

apscheduler.schedulers.tornado.TornadoScheduler:當(dāng)構(gòu)建 Tornado 程序時(shí)使用

apscheduler.schedulers.twisted.TwistedScheduler:當(dāng)構(gòu)建 Twisted 程序時(shí)使用

apscheduler.schedulers.qt.QtScheduler:當(dāng)構(gòu)建 Qt 程序時(shí)使用

要選擇適當(dāng)?shù)?job 存儲(chǔ),需要看 job 是否需要持久化。如果程序啟動(dòng)會(huì)重新創(chuàng)建作業(yè),則可以使用默認(rèn)的內(nèi)存方式(MemoryJobStore)。如果需要 job 在程序重新啟動(dòng)或崩潰后繼續(xù)存在,那么建議使用其他 job 存儲(chǔ)方式。系統(tǒng)內(nèi)置主要有以下幾種 job 存儲(chǔ):

apscheduler.jobstores.memory.MemoryJobStore:使用內(nèi)存存儲(chǔ)

apscheduler.jobstores.mongodb.MongoDBJobStore:使用 MongoDB 存儲(chǔ)

apscheduler.jobstores.redis.RedisJobStore:使用 redis 存儲(chǔ)

apscheduler.jobstores.rethinkdb.RethinkDBJobStore:使用 rethinkdb 存儲(chǔ)

apscheduler.jobstores.sqlalchemy.SQLAlchemyJobStore:使用 ORM 框架 SQLAlchemy,后端可以是 sqlite、mysql、PoatgreSQL 等數(shù)據(jù)庫(kù)

apscheduler.jobstores.zookeeper.ZooKeeperJobStore:使用 zookeeper 存儲(chǔ)

執(zhí)行器的選擇要根據(jù) job 的類型。默認(rèn)的線程池執(zhí)行器 apscheduler.executors.pool.ThreadPoolExecutor 可以滿足大多數(shù)情況。如果 job 屬于 CPU 密集型操作則建議使用進(jìn)程池執(zhí)行器 apscheduler.executors.pool.ProcessPoolExecutor。當(dāng)然也可以同時(shí)使用兩者,將進(jìn)程池執(zhí)行器添加為輔助執(zhí)行器。

當(dāng)添加 job 時(shí),可以選擇一個(gè)觸發(fā)器,它管理著 job 的調(diào)度方式。APScheduler 內(nèi)置三種觸發(fā)器:

apscheduler.triggers.date:在某個(gè)特定時(shí)間僅運(yùn)行一次 job 時(shí)使用

apscheduler.triggers.interval:當(dāng)以固定的時(shí)間間隔運(yùn)行 job 時(shí)使用

apscheduler.triggers.cron:當(dāng)在特定時(shí)間定期運(yùn)行 job 時(shí)使用

配置調(diào)度器

APScheduler 提供了多種不同的方式來(lái)配置調(diào)度器。

假設(shè)使用默認(rèn) job 存儲(chǔ)和默認(rèn)執(zhí)行器運(yùn)行 BackgroundScheduler:

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()

以上創(chuàng)建了一個(gè) BackgroundScheduler 調(diào)度器,job 存儲(chǔ)使用默認(rèn)的 MemoryJobStore,執(zhí)行器使用默認(rèn)的 ThreadPoolExecutor,最大線程數(shù) 10 個(gè)。

假如想做以下設(shè)置:

一個(gè)名為 mongo 的 job 存儲(chǔ),后端使用 MongoDB

一個(gè)名為 default 的 job 存儲(chǔ),后端使用數(shù)據(jù)庫(kù)(使用 Sqlite)

一個(gè)名為 default 的線程池執(zhí)行器,最大線程數(shù) 20 個(gè)

一個(gè)名為 processpool 的進(jìn)程池執(zhí)行器,最大進(jìn)程數(shù) 5 個(gè)

調(diào)度器使用 UTC 時(shí)區(qū)

開(kāi)啟 job 合并

job 最大實(shí)例限制為 3 個(gè)

方法一:

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler

from apscheduler.jobstores.mongodb import MongoDBJobStore

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

jobstores = {

'mongo': MongoDBJobStore(),

'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')

}

executors = {

'default': ThreadPoolExecutor(20),

'processpool': ProcessPoolExecutor(5)

}

job_defaults = {

'coalesce': False,

'max_instances': 3

}

scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

方法二:

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler({

'apscheduler.jobstores.mongo': {

'type': 'mongodb'

},

'apscheduler.jobstores.default': {

'type': 'sqlalchemy',

'url': 'sqlite:///jobs.sqlite'

},

'apscheduler.executors.default': {

'class': 'apscheduler.executors.pool:ThreadPoolExecutor',

'max_workers': '20'

},

'apscheduler.executors.processpool': {

'type': 'processpool',

'max_workers': '5'

},

'apscheduler.job_defaults.coalesce': 'false',

'apscheduler.job_defaults.max_instances': '3',

'apscheduler.timezone': 'UTC',

})

方法三:

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

from apscheduler.executors.pool import ProcessPoolExecutor

jobstores = {

'mongo': {'type': 'mongodb'},

'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')

}

executors = {

'default': {'type': 'threadpool', 'max_workers': 20},

'processpool': ProcessPoolExecutor(max_workers=5)

}

job_defaults = {

'coalesce': False,

'max_instances': 3

}

scheduler = BackgroundScheduler()

scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

啟動(dòng)調(diào)度器

只需調(diào)用 start() 即可啟動(dòng)調(diào)度器。對(duì)于 BlockingScheduler 以外的調(diào)度器,都會(huì)直接返回,返回后可以繼續(xù)其他工作,比如添加 job;對(duì)于 BlockingScheduler ,必須在完成所有初始化已經(jīng)添加好 job 后才能調(diào)用 start()。

注意:調(diào)度器啟動(dòng)后就無(wú)法更改配置了。

添加 job

兩種方式:

使用方法 add_job()

使用裝飾器 scheduled_job()

第一種是最常用方法,第二種方法適合程序運(yùn)行后不需要更改的作業(yè)。 add_job() 會(huì)返回一個(gè) apscheduler.job.Job 實(shí)例,可以用于修改或者刪除 job 等。如果添加 job 時(shí),調(diào)度器尚未啟動(dòng),則會(huì)暫停調(diào)度 job,并且僅在調(diào)度器啟動(dòng)時(shí)才計(jì)算其首次運(yùn)行時(shí)間

添加 job 時(shí)第二個(gè)參數(shù)是 trigger,正如前面所說(shuō),可以指定三種類型的觸發(fā)器:cron、interval 和 date。

cron:在特定時(shí)間定期運(yùn)行 job

兼容 unix/linux 系統(tǒng) crontab 格式,但是比其多了秒(second)、年(year)、第多少周(week)以及限定開(kāi)始時(shí)間(start_date)和結(jié)束時(shí)間(end_date)的功能,并且天(day)的設(shè)置更加靈活,支持類似 last fri 的格式,具體見(jiàn)以下的詳解。

主要參數(shù):

year(int|str) - 年,4位數(shù)

month(int|str) - 月,1-12

day(int|str) - 日,1-31

week(int|str) - 一年中的第多少周,1-53

day_of_week(int|str) - 星期,0-6 或者 mon,tue,wed,thu,fri,sat,sun

hour(int|str) - 小時(shí),0-23

minute(int|str) - 分,0-59

second(int|str) - 秒,0-59

start_date(date|datetime|str) - 開(kāi)始時(shí)間

end_date(date|datetime|str) - 結(jié)束時(shí)間

不同于 unix/linux 系統(tǒng) crond 格式,添加 job 時(shí)可以忽略不必要的字段。

大于最小有效值的字段默認(rèn)為*,而較小的字段默認(rèn)為其最小值,除了 week 和 day_of_week 默認(rèn)為 *。

可能這種表述不是太理解,舉幾個(gè)例子:

day=1, minute=20 最小有效值字段為 minute 故等價(jià)于 year='*', month='*', day=1, week='*', day_of_week='*', hour='*', minute=20, second=0,意思是在每年每月 1 號(hào)每小時(shí)的 20 分 0 秒運(yùn)行;

hour=1 最小有效值字段為 hour 故等價(jià)于 year='*', month='*', day=*, week='*', day_of_week='*', hour=1, minute=0, second=0,意思是在每年每月每天 1 點(diǎn)的 0 分 0 秒運(yùn)行;

month=6, hour=1 最小有效值字段也為 hour 故等價(jià)于 year='*', month=6, day=*, week='*', day_of_week='*', hour=1, minute=0, second=0,意思是在每年 6 月每天 0 點(diǎn) 0 分 0 秒運(yùn)行;

month=6 最小有效值字段也為 month 故等價(jià)于 year='*', month=6, day=1, week='*', day_of_week='*', hour=0, minute=0, second=0,意思是在每年 6 月 1號(hào) 0 點(diǎn) 0 分 0 秒運(yùn)行;

year=2020 最小有效值字段也為 year 故等價(jià)于 year=2020, month=1, day=1, week='*', day_of_week='*', hour=0, minute=0, second=0,意思是在 2020 年 1 月 1 號(hào) 0 點(diǎn) 0 分 0 秒運(yùn)行;

參數(shù)還支持表達(dá)式,下表列出了從 year 到 second 字段可用的表達(dá)式。一個(gè)字段中可以給出多個(gè)表達(dá)式,用 , 分隔。

序號(hào)

表達(dá)式

可用字段

描述

1

*

所有

匹配字段所有取值

2

*/a

所有

匹配字段每遞增 a 后的值, 從字段最小值開(kāi)始,包括最小值,比如小時(shí)(hour)的 */5,則匹配0,5,10,15,20

3

a/b

所有

匹配字段每遞增 b 后的值, 從字段值 a 開(kāi)始,包括 a,比如小時(shí)(hour)的 2/9,則匹配2,11,20

4

a-b

所有

匹配字段 a 到 b 之間的取值,a 必須小于 b,包括 a 與 b,比如2-5,則匹配2,3,4,5

5

a-b/c

所有

匹配 a 到 b 之間每遞增 c 后的值,包括 a,不一定包括 b,比如1-20/5,則匹配1,6,11,16

6

xth y

day

匹配 y 在當(dāng)月的第 x 次,比如 3rd fri 指當(dāng)月的第三個(gè)周五

7

last x

day

匹配 x 在當(dāng)月的最后一次,比如 last fri 指當(dāng)月的最后一個(gè)周五

8

last

day

匹配當(dāng)月的最后一天

9

x,y,z

所有

匹配以 , 分割的多個(gè)表達(dá)式的組合

例:

import datetime

from apscheduler.schedulers.background import BackgroundScheduler

def job1():

print('job1')

def job2(x, y):

print('job2', x, y)

scheduler = BackgroundScheduler()

scheduler.start()

# 每天 2 點(diǎn)運(yùn)行

scheduler.add_job(

job1,

trigger='cron',

hour=2

)

# 每天 2 點(diǎn) 30 分 5 秒運(yùn)行

scheduler.add_job(

job2,

trigger='cron',

second=5,

minute=30,

hour=2,

args=['hello', 'world']

)

# 每 10 秒運(yùn)行一次

scheduler.add_job(

job1,

trigger='cron',

second='*/10'

)

# 每天 1:00,2:00,3:00 運(yùn)行

scheduler.add_job(

job1,

trigger='cron',

hour='1-3'

)

# 在 6,7,8,11,12 月的第三個(gè)周五 的 1:00,2:00,3:00 運(yùn)行

scheduler.add_job(

job1,

trigger='cron',

month='6-8,11-12',

day='3rd fri',

hour='1-3'

)

# 在 2019-12-31 號(hào)之前的周一到周五 5 點(diǎn) 30 分運(yùn)行

scheduler.add_job(

job1,

trigger='cron',

day_of_week='mon-fri',

hour=5,

minute=30,

end_date='2019-12-31'

)

interval:以固定的時(shí)間間隔運(yùn)行 job

主要參數(shù):

weeks(int) - 表示等待時(shí)間的周數(shù)

days(int) - 表示等待時(shí)間天數(shù)

hours(int) - 表示等待時(shí)間小時(shí)數(shù)

minutes(int) - 表示等待時(shí)間分鐘數(shù)

seconds(int) - 表示等待時(shí)間秒數(shù)

start_date(date|datetime|str) - 開(kāi)始時(shí)間

end_date(date|datetime|str) - 結(jié)束時(shí)間

例:

from apscheduler.schedulers.background import BackgroundScheduler

def job():

print('job')

scheduler = BackgroundScheduler()

scheduler.start()

# 每 2 小時(shí)運(yùn)行一次

scheduler.add_job(

job,

trigger='interval',

hours=2

)

# 2019-10-01 00:00:00 到 2019-10-31 23:59:59 之間每 2 小時(shí)運(yùn)行一次

scheduler.add_job(

job,

trigger='interval',

hours=2,

start_date='2019-10-01 00:00:00',

end_date='2019-10-31 23:59:59',

)

# 每 2 天 3 小時(shí) 4 分鐘 5 秒 運(yùn)行一次

scheduler.add_job(

job,

trigger='interval',

days=2,

hours=3,

minutes=4,

seconds=5

)

date:某個(gè)特定時(shí)間僅運(yùn)行一次 job

例:

import datetime

from apscheduler.schedulers.background import BackgroundScheduler

def job():

print('job')

scheduler = BackgroundScheduler()

scheduler.start()

# 3 秒后運(yùn)行

scheduler.add_job(

job,

trigger='date',

run_date=datetime.datetime.now() + datetime.timedelta(seconds=3)

)

# 2019.11.22 00:00:00 運(yùn)行

scheduler.add_job(

job,

trigger='date',

run_date=datetime.date(2019, 11, 22),

)

# 2019.11.22 16:30:01 運(yùn)行

scheduler.add_job(

job,

trigger='date',

run_date=datetime.datetime(2019, 11, 22, 16, 30, 1),

)

# 2019.11.31 16:30:01 運(yùn)行

scheduler.add_job(

job,

trigger='date',

run_date='2019-11-31 16:30:01',

)

# 立即運(yùn)行

scheduler.add_job(

job,

trigger='date'

)

小提示:

如果想立即運(yùn)行 job ,則可以在添加 job 時(shí)省略 trigger 參數(shù);

添加 job 時(shí)的日期設(shè)置參數(shù) start_date、end_date 以及 run_date 都支持字符串格式('2019-12-31' 或者 '2019-12-31 12:01:30')、datetime.date(datetime.date(2019, 12, 31)) 或者 datetime.datetime(datetime.datetime(2019, 12, 31, 16, 30, 1));

刪除 job

當(dāng)調(diào)度器中刪除 job 時(shí),該 job 也將從其關(guān)聯(lián)的 job 存儲(chǔ)中刪除,并且將不再執(zhí)行。有兩種方法可以實(shí)現(xiàn)此目的:

通過(guò)調(diào)用方法 remove_job() ,指定 job ID 和 job 存儲(chǔ)別名

通過(guò)調(diào)用 add_job() 時(shí) 返回的 apscheduler.job.Job 實(shí)例的 remove() 方法

例:

job = scheduler.add_job(myfunc, 'interval', minutes=2)

job.remove()

或者:

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')

scheduler.remove_job('my_job_id')

注意: 如果任務(wù)已經(jīng)調(diào)度完畢,并且之后也不會(huì)再被執(zhí)行的情況下,會(huì)被自動(dòng)刪除。

暫停和恢復(fù) job

暫停和恢復(fù) job 與 刪除 job 方法類似:

暫停:

job = scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')

job.pause()# or

scheduler.pause_job('my_job_id')

恢復(fù):

job = scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')

job.resume()# or

scheduler.resume_job('my_job_id')

獲取 job 列表

使用 get_jobs() 方法獲取一個(gè)列表,或者使用 print_jobs() 方法打印一個(gè)格式化的列表。

jobs = scheduler.get_jobs()# or

scheduler.print_jobs()

提示:可以使用 get_job(id) 獲取單個(gè) job 信息

修改 job

修改 job 依然與 刪除 job 方法類似,可以修改除 job id 以外的其他屬性。

例:

job.modify(max_instances=6, name='Alternate name')

如果想修改觸發(fā)器,可以使用 apscheduler.job.Job.reschedule 或者 apscheduler.schedulers.base.BaseScheduler.reschedule_job 。

例:

scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

其實(shí)修改 job 也可以使用 add_job() 方法,只需要指定參數(shù) replace_existing=True 以及相同的 job_id 即可。

關(guān)閉調(diào)度器

關(guān)閉調(diào)度器方法:

scheduler.shutdown()

默認(rèn)情況下,會(huì)關(guān)閉 job 存儲(chǔ)和執(zhí)行器,并等待所有正在執(zhí)行的 job 完成。如果不想等待則可以使用以下方法關(guān)閉:

scheduler.shutdown(wait=False)

暫停/恢復(fù)調(diào)度器

暫停調(diào)度器:

scheduler.pause()

恢復(fù)調(diào)度器:

scheduler.resume()

啟動(dòng)調(diào)度器的時(shí)候可以指定 paused=True,以這種方式啟動(dòng)的調(diào)度器直接就是暫停狀態(tài)。

scheduler.start(paused=True)

限制 job 并發(fā)執(zhí)行實(shí)例數(shù)量

默認(rèn)情況下,每個(gè) job 僅允許 1 個(gè)實(shí)例同時(shí)運(yùn)行。這意味著,如果該 job 將要運(yùn)行,但是前一個(gè)實(shí)例尚未完成,則最新的 job 不會(huì)調(diào)度。可以在添加 job 時(shí)指定 max_instances 參數(shù)解除限制。

max_instances 可以在初始化調(diào)度器的時(shí)候設(shè)置一個(gè)全局默認(rèn)值,添加任務(wù)時(shí)可以再單獨(dú)指定

job 合并

當(dāng)由于某種原因?qū)е履硞€(gè) job 積攢了好幾次沒(méi)有實(shí)際運(yùn)行(比如說(shuō)系統(tǒng)掛了 5 分鐘后恢復(fù),有一個(gè)任務(wù)是每分鐘跑一次的,按道理說(shuō)這 5 分鐘內(nèi)本來(lái)是“計(jì)劃”運(yùn)行 5 次的,但實(shí)際沒(méi)有執(zhí)行),如果 coalesce 為 True,下次這個(gè) job 被 submit 給 executor 時(shí),只會(huì)執(zhí)行 1 次,也就是最后這次,如果為 False,那么會(huì)執(zhí)行 5 次(不一定,因?yàn)檫€有其他條件,看后面misfire_grace_time)。misfire_grace_time:單位為秒,假設(shè)有這么一種情況,當(dāng)某一 job 被調(diào)度時(shí)剛好線程池都被占滿,調(diào)度器會(huì)選擇將該 job 排隊(duì)不運(yùn)行,misfire_grace_time 參數(shù)則是在線程池有可用線程時(shí)會(huì)比對(duì)該 job 的應(yīng)調(diào)度時(shí)間跟當(dāng)前時(shí)間的差值,如果差值小于 misfire_grace_time 時(shí),調(diào)度器會(huì)再次調(diào)度該 job;反之該 job 的執(zhí)行狀態(tài)為 EVENTJOBMISSED 了,即錯(cuò)過(guò)運(yùn)行。

coalesce 與 misfire_grace_time 可以在初始化調(diào)度器的時(shí)候設(shè)置一個(gè)全局默認(rèn)值,添加任務(wù)時(shí)可以再單獨(dú)指定

調(diào)度器事件

調(diào)度器事件只有在某些情況下才會(huì)被觸發(fā),并且可以攜帶某些有用的信息。通過(guò) add_listener() 傳遞適當(dāng)參數(shù),可以實(shí)現(xiàn)監(jiān)聽(tīng)不同是事件,比如 job 運(yùn)行成功、運(yùn)行失敗等。具體支持的事件類型見(jiàn)官方文檔

例:

from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR

def my_listener(event):

if event.exception:

print('The job crashed :(')

else:

print('The job worked :)')

scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

擴(kuò)展 APScheduler

APScheduler 的四種組件都可以自定義擴(kuò)展:

triggers(觸發(fā)器)

jobstores (job 存儲(chǔ))

executors (執(zhí)行器)

schedulers (調(diào)度器)

具體方法參考官方文檔。

分布式 APScheduler

APScheduler 默認(rèn)是不支持分布式運(yùn)行的,詳見(jiàn)官方 FAQ。當(dāng)將其集成到 flask 或者 django 項(xiàng)目后,如果用 gunicorn 部署,gunicorn 可能會(huì)啟動(dòng)多個(gè) worker 從而導(dǎo)致 job 重復(fù)執(zhí)行。gunicorn 配置參數(shù) --preload 和 worker=1 后,只啟動(dòng)一個(gè) worker,可以適當(dāng)緩解這個(gè)問(wèn)題(這個(gè)方法有個(gè)問(wèn)題:當(dāng)自動(dòng)重啟 worker 的時(shí)候,如果這時(shí)后臺(tái)剛好有一個(gè)耗時(shí)任務(wù)正常執(zhí)行,比如需要執(zhí)行 30s,而系統(tǒng)中還有一個(gè)每秒執(zhí)行的任務(wù),這時(shí)就會(huì)丟失部分每秒執(zhí)行的任務(wù))。

那有沒(méi)有好的方法解決呢?肯定是有的,首先我們看看其基本原理:總的來(lái)說(shuō),其主要是利用 python threading Event 和 Lock 鎖來(lái)寫的。scheduler 在主循環(huán) (_main_loop)中,反復(fù)檢查是否有需要執(zhí)行的任務(wù),完成任務(wù)的檢查函數(shù)為 _process_jobs,這個(gè)函數(shù)主要有以下幾個(gè)步驟:

1、 詢問(wèn)儲(chǔ)存的每個(gè) jobStore,是否有到期要執(zhí)行的任務(wù)。

...

due_jobs = jobstore.get_due_jobs(now)

...

2、due_jobs 不為空,則計(jì)算這些 jobs 中每個(gè) job 需要運(yùn)行的時(shí)間點(diǎn),時(shí)間一到就 submit 給任務(wù)調(diào)度。

...

run_times = job._get_run_times(now)

run_times = run_times[-1:] if run_times and job.coalesce else run_times

if run_times:

try:

executor.submit_job(job, run_times)

except MaxInstancesReachedError:

...

3、在主循環(huán)中,如果不間斷地調(diào)用,而實(shí)際上沒(méi)有要執(zhí)行的 job,這會(huì)造成資源浪費(fèi)。因此在程序中,如果每次掉用 _process_jobs 后,進(jìn)行了預(yù)先判斷,判斷下一次要執(zhí)行的 job(離現(xiàn)在最近的)還要多長(zhǎng)時(shí)間,作為返回值告訴 main_loop, 這時(shí)主循環(huán)就可以去睡一覺(jué),等大約這么長(zhǎng)時(shí)間后再喚醒,執(zhí)行下一次 _process_jobs。

...

# Determine the delay until this method should be called again

if self.state == STATE_PAUSED:

wait_seconds = None

self._logger.debug('Scheduler is paused; waiting until resume() is called')

elif next_wakeup_time is None:

wait_seconds = None

self._logger.debug('No jobs; waiting until a job is added')

else:

wait_seconds = min(max(timedelta_seconds(next_wakeup_time - now), 0), TIMEOUT_MAX)

self._logger.debug('Next wakeup is due at %s (in %f seconds)', next_wakeup_time,

wait_seconds)

return wait_seconds

根據(jù)以上基本原理,其實(shí)可以發(fā)現(xiàn)重寫 _process_jobs 函數(shù)就能解決。主要思路是文件鎖,當(dāng) worker 準(zhǔn)備獲取要執(zhí)行的 job 時(shí)必須先獲取到文件鎖,獲取文件鎖后分配 job 到執(zhí)行器后,再釋放文件鎖。具體代碼如下:

from apscheduler.schedulers.background import BackgroundScheduler

from apscheduler.executors.base import MaxInstancesReachedError

from apscheduler.events import (

JobSubmissionEvent, EVENT_JOB_SUBMITTED, EVENT_JOB_MAX_INSTANCES,

)

from apscheduler.util import (

timedelta_seconds, TIMEOUT_MAX

)

from datetime import datetime, timedelta

import six

import fcntl

import os

#: constant indicating a scheduler's stopped state

STATE_STOPPED = 0

#: constant indicating a scheduler's running state (started and processing jobs)

STATE_RUNNING = 1

#: constant indicating a scheduler's paused state (started but not processing jobs)

STATE_PAUSED = 2

class DistributedBackgroundScheduler(BackgroundScheduler):

def __init__(self, *args, **kwargs):

super().__init__(*args, **kwargs)

def _process_jobs(self):

"""

Iterates through jobs in every jobstore, starts jobs that are due and figures out how long

to wait for the next round.

If the ``get_due_jobs()`` call raises an exception, a new wakeup is scheduled in at least

``jobstore_retry_interval`` seconds.

"""

if self.state == STATE_PAUSED:

self._logger.debug('pid: %s Scheduler is paused -- not processing jobs' % os.getpid())

return None

f = None

try:

f = open("scheduler.lock", "wb")

# 這里必須使用 lockf, 因?yàn)?gunicorn 的 worker 進(jìn)程都是 master 進(jìn)程 fork 出來(lái)的

# flock 會(huì)使子進(jìn)程擁有父進(jìn)程的鎖

# fcntl.flock(flock, fcntl.LOCK_EX | fcntl.LOCK_NB)

fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)

self._logger.info("pid: %s get Scheduler file lock success" % os.getpid())

except BaseException as exc:

self._logger.warning("pid: %s get Scheduler file lock error: %s" % (os.getpid(), str(exc)))

try:

if f:

f.close()

except BaseException:

pass

return None

else:

self._logger.debug('pid: %s Looking for jobs to run' % os.getpid())

now = datetime.now(self.timezone)

next_wakeup_time = None

events = []

with self._jobstores_lock:

for jobstore_alias, jobstore in six.iteritems(self._jobstores):

try:

due_jobs = jobstore.get_due_jobs(now)

except Exception as e:

# Schedule a wakeup at least in jobstore_retry_interval seconds

self._logger.warning('pid: %s Error getting due jobs from job store %r: %s',

os.getpid(), jobstore_alias, e)

retry_wakeup_time = now + timedelta(seconds=self.jobstore_retry_interval)

if not next_wakeup_time or next_wakeup_time > retry_wakeup_time:

next_wakeup_time = retry_wakeup_time

continue

for job in due_jobs:

# Look up the job's executor

try:

executor = self._lookup_executor(job.executor)

except BaseException:

self._logger.error(

'pid: %s Executor lookup ("%s") failed for job "%s" -- removing it from the '

'job store', os.getpid(), job.executor, job)

self.remove_job(job.id, jobstore_alias)

continue

run_times = job._get_run_times(now)

run_times = run_times[-1:] if run_times and job.coalesce else run_times

if run_times:

try:

executor.submit_job(job, run_times)

except MaxInstancesReachedError:

self._logger.warning(

'pid: %s Execution of job "%s" skipped: maximum number of running '

'instances reached (%d)', os.getpid(), job, job.max_instances)

event = JobSubmissionEvent(EVENT_JOB_MAX_INSTANCES, job.id,

jobstore_alias, run_times)

events.append(event)

except BaseException:

# 分配任務(wù)錯(cuò)誤后馬上釋放文件鎖,讓其他 worker 搶占

try:

fcntl.flock(f, fcntl.LOCK_UN)

f.close()

self._logger.info("pid: %s unlocked Scheduler file success" % os.getpid())

except:

pass

self._logger.exception('pid: %s Error submitting job "%s" to executor "%s"',

os.getpid(), job, job.executor)

break

else:

event = JobSubmissionEvent(EVENT_JOB_SUBMITTED, job.id, jobstore_alias,

run_times)

events.append(event)

# Update the job if it has a next execution time.

# Otherwise remove it from the job store.

job_next_run = job.trigger.get_next_fire_time(run_times[-1], now)

if job_next_run:

job._modify(next_run_time=job_next_run)

jobstore.update_job(job)

else:

self.remove_job(job.id, jobstore_alias)

# Set a new next wakeup time if there isn't one yet or

# the jobstore has an even earlier one

jobstore_next_run_time = jobstore.get_next_run_time()

if jobstore_next_run_time and (next_wakeup_time is None or

jobstore_next_run_time < next_wakeup_time):

next_wakeup_time = jobstore_next_run_time.astimezone(self.timezone)

# Dispatch collected events

for event in events:

self._dispatch_event(event)

# Determine the delay until this method should be called again

if next_wakeup_time is None:

wait_seconds = None

self._logger.debug('pid: %s No jobs; waiting until a job is added', os.getpid())

else:

wait_seconds = min(max(timedelta_seconds(next_wakeup_time - now), 0), TIMEOUT_MAX)

self._logger.debug('pid: %s Next wakeup is due at %s (in %f seconds)', os.getpid(), next_wakeup_time,

wait_seconds)

try:

fcntl.flock(f, fcntl.LOCK_UN)

f.close()

self._logger.info("pid: %s unlocked Scheduler file success" % os.getpid())

except:

pass

return wait_seconds

文件鎖只支持 unix/linux 系統(tǒng),并且只能實(shí)現(xiàn)本機(jī)的分布式。如果想實(shí)現(xiàn)多臺(tái)主機(jī)的的分布式,需要借助 redis 或者 zookeeper 實(shí)現(xiàn)分布鎖,原理和文件鎖一樣的,都是重寫 _process_jobs 函數(shù)實(shí)現(xiàn),代碼就不再贅述,有興趣的朋友可以自己研究一下。

總結(jié)

以上是生活随笔為你收集整理的python分布式定时任务_Python 定时任务框架 APScheduler 详解的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。