日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

CompletableFuture 实现异步计算

發(fā)布時間:2025/3/16 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 CompletableFuture 实现异步计算 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

在Markdown的語法中,<u>下劃線</u>中的文字會被解析器加上下劃線,為了不影響閱讀,本文中JDK文檔涉及到<U>都會替換為<N>,請各位注意。


概述

Java 1.8 新增加的 CompletableFuture 類內(nèi)部是使用 ForkJoinPool 來實現(xiàn)的,CompletableFuture 實現(xiàn)了 Future接口 和 CompletionStage接口。

在之前的Future的介紹和基本用法一文中,我們了解到 Future 表示異步計算的結(jié)果。

CompletionStage 代表了一個特定的計算的階段,可以同步或者異步的被完成。

CompletionStage 可以被看作一個計算流水線上的一個單元,最終會產(chǎn)生一個最終結(jié)果,這意味著幾個 CompletionStage 可以串聯(lián)起來,一個完成的階段可以觸發(fā)下一階段的執(zhí)行,接著觸發(fā)下一次,直到完成所有階段。

Future的弊端

Future是Java 5添加的類,用來描述一個異步計算的結(jié)果。你可以使用isDone方法檢查計算是否完成,或者使用get阻塞住調(diào)用線程,直到計算完成返回結(jié)果,你也可以使用cancel方法停止任務的執(zhí)行。

雖然Future以及相關(guān)使用方法提供了異步執(zhí)行任務的能力,但是對于結(jié)果的獲取卻是很不方便,只能通過阻塞或者輪詢的方式得到任務的結(jié)果。阻塞的方式顯然和我們的異步編程的初衷相違背,輪詢的方式又會耗費無謂的CPU資源,而且也不能及時地得到計算結(jié)果。

package net.ijiangtao.tech.concurrent.jsd.future.completable;import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future;/*** java5 future** @author ijiangtao* @create 2019-07-22 9:40**/ public class Java5Future {public static void main(String[] args) throws ExecutionException, InterruptedException {//通過 while 循環(huán)等待異步計算處理成功ExecutorService pool = Executors.newFixedThreadPool(10);Future<Integer> f1 = pool.submit(() -> {// 長時間的異步計算 ……Thread.sleep(1);// 然后返回結(jié)果return 1001;});while (!f1.isDone())System.out.println("is not done yet");;System.out.println("while isDone,result=" + f1.get());//通過阻塞的方式等待異步處理成功Future<Integer> f2 = pool.submit(() -> {// 長時間的異步計算 ……Thread.sleep(1);// 然后返回結(jié)果return 1002;});System.out.println("after blocking,result=" + f2.get());}} 復制代碼

CompletableFuture的方法

前面我們提到 CompletableFuture 為我們提供了異步計算的實現(xiàn),而這些實現(xiàn)都是通過它的方法實現(xiàn)的。

如果你打開它的文檔CompletableFuture-Java8Docs,你會發(fā)現(xiàn)CompletableFuture提供了將近60個方法。雖然方法很多,如果你仔細看的話,你會這些方法其中很多都是有相似性的。

只要你熟練掌握了這些方法,就能夠得心應手地使用 CompletableFuture 來進行異步計算了。

Java的CompletableFuture類總是遵循這樣的原則:

  • 方法名不以Async結(jié)尾的方法由原來的線程計算
  • 方法名以Async結(jié)尾且參數(shù)中沒有Executor的方法由默認的線程池ForkJoinPool.commonPool()計算
  • 方法名以Async結(jié)尾且參數(shù)中有Executor的方法由指定的線程池Executor計算

下面不再一一贅述了。

supplyAsync ... 異步計算結(jié)果

返回值方法名及參數(shù)方法說明
static CompletableFuturesupplyAsync(Supplier supplier)Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() with the value obtained by calling the given Supplier.
static CompletableFuturesupplyAsync(Supplier supplier, Executor executor)Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier.

supplyAsync方法以Supplier函數(shù)式接口類型為參數(shù),CompletableFuture的計算結(jié)果類型為U。因為該方法的參數(shù)類型都是函數(shù)式接口,所以可以使用lambda表達式實現(xiàn)異步任務。后面講解其他方法的時候,會舉例子。

runAsync方法也好理解,它以Runnable函數(shù)式接口類型為參數(shù),所以CompletableFuture的計算結(jié)果也為空(Runnable的run方法返回值為空)。這里就不再一一介紹了,感興趣的小伙伴可以查看API文檔。

get ... 阻塞結(jié)果

返回值方法名及參數(shù)方法說明
Tget()Waits if necessary for this future to complete, and then returns its result.
Tget(long timeout, TimeUnit unit)Waits if necessary for at most the given time for this future to complete, and then returns its result, if available.
TgetNow(T valueIfAbsent)Returns the result value (or throws any encountered exception) if completed, else returns the given valueIfAbsent.
Tjoin()Returns the result value when complete, or throws an (unchecked) exception if completed exceptionally.

你可以像使用Future一樣使用CompletableFuture,來進行阻塞式的計算(雖然不推薦使用)。其中g(shù)etNow方法比較特殊:結(jié)果已經(jīng)計算完則返回結(jié)果或者拋出異常,否則返回給定的valueIfAbsent值。

join和get方法都可以阻塞到計算完成然后獲得返回結(jié)果,但兩者的處理流程有所不同,可以參考下面一段代碼來比較兩者處理異常的不同之處:

public static void main(String[] args) {try {new CompletableFutureDemo().test2();new CompletableFutureDemo().test3();} catch (Exception e) {e.printStackTrace();}}public void test2() throws Exception {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {int i = 1 / 0;return 100;});future.join();}public void test3() throws Exception {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {int i = 1 / 0;return 100;});future.get();} 復制代碼

