Mono入门应用
前言
Flux 和 Mono 是 Reactor 中的兩個基本概念。Flux 表示的是包含 0 到 N 個元素的異步序列。在該序列中可以包含三種不同類型的消息通知:正常的包含元素的消息、序列結束的消息和序列出錯的消息。當消息通知產生時,訂閱者中對應的方法onNext(), onComplete()和 onError()會被調用。Mono 表示的是包含 0 或者 1 個元素的異步序列。該序列中同樣可以包含與 Flux 相同的三種類型的消息通知。Flux 和 Mono 之間可以進行轉換。對一個 Flux 序列進行計數操作,得到的結果是一個 Mono<Long>對象。把兩個 Mono 序列合并在一起,得到的是一個 Flux 對象。相對Flux,Mono可能稍微簡單一點,本章將從Mono的創建入手,開始Reactor之旅。
本章概要 1、通過Mono靜態方法創建:
靜態方法示例 1、empty():創建一個不包含任何元素,只發布結束消息的序列: Mono.empty().subscribe(System.out::println);
2、just():可以指定序列中包含的全部元素。創建出來的 Mono序列在發布這些元素之后會自動結束。 Mono.just("foo").subscribe(System.out::println);
3、justOrEmpty():從一個 Optional 對象或可能為 null 的對象中創建 Mono。只有 Optional 對象中包含值或對象不為 null 時,Mono 序列才產生對應的元素。 Mono.justOrEmpty(null).subscribe(System.out::println); Mono.justOrEmpty("justOrEmpty1").subscribe(System.out::println); Mono.justOrEmpty(Optional.of("justOrEmpty2")).subscribe(System.out::println);
4、error(Throwable error):創建一個只包含錯誤消息的序列。 Mono.error(newRuntimeException("error")).subscribe(System.out::println, System.err::println);
5、never():創建一個不包含任何消息通知的序列。 Mono.never().subscribe(System.out::println);
6、fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier():分別從 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中創建 Mono。 //通過fromRunnable創建,并實現異常處理 Mono.fromRunnable(() -> { System.out.println("thread run"); throw new RuntimeException("thread run error"); }).subscribe(System.out::println, System.err::println);//通過fromCallable創建 Mono.fromCallable(() -> "callable run ").subscribe(System.out::println);//通過fromSupplier創建 Mono.fromSupplier(() -> "create from supplier").subscribe(System.out::println); 7、delay(Duration duration)和 delayMillis(long duration):創建一個 Mono 序列,在指定的延遲時間之后,產生數字 0 作為唯一值。 longstart = System.currentTimeMillis(); Disposable disposable = Mono.delay(Duration.ofSeconds(2)).subscribe(n -> { System.out.println("生產數據源:"+ n); System.out.println("當前線程ID:"+ Thread.currentThread().getId() + ",生產到消費耗時:"+ (System.currentTimeMillis() - start)); }); System.out.println("主線程"+ Thread.currentThread().getId() + "耗時:"+ (System.currentTimeMillis() - start)); while(!disposable.isDisposed()) { } Note: 通過以上測試打印,可以發現消費延遲2S后發生在異步線程,通過Disposable獲取消費狀態,阻塞主線程至消費完成。
動態方法示例 1、通過 create()方法來使用 MonoSink 來創建 Mono。 Mono.create(sink -> sink.success("create MonoSink")).subscribe(System.out::println);
總結:以上僅僅是創建Mono實例的部分案例,還有更多方法會在后續進行逐步展開應用。
本章概要 1、通過Mono靜態方法創建:
- empty():創建一個不包含任何元素,只發布結束消息的序列。
- just():可以指定序列中包含的全部元素。創建出來的 Mono序列在發布這些元素之后會自動結束。
- justOrEmpty():從一個 Optional 對象或可能為 null 的對象中創建 Mono。只有 Optional 對象中包含值或對象不為 null 時,Mono 序列才產生對應的元素。
- error(Throwable error):創建一個只包含錯誤消息的序列。
- never():創建一個不包含任何消息通知的序列。
- fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier():分別從 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中創建 Mono。
- delay(Duration duration)和 delayMillis(long duration):創建一個 Mono 序列,在指定的延遲時間之后,產生數字 0 作為唯一值。
- 通過 create()方法來使用 MonoSink 來創建 Mono。
靜態方法示例 1、empty():創建一個不包含任何元素,只發布結束消息的序列: Mono.empty().subscribe(System.out::println);
2、just():可以指定序列中包含的全部元素。創建出來的 Mono序列在發布這些元素之后會自動結束。 Mono.just("foo").subscribe(System.out::println);
3、justOrEmpty():從一個 Optional 對象或可能為 null 的對象中創建 Mono。只有 Optional 對象中包含值或對象不為 null 時,Mono 序列才產生對應的元素。 Mono.justOrEmpty(null).subscribe(System.out::println); Mono.justOrEmpty("justOrEmpty1").subscribe(System.out::println); Mono.justOrEmpty(Optional.of("justOrEmpty2")).subscribe(System.out::println);
4、error(Throwable error):創建一個只包含錯誤消息的序列。 Mono.error(newRuntimeException("error")).subscribe(System.out::println, System.err::println);
5、never():創建一個不包含任何消息通知的序列。 Mono.never().subscribe(System.out::println);
6、fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier():分別從 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中創建 Mono。 //通過fromRunnable創建,并實現異常處理 Mono.fromRunnable(() -> { System.out.println("thread run"); throw new RuntimeException("thread run error"); }).subscribe(System.out::println, System.err::println);//通過fromCallable創建 Mono.fromCallable(() -> "callable run ").subscribe(System.out::println);//通過fromSupplier創建 Mono.fromSupplier(() -> "create from supplier").subscribe(System.out::println); 7、delay(Duration duration)和 delayMillis(long duration):創建一個 Mono 序列,在指定的延遲時間之后,產生數字 0 作為唯一值。 longstart = System.currentTimeMillis(); Disposable disposable = Mono.delay(Duration.ofSeconds(2)).subscribe(n -> { System.out.println("生產數據源:"+ n); System.out.println("當前線程ID:"+ Thread.currentThread().getId() + ",生產到消費耗時:"+ (System.currentTimeMillis() - start)); }); System.out.println("主線程"+ Thread.currentThread().getId() + "耗時:"+ (System.currentTimeMillis() - start)); while(!disposable.isDisposed()) { } Note: 通過以上測試打印,可以發現消費延遲2S后發生在異步線程,通過Disposable獲取消費狀態,阻塞主線程至消費完成。
動態方法示例 1、通過 create()方法來使用 MonoSink 來創建 Mono。 Mono.create(sink -> sink.success("create MonoSink")).subscribe(System.out::println);
總結:以上僅僅是創建Mono實例的部分案例,還有更多方法會在后續進行逐步展開應用。
總結
- 上一篇: mysql cluster 分片_MyS
- 下一篇: java web批量下载