日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

通过实例理解 JDK8 的 CompletableFuture

發(fā)布時(shí)間:2023/12/3 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 通过实例理解 JDK8 的 CompletableFuture 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

轉(zhuǎn)載自?通過(guò)實(shí)例理解 JDK8 的 CompletableFuture

?

前言

Java 5 并發(fā)庫(kù)主要關(guān)注于異步任務(wù)的處理,它采用了這樣一種模式,producer 線程創(chuàng)建任務(wù)并且利用阻塞隊(duì)列將其傳遞給任務(wù)的 consumer。這種模型在 Java 7 和 8 中進(jìn)一步發(fā)展,并且開(kāi)始支持另外一種風(fēng)格的任務(wù)執(zhí)行,那就是將任務(wù)的數(shù)據(jù)集分解為子集,每個(gè)子集都可以由獨(dú)立且同質(zhì)的子任務(wù)來(lái)負(fù)責(zé)處理。

這種風(fēng)格的基礎(chǔ)庫(kù)也就是 fork/join 框架,它允許程序員規(guī)定數(shù)據(jù)集該如何進(jìn)行分割,并且支持將子任務(wù)提交到默認(rèn)的標(biāo)準(zhǔn)線程池中,也就是"通用的"ForkJoinPool。Java 8 中,fork/join 并行功能借助并行流的機(jī)制變得更加具有可用性。但是,不是所有的問(wèn)題都適合這種風(fēng)格的并行處理:所處理的元素必須是獨(dú)立的,數(shù)據(jù)集要足夠大,并且在并行加速方面,每個(gè)元素的處理成本要足夠高,這樣才能補(bǔ)償建立 fork/join 框架所消耗的成本。CompletableFuture 類則是 Java 8 在并行流方面的創(chuàng)新。

準(zhǔn)備知識(shí)

異步計(jì)算

所謂異步調(diào)用其實(shí)就是實(shí)現(xiàn)一個(gè)可無(wú)需等待被調(diào)用函數(shù)的返回值而讓操作繼續(xù)運(yùn)行的方法。在 Java 語(yǔ)言中,簡(jiǎn)單的講就是另啟一個(gè)線程來(lái)完成調(diào)用中的部分計(jì)算,使調(diào)用繼續(xù)運(yùn)行或返回,而不需要等待計(jì)算結(jié)果。但調(diào)用者仍需要取線程的計(jì)算結(jié)果。

回調(diào)函數(shù)

回調(diào)函數(shù)比較通用的解釋是,它是一個(gè)通過(guò)函數(shù)指針調(diào)用的函數(shù)。如果你把函數(shù)的指針(地址)作為參數(shù)傳遞給另一個(gè)函數(shù),當(dāng)這個(gè)指針被用為調(diào)用它所指向的函數(shù)時(shí),我們就說(shuō)這是回調(diào)函數(shù)。回調(diào)函數(shù)不是由該函數(shù)的實(shí)現(xiàn)方直接調(diào)用,而是在特定的事件或條件發(fā)生時(shí)由另外一方調(diào)用的,用于對(duì)該事件或條件進(jìn)行響應(yīng)。

回調(diào)函數(shù)的機(jī)制:

(1)定義一個(gè)回調(diào)函數(shù);

(2)提供函數(shù)實(shí)現(xiàn)的一方在初始化時(shí)候,將回調(diào)函數(shù)的函數(shù)指針注冊(cè)給調(diào)用者;

(3)當(dāng)特定的事件或條件發(fā)生的時(shí)候,調(diào)用者使用函數(shù)指針調(diào)用回調(diào)函數(shù)對(duì)事件進(jìn)行處理。

回調(diào)函數(shù)通常與原始調(diào)用者處于同一層次,如圖 1 所示:

圖 1 回調(diào)函數(shù)示例圖

Future 接口介紹

