太实用了!Schedule模块, Python 周期任务神器!
如果你想在Linux服務器上周期性地執行某個 Python 腳本,最出名的選擇應該是 Crontab 腳本,但是 Crontab 具有以下缺點:
1.不方便執行秒級的任務。?
2.當需要執行的定時任務有上百個的時候,Crontab的管理就會特別不方便。?
另外一個選擇是 Celery,但是 Celery 的配置比較麻煩,如果你只是需要一個輕量級的調度工具,Celery 不會是一個好選擇。
在你想要使用一個輕量級的任務調度工具,而且希望它盡量簡單、容易使用、不需要外部依賴,最好能夠容納 Crontab 的所有基本功能,那么 Schedule 模塊是你的不二之選。
使用它來調度任務可能只需要幾行代碼,感受一下:
# Python 實用寶典 import?schedule import?timedef?job():print("I'm working...")schedule.every(10).minutes.do(job)while?True:schedule.run_pending()time.sleep(1)上面的代碼表示每10分鐘執行一次 job 函數,非常簡單方便。你只需要引入 schedule 模塊,通過調用?scedule.every(時間數).時間類型.do(job)??發布周期任務。
發布后的周期任務需要用?run_pending?函數來檢測是否執行,因此需要一個?While?循環不斷地輪詢這個函數。
下面具體講講Schedule模塊的安裝和初級、進階使用方法。
1.準備
請選擇以下任一種方式輸入命令安裝依賴:
1. Windows 環境 打開 Cmd (開始-運行-CMD)。
2. MacOS 環境 打開 Terminal (command+空格輸入Terminal)。
3. 如果你用的是 VSCode編輯器 或 Pycharm,可以直接使用界面下方的Terminal.
2.基本使用
最基本的使用在文首已經提到過,下面給大家展示更多的調度任務例子:
# Python 實用寶典 import?schedule import?timedef?job():print("I'm working...")# 每十分鐘執行任務 schedule.every(10).minutes.do(job) # 每個小時執行任務 schedule.every().hour.do(job) # 每天的10:30執行任務 schedule.every().day.at("10:30").do(job) # 每個月執行任務 schedule.every().monday.do(job) # 每個星期三的13:15分執行任務 schedule.every().wednesday.at("13:15").do(job) # 每分鐘的第17秒執行任務 schedule.every().minute.at(":17").do(job)while?True:schedule.run_pending()time.sleep(1)可以看到,從月到秒的配置,上面的例子都覆蓋到了。不過如果你想只運行一次任務的話,可以這么配:
# Python 實用寶典 import?schedule import?timedef?job_that_executes_once():# 此處編寫的任務只會執行一次...return?schedule.CancelJobschedule.every().day.at('22:30').do(job_that_executes_once)while?True:schedule.run_pending()time.sleep(1)參數傳遞
如果你有參數需要傳遞給作業去執行,你只需要這么做:
# Python 實用寶典 import?scheduledef?greet(name):print('Hello', name)# do() 將額外的參數傳遞給job函數 schedule.every(2).seconds.do(greet, name='Alice') schedule.every(4).seconds.do(greet, name='Bob')獲取目前所有的作業
如果你想獲取目前所有的作業:
# Python 實用寶典 import?scheduledef?hello():print('Hello world')schedule.every().second.do(hello)all_jobs = schedule.get_jobs()取消所有作業
如果某些機制觸發了,你需要立即清除當前程序的所有作業:
# Python 實用寶典 import?scheduledef?greet(name):print('Hello {}'.format(name))schedule.every().second.do(greet)schedule.clear()標簽功能
在設置作業的時候,為了后續方便管理作業,你可以給作業打個標簽,這樣你可以通過標簽過濾獲取作業或取消作業。
# Python 實用寶典 import?scheduledef?greet(name):print('Hello {}'.format(name))# .tag 打標簽 schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend') schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend') schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer') schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')# get_jobs(標簽):可以獲取所有該標簽的任務 friends = schedule.get_jobs('friend')# 取消所有 daily-tasks 標簽的任務 schedule.clear('daily-tasks')
設定作業截止時間
如果你需要讓某個作業到某個時間截止,你可以通過這個方法:
# Python 實用寶典 import?schedule from?datetime import?datetime, timedelta, timedef?job():print('Boo')# 每個小時運行作業,18:30后停止 schedule.every(1).hours.until("18:30").do(job)# 每個小時運行作業,2030-01-01 18:33 today schedule.every(1).hours.until("2030-01-01 18:33").do(job)# 每個小時運行作業,8個小時后停止 schedule.every(1).hours.until(timedelta(hours=8)).do(job)# 每個小時運行作業,11:32:42后停止 schedule.every(1).hours.until(time(11, 33, 42)).do(job)# 每個小時運行作業,2020-5-17 11:36:20后停止 schedule.every(1).hours.until(datetime(2020, 5, 17, 11, 36, 20)).do(job)截止日期之后,該作業將無法運行。
立即運行所有作業,而不管其安排如何
如果某個機制觸發了,你需要立即運行所有作業,可以調用?schedule.run_all()?:
# Python 實用寶典 import?scheduledef?job_1():print('Foo')def?job_2():print('Bar')schedule.every().monday.at("12:40").do(job_1) schedule.every().tuesday.at("16:40").do(job_2)schedule.run_all()# 立即運行所有作業,每次作業間隔10秒 schedule.run_all(delay_seconds=10)3.高級使用
裝飾器安排作業
如果你覺得設定作業這種形式太啰嗦了,也可以使用裝飾器模式:
# Python 實用寶典 from?schedule import?every, repeat, run_pending import?time# 此裝飾器效果等同于 schedule.every(10).minutes.do(job) @repeat(every(10).minutes) def?job():print("I am a scheduled job")while?True:run_pending()time.sleep(1)并行執行
默認情況下,Schedule 按順序執行所有作業。其背后的原因是,很難找到讓每個人都高興的并行執行模型。
不過你可以通過多線程的形式來運行每個作業以解決此限制:
# Python 實用寶典 import?threading import?time import?scheduledef?job1():print("I'm running on thread %s"?% threading.current_thread()) def?job2():print("I'm running on thread %s"?% threading.current_thread()) def?job3():print("I'm running on thread %s"?% threading.current_thread())def?run_threaded(job_func):job_thread = threading.Thread(target=job_func)job_thread.start()schedule.every(10).seconds.do(run_threaded, job1) schedule.every(10).seconds.do(run_threaded, job2) schedule.every(10).seconds.do(run_threaded, job3)while?True:schedule.run_pending()time.sleep(1)日志記錄
Schedule 模塊同時也支持 logging 日志記錄,這么使用:
# Python 實用寶典 import?schedule import?logginglogging.basicConfig() schedule_logger = logging.getLogger('schedule') # 日志級別為DEBUG schedule_logger.setLevel(level=logging.DEBUG)def?job():print("Hello, Logs")schedule.every().second.do(job)schedule.run_all()schedule.clear()效果如下:
DEBUG:schedule:Running *all*?1 jobs with 0s delay in between DEBUG:schedule:Running job Job(interval=1, unit=seconds, do=job, args=(), kwargs={}) Hello, Logs DEBUG:schedule:Deleting *all*?jobs異常處理
Schedule 不會自動捕捉異常,它遇到異常會直接拋出,這會導致一個嚴重的問題:后續所有的作業都會被中斷執行,因此我們需要捕捉到這些異常。
你可以手動捕捉,但是某些你預料不到的情況需要程序進行自動捕獲,加一個裝飾器就能做到了:
# Python 實用寶典 import?functoolsdef?catch_exceptions(cancel_on_failure=False):def?catch_exceptions_decorator(job_func):@functools.wraps(job_func)def?wrapper(*args, **kwargs):try:return?job_func(*args, **kwargs)except:import?tracebackprint(traceback.format_exc())if?cancel_on_failure:return?schedule.CancelJobreturn?wrapperreturn?catch_exceptions_decorator@catch_exceptions(cancel_on_failure=True) def?bad_task():return?1?/ 0schedule.every(5).minutes.do(bad_task)這樣,bad_task?在執行時遇到的任何錯誤,都會被?catch_exceptions ?捕獲,這點在保證調度任務正常運轉的時候非常關鍵。
我們的文章到此就結束啦,如果你喜歡今天的Python 實戰教程,請持續關注Python實用寶典。
這是我開發的機器人公眾號小號,目前增加了天氣查詢,955公司名單,關注時間查詢;后面還會增加圖片功能和每日送書抽獎送書活動,以及調戲功能,歡迎來體驗,捧場。
一個機器人公眾號已經上線,歡迎調戲
年度爆款文案
1).臥槽!Pdf轉Word用Python輕松搞定!
2).學Python真香!我用100行代碼做了個網站,幫人PS旅行圖片,賺個雞腿吃
3).首播過億,火爆全網,我分析了《乘風破浪的姐姐》,發現了這些秘密?
4).80行代碼!用Python做一個哆來A夢分身?
5).你必須掌握的20個python代碼,短小精悍,用處無窮?
6).30個Python奇淫技巧集?
7).我總結的80頁《菜鳥學Python精選干貨.pdf》,都是干貨?
8).再見Python!我要學Go了!2500字深度分析!
9).發現一個舔狗福利!這個Python爬蟲神器太爽了,自動下載妹子圖片
點閱讀原文,看B站我的視頻!
總結
以上是生活随笔為你收集整理的太实用了!Schedule模块, Python 周期任务神器!的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: “约见”面试官系列之常见面试题之第八十二
- 下一篇: 用Python执行SQL、Excel常见