日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

python 定时任务框架APScheduler

發布時間:2023/12/20 python 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python 定时任务框架APScheduler 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
1、 簡介

APScheduler的全稱是Advanced Python Scheduler。它是一個輕量級的 Python 定時任務調度框架。APScheduler 支持三種調度任務:固定時間間隔,固定時間點(日期),Linux 下的 Crontab 命令。同時,它還支持異步執行、后臺執行調度任務。

2、 安裝

使用 pip 包管理工具安裝 APScheduler 是最方便快捷的。

pip install APScheduler

3 、使用步驟

APScheduler 使用起來還算是比較簡單。運行一個調度任務只需要以下三部曲。

新建一個 schedulers (調度器) 。
添加一個調度任務(job stores)。
運行調度任務。

下面是執行定時任務的簡單示例代碼

#!/usr/bin/env python # coding:utf-8import time from apscheduler.schedulers.blocking import BlockingScheduler import loggingLOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s" logging.basicConfig(filename='mypython.log', level=logging.INFO, format=LOG_FORMAT)today = time.strftime('%Y%m%d', time.localtime(time.time())) nowDateTime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))global num num = 0class myAP:def __init__(self):print("hello")def sayHello():global numnum = num + 1print(nowDateTime+' 任務一 hello word %d' % num)def sayHello11():global numnum = num + 1print(nowDateTime+' 任務二 o(* ̄︶ ̄*)o hello word %d' % num)if __name__ == "__main__":# BlockingSchedulerscheduler = BlockingScheduler()# scheduler.add_job(sayHello, 'cron', day_of_week='0-6', hour=19, minute=50)scheduler.add_job(sayHello, 'interval', seconds=2)scheduler.add_job(sayHello11, 'interval', seconds=3)scheduler.start()

執行結果如圖

4、 基礎組件

APScheduler 有四種組件,分別是:調度器(scheduler),作業存儲(job store),觸發器(trigger),執行器(executor)。

schedulers(調度器)
它是任務調度器,屬于控制器角色。它配置作業存儲器和執行器可以在調度器中完成,例如添加、修改和移除作業。

triggers(觸發器)
描述調度任務被觸發的條件。不過觸發器完全是無狀態的。

job stores(作業存儲器)
任務持久化倉庫,默認保存任務在內存中,也可將任務保存都各種數據庫中,任務中的數據序列化后保存到持久化數據庫,從數據庫加載后又反序列化。

executors(執行器)
負責處理作業的運行,它們通常通過在作業中提交指定的可調用對象到一個線程或者進城池來進行。當作業完成時,執行器將會通知調度器。

4.1 schedulers(調度器)

我個人覺得 APScheduler 非常好用的原因。它提供 7 種調度器,能夠滿足我們各種場景的需要。例如:后臺執行某個操作,異步執行操作等。調度器分別是:

BlockingScheduler : 調度器在當前進程的主線程中運行,也就是會阻塞當前線程。
BackgroundScheduler : 調度器在后臺線程中運行,不會阻塞當前線程。
AsyncIOScheduler : 結合 asyncio 模塊(一個異步框架)一起使用。
GeventScheduler : 程序中使用 gevent(高性能的Python并發框架)作為IO模型,和 GeventExecutor 配合使用。
TornadoScheduler : 程序中使用 Tornado(一個web框架)的IO模型,用 ioloop.add_timeout 完成定時喚醒。
TwistedScheduler : 配合 TwistedExecutor,用 reactor.callLater 完成定時喚醒。
QtScheduler : 你的應用是一個 Qt 應用,需使用QTimer完成定時喚醒。

4.2 triggers(觸發器)

APScheduler 有三種內建的 trigger:
1)date 觸發器
date 是最基本的一種調度,作業任務只會執行一次。它表示特定的時間點觸發。它的參數如下:

參數說明
run_date (datetime 或 str)作業的運行日期或時間
timezone (datetime.tzinfo 或 str)指定時區

date 觸發器使用示例如下:

from datetime import datetime from datetime import date from apscheduler.schedulers.background import BackgroundSchedulerdef job_func(text):print(text)scheduler = BackgroundScheduler() # 在 2017-12-13 時刻運行一次 job_func 方法 scheduler .add_job(job_func, 'date', run_date=date(2017, 12, 13), args=['text']) # 在 2017-12-13 14:00:00 時刻運行一次 job_func 方法 scheduler .add_job(job_func, 'date', run_date=datetime(2017, 12, 13, 14, 0, 0), args=['text']) # 在 2017-12-13 14:00:01 時刻運行一次 job_func 方法 scheduler .add_job(job_func, 'date', run_date='2017-12-13 14:00:01', args=['text']) scheduler.start()
interval 觸發器

