日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > java >内容正文

java

[译]震惊!RxJava 5 个不为人知的小秘密

發(fā)布時間:2024/1/17 java 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 [译]震惊!RxJava 5 个不为人知的小秘密 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
  • 原文地址:5 Not So Obvious Things About RxJava
  • 原文作者:Jag Saund
  • 譯文出自:掘金翻譯計劃
  • 譯者: skyar2009
  • 校對者:Danny1451, yunshuipiao

震驚!RxJava 5 個不為人知的小秘密

無論你是剛剛接觸 RxJava,還是已經(jīng)使用過一段時間,關(guān)于 RxJava 你總會有些新的知識要學(xué)。在使用 RxJava 框架過程中,我發(fā)現(xiàn)了 5 點不那么明顯的知識,使我可以充分挖掘它的潛能。

注釋 本文引用的 APIs 是基于 RxJava 1.2.6

1. 什么時候使用 map,什么時候使用 flatMap

map 和 flatMap 是常用的兩個 ReactiveX 操作。它們往往是你最先接觸的兩個操作,并且很難確定使用哪個是正確的。

mapflatMap 都是對 Observable 發(fā)出的每一個元素執(zhí)行轉(zhuǎn)換方法。但是,map 只輸出一個元素,flatMap 輸出 0 或多個元素。

在上面的例子中,map 操作對每一個字符串執(zhí)行了 split 方法并輸出了一個包含字符串?dāng)?shù)組的元素。當(dāng)你想將一個元素轉(zhuǎn)換成另一個時使用 map。

有些時候,我們執(zhí)行的方法返回多個元素,并且我們希望將他們添加到同一個流中。這種情況下,flatMap 是一個好的選擇。在上面的例子中 flatMap 操作將字符串?dāng)?shù)組處理后輸出到了同一個序列。

2. 避免使用 Observable.create(…) 創(chuàng)建 Observable

有些時候你需要將同步或異步的 API 轉(zhuǎn)成響應(yīng)式的 API。使用 Observable.create 看起來是個極具誘惑性的選擇,但它有如下要求:

  • 當(dāng)取消 Observable 訂閱時需要注銷回調(diào) (否則會造成內(nèi)存泄露)
  • 只有當(dāng)有訂閱者訂閱時才能使用 onNext 或 onCompleted 發(fā)送事件
  • 使用 onError 向上游傳遞錯誤
  • 處理背壓

很難正確的實現(xiàn)以上要求,幸運的是,你可以不這么做。有一些靜態(tài)工具方法可以幫你解決:

syncOnSubscribe

一個可以創(chuàng)建安全 OnSubscribe<T> 的工具,它創(chuàng)建的 OnSubscribe<T> 能夠正確地處理來自訂閱者的背壓請求。當(dāng)你需要將一個同步獲取式的阻塞 API 轉(zhuǎn)成響應(yīng)式 API 時可以使用。

public Observable<byte[]> readFile(@NonNull FileInputStream stream) {final SyncOnSubscribe<FileInputStream, byte[]> fileReader = SyncOnSubscribe.createStateful(() -> stream,(stream, output) -> {try {final byte[] buffer = new byte[BUFFER_SIZE];int count = stream.read(buffer);if (count < 0) {output.onCompleted();} else {output.onNext(buffer);}} catch (IOException error) {output.onError(error);}return stream;},s -> IOUtil.closeSilently(s));return Observable.create(fileReader); }復(fù)制代碼

fromCallable

一個靜態(tài)工具,可以對簡單的同步 API 進(jìn)行封裝并將之轉(zhuǎn)化成響應(yīng)式 API。更贊的是,fromCallable 也可以處理檢查到的異常。

public Observable<Boolean> enablePushNotifications(boolean enable) {return Observable.fromCallable(() -> sharedPrefs.edit().putBoolean(KEY_PUSH_NOTIFICATIONS_PREFS, enable).commit()); }復(fù)制代碼

fromEmitter

一個靜態(tài)工具,對異步 API 進(jìn)行封裝并可以管理 Observable 被取消訂閱時釋放的資源。不像 fromCallable,你可以輸出多個元素。

import android.bluetooth.le.BluetoothLeScanner; import android.bluetooth.le.ScanCallback; import android.bluetooth.le.ScanResult; import android.support.annotation.NonNull; import rx.Emitter; import rx.Observable;import java.util.List;public class RxBluetoothScanner {public static class ScanResultException extends RuntimeException {public ScanResultException(int errorCode) {super("Bluetooth scan failed. Error code: " + errorCode);}}private RxBluetoothScanner() {}@NonNullpublic static Observable<ScanResult> scan(@NonNull final BluetoothLeScanner scanner) {return Observable.fromEmitter(scanResultEmitter -> {final ScanCallback scanCallback = new ScanCallback() {@Overridepublic void onScanResult(int callbackType, @NonNull ScanResult result) {scanResultEmitter.onNext(result);}@Overridepublic void onBatchScanResults(@NonNull List<ScanResult> results) {for (ScanResult r : results) {scanResultEmitter.onNext(r);}}@Overridepublic void onScanFailed(int errorCode) {scanResultEmitter.onError(new ScanResultException(errorCode));}};scanResultEmitter.setCancellation(() -> scanner.stopScan(scanCallback));scanner.startScan(scanCallback);}, Emitter.BackpressureMode.BUFFER);} }復(fù)制代碼

