日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > python >内容正文

python

Python任务调度模块 – APScheduler,Flask-APScheduler实现定时任务

發(fā)布時間:2025/7/25 python 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Python任务调度模块 – APScheduler,Flask-APScheduler实现定时任务 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

1.安裝  

pip install apscheduler

  

  安裝完畢

2.?簡單任務(wù)

  首先,來個最簡單的例子,看看它的威力。

1 # coding:utf-8 2 from apscheduler.schedulers.blocking import BlockingScheduler 3 import datetime 4 5 6 def aps_test(): 7 print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), '你好' 8 9 10 scheduler = BlockingScheduler() 11 scheduler.add_job(func=aps_test, trigger='cron', second='*/5') 12 scheduler.start()

  看代碼,定義一個函數(shù),然后定義一個scheduler類型,添加一個job,然后執(zhí)行,就可以了,代碼是不是超級簡單,而且非常清晰。看看結(jié)果吧。

5秒整倍數(shù),就執(zhí)行這個函數(shù),是不是超級超級簡單?對了,apscheduler就是通俗易懂。

再寫一個帶參數(shù)的。

1 # coding:utf-8 2 from apscheduler.schedulers.blocking import BlockingScheduler 3 import datetime 4 5 6 def aps_test(x): 7 print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x 8 9 scheduler = BlockingScheduler() 10 scheduler.add_job(func=aps_test, args=('你好',), trigger='cron', second='*/5') 11 scheduler.start()

結(jié)果跟上面一樣的。

好了,上面只是給大家看的小例子,我們先從頭到位梳理一遍吧。apscheduler分為4個模塊,分別是Triggers,Job stores,Executors,Schedulers.從上面的例子我們就可以看出來了,triggers就是觸發(fā)器,上面的代碼中,用了cron,其實(shí)還有其他觸發(fā)器,看看它的源碼解釋。

The ``trigger`` argument can either be:#. the alias name of the trigger (e.g. ``date``, ``interval`` or ``cron``), in which case any extra keywordarguments to this method are passed on to the trigger's constructor#. an instance of a trigger class

看見沒有,源碼中解釋說,有date, interval, cron可供選擇,其實(shí)看字面意思也可以知道,date表示具體的一次性任務(wù),interval表示循環(huán)任務(wù),cron表示定時任務(wù),好了,分別寫個代碼看看效果最明顯。

1 # coding:utf-8 2 from apscheduler.schedulers.blocking import BlockingScheduler 3 import datetime 4 5 6 def aps_test(x): 7 print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x 8 9 scheduler = BlockingScheduler() 10 scheduler.add_job(func=aps_test, args=('定時任務(wù)',), trigger='cron', second='*/5') 11 scheduler.add_job(func=aps_test, args=('一次性任務(wù)',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12)) 12 scheduler.add_job(func=aps_test, args=('循環(huán)任務(wù)',), trigger='interval', seconds=3) 13 14 scheduler.start()

看看結(jié)果

其實(shí)應(yīng)該不用我解釋代碼,大家也可以看出結(jié)果了,非常清晰。除了一次性任務(wù),trigger是不要寫的,直接定義next_run_time就可以了,關(guān)于date這部分,官網(wǎng)沒有解釋,但是去看看源碼吧,看這行代碼。

1 def _create_trigger(self, trigger, trigger_args): 2 if isinstance(trigger, BaseTrigger): 3 return trigger 4 elif trigger is None: 5 trigger = 'date' 6 elif not isinstance(trigger, six.string_types): 7 raise TypeError('Expected a trigger instance or string, got %s instead' % trigger.__class__.__name__) 8 9 # Use the scheduler's time zone if nothing else is specified 10 trigger_args.setdefault('timezone', self.timezone) 11 12 # Instantiate the trigger class 13 return self._create_plugin_instance('trigger', trigger, trigger_args)

第4行,如果trigger為None,直接定義trigger為'date'類型。其實(shí)弄到這里,大家應(yīng)該自己拓展一下,如果實(shí)現(xiàn)web的異步任務(wù)。假設(shè)接到一個移動端任務(wù),任務(wù)完成后,發(fā)送一個推送到移動端,用date類型的trigger完成可以做的很好。

3.日志

  好了,scheduler的基本應(yīng)用,我想大家已經(jīng)會了,但這僅僅只是開始。如果代碼有意外咋辦?會阻斷整個任務(wù)嗎?如果我要計算密集型的任務(wù)咋辦?下面有個代碼,我們看看會發(fā)生什么情況。

