日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

python apscheduler一次只有一个job_Python使用APScheduler实现定时任务过程解析

發布時間:2023/12/20 python 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python apscheduler一次只有一个job_Python使用APScheduler实现定时任务过程解析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

前言

APScheduler是基于Quartz的一個Python定時任務框架。提供了基于日期、固定時間間隔以及crontab類型的任務,并且可以持久化任務。

一、安裝APScheduler

pip install apscheduler

二、基本概念

APScheduler有四大組件:

1、觸發器 triggers :

觸發器包含調度邏輯。每個作業都有自己的觸發器,用于確定下一個任務何時運行。除了初始配置之外,觸發器是完全無狀態的。

有三種內建的trigger:

(1)date: 特定的時間點觸發

(2)interval: 固定時間間隔觸發

(3)cron: 在特定時間周期性地觸發

2、任務儲存器 job stores:用于存放任務,把任務存放在內存(為默認MemoryJobStore)或數據庫中。

3、執行器 executors: 執行器是將任務提交到線程池或進程池中運行,當任務完成時,執行器通知調度器觸發相應的事件。

4、調度器 schedulers: 把上方三個組件作為參數,通過創建調度器實例來運行

根據開發需求選擇相應的組件,下面是不同的調度器組件:

BlockingScheduler 阻塞式調度器:適用于只跑調度器的程序。

BackgroundScheduler 后臺調度器:適用于非阻塞的情況,調度器會在后臺獨立運行。

AsyncIOScheduler AsyncIO調度器,適用于應用使用AsnycIO的情況。

GeventScheduler Gevent調度器,適用于應用通過Gevent的情況。

TornadoScheduler Tornado調度器,適用于構建Tornado應用。

TwistedScheduler Twisted調度器,適用于構建Twisted應用。

QtScheduler Qt調度器,適用于構建Qt應用。

三、使用步驟

1、新建一個調度器schedulers

2、添加調度任務

3、運行調度任務

四、使用實例

1、觸發器date

特定的時間點觸發,只執行一次。參數如下:

參數

說明

run_date (datetime 或 str)

作業的運行日期或時間

timezone (datetime.tzinfo 或 str)

指定時區

使用例子:

from datetime import datetime

from datetime import date

from apscheduler.schedulers.blocking import BlockingScheduler

def job(text):

print(text)

scheduler = BlockingScheduler()

# 在 2019-8-30 運行一次 job 方法

scheduler.add_job(job, 'date', run_date=date(2019, 8, 30), args=['text1'])

# 在 2019-8-30 01:00:00 運行一次 job 方法

scheduler.add_job(job, 'date', run_date=datetime(2019, 8, 30, 1, 0, 0), args=['text2'])

# 在 2019-8-30 01:00:01 運行一次 job 方法

scheduler.add_job(job, 'date', run_date='2019-8-30 01:00:00', args=['text3'])

scheduler.start()

2、觸發器interval

固定時間間隔觸發。參數如下:

參數

說明

weeks (int)

間隔幾周

days (int)

間隔幾天

hours (int)

間隔幾小時

minutes (int)

間隔幾分鐘

seconds (int)

間隔多少秒

start_date (datetime 或 str)

開始日期

end_date (datetime 或 str)

結束日期

timezone (datetime.tzinfo 或str)

使用例子:

import time

from apscheduler.schedulers.blocking import BlockingScheduler

def job(text):

t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))

print('{} --- {}'.format(text, t))

scheduler = BlockingScheduler()

# 每隔 1分鐘 運行一次 job 方法

scheduler.add_job(job, 'interval', minutes=1, args=['job1'])

# 在 2019-08-29 22:15:00至2019-08-29 22:17:00期間,每隔1分30秒 運行一次 job 方法

scheduler.add_job(job, 'interval', minutes=1, seconds = 30, start_date='2019-08-29 22:15:00', end_date='2019-08-29 22:17:00', args=['job2'])

scheduler.start()

'''

運行結果:

job2 --- 2019-08-29 22:15:00

job1 --- 2019-08-29 22:15:46

job2 --- 2019-08-29 22:16:30

job1 --- 2019-08-29 22:16:46

job1 --- 2019-08-29 22:17:46

...余下省略...

'''

3、觸發器cron

在特定時間周期性地觸發。參數如下:

參數

說明

year (int 或 str)

年,4位數字

month (int 或 str)

月 (范圍1-12)

day (int 或 str)

日 (范圍1-31)

week (int 或 str)

周 (范圍1-53)

day_of_week (int 或 str)

周內第幾天或者星期幾 (范圍0-6 或者 mon,tue,wed,thu,fri,sat,sun)

hour (int 或 str)

時 (范圍0-23)

minute (int 或 str)

分 (范圍0-59)

second (int 或 str)

秒 (范圍0-59)

start_date (datetime 或 str)

最早開始日期(包含)

end_date (datetime 或 str)

最晚結束時間(包含)

timezone (datetime.tzinfo 或str)

指定時區

這些參數支持算數表達式,取值格式有如下:

使用例子:

import time

from apscheduler.schedulers.blocking import BlockingScheduler

def job(text):

t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))

print('{} --- {}'.format(text, t))

scheduler = BlockingScheduler()

# 在每天22點,每隔 1分鐘 運行一次 job 方法

scheduler.add_job(job, 'cron', hour=22, minute='*/1', args=['job1'])

# 在每天22和23點的25分,運行一次 job 方法

scheduler.add_job(job, 'cron', hour='22-23', minute='25', args=['job2'])

scheduler.start()

'''

運行結果:

job1 --- 2019-08-29 22:25:00

job2 --- 2019-08-29 22:25:00

job1 --- 2019-08-29 22:26:00

job1 --- 2019-08-29 22:27:00

...余下省略...

'''

4、通過裝飾器scheduled_job()添加方法

添加任務的方法有兩種:

(1)通過調用add_job()---見上面1至3代碼

(2)通過裝飾器scheduled_job():

第一種方法是最常用的方法。第二種方法主要是方便地聲明在應用程序運行時不會更改的任務。該 add_job()方法返回一個apscheduler.job.Job實例,可以使用該實例稍后修改或刪除該任務。

import time

from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()

@scheduler.scheduled_job('interval', seconds=5)

def job1():

t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))

print('job1 --- {}'.format(t))

@scheduler.scheduled_job('cron', second='*/7')

def job2():

t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))

print('job2 --- {}'.format(t))

scheduler.start()

'''

運行結果:

job2 --- 2019-08-29 22:36:35

job1 --- 2019-08-29 22:36:37

job2 --- 2019-08-29 22:36:42

job1 --- 2019-08-29 22:36:42

job1 --- 2019-08-29 22:36:47

job2 --- 2019-08-29 22:36:49

...余下省略...

'''

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。

總結

以上是生活随笔為你收集整理的python apscheduler一次只有一个job_Python使用APScheduler实现定时任务过程解析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。