JDK5 新增了 Future 接口,用于描述一個(gè)異步計(jì)算的結(jié)果。雖然 Future 以及相關(guān)使用方法提供了異步執(zhí)行任務(wù)的能力,但是對(duì)于結(jié)果的獲取卻是很不方便,只能通過(guò)阻塞或者輪詢的方式得到任務(wù)的結(jié)果。阻塞的方式顯然和我們的異步編程的初衷相違背,輪詢的方式又會(huì)耗費(fèi)無(wú)謂的 CPU 資源,而且也不能及時(shí)地得到計(jì)算結(jié)果,為什么不能用觀察者設(shè)計(jì)模式呢?即當(dāng)計(jì)算結(jié)果完成及時(shí)通知監(jiān)聽(tīng)者。

有一些開(kāi)源框架實(shí)現(xiàn)了我們的設(shè)想,例如 Netty 的 ChannelFuture 類擴(kuò)展了 Future 接口,通過(guò)提供 addListener 方法實(shí)現(xiàn)支持回調(diào)方式的異步編程。Netty 中所有的 I/O 操作都是異步的,這意味著任何的 I/O 調(diào)用都將立即返回,而不保證這些被請(qǐng)求的 I/O 操作在調(diào)用結(jié)束的時(shí)候已經(jīng)完成。取而代之地,你會(huì)得到一個(gè)返回的 ChannelFuture 實(shí)例,這個(gè)實(shí)例將給你一些關(guān)于 I/O 操作結(jié)果或者狀態(tài)的信息。當(dāng)一個(gè) I/O 操作開(kāi)始的時(shí)候,一個(gè)新的 Future 對(duì)象就會(huì)被創(chuàng)建。在開(kāi)始的時(shí)候,新的 Future 是未完成的狀態(tài)--它既非成功、失敗,也非被取消,因?yàn)?I/O 操作還沒(méi)有結(jié)束。如果 I/O 操作以成功、失敗或者被取消中的任何一種狀態(tài)結(jié)束了,那么這個(gè) Future 將會(huì)被標(biāo)記為已完成,并包含更多詳細(xì)的信息(例如:失敗的原因)。請(qǐng)注意,即使是失敗和被取消的狀態(tài),也是屬于已完成的狀態(tài)。阻塞方式的示例代碼如清單 1 所示。

清單 1 阻塞方式示例代碼

1

2

3

4

5

6

// Start the connection attempt.

ChannelFuture Future = bootstrap.connect(new InetSocketAddress(host, port));

// Wait until the connection is closed or the connection attempt fails.

Future.getChannel().getCloseFuture().awaitUninterruptibly();

// Shut down thread pools to exit.

bootstrap.releaseExternalResources();

上面代碼使用的是 awaitUninterruptibly 方法,源代碼如清單 2 所示。

清單 2 awaitUninterruptibly 源代碼

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

publicChannelFutureawaitUninterruptibly() {

????boolean interrupted = false;

????synchronized (this) {

????????//循環(huán)等待到完成

????????while (!done) {

????????????checkDeadLock();

????????????waiters++;

????????try {

????????????wait();

????????} catch (InterruptedException e) {

????????????//不允許中斷

????????????interrupted = true;

????????} finally {

????????????waiters--;

????????}

????}

}

????if (interrupted) {

????Thread.currentThread().interrupt();

}

return this;

}

清單 3 異步非阻塞方式示例代碼

1

2

3

4

5

6

7

8

9

10

// Start the connection attempt.

ChannelFuture Future = bootstrap.connect(new InetSocketAddress(host, port));

Future.addListener(new ChannelFutureListener(){

????public void operationComplete(final ChannelFuture Future)

????????throws Exception

????????{??????????

????}

});

// Shut down thread pools to exit.

bootstrap.releaseExternalResources();

可以明顯的看出,在異步模式下,上面這段代碼沒(méi)有阻塞,在執(zhí)行 connect 操作后直接執(zhí)行到 printTime("異步時(shí)間: "),隨后 connect 完成,Future 的監(jiān)聽(tīng)函數(shù)輸出 connect 操作完成。

非阻塞則是添加監(jiān)聽(tīng)類 ChannelFutureListener,通過(guò)覆蓋 ChannelFutureListener 的 operationComplete 執(zhí)行業(yè)務(wù)邏輯。

清單 4 異步非阻塞方式示例代碼

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