輸出如下:

java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zeroat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) Caused by: java.lang.ArithmeticException: / by zeroat net.ijiangtao.tech.concurrent.jsd.future.completable.CompletableFutureDemo.lambda$test2$0(CompletableFutureDemo.java:32)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)... 5 more 復制代碼

thenApply ... 轉(zhuǎn)換結(jié)果

返回值方法名及參數(shù)方法說明
CompletableFuturethenApply(Function<? super T,? extends U> fn)Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied function.
CompletableFuturethenApplyAsync(Function<? super T,? extends U> fn)Returns a new CompletionStage that, when this stage completes normally, is executed using this stage's default asynchronous execution facility, with this stage's result as the argument to the supplied function.
CompletableFuturethenApplyAsync(Function<? super T,? extends U> fn, Executor executor)Returns a new CompletionStage that, when this stage completes normally, is executed using the supplied Executor, with this stage's result as the argument to the supplied function.

使用CompletableFuture,我們不必因為等待一個計算完成而阻塞著調(diào)用線程,而是告訴CompletableFuture當計算完成的時候請執(zhí)行某個function。而且我們還可以將這些操作串聯(lián)起來,或者將CompletableFuture組合起來。

這一組函數(shù)的功能是當原來的CompletableFuture計算完后,將結(jié)果傳遞給函數(shù)fn,將fn的結(jié)果作為新的CompletableFuture計算結(jié)果。因此它的功能相當于將CompletableFuture 轉(zhuǎn)換成 CompletableFuture 。

請看下面的例子:

public static void main(String[] args) throws Exception {try {// new CompletableFutureDemo().test1();} catch (Exception e) {e.printStackTrace();}try {//new CompletableFutureDemo().test2();//new CompletableFutureDemo().test3();} catch (Exception e) {e.printStackTrace();}new CompletableFutureDemo().test4();}public void test4() throws Exception {// Create a CompletableFutureCompletableFuture<Integer> calculateFuture = CompletableFuture.supplyAsync(() -> {System.out.println("1");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new IllegalStateException(e);}System.out.println("2");return 1 + 2;});// Attach a callback to the Future using thenApply()CompletableFuture<String> resultFuture = calculateFuture.thenApply(number -> {System.out.println("3");return "1 + 2 is " + number;});// Block and get the result of the future.System.out.println(resultFuture.get());} 復制代碼

輸出結(jié)果為:

1 2 3 1 + 2 is 3 復制代碼

thenAccept ... 純消費結(jié)果

返回值方法名及參數(shù)方法說明
CompletableFuturethenAccept(Consumer<? super T> action)Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied action.
CompletableFuturethenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)Returns a new CompletionStage that, when this and the other given stage both complete normally, is executed with the two results as arguments to the supplied action.

篇幅原因,Async和Executor方法不再列舉。

只對結(jié)果執(zhí)行Action,而不返回新的計算值,因此計算值為Void。這就好像生產(chǎn)者生產(chǎn)了消息,消費者消費消息以后不再進行消息的生產(chǎn)一樣,因此thenAccept是對計算結(jié)果的純消費。

例如如下方法:

public void test5() throws Exception {// thenAccept() exampleCompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {return "ijiangtao";}).thenAccept(name -> {System.out.println("Hi, " + name);});System.out.println(future.get()); } 復制代碼

thenAccept的get返回為null:

Hi, ijiangtao null 復制代碼

thenAcceptBoth可以消費兩者(生產(chǎn)和消費)的結(jié)果,下面提供一個例子:

public void test6() throws Exception {CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "hello";}).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "ijiangtao";}), (s1, s2) -> {System.out.println(s1 + " " + s2);});while (true){} } 復制代碼

輸出如下:

hello ijiangtao 復制代碼

thenCombine ... 消費結(jié)果并返回

返回值方法名及參數(shù)方法說明
<U,V> CompletableFuturethenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)Returns a new CompletionStage that, when this and the other given stage both complete normally, is executed with the two results as arguments to the supplied function.
<U,V> CompletableFuturethenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)Returns a new CompletionStage that, when this and the other given stage complete normally, is executed using this stage's default asynchronous execution facility, with the two results as arguments to the supplied function.

從功能上來講, thenCombine的功能更類似thenAcceptBoth,只不過thenAcceptBoth是純消費,它的函數(shù)參數(shù)沒有返回值,而thenCombine的函數(shù)參數(shù)fn有返回值。

