日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程语言 > java >内容正文

java

1.x到2.x的迁移:可观察与可观察:RxJava FAQ

發(fā)布時(shí)間:2023/12/3 java 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 1.x到2.x的迁移:可观察与可观察:RxJava FAQ 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

標(biāo)題不是錯(cuò)誤。 rx.Observable 1.x的io.reactivex.Observable與2.x的io.reactivex.Observable完全不同。 盲目升級(jí)rx依賴(lài)關(guān)系并重命名項(xiàng)目中的所有導(dǎo)入將進(jìn)行編譯(稍作更改),但不能保證相同的行為。 在項(xiàng)目的早期, Observable in 1.x中沒(méi)有背壓的概念,但后來(lái)包含了背壓。 這到底是什么意思? 假設(shè)我們有一個(gè)流,每1毫秒產(chǎn)生一個(gè)事件,但是處理一個(gè)這樣的項(xiàng)目需要1 。 您會(huì)發(fā)現(xiàn)從長(zhǎng)遠(yuǎn)來(lái)看,它不可能以這種方式工作:

import rx.Observable; //RxJava 1.x import rx.schedulers.Schedulers;Observable.interval(1, MILLISECONDS).observeOn(Schedulers.computation()).subscribe(x -> sleep(Duration.ofSeconds(1)));

MissingBackpressureException在幾百毫秒內(nèi)MissingBackpressureException 。 但是這個(gè)異常是什么意思呢? 好吧,基本上,這是一個(gè)安全網(wǎng)(或如果有的話(huà),請(qǐng)進(jìn)行健全性檢查),以防止損害應(yīng)用程序。 RxJava自動(dòng)發(fā)現(xiàn)生產(chǎn)者溢出了消費(fèi)者,并主動(dòng)終止了流以避免進(jìn)一步的損害。 那么,如果我們只是在這里和那里搜索并替換少量進(jìn)口商品,該怎么辦?

import io.reactivex.Observable; //RxJava 2.x import io.reactivex.schedulers.Schedulers;Observable.interval(1, MILLISECONDS).observeOn(Schedulers.computation()).subscribe(x -> sleep(Duration.ofSeconds(1)));

例外不見(jiàn)了! 我們的吞吐量也是如此……一段時(shí)間后,應(yīng)用程序停止運(yùn)行,一直處于無(wú)休止的GC循環(huán)中。 您會(huì)看到,RxJava 1.x中的Observable在各處都有斷言(綁定隊(duì)列,檢查等),確保您沒(méi)有在任何地方溢出。 例如,默認(rèn)情況下,1.x中的observeOn()運(yùn)算符的隊(duì)列限制為128個(gè)元素。 當(dāng)在整個(gè)堆棧上正確實(shí)現(xiàn)背壓時(shí), observeOn()運(yùn)算符會(huì)要求上游傳遞不超過(guò)128個(gè)元素來(lái)填充其內(nèi)部緩沖區(qū)。 然后,與此調(diào)度程序分開(kāi)的線程(工作人員)從該隊(duì)列中拾取事件。 當(dāng)隊(duì)列幾乎變空時(shí), observeOn()運(yùn)算符會(huì)要求更多( request()方法)。 當(dāng)生產(chǎn)者不遵守背壓請(qǐng)求并發(fā)送比允許的更多的數(shù)據(jù)時(shí),此機(jī)制就會(huì)破裂,從而使消耗者有效溢出。 observeOn()運(yùn)算符內(nèi)的內(nèi)部隊(duì)列已滿(mǎn),而interval()運(yùn)算符仍在發(fā)出新事件-因?yàn)檫@就是interval()要做的事情。

在1.x中Observable到的發(fā)現(xiàn)了這種溢出,并通過(guò)MissingBackpressureException快速失敗。 字面上的意思是: 我盡了最大努力使系統(tǒng)保持健康狀態(tài),但是我的上游不尊重背壓-缺少背壓實(shí)現(xiàn) 。 但是,在2.x中Observable到的沒(méi)有這種安全機(jī)制。 希望你會(huì)成為一個(gè)好公民,生產(chǎn)緩慢或消費(fèi)快速的人。 當(dāng)系統(tǒng)運(yùn)行Observable ,兩個(gè)Observable的行為相同。 但是,在1.x負(fù)載下,快速失敗,而2.x則緩慢而痛苦地失敗。

這是否意味著RxJava 2.x向后退? 恰恰相反! 在2.x中,有一個(gè)重要的區(qū)別:

  • Observable不在乎背壓,這極大地簡(jiǎn)化了其設(shè)計(jì)和實(shí)現(xiàn)。 根據(jù)定義,它應(yīng)用于建模不支持背壓的流,例如用戶(hù)界面事件
  • Flowable確實(shí)支持背壓,并已采取所有安全措施。 換句話(huà)說(shuō),計(jì)算管道中的所有步驟都確保您不會(huì)溢出使用者。

2.x在可以支持背壓的流(簡(jiǎn)單地說(shuō)“ 如果需要可以減慢 ”)和不支持背壓的流之間進(jìn)行了重要區(qū)分。 從類(lèi)型系統(tǒng)的角度來(lái)看,很清楚,我們正在處理哪種源及其保證。 那么我們應(yīng)該如何將interval()示例遷移到RxJava 2.x? 比您想像的容易:

Flowable.interval(1, MILLISECONDS).observeOn(Schedulers.computation()).subscribe(x -> sleep(Duration.ofSeconds(1)));

這么簡(jiǎn)單。 您可能會(huì)問(wèn)自己一個(gè)問(wèn)題,為什么Flowable可以有一個(gè)interval()運(yùn)算符,按照定義,它不能支持背壓? 假設(shè)所有interval()均以恒定速率傳遞事件后,它就不會(huì)放慢速度! 好吧,如果您看一下interval()的聲明,您會(huì)注意到:

@BackpressureSupport(BackpressureKind.ERROR)

簡(jiǎn)而言之,這告訴我們,每當(dāng)無(wú)法再保證背壓時(shí),RxJava都會(huì)處理它并拋出MissingBackpressureException 。 這正是我們運(yùn)行Flowable.interval()程序時(shí)發(fā)生的事情–它快速失敗,而不是破壞整個(gè)應(yīng)用程序的穩(wěn)定性。

因此,總結(jié)起來(lái),每當(dāng)您從1.x看到Observable時(shí),您可能想要的是從2.x開(kāi)始的Flowable 。 至少,除非您的流按定義不支持背壓。 盡管名稱(chēng)相同,但這兩個(gè)主要版本中的Observable很大不同。 但是,一旦您進(jìn)行搜索并從Observable 替換為Flowable您將注意到遷移并不是那么簡(jiǎn)單。 這與API更改無(wú)關(guān),區(qū)別更加深刻。

在2.x中沒(méi)有直接等效于Observable.create()簡(jiǎn)單Flowable.create() 。 過(guò)去我過(guò)度使用Observable.create()工廠方法是一個(gè)錯(cuò)誤。 create()允許您以任意速率發(fā)出事件,而完全忽略了背壓。 2.x擁有一些友好的功能來(lái)處理背壓請(qǐng)求,但它們需要仔細(xì)設(shè)計(jì)流。 這將在下一個(gè)常見(jiàn)問(wèn)題解答中介紹。

翻譯自: https://www.javacodegeeks.com/2017/08/1-x-2-x-migration-observable-vs-observable-rxjava-faq.html

總結(jié)

以上是生活随笔為你收集整理的1.x到2.x的迁移:可观察与可观察:RxJava FAQ的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。