日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

rx.observable_使用Java 8 CompletableFuture和Rx-Java Observable

發布時間:2023/12/3 java 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 rx.observable_使用Java 8 CompletableFuture和Rx-Java Observable 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

rx.observable

我想探索一個使用Java 8 CompletableFuture和Rx-Java Observable的簡單分散聚集場景。

場景很簡單–產生大約10個任務,每個任務返回一個字符串,最終將結果收集到一個列表中。

順序的

其順序版本如下:

public void testSequentialScatterGather() throws Exception {List<String> list =IntStream.range(0, 10).boxed().map(this::generateTask).collect(Collectors.toList());logger.info(list.toString()); }private String generateTask(int i) {Util.delay(2000);return i + "-" + "test"; }

隨著CompletableFuture

可以使用稱為supplyAsync的實用程序方法來使方法返回CompletableFuture,我正在使用此方法的一種變體,它接受要使用的顯式Executor ,而且我故意為其中一個輸入拋出異常:

private CompletableFuture<String> generateTask(int i,ExecutorService executorService) {return CompletableFuture.supplyAsync(() -> {Util.delay(2000);if (i == 5) {throw new RuntimeException("Run, it is a 5!");}return i + "-" + "test";}, executorService); }

現在分散任務:

List<CompletableFuture<String>> futures =IntStream.range(0, 10).boxed().map(i -> this.generateTask(i, executors).exceptionally(t -> t.getMessage())).collect(Collectors.toList());

在分散任務結束時,結果是CompletableFuture列表。 現在,要從中獲取String列表有些棘手,這里我使用Stackoverflow中建議的一種解決方案:

CompletableFuture<List<String>> result = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));

CompletableFuture.allOf方法在這里純粹用于組成下一步操作,一旦所有分散的任務都完成,則一旦完成任務,期貨就會再次流式傳輸并收集到字符串列表中。

然后可以異步顯示最終結果:

result.thenAccept(l -> {logger.info(l.toString()); });

使用Rx-java Observable

使用Rx-java進行分散收集比使用CompletableFuture版本相對更清潔,因為Rx-java提供了更好的方法將結果組合在一起,這也是執行分散任務的方法:

private Observable<String> generateTask(int i, ExecutorService executorService) {return Observable.<String>create(s -> {Util.delay(2000);if ( i == 5) {throw new RuntimeException("Run, it is a 5!");}s.onNext( i + "-test");s.onCompleted();}).onErrorReturn(e -> e.getMessage()).subscribeOn(Schedulers.from(executorService)); }

并分散任務:

List<Observable<String>> obs =IntStream.range(0, 10).boxed().map(i -> generateTask(i, executors)).collect(Collectors.toList());

我又有了一個Observable的列表,而我需要的是一個結果列表,Observable提供了一個合并方法來做到這一點:

Observable<List<String>> merged = Observable.merge(obs).toList();

可以訂閱并在可用時打印結果:

merged.subscribe(l -> logger.info(l.toString()));

翻譯自: https://www.javacodegeeks.com/2015/08/using-java-8-completablefuture-and-rx-java-observable.html

rx.observable

總結

以上是生活随笔為你收集整理的rx.observable_使用Java 8 CompletableFuture和Rx-Java Observable的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。