public void test7() throws Exception {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {return 1+2;});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {return "1+2 is";});CompletableFuture<String> f = future1.thenCombine(future2, (x, y) -> y + " " + x);System.out.println(f.get()); // 輸出:1+2 is 3 } 復制代碼

thenCompose ... 非嵌套整合

返回值方法名及參數(shù)方法說明
CompletableFuturethenCompose(Function<? super T,? extends CompletionStage> fn)Returns a new CompletionStage that, when this stage completes normally, is executed with this stage as the argument to the supplied function.

由于篇幅原因,Async和Executor方法不再列舉。

thenCompose方法接受一個Function作為參數(shù),這個Function的輸入是當前的CompletableFuture的計算值,返回結(jié)果將是一個新的CompletableFuture。

假如你需要將兩個CompletableFutures相互整合,如果使用thenApply,則結(jié)果會是嵌套的CompletableFuture:

CompletableFuture<String> getUsersName(Long id) {return CompletableFuture.supplyAsync(() -> {return "ijiangtao";}); }CompletableFuture<Integer> getUserAge(String userName) {return CompletableFuture.supplyAsync(() -> {return 20;}); }public void test8(Long id) throws Exception {CompletableFuture<CompletableFuture<Integer>> result1 = getUsersName(id).thenApply(userName -> getUserAge(userName)); } 復制代碼

這時候可以使用thenCompose來獲得第二個計算的CompletableFuture:

public void test9(Long id) throws Exception {CompletableFuture<Integer> result2 = getUsersName(id).thenCompose(userName -> getUserAge(userName)); } 復制代碼

whenComplete ... 完成以后

返回值方法名及參數(shù)方法說明
CompletableFuturewhenComplete(BiConsumer<? super T,? super Throwable> action)Returns a new CompletionStage with the same result or exception as this stage, that executes the given action when this stage completes.
CompletableFuturewhenCompleteAsync(BiConsumer<? super T,? super Throwable> action)Returns a new CompletionStage with the same result or exception as this stage, that executes the given action using this stage's default asynchronous execution facility when this stage completes.
CompletableFuturewhenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)Returns a new CompletionStage with the same result or exception as this stage, that executes the given action using the supplied Executor when this stage completes.

當CompletableFuture的計算結(jié)果完成,或者拋出異常的時候,我們可以執(zhí)行特定的Action。whenComplete的參數(shù)Action的類型是BiConsumer<? super T,? super Throwable>,它可以處理正常的計算結(jié)果,或者異常情況。注意這幾個方法都會返回CompletableFuture,當Action執(zhí)行完畢后它的結(jié)果返回原始的CompletableFuture的計算結(jié)果或者返回異常。

whenComplete方法不以Async結(jié)尾,意味著Action使用相同的線程執(zhí)行,而以Async結(jié)尾的方法可能會使用其它的線程去執(zhí)行(如果使用相同的線程池,也可能會被同一個線程選中執(zhí)行)。

下面演示一下異常情況:

public void test10() throws Exception {String result = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}if (1 == 1) {throw new RuntimeException("an RuntimeException");}return "s1";}).whenComplete((s, t) -> {System.out.println("whenComplete s:"+s);System.out.println("whenComplete exception:"+t.getMessage());}).exceptionally(e -> {System.out.println("exceptionally exception:"+e.getMessage());return "hello ijiangtao";}).join();System.out.println(result); } 復制代碼

輸出:

whenComplete s:null whenComplete exception:java.lang.RuntimeException: an RuntimeException exceptionally exception:java.lang.RuntimeException: an RuntimeException hello ijiangtao 復制代碼

總結(jié)

Java5新增的Future類,可以實現(xiàn)阻塞式的異步計算,但這種阻塞的方式顯然和我們的異步編程的初衷相違背。為了解決這個問題,JDK吸收了Guava的設計思想,加入了Future的諸多擴展功能形成了CompletableFuture。

本文重點介紹了CompletableFuture的不同類型的API,掌握了這些API對于使用非阻塞的函數(shù)式異步編程進行日常開發(fā)非常有幫助,同時也為下面深入了解異步編程的各種原理和特性打下了良好的基礎。

相關(guān)資源

CompletableFuture - javase 8 docs

CompletableFuture - Guide To CompletableFuture

CompletableFuture - Java CompletableFuture Tutorial with Examples

CompletableFuture - Java 8: Writing asynchronous code with CompletableFuture

《寫給大忙人的JavaSE9核心技術(shù)》- 10.2 異步計算

CompletableFuture - 通過實例理解 JDK8 的 CompletableFuture

CompletableFuture - CompletableFuture 詳解

CompletableFuture - 使用 Java 8 的 CompletableFuture 實現(xiàn)函數(shù)式的回調(diào)

CompletableFuture - Java CompletableFuture 詳解

CompletableFuture - [譯]20個使用 Java CompletableFuture的例子


轉(zhuǎn)載于:https://juejin.im/post/5d35d6ebe51d45109b01b270

總結(jié)

以上是生活随笔為你收集整理的CompletableFuture 实现异步计算的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。