3. 如何處理背壓

有時,Observable 產(chǎn)生事件過快以至于下游觀察者跟不上它的速度。當(dāng)這種情況發(fā)生時,你往往會遇到 MissingBackpressureException 異常。

RxJava 提供了一些方法管理背壓,但是具體使用哪一種需要視情況而定。

冷、熱 Observable

只有當(dāng)有訂閱時,冷 Observable 才會發(fā)送元素。觀察者訂閱冷 Observable 可以控制發(fā)送事件的速度而不需要犧牲流的完整性。冷 Observable 例子有:讀文件、數(shù)據(jù)庫查詢、網(wǎng)絡(luò)請求以及靜態(tài)迭代器轉(zhuǎn)成的 Observable。

熱 Observable 是連續(xù)的事件流,它的發(fā)出不依賴訂閱者的數(shù)量。當(dāng)一個觀察者訂閱了 Observable,那么它將面臨下面的一種情況:

  • 收到所有事件子集的重放
  • 收到所有事件的重放
  • 收到新的事件

熱 Observables 例子有:觸摸事件、通知以及進(jìn)度更新。

由于熱 Observable 發(fā)出事件的本性,我們不能控制它的速度。例如,你不能降低觸摸事件發(fā)出的速度。因此,最好是使用 BackpressureMode 提供的流控制策略。

使用一個響應(yīng)式獲取方法,冷 Observable 可以根據(jù)觀察者的反饋降低發(fā)送速度。更多知識,請看 ReactiveX 文檔的背壓與響應(yīng)式獲取方法.

BackpressureMode.NONE 和 BackpressureMode.ERROR

在這兩種模式中,發(fā)送的事件不是背壓。當(dāng)被觀察者的 16 元素緩沖區(qū)溢出時會拋出 MissingBackpressureException。

BackpressureMode.BUFFER

在這種模式下,有一個無限的緩沖區(qū)(初始化時是 128)。過快發(fā)出的元素都會放到緩沖區(qū)中。如果緩沖區(qū)中的元素?zé)o法消耗,會持續(xù)的積累直到內(nèi)存耗盡。結(jié)果是 OutOfMemoryException 異常。

BackpressureMode.DROP

這種模式是使用固定大小為 1 的緩沖區(qū)。如果下游觀察者無法處理,第一個元素會緩存下來后續(xù)的會被丟棄。當(dāng)消費者可以處理下一個元素時,它收到的將是 Observable 發(fā)出的第一個元素。

BackpressureMode.LATEST

這種模式與 BackpressureMode.DROP 類似,因為它也使用固定大小為 1 的緩沖區(qū)。然而,不是緩存第一個元素丟棄后續(xù)元素,BackpressureMode.LATEST 而是使用最新的元素替換緩沖區(qū)緩存的元素。當(dāng)消費者可以處理下一個元素時,它收到的是 Observable 最近一次發(fā)送的元素。

4. 如何防止無意的結(jié)束流錯誤

RxJava 通過給 Observable 序列發(fā)送 onError 通知不可恢復(fù)的錯誤,并且會結(jié)束序列。

有時,你不希望結(jié)束序列。對于這種情況,RxJava 提供了幾種不會結(jié)束序列的錯誤處理方法。

RxJava 提供了許多錯誤處理方法,但是有時你不希望結(jié)束序列。尤其是涉及到主題時。

onErrorResumeNext

使用 onErrorResumeNext 可以攔截 onError 并返回一個 Observable。或者對錯誤信息添加附加信息并返回一個新的錯誤,或者發(fā)送給 onNext 一個新的事件。

