JUC系列(十一) | Java 8 CompletableFuture 异步编程
多線程一直Java開發(fā)中的難點,也是面試中的常客,趁著還有時間,打算鞏固一下JUC方面知識,我想機(jī)會隨處可見,但始終都是留給有準(zhǔn)備的人的,希望我們都能加油!!!
沉下去,再浮上來,我想我們會變的不一樣的
來自朋友圈
作者:徐四喜
CompletableFuture
一、什么是CompletableFuture?
在Java中CompletableFuture用于異步編程,異步通常意味著非阻塞,可以使我們的任務(wù)單獨運行在與主線程分離的其他線程中,并且通過回調(diào)可以在主線程中得到異步任務(wù)的執(zhí)行狀態(tài),是否完成,和是否異常等信息。
在這種方式中,主線程不會被阻塞,因為子線程是另外一條線程在執(zhí)行,所以不需要一直等到子線程完成。主線程就可以并行的執(zhí)行其他任務(wù)。這種并行方式,可以極大的提供程序性能。
CompletableFuture實現(xiàn)了 Future, CompletionStage接口。
二、Future 與 CompletableFuture
CompletableFuture是 FutureAPI的擴(kuò)展。
Future表示異步計算的結(jié)果。 提供了檢查計算是否完成、等待計算完成以及檢索計算結(jié)果的方法。 結(jié)果只能在計算完成后使用get方法檢索,必要時阻塞,直到它準(zhǔn)備好。 取消由cancel方法執(zhí)行。 提供了其他方法來確定任務(wù)是正常完成還是被取消。 一旦計算完成,就不能取消計算。
Future 的主要缺點如下:
(1)不支持手動完成
(2)Future 的結(jié)果在非阻塞的情況下,不能執(zhí)行更進(jìn)一步的操作
- Future不會通知你它已經(jīng)完成了,它提供了一個阻塞的 get() 方法通知你結(jié)果。你無法給 Future 植入一個回調(diào)函數(shù),當(dāng) Future結(jié)果可用的時候,用該回調(diào)函數(shù)自動的調(diào)用 Future 的結(jié)果。
(3)不能夠支持鏈?zhǔn)秸{(diào)用
- 對于 Future的執(zhí)行結(jié)果,我們想繼續(xù)傳到下一個 Future處理使用,從而形成一個鏈?zhǔn)降恼{(diào)用,這在 Future 中是沒法實現(xiàn)的。
- 鏈?zhǔn)秸{(diào)用就是將這一個執(zhí)行結(jié)果,繼續(xù)傳遞給下一個繼續(xù)使用,形成一條鏈。即職責(zé)鏈模式,例如Web中的Filter。
(4)不支持多個 Future 合并
- 例如我一個Futrue計算 10的平方,另一個Futrue計算100的平方,我沒有辦法直接將他們合起來。
(5)不支持異常處理
-
Future 的 API 沒有任何的異常處理的 api,所以運行時,很有可能無法定位到錯誤。
-
Future API:
- public interface Future<V> {boolean cancel(boolean mayInterruptIfRunning); //嘗試取消此任務(wù)的執(zhí)行。boolean isCancelled();//如果此任務(wù)在正常完成之前被取消,則返回true boolean isDone(); //如果此任務(wù)完成,則返回true 。 完成可能是由于正常終止、異常或取消——在所有這些情況下,此方法將返回true V get() throws InterruptedException, ExecutionException; //獲得任務(wù)計算結(jié)果V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;//可等待多少時間去獲得任務(wù)計算結(jié)果 }
三、應(yīng)用
3.1、創(chuàng)建CompletableFuture對象
CompletableFuture提供了四個靜態(tài)方法用來創(chuàng)建CompletableFuture對象:
//runAsync 返回void 函數(shù)第二個參數(shù)表示是用我們自己創(chuàng)建的線程池,否則采用默認(rèn)的ForkJoinPool.commonPool() public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) //supplyAsync 異步返回一個結(jié)果 函數(shù)第二個參數(shù)表示是用我們自己創(chuàng)建的線程池,否則采用默認(rèn)的ForkJoinPool.commonPool() //Supplier 是一個函數(shù)式接口,代表是一個生成者的意思 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)3.2、場景一:主動完成任務(wù)
場景:主線程里面創(chuàng)建一個 CompletableFuture,然后主線程調(diào)用 get 方法會 阻塞,最后我們在一個子線程中使其終止。
/*** @Author: crush* @Date: 2021-08-23 9:08* version 1.0*/ public class CompletableFutureDemo1 {/*** 主線程里面創(chuàng)建一個 CompletableFuture,然后主線程調(diào)用 get 方法會阻塞,最后我們在一個子線程中使其終止** @param args*/public static void main(String[] args) throws Exception {CompletableFuture<String> future = new CompletableFuture<>();new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + "子線程開始干活");//子線程睡 5 秒Thread.sleep(5000); // //在子線程中完成主線程 如果注釋掉這一行代碼將會一直停住future.complete("success");} catch (Exception e) {e.printStackTrace();}}, "A").start();//主線程調(diào)用 get 方法阻塞System.out.println("主線程調(diào)用 get 方法獲取結(jié)果為: " + future.get());System.out.println("主線程完成,阻塞結(jié)束!!!!!!");} }3.3、場景二:沒有返回值的異步任務(wù)
runAsync:返回一個新的 CompletableFuture,它在運行給定操作后由在ForkJoinPool.commonPool()運行的任務(wù)異步完成。
如果你想異步的運行一個后臺任務(wù)并且不需要任務(wù)返回結(jié)果,就可以使用runAsync
/*** @Author: crush* @Date: 2021-08-23 9:08* version 1.0*/ public class CompletableFutureDemo2 {/*** 沒有返回值的異步任務(wù)** @param args*/public static void main(String[] args) throws Exception {System.out.println("主線程開始");//運行一個沒有返回值的異步任務(wù)CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {try {System.out.println("子線程啟動干活");Thread.sleep(5000);System.out.println("子線程完成");} catch (Exception e) {e.printStackTrace();}});//主線程阻塞future.get();System.out.println("主線程結(jié)束");} }3.4、場景三:有返回值的異步任務(wù)
supplyAsync:返回任務(wù)結(jié)果。
CompletableFuture.supplyAsync()它持有supplier<T> 并且返回CompletableFuture<T>,T 是通過調(diào)用 傳入的supplier取得的值的類型。
Supplier<T>是一個簡單的函數(shù)式接口,表示supplier的結(jié)果。它有一個get()方法,該方法可以寫入你的后臺任務(wù)中,并且返回結(jié)果。
public static <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier) {return asyncSupplyStage(ASYNC_POOL, supplier); } /*** @Author: crush* @Date: 2021-08-23 9:08* version 1.0*/ public class CompletableFutureDemo2 {/*** 有返回值的異步任務(wù)** @param args*/public static void main(String[] args) throws Exception {System.out.println("主線程開始");//運行一個沒有返回值的異步任務(wù)CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {System.out.println("子線程啟動干活");Thread.sleep(5000);} catch (Exception e) {e.printStackTrace();}return "子線程任務(wù)完成";});//主線程阻塞System.out.println(future.get());System.out.println("主線程結(jié)束");} } /*** 主線程開始* 子線程啟動干活* 子線程任務(wù)完成* 主線程結(jié)束*/3.5、場景四:線程串行化
當(dāng)一個線程依賴另一個線程時,可以使用 thenApply方法來把這兩個線程串行化。
/*** @Author: crush* @Date: 2021-08-23 9:08* version 1.0*/ public class CompletableFutureDemo4 {private static String action="";/*** 線程依賴* 1、我到了燒烤店,* 2、開始點燒烤* 3、和朋友次完燒烤 ,給女朋友帶奶茶回去* @param args*/public static void main(String[] args) throws Exception {System.out.println("主線程開始");CompletableFuture<String> future =CompletableFuture.supplyAsync(() -> {action="和朋友一起去次燒烤 ";return action;}).thenApply(string -> {return action+"開始點燒烤!!";}).thenApply(String->{return action+"和朋友次完燒烤,給女朋友帶杯奶茶回去!!";});String str = future.get();System.out.println("主線程結(jié)束, 子線程的結(jié)果為:" + str);} } /*** 主線程開始* 主線程結(jié)束, 子線程的結(jié)果為:和朋友一起去次燒烤 和朋友次完燒烤,給女朋友帶杯奶茶回去!!*/3.6、場景五:thenAccept 消費處理結(jié)果
如果你不想從你的回調(diào)函數(shù)中返回任何東西,僅僅想在Future完成后運行一些代碼片段,你可以使用thenAccept()和 thenRun()方法,這些方法經(jīng)常在調(diào)用鏈的最末端的最后一個回調(diào)函數(shù)中使用。
thenAccept消費處理結(jié)果, 接收任務(wù)的處理結(jié)果,并消費處理,無返回結(jié)果。
/*** @Author: crush* @Date: 2021-08-23 9:08* version 1.0*/ public class CompletableFutureDemo5 {private static String action = "";public static void main(String[] args) throws Exception {System.out.println("主線程開始");CompletableFuture.supplyAsync(() -> {try {action = "逛淘寶,想買雙鞋 ";} catch (Exception e) {e.printStackTrace();}return action;}).thenApply(string -> {return action + "選中了,下單成功!!";}).thenApply(String -> {return action + "等待快遞到來";}).thenAccept(new Consumer<String>() {@Overridepublic void accept(String s) {System.out.println("子線程全部處理完成,最后調(diào)用了 accept,結(jié)果為:" + s);}});} } /**主線程開始子線程全部處理完成,最后調(diào)用了 accept,結(jié)果為:逛淘寶,想買雙鞋 等待快遞到來*/3.7、場景六:異常處理
exceptionally 異常處理,出現(xiàn)異常時觸發(fā),可以回調(diào)給你一個從原始Future中生成的錯誤恢復(fù)的機(jī)會。你可以在這里記錄這個異常并返回一個默認(rèn)值。
/*** @Author: crush* @Date: 2021-08-23 9:08* version 1.0*/ public class CompletableFutureDemo6 {public static void main(String[] args) throws Exception{System.out.println("主線程開始");CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {int i= 1/0;System.out.println("子線程執(zhí)行中");return i;}).exceptionally(ex -> {System.out.println(ex.getMessage());return -1;});System.out.println(future.get());} } /*** 主線程開始* java.lang.ArithmeticException: / by zero* -1*/使用 handle() 方法處理異常
API提供了一個更通用的方法 - handle()從異常恢復(fù),無論一個異常是否發(fā)生它都會被調(diào)用
3.8、場景七: 結(jié)果合并
thenCompose 合并兩個有依賴關(guān)系的 CompletableFutures的執(zhí)行結(jié)果
/*** @Author: crush* @Date: 2021-08-23 9:08* version 1.0*/ public class CompletableFutureDemo7 {private static Integer num = 10;public static void main(String[] args) throws Exception {System.out.println("主線程開始");//第一步加 10CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {System.out.println("讓num+10;任務(wù)開始");num += 10;return num;});//合并CompletableFuture<Integer> future1 = future.thenCompose(i ->//再來一個 CompletableFutureCompletableFuture.supplyAsync(() -> {return i + 1;}));System.out.println(future.get());System.out.println(future1.get());} } /*** 主線程開始* 讓num+10;任務(wù)開始* 20* 21*/thenCombine 合并兩個沒有依賴關(guān)系的 CompletableFutures任務(wù)
package com.crush.juc09;import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.function.BiFunction;/*** @Author: crush* @Date: 2021-08-23 9:08* version 1.0*/ public class CompletableFutureDemo8 {private static Integer sum = 0;private static Integer count = 1;public static void main(String[] args) throws Exception{System.out.println("主線程開始");CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {System.out.println("從1+...+50開始");for (int i=1;i<=50;i++){sum+=i;}System.out.println("sum::"+sum);return sum;});CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {System.out.println("從1*...*10開始");for (int i=1;i<=10;i++){count=count*i;}System.out.println("count::"+count);return count;});//合并兩個結(jié)果CompletableFuture<Object> future = job1.thenCombine(job2, newBiFunction<Integer, Integer, List<Integer>>() {@Overridepublic List<Integer> apply(Integer a, Integer b) {List<Integer> list = new ArrayList<>();list.add(a);list.add(b);return list;}});System.out.println("合并結(jié)果為:" + future.get());} } /**主線程開始從1*...*10開始從1+...+50開始sum::1275count::3628800合并結(jié)果為:[1275, 3628800]*/3.9、場景八:合并多個任務(wù)的結(jié)果
allOf 與 anyOf
allOf: 一系列獨立的 future任務(wù),等其所有的任務(wù)執(zhí)行完后做一些事情
/*** @Author: crush* @Date: 2021-08-23 9:08* version 1.0*/ public class CompletableFutureDemo9 {private static Integer num = 10;public static void main(String[] args) throws Exception{System.out.println("主線程開始");List<CompletableFuture> list = new ArrayList<>();CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {System.out.println("加 10 任務(wù)開始");num += 10;return num;});list.add(job1);CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {System.out.println("乘以 10 任務(wù)開始");num = num * 10;return num;});list.add(job2);CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {System.out.println("減以 10 任務(wù)開始");num = num - 10;return num;});list.add(job3);CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {System.out.println("除以 10 任務(wù)開始");num = num / 10;return num;});list.add(job4);//多任務(wù)合并List<Integer> collect =list.stream().map(CompletableFuture<Integer>::join).collect(Collectors.toList());System.out.println(collect);}} /**主線程開始乘以 10 任務(wù)開始加 10 任務(wù)開始減以 10 任務(wù)開始除以 10 任務(wù)開始[110, 100, 100, 10] */anyOf: 只要在多個 future里面有一個返回,整個任務(wù)就可以結(jié)束,而不需要等到每一個 future 結(jié)束
package com.crush.juc09;import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors;/*** @Author: crush* @Date: 2021-08-23 9:08* version 1.0*/ public class CompletableFutureDemo10 {private static Integer num = 10;/*** 先對一個數(shù)加 10,然后取平方* @param args*/public static void main(String[] args) throws Exception{System.out.println("主線程開始");CompletableFuture<Integer>[] futures = new CompletableFuture[4];CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {try{Thread.sleep(5000);System.out.println("加 10 任務(wù)開始");num += 10;return num;}catch (Exception e){return 0;}});futures[0] = job1;CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {try{Thread.sleep(2000);System.out.println("乘以 10 任務(wù)開始");num = num * 10;return num;}catch (Exception e){return 1;}});futures[1] = job2;CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {try{Thread.sleep(3000);System.out.println("減以 10 任務(wù)開始");num = num - 10;return num;}catch (Exception e){return 2;}});futures[2] = job3;CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {try{Thread.sleep(4000);System.out.println("除以 10 任務(wù)開始");num = num / 10;return num;}catch (Exception e){return 3;}});futures[3] = job4;CompletableFuture<Object> future = CompletableFuture.anyOf(futures);System.out.println(future.get());} } //主線程開始 //乘以 10 任務(wù)開始 //100四、小結(jié)
本文只是做了一點簡單介紹,還需要大家更深入的了解。
🌈自言自語
最近又開始了JUC的學(xué)習(xí),感覺Java內(nèi)容真的很多,但是為了能夠走的更遠(yuǎn),還是覺得應(yīng)該需要打牢一下基礎(chǔ)。
最近在持續(xù)更新中,如果你覺得對你有所幫助,也感興趣的話,關(guān)注我吧,讓我們
一起學(xué)習(xí),一起討論吧。
你好,我是博主寧在春,Java學(xué)習(xí)路上的一顆小小的種子,也希望有一天能扎根長成蒼天大樹。
希望與君共勉😁
我們:待別時相見時,都已有所成。
總結(jié)
以上是生活随笔為你收集整理的JUC系列(十一) | Java 8 CompletableFuture 异步编程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Minio 小技巧 | 通过编码设置桶策
- 下一篇: Java设计模式-责任链模式