日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

项目中使用Quartz集群分享--转载

發布時間:2025/4/5 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 项目中使用Quartz集群分享--转载 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

原文:http://hot66hot.iteye.com/blog/1726143

在公司分享了Quartz,發布出來,希望大家討論補充.?

CRM使用Quartz集群分享?
一:CRM對定時任務的依賴與問題?
二:什么是quartz,如何使用,集群,優化?
三:CRM中quartz與Spring結合使用?

1:CRM對定時任務的依賴與問題?
1)依賴?
(1)每天晚上的定時任務,通過sql腳本 + crontab方式執行?

Xml代碼??
  • #crm??
  • 0?2?*?*?*?/opt/***/javafiles/***/shell/***_daily_stat.sql??
  • 30?7?*?*?*?/opt/***/javafiles/***/shell/***_data_fix??
  • 30?0?*?*?*?/opt/***/javafiles/***/shell/***_sync_log??
  • 0?1?*?*?*?/opt/***/javafiles/***/shell/***_clear_log??
  • 20?8?*?*?*?/opt/***/javafiles/***/shell/***_daily?>>?/var/***/logs/***_daily.log?2>&1??
  • 40?1?*?*?*?/opt/***/javafiles/***/shell/***_sync_account2??
  • 0?2?*?*?1?/opt/***/javafiles/***/shell/***_weekly?>>?/var/***/logs/***_weekly.log?2>&1??

  • 存在的問題:當需要跨庫或許數據的,sql無能為力,引入許多中間表,完成復雜統計需求。大范圍對線上熱表掃描,造成鎖表,延遲嚴重?
    (2)使用python(多數據源) + SQL的方式?

    Python代碼??
  • def?connectCRM():??
  • ????return?MySQLdb.Connection("localhost",?"***",?"***",?"***",?3306,?charset="utf8")??
  • ??
  • def?connectTemp():??
  • ????return?MySQLdb.Connection("localhost",?"***",?"***",??"***",?3306,?charset="utf8")??
  • ??
  • def?connectOA():??
  • ????return?MySQLdb.Connection("localhost",?"***",?"***",??"***",?3306,?charset="utf8")??
  • ??
  • def?connectCore():??
  • ????return?MySQLdb.Connection("localhost",?"***",?"***",??"***",?3306,?charset="utf8")??
  • ??
  • def?connectCT():??
  • ????return?MySQLdb.Connection("localhost",?"***",?"***",?"***",?3306,?charset="utf8")??


  • 存在的問題:直接訪問數據,需要理解各系統的數據結構,無法滿足動態任務問題,各系統業務接口沒有重用?

    (3)使用spring + JDK timer方式調用接口完成定時任務?

    Xml代碼??
  • <bean?id="accountStatusTaskScanner"??class="***.impl.AccountStatusTaskScanner"?/>??
  • ????<task:scheduler?id="taskScheduler"?pool-size="5"?/>??
  • ????<task:scheduled-tasks?scheduler="taskScheduler">??
  • ????<task:scheduled?ref="accountStatusTaskScanner"?method="execute"?cron="0?0?1?*?*??"?/>??
  • </task:scheduled-tasks>??

  • 使用寫死服務器Host(srv23)的方式,控制只在一臺服務器上執行task?

    Java代碼??
  • public?abstract?class?SingletonServerTaskScanner?implements?TaskScanner?{??
  • ????private?final?Logger?logger?=?LoggerFactory.getLogger(SingletonServerTaskScanner.class);??
  • ????@Override??
  • ????public?void?execute()?{??
  • ????????String?hostname?=?"";??
  • ????????try?{??
  • ????????????hostname?=?InetAddress.getLocalHost().getHostName();??
  • ????????}?catch?(UnknownHostException?e)?{??
  • ????????????logger.error(e.getMessage(),?e);??
  • ????????}??
  • ????????//判斷是否為當前可執行服務器??
  • ????????if?(ConfigUtil.getValueByKey("core.scan.server").equals(hostname))?{??
  • ????????????doScan();??
  • ????????}??
  • ????}??
  • ????public?abstract?void?doScan();??
  • }??


  • //對于srv23的重啟,保存在內存中的任務將丟失,每次重啟srv23重新生成定時任務?

    Java代碼??
  • public?class?CrmInitializer?implements?InitializingBean?{??
  • ????private?Logger?logger?=?LoggerFactory.getLogger(CrmInitializer.class);??
  • ????@Override??
  • ????public?void?afterPropertiesSet()?throws?Exception?{??
  • ????????//?掃描商家狀態,創建定時任務??
  • ????????logger.info("掃描商家狀態,創建定時任務");??
  • ????????accountStatusTaskScanner.execute();??
  • ????????//?掃描N天未拜訪商家,創建定時任務??
  • ????????logger.info("掃描N天未拜訪商家,創建定時任務");??
  • ????????nDaysActivityScanner.execute();??
  • ????}??
  • }??
  • ?

    Java代碼??
  • //通過調用srv23的特定URL的方式,動態指定任務(如取消N天未拜訪,私海進保護期,保護期進公海等)??
  • public?class?SingletonServerTaskController?{??
  • ????????@Resource??
  • ????????private?AccountService?accountService;??
  • ????????@RequestMapping(value?=?"/reschedule")??
  • ????????public?@ResponseBody??
  • ????????????String?checkAndRescheduleAccount(Integer?accountId)?{??
  • ????????????logger.debug("reschedule?task?for?accountId:"?+?accountId);??
  • ????????????if?(isCurrentServer())?{??
  • ????????????????accountService.checkAndRescheduleAccount(Arrays.asList(accountId));??
  • ????????????}??
  • ????????????return?"ok";??
  • ????????}??
  • ????private?boolean?isCurrentServer()?{??
  • ????????String?hostname?=?"";??
  • ????????try?{??
  • ????????????hostname?=?InetAddress.getLocalHost().getHostName();??
  • ????????}?catch?(UnknownHostException?e)?{??
  • ????????????logger.error(e.getMessage(),?e);??
  • ????????}??
  • ????????if?(ConfigUtil.getValueByKey("core.scan.server").equals(hostname))?{??
  • ????????????return?true;??
  • ????????}?else?{??
  • ????????????return?false;??
  • ????????}??
  • ????}??
  • }??

  • 存在的問題:實現步驟復雜,分散,任務調度不能恢復,嚴重依賴于srv23,回調URL時可能失敗等情況。?
    CRM定時任務走過了很多彎路:?
    定時任務多種實現方式,使配置和代碼分散在多處,難以維護和監控?
    任務執行過程沒有保證,沒有錯誤恢復?
    任務執行異常沒有反饋(郵件)?
    沒有集群支持?
    CRM需要分布式的任務調度框架,統一解決問題.?
    JAVA可以使用的任務調度框架:Quartz , Jcrontab , cron4j , taobao-pamirs-schedule?
    為什么選擇Quartz:?
    1)資歷夠老,創立于1998年,比struts1還早,但是一直在更新(27 April 2012: Quartz 2.1.5 Released),文檔齊全.?
    2)完全由Java寫成,設計用于J2SE和J2EE應用.方便集成:JVM,RMI.?
    3)設計清晰簡單:核心概念scheduler,trigger,job,jobDetail,listener,calendar?
    4)支持集群:org.quartz.jobStore.isClustered?
    5)支持任務恢復:requestsRecovery?

    從http://www.quartz-scheduler.org 獲取最新Quartz?
    1)學習Quartz?

    ?
    圖1 介紹了quartz關鍵的組件和簡單流程?

    (1)Quartz 的目錄結構和內容?

    docs/api????????????????????????????????????? Quartz 框架的JavaDoc Api 說明文檔?
    docs/dbTables??????????????????????????? 創建 Quartz 的數據庫對象的腳本?
    docs/wikidocs???????????????????????????? Quartz 的幫助文件,點擊 index.html 開始查看?
    Examples??????????????????????????????????? 多方面使用 Quartz 的例子Lib Quartz 使用到的第三方包?
    src/java/org/quartz????????????????????? 使用 Quartz 的客戶端程序源代碼,公有 API?
    src/java/org/quartz/core????????????? 使用 Quartz 的服務端程序源代碼,私有 API?
    src/java/org/quartz/simpl??????????? Quartz 提供的不衣賴于第三方產品的簡單實現?
    src/java/org/quartz/impl????????????? 依賴于第三方產品的支持模塊的實現?
    src/java/org/quartz/utils????????????? 整個框架要用到的輔助類和工具組件?
    src/jboss???????????????????????????????????? 提供了特定于 JBoss 特性的源代碼?
    src/oracle?????????????????????????????????? 提供了特定于 Oracle 特性的源代碼?
    src/weblogic????????????????????????????? 提供了特定于 WebLogic 特性的源代碼?

    Quartz 框架包含許多的類和接口,它們分布在大概 11 個包中。多數所要使用到的類或接口放置在 org.quartz 包中。這個包含蓋了 Quartz 框架的公有 API.?

    (2)Quartz核心接口 Scheduler?


    圖2?
    Scheduler 是 Quartz 的主要 API。與Quartz大部分交互是發生于 Scheduler 之上的。客服端與Scheduler 交互是通過org.quartz.Scheduler接口。?
    Scheduler的實現:對方法調用會傳遞到 QuartzScheduler 實例上。QuartzScheduler 對于客戶端是不可見的,并且也不存在與此實例的直接交互。?

    ?

    圖3?

    創建Scheduler?
    Quartz 框架提供了 org.quartz.SchedulerFactory 接口。?
    SchedulerFactory 實例就是用來產生 Scheduler 實例的。當 Scheduler 實例被創建之后,就會存到一個倉庫中(org.quartz.impl.SchedulerRepository).?
    Scheduler 工廠分別是 org.quartz.impl.DirectSchedulerFactory 和 org.quartz.impl.StdSchedulerFactory?
    DirectSchedulerFactory 是為精細化控制 Scheduler 實例產生的工廠類,一般不用,不過有利于理解quartz內部組件。?

    Java代碼??
  • --?最簡單??
  • public?void?createScheduler(ThreadPool?threadPool,?JobStore?jobStore);??
  • --?最復雜??
  • public?void?createScheduler(String?schedulerName,?String?schedulerInstanceId,ThreadPool?threadPool,?JobStore?jobStore,?String?rmiRegistryHost,?int?rmiRegistryPort);??
  • ?

    Java代碼??
  • public?scheduler?createScheduler(){??
  • ?DirectSchedulerFactory?factory=DirectSchedulerFactory.getInstance();??
  • ?try?{??
  • ????//創建線程池??
  • ????SimpleThreadPool?threadPool?=?new?SimpleThreadPool(10,?Thread.NORM_PRIORITY);??
  • ????threadPool.initialize();??
  • ????//創建job存儲類??
  • ????JobStoreTX?jdbcJobStore?=?new?JobStoreTX();??
  • ????jdbcJobStore.setDataSource("someDatasource");??
  • ????????jdbcJobStore.setPostgresStyleBlobs(true);??
  • ????????jdbcJobStore.setTablePrefix("QRTZ_");??
  • ????????jdbcJobStore.setInstanceId("My?Instance");??
  • ??????
  • ????logger.info("Scheduler?starting?up...");??
  • ????factory.createScheduler(threadPool,jdbcJobStore);??
  • ????//?Get?a?scheduler?from?the?factory??
  • ????????Scheduler?scheduler?=?factory.getScheduler();??
  • ??
  • ????//?必須啟動scheduler??
  • ????????scheduler.start();??
  • ????????return?scheduler;??
  • ????}??
  • ????????return?null;??
  • }??


  • org.quartz.impl.StdSchedulerFactory 依賴于屬性類(Properties)決定如何生產 Scheduler 實例?

    通過加載屬性文件,Properties 提供啟動參數:?

    Java代碼??
  • public?scheduler?createScheduler(){??
  • ????//?Create?an?instance?of?the?factory??
  • ????StdSchedulerFactory?factory?=?new?StdSchedulerFactory();??
  • ??????
  • ????//?Create?the?properties?to?configure?the?factory??
  • ????Properties?props?=?new?Properties();??
  • ????//?required?to?supply?threadpool?class?and?num?of?threads??
  • ????props.put(StdSchedulerFactory.PROP_THREAD_POOL_CLASS,"org.quartz.simpl.SimpleThreadPool");??
  • ????props.put("org.quartz.threadPool.threadCount",?"10");??
  • ??????
  • ????try?{??
  • ????????//?Initialize?the?factory?with?properties??
  • ????????factory.initialize(props);??
  • ????????Scheduler?scheduler?=?factory.getScheduler();??
  • ????????logger.info("Scheduler?starting?up...");??
  • ????????scheduler.start();??
  • ????}?catch?(SchedulerException?ex)?{??
  • ????????logger.error(ex);??
  • ????}??
  • }??

  • 調用靜態方法 getDefaultScheduler() 方法中調用了空的構造方法。如果之前未調用過任何一個 initialize() 方法,那么無參的initialize() 方法會被調用。這會開始去按照下面說的順序加載文件。?
    默認情況下,quartz.properties 會被定位到,并從中加載屬性。?

    properties加載順序:?
    1. 檢查 System.getProperty("org.quartz.properties") 中是否設置了別的文件名?
    2. 否則,使用 quartz.properties 作為要加載的文件名?
    3. 試圖從當前工作目錄中加載這個文件?
    4. 試圖從系統 classpath 下加載這個文件?
    在 Quartz Jar 包中有一個默認的 quartz.properties 文件?

    默認配置如下?
    # Default Properties file for use by StdSchedulerFactory?
    # to create a Quartz Scheduler Instance, if a different?
    # properties file is not explicitly specified.?

    org.quartz.scheduler.instanceName = DefaultQuartzScheduler?
    org.quartz.scheduler.rmi.export = false?
    org.quartz.scheduler.rmi.proxy = false?
    org.quartz.scheduler.wrapJobExecutionInUserTransaction = false?
    org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool?
    org.quartz.threadPool.threadCount = 10?
    org.quartz.threadPool.threadPriority = 5?
    org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true?
    org.quartz.jobStore.misfireThreshold = 60000?
    org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore?
    到此創建Scheduler完成?

    通過Scheduler理解Quartz?
    Scheduler 的 API 可以分組成以下三個類別:?
    ·管理 Scheduler?

    (1)啟動 Scheduler?

    Java代碼??
  • //Start?the?scheduler??
  • ?scheduler.start();??

  • start() 方法被調用,Scheduler 就開始搜尋需要執行的 Job。在你剛得到一個 Scheduler 新的實例時,或者 Scheduler?
    被設置為 standby 模式后,你才可以調用 start() 方法。?

    Java代碼??
  • public?void?standby()?throws?SchedulerException;??

  • 只要調用了 shutdown() 方法之后,你就不能再調用 Scheduler 實例的 start() 方法了。?
    這是因為 shutdown() 方法銷毀了為 Scheduler 創建的所有的資源(線程,數據庫連接等)。?
    你可能需要Standby 模式:設置 Scheduler 為 standby 模式會導致 Scheduler搜尋要執行的 Job 的線程被暫停下來?

    停止 Scheduler?

    Java代碼??
  • //waitForJobsToComplete?是否讓當前正在進行的Job正常執行完成才停止Scheduler??
  • public?void?shutdown(boolean?waitForJobsToComplete)?throws?SchedulerException;??
  • public?void?shutdown()?throws?SchedulerException;??

  • 其它管理Scheduler 方法見API...?
    管理 Job?
    什么是 Quartz Job??
    一個Quart Job就是一個任何一個繼承job或job子接口的Java類,你可以用這個類做任何事情!?

    org.quartz.Job 接口?

    Java代碼??
  • public?void?execute(JobExecutionContext?context)throws?JobExecutionException;??
  • JobExecutionContext??

  • 當 Scheduler 調用一個 Job,一個 JobexecutionContext 傳遞給 execute() 方法。JobExecutionContext 對象讓 Job 能?
    訪問 Quartz 運行時候環境和 Job 本身的數據。類似于在 Java Web 應用中的 servlet 訪問 ServletContext 。?
    通過 JobExecutionContext,Job 可訪問到所處環境的所有信息,包括注冊到 Scheduler 上與該 Job 相關聯的 JobDetail 和 Trigger。?
    JobDetail?
    部署在 Scheduler 上的每一個 Job 只創建了一個 JobDetail實例。JobDetail 是作為 Job 實例進行定義的?
    // Create the JobDetail?
    JobDetail jobDetail = new JobDetail("PrintInfoJob",Scheduler.DEFAULT_GROUP, PrintInfoJob.class);?
    // Create a trigger that fires now and repeats forever?
    Trigger trigger = TriggerUtils.makeImmediateTrigger(?
    SimpleTrigger.REPEAT_INDEFINITELY, 10000);?
    trigger.setName("PrintInfoJobTrigger");// register with the Scheduler?
    scheduler.scheduleJob(jobDetail, trigger);?
    JobDetail 被加到 Scheduler 中了,而不是 job。Job 類是作為 JobDetail 的一部份,job直到Scheduler準備要執行它的時候才會被實例化的,因此job不存在線成安全性問題.?

    使用 JobDataMap 對象設定 Job 狀態?

    Java代碼??
  • public?void?executeScheduler()?throws?SchedulerException{??
  • ????scheduler?=?StdSchedulerFactory.getDefaultScheduler();??
  • ????scheduler.start();??
  • ????logger.info("Scheduler?was?started?at?"?+?new?Date());??
  • ????//?Create?the?JobDetail??
  • ????JobDetail?jobDetail?=?new?JobDetail("PrintJobDataMapJob",Scheduler.DEFAULT_GROUP,PrintJobDataMapJob.class);??
  • ????//?Store?some?state?for?the?Job??
  • ????jobDetail.getJobDataMap().put("name",?"John?Doe");??
  • ????jobDetail.getJobDataMap().put("age",?23);??
  • ????jobDetail.getJobDataMap().put("balance",new?BigDecimal(1200.37));??
  • ????//?Create?a?trigger?that?fires?once??
  • ????Trigger?trigger?=?TriggerUtils.makeImmediateTrigger(0,?10000);??
  • ????trigger.setName("PrintJobDataMapJobTrigger");??
  • ????scheduler.scheduleJob(jobDetail,?trigger);??
  • }??
  • //Job?能通過?JobExecutionContext?對象訪問?JobDataMap??
  • public?class?PrintJobDataMapJob?implements?Job?{??
  • ????public?void?execute(JobExecutionContext?context)throws?JobExecutionException?{??
  • ????????logger.info("in?PrintJobDataMapJob");??
  • ????????//?Every?job?has?its?own?job?detail??
  • ????????JobDataMap?jobDataMap?=?context.getJobDetail().getJobDataMap();??
  • ????????//?Iterate?through?the?key/value?pairs??
  • ????????Iterator?iter?=?jobDataMap.keySet().iterator();??
  • ????????while?(iter.hasNext())?{??
  • ????????????Object?key?=?iter.next();??
  • ????????????Object?value?=?jobDataMap.get(key);??
  • ????????????logger.info("Key:?"?+?key?+?"?-?Value:?"?+?value);??
  • ????????}??
  • ????}??
  • ?}??

  • 在Quartz 1.5之后,JobDataMap在 Trigger 級也是可用的。它的用途類似于Job級的JobDataMap,支持在同一個JobDetail上的多個Trigger。?
    伴隨著加入到 Quartz 1.5 中的這一增強特性,可以使用 JobExecutionContext 的一個新的更方便的方法獲取到 Job 和 Trigger 級的并集的 map 中的值。?

    這個方法就是getMergedJobDataMap() 取job 和 Trigger級的并集map,它能夠在 Job 中使用。管法推薦使用這個方法.?

    * 實際使用中trigger級別有時取不到map中的值, 使用getMergedJobDataMap 可以獲取到(官方推薦此方法).?

    有狀態的Job: org.quartz.StatefulJob 接口?
    當需要在兩次 Job 執行間維護狀態,使用StatefulJob 接口.?

    Job 和 StatefulJob 在框架中使用中存在兩個關鍵差異。?
    (一) JobDataMap 在每次執行之后重新持久化到 JobStore 中。這樣就確保你對 Job 數據的改變直到下次執行仍然保持著。?
    (二) 兩個或多個有狀態的 JobDetail 實例不能并發執行。保證JobDataMap線程安全?

    注意:實際使用時使用jobStoreTX/jobStoreCMT ,StatefulJob,大量的trigger對應一個JobDetail的情況下Mysql會產生鎖超時問題.?

    中斷 Job?
    Quartz 包括一個接口叫做 org.quartz.InterruptableJob,它擴展了普通的 Job 接口并提供了一個 interrupt() 方法:
    沒有深入研究,只知道 Scheduler會調用自定義的Job的 interrupt()方法。由用戶決定 Job 決定如何中斷.沒有測試!!!

    job的特性?
    易失性 volatility?
    一個易失性的 Job 是在程序關閉之后不會被持久化。一個 Job 是通過調用 JobDetail 的 setVolatility(true)被設置為易失.?
    Job易失性的默認值是 false.?
    注意:只有采用持久性JobStore時才有效?

    Job 持久性 durability?
    設置JobDetail 的 setDurability(false),在所有的觸發器觸發之后JobDetail將從 JobStore 中移出。?
    Job持久性默認值是false.?
    Scheduler將移除沒有trigger關聯的jobDetail?

    Job 可恢復性 shuldRecover?
    當一個Job在執行中,Scheduler非正常的關閉,設置JobDetail 的setRequestsRecovery(true) 在 Scheduler 重啟之后可恢復的Job還會再次被執行。這個?
    Job 會重新開始執行。注意job代碼事務特性.?
    Job可恢復性默認為false,Scheduler不會試著去恢復job操作。?

    ?

    圖為表述沒有執行完成的job數據庫記錄?

    Scheduler 中移除 Job?
    移除所有與這個 Job 相關聯的 Trigger;如果這個 Job 是非持久性的,它將會從 Scheduler 中移出。?
    更直接的方式是使用 deleteJob() 方法,它還會刪除所有與當前job關聯的trigger?

    public boolean deleteJob(String jobName, String groupName) throws SchedulerException;?
    quartz 本身提供的 Job?
    org.quartz.jobs.FileScanJob 檢查某個指定文件是否變化,并在文件被改變時通知到相應監聽器的 Job?
    org.quartz.jobs.FileScanListener 在文件被修改后通知 FileScanJob 的監聽器?
    org.quartz.jobs.NativeJob 用來執行本地程序(如 windows 下 .exe 文件) 的 Job?
    org.quartz.jobs.NoOpJob 什么也不做,但用來測試監聽器不是很有用的。一些用戶甚至僅僅用它來導致一個監聽器的運行?
    org.quartz.jobs.ee.mail.SendMailJob 使用 JavaMail API 發送 e-mail 的 Job?
    org.quartz.jobs.ee.jmx.JMXInvokerJob 調用 JMX bean 上的方法的 Job?
    org.quartz.jobs.ee.ejb.EJBInvokerJob 用來調用 EJB 上方法的 Job?

    job的理解到此結束?

    理解quartz Trigger?
    Job 包含了要執行任務的邏輯,但是Job不負責何時執行。這個事情由觸發器(Trigger)負責。?
    Quartz Trigger繼承了抽象的org.quartz.Trigger 類。?
    目前,Quartz 有三個可用的實現?

    org.quartz.SimpleTrigger?
    org.quartz.CronTrigger?
    org.quartz.NthIncludeDayTrigger?
    使用org.quartz.SimpleTrigger?
    SimpleTrigger 是設置和使用是最為簡單的一種 Quartz Trigger。它是為那種需要在特定的日期/時間啟動,且以一個可能的間隔時間重復執行 n 次的 Job 所設計的。?

    SimpleTrigger 存在幾個變種的構造方法。他們是從無參的版本一直到帶全部參數的版本。?

    下面代碼版斷顯示了一個僅帶有trigger 的名字和組的簡單構造方法?

    SimpleTrigger sTrigger = new SimpleTrigger("myTrigger", Scheduler.DEFAULT_GROUP);?
    這個 Trigger 會立即執行,而不重復。還有一個構造方法帶有多個參數,配置 Triiger 在某一特定時刻觸發,重復執行多次,和兩?
    次觸發間的延遲時間。?

    Java代碼??
  • public?SimpleTrigger(String?name,?String?group,String?jobName,?String?jobGroup,??
  • ?Date?startTime,Date?endTime,?int?repeatCount,?long?repeatInterval);??

  • 使用org.quartz.CronTrigger?
    CronTrigger 是基于 Unix 類似于 cron 的表達式觸發,也是功能最強大和最常用的Trigger?
    Cron表達式:?

    Java代碼??
  • "0?0?12?*?*??"?????????????????????Fire?at?12pm?(noon)?every?day??
  • "0?15?10???*?*"???????????????????Fire?at?10:15am?every?day??
  • "0?15?10?*?*??"???????????????????Fire?at?10:15am?every?day??
  • "0?15?10?*?*???*"?????????????????Fire?at?10:15am?every?day??
  • "0?15?10?*?*???2005"???????????Fire?at?10:15am?every?day?during?the?year?2005??
  • "0?*?14?*?*??"?????????????????????Fire?every?minute?starting?at?2pm?and?ending?at?2:59pm,?every?day??
  • "0?0/5?14?*?*??"??????????????????Fire?every?5?minutes?starting?at?2pm?and?ending?at?2:55pm,?every?day??
  • "0?0/5?14,18?*?*??"??????????????Fire?every?5?minutes?starting?at?2pm?and?ending?at?2:55pm,?AND?fire?every?5?minutes?starting?at?6pm?and?ending?at?6:55pm,?every?day??
  • "0?0-5?14?*?*??"???????????????????Fire?every?minute?starting?at?2pm?and?ending?at?2:05pm,?every?day??
  • "0?10,44?14???3?WED"?????????Fire?at?2:10pm?and?at?2:44pm?every?Wednesday?in?the?month?of?March.??
  • "0?15?10???*?MON-FRI"????????Fire?at?10:15am?every?Monday,?Tuesday,?Wednesday,?Thursday?and?Friday??
  • "0?15?10?15?*??"??????????????????Fire?at?10:15am?on?the?15th?day?of?every?month??
  • "0?15?10?L?*??"????????????????????Fire?at?10:15am?on?the?last?day?of?every?month??
  • "0?15?10???*?6L"???????????????????Fire?at?10:15am?on?the?last?Friday?of?every?month??
  • "0?15?10???*?6L"???????????????????Fire?at?10:15am?on?the?last?Friday?of?every?month??
  • "0?15?10???*?6L?2002-2005"???Fire?at?10:15am?on?every?last?Friday?of?every?month?during?the?years?2002,?2003,?2004?and?2005??
  • "0?15?10???*?6#3"?????????????????Fire?at?10:15am?on?the?third?Friday?of?every?month??


  • 使用 org.quartz.NthIncludedDayTrigger?
    org.quartz.NthIncludedDayTrigger是設計用于在每一間隔類型的第幾天執行 Job。?
    例如,你要在每個月的 12 號執行發工資提醒的Job。接下來的代碼片斷描繪了如何創建一個 NthIncludedDayTrigger.?

    Java代碼??
  • //創建每個月的12號的NthIncludedDayTrigger??
  • NthIncludedDayTrigger?trigger?=?new?NthIncludedDayTrigger("MyTrigger",?Scheduler.DEFAULT_GROUP);??
  • trigger.setN(12);??
  • trigger.setIntervalType(NthIncludedDayTrigger.INTERVAL_TYPE_MONTHLY);??

  • jobDetail + trigger組成最基本的定時任務:?
    特別注意:一個job可以對應多個Trgger , 一個Trigger只能對應一個job .?

    如:CRM中N天未拜訪的job對應所有的N天未拜訪商家(一個商家一個trigger) 大約1:1000的比例?
    ??? job和trigger都是通過name 和 group 屬性確定唯一性的.?

    Quartz Calendar?
    Quartz 的 Calendar 對象與 Java API 的 java.util.Calendar不同。?
    Java 的 Calender 對象是通用的日期和時間工具;?
    Quartz 的 Calender 專門用于屏閉一個時間區間,使 Trigger 在這個區間中不被觸發。?
    例如,讓我們假如取消節假日執行job。?

    Quartz包括許多的 Calender 實現足以滿足大部分的需求.?

    org.quartz.impl.calendar.BaseCalender 為高級的 Calender 實現了基本的功能,實現了 org.quartz.Calender 接口?
    org.quartz.impl.calendar.WeeklyCalendar 排除星期中的一天或多天,例如,可用于排除周末?
    org.quartz.impl.calendar.MonthlyCalendar 排除月份中的數天,例如,可用于排除每月的最后一天?
    org.quartz.impl.calendar.AnnualCalendar 排除年中一天或多天?
    org.quartz.impl.calendar.HolidayCalendar 特別的用于從 Trigger 中排除節假日?

    使用Calendar,只需實例化后并加入你要排除的日期,然后用 Scheduler 注冊,最后必須讓Calender依附于Trigger實例。?

    排除國慶節實例?

    Java代碼??
  • private?void?scheduleJob(Scheduler?scheduler,?Class?jobClass)?{??
  • ????try?{??
  • ????????//?Create?an?instance?of?the?Quartz?AnnualCalendar??
  • ????????AnnualCalendar?cal?=?new?AnnualCalendar();??
  • ????????//?exclude?國慶節??
  • ????????Calendar?gCal?=?GregorianCalendar.getInstance();??
  • ????????gCal.set(Calendar.MONTH,?Calendar.OCTOBER);??
  • ????????List<Calendar>?mayHolidays?=?new?ArraysList<Calendar>();??
  • ????????for(int?i=1;?i<=7;?i++){??
  • ????????????gCal.set(Calendar.DATE,?i);??
  • ????????????mayHolidays.add(gCal);??
  • ????????}??
  • ????????cal.setDaysExcluded(mayHolidays);??
  • ????????//?Add?to?scheduler,?replace?existing,?update?triggers??
  • ????????scheduler.addCalendar("crmHolidays",?cal,?true,?true);??
  • ????????/*?
  • ????????*?Set?up?a?trigger?to?start?firing?now,?repeat?forever?
  • ????????*?and?have?(60000?ms)?between?each?firing.?
  • ????????*/??
  • ????????Trigger?trigger?=?TriggerUtils.makeImmediateTrigger("myTrigger",-1,60000);??
  • ????????//?Trigger?will?use?Calendar?to?exclude?firing?times??
  • ????????trigger.setCalendarName("crmHolidays");??
  • ????????JobDetail?jobDetail?=?new?JobDetail(jobClass.getName(),?Scheduler.DEFAULT_GROUP,?jobClass);??
  • ????????//?Associate?the?trigger?with?the?job?in?the?scheduler??
  • ????????scheduler.scheduleJob(jobDetail,?trigger);??
  • ????}?catch?(SchedulerException?ex)?{??
  • ????????logger.error(ex);??
  • ????}??
  • }??


  • Quartz 監聽器?
    Quartz 提供了三種類型的監聽器:監聽Job,監聽Trigger,和監聽Scheduler.?

    監聽器是作為擴展點存在的.?
    Quartz 監聽器是擴展點,可以擴展框架并定制來做特定的事情。跟Spring,Hibernate,Servlet監聽器類似.?
    實現監聽?
    1. 創建一個 Java 類,實現監聽器接口?
    2. 用你的應用中特定的邏輯實現監聽器接口的所有方法?
    3. 注冊監聽器?

    全局和非全局監聽器?
    JobListener 和 TriggerListener 可被注冊為全局或非全局監聽器。一個全局監聽器能接收到所有的 Job/Trigger 的事件通知。?
    而一個非全局監聽器只能接收到那些在其上已注冊了監聽器的 Job 或 Triiger 的事件。?

    作者:James House描述全局和非全局監聽器?
    全局監聽器是主動意識的,它們為了執行它們的任務而熱切的去尋找每一個可能的事件。通常,全局監聽器要做的工作不用指定到特定的 Job 或 Trigger。?
    非全局監聽器一般是被動意識的,它們在所關注的 Trigger 激發之前或是 Job 執行之前什么事也不做。因此,非全局的監聽器比起全局監聽器而言更適合于修改或增加 Job 執行的工作。?
    類似裝飾設計模式?
    監聽 Job 事件?
    org.quartz.JobListener 接口包含一系列的方法,它們會由 Job 在其生命周期中產生的某些關鍵事件時被調用?

    Java代碼??
  • public?interface?JobListener?{??
  • ????//命名jobListener?只對非全局監聽器有效??
  • ????public?String?getName();??
  • ??
  • ????//Scheduler?在?JobDetail?將要被執行時調用這個方法。??
  • ????public?void?jobToBeExecuted(JobExecutionContext?context);??
  • ??
  • //Scheduler?在?JobDetail?即將被執行,但又被否決時調用這個方法。??
  • ????public?void?jobExecutionVetoed(JobExecutionContext?context);??
  • ??
  • //Scheduler?在?JobDetail?被執行之后調用這個方法。??
  • ????public?void?jobWasExecuted(JobExecutionContext?context,JobExecutionException?jobException);??


  • ?
    圖7 job listener參與job的執行生命周期?

    注冊全局監聽器?

    Java代碼??
  • scheduler.addGlobalJobListener(jobListener);??


  • 注冊非全局監聽器(依次完成,順序不能顛倒)?

    Java代碼??
  • scheduler.addJobListener(jobListener);??
  • jobDetail.addJobListener(jobListener.getName());??
  • //如果已經存在jobDetail則覆蓋.??
  • scheduler.addjob(jobDetail,true);??

  • 監聽 Trigger 事件?
    org.quartz.TriggerListener 接口定義Trigger監聽器?

    Java代碼??
  • public?interface?TriggerListener?{??
  • ????//命名triggerListener?只對非全局監聽器有效??
  • ????public?String?getName();??
  • ??
  • ????//當與監聽器相關聯的?Trigger?被觸發,Job?上的?execute()?方法將要被執行時,調用這個方法。??
  • ????//在全局TriggerListener?情況下,這個方法為所有?Trigger?被調用。(不要做耗時操作)??
  • ????public?void?triggerFired(Trigger?trigger,?JobExecutionContext?context);??
  • ??
  • ????//在?Trigger?觸發后,Job?將要被執行時由調用這個方法。??
  • ????//TriggerListener給了一個選擇去否決?Job?的執行。假如這個方法返回?true,這個?Job?將不會為此次?Trigger?觸發而得到執行。??
  • ????public?boolean?vetoJobExecution(Trigger?trigger,?JobExecutidonContext?context);??
  • ??
  • ????//?Scheduler?調用這個方法是在?Trigger?錯過觸發時。??
  • ????//?JavaDoc?指出:你應該關注此方法中持續時間長的邏輯:在出現許多錯過觸發的?Trigger?時,長邏輯會導致骨牌效應。你應當保持這上方法盡量的小??
  • ????public?void?triggerMisfired(Trigger?trigger);??
  • ??
  • ????//Trigger?被觸發并且完成了Job的執行時調用這個方法。??
  • ????public?void?triggerComplete(Trigger?trigger,?JobExecutionContext?context,?int?triggerInstructionCode);??
  • }??

  • triggerListener的注冊與jobListener相同?

    監聽 Scheduler 事件?
    org.quartz.SchedulerListener 接口定義Trigger監聽器?

    Java代碼??
  • public?interface?SchedulerListener?{??
  • ????//有新的JobDetail部署調用這個方法。??
  • ????public?void?jobScheduled(Trigger?trigger);??
  • ??
  • ????//卸載時調用這個方法。??
  • ????public?void?jobUnscheduled(String?triggerName,?String?triggerGroup);??
  • ??
  • ????//當一個Trigger到達再也不會觸發時調用這個方法。??
  • ????public?void?triggerFinalized(Trigger?trigger);??
  • ??
  • ????//Scheduler?調用這個方法是發生在一個Trigger或多個Trigger被暫停時。假如是多個Trigger的話,triggerName?參數將為null。??
  • ????public?void?triggersPaused(String?triggerName,?String?triggerGroup);??
  • ??
  • ????//Scheduler?調用這個方法是發生成一個?Trigger?或?Trigger?組從暫停中恢復時。假如是多個Trigger的話,triggerName?參數將為?null。??
  • ????public?void?triggersResumed(String?triggerName,String?triggerGroup);??
  • ??
  • ????//當一個或一組?JobDetail?暫停時調用這個方法。??
  • ????public?void?jobsPaused(String?jobName,?String?jobGroup);??
  • ??
  • ????//當一個或一組?Job?從暫停上恢復時調用這個方法。假如是多個Job,jobName參數將為?null。??
  • ????public?void?jobsResumed(String?jobName,?String?jobGroup);??
  • ??
  • ????//?在Scheduler?的正常運行期間產生一個嚴重錯誤時調用這個方法。錯誤的類型會各式的,但是下面列舉了一些錯誤例子:??
  • ????//?可以使用?SchedulerException?的?getErrorCode()?或者?getUnderlyingException()?方法或獲取到特定錯誤的更詳盡的信息??
  • ????public?void?schedulerError(String?msg,?SchedulerException?cause);??
  • ??
  • ????//Scheduler?調用這個方法用來通知?SchedulerListener?Scheduler?將要被關閉。??
  • ????public?void?schedulerShutdown();??
  • }??


  • 注冊SchedulerListener(SchedulerListener不存在全局非全局性)?
    scheduler.addSchedulerListener(schedulerListener);?
    由于scheduler異常存在不打印問題,CRM使用監聽器代碼打印.?

    Java代碼??
  • public?class?QuartzExceptionSchedulerListener?extends?SchedulerListenerSupport{??
  • ????private?Logger?logger?=?LoggerFactory.getLogger(QuartzExceptionSchedulerListener.class);??
  • ????@Override??
  • ????public?void?schedulerError(String?message,?SchedulerException?e)?{??
  • ????????super.schedulerError(message,?e);??
  • ????????logger.error(message,?e.getUnderlyingException());??
  • ????}??
  • }??
  • ?

    Java代碼??
  • <bean??id="quartzExceptionSchedulerListener"??class="com.***.crm.quartz.listener.QuartzExceptionSchedulerListener"></bean>??
  • <!--?配置監聽器?-->??
  • <property?name="schedulerListeners">??
  • ????<list>??
  • ????????<ref?bean="quartzExceptionSchedulerListener"/>??
  • ????</list>??
  • </property>??

  • quartz與線程?
    主處理線程:QuartzSchedulerThread?
    啟動Scheduler時。QuartzScheduler被創建并創建一個org.quartz.core.QuartzSchedulerThread 類的實例。?
    QuartzSchedulerThread 包含有決定何時下一個Job將被觸發的處理循環。QuartzSchedulerThread 是一個 Java 線程。它作為一個非守護線程運行在正常優先級下。?

    QuartzSchedulerThread 的主處理輪循步驟:?
    1. 當 Scheduler 正在運行時:?
    A. 檢查是否有轉換為 standby 模式的請求。?
    1. 假如 standby 方法被調用,等待繼續的信號?
    B. 詢問 JobStore 下次要被觸發的 Trigger.?
    1. 如果沒有 Trigger 待觸發,等候一小段時間后再次檢查?
    2. 假如有一個可用的 Trigger,等待觸發它的確切時間的到來?
    D. 時間到了,為 Trigger 獲取到 triggerFiredBundle.?
    E. 使用Scheduler和triggerFiredBundle 為 Job 創建一個JobRunShell實例?
    F. 在ThreadPool 申請一個線程運行 JobRunShell 實例.?

    代碼邏輯在QuartzSchedulerThread 的 run() 中,如下:?

    Java代碼??
  • /**?
  • ?*?QuartzSchedulerThread.run?
  • ????*?<p>?
  • ????*?The?main?processing?loop?of?the?<code>QuartzSchedulerThread</code>.?
  • ????*?</p>?
  • ????*/??
  • ???public?void?run()?{??
  • ???????boolean?lastAcquireFailed?=?false;??
  • ???????while?(!halted.get())?{??
  • ???????????try?{??
  • ???????????????//?check?if?we're?supposed?to?pause...??
  • ???????????????synchronized?(sigLock)?{??
  • ???????????????????while?(paused?&&?!halted.get())?{??
  • ???????????????????????try?{??
  • ???????????????????????????//?wait?until?togglePause(false)?is?called...??
  • ???????????????????????????sigLock.wait(1000L);??
  • ???????????????????????}?catch?(InterruptedException?ignore)?{??
  • ???????????????????????}??
  • ???????????????????}??
  • ?????
  • ???????????????????if?(halted.get())?{??
  • ???????????????????????break;??
  • ???????????????????}??
  • ???????????????}??
  • ??
  • ???????????????int?availTreadCount?=?qsRsrcs.getThreadPool().blockForAvailableThreads();??
  • ???????????????if(availTreadCount?>?0)?{?//?will?always?be?true,?due?to?semantics?of?blockForAvailableThreads...??
  • ???????????????????Trigger?trigger?=?null;??
  • ??
  • ???????????????????long?now?=?System.currentTimeMillis();??
  • ???????????????????clearSignaledSchedulingChange();??
  • ???????????????????try?{??
  • ???????????????????????trigger?=?qsRsrcs.getJobStore().acquireNextTrigger(??
  • ???????????????????????????????ctxt,?now?+?idleWaitTime);??
  • ???????????????????????lastAcquireFailed?=?false;??
  • ???????????????????}?catch?(JobPersistenceException?jpe)?{??
  • ???????????????????????if(!lastAcquireFailed)?{??
  • ???????????????????????????qs.notifySchedulerListenersError(??
  • ???????????????????????????????"An?error?occured?while?scanning?for?the?next?trigger?to?fire.",??
  • ???????????????????????????????jpe);??
  • ???????????????????????}??
  • ???????????????????????lastAcquireFailed?=?true;??
  • ???????????????????}?catch?(RuntimeException?e)?{??
  • ???????????????????????if(!lastAcquireFailed)?{??
  • ???????????????????????????getLog().error("quartzSchedulerThreadLoop:?RuntimeException?"??
  • ???????????????????????????????????+e.getMessage(),?e);??
  • ???????????????????????}??
  • ???????????????????????lastAcquireFailed?=?true;??
  • ???????????????????}??
  • ??
  • ???????????????????if?(trigger?!=?null)?{??
  • ???????????????????????now?=?System.currentTimeMillis();??
  • ???????????????????????long?triggerTime?=?trigger.getNextFireTime().getTime();??
  • ???????????????????????long?timeUntilTrigger?=?triggerTime?-?now;??
  • ???????????????????????while(timeUntilTrigger?>?2)?{??
  • ????????????????????????synchronized(sigLock)?{??
  • ????????????????????????????if(!isCandidateNewTimeEarlierWithinReason(triggerTime,?false))?{??
  • ????????????????????????????????try?{??
  • ????????????????????????????????????//?we?could?have?blocked?a?long?while??
  • ????????????????????????????????????//?on?'synchronize',?so?we?must?recompute??
  • ????????????????????????????????????now?=?System.currentTimeMillis();??
  • ????????????????????????????????????timeUntilTrigger?=?triggerTime?-?now;??
  • ????????????????????????????????????if(timeUntilTrigger?>=?1)??
  • ????????????????????????????????????????sigLock.wait(timeUntilTrigger);??
  • ????????????????????????????????}?catch?(InterruptedException?ignore)?{??
  • ????????????????????????????????}??
  • ????????????????????????????}??
  • ????????????????????????}?????????????????????????????????
  • ????????????????????????if(releaseIfScheduleChangedSignificantly(trigger,?triggerTime))?{??
  • ????????????????????????????trigger?=?null;??
  • ????????????????????????????break;??
  • ????????????????????????}??
  • ????????????????????????now?=?System.currentTimeMillis();??
  • ????????????????????????timeUntilTrigger?=?triggerTime?-?now;??
  • ???????????????????????}??
  • ???????????????????????if(trigger?==?null)??
  • ????????????????????????continue;??
  • ?????????????????????????
  • ???????????????????????//?set?trigger?to?'executing'??
  • ???????????????????????TriggerFiredBundle?bndle?=?null;??
  • ??
  • ???????????????????????boolean?goAhead?=?true;??
  • ???????????????????????synchronized(sigLock)?{??
  • ????????????????????????goAhead?=?!halted.get();??
  • ???????????????????????}??
  • ??
  • ???????????????????????if(goAhead)?{??
  • ???????????????????????????try?{??
  • ???????????????????????????????bndle?=?qsRsrcs.getJobStore().triggerFired(ctxt,??
  • ???????????????????????????????????????trigger);??
  • ???????????????????????????}?catch?(SchedulerException?se)?{??
  • ???????????????????????????????qs.notifySchedulerListenersError(??
  • ???????????????????????????????????????"An?error?occured?while?firing?trigger?'"??
  • ???????????????????????????????????????????????+?trigger.getFullName()?+?"'",?se);??
  • ???????????????????????????}?catch?(RuntimeException?e)?{??
  • ???????????????????????????????getLog().error(??
  • ???????????????????????????????????"RuntimeException?while?firing?trigger?"?+??
  • ???????????????????????????????????trigger.getFullName(),?e);??
  • ???????????????????????????????//?db?connection?must?have?failed...?keep??
  • ???????????????????????????????//?retrying?until?it's?up...??
  • ???????????????????????????????releaseTriggerRetryLoop(trigger);??
  • ???????????????????????????}??
  • ???????????????????????}??
  • ?????????????????????????
  • ???????????????????????//?it's?possible?to?get?'null'?if?the?trigger?was?paused,??
  • ???????????????????????//?blocked,?or?other?similar?occurrences?that?prevent?it?being??
  • ???????????????????????//?fired?at?this?time...??or?if?the?scheduler?was?shutdown?(halted)??
  • ???????????????????????if?(bndle?==?null)?{??
  • ???????????????????????????try?{??
  • ???????????????????????????????qsRsrcs.getJobStore().releaseAcquiredTrigger(ctxt,??
  • ???????????????????????????????????????trigger);??
  • ???????????????????????????}?catch?(SchedulerException?se)?{??
  • ???????????????????????????????qs.notifySchedulerListenersError(??
  • ???????????????????????????????????????"An?error?occured?while?releasing?trigger?'"??
  • ???????????????????????????????????????????????+?trigger.getFullName()?+?"'",?se);??
  • ???????????????????????????????//?db?connection?must?have?failed...?keep?retrying??
  • ???????????????????????????????//?until?it's?up...??
  • ???????????????????????????????releaseTriggerRetryLoop(trigger);??
  • ???????????????????????????}??
  • ???????????????????????????continue;??
  • ???????????????????????}??
  • ??
  • ???????????????????????//?TODO:?improvements:??
  • ???????????????????????//??
  • ???????????????????????//?2-?make?sure?we?can?get?a?job?runshell?before?firing?trigger,?or??
  • ???????????????????????//???don't?let?that?throw?an?exception?(right?now?it?never?does,??
  • ???????????????????????//???but?the?signature?says?it?can).??
  • ???????????????????????//?3-?acquire?more?triggers?at?a?time?(based?on?num?threads?available?)??
  • ???????????????????????JobRunShell?shell?=?null;??
  • ???????????????????????try?{??
  • ???????????????????????????shell?=?qsRsrcs.getJobRunShellFactory().borrowJobRunShell();??
  • ???????????????????????????shell.initialize(qs,?bndle);??
  • ???????????????????????}?catch?(SchedulerException?se)?{??
  • ???????????????????????????try?{??
  • ???????????????????????????????qsRsrcs.getJobStore().triggeredJobComplete(ctxt,??
  • ???????????????????????????????????????trigger,?bndle.getJobDetail(),?Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);??
  • ???????????????????????????}?catch?(SchedulerException?se2)?{??
  • ???????????????????????????????qs.notifySchedulerListenersError(??
  • ???????????????????????????????????????"An?error?occured?while?placing?job's?triggers?in?error?state?'"??
  • ???????????????????????????????????????????????+?trigger.getFullName()?+?"'",?se2);??
  • ???????????????????????????????//?db?connection?must?have?failed...?keep?retrying??
  • ???????????????????????????????//?until?it's?up...??
  • ???????????????????????????????errorTriggerRetryLoop(bndle);??
  • ???????????????????????????}??
  • ???????????????????????????continue;??
  • ???????????????????????}??
  • ??
  • ???????????????????????if?(qsRsrcs.getThreadPool().runInThread(shell)?==?false)?{??
  • ???????????????????????????try?{??
  • ???????????????????????????????//?this?case?should?never?happen,?as?it?is?indicative?of?the??
  • ???????????????????????????????//?scheduler?being?shutdown?or?a?bug?in?the?thread?pool?or??
  • ???????????????????????????????//?a?thread?pool?being?used?concurrently?-?which?the?docs??
  • ???????????????????????????????//?say?not?to?do...??
  • ???????????????????????????????getLog().error("ThreadPool.runInThread()?return?false!");??
  • ???????????????????????????????qsRsrcs.getJobStore().triggeredJobComplete(ctxt,??
  • ???????????????????????????????????????trigger,?bndle.getJobDetail(),?Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);??
  • ???????????????????????????}?catch?(SchedulerException?se2)?{??
  • ???????????????????????????????qs.notifySchedulerListenersError(??
  • ???????????????????????????????????????"An?error?occured?while?placing?job's?triggers?in?error?state?'"??
  • ???????????????????????????????????????????????+?trigger.getFullName()?+?"'",?se2);??
  • ???????????????????????????????//?db?connection?must?have?failed...?keep?retrying??
  • ???????????????????????????????//?until?it's?up...??
  • ???????????????????????????????releaseTriggerRetryLoop(trigger);??
  • ???????????????????????????}??
  • ???????????????????????}??
  • ???????????????????????continue;??
  • ???????????????????}??
  • ???????????????}?else?{?//?if(availTreadCount?>?0)??
  • ???????????????????continue;?//?should?never?happen,?if?threadPool.blockForAvailableThreads()?follows?contract??
  • ???????????????}??
  • ??
  • ???????????????long?now?=?System.currentTimeMillis();??
  • ???????????????long?waitTime?=?now?+?getRandomizedIdleWaitTime();??
  • ???????????????long?timeUntilContinue?=?waitTime?-?now;??
  • ???????????????synchronized(sigLock)?{??
  • ????????????????try?{??
  • ????????????????????sigLock.wait(timeUntilContinue);??
  • ????????????????}?catch?(InterruptedException?ignore)?{??
  • ????????????????}??
  • ???????????????}??
  • ??
  • ???????????}?catch(RuntimeException?re)?{??
  • ???????????????getLog().error("Runtime?error?occured?in?main?trigger?firing?loop.",?re);??
  • ???????????}??
  • ???????}?//?loop...??
  • ??
  • ???????//?drop?references?to?scheduler?stuff?to?aid?garbage?collection...??
  • ???????qs?=?null;??
  • ???????qsRsrcs?=?null;??
  • ???}??


  • quartz工作者線程?
    Quartz 不會在主線程(QuartzSchedulerThread)中處理用戶的Job。Quartz 把線程管理的職責委托給ThreadPool。
    一般的設置使用org.quartz.simpl.SimpleThreadPool。SimpleThreadPool 創建了一定數量的 WorkerThread 實例來使得Job能夠在線程中進行處理。?
    WorkerThread 是定義在 SimpleThreadPool 類中的內部類,它實質上就是一個線程。?
    要創建 WorkerThread 的數量以及配置他們的優先級是在文件quartz.properties中并傳入工廠。?

    spring properties?

    Java代碼??
  • <prop?key="org.quartz.threadPool.class">org.quartz.simpl.SimpleThreadPool</prop>??
  • <prop?key="org.quartz.threadPool.threadCount">20</prop>??
  • <prop?key="org.quartz.threadPool.threadPriority">5</prop>??

  • 主線程(QuartzSchedulerThread)請求ThreadPool去運行 JobRunShell 實例,ThreadPool 就檢查看是否有一個可用的工作者線?
    程。假如所以已配置的工作者線程都是忙的,ThreadPool 就等待直到有一個變為可用。當一個工作者線程是可用的,?
    并且有一個JobRunShell 等待執行,工作者線程就會調用 JobRunShell 類的 run() 方法。?

    Quartz 框架允許替換線程池,但必須實現org.quartz.spi.ThreadPool 接口.?

    ?
    圖4 quartz內部的主線程和工作者線程?

    Quartz的存儲和持久化?
    Quartz 用 JobStores 對 Job、Trigger、calendar 和 Schduler 數據提供一種存儲機制。Scheduler 應用已配置的JobStore 來存儲和獲取到部署信息,并決定正被觸發執行的 Job 的職責。?
    所有的關于哪個 Job 要執行和以什么時間表來執行他們的信息都來存儲在 JobStore。?

    在 Quartz 中兩種可用的 Job 存儲類型是:?
    內存(非持久化) 存儲?
    持久化存儲?

    JobStore 接口?
    Quartz 為所有類型的Job存儲提供了一個接口。叫 JobStore。所有的Job存儲機制,不管是在哪里或是如何存儲他們的信息的,都必須實現這個接口。?
    JobStore 接口的 API 可歸納為下面幾類:?
    Job 相關的 API?
    Trigger 相關的 API?
    Calendar 相關的 API?
    Scheduler 相關的 API?

    使用內存來存儲 Scheduler 信息?
    Quartz 的內存Job存儲類叫做 org.quartz.simple.RAMJobStore,它實現了JobStore 接口的。?
    RAMJobStore 是 Quartz 的默認的解決方案。?
    使用這種內存JobStore的好處。?

    RAMJobStore是配置最簡單的 JobStore:默認已經配置好了。見quartz.jar:org.quartz.quartz.properties?
    RAMJobStore的速度非常快。所有的 quartz存儲操作都在計算機內存中?

    使用持久性的 JobStore?
    持久性 JobStore = JDBC + 關系型數據庫?

    Quartz 所有的持久化的 JobStore 都擴展自 org.quartz.impl.jdbcjobstore.JobStoreSupport 類。?

    ?

    圖5?
    JobStoreSupport 實現了 JobStore 接口,是作為 Quartz 提供的兩個具體的持久性 JobStore 類的基類。?
    Quartz 提供了兩種不同類型的JobStoreSupport實現類,每一個設計為針對特定的數據庫環境和配置:?
    ·org.quartz.impl.jdbcjobstore.JobStoreTX?
    ·org.quartz.impl.jdbcjobstore.JobStoreCMT?

    獨立環境中的持久性存儲?
    JobStoreTX 類設計為用于獨立環境中。這里的 "獨立",我們是指這樣一個環境,在其中不存在與應用容器的事務集成。?

    #properties配置?
    org.quartz.jobStore.class = org.quartz.ompl.jdbcjobstore.JobStoreTX?


    依賴容器相關的持久性存儲?
    JobStoreCMT 類設計為與程序容器事務集成,容器管理的事物(Container Managed Transactions (CMT))?

    crm使用JobStoreTX 因為quart有長時間鎖等待情況,不參與系統本身事務(crm任務內事務與quartz本身事務分離).

    Quartz 數據庫結構?

    表名描述?
    QRTZ_CALENDARS 以 Blob 類型存儲 Quartz 的 Calendar 信息?
    QRTZ_CRON_TRIGGERS 存儲 Cron Trigger,包括 Cron 表達式和時區信息?
    QRTZ_FIRED_TRIGGERS 存儲與已觸發的 Trigger 相關的狀態信息,以及相聯 Job 的執行信息?
    QRTZ_PAUSED_TRIGGER_GRPS 存儲已暫停的 Trigger 組的信息?
    QRTZ_SCHEDULER_STATE 存儲少量的有關 Scheduler 的狀態信息,和別的 Scheduler 實例(假如是用于一個集群中)?
    QRTZ_LOCKS 存儲程序的非觀鎖的信息(假如使用了悲觀鎖)?
    QRTZ_JOB_DETAILS 存儲每一個已配置的 Job 的詳細信息?
    QRTZ_JOB_LISTENERS 存儲有關已配置的 JobListener 的信息?
    QRTZ_SIMPLE_TRIGGERS 存儲簡單的 Trigger,包括重復次數,間隔,以及已觸的次數?
    QRTZ_BLOG_TRIGGERS Trigger 作為 Blob 類型存儲(用于 Quartz 用戶用 JDBC 創建他們自己定制的 Trigger 類型,JobStore 并不知道如何存儲實例的時候)?
    QRTZ_TRIGGER_LISTENERS 存儲已配置的 TriggerListener 的信息?
    QRTZ_TRIGGERS 存儲已配置的 Trigger 的信息?
    所有的表默認以前綴QRTZ_開始。可以通過在 quartz.properties配置修改(org.quartz.jobStore.tablePrefix = QRTZ_)。?
    可以對不同的Scheduler實例使用多套的表,通過改變前綴來實現。?

    優化 quartz數據表結構?
    -- 1:對關鍵查詢路徑字段建立索引?

    Java代碼??
  • create?index?idx_qrtz_t_next_fire_time?on?QRTZ_TRIGGERS(NEXT_FIRE_TIME);??
  • create?index?idx_qrtz_t_state?on?QRTZ_TRIGGERS(TRIGGER_STATE);??
  • create?index?idx_qrtz_t_nf_st?on?QRTZ_TRIGGERS(TRIGGER_STATE,NEXT_FIRE_TIME);??
  • create?index?idx_qrtz_ft_trig_group?on?QRTZ_FIRED_TRIGGERS(TRIGGER_GROUP);??
  • create?index?idx_qrtz_ft_trig_name?on?QRTZ_FIRED_TRIGGERS(TRIGGER_NAME);??
  • create?index?idx_qrtz_ft_trig_n_g?on?QRTZ_FIRED_TRIGGERS(TRIGGER_NAME,TRIGGER_GROUP);??
  • create?index?idx_qrtz_ft_trig_inst_name?on?QRTZ_FIRED_TRIGGERS(INSTANCE_NAME);??
  • create?index?idx_qrtz_ft_job_name?on?QRTZ_FIRED_TRIGGERS(JOB_NAME);??
  • create?index?idx_qrtz_ft_job_group?on?QRTZ_FIRED_TRIGGERS(JOB_GROUP);??


  • -- 2:根據Mysql innodb表結構特性,調整主鍵,降低二級索引的大小?

    Java代碼??
  • ALTER?TABLE?QRTZ_TRIGGERS??
  • ADD?UNIQUE?KEY?IDX_NAME_GROUP(TRIGGER_NAME,TRIGGER_GROUP),??
  • DROP?PRIMARY?KEY,??
  • ADD?ID?INT?UNSIGNED?NOT?NULL?AUTO_INCREMENT?FIRST,??
  • ADD?PRIMARY?KEY?(ID);??
  • ALTER?TABLE?QRTZ_JOB_DETAILS??
  • ADD?UNIQUE?KEY?IDX_NAME_GROUP(JOB_NAME,JOB_GROUP),??
  • DROP?PRIMARY?KEY,??
  • ADD?ID?INT?UNSIGNED?NOT?NULL?AUTO_INCREMENT?FIRST,??
  • ADD?PRIMARY?KEY?(ID);??

  • Quartz集群?
    只有使用持久的JobStore才能完成Quqrtz集群?

    ?
    圖6?
    一個 Quartz 集群中的每個節點是一個獨立的 Quartz 應用,它又管理著其他的節點。?
    需要分別對每個節點分別啟動或停止。不像應用服務器的集群,獨立的 Quartz 節點并不與另一個節點或是管理節點通信。?
    Quartz 應用是通過數據庫表來感知到另一應用。?

    配置集群?

    Xml代碼??
  • <prop?key="org.quartz.jobStore.class">org.quartz.impl.jdbcjobstore.JobStoreTX</prop>??
  • <!--?集群配置?-->??
  • <prop?key="org.quartz.jobStore.isClustered">true</prop>??
  • <prop?key="org.quartz.jobStore.clusterCheckinInterval">15000</prop>??
  • <prop?key="org.quartz.jobStore.maxMisfiresToHandleAtATime">1</prop>??
  • <!--?數據源配置?使用DBCP連接池?數據源與dataSource一致?-->??
  • <prop?key="org.quartz.jobStore.dataSource">myDS</prop>??
  • <prop?key="org.quartz.dataSource.myDS.driver">${database.driverClassName}</prop>??
  • <prop?key="org.quartz.dataSource.myDS.URL">${database.url}</prop>??
  • <prop?key="org.quartz.dataSource.myDS.user">${database.username}</prop>??
  • <prop?key="org.quartz.dataSource.myDS.password">${database.password}</prop>??
  • <prop?key="org.quartz.dataSource.myDS.maxConnections">5</prop>??


  • org.quartz.jobStore.class 屬性為 JobStoreTX,?
    將任務持久化到數據中。因為集群中節點依賴于數據庫來傳播Scheduler實例的狀態,你只能在使用 JDBC JobStore 時應用 Quartz 集群。?

    org.quartz.jobStore.isClustered 屬性為 true,通知Scheduler實例要它參與到一個集群當中。?

    org.quartz.jobStore.clusterCheckinInterval?

    屬性定義了Scheduler 實例檢入到數據庫中的頻率(單位:毫秒)。?
    Scheduler 檢查是否其他的實例到了它們應當檢入的時候未檢入;?
    這能指出一個失敗的 Scheduler 實例,且當前 Scheduler 會以此來接管任何執行失敗并可恢復的 Job。?
    通過檢入操作,Scheduler 也會更新自身的狀態記錄。clusterChedkinInterval 越小,Scheduler 節點檢查失敗的 Scheduler 實例就越頻繁。默認值是 15000 (即15 秒)?

    集群實現分析?
    Quartz原來碼分析:?
    基于數據庫表鎖實現多Quartz_Node 對Job,Trigger,Calendar等同步機制?

    Sql代碼??
  • --?數據庫鎖定表??
  • CREATE?TABLE?`QRTZ_LOCKS`?(??
  • ??`LOCK_NAME`?varchar(40)?NOT?NULL,??
  • ??PRIMARY?KEY?(`LOCK_NAME`)??
  • )?ENGINE=InnoDB?DEFAULT?CHARSET=utf8;??
  • --?記錄??
  • +-----------------+??
  • |?LOCK_NAME???????|??
  • +-----------------+??
  • |?CALENDAR_ACCESS?|???
  • |?JOB_ACCESS??????|???
  • |?MISFIRE_ACCESS??|???
  • |?STATE_ACCESS????|???
  • |?TRIGGER_ACCESS??|???
  • +-----------------+??


  • 通過行級別鎖實現多節點處理?

    Java代碼??
  • /**?
  • ?*?Internal?database?based?lock?handler?for?providing?thread/resource?locking??
  • ?*?in?order?to?protect?resources?from?being?altered?by?multiple?threads?at?the??
  • ?*?same?time.?
  • ?*??
  • ?*?@author?jhouse?
  • ?*/??
  • public?class?StdRowLockSemaphore?extends?DBSemaphore?{??
  • ??
  • ????/*?
  • ?????*?Constants.?
  • ?????*?鎖定SQL語句?
  • ?????*??
  • ?????*/??
  • ????public?static?final?String?SELECT_FOR_LOCK?=?"SELECT?*?FROM?"??
  • ????????????+?TABLE_PREFIX_SUBST?+?TABLE_LOCKS?+?"?WHERE?"?+?COL_LOCK_NAME??
  • ????????????+?"?=???FOR?UPDATE";??
  • ??
  • ????/**?
  • ?????*?This?constructor?is?for?using?the?<code>StdRowLockSemaphore</code>?as?
  • ?????*?a?bean.?
  • ?????*/??
  • ????public?StdRowLockSemaphore()?{??
  • ????????super(DEFAULT_TABLE_PREFIX,?null,?SELECT_FOR_LOCK);??
  • ????}??
  • ??
  • ????public?StdRowLockSemaphore(String?tablePrefix,?String?seletWithLockSQL)?{??
  • ????????super(tablePrefix,?selectWithLockSQL,?SELECT_FOR_LOCK);??
  • ????}??
  • ??
  • ????/**?
  • ?????*?Execute?the?SQL?select?for?update?that?will?lock?the?proper?database?row.?
  • ?????*?指定鎖定SQL?
  • ?????*/??
  • ????protected?void?executeSQL(Connection?conn,?String?lockName,?String?expandedSQL)?throws?LockException?{??
  • ????????PreparedStatement?ps?=?null;??
  • ????????ResultSet?rs?=?null;??
  • ????????try?{??
  • ????????????ps?=?conn.prepareStatement(expandedSQL);??
  • ????????????ps.setString(1,?lockName);??
  • ??
  • ????????????if?(getLog().isDebugEnabled())?{??
  • ????????????????getLog().debug(??
  • ????????????????????"Lock?'"?+?lockName?+?"'?is?being?obtained:?"?+???
  • ????????????????????Thread.currentThread().getName());??
  • ????????????}??
  • ????????????rs?=?ps.executeQuery();??
  • ????????????if?(!rs.next())?{??
  • ????????????????throw?new?SQLException(Util.rtp(??
  • ????????????????????"No?row?exists?in?table?"?+?TABLE_PREFIX_SUBST?+???
  • ????????????????????TABLE_LOCKS?+?"?for?lock?named:?"?+?lockName,?getTablePrefix()));??
  • ????????????}??
  • ????????}?catch?(SQLException?sqle)?{??
  • ????????????if?(getLog().isDebugEnabled())?{??
  • ????????????????getLog().debug(??
  • ????????????????????"Lock?'"?+?lockName?+?"'?was?not?obtained?by:?"?+???
  • ????????????????????Thread.currentThread().getName());??
  • ????????????}??
  • ????????????throw?new?LockException("Failure?obtaining?db?row?lock:?"??
  • ????????????????????+?sqle.getMessage(),?sqle);??
  • ????????}?finally?{??
  • ????????????if?(rs?!=?null)?{???
  • ????????????????try?{??
  • ????????????????????rs.close();??
  • ????????????????}?catch?(Exception?ignore)?{??
  • ????????????????}??
  • ????????????}??
  • ????????????if?(ps?!=?null)?{??
  • ????????????????try?{??
  • ????????????????????ps.close();??
  • ????????????????}?catch?(Exception?ignore)?{??
  • ????????????????}??
  • ????????????}??
  • ????????}??
  • ????}??
  • ??
  • ????protected?String?getSelectWithLockSQL()?{??
  • ????????return?getSQL();??
  • ????}??
  • ??
  • ????public?void?setSelectWithLockSQL(String?selectWithLockSQL)?{??
  • ????????setSQL(selectWithLockSQL);??
  • ????}??
  • }??
  • ??
  • ???/**?
  • ?????*?Grants?a?lock?on?the?identified?resource?to?the?calling?thread?(blocking?
  • ?????*?until?it?is?available).?
  • ?????*?獲取QRTZ_LOCKS行級鎖?
  • ?????*?@return?true?if?the?lock?was?obtained.?
  • ?????*/??
  • ????public?boolean?obtainLock(Connection?conn,?String?lockName)?throws?LockException?{??
  • ????????lockName?=?lockName.intern();??
  • ??
  • ????????Logger?log?=?getLog();??
  • ??
  • ????????if(log.isDebugEnabled())?{??
  • ????????????log.debug(??
  • ????????????????"Lock?'"?+?lockName?+?"'?is?desired?by:?"??
  • ????????????????????????+?Thread.currentThread().getName());??
  • ????????}??
  • ????????if?(!isLockOwner(conn,?lockName))?{??
  • ????????????executeSQL(conn,?lockName,?expandedSQL);??
  • ??????????????
  • ????????????if(log.isDebugEnabled())?{??
  • ????????????????log.debug(??
  • ????????????????????"Lock?'"?+?lockName?+?"'?given?to:?"??
  • ????????????????????????????+?Thread.currentThread().getName());??
  • ????????????}??
  • ????????????getThreadLocks().add(lockName);??
  • ????????????//getThreadLocksObtainer().put(lockName,?new??
  • ????????????//?Exception("Obtainer..."));??
  • ????????}?else?if(log.isDebugEnabled())?{??
  • ????????????log.debug(??
  • ????????????????"Lock?'"?+?lockName?+?"'?Is?already?owned?by:?"??
  • ????????????????????????+?Thread.currentThread().getName());??
  • ????????}??
  • ????????return?true;??
  • ????}??
  • ??
  • ????/**?
  • ?????*?Release?the?lock?on?the?identified?resource?if?it?is?held?by?the?calling?thread.?
  • ?????*?釋放QRTZ_LOCKS行級鎖?
  • ?????*/??
  • ????public?void?releaseLock(Connection?conn,?String?lockName)?{??
  • ????????lockName?=?lockName.intern();??
  • ??
  • ????????if?(isLockOwner(conn,?lockName))?{??
  • ????????????if(getLog().isDebugEnabled())?{??
  • ????????????????getLog().debug(??
  • ????????????????????"Lock?'"?+?lockName?+?"'?returned?by:?"??
  • ????????????????????????????+?Thread.currentThread().getName());??
  • ????????????}??
  • ????????????getThreadLocks().remove(lockName);??
  • ????????????//getThreadLocksObtainer().remove(lockName);??
  • ????????}?else?if?(getLog().isDebugEnabled())?{??
  • ????????????getLog().warn(??
  • ????????????????"Lock?'"?+?lockName?+?"'?attempt?to?return?by:?"??
  • ????????????????????????+?Thread.currentThread().getName()??
  • ????????????????????????+?"?--?but?not?owner!",??
  • ????????????????new?Exception("stack-trace?of?wrongful?returner"));??
  • ????????}??
  • ????}??


  • JobStoreTX 控制并發代碼?

    Java代碼??
  • /**?
  • ?*?Execute?the?given?callback?having?optionally?aquired?the?given?lock.?
  • ?*?For?<code>JobStoreTX</code>,?because?it?manages?its?own?transactions?
  • ?*?and?only?has?the?one?datasource,?this?is?the?same?behavior?as??
  • ?*?executeInNonManagedTXLock().??
  • ?*?@param?lockName?The?name?of?the?lock?to?aquire,?for?example??
  • ?*?"TRIGGER_ACCESS".??If?null,?then?no?lock?is?aquired,?but?the?
  • ?*?lockCallback?is?still?executed?in?a?transaction.?
  • ?*??
  • ?*?@see?JobStoreSupport#executeInNonManagedTXLock(String,?TransactionCallback)?
  • ????*?@see?JobStoreCMT#executeInLock(String,?TransactionCallback)?
  • ????*?@see?JobStoreSupport#getNonManagedTXConnection()?
  • ????*?@see?JobStoreSupport#getConnection()?
  • */??
  • ???protected?Object?executeInLock(String?lockName,?TransactionCallback?txCallback)?throws?JobPersistenceException?{??
  • ???????return?executeInNonManagedTXLock(lockName,?txCallback);??
  • ???}??
  • ??
  • 使用JobStoreSupport.executeInNonManagedTXLock?實現:??
  • /**?
  • ????*?Execute?the?given?callback?having?optionally?aquired?the?given?lock.?
  • ????*?This?uses?the?non-managed?transaction?connection.?
  • ????*??
  • ????*?@param?lockName?The?name?of?the?lock?to?aquire,?for?example??
  • ????*?"TRIGGER_ACCESS".??If?null,?then?no?lock?is?aquired,?but?the?
  • ????*?lockCallback?is?still?executed?in?a?non-managed?transaction.??
  • ????*/??
  • ???protected?Object?executeInNonManagedTXLock(??
  • ???????????String?lockName,???
  • ???????????TransactionCallback?txCallback)?throws?JobPersistenceException?{??
  • ???????boolean?transOwner?=?false;??
  • ???????Connection?conn?=?null;??
  • ???????try?{??
  • ???????????if?(lockName?!=?null)?{??
  • ???????????????//?If?we?aren't?using?db?locks,?then?delay?getting?DB?connection???
  • ???????????????//?until?after?acquiring?the?lock?since?it?isn't?needed.??
  • ???????????????if?(getLockHandler().requiresConnection())?{??
  • ???????????????????conn?=?getNonManagedTXConnection();??
  • ???????????????}??
  • ????????????//獲取鎖??
  • ???????????????transOwner?=?getLockHandler().obtainLock(conn,?lockName);??
  • ???????????}??
  • ???????????if?(conn?==?null)?{??
  • ???????????????conn?=?getNonManagedTXConnection();??
  • ???????????}??
  • ????????????//回調需要執行的sql語句如:(更新Trigger為運行中(ACQUIRED),刪除執行過的Trigger等)??
  • ???????????Object?result?=?txCallback.execute(conn);??
  • ????????//JobStoreTX自身維護事務??
  • ???????????commitConnection(conn);??
  • ???????????Long?sigTime?=?clearAndGetSignalSchedulingChangeOnTxCompletion();??
  • ???????????if(sigTime?!=?null?&&?sigTime?>=?0)?{??
  • ???????????????signalSchedulingChangeImmediately(sigTime);??
  • ???????????}??
  • ???????????return?result;??
  • ???????}?catch?(JobPersistenceException?e)?{??
  • ???????????rollbackConnection(conn);??
  • ???????????throw?e;??
  • ???????}?catch?(RuntimeException?e)?{??
  • ???????????rollbackConnection(conn);??
  • ???????????throw?new?JobPersistenceException("Unexpected?runtime?exception:?"?+?e.getMessage(),?e);??
  • ???????}?finally?{??
  • ???????????try?{??
  • ????????????//釋放鎖??
  • ???????????????releaseLock(conn,?lockName,?transOwner);??
  • ???????????}?finally?{??
  • ???????????????cleanupConnection(conn);??
  • ???????????}??
  • ???????}??
  • ???}??


  • JobStoreCMT 控制并發代碼?

    Java代碼??
  • /**?
  • ????*?Execute?the?given?callback?having?optionally?acquired?the?given?lock.???
  • ????*?Because?CMT?assumes?that?the?connection?is?already?part?of?a?managed?
  • ????*?transaction,?it?does?not?attempt?to?commit?or?rollback?the??
  • ????*?enclosing?transaction.?
  • ????*??
  • ????*?@param?lockName?The?name?of?the?lock?to?acquire,?for?example??
  • ????*?"TRIGGER_ACCESS".??If?null,?then?no?lock?is?acquired,?but?the?
  • ????*?txCallback?is?still?executed?in?a?transaction.?
  • ????*??
  • ????*?@see?JobStoreSupport#executeInNonManagedTXLock(String,?TransactionCallback)?
  • ????*?@see?JobStoreTX#executeInLock(String,?TransactionCallback)?
  • ????*?@see?JobStoreSupport#getNonManagedTXConnection()?
  • ????*?@see?JobStoreSupport#getConnection()?
  • ????*/??
  • ??
  • ???protected?Object?executeInLock(String?lockName,?TransactionCallback?txCallback)?throws?JobPersistenceException?{??
  • ???????boolean?transOwner?=?false;??
  • ???????Connection?conn?=?null;??
  • ???????try?{??
  • ???????????if?(lockName?!=?null)?{??
  • ???????????????//?If?we?aren't?using?db?locks,?then?delay?getting?DB?connection???
  • ???????????????//?until?after?acquiring?the?lock?since?it?isn't?needed.??
  • ???????????????if?(getLockHandler().requiresConnection())?{??
  • ???????????????????conn?=?getConnection();??
  • ???????????????}??
  • ???????????????transOwner?=?getLockHandler().obtainLock(conn,?lockName);??
  • ???????????}??
  • ??
  • ???????????if?(conn?==?null)?{??
  • ???????????????conn?=?getConnection();??
  • ???????????}??
  • ????????//沒有事務提交操作,與任務共享一個事務??
  • ???????????return?txCallback.execute(conn);??
  • ???????}?finally?{??
  • ???????????try?{??
  • ???????????????releaseLock(conn,?LOCK_TRIGGER_ACCESS,?transOwner);??
  • ???????????}?finally?{??
  • ???????????????cleanupConnection(conn);??
  • ???????????}??
  • ???????}??
  • ???}??


  • CRM中quartz與Spring結合使用?
    Spring 通過提供org.springframework.scheduling.quartz下的封裝類對quartz支持?
    但是目前存在問題?
    1:Spring3.0目前不支持Quartz2.x以上版本?

    Caused by: java.lang.IncompatibleClassChangeError: class org.springframework.scheduling.quartz.CronTriggerBean?
    has interface org.quartz.CronTrigger as super class?
    原因是 org.quartz.CronTrigger在2.0從class變成了一個interface造成IncompatibleClassChangeError錯誤。?

    解決:無解,要想使用spring和quartz結合的方式 只能使用Quartz1.x版本。?

    2:org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean報?
    java.io.NotSerializableException異常,需要自己實現QuartzJobBean。?

    解決:spring bug己經在http://jira.springframework.org/browse/SPR-3797找到解決方案,?
    作者重寫了MethodInvokingJobDetailFactoryBean.?

    3:Spring內bean必須要實現序列化接口,否則不能通過Sprng 屬性注入的方式為job提供業務對象?

    解決:?

    Java代碼??
  • //使用可序列化工具類獲取Spring容器對象??
  • @Service("springBeanService")??
  • public?class?SpringBeanService?implements?Serializable{private?static?final?long?serialVersionUID?=?-2228376078979553838L;??
  • ????public?<T>?T?getBean(Class<T>?clazz,String?beanName){??
  • ????????ApplicationContext?context?=?ContextLoader.getCurrentWebApplicationContext();??
  • ????????return?(T)context.getBean(beanName);??
  • ????}??
  • }??


  • CRM中quartz模塊部分代碼?
    1:定義所有job的父類,并負責異常發送郵件任務和日志任務?

    Java代碼??
  • public?abstract?class?BaseQuartzJob?implements?Job,?Serializable?{??
  • ????private?static?final?long?serialVersionUID?=?3347549365534415931L;??
  • ????private?Logger?logger?=?LoggerFactory.getLogger(this.getClass());??
  • ??????
  • ????//定義抽象方法,供子類實現??
  • ????public?abstract?void?action(JobExecutionContext?context);??
  • ??????
  • ????@Override??
  • ????public?void?execute(JobExecutionContext?context)?throws?JobExecutionException?{??
  • ????????try?{??
  • ????????????long?start?=?System.currentTimeMillis();??
  • ????????????this.action(context);??
  • ????????????long?end?=?System.currentTimeMillis();??
  • ????????????JobDetail?jobDetail?=?context.getJobDetail();??
  • ????????????Trigger?trigger?=?context.getTrigger();??
  • ????????????StringBuilder?buffer?=?new?StringBuilder();??
  • ????????????buffer.append("jobName?=?").append(jobDetail.getName()).append("?triggerName?=?")??
  • ????????????.append(trigger.getName()).append("?執行完成?,?耗時:?").append((end?-?start)).append("?ms");??
  • ????????????logger.info(buffer.toString());??
  • ????????}?catch?(Exception?e)?{??
  • ????????????doResolveException(context?!=?null???context.getMergedJobDataMap()?:?null,?e);??
  • ????????}??
  • ????}??
  • ????@SuppressWarnings("unchecked")??
  • ????private?void?doResolveException(JobDataMap?dataMap,?Exception?ex)?{??
  • ????????//發送郵件實現此處省略??
  • ????????//...??
  • ????}??
  • }??


  • 2:抽象Quartz操作接口(實現類 toSee: QuartzServiceImpl)?

    Java代碼??
  • /**?
  • ?*?
  • ?*?@author?zhangyijun?
  • ?*?@created?2012-10-22?
  • ?*?
  • ?*?@version?1.0?
  • ?*/??
  • @Service??
  • public?interface?QuartzService?{??
  • /**?
  • ?*?獲取所有trigger?
  • ?*?@param?page?
  • ?*?@param?orderName?
  • ?*?@param?sortType?
  • ?*?@return?
  • ?*/??
  • ?List<Map<String,?Object>>?getQrtzTriggers(Page?page,?String?orderName,?String?sortType);??
  • /**?
  • ?*?獲取所有jobDetail?
  • ?*?
  • ?*?@return?
  • ?*/??
  • ?List<Map<String,?Object>>?getQrtzJobDetails();??
  • /**?
  • ?*?執行Trigger操作?
  • ?*?
  • ?*?@param?name?
  • ?*?@param?group?
  • ?*?@param?action?
  • ?*?<br/>?
  • ?*/??
  • ?void?executeTriggerAction(String?name,?String?group,?Integer?action);??
  • /**?
  • ?*?執行JobDetail操作?
  • ?*?
  • ?*?@param?name?
  • ?*?@param?group?
  • ?*?@param?action?
  • ?*?<br/>?
  • ?*/??
  • ?void?executeJobAction(String?name,?String?group,?Integer?action);??
  • /**?
  • ?*?動態添加trigger?
  • ?*?
  • ?*?@param?jobName?
  • ?*?@param?jobGroup?
  • ?*?@param?triggerBean?
  • ?*/??
  • ?void?addTrigger(String?jobName,?String?jobGroup,?TriggerViewBean?triggerBean);??
  • /**?
  • ?*?定時執行任務?
  • ?*?
  • ?*?@param?jobDetail?
  • ?*?@param?data?
  • ?*/??
  • ??
  • ?void?addTriggerForDate(JobDetail?jobDetail,?String?triggerName?,?String??
  • ?triggerGroup?,?Date?date,?Map<String,?Object>?triggerDataMap)?;??
  • /**?
  • ?*?獲取分布式Scheduler列表?
  • ?*?
  • ?*?@return?
  • ?*/??
  • ?List<Map<String,?Object>>?getSchedulers();??
  • /**?
  • ?*?獲取觸發器?
  • ?*?@param?name?
  • ?*?@param?group?
  • ?*?@return?
  • ?*/??
  • ?public?Trigger?getTrigger(String?name,?String?group);??
  • /**?
  • ?*?獲取JobDetail?
  • ?*?@param?name?
  • ?*?@param?group?
  • ?*?@return?
  • ?*/??
  • ?public?JobDetail?getJobDetail(String?name,?String?group);??
  • }??

  • 3:在Spring配置job,trigger,Scheduler,Listener組件?

    Xml代碼??
  • <!--?掃描商家狀態創建定時任務?-->??
  • <bean?id="accountStatusTaskScannerJobDetail"??
  • ?class="org.springframework.scheduling.quartz.JobDetailBean">??
  • ????<property?name="name"?value="accountStatusTaskScannerJobDetail"></property>??
  • ????<property?name="group"?value="CrmAccountGroup"></property>??
  • ????<property?name="jobClass"?value="***.crm.quartz.job.AccountStatusTaskScannerJob"></property>??
  • ????<!--?requestsRecovery屬性為true,則當Quartz服務被中止后,再次啟動任務時會嘗試恢復執行之前未完成的所有任務-->??
  • ????<property?name="requestsRecovery"?value="true"/>??
  • ????<!--?標識job是持久的,刪除所有觸發器的時候不被刪除?-->??
  • ????<property?name="durability"?value="true"/>??
  • ????<property?name="volatility"?value="false"></property>??
  • </bean>??
  • <bean?id="accountStatusTaskScannerTrigger"?class="org.springframework.scheduling.quartz.CronTriggerBean">??
  • ?????<property?name="group"?value="CrmDealGroup"></property>??
  • ?????<property?name="name"?value="accountStatusTaskScannerTrigger"></property>??
  • ????<property?name="jobDetail"?ref="accountStatusTaskScannerJobDetail"></property>??
  • ????<property?name="cronExpression"?value="0?0?1?*?*??"></property>??
  • </bean>??
  • ??
  • <!--?定義Quartz?監聽器?-->??
  • <bean?id="quartzExceptionSchedulerListener"???
  • class="***.crm.quartz.listener.QuartzExceptionSchedulerListener"></bean>??
  • ??
  • <!--?Quartz調度工廠?-->??
  • <bean?id="quartzScheduler"??
  • ?class="org.springframework.scheduling.quartz.SchedulerFactoryBean">??
  • ????<property?name="quartzProperties">??
  • ????<props>??
  • ????????<prop?key="org.quartz.scheduler.instanceName">CRMscheduler</prop>??
  • ????????<prop?key="org.quartz.scheduler.instanceId">AUTO</prop>??
  • ????????<!--?線程池配置?-->??
  • ????????<prop?key="org.quartz.threadPool.class">org.quartz.simpl.SimpleThreadPool</prop>??
  • ????????<prop?key="org.quartz.threadPool.threadCount">20</prop>??
  • ????????<prop?key="org.quartz.threadPool.threadPriority">5</prop>??
  • ????????<!--?JobStore?配置?-->??
  • ????????<prop?key="org.quartz.jobStore.class">org.quartz.impl.jdbcjobstore.JobStoreTX</prop>??
  • ????????<!--?集群配置?-->??
  • ????????<prop?key="org.quartz.jobStore.isClustered">false</prop>??
  • ????????<prop?key="org.quartz.jobStore.clusterCheckinInterval">15000</prop>??
  • ????????<prop?key="org.quartz.jobStore.maxMisfiresToHandleAtATime">1</prop>??
  • ????????<!--?數據源配置?使用DBCP連接池?數據源與dataSource一致?-->??
  • ????????<prop?key="org.quartz.jobStore.dataSource">myDS</prop>??
  • ????????<prop?key="org.quartz.dataSource.myDS.driver">${database.driverClassName}</prop>??
  • ????????<prop?key="org.quartz.dataSource.myDS.URL">${database.url}</prop>??
  • ????????<prop?key="org.quartz.dataSource.myDS.user">${database.username}</prop>??
  • ????????<prop?key="org.quartz.dataSource.myDS.password">${database.password}</prop>??
  • ????????<prop?key="org.quartz.dataSource.myDS.maxConnections">5</prop>??
  • ????????<prop?key="org.quartz.jobStore.misfireThreshold">120000</prop>??
  • ????</props>??
  • ????</property>??
  • ????<property?name="schedulerName"?value="CRMscheduler"?/>??
  • ????<!--必須的,QuartzScheduler?延時啟動,應用啟動完后?QuartzScheduler?再啟動-->??
  • ????<property?name="startupDelay"?value="30"/>??
  • ????<property?name="applicationContextSchedulerContextKey"?value="applicationContextKey"?/>??
  • ????<!--可選,QuartzScheduler?啟動時更新己存在的Job,這樣就不用每次修改targetObject后刪除qrtz_job_details表對應記錄了?-->??
  • ????<property?name="overwriteExistingJobs"?value="true"?/>??
  • ????<!--?設置自動啟動?-->??
  • ????<property?name="autoStartup"?value="true"?/>??
  • ????<!--?注冊觸發器?-->??
  • ????<property?name="triggers">??
  • ?<list>??
  • ????<ref?bean="dailyStatisticsTrigger"?/>??
  • ????<ref?bean="accountGrabedScannerTrigger"?/>??
  • ????<ref?bean="syncAccountFromPOITrigger"?/>??
  • ????<ref?bean="userSyncScannerTrigger"?/>??
  • ????<ref?bean="syncParentBranchFromPOITrigger"/>??
  • ????<ref?bean="privateReminderTrigger"?/>??
  • ????<ref?bean="onlineBranchesScannerTrigger"?/>??
  • ????<ref?bean="syncCtContactServiceTrigger"?/>??
  • ????<ref?bean="dealLinkDianpingScannerTrigger"?/>??
  • ????<ref?bean="accountStatusTaskScannerTrigger"/>??
  • ????<ref?bean="nDaysActivityScannerTrigger"/>??
  • ?</list>??
  • ?</property>??
  • <!--?注冊jobDetail?-->??
  • ?<property?name="jobDetails">??
  • ????<list>??
  • ????????<ref?bean="myTestQuartzJobDetail"/>??
  • ????????<ref?bean="accountPrivateToProtectedJobDetail"/>??
  • ????????<ref?bean="accountProtectedToPublicJobDetail"/>??
  • ?<ref?bean="nDaysActivityToProtectedJobDetail"/>??
  • ?</list>??
  • ?</property>??
  • <property?name="schedulerListeners">??
  • ????<list>??
  • ????????<ref?bean="quartzExceptionSchedulerListener"/>??
  • ????</list>??
  • ?</property>??
  • </bean>??


  • Crm目前可以做到對Quartz實例的監控,操作.動態部署Trigger?


    ?
    ?

    后續待開發功能和問題?

    1:目前實現對job,Trigger操作,動態部署Trigger,后續需要加入Calendar(排除特定日期),Listener(動態加載監控),Job的動態部署(只要bean的名稱和方法名,就可完成對job生成,部署)?

    2:由于Quartz集群中的job目前是在任意一臺server中執行,Quartz日志生成各自的系統目錄中, quartz日志無法統一.?

    3:Quartz2.x已經支持可選節點執行job(期待Spring升級后對新Quartz支持)?

    4:Quartz內部的DB操作大量Trigger存在嚴重競爭問題,瞬間大量trigger執行,目前只能通過(org.quartz.jobStore.tablePrefix = QRTZ)分表操作,存在長時間lock_wait(新版本據說有提高);?

    5:如果有需要,可以抽取出Quartz,變成單獨的服務,供其它系統調度使用使用

    轉載于:https://www.cnblogs.com/davidwang456/p/4205572.html

    總結

    以上是生活随笔為你收集整理的项目中使用Quartz集群分享--转载的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。