Java 8 CompletableFuture
原文:Java 8 CompletableFutures Part I
- 作者:Bill Bejeck
- 譯者:noONE
譯者前言
JDK1.5就增加了Future接口,但是接口使用不是很能滿足異步開發(fā)的需求,使用起來不是那么友好。所以出現(xiàn)了很多第三方封裝的Future,Guava中就提供了一個更好的?ListenableFuture 類,Netty中則提供了一個自己的Future。所以,Java8中的CompletableFuture可以說是解決Future了一些痛點,可以優(yōu)雅得進行組合式異步編程,同時也更加契合函數(shù)式編程。
Java8已經(jīng)發(fā)布了很長一段時間,其中新增了一個很棒的并發(fā)控制工具,就是CompletableFuture類。CompletableFuture實現(xiàn)了Future接口,并且它可以顯式地設(shè)定值,更有意思的是我們可以進行鏈式處理,并且支持依賴行為,這些行為由CompletableFuture完成所觸發(fā)。CompletableFuture類似于Guava中的?ListenableFuture 類。它們兩個提供了類似的功能,本文不會再對它們進行對比。我已經(jīng)在之前的文章中介紹過ListenableFutrue。雖然對于ListenableFutrue的介紹有點過時,但是絕大數(shù)的知識仍然適用。CompletableFuture的文檔已經(jīng)非常全面了,但是缺少如何使用它們的具體示例 。本文意在通過單元測試中的一系列的簡單示例來展示如何使用CompletableFuture。最初我想在一篇文章中介紹完CompleteableFuture,但是信息太多了,分成三部分似乎更好一些:
CompletableFuture 入門
在開始使用CompletableFuture之前, 我們需要了解一些背景知識。CompletableFuture實現(xiàn)了?CompletionStage 接口。javadoc中簡明地介紹了CompletionStage :
一個可能的異步計算的階段,當另外一個CompletionStage?完成時,它會執(zhí)行一個操作或者計算一個值。一個階段的完成取決于它本身結(jié)算的結(jié)果,同時也可能反過來觸發(fā)其他依賴階段。
CompletionStage 的全部文檔的內(nèi)容很多,所以,我們在這里總結(jié)幾個關(guān)鍵點:
計算可以由?Future?,Consumer?或者?Runnable 接口中的 apply,accept?或者?run等方法表示。
計算的執(zhí)行主要有以下
a. 默認執(zhí)行(可能調(diào)用線程)
b. 使用默認的CompletionStage的異步執(zhí)行提供者異步執(zhí)行。這些方法名使用someActionAsync這種格式表示。
c. 使用?Executor 提供者異步執(zhí)行。這些方法同樣也是someActionAsync這種格式,但是會增加一個Executor參數(shù)。
接下來,我會在本文中直接引用CompletableFuture?和?CompletionStage 。
創(chuàng)建一個CompleteableFuture
創(chuàng)建一個CompleteableFuture很簡單,但是不是很清晰。最簡單的方法就是使用CompleteableFuture.completedFuture方法,該方法返回一個新的且完結(jié)的CompleteableFuture:
public void test_completed_future() throws Exception {String expectedValue = "the expected value";CompletableFuture<String> alreadyCompleted = CompletableFuture.completedFuture(expectedValue);assertThat(alreadyCompleted.get(), is(expectedValue)); } 復制代碼這樣看起來有點乏味,稍后,我們就會看到如何創(chuàng)建一個已經(jīng)完成的CompleteableFuture 會派上用場。
現(xiàn)在,讓我們看一下如何創(chuàng)建一個表示異步任務的CompleteableFuture :
private static ExecutorService service = Executors.newCachedThreadPool(); public void test_run_async() throws Exception {CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> System.out.println("running async task"), service);//utility testing methodpauseSeconds(1);assertThat(runAsync.isDone(), is(true)); } public void test_supply_async() throws Exception {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(simulatedTask(1, "Final Result"), service);assertThat(completableFuture.get(), is("Final Result")); } 復制代碼在第一個方法中,我們看到了runAsync任務,在第二個方法中,則是supplyAsync的示例。這可能是顯而易見的,然而使用runAsync還是使用supplyAsync,這取決于任務是否有返回值。在這兩個例子中,我們都提供了一個自定義的Executor,它作為一個異步執(zhí)行提供者。當使用supplyAsync方法時,我個人認為使用?Callable 而不是一個Supplier似乎更自然一些。因為它們都是函數(shù)式接口,Callable與異步任務的關(guān)系更緊密一些,并且它還可以拋出受檢異常,而Supplier則不會(盡管我們可以通過少量的代碼讓Supplier拋出受檢異常)。
增加監(jiān)聽器
現(xiàn)在,我們可以創(chuàng)建CompleteableFuture 對象去運行異步任務,讓我們開始學習如何去“監(jiān)聽”任務的完成,并且執(zhí)行隨后的一些動作。這里重點提一下,當增加對?CompletionStage 對象的追隨時,之前的任務需要徹底成功,后續(xù)的任務和階段才能運行。本文會介紹介紹一些處理失敗任務的方法,而在CompleteableFuture中鏈式處理錯誤的方案會在后續(xù)的文章中介紹。
public void test_then_run_async() throws Exception {Map<String,String> cache = new HashMap<>();cache.put("key","value");CompletableFuture<String> taskUsingCache = CompletableFuture.supplyAsync(simulatedTask(1,cache.get("key")),service);CompletableFuture<Void> cleanUp = taskUsingCache.thenRunAsync(cache::clear,service);cleanUp.get();String theValue = taskUsingCache.get();assertThat(cache.isEmpty(),is(true));assertThat(theValue,is("value")); } 復制代碼這個例子主要展示在第一個CompletableFuture成功結(jié)束后,運行一個清理的任務。 在之前的例子中,當最初的任務成功結(jié)束后,我們使用Runnable任務執(zhí)行。我們也可以定義一個后續(xù)任務,它可以直接獲取之前任務的成功結(jié)果。
public void test_accept_result() throws Exception {CompletableFuture<String> task = CompletableFuture.supplyAsync(simulatedTask(1, "add when done"), service);CompletableFuture<Void> acceptingTask = task.thenAccept(results::add);pauseSeconds(2);assertThat(acceptingTask.isDone(), is(true));assertThat(results.size(), is(1));assertThat(results.contains("add when done"), is(true)); } 復制代碼這是一個使用Accept 方法的例子,該方法會獲取CompletableFuture的結(jié)果,然后將結(jié)果傳給一個?Consumer 對象。在Java 8中,?Consumer 實例是沒有返回值的 ,如果想得到運行的副作用,需要把結(jié)果放到一個列表中。
組合與構(gòu)成任務
除了增加監(jiān)聽器去運行后續(xù)任務或者接受CompletableFuture的成功結(jié)果,我們還可以組合或者構(gòu)成任務。
構(gòu)成任務
構(gòu)成意味著獲取一個成功的CompletableFuture結(jié)果作為輸入,通過 一個Function 返回另外一個 CompletableFuture。下面是一個使用CompletableFuture.thenComposeAsync的例子:
public void test_then_compose() throws Exception {Function<Integer,Supplier<List<Integer>>> getFirstTenMultiples = num ->()->Stream.iterate(num, i -> i + num).limit(10).collect(Collectors.toList());Supplier<List<Integer>> multiplesSupplier = getFirstTenMultiples.apply(13);//Original CompletionStageCompletableFuture<List<Integer>> getMultiples = CompletableFuture.supplyAsync(multiplesSupplier, service);//Function that takes input from orignal CompletionStageFunction<List<Integer>, CompletableFuture<Integer>> sumNumbers = multiples ->CompletableFuture.supplyAsync(() -> multiples.stream().mapToInt(Integer::intValue).sum());//The final CompletableFuture composed of previous two.CompletableFuture<Integer> summedMultiples = getMultiples.thenComposeAsync(sumNumbers, service);assertThat(summedMultiples.get(), is(715)); } 復制代碼在這個列子中,第一個CompletionStage提供了一個列表,該列表包含10個數(shù)字,每個數(shù)字都乘以13。這個提供的Function獲取這些結(jié)果,并且創(chuàng)建另外一個CompletionStage,它將對列表中的數(shù)字求和。
組合任務
組合任務的完成是通過獲取兩個成功的CompletionStages,并且從中獲取BiFunction類型的參數(shù),進而產(chǎn)出另外的結(jié)果。以下是一個非常簡單的例子用來說明從組合的CompletionStages中獲取結(jié)果。
public void test_then_combine_async() throws Exception {CompletableFuture<String> firstTask = CompletableFuture.supplyAsync(simulatedTask(3, "combine all"), service);CompletableFuture<String> secondTask = CompletableFuture.supplyAsync(simulatedTask(2, "task results"), service);CompletableFuture<String> combined = firstTask.thenCombineAsync(secondTask, (f, s) -> f + " " + s, service);assertThat(combined.get(), is("combine all task results")); } 復制代碼這個例子展示了如何組合兩個異步任務的CompletionStage,然而,我們也可以組合已經(jīng)完成的CompletableFuture的異步任務。 組合一個已知的需要計算的值,也是一種很好的處理方式:
public void test_then_combine_with_one_supplied_value() throws Exception {CompletableFuture<String> asyncComputedValue = CompletableFuture.supplyAsync(simulatedTask(2, "calculated value"), service);CompletableFuture<String> knowValueToCombine = CompletableFuture.completedFuture("known value");BinaryOperator<String> calcResults = (f, s) -> "taking a " + f + " then adding a " + s;CompletableFuture<String> combined = asyncComputedValue.thenCombine(knowValueToCombine, calcResults);assertThat(combined.get(), is("taking a calculated value then adding a known value")); } 復制代碼最后,是一個使用CompletableFuture.runAfterbothAsync的例子
public void test_run_after_both() throws Exception {CompletableFuture<Void> run1 = CompletableFuture.runAsync(() -> {pauseSeconds(2);results.add("first task");}, service);CompletableFuture<Void> run2 = CompletableFuture.runAsync(() -> {pauseSeconds(3);results.add("second task");}, service);CompletableFuture<Void> finisher = run1.runAfterBothAsync(run2,() -> results. add(results.get(0)+ "&"+results.get(1)),service);pauseSeconds(4);assertThat(finisher.isDone(),is(true));assertThat(results.get(2),is("first task&second task")); } 復制代碼監(jiān)聽第一個結(jié)束的任務
在之前所有的例子中,所有的結(jié)果需要等待所有的CompletionStage結(jié)束,然而,需求并不總是這樣的。我們可能需要獲取第一個完成的任務的結(jié)果。下面的例子展示使用Consumer接受第一個完成的結(jié)果:
public void test_accept_either_async_nested_finishes_first() throws Exception {CompletableFuture<String> callingCompletable = CompletableFuture.supplyAsync(simulatedTask(2, "calling"), service);CompletableFuture<String> nestedCompletable = CompletableFuture.supplyAsync(simulatedTask(1, "nested"), service);CompletableFuture<Void> collector = callingCompletable.acceptEither(nestedCompletable, results::add);pauseSeconds(2);assertThat(collector.isDone(), is(true));assertThat(results.size(), is(1));assertThat(results.contains("nested"), is(true)); } 復制代碼類似功能的CompletableFuture.runAfterEither
public void test_run_after_either() throws Exception {CompletableFuture<Void> run1 = CompletableFuture.runAsync(() -> {pauseSeconds(2);results.add("should be first");}, service);CompletableFuture<Void> run2 = CompletableFuture.runAsync(() -> {pauseSeconds(3);results.add("should be second");}, service);CompletableFuture<Void> finisher = run1.runAfterEitherAsync(run2,() -> results.add(results.get(0).toUpperCase()),service);pauseSeconds(4);assertThat(finisher.isDone(),is(true));assertThat(results.get(1),is("SHOULD BE FIRST"));} 復制代碼多重組合
到目前為止,所有的組合/構(gòu)成的例子都只有兩個CompletableFuture對象。這里是有意為之,為了讓例子盡量的簡單明了。我們可以組合任意數(shù)量的CompletionStage。請注意,下面例子僅僅是為了說明而已!
public void test_several_stage_combinations() throws Exception {Function<String,CompletableFuture<String>> upperCaseFunction = s -> CompletableFuture.completedFuture(s.toUpperCase());CompletableFuture<String> stage1 = CompletableFuture.completedFuture("the quick ");CompletableFuture<String> stage2 = CompletableFuture.completedFuture("brown fox ");CompletableFuture<String> stage3 = stage1.thenCombine(stage2,(s1,s2) -> s1+s2);CompletableFuture<String> stage4 = stage3.thenCompose(upperCaseFunction);CompletableFuture<String> stage5 = CompletableFuture.supplyAsync(simulatedTask(2,"jumped over"));CompletableFuture<String> stage6 = stage4.thenCombineAsync(stage5,(s1,s2)-> s1+s2,service);CompletableFuture<String> stage6_sub_1_slow = CompletableFuture.supplyAsync(simulatedTask(4,"fell into"));CompletableFuture<String> stage7 = stage6.applyToEitherAsync(stage6_sub_1_slow,String::toUpperCase,service);CompletableFuture<String> stage8 = CompletableFuture.supplyAsync(simulatedTask(3," the lazy dog"),service);CompletableFuture<String> finalStage = stage7.thenCombineAsync(stage8,(s1,s2)-> s1+s2,service);assertThat(finalStage.get(),is("THE QUICK BROWN FOX JUMPED OVER the lazy dog")); } 復制代碼需要注意的是,組合CompletionStage的時候并不保證順序。在這些單元測試中,提供了一個時間去模擬任務以確保完成順序。
小結(jié)
本文主要是使用CompletableFuture類的第一部分。在后續(xù)文章中,將主要介紹錯誤處理及恢復,強制完成或取消。
資源
- CompletableFuture
- CompletionStage
- Source Code
關(guān)注我:
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎總結(jié)
以上是生活随笔為你收集整理的Java 8 CompletableFuture的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 35. 通过实现一个序列加密的功能,熟悉
- 下一篇: Java微信订单查询