Java多线程(十二)之线程池深入分析(下)
一、數據結構與線程構造方法
?
由于已經看到了ThreadPoolExecutor的源碼,因此很容易就看到了ThreadPoolExecutor線程池的數據結構。圖1描述了這種數據結構。
圖1 ThreadPoolExecutor 數據結構
其實,即使沒有上述圖形描述ThreadPoolExecutor的數據結構,我們根據線程池的要求也很能夠猜測出其數據結構出來。
- 線程池需要支持多個線程并發執行,因此有一個線程集合Collection<Thread>來執行線程任務;
- 涉及任務的異步執行,因此需要有一個集合來緩存任務隊列Collection<Runnable>;
- 很顯然在多個線程之間協調多個任務,那么就需要一個線程安全的任務集合,同時還需要支持阻塞、超時操作,那么BlockingQueue是必不可少的;
- 既然是線程池,出發點就是提高系統性能同時降低資源消耗,那么線程池的大小就有限制,因此需要有一個核心線程池大小(線程個數)和一個最大線程池大小(線程個數),有一個計數用來描述當前線程池大小;
- 如果是有限的線程池大小,那么長時間不使用的線程資源就應該銷毀掉,這樣就需要一個線程空閑時間的計數來描述線程何時被銷毀;
- 前面描述過線程池也是有生命周期的,因此需要有一個狀態來描述線程池當前的運行狀態;
- 線程池的任務隊列如果有邊界,那么就需要有一個任務拒絕策略來處理過多的任務,同時在線程池的銷毀階段也需要有一個任務拒絕策略來處理新加入的任務;
- 上面種的線程池大小、線程空閑實際那、線程池運行狀態等等狀態改變都不是線程安全的,因此需要有一個全局的鎖(mainLock)來協調這些競爭資源;
- 除了以上數據結構以外,ThreadPoolExecutor還有一些狀態用來描述線程池的運行計數,例如線程池運行的任務數、曾經達到的最大線程數,主要用于調試和性能分析。
?
對于ThreadPoolExecutor而言,一個線程就是一個Worker對象,它與一個線程綁定,當Worker執行完畢就是線程執行完畢,這個在后面詳細討論線程池中線程的運行方式。
既然是線程池,那么就首先研究下線程的構造方法。
?
public interface ThreadFactory {
Thread newThread(Runnable r);
}
?
?
ThreadPoolExecutor使用一個線程工廠來構造線程。線程池都是提交一個任務Runnable,然后在某一個線程Thread中執行,ThreadFactory 負責如何創建一個新線程。
在J.U.C中有一個通用的線程工廠java.util.concurrent.Executors.DefaultThreadFactory,它的構造方式如下:
?
static class DefaultThreadFactory implements ThreadFactory {
static final AtomicInteger poolNumber = new AtomicInteger(1);
final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null)? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
?
?
在這個線程工廠中,同一個線程池的所有線程屬于同一個線程組,也就是創建線程池的那個線程組,同時線程池的名稱都是“pool-<poolNum>-thread-<threadNum>”,其中poolNum是線程池的數量序號,threadNum是此線程池中的線程數量序號。這樣如果使用jstack的話很容易就看到了系統中線程池的數量和線程池中線程的數量。另外對于線程池中的所有線程默認都轉換為非后臺線程,這樣主線程退出時不會直接退出JVM,而是等待線程池結束。還有一點就是默認將線程池中的所有線程都調為同一個級別,這樣在操作系統角度來看所有系統都是公平的,不會導致競爭堆積。
?
二、線程池中線程生命周期
?
一個線程Worker被構造出來以后就開始處于運行狀態。以下是一個線程執行的簡版邏輯。
?
private final class Worker implements Runnable {
private final ReentrantLock runLock = new ReentrantLock();
private Runnable firstTask;
Thread thread;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
}
private void runTask(Runnable task) {
final ReentrantLock runLock = this.runLock;
runLock.lock();
try {
task.run();
} finally {
runLock.unlock();
}
}
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}
}
?
?
當提交一個任務時,如果需要創建一個線程(何時需要在下一節中探討)時,就調用線程工廠創建一個線程,同時將線程綁定到Worker工作隊列中。需要說明的是,Worker隊列構造的時候帶著一個任務Runnable,因此Worker創建時總是綁定著一個待執行任務。換句話說,創建線程的前提是有必要創建線程(任務數已經超出了線程或者強制創建新的線程,至于為何強制創建新的線程后面章節會具體分析),不會無緣無故創建一堆空閑線程等著任務。這是節省資源的一種方式。
一旦線程池啟動線程后(調用線程run())方法,那么線程工作隊列Worker就從第1個任務開始執行(這時候發現構造Worker時傳遞一個任務的好處了),一旦第1個任務執行完畢,就從線程池的任務隊列中取出下一個任務進行執行。循環如此,直到線程池被關閉或者任務拋出了一個RuntimeException。
由此可見,線程池的基本原理其實也很簡單,無非預先啟動一些線程,線程進入死循環狀態,每次從任務隊列中獲取一個任務進行執行,直到線程池被關閉。如果某個線程因為執行某個任務發生異常而終止,那么重新創建一個新的線程而已。如此反復。
其實,線程池原理看起來簡單,但是復雜的是各種策略,例如何時該啟動一個線程,何時該終止、掛起、喚醒一個線程,任務隊列的阻塞與超時,線程池的生命周期以及任務拒絕策略等等。
?
三、線程池任務執行流程
?
我們從一個API開始接觸Executor是如何處理任務隊列的。
java.util.concurrent.Executor.execute(Runnable)
Executes the given task sometime in the future. The task may execute in a new thread or in an existing pooled thread. If the task cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached, the task is handled by the current?RejectedExecutionHandler.
線程池中所有任務執行都依賴于此接口。這段話有以下幾個意思:
回答上面兩個“可能“。任務可能被執行,那不可能的情況就是上面說的情況3;可能不是立即執行,是因為任務可能還在隊列中排隊,因此還在等待分配線程執行。了解完了字面上的問題,我們再來看具體的實現。
?
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
?
?
這一段代碼看起來挺簡單的,其實這就是線程池最重要的一部分,如果能夠完全理解這一塊,線程池還是挺容易的。整個執行流程是這樣的:
文字描述步驟不夠簡單?下面圖形詳細表述了此過程。
老實說這個圖比上面步驟更難以理解,那么從何入手呢。
流程的入口很簡單,我們就是要執行一個任務(Runnable command),那么它的結束點在哪或者有哪幾個?
根據左邊這個圖我們知道可能有以下幾種出口:
(1)圖中的P1、P7,我們根據這條路徑可以看到,僅僅是將任務加入任務隊列(offer(command))了;
(2)圖中的P3,這條路徑不將任務加入任務隊列,但是啟動了一個新工作線程(Worker)進行掃尾操作,用戶處理為空的任務隊列;
(3)圖中的P4,這條路徑沒有將任務加入任務隊列,但是啟動了一個新工作線程(Worker),并且工作現場的第一個任務就是當前任務;
(4)圖中的P5、P6,這條路徑沒有將任務加入任務隊列,也沒有啟動工作線程,僅僅是拋給了任務拒絕策略。P2是任務加入了任務隊列卻因為線程池已經關閉于是又從任務隊列中刪除,并且拋給了拒絕策略。
如果上面的解釋還不清楚,可以去研究下面兩段代碼:
?
java.util.concurrent.ThreadPoolExecutor.addIfUnderCorePoolSize(Runnable)
java.util.concurrent.ThreadPoolExecutor.addIfUnderMaximumPoolSize(Runnable)
java.util.concurrent.ThreadPoolExecutor.ensureQueuedTaskHandled(Runnable)
?
?
那么什么時候一個任務被立即執行呢?
在線程池運行狀態下,如果線程池大小 小于 核心線程池大小或者線程池已滿(任務隊列已滿)并且線程池大小 小于 最大線程池大小(此時線程池大小 大于 核心線程池大小的),用程序描述為:
?
runState == RUNNING && ( poolSize < corePoolSize || poolSize < maxnumPoolSize && workQueue.isFull())?
?
上面的條件就是一個任務能夠被立即執行的條件。
有了execute的基礎,我們看看ExecutorService中的幾個submit方法的實現。
?
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Object> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
?
?
很簡單,不是么?對于一個線程池來說復雜的地方也就在execute方法的執行流程。在下一節中我們來討論下如何獲取任務的執行結果,也就是Future類的使用和原理。
?
四、線程池任務執行結果
?
這一節來探討下線程池中任務執行的結果以及如何阻塞線程、取消任務等等。
?
package info.imxylz.study.concurrency.future;
public class SleepForResultDemo implements Runnable {
static boolean result = false;
static void sleepWhile(long ms) {
try {
Thread.sleep(ms);
} catch (Exception e) {}
}
@Override
public void run() {
//do work
System.out.println("Hello, sleep a while.");
sleepWhile(2000L);
result = true;
}
public static void main(String[] args) {
SleepForResultDemo demo = new SleepForResultDemo();
Thread t = new Thread(demo);
t.start();
sleepWhile(3000L);
System.out.println(result);
}
}
?
?
在沒有線程池的時代里面,使用Thread.sleep(long)去獲取線程執行完畢的場景很多。顯然這種方式很笨拙,他需要你事先知道任務可能的執行時間,并且還會阻塞主線程,不管任務有沒有執行完畢。
?
package info.imxylz.study.concurrency.future;
public class SleepLoopForResultDemo implements Runnable {
boolean result = false;
volatile boolean finished = false;
static void sleepWhile(long ms) {
try {
Thread.sleep(ms);
} catch (Exception e) {}
}
@Override
public void run() {
//do work
try {
System.out.println("Hello, sleep a while.");
sleepWhile(2000L);
result = true;
} finally {
finished = true;
}
}
public static void main(String[] args) {
SleepLoopForResultDemo demo = new SleepLoopForResultDemo();
Thread t = new Thread(demo);
t.start();
while (!demo.finished) {
sleepWhile(10L);
}
System.out.println(demo.result);
}
}
?
?
使用volatile與while死循環的好處就是等待的時間可以稍微小一點,但是依然有CPU負載高并且阻塞主線程的問題。最簡單的降低CPU負載的方式就是使用Thread.join().
?
SleepLoopForResultDemo demo = new SleepLoopForResultDemo();
Thread t = new Thread(demo);
t.start();
t.join();
System.out.println(demo.result);
?
?
顯然這也是一種不錯的方式,另外還有自己寫鎖使用wait/notify的方式。其實join()從本質上講就是利用while和wait來實現的。
上面的方式中都存在一個問題,那就是會阻塞主線程并且任務不能被取消。為了解決這個問題,線程池中提供了一個Future接口。
在Future接口中提供了5個方法。
- V get() throws InterruptedException, ExecutionException: 等待計算完成,然后獲取其結果。
- V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException。最多等待為使計算完成所給定的時間之后,獲取其結果(如果結果可用)。
- boolean cancel(boolean mayInterruptIfRunning):試圖取消對此任務的執行。
- boolean isCancelled():如果在任務正常完成前將其取消,則返回?true。
- boolean isDone():如果任務已完成,則返回?true。 可能由于正常終止、異常或取消而完成,在所有這些情況中,此方法都將返回true。
API看起來容易,來研究下異常吧。get()請求獲取一個結果會阻塞當前進程,并且可能拋出以下三種異常:
- InterruptedException:執行任務的線程被中斷則會拋出此異常,此時不能知道任務是否執行完畢,因此其結果是無用的,必須處理此異常。
- ExecutionException:任務執行過程中(Runnable#run())方法可能拋出RuntimeException,如果提交的是一個java.util.concurrent.Callable<V>接口任務,那么java.util.concurrent.Callable.call()方法有可能拋出任意異常。
- CancellationException:實際上get()方法還可能拋出一個CancellationException的RuntimeException,也就是任務被取消了但是依然去獲取結果。
對于get(long timeout, TimeUnit unit)而言,除了get()方法的異常外,由于有超時機制,因此還可能得到一個TimeoutException。
boolean cancel(boolean mayInterruptIfRunning)方法比較復雜,各種情況比較多:
來看看Future接口的實現類java.util.concurrent.FutureTask<V>具體是如何操作的。
在FutureTask中使用了一個AQS數據結構來完成各種狀態以及加鎖、阻塞的實現。
在此AQS類java.util.concurrent.FutureTask.Sync中一個任務用4中狀態:
初始情況下任務狀態state=0,任務執行(innerRun)后狀態變為運行狀態RUNNING(state=1),執行完畢后變成運行結束狀態RAN(state=2)。任務在初始狀態或者執行狀態被取消后就變為狀態CANCELLED(state=4)。AQS最擅長無鎖情況下處理幾種簡單的狀態變更的。
?
void innerRun() {
if (!compareAndSetState(0, RUNNING))
return;
try {
runner = Thread.currentThread();
if (getState() == RUNNING) // recheck after setting thread
innerSet(callable.call());
else
releaseShared(0); // cancel
} catch (Throwable ex) {
innerSetException(ex);
}
}
?
?
執行一個任務有四步:設置運行狀態、設置當前線程(AQS需要)、執行任務(Runnable#run或者Callable#call)、設置執行結果。這里也可以看到,一個任務只能執行一次,因為執行完畢后它的狀態不在為初始值0,要么為CANCELLED,要么為RAN。
取消一個任務(cancel)又是怎樣進行的呢?對比下前面取消任務的描述是不是很簡單,這里無非利用AQS的狀態來改變任務的執行狀態,最終達到放棄未啟動或者正在執行的任務的目的。
?
boolean innerCancel(boolean mayInterruptIfRunning) {
for (;;) {
int s = getState();
if (ranOrCancelled(s))
return false;
if (compareAndSetState(s, CANCELLED))
break;
}
if (mayInterruptIfRunning) {
Thread r = runner;
if (r != null)
r.interrupt();
}
releaseShared(0);
done();
return true;
}
?
?
到目前為止我們依然沒有說明到底是如何阻塞獲取一個結果的。下面四段代碼描述了這個過程。
?
V innerGet() throws InterruptedException, ExecutionException {
acquireSharedInterruptibly(0);
if (getState() == CANCELLED)
throw new CancellationException();
if (exception != null)
throw new ExecutionException(exception);
return result;
}
//AQS#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg); //park current Thread for result
}
protected int tryAcquireShared(int ignore) {
return innerIsDone()? 1 : -1;
}
boolean innerIsDone() {
return ranOrCancelled(getState()) && runner == null;
}
?
?
當調用Future#get()的時候嘗試去獲取一個共享變量。這就涉及到AQS的使用方式了。這里獲取一個共享變量的狀態是任務是否結束(innerIsDone()),也就是任務是否執行完畢或者被取消。如果不滿足條件,那么在AQS中就會doAcquireSharedInterruptibly(arg)掛起當前線程,直到滿足條件。AQS前面講過,掛起線程使用的是LockSupport的park方式,因此性能消耗是很低的。
至于將Runnable接口轉換成Callable接口,java.util.concurrent.Executors.callable(Runnable, T)也提供了一個簡單實現。
?
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
?
?
五、延遲、周期性任務調度的實現
?
java.util.concurrent.ScheduledThreadPoolExecutor是默認的延遲、周期性任務調度的實現。
有了整個線程池的實現,再回頭來看延遲、周期性任務調度的實現應該就很簡單了,因為所謂的延遲、周期性任務調度,無非添加一系列有序的任務隊列,然后按照執行順序的先后來處理整個任務隊列。如果是周期性任務,那么在執行完畢的時候加入下一個時間點的任務即可。
由此可見,ScheduledThreadPoolExecutor和ThreadPoolExecutor的唯一區別在于任務是有序(按照執行時間順序)的,并且需要到達時間點(臨界點)才能執行,并不是任務隊列中有任務就需要執行的。也就是說唯一不同的就是任務隊列BlockingQueue<Runnable> workQueue不一樣。ScheduledThreadPoolExecutor的任務隊列是java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue,它是基于java.util.concurrent.DelayQueue<RunnableScheduledFuture>隊列的實現。
DelayQueue是基于有序隊列PriorityQueue實現的。PriorityQueue?也叫優先級隊列,按照自然順序對元素進行排序,類似于TreeMap/Collections.sort一樣。
同樣是有序隊列,DelayQueue和PriorityQueue區別在什么地方?
由于DelayQueue在獲取元素時需要檢測元素是否“可用”,也就是任務是否達到“臨界點”(指定時間點),因此加入元素和移除元素會有一些額外的操作。
典型的,移除元素需要檢測元素是否達到“臨界點”,增加元素的時候如果有一個元素比“頭元素”更早達到臨界點,那么就需要通知任務隊列。因此這需要一個條件變量final Condition available 。
移除元素(出隊列)的過程是這樣的:
- 總是檢測隊列的頭元素(順序最小元素,也是最先達到臨界點的元素)
- 檢測頭元素與當前時間的差,如果大于0,表示還未到底臨界點,因此等待響應時間(使用條件變量available)
- 如果小于或者等于0,說明已經到底臨界點或者已經過了臨界點,那么就移除頭元素,并且喚醒其它等待任務隊列的線程。
-
public E take() throws InterruptedException {
-
final ReentrantLock lock = this.lock;
-
lock.lockInterruptibly();
-
try {
-
for (;;) {
-
E first = q.peek();
-
if (first == null) {
-
available.await();
-
} else {
-
long delay = first.getDelay(TimeUnit.NANOSECONDS);
-
if (delay > 0) {
-
long tl = available.awaitNanos(delay);
-
} else {
-
E x = q.poll();
-
assert x != null;
-
if (q.size() != 0)
-
available.signalAll(); // wake up other takers
-
return x;
- ?
-
}
-
}
-
}
-
} finally {
-
lock.unlock();
-
}
-
}
?
同樣加入元素也會有相應的條件變量操作。當前僅當隊列為空或者要加入的元素比隊列中的頭元素還小的時候才需要喚醒“等待線程”去檢測元素。因為頭元素都沒有喚醒那么比頭元素更延遲的元素就更加不會喚醒。
?
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
available.await();
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
long tl = available.awaitNanos(delay);
} else {
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll(); // wake up other takers
return x;
}
}
}
} finally {
lock.unlock();
}
}
?
?
有了任務隊列后再來看Future在ScheduledThreadPoolExecutor中是如何操作的。
java.util.concurrent.ScheduledThreadPoolExecutor.ScheduledFutureTask<V>是繼承java.util.concurrent.FutureTask<V>的,區別在于執行任務是否是周期性的。
?
private void runPeriodic() {
boolean ok = ScheduledFutureTask.super.runAndReset();
boolean down = isShutdown();
// Reschedule if not cancelled and not shutdown or policy allows
if (ok && (!down ||
(getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
!isStopped()))) {
long p = period;
if (p > 0)
time += p;
else
time = now() - p;
ScheduledThreadPoolExecutor.super.getQueue().add(this);
}
// This might have been the final executed delayed
// task. Wake up threads to check.
else if (down)
interruptIdleWorkers();
}
/**
* Overrides FutureTask version so as to reset/requeue if periodic.
*/
public void run() {
if (isPeriodic())
runPeriodic();
else
ScheduledFutureTask.super.run();
}
}
?
?
如果不是周期性任務調度,那么就和java.util.concurrent.FutureTask.Sync的調度方式是一樣的。如果是周期性任務(isPeriodic())那么就稍微有所不同的。
先從功能/結構上分析下。第一種情況假設提交的任務每次執行花費10s,間隔(delay/period)為20s,對于scheduleAtFixedRate而言,每次執行開始時間20s,對于scheduleWithFixedDelay來說每次執行開始時間30s。第二種情況假設提交的任務每次執行時間花費20s,間隔(delay/period)為10s,對于scheduleAtFixedRate而言,每次執行開始時間10s,對于scheduleWithFixedDelay來說每次執行開始時間30s。(具體分析可以參考這里)
也就是說scheduleWithFixedDelay的執行開始時間為(delay+cost),而對于scheduleAtFixedRate來說執行開始時間為max(period,cost)。
回頭再來看上面源碼runPeriodic()就很容易了。但特別要提醒的,如果任務的任何一個執行遇到異常,則后續執行都會被取消,這從runPeriodic()就能看出。要強調的第二點就是同一個周期性任務不會被同時執行。就比如說盡管上面第二種情況的scheduleAtFixedRate任務每隔10s執行到達一個時間點,但是由于每次執行時間花費為20s,因此每次執行間隔為20s,只不過執行的任務次數會多一點。但從本質上講就是每隔20s執行一次,如果任務隊列不取消的話。
為什么不會同時執行?
這是因為ScheduledFutureTask執行的時候會將任務從隊列中移除來,執行完畢以后才會添加下一個同序列的任務,因此任務隊列中其實最多只有同序列的任務的一份副本,所以永遠不會同時執行(盡管要執行的時間在過去)。
?
ScheduledThreadPoolExecutor使用一個無界(容量無限,整數的最大值)的容器(DelayedWorkQueue隊列),根據ThreadPoolExecutor的原理,只要當容器滿的時候才會啟動一個大于corePoolSize的線程數。因此實際上ScheduledThreadPoolExecutor是一個固定線程大小的線程池,固定大小為corePoolSize,構造函數里面的Integer.MAX_VALUE其實是不生效的(盡管PriorityQueue使用數組實現有PriorityQueue大小限制,如果你的任務數超過了2147483647就會導致OutOfMemoryError,這個參考PriorityQueue的grow方法)。
?
再回頭看scheduleAtFixedRate等方法就容易多了。無非就是往任務隊列中添加一個未來某一時刻的ScheduledFutureTask任務,如果是scheduleAtFixedRate那么period/delay就是正數,如果是scheduleWithFixedDelay那么period/delay就是一個負數,如果是0那么就是一次性任務。直接調用父類ThreadPoolExecutor的execute/submit等方法就相當于period/delay是0,并且initialDelay也是0。
?
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
if (initialDelay < 0) initialDelay = 0;
long triggerTime = now() + unit.toNanos(initialDelay);
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Object>(command,
null,
triggerTime,
unit.toNanos(period)));
delayedExecute(t);
return t;
}
?
?
另外需要補充說明的一點,前面說過java.util.concurrent.FutureTask.Sync任務只能執行一次,那么在runPeriodic()里面怎么又將執行過的任務加入隊列中呢?這是因為java.util.concurrent.FutureTask.Sync提供了一個innerRunAndReset()方法,此方法不僅執行任務還將任務的狀態還原成0(初始狀態)了,所以此任務就可以重復執行。這就是為什么runPeriodic()里面調用runAndRest()的緣故。
?
boolean innerRunAndReset() {
if (!compareAndSetState(0, RUNNING))
return false;
try {
runner = Thread.currentThread();
if (getState() == RUNNING)
callable.call(); // don't set result
runner = null;
return compareAndSetState(RUNNING, 0);
} catch (Throwable ex) {
innerSetException(ex);
return false;
}
}
?
?
謝謝xylz的文章。
關于線程池由于時間原因,沒有好好整理。
?
內容來源:
深入淺出 Java Concurrency (33): 線程池 part 6 線程池的實現及原理 (1)
http://www.blogjava.net/xylz/archive/2011/01/18/343183.html
深入淺出 Java Concurrency (33): 線程池 part 6 線程池的實現及原理 (2)
http://www.blogjava.net/xylz/archive/2011/02/11/344091.html
深入淺出 Java Concurrency (33): 線程池 part 6 線程池的實現及原理 (3)
http://www.blogjava.net/xylz/archive/2011/02/13/344207.html
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的Java多线程(十二)之线程池深入分析(下)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java多线程(十一)之线程池深入分析(
- 下一篇: Java多线程(十)之Reentrant