轻量级定时任务框架:APScheduler
一、APScheduler簡介
APScheduler(Advanced Python Scheduler)是一個輕量級的Python定時任務調度框架(Python庫)。APScheduler有三個內置的調度系統,其中包括:
-
cron式調度(可選開始/結束時間)
-
基于間隔的執行(以偶數間隔運行作業,也可以選擇開始/結束時間)
-
一次性延遲執行任務(在指定的日期/時間內運行作業一次)
支持的后端存儲作業
APScheduler可以任意混合和匹配調度系統和作業存儲的后端,其中支持后端存儲作業包括:
-
Memory
-
SQLAlchemy
-
MongoDB
-
Redis
-
RethinkDB
-
ZooKeeper
集成的Python框架
APScheduler內繼承了幾個常見的Python框架:
-
asyncio
-
gevent
-
tornado
-
qt
二、APScheduler下載安裝
使用pip安裝:
pip?install?apscheduler pip?install?apscheduler==3.6.3如果超時或者出現別的情況,可以選擇:
#?法1使用豆瓣源下載 pip?install?-i?https://pypi.doubanio.com/simple/?apscheduler #?法2使用清華源下載 pip?install?-i?https://pypi.tuna.tsinghua.edu.cn/simple?apscheduler要是再不行,點擊該鏈接或者pypi官網下載了。下載并解壓縮,進入跟setup.py文件同級的目錄,打開cmd,使用命令進行下載:
python?setup.py?install三、APScheduler組件
APScheduler共有4種組件,分別是:
-
觸發器(trigger),觸發器中包含調度邏輯,每個作業都有自己的觸發器來決定下次運行時間。除了它們自己初始配置以外,觸發器完全是無狀態的。
-
作業存儲器(job store),存儲被調度的作業,默認的作業存儲器只是簡單地把作業保存在內存中,其他的作業存儲器則是將作業保存在數據庫中,當作業被保存在一個持久化的作業存儲器中的時候,該作業的數據會被序列化,并在加載時被反序列化,需要說明的是,作業存儲器不能共享調度器。
-
執行器(executor),處理作業的運行,通常通過在作業中提交指定的可調用對象到一個線程或者進程池來進行,當作業完成時,執行器會將通知調度器。
-
調度器(scheduler),配置作業存儲器和執行器可以在調度器中完成。例如添加、修改、移除作業,根據不同的應用場景,可以選擇不同的調度器,可選的將在下一小節展示。
各組件簡介
調度器
-
BlockingScheduler : 當調度器是你應用中唯一要運行的東西時。
-
BackgroundScheduler : 當你沒有運行任何其他框架并希望調度器在你應用的后臺執行時使用(充電樁即使用此種方式)。
-
AsyncIOScheduler : 當你的程序使用了asyncio(一個異步框架)的時候使用。
-
GeventScheduler : 當你的程序使用了gevent(高性能的Python并發框架)的時候使用。
-
TornadoScheduler : 當你的程序基于Tornado(一個web框架)的時候使用。
-
TwistedScheduler : 當你的程序使用了Twisted(一個異步框架)的時候使用
-
QtScheduler : 如果你的應用是一個Qt應用的時候可以使用。
作業存儲器
如果你的應用在每次啟動的時候都會重新創建作業,那么使用默認的作業存儲器(MemoryJobStore)即可,但是如果你需要在調度器重啟或者應用程序奔潰的情況下任然保留作業,你應該根據你的應用環境來選擇具體的作業存儲器。例如:使用Mongo或者SQLAlchemy JobStore (用于支持大多數RDBMS)
執行器
對執行器的選擇取決于你使用上面哪些框架,大多數情況下,使用默認的ThreadPoolExecutor已經能夠滿足需求。如果你的應用涉及到CPU密集型操作,你可以考慮使用ProcessPoolExecutor來使用更多的CPU核心。你也可以同時使用兩者,將ProcessPoolExecutor作為第二執行器。
觸發器
當你調度作業的時候,你需要為這個作業選擇一個觸發器,用來描述這個作業何時被觸發,APScheduler有三種內置的觸發器類型:
-
date 一次性指定日期
-
interval 在某個時間范圍內間隔多長時間執行一次
-
cron 和Linux crontab格式兼容,最為強大
四、使用
當你需要調度作業的時候,你需要為這個作業選擇一個觸發器,用來描述該作業將在何時被觸發,APScheduler有3中內置的觸發器類型:
-
新建一個調度器(scheduler)
-
添加一個調度任務(job store)
-
運行調度任務
添加任務
有兩種方式可以添加一個新的作業:
-
add_job來添加作業
-
裝飾器模式添加作業
指定時間執行任務,只執行一次
import?datetime from?apscheduler.schedulers.blocking?import?BlockingScheduler def?job2(text):print('job2',?datetime.datetime.now(),?text) scheduler?=?BlockingScheduler() scheduler.add_job(job2,?'date',?run_date=datetime.datetime(2020,?2,?25,?19,?5,?6),?args=['text'],?id='job2') scheduler.start()上例中,只在2010-2-25 19:05:06執行一次,args傳遞一個text參數。
間隔時間執行任務
下面來個簡單的例子,作業每個5秒執行一次:
import?datetime from?apscheduler.schedulers.blocking?import?BlockingSchedulerdef?job1():print('job1',?datetime.datetime.now()) scheduler?=?BlockingScheduler() scheduler.add_job(job1,?'interval',?seconds=5,?id='job1')??#?每隔5秒執行一次 scheduler.start()每天凌晨1點30分50秒執行一次
#?裝飾器的方式from?apscheduler.schedulers.blocking?import?BlockingScheduler??#?后臺運行 sc?=?BlockingScheduler() f?=?open('t1.text',?'a',?encoding='utf8')@sc.scheduled_job('cron',?day_of_week='*',?hour=1,?minute='30',?second='50') def?check_db():print(111111111111) if?__name__?==?'__main__':try:sc.start()f.write('定時任務成功執行')except?Exception?as?e:sc.shutdown()f.write('定時任務執行失敗')finally:f.close()每幾分鐘執行一次:
import?datetime from?apscheduler.schedulers.blocking?import?BlockingSchedulerdef?job1():print('job1',?datetime.datetime.now()) scheduler?=?BlockingScheduler() #?每隔2分鐘執行一次,?*/1:每隔1分鐘執行一次 scheduler.add_job(job1,?'cron',?minute="*/2",?id='job1')? scheduler.start()每小時執行一次:
import?datetime from?apscheduler.schedulers.blocking?import?BlockingSchedulerdef?job1():print('job1',?datetime.datetime.now()) scheduler?=?BlockingScheduler() #?每小時執行一次 scheduler.add_job(job1,?'interval',?hours=1,?id='job1') #?每小時執行一次,上下浮動120秒區間內 #?scheduler.add_job(job1,?'interval',?hours=1,?id='job1',?jitter=120) scheduler.start()總結
以上是生活随笔為你收集整理的轻量级定时任务框架:APScheduler的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Linux 系统审计操作行为的 5 种解
- 下一篇: 处理项目重大质量问题的思路和原则