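How Spring Boot Scheduled Tasks Work, and How to Create Them Dynamically
I. Preface
Last week at work I received a requirement: synchronize account-cancellation data from several provinces and unbind the corresponding WeChat followers. Each province periodically drops its cancellation data onto an SFTP server, and I had to develop scheduled tasks to parse those files. Because several provinces are involved, the servers, file-naming rules, and data rules are not necessarily the same, so making all of this configurable is somewhat difficult. The data rules simply have to be unified; the server and file-naming rules can be read from the configuration center. Whenever the configuration for a new province is added, the backend detects it and dynamically creates a scheduled task.
II. The Core Configuration Spring Boot Uses to Enable Scheduling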
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {
}

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {

    @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
        return new ScheduledAnnotationBeanPostProcessor();
    }
}

Next, let's look mainly at this core post-processor: ScheduledAnnotationBeanPostProcessor.
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
    if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
            bean instanceof ScheduledExecutorService) {
        // Ignore AOP infrastructure such as scoped proxies.
        return bean;
    }

    Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
    if (!this.nonAnnotatedClasses.contains(targetClass)) {
        Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
                    Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                            method, Scheduled.class, Schedules.class);
                    return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
                });
        if (annotatedMethods.isEmpty()) {
            this.nonAnnotatedClasses.add(targetClass);
            if (logger.isTraceEnabled()) {
                logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
            }
        }
        else {
            // Non-empty set of methods
            annotatedMethods.forEach((method, scheduledMethods) ->
                    scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
            if (logger.isTraceEnabled()) {
                logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
                        "': " + annotatedMethods);
            }
        }
    }
    return bean;
}

1. It processes the @Scheduled annotations and registers the scheduled tasks through ScheduledTaskRegistrar.
private void finishRegistration() {
    if (this.scheduler != null) {
        this.registrar.setScheduler(this.scheduler);
    }

    if (this.beanFactory instanceof ListableBeanFactory) {
        Map<String, SchedulingConfigurer> beans =
                ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
        List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
        AnnotationAwareOrderComparator.sort(configurers);
        for (SchedulingConfigurer configurer : configurers) {
            configurer.configureTasks(this.registrar);
        }
    }

    if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
        Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
        try {
            // Search for TaskScheduler bean...
            this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
        }
        catch (NoUniqueBeanDefinitionException ex) {
            logger.trace("Could not find unique TaskScheduler bean", ex);
            try {
                this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
            }
            catch (NoSuchBeanDefinitionException ex2) {
                if (logger.isInfoEnabled()) {
                    logger.info("More than one TaskScheduler bean exists within the context, and " +
                            "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
                            "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                            "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                            ex.getBeanNamesFound());
                }
            }
        }
        catch (NoSuchBeanDefinitionException ex) {
            logger.trace("Could not find default TaskScheduler bean", ex);
            // Search for ScheduledExecutorService bean next...
            try {
                this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
            }
            catch (NoUniqueBeanDefinitionException ex2) {
                logger.trace("Could not find unique ScheduledExecutorService bean", ex2);
                try {
                    this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
                }
                catch (NoSuchBeanDefinitionException ex3) {
                    if (logger.isInfoEnabled()) {
                        logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
                                "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
                                "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                                "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                                ex2.getBeanNamesFound());
                    }
                }
            }
            catch (NoSuchBeanDefinitionException ex2) {
                logger.trace("Could not find default ScheduledExecutorService bean", ex2);
                // Giving up -> falling back to default scheduler within the registrar...
                logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
            }
        }
    }

    this.registrar.afterPropertiesSet();
}

1. The ScheduledTaskRegistrar is configured dynamically through a series of SchedulingConfigurer beans.
2. A TaskScheduler is registered with the ScheduledTaskRegistrar. It schedules Runnable tasks and supports several kinds of trigger rules.
3. registrar.afterPropertiesSet() is where all the scheduled tasks are actually scheduled and start executing.
protected void scheduleTasks() {
    if (this.taskScheduler == null) {
        this.localExecutor = Executors.newSingleThreadScheduledExecutor();
        this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
    }
    if (this.triggerTasks != null) {
        for (TriggerTask task : this.triggerTasks) {
            addScheduledTask(scheduleTriggerTask(task));
        }
    }
    if (this.cronTasks != null) {
        for (CronTask task : this.cronTasks) {
            addScheduledTask(scheduleCronTask(task));
        }
    }
    if (this.fixedRateTasks != null) {
        for (IntervalTask task : this.fixedRateTasks) {
            addScheduledTask(scheduleFixedRateTask(task));
        }
    }
    if (this.fixedDelayTasks != null) {
        for (IntervalTask task : this.fixedDelayTasks) {
            addScheduledTask(scheduleFixedDelayTask(task));
        }
    }
}

1. TriggerTask: a dynamic scheduled task. The next execution time is determined by Trigger#nextExecutionTime from the given trigger context.
2. CronTask: a dynamic scheduled task and a subclass of TriggerTask. The next execution is triggered at the time determined by a cron expression.
3. IntervalTask: a task that runs periodically after an initial delay.
4. If taskScheduler is null, it defaults to a ConcurrentTaskScheduler backed by the default single-threaded ScheduledExecutor.
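The consequence of point 4 can be seen without Spring at all. The following is a minimal JDK-only sketch, not Spring's code: the class name SingleThreadSchedulerDemo and the helper threadNamesFor are made up for this illustration. It submits a few slow jobs to a scheduler and records which worker thread ran each one; with a pool size of 1 (the fallback described above) every job lands on the same thread, so one slow job delays all the others.

```java
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of Spring or of the original post.
final class SingleThreadSchedulerDemo {

    /** Runs taskCount one-shot jobs on a scheduler with poolSize threads; returns the worker thread name of each run. */
    static List<String> threadNamesFor(int poolSize, int taskCount) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(poolSize);
        List<String> names = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(taskCount);
        try {
            for (int i = 0; i < taskCount; i++) {
                executor.schedule(() -> {
                    names.add(Thread.currentThread().getName());
                    try {
                        Thread.sleep(50); // simulate a slow job
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    done.countDown();
                }, 0, TimeUnit.MILLISECONDS);
            }
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("jobs did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
        return names;
    }

    public static void main(String[] args) {
        List<String> single = threadNamesFor(1, 3);
        System.out.println("pool size 1 used " + new HashSet<>(single).size() + " thread(s)");
        List<String> pooled = threadNamesFor(5, 3);
        System.out.println("pool size 5 used " + new HashSet<>(pooled).size() + " thread(s)");
    }
}
```

With a pool size of 1 the three jobs necessarily share one worker. How many workers the larger pool actually uses depends on timing, which is why the sketch prints the count rather than assuming it.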
III. A Closer Look at How CronTask Works
ScheduledTaskRegistrar.java

@Nullable
public ScheduledTask scheduleCronTask(CronTask task) {
    ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
    boolean newTask = false;
    if (scheduledTask == null) {
        scheduledTask = new ScheduledTask(task);
        newTask = true;
    }
    if (this.taskScheduler != null) {
        scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
    }
    else {
        addCronTask(task);
        this.unresolvedTasks.put(task, scheduledTask);
    }
    return (newTask ? scheduledTask : null);
}

ConcurrentTaskScheduler.java

@Override
@Nullable
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
    try {
        if (this.enterpriseConcurrentScheduler) {
            return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);
        }
        else {
            ErrorHandler errorHandler =
                    (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
            return new ReschedulingRunnable(task, trigger, this.scheduledExecutor, errorHandler).schedule();
        }
    }
    catch (RejectedExecutionException ex) {
        throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
    }
}

ReschedulingRunnable.java

@Nullable
public ScheduledFuture<?> schedule() {
    synchronized (this.triggerContextMonitor) {
        this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
        if (this.scheduledExecutionTime == null) {
            return null;
        }
        long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
        this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
        return this;
    }
}

private ScheduledFuture<?> obtainCurrentFuture() {
    Assert.state(this.currentFuture != null, "No scheduled future");
    return this.currentFuture;
}

@Override
public void run() {
    Date actualExecutionTime = new Date();
    super.run();
    Date completionTime = new Date();
    synchronized (this.triggerContextMonitor) {
        Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
        this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
        if (!obtainCurrentFuture().isCancelled()) {
            schedule();
        }
    }
}

1. In the end, both the task and the trigger are wrapped in a ReschedulingRunnable.
2. ReschedulingRunnable implements the repeated scheduling: schedule() hands the object itself to the executor, the executor calls run(), and run() calls schedule() again.
3. The body of ReschedulingRunnable#schedule is synchronized, so only one thread at a time can compute the next execution time and submit the task to the executor.
4. Different ReschedulingRunnable objects do not affect each other as long as the thread pool has enough threads. In other words, when the pool is large enough, tasks created by multiple calls to TaskScheduler#schedule can run interleaved.
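The schedule/run loop described in points 1 to 3 can be sketched with the JDK alone. This is a simplified illustration and not Spring's ReschedulingRunnable: the names SelfReschedulingDemo, SelfRescheduling, and countRuns are invented here, and a LongSupplier that returns the next delay in milliseconds (negative meaning "stop") stands in for Trigger#nextExecutionTime.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

// Hypothetical demo class, not part of Spring or of the original post.
final class SelfReschedulingDemo {

    /** Runs the delegate, then asks the "trigger" for the next delay and submits itself again. */
    static final class SelfRescheduling implements Runnable {
        private final Runnable delegate;
        private final LongSupplier nextDelayMillis; // stand-in for a Trigger; negative means stop
        private final ScheduledExecutorService executor;
        private final Object monitor = new Object();

        SelfRescheduling(Runnable delegate, LongSupplier nextDelayMillis, ScheduledExecutorService executor) {
            this.delegate = delegate;
            this.nextDelayMillis = nextDelayMillis;
            this.executor = executor;
        }

        void schedule() {
            synchronized (monitor) { // one thread at a time computes the next delay and submits
                long delay = nextDelayMillis.getAsLong();
                if (delay >= 0) {
                    executor.schedule(this, delay, TimeUnit.MILLISECONDS);
                }
            }
        }

        @Override
        public void run() {
            delegate.run();
            schedule(); // re-submit itself, which is what makes a one-shot schedule() call repeat
        }
    }

    /** Lets the task run until the trigger stops it after limit runs; returns the observed run count. */
    static int countRuns(int limit) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch finished = new CountDownLatch(limit);
        LongSupplier trigger = () -> runs.get() < limit ? 10L : -1L;
        new SelfRescheduling(() -> {
            runs.incrementAndGet();
            finished.countDown();
        }, trigger, executor).schedule();
        try {
            if (!finished.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not run " + limit + " times in time");
            }
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs = " + countRuns(3));
    }
}
```

The point of the design is that the executor only ever sees one-shot delayed tasks; the periodicity lives entirely in the runnable, which is what lets a cron expression (with irregular gaps between runs) drive the schedule.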
ScheduledThreadPoolExecutor.java

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

The figure below shows how ScheduledFutureTask works [I was too lazy to draw it myself, so the figure is borrowed].
[Figure: how ScheduledFutureTask works; the image was not preserved]
1. Each ScheduledFutureTask is placed in a priority blocking queue, ScheduledThreadPoolExecutor.DelayedWorkQueue, which is implemented as a binary min-heap.
2. The Thread object in the figure above is a ThreadPoolExecutor.Worker, which implements the Runnable interface.
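The effect of the delay-ordered queue in point 1 is easy to observe from the outside. The sketch below is only a demonstration (the class DelayOrderDemo is invented for it): tasks are submitted to a single-threaded ScheduledThreadPoolExecutor in the order 300 ms, 100 ms, 200 ms, and they come out ordered by trigger time rather than by submission order.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK or of the original post.
final class DelayOrderDemo {

    /** Submits three delayed tasks out of order and returns the order in which they actually ran. */
    static List<String> executionOrder() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        List<String> order = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);
        long[] delays = {300, 100, 200}; // submission order differs from trigger order
        try {
            for (long delay : delays) {
                executor.schedule(() -> {
                    order.add(delay + "ms");
                    done.countDown();
                }, delay, TimeUnit.MILLISECONDS);
            }
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(executionOrder()); // the task with the earliest trigger time runs first
    }
}
```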
/**
 * Creates with given first task and thread from ThreadFactory.
 * @param firstTask the first task (null if none)
 */
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker */
public void run() {
    runWorker(this);
}

1. A Worker holds a Thread object, and the Runnable of that Thread is the Worker itself.
2. ThreadPoolExecutor#addWorker creates a Worker, takes the thread instance out of it, and starts it. That is how one thread of the pool comes into existence.
3. Worker#run calls ThreadPoolExecutor#runWorker, which is where tasks are finally executed. Its key points are as follows.
(1) The worker first runs the task it was created with. If that task is null, then as long as the pool is in the running state, the worker takes tasks from workQueue through getTask. ThreadPoolExecutor#execute offers tasks to workQueue when it cannot create another core thread.
When getTask takes a task from the queue, the pool's configuration decides whether it blocks and for how long. If getTask returns null, the runWorker loop ends and processWorkerExit is executed.
At that point the thread has fulfilled its purpose and "disappears" from the pool.
(2) Before a task starts executing, the Worker's lock method is called. The purpose is to prevent the task from being interrupted while it is executing; this is ensured by calling clearInterruptsForTaskRun (worth looking at later), so that the thread no longer has its own interrupt flag set.
(3) beforeExecute and afterExecute are hooks for custom logic before and after each task. Both methods are empty and are meant to be filled in by subclasses.
We can throw an exception in beforeExecute. The task is then not executed, and completedAbruptly is true when the loop exits, meaning the worker died due to a user exception; decrementWorkerCount is used to adjust the worker count.
(4) Because Runnable#run cannot throw arbitrary Throwables, the exception is re-wrapped here and then thrown. The thrown exception kills the current thread; afterExecute can be used to do some handling of the exception.
(5) afterExecute may throw an exception as well, which can also kill the current thread.
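Points (3) and (4) can be tried out with a small ThreadPoolExecutor subclass. This is a sketch under stated assumptions: HookedExecutorDemo and RecordingExecutor are names invented for the illustration, tasks are submitted with execute() (with submit() the exception would be captured in the returned Future instead of reaching afterExecute), and the thread factory installs a no-op uncaught-exception handler only to keep the demo's output quiet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK or of the original post.
final class HookedExecutorDemo {

    /** Single-threaded pool that records every beforeExecute/afterExecute call. */
    static final class RecordingExecutor extends ThreadPoolExecutor {
        final List<String> events = new CopyOnWriteArrayList<>();
        final CountDownLatch afterCalls;

        RecordingExecutor(int expectedTasks) {
            super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), r -> {
                Thread t = new Thread(r);
                t.setUncaughtExceptionHandler((thread, ex) -> { }); // silence the stack trace of the dying worker
                return t;
            });
            this.afterCalls = new CountDownLatch(expectedTasks);
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            events.add("before");
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            // t is the exception thrown by the task, or null if it completed normally
            events.add(t == null ? "after:ok" : "after:" + t.getMessage());
            afterCalls.countDown();
        }
    }

    /** Runs one failing and one succeeding task and returns the recorded hook calls. */
    static List<String> run() {
        RecordingExecutor executor = new RecordingExecutor(2);
        try {
            executor.execute(() -> { throw new IllegalStateException("boom"); });
            executor.execute(() -> { });
            if (!executor.afterCalls.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
        return new ArrayList<>(executor.events);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The failing task kills its worker after afterExecute has seen the exception, and the pool replaces that worker, so the second task still runs.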
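IV. Creating Scheduled Tasks Dynamically
TaskConfiguration: the configuration class

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Role;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor;

@Configuration
@EnableScheduling
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class TaskConfiguration {

    // Registered under the default scheduler bean name so that finishRegistration() can find it
    @Bean(name = ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public ScheduledExecutorService scheduledAnnotationProcessor() {
        return Executors.newScheduledThreadPool(5, new DefaultThreadFactory());
    }

    private static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                    Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                    poolNumber.getAndIncrement() +
                    "-schedule-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                    namePrefix + threadNumber.getAndIncrement(),
                    0);
            if (t.isDaemon()) {
                t.setDaemon(false);
            }
            if (t.getPriority() != Thread.NORM_PRIORITY) {
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }
}

1. This ensures that ConcurrentTaskScheduler does not use the default single-threaded ScheduledExecutor but a thread pool with corePoolSize=5.
2. It also defines a custom thread factory for the pool.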
DynamicTask: the dynamic scheduled task

// The original post does not show imports. The packages chosen for StringUtils
// (commons-lang3) and CollectionUtils (Spring) are assumptions.
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.annotation.PreDestroy;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.CronTask;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.CronSequenceGenerator;
import org.springframework.scheduling.support.PeriodicTrigger;
import org.springframework.util.CollectionUtils;

@Configuration
public class DynamicTask implements SchedulingConfigurer {

    private static final Logger LOGGER = LoggerFactory.getLogger(DynamicTask.class);

    // Pool whose threads consume the data produced by each scheduled run
    private static final ExecutorService es = new ThreadPoolExecutor(10, 20,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(10),
            new DynamicTaskConsumeThreadFactory());

    private volatile ScheduledTaskRegistrar registrar;
    private final ConcurrentHashMap<String, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, CronTask> cronTasks = new ConcurrentHashMap<>();

    // Callers modify this list while the polling task iterates over it, so it must be thread-safe
    // (the original used a plain ArrayList via Guava's Lists.newArrayList())
    private volatile List<TaskConstant> taskConstants = new CopyOnWriteArrayList<>();

    @Override
    public void configureTasks(ScheduledTaskRegistrar registrar) {
        this.registrar = registrar;
        // Poll the dynamic task list every 5 seconds and reconcile the running tasks with it
        this.registrar.addTriggerTask(() -> {
            if (!CollectionUtils.isEmpty(taskConstants)) {
                LOGGER.info("Checking the dynamic scheduled-task list...");
                List<TimingTask> tts = new ArrayList<>();
                taskConstants.forEach(taskConstant -> {
                    TimingTask tt = new TimingTask();
                    tt.setExpression(taskConstant.getCron());
                    tt.setTaskId("dynamic-task-" + taskConstant.getTaskId());
                    tts.add(tt);
                });
                this.refreshTasks(tts);
            }
        }, triggerContext -> new PeriodicTrigger(5L, TimeUnit.SECONDS).nextExecutionTime(triggerContext));
    }

    public List<TaskConstant> getTaskConstants() {
        return taskConstants;
    }

    private void refreshTasks(List<TimingTask> tasks) {
        // Cancel tasks whose configuration has been deleted
        Set<String> taskIds = scheduledFutures.keySet();
        for (String taskId : taskIds) {
            if (!exists(tasks, taskId)) {
                // Also drop the cached entries; the original only cancelled the future, so a task
                // re-added later with the same id and expression would never be scheduled again
                scheduledFutures.remove(taskId).cancel(false);
                cronTasks.remove(taskId);
            }
        }
        for (TimingTask tt : tasks) {
            String expression = tt.getExpression();
            if (StringUtils.isBlank(expression) || !CronSequenceGenerator.isValidExpression(expression)) {
                LOGGER.error("DynamicTask: invalid cron expression: " + expression);
                continue;
            }
            // If the configuration is unchanged, there is no need to recreate the scheduled task
            if (scheduledFutures.containsKey(tt.getTaskId())
                    && cronTasks.get(tt.getTaskId()).getExpression().equals(expression)) {
                continue;
            }
            // If the execution time has changed, cancel the task currently scheduled for it
            if (scheduledFutures.containsKey(tt.getTaskId())) {
                scheduledFutures.remove(tt.getTaskId()).cancel(false);
                cronTasks.remove(tt.getTaskId());
            }
            CronTask task = new CronTask(tt, expression);
            ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
            cronTasks.put(tt.getTaskId(), task);
            scheduledFutures.put(tt.getTaskId(), future);
        }
    }

    private boolean exists(List<TimingTask> tasks, String taskId) {
        for (TimingTask task : tasks) {
            if (task.getTaskId().equals(taskId)) {
                return true;
            }
        }
        return false;
    }

    @PreDestroy
    public void destroy() {
        this.registrar.destroy();
    }

    public static class TaskConstant {
        private String cron;
        private String taskId;

        public String getCron() {
            return cron;
        }

        public void setCron(String cron) {
            this.cron = cron;
        }

        public String getTaskId() {
            return taskId;
        }

        public void setTaskId(String taskId) {
            this.taskId = taskId;
        }
    }

    private class TimingTask implements Runnable {
        private String expression;
        private String taskId;

        public String getTaskId() {
            return taskId;
        }

        public void setTaskId(String taskId) {
            this.taskId = taskId;
        }

        @Override
        public void run() {
            LOGGER.error("Current CronTask: " + this);
            // Bounded queue with capacity 3 (the original comment said 10)
            DynamicBlockingQueue queue = new DynamicBlockingQueue(3);
            // Consumer: drains the queue until the producer is done and the queue is empty
            es.submit(() -> {
                while (!queue.isDone() || !queue.isEmpty()) {
                    try {
                        String content = queue.poll(500, TimeUnit.MILLISECONDS);
                        if (StringUtils.isBlank(content)) {
                            return;
                        }
                        LOGGER.info("DynamicBlockingQueue consumed: " + content);
                        TimeUnit.MILLISECONDS.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            // Producer: put data into the queue
            for (int i = 0; i < 5; ++i) {
                try {
                    queue.put(String.valueOf(i));
                    LOGGER.info("DynamicBlockingQueue produced: " + i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.setDone(true);
        }

        public String getExpression() {
            return expression;
        }

        public void setExpression(String expression) {
            this.expression = expression;
        }

        @Override
        public String toString() {
            return ReflectionToStringBuilder.toString(this, ToStringStyle.JSON_STYLE, false, false, TimingTask.class);
        }
    }

    /**
     * Thread factory for the queue-consumer threads
     */
    private static class DynamicTaskConsumeThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DynamicTaskConsumeThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                    Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                    poolNumber.getAndIncrement() +
                    "-dynamic-task-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                    namePrefix + threadNumber.getAndIncrement(),
                    0);
            if (t.isDaemon()) {
                t.setDaemon(false);
            }
            if (t.getPriority() != Thread.NORM_PRIORITY) {
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }

    private static class DynamicBlockingQueue extends LinkedBlockingQueue<String> {
        DynamicBlockingQueue(int capacity) {
            super(capacity);
        }

        private volatile boolean done = false;

        public boolean isDone() {
            return done;
        }

        public void setDone(boolean done) {
            this.done = done;
        }
    }
}

1. taskConstants is the dynamic task list.
2. ScheduledTaskRegistrar#addTriggerTask adds a periodic trigger task that detects changes to the dynamic task list.

CronTask task = new CronTask(tt, expression);
ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
cronTasks.put(tt.getTaskId(), task);
scheduledFutures.put(tt.getTaskId(), future);

3. The cron task is created dynamically, and the resulting ScheduledFuture instance is cached.
4. When the task list is refreshed, the cached ScheduledFuture and CronTask instances are used to decide whether a dynamic task that is no longer valid should be cancelled and removed.
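The reconcile step in points 3 and 4 can be isolated from Spring. The following is a minimal sketch under stated assumptions, not the author's implementation: the names TaskReconcilerDemo, Reconciler, refresh, and demo are hypothetical, and a fixed-rate period in seconds stands in for the cron expression so that only the JDK is needed. Tasks missing from the desired configuration are cancelled and forgotten, unchanged ones keep their existing future, and changed ones are cancelled and scheduled again.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of Spring or of the original post.
final class TaskReconcilerDemo {

    /** Keeps the set of running tasks in line with a desired "task id -> period in seconds" map. */
    static final class Reconciler {
        private final ScheduledExecutorService executor;
        private final Runnable job;
        final Map<String, ScheduledFuture<?>> futures = new ConcurrentHashMap<>();
        private final Map<String, Long> periods = new ConcurrentHashMap<>();

        Reconciler(ScheduledExecutorService executor, Runnable job) {
            this.executor = executor;
            this.job = job;
        }

        void refresh(Map<String, Long> desired) {
            // 1. Cancel and forget tasks that are no longer configured
            for (String id : futures.keySet()) {
                if (!desired.containsKey(id)) {
                    futures.remove(id).cancel(false);
                    periods.remove(id);
                }
            }
            // 2. Schedule new tasks and reschedule tasks whose period changed
            desired.forEach((id, seconds) -> {
                if (seconds.equals(periods.get(id))) {
                    return; // unchanged: keep the existing future
                }
                ScheduledFuture<?> stale = futures.remove(id);
                if (stale != null) {
                    stale.cancel(false);
                }
                futures.put(id, executor.scheduleAtFixedRate(job, seconds, seconds, TimeUnit.SECONDS));
                periods.put(id, seconds);
            });
        }
    }

    /** Walks through add, remove, and change, and summarizes what happened to each future. */
    static String demo() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
        try {
            Reconciler reconciler = new Reconciler(executor, () -> { });
            Map<String, Long> config = new LinkedHashMap<>();
            config.put("a", 3600L);
            config.put("b", 3600L);
            reconciler.refresh(config);
            ScheduledFuture<?> firstA = reconciler.futures.get("a");
            ScheduledFuture<?> firstB = reconciler.futures.get("b");

            config.remove("b");     // "b" is deleted from the configuration
            config.put("c", 3600L); // "c" is new
            reconciler.refresh(config);
            boolean kept = reconciler.futures.get("a") == firstA;
            boolean cancelled = firstB.isCancelled();

            config.put("a", 7200L); // "a" changes its period
            reconciler.refresh(config);
            boolean replaced = reconciler.futures.get("a") != firstA && firstA.isCancelled();

            return new TreeSet<>(reconciler.futures.keySet())
                    + " kept=" + kept + " cancelled=" + cancelled + " replaced=" + replaced;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Removing the cache entries together with the cancellation matters: if a cancelled task's entry stayed in the maps, adding the same id with the same schedule again would look "unchanged" and nothing would be scheduled.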
DynamicTaskTest: test class for the dynamic scheduled task

import java.util.List;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class DynamicTaskTest {

    @Autowired
    private DynamicTask dynamicTask;

    @Test
    public void test() throws InterruptedException {
        List<DynamicTask.TaskConstant> taskConstans = dynamicTask.getTaskConstants();

        DynamicTask.TaskConstant taskConstant = new DynamicTask.TaskConstant();
        taskConstant.setCron("0/5 * * * * ?");
        taskConstant.setTaskId("test1");
        taskConstans.add(taskConstant);

        DynamicTask.TaskConstant taskConstant1 = new DynamicTask.TaskConstant();
        taskConstant1.setCron("0/5 * * * * ?");
        taskConstant1.setTaskId("test2");
        taskConstans.add(taskConstant1);

        DynamicTask.TaskConstant taskConstant2 = new DynamicTask.TaskConstant();
        taskConstant2.setCron("0/5 * * * * ?");
        taskConstant2.setTaskId("test3");
        taskConstans.add(taskConstant2);

        TimeUnit.SECONDS.sleep(40);

        // Remove one configuration and add a new one
        taskConstans.remove(taskConstans.size() - 1);
        DynamicTask.TaskConstant taskConstant3 = new DynamicTask.TaskConstant();
        taskConstant3.setCron("0/5 * * * * ?");
        taskConstant3.setTaskId("test4");
        taskConstans.add(taskConstant3);

        // TimeUnit.MINUTES.sleep(50);
    }
}
Reposted from: https://www.cnblogs.com/hujunzheng/p/10353390.html