1 # coding:utf-8 2 from apscheduler.schedulers.blocking import BlockingScheduler 3 import datetime 4 5 6 def aps_test(x): 7 print 1/0 8 print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x 9 10 scheduler = BlockingScheduler() 11 scheduler.add_job(func=aps_test, args=('定時任務(wù)',), trigger='cron', second='*/5') 12 13 scheduler.start()

還是上面代碼,但我們中間故意加了個錯誤,看看會發(fā)生什么情況。

說我們沒有l(wèi)og文件,好吧,我們添加一個log文件,看看寫的什么。

1 # coding:utf-8 2 from apscheduler.schedulers.blocking import BlockingScheduler 3 import datetime 4 import logging 5 6 logging.basicConfig(level=logging.INFO, 7 format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', 8 datefmt='%Y-%m-%d %H:%M:%S', 9 filename='log1.txt', 10 filemode='a') 11 12 13 def aps_test(x): 14 print 1/0 15 print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x 16 17 scheduler = BlockingScheduler() 18 scheduler.add_job(func=aps_test, args=('定時任務(wù)',), trigger='cron', second='*/5') 19 scheduler._logger = logging 20 scheduler.start() View Code

終于可以看到了,這時候才看到錯誤,這個是一定要注意的。

其實(shí),到這里,完全可以執(zhí)行大多數(shù)任務(wù)了,但我們?yōu)榱诵?#xff0c;安全性,再往下面看看,還有什么。

4.刪除任務(wù)

假設(shè)我們有個奇葩任務(wù),要求執(zhí)行一定階段任務(wù)以后,刪除某一個循環(huán)任務(wù),其他任務(wù)照常進(jìn)行。有如下代碼:

1 # coding:utf-8 2 from apscheduler.schedulers.blocking import BlockingScheduler 3 import datetime 4 import logging 5 6 logging.basicConfig(level=logging.INFO, 7 format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', 8 datefmt='%Y-%m-%d %H:%M:%S', 9 filename='log1.txt', 10 filemode='a') 11 12 13 def aps_test(x): 14 print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x 15 16 17 def aps_date(x): 18 scheduler.remove_job('interval_task') 19 print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x 20 21 22 scheduler = BlockingScheduler() 23 scheduler.add_job(func=aps_test, args=('定時任務(wù)',), trigger='cron', second='*/5', id='cron_task') 24 scheduler.add_job(func=aps_date, args=('一次性任務(wù),刪除循環(huán)任務(wù)',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12), id='date_task') 25 scheduler.add_job(func=aps_test, args=('循環(huán)任務(wù)',), trigger='interval', seconds=3, id='interval_task') 26 scheduler._logger = logging 27 28 scheduler.start() View Code

看看結(jié)果,

在運(yùn)行過程中,成功刪除某一個任務(wù),其實(shí)就是為每個任務(wù)定義一個id,然后remove_job這個id,是不是超級簡單,直觀?那還有什么呢?

5.停止任務(wù),恢復(fù)任務(wù)

看看官方文檔,還有pause_job, resume_job,用法跟remove_job一樣,這邊就不詳細(xì)介紹了,就寫個代碼。

1 # coding:utf-8 2 from apscheduler.schedulers.blocking import BlockingScheduler 3 import datetime 4 import logging 5 6 logging.basicConfig(level=logging.INFO, 7 format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', 8 datefmt='%Y-%m-%d %H:%M:%S', 9 filename='log1.txt', 10 filemode='a') 11 12 13 def aps_test(x): 14 print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x 15 16 17 def aps_pause(x): 18 scheduler.pause_job('interval_task') 19 print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x 20 21 22 def aps_resume(x): 23 scheduler.resume_job('interval_task') 24 print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x 25 26 scheduler = BlockingScheduler() 27 scheduler.add_job(func=aps_test, args=('定時任務(wù)',), trigger='cron', second='*/5', id='cron_task') 28 scheduler.add_job(func=aps_pause, args=('一次性任務(wù),停止循環(huán)任務(wù)',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12), id='pause_task') 29 scheduler.add_job(func=aps_resume, args=('一次性任務(wù),恢復(fù)循環(huán)任務(wù)',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=24), id='resume_task') 30 scheduler.add_job(func=aps_test, args=('循環(huán)任務(wù)',), trigger='interval', seconds=3, id='interval_task') 31 scheduler._logger = logging 32 33 scheduler.start() View Code

看看結(jié)果

?是不是很容易?好了,刪除任務(wù),停止任務(wù),恢復(fù)任務(wù)就介紹到這,下面我們看看監(jiān)聽任務(wù)。

6.意外

