日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

Android RxJava 3.x 使用总结

發布時間:2024/9/30 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Android RxJava 3.x 使用总结 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

轉載請標明出處:http://blog.csdn.net/zhaoyanjun6/article/details/106720158
本文出自【趙彥軍的博客】

文章目錄

    • 依賴接入
    • Flowable
    • Single
    • Maybe
    • BackpressureStrategy
    • 線程切換
    • concat
      • 例子1

依賴接入

implementation 'io.reactivex.rxjava3:rxandroid:3.0.0' implementation "io.reactivex.rxjava3:rxjava:3.0.4"

Flowable

//java 方式 Flowable.just(1).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Throwable {}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Throwable {}});//或者用 Lambda 簡寫 Flowable.just(1).subscribe( it -> {}, throwable -> {});

range 一組序列數據

Flowable.range(0, 4).subscribe(it -> {//結果 0 1 2 3}, throwable -> {});

Single

Single只發射單個數據或錯誤事件,即使發射多個數據,后面發射的數據也不會處理。
只有 onSuccess 和 onError事件,沒有 onNext 、onComplete事件。

SingleEmitter

public interface SingleEmitter<@NonNull T> {void onSuccess(@NonNull T t);void onError(@NonNull Throwable t);void setDisposable(@Nullable Disposable d);void setCancellable(@Nullable Cancellable c);boolean isDisposed();boolean tryOnError(@NonNull Throwable t);}

示例1

Single.create(new SingleOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull SingleEmitter<Integer> emitter) throws Throwable {emitter.onSuccess(1);}}).subscribe(integer -> {}, throwable -> {});

示例2

Single.just(1).subscribe(integer -> {}, throwable -> {});

Maybe

Maybe 是 RxJava2.x 之后才有的新類型,可以看成是Single和Completable的結合。
Maybe 也只能發射單個事件或錯誤事件,即使發射多個數據,后面發射的數據也不會處理。
只有 onSuccess 、 onError 、onComplete事件,沒有 onNext 事件。

public interface MaybeEmitter<@NonNull T> {void onSuccess(@NonNull T t);void onError(@NonNull Throwable t);void onComplete();void setDisposable(@Nullable Disposable d);void setCancellable(@Nullable Cancellable c);boolean isDisposed();boolean tryOnError(@NonNull Throwable t);}

實例1

Maybe.create(new MaybeOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull MaybeEmitter<Integer> emitter) throws Throwable {emitter.onSuccess(1);emitter.onComplete();}}).subscribe(integer -> {}, throwable -> {});

實例2

Maybe.just(1).subscribe(integer -> {}, throwable -> {});

BackpressureStrategy

背壓策略

public enum BackpressureStrategy {/*** The {@code onNext} events are written without any buffering or dropping.* Downstream has to deal with any overflow.* <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.*/MISSING,/*** Signals a {@link io.reactivex.rxjava3.exceptions.MissingBackpressureException MissingBackpressureException}* in case the downstream can't keep up.*/ERROR,/*** Buffers <em>all</em> {@code onNext} values until the downstream consumes it.*/BUFFER,/*** Drops the most recent {@code onNext} value if the downstream can't keep up.*/DROP,/*** Keeps only the latest {@code onNext} value, overwriting any previous value if the* downstream can't keep up.*/LATEST }
  • MISSING 策略則表示通過 Create 方法創建的 Flowable 沒有指定背壓策略,不會對通過 OnNext 發射的數據做緩存或丟棄處理,需要下游通過背壓操作符
  • BUFFER 策略則在還有數據未下發完成時就算上游調用onComplete或onError也會等待數據下發完成
  • LATEST 策略則當產生背壓時僅會緩存最新的數據
  • DROP 策略為背壓時丟棄背壓數據
  • ERROR 策略是背壓時拋出異常調用onError
Flowable.create(new FlowableOnSubscribe<Long>() {@Overridepublic void subscribe(@NonNull FlowableEmitter<Long> emitter) throws Throwable {emitter.onNext(1L);emitter.onNext(2L);emitter.onComplete();}}, BackpressureStrategy.DROP).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(it -> {}, throwable -> {});

線程切換

RxUtil