public Observable<SearchResult> search(@NotNull EditText searchView) {return RxTextView.textChanges(searchView) // In production, share this text view observable, don't create a new one each time.map(CharSequence::toString).debounce(500, TimeUnit.MILLISECONDS) // Avoid getting spammed with key stroke changes.filter(s -> s.length() > 1) // Only interested in queries of length greater than 1.observeOn(workerScheduler) // Next set of operations will be network so switch to an IO Scheduler (or worker).switchMap(query -> searchService.query(query)) // Take the latest observable from upstream and unsubscribe from any previous subscriptions.onErrorResumeNext(Observable.empty()); // <-- This will terminate upstream (ie. we will stop receiving text view changes after an error!) }復(fù)制代碼

使用 onErrorResumeNext 捕獲

使用該操作會修復(fù)下游序列,但是會結(jié)束上游序列因為已經(jīng)發(fā)送了 onError 通知。所以,如果你連接的是一個發(fā)布通知的主題,onError 通知會結(jié)束主題。

如果你希望上游繼續(xù)運行,可以在 onErrorResumeNext 操作中嵌套 flatMap 或 switchMap 操作。

public Observable<SearchResult> search(@NotNull EditText searchView) {return RxTextView.textChanges(searchView) // In production, share this text view observable, don't create a new one each time.map(CharSequence::toString).debounce(500, TimeUnit.MILLISECONDS) // Avoid getting spammed with key stroke changes.filter(s -> s.length() > 1) // Only interested in queries of length greater than 1.observeOn(workerScheduler) // Next set of operations will be network so switch to an IO Scheduler (or worker).switchMap(query -> searchService.query(query) // Take the latest observable from upstream and unsubscribe from any previous subscriptions.onErrorResumeNext(Observable.empty()); // <-- This fixes the problem since the error is not seen by the upstream observable }復(fù)制代碼

5. 如何共享你的 Observable

有時你需要將 Observable 的輸出共享給多個觀察者。RxJava 提供了 share 和 publish 兩種方式實現(xiàn) Observable 發(fā)送事件的多播。

Share

share 允許多個觀察者連接到源 Observable。下面的例子中,共享的是 Observable 發(fā)送的 MotionEvent 事件。然后,我們創(chuàng)建了另外兩個 Observable 分別過濾 DOWN 和 UP 觸摸事件。DOWN 事件我們畫紅圈,UP 事件我們畫籃圈。

public void touchEventHandler(@NotNull View view) {final Observable<MotionEvent> motionEventObservable = RxView.touches(view).share();// Capture down eventsfinal Observable<MotionEvent> downEventsObservable = motionEventObservable.filter(event -> event.getAction() == MotionEvent.ACTION_DOWN);// Capture up eventsfinal Observable<MotionEvent> upEventsObservable = motionEventObservable.filter(event -> event.getAction() == MotionEvent.ACTION_UP);// Show a red circle at the position where the down event ocurredsubscriptions.add(downEventsObservable.subscribe(event ->view.showCircle(event.getX(), event.getY(), Color.RED)));// Show a blue circle at the position where the up event ocurredsubscriptions.add(upEventsObservable.subscribe(event ->view.showCircle(event.getX(), event.getY(), Color.BLUE))); }復(fù)制代碼

然而,一旦有觀察者訂閱 Observable,Observable 就會開始發(fā)送事件。這樣就會造成后續(xù)的訂閱者會錯過一個或多個觸摸事件。

在這個例子中,“藍(lán)” 觀察者錯過了第一個事件。有些時候這沒問題,但是如果你不能接受錯過任何事件,那么你需要使用 publish 操作。

Publish

對 Observable 執(zhí)行 publish 操作會將值轉(zhuǎn)化為 ConnectedObservable。就像打開閥門一樣。下面的例子和上面一樣,需要注意的是我們現(xiàn)在使用的是 publish 操作。

public void touchEventHandler(@NotNull View view) {final ConnectedObservable<MotionEvent> motionEventObservable = RxView.touches(view).publish();// Capture down eventsfinal Observable<MotionEvent> downEventsObservable = motionEventObservable.filter(event -> event.getAction() == MotionEvent.ACTION_DOWN);// Capture up eventsfinal Observable<MotionEvent> upEventsObservable = motionEventObservable.filter(event -> event.getAction() == MotionEvent.ACTION_UP);// Show a red circle at the position where the down event ocurredsubscriptions.add(downEventsObservable.subscribe(event ->view.showCircle(event.getX(), event.getY(), Color.RED)));// Show a blue circle at the position where the up event ocurredsubscriptions.add(upEventsObservable.subscribe(event ->view.showCircle(event.getX(), event.getY(), Color.BLUE)));// Connect the source observable to begin emitting eventssubscriptions.add(motionEventObservable.connect()); }復(fù)制代碼

一旦必要的 Observables 訂閱了源,你需要執(zhí)行對源 ConnectedObservable 執(zhí)行 connect 來開始發(fā)送事件。

注意,一旦對源調(diào)用了 connect 方法,相同事件序列會分別發(fā)送給 “綠” 和 “藍(lán)” 觀察者。

掘金翻譯計劃 是一個翻譯優(yōu)質(zhì)互聯(lián)網(wǎng)技術(shù)文章的社區(qū),文章來源為 掘金 上的英文分享文章。內(nèi)容覆蓋 Android、iOS、React、前端、后端、產(chǎn)品、設(shè)計 等領(lǐng)域,想要查看更多優(yōu)質(zhì)譯文請持續(xù)關(guān)注 掘金翻譯計劃。

總結(jié)

以上是生活随笔為你收集整理的[译]震惊!RxJava 5 个不为人知的小秘密的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。