Java中如何使用非阻塞异步编程——CompletableFuture
分享一波:程序員賺外快-必看的巔峰干貨
對于Node開發(fā)者來說,非阻塞異步編程是他們引以為傲的地方。而在JDK8中,也引入了非阻塞異步編程的概念。所謂非阻塞異步編程,就是一種不需要等待返回結(jié)果的多線程的回調(diào)方法的封裝。使用非阻塞異步編程,可以很大程度上解決高并發(fā)場景的各種問題,提高程序的運行效率。
為什么要使用非阻塞異步編程
在jdk8之前,我們使用java的多線程編程,一般是通過Runnable中的run方法進行的。這種方法有個明顯的缺點:沒有返回值。這時候,大家會使用Callable+Future的方式去實現(xiàn),代碼如下。
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future stringFuture = executor.submit(new Callable() {
@Override
public String call() throws Exception {
Thread.sleep(2000);
return “async thread”;
}
});
Thread.sleep(1000);
System.out.println(“main thread”);
System.out.println(stringFuture.get());
}
這無疑是對高并發(fā)訪問的一種緩沖方法。這種方式有一個致命的缺點就是阻塞式調(diào)用,當(dāng)調(diào)用了get方法之后,會有大量的時間耗費在等待返回值之中。
不管怎么看,這種做法貌似都不太妥當(dāng),至少在代碼美觀性上就看起來很蛋疼。而且某些場景無法使用,比如:
多個異步線程執(zhí)行時間可能不一致,我們的主線程不能一直等著。 兩個異步任務(wù)之間相互獨立,但是第二個依賴第一個的執(zhí)行結(jié)果在這種場景下,CompletableFuture的優(yōu)勢就展現(xiàn)出來了 。同時,CompletableFuture的封裝中使用了函數(shù)式編程,這讓我們的代碼顯得更加簡潔、優(yōu)雅。
不了解函數(shù)式編程的朋友,可以參考我之前的博客。JDK8新特性
CompletableFuture使用詳解
runAsync和supplyAsync方法
CompletableFuture提供了四個靜態(tài)方法來創(chuàng)建一個異步操作。
public static CompletableFuture runAsync(Runnable runnable)
public static CompletableFuture runAsync(Runnable runnable, Executor executor)
public static CompletableFuture supplyAsync(Supplier supplier)
public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
沒有指定Executor的方法會使用ForkJoinPool.commonPool() 作為它的線程池執(zhí)行異步代碼。如果指定線程池,則使用指定的線程池運行。以下所有的方法都類同。
runAsync方法不支持返回值。 supplyAsync可以支持返回值。代碼示例
/*** 無返回值** @throws Exception*/ @Test public void testRunAsync() throws Exception {CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (Exception ignored) {}System.out.println("run end ...");});future.get(); }/*** 有返回值** @throws Exception*/ @Test public void testSupplyAsync() throws Exception {CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {System.out.println("run end...");return System.currentTimeMillis();});Long time = future.get();System.out.println(time); }計算結(jié)果完成時的回調(diào)方法
當(dāng)CompletableFuture的計算結(jié)果完成,或者拋出異常的時候,可以執(zhí)行特定的操作。
public CompletableFuture whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture exceptionally(Function<Throwable,? extends T> fn)
這里需要說的一點是,whenComplete和whenCompleteAsync的區(qū)別。
whenComplete:使用執(zhí)行當(dāng)前任務(wù)的線程繼續(xù)執(zhí)行whenComplete的任務(wù)。 whenCompleteAsync:使用新的線程執(zhí)行任務(wù)。 exceptionally:執(zhí)行出現(xiàn)異常時,走這個方法。代碼示例
/*** 當(dāng)CompletableFuture的計算結(jié)果完成,或者拋出異常的時候,可以執(zhí)行特定的Action。* whenComplete:是執(zhí)行當(dāng)前任務(wù)的線程執(zhí)行繼續(xù)執(zhí)行 whenComplete 的任務(wù)。* whenCompleteAsync:是執(zhí)行把 whenCompleteAsync 這個任務(wù)繼續(xù)提交給線程池來進行執(zhí)行。* exceptionally:執(zhí)行出現(xiàn)異常時,走這個方法** @throws Exception*/ @Test public void testWhenComplete() throws Exception {CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("運行結(jié)束");}).whenComplete((t, action) -> {System.out.println("執(zhí)行完成");}).exceptionally(t -> {System.out.println("出現(xiàn)異常:" + t.getMessage());return null;});TimeUnit.SECONDS.sleep(2); }thenApply
當(dāng)一個線程依賴另一個線程時,可以使用thenApply方法把這兩個線程串行化,第二個任務(wù)依賴第一個任務(wù)的返回值。
代碼示例
/*** 當(dāng)一個線程依賴另一個線程時,可以使用 thenApply 方法來把這兩個線程串行化。* 第二個任務(wù)依賴第一個任務(wù)的結(jié)果** @throws Exception*/ @Test public void testThenApply() throws Exception {CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {long result = new Random().nextInt();System.out.println("result:" + result);return result;}).thenApply(t -> {long result = t * 5;System.out.println("result2:" + result);return result;});Long result = future.get();System.out.println(result); }handle
handle是執(zhí)行任務(wù)完成時對結(jié)果的處理。與thenApply方法處理方式基本一致,
不同的是,handle是在任務(wù)完成后執(zhí)行,不管這個任務(wù)是否出現(xiàn)了異常,而thenApply只可以執(zhí)行正常的任務(wù),任務(wù)出現(xiàn)了異常則不執(zhí)行。
代碼示例
/*** handle 是執(zhí)行任務(wù)完成時對結(jié)果的處理。* handle 方法和 thenApply 方法處理方式基本一樣。* 不同的是 handle 是在任務(wù)完成后再執(zhí)行,還可以處理異常的任務(wù)。* thenApply 只可以執(zhí)行正常的任務(wù),任務(wù)出現(xiàn)異常則不執(zhí)行 thenApply 方法。** @throws Exception*/ @Test public void testHandle() throws Exception {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {int i = 10 / 0;return i;}).handle((p, t) -> {int result = -1;if (t == null) {result = p * 2;} else {System.out.println(t.getMessage());}return result;});System.out.println(future.get()); }thenAccept
thenAccept用于接收任務(wù)的處理結(jié)果,并消費處理,無返回結(jié)果。
代碼示例
/*** 接收任務(wù)的處理結(jié)果,并消費處理,無返回結(jié)果。** @throws Exception*/ @Test public void testThenAccept() throws Exception {CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {return new Random().nextInt();}).thenAccept(num -> {System.out.println(num);});System.out.println(future.get()); }thenRun
上個任務(wù)執(zhí)行完之后再執(zhí)行thenRun的任務(wù),二者只存在先后執(zhí)行順序的關(guān)系,后者并不依賴前者的計算結(jié)果,同時,沒有返回值。
代碼示例
/*** 該方法同 thenAccept 方法類似。不同的是上個任務(wù)處理完成后,并不會把計算的結(jié)果傳給 thenRun 方法。* 只是處理玩任務(wù)后,執(zhí)行 thenRun 的后續(xù)操作。** @throws Exception*/ @Test public void testThenRun() throws Exception {CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {return new Random().nextInt();}).thenRun(() -> {System.out.println("進入了thenRun");});System.out.println(future.get()); }thenCombine
thenCombine會把兩個CompletableFuture的任務(wù)都執(zhí)行完成后,把兩個任務(wù)的返回值一塊交給thenCombine處理(有返回值)。
代碼示例
/*** thenCombine 會把 兩個 CompletableFuture 的任務(wù)都執(zhí)行完成后* 把兩個任務(wù)的結(jié)果一塊交給 thenCombine 來處理。** @throws Exception*/ @Test public void testThenCombine() throws Exception {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {return "hello";}).thenCombine(CompletableFuture.supplyAsync(() -> {return "world";}), (t1, t2) -> {return t1 + " " + t2;});System.out.println(future.get()); }thenAcceptBoth
當(dāng)兩個CompletableFuture都執(zhí)行完成后,把結(jié)果一塊交給thenAcceptBoth處理(無返回值)
代碼示例
/*** 當(dāng)兩個 CompletableFuture 都執(zhí)行完成后* 把結(jié)果一塊交給thenAcceptBoth來進行消耗** @throws Exception*/ @Test public void testThenAcceptBoth() throws Exception {CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {return "hello";}).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {return "world";}), (t1, t2) -> {System.out.println(t1 + " " + t2);});System.out.println(future.get()); }applyToEither
兩個CompletableFuture,誰執(zhí)行返回的結(jié)果快,就用哪個的結(jié)果進行下一步操作(有返回值)。
代碼示例
/*** 兩個CompletableFuture,誰執(zhí)行返回的結(jié)果快,我就用那個CompletionStage的結(jié)果進行下一步的轉(zhuǎn)化操作** @throws Exception*/ @Test public void testApplyToEither() throws Exception {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return "hello";}).applyToEither(CompletableFuture.supplyAsync(() -> {return "world";}), (t) -> {return t;});System.out.println(future.get()); }acceptEither
兩個CompletableFuture,誰執(zhí)行返回的結(jié)果快,就用哪個的結(jié)果進行下一步操作(無返回值)。
代碼示例
/*** 兩個CompletableFuture,誰執(zhí)行返回的結(jié)果快,我就用那個CompletionStage的結(jié)果進行下一步的消耗操作。** @throws Exception*/ @Test public void testAcceptEither() throws Exception {CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {return "hello";}).acceptEither(CompletableFuture.supplyAsync(() -> {return "world";}), t1 -> {System.out.println(t1);});System.out.println(future.get()); }runAfterEither
兩個CompletableFuture,任何一個完成了都會執(zhí)行下一步操作
代碼示例
/*** 兩個CompletableFuture,任何一個完成了都會執(zhí)行下一步的操作** @throws Exception*/ @Test public void testRunAfterEither() throws Exception {CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {return "hello";}).runAfterEither(CompletableFuture.supplyAsync(() -> {return "world";}), () -> {System.out.println("執(zhí)行完了");});System.out.println(future.get()); }runAfterBoth
兩個CompletableFuture,都完成了才會執(zhí)行下一步操作。
代碼示例
/*** 兩個CompletableFuture,都完成了計算才會執(zhí)行下一步的操作** @throws Exception*/ @Test public void testRunAfterBoth() throws Exception {CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {return "hello";}).runAfterBoth(CompletableFuture.supplyAsync(() -> {return "world";}), () -> {System.out.println("執(zhí)行完了");});System.out.println(future.get()); }thenCompose
thenCompose方法允許對兩個CompletableFuture進行流水線操作,當(dāng)?shù)谝粋€操作完成時,將其結(jié)果作為參數(shù)傳遞給第二個操作。
代碼示例
/*** thenCompose 方法允許你對兩個 CompletableFuture 進行流水線操作,* 第一個操作完成時,將其結(jié)果作為參數(shù)傳遞給第二個操作。* @throws Exception*/ @Test public void testThenCompose() throws Exception {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {int t = new Random().nextInt();System.out.println(t);return t;}).thenCompose(param -> {return CompletableFuture.supplyAsync(() -> {int t = param * 2;System.out.println("t2=" + t);return t;});});System.out.println(future.get()); }結(jié)語
CompletableFuture是jdk8中新增的一個特性,特點是非阻塞異步編程。合理的使用非阻塞異步編程,比如將兩步關(guān)聯(lián)不大的操作并行處理,可以優(yōu)化代碼的執(zhí)行效率。同時,在高并發(fā)場景下,CompletableFuture也可以進行有效的性能優(yōu)化。
*************************************優(yōu)雅的分割線 **********************************
分享一波:程序員賺外快-必看的巔峰干貨
如果以上內(nèi)容對你覺得有用,并想獲取更多的賺錢方式和免費的技術(shù)教程
請關(guān)注微信公眾號:HB荷包
一個能讓你學(xué)習(xí)技術(shù)和賺錢方法的公眾號,持續(xù)更新
總結(jié)
以上是生活随笔為你收集整理的Java中如何使用非阻塞异步编程——CompletableFuture的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: dubbo控制台安装
- 下一篇: Java Html转pdf实战