Mono 的创建
目錄
- 前言
- 一、響應(yīng)式數(shù)據(jù)流
- 二、Mono的創(chuàng)建
前言
????????Reactor 是一個(gè)基于 JVM 之上的異步應(yīng)用基礎(chǔ)庫(kù)。為 Java 、Groovy 和其他 JVM 語(yǔ)言提供了構(gòu)建基于事件和數(shù)據(jù)驅(qū)動(dòng)應(yīng)用的抽象庫(kù),可用它構(gòu)建時(shí)效性流式數(shù)據(jù)應(yīng)用,實(shí)現(xiàn)應(yīng)用高效、異步地傳遞消息。
????????Reactor 性能相當(dāng)高,在最新的硬件平臺(tái)上,使用無(wú)堵塞分發(fā)器每秒鐘可處理 1500 萬(wàn)事件。
????????Reactor 中的兩個(gè)重要的概念是Flux 和 Mono。Mono 表示包含 0 或者 1 個(gè)元素的異步序列。Flux 表示包含 0 到 N 個(gè)元素的異步序列。他們包含三種不同類型的消息通知:正常的包含元素的消息、序列結(jié)束的消息和序列出錯(cuò)的消息。
一、響應(yīng)式數(shù)據(jù)流
????????響應(yīng)式數(shù)據(jù)流 作為一種新的數(shù)據(jù)流規(guī)范應(yīng)用于 Java 9 及其后續(xù)版本,旨在提供同/異步數(shù)據(jù)序列流式控制機(jī)制。
1、響應(yīng)式數(shù)據(jù)流接口
org.reactivestreams.Pubslisher:數(shù)據(jù)流發(fā)布者(信號(hào)從 0 到 N,N 可為無(wú)窮)。提供兩個(gè)可選終端事件:錯(cuò)誤和完成。
org.reactivestreams.Subscriber:數(shù)據(jù)流消費(fèi)者(信號(hào)從 0 到 N,N 可為無(wú)窮)。消費(fèi)者初始化過(guò)程中,會(huì)請(qǐng)求生產(chǎn)者當(dāng)前需要訂閱多少數(shù)據(jù)。其他情況,通過(guò)接口回調(diào)與數(shù)據(jù)生產(chǎn)方交互: 下一條(新消息)和狀態(tài)。狀態(tài)包括:完成/錯(cuò)誤,可選。
org.reactivestreams.Subscription:初始化階段將一個(gè)小追蹤器傳遞給訂閱者。它控制著我們準(zhǔn)備好來(lái)消費(fèi)多少數(shù)據(jù),以及我們想要什么時(shí)候停止消費(fèi)(取消)。
org.reactivestreams.Processor:同時(shí)作為發(fā)布者和訂閱者的組件的標(biāo)記。
2、響應(yīng)式數(shù)據(jù)流發(fā)布協(xié)議
訂閱者有兩種方式向發(fā)布者請(qǐng)求數(shù)據(jù):
無(wú)界的:訂閱者只需要調(diào)用 Subscription#request(Long.MAX_VALUE) 即可。
有界的:訂閱者保留數(shù)據(jù)引用,調(diào)用request(long) 方法消費(fèi)。
二、Mono的創(chuàng)建
(1)empty
不包含任何元素,可以發(fā)布結(jié)束消息
@Testpublic void empty(){Mono.empty().subscribe(System.out::println);}(2)just
包含指定元素
@Testpublic void just(){Mono.just("hello mono").subscribe(System.out::println);}(3)justOrEmpty
有元素時(shí)相當(dāng)于just,
沒(méi)有元素時(shí)相當(dāng)于empty,
元素是Optional時(shí),則根據(jù)Optional里是否有值來(lái)創(chuàng)建just或empty。
(4)never
不包含任何元素
@Testpublic void never(){Mono.never().subscribe(System.out::println);}(5)from
@Testpublic void from() {//從 Publisher 生成 MonoMono.from(Mono.just("hello mono")).subscribe(System.out::println);//從 Publisher 生成 Mono,會(huì)對(duì)Flux類型進(jìn)行包裝Mono.fromDirect(Mono.just("hello mono")).subscribe(System.out::println);//從 Supplier 生成 MonoMono.fromSupplier(() -> "hello mono").subscribe(System.out::println);//從 Runnable 生成 MonoMono.fromRunnable(() -> System.out.println("hello mono")).subscribe(System.out::println);//從 Callable 生成 MonoMono.fromCallable(() ->"hello mono").subscribe(System.out::println);//從 CompletableFuture 生成 MonoMono.fromFuture(CompletableFuture.completedFuture("hello mono")).subscribe(System.out::println);//從 CompletionStage 生成 MonoMono.fromCompletionStage(CompletableFuture.completedFuture("hello mono")).subscribe(System.out::println);//從 Supplier<? extends CompletionStage<? extends T> 生成 MonoMono.fromCompletionStage(() -> CompletableFuture.completedFuture("hello mono")).subscribe(System.out::println);}(6)defer
@Testpublic void defer() {//從 Supplier 獲取 monoMono.defer(() -> Mono.just("hello mono")).subscribe(System.out::println);//從 Function<ContextView, ? extends Mono<? extends T>> 獲取 monoMono.deferContextual(view -> Mono.just("hello mono")).subscribe(System.out::println);}(7)delay
@Testpublic void delay() throws InterruptedException {//指定延時(shí)時(shí)間,發(fā)布值是0Mono.delay(Duration.of(5, ChronoUnit.SECONDS)).subscribe(System.out::println);Mono.delay(Duration.of(5, ChronoUnit.SECONDS), Schedulers.parallel()).subscribe(System.out::println);TimeUnit.SECONDS.sleep(15);}(8)error
@Testpublic void error() throws InterruptedException {//包含異常的 monoMono.error(new RuntimeException("出錯(cuò)了")).subscribe(System.out::println);//Supplier 提供包含異常的 monoMono.error(() -> new RuntimeException("出錯(cuò)了")).subscribe(System.out::println);}(9)first
@Testpublic void first() throws InterruptedException {//處理第一個(gè) mono,如果第一個(gè)取消了,處理第二個(gè)...Mono.firstWithSignal(Mono.empty(), Mono.just("mono hello")).subscribe(System.out::println);Mono.firstWithValue(Mono.just("hello mono"), Mono.just("mono hello")).subscribe(System.out::println);Mono.firstWithSignal(List.of(Mono.just("hello mono"), Mono.just("mono hello"))).subscribe(System.out::println);Mono.firstWithValue(List.of(Mono.just("hello mono"), Mono.just("mono hello"))).subscribe(System.out::println);}(10)sequenceEqual
@Testpublic void sequenceEqual() throws InterruptedException {//比較兩個(gè) mono 是都相同Mono.sequenceEqual(Mono.just("hello mono"), Mono.just("hello mono")).subscribe(System.out::println);Mono.sequenceEqual(Mono.just("hello mono"), Mono.just("hello mono"), Object::equals).subscribe(System.out::println);Mono.sequenceEqual(Mono.just("hello mono"), Mono.just("hello mono"), Object::equals, 16).subscribe(System.out::println);}(11)using
@Testpublic void using() throws InterruptedException {//callable返回mono,//function對(duì)mono進(jìn)行操作//consumer執(zhí)行清理操作//eager 為true時(shí),consumer在subscribe之前調(diào)用Mono.using(() -> Mono.just("hello mono"), Function.identity(), t -> System.out.println(t)).subscribe(System.out::println);}(12)when
@Testpublic void when() throws InterruptedException {//執(zhí)行預(yù)設(shè)的操作Mono.when(Mono.just("hello mono").filter(a -> a.equals("hello mono"))).subscribe(System.out::println);}(13)zip
壓縮操作
總結(jié)
- 上一篇: 为win7系统盘减肥
- 下一篇: Objective-C——initial