一文揭秘定时任务调度框架quartz
之前寫(xiě)過(guò)quartz或者引用過(guò)quartz的一些文章,有很多人給我發(fā)消息問(wèn)quartz的相關(guān)問(wèn)題,
quartz 報(bào)錯(cuò):java.lang.classNotFoundException
quartz源碼分析之深刻理解job,sheduler,calendar,trigger及l(fā)istener之間的關(guān)系
Quartz框架多個(gè)trigger任務(wù)執(zhí)行出現(xiàn)漏執(zhí)行的問(wèn)題分析--轉(zhuǎn)
quartz集群調(diào)度機(jī)制調(diào)研及源碼分析---轉(zhuǎn)載
分布式定時(shí)任務(wù)調(diào)度系統(tǒng)技術(shù)選型--轉(zhuǎn)
趁著年底比較清閑,把quartz的問(wèn)題整理了一下,順帶翻了翻源碼,做了一些總結(jié),希望能幫助到一些人或者減少人們探索的時(shí)間。
注意,使用版本為quartz2.2.3? spring boot2.1.3
1.quartz的核心組件
?1.1 Job組件
1.1.1Job
Job負(fù)責(zé)任務(wù)執(zhí)行的邏輯,所有邏輯在execute()方法中,執(zhí)行所需要的數(shù)據(jù)存放在JobExecutionContext 中
Job實(shí)例:
@PersistJobDataAfterExecution @DisallowConcurrentExecution public class ColorJob implements Job {private static Logger _log = LoggerFactory.getLogger(ColorJob.class);// parameter names specific to this jobpublic static final String FAVORITE_COLOR = "favorite color";public static final String EXECUTION_COUNT = "count";// Since Quartz will re-instantiate a class every time it// gets executed, members non-static member variables can// not be used to maintain state!private int _counter = 1;/*** <p>* Empty constructor for job initialization* </p>* <p>* Quartz requires a public empty constructor so that the* scheduler can instantiate the class whenever it needs.* </p>*/public ColorJob() {}/*** <p>* Called by the <code>{@link org.quartz.Scheduler}</code> when a* <code>{@link org.quartz.Trigger}</code> fires that is associated with* the <code>Job</code>.* </p>* * @throws JobExecutionException* if there is an exception while executing the job.*/public void execute(JobExecutionContext context)throws JobExecutionException {// This job simply prints out its job name and the// date and time that it is runningJobKey jobKey = context.getJobDetail().getKey();// Grab and print passed parametersJobDataMap data = context.getJobDetail().getJobDataMap();String favoriteColor = data.getString(FAVORITE_COLOR);int count = data.getInt(EXECUTION_COUNT);_log.info("ColorJob: " + jobKey + " executing at " + new Date() + "\n" +" favorite color is " + favoriteColor + "\n" + " execution count (from job map) is " + count + "\n" + " execution count (from job member variable) is " + _counter);// increment the count and store it back into the // job map so that job state can be properly maintainedcount++;data.put(EXECUTION_COUNT, count);// Increment the local member variable // This serves no real purpose since job state can not // be maintained via member variables!_counter++;}}1.1.2 JobDetail存儲(chǔ)Job的信息
?主要負(fù)責(zé)
1.指定執(zhí)行的Job類(lèi),唯一標(biāo)識(shí)(job名稱和組別 名稱)
2.存儲(chǔ)JobDataMap信息
// job1 will only run 5 times (at start time, plus 4 repeats), every 10 secondsJobDetail job1 = newJob(ColorJob.class).withIdentity("job1", "group1").build();// pass initialization parameters into the jobjob1.getJobDataMap().put(ColorJob.FAVORITE_COLOR, "Green");job1.getJobDataMap().put(ColorJob.EXECUTION_COUNT, 1);?數(shù)據(jù)庫(kù)存儲(chǔ)如下:
1.1.3?Quartz JobBuilder提供了一個(gè)鏈?zhǔn)絘pi創(chuàng)建JobDetail
@Bean public JobDetail jobDetail() {return JobBuilder.newJob().ofType(SampleJob.class).storeDurably().withIdentity("Qrtz_Job_Detail") .withDescription("Invoke Sample Job service...").build(); }1.1.4?Spring JobDetailFactoryBean
? ?spring提供的一個(gè)創(chuàng)建JobDetail的方式工廠bean
@Bean public JobDetailFactoryBean jobDetail() {JobDetailFactoryBean jobDetailFactory = new JobDetailFactoryBean();jobDetailFactory.setJobClass(SampleJob.class);jobDetailFactory.setDescription("Invoke Sample Job service...");jobDetailFactory.setDurability(true);return jobDetailFactory; }?
1.2 Trigger組件
?
trigger的狀態(tài)不同
trigger的狀態(tài)// STATESString STATE_WAITING = "WAITING";String STATE_ACQUIRED = "ACQUIRED";String STATE_EXECUTING = "EXECUTING";String STATE_COMPLETE = "COMPLETE";String STATE_BLOCKED = "BLOCKED"; String STATE_ERROR = "ERROR"; String STATE_PAUSED = "PAUSED"; String STATE_PAUSED_BLOCKED = "PAUSED_BLOCKED"; String STATE_DELETED = "DELETED";狀態(tài)的表結(jié)構(gòu)
trigger的類(lèi)型
// TRIGGER TYPES/** Simple Trigger type. */String TTYPE_SIMPLE = "SIMPLE";/** Cron Trigger type. */String TTYPE_CRON = "CRON";/** Calendar Interval Trigger type. */String TTYPE_CAL_INT = "CAL_INT"; /** Daily Time Interval Trigger type. */ String TTYPE_DAILY_TIME_INT = "DAILY_I"; /** A general blob Trigger type. */ String TTYPE_BLOB = "BLOB";對(duì)應(yīng)表結(jié)構(gòu)
?
1.2.1 trigger實(shí)例
SimpleTrigger trigger1 = newTrigger().withIdentity("trigger1", "group1").startAt(startTime).withSchedule(simpleSchedule().withIntervalInSeconds(10).withRepeatCount(4)).build();Trigger存儲(chǔ)在mysql中
?1.2.2?Quartz TriggerBuilder
提供了一個(gè)鏈?zhǔn)絼?chuàng)建Trigger的api
@Bean public Trigger trigger(JobDetail job) {return TriggerBuilder.newTrigger().forJob(job).withIdentity("Qrtz_Trigger").withDescription("Sample trigger").withSchedule(simpleSchedule().repeatForever().withIntervalInHours(1)).build(); }1.2.3?Spring SimpleTriggerFactoryBean
?spring提供的一個(gè)創(chuàng)建SimpleTrigger的工廠類(lèi)
@Bean public SimpleTriggerFactoryBean trigger(JobDetail job) {SimpleTriggerFactoryBean trigger = new SimpleTriggerFactoryBean();trigger.setJobDetail(job);trigger.setRepeatInterval(3600000);trigger.setRepeatCount(SimpleTrigger.REPEAT_INDEFINITELY);return trigger; }1.3 調(diào)度組件
1.3.1 quartz提供的工廠類(lèi)
@Bean public Scheduler scheduler(Trigger trigger, JobDetail job) {StdSchedulerFactory factory = new StdSchedulerFactory();factory.initialize(new ClassPathResource("quartz.properties").getInputStream());Scheduler scheduler = factory.getScheduler();scheduler.setJobFactory(springBeanJobFactory());scheduler.scheduleJob(job, trigger);scheduler.start();return scheduler; }1.3.2 spring提供的工廠bean
@Bean public SchedulerFactoryBean scheduler(Trigger trigger, JobDetail job) {SchedulerFactoryBean schedulerFactory = new SchedulerFactoryBean();schedulerFactory.setConfigLocation(new ClassPathResource("quartz.properties"));schedulerFactory.setJobFactory(springBeanJobFactory());schedulerFactory.setJobDetails(job);schedulerFactory.setTriggers(trigger);return schedulerFactory; }2.工作原理
? 2.1 核心類(lèi)QuartzScheduler
Scheduler實(shí)現(xiàn)類(lèi)StdScheduler封裝了核心工作類(lèi)QuartzScheduler
/*** <p>* Construct a <code>StdScheduler</code> instance to proxy the given* <code>QuartzScheduler</code> instance, and with the given <code>SchedulingContext</code>.* </p>*/public StdScheduler(QuartzScheduler sched) {this.sched = sched;}? 2.2 JobDetail的存取
public void addJob(JobDetail jobDetail, boolean replace, boolean storeNonDurableWhileAwaitingScheduling) throws SchedulerException {validateState();if (!storeNonDurableWhileAwaitingScheduling && !jobDetail.isDurable()) {throw new SchedulerException("Jobs added with no trigger must be durable.");}resources.getJobStore().storeJob(jobDetail, replace);notifySchedulerThread(0L);notifySchedulerListenersJobAdded(jobDetail);}2.2.1 存儲(chǔ)JobDetail信息(以mysql Jdbc方式為例)
/*** <p>* Insert or update a job.* </p>*/protected void storeJob(Connection conn, JobDetail newJob, boolean replaceExisting)throws JobPersistenceException {boolean existingJob = jobExists(conn, newJob.getKey());try {if (existingJob) {if (!replaceExisting) { throw new ObjectAlreadyExistsException(newJob); }getDelegate().updateJobDetail(conn, newJob);} else {getDelegate().insertJobDetail(conn, newJob);}} catch (IOException e) {throw new JobPersistenceException("Couldn't store job: "+ e.getMessage(), e);} catch (SQLException e) {throw new JobPersistenceException("Couldn't store job: "+ e.getMessage(), e);}}調(diào)用StdJDBCDelegate實(shí)現(xiàn)
/*** <p>* Insert the job detail record.* </p>* * @param conn* the DB Connection* @param job* the job to insert* @return number of rows inserted* @throws IOException* if there were problems serializing the JobDataMap*/public int insertJobDetail(Connection conn, JobDetail job)throws IOException, SQLException {ByteArrayOutputStream baos = serializeJobData(job.getJobDataMap());PreparedStatement ps = null;int insertResult = 0;try {ps = conn.prepareStatement(rtp(INSERT_JOB_DETAIL));ps.setString(1, job.getKey().getName());ps.setString(2, job.getKey().getGroup());ps.setString(3, job.getDescription());ps.setString(4, job.getJobClass().getName());setBoolean(ps, 5, job.isDurable());setBoolean(ps, 6, job.isConcurrentExectionDisallowed());setBoolean(ps, 7, job.isPersistJobDataAfterExecution());setBoolean(ps, 8, job.requestsRecovery());setBytes(ps, 9, baos);insertResult = ps.executeUpdate();} finally {closeStatement(ps);}return insertResult;}注意:JobDataMap序列化后以Blob形式存儲(chǔ)到數(shù)據(jù)庫(kù)中
StdJDBCConstants中執(zhí)行sql如下:
String INSERT_JOB_DETAIL = "INSERT INTO "+ TABLE_PREFIX_SUBST + TABLE_JOB_DETAILS + " (" + COL_SCHEDULER_NAME + ", " + COL_JOB_NAME+ ", " + COL_JOB_GROUP + ", " + COL_DESCRIPTION + ", "+ COL_JOB_CLASS + ", " + COL_IS_DURABLE + ", " + COL_IS_NONCONCURRENT + ", " + COL_IS_UPDATE_DATA + ", " + COL_REQUESTS_RECOVERY + ", "+ COL_JOB_DATAMAP + ") " + " VALUES(" + SCHED_NAME_SUBST + ", ?, ?, ?, ?, ?, ?, ?, ?, ?)";2.2.2? 查詢JobDetail
? 強(qiáng)調(diào)一下,因JobDetail中的JobDataMap是以Blob形式存放到數(shù)據(jù)庫(kù)中的(也可以通過(guò)useProperties屬性修改成string存儲(chǔ),默認(rèn)是false,Blob形式存儲(chǔ)),所以查詢時(shí)需要特殊處理:StdJDBCDelegate.java
/*** <p>* Select the JobDetail object for a given job name / group name.* </p>* * @param conn* the DB Connection* @return the populated JobDetail object* @throws ClassNotFoundException* if a class found during deserialization cannot be found or if* the job class could not be found* @throws IOException* if deserialization causes an error*/public JobDetail selectJobDetail(Connection conn, JobKey jobKey,ClassLoadHelper loadHelper)throws ClassNotFoundException, IOException, SQLException {PreparedStatement ps = null;ResultSet rs = null;try {ps = conn.prepareStatement(rtp(SELECT_JOB_DETAIL));ps.setString(1, jobKey.getName());ps.setString(2, jobKey.getGroup());rs = ps.executeQuery();JobDetailImpl job = null;if (rs.next()) {job = new JobDetailImpl();job.setName(rs.getString(COL_JOB_NAME));job.setGroup(rs.getString(COL_JOB_GROUP));job.setDescription(rs.getString(COL_DESCRIPTION));job.setJobClass( loadHelper.loadClass(rs.getString(COL_JOB_CLASS), Job.class));job.setDurability(getBoolean(rs, COL_IS_DURABLE));job.setRequestsRecovery(getBoolean(rs, COL_REQUESTS_RECOVERY));Map<?, ?> map = null;if (canUseProperties()) {map = getMapFromProperties(rs);} else {map = (Map<?, ?>) getObjectFromBlob(rs, COL_JOB_DATAMAP);}if (null != map) {job.setJobDataMap(new JobDataMap(map));}}return job;} finally {closeResultSet(rs);closeStatement(ps);}}2.3 查詢trigger
/*** <p>* Retrieve the given <code>{@link org.quartz.Trigger}</code>.* </p>* * @return The desired <code>Trigger</code>, or null if there is no* match.*/public OperableTrigger retrieveTrigger(final TriggerKey triggerKey) throws JobPersistenceException {return (OperableTrigger)executeWithoutLock( // no locks necessary for read...new TransactionCallback() {public Object execute(Connection conn) throws JobPersistenceException {return retrieveTrigger(conn, triggerKey);}});}protected OperableTrigger retrieveTrigger(Connection conn, TriggerKey key)throws JobPersistenceException {try {return getDelegate().selectTrigger(conn, key);} catch (Exception e) {throw new JobPersistenceException("Couldn't retrieve trigger: "+ e.getMessage(), e);}}StdJDBCDelegate.java
/*** <p>* Select a trigger.* </p>* * @param conn* the DB Connection* @return the <code>{@link org.quartz.Trigger}</code> object* @throws JobPersistenceException */public OperableTrigger selectTrigger(Connection conn, TriggerKey triggerKey) throws SQLException, ClassNotFoundException,IOException, JobPersistenceException {PreparedStatement ps = null;ResultSet rs = null;try {OperableTrigger trigger = null;ps = conn.prepareStatement(rtp(SELECT_TRIGGER));ps.setString(1, triggerKey.getName());ps.setString(2, triggerKey.getGroup());rs = ps.executeQuery();if (rs.next()) {String jobName = rs.getString(COL_JOB_NAME);String jobGroup = rs.getString(COL_JOB_GROUP);String description = rs.getString(COL_DESCRIPTION);long nextFireTime = rs.getLong(COL_NEXT_FIRE_TIME);long prevFireTime = rs.getLong(COL_PREV_FIRE_TIME);String triggerType = rs.getString(COL_TRIGGER_TYPE);long startTime = rs.getLong(COL_START_TIME);long endTime = rs.getLong(COL_END_TIME);String calendarName = rs.getString(COL_CALENDAR_NAME);int misFireInstr = rs.getInt(COL_MISFIRE_INSTRUCTION);int priority = rs.getInt(COL_PRIORITY);Map<?, ?> map = null;if (canUseProperties()) {map = getMapFromProperties(rs);} else {map = (Map<?, ?>) getObjectFromBlob(rs, COL_JOB_DATAMAP);}Date nft = null;if (nextFireTime > 0) {nft = new Date(nextFireTime);}Date pft = null;if (prevFireTime > 0) {pft = new Date(prevFireTime);}Date startTimeD = new Date(startTime);Date endTimeD = null;if (endTime > 0) {endTimeD = new Date(endTime);}if (triggerType.equals(TTYPE_BLOB)) {rs.close(); rs = null;ps.close(); ps = null;ps = conn.prepareStatement(rtp(SELECT_BLOB_TRIGGER));ps.setString(1, triggerKey.getName());ps.setString(2, triggerKey.getGroup());rs = ps.executeQuery();if (rs.next()) {trigger = (OperableTrigger) getObjectFromBlob(rs, COL_BLOB);}}else {TriggerPersistenceDelegate tDel = findTriggerPersistenceDelegate(triggerType);if(tDel == null)throw new JobPersistenceException("No TriggerPersistenceDelegate for trigger discriminator type: " + triggerType);TriggerPropertyBundle triggerProps = null;try {triggerProps = tDel.loadExtendedTriggerProperties(conn, triggerKey);} catch (IllegalStateException isex) {if (isTriggerStillPresent(ps)) {throw isex;} else {// QTZ-386 Trigger has been deletedreturn null;}}TriggerBuilder<?> tb = newTrigger().withDescription(description).withPriority(priority).startAt(startTimeD).endAt(endTimeD).withIdentity(triggerKey).modifiedByCalendar(calendarName).withSchedule(triggerProps.getScheduleBuilder()).forJob(jobKey(jobName, jobGroup));if (null != map) {tb.usingJobData(new JobDataMap(map));}trigger = (OperableTrigger) tb.build();trigger.setMisfireInstruction(misFireInstr);trigger.setNextFireTime(nft);trigger.setPreviousFireTime(pft);setTriggerStateProperties(trigger, triggerProps);} }return trigger;} finally {closeResultSet(rs);closeStatement(ps);}}執(zhí)行的sql:
String SELECT_TRIGGER = "SELECT * FROM "+ TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " WHERE "+ COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST+ " AND " + COL_TRIGGER_NAME + " = ? AND " + COL_TRIGGER_GROUP + " = ?";和JobDetail一樣,也存在Blob的問(wèn)題,不再贅述。
2.4 調(diào)度執(zhí)行線程QuartzSchedulerThread
/*** <p>* The main processing loop of the <code>QuartzSchedulerThread</code>.* </p>*/@Overridepublic void run() {boolean lastAcquireFailed = false;while (!halted.get()) {try {// check if we're supposed to pause...synchronized (sigLock) {while (paused && !halted.get()) {try {// wait until togglePause(false) is called...sigLock.wait(1000L);} catch (InterruptedException ignore) {}}if (halted.get()) {break;}}int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads... List<OperableTrigger> triggers = null;long now = System.currentTimeMillis();clearSignaledSchedulingChange();try { triggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow()); //1.lastAcquireFailed = false;if (log.isDebugEnabled()) log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");} catch (JobPersistenceException jpe) {if(!lastAcquireFailed) {qs.notifySchedulerListenersError("An error occurred while scanning for the next triggers to fire.",jpe);}lastAcquireFailed = true;continue;} catch (RuntimeException e) {if(!lastAcquireFailed) {getLog().error("quartzSchedulerThreadLoop: RuntimeException "+e.getMessage(), e);}lastAcquireFailed = true;continue;}if (triggers != null && !triggers.isEmpty()) {now = System.currentTimeMillis();long triggerTime = triggers.get(0).getNextFireTime().getTime();long timeUntilTrigger = triggerTime - now;while(timeUntilTrigger > 2) {synchronized (sigLock) {if (halted.get()) {break;}if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {try {// we could have blocked a long while// on 'synchronize', so we must recomputenow = System.currentTimeMillis();timeUntilTrigger = triggerTime - now;if(timeUntilTrigger >= 1)sigLock.wait(timeUntilTrigger);} catch (InterruptedException ignore) {}}}if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {break;}now = System.currentTimeMillis();timeUntilTrigger = triggerTime - now;}// this happens if releaseIfScheduleChangedSignificantly decided to release triggersif(triggers.isEmpty())continue;// set triggers to 'executing'List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();boolean goAhead = true;synchronized(sigLock) {goAhead = !halted.get();}if(goAhead) {try {List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers); //2if(res != null)bndles = res;} catch (SchedulerException se) {qs.notifySchedulerListenersError("An error occurred while firing triggers '"+ triggers + "'", se);//QTZ-179 : a problem occurred interacting with the triggers from the db//we release them and loop againfor (int i = 0; i < triggers.size(); i++) {qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));}continue;}}for (int i = 0; i < bndles.size(); i++) {TriggerFiredResult result = bndles.get(i);TriggerFiredBundle bndle = result.getTriggerFiredBundle();Exception exception = result.getException();if (exception instanceof RuntimeException) {getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));continue;}// it's possible to get 'null' if the triggers was paused,// blocked, or other similar occurrences that prevent it being// fired at this time... or if the scheduler was shutdown (halted)if (bndle == null) {qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));continue;}JobRunShell shell = null;try {shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);shell.initialize(qs);} catch (SchedulerException se) {qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);continue;}if (qsRsrcs.getThreadPool().runInThread(shell) == false) {// this case should never happen, as it is indicative of the// scheduler being shutdown or a bug in the thread pool or// a thread pool being used concurrently - which the docs// say not to do...getLog().error("ThreadPool.runInThread() return false!");qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);}}continue; // while (!halted) }} else { // if(availThreadCount > 0)// should never happen, if threadPool.blockForAvailableThreads() follows contractcontinue; // while (!halted) }long now = System.currentTimeMillis();long waitTime = now + getRandomizedIdleWaitTime();long timeUntilContinue = waitTime - now;synchronized(sigLock) {try {if(!halted.get()) {// QTZ-336 A job might have been completed in the mean time and we might have// missed the scheduled changed signal by not waiting for the notify() yet// Check that before waiting for too long in case this very job needs to be// scheduled very soonif (!isScheduleChanged()) {sigLock.wait(timeUntilContinue);}}} catch (InterruptedException ignore) {}}} catch(RuntimeException re) {getLog().error("Runtime error occurred in main trigger firing loop.", re);}} // while (!halted)// drop references to scheduler stuff to aid garbage collection...qs = null;qsRsrcs = null;}?
2.4.1 獲取trigger(紅色1)
protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow)throws JobPersistenceException {if (timeWindow < 0) {throw new IllegalArgumentException();}List<OperableTrigger> acquiredTriggers = new ArrayList<OperableTrigger>();Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();final int MAX_DO_LOOP_RETRY = 3;int currentLoopCount = 0;do {currentLoopCount ++;try {List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);// No trigger is ready to fire yet.if (keys == null || keys.size() == 0)return acquiredTriggers;long batchEnd = noLaterThan;for(TriggerKey triggerKey: keys) {// If our trigger is no longer available, try a new one.OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey);if(nextTrigger == null) {continue; // next trigger }// If trigger's job is set as @DisallowConcurrentExecution, and it has already been added to result, then// put it back into the timeTriggers set and continue to search for next trigger.JobKey jobKey = nextTrigger.getJobKey();JobDetail job;try {job = retrieveJob(conn, jobKey);} catch (JobPersistenceException jpe) {try {getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);getDelegate().updateTriggerState(conn, triggerKey, STATE_ERROR);} catch (SQLException sqle) {getLog().error("Unable to set trigger state to ERROR.", sqle);}continue;}if (job.isConcurrentExectionDisallowed()) {if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) {continue; // next trigger} else {acquiredJobKeysForNoConcurrentExec.add(jobKey);}}if (nextTrigger.getNextFireTime().getTime() > batchEnd) {break;}// We now have a acquired trigger, let's add to return list.// If our trigger was no longer in the expected state, try a new one.int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);if (rowsUpdated <= 0) {continue; // next trigger }nextTrigger.setFireInstanceId(getFiredTriggerRecordId());getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);if(acquiredTriggers.isEmpty()) {batchEnd = Math.max(nextTrigger.getNextFireTime().getTime(), System.currentTimeMillis()) + timeWindow;}acquiredTriggers.add(nextTrigger);}// if we didn't end up with any trigger to fire from that first// batch, try again for another batch. We allow with a max retry count.if(acquiredTriggers.size() == 0 && currentLoopCount < MAX_DO_LOOP_RETRY) {continue;}// We are done with the while loop.break;} catch (Exception e) {throw new JobPersistenceException("Couldn't acquire next trigger: " + e.getMessage(), e);}} while (true);// Return the acquired trigger listreturn acquiredTriggers;}2.4.2 觸發(fā)trigger(紅色2)
protected TriggerFiredBundle triggerFired(Connection conn,OperableTrigger trigger)throws JobPersistenceException {JobDetail job;Calendar cal = null;// Make sure trigger wasn't deleted, paused, or completed...try { // if trigger was deleted, state will be STATE_DELETEDString state = getDelegate().selectTriggerState(conn,trigger.getKey());if (!state.equals(STATE_ACQUIRED)) {return null;}} catch (SQLException e) {throw new JobPersistenceException("Couldn't select trigger state: "+ e.getMessage(), e);}try {job = retrieveJob(conn, trigger.getJobKey());if (job == null) { return null; }} catch (JobPersistenceException jpe) {try {getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);getDelegate().updateTriggerState(conn, trigger.getKey(),STATE_ERROR);} catch (SQLException sqle) {getLog().error("Unable to set trigger state to ERROR.", sqle);}throw jpe;}if (trigger.getCalendarName() != null) {cal = retrieveCalendar(conn, trigger.getCalendarName());if (cal == null) { return null; }}try {getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);} catch (SQLException e) {throw new JobPersistenceException("Couldn't insert fired trigger: "+ e.getMessage(), e);}Date prevFireTime = trigger.getPreviousFireTime();// call triggered - to update the trigger's next-fire-time state... trigger.triggered(cal);String state = STATE_WAITING;boolean force = true;if (job.isConcurrentExectionDisallowed()) {state = STATE_BLOCKED;force = false;try {getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),STATE_BLOCKED, STATE_WAITING);getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),STATE_BLOCKED, STATE_ACQUIRED);getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),STATE_PAUSED_BLOCKED, STATE_PAUSED);} catch (SQLException e) {throw new JobPersistenceException("Couldn't update states of blocked triggers: "+ e.getMessage(), e);}} if (trigger.getNextFireTime() == null) {state = STATE_COMPLETE;force = true;}storeTrigger(conn, trigger, job, true, state, force, false);job.getJobDataMap().clearDirtyFlag();return new TriggerFiredBundle(job, trigger, cal, trigger.getKey().getGroup().equals(Scheduler.DEFAULT_RECOVERY_GROUP), new Date(), trigger.getPreviousFireTime(), prevFireTime, trigger.getNextFireTime());}?2.4.3 數(shù)據(jù)庫(kù)鎖
?StdRowLockSemaphore針對(duì)支持select for update的數(shù)據(jù)庫(kù)如mysql
UpdateLockRowSemaphore針對(duì)不支持select for update的數(shù)據(jù)庫(kù)如mssqlserver
?StdRowLockSemaphore的實(shí)現(xiàn)如下:
public static final String SELECT_FOR_LOCK = "SELECT * FROM "+ TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST+ " AND " + COL_LOCK_NAME + " = ? FOR UPDATE";public static final String INSERT_LOCK = "INSERT INTO "+ TABLE_PREFIX_SUBST + TABLE_LOCKS + "(" + COL_SCHEDULER_NAME + ", " + COL_LOCK_NAME + ") VALUES (" + SCHED_NAME_SUBST + ", ?)";?
總結(jié):
? 1.quartz的三大組件Job/trigger/scheduler,job負(fù)責(zé)業(yè)務(wù)邏輯,trigger負(fù)責(zé)執(zhí)行時(shí)機(jī),scheduler負(fù)責(zé)調(diào)度Job和trigger來(lái)執(zhí)行。
? 2.使用mysql作為存儲(chǔ)的話,使用StdJDBCDelegate和數(shù)據(jù)庫(kù)進(jìn)行交互,交互的sql在StdJDBCConstants中定義
? ?3.QuartzScheduler是核心類(lèi),Scheduler做其代理,真正執(zhí)行的是QuartzSchedulerThread
? ?4.JobStore存儲(chǔ)控制,JobStoreSupport的兩個(gè)實(shí)現(xiàn)JobStoreCMT容器管理事務(wù),不需要使用commit和rollback;JobStoreTX用在單機(jī)環(huán)境,需要處理commit和rollback
5.數(shù)據(jù)庫(kù)鎖使用了悲觀鎖select for update,定義為Semaphore
6.qrtz_scheduler_state定義了掃描間隔集群掃描間隔
?
參考文獻(xiàn):
【1】https://www.baeldung.com/spring-quartz-schedule
?【2】https://blog.csdn.net/xiaojin21cen/article/details/79298883
轉(zhuǎn)載于:https://www.cnblogs.com/davidwang456/p/10329616.html
總結(jié)
以上是生活随笔為你收集整理的一文揭秘定时任务调度框架quartz的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: JAVA开发者的Golang快速指南
- 下一篇: Zuul1.0和2.0我们该如何选择?