日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

flowable背压 取消_使用Flowable.generate()生成可感知背压的流– RxJava常见问题解答...

發布時間:2023/12/3 java 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 flowable背压 取消_使用Flowable.generate()生成可感知背压的流– RxJava常见问题解答... 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

flowable背壓 取消

RxJava缺少創建無限自然數流的工廠。 這樣的流很有用,例如,當您想通過壓縮兩個事件的順序來為可能的無限事件流分配唯一的序列號時:

Flowable<Long> naturalNumbers = //???Flowable<Event> someInfiniteEventStream = //... Flowable<Pair<Long, Event>> sequenced = Flowable.zip(naturalNumbers,someInfiniteEventStream,Pair::of );

實現naturalNumbers令人驚訝地復雜。 在RxJava 1.x中,您可以短暫地放棄不遵守反壓的Observable :

import rx.Observable; //RxJava 1.xObservable<Long> naturalNumbers = Observable.create(subscriber -> {long state = 0;//poor solution :-(while (!subscriber.isUnsubscribed()) {subscriber.onNext(state++);} });

這樣的流沒有背壓是什么意思? 好吧,基本上,流可以輕松地以CPU內核允許的速度生成事件(不斷增加的state變量),每秒數百萬。 但是,當使用者無法如此Swift地使用事件時,未處理事件的積壓開始出現:

naturalNumbers // .observeOn(Schedulers.io()).subscribe(x -> {//slooow, 1 millisecond});

上面的程序(帶有observeOn()運算符的注釋掉)可以正常運行,因為它具有意外的反壓。 默認情況下,所有內容在RxJava中都是單線程的,因此生產者和使用者在同一個線程中工作。 實際上,調用subscriber.onNext()會阻止,因此while循環會自動對其進行限制。 但是,嘗試取消注釋observeOn() ,災難會在幾毫秒后發生。 訂閱回調在設計上是單線程的。 對于每個元素,它至少需要1毫秒,因此該流每秒可以處理不超過1000個事件。 我們有些幸運。 RxJava快速發現這種災難性狀況,并因MissingBackpressureException而快速失敗

我們最大的錯誤是生產事件,而沒有考慮消費者的速度。 順便說一下,這是響應流背后的核心思想:不允許生產者發出比消費者請求更多的事件。 在RxJava 1.x中,即使實現最簡單的流(從頭開始考慮背壓)也不是一件容易的事。 RxJava 2.x帶來了幾個方便的運算符,這些運算符建立在以前版本的經驗基礎之上。 首先RxJava 2.x時不允許你實現Flowable (背壓-aware)的相同的方式,你可以與Observable 。 創建Flowable會使消費者使消息過載是不可能的:

Flowable<Long> naturalNumbers = Flowable.create(subscriber -> {long state = 0;while (!subscriber.isCancelled()) {subscriber.onNext(state++);} }, BackpressureStrategy.DROP);

您是否發現了這個額外的DROP參數? 在解釋之前,讓我們看一下使用慢速用戶訂閱時的輸出:

0 1 2 3 //...continuous numbers... 126 127 101811682 //...where did my 100M events go?!? 101811683 101811684 101811685 //...continuous numbers... 101811776 //...17M events disappeared again... 101811777 //...

你的旅費可能會改變。 怎么了? observeOn()運算符在調度程序(線程池)之間切換。 從未決事件隊列中合并的線程池。 該隊列是有限的,容量為128個元素。 知道此限制的observeOn()運算符僅從上游請求128個元素(我們的自定義Flowable )。 此時,它使我們的訂戶可以處理事件,每毫秒1次。 因此,大約100毫秒后, observeOn()發現其內部隊列幾乎為空,并要求更多。 會得到128、129、130…嗎? 沒有! 我們的Flowable在這0.1秒內產生了瘋狂的事件,并且(令人驚訝地)在該時間段內產生了超過1億個數字。 他們去哪了 好吧, observeOn()并沒有要求它們,因此DROP策略(強制性參數)只是丟棄了不需要的事件。

BackpressureStrategy

