阿里技术专家加多:Java异步编程实战之基于JDK中的Future实现异步编程
正文共:14244?字?8?圖
預計閱讀時間:?36?分鐘
本節內容摘自《Java異步編程實戰》中的一小節。
一、前言
本節主要講解如何使用JDK中的Future實現異步編程,這包含如何使用FutureTask實現異步編程以及其內部實現原理以及FutureTask的局限性。
二、 JDK 中的Future
在Java并發包(JUC包)中Future代表著異步計算結果,Future中提供了一些列方法用來檢查計算結果是否已經完成,還提供了同步等待任務執行完成的方法,以及獲取計算結果的方法等。當計算結果完成時只能通過提供的get系列方法來獲取結果,如果使用了不帶超時時間的get方法則在計算結果完成前,調用線程會被一直阻塞。另外計算任務是可以使用cancle方法來取消的,但是一旦一個任務計算完成,則不能再被取消了。
首先我們看下Future接口的類圖結構如圖3-1-1:
圖3-1-1Future類圖
如上圖3-1-1Future類總共就有5個接口方法,下面我們來一一講解:
V get() throws InterruptedException, ExecutionException :等待異步計算任務完成,并返回結果;如果當前任務計算還沒完成則會阻塞調用線程直到任務完成;如果在等待結果的過程中有其他線程取消了該任務,則調用線程拋出CancellationException異常;如果在等待結果的過程中有其他線程中斷了該線程,則調用線程拋出InterruptedException異常;如果任務計算過程中拋出了異常,則調用線程會拋出ExecutionException異常。
V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException:相比get()方法多了超時時間,不同在于線程調用了該方法后在任務結果沒有計算出來前調用線程不會一直被阻塞,而是會在等待timeout個unit單位的時間后拋出TimeoutException異常后返回。添加超時時間這避免了調用線程死等的情況,讓調用線程可以及時釋放。
boolean isDone():如果計算任務已經完成則返回true,否則返回false,需要注意的是任務完成是指任務正常完成了或者由于拋出異常而完成了或者任務被取消了。
boolean cancel(boolean mayInterruptIfRunning) :嘗試取消任務的執行;如果當前任務已經完成或者任務已經被取消了,則嘗試取消任務會失敗;如果任務還沒被執行時候,調用了取消任務,則任務將永遠不會被執行;如果任務已經開始運行了,這時候取消任務,則參數mayInterruptIfRunning將決定是否要將正在執行任務的線程中斷,如果為true則標識要中斷,否則標識不中斷;當調用取消任務后,在調用isDone()方法,后者會返回true,隨后調用isCancelled()方法也會一直返回true;該方法會返回false,如果任務不能被取消,比如任務已經完成了,任務已經被取消了。
boolean isCancelled():如果任務在被執行完畢前被取消了,則該方法返回true,否則返回false。
三 JDK中的FutureTask
3.1 FutureTask 概述
FutureTask代表了一個可被取消的異步計算任務,該類實現了Future接口,比如提供了啟動和取消任務、查詢任務是否完成、獲取計算結果的接口。
FutureTask任務的結果只有當任務完成后才能獲取,并且只能通過get系列方法獲取,當結果還沒出來時候,線程調用get系列方法會被阻塞;另外一旦任務被執行完成,任務不能被重啟,除非運行時候使用了runAndReset方法;FutureTask中的任務可以是Callable類型,也可以是Runnable類型(因為FutureTask實現了Runnable接口),FutureTask類型的任務可以被提交到線程池執行。
我們修改上節的例子如下:
public class AsyncFutureExample {public static String doSomethingA() {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("--- doSomethingA---");return "TaskAResult";}public static String doSomethingB() {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("--- doSomethingB---");return "TaskBResult";}public static void main(String[] args) throws InterruptedException, ExecutionException {long start = System.currentTimeMillis();// 1.創建future任務FutureTask<String> futureTask = new FutureTask<String>(() -> {String result = null;try {result = doSomethingA();} catch (Exception e) {e.printStackTrace();}return result;});// 2.開啟異步單元執行任務AThread thread = new Thread(futureTask, "threadA");thread.start();// 3.執行任務BString taskBResult = doSomethingB();// 4.同步等待線程A運行結束String taskAResult = futureTask.get();//5.打印兩個任務執行結果System.out.println(taskAResult + " " + taskBResult); System.out.println(System.currentTimeMillis() - start);} }如上代碼doSomethingA和doSomethingB方法都是有返回值的任務,main函數內代碼1創建了一個異步任務futureTask,其內部執行任務doSomethingA。
代碼2則創建了一個線程,并且以futureTask為執行任務,并且啟動;代碼3使用main線程執行任務doSomethingB,這時候任務doSomethingB和doSomethingA是并發運行的,等main函數運行doSomethingB完畢后,執行代碼4同步等待doSomethingA任務完成,然后代碼5打印兩個任務的執行結果。
如上可知使用FutureTask可以獲取到異步任務的結果。
當然我們也可以把FutureTask提交到線程池來執行,使用線程池運行方式代碼如下:
// 0自定義線程池private final static int AVALIABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();private final static ThreadPoolExecutor POOL_EXECUTOR = new ThreadPoolExecutor(AVALIABLE_PROCESSORS,AVALIABLE_PROCESSORS * 2, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(5),new ThreadPoolExecutor.CallerRunsPolicy());public static void main(String[] args) throws InterruptedException, ExecutionException {long start = System.currentTimeMillis();// 1.創建future任務FutureTask<String> futureTask = new FutureTask<String>(() -> {String result = null;try {result = doSomethingA();} catch (Exception e) {e.printStackTrace();}return result;});// 2.開啟異步單元執行任務APOOL_EXECUTOR.execute(futureTask);// 3.執行任務BString taskBResult = doSomethingB();// 4.同步等待線程A運行結束String taskAResult = futureTask.get();// 5.打印兩個任務執行結果System.out.println(taskAResult + " " + taskBResult);System.out.println(System.currentTimeMillis() - start);}如上可知代碼0創建了一個線程池,代碼2添加異步任務到線程池,這里我們是調用了線程池的execute方法把futureTask提交到線程池的,其實下面代碼與上面是等價的:
public static void main(String[] args) throws InterruptedException, ExecutionException {long start = System.currentTimeMillis();// 1.開啟異步單元執行任務AFuture<String> futureTask = POOL_EXECUTOR.submit(() -> {String result = null;try {result = doSomethingA();} catch (Exception e) {e.printStackTrace();}return result;});// 2.執行任務BString taskBResult = doSomethingB();// 3.同步等待線程A運行結束String taskAResult = futureTask.get();// 4.打印兩個任務執行結果System.out.println(taskAResult + " " + taskBResult);System.out.println(System.currentTimeMillis() - start);}如上代碼1我們調用了線程池的submit方法提交了一個任務到線程池,然后返回了一個FutureTask對象。
3.2 FutureTask的類圖結構:
由于FutureTask在異步編程領域還是比較重要的,所以我們有必要探究下其原理,以便加深對異步的理解,首先我們來看下其類圖結構如圖3-2-2-1:圖3-2-2-1 FutureTask的類圖
如上時序圖3-2-2-1FutureTask實現了Future接口的所有方法,并且實現了Runnable接口,所以其是可執行任務,可以投遞到線程池或者線程來執行。
FutureTask中變量state是一個使用volatile關鍵字修飾(用來解決多線程下內存不可見問題,具體可以參考《Java并發編程之美》一書)的int類型,用來記錄任務狀態,任務狀態枚舉值如下:
一開始任務狀態會被初始化為NEW;當通過set、 setException、 cancel函數設置任務結果時候,任務會轉換為終止狀態;在任務完成過程中,設置任務狀態會變為COMPLETING(當結果被使用set方法設置時候),也可能會經過INTERRUPTING狀態(當使用cancel(true)方法取消任務并中斷任務時候);當任務被中斷后,任務狀態為INTERRUPTED;當任務被取消后,任務狀態為CANCELLED;當任務正常終止時候,任務狀態為NORMAL;當任務執行異常后,任務會變為EXCEPTIONAL狀態。
另外在任務運行過程中,任務可能的狀態轉換路徑如下:
NEW -> COMPLETING -> NORMAL :正常終止流程轉換
NEW -> COMPLETING -> EXCEPTIONAL:執行過程中發生異常流程轉換
NEW -> CANCELLED:任務還沒開始就被取消
NEW -> INTERRUPTING -> INTERRUPTED:任務被中斷
從上述轉換可知,任務最終只有四種終態:NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTED,另外可知任務的狀態值是從上到下遞增的。
類圖中callable是有返回值的可執行任務,創建FutureTask對象時候,可以通過構造函數傳遞該任務。
類圖中outcome是任務運行的結果,可以通過get系列方法來獲取該結果,另外outcome這里沒有被修飾為volatile,是因為變量state已經被使用volatile修飾了,這里是借用volatile的內存語義來保證寫入outcome時候會把值刷新到主內存,讀取時候會從主內存讀取,從而避免多線程下內存不可見問題(可以參考《Java并發編程之美》一書)。
類圖中runner變量,記錄了運行該任務的線程,這個是在FutureTask的run方法內使用CAS函數設置的。
類圖中waiters變量是一個WaitNode節點,使用Treiber stack實現的無鎖棧,棧頂元素就是使用waiters代表,棧用來記錄所有等待任務結果的線程節點,其定義為:
可知其是一個簡單的鏈表,用來記錄所有等待結果而被阻塞的線程。
最后我們看下其構造函數:
public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW; }如上代碼可知構造函數內保存了傳遞的callable任務到callable變量,并且設置任務狀態為NEW,這里由于state為volatile修飾,所以寫入state的值可以保證callable的寫入也會被刷入主內存,這避免了多線程下內存不可見性。
另外還有一個構造函數:
public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW; }該函數傳入一個Runnable類型的任務,由于該任務是不具有返回值的,所以這里使用Executors.callable方法進行適配,適配為Callable類型的任務.
Executors.callable(runnable, result);把Runnable類型任務轉換為了callable:
public static <T> Callable<T> callable(Runnable task, T result) {if (task == null)throw new NullPointerException();return new RunnableAdapter<T>(task, result);}static final class RunnableAdapter<T> implements Callable<T> {final Runnable task;final T result;RunnableAdapter(Runnable task, T result) {this.task = task;this.result = result;}public T call() {task.run();return result;}}如上可知使用了適配器模式來做轉換。
另外FutureTask中使用了UNSAFE機制來操作內存變量:
private static final sun.misc.Unsafe UNSAFE;private static final long stateOffset;//state變量的偏移地址private static final long runnerOffset;//runner變量的偏移地址private static final long waitersOffset;//waiters變量的偏移地址static {try {//獲取UNSAFE的實例UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = FutureTask.class;//獲取變量state的偏移地址stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state"));//獲取變量runner的偏移地址runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));//獲取變量waiters變量的偏移地址waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));} catch (Exception e) {throw new Error(e);} }如上代碼分別獲取了FutureTask中幾個變量在FutureTask對象內的內存地址偏移量,以便實現中使用UNSAFE的CAS操作來操作這些變量。
3.3 FutureTask的run() 方法
該方法是任務的執行體,線程是調用該方法來具體運行任務的,如果任務沒有被取消,則該方法會運行任務,并且設置結果到outcome變量里面,其代碼:
public void run() {//1.如果任務不是初始化的NEW狀態,或者使用CAS設置runner為當前線程失敗,則直接返回if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;//2.如果任務不為null,并且任務狀態為NEW,則執行任務try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;//2.1執行任務,如果OK則設置ran標記為truetry {result = c.call();ran = true;} catch (Throwable ex) {//2.2執行任務出現異常,則標記false,并且設置異常result = null;ran = false;setException(ex);}//3.任務執行正常,則設置結果if (ran)set(result);}} finally {runner = null;int s = state;//4.為了保證調用cancel(true)的線程在該run方法返回前中斷任務執行的線程if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);} } private void handlePossibleCancellationInterrupt(int s) {//為了保證調用cancel在該run方法返回前中斷任務執行的線程//這里使用Thread.yield()讓run方法執行線程讓出cpu執行權,以便讓//cancel(true)的線程執行cancel(true)中的代碼中斷任務線程if (s == INTERRUPTING)while (state == INTERRUPTING)Thread.yield(); // wait out pending interrupt }如上代碼1,如果任務不是初始化的NEW狀態,或者使用CAS設置runner為當前線程失敗,則直接返回;這個可以防止同一個FutureTask對象被提交給多個線程來執行,導致run方法被多個線程同時執行造成混亂。
代碼2,如果任務不為null,并且任務狀態為NEW,則執行任務,其中代碼2.1調用c.call()具體執行任務,如果任務執行OK,則調用set方法把結果記錄到result,并設置ran為true;否則執行任務過程中拋出異常則設置result為null,ran為false,并且調用setException設置異常信息后,任務就處于終止狀態,其中setException代碼如下:
如上代碼,使用CAS嘗試設置任務狀態state為COMPLETING,如果CAS成功則把異常信息設置到outcome變量,并且設置任務狀態為EXCEPTIONAL終止狀態,然后調用finishCompletion,其代碼:
private void finishCompletion() {//a遍歷鏈表節點for (WaitNode q; (q = waiters) != null;) {//a.1 CAS設置當前waiters節點為nullif (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//a.1.1for (;;) {//喚醒當前q節點對應的線程Thread t = q.thread;if (t != null) {q.thread = null;LockSupport.unpark(t);}//獲取q的下一個節點WaitNode next = q.next;if (next == null)break;q.next = null; //help gcq = next;}break;}}//b。所有阻塞的線程都被喚醒后,調用done方法done();callable = null; // callable設置為null }如上代碼比較簡單,就是當任務已經處于終態后,激活waiters鏈表中所有由于等待獲取結果而被阻塞的線程,并從waiters鏈表中移除他們,等所有由于等待該任務結果的線程被喚醒后,調用done()方法,done默認實現為空實現。
上面我們講了當任務執行過程中出現異常后如何處理的,下面我們看代碼3,當任務是正常執行完畢后set(result)的實現:
protected void set(V v) {//3.1if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();} }如上代碼3.1,使用CAS嘗試設置任務狀態state為COMPLETING,如果CAS成功則把任務結果設置到outcome變量,并且設置任務狀態為NORMAL終止狀態,然后調用finishCompletion喚醒所有因為等待結果而被阻塞的線程。
3.4 FutureTask的get()方法
等待異步計算任務完成,并返回結果;如果當前任務計算還沒完成則會阻塞調用線程直到任務完成;如果在等待結果的過程中有其他線程取消了該任務,則調用線程會拋出CancellationException異常;如果在等待結果的過程中有線程中斷了該線程,則拋出InterruptedException異常;如果任務計算過程中拋出了異常,則會拋出ExecutionException異常。
其代碼如下:
public V get() throws InterruptedException, ExecutionException {//1.獲取狀態,如有需要則等待int s = state;if (s <= COMPLETING)//等待任務終止s = awaitDone(false, 0L);//2.返回結果return report(s); }如上代碼1獲取任務的狀態,如果任務狀態的值小于等于COMPLETING則說明任務還沒有被完成,所以調用awaitDone掛起調用線程。
代碼2如果任務已經被完成,則返回結果。下面我們看awaitDone方法實現:
如上代碼1.1獲取設置的超時時間,如果傳遞的timed為false說明沒有設置超時時間,則deadline設置為0
代碼1.2無限循環等待任務完成,其中代碼1.2.1如果發現當前線程被中斷則從等待鏈表中移除當前線程對應的節點(如果隊列里面有該節點的話),然后拋出InterruptedException異常;代碼1.2.2如果發現當前任務狀態大于COMPLETING說明任務已經進入了終態(可能是NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTED中的一種),則把執行任務的線程的引用設置為null,并且返回結果;
代碼1.2.3如果當前任務狀態為COMPLETING說明任務已經接近完成了,就差設置結果到outCome了,則這時候讓當前線程放棄CPU執行,意在讓任務執行線程獲取到CPU然后讓任務狀態從COMPLETING轉換到終態NORMAL,這樣可以避免當前調用get系列的方法的線程被掛起,然后在被喚醒的開銷;
代碼1.2.4如果當前q為null,則創建一個與當前線程相關的節點,代碼1.2.5如果當前線程對應節點還沒放入到waiters管理的等待列表,則使用CAS操作放入;
代碼1.2.6如果設置了超時時間則使用LockSupport.parkNanos(this, nanos)讓當前線程掛起deadline時間,否則會調用 LockSupport.park(this);讓線程一直掛起直到其他線程調用了unpark方法,并且以當前線程為參數(比如finishCompletion()方法)。
另外帶超時參數的V get(long timeout, TimeUnit unit)方法與get()方法類似,只是添加了超時時間,這里不再累述。
3.5 FutureTask的cancel(boolean mayInterruptIfRunning)方法
嘗試取消任務的執行,如果當前任務已經完成或者任務已經被取消了,則嘗試取消任務會失敗;如果任務還沒被執行時候,調用了取消任務,則任務將永遠不會被執行;如果任務已經開始運行了,這時候取消任務,則參數mayInterruptIfRunning將決定是否要將正在執行任務的線程中斷,如果為true則標識要中斷,否則標識不中斷;
當調用取消任務后,在調用isDone()方法,后者會返回true,隨后調用isCancelled()方法也會一直返回true;該方法會返回false,如果任務不能被取消,比如任務已經完成了,任務已經被取消了。
cancel方法的代碼如下:
public boolean cancel(boolean mayInterruptIfRunning) {//1.如果任務狀態為New則使用CAS設置任務狀態為INTERRUPTING或者CANCELLEDif (!(state == NEW &&UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;//2.如果設置了中斷正常執行任務線程,則中斷try { if (mayInterruptIfRunning) {try {Thread t = runner;if (t != null)t.interrupt();} finally { // final stateUNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {//3.移除并激活所有因為等待結果而被阻塞的線程finishCompletion();}return true; }如上代碼1,如果任務狀態為New則使用CAS設置任務狀態為INTERRUPTING或者CANCELLED,如果mayInterruptIfRunning設置為了true則說明要中斷正在執行任務的線程,則使用CAS設置任務狀態為INTERRUPTING,否則設置為CANCELLED;如果CAS失敗則直接返回false。
如果CAS成功,則說明當前任務狀態已經為INTERRUPTING或者CANCELLED,如果mayInterruptIfRunning為true則中斷執行任務的線程,然后設置任務狀態為INTERRUPTED。
最后代碼3移除并激活所有因為等待結果而被阻塞的線程。
另外我們可以使用isCancelled()方法判斷一個任務是否被取消了,使用isDone()方法判斷一個任務是否處于終態了。
總結:當我們創建一個FutureTask時候,其任務狀態初始化為NEW,當我們把任務提交到線程或者線程池后,會有一個線程來執行該FutureTask任務,具體是調用其run方法來執行任務。在任務執行過程中,我們可以在其他線程調用FutureTask的get()方法來等待獲取結果,如果當前任務還在執行,則調用get的線程會被阻塞然后放入FutureTask內的阻塞鏈表隊列;多個線程可以同時調用get方法,這些線程可能都會被阻塞到了阻塞鏈表隊列。當任務執行完畢后會把結果或者異常信息設置到outcome變量,然后會移除和喚醒FutureTask內的阻塞鏈表隊列里面的線程節點,然后這些由于調用FutureTask的get方法而被阻塞的線程就會被激活。
3.6 FutureTask的局限性
FutureTask雖然提供了用來檢查任務是否執行完成、等待任務執行結果、獲取任務執行結果的方法,但是這些特色并不足以讓我們寫出簡潔的并發代碼。比如它并不能清楚的表達出多個FutureTask之間的關系,另外為了從Future獲取結果,我們必須調用get()方法,而該方法還是會在任務執行完畢前阻塞調用線程的,這明顯不是我們想要的。
我們真正要想要的是:
可以將兩個或者多個異步計算結合在一起變成一個,這包含兩個或者多個異步計算是相互獨立的時候或者第二個異步計算依賴第一個異步計算結果的時候。
對反應式編程的支持,也就是當任務計算完成后能進行通知,并且可以以計算結果作為一個行為動作的參數進行下一步計算,而不是僅僅提供調用線程以阻塞的方式獲取計算結果。
可以通過編程的方式手動設置(代碼的方式)Future的結果;FutureTask則不可以讓用戶通過函數來設置其計算結果,而是其任務內部來進行設置。
可以等多個Future對應的計算結果都出來后做一些事情
為了克服FutureTask的局限性,以及滿足我們對異步編程的需要,JDK8中提供了CompletableFuture,CompletableFuture是一個可以通過編程方式顯式的設置計算結果和狀態以便讓任務結束的Future,本書后面章節我們會具體講解。
四、總結
《Java異步編程實戰》一書是國內首本系統講解Java異步編程的書籍,本書涵蓋了Java中常見的異步編程場景:這包含單JVM內的異步編程、以及跨主機通過網絡通訊的遠程過程調用的異步調用與異步處理、Web請求的異步處理、以及常見的異步編程框架原理解析和golang語言內置的異步編程能力。識別下方二維碼即可購買本書:
? ? ? ?? #專注技術人的成長# 精彩推薦1.?漫畫:程序員真是太太太太太太太太有趣了! 2.?漫畫:程序員真的是太太太太太太太太難了!3.?知道創宇楊冀龍:技術人的商業思維都是錘出來的,真實需求長在客戶的KPI上 4.?漫畫:35歲的IT何去何從? 5.?漫畫:從修燈泡來看各種 IT 崗位,你是哪一種?點我支持加多????
總結
以上是生活随笔為你收集整理的阿里技术专家加多:Java异步编程实战之基于JDK中的Future实现异步编程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 美团技术:到店结算平台实践(胶片)
- 下一篇: Ubuntu16.04LTS安装ROS