public void addListener(final ChannelFutureListener listener) {

????if (listener == null) {

????throw new NullPointerException("listener");

}

????booleannotifyNow = false;

????synchronized (this) {

????????if (done) {

????????notifyNow = true;

????} else {

????????if (firstListener == null) {

????????//listener 鏈表頭

????????firstListener = listener;

????} else {

????????if (otherListeners == null) {

????????otherListeners = new ArrayList<ChannelFutureListener>(1);

????????}

????????//添加到 listener 鏈表中,以便操作完成后遍歷操作

????????otherListeners.add(listener);

????}

????......

????if (notifyNow) {

????????//通知 listener 進(jìn)行處理

????????notifyListener(listener);

????????}

}

這部分代碼的邏輯很簡(jiǎn)單,就是注冊(cè)回調(diào)函數(shù),當(dāng)操作完成后自動(dòng)調(diào)用回調(diào)函數(shù),就達(dá)到了異步的效果。

CompletableFuture 類介紹?

Java 8 中, 新增加了一個(gè)包含 50 個(gè)方法左右的類--CompletableFuture,它提供了非常強(qiáng)大的 Future 的擴(kuò)展功能,可以幫助我們簡(jiǎn)化異步編程的復(fù)雜性,并且提供了函數(shù)式編程的能力,可以通過(guò)回調(diào)的方式處理計(jì)算結(jié)果,也提供了轉(zhuǎn)換和組合 CompletableFuture 的方法。

對(duì)于阻塞或者輪詢方式,依然可以通過(guò) CompletableFuture 類的 CompletionStage 和 Future 接口方式支持。

CompletableFuture 類聲明了 CompletionStage 接口,CompletionStage 接口實(shí)際上提供了同步或異步運(yùn)行計(jì)算的舞臺(tái),所以我們可以通過(guò)實(shí)現(xiàn)多個(gè) CompletionStage 命令,并且將這些命令串聯(lián)在一起的方式實(shí)現(xiàn)多個(gè)命令之間的觸發(fā)。

我們可以通過(guò) CompletableFuture.supplyAsync(this::sendMsg); 這么一行代碼創(chuàng)建一個(gè)簡(jiǎn)單的異步計(jì)算。在這行代碼中,supplyAsync 支持異步地執(zhí)行我們指定的方法,這個(gè)例子中的異步執(zhí)行方法是 sendMsg。當(dāng)然,我們也可以使用 Executor 執(zhí)行異步程序,默認(rèn)是 ForkJoinPool.commonPool()。

我們也可以在異步計(jì)算結(jié)束之后指定回調(diào)函數(shù),例如 CompletableFuture.supplyAsync(this::sendMsg) .thenAccept(this::notify);這行代碼中的 thenAccept 被用于增加回調(diào)函數(shù),在我們的示例中 notify 就成了異步計(jì)算的消費(fèi)者,它會(huì)處理計(jì)算結(jié)果。

CompletableFuture 類使用示例

接下來(lái)我們通過(guò) 20 個(gè)示例看看 CompletableFuture 類具體怎么用。

創(chuàng)建完整的 CompletableFuture

清單 5 示例代碼

1

2

3

4

5

static void completedFutureExample() {

????CompletableFuture<String>cf = CompletableFuture.completedFuture("message");

????assertTrue(cf.isDone());

????assertEquals("message", cf.getNow(null));

}

以上代碼一般來(lái)說(shuō)被用于啟動(dòng)異步計(jì)算,getNow(null)返回計(jì)算結(jié)果或者 null。

運(yùn)行簡(jiǎn)單的異步場(chǎng)景

清單 6 示例代碼

1

2

3

4

5

6

7

8

9

static void runAsyncExample() {

????CompletableFuture<Void>cf = CompletableFuture.runAsync(() -> {

????assertTrue(Thread.currentThread().isDaemon());

????randomSleep();

});

????assertFalse(cf.isDone());

????sleepEnough();

????assertTrue(cf.isDone());

}

以上代碼的關(guān)鍵點(diǎn)有兩點(diǎn):

  • CompletableFuture 是異步執(zhí)行方式;
  • 使用 ForkJoinPool 實(shí)現(xiàn)異步執(zhí)行,這種方式使用了 daemon 線程執(zhí)行 Runnable 任務(wù)。
  • 同步執(zhí)行動(dòng)作示例

    清單 7 示例代碼

    1

    2

    3

    4

    5

    6

    7

    static void thenApplyExample() {

    ????CompletableFuture<String>cf = CompletableFuture.completedFuture("message").thenApply(s -> {

    ????assertFalse(Thread.currentThread().isDaemon());

    ????returns.toUpperCase();

    ????});

    ????assertEquals("MESSAGE", cf.getNow(null));

    }

    以上代碼在異步計(jì)算正常完成的前提下將執(zhí)行動(dòng)作(此處為轉(zhuǎn)換成大寫字母)。

    異步執(zhí)行動(dòng)作示例?

    相較前一個(gè)示例的同步方式,以下代碼實(shí)現(xiàn)了異步方式,僅僅是在上面的代碼里的多個(gè)方法增加"Async"這樣的關(guān)鍵字。

    清單 8 示例代碼

    1

    2

    3

    4

    5

    6

    7

    8

    9

    static void thenApplyAsyncExample() {

    ????CompletableFuture<String>cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {

    ????assertTrue(Thread.currentThread().isDaemon());

    ????randomSleep();

    ????returns.toUpperCase();

    ????});

    ????assertNull(cf.getNow(null));

    ????assertEquals("MESSAGE", cf.join());

    }

    使用固定的線程池完成異步執(zhí)行動(dòng)作示例?

    我們可以通過(guò)使用線程池方式來(lái)管理異步動(dòng)作申請(qǐng),以下代碼基于固定的線程池,也是做一個(gè)大寫字母轉(zhuǎn)換動(dòng)作,代碼如清單 9 所示。

    清單 9 示例代碼

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    staticExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() {

    ????int count = 1;

    ????@Override

    ????public Thread newThread(Runnable runnable) {

    ????????return new Thread(runnable, "custom-executor-" + count++);

    ????}

    ????});

    ????????static void thenApplyAsyncWithExecutorExample() {

    ????????????CompletableFuture<String>cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {

    ????????????assertTrue(Thread.currentThread().getName().startsWith("custom-executor-"));

    ????????????assertFalse(Thread.currentThread().isDaemon());

    ????????????randomSleep();

    ????????????returns.toUpperCase();

    ????????}, executor);

    ????????assertNull(cf.getNow(null));

    ????????assertEquals("MESSAGE", cf.join());

    }

    作為消費(fèi)者消費(fèi)計(jì)算結(jié)果示例?

    假設(shè)我們本次計(jì)算只需要前一次的計(jì)算結(jié)果,而不需要返回本次計(jì)算結(jié)果,那就有點(diǎn)類似于生產(chǎn)者(前一次計(jì)算)-消費(fèi)者(本次計(jì)算)模式了,示例代碼如清單 10 所示。

    清單 10 示例代碼

    1

    2

    3

    4

    5

    6

    static void thenAcceptExample() {

    ????StringBuilder result = new StringBuilder();

    ????CompletableFuture.completedFuture("thenAccept message")

    ????.thenAccept(s ->result.append(s));

    ????assertTrue("Result was empty", result.length() > 0);

    }

    消費(fèi)者是同步執(zhí)行的,所以不需要在 CompletableFuture 里對(duì)結(jié)果進(jìn)行合并。

    異步消費(fèi)示例?

    相較于前一個(gè)示例的同步方式,我們也對(duì)應(yīng)有異步方式,代碼如清單 11 所示。

    清單 11 示例代碼

    1

    2

    3

    4

    5

    6

    7

    static void thenAcceptAsyncExample() {

    ????StringBuilder result = new StringBuilder();

    ????CompletableFuture<Void>cf = CompletableFuture.completedFuture("thenAcceptAsync message")

    ????.thenAcceptAsync(s ->result.append(s));

    ????cf.join();

    ????assertTrue("Result was empty", result.length() > 0);

    }

    計(jì)算過(guò)程中的異常示例?

    接下來(lái)介紹異步操作過(guò)程中的異常情況處理。下面這個(gè)示例中我們會(huì)在字符轉(zhuǎn)換異步請(qǐng)求中刻意延遲 1 秒鐘,然后才會(huì)提交到 ForkJoinPool 里面去執(zhí)行。

    清單 12 示例代碼

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    static void completeExceptionallyExample() {

    ????????CompletableFuture<String>cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,

    ????????CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));

    ????????CompletableFuture<String>exceptionHandler = cf.handle((s, th) -> { return (th != null) ? "message upon cancel" : ""; });

    ????????cf.completeExceptionally(new RuntimeException("completed exceptionally"));

    ????????assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());

    ????try {

    ????????cf.join();

    ????????fail("Should have thrown an exception");

    ????????} catch(CompletionException ex) { // just for testing

    ????????????assertEquals("completed exceptionally", ex.getCause().getMessage());

    ????}

    ?????assertEquals("message upon cancel", exceptionHandler.join());

    }

    示例代碼中,首先我們創(chuàng)建一個(gè) CompletableFuture(計(jì)算完畢),然后調(diào)用 thenApplyAsync 返回一個(gè)新的 CompletableFuture,接著通過(guò)使用 delayedExecutor(timeout, timeUnit)方法延遲 1 秒鐘執(zhí)行。然后我們創(chuàng)建一個(gè) handler(exceptionHandler),它會(huì)處理異常,返回另一個(gè)字符串"message upon cancel"。接下來(lái)進(jìn)入 join()方法,執(zhí)行大寫轉(zhuǎn)換操作,并且拋出 CompletionException 異常。

    取消計(jì)算任務(wù)

    與前面一個(gè)異常處理的示例類似,我們可以通過(guò)調(diào)用 cancel(boolean mayInterruptIfRunning)方法取消計(jì)算任務(wù)。此外,cancel()方法與 completeExceptionally(new CancellationException())等價(jià)。

    清單 13 示例代碼

    1

    2

    3

    4

    5

    6

    7

    8

    static void cancelExample() {

    ????CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,

    ????CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));

    ????CompletableFuture cf2 = cf.exceptionally(throwable -> "canceled message");

    ????assertTrue("Was not canceled", cf.cancel(true));

    ????assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());

    ????assertEquals("canceled message", cf2.join());

    }

    一個(gè) CompletableFuture VS 兩個(gè)異步計(jì)算

    我們可以創(chuàng)建一個(gè) CompletableFuture 接收兩個(gè)異步計(jì)算的結(jié)果,下面代碼首先創(chuàng)建了一個(gè) String 對(duì)象,接下來(lái)分別創(chuàng)建了兩個(gè) CompletableFuture 對(duì)象 cf1 和 cf2,cf2 通過(guò)調(diào)用 applyToEither 方法實(shí)現(xiàn)我們的需求。

    清單 14 示例代碼

    1

    2

    3

    4

    5

    6

    7

    8

    9

    static void applyToEitherExample() {

    ????String original = "Message";

    ????CompletableFuture cf1 = CompletableFuture.completedFuture(original)

    ????.thenApplyAsync(s -> delayedUpperCase(s));

    ????CompletableFuture cf2 = cf1.applyToEither(

    ????CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),

    ????s -> s + " from applyToEither");

    ????assertTrue(cf2.join().endsWith(" from applyToEither"));

    }

    如果我們想要使用消費(fèi)者替換清單 14 的方法方式用于處理異步計(jì)算結(jié)果,代碼如清單 15 所示。

    清單 15 示例代碼

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    static void acceptEitherExample() {

    ????String original = "Message";

    ????StringBuilder result = new StringBuilder();

    ????CompletableFuture cf = CompletableFuture.completedFuture(original)

    ????.thenApplyAsync(s -> delayedUpperCase(s))

    ????.acceptEither(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),

    ????s -> result.append(s).append("acceptEither"));

    ????cf.join();

    ????assertTrue("Result was empty", result.toString().endsWith("acceptEither"));

    }

    運(yùn)行兩個(gè)階段后執(zhí)行

    下面這個(gè)示例程序兩個(gè)階段執(zhí)行完畢后返回結(jié)果,首先將字符轉(zhuǎn)為大寫,然后將字符轉(zhuǎn)為小寫,在兩個(gè)計(jì)算階段都結(jié)束之后觸發(fā) CompletableFuture。

    清單 16 示例代碼

    1

    2

    3

    4

    5

    6

    7

    8

    static void runAfterBothExample() {

    ????String original = "Message";

    ????StringBuilder result = new StringBuilder();

    ????CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).runAfterBoth(

    ????CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),

    ????() -> result.append("done"));

    ????assertTrue("Result was empty", result.length() > 0);

    }

    也可以通過(guò)以下方式處理異步計(jì)算結(jié)果,

    清單 17 示例代碼

    1

    2

    3

    4

    5

    6

    7

    8

    static void thenAcceptBothExample() {

    ????String original = "Message";

    ????StringBuilder result = new StringBuilder();

    ????CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).thenAcceptBoth(

    ????CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),

    ????(s1, s2) -> result.append(s1 + s2));

    ????assertEquals("MESSAGEmessage", result.toString());

    }

    整合兩個(gè)計(jì)算結(jié)果

    我們可以通過(guò) thenCombine()方法整合兩個(gè)異步計(jì)算的結(jié)果,注意,以下代碼的整個(gè)程序過(guò)程是同步的,getNow()方法最終會(huì)輸出整合后的結(jié)果,也就是說(shuō)大寫字符和小寫字符的串聯(lián)值。

    清單 18 示例代碼

    1

    2

    3

    4

    5

    6

    7

    static void thenCombineExample() {

    ????String original = "Message";

    ????CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))

    ????.thenCombine(CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)),

    ????(s1, s2) -> s1 + s2);

    ????assertEquals("MESSAGEmessage", cf.getNow(null));

    }

    上面這個(gè)示例是按照同步方式執(zhí)行兩個(gè)方法后再合成字符串,以下代碼采用異步方式同步執(zhí)行兩個(gè)方法,由于異步方式情況下不能夠確定哪一個(gè)方法最終執(zhí)行完畢,所以我們需要調(diào)用 join()方法等待后一個(gè)方法結(jié)束后再合成字符串,這一點(diǎn)和線程的 join()方法是一致的,主線程生成并起動(dòng)了子線程,如果子線程里要進(jìn)行大量的耗時(shí)的運(yùn)算,主線程往往將于子線程之前結(jié)束,但是如果主線程處理完其他的事務(wù)后,需要用到子線程的處理結(jié)果,也就是主線程需要等待子線程執(zhí)行完成之后再結(jié)束,這個(gè)時(shí)候就要用到 join()方法了,即 join()的作用是:"等待該線程終止"。

    清單 19 示例代碼

    1

    2

    3

    4

    5

    6

    7

    8

    static void thenCombineAsyncExample() {

    ????String original = "Message";

    ????CompletableFuture cf = CompletableFuture.completedFuture(original)

    ????.thenApplyAsync(s -> delayedUpperCase(s))

    ????.thenCombine(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),

    ????assertEquals("MESSAGEmessage", cf.join());

    ????(s1, s2) -> s1 + s2);

    }

    除了 thenCombine()方法以外,還有另外一種方法-thenCompose(),這個(gè)方法也會(huì)實(shí)現(xiàn)兩個(gè)方法執(zhí)行后的返回結(jié)果的連接。

    清單 20 示例代碼

    1

    2

    3

    4

    5

    6

    7

    static void thenComposeExample() {

    ????String original = "Message";

    ????CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))

    ????.thenCompose(upper -> CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s))

    ????.thenApply(s -> upper + s));

    ????assertEquals("MESSAGEmessage", cf.join());

    }

    anyOf()方法

    以下代碼模擬了如何在幾個(gè)計(jì)算過(guò)程中任意一個(gè)完成后創(chuàng)建 CompletableFuture,在這個(gè)例子中,我們創(chuàng)建了幾個(gè)計(jì)算過(guò)程,然后轉(zhuǎn)換字符串到大寫字符。由于這些 CompletableFuture 是同步執(zhí)行的(下面這個(gè)例子使用的是 thenApply()方法,而不是 thenApplyAsync()方法),使用 anyOf()方法后返回的任何一個(gè)值都會(huì)立即觸發(fā) CompletableFuture。然后我們使用 whenComplete(BiConsumer<? super Object, ? super Throwable> action)方法處理結(jié)果。

    清單 21 示例代碼

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    static void anyOfExample() {

    ????StringBuilder result = new StringBuilder();

    ????List messages = Arrays.asList("a", "b", "c");

    ????List<CompletableFuture> futures = messages.stream()

    ????.map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))

    ????.collect(Collectors.toList());

    ????CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((res, th) -> {

    ????????if(th == null) {

    ????????assertTrue(isUpperCase((String) res));

    ????????result.append(res);

    ????}

    });

    ????assertTrue("Result was empty", result.length() > 0);

    }

    當(dāng)所有的 CompletableFuture 完成后創(chuàng)建 CompletableFuture

    清單 22 所示我們會(huì)以同步方式執(zhí)行多個(gè)異步計(jì)算過(guò)程,在所有計(jì)算過(guò)程都完成后,創(chuàng)建一個(gè) CompletableFuture。

    清單 22 示例代碼

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    static void allOfExample() {

    ????StringBuilder result = new StringBuilder();

    ????List messages = Arrays.asList("a", "b", "c");

    ????List<CompletableFuture> futures = messages.stream()

    ????.map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))

    ????.collect(Collectors.toList());

    ????CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((v, th) -> {

    ????????futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));

    ????????result.append("done");

    });

    ????assertTrue("Result was empty", result.length() > 0);

    }

    相較于前一個(gè)同步示例,我們也可以異步執(zhí)行,如清單 23 所示。

    清單 23 示例代碼

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    static void allOfAsyncExample() {

    ????StringBuilder result = new StringBuilder();

    ????List messages = Arrays.asList("a", "b", "c");

    ????List<CompletableFuture> futures = messages.stream()

    ????.map(msg -> CompletableFuture.completedFuture(msg).thenApplyAsync(s -> delayedUpperCase(s)))

    ????.collect(Collectors.toList());

    ????CompletableFuture allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))

    .whenComplete((v, th) -> {

    ????futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));

    ????result.append("done");

    });

    ????allOf.join();

    ????assertTrue("Result was empty", result.length() > 0);

    }

    實(shí)際案例

    以下代碼完成的操作包括:

  • 首先異步地通過(guò)調(diào)用 cars()方法獲取 Car 對(duì)象,返回一個(gè) CompletionStage<List>實(shí)例。Cars()方法可以在內(nèi)部使用調(diào)用遠(yuǎn)端服務(wù)器上的 REST 服務(wù)等類似場(chǎng)景。
  • 然后和其他的 CompletionStage<List>組合,通過(guò)調(diào)用 rating(manufacturerId)方法異步地返回 CompletionStage 實(shí)例。
  • 當(dāng)所有的 Car 對(duì)象都被填充了 rating 后,調(diào)用 allOf()方法獲取最終值。
  • 調(diào)用 whenComplete()方法打印最終的評(píng)分(rating)。
  • 清單 24 示例代碼

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    cars().thenCompose(cars -> {

    ????List<CompletionStage> updatedCars = cars.stream()

    ????.map(car -> rating(car.manufacturerId).thenApply(r -> {

    ????car.setRating(r);

    ????return car;

    ?????})).collect(Collectors.toList());

    ????CompletableFuture done = CompletableFuture

    ????.allOf(updatedCars.toArray(new CompletableFuture[updatedCars.size()]));

    ????return done.thenApply(v -> updatedCars.stream().map(CompletionStage::toCompletableFuture)

    ????.map(CompletableFuture::join).collect(Collectors.toList()));

    ????}).whenComplete((cars, th) -> {

    ????if (th == null) {

    ????cars.forEach(System.out::println);

    ????} else {

    ????throw new RuntimeException(th);

    ????}

    }).toCompletableFuture().join();

    結(jié)束語(yǔ)

    Completable 類為我們提供了豐富的異步計(jì)算調(diào)用方式,我們可以通過(guò)上述基本操作描述及 20 個(gè)示例程序進(jìn)一步了解如果使用 CompletableFuture 類實(shí)現(xiàn)我們的需求,期待 JDK10 會(huì)有持續(xù)更新。

    參考資源

    參考 developerWorks 上的 Java 8 文章,了解更多 Java 8 知識(shí)。

    參考書籍?Java 8 in Action?Raoul-Gabriel Urma

    參考書籍?Mastering Lambdas: Java Programming in a Multicore World?Maurice Naftalin

    參考文章?Java 8 CompletableFutures,這篇文章從基礎(chǔ)介紹了 CompletableFuture 類的使用方式。

    ?

    創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來(lái)咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)

    總結(jié)

    以上是生活随笔為你收集整理的通过实例理解 JDK8 的 CompletableFuture的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 法国极品成人h版 | 精品人妻大屁股白浆无码 | 亚洲熟悉妇女xxx妇女av | 久久露脸国语精品国产91 | 青青草这里只有精品 | 中文字幕 国产 | 免费的理伦片在线播放 | 韩国中文字幕hd久久精品 | 中国毛片基地 | 色妇av | 国产一区二区精彩视频 | 伊人久久久久久久久久 | 欧美亚洲91 | 成人在线激情视频 | 久久久性视频 | 99精品久久久久久中文字幕 | 欧洲成人午夜精品无码区久久 | 国产精品视频在线观看免费 | av动漫免费观看 | 色94色欧美sute亚洲线路二 | 亚洲人成无码www久久久 | 狠狠操在线播放 | 一二三区在线 | av资源部 | 泰坦尼克号3小时49分的观看方法 | 亚洲在线视频一区 | 美乳人妻一区二区三区 | 日韩黄色成人 | 日韩一级久久 | 极品色av影院 | 日韩不卡一区二区三区 | 乐播av一区二区三区 | 欧美三级在线看 | 激情二区| 成人人人人人欧美片做爰 | 色老板av| 国产人与zoxxxx另类 | 伊人成人在线视频 | 波多野结衣毛片 | 日本黄色片免费 | 欧美在线一二三区 | 亚洲精品久久一区二区三区777 | 亚洲第一精品在线观看 | 动漫艳母在线观看 | 久久久免费精品视频 | 在线综合av | 久久久久久久久久国产精品 | 欧美午夜精品一区二区蜜桃 | 麻豆国产精品视频 | 91在线免费视频观看 | 亚洲91网| 精品一区二区三区视频 | 在线观看国产精品视频 | 伊人丁香 | 国产艳妇疯狂做爰视频 | 毛片你懂的 | 久青草国产在线 | 善良的公与媳hd中文字 | 超碰人人在线 | 成人小视频免费看 | 欧美高清hd | 免费视频久久久 | 1769国产精品视频 | 欧美三区 | 国产又粗又长又黄视频 | jizz中文字幕 | 狼人伊人干 | 大黑人交交护士xxxxhd | 欧美久久久久久久久久 | 在线免费观看高清视频 | 熟女人妻aⅴ一区二区三区60路 | www.超碰| 国产福利精品一区 | www.久草.com| 亚洲欧美激情图片 | 三级福利视频 | 成年人理论片 | 日本高清不卡码 | 人人搞人人爱 | 日韩国产精品视频 | 伊人免费视频 | av男人资源| 欧美激情在线免费 | 天堂av亚洲 | 性生交大片免费看狂欲 | 国产一区二区三区黄 | 国产吞精囗交免费视频网站 | 日韩精品免费一区二区三区 | 黄色1级片 | 亚色在线视频 | 日韩精品中文字幕一区二区三区 | av观看网 | 中文字幕在线欧美 | 校园激情av | 51人人看 | 在线看的免费网站 | 色综合色综合色综合 | 最新黄网 | 成人在线免费电影 |