日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java rx.observable_Rxjava2 Observable的条件操作符详解及实例

發布時間:2023/12/2 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java rx.observable_Rxjava2 Observable的条件操作符详解及实例 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

簡要:

需求了解:

在使用 Rxjava 開發中,經常有一些各種條件的操作 ,如比較兩個 Observable 誰先發射了數據、跳過指定條件的 Observable 等一系列的條件操作需求,那么很幸運, Rxjava 中已經有了很多條件操作符,一起來了解一下吧。

下面列出了一些Rxjava的用于條件操作符:

Amb:給定兩個或多個Observables,它只發射首先發射數據或通知的那個Observable的所有數據。

DefaultIfEmpty:發射來自原始Observable的值,如果原始 Observable 沒有發射任何數據項,就發射一個默認值。

SwitchIfEmpty:如果原始Observable沒有發射數據時,發射切換一個指定的Observable繼續發射數據。

SkipUntil:丟棄原始 Observable 發射的數據,直到第二個 Observable 發射了一個數據,然后發射原始 Observable 的剩余數據。

SkipWhile:丟棄原始 Observable 發射的數據,直到一個特定的條件為假,然后發射原始 Observable 剩余的數據。

TakeUntil:發射來自原始 Observable 的數據,直到第二個 Observable 發射了一個數據或一個通知。

1. Amb

給定兩個或多個Observables,它只發射首先發射數據或通知的那個Observable的所有數據。

解析: 對多個Observable進行監聽,首先發射通知(包括數據)的Observable將會被觀察者觀察,發射這個Observable的所有數據。

示例代碼:

// 創建Observable

Observable delayObservable = Observable.range(1, 5)

.delay(100, TimeUnit.MILLISECONDS); // 延遲100毫秒發射數據

Observable rangeObservable = Observable.range(6, 5);

// 創建Observable的集合

ArrayList> list = new ArrayList<>();

list.add(delayObservable);

list.add(rangeObservable);

// 創建Observable的數組

Observable[] array = new Observable[2];

array[0] = delayObservable;

array[1] = rangeObservable;

/**

* 1. ambWith(ObservableSource extends T> other)

* 與另外一個Observable比較,只發射首先發射通知的Observable的數據

*/

rangeObservable.ambWith(delayObservable)

.subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Exception {

System.out.println("--> accept(1): " + integer);

}

});

System.in.read();

System.out.println("------------------------------------------------");

/**

* 2. amb(Iterable extends ObservableSource extends T>> sources)

* 接受一個Observable類型的集合, 只發射集合中首先發射通知的Observable的數據

*/

Observable.amb(list)

.subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Exception {

System.out.println("--> accept(2): " + integer);

}

});

System.in.read();

System.out.println("------------------------------------------------");

/**

* 3. ambArray(ObservableSource extends T>... sources)

* 接受一個Observable類型的數組, 只發射數組中首先發射通知的Observable的數據

*/

Observable.ambArray(array)

.subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Exception {

System.out.println("--> accept(3): " + integer);

}

});

System.in.read();

輸出:

--> accept(1): 6

--> accept(1): 7

--> accept(1): 8

--> accept(1): 9

--> accept(1): 10

------------------------------------------------

--> accept(2): 6

--> accept(2): 7

--> accept(2): 8

--> accept(2): 9

--> accept(2): 10

------------------------------------------------

--> accept(3): 6

--> accept(3): 7

--> accept(3): 8

--> accept(3): 9

--> accept(3): 10

2. DefaultIfEmpty

發射來自原始Observable的值,如果原始 Observable 沒有發射數據項,就發射一個默認值。

解析: DefaultIfEmpty 簡單的精確地發射原始Observable的值,如果原始Observable沒有發射任何數據正常終止(以 onCompleted 的形式), DefaultIfEmpty 返回的Observable就發射一個你提供的默認值。如果你需要發射更多的數據,或者切換備用的Observable,你可以考慮使用 switchIfEmpty 操作符 。

示例代碼:

/**

* defaultIfEmpty(@NotNull T defaultItem)

* 如果原始Observable沒有發射任何數據正常終止(以 onCompleted 的形式),

* DefaultIfEmpty 返回的Observable就發射一個你提供的默認值defaultItem。

*/

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter emitter) throws Exception {

emitter.onComplete(); // 不發射任何數據,直接發射完成通知

}

}).defaultIfEmpty("No Data emitter!!!")

