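A Study of the Quartz Cluster Scheduling Mechanism, with Source Code Analysis (Repost)
A Study of the Quartz 2.2.1 Cluster Scheduling Mechanism, with Source Code Analysis
Introduction
Quartz Cluster Architecture
Scheduler Instantiation
The Scheduling Process
Acquiring Triggers
Firing Triggers:
Job Execution:
Summary:
Appendix: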
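Introduction
Quartz is one of the most mature and widely used Java job-scheduling frameworks. It is powerful, flexible to configure, and plays an important role in enterprise applications. How to run Quartz in a clustered environment is something every enterprise-grade system has to consider. As early as 2006 there was a discussion of Quartz clustering approaches on ITeye: http://www.iteye.com/topic/40970 In reply #8 of that thread, ITeye founder @Robbin gave his own view on how to apply Quartz in a cluster.
Later, someone summarized three Quartz clustering approaches: http://www.iteye.com/topic/114965
1. Start a standalone Job Server to run jobs, not deployed in a web container. When another web node needs to start an asynchronous task, it notifies the Job Server by some means (DB, JMS, Web Service, etc.), and the Job Server, on receiving the notification, loads the task into its own task queue.
2. Split out a standalone job server that runs a Spring + Quartz application dedicated to starting tasks. Add Hessian on the job server to obtain the business interfaces, so that the job server can invoke business operations in the web container; the tasks themselves are still executed by the Tomcat instances in the cluster. After the job server starts a scheduled task, it calls the business operation on each address in turn (much as Apache dispatches to Tomcat), so different scheduled tasks run on different nodes, which reduces the load on any single node.
3. Quartz itself does in fact support clustering. In this approach every node in the cluster runs Quartz, and the state recorded in the database is used to decide whether an operation is already being executed. This requires the clocks of all nodes in the cluster to agree. And since every node runs the application, every node needs its own thread pool to run Quartz.
In general, the first approach runs tasks on a separate server, which greatly restricts the kinds of tasks it suits, and reaching the resources of the web environment from there is very awkward. On the other hand, centralized management makes it easy to sidestep, at the architecture level, the synchronization problems of a distributed environment. The second approach builds on the first but lightens the job server: it only sends invocation requests and does not execute tasks itself. This solves the standalone server's inability to access the web environment, and it allows round-robin across nodes, which balances load effectively. The third approach is the clustering Quartz itself supports. Architecturally it is fully distributed, with no central management. Quartz uses database locks and flag columns to ensure that multiple nodes do not acquire the same task twice, and it has load-balancing and fault-tolerance mechanisms, trading a small amount of redundancy for high availability (HA) and high reliability. (Personally I think this resembles Git's design: distributed redundancy in exchange for reliability and speed.)
This article studies the mechanisms Quartz has built to solve problems in distributed job scheduling, such as preventing duplicate execution and balancing load. It follows the order of the scheduling flow and uses the source code to explain the underlying principles.
For Quartz configuration and concrete usage, see another article by the CRM project team: "CRM使用Quartz集群總結分享" (a summary of using Quartz clustering in CRM).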
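Quartz Cluster Architecture
As the architecture diagram in the original post shows, the database is the hub connecting the schedulers on all nodes. No node is aware of any other node; they communicate only indirectly, through the database.
In effect, Quartz's distributed strategy is a concurrency strategy that treats the database as the boundary resource. Every node follows the same operating protocol, so that operations on the database are executed serially, while schedulers with different names can run in parallel without affecting one another.
The communication diagram between the components is as follows (note: the main SQL statements are attached at the end of the article):
At runtime, the QuartzSchedulerThread class is the main actor and executes the scheduling flow in a loop. JobStore acts as the middle layer: it performs the database operations according to Quartz's concurrency strategy and carries out the main scheduling logic. JobRunShellFactory creates the JobRunShell that wraps the JobDetail, and the shell is then placed in the thread pool to run. LockHandler is responsible for acquiring the database locks in the LOCKS table.
The overall sequence of Quartz job scheduling is roughly as follows:
The flow can be laid out as: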
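0. The scheduler thread's run()
1. Acquire the triggers that are due to fire
    1.1 Lock the TRIGGER_ACCESS row of the LOCKS table
    1.2 Read the JobDetail information
    1.3 Read the trigger information from the triggers table and mark it as "acquired"
    1.4 Commit the transaction, releasing the lock
2. Fire the triggers
    2.1 Lock the TRIGGER_ACCESS row of the LOCKS table (triggersFired() passes LOCK_TRIGGER_ACCESS)
    2.2 Confirm the state of the trigger
    2.3 Read the trigger's JobDetail information
    2.4 Read the trigger's Calendar information
    2.5 Update the trigger information
    2.6 Commit the transaction, releasing the lock
3. Instantiate and execute the Job
    3.1 Take a thread from the thread pool to execute the run method of the JobRunShell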
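Two steps in this flow look alike: both update database tables, and both acquire a lock before the operation and release it afterwards. This rule can be regarded as the core idea behind Quartz's solution to the clustering problem.
Flowchart of the rule:
To spell the rule out: before a scheduler instance performs any database operation that touches a distributed concern, it must first acquire the row-level lock in the QRTZ2_LOCKS table that belongs to the current scheduler. Once it holds the lock it may operate on the other tables. When the transaction is committed, the row-level lock is released and becomes available to other scheduler instances.
Every scheduler instance in the cluster follows this strict protocol, so for schedulers of the same name, the database operations of the instances can only happen serially, while schedulers with different names can still run in parallel.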
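The rule above can be pictured with a small in-memory sketch. It is not Quartz code: the class `ClusterLockSketch` and the method `executeInLock` are hypothetical names, and a `ReentrantLock` per (scheduler name, lock name) pair merely stands in for one row of the LOCKS table locked with `FOR UPDATE`. Under those assumptions, four simulated nodes of the scheduler "CRMscheduler" update a shared counter strictly one at a time:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

final class ClusterLockSketch {
    // One lock per (scheduler name, lock name): a stand-in for one row of the LOCKS table.
    private static final Map<String, ReentrantLock> ROWS = new ConcurrentHashMap<>();

    static <T> T executeInLock(String schedName, String lockName, Supplier<T> work) {
        ReentrantLock row = ROWS.computeIfAbsent(schedName + "/" + lockName, k -> new ReentrantLock());
        row.lock();            // stands in for SELECT ... FOR UPDATE on the lock row
        try {
            return work.get(); // the protected operations on the other tables
        } finally {
            row.unlock();      // stands in for the commit that releases the row lock
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] counter = {0};   // unsynchronized on purpose: only the lock protects it
        List<Thread> nodes = new ArrayList<>();
        for (int n = 0; n < 4; n++) {
            Thread node = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    executeInLock("CRMscheduler", "TRIGGER_ACCESS", () -> ++counter[0]);
                }
            });
            nodes.add(node);
            node.start();
        }
        for (Thread node : nodes) {
            node.join();
        }
        System.out.println(counter[0]); // prints 4000
    }
}
```

Because the lock key includes the scheduler name, a second scheduler with a different name would take a different lock and would not wait on "CRMscheduler", which matches the parallelism between differently named schedulers described above.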
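Next we go into the source code and look at the details of Quartz cluster scheduling at close range.
Scheduler Instantiation
A minimal Quartz hello-world application looks like this: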
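import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerFactory;
import org.quartz.Trigger;
import org.quartz.impl.StdSchedulerFactory;

public class HelloWorldMain {
    Log log = LogFactory.getLog(HelloWorldMain.class);

    public void run() {
        try {
            // Obtain the Scheduler object
            SchedulerFactory sf = new StdSchedulerFactory();
            Scheduler sch = sf.getScheduler();

            // Quartz 2.x creates jobs and triggers through builders.
            // HelloWorldJob is the application's own Job implementation.
            JobDetail jd = newJob(HelloWorldJob.class)
                    .withIdentity("HelloWorldJobDetail", Scheduler.DEFAULT_GROUP)
                    .build();
            // Fire once a minute, indefinitely
            Trigger tg = newTrigger()
                    .withIdentity("HelloWorldTrigger", Scheduler.DEFAULT_GROUP)
                    .withSchedule(simpleSchedule().withIntervalInMinutes(1).repeatForever())
                    .build();
            sch.scheduleJob(jd, tg);
            sch.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        HelloWorldMain hw = new HelloWorldMain();
        hw.run();
    }
}
As we can see, initializing a scheduler means obtaining an instance from a factory class: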
| SchedulerFactory sf = new StdSchedulerFactory(); Scheduler sch = sf.getScheduler(); |
and then starting it:
| sch.start(); |
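Stepping into the scheduler initialization method, sched = instantiate();, reveals an initialization method of more than 700 lines, which involves:
- reading the configuration resources,
- creating the QuartzScheduler object,
- creating that object's run thread and starting the thread;
- initializing important components such as JobStore, QuartzScheduler and DBConnectionManager.
At this point scheduler initialization is complete. During initialization Quartz reads the lock information stored in the database for the current scheduler; in CRM this corresponds to the two LOCK_NAME values STATE_ACCESS and TRIGGER_ACCESS in the table QRTZ2_LOCKS.
When sch.start(); is called, the scheduler does the following:
1. Notifies the listeners that startup is beginning
2. Starts the scheduler thread
3. Starts the plugins
4. Notifies the listeners that startup has completed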
public void start() throws SchedulerException {
    if (shuttingDown || closed) {
        throw new SchedulerException(
                "The Scheduler cannot be restarted after shutdown() has been called.");
    }

    // QTZ-212 : calling new schedulerStarting() method on the listeners
    // right after entering start()
    // Notify this scheduler's listeners that startup is beginning
    notifySchedulerListenersStarting();

    if (initialStart == null) {
        initialStart = new Date();
        // Tell the JobStore that the scheduler has started
        this.resources.getJobStore().schedulerStarted();
        // Start the plugins
        startPlugins();
    } else {
        resources.getJobStore().schedulerResumed();
    }

    // Un-pause the scheduler thread so that the scheduling loop begins
    schedThread.togglePause(false);

    getLog().info("Scheduler " + resources.getUniqueIdentifier() + " started.");

    // Notify this scheduler's listeners that startup has completed
    notifySchedulerListenersStarted();
}
The Scheduling Process
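Once the scheduler has started, its thread is running and begins Quartz's main work: scheduling jobs.
As described earlier, scheduling a job takes roughly three steps:
1. Acquire the triggers that are due to fire
2. Fire the triggers
3. Instantiate and execute the Job
The source code of the three stages is analyzed below.
QuartzSchedulerThread is the scheduler thread class. All three steps of the scheduling process live in its run() method; the analysis is in the code comments: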
public void run() {
    boolean lastAcquireFailed = false;

    while (!halted.get()) {
        try {
            // check if we're supposed to pause...
            synchronized (sigLock) {
                while (paused && !halted.get()) {
                    try {
                        // wait until togglePause(false) is called...
                        sigLock.wait(1000L);
                    } catch (InterruptedException ignore) {
                    }
                }

                if (halted.get()) {
                    break;
                }
            }

            // Block until the thread pool has an idle thread; returns the number of idle threads
            int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
            if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...

                List<OperableTrigger> triggers = null;

                long now = System.currentTimeMillis();

                clearSignaledSchedulingChange();
                try {
                    // Look for a limited number of triggers due within the next 30 seconds.
                    // Argument 1: noLaterThan = now + idleWaitTime (30000 ms by default)
                    // Argument 2: maximum number to acquire, the smaller of the idle
                    //             thread count and the configured batch size
                    // Argument 3: time window, 0 by default; it is added to noLaterThan
                    //             when selecting triggers
                    triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                            now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                    // Acquisition succeeded, so reset the failure flag
                    lastAcquireFailed = false;
                    if (log.isDebugEnabled())
                        log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
                } catch (JobPersistenceException jpe) {
                    if(!lastAcquireFailed) {
                        qs.notifySchedulerListenersError(
                            "An error occurred while scanning for the next triggers to fire.",
                            jpe);
                    }
                    // An exception was caught: set the flag and try to acquire again
                    lastAcquireFailed = true;
                    continue;
                } catch (RuntimeException e) {
                    if(!lastAcquireFailed) {
                        getLog().error("quartzSchedulerThreadLoop: RuntimeException "
                                +e.getMessage(), e);
                    }
                    lastAcquireFailed = true;
                    continue;
                }

                if (triggers != null && !triggers.isEmpty()) {

                    now = System.currentTimeMillis();
                    long triggerTime = triggers.get(0).getNextFireTime().getTime();
                    long timeUntilTrigger = triggerTime - now; // time left until the trigger fires
                    while(timeUntilTrigger > 2) {
                        synchronized (sigLock) {
                            if (halted.get()) {
                                break;
                            }
                            // If the schedule changed in the meantime and a new trigger was added,
                            // the new trigger may be more urgent than the one we are holding, and
                            // we would have to give up the current trigger and acquire again.
                            // But is that worth it? If re-acquiring takes longer than the time from
                            // now until the new trigger fires, then even after giving up the current
                            // trigger we would still miss the new one. We cannot know how long an
                            // acquisition takes, so a judgment call is made: assume 7 ms for a RAM
                            // job store and 70 ms for a persistent one. If the gap between the
                            // current time and the new trigger's fire time is below that value,
                            // re-acquiring is not considered worthwhile and false is returned.
                            // Here we check whether that situation has arisen and whether this
                            // trigger should be given up. If not, the thread simply waits until
                            // the moment the trigger fires.
                            if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
                                try {
                                    // we could have blocked a long while
                                    // on 'synchronize', so we must recompute
                                    now = System.currentTimeMillis();
                                    timeUntilTrigger = triggerTime - now;
                                    if(timeUntilTrigger >= 1)
                                        sigLock.wait(timeUntilTrigger);
                                } catch (InterruptedException ignore) {
                                }
                            }
                        }
                        // This method calls the check above again as a second decision.
                        // We arrive here in one of two cases: 1. we decided to give up the current
                        // trigger, so we check once more, and if the answer is still to give up,
                        // the triggers list is cleared and the loop is left; 2. we kept the current
                        // trigger and the thread has waited until its fire time, so nothing is done.
                        if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
                            break;
                        }
                        now = System.currentTimeMillis();
                        timeUntilTrigger = triggerTime - now;
                        // By now the trigger is about to fire, so the value will be < 2
                    }

                    // this happens if releaseIfScheduleChangedSignificantly decided to release triggers
                    if(triggers.isEmpty())
                        continue;

                    // set triggers to 'executing'
                    List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();

                    boolean goAhead = true;
                    synchronized(sigLock) {
                        goAhead = !halted.get();
                    }
                    if(goAhead) {
                        try {
                            // Fire the triggers and assign the results to bndles. Note that by the
                            // time this call returns, the triggers have already been locked and
                            // unlocked in the database. In other words, Quartz does not hold the
                            // trigger resources until the job finishes; it releases them as soon as
                            // the information needed for this firing has been read, and only then
                            // executes the jobs.
                            List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                            if(res != null)
                                bndles = res;
                        } catch (SchedulerException se) {
                            qs.notifySchedulerListenersError(
                                    "An error occurred while firing triggers '"
                                            + triggers + "'", se);
                            //QTZ-179 : a problem occurred interacting with the triggers from the db
                            //we release them and loop again
                            for (int i = 0; i < triggers.size(); i++) {
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                            }
                            continue;
                        }
                    }

                    // Iterate over the fired triggers and run each job
                    for (int i = 0; i < bndles.size(); i++) {
                        TriggerFiredResult result = bndles.get(i);
                        TriggerFiredBundle bndle = result.getTriggerFiredBundle();
                        Exception exception = result.getException();

                        if (exception instanceof RuntimeException) {
                            getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
                            qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                            continue;
                        }

                        // it's possible to get 'null' if the triggers was paused,
                        // blocked, or other similar occurrences that prevent it being
                        // fired at this time... or if the scheduler was shutdown (halted)
                        // In special cases bndle can be null: as triggerFired() shows, if the
                        // status read from the database is not STATE_ACQUIRED, null is returned
                        // directly. In that case Quartz starts a retry flow on this scheduler,
                        // re-acquiring up to 4 times, and throws an exception if the problem persists.
                        if (bndle == null) {
                            qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                            continue;
                        }

                        // Execute the job
                        JobRunShell shell = null;
                        try {
                            // Create a JobRunShell for the job
                            shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                            shell.initialize(qs);
                        } catch (SchedulerException se) {
                            qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            continue;
                        }

                        // Run the shell in the thread pool
                        if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                            // this case should never happen, as it is indicative of the
                            // scheduler being shutdown or a bug in the thread pool or
                            // a thread pool being used concurrently - which the docs
                            // say not to do...
                            getLog().error("ThreadPool.runInThread() return false!");
                            qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                        }
                    }

                    continue; // while (!halted)
                }
            } else { // if(availThreadCount > 0)
                // should never happen, if threadPool.blockForAvailableThreads() follows contract
                continue; // while (!halted)
            }

            // Load balancing: when a pass ends here (no triggers were acquired), this scheduler
            // waits for a randomized time, so that schedulers on other nodes get a chance at
            // the resources.
            long now = System.currentTimeMillis();
            long waitTime = now + getRandomizedIdleWaitTime();
            long timeUntilContinue = waitTime - now;
            synchronized(sigLock) {
                try {
                    if(!halted.get()) {
                        // QTZ-336 A job might have been completed in the mean time and we might have
                        // missed the scheduled changed signal by not waiting for the notify() yet
                        // Check that before waiting for too long in case this very job needs to be
                        // scheduled very soon
                        if (!isScheduleChanged()) {
                            sigLock.wait(timeUntilContinue);
                        }
                    }
                } catch (InterruptedException ignore) {
                }
            }

        } catch(RuntimeException re) {
            getLog().error("Runtime error occurred in main trigger firing loop.", re);
        }
    } // while (!halted)

    // drop references to scheduler stuff to aid garbage collection...
    qs = null;
    qsRsrcs = null;
}
Each batch of triggers the scheduler acquires is due within the next 30 s, so the thread has to wait until about 2 ms before the first trigger fires. While it waits, it also has to handle the case where a newly added, more urgent trigger arrives. That analysis is written in the comments and is not repeated here.
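The "is it worth re-acquiring?" decision described in the comments can be reduced to a pure function. The sketch below is a simplified reading, not the Quartz method itself: `ReacquireDecision` and `worthReacquiring` are hypothetical names, the 7 ms and 70 ms costs come from the comments above, and the remaining time is measured against the fire time of the trigger already held, which is an assumption about how the check is made.

```java
final class ReacquireDecision {
    static final long RAM_ACQUIRE_COST_MS = 7L;         // assumed cost for an in-memory job store
    static final long PERSISTENT_ACQUIRE_COST_MS = 70L; // assumed cost for a database job store

    /**
     * @param scheduleChanged      whether a scheduling change was signaled while waiting
     * @param signaledNextFireTime fire time of the newly signaled trigger (0 = unknown)
     * @param heldTriggerTime      fire time of the trigger this thread already holds
     * @param now                  current time in milliseconds
     * @param persistentStore      true for a database-backed job store
     */
    static boolean worthReacquiring(boolean scheduleChanged, long signaledNextFireTime,
                                    long heldTriggerTime, long now, boolean persistentStore) {
        if (!scheduleChanged) {
            return false; // nothing new arrived, keep waiting for the held trigger
        }
        boolean earlier = signaledNextFireTime == 0 || signaledNextFireTime < heldTriggerTime;
        if (!earlier) {
            return false; // the new trigger is not more urgent
        }
        long timeLeft = heldTriggerTime - now;
        long cost = persistentStore ? PERSISTENT_ACQUIRE_COST_MS : RAM_ACQUIRE_COST_MS;
        return timeLeft >= cost; // only re-acquire if there is enough time to do so
    }

    public static void main(String[] args) {
        // 50 ms left: too little for a database store, enough for a RAM store
        System.out.println(worthReacquiring(true, 500L, 1000L, 950L, true));  // prints false
        System.out.println(worthReacquiring(true, 500L, 1000L, 950L, false)); // prints true
    }
}
```

The point of the fixed thresholds is that the thread cannot measure the acquisition cost in advance, so a conservative constant per store type decides whether abandoning the held trigger could pay off.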
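As long as the scheduler is running, it keeps executing the scheduling loop. Note that at the end of the loop, which is reached when a pass found no triggers to fire, the thread waits for a randomized time. This is Quartz's built-in load-balancing mechanism.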
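A minimal sketch of such a randomized wait follows. The names `IdleWaitSketch` and `randomizedIdleWait` are hypothetical, and the two constants are assumptions: a 30-second idle wait (the idleWaitTime mentioned above) reduced by a random amount of up to 7 seconds. The actual values and formula in a given Quartz version should be checked against its source.

```java
import java.util.Random;

final class IdleWaitSketch {
    static final long IDLE_WAIT_MS = 30_000L;  // assumed idle wait of 30 s
    static final int VARIABLENESS_MS = 7_000;  // assumed random spread of up to 7 s

    // Subtract a random amount so that nodes do not all wake up at the same instant.
    static long randomizedIdleWait(Random random) {
        return IDLE_WAIT_MS - random.nextInt(VARIABLENESS_MS);
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int node = 1; node <= 3; node++) {
            System.out.println("node " + node + " waits " + randomizedIdleWait(random) + " ms");
        }
    }
}
```

With different wait times per node, the node that wakes first gets to the lock row first, so over many passes the work spreads across the cluster instead of always going to the same instance.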
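The three steps are followed up below.
Acquiring Triggers
The scheduler calls:
| triggers = qsRsrcs.getJobStore().acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow()); |
This looks in the database for triggers that will fire within a given time range. The arguments mean the following. Argument 1: noLaterThan = now + 30000 ms, that is, triggers due within the next 30 s. Argument 2: the maximum number to acquire, which is the smaller of the number of idle threads in the pool and the configured value. Argument 3: the time window, 0 by default; the window size is added to noLaterThan when selecting triggers. Each time a trigger fires, Quartz computes the time at which it should fire next and records it in the NEXT_FIRE_TIME column of the QRTZ2_TRIGGERS table. A lookup compares the current time in milliseconds with that column to find the triggers that will fire in the coming period. The lookup calls this method in the JobStoreSupport class: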
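public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
        throws JobPersistenceException {

    String lockName;
    if(isAcquireTriggersWithinLock() || maxCount > 1) {
        lockName = LOCK_TRIGGER_ACCESS;
    } else {
        lockName = null;
    }
    return executeInNonManagedTXLock(lockName,
            new TransactionCallback<List<OperableTrigger>>() {
                public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                    return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                }
            },
            new TransactionValidator<List<OperableTrigger>>() {
                public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException {
                    //... exception-handling callback
                }
            });
}
The key point of this method is that it runs executeInNonManagedTXLock(), passing one lock name and two callbacks. The lock is acquired when execution starts and released when the transaction commits at the end. (As the code shows, the lock name is only set when isAcquireTriggersWithinLock() is true or more than one trigger is requested.) Underneath, the method uses a FOR UPDATE statement to take a row-level lock in the database, which guarantees that while it runs, any other scheduler trying to acquire triggers waits for this scheduler to release the lock. This method is the concrete implementation of the Quartz clustering strategy described earlier, and the same template method is used again when triggers are fired.
| public static final String SELECT_FOR_LOCK = "SELECT * FROM " + TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST + " AND " + COL_LOCK_NAME + " = ? FOR UPDATE"; |
To explain further: before Quartz touches the database resources, it first accesses the row for the relevant LOCK_NAME in the LOCKS table with FOR UPDATE, which locks that row. If the row is already locked, it waits. If not, it reads the triggers that meet the criteria and sets their status to STATE_ACQUIRED. If a trigger has already been set to STATE_ACQUIRED, it has been claimed by another scheduler instance and does not need to be claimed again, so the scheduler ignores it. This is where the indirect communication between scheduler instances takes place.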
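The selection step ("read the triggers that meet the criteria") can be sketched over an in-memory list. This is an illustration, not the JobStore code: `TriggerSelectionSketch`, `Row` and `selectCandidates` are hypothetical names, only WAITING rows are considered, the ordering follows the ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC clause shown in the appendix, and the misfire condition of the real query is left out for brevity.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class TriggerSelectionSketch {
    /** Hypothetical in-memory stand-in for one row of the triggers table. */
    static final class Row {
        final String name;
        final long nextFireTime;
        final int priority;
        final String state;

        Row(String name, long nextFireTime, int priority, String state) {
            this.name = name;
            this.nextFireTime = nextFireTime;
            this.priority = priority;
            this.state = state;
        }
    }

    static List<Row> selectCandidates(List<Row> table, long noLaterThan, long timeWindow, int maxCount) {
        List<Row> due = new ArrayList<>();
        for (Row row : table) {
            // Only unclaimed triggers that fire before noLaterThan (plus the window) qualify
            if (row.state.equals("WAITING") && row.nextFireTime <= noLaterThan + timeWindow) {
                due.add(row);
            }
        }
        // Earliest fire time first; for equal fire times, higher priority first
        due.sort(Comparator.comparingLong((Row r) -> r.nextFireTime)
                .thenComparing(Comparator.comparingInt((Row r) -> r.priority).reversed()));
        return new ArrayList<>(due.subList(0, Math.min(maxCount, due.size())));
    }

    public static void main(String[] args) {
        List<Row> table = new ArrayList<>();
        table.add(new Row("a", 1000L, 5, "WAITING"));
        table.add(new Row("b", 1000L, 9, "WAITING"));
        table.add(new Row("c", 500L, 1, "ACQUIRED")); // already claimed by another node
        table.add(new Row("d", 50_000L, 5, "WAITING")); // too far in the future
        for (Row row : selectCandidates(table, 30_000L, 0L, 2)) {
            System.out.println(row.name); // prints b, then a
        }
    }
}
```

Capping the batch at maxCount is what ties the amount of claimed work to the number of idle threads on the node.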
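In the JobStoreSupport.acquireNextTrigger() method:
int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
Finally the lock is released. If another scheduler is queued up to acquire triggers at that point, it goes through the same steps. This mechanism guarantees that a trigger is never acquired twice. Under this algorithm, in normal operation a fair share of the triggers a scheduler reads on each pass will already have been marked as acquired.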
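The conditional update above behaves like a compare-and-set: the state only changes if it still has the expected old value, and the returned row count tells the caller whether it won the claim. A minimal in-memory sketch, with `TriggerClaimSketch` as a hypothetical class and a `ConcurrentMap` standing in for the triggers table:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class TriggerClaimSketch {
    // trigger key -> state; a stand-in for the TRIGGER_STATE column
    private final ConcurrentMap<String, String> stateByTrigger = new ConcurrentHashMap<>();

    void store(String triggerKey, String state) {
        stateByTrigger.put(triggerKey, state);
    }

    String stateOf(String triggerKey) {
        return stateByTrigger.get(triggerKey);
    }

    // Mirrors "UPDATE ... SET TRIGGER_STATE = newState WHERE ... AND TRIGGER_STATE = oldState":
    // returns the number of rows changed, 1 if this caller claimed the trigger, 0 otherwise.
    int updateStateFromOtherState(String triggerKey, String newState, String oldState) {
        return stateByTrigger.replace(triggerKey, oldState, newState) ? 1 : 0;
    }

    public static void main(String[] args) {
        TriggerClaimSketch db = new TriggerClaimSketch();
        db.store("HelloWorldTrigger", "WAITING");
        int nodeA = db.updateStateFromOtherState("HelloWorldTrigger", "ACQUIRED", "WAITING");
        int nodeB = db.updateStateFromOtherState("HelloWorldTrigger", "ACQUIRED", "WAITING");
        System.out.println("node A rows updated: " + nodeA); // prints 1
        System.out.println("node B rows updated: " + nodeB); // prints 0
    }
}
```

A node that sees 0 rows updated simply skips that trigger, which is the "ignore triggers already claimed" behaviour described in the text.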
Trigger acquisition is now complete.
Firing Triggers:
QuartzSchedulerThread line 336:
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
This calls the triggersFired() method of the JobStoreSupport class:
public List<TriggerFiredResult> triggersFired(final List<OperableTrigger> triggers) throws JobPersistenceException {
    return executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
            new TransactionCallback<List<TriggerFiredResult>>() {
                public List<TriggerFiredResult> execute(Connection conn) throws JobPersistenceException {
                    List<TriggerFiredResult> results = new ArrayList<TriggerFiredResult>();

                    TriggerFiredResult result;
                    for (OperableTrigger trigger : triggers) {
                        try {
                            TriggerFiredBundle bundle = triggerFired(conn, trigger);
                            result = new TriggerFiredResult(bundle);
                        } catch (JobPersistenceException jpe) {
                            result = new TriggerFiredResult(jpe);
                        } catch(RuntimeException re) {
                            result = new TriggerFiredResult(re);
                        }
                        results.add(result);
                    }

                    return results;
                }
            },
            new TransactionValidator<List<TriggerFiredResult>>() {
                @Override
                public Boolean validate(Connection conn, List<TriggerFiredResult> result) throws JobPersistenceException {
                    //... exception-handling callback
                }
            });
}
Here Quartz's operating protocol is used once more: the executeInNonManagedTXLock() method fires the triggers while holding the lock. The details of firing are as follows:
protected TriggerFiredBundle triggerFired(Connection conn,
        OperableTrigger trigger)
    throws JobPersistenceException {
    JobDetail job;
    Calendar cal = null;

    // Make sure trigger wasn't deleted, paused, or completed...
    try { // if trigger was deleted, state will be STATE_DELETED
        String state = getDelegate().selectTriggerState(conn,
                trigger.getKey());
        if (!state.equals(STATE_ACQUIRED)) {
            return null;
        }
    } catch (SQLException e) {
        throw new JobPersistenceException("Couldn't select trigger state: "
                + e.getMessage(), e);
    }

    try {
        job = retrieveJob(conn, trigger.getJobKey());
        if (job == null) { return null; }
    } catch (JobPersistenceException jpe) {
        try {
            getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
            getDelegate().updateTriggerState(conn, trigger.getKey(),
                    STATE_ERROR);
        } catch (SQLException sqle) {
            getLog().error("Unable to set trigger state to ERROR.", sqle);
        }
        throw jpe;
    }

    if (trigger.getCalendarName() != null) {
        cal = retrieveCalendar(conn, trigger.getCalendarName());
        if (cal == null) { return null; }
    }

    try {
        getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);
    } catch (SQLException e) {
        throw new JobPersistenceException("Couldn't insert fired trigger: "
                + e.getMessage(), e);
    }

    Date prevFireTime = trigger.getPreviousFireTime();

    // call triggered - to update the trigger's next-fire-time state...
    trigger.triggered(cal);

    String state = STATE_WAITING;
    boolean force = true;

    if (job.isConcurrentExectionDisallowed()) {
        state = STATE_BLOCKED;
        force = false;
        try {
            getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                    STATE_BLOCKED, STATE_WAITING);
            getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                    STATE_BLOCKED, STATE_ACQUIRED);
            getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                    STATE_PAUSED_BLOCKED, STATE_PAUSED);
        } catch (SQLException e) {
            throw new JobPersistenceException(
                    "Couldn't update states of blocked triggers: "
                            + e.getMessage(), e);
        }
    }

    if (trigger.getNextFireTime() == null) {
        state = STATE_COMPLETE;
        force = true;
    }

    storeTrigger(conn, trigger, job, true, state, force, false);

    job.getJobDataMap().clearDirtyFlag();

    return new TriggerFiredBundle(job, trigger, cal, trigger.getKey().getGroup()
            .equals(Scheduler.DEFAULT_RECOVERY_GROUP), new Date(), trigger
            .getPreviousFireTime(), prevFireTime, trigger.getNextFireTime());
}
This method does the following:
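1. Reads the trigger's current state (and returns null unless it is STATE_ACQUIRED)
2. Reads the Job the trigger refers to, using the JobKey held by the trigger
3. Marks the trigger as firing (updateFiredTrigger with STATE_EXECUTING)
4. Fires the trigger using the calendar information, which involves several state updates
5. Updates the trigger's record in the database: the next fire time is computed, and the state becomes STATE_WAITING, STATE_BLOCKED (if the job disallows concurrent execution) or STATE_COMPLETE (if there is no next fire time)
6. Returns TriggerFiredBundle, the data transfer object carrying the result of firing the trigger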
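The state chosen in step 5 can be read off the triggerFired() source as a small decision function. The sketch below restates only that decision, with hypothetical names (`NextStateSketch`, `stateAfterFiring`) and plain strings in place of the STATE_* constants:

```java
final class NextStateSketch {
    // Mirrors the order of the checks in triggerFired(): start from WAITING, switch to
    // BLOCKED when the job disallows concurrent execution, and let "no next fire time"
    // override both with COMPLETE.
    static String stateAfterFiring(boolean concurrentExecutionDisallowed, boolean hasNextFireTime) {
        String state = "WAITING";
        if (concurrentExecutionDisallowed) {
            state = "BLOCKED";
        }
        if (!hasNextFireTime) {
            state = "COMPLETE";
        }
        return state;
    }

    public static void main(String[] args) {
        System.out.println(stateAfterFiring(false, true));  // prints WAITING
        System.out.println(stateAfterFiring(true, true));   // prints BLOCKED
        System.out.println(stateAfterFiring(true, false));  // prints COMPLETE
    }
}
```

A repeating trigger therefore goes straight back to WAITING within the same transaction, which is why other nodes can pick up its next firing while the current job is still running.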
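Once this method returns, the firing of the trigger is essentially done. Control goes back to executeInNonManagedTXLock, the method that enforces Quartz's operating protocol, and the database lock is released.
The trigger firing operation is complete.
Job Execution:
Back in the thread class QuartzSchedulerThread, at line 353, all triggers have been fired and the details of every job are in place.
QuartzSchedulerThread line 368: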
| shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle); shell.initialize(qs); |
A runnable JobRunShell is created for each Job and placed in the thread pool to run.
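The idea of wrapping each job in a runnable shell and handing it to a pool can be sketched with the JDK's own executor. This is not Quartz's thread pool or its JobRunShell: `RunShellSketch`, `RunShell` and `runAll` are hypothetical names, and the `completed` list merely stands in for the "job complete" notification that goes back to the job store.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class RunShellSketch {
    /** Hypothetical stand-in for a run shell: runs one job, then reports completion. */
    static final class RunShell implements Runnable {
        private final String jobName;
        private final Runnable job;
        private final List<String> completed;

        RunShell(String jobName, Runnable job, List<String> completed) {
            this.jobName = jobName;
            this.job = job;
            this.completed = completed;
        }

        @Override
        public void run() {
            try {
                job.run();
            } finally {
                completed.add(jobName); // stands in for the completion callback to the store
            }
        }
    }

    // Wraps every job in a shell, runs the shells on a fixed pool, and waits for them.
    static List<String> runAll(Map<String, Runnable> jobs, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<String> completed = Collections.synchronizedList(new ArrayList<>());
        for (Map.Entry<String, Runnable> entry : jobs.entrySet()) {
            pool.execute(new RunShell(entry.getKey(), entry.getValue(), completed));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed;
    }

    public static void main(String[] args) {
        Map<String, Runnable> jobs = new LinkedHashMap<>();
        jobs.put("HelloWorldJob", () -> System.out.println("Hello World"));
        System.out.println(runAll(jobs, 2)); // prints [HelloWorldJob] after "Hello World"
    }
}
```

The scheduler thread itself never runs a job; it only hands shells to the pool, so a slow job occupies a worker thread but does not stall the scheduling loop.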
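At the end of a pass in which no triggers were acquired, the scheduling thread generates a random wait time and pauses briefly, which gives the schedulers on other nodes a chance to obtain the database resources. This is how Quartz achieves load balancing.
That completes one full scheduling cycle, and the scheduler thread enters the next iteration of the loop.
Summary:
Put simply, Quartz's distributed scheduling strategy is an asynchronous strategy with the database as the boundary resource. All schedulers follow one operating rule based on database locks, which guarantees that each operation is performed only once, and the asynchronous operation of multiple nodes keeps the service reliable. The strategy does have its limitations, however. The official documentation describes Quartz's clustering features as follows: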
Only one node will fire the job for each firing. What I mean by that is, if the job has a repeating trigger that tells it to fire every 10 seconds, then at 12:00:00 exactly one node will run the job, and at 12:00:10 exactly one node will run the job, etc. It won't necessarily be the same node each time - it will more or less be random which node runs it. The load balancing mechanism is near-random for busy schedulers (lots of triggers) but favors the same node for non-busy (e.g. few triggers) schedulers.
The clustering feature works best for scaling out long-running and/or cpu-intensive jobs (distributing the work-load over multiple nodes). If you need to scale out to support thousands of short-running (e.g 1 second) jobs, consider partitioning the set of jobs by using multiple distinct schedulers (including multiple clustered schedulers for HA). The scheduler makes use of a cluster-wide lock, a pattern that degrades performance as you add more nodes (when going beyond about three nodes - depending upon your database's capabilities, etc.).
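The documentation points out that clustering works well for long-running or CPU-intensive jobs. With a large number of short jobs, however, every node competes for the database lock, so many threads end up waiting for the resource, and the problem gets worse as nodes are added.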
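The quoted advice to partition jobs across several distinct schedulers can be illustrated with a deterministic assignment function. Everything here is hypothetical: `SchedulerPartitionSketch`, `schedulerFor` and the "scheduler-N" naming are made up for the illustration, and hashing the job key is only one possible way to partition.

```java
final class SchedulerPartitionSketch {
    // Maps a job key to one of `partitions` scheduler names. Each name would be configured
    // as a separate scheduler, so each partition contends for its own lock rows only.
    static String schedulerFor(String jobKey, int partitions) {
        return "scheduler-" + Math.floorMod(jobKey.hashCode(), partitions);
    }

    public static void main(String[] args) {
        String[] jobKeys = {"billing.invoice", "crm.syncContacts", "report.daily"};
        for (String jobKey : jobKeys) {
            System.out.println(jobKey + " -> " + schedulerFor(jobKey, 4));
        }
    }
}
```

Since the lock rows are keyed by scheduler name, jobs assigned to different scheduler names no longer wait on each other's locks, at the cost of managing several scheduler configurations.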
Appendix:
The main SQL statements for the key steps in the communication diagram:
3. select TRIGGER_ACCESS from QRTZ2_LOCKS for update
4. SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY FROM QRTZ2_TRIGGERS WHERE SCHEDULER_NAME = 'CRMscheduler' AND TRIGGER_STATE = 'WAITING' AND NEXT_FIRE_TIME <= '{timekey 30s later}' AND ( MISFIRE_INSTR = -1 OR ( MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= '{timekey now}' ) ) ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC;
5. SELECT * FROM QRTZ2_JOB_DETAILS WHERE SCHEDULER_NAME = 'CRMscheduler' AND JOB_NAME = ? AND JOB_GROUP = ?;
6. UPDATE QRTZ2_TRIGGERS SET TRIGGER_STATE = 'ACQUIRED' WHERE SCHED_NAME = 'CRMscheduler' AND TRIGGER_NAME = '{triggerName}' AND TRIGGER_GROUP = '{triggerGroup}' AND TRIGGER_STATE = 'WAITING';
7. INSERT INTO QRTZ2_FIRED_TRIGGERS (SCHEDULER_NAME, ENTRY_ID, TRIGGER_NAME, TRIGGER_GROUP, INSTANCE_NAME, FIRED_TIME, SCHED_TIME, STATE, JOB_NAME, JOB_GROUP, IS_NONCONCURRENT, REQUESTS_RECOVERY, PRIORITY) VALUES( 'CRMscheduler', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
8. commit;
12. select TRIGGER_ACCESS from QRTZ2_LOCKS for update
13. SELECT TRIGGER_STATE FROM QRTZ2_TRIGGERS WHERE SCHEDULER_NAME = 'CRMscheduler' AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?;
14. SELECT TRIGGER_STATE FROM QRTZ2_TRIGGERS WHERE SCHEDULER_NAME = 'CRMscheduler' AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?;
14. SELECT * FROM QRTZ2_JOB_DETAILS WHERE SCHEDULER_NAME = 'CRMscheduler' AND JOB_NAME = ? AND JOB_GROUP = ?;
15. SELECT * FROM QRTZ2_CALENDARS WHERE SCHEDULER_NAME = 'CRMscheduler' AND CALENDAR_NAME = ?;
16. UPDATE QRTZ2_FIRED_TRIGGERS SET INSTANCE_NAME = ?, FIRED_TIME = ?, SCHED_TIME = ?, ENTRY_STATE = ?, JOB_NAME = ?, JOB_GROUP = ?, IS_NONCONCURRENT = ?, REQUESTS_RECOVERY = ? WHERE SCHEDULER_NAME = 'CRMscheduler' AND ENTRY_ID = ?;
17. UPDATE QRTZ2_TRIGGERS SET TRIGGER_STATE = ? WHERE SCHED_NAME = 'CRMscheduler' AND TRIGGER_NAME = '{triggerName}' AND TRIGGER_GROUP = '{triggerGroup}' AND TRIGGER_STATE = ?;
18. UPDATE QRTZ2_TRIGGERS SET JOB_NAME = ?, JOB_GROUP = ?, DESCRIPTION = ?, NEXT_FIRE_TIME = ?, PREV_FIRE_TIME = ?, TRIGGER_STATE = ?, TRIGGER_TYPE = ?, START_TIME = ?, END_TIME = ?, CALENDAR_NAME = ?, MISFIRE_INSTRUCTION = ?, PRIORITY = ?, JOB_DATAMAP = ? WHERE SCHEDULER_NAME = SCHED_NAME_SUBST AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?;
19. commit;
Original article: http://demo.netfoucs.com/gklifg/article/details/27090179