将一个简单远程调用的方式例子改为异步调用
生活随笔
收集整理的這篇文章主要介紹了
将一个简单远程调用的方式例子改为异步调用
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
將一個簡單遠程調用的方式例子改為異步調用
package com.xsxy.asynctest.test03;import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors;/*** 將一個簡單遠程調用的方式例子改為異步調用*/ public class Test04CompletableFuturePractice {public static void main(String[] args) {List<String> ipList = new ArrayList<>();for (int i = 0; i < 10; i++) {ipList.add("192.168.0." + i);}syncMethod(ipList);AsyncMethod(ipList);}/*** 同步順序調用耗時 time:10138** @param ipList*/public static void syncMethod(List<String> ipList) {long start = System.currentTimeMillis();ipList.forEach(ip -> {rpcCall(ip, 8080);});System.out.println("time:" + (System.currentTimeMillis() - start));}/*** 異步調用耗時 time:4029** @param ipList*/public static void AsyncMethod(List<String> ipList) {long start = System.currentTimeMillis();// 同步調用轉異步List<CompletableFuture<String>> completableFutureList = ipList.stream().map(ip -> CompletableFuture.supplyAsync(() -> rpcCall(ip, 9090))).collect(Collectors.toList());// 阻塞等待所有調用都結束List<String> resList = completableFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList());// resList.forEach(System.out::println);System.out.println("time:" + (System.currentTimeMillis() - start));}public static String rpcCall(String ip, int port) {System.out.println("rpcCall=ip:" + ip + ",port:" + port);try {TimeUnit.SECONDS.sleep(1);} catch (Exception e) {}return "res" + port;} }其他completableFuture的使用例子
package com.xsxy.asynctest.test03;import java.util.concurrent.*; import java.util.function.Consumer; import java.util.function.Supplier;public class Test01CompletableFutureSet {public static final int AVALIABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();public static ThreadPoolExecutor executor = new ThreadPoolExecutor(AVALIABLE_PROCESSORS, AVALIABLE_PROCESSORS, 1, TimeUnit.MINUTES,new LinkedBlockingQueue<>(5), new ThreadPoolExecutor.CallerRunsPolicy());public static void main(String[] args) throws Exception {// waitNotify();// runAsync();// runAsync1WithBizExecutor();// supplyAsync();// thenRun();thenAccept();}/*** 這里使用CompletableFuture實現了 通知等待模型,主線程調用future的get()方法等待future返回結果,一開始由于future結果沒有設置,* 所以主線程被阻塞掛起,等異步任務休眠3s,然后調用future的complete方法模擬主線程等待的條件完成,這時候主線程就會從get()方法返回。*/public static void waitNotify() throws ExecutionException, InterruptedException, TimeoutException {CompletableFuture completableFuture = new CompletableFuture();executor.execute(() -> {System.out.println("executor sleep 3 seconds");try {TimeUnit.SECONDS.sleep(3);} catch (Exception e) {}completableFuture.complete("executor done");});System.out.println("main wait completableFurture result");// 阻塞獲取completableFuture結果System.out.println(completableFuture.get());// 設置超時時間,超時時會保timeoutException異常// System.out.println(completableFuture.get(1, TimeUnit.SECONDS));System.out.println("main end");}/*** 實現無返回值的異步計算:當你想異步執行一個任務,并且不需要任務的執行結果時可以使用該方法,比如異步打日志,異步做消息通知等* 在默認情況下,runAsync(Runnablerunnable)方法是使用整個JVM內唯一的ForkJoinPool.commonPool()線程池來執行異步任務的,* 使用runAsync (Runnable runnable, Executor executor)*/public static void runAsync() throws Exception {CompletableFuture future = CompletableFuture.runAsync(() -> {System.out.println("execute completableFuture task");try {TimeUnit.SECONDS.sleep(3);} catch (Exception e) {}});System.out.println("mian");// 無返回值System.out.println(future.get());System.out.println("main end");}/*** 使用自定義線程池 實現無返回值的異步計算*/public static void runAsync1WithBizExecutor() throws Exception {// 使用自定義的線程池CompletableFuture future = CompletableFuture.runAsync(() -> {System.out.println("run completableFuture task");try {TimeUnit.SECONDS.sleep(3);} catch (Exception e) {}}, executor);System.out.println("main");System.out.println(future.get());System.out.println("main end");}/*** 實現有返回值的異步計算* 在默認情況下,supplyAsync(Supplier<U> supplier)方法是使用整個JVM內唯一的ForkJoinPool.commonPool()線程池來執行異步任務的,* 使用supply-Async(Supplier<U> supplier,Executor executor)方法允許我們使用自己制定的線程池來執行異步任務*/public static void supplyAsync() throws Exception {CompletableFuture future = CompletableFuture.supplyAsync(new Supplier<Object>() {@Overridepublic Object get() {System.out.println("completableFuture supplyAsync run");try {TimeUnit.SECONDS.sleep(3);} catch (Exception e) {}// 返回異步計算結果return "supplyAsync return";}});System.out.println("main");System.out.println(future.get());System.out.println("main end");}/*** 基于thenRun實現異步任務,執行完畢后,激活異步任務B執行,需要注意的是,這種方式激活的異步任務B是拿不到任務A的執行結果的*/public static void thenRun() throws ExecutionException, InterruptedException {CompletableFuture future = CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {System.out.println("completableFuture run");try {TimeUnit.SECONDS.sleep(2);} catch (Exception e) {}return "completableFuture supplyAsync return";}});CompletableFuture futuretwo = future.thenRun(() -> {System.out.println("thenRun --");try {TimeUnit.SECONDS.sleep(1);} catch (Exception e) {}});System.out.println("main");System.out.println(futuretwo.get());System.out.println("main end");}/*** 基于thenAccept實現異步任務,執行完畢后,激活異步任務B執行,需要注意的是,這種方式激活的異步任務B是可以拿到任務A的執行結果的*/public static void thenAccept() throws ExecutionException, InterruptedException {CompletableFuture future = CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {System.out.println("completableFuture run");try {TimeUnit.SECONDS.sleep(2);} catch (Exception e) {}return "completableFuture supplyAsync return";}});CompletableFuture futuretwo = future.thenAccept(new Consumer<String>() {@Overridepublic void accept(String o) {System.out.println(o);}});System.out.println("main");System.out.println(futuretwo.get());System.out.println("main end");} }三
package com.xsxy.asynctest.test03;import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Supplier;public class Test02TwoCompletableFuture {/*** CompletableFuture功能強大的原因之一是其可以讓兩個或者多個Completable-Future進行運算來產生結果* <p>* main函數中首先調用方法doSomethingOne("123")開啟了一個異步任務,并返回了對應的CompletableFuture對象,* 我們取名為future1,然后在future1的基礎上調用了thenCompose方法,企圖讓future1執行完畢后,激活使用其結果作* 為doSomethingTwo(String companyId)方法的參數的任務*/public static void main(String[] args) throws Exception {// 串行// testThenCompose();// 并行計算// testThencombine();// 批量testAllOf();}/*** 基于thenCompose實現當一個CompletableFuture執行完畢后,執行另外一個CompletableFuture*/public static void testThenCompose() throws Exception {CompletableFuture<String> future = doSomething1("123").thenCompose(id -> doSomething2(id));// CompletableFuture<String> future = doSomething1("123").thenCompose(Test02TwoCompletableFuture::doSomething2);System.out.println("main");System.out.println(future.get());System.out.println("main end");}/*** 基于thenCombine實現當一個CompletableFuture執行完畢后,執行另外一個CompletableFuture*/public static void testThencombine() throws Exception {// CompletableFuture<String> future = doSomething1("123").thenCombine(doSomething2("456"), (str1, str2) -> str1 + ":" + str2);CompletableFuture<String> future = doSomething1("123").thenCombine(doSomething2("456"), (str1, str2) -> {return str1 + ":" + str2;});System.out.println("main");System.out.println(future.get());System.out.println("main end");}/*** 基于allOf等待多個并發運行的CompletableFuture任務執行完畢** 調用了四次doSomethingOne方法,分別返回一個CompletableFuture對象,然后收集這些CompletableFuture到futureList列表。* 調用allOf方法把多個CompletableFuture轉換為一個result,代碼3在result上調用get()方法會阻塞調用線程,* 直到futureList列表中所有任務執行完畢才返回*/public static void testAllOf() throws ExecutionException, InterruptedException {List<CompletableFuture<String>> futureList = new ArrayList<>();futureList.add(doSomething1("1"));futureList.add(doSomething1("2"));futureList.add(doSomething1("3"));futureList.add(doSomething1("4"));CompletableFuture<Void> res = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()]));System.out.println("main");// 等待所有的future執行完畢System.out.println(res.get());System.out.println("main end ");}public static CompletableFuture<String> doSomething1(String id) {return CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {System.out.println("doSomething1 execute");try {TimeUnit.SECONDS.sleep(2);} catch (Exception e) {}return id;}});}public static CompletableFuture<String> doSomething2(String id) {return CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {System.out.println("doSomething2 execute");try {TimeUnit.SECONDS.sleep(2);} catch (Exception e) {}return id + "doSomething2";}});} }四
package com.xsxy.asynctest.test03;import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit;public class Test03CompletableFutureWithException {/*** 之前的測試completableFuture都是基于流程正常流轉,如果出現異常怎么處理*/public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = new CompletableFuture<>();new Thread(() -> {try {TimeUnit.SECONDS.sleep(2);// 測試異常if (true) {throw new RuntimeException("");}future.complete("complete");} catch (Exception e) {e.printStackTrace();// 先注釋掉, 和下邊的輸出語句同時放開(注釋掉32,放開33)// future.completeExceptionally(e);}}, "thread-1").start();System.out.println("main");System.out.println(future.get());// System.out.println(future.exceptionally(t -> "aaaaaa").get());System.out.println("main end");} }總結
以上是生活随笔為你收集整理的将一个简单远程调用的方式例子改为异步调用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 远程debug
- 下一篇: 将一个简单远程调用的方式例子改为异步调用