日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Mono 的执行流程

發布時間:2023/12/18 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Mono 的执行流程 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

  • 前言
  • 一、示例
  • 二、流程
    • 1、構建數據發布者
    • 2、構建數據訂閱者
    • 3、建立訂閱關系
    • 4、請求數據
    • 5、發布數據
    • 6、發布完成


前言

本文主要同時簡單的示例來分析一下Mono在發布訂閱過程中的執行流程。

一、示例

@Testpublic void executeProcessTest() {Mono.just("hello mono").filter(v -> v != null).map(v -> v + " map").defaultIfEmpty("default value").subscribe(System.out::println);}

二、流程

1、構建數據發布者

(1)Mono.just(“hello mono”)

返回 MonoJust,包裝值

public static <T> Mono<T> just(T data) {return onAssembly(new MonoJust<>(data));}MonoJust(T value) {this.value = Objects.requireNonNull(value, "value");}

(2)filter

返回 MonoFilterFuseable ,包裝 MonoJust 和 predicate

public final Mono<T> filter(final Predicate<? super T> tester) {if (this instanceof Fuseable) {return onAssembly(new MonoFilterFuseable<>(this, tester));}return onAssembly(new MonoFilter<>(this, tester));}MonoFilterFuseable(Mono<? extends T> source, Predicate<? super T> predicate) {super(source);this.predicate = Objects.requireNonNull(predicate, "predicate");}

(3)map

返回 MonoMapFuseable 包裝 MonoFilterFuseable 和 mapper

public final <R> Mono<R> map(Function<? super T, ? extends R> mapper) {if (this instanceof Fuseable) {return onAssembly(new MonoMapFuseable<>(this, mapper));}return onAssembly(new MonoMap<>(this, mapper));}MonoMapFuseable(Mono<? extends T> source, Function<? super T, ? extends R> mapper) {super(source);this.mapper = Objects.requireNonNull(mapper, "mapper");}

(4)defaultIfEmpty

返回MonoDefaultIfEmpty,包裝 MonoMapFuseable 和 defaultValue

public final Mono<T> defaultIfEmpty(T defaultV) {if (this instanceof Fuseable.ScalarCallable) {try {T v = block();if (v == null) {return Mono.just(defaultV);}}catch (Throwable e) {//leave MonoError returns as this}return this;}return onAssembly(new MonoDefaultIfEmpty<>(this, defaultV));}MonoDefaultIfEmpty(Mono<? extends T> source, T defaultValue) {super(source);this.defaultValue = Objects.requireNonNull(defaultValue, "defaultValue");}

數據發布者的發布流程:

數據 -> MonoJust -> MonoFilterFuseable -> MonoMapFuseable -> MonoDefaultIfEmpty

2、構建數據訂閱者

從示例中的 subscribe() 開始

(1) subscribe()

傳入 consumer 消費者

public final Disposable subscribe(Consumer<? super T> consumer) {Objects.requireNonNull(consumer, "consumer");return subscribe(consumer, null, null);}public final Disposable subscribe(@Nullable Consumer<? super T> consumer,@Nullable Consumer<? super Throwable> errorConsumer,@Nullable Runnable completeConsumer) {return subscribe(consumer, errorConsumer, completeConsumer, (Context) null);}public final Disposable subscribe(@Nullable Consumer<? super T> consumer,@Nullable Consumer<? super Throwable> errorConsumer,@Nullable Runnable completeConsumer,@Nullable Context initialContext) {return subscribeWith(new LambdaMonoSubscriber<>(consumer, errorConsumer,completeConsumer, null, initialContext));}

創建 LambdaMonoSubscriber 對象,包裝最終的消費者consumer

(2)subscribeWith()

public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {subscribe(subscriber);return subscriber;} public final void subscribe(Subscriber<? super T> actual) {//最后一層發布者,這里是 MonoDefaultIfEmptyCorePublisher publisher = Operators.onLastAssembly(this);//最后一層訂閱者,這里是 LambdaMonoSubscriber CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);//發布者與訂閱者建立聯系try {if (publisher instanceof OptimizableOperator) {OptimizableOperator operator = (OptimizableOperator) publisher;while (true) {subscriber = operator.subscribeOrReturn(subscriber);if (subscriber == null) {// null means "I will subscribe myself", returning...return;}OptimizableOperator newSource = operator.nextOptimizableSource();if (newSource == null) {publisher = operator.source();break;}operator = newSource;}}//發布者發布數據給訂閱者publisher.subscribe(subscriber);}catch (Throwable e) {Operators.reportThrowInSubscribe(subscriber, e);return;}}

(3)發布者與訂閱者建立聯系的過程

核心方法:

subscriber = operator.subscribeOrReturn(subscriber);

a). operator 是 MonoDefaultIfEmpty,subscriber 是 LambdaMonoSubscriber

返回 DefaultIfEmptySubscriber,作為 LambdaMonoSubscriber 的發布者

@Overridepublic CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {return new FluxDefaultIfEmpty.DefaultIfEmptySubscriber<>(actual, defaultValue);}

b). operator 是 MonoMapFuseable ,subscriber 是 DefaultIfEmptySubscriber

返回 MapFuseableSubscriber,作為 DefaultIfEmptySubscriber 的發布者

public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super R> actual) {if (actual instanceof ConditionalSubscriber) {ConditionalSubscriber<? super R> cs = (ConditionalSubscriber<? super R>) actual;return new FluxMapFuseable.MapFuseableConditionalSubscriber<>(cs, mapper);}return new FluxMapFuseable.MapFuseableSubscriber<>(actual, mapper);}

c).operator 是 MonoFilterFuseable ,subscriber 是 MapFuseableSubscriber

