日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

反应式框架Reactor中的Mono和Flux

發(fā)布時間:2023/12/18 编程问答 46 豆豆
生活随笔 收集整理的這篇文章主要介紹了 反应式框架Reactor中的Mono和Flux 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Reactive基于事件驅(qū)動(事件模式或者說訂閱者模式),類似于Netty異步事件編程模型,對不同的事件做不同的處理。所有信息都通過一個編程模型處理,就像水在管道里面運動一樣(這里把事件比作水流)

  • 響應(yīng)流必須是無阻塞的。
  • 響應(yīng)流必須是一個數(shù)據(jù)流。
  • 它必須可以異步執(zhí)行。
  • 并且它也應(yīng)該能夠處理背壓。

背壓是反應(yīng)流中的一個重要概念,可以理解為,生產(chǎn)者可以感受到消費者反饋的消費壓力,并根據(jù)壓力進行動態(tài)調(diào)整生產(chǎn)速率。

反應(yīng)式編程框架主要采用了觀察者模式,而Spring Reactor的核心則是對觀察者模式的一種衍伸。關(guān)于觀察者模式的架構(gòu)中被觀察者(Observable)和觀察者(Subscriber)處在不同的線程環(huán)境中時,由于者各自的工作量不一樣,導(dǎo)致它們產(chǎn)生事件和處理事件的速度不一樣,這時就出現(xiàn)了兩種情況:

  • 被觀察者產(chǎn)生事件慢一些,觀察者處理事件很快。那么觀察者就會等著被觀察者發(fā)送事件好比觀察者在等米下鍋,程序等待)。
  • 被觀察者產(chǎn)生事件的速度很快,而觀察者處理很慢。那就出問題了,如果不作處理的話,事件會堆積起來,最終擠爆你的內(nèi)存,導(dǎo)致程序崩潰。(好比被觀察者生產(chǎn)的大米沒人吃,堆積最后就會爛掉)。為了方便下面理解Mono和Flux,也可以理解為Publisher(發(fā)布者也可以理解為被觀察者)主動推送數(shù)據(jù)給Subscriber(訂閱者也可以叫觀察者),如果Publisher發(fā)布消息太快,超過了Subscriber的處理速度,如何處理。這時就出現(xiàn)了Backpressure(背壓—–指在異步場景中,被觀察者發(fā)送事件速度遠快于觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發(fā)送速度的策略

在傳統(tǒng)的編程范式中,我們一般通過迭代器(Iterator)模式來遍歷一個序列。這種遍歷方式是由調(diào)用者來控制節(jié)奏的,采用的是拉的方式。每次由調(diào)用者通過next()方法來獲取序列中的下一個值。

使用反應(yīng)式流時采用的則是推的方式,即常見的發(fā)布者-訂閱者模式。當(dāng)發(fā)布者有新的數(shù)據(jù)產(chǎn)生時,這些數(shù)據(jù)會被推送到訂閱者來進行處理。在反應(yīng)式流上可以添加各種不同的操作來對數(shù)據(jù)進行處理,形成數(shù)據(jù)處理鏈。這個以聲明式的方式添加的處理鏈只在訂閱者進行訂閱操作時才會真正執(zhí)行。

反應(yīng)式編程來源于數(shù)據(jù)流和變化的傳播,意味著由底層的執(zhí)行模型負責(zé)通過數(shù)據(jù)流來自動傳播變化。比如求值一個簡單的表達式 c=a+b,當(dāng) a 或者 b 的值發(fā)生變化時,傳統(tǒng)的編程范式需要對 a+b 進行重新計算來得到 c 的值。如果使用反應(yīng)式編程,當(dāng) a 或者 b 的值發(fā)生變化時,c 的值會自動更新。

  • Reactive Streams 是規(guī)范,
  • Reactor 實現(xiàn)了 Reactive Streams
  • Web Flux Reactor 為基礎(chǔ),實現(xiàn) Web 領(lǐng)域的反應(yīng)式編程框架

關(guān)于MonoFlux

在Reactor中,經(jīng)常使用的類并不多,主要有以下兩個:

Mono 實現(xiàn)了 org.reactivestreams.Publisher 接口,代表0到1個元素的發(fā)布者(Publisher)。

Flux 同樣實現(xiàn)了 org.reactivestreams.Publisher 接口,代表0到N個元素的發(fā)布者(Subscriber)。

Flux 和 Mono 是 Reactor 中的兩個基本概念。MonoFlux都是Publisher(發(fā)布者)。

Flux 表示的是包含 0 到 N 個元素的異步序列。在該序列中可以包含三種不同類型的消息通知:正常的包含元素的消息、序列結(jié)束的消息和序列出錯的消息。當(dāng)消息通知產(chǎn)生時,訂閱者中對應(yīng)的方法 onNext(), onComplete()和 onError()會被調(diào)用。

Mono 表示的是包含 0 或者 1 個元素的異步序列。該序列中同樣可以包含與 Flux 相同的三種類型的消息通知。Flux 和 Mono 之間可以進行轉(zhuǎn)換。對一個 Flux 序列進行計數(shù)操作,得到的結(jié)果是一個 Mono<Long>對象。把兩個 Mono 序列合并在一起,得到的是一個 Flux 對象。

Publisher

由于響應(yīng)流的特點,我們不能再返回一個簡單的POJO對象來表示結(jié)果了。必須返回一個類似Java中的Future的概念,在有結(jié)果可用時通知消費者進行消費響應(yīng)。

Reactive Stream規(guī)范中這種被定義為Publisher<T>?Publisher<T>是一個可以提供0-N個序列元素的提供者,并根據(jù)其訂閱者Subscriber<? super T>的需求推送元素。一個Publisher<T>可以支持多個訂閱者,并可以根據(jù)訂閱者的邏輯進行推送序列元素。

響應(yīng)式的一個重要特點:當(dāng)沒有訂閱時發(fā)布者什么也不做

區(qū)分響應(yīng)式API的類型:從返回的類型我們就可以知道一個方法會“發(fā)射并忘記”或“請求并等待”(Mono),還是在處理一個包含多個數(shù)據(jù)項的流(Flux)。Flux和Mono的一些操作利用了這個特點在這兩種類型間互相轉(zhuǎn)換。例如,調(diào)用Flux<T>的single()方法將返回一個Mono<T>,而使用concatWith()方法把兩個Mono串在一起就可以得到一個Flux。

Flux

flux可以觸發(fā)零到多個事件,并根據(jù)實際情況結(jié)束處理或觸發(fā)錯誤。

Flux?是一個發(fā)出(emit)0-N個元素組成的異步序列的Publisher<T>,可以被onComplete信號或者onError信號所終止。在響應(yīng)流規(guī)范中存在三種給下游消費者調(diào)用的方法?onNext,?onComplete, onError

創(chuàng)建

第一種方式是通過 Flux 類中的靜態(tài)方法。

just():可以指定序列中包含的全部元素。創(chuàng)建出來的 Flux 序列在發(fā)布這些元素之后會自動結(jié)束。

fromArray(),fromIterable()和 fromStream():可以從一個數(shù)組、Iterable 對象或 Stream 對象中創(chuàng)建 Flux 對象。

empty():創(chuàng)建一個不包含任何元素,只發(fā)布結(jié)束消息的序列。

never():創(chuàng)建一個不包含任何消息通知的序列。

range(int start, int count):創(chuàng)建包含從 start 起始的 count 個數(shù)量的 Integer 對象的序列。

interval(Duration period)和 interval(Duration delay, Duration period):創(chuàng)建一個包含了從 0 開始遞增的 Long 對象的序列。其中包含的元素按照指定的間隔來發(fā)布。除了間隔時間之外,還可以指定起始元素發(fā)布之前的延遲時間。

intervalMillis(long period)和 intervalMillis(long delay, long period):與 interval()方法的作用相同,只不過該方法通過毫秒數(shù)來指定時間間隔和延遲時間。

例1. 通過 Flux 類的靜態(tài)方法創(chuàng)建 Flux 序列

Flux.just("Hello", "World").subscribe(System.out::println);Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);Flux.empty().subscribe(System.out::println);Flux.range(1, 10).subscribe(System.out::println);Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);Flux.intervalMillis(1000).subscribe(System.out::println);