任何代碼都可能發(fā)生意外,關(guān)鍵是,發(fā)生意外了,如何第一時間知道,這才是公司最關(guān)心的,apscheduler已經(jīng)為我們想到了這些。

看下面的代碼,

1 # coding:utf-8 2 from apscheduler.schedulers.blocking import BlockingScheduler 3 from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR 4 import datetime 5 import logging 6 7 logging.basicConfig(level=logging.INFO, 8 format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', 9 datefmt='%Y-%m-%d %H:%M:%S', 10 filename='log1.txt', 11 filemode='a') 12 13 14 def aps_test(x): 15 print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x 16 17 18 def date_test(x): 19 print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x 20 print 1/0 21 22 23 def my_listener(event): 24 if event.exception: 25 print '任務(wù)出錯了!!!!!!' 26 else: 27 print '任務(wù)照常運(yùn)行...' 28 29 scheduler = BlockingScheduler() 30 scheduler.add_job(func=date_test, args=('一定性任務(wù),會出錯',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=15), id='date_task') 31 scheduler.add_job(func=aps_test, args=('循環(huán)任務(wù)',), trigger='interval', seconds=3, id='interval_task') 32 scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) 33 scheduler._logger = logging 34 35 scheduler.start() View Code

看看結(jié)果

是不是很直觀,在生產(chǎn)環(huán)境中,你可以把出錯信息換成發(fā)送一封郵件或者發(fā)送一個短信,這樣定時任務(wù)出錯就可以立馬就知道了。

  好了,今天就講到這,以后我們有機(jī)會再來拓展這個apscheduler,這個非常強(qiáng)大而且直觀的后臺任務(wù)庫。?

7作業(yè)運(yùn)行的控制

add_job的第二個參數(shù)是trigger,它管理著作業(yè)的調(diào)度方式。它可以為date, interval或者cron。對于不同的trigger,對應(yīng)的參數(shù)也相同。

(1). cron定時調(diào)度

year?(int|str) – 4-digit year
month?(int|str) – month (1-12)
day?(int|str) – day of the (1-31)
week?(int|str) – ISO week (1-53)
day_of_week?(int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
hour?(int|str) – hour (0-23)
minute?(int|str) – minute (0-59)
second?(int|str) – second (0-59)
start_date?(datetime|str) – earliest possible date/time to trigger on (inclusive)
end_date?(datetime|str) – latest possible date/time to trigger on (inclusive)
timezone?(datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone)
和Linux的Crontab一樣,它的值格式為:

ExpressionFieldDescription
*anyFire on every value
*/aanyFire every?a?values, starting from the minimum
a-banyFire on any value within the?a-b?range (a must be smaller than b)
a-b/canyFire every?c?values within the?a-b?range
xth?ydayFire on the?x?-th occurrence of weekday?y?within the month
last?xdayFire on the last occurrence of weekday?x?within the month
lastdayFire on the last day within the month
x,y,zanyFire on any matching expression; can combine any number of any of the above expressions

幾個例子如下:

?

1 2 3 4 5 # Schedules job_function to be run on the third Friday # of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00 sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3') # Runs from Monday to Friday at 5:30 (am) until 2014-05-30 00:00:00 sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')

?

(2). interval 間隔調(diào)度

它的參數(shù)如下:
weeks?(int) – number of weeks to wait
days?(int) – number of days to wait
hours?(int) – number of hours to wait
minutes?(int) – number of minutes to wait
seconds?(int) – number of seconds to wait
start_date?(datetime|str) – starting point for the interval calculation
end_date?(datetime|str) – latest possible date/time to trigger on
timezone?(datetime.tzinfo|str) – time zone to use for the date/time calculations
例子:

?

1 2 # Schedule job_function to be called every two hours sched.add_job(job_function, 'interval', hours=2)

?

(3). date 定時調(diào)度

最基本的一種調(diào)度,作業(yè)只會執(zhí)行一次。它的參數(shù)如下:
run_date?(datetime|str) – the date/time to run the job at
timezone?(datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already
例子:

?

1 2 3 4 # The job will be executed on November 6th, 2009 sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text']) # The job will be executed on November 6th, 2009 at 16:30:05 sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])

?

?7.Flask-APScheduler使用

https://www.jianshu.com/p/2628f566b31c

參考: https://www.cnblogs.com/yueerwanwan0204/p/5480870.html http://debugo.com/apscheduler/

?http://jinbitou.net/2016/12/19/2263.html

?

?

?

?

?

?

?

總結(jié)

以上是生活随笔為你收集整理的Python任务调度模块 – APScheduler,Flask-APScheduler实现定时任务的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。