惯用并发:flatMap()与parallel()– RxJava常见问题解答
簡(jiǎn)單,有效和安全的并發(fā)是RxJava的設(shè)計(jì)原則之一。 然而,具有諷刺意味的是,它可能是該庫(kù)中最容易被誤解的方面之一。 讓我們舉一個(gè)簡(jiǎn)單的例子:假設(shè)我們有一堆UUID并且對(duì)于每個(gè)UUID ,我們必須執(zhí)行一組任務(wù)。 第一個(gè)問(wèn)題是每個(gè)UUID都要執(zhí)行I / O密集型操作,例如,從數(shù)據(jù)庫(kù)加載對(duì)象:
Flowable<UUID> ids = Flowable.fromCallable(UUID::randomUUID).repeat().take(100);ids.subscribe(id -> slowLoadBy(id));首先,為了測(cè)試,我將生成100個(gè)隨機(jī)UUID。 然后,對(duì)于每個(gè)UUID,我想使用以下方法加載記錄:
Person slowLoadBy(UUID id) {//... }slowLoadBy()的實(shí)現(xiàn)是無(wú)關(guān)緊要的,請(qǐng)記住它是緩慢且阻塞的。 使用subscribe()調(diào)用slowLoadBy()有許多缺點(diǎn):
- subscribe()根據(jù)設(shè)計(jì)是單線程的,無(wú)法解決。 每個(gè)UUID順序加載
- 當(dāng)您調(diào)用subscribe() ,無(wú)法進(jìn)一步轉(zhuǎn)換Person對(duì)象。 這是終端操作
一種更健壯,甚至更殘破的方法是map()每個(gè)UUID :
Flowable<Person> people = ids.map(id -> slowLoadBy(id)); //BROKEN這是非常可讀的,但不幸的是損壞了。 就像訂閱者一樣,運(yùn)算符也是單線程的。 這意味著在任何給定時(shí)間只能映射一個(gè)UUID ,此處也不允許并發(fā)。 更糟糕的是,我們從上游繼承線程/工作者。 這有幾個(gè)缺點(diǎn)。 如果上游使用某些專用的調(diào)度程序產(chǎn)生事件,我們將劫持該調(diào)度程序中的線程。 例如,許多操作符(例如interval() Schedulers.computation()透明地使用Schedulers.computation()線程池。 我們突然開始在完全不適合該池的池上執(zhí)行I / O密集型操作。 此外,我們通過(guò)這一阻塞性順序步驟降低了整個(gè)管道的速度。 非常非常糟糕。
您可能已經(jīng)聽說(shuō)過(guò)這個(gè)subscribeOn()運(yùn)算符,以及它如何啟用并發(fā)。 確實(shí),但是在應(yīng)用它時(shí)必須非常小心。 以下示例(再次)是錯(cuò)誤的 :
import io.reactivex.schedulers.Schedulers;Flowable<Person> people = ids.subscribeOn(Schedulers.io()).map(id -> slowLoadBy(id)); //BROKEN上面的代碼段仍然損壞。 observeOn() subscribeOn() (以及該對(duì)象的observeOn() )幾乎不會(huì)將執(zhí)行切換到其他工作程序(線程),而不會(huì)引入任何并發(fā)性。 流仍然按順序處理所有事件,但是在不同的線程上。 換句話說(shuō),我們現(xiàn)在不是在從上游繼承的線程上順序使用事件,而是在io()線程上順序使用事件。 那么,這個(gè)神話般的flatMap()運(yùn)算符呢?
flatMap()運(yùn)算符可以解救
flatMap()運(yùn)算符通過(guò)將事件流分成子流來(lái)啟用并發(fā)。 但首先,還有一個(gè)破碎的示例:
Flowable<Person> asyncLoadBy(UUID id) {return Flowable.fromCallable(() -> slowLoadBy(id)); }Flowable<Person> people = ids.subscribeOn(Schedulers.io()).flatMap(id -> asyncLoadBy(id)); //BROKEN哦,天哪,這還是壞了 ! flatMap()運(yùn)算符在邏輯上做兩件事:
- 在每個(gè)上游事件上應(yīng)用轉(zhuǎn)換( id -> asyncLoadBy(id) )–這將產(chǎn)生Flowable<Flowable<Person>> 。 這是有道理的,對(duì)于每個(gè)上游UUID我們都有一個(gè)Flowable<Person>因此最終得到的是Person對(duì)象流
- 然后flatMap()嘗試一次訂閱所有這些內(nèi)部子流。 每當(dāng)任何子流發(fā)出Person事件時(shí),它將作為外部Flowable的結(jié)果透明地傳遞。
從技術(shù)上講, flatMap()僅創(chuàng)建和預(yù)訂前128個(gè)(默認(rèn)情況下,可選的maxConcurrency參數(shù))子流。 同樣,當(dāng)最后一個(gè)子流完成時(shí), Person外部流也將完成。 現(xiàn)在,這到底為什么被打破? 除非明確要求,否則RxJava不會(huì)引入任何線程池。 例如,這段代碼仍在阻塞:
log.info("Setup"); Flowable<String> blocking = Flowable.fromCallable(() -> {log.info("Starting");TimeUnit.SECONDS.sleep(1);log.info("Done");return "Hello, world!";}); log.info("Created"); blocking.subscribe(s -> log.info("Received {}", s)); log.info("Done");仔細(xì)查看輸出,特別是有關(guān)事件和線程的順序:
19:57:28.847 | INFO | main | Setup 19:57:28.943 | INFO | main | Created 19:57:28.949 | INFO | main | Starting 19:57:29.954 | INFO | main | Done 19:57:29.955 | INFO | main | Received Hello, world! 19:57:29.957 | INFO | main | Done沒(méi)有任何并發(fā)??,沒(méi)有額外的線程。 僅將阻塞代碼包裝在Flowable中不會(huì)神奇地增加并發(fā)性。 您必須顯式使用… subscribeOn() :
log.info("Setup"); Flowable<String> blocking = Flowable.fromCallable(() -> {log.info("Starting");TimeUnit.SECONDS.sleep(1);log.info("Done");return "Hello, world!";}).subscribeOn(Schedulers.io()); log.info("Created"); blocking.subscribe(s -> log.info("Received {}", s)); log.info("Done");這次的輸出更有希望:
19:59:10.547 | INFO | main | Setup 19:59:10.653 | INFO | main | Created 19:59:10.662 | INFO | main | Done 19:59:10.664 | INFO | RxCachedThreadScheduler-1 | Starting 19:59:11.668 | INFO | RxCachedThreadScheduler-1 | Done 19:59:11.669 | INFO | RxCachedThreadScheduler-1 | Received Hello, world!但是我們上次確實(shí)使用了subscribeOn() ,這是怎么回事? 嗯,外部流級(jí)別的subscribeOn()基本上說(shuō)所有事件都應(yīng)在此流中的不同線程上順序處理。 我們并不是說(shuō)應(yīng)該同時(shí)運(yùn)行許多子流。 并且由于所有子流都處于阻塞狀態(tài),因此當(dāng)RxJava嘗試訂閱所有子流時(shí),它會(huì)有效地依次依次訂閱。 asyncLoadBy()并不是真正的async ,因此,當(dāng)flatMap()運(yùn)算符嘗試對(duì)其進(jìn)行訂閱時(shí),它會(huì)阻塞。 修復(fù)很容易。 通常,您會(huì)將subscribeOn()放在asyncLoadBy()但出于教育目的,我將其直接放置在asyncLoadBy()道中:
Flowable<Person> people = ids.flatMap(id -> asyncLoadBy(id).subscribeOn(Schedulers.io()));現(xiàn)在它就像一種魅力! 默認(rèn)情況下,RxJava將接收前128個(gè)上游事件( UUID ),將它們轉(zhuǎn)換為子流并訂閱所有這些事件。 如果子流是異步且高度可并行化的(例如,網(wǎng)絡(luò)調(diào)用), asyncLoadBy()獲得128個(gè)并發(fā)調(diào)用asyncLoadBy() 。 并發(fā)級(jí)別(128)可通過(guò)maxConcurrency參數(shù)配置:
Flowable<Person> people = ids.flatMap(id ->asyncLoadBy(id).subscribeOn(Schedulers.io()),10 //maxConcurrency);那是很多工作,您不覺(jué)得嗎? 并發(fā)性不應(yīng)該更具聲明性嗎? 我們不再處理Executor和期貨,但似乎這種方法太容易出錯(cuò)。 它難道不像Java 8流中的parallel()一樣簡(jiǎn)單?
輸入ParallelFlowable
讓我們首先來(lái)看一下我們的示例,并通過(guò)添加filter()使它更加復(fù)雜:
Flowable<Person> people = ids.map(this::slowLoadBy) //BROKEN.filter(this::hasLowRisk); //BROKENhasLowRisk()是慢速謂詞:
boolean hasLowRisk(Person p) {//slow... }我們已經(jīng)知道解決此問(wèn)題的慣用方法是使用flatMap()兩次:
Flowable<Person> people = ids.flatMap(id -> asyncLoadBy(id).subscribeOn(io())).flatMap(p -> asyncHasLowRisk(p).subscribeOn(io()));asyncHasLowRisk()相當(dāng)模糊-謂詞通過(guò)時(shí)返回單元素流,失敗則返回空流。 這是使用flatMap()模擬filter() flatMap() 。 我們可以做得更好嗎? 從RxJava 2.0.5開始,有一個(gè)新的運(yùn)算符叫做… parallel() ! 令人驚訝的是,由于許多誤解和濫用,在RxJava成為1.0之前已刪除了同名的運(yùn)算符。 2.x中的parallel()似乎最終以一種安全和聲明性的方式解決了慣用并發(fā)問(wèn)題。 首先,讓我們看一些漂亮的代碼!
Flowable<Person> people = ids.parallel(10).runOn(Schedulers.io()).map(this::slowLoadBy).filter(this::hasLowRisk).sequential();就這樣! parallel()和sequential()之間的代碼塊parallel()運(yùn)行。 我們有什么在這里? 首先,新的parallel()運(yùn)算符將Flowable<UUID>轉(zhuǎn)換為ParallelFlowable<UUID> ,該API的API比Flowable小得多。 您將在第二秒看到原因。 可選的int參數(shù)(在我們的示例中為10 )定義并發(fā)性,或者(如文檔所述)定義創(chuàng)建多少個(gè)并發(fā)的“ rails”。 因此,對(duì)于我們來(lái)說(shuō),我們將單個(gè)Flowable<Person>分成10個(gè)并發(fā)的獨(dú)立軌道(認(rèn)為是thread )。 來(lái)自UUID原始流的事件被拆分( modulo 10 )為不同的軌,子流彼此獨(dú)立。 將它們視為將上游事件發(fā)送到10個(gè)單獨(dú)的線程中。 但是首先,我們必須使用方便的runOn()運(yùn)算符定義這些線程的來(lái)源。 這比Java 8流上的parallel()好得多,在Java 8流上,您無(wú)法控制并發(fā)級(jí)別。
至此,我們有了一個(gè)ParallelFlowable 。 當(dāng)事件出現(xiàn)在上游( UUID )中時(shí),它將委派給10個(gè)“軌道”,并發(fā),獨(dú)立的管道之一。 管道提供了可以安全地并行運(yùn)行的運(yùn)算符的有限子集,例如map()和filter() ,還包括reduce() 。 沒(méi)有buffer() , take()等,因?yàn)橐淮卧诙鄠€(gè)子流上調(diào)用它們的語(yǔ)義尚不清楚。 我們的阻塞slowLoadBy()和hasLowRisk()仍按順序調(diào)用,但僅在單個(gè)“ rail”中。 因?yàn)槲覀儸F(xiàn)在有10個(gè)并發(fā)的“ rails”,所以我們無(wú)需花費(fèi)太多精力就可以有效地并行化它們。
當(dāng)事件到達(dá)子流(“ rail”)的末尾時(shí),它們會(huì)遇到sequential()運(yùn)算符。 該運(yùn)算符將ParallelFlowable回Flowable 。 只要我們的映射器和過(guò)濾器是線程安全的, parallel() / sequential()對(duì)就提供了非常簡(jiǎn)單的并行化流的方法。 一個(gè)小警告-您將不可避免地使郵件重新排序。 順序map()和filter()始終保留順序(就像大多數(shù)運(yùn)算符一樣)。 但是,一旦在parallel()塊中運(yùn)行它們,順序就會(huì)丟失。 這允許更大的并發(fā)性,但是您必須牢記這一點(diǎn)。
您是否應(yīng)該使用parallel()而不是嵌套的flatMap()來(lái)并行化代碼? 這取決于您,但是parallel()似乎更容易閱讀和掌握。
翻譯自: https://www.javacodegeeks.com/2017/09/idiomatic-concurrency-flatmap-vs-parallel-rxjava-faq.html
總結(jié)
以上是生活随笔為你收集整理的惯用并发:flatMap()与parallel()– RxJava常见问题解答的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: linux安装库文件(linux安装库)
- 下一篇: Java命令行界面(第19部分):jCl