package com.example.streamimport io.reactivex.rxjava3.android.schedulers.AndroidSchedulers import io.reactivex.rxjava3.core.FlowableTransformer import io.reactivex.rxjava3.core.MaybeTransformer import io.reactivex.rxjava3.core.ObservableTransformer import io.reactivex.rxjava3.core.SingleTransformer import io.reactivex.rxjava3.schedulers.Schedulers/*** @author yanjun.zhao* @time 2020/6/12 8:39 PM* @desc*/object RxUtil {/*** 線程切換*/fun <T> maybeToMain(): MaybeTransformer<T, T> {return MaybeTransformer { upstream ->upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())}}/*** 線程切換*/fun <T> singleToMain(): SingleTransformer<T, T> {return SingleTransformer { upstream ->upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())}}/*** 線程切換*/fun <T> flowableToMain(): FlowableTransformer<T, T> {return FlowableTransformer { upstream ->upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())}}fun <T> observableToMain(): ObservableTransformer<T, T> {return ObservableTransformer { upstream ->upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())}}}

具體實現

package com.example.streamimport android.os.Bundle import androidx.appcompat.app.AppCompatActivity import io.reactivex.rxjava3.core.Flowable import io.reactivex.rxjava3.core.Maybe import io.reactivex.rxjava3.core.Observable import io.reactivex.rxjava3.core.Singleclass MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)Single.just(1).map {//運行在子線程it}.compose(RxUtil.singleToMain()) //線程轉換.subscribe({//運行在主線程},{it.printStackTrace()})Maybe.just(1).map {//運行在子線程it}.compose(RxUtil.maybeToMain()) //線程轉換.subscribe({//運行在主線程},{it.printStackTrace()})Flowable.just(1).map {//運行在子線程it}.compose(RxUtil.flowableToMain()) //線程轉換.subscribe({//運行在主線程},{it.printStackTrace()})Observable.just(1).map {//運行在子線程it}.compose(RxUtil.observableToMain()) //線程轉換.subscribe({ it ->//運行在主線程},{it.printStackTrace()})} }

concat

Concat操作符連接多個Observable的輸出,就好像它們是一個Observable,第一個Observable發射的所有數據在第二個Observable發射的任何數據前面,以此類推。

直到前面一個Observable終止,Concat才會訂閱額外的一個Observable。注意:因此,如果你嘗試連接一個"熱"Observable(這種Observable在創建后立即開始發射數據,即使沒有訂閱者),Concat將不會看到也不會發射它之前發射的任何數據。

例子1

private var ob1 = Observable.create<String> {Log.d("concat-數據源1", " ${Thread.currentThread().name} ")it.onNext("a1")it.onComplete()}private var ob2 = Observable.create<String> {Log.d("concat-數據源2", " ${Thread.currentThread().name} ")it.onNext("a2")it.onComplete()}private var ob3 = Observable.create<String> {Log.d("concat-數據源3", " ${Thread.currentThread().name} ")it.onNext("a3")it.onComplete()}Observable.concat<String>(ob1, ob2, ob3).subscribeOn(Schedulers.io()).subscribe{Log.d("concat-結果", " ${Thread.currentThread().name} " + it)}

結果是:

concat-數據源1: RxCachedThreadScheduler-1 concat-結果: RxCachedThreadScheduler-1 concat-數據源2: RxCachedThreadScheduler-1 concat-結果: RxCachedThreadScheduler-1 concat-數據源3: RxCachedThreadScheduler-1 concat-結果: RxCachedThreadScheduler-1

結果分析:

  • concat 輸出結果是有序的
  • concat 會使三個數據源都會執行

那么如果我要實現哪個數據源有數據,我就用哪個數據,一旦獲取到想要的數據,后續數據源不再執行。其實很簡單,用 firstElement() ,這個需求有點像圖片加載流程 先從內存取,內存沒有從本地文件取,本都文件沒有就請求服務器。一旦哪個環節獲取到了數據,立刻停止后面的流程

Observable.concat<String>(ob1, ob2, ob3).firstElement().subscribeOn(Schedulers.io()).subscribe {Log.d("concat-結果", " ${Thread.currentThread().name} ")}}

運行結果為:

concat-數據源1: RxCachedThreadScheduler-1 concat-結果: RxCachedThreadScheduler-1

總結

以上是生活随笔為你收集整理的Android RxJava 3.x 使用总结的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。