FutureTask源码
介紹
- FutureTask是一種異步任務(wù)(或異步計算),舉個栗子,主線程的邏輯中需要使用某個值,但這個值需要負(fù)責(zé)的運(yùn)算得來,那么主線程可以提前建立一個異步任務(wù)來計算這個值(在其他的線程中計算),然后去做其他事情,當(dāng)需要這個值的時候再通過剛才建立的異步任務(wù)來獲取這個值,有點(diǎn)并行的意思,這樣可以縮短整個主線程邏輯的執(zhí)行時間。
- 與1.6版本不同,1.7的FutureTask不再基于AQS來構(gòu)建,而是在內(nèi)部采用簡單的Treiber Stack來保存等待線程。
接口
public interface Future<V> {//取消任務(wù)的執(zhí)行。參數(shù)指定是否立即中斷任務(wù)執(zhí)行,或者等等任務(wù)結(jié)束boolean cancel(boolean mayInterruptIfRunning);//任務(wù)是否已經(jīng)取消,任務(wù)正常完成前將其取消,則返回 trueboolean isCancelled();//任務(wù)是否已經(jīng)完成。需要注意的是如果任務(wù)正常終止、異常或取消,都將返回trueboolean isDone();//等待任務(wù)執(zhí)行結(jié)束,然后獲得V類型的結(jié)果。InterruptedException 線程被中斷異常, ExecutionException任務(wù)執(zhí)行異常,如果任務(wù)被取消,還會拋出CancellationExceptionV get() throws InterruptedException, ExecutionException;//同上面的get功能一樣,多了設(shè)置超時時間。參數(shù)timeout指定超時時間,uint指定時間的單位,在枚舉類TimeUnit中有相關(guān)的定義。如果計算超時,將拋出TimeoutExceptionV get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException; } public interface RunnableFuture<V> extends Runnable, Future<V> {/*** Sets this Future to the result of its computation* unless it has been cancelled.*/void run(); }?源碼分析
運(yùn)行過程
FutureTask常用方式:
1.創(chuàng)建任務(wù),實(shí)際使用時,一般會結(jié)合線程池(ThreadPoolExecutor)使用,所以是在線程池內(nèi)部創(chuàng)建FutureTask。
2.執(zhí)行任務(wù),一般會有由工作線程(對于我們當(dāng)前線程來說的其他線程)調(diào)用FutureTask的run方法,完成執(zhí)行。
3.獲取結(jié)果,一般會有我們的當(dāng)前線程去調(diào)用get方法來獲取執(zhí)行結(jié)果,如果獲取時,任務(wù)并沒有被執(zhí)行完畢,當(dāng)前線程就會被阻塞,直到任務(wù)被執(zhí)行完畢,然后獲取結(jié)果。
4.取消任務(wù),某些情況下會放棄任務(wù)的執(zhí)行,進(jìn)行任務(wù)取消。
內(nèi)部結(jié)構(gòu)
public class FutureTask<V> implements RunnableFuture<V> {/** * 內(nèi)部狀態(tài)可能得遷轉(zhuǎn)過程: * NEW -> COMPLETING -> NORMAL //正常完成 * NEW -> COMPLETING -> EXCEPTIONAL //發(fā)生異常 * NEW -> CANCELLED //取消 * NEW -> INTERRUPTING -> INTERRUPTED //中斷 */ private volatile int state;private static final int NEW = 0;private static final int COMPLETING = 1;private static final int NORMAL = 2;private static final int EXCEPTIONAL = 3;private static final int CANCELLED = 4;private static final int INTERRUPTING = 5;private static final int INTERRUPTED = 6;/** 內(nèi)部的callable,運(yùn)行完成后設(shè)置為null */ private Callable<V> callable;/** 如果正常完成,就是執(zhí)行結(jié)果,通過get方法獲取;如果發(fā)生異常,就是具體的異常對象,通過get方法拋出。 */ private Object outcome; // 本身沒有volatile修飾, 依賴state的讀寫來保證可見性。 /** 執(zhí)行內(nèi)部callable的線程。 */ private volatile Thread runner;/** 存放等待線程的Treiber Stack*/ private volatile WaitNode waiters;//所謂的Treiber Stack就是由WaitNode組成的(一個單向鏈表)。static final class WaitNode { volatile Thread thread; //指向block線程volatile WaitNode next; //下一個nodeWaitNode() { thread = Thread.currentThread(); } } }創(chuàng)建
public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW; // ensure visibility of callable}public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW; // ensure visibility of callable}//以下方法為Executors的方法public static <T> Callable<T> callable(Runnable task, T result) {if (task == null)throw new NullPointerException();return new RunnableAdapter<T>(task, result);}static final class RunnableAdapter<T> implements Callable<T> {final Runnable task;final T result;RunnableAdapter(Runnable task, T result) {this.task = task;this.result = result;}public T call() {task.run();return result;}}必須把state的寫放到最后,因?yàn)閟tate本身由volatile修飾,所以可以保證callable的可見性。(因?yàn)楹罄m(xù)讀callable之前會先讀state,還記得這個volatile寫讀的HappenBefore規(guī)則吧)
狀態(tài)
/** * 內(nèi)部狀態(tài)可能得遷轉(zhuǎn)過程: * NEW -> COMPLETING -> NORMAL //正常完成 * NEW -> COMPLETING -> EXCEPTIONAL //發(fā)生異常 * NEW -> CANCELLED //取消 * NEW -> INTERRUPTING -> INTERRUPTED //中斷 */ public boolean isCancelled() {return state >= CANCELLED;}//只要不為NEW就表示結(jié)束public boolean isDone() {return state != NEW;}?
?
private V report(int s) throws ExecutionException {Object x = outcome;if (s == NORMAL)return (V)x;if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);}protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = t;UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final statefinishCompletion();}}get,set
public V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING)s = awaitDone(false, 0L);return report(s);}public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state;if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)throw new TimeoutException();return report(s);}protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();}}get方法會block直到計算完成。awaitDone()方法:
private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;for (;;) {//中斷,則移除q,拋出IEif (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}int s = state;if (s > COMPLETING) {//處理完,返回,如果q!=null,則把線程解綁if (q != null)q.thread = null;return s;}else if (s == COMPLETING) // 任務(wù)正在執(zhí)行中,COMPLETING是中間狀態(tài)。Thread.yield(); //釋放CPU//以下代碼:state == NEW,else if (q == null) // q == null,則創(chuàng)建一個WaitNode,綁定Threadq = new WaitNode();else if (!queued) //未入隊,則入隊queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);else if (timed) { //超時判斷nanos = deadline - System.nanoTime();if (nanos <= 0L) {removeWaiter(q);return state;}LockSupport.parkNanos(this, nanos);}elseLockSupport.park(this);}}private void removeWaiter(WaitNode node) {if (node != null) {node.thread = null;retry:for (;;) { // restart on removeWaiter racefor (WaitNode pred = null, q = waiters, s; q != null; q = s) {s = q.next;if (q.thread != null)pred = q;else if (pred != null) {pred.next = s;if (pred.thread == null) // check for racecontinue retry;}else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,q, s))continue retry;}break;}}} private void finishCompletion() {// assert state > COMPLETING;for (WaitNode q; (q = waiters) != null;) {if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {//喚醒線程Thread t = q.thread;if (t != null) {q.thread = null;LockSupport.unpark(t);}//繼續(xù)下一個waiterWaitNode next = q.next;if (next == null)break;q.next = null; // unlink to help gcq = next;}break;}}done();callable = null; // to reduce footprint}get方法總結(jié):
1.首先檢查當(dāng)前任務(wù)的狀態(tài),如果狀態(tài)表示執(zhí)行完成,進(jìn)入第2步。
2.獲取執(zhí)行結(jié)果,也可能得到取消或者執(zhí)行異常,get過程結(jié)束。
3.如果當(dāng)前任務(wù)狀態(tài)表示未執(zhí)行或者正在執(zhí)行,那么當(dāng)前線程放入一個新建的等待節(jié)點(diǎn),然后進(jìn)入Treiber Stack進(jìn)行阻塞等待。
4.如果任務(wù)被工作線程(對當(dāng)前線程來說是其他線程)執(zhí)行完畢,執(zhí)行完畢時工作線程會喚醒Treiber Stack上等待的所有線程,所以當(dāng)前線程被喚醒,清空當(dāng)前等待節(jié)點(diǎn)上的線程域,然后進(jìn)入第2步。
5.當(dāng)前線程在阻塞等待結(jié)果過程中可能被中斷,如果被中斷,那么會移除當(dāng)前線程在Treiber Stack上對應(yīng)的等待節(jié)點(diǎn),然后拋出中斷異常,get過程結(jié)束。
6.當(dāng)前線程也可能執(zhí)行帶有超時時間的阻塞等待,如果超時時間過了,還沒得到執(zhí)行結(jié)果,那么會除當(dāng)前線程在Treiber Stack上對應(yīng)的等待節(jié)點(diǎn),然后拋出超時異常,get過程結(jié)束。
?
run
public void run() {//不是NEW狀態(tài)或者設(shè)置runner失敗,直接退出if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {//執(zhí)行任務(wù)result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;//處理可能發(fā)生的取消中斷(cancel(true))。 if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}/** * 確保cancel(true)產(chǎn)生的中斷發(fā)生在run或runAndReset方法過程中。 */ private void handlePossibleCancellationInterrupt(int s) { // 如果當(dāng)前正在中斷過程中,自旋等待一下,等中斷完成。 if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt // 這里的state狀態(tài)一定是INTERRUPTED; // 這里不能清除中斷標(biāo)記,因?yàn)闆]辦法區(qū)分來自cancel(true)的中斷。 // Thread.interrupted(); } protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW; }可見runAndReset與run方法的區(qū)別只是執(zhí)行完畢后不設(shè)置結(jié)果、而且有返回值表示是否執(zhí)行成功。
cancel
//JDK 1.7 public boolean cancel(boolean mayInterruptIfRunning) { if (state != NEW) return false; //如果任務(wù)已經(jīng)執(zhí)行完畢,返回false。 if (mayInterruptIfRunning) { //如果有中斷任務(wù)的標(biāo)志,嘗試將任務(wù)狀態(tài)設(shè)置為INTERRUPTING if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING)) return false; //上面設(shè)置成功的話,這里進(jìn)行線程中斷。 Thread t = runner; if (t != null) t.interrupt(); //最后將任務(wù)狀態(tài)設(shè)置為INTERRUPTED,注意這里又是LazySet。 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state } //如果沒有中斷任務(wù)的標(biāo)志,嘗試將任務(wù)狀態(tài)設(shè)置為CANCELLED。 else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)) return false; //最后喚醒Treiber Stack中所有等待線程。 finishCompletion(); return true; } //JDK 1.8public boolean cancel(boolean mayInterruptIfRunning) {if (!(state == NEW &&UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;try { // in case call to interrupt throws exceptionif (mayInterruptIfRunning) {try {Thread t = runner;if (t != null)t.interrupt();} finally { // final stateUNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {finishCompletion();}return true;}在設(shè)置mayInterruptIfRunning為true的情況下,內(nèi)部首先通過一個原子操作將state從NEW轉(zhuǎn)變?yōu)镮NTERRUPTING,然后中斷執(zhí)行任務(wù)的線程,然后在通過一個LazySet的操作將state從INTERRUPTING轉(zhuǎn)變?yōu)镮NTERRUPTED,由于后面這個操作對其他線程并不會立即可見,所以handlePossibleCancellationInterrupt才會有一個自旋等待state從INTERRUPTING變?yōu)镮NTERRUPTED的過程。
?
?
總結(jié)
以上是生活随笔為你收集整理的FutureTask源码的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JDK1.8并发包中的类
- 下一篇: TimingWheel 时间轮详解