原文:http://hot66hot.iteye.com/blog/1726143
在公司分享了Quartz,發布出來,希望大家討論補充.? CRM使用Quartz集群分享? 一:CRM對定時任務的依賴與問題? 二:什么是quartz,如何使用,集群,優化? 三:CRM中quartz與Spring結合使用? 1:CRM對定時任務的依賴與問題? 1)依賴? (1)每天晚上的定時任務,通過sql腳本 + crontab方式執行?
Xml代碼??
#crm?? 0?2?*?*?*?/opt/***/javafiles/***/shell/***_daily_stat.sql?? 30?7?*?*?*?/opt/***/javafiles/***/shell/***_data_fix?? 30?0?*?*?*?/opt/***/javafiles/***/shell/***_sync_log?? 0?1?*?*?*?/opt/***/javafiles/***/shell/***_clear_log?? 20?8?*?*?*?/opt/***/javafiles/***/shell/***_daily?>>?/var/***/logs/***_daily.log?2>&1?? 40?1?*?*?*?/opt/***/javafiles/***/shell/***_sync_account2?? 0?2?*?*?1?/opt/***/javafiles/***/shell/***_weekly?>>?/var/***/logs/***_weekly.log?2>&1?? 存在的問題:當需要跨庫或許數據的,sql無能為力,引入許多中間表,完成復雜統計需求。大范圍對線上熱表掃描,造成鎖表,延遲嚴重? (2)使用python(多數據源) + SQL的方式?
Python代碼??
def?connectCRM():?? ????return?MySQLdb.Connection("localhost",?"***",?"***",?"***",?3306,?charset="utf8")?? ?? def?connectTemp():?? ????return?MySQLdb.Connection("localhost",?"***",?"***",??"***",?3306,?charset="utf8")?? ?? def?connectOA():?? ????return?MySQLdb.Connection("localhost",?"***",?"***",??"***",?3306,?charset="utf8")?? ?? def?connectCore():?? ????return?MySQLdb.Connection("localhost",?"***",?"***",??"***",?3306,?charset="utf8")?? ?? def?connectCT():?? ????return?MySQLdb.Connection("localhost",?"***",?"***",?"***",?3306,?charset="utf8")?? 存在的問題:直接訪問數據,需要理解各系統的數據結構,無法滿足動態任務問題,各系統業務接口沒有重用? (3)使用spring + JDK timer方式調用接口完成定時任務?
Xml代碼??
<bean?id="accountStatusTaskScanner"??class="***.impl.AccountStatusTaskScanner"?/>?? ????<task:scheduler?id="taskScheduler"?pool-size="5"?/>?? ????<task:scheduled-tasks?scheduler="taskScheduler">?? ????<task:scheduled?ref="accountStatusTaskScanner"?method="execute"?cron="0?0?1?*?*??"?/>?? </task:scheduled-tasks>?? 使用寫死服務器Host(srv23)的方式,控制只在一臺服務器上執行task?
Java代碼??
public?abstract?class?SingletonServerTaskScanner?implements?TaskScanner?{?? ????private?final?Logger?logger?=?LoggerFactory.getLogger(SingletonServerTaskScanner.class);?? ????@Override?? ????public?void?execute()?{?? ????????String?hostname?=?"";?? ????????try?{?? ????????????hostname?=?InetAddress.getLocalHost().getHostName();?? ????????}?catch?(UnknownHostException?e)?{?? ????????????logger.error(e.getMessage(),?e);?? ????????}?? ???????? ????????if?(ConfigUtil.getValueByKey("core.scan.server").equals(hostname))?{?? ????????????doScan();?? ????????}?? ????}?? ????public?abstract?void?doScan();?? }?? //對于srv23的重啟,保存在內存中的任務將丟失,每次重啟srv23重新生成定時任務?
Java代碼??
public?class?CrmInitializer?implements?InitializingBean?{?? ????private?Logger?logger?=?LoggerFactory.getLogger(CrmInitializer.class);?? ????@Override?? ????public?void?afterPropertiesSet()?throws?Exception?{?? ???????? ????????logger.info("掃描商家狀態,創建定時任務");?? ????????accountStatusTaskScanner.execute();?? ???????? ????????logger.info("掃描N天未拜訪商家,創建定時任務");?? ????????nDaysActivityScanner.execute();?? ????}?? }?? ?
Java代碼??
public?class?SingletonServerTaskController?{?? ????????@Resource?? ????????private?AccountService?accountService;?? ????????@RequestMapping(value?=?"/reschedule")?? ????????public?@ResponseBody?? ????????????String?checkAndRescheduleAccount(Integer?accountId)?{?? ????????????logger.debug("reschedule?task?for?accountId:"?+?accountId);?? ????????????if?(isCurrentServer())?{?? ????????????????accountService.checkAndRescheduleAccount(Arrays.asList(accountId));?? ????????????}?? ????????????return?"ok";?? ????????}?? ????private?boolean?isCurrentServer()?{?? ????????String?hostname?=?"";?? ????????try?{?? ????????????hostname?=?InetAddress.getLocalHost().getHostName();?? ????????}?catch?(UnknownHostException?e)?{?? ????????????logger.error(e.getMessage(),?e);?? ????????}?? ????????if?(ConfigUtil.getValueByKey("core.scan.server").equals(hostname))?{?? ????????????return?true;?? ????????}?else?{?? ????????????return?false;?? ????????}?? ????}?? }?? 存在的問題:實現步驟復雜,分散,任務調度不能恢復,嚴重依賴于srv23,回調URL時可能失敗等情況。? CRM定時任務走過了很多彎路:? 定時任務多種實現方式,使配置和代碼分散在多處,難以維護和監控? 任務執行過程沒有保證,沒有錯誤恢復? 任務執行異常沒有反饋(郵件)? 沒有集群支持? CRM需要分布式的任務調度框架,統一解決問題.? JAVA可以使用的任務調度框架:Quartz , Jcrontab , cron4j , taobao-pamirs-schedule? 為什么選擇Quartz:? 1)資歷夠老,創立于1998年,比struts1還早,但是一直在更新(27 April 2012: Quartz 2.1.5 Released),文檔齊全.? 2)完全由Java寫成,設計用于J2SE和J2EE應用.方便集成:JVM,RMI.? 3)設計清晰簡單:核心概念scheduler,trigger,job,jobDetail,listener,calendar? 4)支持集群:org.quartz.jobStore.isClustered? 5)支持任務恢復:requestsRecovery? 從http://www.quartz-scheduler.org 獲取最新Quartz? 1)學習Quartz? ? 圖1 介紹了quartz關鍵的組件和簡單流程? (1)Quartz 的目錄結構和內容? docs/api????????????????????????????????????? Quartz 框架的JavaDoc Api 說明文檔? docs/dbTables??????????????????????????? 創建 Quartz 的數據庫對象的腳本? docs/wikidocs???????????????????????????? Quartz 的幫助文件,點擊 index.html 開始查看? Examples??????????????????????????????????? 多方面使用 Quartz 的例子Lib Quartz 使用到的第三方包? src/java/org/quartz????????????????????? 使用 Quartz 的客戶端程序源代碼,公有 API? src/java/org/quartz/core????????????? 使用 Quartz 的服務端程序源代碼,私有 API? src/java/org/quartz/simpl??????????? Quartz 提供的不衣賴于第三方產品的簡單實現? src/java/org/quartz/impl????????????? 依賴于第三方產品的支持模塊的實現? src/java/org/quartz/utils????????????? 整個框架要用到的輔助類和工具組件? src/jboss???????????????????????????????????? 提供了特定于 JBoss 特性的源代碼? src/oracle?????????????????????????????????? 提供了特定于 Oracle 特性的源代碼? src/weblogic????????????????????????????? 提供了特定于 WebLogic 特性的源代碼? Quartz 框架包含許多的類和接口,它們分布在大概 11 個包中。多數所要使用到的類或接口放置在 org.quartz 包中。這個包含蓋了 Quartz 框架的公有 API.? (2)Quartz核心接口 Scheduler? 圖2? Scheduler 是 Quartz 的主要 API。與Quartz大部分交互是發生于 Scheduler 之上的。客服端與Scheduler 交互是通過org.quartz.Scheduler接口。? Scheduler的實現:對方法調用會傳遞到 QuartzScheduler 實例上。QuartzScheduler 對于客戶端是不可見的,并且也不存在與此實例的直接交互。? ? 圖3? 創建Scheduler? Quartz 框架提供了 org.quartz.SchedulerFactory 接口。? SchedulerFactory 實例就是用來產生 Scheduler 實例的。當 Scheduler 實例被創建之后,就會存到一個倉庫中(org.quartz.impl.SchedulerRepository).? Scheduler 工廠分別是 org.quartz.impl.DirectSchedulerFactory 和 org.quartz.impl.StdSchedulerFactory? DirectSchedulerFactory 是為精細化控制 Scheduler 實例產生的工廠類,一般不用,不過有利于理解quartz內部組件。?
Java代碼??
--?最簡單?? public?void?createScheduler(ThreadPool?threadPool,?JobStore?jobStore);?? --?最復雜?? public?void?createScheduler(String?schedulerName,?String?schedulerInstanceId,ThreadPool?threadPool,?JobStore?jobStore,?String?rmiRegistryHost,?int?rmiRegistryPort);?? ?
Java代碼??
public?scheduler?createScheduler(){?? ?DirectSchedulerFactory?factory=DirectSchedulerFactory.getInstance();?? ?try?{?? ???? ????SimpleThreadPool?threadPool?=?new?SimpleThreadPool(10,?Thread.NORM_PRIORITY);?? ????threadPool.initialize();?? ???? ????JobStoreTX?jdbcJobStore?=?new?JobStoreTX();?? ????jdbcJobStore.setDataSource("someDatasource");?? ????????jdbcJobStore.setPostgresStyleBlobs(true);?? ????????jdbcJobStore.setTablePrefix("QRTZ_");?? ????????jdbcJobStore.setInstanceId("My?Instance");?? ?????? ????logger.info("Scheduler?starting?up...");?? ????factory.createScheduler(threadPool,jdbcJobStore);?? ???? ????????Scheduler?scheduler?=?factory.getScheduler();?? ?? ???? ????????scheduler.start();?? ????????return?scheduler;?? ????}?? ????????return?null;?? }?? org.quartz.impl.StdSchedulerFactory 依賴于屬性類(Properties)決定如何生產 Scheduler 實例? 通過加載屬性文件,Properties 提供啟動參數:?
Java代碼??
public?scheduler?createScheduler(){?? ???? ????StdSchedulerFactory?factory?=?new?StdSchedulerFactory();?? ?????? ???? ????Properties?props?=?new?Properties();?? ???? ????props.put(StdSchedulerFactory.PROP_THREAD_POOL_CLASS,"org.quartz.simpl.SimpleThreadPool");?? ????props.put("org.quartz.threadPool.threadCount",?"10");?? ?????? ????try?{?? ???????? ????????factory.initialize(props);?? ????????Scheduler?scheduler?=?factory.getScheduler();?? ????????logger.info("Scheduler?starting?up...");?? ????????scheduler.start();?? ????}?catch?(SchedulerException?ex)?{?? ????????logger.error(ex);?? ????}?? }?? 調用靜態方法 getDefaultScheduler() 方法中調用了空的構造方法。如果之前未調用過任何一個 initialize() 方法,那么無參的initialize() 方法會被調用。這會開始去按照下面說的順序加載文件。? 默認情況下,quartz.properties 會被定位到,并從中加載屬性。? properties加載順序:? 1. 檢查 System.getProperty("org.quartz.properties") 中是否設置了別的文件名? 2. 否則,使用 quartz.properties 作為要加載的文件名? 3. 試圖從當前工作目錄中加載這個文件? 4. 試圖從系統 classpath 下加載這個文件? 在 Quartz Jar 包中有一個默認的 quartz.properties 文件? 默認配置如下? # Default Properties file for use by StdSchedulerFactory? # to create a Quartz Scheduler Instance, if a different? # properties file is not explicitly specified.? org.quartz.scheduler.instanceName = DefaultQuartzScheduler? org.quartz.scheduler.rmi.export = false? org.quartz.scheduler.rmi.proxy = false? org.quartz.scheduler.wrapJobExecutionInUserTransaction = false? org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool? org.quartz.threadPool.threadCount = 10? org.quartz.threadPool.threadPriority = 5? org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true? org.quartz.jobStore.misfireThreshold = 60000? org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore? 到此創建Scheduler完成? 通過Scheduler理解Quartz? Scheduler 的 API 可以分組成以下三個類別:? ·管理 Scheduler? (1)啟動 Scheduler?
Java代碼??
?scheduler.start();?? start() 方法被調用,Scheduler 就開始搜尋需要執行的 Job。在你剛得到一個 Scheduler 新的實例時,或者 Scheduler? 被設置為 standby 模式后,你才可以調用 start() 方法。?
Java代碼??
public?void?standby()?throws?SchedulerException;?? 只要調用了 shutdown() 方法之后,你就不能再調用 Scheduler 實例的 start() 方法了。? 這是因為 shutdown() 方法銷毀了為 Scheduler 創建的所有的資源(線程,數據庫連接等)。? 你可能需要Standby 模式:設置 Scheduler 為 standby 模式會導致 Scheduler搜尋要執行的 Job 的線程被暫停下來? 停止 Scheduler?
Java代碼??
public?void?shutdown(boolean?waitForJobsToComplete)?throws?SchedulerException;?? public?void?shutdown()?throws?SchedulerException;?? 其它管理Scheduler 方法見API...? 管理 Job? 什么是 Quartz Job?? 一個Quart Job就是一個任何一個繼承job或job子接口的Java類,你可以用這個類做任何事情!? org.quartz.Job 接口?
Java代碼??
public?void?execute(JobExecutionContext?context)throws?JobExecutionException;?? JobExecutionContext?? 當 Scheduler 調用一個 Job,一個 JobexecutionContext 傳遞給 execute() 方法。JobExecutionContext 對象讓 Job 能? 訪問 Quartz 運行時候環境和 Job 本身的數據。類似于在 Java Web 應用中的 servlet 訪問 ServletContext 。? 通過 JobExecutionContext,Job 可訪問到所處環境的所有信息,包括注冊到 Scheduler 上與該 Job 相關聯的 JobDetail 和 Trigger。? JobDetail? 部署在 Scheduler 上的每一個 Job 只創建了一個 JobDetail實例。JobDetail 是作為 Job 實例進行定義的? // Create the JobDetail? JobDetail jobDetail = new JobDetail("PrintInfoJob",Scheduler.DEFAULT_GROUP, PrintInfoJob.class);? // Create a trigger that fires now and repeats forever? Trigger trigger = TriggerUtils.makeImmediateTrigger(? SimpleTrigger.REPEAT_INDEFINITELY, 10000);? trigger.setName("PrintInfoJobTrigger");// register with the Scheduler? scheduler.scheduleJob(jobDetail, trigger);? JobDetail 被加到 Scheduler 中了,而不是 job。Job 類是作為 JobDetail 的一部份,job直到Scheduler準備要執行它的時候才會被實例化的,因此job不存在線成安全性問題.? 使用 JobDataMap 對象設定 Job 狀態?
Java代碼??
public?void?executeScheduler()?throws?SchedulerException{?? ????scheduler?=?StdSchedulerFactory.getDefaultScheduler();?? ????scheduler.start();?? ????logger.info("Scheduler?was?started?at?"?+?new?Date());?? ???? ????JobDetail?jobDetail?=?new?JobDetail("PrintJobDataMapJob",Scheduler.DEFAULT_GROUP,PrintJobDataMapJob.class);?? ???? ????jobDetail.getJobDataMap().put("name",?"John?Doe");?? ????jobDetail.getJobDataMap().put("age",?23);?? ????jobDetail.getJobDataMap().put("balance",new?BigDecimal(1200.37));?? ???? ????Trigger?trigger?=?TriggerUtils.makeImmediateTrigger(0,?10000);?? ????trigger.setName("PrintJobDataMapJobTrigger");?? ????scheduler.scheduleJob(jobDetail,?trigger);?? }?? public?class?PrintJobDataMapJob?implements?Job?{?? ????public?void?execute(JobExecutionContext?context)throws?JobExecutionException?{?? ????????logger.info("in?PrintJobDataMapJob");?? ???????? ????????JobDataMap?jobDataMap?=?context.getJobDetail().getJobDataMap();?? ???????? ????????Iterator?iter?=?jobDataMap.keySet().iterator();?? ????????while?(iter.hasNext())?{?? ????????????Object?key?=?iter.next();?? ????????????Object?value?=?jobDataMap.get(key);?? ????????????logger.info("Key:?"?+?key?+?"?-?Value:?"?+?value);?? ????????}?? ????}?? ?}?? 在Quartz 1.5之后,JobDataMap在 Trigger 級也是可用的。它的用途類似于Job級的JobDataMap,支持在同一個JobDetail上的多個Trigger。? 伴隨著加入到 Quartz 1.5 中的這一增強特性,可以使用 JobExecutionContext 的一個新的更方便的方法獲取到 Job 和 Trigger 級的并集的 map 中的值。? 這個方法就是getMergedJobDataMap() 取job 和 Trigger級的并集map,它能夠在 Job 中使用。管法推薦使用這個方法.? * 實際使用中trigger級別有時取不到map中的值, 使用getMergedJobDataMap 可以獲取到(官方推薦此方法).? 有狀態的Job: org.quartz.StatefulJob 接口? 當需要在兩次 Job 執行間維護狀態,使用StatefulJob 接口.? Job 和 StatefulJob 在框架中使用中存在兩個關鍵差異。? (一) JobDataMap 在每次執行之后重新持久化到 JobStore 中。這樣就確保你對 Job 數據的改變直到下次執行仍然保持著。? (二) 兩個或多個有狀態的 JobDetail 實例不能并發執行。保證JobDataMap線程安全? 注意:實際使用時使用jobStoreTX/jobStoreCMT ,StatefulJob,大量的trigger對應一個JobDetail的情況下Mysql會產生鎖超時問題.? 中斷 Job? Quartz 包括一個接口叫做 org.quartz.InterruptableJob,它擴展了普通的 Job 接口并提供了一個 interrupt() 方法: 沒有深入研究,只知道 Scheduler會調用自定義的Job的 interrupt()方法。由用戶決定 Job 決定如何中斷.沒有測試!!! job的特性? 易失性 volatility? 一個易失性的 Job 是在程序關閉之后不會被持久化。一個 Job 是通過調用 JobDetail 的 setVolatility(true)被設置為易失.? Job易失性的默認值是 false.? 注意:只有采用持久性JobStore時才有效? Job 持久性 durability? 設置JobDetail 的 setDurability(false),在所有的觸發器觸發之后JobDetail將從 JobStore 中移出。? Job持久性默認值是false.? Scheduler將移除沒有trigger關聯的jobDetail? Job 可恢復性 shuldRecover? 當一個Job在執行中,Scheduler非正常的關閉,設置JobDetail 的setRequestsRecovery(true) 在 Scheduler 重啟之后可恢復的Job還會再次被執行。這個? Job 會重新開始執行。注意job代碼事務特性.? Job可恢復性默認為false,Scheduler不會試著去恢復job操作。? ? 圖為表述沒有執行完成的job數據庫記錄? Scheduler 中移除 Job? 移除所有與這個 Job 相關聯的 Trigger;如果這個 Job 是非持久性的,它將會從 Scheduler 中移出。? 更直接的方式是使用 deleteJob() 方法,它還會刪除所有與當前job關聯的trigger? public boolean deleteJob(String jobName, String groupName) throws SchedulerException;? quartz 本身提供的 Job? org.quartz.jobs.FileScanJob 檢查某個指定文件是否變化,并在文件被改變時通知到相應監聽器的 Job? org.quartz.jobs.FileScanListener 在文件被修改后通知 FileScanJob 的監聽器? org.quartz.jobs.NativeJob 用來執行本地程序(如 windows 下 .exe 文件) 的 Job? org.quartz.jobs.NoOpJob 什么也不做,但用來測試監聽器不是很有用的。一些用戶甚至僅僅用它來導致一個監聽器的運行? org.quartz.jobs.ee.mail.SendMailJob 使用 JavaMail API 發送 e-mail 的 Job? org.quartz.jobs.ee.jmx.JMXInvokerJob 調用 JMX bean 上的方法的 Job? org.quartz.jobs.ee.ejb.EJBInvokerJob 用來調用 EJB 上方法的 Job? job的理解到此結束? 理解quartz Trigger? Job 包含了要執行任務的邏輯,但是Job不負責何時執行。這個事情由觸發器(Trigger)負責。? Quartz Trigger繼承了抽象的org.quartz.Trigger 類。? 目前,Quartz 有三個可用的實現? org.quartz.SimpleTrigger? org.quartz.CronTrigger? org.quartz.NthIncludeDayTrigger? 使用org.quartz.SimpleTrigger? SimpleTrigger 是設置和使用是最為簡單的一種 Quartz Trigger。它是為那種需要在特定的日期/時間啟動,且以一個可能的間隔時間重復執行 n 次的 Job 所設計的。? SimpleTrigger 存在幾個變種的構造方法。他們是從無參的版本一直到帶全部參數的版本。? 下面代碼版斷顯示了一個僅帶有trigger 的名字和組的簡單構造方法? SimpleTrigger sTrigger = new SimpleTrigger("myTrigger", Scheduler.DEFAULT_GROUP);? 這個 Trigger 會立即執行,而不重復。還有一個構造方法帶有多個參數,配置 Triiger 在某一特定時刻觸發,重復執行多次,和兩? 次觸發間的延遲時間。?
Java代碼??
public?SimpleTrigger(String?name,?String?group,String?jobName,?String?jobGroup,?? ?Date?startTime,Date?endTime,?int?repeatCount,?long?repeatInterval);?? 使用org.quartz.CronTrigger? CronTrigger 是基于 Unix 類似于 cron 的表達式觸發,也是功能最強大和最常用的Trigger? Cron表達式:?
Java代碼??
"0?0?12?*?*??"?????????????????????Fire?at?12pm?(noon)?every?day?? "0?15?10???*?*"???????????????????Fire?at?10:15am?every?day?? "0?15?10?*?*??"???????????????????Fire?at?10:15am?every?day?? "0?15?10?*?*???*"?????????????????Fire?at?10:15am?every?day?? "0?15?10?*?*???2005"???????????Fire?at?10:15am?every?day?during?the?year?2005?? "0?*?14?*?*??"?????????????????????Fire?every?minute?starting?at?2pm?and?ending?at?2:59pm,?every?day?? "0?0/5?14?*?*??"??????????????????Fire?every?5?minutes?starting?at?2pm?and?ending?at?2:55pm,?every?day?? "0?0/5?14,18?*?*??"??????????????Fire?every?5?minutes?starting?at?2pm?and?ending?at?2:55pm,?AND?fire?every?5?minutes?starting?at?6pm?and?ending?at?6:55pm,?every?day?? "0?0-5?14?*?*??"???????????????????Fire?every?minute?starting?at?2pm?and?ending?at?2:05pm,?every?day?? "0?10,44?14???3?WED"?????????Fire?at?2:10pm?and?at?2:44pm?every?Wednesday?in?the?month?of?March.?? "0?15?10???*?MON-FRI"????????Fire?at?10:15am?every?Monday,?Tuesday,?Wednesday,?Thursday?and?Friday?? "0?15?10?15?*??"??????????????????Fire?at?10:15am?on?the?15th?day?of?every?month?? "0?15?10?L?*??"????????????????????Fire?at?10:15am?on?the?last?day?of?every?month?? "0?15?10???*?6L"???????????????????Fire?at?10:15am?on?the?last?Friday?of?every?month?? "0?15?10???*?6L"???????????????????Fire?at?10:15am?on?the?last?Friday?of?every?month?? "0?15?10???*?6L?2002-2005"???Fire?at?10:15am?on?every?last?Friday?of?every?month?during?the?years?2002,?2003,?2004?and?2005?? "0?15?10???*?6#3"?????????????????Fire?at?10:15am?on?the?third?Friday?of?every?month?? 使用 org.quartz.NthIncludedDayTrigger? org.quartz.NthIncludedDayTrigger是設計用于在每一間隔類型的第幾天執行 Job。? 例如,你要在每個月的 12 號執行發工資提醒的Job。接下來的代碼片斷描繪了如何創建一個 NthIncludedDayTrigger.?
Java代碼??
NthIncludedDayTrigger?trigger?=?new?NthIncludedDayTrigger("MyTrigger",?Scheduler.DEFAULT_GROUP);?? trigger.setN(12);?? trigger.setIntervalType(NthIncludedDayTrigger.INTERVAL_TYPE_MONTHLY);?? jobDetail + trigger組成最基本的定時任務:? 特別注意:一個job可以對應多個Trgger , 一個Trigger只能對應一個job .? 如:CRM中N天未拜訪的job對應所有的N天未拜訪商家(一個商家一個trigger) 大約1:1000的比例? ??? job和trigger都是通過name 和 group 屬性確定唯一性的.? Quartz Calendar? Quartz 的 Calendar 對象與 Java API 的 java.util.Calendar不同。? Java 的 Calender 對象是通用的日期和時間工具;? Quartz 的 Calender 專門用于屏閉一個時間區間,使 Trigger 在這個區間中不被觸發。? 例如,讓我們假如取消節假日執行job。? Quartz包括許多的 Calender 實現足以滿足大部分的需求.? org.quartz.impl.calendar.BaseCalender 為高級的 Calender 實現了基本的功能,實現了 org.quartz.Calender 接口? org.quartz.impl.calendar.WeeklyCalendar 排除星期中的一天或多天,例如,可用于排除周末? org.quartz.impl.calendar.MonthlyCalendar 排除月份中的數天,例如,可用于排除每月的最后一天? org.quartz.impl.calendar.AnnualCalendar 排除年中一天或多天? org.quartz.impl.calendar.HolidayCalendar 特別的用于從 Trigger 中排除節假日? 使用Calendar,只需實例化后并加入你要排除的日期,然后用 Scheduler 注冊,最后必須讓Calender依附于Trigger實例。? 排除國慶節實例?
Java代碼??
private?void?scheduleJob(Scheduler?scheduler,?Class?jobClass)?{?? ????try?{?? ???????? ????????AnnualCalendar?cal?=?new?AnnualCalendar();?? ???????? ????????Calendar?gCal?=?GregorianCalendar.getInstance();?? ????????gCal.set(Calendar.MONTH,?Calendar.OCTOBER);?? ????????List<Calendar>?mayHolidays?=?new?ArraysList<Calendar>();?? ????????for(int?i=1;?i<=7;?i++){?? ????????????gCal.set(Calendar.DATE,?i);?? ????????????mayHolidays.add(gCal);?? ????????}?? ????????cal.setDaysExcluded(mayHolidays);?? ???????? ????????scheduler.addCalendar("crmHolidays",?cal,?true,?true);?? ???????? ????????Trigger?trigger?=?TriggerUtils.makeImmediateTrigger("myTrigger",-1,60000);?? ???????? ????????trigger.setCalendarName("crmHolidays");?? ????????JobDetail?jobDetail?=?new?JobDetail(jobClass.getName(),?Scheduler.DEFAULT_GROUP,?jobClass);?? ???????? ????????scheduler.scheduleJob(jobDetail,?trigger);?? ????}?catch?(SchedulerException?ex)?{?? ????????logger.error(ex);?? ????}?? }?? Quartz 監聽器? Quartz 提供了三種類型的監聽器:監聽Job,監聽Trigger,和監聽Scheduler.? 監聽器是作為擴展點存在的.? Quartz 監聽器是擴展點,可以擴展框架并定制來做特定的事情。跟Spring,Hibernate,Servlet監聽器類似.? 實現監聽? 1. 創建一個 Java 類,實現監聽器接口? 2. 用你的應用中特定的邏輯實現監聽器接口的所有方法? 3. 注冊監聽器? 全局和非全局監聽器? JobListener 和 TriggerListener 可被注冊為全局或非全局監聽器。一個全局監聽器能接收到所有的 Job/Trigger 的事件通知。? 而一個非全局監聽器只能接收到那些在其上已注冊了監聽器的 Job 或 Triiger 的事件。? 作者:James House描述全局和非全局監聽器? 全局監聽器是主動意識的,它們為了執行它們的任務而熱切的去尋找每一個可能的事件。通常,全局監聽器要做的工作不用指定到特定的 Job 或 Trigger。? 非全局監聽器一般是被動意識的,它們在所關注的 Trigger 激發之前或是 Job 執行之前什么事也不做。因此,非全局的監聽器比起全局監聽器而言更適合于修改或增加 Job 執行的工作。? 類似裝飾設計模式? 監聽 Job 事件? org.quartz.JobListener 接口包含一系列的方法,它們會由 Job 在其生命周期中產生的某些關鍵事件時被調用?
Java代碼??
public?interface?JobListener?{?? ???? ????public?String?getName();?? ?? ???? ????public?void?jobToBeExecuted(JobExecutionContext?context);?? ?? ????public?void?jobExecutionVetoed(JobExecutionContext?context);?? ?? ????public?void?jobWasExecuted(JobExecutionContext?context,JobExecutionException?jobException);?? ? 圖7 job listener參與job的執行生命周期? 注冊全局監聽器?
Java代碼??
scheduler.addGlobalJobListener(jobListener);?? 注冊非全局監聽器(依次完成,順序不能顛倒)?
Java代碼??
scheduler.addJobListener(jobListener);?? jobDetail.addJobListener(jobListener.getName());?? scheduler.addjob(jobDetail,true);?? 監聽 Trigger 事件? org.quartz.TriggerListener 接口定義Trigger監聽器?
Java代碼??
public?interface?TriggerListener?{?? ???? ????public?String?getName();?? ?? ???? ???? ????public?void?triggerFired(Trigger?trigger,?JobExecutionContext?context);?? ?? ???? ???? ????public?boolean?vetoJobExecution(Trigger?trigger,?JobExecutidonContext?context);?? ?? ???? ???? ????public?void?triggerMisfired(Trigger?trigger);?? ?? ???? ????public?void?triggerComplete(Trigger?trigger,?JobExecutionContext?context,?int?triggerInstructionCode);?? }?? triggerListener的注冊與jobListener相同? 監聽 Scheduler 事件? org.quartz.SchedulerListener 接口定義Trigger監聽器?
Java代碼??
public?interface?SchedulerListener?{?? ???? ????public?void?jobScheduled(Trigger?trigger);?? ?? ???? ????public?void?jobUnscheduled(String?triggerName,?String?triggerGroup);?? ?? ???? ????public?void?triggerFinalized(Trigger?trigger);?? ?? ???? ????public?void?triggersPaused(String?triggerName,?String?triggerGroup);?? ?? ???? ????public?void?triggersResumed(String?triggerName,String?triggerGroup);?? ?? ???? ????public?void?jobsPaused(String?jobName,?String?jobGroup);?? ?? ???? ????public?void?jobsResumed(String?jobName,?String?jobGroup);?? ?? ???? ???? ????public?void?schedulerError(String?msg,?SchedulerException?cause);?? ?? ???? ????public?void?schedulerShutdown();?? }?? 注冊SchedulerListener(SchedulerListener不存在全局非全局性)? scheduler.addSchedulerListener(schedulerListener);? 由于scheduler異常存在不打印問題,CRM使用監聽器代碼打印.?
Java代碼??
public?class?QuartzExceptionSchedulerListener?extends?SchedulerListenerSupport{?? ????private?Logger?logger?=?LoggerFactory.getLogger(QuartzExceptionSchedulerListener.class);?? ????@Override?? ????public?void?schedulerError(String?message,?SchedulerException?e)?{?? ????????super.schedulerError(message,?e);?? ????????logger.error(message,?e.getUnderlyingException());?? ????}?? }?? ?
Java代碼??
<bean??id="quartzExceptionSchedulerListener"??class="com.***.crm.quartz.listener.QuartzExceptionSchedulerListener"></bean>?? <!--?配置監聽器?-->?? <property?name="schedulerListeners">?? ????<list>?? ????????<ref?bean="quartzExceptionSchedulerListener"/>?? ????</list>?? </property>?? quartz與線程? 主處理線程:QuartzSchedulerThread? 啟動Scheduler時。QuartzScheduler被創建并創建一個org.quartz.core.QuartzSchedulerThread 類的實例。? QuartzSchedulerThread 包含有決定何時下一個Job將被觸發的處理循環。QuartzSchedulerThread 是一個 Java 線程。它作為一個非守護線程運行在正常優先級下。? QuartzSchedulerThread 的主處理輪循步驟:? 1. 當 Scheduler 正在運行時:? A. 檢查是否有轉換為 standby 模式的請求。? 1. 假如 standby 方法被調用,等待繼續的信號? B. 詢問 JobStore 下次要被觸發的 Trigger.? 1. 如果沒有 Trigger 待觸發,等候一小段時間后再次檢查? 2. 假如有一個可用的 Trigger,等待觸發它的確切時間的到來? D. 時間到了,為 Trigger 獲取到 triggerFiredBundle.? E. 使用Scheduler和triggerFiredBundle 為 Job 創建一個JobRunShell實例? F. 在ThreadPool 申請一個線程運行 JobRunShell 實例.? 代碼邏輯在QuartzSchedulerThread 的 run() 中,如下:?
Java代碼??
???public?void?run()?{?? ???????boolean?lastAcquireFailed?=?false;?? ???????while?(!halted.get())?{?? ???????????try?{?? ??????????????? ???????????????synchronized?(sigLock)?{?? ???????????????????while?(paused?&&?!halted.get())?{?? ???????????????????????try?{?? ??????????????????????????? ???????????????????????????sigLock.wait(1000L);?? ???????????????????????}?catch?(InterruptedException?ignore)?{?? ???????????????????????}?? ???????????????????}?? ????? ???????????????????if?(halted.get())?{?? ???????????????????????break;?? ???????????????????}?? ???????????????}?? ?? ???????????????int?availTreadCount?=?qsRsrcs.getThreadPool().blockForAvailableThreads();?? ???????????????if(availTreadCount?>?0)?{? ???????????????????Trigger?trigger?=?null;?? ?? ???????????????????long?now?=?System.currentTimeMillis();?? ???????????????????clearSignaledSchedulingChange();?? ???????????????????try?{?? ???????????????????????trigger?=?qsRsrcs.getJobStore().acquireNextTrigger(?? ???????????????????????????????ctxt,?now?+?idleWaitTime);?? ???????????????????????lastAcquireFailed?=?false;?? ???????????????????}?catch?(JobPersistenceException?jpe)?{?? ???????????????????????if(!lastAcquireFailed)?{?? ???????????????????????????qs.notifySchedulerListenersError(?? ???????????????????????????????"An?error?occured?while?scanning?for?the?next?trigger?to?fire.",?? ???????????????????????????????jpe);?? ???????????????????????}?? ???????????????????????lastAcquireFailed?=?true;?? ???????????????????}?catch?(RuntimeException?e)?{?? ???????????????????????if(!lastAcquireFailed)?{?? ???????????????????????????getLog().error("quartzSchedulerThreadLoop:?RuntimeException?"?? ???????????????????????????????????+e.getMessage(),?e);?? ???????????????????????}?? ???????????????????????lastAcquireFailed?=?true;?? ???????????????????}?? ?? ???????????????????if?(trigger?!=?null)?{?? ???????????????????????now?=?System.currentTimeMillis();?? ???????????????????????long?triggerTime?=?trigger.getNextFireTime().getTime();?? ???????????????????????long?timeUntilTrigger?=?triggerTime?-?now;?? ???????????????????????while(timeUntilTrigger?>?2)?{?? ????????????????????????synchronized(sigLock)?{?? ????????????????????????????if(!isCandidateNewTimeEarlierWithinReason(triggerTime,?false))?{?? ????????????????????????????????try?{?? ???????????????????????????????????? ???????????????????????????????????? ????????????????????????????????????now?=?System.currentTimeMillis();?? ????????????????????????????????????timeUntilTrigger?=?triggerTime?-?now;?? ????????????????????????????????????if(timeUntilTrigger?>=?1)?? ????????????????????????????????????????sigLock.wait(timeUntilTrigger);?? ????????????????????????????????}?catch?(InterruptedException?ignore)?{?? ????????????????????????????????}?? ????????????????????????????}?? ????????????????????????}????????????????????????????????? ????????????????????????if(releaseIfScheduleChangedSignificantly(trigger,?triggerTime))?{?? ????????????????????????????trigger?=?null;?? ????????????????????????????break;?? ????????????????????????}?? ????????????????????????now?=?System.currentTimeMillis();?? ????????????????????????timeUntilTrigger?=?triggerTime?-?now;?? ???????????????????????}?? ???????????????????????if(trigger?==?null)?? ????????????????????????continue;?? ????????????????????????? ??????????????????????? ???????????????????????TriggerFiredBundle?bndle?=?null;?? ?? ???????????????????????boolean?goAhead?=?true;?? ???????????????????????synchronized(sigLock)?{?? ????????????????????????goAhead?=?!halted.get();?? ???????????????????????}?? ?? ???????????????????????if(goAhead)?{?? ???????????????????????????try?{?? ???????????????????????????????bndle?=?qsRsrcs.getJobStore().triggerFired(ctxt,?? ???????????????????????????????????????trigger);?? ???????????????????????????}?catch?(SchedulerException?se)?{?? ???????????????????????????????qs.notifySchedulerListenersError(?? ???????????????????????????????????????"An?error?occured?while?firing?trigger?'"?? ???????????????????????????????????????????????+?trigger.getFullName()?+?"'",?se);?? ???????????????????????????}?catch?(RuntimeException?e)?{?? ???????????????????????????????getLog().error(?? ???????????????????????????????????"RuntimeException?while?firing?trigger?"?+?? ???????????????????????????????????trigger.getFullName(),?e);?? ??????????????????????????????? ??????????????????????????????? ???????????????????????????????releaseTriggerRetryLoop(trigger);?? ???????????????????????????}?? ???????????????????????}?? ????????????????????????? ??????????????????????? ??????????????????????? ??????????????????????? ???????????????????????if?(bndle?==?null)?{?? ???????????????????????????try?{?? ???????????????????????????????qsRsrcs.getJobStore().releaseAcquiredTrigger(ctxt,?? ???????????????????????????????????????trigger);?? ???????????????????????????}?catch?(SchedulerException?se)?{?? ???????????????????????????????qs.notifySchedulerListenersError(?? ???????????????????????????????????????"An?error?occured?while?releasing?trigger?'"?? ???????????????????????????????????????????????+?trigger.getFullName()?+?"'",?se);?? ??????????????????????????????? ??????????????????????????????? ???????????????????????????????releaseTriggerRetryLoop(trigger);?? ???????????????????????????}?? ???????????????????????????continue;?? ???????????????????????}?? ?? ??????????????????????? ??????????????????????? ??????????????????????? ??????????????????????? ??????????????????????? ??????????????????????? ???????????????????????JobRunShell?shell?=?null;?? ???????????????????????try?{?? ???????????????????????????shell?=?qsRsrcs.getJobRunShellFactory().borrowJobRunShell();?? ???????????????????????????shell.initialize(qs,?bndle);?? ???????????????????????}?catch?(SchedulerException?se)?{?? ???????????????????????????try?{?? ???????????????????????????????qsRsrcs.getJobStore().triggeredJobComplete(ctxt,?? ???????????????????????????????????????trigger,?bndle.getJobDetail(),?Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);?? ???????????????????????????}?catch?(SchedulerException?se2)?{?? ???????????????????????????????qs.notifySchedulerListenersError(?? ???????????????????????????????????????"An?error?occured?while?placing?job's?triggers?in?error?state?'"?? ???????????????????????????????????????????????+?trigger.getFullName()?+?"'",?se2);?? ??????????????????????????????? ??????????????????????????????? ???????????????????????????????errorTriggerRetryLoop(bndle);?? ???????????????????????????}?? ???????????????????????????continue;?? ???????????????????????}?? ?? ???????????????????????if?(qsRsrcs.getThreadPool().runInThread(shell)?==?false)?{?? ???????????????????????????try?{?? ??????????????????????????????? ??????????????????????????????? ??????????????????????????????? ??????????????????????????????? ???????????????????????????????getLog().error("ThreadPool.runInThread()?return?false!");?? ???????????????????????????????qsRsrcs.getJobStore().triggeredJobComplete(ctxt,?? ???????????????????????????????????????trigger,?bndle.getJobDetail(),?Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);?? ???????????????????????????}?catch?(SchedulerException?se2)?{?? ???????????????????????????????qs.notifySchedulerListenersError(?? ???????????????????????????????????????"An?error?occured?while?placing?job's?triggers?in?error?state?'"?? ???????????????????????????????????????????????+?trigger.getFullName()?+?"'",?se2);?? ??????????????????????????????? ??????????????????????????????? ???????????????????????????????releaseTriggerRetryLoop(trigger);?? ???????????????????????????}?? ???????????????????????}?? ???????????????????????continue;?? ???????????????????}?? ???????????????}?else?{? ???????????????????continue;? ???????????????}?? ?? ???????????????long?now?=?System.currentTimeMillis();?? ???????????????long?waitTime?=?now?+?getRandomizedIdleWaitTime();?? ???????????????long?timeUntilContinue?=?waitTime?-?now;?? ???????????????synchronized(sigLock)?{?? ????????????????try?{?? ????????????????????sigLock.wait(timeUntilContinue);?? ????????????????}?catch?(InterruptedException?ignore)?{?? ????????????????}?? ???????????????}?? ?? ???????????}?catch(RuntimeException?re)?{?? ???????????????getLog().error("Runtime?error?occured?in?main?trigger?firing?loop.",?re);?? ???????????}?? ???????}? ?? ??????? ???????qs?=?null;?? ???????qsRsrcs?=?null;?? ???}?? quartz工作者線程? Quartz 不會在主線程(QuartzSchedulerThread)中處理用戶的Job。Quartz 把線程管理的職責委托給ThreadPool。 一般的設置使用org.quartz.simpl.SimpleThreadPool。SimpleThreadPool 創建了一定數量的 WorkerThread 實例來使得Job能夠在線程中進行處理。? WorkerThread 是定義在 SimpleThreadPool 類中的內部類,它實質上就是一個線程。? 要創建 WorkerThread 的數量以及配置他們的優先級是在文件quartz.properties中并傳入工廠。? spring properties?
Java代碼??
<prop?key="org.quartz.threadPool.class">org.quartz.simpl.SimpleThreadPool</prop>?? <prop?key="org.quartz.threadPool.threadCount">20</prop>?? <prop?key="org.quartz.threadPool.threadPriority">5</prop>?? 主線程(QuartzSchedulerThread)請求ThreadPool去運行 JobRunShell 實例,ThreadPool 就檢查看是否有一個可用的工作者線? 程。假如所以已配置的工作者線程都是忙的,ThreadPool 就等待直到有一個變為可用。當一個工作者線程是可用的,? 并且有一個JobRunShell 等待執行,工作者線程就會調用 JobRunShell 類的 run() 方法。? Quartz 框架允許替換線程池,但必須實現org.quartz.spi.ThreadPool 接口.? ? 圖4 quartz內部的主線程和工作者線程? Quartz的存儲和持久化? Quartz 用 JobStores 對 Job、Trigger、calendar 和 Schduler 數據提供一種存儲機制。Scheduler 應用已配置的JobStore 來存儲和獲取到部署信息,并決定正被觸發執行的 Job 的職責。? 所有的關于哪個 Job 要執行和以什么時間表來執行他們的信息都來存儲在 JobStore。? 在 Quartz 中兩種可用的 Job 存儲類型是:? 內存(非持久化) 存儲? 持久化存儲? JobStore 接口? Quartz 為所有類型的Job存儲提供了一個接口。叫 JobStore。所有的Job存儲機制,不管是在哪里或是如何存儲他們的信息的,都必須實現這個接口。? JobStore 接口的 API 可歸納為下面幾類:? Job 相關的 API? Trigger 相關的 API? Calendar 相關的 API? Scheduler 相關的 API? 使用內存來存儲 Scheduler 信息? Quartz 的內存Job存儲類叫做 org.quartz.simple.RAMJobStore,它實現了JobStore 接口的。? RAMJobStore 是 Quartz 的默認的解決方案。? 使用這種內存JobStore的好處。? RAMJobStore是配置最簡單的 JobStore:默認已經配置好了。見quartz.jar:org.quartz.quartz.properties? RAMJobStore的速度非常快。所有的 quartz存儲操作都在計算機內存中? 使用持久性的 JobStore? 持久性 JobStore = JDBC + 關系型數據庫? Quartz 所有的持久化的 JobStore 都擴展自 org.quartz.impl.jdbcjobstore.JobStoreSupport 類。? ? 圖5? JobStoreSupport 實現了 JobStore 接口,是作為 Quartz 提供的兩個具體的持久性 JobStore 類的基類。? Quartz 提供了兩種不同類型的JobStoreSupport實現類,每一個設計為針對特定的數據庫環境和配置:? ·org.quartz.impl.jdbcjobstore.JobStoreTX? ·org.quartz.impl.jdbcjobstore.JobStoreCMT? 獨立環境中的持久性存儲? JobStoreTX 類設計為用于獨立環境中。這里的 "獨立",我們是指這樣一個環境,在其中不存在與應用容器的事務集成。? #properties配置? org.quartz.jobStore.class = org.quartz.ompl.jdbcjobstore.JobStoreTX? 依賴容器相關的持久性存儲? JobStoreCMT 類設計為與程序容器事務集成,容器管理的事物(Container Managed Transactions (CMT))? crm使用JobStoreTX 因為quart有長時間鎖等待情況,不參與系統本身事務(crm任務內事務與quartz本身事務分離). Quartz 數據庫結構? 表名描述? QRTZ_CALENDARS 以 Blob 類型存儲 Quartz 的 Calendar 信息? QRTZ_CRON_TRIGGERS 存儲 Cron Trigger,包括 Cron 表達式和時區信息? QRTZ_FIRED_TRIGGERS 存儲與已觸發的 Trigger 相關的狀態信息,以及相聯 Job 的執行信息? QRTZ_PAUSED_TRIGGER_GRPS 存儲已暫停的 Trigger 組的信息? QRTZ_SCHEDULER_STATE 存儲少量的有關 Scheduler 的狀態信息,和別的 Scheduler 實例(假如是用于一個集群中)? QRTZ_LOCKS 存儲程序的非觀鎖的信息(假如使用了悲觀鎖)? QRTZ_JOB_DETAILS 存儲每一個已配置的 Job 的詳細信息? QRTZ_JOB_LISTENERS 存儲有關已配置的 JobListener 的信息? QRTZ_SIMPLE_TRIGGERS 存儲簡單的 Trigger,包括重復次數,間隔,以及已觸的次數? QRTZ_BLOG_TRIGGERS Trigger 作為 Blob 類型存儲(用于 Quartz 用戶用 JDBC 創建他們自己定制的 Trigger 類型,JobStore 并不知道如何存儲實例的時候)? QRTZ_TRIGGER_LISTENERS 存儲已配置的 TriggerListener 的信息? QRTZ_TRIGGERS 存儲已配置的 Trigger 的信息? 所有的表默認以前綴QRTZ_開始。可以通過在 quartz.properties配置修改(org.quartz.jobStore.tablePrefix = QRTZ_)。? 可以對不同的Scheduler實例使用多套的表,通過改變前綴來實現。? 優化 quartz數據表結構? -- 1:對關鍵查詢路徑字段建立索引?
Java代碼??
create?index?idx_qrtz_t_next_fire_time?on?QRTZ_TRIGGERS(NEXT_FIRE_TIME);?? create?index?idx_qrtz_t_state?on?QRTZ_TRIGGERS(TRIGGER_STATE);?? create?index?idx_qrtz_t_nf_st?on?QRTZ_TRIGGERS(TRIGGER_STATE,NEXT_FIRE_TIME);?? create?index?idx_qrtz_ft_trig_group?on?QRTZ_FIRED_TRIGGERS(TRIGGER_GROUP);?? create?index?idx_qrtz_ft_trig_name?on?QRTZ_FIRED_TRIGGERS(TRIGGER_NAME);?? create?index?idx_qrtz_ft_trig_n_g?on?QRTZ_FIRED_TRIGGERS(TRIGGER_NAME,TRIGGER_GROUP);?? create?index?idx_qrtz_ft_trig_inst_name?on?QRTZ_FIRED_TRIGGERS(INSTANCE_NAME);?? create?index?idx_qrtz_ft_job_name?on?QRTZ_FIRED_TRIGGERS(JOB_NAME);?? create?index?idx_qrtz_ft_job_group?on?QRTZ_FIRED_TRIGGERS(JOB_GROUP);?? -- 2:根據Mysql innodb表結構特性,調整主鍵,降低二級索引的大小?
Java代碼??
ALTER?TABLE?QRTZ_TRIGGERS?? ADD?UNIQUE?KEY?IDX_NAME_GROUP(TRIGGER_NAME,TRIGGER_GROUP),?? DROP?PRIMARY?KEY,?? ADD?ID?INT?UNSIGNED?NOT?NULL?AUTO_INCREMENT?FIRST,?? ADD?PRIMARY?KEY?(ID);?? ALTER?TABLE?QRTZ_JOB_DETAILS?? ADD?UNIQUE?KEY?IDX_NAME_GROUP(JOB_NAME,JOB_GROUP),?? DROP?PRIMARY?KEY,?? ADD?ID?INT?UNSIGNED?NOT?NULL?AUTO_INCREMENT?FIRST,?? ADD?PRIMARY?KEY?(ID);?? Quartz集群? 只有使用持久的JobStore才能完成Quqrtz集群? ? 圖6? 一個 Quartz 集群中的每個節點是一個獨立的 Quartz 應用,它又管理著其他的節點。? 需要分別對每個節點分別啟動或停止。不像應用服務器的集群,獨立的 Quartz 節點并不與另一個節點或是管理節點通信。? Quartz 應用是通過數據庫表來感知到另一應用。? 配置集群?
Xml代碼??
<prop?key="org.quartz.jobStore.class">org.quartz.impl.jdbcjobstore.JobStoreTX</prop>?? <prop?key="org.quartz.jobStore.isClustered">true</prop>?? <prop?key="org.quartz.jobStore.clusterCheckinInterval">15000</prop>?? <prop?key="org.quartz.jobStore.maxMisfiresToHandleAtATime">1</prop>?? <prop?key="org.quartz.jobStore.dataSource">myDS</prop>?? <prop?key="org.quartz.dataSource.myDS.driver">${database.driverClassName}</prop>?? <prop?key="org.quartz.dataSource.myDS.URL">${database.url}</prop>?? <prop?key="org.quartz.dataSource.myDS.user">${database.username}</prop>?? <prop?key="org.quartz.dataSource.myDS.password">${database.password}</prop>?? <prop?key="org.quartz.dataSource.myDS.maxConnections">5</prop>?? org.quartz.jobStore.class 屬性為 JobStoreTX,? 將任務持久化到數據中。因為集群中節點依賴于數據庫來傳播Scheduler實例的狀態,你只能在使用 JDBC JobStore 時應用 Quartz 集群。? org.quartz.jobStore.isClustered 屬性為 true,通知Scheduler實例要它參與到一個集群當中。? org.quartz.jobStore.clusterCheckinInterval? 屬性定義了Scheduler 實例檢入到數據庫中的頻率(單位:毫秒)。? Scheduler 檢查是否其他的實例到了它們應當檢入的時候未檢入;? 這能指出一個失敗的 Scheduler 實例,且當前 Scheduler 會以此來接管任何執行失敗并可恢復的 Job。? 通過檢入操作,Scheduler 也會更新自身的狀態記錄。clusterChedkinInterval 越小,Scheduler 節點檢查失敗的 Scheduler 實例就越頻繁。默認值是 15000 (即15 秒)? 集群實現分析? Quartz原來碼分析:? 基于數據庫表鎖實現多Quartz_Node 對Job,Trigger,Calendar等同步機制?
Sql代碼??
CREATE?TABLE?`QRTZ_LOCKS`?(?? ??`LOCK_NAME`?varchar(40)?NOT?NULL,?? ??PRIMARY?KEY?(`LOCK_NAME`)?? )?ENGINE=InnoDB?DEFAULT?CHARSET=utf8;?? + |?LOCK_NAME???????|?? + |?CALENDAR_ACCESS?|??? |?JOB_ACCESS??????|??? |?MISFIRE_ACCESS??|??? |?STATE_ACCESS????|??? |?TRIGGER_ACCESS??|??? + 通過行級別鎖實現多節點處理?
Java代碼??
public?class?StdRowLockSemaphore?extends?DBSemaphore?{?? ?? ???? ????public?static?final?String?SELECT_FOR_LOCK?=?"SELECT?*?FROM?"?? ????????????+?TABLE_PREFIX_SUBST?+?TABLE_LOCKS?+?"?WHERE?"?+?COL_LOCK_NAME?? ????????????+?"?=???FOR?UPDATE";?? ?? ???? ????public?StdRowLockSemaphore()?{?? ????????super(DEFAULT_TABLE_PREFIX,?null,?SELECT_FOR_LOCK);?? ????}?? ?? ????public?StdRowLockSemaphore(String?tablePrefix,?String?seletWithLockSQL)?{?? ????????super(tablePrefix,?selectWithLockSQL,?SELECT_FOR_LOCK);?? ????}?? ?? ???? ????protected?void?executeSQL(Connection?conn,?String?lockName,?String?expandedSQL)?throws?LockException?{?? ????????PreparedStatement?ps?=?null;?? ????????ResultSet?rs?=?null;?? ????????try?{?? ????????????ps?=?conn.prepareStatement(expandedSQL);?? ????????????ps.setString(1,?lockName);?? ?? ????????????if?(getLog().isDebugEnabled())?{?? ????????????????getLog().debug(?? ????????????????????"Lock?'"?+?lockName?+?"'?is?being?obtained:?"?+??? ????????????????????Thread.currentThread().getName());?? ????????????}?? ????????????rs?=?ps.executeQuery();?? ????????????if?(!rs.next())?{?? ????????????????throw?new?SQLException(Util.rtp(?? ????????????????????"No?row?exists?in?table?"?+?TABLE_PREFIX_SUBST?+??? ????????????????????TABLE_LOCKS?+?"?for?lock?named:?"?+?lockName,?getTablePrefix()));?? ????????????}?? ????????}?catch?(SQLException?sqle)?{?? ????????????if?(getLog().isDebugEnabled())?{?? ????????????????getLog().debug(?? ????????????????????"Lock?'"?+?lockName?+?"'?was?not?obtained?by:?"?+??? ????????????????????Thread.currentThread().getName());?? ????????????}?? ????????????throw?new?LockException("Failure?obtaining?db?row?lock:?"?? ????????????????????+?sqle.getMessage(),?sqle);?? ????????}?finally?{?? ????????????if?(rs?!=?null)?{??? ????????????????try?{?? ????????????????????rs.close();?? ????????????????}?catch?(Exception?ignore)?{?? ????????????????}?? ????????????}?? ????????????if?(ps?!=?null)?{?? ????????????????try?{?? ????????????????????ps.close();?? ????????????????}?catch?(Exception?ignore)?{?? ????????????????}?? ????????????}?? ????????}?? ????}?? ?? ????protected?String?getSelectWithLockSQL()?{?? ????????return?getSQL();?? ????}?? ?? ????public?void?setSelectWithLockSQL(String?selectWithLockSQL)?{?? ????????setSQL(selectWithLockSQL);?? ????}?? }?? ?? ??? ????public?boolean?obtainLock(Connection?conn,?String?lockName)?throws?LockException?{?? ????????lockName?=?lockName.intern();?? ?? ????????Logger?log?=?getLog();?? ?? ????????if(log.isDebugEnabled())?{?? ????????????log.debug(?? ????????????????"Lock?'"?+?lockName?+?"'?is?desired?by:?"?? ????????????????????????+?Thread.currentThread().getName());?? ????????}?? ????????if?(!isLockOwner(conn,?lockName))?{?? ????????????executeSQL(conn,?lockName,?expandedSQL);?? ?????????????? ????????????if(log.isDebugEnabled())?{?? ????????????????log.debug(?? ????????????????????"Lock?'"?+?lockName?+?"'?given?to:?"?? ????????????????????????????+?Thread.currentThread().getName());?? ????????????}?? ????????????getThreadLocks().add(lockName);?? ???????????? ???????????? ????????}?else?if(log.isDebugEnabled())?{?? ????????????log.debug(?? ????????????????"Lock?'"?+?lockName?+?"'?Is?already?owned?by:?"?? ????????????????????????+?Thread.currentThread().getName());?? ????????}?? ????????return?true;?? ????}?? ?? ???? ????public?void?releaseLock(Connection?conn,?String?lockName)?{?? ????????lockName?=?lockName.intern();?? ?? ????????if?(isLockOwner(conn,?lockName))?{?? ????????????if(getLog().isDebugEnabled())?{?? ????????????????getLog().debug(?? ????????????????????"Lock?'"?+?lockName?+?"'?returned?by:?"?? ????????????????????????????+?Thread.currentThread().getName());?? ????????????}?? ????????????getThreadLocks().remove(lockName);?? ???????????? ????????}?else?if?(getLog().isDebugEnabled())?{?? ????????????getLog().warn(?? ????????????????"Lock?'"?+?lockName?+?"'?attempt?to?return?by:?"?? ????????????????????????+?Thread.currentThread().getName()?? ????????????????????????+?"?--?but?not?owner!",?? ????????????????new?Exception("stack-trace?of?wrongful?returner"));?? ????????}?? ????}?? JobStoreTX 控制并發代碼?
Java代碼??
???protected?Object?executeInLock(String?lockName,?TransactionCallback?txCallback)?throws?JobPersistenceException?{?? ???????return?executeInNonManagedTXLock(lockName,?txCallback);?? ???}?? ?? 使用JobStoreSupport.executeInNonManagedTXLock?實現:?? ???protected?Object?executeInNonManagedTXLock(?? ???????????String?lockName,??? ???????????TransactionCallback?txCallback)?throws?JobPersistenceException?{?? ???????boolean?transOwner?=?false;?? ???????Connection?conn?=?null;?? ???????try?{?? ???????????if?(lockName?!=?null)?{?? ??????????????? ??????????????? ???????????????if?(getLockHandler().requiresConnection())?{?? ???????????????????conn?=?getNonManagedTXConnection();?? ???????????????}?? ???????????? ???????????????transOwner?=?getLockHandler().obtainLock(conn,?lockName);?? ???????????}?? ???????????if?(conn?==?null)?{?? ???????????????conn?=?getNonManagedTXConnection();?? ???????????}?? ???????????? ???????????Object?result?=?txCallback.execute(conn);?? ???????? ???????????commitConnection(conn);?? ???????????Long?sigTime?=?clearAndGetSignalSchedulingChangeOnTxCompletion();?? ???????????if(sigTime?!=?null?&&?sigTime?>=?0)?{?? ???????????????signalSchedulingChangeImmediately(sigTime);?? ???????????}?? ???????????return?result;?? ???????}?catch?(JobPersistenceException?e)?{?? ???????????rollbackConnection(conn);?? ???????????throw?e;?? ???????}?catch?(RuntimeException?e)?{?? ???????????rollbackConnection(conn);?? ???????????throw?new?JobPersistenceException("Unexpected?runtime?exception:?"?+?e.getMessage(),?e);?? ???????}?finally?{?? ???????????try?{?? ???????????? ???????????????releaseLock(conn,?lockName,?transOwner);?? ???????????}?finally?{?? ???????????????cleanupConnection(conn);?? ???????????}?? ???????}?? ???}?? JobStoreCMT 控制并發代碼?
Java代碼??
?? ???protected?Object?executeInLock(String?lockName,?TransactionCallback?txCallback)?throws?JobPersistenceException?{?? ???????boolean?transOwner?=?false;?? ???????Connection?conn?=?null;?? ???????try?{?? ???????????if?(lockName?!=?null)?{?? ??????????????? ??????????????? ???????????????if?(getLockHandler().requiresConnection())?{?? ???????????????????conn?=?getConnection();?? ???????????????}?? ???????????????transOwner?=?getLockHandler().obtainLock(conn,?lockName);?? ???????????}?? ?? ???????????if?(conn?==?null)?{?? ???????????????conn?=?getConnection();?? ???????????}?? ???????? ???????????return?txCallback.execute(conn);?? ???????}?finally?{?? ???????????try?{?? ???????????????releaseLock(conn,?LOCK_TRIGGER_ACCESS,?transOwner);?? ???????????}?finally?{?? ???????????????cleanupConnection(conn);?? ???????????}?? ???????}?? ???}?? CRM中quartz與Spring結合使用? Spring 通過提供org.springframework.scheduling.quartz下的封裝類對quartz支持? 但是目前存在問題? 1:Spring3.0目前不支持Quartz2.x以上版本? Caused by: java.lang.IncompatibleClassChangeError: class org.springframework.scheduling.quartz.CronTriggerBean? has interface org.quartz.CronTrigger as super class? 原因是 org.quartz.CronTrigger在2.0從class變成了一個interface造成IncompatibleClassChangeError錯誤。? 解決:無解,要想使用spring和quartz結合的方式 只能使用Quartz1.x版本。? 2:org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean報? java.io.NotSerializableException異常,需要自己實現QuartzJobBean。? 解決:spring bug己經在http://jira.springframework.org/browse/SPR-3797找到解決方案,? 作者重寫了MethodInvokingJobDetailFactoryBean.? 3:Spring內bean必須要實現序列化接口,否則不能通過Sprng 屬性注入的方式為job提供業務對象? 解決:?
Java代碼??
@Service("springBeanService")?? public?class?SpringBeanService?implements?Serializable{private?static?final?long?serialVersionUID?=?-2228376078979553838L;?? ????public?<T>?T?getBean(Class<T>?clazz,String?beanName){?? ????????ApplicationContext?context?=?ContextLoader.getCurrentWebApplicationContext();?? ????????return?(T)context.getBean(beanName);?? ????}?? }?? CRM中quartz模塊部分代碼? 1:定義所有job的父類,并負責異常發送郵件任務和日志任務?
Java代碼??
public?abstract?class?BaseQuartzJob?implements?Job,?Serializable?{?? ????private?static?final?long?serialVersionUID?=?3347549365534415931L;?? ????private?Logger?logger?=?LoggerFactory.getLogger(this.getClass());?? ?????? ???? ????public?abstract?void?action(JobExecutionContext?context);?? ?????? ????@Override?? ????public?void?execute(JobExecutionContext?context)?throws?JobExecutionException?{?? ????????try?{?? ????????????long?start?=?System.currentTimeMillis();?? ????????????this.action(context);?? ????????????long?end?=?System.currentTimeMillis();?? ????????????JobDetail?jobDetail?=?context.getJobDetail();?? ????????????Trigger?trigger?=?context.getTrigger();?? ????????????StringBuilder?buffer?=?new?StringBuilder();?? ????????????buffer.append("jobName?=?").append(jobDetail.getName()).append("?triggerName?=?")?? ????????????.append(trigger.getName()).append("?執行完成?,?耗時:?").append((end?-?start)).append("?ms");?? ????????????logger.info(buffer.toString());?? ????????}?catch?(Exception?e)?{?? ????????????doResolveException(context?!=?null???context.getMergedJobDataMap()?:?null,?e);?? ????????}?? ????}?? ????@SuppressWarnings("unchecked")?? ????private?void?doResolveException(JobDataMap?dataMap,?Exception?ex)?{?? ???????? ???????? ????}?? }?? 2:抽象Quartz操作接口(實現類 toSee: QuartzServiceImpl)?
Java代碼??
@Service?? public?interface?QuartzService?{?? ?List<Map<String,?Object>>?getQrtzTriggers(Page?page,?String?orderName,?String?sortType);?? ?List<Map<String,?Object>>?getQrtzJobDetails();?? ?void?executeTriggerAction(String?name,?String?group,?Integer?action);?? ?void?executeJobAction(String?name,?String?group,?Integer?action);?? ?void?addTrigger(String?jobName,?String?jobGroup,?TriggerViewBean?triggerBean);?? ?? ?void?addTriggerForDate(JobDetail?jobDetail,?String?triggerName?,?String?? ?triggerGroup?,?Date?date,?Map<String,?Object>?triggerDataMap)?;?? ?List<Map<String,?Object>>?getSchedulers();?? ?public?Trigger?getTrigger(String?name,?String?group);?? ?public?JobDetail?getJobDetail(String?name,?String?group);?? }?? 3:在Spring配置job,trigger,Scheduler,Listener組件?
Xml代碼??
<bean?id="accountStatusTaskScannerJobDetail"?? ?class="org.springframework.scheduling.quartz.JobDetailBean">?? ????<property?name="name"?value="accountStatusTaskScannerJobDetail"></property>?? ????<property?name="group"?value="CrmAccountGroup"></property>?? ????<property?name="jobClass"?value="***.crm.quartz.job.AccountStatusTaskScannerJob"></property>?? ???? ????<property?name="requestsRecovery"?value="true"/>?? ???? ????<property?name="durability"?value="true"/>?? ????<property?name="volatility"?value="false"></property>?? </bean>?? <bean?id="accountStatusTaskScannerTrigger"?class="org.springframework.scheduling.quartz.CronTriggerBean">?? ?????<property?name="group"?value="CrmDealGroup"></property>?? ?????<property?name="name"?value="accountStatusTaskScannerTrigger"></property>?? ????<property?name="jobDetail"?ref="accountStatusTaskScannerJobDetail"></property>?? ????<property?name="cronExpression"?value="0?0?1?*?*??"></property>?? </bean>?? ?? <bean?id="quartzExceptionSchedulerListener"??? class="***.crm.quartz.listener.QuartzExceptionSchedulerListener"></bean>?? ?? <bean?id="quartzScheduler"?? ?class="org.springframework.scheduling.quartz.SchedulerFactoryBean">?? ????<property?name="quartzProperties">?? ????<props>?? ????????<prop?key="org.quartz.scheduler.instanceName">CRMscheduler</prop>?? ????????<prop?key="org.quartz.scheduler.instanceId">AUTO</prop>?? ???????? ????????<prop?key="org.quartz.threadPool.class">org.quartz.simpl.SimpleThreadPool</prop>?? ????????<prop?key="org.quartz.threadPool.threadCount">20</prop>?? ????????<prop?key="org.quartz.threadPool.threadPriority">5</prop>?? ???????? ????????<prop?key="org.quartz.jobStore.class">org.quartz.impl.jdbcjobstore.JobStoreTX</prop>?? ???????? ????????<prop?key="org.quartz.jobStore.isClustered">false</prop>?? ????????<prop?key="org.quartz.jobStore.clusterCheckinInterval">15000</prop>?? ????????<prop?key="org.quartz.jobStore.maxMisfiresToHandleAtATime">1</prop>?? ???????? ????????<prop?key="org.quartz.jobStore.dataSource">myDS</prop>?? ????????<prop?key="org.quartz.dataSource.myDS.driver">${database.driverClassName}</prop>?? ????????<prop?key="org.quartz.dataSource.myDS.URL">${database.url}</prop>?? ????????<prop?key="org.quartz.dataSource.myDS.user">${database.username}</prop>?? ????????<prop?key="org.quartz.dataSource.myDS.password">${database.password}</prop>?? ????????<prop?key="org.quartz.dataSource.myDS.maxConnections">5</prop>?? ????????<prop?key="org.quartz.jobStore.misfireThreshold">120000</prop>?? ????</props>?? ????</property>?? ????<property?name="schedulerName"?value="CRMscheduler"?/>?? ???? ????<property?name="startupDelay"?value="30"/>?? ????<property?name="applicationContextSchedulerContextKey"?value="applicationContextKey"?/>?? ???? ????<property?name="overwriteExistingJobs"?value="true"?/>?? ???? ????<property?name="autoStartup"?value="true"?/>?? ???? ????<property?name="triggers">?? ?<list>?? ????<ref?bean="dailyStatisticsTrigger"?/>?? ????<ref?bean="accountGrabedScannerTrigger"?/>?? ????<ref?bean="syncAccountFromPOITrigger"?/>?? ????<ref?bean="userSyncScannerTrigger"?/>?? ????<ref?bean="syncParentBranchFromPOITrigger"/>?? ????<ref?bean="privateReminderTrigger"?/>?? ????<ref?bean="onlineBranchesScannerTrigger"?/>?? ????<ref?bean="syncCtContactServiceTrigger"?/>?? ????<ref?bean="dealLinkDianpingScannerTrigger"?/>?? ????<ref?bean="accountStatusTaskScannerTrigger"/>?? ????<ref?bean="nDaysActivityScannerTrigger"/>?? ?</list>?? ?</property>?? ?<property?name="jobDetails">?? ????<list>?? ????????<ref?bean="myTestQuartzJobDetail"/>?? ????????<ref?bean="accountPrivateToProtectedJobDetail"/>?? ????????<ref?bean="accountProtectedToPublicJobDetail"/>?? ?<ref?bean="nDaysActivityToProtectedJobDetail"/>?? ?</list>?? ?</property>?? <property?name="schedulerListeners">?? ????<list>?? ????????<ref?bean="quartzExceptionSchedulerListener"/>?? ????</list>?? ?</property>?? </bean>?? Crm目前可以做到對Quartz實例的監控,操作.動態部署Trigger? ? ? 后續待開發功能和問題? 1:目前實現對job,Trigger操作,動態部署Trigger,后續需要加入Calendar(排除特定日期),Listener(動態加載監控),Job的動態部署(只要bean的名稱和方法名,就可完成對job生成,部署)? 2:由于Quartz集群中的job目前是在任意一臺server中執行,Quartz日志生成各自的系統目錄中, quartz日志無法統一.? 3:Quartz2.x已經支持可選節點執行job(期待Spring升級后對新Quartz支持)? 4:Quartz內部的DB操作大量Trigger存在嚴重競爭問題,瞬間大量trigger執行,目前只能通過(org.quartz.jobStore.tablePrefix = QRTZ)分表操作,存在長時間lock_wait(新版本據說有提高);? 5:如果有需要,可以抽取出Quartz,變成單獨的服務,供其它系統調度使用使用
轉載于:https://www.cnblogs.com/davidwang456/p/4205572.html
總結
以上是生活随笔 為你收集整理的项目中使用Quartz集群分享--转载 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。