返回 FilterFuseableSubscriber,作為 MapFuseableSubscriber 的發布者

public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {if (actual instanceof ConditionalSubscriber) {return new FluxFilterFuseable.FilterFuseableConditionalSubscriber<>((ConditionalSubscriber<? super T>) actual, predicate);}return new FluxFilterFuseable.FilterFuseableSubscriber<>(actual, predicate);}

此時發布者與訂閱者關系:

FilterFuseableSubscriber -> MapFuseableSubscriber -> DefaultIfEmptySubscriber -> LambdaMonoSubscriber -> consumer

3、建立訂閱關系

publisher.subscribe(subscriber);

此時publisher 是 MonoJust, subscriber 是 FilterFuseableSubscriber
創建 scalarSubscription ,包裝 FilterFuseableSubscriber

@Overridepublic void subscribe(CoreSubscriber<? super T> actual) {actual.onSubscribe(Operators.scalarSubscription(actual, value));}

根據發布訂閱關系依次調用訂閱者的 onSubscribe() 建立訂閱關系

FilterFuseableSubscriber -> MapFuseableSubscriber -> DefaultIfEmptySubscriber -> LambdaMonoSubscriber

進入 LambdaMonoSubscriber 的 onSubscribe()

@Overridepublic final void onSubscribe(Subscription s) {if (Operators.validate(subscription, s)) {this.subscription = s;if (subscriptionConsumer != null) {try {subscriptionConsumer.accept(s);}catch (Throwable t) {Exceptions.throwIfFatal(t);s.cancel();onError(t);}}else {//請求數據s.request(Long.MAX_VALUE);}}}

4、請求數據

通過訂閱關系調用 request() 請求數據,

s.request(Long.MAX_VALUE);

即根據下面的關系鏈反向請求數據

FilterFuseableSubscriber -> MapFuseableSubscriber -> DefaultIfEmptySubscriber -> LambdaMonoSubscriber

最終到了 Operators 類中

@Overridepublic void request(long n) {if (validate(n)) {if (ONCE.compareAndSet(this, 0, 1)) {Subscriber<? super T> a = actual;//發布數據a.onNext(value);if(once != 2) {//發布完成a.onComplete();}}}}

5、發布數據

從 FilterFuseableSubscriber 開始調用 onNext() 發布數據,根據依次發布給各自的訂閱者,最終數據到了最后一個訂閱者 LambdaMonoSubscriber

LambdaMonoSubscriber.java@Overridepublic final void onNext(T x) {Subscription s = S.getAndSet(this, Operators.cancelledSubscription());if (s == Operators.cancelledSubscription()) {Operators.onNextDropped(x, this.initialContext);return;}if (consumer != null) {try {//最終調用 consumer 消費數據consumer.accept(x);}catch (Throwable t) {Exceptions.throwIfFatal(t);s.cancel();doError(t);}}if (completeConsumer != null) {try {completeConsumer.run();}catch (Throwable t) {Operators.onErrorDropped(t, this.initialContext);}}}

6、發布完成

在數據發布依次到消費者消費后,進入第4步中的 a.onComplete();

依次調用各自的訂閱者調用 onComplete()。

總結

以上是生活随笔為你收集整理的Mono 的执行流程的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。