聽起來不對,還有其他策略嗎? 是的,很多:

  • BackpressureStrategy.BUFFER :如果上游產生太多事件,則會將它們緩沖在無界隊列中。 沒有任何事件丟失,但是您的整個應用程序很可能會丟失。 如果幸運的話, OutOfMemoryError將拯救您。 我被困在5秒以上的長時間GC暫停中。
  • BackpressureStrategy.ERROR :如果發現事件的過度產生,將拋出MissingBackpressureException 。 這是一個理智(且安全)的策略。
  • BackpressureStrategy.LATEST :類似于DROP ,但是記住上次刪除的事件。 以防萬一需要更多數據,但我們只是刪除了所有內容–至少具有最后看到的價值。
  • BackpressureStrategy.MISSING :沒有安全措施,請加以處理。 下游運算符之一(如observeOn() )最有可能拋出MissingBackpressureException 。
  • BackpressureStrategy.DROP :刪除未請求的事件。

順便說一句,當您將Observable變為Flowable還必須提供BackpressureStrategy 。 RxJava必須知道如何限制過量產生的Observable 。 好的,那么簡單的序列自然數流的正確實現是什么?

認識

create()和generate()之間的區別在于責任。 假設Flowable.create()會在不考慮背壓的情況下完整地生成流。 它只是在需要時才產生事件。 另一方面,僅允許Flowable.generate()一次生成一個事件(或完成流)。 背壓機制透明地計算出當前需要多少個事件。 generate()調用適當的次數,例如,在observeOn()情況下, observeOn() 128次。

由于此運算符一次生成一個事件,因此通常需要某種狀態來確定上次出現的時間1 。 這就是generate()含義:(im)可變狀態的持有者和一個基于該狀態生成下一個事件的函數:

Flowable<Long> naturalNumbers =Flowable.generate(() -> 0L, (state, emitter) -> {emitter.onNext(state);return state + 1;});

generate()的第一個參數是初始狀態(工廠),在本例中為0L 。 現在,每當訂戶或任何下游運營商請求一些事件時,都會調用lambda表達式。 它的責任是根據提供的狀態以某種方式最多調用一次onNext() (最多發出一個事件)。 首次調用lambda時, state等于初始值0L 。 但是,我們可以修改狀態并返回其新值。 在此示例中,我們增加了long以便后續lambda表達式的調用接收到state = 1L 。 顯然,這種情況不斷發生,產生連續的自然數。

這樣的編程模型顯然比while循環難。 它還從根本上改變了實現事件源的方式。 與其在任何時候都想推送事件,不如只是被動地等待請求。 下游運營商和訂戶正在從您的流中提取數據。 這種轉變可在管道的所有級別上產生背壓。

generate()有一些風格。 首先,如果您的狀態是可變對象,則可以使用不需要返回新狀態值的重載版本。 盡管功能較少,但可變狀態往往會產生較少的垃圾。 這假設您的狀態不斷變化,并且每次都傳遞相同的狀態對象實例。 例如,您可以輕松地將Iterator (也是基于pull的!)變成具有反壓奇觀的流:

Iterator<Integer> iter = //...Flowable<String> strings = Flowable.generate(() -> iter, (iterator, emitter) -> {if (iterator.hasNext()) {emitter.onNext(iterator.next().toString());} else {emitter.onComplete();} });

請注意,流的類型( <String> )不必與狀態類型( Iterator<Integer> )相同。 當然,如果您有Java Collection并想將其轉換為流,則不必先創建迭代器。 使用Flowable.fromIterable()足夠了。 甚至更簡單的generate()版本都假定您根本沒有任何狀態。 例如隨機數流:

Flowable<Double> randoms = Flowable.generate(emitter -> emitter.onNext(Math.random()));

但老實說,您可能最終將需要一個Random實例:

Flowable.generate(Random::new, (random, emitter) -> {emitter.onNext(random.nextBoolean()); });

摘要

如您所見,RxJava 1.x中的Observable.create()和Flowable.create Flowable.create()有一些缺點。 如果您真的在乎大量并發系統的可伸縮性和運行狀況(否則您將不會閱讀本文!),則必須了解背壓。 如果您真的需要從頭開始創建流,而不是使用from*()系列方法或執行繁重工作的各種庫,請熟悉generate() 。 本質上,您必須學習如何將某些類型的數據源建模為奇特的迭代器。 可能會有更多文章解釋如何實現更多現實生活流。

這類似于無狀態HTTP協議,該協議在服務器上使用稱為會話*的小塊狀態來跟蹤過去的請求。

翻譯自: https://www.javacodegeeks.com/2017/08/generating-backpressure-aware-streams-flowable-generate-rxjava-faq.html

flowable背壓 取消

總結

以上是生活随笔為你收集整理的flowable背压 取消_使用Flowable.generate()生成可感知背压的流– RxJava常见问题解答...的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。