上面的這些靜態(tài)方法適合于簡單的序列生成,當(dāng)序列的生成需要復(fù)雜的邏輯時,則應(yīng)該使用 generate() create() 方法。

Flux.create((t) -> {t.next("create");t.next("create1");t.complete();}).subscribe(System.out::println);Flux.generate(sink -> {sink.next("Hello");sink.complete(); }).subscribe(System.out::println);final Random random = new Random(); Flux.generate(ArrayList::new, (list, sink) -> {int value = random.nextInt(100);list.add(value);sink.next(value);if (list.size() == 10) {sink.complete();}return list; }).subscribe(System.out::println);

create()方法與 generate()方法的不同之處在于所使用的是 FluxSink 對象。FluxSink 支持同步和異步的消息產(chǎn)生,并且可以在一次調(diào)用中產(chǎn)生多個元素。

Flux.create(sink -> {for (int i = 0; i < 10; i++) {sink.next(i);}sink.complete(); }).subscribe(System.out::println);

Mono

?多只觸發(fā)一個事件,可以把Mono<Void>用于在異步任務(wù)完成時發(fā)出通知?

Mono?是一個發(fā)出(emit)0-1個元素的Publisher<T>,可以被onComplete信號或者onError信號所終止。

Mono.fromSupplier(() -> "Hello").subscribe(System.out::println); Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println); Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);//empty():創(chuàng)建一個不包含任何元素,只發(fā)布結(jié)束消息的序列 Mono.empty().subscribe(System.out::println); #允許消費者在有結(jié)果可用時進行消費//just():可以指定序列中包含的全部元素。創(chuàng)建出來的 Mono序列在發(fā)布這些元素之后會自動結(jié)束。 Mono.just("www.xttblog.com").subscribe(System.out::println);//ustOrEmpty():從一個 Optional 對象或可能為 null 的對象中創(chuàng)建 Mono。 //只有 Optional 對象中包含值或?qū)ο蟛粸?null 時,Mono 序列才產(chǎn)生對應(yīng)的元素。 Mono.justOrEmpty(null).subscribe(System.out::println); Mono.justOrEmpty("業(yè)余草").subscribe(System.out::println); Mono.justOrEmpty(Optional.of("業(yè)余草")).subscribe(System.out::println);//error(Throwable error):創(chuàng)建一個只包含錯誤消息的序列。 Mono.error(new RuntimeException("error")).subscribe(System.out::println, System.err::println);//never():創(chuàng)建一個不包含任何消息通知的序列。 Mono.never().subscribe(System.out::println);//通過 create()方法來使用 MonoSink 來創(chuàng)建 Mono。 Mono.create(sink -> sink.success("業(yè)余草")).subscribe(System.out::println); //通過fromRunnable創(chuàng)建,并實現(xiàn)異常處理 Mono.fromRunnable(() -> {System.out.println("thread run"); throw new RuntimeException("thread run error"); }).subscribe(System.out::println, System.err::println);//通過fromCallable創(chuàng)建 Mono.fromCallable(() -> "callable run ").subscribe(System.out::println);//通過fromSupplier創(chuàng)建 Mono.fromSupplier(() -> "create from supplier").subscribe(System.out::println);//delay(Duration duration)和 delayMillis(long duration):創(chuàng)建一個 Mono 序列,在指定的延遲時間之后,產(chǎn)生數(shù)字 0 作為唯一值。 long start = System.currentTimeMillis(); Disposable disposable = Mono.delay(Duration.ofSeconds(2)).subscribe(n -> {System.out.println("生產(chǎn)數(shù)據(jù)源:"+ n);System.out.println("當(dāng)前線程ID:"+ Thread.currentThread().getId() + ",生產(chǎn)到消費耗時:"+ (System.currentTimeMillis() - start)); }); System.out.println("主線程"+ Thread.currentThread().getId() + "耗時:"+ (System.currentTimeMillis() - start)); while(!disposable.isDisposed()) { }

