flatMap()和事件顺序– RxJava常见问题解答
正如我們已經發現的, flatMap()不會保留原始流的順序。 讓我們使用上一篇文章的GeoNames API示例進行說明 :
public interface GeoNames {Flowable<Long> populationOf(String city);}通過使用flatMap()請求多個城市的人口,我們不能保證它們會按順序到達:
Flowable<String> cities = Flowable.just("Warsaw", "Paris", "London", "Madrid");cities.flatMap(geoNames::populationOf).subscribe(response -> log.info("Population: {}", response));輸出有些令人驚訝:
17:09:49.838 | Rx-3 | --> GET .../searchJSON?q=London http/1.1 17:09:49.838 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1 17:09:49.838 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.1 17:09:49.838 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.1 17:09:49.939 | Rx-4 | <-- 200 OK .../searchJSON?q=Madrid (98ms) 17:09:49.939 | Rx-3 | <-- 200 OK .../searchJSON?q=London (98ms) 17:09:49.956 | Rx-3 | Population: 7556900 17:09:49.958 | Rx-3 | Population: 3255944 17:09:51.099 | Rx-2 | <-- 200 OK .../searchJSON?q=Paris (1258ms) 17:09:51.100 | Rx-1 | <-- 200 OK .../searchJSON?q=Warsaw (1259ms) 17:09:51.100 | Rx-2 | Population: 2138551 17:09:51.100 | Rx-2 | Population: 1702139一段時間后,我們收到馬德里和倫敦的回復,隨后又被訂戶收到。 首先是7556900(倫敦人口)和3255944(馬德里)。不久之后,巴黎和華沙也到達了。 一方面,很好的是,我們可以在每個人口到達時立即進行處理。 這使系統看起來響應更快。 但是我們失去了一些東西。 輸入流是"Warsaw" , "Paris" , "London" , "Madrid"而結果流包含"London" , "Madrid" , "Paris" , "Warsaw"人口。 我們如何分辨哪個數字代表哪個城市?
顯然,以下解決方案是完全錯誤的 ,但在實際的代碼庫中并非聞所未聞:
Flowable<Long> populations = cities.flatMap(geoNames::populationOf); cities.zipWith(populations, Pair::of).subscribe(response -> log.info("Population: {}", response));它編譯,運行,甚至產生一些結果。 不幸的是,這些結果是完全錯誤的:
17:20:03.778 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.1 17:20:03.778 | Rx-3 | --> GET .../searchJSON?q=London http/1.1 17:20:03.778 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.1 17:20:03.778 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1 17:20:03.953 | Rx-4 | <-- 200 OK .../searchJSON?q=Madrid (172ms) 17:20:03.959 | Rx-2 | <-- 200 OK .../searchJSON?q=Paris (179ms) 17:20:03.975 | Rx-2 | Population: (Warsaw,2138551) 17:20:03.976 | Rx-2 | Population: (Paris,3255944) 17:20:03.988 | Rx-3 | <-- 200 OK .../searchJSON?q=London (207ms) 17:20:03.988 | Rx-3 | Population: (London,7556900) 17:20:04.080 | Rx-1 | <-- 200 OK .../searchJSON?q=Warsaw (299ms) 17:20:04.080 | Rx-1 | Population: (Madrid,1702139)我們將城市與人口的隨機排列組合在一起。 更糟的是,在嘗試了十幾次之后,我設法得到了錯誤的結果。 由于某種原因,該程序大多數時候都在我的機器上運行。 您可以想象的最糟糕的錯誤。
flatMap()的問題在于它會釋放原始請求。 想象一下一個異步系統,在該系統上您收到某種隊列的響應,但不知道請求是什么。 一個明顯的解決方案是以某種方式將某種關聯ID甚至整個請求附加到響應中。 不幸的是, populationOf(String city)不返回原始請求( city ),僅返回響應( population )。 如果它返回諸如CityWithPopulation值對象或Pair<String, Long>類的東西, CityWithPopulation容易得多。 現在,假設我們通過附加請求( city )來增強原始方法:
Flowable<Pair<String, Long>> populationOfCity(String city) {Flowable<Long> population = geoNames.populationOf(city);return population.map(p -> Pair.of(city, p)); }現在,我們可以將這種方法用于更多的城市:
cities.flatMap(this::populationOfCity).subscribe(response -> log.info("Population: {}", response));…或避免使用額外的輔助方法:
cities.flatMap(city -> geoNames.populationOf(city).map(p -> Pair.of(city, p))).subscribe(response -> log.info("Population: {}", response));這次的result變量是Pair<String, Long>但是建議您使用更具表現力的value對象。
17:20:03.778 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.1 17:20:03.778 | Rx-3 | --> GET .../searchJSON?q=London http/1.1 17:20:03.778 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.1 17:20:03.778 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1 17:20:03.953 | Rx-4 | <-- 200 OK .../searchJSON?q=Madrid (172ms) 17:20:03.959 | Rx-2 | <-- 200 OK .../searchJSON?q=Paris (179ms) 17:20:03.975 | Rx-2 | Population: (Paris,2138551) 17:20:03.976 | Rx-2 | Population: (Madrid,3255944) 17:20:03.988 | Rx-3 | <-- 200 OK .../searchJSON?q=London (207ms) 17:20:03.988 | Rx-3 | Population: (London,7556900) 17:20:04.080 | Rx-1 | <-- 200 OK .../searchJSON?q=Warsaw (299ms) 17:20:04.080 | Rx-1 | Population: (Warsaw,1702139)我發現帶有嵌套map() flatMap()添加了額外的上下文,這是處理亂序結果的最有效方法。 當然,它不是最易讀的反應式代碼,因此請確保將這種復雜性隱藏在某些外觀之下。
更新
正如DávidKarnok在對本文的評論中指出的那樣, flatMap()內的map()運算符是一種常見習語,以至于存在專門的flatMap()重載。 除了標準轉換函數(在我們的示例中為(String, Long) -> SomeType String -> Flowable<Long> )之外,它還采用了組合器雙功能(例如(String, Long) -> SomeType )。 此功能的目的是提供一種將輸入項與通過轉換生成的每個輸出項組合在一起的轉換。 這正是我們對嵌套map()所做的工作(將人口增加了對應的城市名稱),但要短得多:
Flowable<Pair<String, Long>> populations = cities.flatMap(city -> geoNames.populationOf(city), (city, pop) -> Pair.of(city, pop));第二個lambda表達式( (city, pop) -> Pair.of(city, pop) )是對populationOf()產生的每個下游事件執行的。 如果極端,可以使用方法引用:
Flowable<Pair<String, Long>> populations = cities.flatMap(geoNames::populationOf, Pair::of);花一點時間研究最后一個示例,一旦您掌握了它,它實際上就非常簡單:
- 為每個city找到人口pop
- 對于每個人口,通過形成Pair<String, Long>將其與city結合起來
PS:這是9年中的第200個帖子!
翻譯自: https://www.javacodegeeks.com/2017/08/flatmap-order-events-rxjava-faq.html
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的flatMap()和事件顺序– RxJava常见问题解答的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 油字怎么组词 油字的组词有哪些
- 下一篇: 如何脚踏实地构建Java Agent