【JUC系列】Future异步回调模式
何為異步回調(diào)
前面只是一個(gè)例子,對(duì)并發(fā)的主要模式進(jìn)行形象的說(shuō)明。
下面正式來(lái)講下經(jīng)常使用的幾個(gè)和并發(fā)相關(guān)的概念。
1.2.1. 同步、異步、阻塞、非阻塞
一:同步
所謂同步,就是在發(fā)出一個(gè)功能調(diào)用時(shí),在沒有獲得結(jié)果以前,該調(diào)用就不返回。也就是必須一件一件事作,等前一件作完了才能作下一件事。
單線程模式,就是絕對(duì)同步的。
二: 異步
異步首先必須是多線程模式。是指當(dāng)前線程,向其余的異步線程發(fā)出調(diào)用指令。當(dāng)前線程和異步線程,邏輯上同時(shí)執(zhí)行。
三:阻塞
在異步的場(chǎng)景下,當(dāng)前線程阻塞住,等待異步線程的執(zhí)行結(jié)果。阻塞是指線程進(jìn)入非可執(zhí)行狀態(tài),在這個(gè)狀態(tài)下,cpu不會(huì)給線程分配時(shí)間片,即線程暫停運(yùn)行。
阻塞模式是效率比較低的,若是阻塞嚴(yán)重的話,至關(guān)于又回到了同步的時(shí)代。
四:非阻塞
非阻塞和阻塞的概念相對(duì)應(yīng),指在不能馬上獲得結(jié)果以前,當(dāng)前線程不會(huì)阻塞住,而會(huì)繼續(xù)向下執(zhí)行。回調(diào)就是一種非阻塞的異步模式。并發(fā)線程經(jīng)過回調(diào),能夠?qū)⒔Y(jié)果返回給發(fā)起線程。除了回調(diào),還有其余的非阻塞異步模式,好比消息通信、信號(hào)量等等。
1.2.2. 阻塞模式的泡茶案例圖解
阻塞模式的泡茶模型,對(duì)應(yīng)到前面的第二種泡茶喝的工序模型。
在阻塞模式泡茶喝的模型中,有三條線程,他們分別是:
線程一:燒水線程
洗好水壺,灌上涼水,放在火上;
線程二:清洗線程
洗茶壺、洗茶杯;
線程三:主線程
分別啟動(dòng)燒水線程、清洗線程。等水開了,等水杯洗好了,然后泡茶喝。
具體如下圖:
1.2.3. 回調(diào)模式的泡茶方法
前面提到,阻塞模式的效率不是最高的。
更高效率的是回調(diào)模式。主線程在等待的時(shí)間了,不是死等,而是去干讀書的活兒。等其他兩條線程完成后,通過回調(diào)方式,去完成泡茶的動(dòng)作。
在回調(diào)模式泡茶喝的模型中,還是三條線程,他們的工作稍微有些變動(dòng):
線程一:燒水線程
洗好水壺,灌上涼水,放在火上;燒好水后,去執(zhí)行泡茶回調(diào)。
線程二:清洗線程
洗茶壺、洗茶杯;清洗完成后,也去執(zhí)行一下泡茶的動(dòng)作。
線程三:主線程
分別啟動(dòng)燒水線程、清洗線程。然后去讀書。
具體如下圖:
嚴(yán)格來(lái)說(shuō),上圖是經(jīng)不起推敲的。
為啥呢? 那個(gè)泡茶喝回調(diào)方法,在執(zhí)行的流程上,不屬于主線程在執(zhí)行。只是在業(yè)務(wù)邏輯上,泡茶喝這個(gè)動(dòng)作與主線程上的其他動(dòng)作,關(guān)聯(lián)性更強(qiáng)。
上圖,更好的理解方式是,盡量站在業(yè)務(wù)流程的角度去理解。
回調(diào)不是唯一的非阻塞方式。
還有線程間通信、信號(hào)量等等,很多的非阻塞方式。但是回調(diào)卻是一種最好用的、也是開發(fā)中用的最多的線程間非阻塞的交互方式。
下面,從最原始的阻塞模式講起,起底整個(gè)異步回調(diào)模式。
?
1.3.?異步阻塞悶葫蘆——join
Java中,線程有一個(gè)join操作,也叫線程的合并。
join操作的作用,就是完成異步阻塞的工作——阻塞當(dāng)前的線程,直到異步的并發(fā)線程的執(zhí)行完成。
1.3.1. 線程的join 合并
如果線程A的執(zhí)行過程中,通過B.join操作,合并B線程,叫做線程的合并。合并的重要特點(diǎn)之一是,線程A進(jìn)入阻塞模式,直到B線程執(zhí)行完成。
為了方便表達(dá),模擬一下包工頭的甲方和乙方。
將發(fā)起合并的線程A叫做甲方線程,被發(fā)起的線程B為乙方線程。
簡(jiǎn)單的說(shuō),線程合并就是——甲方等待乙方執(zhí)行完成。換句話說(shuō),甲方將乙方線程合并到甲方線程。
在泡茶喝的例子中,主線程通過join操作,等待燒水線程和清洗線程。這就是一種異步阻塞。
1.3.2. join 異步阻塞實(shí)例代碼
先看實(shí)例,再看方法的詳細(xì)介紹。
泡茶喝的異步阻塞版本,實(shí)現(xiàn)如下:
package com.crazymakercircle.coccurent; import com.crazymakercircle.util.Print; /*** Created by 尼恩 at 瘋狂創(chuàng)客圈*/ public class JoinDemo {public static final int SLEEP_GAP = 500;public static String getCurThreadName() {return Thread.currentThread().getName();}static class HotWarterThread extends Thread {public HotWarterThread() {super("** 燒水-Thread");}public void run() {try {Print.tcfo("洗好水壺");Print.tcfo("灌上涼水");Print.tcfo("放在火上");//線程睡眠一段時(shí)間,代表燒水中Thread.sleep(SLEEP_GAP);Print.tcfo("水開了");} catch (InterruptedException e) {Print.tcfo(" 發(fā)生異常被中斷.");}Print.tcfo(" 運(yùn)行結(jié)束.");}}static class WashThread extends Thread {public WashThread() {super("$$ 清洗-Thread");}public void run() {try {Print.tcfo("洗茶壺");Print.tcfo("洗茶杯");Print.tcfo("拿茶葉");//線程睡眠一段時(shí)間,代表清洗中Thread.sleep(SLEEP_GAP);Print.tcfo("洗完了");} catch (InterruptedException e) {Print.tcfo(" 發(fā)生異常被中斷.");}Print.tcfo(" 運(yùn)行結(jié)束.");}}public static void main(String args[]) {Thread hThread = new HotWarterThread();Thread wThread = new WashThread();hThread.start();wThread.start();try {// 合并燒水-線程hThread.join();// 合并清洗-線程wThread.join();Thread.currentThread().setName("主線程");Print.tcfo("泡茶喝");} catch (InterruptedException e) {Print.tcfo(getCurThreadName() + "發(fā)生異常被中斷.");}Print.tcfo(getCurThreadName() + " 運(yùn)行結(jié)束.");} }演示程序中有三條線程:
一條是主線程main;
一條是燒水線程“hThread”;
一條是清洗線程“wThread”;
main線程,調(diào)用了hThread.join()實(shí)例方法,合并燒水線程,也調(diào)用了 wThread.join()實(shí)例方法,合并清洗線程。
另外說(shuō)明一下:hThread是這里的燒水線程實(shí)例的句柄,"** 燒水-Thread"是燒水線程實(shí)例的線程名稱,兩者不能混淆。
1.3.3. join方法的詳細(xì)介紹
join的方法應(yīng)用場(chǎng)景:異步阻塞場(chǎng)景。
具體來(lái)說(shuō):甲方(發(fā)起線程)的調(diào)用乙方(被發(fā)起線程)的join方法,等待乙方執(zhí)行完成;如果乙方?jīng)]有完成,甲方阻塞。
join是Thread類的一個(gè)實(shí)例方法,使用的方式大致如下:
// 合并燒水-線程 hThread.join(); // 合并清洗-線程 wThread.join();實(shí)際上,join方法是有三個(gè)重載版本:
實(shí)際上,join方法是有三個(gè)重載版本:
(1)void join(): 等待乙方線程執(zhí)行結(jié)束,甲方線程重啟執(zhí)行。
(2)void join(long millis): 等待乙方線程執(zhí)行一段時(shí)間,最長(zhǎng)等待時(shí)間為 millis 毫秒。超過millis 毫秒后,不論乙方是否結(jié)束,甲方線程重啟執(zhí)行。
(3)void join(long millis, int nanos): 等待乙方線程執(zhí)行一段時(shí)間,最長(zhǎng)等待時(shí)間為 millis 毫秒,加nanos 納秒。超過時(shí)間后,不論乙方是否結(jié)束,甲方線程重啟執(zhí)行。
強(qiáng)調(diào)一下容易混淆的幾點(diǎn):
(1)join方法是實(shí)例方法,需要使用線程句柄去調(diào)用,如thread.join();
(2)執(zhí)行到j(luò)oin代碼的時(shí)候,不是thread所指向的線程阻塞,而是當(dāng)前線程阻塞;
(3)thread線程代表的是被合并線程(乙方),當(dāng)前線程阻塞線程(甲方)。當(dāng)前線程讓出CPU,進(jìn)入等待狀態(tài)。
(4)只有等到thread線程執(zhí)行完成,或者超時(shí),當(dāng)前線程才能啟動(dòng)執(zhí)行。
join合并有一個(gè)很大的問題,就是沒有返回值。
如果燒水線程的水有問題,或者燒水壺壞了,mian線程是沒有辦法知道的。
如果清洗線程的茶杯有問題,清洗不來(lái)了,mian線程是沒有辦法知道的。
形象的說(shuō),join線程就是一個(gè)悶葫蘆。
還是異步阻塞,但是需要獲得結(jié)果,怎么辦呢?
可以使用java 的FutureTask 系列類。
1.4.?異步阻塞重武器——FutureTask系列類
FutureTask相關(guān)的類型,處于java.util.concurrent包中,不止一個(gè)類,是一個(gè)系列。同時(shí),這也是Java語(yǔ)言在1.5 版本之后提供了一種的新的多線程使用方法。
1.4.1. Callable接口
我們知道,異步線程的一個(gè)重要接口是Runnable,這里執(zhí)行異步線程的業(yè)務(wù)代碼。但是,Runnable的run方法有一個(gè)問題,它是沒有返回的。
因此,Runnable不能用在需要有異步返回值的異步場(chǎng)景。
Java語(yǔ)言在1.5 版本之后重新定義了一個(gè)新的、類似Runnable的接口,Callable接口,將run方法改為了call方法,并且?guī)狭朔祷刂怠?/p>
Callable的代碼如下:
package java.util.concurrent; @FunctionalInterface public interface Callable<V> {V call() throws Exception; }Callable接口位于java.util.concurrent包中,Callable接口是一個(gè)泛型接口。也是一個(gè)“函數(shù)式接口”。唯一的抽象方法call有返回值,返回值類型為泛型形參類型。call抽象方法還有一個(gè)Exception的異常聲明,容許方法的實(shí)現(xiàn)版本內(nèi)部的異常不經(jīng)過捕獲。
Callable接口類似于Runnable。不同的是,Runnable的唯一抽象方法run沒有返回值,也沒有強(qiáng)制審查異常的異常聲明。比較而言,Callable接口的功能更強(qiáng)大一些。
有一個(gè)異想天開的問題:
作為新版的Callable接口實(shí)例,能否作為Thread線程實(shí)例的target來(lái)使用呢?
答案是不能。
Callable接口與Runnable接口之間沒有任何的繼承關(guān)系,而且二者唯一方法在的名字上也不同。Callable接口實(shí)例沒有辦法作為Thread線程實(shí)例的target來(lái)使用。
我們知道,java里邊的線程類型,就是Thread。Callable需要異步執(zhí)行,就需要和Thread建立聯(lián)系。java提供了一個(gè)搭橋的角色——FutureTask類。
1.4.2. FutureTask類初探
顧名思義,這個(gè)是一個(gè)未來(lái)執(zhí)行的任務(wù),就相當(dāng)于新線程所執(zhí)行的操作。
FutureTask 類也位于 java.util.concurrent包。
FutureTask類 構(gòu)造函數(shù)的參數(shù)為 Callable,并且間接的繼承了Runnable接口。其構(gòu)造器代碼如下:
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }到了這里,FutureTask類的作用就大致明白了。
如果還不明白,看一段實(shí)例代碼:
Callable<Boolean> hJob = new HotWarterJob(); FutureTask<Boolean> hTask = new FutureTask<Boolean>(hJob); Thread hThread = new Thread(hTask, "** 燒水-Thread");FutureTask就像一座位于Callable與Thread之間的橋。FutureTask 封裝一個(gè)Callable,然后自身又作為Thread線程的target。
FutureTask還有一個(gè)十分重要的貢獻(xiàn)。
Thread線程執(zhí)行過程中,異步線程的代碼邏輯在Callable的call方法中,而call方法返回的結(jié)果,則需要通過 FutureTask 去獲取。
好了,這下就應(yīng)該基本清楚了。
總結(jié)一下FutureTask這個(gè)媒婆的作用:
(1)負(fù)責(zé)牽線
(2)通過媒婆取得結(jié)果
為了完成這個(gè)兩個(gè)偉大的使命,FutureTask有個(gè)相對(duì)比較復(fù)雜的繼承關(guān)系,具體如下圖:
首先,FutureTask實(shí)現(xiàn)了一個(gè)接口——RunnableFuture接口,而該RunnableFuture接口繼承了Runnable接口和Future接口。
Runnable接口我們很熟悉,就是那個(gè)java 線程Runnable,代表異步線程的代碼邏輯。
Future接口又是啥呢?
提前劇透下,這個(gè)接口,就是用來(lái)獲取異步線程結(jié)果的。
Future接口和Runnable接口一樣,都是牛氣沖天的接口。 而FutureTask 間接的實(shí)現(xiàn)這個(gè)兩大接口。
正因?yàn)镕utureTask能夠有兩個(gè)很牛逼的爹,所以自己家才很牛逼。
FutureTask 既能當(dāng)做一個(gè)Runnable 作為 target ,直接被Thread執(zhí)行;也能作為Future用來(lái)去取得Callable的計(jì)算結(jié)果。
1.4.3. Future接口
Future接口這個(gè)不是一個(gè)復(fù)雜的接口,梳理一下,主要提供了3大功能:
(1)獲取并發(fā)的任務(wù)完成后的執(zhí)行結(jié)果。
(2)能夠取消并發(fā)執(zhí)行中的任務(wù);
(3)判斷并發(fā)任務(wù)是否執(zhí)行完成;
當(dāng)然,第一點(diǎn)是最為常用的。也是這個(gè)接口的最初使命。
Future接口的代碼如下:
package java.util.concurrent; public interface Future<V> { ? boolean cancel(boolean mayInterruptRunning); ? boolean isCancelled(); ? boolean isDone(); ? V get() throws InterruptedException, ExecutionException; ? V get(long timeout,TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }對(duì)Future接口的方法,詳細(xì)說(shuō)明如下:
V get() :獲取并發(fā)任務(wù)執(zhí)行的結(jié)果。注意,這個(gè)方法是阻塞性的。如果并發(fā)任務(wù)沒有執(zhí)行完成,調(diào)用此方法的線程會(huì)一直阻塞,直到并發(fā)任務(wù)執(zhí)行完成。
V get(Long timeout , TimeUnit unit) :獲取并發(fā)任務(wù)執(zhí)行的結(jié)果。也是阻塞性的,但是會(huì)有阻塞的時(shí)間限制,如果阻塞時(shí)間超過設(shè)定的timeout時(shí)間,該方法將拋出異常。
boolean isDone():獲取并發(fā)任務(wù)的執(zhí)行狀態(tài)。如果任務(wù)執(zhí)行結(jié)束,返回true。
boolean isCancelled():獲取并發(fā)任務(wù)的取消狀態(tài)。如果任務(wù)完成前被取消,則返回true。
boolean cancel(boolean mayInterruptRunning):取消并發(fā)任務(wù)的執(zhí)行。
1.4.4. FutureTask再次深入
說(shuō)完了FutureTask的兩個(gè)爹,再來(lái)到FutureTask自身。
在FutureTask內(nèi)部,又有哪些成員和方法,具體的執(zhí)行并發(fā)任務(wù)、異步獲取任務(wù)結(jié)果的呢?
首先,FutureTask內(nèi)部有一個(gè) Callable類型的成員:
private Callable callable;
這個(gè)callable實(shí)例屬性,是構(gòu)造器傳進(jìn)來(lái)的。用來(lái)保存并發(fā)執(zhí)行的 Callable類型的任務(wù)。callable實(shí)例屬性,是構(gòu)造器強(qiáng)制性的,必須要在FutureTask實(shí)例構(gòu)造的時(shí)候進(jìn)行初始化。
其次,FutureTask內(nèi)部有一個(gè)run方法。
這個(gè)run方法,是Runnable接口在FutureTask內(nèi)部的實(shí)現(xiàn)。在這個(gè)run方法其中,會(huì)執(zhí)行到callable成員的call方法。執(zhí)行完成后,結(jié)果如何提供出去呢?這就是到了最后一點(diǎn)。
最后,FutureTask內(nèi)部有另一個(gè) Object 類型的重要成員——outcome實(shí)例屬性:
private Object outcome;掐指一算,就知道這個(gè)outcome屬性,是用來(lái)保存callable成員call方法的執(zhí)行結(jié)果。FutureTask類run方法執(zhí)行完成callable成員的call方法后,會(huì)將結(jié)果保存在outcome實(shí)例屬性,供FutureTask類的get實(shí)例方法獲取。
好了,重要將這個(gè)媒婆介紹完了。
如果還沒有清楚,不要緊,看一個(gè)實(shí)例就一目了然了。
1.4.5. 喝茶實(shí)例演進(jìn)之——獲取異步結(jié)果
回顧一下,前面的join悶葫蘆合并阻塞有一個(gè)很大的問題,就是沒有返回值。
如果燒水線程的水有問題,或者燒水壺壞了,mian線程是沒有辦法知道的。
如果清洗線程的茶杯有問題,清洗不來(lái)了,mian線程是沒有辦法知道的。
為了演示結(jié)果,給主類增加兩個(gè)成員:
static boolean warterOk = false; static boolean cupOk =false;代表燒水成功和清洗成功。初始值都為false。
燒水線程、清洗線程執(zhí)行完后,都需要返回結(jié)果。 主線程獲取后,保存在上面的兩個(gè)主類成員中。
廢話不多說(shuō),看代碼:
package com.crazymakercircle.coccurent; import com.crazymakercircle.util.Print; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; /*** Created by 尼恩 at 瘋狂創(chuàng)客圈*/ public class JavaFutureDemo {public static final int SLEEP_GAP = 500;public static String getCurThreadName(){return Thread.currentThread().getName();}static class HotWarterJob implements Callable<Boolean> //①{@Overridepublic Boolean call() throws Exception //②{try{Print.tcfo("洗好水壺");Print.tcfo("灌上涼水");Print.tcfo("放在火上");//線程睡眠一段時(shí)間,代表燒水中Thread.sleep(SLEEP_GAP);Print.tcfo("水開了");} catch (InterruptedException e){Print.tcfo(" 發(fā)生異常被中斷.");return false;}Print.tcfo(" 運(yùn)行結(jié)束.");return true;}}static class WashJob implements Callable<Boolean>{@Overridepublic Boolean call() throws Exception{try{Print.tcfo("洗茶壺");Print.tcfo("洗茶杯");Print.tcfo("拿茶葉");//線程睡眠一段時(shí)間,代表清洗中Thread.sleep(SLEEP_GAP);Print.tcfo("洗完了");} catch (InterruptedException e){Print.tcfo(" 清洗工作 發(fā)生異常被中斷.");return false;}Print.tcfo(" 清洗工作 運(yùn)行結(jié)束.");return true;}}static boolean warterOk = false;static boolean cupOk =false;public static void drinkTea(){if (warterOk && cupOk){Print.tcfo("泡茶喝");}else if (!warterOk){Print.tcfo("燒水失敗,沒有茶喝了");}else if (!cupOk){Print.tcfo("杯子洗不了,沒有茶喝了");}}public static void main(String args[]){Callable<Boolean> hJob = new HotWarterJob();//③FutureTask<Boolean> hTask =new FutureTask<Boolean>(hJob);//④Thread hThread = new Thread(hTask, "** 燒水-Thread");//⑤Callable<Boolean> wJob = new WashJob();//③FutureTask<Boolean> wTask =new FutureTask<Boolean>(wJob);//④Thread wThread = new Thread(wTask, "$$ 清洗-Thread");//⑤hThread.start();wThread.start();Thread.currentThread().setName("主線程");try{warterOk = hTask.get();cupOk = wTask.get(); // hThread.join(); // wThread.join();drinkTea();} catch (InterruptedException e){Print.tcfo(getCurThreadName() + "發(fā)生異常被中斷.");} catch (ExecutionException e){e.printStackTrace();}Print.tcfo(getCurThreadName() + " 運(yùn)行結(jié)束.");} }1.4.6. FutureTask使用流程
借助上面的喝茶實(shí)例代碼,說(shuō)明一下通過FutureTask獲取異步結(jié)果的流程步驟:
(1)異步代碼邏輯需要繼承Callable,通過call方法返回具體的值
static class WashJob implements Callable<Boolean> {@Overridepublic Boolean call() throws Exception{ //..業(yè)務(wù)代碼,并且有返回值 }(3)從異步邏輯到異步線程,需要媒婆類FutureTask搭橋
Callable<Boolean> hJob = new HotWarterJob();//異步邏輯 FutureTask<Boolean> hTask = new FutureTask<Boolean>(hJob);//媒婆實(shí)例 Thread hThread = new Thread(hTask, "** 燒水-Thread");//異步線程FutureTask和Callable都是泛型類,泛型參數(shù)表示返回結(jié)果的類型。所以,在使用的時(shí)候,倆個(gè)類型的泛型參數(shù)一定需要一致的。
FutureTask和Callable都是泛型類,泛型參數(shù)表示返回結(jié)果的類型。所以,在使用的時(shí)候,倆個(gè)類型的泛型參數(shù)一定需要一致的。
(3)取得異步線程的執(zhí)行結(jié)果,也需要FutureTask 媒婆實(shí)例做下二傳
warterOk = hTask.get();1.5.?Guava 的異步回調(diào)
在非常著名的google 提供的擴(kuò)展包 Guava中,提供了一種異步回調(diào)的解決方案。
為了實(shí)現(xiàn)異步回調(diào),Guava 對(duì)Java的Future 異步模式進(jìn)行能力導(dǎo)入:
(1)導(dǎo)入了一個(gè)新的接口 FutureCallback,代表回調(diào)執(zhí)行的業(yè)務(wù)邏輯
(2)對(duì)Java并發(fā)包中的 Future 接口進(jìn)行了擴(kuò)展,將回調(diào)邏輯作為監(jiān)聽器綁定到異步線程
1.5.1. 能力導(dǎo)入 —— FutureCallback
FutureCallback 是一個(gè)新增的接口,用來(lái)填寫回調(diào)邏輯。這個(gè)接口,是在實(shí)際開發(fā)中編程使用到的。回調(diào)的代碼,編寫在它的實(shí)現(xiàn)類中。
FutureCallback擁有兩個(gè)回調(diào)方法:
(1)onSuccess ,在異步線程執(zhí)行成功回調(diào)
(2)onFailure,在異步線程拋出異常時(shí)回調(diào)
FutureCallback的源碼如下:
public interface FutureCallback<V> {void onSuccess(@Nullable V var1);void onFailure(Throwable var1); }1.5.2. 能力擴(kuò)展 —— ListenableFuture
如果將回調(diào)方法,綁定到異步線程去呢?
Guava中,有一個(gè)非常關(guān)鍵的角色,ListenableFuture。看名稱,就能對(duì)應(yīng)出它與Java 中的原生接口的親戚關(guān)系。
如果沒有猜錯(cuò),這個(gè)接口是 Guava 對(duì)java 的Future接口的擴(kuò)展。
來(lái)看看 ListenableFuture接口的源碼,如下:
package com.google.common.util.concurrent; import java.util.concurrent.Executor; import java.util.concurrent.Future; public interface ListenableFuture<V> extends Future<V> {void addListener(Runnable var1, Executor var2); }前面講到,通過Java的Future接口,可以阻塞取得異步的結(jié)果。在這個(gè)基礎(chǔ)上,ListenableFuture增加了一個(gè)方法 —— addListener 。
這個(gè)方法的作用,就是將前一小節(jié)的FutureCallback 回調(diào)邏輯,綁定到異步線程上。 可以是,addListener 不直接在實(shí)際編程中使用。這個(gè)方法只在Guava內(nèi)部使用,如果對(duì)它感興趣,可以查看Guava源碼。
既然addListener 方法不能直接使用,那么,在實(shí)際編程中,如何將 FutureCallback 回調(diào)邏輯綁定到異步線程呢?
不慌,辦法總是有的。
需要用到Guava的Futures 工具類。這個(gè)類有一個(gè)addCallback 靜態(tài)方法,將ListenableFuture 的實(shí)例和FutureCallback 的回調(diào)實(shí)例,進(jìn)行綁定。
綁定的示意代碼如下:
Futures.addCallback( hFuture , new FutureCallback<Boolean>() {public void onSuccess(Boolean r){ //成功時(shí)候的回調(diào)邏輯}public void onFailure(Throwable t){//異常時(shí)候的回調(diào)邏輯 ? } });1.5.3. ListenableFuture 實(shí)例從何而來(lái)
從上文已知,原生java的Future接口的實(shí)例,一種方法是——直接構(gòu)建媒婆類FutureTask的實(shí)例,就是Future接口的實(shí)例。
當(dāng)然,還有第二種方法,就是通過線程池獲取Future接口的實(shí)例。具體的做法是向Java線程池提交異步任務(wù),包括Runnable或者Callable實(shí)例。
方法如下:
Future<Boolean> hTask = pool.submit(hJob); Future<Boolean> wTask = pool.submit(wJob);注意,pool 是一個(gè)Java 線程池。
如果要獲取Guava的ListenableFuture 實(shí)例,主要是通過類似上面的第二種方式——向線程池提交任務(wù)的異步任務(wù)的方式獲取。不過,用到的線程池,是Guava的線程池,不是Java的線程池。
Guava線程池,而是對(duì)Java線程池的一種裝飾。
兩種線程池的創(chuàng)建代碼,具體如下:
//java 線程池 ExecutorService jPool =Executors.*newFixedThreadPool*(10);//guava 線程池 ListeningExecutorService gPool =MoreExecutors.*listeningDecorator*(jPool);有了Guava的線程池之后,就可以通過提交任務(wù),來(lái)獲取ListenableFuture 實(shí)例了。代碼如下 :
ListenableFuture<Boolean> hFuture = gPool.submit(hJob);關(guān)于Gava的線程池,請(qǐng)關(guān)注【瘋狂創(chuàng)客圈】的線程池的博客文章。
1.5.4. Guava異步回調(diào)的流程
總結(jié)一下,Guava異步回調(diào)的流程如下:
第一步:創(chuàng)建Java的 Callable的異步任務(wù)實(shí)例。實(shí)例如下:
Callable<Boolean> hJob = new HotWarterJob();//異步任務(wù)Callable<Boolean> wJob = new WashJob();//異步任務(wù)異步任務(wù)也可以是Runnable類型。
第二步: 獲取Guava線程池
//java 線程池 ExecutorService jPool =Executors.*newFixedThreadPool*(10); //guava 線程池 ListeningExecutorService gPool =MoreExecutors.*listeningDecorator*(jPool);?第三步: 提交異步任務(wù)到Guava線程池,獲取ListenableFuture 實(shí)例
ListenableFuture<Boolean> hFuture = gPool.submit(hJob);第四步:創(chuàng)建回調(diào)的 FutureCallback 實(shí)例,通過Futures.addCallback,將回調(diào)邏輯綁定到ListenableFuture 實(shí)例。
Futures.*addCallback*( hFuture , new FutureCallback<Boolean>() {public void onSuccess(Boolean r){ //成功時(shí)候的回調(diào)邏輯}public void onFailure(Throwable t){//異常時(shí)候的回調(diào)邏輯 ? } });?完成以上四步,當(dāng)異步邏輯執(zhí)行完成后,就會(huì)回調(diào)FutureCallback 實(shí)例中的回調(diào)代碼。
1.5.5. 喝茶實(shí)例 —— 異步回調(diào)演進(jìn)
已經(jīng)對(duì)喝茶實(shí)例的代碼非常熟悉下,下面是Guava的異步回調(diào)的演進(jìn)版本,代碼如下:
package com.crazymakercircle.coccurent; import com.crazymakercircle.util.Print; import com.google.common.util.concurrent.*; import java.util.concurrent.*; /*** Created by 尼恩 at 瘋狂創(chuàng)客圈*/ public class GuavaFutureDemo {public static final int SLEEP_GAP = 500;public static String getCurThreadName(){return Thread.currentThread().getName();}static class HotWarterJob implements Callable<Boolean> //①{@Overridepublic Boolean call() throws Exception //②{try{Print.tcfo("洗好水壺");Print.tcfo("灌上涼水");Print.tcfo("放在火上");//線程睡眠一段時(shí)間,代表燒水中Thread.sleep(SLEEP_GAP);Print.tcfo("水開了");} catch (InterruptedException e){Print.tcfo(" 發(fā)生異常被中斷.");return false;}Print.tcfo(" 運(yùn)行結(jié)束.");return true;}}static class WashJob implements Callable<Boolean>{@Overridepublic Boolean call() throws Exception{try{Print.tcfo("洗茶壺");Print.tcfo("洗茶杯");Print.tcfo("拿茶葉");//線程睡眠一段時(shí)間,代表清洗中Thread.sleep(SLEEP_GAP);Print.tcfo("洗完了");} catch (InterruptedException e){Print.tcfo(" 清洗工作 發(fā)生異常被中斷.");return false;}Print.tcfo(" 清洗工作 運(yùn)行結(jié)束.");return true;}}static boolean warterOk = false;static boolean cupOk = false;public synchronized static void drinkTea(){if (warterOk && cupOk){Print.tcfo("泡茶喝");}else if (!warterOk){Print.tcfo("燒水失敗,沒有茶喝了");}else if (!cupOk){Print.tcfo("杯子洗不了,沒有茶喝了");}}public static void main(String args[]){Thread.currentThread().setName("主線程");Callable<Boolean> hJob = new HotWarterJob();//③Callable<Boolean> wJob = new WashJob();//③//java 線程池ExecutorService jPool =Executors.newFixedThreadPool(10);//guava 線程池ListeningExecutorService gPool =MoreExecutors.listeningDecorator(jPool);ListenableFuture<Boolean> hFuture = gPool.submit(hJob);Futures.addCallback(hFuture, new FutureCallback<Boolean>(){public void onSuccess(Boolean r){if (r){warterOk = true;drinkTea();}else{Print.tcfo("燒水失敗,沒有茶喝了");}}public void onFailure(Throwable t){Print.tcfo("燒水失敗,沒有茶喝了");}});ListenableFuture<Boolean> wFuture = gPool.submit(wJob);Futures.addCallback(wFuture, new FutureCallback<Boolean>(){public void onSuccess(Boolean r){if (r){cupOk = true;drinkTea();}else{Print.tcfo("清洗失敗,沒有茶喝了");}}public void onFailure(Throwable t){Print.tcfo("杯子洗不了,沒有茶喝了");}});try{Print.tcfo("讀書中......");Thread.sleep(100000);} catch (InterruptedException e){Print.tcfo(getCurThreadName() + "發(fā)生異常被中斷.");}Print.tcfo(getCurThreadName() + " 運(yùn)行結(jié)束.");gPool.shutdown();} }總結(jié)
以上是生活随笔為你收集整理的【JUC系列】Future异步回调模式的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【JUC系列】Java的锁机制
- 下一篇: 【计算机IO系列零】应用软件部分