?

總結(jié)

以上是生活随笔為你收集整理的反应式框架Reactor中的Mono和Flux的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 久久免费国产精品 | www在线视频| 可以看的av网址 | 一炮成瘾1v1高h | 欧美精品一区二区三区四区 | 看个毛片 | 99黄色网 | 99久久婷婷国产综合精品草原 | 日韩av网址在线观看 | 久久久久久久久影院 | 精品熟女一区二区三区 | 欧美久久影院 | 男人av影院 | 最新国产网站 | 蜜桃视频久久一区免费观看入口 | 国产视频一区二区三区在线播放 | 美女大bxxxxn内射 | www.香蕉视频| 日本打屁股网站 | 欧美午夜剧场 | 9i免费看片黄 | 亚洲一区二区三区四区av | 日韩网站视频 | 淫羞阁av导航 | 免费激情小视频 | www毛片| 性久久久久久久 | 黄色激情四射 | 男欢女爱久石 | 中文字幕永久在线播放 | 亚洲综合色网站 | 国产视频综合 | 亚洲风情亚aⅴ在线发布 | 日韩综合在线 | 好吊色视频在线观看 | 人妖性生活视频 | 成人一二三区 | 色天天色综合 | 亚洲一二三av | 久久久96| 女女互磨互喷水高潮les呻吟 | av免费播放| 黄黄的视频在线观看 | 老司机午夜免费福利 | 操碰人人 | 中文无码日韩欧 | 久久只有这里有精品 | 毛片久久久久久久 | 国产日韩欧美电影 | 精品久久久久久久久久久久久久久 | 精品久久久在线观看 | 亚洲精品成人在线 | 亚洲日日操 | www.狠狠干 | 日韩一区二区三区不卡视频 | 国产精品亚洲а∨天堂免在线 | 久久久久99精品成人片 | аⅴ天堂中文在线网 | 超碰精品在线观看 | 欧洲国产精品 | 日本成人精品 | 久草视频免费在线播放 | 九九热视频在线观看 | 神马久久春色 | 亚洲小视频在线 | 2018国产大陆天天弄 | 免费黄色激情视频 | 任你躁av一区二区三区 | 丰满少妇高潮在线观看 | 国产午夜免费 | 日本a在线天堂 | 大香蕉视频一区二区 | av漫画在线观看 | 女人又爽又黄免费女仆 | 国产淫视 | 日本一区二区黄色 | 国产第一精品视频 | 国产女18毛片多18精品 | 91九色精品 | 成人国产免费观看 | 成年人激情网 | 欧美专区日韩专区 | 香蕉黄色片 | 日日射天天干 | 日女人免费视频 | 韩国av一区 | 亚洲超碰av| av片免费 | 国产一久久 | 中国一级黄色大片 | 毛片大片 | 18成人免费观看网站 | 日本高清不卡一区 | 成人午夜福利一区二区 | 91在线综合 | 久久夜色精品国产欧美乱 | 婷婷欧美 | 欧美bdsm调教视频 | 嫩草视频在线免费观看 |