5分钟快速掌握 Python 定时任务框架
APScheduler 簡(jiǎn)介
在實(shí)際開發(fā)中我們經(jīng)常會(huì)碰上一些重復(fù)性或周期性的任務(wù),比如像每天定時(shí)爬取某個(gè)網(wǎng)站的數(shù)據(jù)、一定周期定時(shí)運(yùn)行代碼訓(xùn)練模型等,類似這類的任務(wù)通常需要我們手動(dòng)來進(jìn)行設(shè)定或調(diào)度,以便其能夠在我們?cè)O(shè)定好的時(shí)間內(nèi)運(yùn)行。
在 Windows 上我們可以通過計(jì)劃任務(wù)來手動(dòng)實(shí)現(xiàn),而在 Linux 系統(tǒng)上往往我們會(huì)用到更多關(guān)于 crontab 的相關(guān)操作。但手動(dòng)管理并不是一個(gè)很好的選擇,如果我們需要有十幾個(gè)不同的定時(shí)任務(wù)需要管理,那么每次通過人工來進(jìn)行干預(yù)未免有些笨拙,那這時(shí)候就真的是「人工智能」了。
所以將這些定時(shí)任務(wù)的調(diào)度代碼化才是能夠讓我們很好地從這種手動(dòng)管理的純?nèi)肆Σ僮髦薪饷摮鰜怼?/p>
在 Python 生態(tài)中對(duì)于定時(shí)任務(wù)的一些操作主要有那么幾個(gè):
schedule:第三方模塊,該模塊適合比較輕量級(jí)的一些調(diào)度任務(wù),但卻不適用于復(fù)雜時(shí)間的調(diào)度
APScheduler:第三方定時(shí)任務(wù)框架,是對(duì) Java 第三方定時(shí)任務(wù)框架?Quartz?的模仿與移植,能提供比?schedule?更復(fù)雜的應(yīng)用場(chǎng)景,并且各種組件都是模塊化,易于使用與二次開發(fā)。
Celery Beat:屬于?celery?這分布式任務(wù)隊(duì)列第三方庫(kù)下的一個(gè)定時(shí)任務(wù)組件,如果使用需要配合 RabbitMQ 或 Redis 這類的消息隊(duì)列套件,需要花費(fèi)一定的時(shí)間在環(huán)境搭建上,但在高版本中已經(jīng)不支持 Windows。
所以為了滿足能夠相對(duì)復(fù)雜的時(shí)間條件,又不需要在前期的環(huán)境搭建上花費(fèi)很多時(shí)間的前提下,選擇?APScheduler?來對(duì)我們的調(diào)度任務(wù)或定時(shí)任務(wù)進(jìn)行管理是個(gè)性價(jià)比極高的選擇。而本文主要會(huì)帶你快速上手有關(guān)?APScheduler?的使用。
APScheduler 概念與組件
雖然說官方文檔上的內(nèi)容不是很多,而且所列舉的 API 不是很多,但這側(cè)面也反映了這一框架的簡(jiǎn)單易用。所以在使用?APScheduler?之前,我們需要對(duì)這個(gè)框架的一些概念簡(jiǎn)單了解,主要有那么以下幾個(gè):
-
觸發(fā)器(trigger)
-
任務(wù)持久化(job stores)
-
執(zhí)行器(executor)
-
調(diào)度器(scheduler)
觸發(fā)器(trigger)
所謂的觸發(fā)器就是用以觸發(fā)定時(shí)任務(wù)的組件,在?APScheduler中主要是指時(shí)間觸發(fā)器,并且主要有三類時(shí)間觸發(fā)器可供使用:
-
date:日期觸發(fā)器。日期觸發(fā)器主要是在某一日期時(shí)間點(diǎn)上運(yùn)行任務(wù)時(shí)調(diào)用,是?APScheduler?里面最簡(jiǎn)單的一種觸發(fā)器。所以通常也適用于一次性的任務(wù)或作業(yè)調(diào)度。
-
interval:間隔觸發(fā)器。間隔觸發(fā)器是在日期觸發(fā)器基礎(chǔ)上擴(kuò)展了對(duì)時(shí)間部分,比如時(shí)、分、秒、天、周這幾個(gè)部分的設(shè)定。是我們用以對(duì)重復(fù)性任務(wù)進(jìn)行設(shè)定或調(diào)度的一個(gè)常用調(diào)度器。設(shè)定了時(shí)間部分之后,從起始日期開始(默認(rèn)是當(dāng)前)會(huì)按照設(shè)定的時(shí)間去執(zhí)行任務(wù)。
-
cron:cron?表達(dá)式觸發(fā)器。cron?表達(dá)式觸發(fā)器就等價(jià)于我們 Linux 上的 crontab,它主要用于更復(fù)雜的日期時(shí)間進(jìn)行設(shè)定。但需要注意的是,APScheduler?不支持?6 位及以上的 cron 表達(dá)式,最多只支持到 5 位。
任務(wù)持久化(job stores)
任務(wù)持久化主要是用于將設(shè)定好的調(diào)度任務(wù)進(jìn)行存儲(chǔ),即便是程序因?yàn)橐馔馇闆r,如斷電、電腦或服務(wù)器重啟時(shí),只要重新運(yùn)行程序時(shí),APScheduler?就會(huì)根據(jù)對(duì)存儲(chǔ)好的調(diào)度任務(wù)結(jié)果進(jìn)行判斷,如果出現(xiàn)已經(jīng)過期但未執(zhí)行的情況會(huì)進(jìn)行相應(yīng)的操作。
APScheduler?為我們提供了多種持久化任務(wù)的途徑,默認(rèn)是使用memory?也就是內(nèi)存的形式,但內(nèi)存并不是持久化最好的方式。最好的方式則是通過像數(shù)據(jù)庫(kù)這樣的載體來將我們的定時(shí)任務(wù)寫入到磁盤當(dāng)中,只要磁盤沒有損壞就能將數(shù)據(jù)給恢復(fù)。
APScheduler?支持的且常用的數(shù)據(jù)庫(kù)主要有:
-
sqlalchemy?形式的數(shù)據(jù)庫(kù),這里就主要是指各種傳統(tǒng)的關(guān)系型數(shù)據(jù)庫(kù),如 MySQL、PostgreSQL、SQLite 等。
-
mongodb?非結(jié)構(gòu)化的 Mongodb 數(shù)據(jù)庫(kù),該類型數(shù)據(jù)庫(kù)經(jīng)常用于對(duì)非結(jié)構(gòu)化或版結(jié)構(gòu)化數(shù)據(jù)的存儲(chǔ)或操作,如 JSON。
-
redis?內(nèi)存數(shù)據(jù)庫(kù),通常用作數(shù)據(jù)緩存來使用,當(dāng)然通過一些主從復(fù)制等方式也能實(shí)現(xiàn)當(dāng)中數(shù)據(jù)的持久化或保存。
通常我們可以在創(chuàng)建?Scheduler?實(shí)例時(shí)創(chuàng)建,或是單獨(dú)為任務(wù)指定。配置的方式相對(duì)簡(jiǎn)單,我們只需要指定對(duì)應(yīng)的數(shù)據(jù)庫(kù)鏈接即可。
執(zhí)行器(executor)
執(zhí)行器顧名思義就是執(zhí)行我們?nèi)蝿?wù)的對(duì)象,在計(jì)算機(jī)內(nèi)通常要么是 CPU 調(diào)度任務(wù),要么是單獨(dú)維護(hù)一個(gè)線程來運(yùn)行任務(wù)。所以?APScheduler?里的執(zhí)行器通常就是?ThreadPoolExecutor?或?ProcessPoolExecutor?這樣的線程池和進(jìn)程池兩種。
當(dāng)然如果是和協(xié)程或異步相關(guān)的任務(wù)調(diào)度,還可以使用對(duì)應(yīng)的?AsyncIOExecutor、TwistedExecutor?和?GeventExecutor?三種執(zhí)行器。
調(diào)度器(scheduler)
調(diào)度器的選擇主要取決于你當(dāng)前的程序環(huán)境以及?APScheduler的用途。根據(jù)用途的不同,APScheduler?又提供了以下幾種調(diào)度器:
-
BlockingScheduler:阻塞調(diào)度器,當(dāng)程序中沒有任何存在主進(jìn)程之中運(yùn)行東西時(shí),就則使用該調(diào)度器。
-
BackgroundScheduler:后臺(tái)調(diào)度器,在不使用后面任何的調(diào)度器且希望在應(yīng)用程序內(nèi)部運(yùn)行時(shí)的后臺(tái)啟動(dòng)時(shí)才進(jìn)行使用,如當(dāng)前你已經(jīng)開啟了一個(gè) Django 或 Flask 服務(wù)。
-
AsyncIOScheduler:AsyncIO?調(diào)度器,如果代碼是通過?asyncio?模塊進(jìn)行異步操作,使用該調(diào)度器。
-
GeventScheduler:Gevent?調(diào)度器,如果代碼是通過?gevent?模塊進(jìn)行協(xié)程操作,使用該調(diào)度器
-
TornadoScheduler:Tornado?調(diào)度器,在?Tornado?框架中使用
-
TwistedScheduler:Twisted?調(diào)度器,在基于?Twisted的框架或應(yīng)用程序中使用
-
QtScheduler:Qt?調(diào)度器,在構(gòu)建?Qt?應(yīng)用中進(jìn)行使用。
通常情況下如果不是和 Web 項(xiàng)目或應(yīng)用集成共存,那么往往都首選?BlockingScheduler?調(diào)度器來進(jìn)行操作,它會(huì)在當(dāng)前進(jìn)程中啟動(dòng)相應(yīng)的線程來進(jìn)行任務(wù)調(diào)度與處理;反之,如果是和 Web 項(xiàng)目或應(yīng)用共存,那么需要選擇?BackgroundScheduler?調(diào)度器,因?yàn)樗粫?huì)干擾當(dāng)前應(yīng)用的線程或進(jìn)程狀況。
基于對(duì)以上的概念和組件認(rèn)識(shí),我們就能基本上摸清?APScheduler?的運(yùn)行流程:
設(shè)定調(diào)度器(scheduler)用以對(duì)任務(wù)的調(diào)度與安排進(jìn)行全局統(tǒng)籌
對(duì)相應(yīng)的函數(shù)或方法上設(shè)定相應(yīng)的觸發(fā)器(trigger),并添加到調(diào)度器中
如有任務(wù)持久化(job stores)需要?jiǎng)t需要設(shè)定對(duì)應(yīng)的持久化層,否則默認(rèn)使用內(nèi)存存儲(chǔ)任務(wù)
當(dāng)觸發(fā)器被觸發(fā)時(shí),就將任務(wù)交由執(zhí)行器(executor)進(jìn)行執(zhí)行
APScheduler 快速上手
雖然?APScheduler?里面的概念和組件看起來有點(diǎn)多,但在使用上并不算很復(fù)雜,我們可以通過本節(jié)的示例就能夠很快使用。
選擇對(duì)應(yīng)的 scheduler
在使用之前我們需要先實(shí)例化一個(gè)?scheduler?對(duì)象,所有的?scheduler?對(duì)象都被放在了?apscheduler.schedulers?模塊下,我們可以直接通過查看 API 文檔或者借助 IDE 補(bǔ)全的提示來獲取相應(yīng)的?scheduler?對(duì)象。
這里我直接選取了最基礎(chǔ)的?BlockingScheduler:
#?main.pyfrom?apscheduler.schedulers.blocking?import?BlockingSchedulerscheduler?=?BlockingScheduler()配置 scheduler
對(duì)于?scheduler?的一些配置我們可以直接在實(shí)例化對(duì)象時(shí)就進(jìn)行配置,當(dāng)然也可以在創(chuàng)建實(shí)例化對(duì)象之后再進(jìn)行配置。
實(shí)例化時(shí)進(jìn)行參數(shù)配置:
#?main.py from?datetime?import?datetimefrom?apscheduler.executors.pool?import?ThreadPoolExecutor from?apscheduler.jobstores.sqlalchemy?import?SQLAlchemyJobStore from?apscheduler.schedulers.blocking?import?BlockingScheduler#?任務(wù)持久化?使用?SQLite jobstores?=?{'default':?SQLAlchemyJobStore(url?=?'sqlite:///jobs.db') } #?執(zhí)行器配置 executors?=?{'default':?ThreadPoolExecutor(20), } #?關(guān)于?Job?的相關(guān)配置,見官方文檔?API job_defaults?=?{'coalesce':?False,'next_run_time':?datetime.now() } scheduler?=?BlockingScheduler(jobstores?=?jobstores,executors?=?executors,job_defaults?=?job_defaults,timezone?=?'Asia/Shanghai' )或是通過?scheduler.configure?方法進(jìn)行同樣的操作:
scheduler?=?BlockingScheduler() scheduler.configure(jobstores=jobstores,?executors=executors,?job_defaults=job_defaults,?timezone='Asia/Shanghai')添加并執(zhí)行你的任務(wù)
創(chuàng)建?scheduler?對(duì)象之后,我們需要調(diào)用其下的?add_job()或是?scheduled_job()?方法來將我們需要執(zhí)行的函數(shù)進(jìn)行注冊(cè)。前者是以傳參的形式指定對(duì)應(yīng)的函數(shù)名,而后者則是以裝飾器的形式直接對(duì)我們要執(zhí)行的函數(shù)進(jìn)行修飾。
比如我現(xiàn)在有一個(gè)輸出此時(shí)此刻時(shí)間的函數(shù)?now():
from?datetime?import?datetimedef?now(trigger):print(f"trigger:{trigger}?->?{datetime.now()}")然后我打算每 5 秒的時(shí)候運(yùn)行一次,那我們使用?add_job()?可以這樣寫:
if?__name__?==?'__main__':scheduler.add_job(now,?trigger?=?"interval",?args?=?("interval",),?seconds?=?5)scheduler.start()在調(diào)用?start()?方法之后調(diào)度器就會(huì)開始執(zhí)行,并在控制臺(tái)上看到對(duì)應(yīng)的結(jié)果了:
trigger:interval?->?2021-01-16?21:19:43.356674 trigger:interval?->?2021-01-16?21:19:46.679849 trigger:interval?->?2021-01-16?21:19:48.356595當(dāng)然使用?@scheduled_job?的方式來裝飾我們的任務(wù)或許會(huì)更加自由一些,于是上面的例子就可以寫成這樣:
@scheduler.scheduled_job(trigger?=?"interval",?args?=?("interval",),?seconds?=?5) def?now(trigger):print(f"trigger:{trigger}?->?{datetime.now()}")if?__name__?==?'__main__':scheduler.start()運(yùn)行之后就會(huì)在控制臺(tái)看到同樣的結(jié)果了。
不過需要注意的是,添加任務(wù)一定要在?start()?方法執(zhí)行前調(diào)用,否則會(huì)找不到任務(wù)或是拋出異常。
將 APScheduler 集成到 Web 項(xiàng)目中
如果你是正在做有關(guān)的 Web 項(xiàng)目且存在一些定時(shí)任務(wù),那么得益于?APScheduler?由于多樣的調(diào)度器,我們能夠?qū)⑵浜臀覀兊捻?xiàng)目結(jié)合到一起。
如果你正在使用?Flask,那么?Flask-APScheduler?這一別人寫好的第三方包裝庫(kù)就很適合你,雖然它沒有相關(guān)的文檔,但只要你了解了前面我所介紹的有關(guān)于?APScheduler?的概念和組件,你就能很輕易地看懂這個(gè)第三方庫(kù)倉(cāng)庫(kù)里的示例代碼。
如果你使用的不是 Flask 框架,那么?APScheduler?本身也提供了一些對(duì)任務(wù)或作業(yè)的增刪改查操作,我們可以自己編寫一套合適的 API。
這里我使用的是?FastAPI?這一目前流行的 Web 框架。demo 項(xiàng)目結(jié)構(gòu)如下:
temp-scheduler ├──?config.py???????#?配置項(xiàng) ├──?main.py?????????#?API?文件 └──?scheduler.py????#?APScheduler?相關(guān)設(shè)置安裝依賴
這里我們需要的依賴不多,只需要簡(jiǎn)單幾個(gè)即可:
pip?install?fastapi?apscheduler?sqlalchemy?uvicorn配置項(xiàng)
如果項(xiàng)目中模塊過多,那么使用一個(gè)文件或模塊來進(jìn)行統(tǒng)一管理是最好的選擇。這里的?config.py?我們主要像 Flask 的配置那樣簡(jiǎn)單設(shè)定:
from?apscheduler.executors.pool?import?ThreadPoolExecutor from?apscheduler.jobstores.sqlalchemy?import?SQLAlchemyJobStore from?apscheduler.schedulers.blocking?import?BlockingSchedulerclass?SchedulerConfig:JOBSTORES?=?{"default":?SQLAlchemyJobStore(url="sqlite:///job.db")}EXECUTORS?=?{"default":?ThreadPoolExecutor(20)}JOB_DEFAULTS?=?{"coalesce":?False}@classmethoddef?to_dict(cls):return?{"jobstores":?cls.JOBSTORES,"executors":?cls.EXECUTORS,"job_defaults":?cls.JOB_DEFAULTS,}在?SchedulerConfig?配置項(xiàng)中我們可以自己實(shí)現(xiàn)一個(gè)?to_dict()?類方法,以便我們后續(xù)傳參時(shí)通過解包的方式直接傳入配置參數(shù)即可。
Scheduler 相關(guān)設(shè)置
scheduler.py?模塊的設(shè)定也比較簡(jiǎn)單,即設(shè)定對(duì)應(yīng)的?scheduler?調(diào)度器即可。由于是演示 demo 我還將要定期執(zhí)行的任務(wù)也放在了這個(gè)模塊當(dāng)中:
import?logging from?datetime?import?datetimefrom?apscheduler.schedulers.background?import?BackgroundSchedulerfrom?config?import?SchedulerConfigscheduler?=?BackgroundScheduler() logger?=?logging.getLogger(__name__)def?init_scheduler()?->?None:#?config?schedulerscheduler.configure(**SchedulerConfig.to_dict())logger.info("scheduler?is?running...")#?schedule?testscheduler.add_job(func=mytask,trigger="date",args=("APScheduler?Initialize.",),next_run_time=datetime.now(),)scheduler.start()def?mytask(message:?str)?->?None:print(f"[{datetime.now()}]?message:?{message}")在這一部分中:
-
init_scheduler()?方法主要用于在 API 服務(wù)啟動(dòng)時(shí)被調(diào)用,然后對(duì)?scheduler?對(duì)象的配置以及測(cè)試
-
mytask()?則是我們要定期執(zhí)行的任務(wù),后續(xù)我們可以通過 APScheduler 提供的方法來自行添加任務(wù)
API 設(shè)置
在?main.py?模塊就主要存放著我們由 FastAPI 所構(gòu)建的相關(guān) API。如果在后續(xù)開發(fā)時(shí)存在多個(gè)接口,此時(shí)就需要將不同接口放在不同模塊文件中,以達(dá)到路由的分發(fā)與管理,類似于 Flask 的藍(lán)圖模式。
import?logging import?uuid from?datetime?import?datetime from?typing?import?Any,?Dict,?Optional,?Sequence,?Unionfrom?fastapi?import?FastAPI from?pydantic?import?BaseModelfrom?scheduler?import?init_scheduler,?mytask,?schedulerlogger?=?logging.getLogger(__name__)app?=?FastAPI(title="APScheduler?API") app.add_event_handler("startup",?init_scheduler)class?Job(BaseModel):id:?Union[int,?str,?uuid.UUID]name:?Optional[str]?=?Nonefunc:?Optional[str]?=?Noneargs:?Optional[Sequence[Optional[str]]]?=?Nonekwargs:?Optional[Dict[str,?Any]]?=?Noneexecutor:?Optional[str]?=?Nonemisfire_grace_time:?Optional[str]?=?Nonecoalesce:?Optional[bool]?=?Nonemax_instances:?Optional[int]?=?Nonenext_run_time:?Optional[Union[str,?datetime]]?=?None@app.post("/add") def?add_job(message:?str,trigger:?str,trigger_args:?Optional[dict],id:?Union[str,?int,?uuid.UUID], ):try:scheduler.add_job(func=mytask,trigger=trigger,kwargs={"message":?message},id=id,**trigger_args,)except?Exception?as?e:logger.exception(e.args)return?{"status_code":?0,?"message":?"添加失敗"}return?{"status_code":?1,?"message":?"添加成功"}@app.delete("/delete/{id}") def?delete_job(id:?Union[str,?int,?uuid.UUID]):"""delete?exist?job?by?id"""try:scheduler.remove_job(job_id=id)except?Exception:return?dict(message="刪除失敗",status_code=0,)return?dict(message="刪除成功",status_code=1,)@app.put("/reschedule/{id}") def?reschedule_job(id:?Union[str,?int,?uuid.UUID],?trigger:?str,?trigger_args:?Optional[dict] ):try:scheduler.reschedule_job(job_id=id,?trigger=trigger,?**trigger_args)except?Exception?as?e:logger.exception(e.args)return?dict(message="修改失敗",status_code=0,)return?dict(message="修改成功",status_code=1,)@app.get("/job") def?get_all_jobs():jobs?=?Nonetry:job_list?=?scheduler.get_jobs()if?job_list:jobs?=?[Job(**task.__getstate__())?for?task?in?job_list]except?Exception?as?e:logger.exception(e.args)return?dict(message="查詢失敗",status_code=0,jobs=jobs,)return?dict(message="查詢成功",status_code=1,jobs=jobs,)@app.get("/job/{id}") def?get_job_by_id(id:?Union[int,?str,?uuid.UUID]):jobs?=?[]try:job?=?scheduler.get_job(job_id=id)if?job:jobs?=?[Job(**job.__getstate__())]except?Exception?as?e:logger.exception(e.args)return?dict(message="查詢失敗",status_code=0,jobs=jobs,)return?dict(message="查詢成功",status_code=1,jobs=jobs,)以上代碼看起來很多,其實(shí)核心的就那么幾點(diǎn):
FastAPI 對(duì)象?app?的初始化。這里用到的?add_event_handler()?方法就有點(diǎn)像 Flask 中的?before_first_request,會(huì)在 Web 服務(wù)請(qǐng)求伊始進(jìn)行操作,理解為初始化相關(guān)的操作即可。
API 接口路由。路由通過?app?對(duì)象下的對(duì)應(yīng) HTTP 方法來實(shí)現(xiàn),如?GET、POST、PUT?等。這里的裝飾器用法其實(shí)也和 Flask 很類似,就不多贅述。
scheduler?對(duì)象的增刪改查。從?scheduler.py?模塊中引入我們創(chuàng)建好的?scheduler?對(duì)象之后就可以直接用來做增刪改查的操作:
增:使用?add_job()?方法,其主要的參數(shù)是要運(yùn)行的函數(shù)(或方法)、觸發(fā)器以及觸發(fā)器參數(shù)等
刪:使用?delete_job()?方法,我們需要傳入一個(gè)對(duì)應(yīng)任務(wù)的?id?參數(shù),用以能夠查找到對(duì)應(yīng)的任務(wù)
改:使用?reschedule_job()?方法,這里也需要一個(gè)對(duì)應(yīng)任務(wù)的?id?參數(shù),以及需要重新修改的觸發(fā)器及其參數(shù)
查:使用?get_jobs()?和?get_job()?兩個(gè)方法,前者是直接獲取到當(dāng)前調(diào)度的所有任務(wù),返回的是一個(gè)包含了?APScheduler.job.Job?對(duì)象的列表,而后者是通過?id?參數(shù)來查找對(duì)應(yīng)的任務(wù)對(duì)象;這里我通過底層源碼使用?__getstate__()?來獲取到任務(wù)的相關(guān)信息,這些信息我們通過事先設(shè)定好的?Job?對(duì)象來對(duì)其進(jìn)行序列化,最后將信息從接口中返回。
總結(jié)
以上是生活随笔為你收集整理的5分钟快速掌握 Python 定时任务框架的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 处理项目重大质量问题的思路和原则
- 下一篇: 利用pyinstaller打包Pytho