.subscribe(new Observer() {

@Override

public void onSubscribe(Disposable d) {

System.out.println("--> onSubscribe");

}

@Override

public void onNext(String s) {

System.out.println("--> onNext: " + s);

}

@Override

public void onError(Throwable e) {

System.out.println("--> onError: " + e);

}

@Override

public void onComplete() {

System.out.println("--> onComplete");

}

});

輸出:

--> onSubscribe

--> onNext: No Data emitter!!!

--> onComplete

3. SwitchIfEmpty

如果原始Observable沒有發射數據時,發射切換一個指定的Observable繼續發射數據。

解析: 如果原始 Observable 沒有發射數據時,發射切換指定的 other 繼續發射數據。

示例代碼:

/**

* switchIfEmpty(ObservableSource other)

* 如果原始Observable沒有發射數據時,發射切換指定的other繼續發射數據

*/

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter emitter) throws Exception {

emitter.onComplete(); // 不發射任何數據,直接發射完成通知

}

}).switchIfEmpty(Observable.just(888)) // 如果原始Observable沒有發射數據項,默認發射備用的Observable,發射數據項888

.subscribe(new Observer() {

@Override

public void onSubscribe(Disposable d) {

System.out.println("--> onSubscribe");

}

@Override

public void onNext(Integer integer) {

System.out.println("--> onNext: " + integer);

}

@Override

public void onError(Throwable e) {

System.out.println("--> onError: " + e);

}

@Override

public void onComplete() {

System.out.println("--> onComplete");

}

});

輸出:

--> onSubscribe

--> onNext: 888

--> onComplete

4. SkipUntil

丟棄原始 Observable 發射的數據,直到第二個 Observable 發射了一個數據,然后發射原始 Observable 的剩余數據。

示例代碼:

/**

* skipUntil(ObservableSource other)

* 丟棄原始Observable發射的數據,直到other發射了一個數據,然后發射原始Observable的剩余數據。

*/

Observable.intervalRange(1, 10, 0, 500, TimeUnit.MILLISECONDS)

// 丟棄2000毫秒的原始Observable發射的數據,接受后面的剩余部分數據

.skipUntil(Observable.timer(2000, TimeUnit.MILLISECONDS))

.subscribe(new Observer() {

@Override

public void onSubscribe(Disposable d) {

System.out.println("--> onSubscribe");

}

@Override

public void onNext(Long aLong) {

System.out.println("--> onNext: " + aLong);

}

@Override

public void onError(Throwable e) {

System.out.println("--> onError: " + e);

}

@Override

public void onComplete() {

System.out.println("--> onComplete");

}

});

System.in.read();

輸出:

--> onSubscribe

--> onNext: 5

--> onNext: 6

--> onNext: 7

--> onNext: 8

--> onNext: 9

--> onNext: 10

--> onComplete

5. SkipWhile

丟棄原始 Observable 發射的數據,直到一個特定的條件為假,然后發射原始 Observable 剩余的數據。

示例代碼:

/**

* skipWhile(Predicate super T> predicate)

* 丟棄原始 Observable 發射的數據,直到函數predicate的條件為假,然后發射原始Observable剩余的數據。

*/

Observable.intervalRange(1, 10, 0, 500, TimeUnit.MILLISECONDS)

.skipWhile(new Predicate() {

@Override

public boolean test(Long aLong) throws Exception {

if (aLong > 5) {

return false; // 當原始數據大于5時,發射后面的剩余部分數據

}

return true; // 丟棄原始數據項

}

}).subscribe(new Observer() {

@Override

public void onSubscribe(Disposable d) {

System.out.println("--> onSubscribe");

}

@Override

public void onNext(Long aLong) {

System.out.println("--> onNext: " + aLong);

}

@Override

public void onError(Throwable e) {

System.out.println("--> onError: " + e);

}

@Override

public void onComplete() {

System.out.println("--> onComplete");

}

});

System.in.read();

輸出:

--> onSubscribe

--> onNext: 6

--> onNext: 7

--> onNext: 8

--> onNext: 9

--> onNext: 10

--> onComplete

6. TakeUntil

發射來自原始 Observable 的數據,直到第二個 Observable 發射了一個數據或一個通知。

6.1 takeUntil(ObservableSource other)

TakeUntil 訂閱并開始發射原始 Observable,它還監視你提供的第二個 Observable。如果第二個 Observable 發射了一項數據或者發射了一個終止通知,TakeUntil 返回的 Observable 會停止發射原始 Observable 并終止。

解析: 第二個Observable發射一項數據或一個 onError 通知或一個 onCompleted 通知都會導致 takeUntil 停止發射數據。

示例代碼:

