Java 8 CompletableFuture 教程
Java 8 有大量的新特性和增強(qiáng)如 Lambda 表達(dá)式,Streams,CompletableFuture等。在本篇文章中我將詳細(xì)解釋清楚CompletableFuture以及它所有方法的使用。
什么是CompletableFuture?
在Java中CompletableFuture用于異步編程,異步編程是編寫非阻塞的代碼,運(yùn)行的任務(wù)在一個(gè)單獨(dú)的線程,與主線程隔離,并且會(huì)通知主線程它的進(jìn)度,成功或者失敗。
在這種方式中,主線程不會(huì)被阻塞,不需要一直等到子線程完成。主線程可以并行的執(zhí)行其他任務(wù)。
使用這種并行方式,可以極大的提高程序的性能。
Future vs CompletableFuture
CompletableFuture 是 Future API的擴(kuò)展。
Future 被用于作為一個(gè)異步計(jì)算結(jié)果的引用。提供一個(gè) isDone() 方法來檢查計(jì)算任務(wù)是否完成。當(dāng)任務(wù)完成時(shí),get() 方法用來接收計(jì)算任務(wù)的結(jié)果。
從 Callbale和 Future 教程可以學(xué)習(xí)更多關(guān)于 Future 知識(shí).
Future API 是非常好的 Java 異步編程進(jìn)階,但是它缺乏一些非常重要和有用的特性。
Future 的局限性
當(dāng)你寫了一個(gè)函數(shù),用于通過一個(gè)遠(yuǎn)程API獲取一個(gè)電子商務(wù)產(chǎn)品最新價(jià)格。因?yàn)檫@個(gè) API 太耗時(shí),你把它允許在一個(gè)獨(dú)立的線程中,并且從你的函數(shù)中返回一個(gè) Future。現(xiàn)在假設(shè)這個(gè)API服務(wù)宕機(jī)了,這時(shí)你想通過該產(chǎn)品的最新緩存價(jià)格手工完成這個(gè)Future 。你會(huì)發(fā)現(xiàn)無法這樣做。
Future 不會(huì)通知你它已經(jīng)完成了,它提供了一個(gè)阻塞的 get() 方法通知你結(jié)果。你無法給 Future 植入一個(gè)回調(diào)函數(shù),當(dāng) Future 結(jié)果可用的時(shí)候,用該回調(diào)函數(shù)自動(dòng)的調(diào)用 Future 的結(jié)果。
有時(shí)候你需要執(zhí)行一個(gè)長時(shí)間運(yùn)行的計(jì)算任務(wù),并且當(dāng)計(jì)算任務(wù)完成的時(shí)候,你需要把它的計(jì)算結(jié)果發(fā)送給另外一個(gè)長時(shí)間運(yùn)行的計(jì)算任務(wù)等等。你會(huì)發(fā)現(xiàn)你無法使用 Future 創(chuàng)建這樣的一個(gè)工作流。
假設(shè)你有10個(gè)不同的Future,你想并行的運(yùn)行,然后在它們運(yùn)行未完成后運(yùn)行一些函數(shù)。你會(huì)發(fā)現(xiàn)你也無法使用 Future 這樣做。
Future API 沒有任務(wù)的異常處理結(jié)構(gòu)居然有如此多的限制,幸好我們有CompletableFuture,你可以使用 CompletableFuture 達(dá)到以上所有目的。
CompletableFuture 實(shí)現(xiàn)了 Future 和 CompletionStage接口,并且提供了許多關(guān)于創(chuàng)建,鏈?zhǔn)秸{(diào)用和組合多個(gè) Future 的便利方法集,而且有廣泛的異常處理支持。
創(chuàng)建 CompletableFuture
1. 簡單的例子
可以使用如下無參構(gòu)造函數(shù)簡單的創(chuàng)建 CompletableFuture:
這是一個(gè)最簡單的 CompletableFuture,想獲取CompletableFuture 的結(jié)果可以使用 CompletableFuture.get() 方法:
String result = completableFuture.get()get() 方法會(huì)一直阻塞直到 Future 完成。因此,以上的調(diào)用將被永遠(yuǎn)阻塞,因?yàn)樵揊uture一直不會(huì)完成。
你可以使用 CompletableFuture.complete() 手工的完成一個(gè) Future:
completableFuture.complete("Future's Result")所有等待這個(gè) Future 的客戶端都將得到一個(gè)指定的結(jié)果,并且 completableFuture.complete() 之后的調(diào)用將被忽略。
2. 使用 runAsync() 運(yùn)行異步計(jì)算
如果你想異步的運(yùn)行一個(gè)后臺(tái)任務(wù)并且不想改任務(wù)返回任務(wù)東西,這時(shí)候可以使用 CompletableFuture.runAsync()方法,它持有一個(gè)Runnable 對(duì)象,并返回 CompletableFuture<Void>。
你也可以以 lambda 表達(dá)式的形式傳入 Runnable 對(duì)象:
// Using Lambda Expression CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {// Simulate a long-running Job try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new IllegalStateException(e);}System.out.println("I'll run in a separate thread than the main thread."); });在本文中,我使用lambda表達(dá)式會(huì)比較頻繁,如果以前你沒有使用過,建議你也多使用lambda 表達(dá)式。
3. 使用 supplyAsync() 運(yùn)行一個(gè)異步任務(wù)并且返回結(jié)果
當(dāng)任務(wù)不需要返回任何東西的時(shí)候, CompletableFuture.runAsync() 非常有用。但是如果你的后臺(tái)任務(wù)需要返回一些結(jié)果應(yīng)該要怎么樣?
CompletableFuture.supplyAsync() 就是你的選擇。它持有supplier<T> 并且返回CompletableFuture<T>,T 是通過調(diào)用 傳入的supplier取得的值的類型。
// Run a task specified by a Supplier object asynchronously CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new IllegalStateException(e);}return "Result of the asynchronous computation";} });// Block and get the result of the Future String result = future.get(); System.out.println(result);Supplier<T> 是一個(gè)簡單的函數(shù)式接口,表示supplier的結(jié)果。它有一個(gè)get()方法,該方法可以寫入你的后臺(tái)任務(wù)中,并且返回結(jié)果。
你可以使用lambda表達(dá)式使得上面的示例更加簡明:
// Using Lambda Expression CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new IllegalStateException(e);}return "Result of the asynchronous computation"; }); 一個(gè)關(guān)于Executor 和Thread Pool筆記你可能想知道,我們知道runAsync() 和supplyAsync()方法在單獨(dú)的線程中執(zhí)行他們的任務(wù)。但是我們不會(huì)永遠(yuǎn)只創(chuàng)建一個(gè)線程。
CompletableFuture可以從全局的 ForkJoinPool.commonPool()獲得一個(gè)線程中執(zhí)行這些任務(wù)。
但是你也可以創(chuàng)建一個(gè)線程池并傳給runAsync() 和supplyAsync()方法來讓他們從線程池中獲取一個(gè)線程執(zhí)行它們的任務(wù)。
CompletableFuture API 的所有方法都有兩個(gè)變體-一個(gè)接受Executor作為參數(shù),另一個(gè)不這樣: // Variations of runAsync() and supplyAsync() methods static CompletableFuture<Void> runAsync(Runnable runnable) static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
創(chuàng)建一個(gè)線程池,并傳遞給其中一個(gè)方法:
Executor executor = Executors.newFixedThreadPool(10); CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new IllegalStateException(e);}return "Result of the asynchronous computation"; }, executor);在 CompletableFuture 轉(zhuǎn)換和運(yùn)行
CompletableFuture.get()方法是阻塞的。它會(huì)一直等到Future完成并且在完成后返回結(jié)果。
但是,這是我們想要的嗎?對(duì)于構(gòu)建異步系統(tǒng),我們應(yīng)該附上一個(gè)回調(diào)給CompletableFuture,當(dāng)Future完成的時(shí)候,自動(dòng)的獲取結(jié)果。
如果我們不想等待結(jié)果返回,我們可以把需要等待Future完成執(zhí)行的邏輯寫入到回調(diào)函數(shù)中。
可以使用 thenApply(), thenAccept() 和thenRun()方法附上一個(gè)回調(diào)給CompletableFuture。
1. thenApply()
可以使用 thenApply() 處理和改變CompletableFuture的結(jié)果。持有一個(gè)Function<R,T>作為參數(shù)。Function<R,T>是一個(gè)簡單的函數(shù)式接口,接受一個(gè)T類型的參數(shù),產(chǎn)出一個(gè)R類型的結(jié)果。
你也可以通過附加一系列的thenApply()在回調(diào)方法 在CompletableFuture寫一個(gè)連續(xù)的轉(zhuǎn)換。這樣的話,結(jié)果中的一個(gè) thenApply方法就會(huì)傳遞給該系列的另外一個(gè) thenApply方法。
CompletableFuture<String> welcomeText = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new IllegalStateException(e);}return "Rajeev"; }).thenApply(name -> {return "Hello " + name; }).thenApply(greeting -> {return greeting + ", Welcome to the CalliCoder Blog"; });System.out.println(welcomeText.get()); // Prints - Hello Rajeev, Welcome to the CalliCoder Blog2. thenAccept() 和 thenRun()
如果你不想從你的回調(diào)函數(shù)中返回任何東西,僅僅想在Future完成后運(yùn)行一些代碼片段,你可以使用thenAccept() 和 thenRun()方法,這些方法經(jīng)常在調(diào)用鏈的最末端的最后一個(gè)回調(diào)函數(shù)中使用。
CompletableFuture.thenAccept() 持有一個(gè)Consumer<T> ,返回一個(gè)CompletableFuture<Void>。它可以訪問CompletableFuture的結(jié)果:
雖然thenAccept()可以訪問CompletableFuture的結(jié)果,但thenRun()不能訪Future的結(jié)果,它持有一個(gè)Runnable返回CompletableFuture<Void>:
// thenRun() example CompletableFuture.supplyAsync(() -> {// Run some computation }).thenRun(() -> {// Computation Finished. }); 異步回調(diào)方法的筆記CompletableFuture提供的所有回調(diào)方法都有兩個(gè)變體:
`// thenApply() variants
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)`
這些異步回調(diào)變體通過在獨(dú)立的線程中執(zhí)行回調(diào)任務(wù)幫助你進(jìn)一步執(zhí)行并行計(jì)算。
以下示例: CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new IllegalStateException(e);}return "Some Result" }).thenApply(result -> {/* Executed in the same thread where the supplyAsync() task is executedor in the main thread If the supplyAsync() task completes immediately (Remove sleep() call to verify)*/return "Processed Result" })
在以上示例中,在thenApply()中的任務(wù)和在supplyAsync()中的任務(wù)執(zhí)行在相同的線程中。任何supplyAsync()立即執(zhí)行完成,那就是執(zhí)行在主線程中(嘗試刪除sleep測試下)。
為了控制執(zhí)行回調(diào)任務(wù)的線程,你可以使用異步回調(diào)。如果你使用thenApplyAsync()回調(diào),將從ForkJoinPool.commonPool()獲取不同的線程執(zhí)行。
此外,如果你傳入一個(gè)Executor到thenApplyAsync()回調(diào)中,,任務(wù)將從Executor線程池獲取一個(gè)線程執(zhí)行。
Executor executor = Executors.newFixedThreadPool(2); CompletableFuture.supplyAsync(() -> {return "Some result" }).thenApplyAsync(result -> {// Executed in a thread obtained from the executorreturn "Processed Result" }, executor);組合兩個(gè)CompletableFuture
1. 使用 thenCompose() 組合兩個(gè)獨(dú)立的future
假設(shè)你想從一個(gè)遠(yuǎn)程API中獲取一個(gè)用戶的詳細(xì)信息,一旦用戶信息可用,你想從另外一個(gè)服務(wù)中獲取他的貸方。
考慮下以下兩個(gè)方法getUserDetail() 和getCreditRating()的實(shí)現(xiàn):
現(xiàn)在讓我們弄明白當(dāng)使用了thenApply()后是否會(huì)達(dá)到我們期望的結(jié)果-
CompletableFuture<CompletableFuture<Double>> result = getUserDetail(userId) .thenApply(user -> getCreditRating(user));在更早的示例中,Supplier函數(shù)傳入thenApply將返回一個(gè)簡單的值,但是在本例中,將返回一個(gè)CompletableFuture。以上示例的最終結(jié)果是一個(gè)嵌套的CompletableFuture。
如果你想獲取最終的結(jié)果給最頂層future,使用 thenCompose()方法代替-
因此,規(guī)則就是-如果你的回調(diào)函數(shù)返回一個(gè)CompletableFuture,但是你想從CompletableFuture鏈中獲取一個(gè)直接合并后的結(jié)果,這時(shí)候你可以使用thenCompose()。
2. 使用thenCombine()組合兩個(gè)獨(dú)立的 future
雖然thenCompose()被用于當(dāng)一個(gè)future依賴另外一個(gè)future的時(shí)候用來組合兩個(gè)future。thenCombine()被用來當(dāng)兩個(gè)獨(dú)立的Future都完成的時(shí)候,用來做一些事情。
當(dāng)兩個(gè)Future都完成的時(shí)候,傳給``thenCombine()的回調(diào)函數(shù)將被調(diào)用。
組合多個(gè)CompletableFuture
我們使用thenCompose() 和 thenCombine()把兩個(gè)CompletableFuture組合在一起。現(xiàn)在如果你想組合任意數(shù)量的CompletableFuture,應(yīng)該怎么做?我們可以使用以下兩個(gè)方法組合任意數(shù)量的CompletableFuture。
static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)1. CompletableFuture.allOf()
CompletableFuture.allOf的使用場景是當(dāng)你一個(gè)列表的獨(dú)立future,并且你想在它們都完成后并行的做一些事情。
假設(shè)你想下載一個(gè)網(wǎng)站的100個(gè)不同的頁面。你可以串行的做這個(gè)操作,但是這非常消耗時(shí)間。因此你想寫一個(gè)函數(shù),傳入一個(gè)頁面鏈接,返回一個(gè)CompletableFuture,異步的下載頁面內(nèi)容。
CompletableFuture<String> downloadWebPage(String pageLink) {return CompletableFuture.supplyAsync(() -> {// Code to download and return the web page's content}); }現(xiàn)在,當(dāng)所有的頁面已經(jīng)下載完畢,你想計(jì)算包含關(guān)鍵字CompletableFuture頁面的數(shù)量。可以使用CompletableFuture.allOf()達(dá)成目的。
List<String> webPageLinks = Arrays.asList(...) // A list of 100 web page links// Download contents of all the web pages asynchronously List<CompletableFuture<String>> pageContentFutures = webPageLinks.stream().map(webPageLink -> downloadWebPage(webPageLink)).collect(Collectors.toList());// Create a combined Future using allOf() CompletableFuture<Void> allFutures = CompletableFuture.allOf(pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()]) );使用CompletableFuture.allOf()的問題是它返回CompletableFuture<Void>。但是我們可以通過寫一些額外的代碼來獲取所有封裝的CompletableFuture結(jié)果。
// When all the Futures are completed, call `future.join()` to get their results and collect the results in a list - CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v -> {return pageContentFutures.stream().map(pageContentFuture -> pageContentFuture.join()).collect(Collectors.toList()); });花一些時(shí)間理解下以上代碼片段。當(dāng)所有future完成的時(shí)候,我們調(diào)用了future.join(),因此我們不會(huì)在任何地方阻塞。
join()方法和get()方法非常類似,這唯一不同的地方是如果最頂層的CompletableFuture完成的時(shí)候發(fā)生了異常,它會(huì)拋出一個(gè)未經(jīng)檢查的異常。
現(xiàn)在讓我們計(jì)算包含關(guān)鍵字頁面的數(shù)量。
// Count the number of web pages having the "CompletableFuture" keyword. CompletableFuture<Long> countFuture = allPageContentsFuture.thenApply(pageContents -> {return pageContents.stream().filter(pageContent -> pageContent.contains("CompletableFuture")).count(); });System.out.println("Number of Web Pages having CompletableFuture keyword - " + countFuture.get());2. CompletableFuture.anyOf()
CompletableFuture.anyOf()和其名字介紹的一樣,當(dāng)任何一個(gè)CompletableFuture完成的時(shí)候【相同的結(jié)果類型】,返回一個(gè)新的CompletableFuture。以下示例:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new IllegalStateException(e);}return "Result of Future 1"; });CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new IllegalStateException(e);}return "Result of Future 2"; });CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {throw new IllegalStateException(e);}return "Result of Future 3"; });CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);System.out.println(anyOfFuture.get()); // Result of Future 2在以上示例中,當(dāng)三個(gè)中的任何一個(gè)CompletableFuture完成, anyOfFuture就會(huì)完成。因?yàn)閒uture2的休眠時(shí)間最少,因此她最先完成,最終的結(jié)果將是future2的結(jié)果。
CompletableFuture.anyOf()傳入一個(gè)Future可變參數(shù),返回CompletableFuture<Object>。CompletableFuture.anyOf()的問題是如果你的CompletableFuture返回的結(jié)果是不同類型的,這時(shí)候你講會(huì)不知道你最終CompletableFuture是什么類型。
CompletableFuture 異常處理
我們探尋了怎樣創(chuàng)建CompletableFuture,轉(zhuǎn)換它們,并組合多個(gè)CompletableFuture。現(xiàn)在讓我們弄明白當(dāng)發(fā)生錯(cuò)誤的時(shí)候我們應(yīng)該怎么做。
首先讓我們明白在一個(gè)回調(diào)鏈中錯(cuò)誤是怎么傳遞的。思考下以下回調(diào)鏈:
CompletableFuture.supplyAsync(() -> {// Code which might throw an exceptionreturn "Some result"; }).thenApply(result -> {return "processed result"; }).thenApply(result -> {return "result after further processing"; }).thenAccept(result -> {// do something with the final result });如果在原始的supplyAsync()任務(wù)中發(fā)生一個(gè)錯(cuò)誤,這時(shí)候沒有任何thenApply會(huì)被調(diào)用并且future將以一個(gè)異常結(jié)束。如果在第一個(gè)thenApply發(fā)生錯(cuò)誤,這時(shí)候第二個(gè)和第三個(gè)將不會(huì)被調(diào)用,同樣的,future將以異常結(jié)束。
1. 使用 exceptionally() 回調(diào)處理異常
exceptionally()回調(diào)給你一個(gè)從原始Future中生成的錯(cuò)誤恢復(fù)的機(jī)會(huì)。你可以在這里記錄這個(gè)異常并返回一個(gè)默認(rèn)值。
2. 使用 handle() 方法處理異常
API提供了一個(gè)更通用的方法 - handle()從異常恢復(fù),無論一個(gè)異常是否發(fā)生它都會(huì)被調(diào)用。
如果異常發(fā)生,res參數(shù)將是 null,否則,ex將是 null。
總結(jié)
以上是生活随笔為你收集整理的Java 8 CompletableFuture 教程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 逻辑判断-if语句/文件目录属性判断/c
- 下一篇: Java集合Stack源码深入解析