Java多线程——FutureTask源码解析
一個很常見的多線程案例是,我們安排主線程作為分配任務和匯總的一方,然后將計算工作切分為多個子任務,安排多個線程去計算,最后所有的計算結果由主線程進行匯總。比如,歸并排序,字符頻率的統計等等。
我們知道Runnable是不返回計算結果的,如果想利用多線程的話,只能存儲到一個實例的內部變量里面進行交互,但存在一個問題,如何判斷是否已經計算完成了。用Thread.join是一個方案,但是我們只能依次等待一個線程結束后處理一個線程,如果線程1恰好特別慢,則后續已經完成的線程不能被及時處理。我們希望能夠獲知線程的執行狀態,發現哪個線程處理完就先統計它的計算結果。可以考慮使用Callable和FutureTask來完成。
先說Callable它是一個功能接口,它只有一個方法V call(),計算一個結果,失敗的話拋出一個異常。和Runnable不同的是,它不能直接交給Thread來執行,所以需要一個別的類來封裝它與Runnable,這個類就是FutureTask。FutureTask是一個類,繼承了RunnableFuture,而RunnableFuture是一個多繼承接口,它繼承了Runnable和 Future,所以FutureTask是可以作為實現了Runnable的實例交給Thread執行。
從內部變量來看,含有一個下層Callable實例,一個狀態表示,一個返回結果,以及對運行線程的記錄
/*** 任務的運行狀態,最初是NEW。運行狀態只在set, setException和cancel方法中過度到最終狀態。* 在完成過程中,狀態可能發生轉移到COMPLETING(在設置結果時)或者INTERRUPTING(僅當中斷運行來滿足cancel(true)時)。* 從這些中間狀態轉移到最終狀態使用成本更低有序/懶惰寫入,因為值是唯一的且之后不能再修改。** Possible state transitions:* NEW -> COMPLETING -> NORMAL* NEW -> COMPLETING -> EXCEPTIONAL* NEW -> CANCELLED* NEW -> INTERRUPTING -> INTERRUPTED*/private volatile int state;private static final int NEW = 0;private static final int COMPLETING = 1;private static final int NORMAL = 2;private static final int EXCEPTIONAL = 3;private static final int CANCELLED = 4;private static final int INTERRUPTING = 5;private static final int INTERRUPTED = 6;/** 下層的callable,運行后為null */private Callable<V> callable;/** get()操作返回的結果或者拋出的異常*/private Object outcome; // 不是volatile,由reads/writes狀態來保護/** 運行callable的線程,在run()通過CAS修改*/private volatile Thread runner;/** 等待線程的Treiber堆棧 */private volatile WaitNode waiters;構造函數
構造函數總共有兩種重載,第一種直接給出Callable實例
public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW; // 確保callable的可見性}第二種,給出Runnable實例和期望的返回結果,如果Runnable實例運行成功則返回的是result
public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);//創建一個callablethis.state = NEW; // 確保callable的可見性}run
run方法是Thread運行FutureTask內任務的接口。首先,根據最上方的進入條件可以看出,只有成功競爭到修改runnerOffset成功的線程才能執行后續方法,而搜索整個類文件,可以發現只有在run結束后才會重置為null,所以同一時間只能有一個線程執行run方法成功。然后要檢查state和callable的狀態,因為run會將它們修改。調用callable.call方法獲取返回結果,成功的話設置結果,失敗的話設置返回結果為異常。無論是否執行成功,runner會被重置為null。
public void run() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;//狀態必須是NEW且修改執行線程成功,否則直接返回,避免被多個線程同時執行try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();//調用callable.call方法獲取返回結果ran = true;//執行成功} catch (Throwable ex) {result = null;ran = false;//執行失敗setException(ex);//設置返回異常并喚醒等待線程解除阻塞}if (ran)set(result);//設置結果}} finally {//runner直到狀態設置完成不能為null來避免并發調用run()runner = null;//在將runner設置為null后需要重新讀取state避免漏掉中斷int s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}set方法先修改state為COMPLETING,然后將outcome設置為剛才計算出來的結果,最后設置state為NORMAL,并調用finishCompletion。這個方法移除并通知所有等待的線程解除阻塞,調用done(),并將callable設為null。done方法默認是什么也不做。
protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最終狀態finishCompletion();//喚醒等待線程,將callable設為null}}private void finishCompletion() {// assert state > COMPLETING;for (WaitNode q; (q = waiters) != null;) {if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {Thread t = q.thread;if (t != null) {q.thread = null;LockSupport.unpark(t);//如果線程被park阻塞,解除阻塞}WaitNode next = q.next;if (next == null)break;q.next = null; // 取消連接幫助gcq = next;}break;}}done();//未重寫時什么也不做callable = null; // to reduce footprint減少覆蓋區}setException跟set邏輯上基本一樣,除了設置返回結果是Throwable對象
protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//修改狀態outcome = t;//結果為ThrowableUNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 最終狀態finishCompletion();}}get
get方法如果FutureTask已經執行完成則返回結果,否則會等待并阻止線程調度。等待時長可以輸入,單位為納秒,不輸入為不限時等待,限時等待超時仍然沒有完成會拋出異常。
public V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING)s = awaitDone(false, 0L);//等待完成return report(s);//檢查時返回結果還是拋出異常}public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state;if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)//限時等待完成throw new TimeoutException();return report(s);}awaitDone這個方法會阻塞當前線程(get方法的調用線程)的調度并增加等待結點,阻塞時長根據輸入的時間長度決定。如果執行Callable任務的線程完成了運行或者被中斷,則會解除棧中等待結點對應線程的阻塞。然后會根據執行結果決定是否要拋出異常還是返回執行完成的結果。
private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;//是否完成入棧for (;;) {if (Thread.interrupted()) {//檢查線程是否已經被中斷removeWaiter(q);//移除被中斷的等待結點throw new InterruptedException();}int s = state;if (s > COMPLETING) {//已經完成if (q != null)q.thread = null;//移除等待return s;}else if (s == COMPLETING) // cannot time out yet還沒有超時Thread.yield();//已經在賦值,所以只需讓出時間片等待賦值完成//下方都是還在沒有完成call方法的情況else if (q == null)q = new WaitNode();else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);//q加入到棧的最前方else if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {//超時了removeWaiter(q);//移除超時的等待結點return state;}LockSupport.parkNanos(this, nanos);//阻塞當前線程nanos納秒}elseLockSupport.park(this);//阻塞當前線程}}cancel
cancel輸入的參數表示如果當前還在運行中是否要中斷執行線程,如果輸入參數是false則只有線程已經執行完成或者拋出異常或者已經被中斷時可以把狀態修改為CANCELLED,如果是true則會中斷線程并將狀態改為INTERRUPTED。所以,cancel在該任務已經結束或者已被取消,或者競爭修改狀態失敗時都會失敗。如果中斷成功,會釋放所有被阻塞的等待線程。
public boolean cancel(boolean mayInterruptIfRunning) {if (!(state == NEW &&UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;//已經完成或者被取消或者競爭取消失敗返回falsetry { // in case call to interrupt throws exceptionif (mayInterruptIfRunning) {try {Thread t = runner;if (t != null)t.interrupt();} finally { // final stateUNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {finishCompletion();}return true;}狀態檢查
非常簡單的兩個方法。因為CANCELLED是state中最大的,所以只有cancel方法成功才會是這種狀態。而isDone只要不是還在運行或者還沒有被執行就是返回true。
public boolean isCancelled() {return state >= CANCELLED;}public boolean isDone() {return state != NEW;}簡單的使用示例
public class CallableTest implements Callable<Integer>{private int start;public CallableTest(int start) {this.start = start;}@Overridepublic Integer call() throws Exception {Thread.sleep(500);return start + 1;}public static void main(String args[]) throws InterruptedException, ExecutionException{long start = System.currentTimeMillis();FutureTask<Integer> task1 = new FutureTask<>(new CallableTest(2));new Thread(task1).start();FutureTask<Integer> task2 = new FutureTask<>(new CallableTest(4));new Thread(task2).start();System.out.println(task1.get() + task2.get());//8long end = System.currentTimeMillis();System.out.println(end - start);//506} }總結
以上是生活随笔為你收集整理的Java多线程——FutureTask源码解析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: MugLife app是一款可以将静态照
- 下一篇: Java编译器优化与运行期优化技术浅析