探寻用户自定义定时任务的实践方案
導讀
工作中會遇到一些由用戶自定義定時任務的業務場景,常用的開源框架(如 XXL-Job、Quartz)設計的初衷是給開發人員使用,并不適合開放給用戶創建大量的自定義任務。本文借鑒開源框架定時任務作業的思想,結合 j.u.c 的 ScheduledExecutor,提供一種定時任務的實現方法,以解決用戶自定義定時任務場景的問題。希望對大家有所幫助。
作者:楊凱 | 網易智企資深開發工程師
用戶自定義定時任務
談到定時任務的實現,我們優先想到的是引入優秀的開源框架方案去解決,常見的開源產品上文也提到過,如Quartz、XXL-Job、ElasticJob 等,但是開源框架應用到用戶自定義任務上,存在以下需要問題或不足:
-
開源框架從任務創建到執行有一套標準方案,用戶自定義任務在何時,何地插入符合開源框架標準任務并能控制生效、停止是一個需要考慮的復雜問題。
-
開源框架(如 XXL-Job)對任務的管理和業務容器是解耦的,如果用戶要完成任務的創建、修改需要業務服務反向調用操作任務中心,這不符合任務中心設計原則。
-
開源框架設計的初衷是給程序開發者創建和控制任務。一般情況下,任務執行的策略、目的都比較明確,不像用戶自定義任務存在頻繁修改和相同業務背景多個任務定義使用同一個處理邏輯。
-
開源框架未提供用戶友好的任務配置界面。
設計用戶自定義任務組件除了要考慮上面的問題,還需要站在用戶角度思考用戶自定義任務的特點:
-
開始和結束可控
用戶自定義定時任務業務依賴性強,可以多次創建和更新任務,但不會執行,也會在任務執行期間人為停止。所以任務組件要將業務任務創建和作業任務的創建區分,只創建、加載用戶確定執行的任務。
-
執行策略和執行時間對用戶友好
程序開發者創建定時任務,執行策略(單個任務循環、單次)和執行時間是由配置的 Cron 表達式確定,但是 Cron 表達式對用戶不友好,容易配置出錯。用戶自定義定時任務在設置定時策略和執行時間時,需要提供用戶友好的配置界面,任務組件內部轉換成對應的 Cron 表達式。
-
執行時間范圍可控
完成一、二步的配置后,需要給用戶提供一個任務執行的時間范圍,在這個時間范圍內才會執行任務。 簡單的用戶自定義定時任務的界面如下:
清楚了用戶自定義定時任務的特點,定義任務模型 TaskScheduleDefine 為:
| id | 任務的唯一標識 |
| busId | 業務維度的 ID:可以根據業務背景決定是唯一還是指定 |
| taskName | 任務名稱 |
| beanName | 任務處理類實例名稱 |
| cron | cron 表達式 |
| startTime | 用戶定義的開始時間 |
| endTime | 用戶定義的結束時間 |
| isPermanent | 是否永久任務 |
| multiple | 是否允許同一時間任務任務并行執行 |
| once | 是否單次任務 |
| valid | 任務是否有效 |
定時任務執行周期
定時任務從創建到執行可分為如下階段:
-
創建:界面化的配置(如 XXL-Job),代碼配置(如 Quartz,spring-schedule)。
-
加載:任務加載到應用緩存,可以在創建時進行,但實際上任務創建和加載任務是分開的,比如當任務被修改時,實際上是有一個更新的過程的,可以把這種更新叫做任務的重載。
-
調度:判斷被加載的任務是否滿足執行條件(如果支持分布式調度要決定那一臺服務器去執行),如果滿足,開始執行。
-
執行:開源框架都會完成上面的三個步驟(調度中心或應用本身),業務開發者只用關注業務邏輯部分,做到任務調度和業務執行解耦。
本文介紹的任務組件也是基于這個思想去實現用戶自定義任務。
用戶自定義任務設計
應用啟動時,初始化任務加載線程和任務調度線程(類似于 XXL-Job 的 scheduleThread 和 ringThread)
//上傳+加載,支持本地和數據庫任務 uploadAndLoadDefinition(); //初始化調度, 調度由維護任務來處理,由調度任務來喚起相應的具體執行 internalScheduledExecutor.scheduleAtFixedRate(new SpringTaskMonitor(), 10, 45, TimeUnit.SECONDS); //定義維護 internalScheduledExecutor.scheduleAtFixedRate(new SpringTaskDefinitionMonitor(), 1, 2, TimeUnit.MINUTES);任務創建
將業務任務執行和停止與作業任務創建和失效關聯,達到用戶自定義定時任務的初衷,作業任務完全由用戶決定。
任務加載
任務加載使用 j.u.c 提供的定時任務線程池 ScheduledThreadPoolExecutor 的 scheduleAtFixedRate 方法,周期性的觸發任務的加載,保證緩存中任務的及時更新。不同的是用戶自定義任務一般都是提前創建好的,不需要不間斷的去查詢,而且可以通過開始和結束時間雙重保證任務正確觸發。
注冊任務部分邏輯:
//獲取全部任務列表defineList更新任務defineList.forEach(t -> {String key = t.getBeanName() + t.getBusId();val task = TaskDefinitions.registered(key);//沒有(并且有效),就添加if (task == null) {if (t.getValid()) {TaskDefinitions.registerTask(new ScheduleTask(t));changedList.add(t);}}//有,就替換定義else {boolean changed = task.updateDefine(t);if (changed) {changedList.add(t);}}});//打印變化的任務日志 } //ScheduleTask任務定義,updateDefine這個對象的屬性 public class ScheduleTask {private long id;private TaskScheduleDefine localScheduleDefine;private CronSequenceGenerator cronGenerator;public ScheduleTask(TaskScheduleDefine taskScheduleDefine) {this.id = taskScheduleDefine.getId();this.localScheduleDefine = taskScheduleDefine; } }任務調度
調度任務的部分邏輯:
public class SpringTaskMonitor implements Runnable {private static Date DATE_INIT = new Date();@Overridepublic void run() {ExceptionUtils.doActionLogE(this::doRun);}private void doRun() throws Throwable {val taskScheduleDefineMapper = ApplicationContextUtils.getReadyApplicationContext().getBean(TaskScheduleDefineMapper.class);val taskScheduleRecordMapper = ApplicationContextUtils.getReadyApplicationContext().getBean(TaskScheduleRecordMapper.class);TaskDefinitions.getTaskMap().values().forEach(t -> {//1.無效任務if (!t.getLocalScheduleDefine().getValid()) {return;}//2.設置了過期時間Date now = new Date();if (!t.getLocalScheduleDefine().getIsPermanent()) {Date endTime = t.getLocalScheduleDefine().getEndTime();if (null == endTime || endTime.before(now)) {TaskDefinitions.getTaskMap().remove(t.getLocalScheduleDefine().getBeanName() + t.getLocalScheduleDefine().getBusId());taskScheduleDefineMapper.updateTaskValid(t.getLocalScheduleDefine().getId(), false);return;}}val lastRecord = taskScheduleRecordMapper.getLast(t.getLocalScheduleDefine().getId());Date date = lastRecord == null ? DATE_INIT : lastRecord.getExecuteDate();boolean shouldRun = false;Date nextDate = t.cronGenerator().next(date);//首次執行且執行時間未到重置開始時間if (null != t.getLocalScheduleDefine().getStartTime() && nextDate.before(t.getLocalScheduleDefine().getStartTime())) {DATE_INIT = new Date();log.warn("任務執行時間未到設置的開始時間,重新設置系統時間{},本次任務忽略:{}", DateUtil.formatDate(DATE_INIT, "yyyy-MM-dd HH:mm:ss"), GsonUtil.toJson(t));return;}if (DateUtils.addSeconds(nextDate, 30).before(now)) {shouldRun = true;}if (shouldRun) {TaskWork localWork = (TaskWork) ApplicationContextUtils.getReadyApplicationContext().getBean(t.getLocalScheduleDefine().getBeanName());SpringTaskExecutor.getExecutorService().submit(() -> localWork.runJob(t));}});} }上述流程較清晰的還原了任務調度的一些主要邏輯。從任務調度的部分代碼中可以看出,整個調度過程異常被捕獲,出現異常不會影響下一次的調度執行,任務的 misfire 問題處理策略是:
-
任務過了用戶的設定時間不執行
-
任務未到用戶的設定時間不執行
-
任務首次執行出了異常(以數據庫執行記錄為準),以當前時間為觸發頻率立刻觸發一次執行,然后按照 Cron 頻率依次執行(類似類似于 Quartz 的默認 withMisfireHandlingInstructionFireAndProceed 模式)
-
定時任務已有執行記錄,以錯過的第一個頻率時間立刻開始執行,重做錯過的所有頻率周期后,重當下一次觸發頻率發生時間大于當前時間后,再按照正常的 Cron 頻率依次執行(類似于 Quartz的withMisfireHandlingInstructionIgnoreMisfires 模式)
另外,需要考慮的是在同一個業務場景下,用戶會創建多個任務定義,但它們執行的業務邏輯是一樣的(執行策略,執行時間等不一樣)。
任務執行
任務調度提交的任務給線程池處理,執行前后根據任務定義對任務做一些通用處理(黃色框部分),具體的執行業務邏輯交給接口 LocalWork 實現類的 execute() 方法處理。
/*** description: 輔助來完成默認的localWork方法*/ public class TaskWorkUtils {static void helpRun(TaskWork localWork, ScheduleTask scheduleTask) {//部分偽代碼如下}} //是否任務有執行過 boolean executed = false; TaskScheduleRecord record = null; Date executeDate = new Date(); try {//根據需要決定是否獲取鎖后執行(redisLock,zkLock,dbLock都可以,保證任務唯一執行)String lockName = localWork.getClass().getSimpleName() + scheduleTask.getLocalScheduleDefine().getBusId();//獲取不到鎖return//獲取到執行下面邏輯record = ExceptionUtils.doFunLogE(() -> {TaskScheduleRecord newRecord = buildRecord(scheduleTask, executeDate);newRecord.setId(taskRecordService.save(newRecord));return newRecord;});//如果不能保存成功,表示出現了數據庫異常,相應狀態不能存取,則直接返回,不再執行if (record == null) {return;}executed = true;localWork.execute(record); } catch (Throwable throwable) {log.error("執行任務時出現異常信息:{}", throwable.getMessage(), throwable);e = throwable; } finally {//釋放鎖:releaseLock()//記錄異常日志,更新任務狀態和失敗原因if (record != null) { }}if (!scheduleTask.getLocalScheduleDefine().getOnce()&&executed) {Date next = scheduleTask.cronGenerator().next(executeDate);long delay = next.getTime() - executeDate.getTime();SpringTaskExecutor.getExecutorService().schedule(() -> localWork.runJob(scheduleTask), delay, TimeUnit.MILLISECONDS);}}如果要保證任務在集群中保證唯一執行可通過分布式鎖實現,具體的key已給參考,因為沒有提供集群節點注冊的功能,負載均衡的調度只能依賴集群中節點獲取鎖的隨機性,即那個節點獲取到鎖,任務在哪個節點執行。
當任務執行出錯時(保存完執行記錄后),不影響下一次任務的執行,但會更新此次任務執行的結果和失敗原因。
任務設計小結
應用啟動時,初始化任務,開啟任務加載線程,開啟任務調度線程。任務加載線程周期性的從 DB 中獲取全部任務,并更新緩存中任務實例;調度線程負責對任務定義實例進行一系列的判斷,決定是否交給執行線程池去執行,任務加載和調用可以使用一個定時線程池。
private ScheduledExecutorService internalScheduledExecutor = new ScheduledThreadPoolExecutor(2, new ThreadFactoryBuilder().setNameFormat("task-internal-%d").build());執行任務的線程池接收到提交的任務,執行前后做統一處理,任務執行的具體業務邏輯交給具體的實現類去做。整個處理流程中,需要兩張表(任務定義表+任務執行記錄表),2 個定時線程池可完成。
總結
本文基于用戶自定義定時任務的特點,從任務創建、任務加載、任務調度、任務執行四個方面詳細的介紹了任務執行的過程,對定時任務中常見的問題和處理過程附帶了部分代碼供參考,在支持一般定時任務的同時給大家提供了一種用戶自定義定時任務的實踐方法。
總結
以上是生活随笔為你收集整理的探寻用户自定义定时任务的实践方案的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 技术干货 | 如何在 Electron
- 下一篇: 技术实践 | 如何基于 Flink 实现