通过实例理解 JDK8 的 CompletableFuture
轉(zhuǎn)載自?通過(guò)實(shí)例理解 JDK8 的 CompletableFuture
?
前言
Java 5 并發(fā)庫(kù)主要關(guān)注于異步任務(wù)的處理,它采用了這樣一種模式,producer 線程創(chuàng)建任務(wù)并且利用阻塞隊(duì)列將其傳遞給任務(wù)的 consumer。這種模型在 Java 7 和 8 中進(jìn)一步發(fā)展,并且開(kāi)始支持另外一種風(fēng)格的任務(wù)執(zhí)行,那就是將任務(wù)的數(shù)據(jù)集分解為子集,每個(gè)子集都可以由獨(dú)立且同質(zhì)的子任務(wù)來(lái)負(fù)責(zé)處理。
這種風(fēng)格的基礎(chǔ)庫(kù)也就是 fork/join 框架,它允許程序員規(guī)定數(shù)據(jù)集該如何進(jìn)行分割,并且支持將子任務(wù)提交到默認(rèn)的標(biāo)準(zhǔn)線程池中,也就是"通用的"ForkJoinPool。Java 8 中,fork/join 并行功能借助并行流的機(jī)制變得更加具有可用性。但是,不是所有的問(wèn)題都適合這種風(fēng)格的并行處理:所處理的元素必須是獨(dú)立的,數(shù)據(jù)集要足夠大,并且在并行加速方面,每個(gè)元素的處理成本要足夠高,這樣才能補(bǔ)償建立 fork/join 框架所消耗的成本。CompletableFuture 類則是 Java 8 在并行流方面的創(chuàng)新。
準(zhǔn)備知識(shí)
異步計(jì)算
所謂異步調(diào)用其實(shí)就是實(shí)現(xiàn)一個(gè)可無(wú)需等待被調(diào)用函數(shù)的返回值而讓操作繼續(xù)運(yùn)行的方法。在 Java 語(yǔ)言中,簡(jiǎn)單的講就是另啟一個(gè)線程來(lái)完成調(diào)用中的部分計(jì)算,使調(diào)用繼續(xù)運(yùn)行或返回,而不需要等待計(jì)算結(jié)果。但調(diào)用者仍需要取線程的計(jì)算結(jié)果。
回調(diào)函數(shù)
回調(diào)函數(shù)比較通用的解釋是,它是一個(gè)通過(guò)函數(shù)指針調(diào)用的函數(shù)。如果你把函數(shù)的指針(地址)作為參數(shù)傳遞給另一個(gè)函數(shù),當(dāng)這個(gè)指針被用為調(diào)用它所指向的函數(shù)時(shí),我們就說(shuō)這是回調(diào)函數(shù)。回調(diào)函數(shù)不是由該函數(shù)的實(shí)現(xiàn)方直接調(diào)用,而是在特定的事件或條件發(fā)生時(shí)由另外一方調(diào)用的,用于對(duì)該事件或條件進(jìn)行響應(yīng)。
回調(diào)函數(shù)的機(jī)制:
(1)定義一個(gè)回調(diào)函數(shù);
(2)提供函數(shù)實(shí)現(xiàn)的一方在初始化時(shí)候,將回調(diào)函數(shù)的函數(shù)指針注冊(cè)給調(diào)用者;
(3)當(dāng)特定的事件或條件發(fā)生的時(shí)候,調(diào)用者使用函數(shù)指針調(diào)用回調(diào)函數(shù)對(duì)事件進(jìn)行處理。
回調(diào)函數(shù)通常與原始調(diào)用者處于同一層次,如圖 1 所示:
圖 1 回調(diào)函數(shù)示例圖
Future 接口介紹
JDK5 新增了 Future 接口,用于描述一個(gè)異步計(jì)算的結(jié)果。雖然 Future 以及相關(guān)使用方法提供了異步執(zhí)行任務(wù)的能力,但是對(duì)于結(jié)果的獲取卻是很不方便,只能通過(guò)阻塞或者輪詢的方式得到任務(wù)的結(jié)果。阻塞的方式顯然和我們的異步編程的初衷相違背,輪詢的方式又會(huì)耗費(fèi)無(wú)謂的 CPU 資源,而且也不能及時(shí)地得到計(jì)算結(jié)果,為什么不能用觀察者設(shè)計(jì)模式呢?即當(dāng)計(jì)算結(jié)果完成及時(shí)通知監(jiān)聽(tīng)者。
有一些開(kāi)源框架實(shí)現(xiàn)了我們的設(shè)想,例如 Netty 的 ChannelFuture 類擴(kuò)展了 Future 接口,通過(guò)提供 addListener 方法實(shí)現(xiàn)支持回調(diào)方式的異步編程。Netty 中所有的 I/O 操作都是異步的,這意味著任何的 I/O 調(diào)用都將立即返回,而不保證這些被請(qǐng)求的 I/O 操作在調(diào)用結(jié)束的時(shí)候已經(jīng)完成。取而代之地,你會(huì)得到一個(gè)返回的 ChannelFuture 實(shí)例,這個(gè)實(shí)例將給你一些關(guān)于 I/O 操作結(jié)果或者狀態(tài)的信息。當(dāng)一個(gè) I/O 操作開(kāi)始的時(shí)候,一個(gè)新的 Future 對(duì)象就會(huì)被創(chuàng)建。在開(kāi)始的時(shí)候,新的 Future 是未完成的狀態(tài)--它既非成功、失敗,也非被取消,因?yàn)?I/O 操作還沒(méi)有結(jié)束。如果 I/O 操作以成功、失敗或者被取消中的任何一種狀態(tài)結(jié)束了,那么這個(gè) Future 將會(huì)被標(biāo)記為已完成,并包含更多詳細(xì)的信息(例如:失敗的原因)。請(qǐng)注意,即使是失敗和被取消的狀態(tài),也是屬于已完成的狀態(tài)。阻塞方式的示例代碼如清單 1 所示。
清單 1 阻塞方式示例代碼
| 1 2 3 4 5 6 | // Start the connection attempt. ChannelFuture Future = bootstrap.connect(new InetSocketAddress(host, port)); // Wait until the connection is closed or the connection attempt fails. Future.getChannel().getCloseFuture().awaitUninterruptibly(); // Shut down thread pools to exit. bootstrap.releaseExternalResources(); |
上面代碼使用的是 awaitUninterruptibly 方法,源代碼如清單 2 所示。
清單 2 awaitUninterruptibly 源代碼
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | publicChannelFutureawaitUninterruptibly() { ????boolean interrupted = false; ????synchronized (this) { ????????//循環(huán)等待到完成 ????????while (!done) { ????????????checkDeadLock(); ????????????waiters++; ????????try { ????????????wait(); ????????} catch (InterruptedException e) { ????????????//不允許中斷 ????????????interrupted = true; ????????} finally { ????????????waiters--; ????????} ????} } ????if (interrupted) { ????Thread.currentThread().interrupt(); } return this; } |
清單 3 異步非阻塞方式示例代碼
| 1 2 3 4 5 6 7 8 9 10 | // Start the connection attempt. ChannelFuture Future = bootstrap.connect(new InetSocketAddress(host, port)); Future.addListener(new ChannelFutureListener(){ ????public void operationComplete(final ChannelFuture Future) ????????throws Exception ????????{?????????? ????} }); // Shut down thread pools to exit. bootstrap.releaseExternalResources(); |
可以明顯的看出,在異步模式下,上面這段代碼沒(méi)有阻塞,在執(zhí)行 connect 操作后直接執(zhí)行到 printTime("異步時(shí)間: "),隨后 connect 完成,Future 的監(jiān)聽(tīng)函數(shù)輸出 connect 操作完成。
非阻塞則是添加監(jiān)聽(tīng)類 ChannelFutureListener,通過(guò)覆蓋 ChannelFutureListener 的 operationComplete 執(zhí)行業(yè)務(wù)邏輯。
清單 4 異步非阻塞方式示例代碼
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | public void addListener(final ChannelFutureListener listener) { ????if (listener == null) { ????throw new NullPointerException("listener"); } ????booleannotifyNow = false; ????synchronized (this) { ????????if (done) { ????????notifyNow = true; ????} else { ????????if (firstListener == null) { ????????//listener 鏈表頭 ????????firstListener = listener; ????} else { ????????if (otherListeners == null) { ????????otherListeners = new ArrayList<ChannelFutureListener>(1); ????????} ????????//添加到 listener 鏈表中,以便操作完成后遍歷操作 ????????otherListeners.add(listener); ????} ????...... ????if (notifyNow) { ????????//通知 listener 進(jìn)行處理 ????????notifyListener(listener); ????????} } |
這部分代碼的邏輯很簡(jiǎn)單,就是注冊(cè)回調(diào)函數(shù),當(dāng)操作完成后自動(dòng)調(diào)用回調(diào)函數(shù),就達(dá)到了異步的效果。
CompletableFuture 類介紹?
Java 8 中, 新增加了一個(gè)包含 50 個(gè)方法左右的類--CompletableFuture,它提供了非常強(qiáng)大的 Future 的擴(kuò)展功能,可以幫助我們簡(jiǎn)化異步編程的復(fù)雜性,并且提供了函數(shù)式編程的能力,可以通過(guò)回調(diào)的方式處理計(jì)算結(jié)果,也提供了轉(zhuǎn)換和組合 CompletableFuture 的方法。
對(duì)于阻塞或者輪詢方式,依然可以通過(guò) CompletableFuture 類的 CompletionStage 和 Future 接口方式支持。
CompletableFuture 類聲明了 CompletionStage 接口,CompletionStage 接口實(shí)際上提供了同步或異步運(yùn)行計(jì)算的舞臺(tái),所以我們可以通過(guò)實(shí)現(xiàn)多個(gè) CompletionStage 命令,并且將這些命令串聯(lián)在一起的方式實(shí)現(xiàn)多個(gè)命令之間的觸發(fā)。
我們可以通過(guò) CompletableFuture.supplyAsync(this::sendMsg); 這么一行代碼創(chuàng)建一個(gè)簡(jiǎn)單的異步計(jì)算。在這行代碼中,supplyAsync 支持異步地執(zhí)行我們指定的方法,這個(gè)例子中的異步執(zhí)行方法是 sendMsg。當(dāng)然,我們也可以使用 Executor 執(zhí)行異步程序,默認(rèn)是 ForkJoinPool.commonPool()。
我們也可以在異步計(jì)算結(jié)束之后指定回調(diào)函數(shù),例如 CompletableFuture.supplyAsync(this::sendMsg) .thenAccept(this::notify);這行代碼中的 thenAccept 被用于增加回調(diào)函數(shù),在我們的示例中 notify 就成了異步計(jì)算的消費(fèi)者,它會(huì)處理計(jì)算結(jié)果。
CompletableFuture 類使用示例
接下來(lái)我們通過(guò) 20 個(gè)示例看看 CompletableFuture 類具體怎么用。
創(chuàng)建完整的 CompletableFuture
清單 5 示例代碼
| 1 2 3 4 5 | static void completedFutureExample() { ????CompletableFuture<String>cf = CompletableFuture.completedFuture("message"); ????assertTrue(cf.isDone()); ????assertEquals("message", cf.getNow(null)); } |
以上代碼一般來(lái)說(shuō)被用于啟動(dòng)異步計(jì)算,getNow(null)返回計(jì)算結(jié)果或者 null。
運(yùn)行簡(jiǎn)單的異步場(chǎng)景
清單 6 示例代碼
| 1 2 3 4 5 6 7 8 9 | static void runAsyncExample() { ????CompletableFuture<Void>cf = CompletableFuture.runAsync(() -> { ????assertTrue(Thread.currentThread().isDaemon()); ????randomSleep(); }); ????assertFalse(cf.isDone()); ????sleepEnough(); ????assertTrue(cf.isDone()); } |
以上代碼的關(guān)鍵點(diǎn)有兩點(diǎn):
同步執(zhí)行動(dòng)作示例
清單 7 示例代碼
| 1 2 3 4 5 6 7 | static void thenApplyExample() { ????CompletableFuture<String>cf = CompletableFuture.completedFuture("message").thenApply(s -> { ????assertFalse(Thread.currentThread().isDaemon()); ????returns.toUpperCase(); ????}); ????assertEquals("MESSAGE", cf.getNow(null)); } |
以上代碼在異步計(jì)算正常完成的前提下將執(zhí)行動(dòng)作(此處為轉(zhuǎn)換成大寫字母)。
異步執(zhí)行動(dòng)作示例?
相較前一個(gè)示例的同步方式,以下代碼實(shí)現(xiàn)了異步方式,僅僅是在上面的代碼里的多個(gè)方法增加"Async"這樣的關(guān)鍵字。
清單 8 示例代碼
| 1 2 3 4 5 6 7 8 9 | static void thenApplyAsyncExample() { ????CompletableFuture<String>cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> { ????assertTrue(Thread.currentThread().isDaemon()); ????randomSleep(); ????returns.toUpperCase(); ????}); ????assertNull(cf.getNow(null)); ????assertEquals("MESSAGE", cf.join()); } |
使用固定的線程池完成異步執(zhí)行動(dòng)作示例?
我們可以通過(guò)使用線程池方式來(lái)管理異步動(dòng)作申請(qǐng),以下代碼基于固定的線程池,也是做一個(gè)大寫字母轉(zhuǎn)換動(dòng)作,代碼如清單 9 所示。
清單 9 示例代碼
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | staticExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() { ????int count = 1; ????@Override ????public Thread newThread(Runnable runnable) { ????????return new Thread(runnable, "custom-executor-" + count++); ????} ????}); ????????static void thenApplyAsyncWithExecutorExample() { ????????????CompletableFuture<String>cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> { ????????????assertTrue(Thread.currentThread().getName().startsWith("custom-executor-")); ????????????assertFalse(Thread.currentThread().isDaemon()); ????????????randomSleep(); ????????????returns.toUpperCase(); ????????}, executor); ????????assertNull(cf.getNow(null)); ????????assertEquals("MESSAGE", cf.join()); } |
作為消費(fèi)者消費(fèi)計(jì)算結(jié)果示例?
假設(shè)我們本次計(jì)算只需要前一次的計(jì)算結(jié)果,而不需要返回本次計(jì)算結(jié)果,那就有點(diǎn)類似于生產(chǎn)者(前一次計(jì)算)-消費(fèi)者(本次計(jì)算)模式了,示例代碼如清單 10 所示。
清單 10 示例代碼
| 1 2 3 4 5 6 | static void thenAcceptExample() { ????StringBuilder result = new StringBuilder(); ????CompletableFuture.completedFuture("thenAccept message") ????.thenAccept(s ->result.append(s)); ????assertTrue("Result was empty", result.length() > 0); } |
消費(fèi)者是同步執(zhí)行的,所以不需要在 CompletableFuture 里對(duì)結(jié)果進(jìn)行合并。
異步消費(fèi)示例?
相較于前一個(gè)示例的同步方式,我們也對(duì)應(yīng)有異步方式,代碼如清單 11 所示。
清單 11 示例代碼
| 1 2 3 4 5 6 7 | static void thenAcceptAsyncExample() { ????StringBuilder result = new StringBuilder(); ????CompletableFuture<Void>cf = CompletableFuture.completedFuture("thenAcceptAsync message") ????.thenAcceptAsync(s ->result.append(s)); ????cf.join(); ????assertTrue("Result was empty", result.length() > 0); } |
計(jì)算過(guò)程中的異常示例?
接下來(lái)介紹異步操作過(guò)程中的異常情況處理。下面這個(gè)示例中我們會(huì)在字符轉(zhuǎn)換異步請(qǐng)求中刻意延遲 1 秒鐘,然后才會(huì)提交到 ForkJoinPool 里面去執(zhí)行。
清單 12 示例代碼
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | static void completeExceptionallyExample() { ????????CompletableFuture<String>cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase, ????????CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)); ????????CompletableFuture<String>exceptionHandler = cf.handle((s, th) -> { return (th != null) ? "message upon cancel" : ""; }); ????????cf.completeExceptionally(new RuntimeException("completed exceptionally")); ????????assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally()); ????try { ????????cf.join(); ????????fail("Should have thrown an exception"); ????????} catch(CompletionException ex) { // just for testing ????????????assertEquals("completed exceptionally", ex.getCause().getMessage()); ????} ?????assertEquals("message upon cancel", exceptionHandler.join()); } |
示例代碼中,首先我們創(chuàng)建一個(gè) CompletableFuture(計(jì)算完畢),然后調(diào)用 thenApplyAsync 返回一個(gè)新的 CompletableFuture,接著通過(guò)使用 delayedExecutor(timeout, timeUnit)方法延遲 1 秒鐘執(zhí)行。然后我們創(chuàng)建一個(gè) handler(exceptionHandler),它會(huì)處理異常,返回另一個(gè)字符串"message upon cancel"。接下來(lái)進(jìn)入 join()方法,執(zhí)行大寫轉(zhuǎn)換操作,并且拋出 CompletionException 異常。
取消計(jì)算任務(wù)
與前面一個(gè)異常處理的示例類似,我們可以通過(guò)調(diào)用 cancel(boolean mayInterruptIfRunning)方法取消計(jì)算任務(wù)。此外,cancel()方法與 completeExceptionally(new CancellationException())等價(jià)。
清單 13 示例代碼
| 1 2 3 4 5 6 7 8 | static void cancelExample() { ????CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase, ????CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)); ????CompletableFuture cf2 = cf.exceptionally(throwable -> "canceled message"); ????assertTrue("Was not canceled", cf.cancel(true)); ????assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally()); ????assertEquals("canceled message", cf2.join()); } |
一個(gè) CompletableFuture VS 兩個(gè)異步計(jì)算
我們可以創(chuàng)建一個(gè) CompletableFuture 接收兩個(gè)異步計(jì)算的結(jié)果,下面代碼首先創(chuàng)建了一個(gè) String 對(duì)象,接下來(lái)分別創(chuàng)建了兩個(gè) CompletableFuture 對(duì)象 cf1 和 cf2,cf2 通過(guò)調(diào)用 applyToEither 方法實(shí)現(xiàn)我們的需求。
清單 14 示例代碼
| 1 2 3 4 5 6 7 8 9 | static void applyToEitherExample() { ????String original = "Message"; ????CompletableFuture cf1 = CompletableFuture.completedFuture(original) ????.thenApplyAsync(s -> delayedUpperCase(s)); ????CompletableFuture cf2 = cf1.applyToEither( ????CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)), ????s -> s + " from applyToEither"); ????assertTrue(cf2.join().endsWith(" from applyToEither")); } |
如果我們想要使用消費(fèi)者替換清單 14 的方法方式用于處理異步計(jì)算結(jié)果,代碼如清單 15 所示。
清單 15 示例代碼
| 1 2 3 4 5 6 7 8 9 10 | static void acceptEitherExample() { ????String original = "Message"; ????StringBuilder result = new StringBuilder(); ????CompletableFuture cf = CompletableFuture.completedFuture(original) ????.thenApplyAsync(s -> delayedUpperCase(s)) ????.acceptEither(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)), ????s -> result.append(s).append("acceptEither")); ????cf.join(); ????assertTrue("Result was empty", result.toString().endsWith("acceptEither")); } |
運(yùn)行兩個(gè)階段后執(zhí)行
下面這個(gè)示例程序兩個(gè)階段執(zhí)行完畢后返回結(jié)果,首先將字符轉(zhuǎn)為大寫,然后將字符轉(zhuǎn)為小寫,在兩個(gè)計(jì)算階段都結(jié)束之后觸發(fā) CompletableFuture。
清單 16 示例代碼
| 1 2 3 4 5 6 7 8 | static void runAfterBothExample() { ????String original = "Message"; ????StringBuilder result = new StringBuilder(); ????CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).runAfterBoth( ????CompletableFuture.completedFuture(original).thenApply(String::toLowerCase), ????() -> result.append("done")); ????assertTrue("Result was empty", result.length() > 0); } |
也可以通過(guò)以下方式處理異步計(jì)算結(jié)果,
清單 17 示例代碼
| 1 2 3 4 5 6 7 8 | static void thenAcceptBothExample() { ????String original = "Message"; ????StringBuilder result = new StringBuilder(); ????CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).thenAcceptBoth( ????CompletableFuture.completedFuture(original).thenApply(String::toLowerCase), ????(s1, s2) -> result.append(s1 + s2)); ????assertEquals("MESSAGEmessage", result.toString()); } |
整合兩個(gè)計(jì)算結(jié)果
我們可以通過(guò) thenCombine()方法整合兩個(gè)異步計(jì)算的結(jié)果,注意,以下代碼的整個(gè)程序過(guò)程是同步的,getNow()方法最終會(huì)輸出整合后的結(jié)果,也就是說(shuō)大寫字符和小寫字符的串聯(lián)值。
清單 18 示例代碼
| 1 2 3 4 5 6 7 | static void thenCombineExample() { ????String original = "Message"; ????CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s)) ????.thenCombine(CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)), ????(s1, s2) -> s1 + s2); ????assertEquals("MESSAGEmessage", cf.getNow(null)); } |
上面這個(gè)示例是按照同步方式執(zhí)行兩個(gè)方法后再合成字符串,以下代碼采用異步方式同步執(zhí)行兩個(gè)方法,由于異步方式情況下不能夠確定哪一個(gè)方法最終執(zhí)行完畢,所以我們需要調(diào)用 join()方法等待后一個(gè)方法結(jié)束后再合成字符串,這一點(diǎn)和線程的 join()方法是一致的,主線程生成并起動(dòng)了子線程,如果子線程里要進(jìn)行大量的耗時(shí)的運(yùn)算,主線程往往將于子線程之前結(jié)束,但是如果主線程處理完其他的事務(wù)后,需要用到子線程的處理結(jié)果,也就是主線程需要等待子線程執(zhí)行完成之后再結(jié)束,這個(gè)時(shí)候就要用到 join()方法了,即 join()的作用是:"等待該線程終止"。
清單 19 示例代碼
| 1 2 3 4 5 6 7 8 | static void thenCombineAsyncExample() { ????String original = "Message"; ????CompletableFuture cf = CompletableFuture.completedFuture(original) ????.thenApplyAsync(s -> delayedUpperCase(s)) ????.thenCombine(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)), ????assertEquals("MESSAGEmessage", cf.join()); ????(s1, s2) -> s1 + s2); } |
除了 thenCombine()方法以外,還有另外一種方法-thenCompose(),這個(gè)方法也會(huì)實(shí)現(xiàn)兩個(gè)方法執(zhí)行后的返回結(jié)果的連接。
清單 20 示例代碼
| 1 2 3 4 5 6 7 | static void thenComposeExample() { ????String original = "Message"; ????CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s)) ????.thenCompose(upper -> CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)) ????.thenApply(s -> upper + s)); ????assertEquals("MESSAGEmessage", cf.join()); } |
anyOf()方法
以下代碼模擬了如何在幾個(gè)計(jì)算過(guò)程中任意一個(gè)完成后創(chuàng)建 CompletableFuture,在這個(gè)例子中,我們創(chuàng)建了幾個(gè)計(jì)算過(guò)程,然后轉(zhuǎn)換字符串到大寫字符。由于這些 CompletableFuture 是同步執(zhí)行的(下面這個(gè)例子使用的是 thenApply()方法,而不是 thenApplyAsync()方法),使用 anyOf()方法后返回的任何一個(gè)值都會(huì)立即觸發(fā) CompletableFuture。然后我們使用 whenComplete(BiConsumer<? super Object, ? super Throwable> action)方法處理結(jié)果。
清單 21 示例代碼
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | static void anyOfExample() { ????StringBuilder result = new StringBuilder(); ????List messages = Arrays.asList("a", "b", "c"); ????List<CompletableFuture> futures = messages.stream() ????.map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s))) ????.collect(Collectors.toList()); ????CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((res, th) -> { ????????if(th == null) { ????????assertTrue(isUpperCase((String) res)); ????????result.append(res); ????} }); ????assertTrue("Result was empty", result.length() > 0); } |
當(dāng)所有的 CompletableFuture 完成后創(chuàng)建 CompletableFuture
清單 22 所示我們會(huì)以同步方式執(zhí)行多個(gè)異步計(jì)算過(guò)程,在所有計(jì)算過(guò)程都完成后,創(chuàng)建一個(gè) CompletableFuture。
清單 22 示例代碼
| 1 2 3 4 5 6 7 8 9 10 11 12 | static void allOfExample() { ????StringBuilder result = new StringBuilder(); ????List messages = Arrays.asList("a", "b", "c"); ????List<CompletableFuture> futures = messages.stream() ????.map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s))) ????.collect(Collectors.toList()); ????CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((v, th) -> { ????????futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null)))); ????????result.append("done"); }); ????assertTrue("Result was empty", result.length() > 0); } |
相較于前一個(gè)同步示例,我們也可以異步執(zhí)行,如清單 23 所示。
清單 23 示例代碼
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | static void allOfAsyncExample() { ????StringBuilder result = new StringBuilder(); ????List messages = Arrays.asList("a", "b", "c"); ????List<CompletableFuture> futures = messages.stream() ????.map(msg -> CompletableFuture.completedFuture(msg).thenApplyAsync(s -> delayedUpperCase(s))) ????.collect(Collectors.toList()); ????CompletableFuture allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])) .whenComplete((v, th) -> { ????futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null)))); ????result.append("done"); }); ????allOf.join(); ????assertTrue("Result was empty", result.length() > 0); } |
實(shí)際案例
以下代碼完成的操作包括:
清單 24 示例代碼
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | cars().thenCompose(cars -> { ????List<CompletionStage> updatedCars = cars.stream() ????.map(car -> rating(car.manufacturerId).thenApply(r -> { ????car.setRating(r); ????return car; ?????})).collect(Collectors.toList()); ????CompletableFuture done = CompletableFuture ????.allOf(updatedCars.toArray(new CompletableFuture[updatedCars.size()])); ????return done.thenApply(v -> updatedCars.stream().map(CompletionStage::toCompletableFuture) ????.map(CompletableFuture::join).collect(Collectors.toList())); ????}).whenComplete((cars, th) -> { ????if (th == null) { ????cars.forEach(System.out::println); ????} else { ????throw new RuntimeException(th); ????} }).toCompletableFuture().join(); |
結(jié)束語(yǔ)
Completable 類為我們提供了豐富的異步計(jì)算調(diào)用方式,我們可以通過(guò)上述基本操作描述及 20 個(gè)示例程序進(jìn)一步了解如果使用 CompletableFuture 類實(shí)現(xiàn)我們的需求,期待 JDK10 會(huì)有持續(xù)更新。
參考資源
參考 developerWorks 上的 Java 8 文章,了解更多 Java 8 知識(shí)。
參考書籍?Java 8 in Action?Raoul-Gabriel Urma
參考書籍?Mastering Lambdas: Java Programming in a Multicore World?Maurice Naftalin
參考文章?Java 8 CompletableFutures,這篇文章從基礎(chǔ)介紹了 CompletableFuture 類的使用方式。
?
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來(lái)咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)總結(jié)
以上是生活随笔為你收集整理的通过实例理解 JDK8 的 CompletableFuture的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 如何配置帧中继?
- 下一篇: 获取Spring的Application