// 創建Observable,發送數字1~10,每間隔200毫秒發射一個數據

Observable observable = Observable.intervalRange(1, 10, 0, 200, TimeUnit.MILLISECONDS);

/**

* 1. takeUntil(ObservableSource other)

* 發射來自原始Observable的數據,直到other發射了一個數據或一個通知后停止發射原始Observable并終止。

*/

observable.takeUntil(Observable.timer(1000, TimeUnit.MILLISECONDS)) // 1000毫秒后停止發射原始數據

.subscribe(new Observer() {

@Override

public void onSubscribe(Disposable d) {

System.out.println("--> onSubscribe(1)");

}

@Override

public void onNext(Long aLong) {

System.out.println("--> onNext(1): " + aLong);

}

@Override

public void onError(Throwable e) {

System.out.println("--> onError(1): " + e);

}

@Override

public void onComplete() {

System.out.println("--> onComplete(1)");

}

});

System.in.read();

輸出:

--> onSubscribe(1)

--> onNext(1): 1

--> onNext(1): 2

--> onNext(1): 3

--> onNext(1): 4

--> onNext(1): 5

--> onComplete(1)

6.2 takeUntil(Predicate stopPredicate)

每次發射數據后,通過一個謂詞函數來判定是否需要終止發射數據。

解析: 每次發射數據后,通過一個謂詞函數 stopPredicate 來判定是否需要終止發射數據,如果 stopPredicate 返回 true 怎表示停止發射原始Observable后面的數據,否則繼續發射后面的數據。

示例代碼:

/**

* 2. takeUntil(Predicate super T> stopPredicate)

* 每次發射數據后,通過一個謂詞函數stopPredicate來判定是否需要終止發射數據

* 如果stopPredicate返回true怎表示停止發射后面的數據,否則繼續發射后面的數據

*/

observable.takeUntil(new Predicate() {

@Override

public boolean test(Long aLong) throws Exception { // 函數返回false則為繼續發射原始數據,true則停止發射原始數據

if(aLong > 5){

return true; // 滿足條件后,停止發射數據

}

return false; // 繼續發射數據

}

}).subscribe(new Observer() {

@Override

public void onSubscribe(Disposable d) {

System.out.println("--> onSubscribe(2)");

}

@Override

public void onNext(Long aLong) {

System.out.println("--> onNext(2): " + aLong);

}

@Override

public void onError(Throwable e) {

System.out.println("--> onError(2): " + e);

}

@Override

public void onComplete() {

System.out.println("--> onComplete(2)");

}

});

System.in.read();

輸出:

--> onSubscribe(2)

--> onNext(2): 1

--> onNext(2): 2

--> onNext(2): 3

--> onNext(2): 4

--> onNext(2): 5

--> onNext(2): 6

--> onComplete(2)

7. TakeWhile

發射原始Observable的數據,直到一個特定的條件,然后跳過剩余的數據。

解析: 發射原始 Observable 的數據,直到 predicate 的條件為 false ,然后跳過剩余的數據。

示例代碼:

// 創建Observable,發送數字1~10,每間隔200毫秒發射一個數據

Observable observable = Observable.intervalRange(1, 10, 0, 200, TimeUnit.MILLISECONDS);

/**

* takeWhile(Predicate predicate)

* 發射原始Observable的數據,直到predicate的條件為false,然后跳過剩余的數據

*/

observable.takeWhile(new Predicate() {

@Override

public boolean test(Long aLong) throws Exception { // 函數返回值決定是否繼續發射后續的數據

if(aLong > 5){

return false; // 滿足條件后跳過后面的數據

}

return true; // 繼續發射數據

}

}).subscribe(new Observer() {

@Override

public void onSubscribe(Disposable d) {

System.out.println("--> onSubscribe");

}

@Override

public void onNext(Long aLong) {

System.out.println("--> onNext: " + aLong);

}

@Override

public void onError(Throwable e) {

System.out.println("--> onError: " + e);

}

@Override

public void onComplete() {

System.out.println("--> onComplete");

}

});

System.in.read();

輸出:

--> onSubscribe(1)

--> onNext(1): 1

--> onNext(1): 2

--> onNext(1): 3

--> onNext(1): 4

--> onNext(1): 5

--> onComplete(1)

小結

本節主要介紹了Rxjava條件操作符可以根據不同的條件進行數據的發射,變換等相關行為。

提示:以上使用的Rxjava2版本: 2.2.12

Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實例

實例代碼:

總結

以上是生活随笔為你收集整理的java rx.observable_Rxjava2 Observable的条件操作符详解及实例的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。