gitlab10.x迁移_1.x到2.x的迁移:可观察与可观察:RxJava FAQ
gitlab10.x遷移
標(biāo)題不是錯誤。 rx.Observable 1.x的io.reactivex.Observable與2.x的io.reactivex.Observable完全不同。 盲目升級rx依賴關(guān)系并重命名項(xiàng)目中的所有導(dǎo)入將進(jìn)行編譯(稍作更改),但不能保證相同的行為。 在項(xiàng)目的早期, Observable in 1.x中沒有背壓的概念,但后來包含了背壓。 它實(shí)際上是什么意思? 假設(shè)我們有一個流,每1毫秒產(chǎn)生一個事件,但是處理一個這樣的項(xiàng)目需要1 秒 。 您會發(fā)現(xiàn)從長遠(yuǎn)來看,它不可能以這種方式工作:
import rx.Observable; //RxJava 1.x import rx.schedulers.Schedulers;Observable.interval(1, MILLISECONDS).observeOn(Schedulers.computation()).subscribe(x -> sleep(Duration.ofSeconds(1)));MissingBackpressureException在幾百毫秒內(nèi)MissingBackpressureException 。 但是這個異常是什么意思呢? 好吧,基本上,這是一個安全網(wǎng)(或如果有的話,請進(jìn)行健全性檢查),以防止損害應(yīng)用程序。 RxJava會自動發(fā)現(xiàn)生產(chǎn)者溢出了消費(fèi)者,并主動終止了流以避免進(jìn)一步的損害。 那么,如果我們只是在這里和那里搜索并替換少量進(jìn)口商品,該怎么辦?
import io.reactivex.Observable; //RxJava 2.x import io.reactivex.schedulers.Schedulers;Observable.interval(1, MILLISECONDS).observeOn(Schedulers.computation()).subscribe(x -> sleep(Duration.ofSeconds(1)));例外不見了! 我們的吞吐量也是如此……一段時間后,應(yīng)用程序停滯不前,一直處于無休止的GC循環(huán)中。 您會看到,RxJava 1.x中的Observable在各處都有斷言(綁定隊(duì)列,檢查等),確保您沒有在任何地方溢出。 例如,默認(rèn)情況下,1.x中的observeOn()運(yùn)算符的隊(duì)列限制為128個元素。 當(dāng)在整個堆棧上正確實(shí)現(xiàn)了背壓時, observeOn()運(yùn)算符會要求上游傳遞不超過128個元素來填充其內(nèi)部緩沖區(qū)。 然后,與此調(diào)度程序分開的線程(工作人員)從該隊(duì)列中拾取事件。 當(dāng)隊(duì)列幾乎變空時, observeOn()運(yùn)算符會要求( request()方法)提供更多信息。 當(dāng)生產(chǎn)者不遵守背壓請求并發(fā)送比允許的更多的數(shù)據(jù)時,此機(jī)制就會破裂,從而有效地消耗了消費(fèi)者。 observeOn()運(yùn)算符內(nèi)的內(nèi)部隊(duì)列已滿,但interval()運(yùn)算符仍在發(fā)出新事件-因?yàn)檫@是interval()要做的事情。
在1.x中Observable到的發(fā)現(xiàn)了這種溢出,并通過MissingBackpressureException快速失敗。 字面上的意思是: 我盡了最大努力使系統(tǒng)保持健康狀態(tài),但是我的上游不尊重背壓-缺少背壓實(shí)現(xiàn) 。 但是,2.x中的Observable沒有這樣的安全機(jī)制。 希望你會成為一個好公民,生產(chǎn)緩慢或消費(fèi)快速的人,這是一種香草。 當(dāng)系統(tǒng)運(yùn)行Observable ,兩個Observable的行為相同。 但是,在1.x負(fù)載下,快速失敗,而2.x則緩慢而痛苦地失敗。
這是否意味著RxJava 2.x向后退? 恰恰相反! 在2.x中,有一個重要的區(qū)別:
- Observable不在乎背壓,這極大地簡化了其設(shè)計(jì)和實(shí)現(xiàn)。 根據(jù)定義,它應(yīng)用于建模不支持背壓的流,例如用戶界面事件
- Flowable確實(shí)支持背壓,并已采取所有安全措施。 換句話說,計(jì)算管道中的所有步驟都確保您不會溢出使用者。
2.x在可以支持背壓的流(簡單地說“ 如果需要,可以減慢 ”)和不支持背壓的流之間進(jìn)行了重要區(qū)分。 從類型系統(tǒng)的角度來看,很清楚,我們正在處理哪種源及其保證。 那么,我們應(yīng)該如何將我們的interval()示例遷移到RxJava 2.x? 比您想像的容易:
Flowable.interval(1, MILLISECONDS).observeOn(Schedulers.computation()).subscribe(x -> sleep(Duration.ofSeconds(1)));這么簡單。 您可能會問自己一個問題,為什么Flowable可以具有按定義不能支持反壓的interval()運(yùn)算符? 假設(shè)所有interval()均以恒定速率傳遞事件后,它就不會減速! 好吧,如果看一下interval()的聲明,您會注意到:
@BackpressureSupport(BackpressureKind.ERROR)簡而言之,這告訴我們,無論何時不再可以保證背壓,RxJava都會照顧好它并拋出MissingBackpressureException 。 這正是我們運(yùn)行Flowable.interval()程序時發(fā)生的事情–它快速失敗,而不是破壞整個應(yīng)用程序的穩(wěn)定性。
因此,總結(jié)起來,每當(dāng)您從1.x看到Observable時,您可能想要的是2.x的Flowable 。 至少,除非您的流按定義不支持背壓。 盡管名稱相同,但是這兩個主要版本中的Observable很大不同。 但是,一旦您進(jìn)行了搜索并從Observable 替換為Flowable您會注意到遷移并不是那么簡單。 這與API更改無關(guān),區(qū)別更加深刻。
沒有簡單的Flowable.create()直接與2.x中的Observable.create()等效。 我自己過去在過度使用Observable.create()工廠方法時犯了一個錯誤。 create()允許您以任意速率發(fā)出事件,完全忽略背壓。 2.x具有一些友好的功能來處理背壓請求,但它們需要仔細(xì)設(shè)計(jì)流。 這將在下一個常見問題解答中介紹。
翻譯自: https://www.javacodegeeks.com/2017/08/1-x-2-x-migration-observable-vs-observable-rxjava-faq.html
gitlab10.x遷移
總結(jié)
以上是生活随笔為你收集整理的gitlab10.x迁移_1.x到2.x的迁移:可观察与可观察:RxJava FAQ的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: spring health_为什么Spr
- 下一篇: java 函数式编程 示例_功能Java