日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Rxjava2自我·解惑

發布時間:2025/3/15 编程问答 12 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Rxjava2自我·解惑 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

最近項目升級,從Rxjava1升級到了Rxjava2(之前一直想升級,但是去年項目實在多。。不敢造次。。),發現變化挺大的,并且最近逐漸有點強迫癥,受不了只會用API的自己。所以,廢話少說,看代碼吧。本人很菜,寫出來只是為了給自己一個交代,勿噴。 首先,不能一頭扎進入,先定個目標。我們看的目的是什么:

  • Rxjava 訂閱和終止訂閱的過程(Observable,Observer,Consumer,just,create等等)
  • Rxjava操作符原理(map,flatMap)
  • Rxjava線程調度原理(subscribeOn、observeOn、io、main)

Rxjava 訂閱過程

這里以Observable.just為例:

Observable.just(1).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onNext(Integer integer) {}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}}); 復制代碼

讓我們以圖片流的形式開始吧:

  • just()后返回了啥?
  • new ObservableJust()? 哦,new了對象,保存了我們傳遞進來的值。OK,出去。
  • RxJavaPlugins.onAssembly
  • Ok,至此第一步就看完了,也就是說:最后返回了ObservableJust對象。
  • 看subscribe()方法,subscribe可以看成是ObservableJust.subscribe(),但是我們可以發現,subscribe方法是final的,也就是說子類不可以重寫的,所以subscribe方法執行的就是它父類observable的方法。 這里其實用到了策略模式。很像項目中的BaseActvity,呵呵。
  • 走,看subscribeActual方法。
  • ScalarDisposable其實就是個線程,你們應該也猜到。

    7.run方法 好了,整個過程就這么完成了。 一臉懵逼。。。。 我們繼續看看Observable.create吧:

    Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {}}).subscribe(new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onNext(String s) {}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}}); 復制代碼
  • create()方法 代碼相似度999.999%,不過這次是返回了ObservableCreate。
  • 還是subscribe方法。
  • 到ObservableCreate看它的subscribeActual:
  • source.subscribe()方法 好吧,我傻,這里就是我們外面實現的方法。。。Σ(☉▽☉"a
  • 返回去看CreateEmitter類
  • 看它的父類ObservableEmitter
  • public interface ObservableEmitter<T> extends Emitter<T> {/*** Sets a Disposable on this emitter; any previous Disposable* or Cancellation will be unsubscribed/cancelled.* @param d the disposable, null is allowed*/void setDisposable(@Nullable Disposable d);/*** Sets a Cancellable on this emitter; any previous Disposable* or Cancellation will be unsubscribed/cancelled.* @param c the cancellable resource, null is allowed*/void setCancellable(@Nullable Cancellable c);/*** Returns true if the downstream disposed the sequence or the* emitter was terminated via {@link #onError(Throwable)}, {@link #onComplete} or a* successful {@link #tryOnError(Throwable)}.* <p>This method is thread-safe.* @return true if the downstream disposed the sequence or the emitter was terminated*/boolean isDisposed();/*** Ensures that calls to onNext, onError and onComplete are properly serialized.* @return the serialized ObservableEmitter*/@NonNullObservableEmitter<T> serialize();/*** Attempts to emit the specified {@code Throwable} error if the downstream* hasn't cancelled the sequence or is otherwise terminated, returning false* if the emission is not allowed to happen due to lifecycle restrictions.* <p>* Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called* if the error could not be delivered.* @param t the throwable error to signal if possible* @return true if successful, false if the downstream is not able to accept further* events* @since 2.1.1 - experimental*/@Experimentalboolean tryOnError(@NonNull Throwable t); } 復制代碼

    主要是一些取消的方法,它還有個父類Emitter:

    public interface Emitter<T> {/*** Signal a normal value.* @param value the value to signal, not null*/void onNext(@NonNull T value);/*** Signal a Throwable exception.* @param error the Throwable to signal, not null*/void onError(@NonNull Throwable error);/*** Signal a completion.*/void onComplete(); } 復制代碼

    這些方法再熟悉不過了。。。。 7. OK,這個類就這樣看完了,接下來看它的下一個方法:

    哦,接下來調用onSubscribe方法。 咦,這不就是我們傳遞進來的觀察者嗎? 慢著,慢著,有點眉目了,如果沒猜錯,應該是這樣的:

    observable 調用subscribe()后,最終會調用Observable.create(T)中T的subscribe()方法,在subscribe中,如果我們完成了,會調用emitter.onNext方法,如下,應該很多人都是這么寫的:

    @Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {//....N個邏輯后e.onNext("最終結果");}}) 復制代碼

    此時,由于CreateEmitter是持有我們的觀察者observer的,如圖:

    如果,沒猜錯,它就是個代理,實際上它會調用observer.onNext()方法。驗證一下吧。 嗯,的確如此。 好了,訂閱的過程解決了。 總結就是:

    Observable.xx方法會生成一個ObservableXX類,之后,當調用Observable.subscribe時,會去調用ObservableXX類的subscribeActual方法,這個方法會調用我們在外面實現observable的方法,并把包裝類傳遞過去。當我們發送事件時,會調用包裝類的onNext等方法,然后這個包裝類會幫我們調用傳遞進去的觀察者的onNext等方法。(onError,onComplete同理,只不過我沒截圖)

    接下來,取消訂閱過程。堅持一下,就快結束第一部分了。

    終止訂閱的過程

    Disposable disposable = Observable.create((ObservableOnSubscribe<String>) e -> {//....N個邏輯后e.onNext("最終結果");}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {}});disposable.dispose(); 復制代碼
  • 直接點擊dispose方法 是個接口。 看它的實現類。
  • 找實現類
  • 找到了,直接看dispose方法。

  • LambdaObserver的dispose
  • @Overridepublic void dispose() {DisposableHelper.dispose(this);} 復制代碼
  • DisposableHelper
  • public static boolean dispose(AtomicReference<Disposable> field) {Disposable current = field.get();Disposable d = DISPOSED;if (current != d) {current = field.getAndSet(d);if (current != d) {if (current != null) {current.dispose();}return true;}}return false;} 復制代碼

    在終止訂閱時,執行dispose()會將一個單例對象DISPOSED,賦給當前的Disposable對象,由于枚舉成員本質是靜態常量,所以isDisposed(Disposable d)方法也只需要判斷當前對象的引用是否是DISPOSED即可。

    Rxjava 操作符

    先從map入手:

  • map()方法:
  • 感覺很相似呀,我的第七感要來了,感覺又會是外面包裝一層,然后返回這個包裝后的對象。。。看代碼驗證吧。
  • MapObserver的onNext方法 嗯,果然還是包了一層呀。 到這里,有沒有發現它的調用邏輯?反正我是等看了別人的分析博客才發現的。囧 copy一下別人博客的總結,總結的真的很好: 博客地址:Rxjava2源碼分析
  • RxJava 線程調度原理

    代碼如下:

    Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {//....N個邏輯后e.onNext("最終結果");}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {}}); 復制代碼

    照理圖片流。。

  • subscrbeOn方法
  • 經過前面的分析,我們也知道最后調用的是subscribeActual,所以直接看ObservableSubscribeOn中的subscribeActual。 跟之前一樣,還是New了個包裝類,把觀察者s傳遞了進去。這個對象等會看,我們先看s.onSubscribe()。onSubscribe這個方法是接口的方法,得看實現類。它的實現類是誰呢? 看subscribe方法: 原來是LambdaObserver,OK。
  • LambdaObserver的onSubscribe 這個onSubscribe是誰?回溯~
  • 看樣子,好像是個空實現。。。。好吧

    s.onSubscribe(parent)//暫時啥也沒做,如果我們沒有在外面實現onSubscribe方法的話。 復制代碼
  • SubscribeTask
  • parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); //重點應該是這句 復制代碼

    看一下~ SubscribeTask:

    是一個線程,當啟動時,調用上一個observable的subscribe方法。我們知道,最后就是調用observable的subscribeActual。 對我們這段代碼來說,這里的observable就是ObservableCreate。

    從這里可知,的確調用是自下而上調用的。

  • scheduler.scheduleDirect
  • 我們一句一句代碼看吧:

    final Worker w = createWorker(); 復制代碼

    點擊:

    @NonNullpublic abstract Worker createWorker(); 復制代碼

    抽象的,得看實現類:

    OK。不要陷入太深,知道創建的worker是什么東西之后我們看下一句。

    DisposeTask task = new DisposeTask(decoratedRun, w);w.schedule(task, delay, unit); 復制代碼

    new 了一個什么task后,調用了worker的schedule方法,這個 worker我們知道了,那我們去看它的schedule方法。

    從線程池中取了一個threadWorker,然后調用scheduleActual方法: 這里的executor是什么應該不難猜到,上一下代碼:

    public class NewThreadWorker extends Scheduler.Worker implements Disposable {private final ScheduledExecutorService executor;volatile boolean disposed;public NewThreadWorker(ThreadFactory threadFactory) {executor = SchedulerPoolFactory.create(threadFactory);} 復制代碼

    create()方法:

    /*** Creates a ScheduledExecutorService with the given factory.* @param factory the thread factory* @return the ScheduledExecutorService*/public static ScheduledExecutorService create(ThreadFactory factory) {final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);//創建了個線程池tryPutIntoPool(PURGE_ENABLED, exec);return exec;} 復制代碼

    總的來說就是: 把Observable包裝成任務,然后交給線程池調度執行。 總結

    • subscribeOn只生效一次的原因就是因為, 訂閱過程是自下而上的,當調用subscribe時,會一級一級向上調用上一級observable的方法,此時,最頂層的(最上游,最開始傳遞的)ObservableSubscribeOn會先被調用,從而確定了任務執行所在的線程,此后設置的線程都將無效。
    • 訂閱過程是自下而上的,然后數據發射事件,再自上而下傳遞數據。

    總結

    以上是生活随笔為你收集整理的Rxjava2自我·解惑的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。