固定時間間隔觸發。interval 間隔調度,參數如下:

參數說明
weeks (int)間隔幾周
days (int)間隔幾天
hours (int)間隔幾小時
minutes (int)間隔幾分鐘
seconds (int)間隔多少秒
start_date (datetime 或 str)開始日期
end_date (datetime 或 str)結束日期
timezone (datetime.tzinfo 或str)時區
cron 觸發器

在特定時間周期性地觸發,和Linux crontab格式兼容。它是功能最強大的觸發器。
我們先了解 cron 參數:

參數說明
year (int 或 str)年,4位數字
month (int 或 str)月 (范圍1-12)
day (int 或 str)日 (范圍1-31
week (int 或 str)周 (范圍1-53)
day_of_week (int 或 str)周內第幾天或者星期幾 (范圍0-6 或者mon,tue,wed,thu,fri,sat,sun)
hour (int 或 str)時 (范圍0-23)
minute (int 或 str)分 (范圍0-59)
second (int 或 str)秒 (范圍0-59)
start_date (datetime 或 str)最早開始日期(包含)
end_date (datetime 或 str)最晚結束時間(包含)
timezone (datetime.tzinfo 或str)指定時區

cron 觸發器使用示例如下:

import datetime from apscheduler.schedulers.background import BackgroundSchedulerdef job_func(text): print("當前時間:", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) scheduler = BackgroundScheduler() # 在每年 1-3、7-9 月份中的每個星期一、二中的 00:00, 01:00, 02:00 和 03:00 執行 job_func 任務 scheduler .add_job(job_func, 'cron', month='1-3,7-9',day='0, tue', hour='0-3') scheduler.start()
4.3 作業存儲(job store)

該組件是對調度任務的管理。
1)添加 job
有兩種添加方法,其中一種上述代碼用到的 add_job(), 另一種則是scheduled_job()修飾器來修飾函數。

這個兩種辦法的區別是:第一種方法返回一個 apscheduler.job.Job 的實例,可以用來改變或者移除 job。第二種方法只適用于應用運行期間不會改變的 job。

第二種添加任務方式的例子:

@scheduler.scheduled_job(job_func, 'interval', minutes=2) def job_func(text): print(datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) scheduler = BackgroundScheduler() scheduler.start()

2)移除 job
移除 job 也有兩種方法:remove_job() 和 job.remove()。
remove_job() 是根據 job 的 id 來移除,所以要在 job 創建的時候指定一個 id。
job.remove() 則是對 job 執行 remove 方法即可

scheduler.add_job(job_func, 'interval', minutes=2, id='job_one') scheduler.remove_job(job_one) job = add_job(job_func, 'interval', minutes=2, id='job_one') job.remvoe()

3)獲取 job 列表
通過 scheduler.get_jobs() 方法能夠獲取當前調度器中的所有 job 的列表

修改 job
如果你因計劃改變要對 job 進行修改,可以使用Job.modify() 或者 modify_job()方法來修改 job 的屬性。但是值得注意的是,job 的 id 是無法被修改的。

scheduler.add_job(job_func, 'interval', minutes=2, id='job_one') scheduler.start() # 將觸發時間間隔修改成 5分鐘 scheduler.modify_job('job_one', minutes=5)job = scheduler.add_job(job_func, 'interval', minutes=2) # 將觸發時間間隔修改成 5分鐘 job.modify(minutes=5)

5)關閉 job
默認情況下調度器會等待所有正在運行的作業完成后,關閉所有的調度器和作業存儲。如果你不想等待,可以將 wait 選項設置為 False。

scheduler.shutdown() scheduler.shutdown(wait=false)
4.4 執行器(executor)

執行器顧名思義是執行調度任務的模塊。最常用的 executor 有兩種:ProcessPoolExecutor 和 ThreadPoolExecutor

下面是顯式設置 job store(使用mongo存儲)和 executor 的代碼的示例。

from pymongo import MongoClient from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutordef my_job():print 'hello world' host = '127.0.0.1' port = 27017 client = MongoClient(host, port)jobstores = {'mongo': MongoDBJobStore(collection='job', database='test', client=client),'default': MemoryJobStore() } executors = {'default': ThreadPoolExecutor(10),'processpool': ProcessPoolExecutor(3) } job_defaults = {'coalesce': False,'max_instances': 3 } scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults) scheduler.add_job(my_job, 'interval', seconds=5)try:scheduler.start() except SystemExit:client.close()

總結

以上是生活随笔為你收集整理的python 定时任务框架APScheduler的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。