定时任务的实现原理,看完就能手撸一个!
一、摘要
在很多業務的系統中,我們常常需要定時的執行一些任務,例如定時發短信、定時變更數據、定時發起促銷活動等等。
在上篇文章中,我們簡單的介紹了定時任務的使用方式,不同的架構對應的解決方案也有所不同,總結起來主要分單機和分布式兩大類,本文會重點分析下單機的定時任務實現原理以及優缺點,分布式框架的實現原理會在后續文章中進行分析。
從單機角度,定時任務實現主要有以下 3 種方案:
while + sleep 組合
最小堆實現
時間輪實現
二、while+sleep組合
while+sleep 方案,簡單的說,就是定義一個線程,然后 while 循環,通過 sleep 延遲時間來達到周期性調度任務。
簡單示例如下:
public?static?void?main(String[]?args)?{final?long?timeInterval?=?5000;new?Thread(new?Runnable()?{@Overridepublic?void?run()?{while?(true)?{System.out.println(Thread.currentThread().getName()?+?"每隔5秒執行一次");try?{Thread.sleep(timeInterval);}?catch?(InterruptedException?e)?{e.printStackTrace();}}}}).start(); }實現上非常簡單,如果我們想在創建一個每隔3秒鐘執行一次任務,怎么辦呢?
同樣的,也可以在創建一個線程,然后間隔性的調度方法;但是如果創建了大量這種類型的線程,這個時候會發現大量的定時任務線程在調度切換時性能消耗會非常大,而且整體效率低!
面對這種在情況,大佬們也想到了,于是想出了用一個線程將所有的定時任務存起來,事先排好序,按照一定的規則來調度,這樣不就可以極大的減少每個線程的切換消耗嗎?
正因此,JDK 中的 Timer 定時器由此誕生了!
三、最小堆實現
所謂最小堆方案,正如我們上面所說的,每當有新任務加入的時候,會把需要即將要執行的任務排到前面,同時會有一個線程不斷的輪詢判斷,如果當前某個任務已經到達執行時間點,就會立即執行,具體實現代表就是 JDK 中的 Timer 定時器!
3.1、Timer
首先我們來一個簡單的 Timer 定時器例子
public?static?void?main(String[]?args)?{Timer?timer?=?new?Timer();//每隔1秒調用一次timer.schedule(new?TimerTask()?{@Overridepublic?void?run()?{System.out.println("test1");}},?1000,?1000);//每隔3秒調用一次timer.schedule(new?TimerTask()?{@Overridepublic?void?run()?{System.out.println("test2");}},?3000,?3000);}實現上,好像跟我們上面介紹的 while+sleep 方案差不多,同樣也是起一個TimerTask線程任務,只不過共用一個Timer調度器。
下面我們一起來打開源碼看看里面到底有些啥!
進入Timer.schedule()方法
從方法上可以看出,這里主要做參數驗證,其中TimerTask是一個線程任務,delay表示延遲多久執行(單位毫秒),period表示多久執行一次(單位毫秒)
public?void?schedule(TimerTask?task,?long?delay,?long?period)?{if?(delay?<?0)throw?new?IllegalArgumentException("Negative?delay.");if?(period?<=?0)throw?new?IllegalArgumentException("Non-positive?period.");sched(task,?System.currentTimeMillis()+delay,?-period); }接著看sched()方法
這步操作中,可以很清晰的看到,在同步代碼塊里,會將task對象加入到queue
private?void?sched(TimerTask?task,?long?time,?long?period)?{if?(time?<?0)throw?new?IllegalArgumentException("Illegal?execution?time.");//?Constrain?value?of?period?sufficiently?to?prevent?numeric//?overflow?while?still?being?effectively?infinitely?large.if?(Math.abs(period)?>?(Long.MAX_VALUE?>>?1))period?>>=?1;synchronized(queue)?{if?(!thread.newTasksMayBeScheduled)throw?new?IllegalStateException("Timer?already?cancelled.");synchronized(task.lock)?{if?(task.state?!=?TimerTask.VIRGIN)throw?new?IllegalStateException("Task?already?scheduled?or?cancelled");task.nextExecutionTime?=?time;task.period?=?period;task.state?=?TimerTask.SCHEDULED;}queue.add(task);if?(queue.getMin()?==?task)queue.notify();} }我們繼續來看queue對象
任務會將入到TaskQueue隊列中,同時在Timer初始化階段會將TaskQueue作為參數傳入到TimerThread線程中,并且起到線程
public?class?Timer?{private?final?TaskQueue?queue?=?new?TaskQueue();private?final?TimerThread?thread?=?new?TimerThread(queue);public?Timer()?{this("Timer-"?+?serialNumber());}public?Timer(String?name)?{thread.setName(name);thread.start();}//... }而TaskQueue其實是一個最小堆的數據實體類,源碼如下
每當有新元素加入的時候,會對原來的數組進行重排,會將即將要執行的任務排在數組的前面
class?TaskQueue?{private?TimerTask[]?queue?=?new?TimerTask[128];private?int?size?=?0;void?add(TimerTask?task)?{//?Grow?backing?store?if?necessaryif?(size?+?1?==?queue.length)queue?=?Arrays.copyOf(queue,?2*queue.length);queue[++size]?=?task;fixUp(size);}private?void?fixUp(int?k)?{while?(k?>?1)?{int?j?=?k?>>?1;if?(queue[j].nextExecutionTime?<=?queue[k].nextExecutionTime)break;TimerTask?tmp?=?queue[j];queue[j]?=?queue[k];queue[k]?=?tmp;k?=?j;}}//.... }最后我們來看看TimerThread
TimerThread其實就是一個任務調度線程,首先從TaskQueue里面獲取排在最前面的任務,然后判斷它是否到達任務執行時間點,如果已到達,就會立刻執行任務
class?TimerThread?extends?Thread?{boolean?newTasksMayBeScheduled?=?true;private?TaskQueue?queue;TimerThread(TaskQueue?queue)?{this.queue?=?queue;}public?void?run()?{try?{mainLoop();}?finally?{//?Someone?killed?this?Thread,?behave?as?if?Timer?cancelledsynchronized(queue)?{newTasksMayBeScheduled?=?false;queue.clear();??//?Eliminate?obsolete?references}}}/***?The?main?timer?loop.??(See?class?comment.)*/private?void?mainLoop()?{while?(true)?{try?{TimerTask?task;boolean?taskFired;synchronized(queue)?{//?Wait?for?queue?to?become?non-emptywhile?(queue.isEmpty()?&&?newTasksMayBeScheduled)queue.wait();if?(queue.isEmpty())break;?//?Queue?is?empty?and?will?forever?remain;?die//?Queue?nonempty;?look?at?first?evt?and?do?the?right?thinglong?currentTime,?executionTime;task?=?queue.getMin();synchronized(task.lock)?{if?(task.state?==?TimerTask.CANCELLED)?{queue.removeMin();continue;??//?No?action?required,?poll?queue?again}currentTime?=?System.currentTimeMillis();executionTime?=?task.nextExecutionTime;if?(taskFired?=?(executionTime<=currentTime))?{if?(task.period?==?0)?{?//?Non-repeating,?removequeue.removeMin();task.state?=?TimerTask.EXECUTED;}?else?{?//?Repeating?task,?reschedulequeue.rescheduleMin(task.period<0???currentTime???-?task.period:?executionTime?+?task.period);}}}if?(!taskFired)?//?Task?hasn't?yet?fired;?waitqueue.wait(executionTime?-?currentTime);}if?(taskFired)??//?Task?fired;?run?it,?holding?no?lockstask.run();}?catch(InterruptedException?e)?{}}} }總結這個利用最小堆實現的方案,相比 while + sleep 方案,多了一個線程來管理所有的任務,優點就是減少了線程之間的性能開銷,提升了執行效率;但是同樣也帶來的了一些缺點,整體的新加任務寫入效率變成了 O(log(n))。
同時,細心的發現,這個方案還有以下幾個缺點:
串行阻塞:調度線程只有一個,長任務會阻塞短任務的執行,例如,A任務跑了一分鐘,B任務至少需要等1分鐘才能跑
容錯能力差:沒有異常處理能力,一旦一個任務執行故障,后續任務都無法執行
3.2、ScheduledThreadPoolExecutor
鑒于 Timer 的上述缺陷,從 Java 5 開始,推出了基于線程池設計的 ScheduledThreadPoolExecutor 。
其設計思想是,每一個被調度的任務都會由線程池來管理執行,因此任務是并發執行的,相互之間不會受到干擾。需要注意的是,只有當任務的執行時間到來時,ScheduledThreadPoolExecutor 才會真正啟動一個線程,其余時間 ScheduledThreadPoolExecutor 都是在輪詢任務的狀態。
簡單的使用示例:
public?static?void?main(String[]?args)?{ScheduledThreadPoolExecutor?executor?=?new?ScheduledThreadPoolExecutor(3);//啟動1秒之后,每隔1秒執行一次executor.scheduleAtFixedRate((new?Runnable()?{@Overridepublic?void?run()?{System.out.println("test3");}}),1,1,?TimeUnit.SECONDS);//啟動1秒之后,每隔3秒執行一次executor.scheduleAtFixedRate((new?Runnable()?{@Overridepublic?void?run()?{System.out.println("test4");}}),1,3,?TimeUnit.SECONDS); }同樣的,我們首先打開源碼,看看里面到底做了啥
進入scheduleAtFixedRate()方法
首先是校驗基本參數,然后將任務作為封裝到ScheduledFutureTask線程中,ScheduledFutureTask繼承自RunnableScheduledFuture,并作為參數調用delayedExecute()方法進行預處理
public?ScheduledFuture<?>?scheduleAtFixedRate(Runnable?command,long?initialDelay,long?period,TimeUnit?unit)?{if?(command?==?null?||?unit?==?null)throw?new?NullPointerException();if?(period?<=?0)throw?new?IllegalArgumentException();ScheduledFutureTask<Void>?sft?=new?ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay,?unit),unit.toNanos(period));RunnableScheduledFuture<Void>?t?=?decorateTask(command,?sft);sft.outerTask?=?t;delayedExecute(t);return?t; }繼續看delayedExecute()方法
可以很清晰的看到,當線程池沒有關閉的時候,會通過super.getQueue().add(task)操作,將任務加入到隊列,同時調用ensurePrestart()方法做預處理
private?void?delayedExecute(RunnableScheduledFuture<?>?task)?{if?(isShutdown())reject(task);else?{super.getQueue().add(task);if?(isShutdown()?&&!canRunInCurrentRunState(task.isPeriodic())?&&remove(task))task.cancel(false);else//預處理ensurePrestart();} }其中super.getQueue()得到的是一個自定義的new DelayedWorkQueue()阻塞隊列,數據存儲方面也是一個最小堆結構的隊列,這一點在初始化new ScheduledThreadPoolExecutor()的時候,可以看出!
public?ScheduledThreadPoolExecutor(int?corePoolSize)?{super(corePoolSize,?Integer.MAX_VALUE,?0,?NANOSECONDS,new?DelayedWorkQueue()); }打開源碼可以看到,DelayedWorkQueue其實是ScheduledThreadPoolExecutor中的一個靜態內部類,在添加的時候,會將任務加入到RunnableScheduledFuture數組中,同時線程池中的Woker線程會通過調用任務隊列中的take()方法獲取對應的ScheduledFutureTask線程任務,接著執行對應的任務線程
static?class?DelayedWorkQueue?extends?AbstractQueue<Runnable>implements?BlockingQueue<Runnable>?{private?static?final?int?INITIAL_CAPACITY?=?16;private?RunnableScheduledFuture<?>[]?queue?=new?RunnableScheduledFuture<?>[INITIAL_CAPACITY];private?final?ReentrantLock?lock?=?new?ReentrantLock();private?int?size?=?0;???//....public?boolean?add(Runnable?e)?{return?offer(e);}public?boolean?offer(Runnable?x)?{if?(x?==?null)throw?new?NullPointerException();RunnableScheduledFuture<?>?e?=?(RunnableScheduledFuture<?>)x;final?ReentrantLock?lock?=?this.lock;lock.lock();try?{int?i?=?size;if?(i?>=?queue.length)grow();size?=?i?+?1;if?(i?==?0)?{queue[0]?=?e;setIndex(e,?0);}?else?{siftUp(i,?e);}if?(queue[0]?==?e)?{leader?=?null;available.signal();}}?finally?{lock.unlock();}return?true;}public?RunnableScheduledFuture<?>?take()?throws?InterruptedException?{final?ReentrantLock?lock?=?this.lock;lock.lockInterruptibly();try?{for?(;;)?{RunnableScheduledFuture<?>?first?=?queue[0];if?(first?==?null)available.await();else?{long?delay?=?first.getDelay(NANOSECONDS);if?(delay?<=?0)return?finishPoll(first);first?=?null;?//?don't?retain?ref?while?waitingif?(leader?!=?null)available.await();else?{Thread?thisThread?=?Thread.currentThread();leader?=?thisThread;try?{available.awaitNanos(delay);}?finally?{if?(leader?==?thisThread)leader?=?null;}}}}}?finally?{if?(leader?==?null?&&?queue[0]?!=?null)available.signal();lock.unlock();}} }回到我們最開始說到的ScheduledFutureTask任務線程類,最終執行任務的其實就是它
ScheduledFutureTask任務線程,才是真正執行任務的線程類,只是繞了一圈,做了很多包裝,run()方法就是真正執行定時任務的方法。
private?class?ScheduledFutureTask<V>extends?FutureTask<V>?implements?RunnableScheduledFuture<V>?{/**?Sequence?number?to?break?ties?FIFO?*/private?final?long?sequenceNumber;/**?The?time?the?task?is?enabled?to?execute?in?nanoTime?units?*/private?long?time;/***?Period?in?nanoseconds?for?repeating?tasks.??A?positive*?value?indicates?fixed-rate?execution.??A?negative?value*?indicates?fixed-delay?execution.??A?value?of?0?indicates?a*?non-repeating?task.*/private?final?long?period;/**?The?actual?task?to?be?re-enqueued?by?reExecutePeriodic?*/RunnableScheduledFuture<V>?outerTask?=?this;/***?Overrides?FutureTask?version?so?as?to?reset/requeue?if?periodic.*/public?void?run()?{boolean?periodic?=?isPeriodic();if?(!canRunInCurrentRunState(periodic))cancel(false);else?if?(!periodic)ScheduledFutureTask.super.run();else?if?(ScheduledFutureTask.super.runAndReset())?{setNextRunTime();reExecutePeriodic(outerTask);}}//... }3.3、小結
ScheduledExecutorService 相比 Timer 定時器,完美的解決上面說到的 Timer 存在的兩個缺點!
在單體應用里面,使用 ScheduledExecutorService 可以解決大部分需要使用定時任務的業務需求!
但是這是否意味著它是最佳的解決方案呢?
我們發現線程池中 ScheduledExecutorService 的排序容器跟 Timer 一樣,都是采用最小堆的存儲結構,新任務加入排序效率是O(log(n)),執行取任務是O(1)。
這里的寫入排序效率其實是有空間可提升的,有可能優化到O(1)的時間復雜度,也就是我們下面要介紹的時間輪實現!
四、時間輪實現
所謂時間輪(RingBuffer)實現,從數據結構上看,簡單的說就是循環隊列,從名稱上看可能感覺很抽象。
它其實就是一個環形的數組,如圖所示,假設我們創建了一個長度為 8 的時間輪。
插入、取值流程:
1.當我們需要新建一個 1s 延時任務的時候,則只需要將它放到下標為 1 的那個槽中,2、3、...、7也同樣如此。
2.而如果是新建一個 10s 的延時任務,則需要將它放到下標為 2 的槽中,但同時需要記錄它所對應的圈數,也就是 1 圈,不然就和 2 秒的延時消息重復了
3.當創建一個 21s 的延時任務時,它所在的位置就在下標為 5 的槽中,同樣的需要為他加上圈數為 2,依次類推...
因此,總結起來有兩個核心的變量:
數組下標:表示某個任務延遲時間,從數據操作上對執行時間點進行取余
圈數:表示需要循環圈數
通過這張圖可以更直觀的理解!
當我們需要取出延時任務時,只需要每秒往下移動這個指針,然后取出該位置的所有任務即可,取任務的時間消耗為O(1)。
當我們需要插入任務式,也只需要計算出對應的下表和圈數,即可將任務插入到對應的數組位置中,插入任務的時間消耗為O(1)。
如果時間輪的槽比較少,會導致某一個槽上的任務非常多,那么效率也比較低,這就和 HashMap 的 hash 沖突是一樣的,因此在設計槽的時候不能太大也不能太小。
4.1、代碼實現
首先創建一個RingBufferWheel時間輪定時任務管理器
接著,編寫一個客戶端,測試客戶端
運行結果:
如果要周期性執行任務,可以在任務執行完成之后,再重新加入到時間輪中。
詳細源碼分析地址:[https://crossoverjie.top/2019/09/27/algorithm/time%20wheel/]
4.2、應用
時間輪的應用還是非常廣的,例如在 Disruptor 項目中就運用到了 RingBuffer,還有Netty中的HashedWheelTimer工具原理也差不多等等,有興趣的同學,可以閱讀一下官方對應的源碼!
五、小結
本文主要圍繞單體應用中的定時任務原理進行分析,可能也有理解不對的地方,歡迎批評指出!
六、參考
1、簡書 - 談談定時任務解決方案原理
2、crossoverJie's Blog - 延時消息之時間輪
往期推薦史上最全的延遲任務實現方式匯總!附代碼(強烈推薦)
2020-04-14
定時任務最簡單的3種實現方法(超好用)
2020-08-18
文件寫入的6種方法,這種方法性能最好
2020-12-22
關注我,每天陪你進步一點點!
總結
以上是生活随笔為你收集整理的定时任务的实现原理,看完就能手撸一个!的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 最详细的 IDEA调试教程
- 下一篇: linq to js使用汇总