RxJava中的doOnSubscribe默认运行线程分析
假設你對RxJava1.x還不是了解,能夠參考以下文章。
1. RxJava使用介紹 【視頻教程】
2. RxJava操作符
??? Creating Observables(Observable的創建操作符) 【視頻教程】
??? Transforming Observables(Observable的轉換操作符) 【視頻教程】
??? Filtering Observables(Observable的過濾操作符) 【視頻教程】
??? Combining Observables(Observable的組合操作符) 【視頻教程】
??? Error Handling Operators(Observable的錯誤處理操作符) 【視頻教程】
??? Observable Utility Operators(Observable的輔助性操作符) 【視頻教程】
??? Conditional and Boolean Operators(Observable的條件和布爾操作符) 【視頻教程】
??? Mathematical and Aggregate Operators(Observable數學運算及聚合操作符) 【視頻教程】
??? 其它如observable.toList()、observable.connect()、observable.publish()等等。 【視頻教程】
3. RxJava Observer與Subcriber的關系 【視頻教程】
4. RxJava線程控制(Scheduler) 【視頻教程】
5. RxJava 并發之數據流發射太快怎樣辦(背壓(Backpressure)) 【視頻教程】
前言
在有心課堂《RxJava之旅》中有學員留言:map和doOnSubscribe默認調度器是IO調度器,這里說錯了吧?
以下我們分析下。
在前面講 Subscriber 的時候,提到過 Subscriber 的 onStart() 能夠用作流程開始前的初始化。然而 onStart() 由于在 subscribe() 發生時就被調用了,因此不能指定線程。而是僅僅能運行在 subscribe() 被調用時的線程。這就導致假設 onStart() 中含有對線程有要求的代碼(比如在界面上顯示一個 ProgressBar,這必須在主線程運行),將會有線程非法的風險,由于有時你無法預測 subscribe() 將會在什么線程運行。
而與 Subscriber.onStart() 相相應的。有一個方法 Observable.doOnSubscribe() 。
它和 Subscriber.onStart() 相同是在 subscribe() 調用后并且在事件發送前運行。但差別在于它能夠指定線程。
默認情況下, doOnSubscribe() 運行在 subscribe() 發生的線程;而假設在 doOnSubscribe() 之后有 subscribeOn() 的話,它將運行在離它近期的 subscribeOn() 所指定的線程。
演示樣例代碼:
Observable.create(onSubscribe).subscribeOn(Schedulers.io()).doOnSubscribe(new Action0() {@Overridepublic void call() {progressBar.setVisibility(View.VISIBLE); // 須要在主線程運行}}).subscribeOn(AndroidSchedulers.mainThread()) // 指定主線程.observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);subscribeOn 作用于該操作符之前的 Observable 的創建操符作以及 doOnSubscribe 操作符 ,換句話說就是 doOnSubscribe 以及 Observable 的創建操作符總是被其之后近期的 subscribeOn 控制 。沒看懂不要緊,看以下代碼你就懂了。
Observable.create(new Observable.OnSubscribe<Integer>() {@Overridepublic void call(Subscriber<?super Integer> subscriber) { subscriber.onNext(1); subscriber.onCompleted(); } }) .doOnSubscribe(new Action0() { @Override public void call() { System.out.println("00doOnSubscribe在線程" + Thread.currentThread().getName() + "中"); } }) .subscribeOn(Schedulers.newThread()) .map(new Func1<Integer, String>() { @Override public String call(Integer integer) { System.out.println("map1在線程" + Thread.currentThread().getName() + "中"); return integer + ""; } }) .doOnSubscribe(new Action0() { @Override public void call() { System.out.println("11doOnSubscribe在線程" + Thread.currentThread().getName() + "中"); } }) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.io()) .map(new Func1<String, String>() { @Override public String call(String s) { System.out.println("map2在線程" + Thread.currentThread().getName() + "中"); return s + "1"; } }) .doOnSubscribe(new Action0() { @Override public void call() { System.out.println("22doOnSubscribe在線程" + Thread.currentThread().getName() + "中"); } }) .subscribeOn(Schedulers.newThread()) .subscribe(new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println("onNext在線程" + Thread.currentThread().getName() + "中"); } });
運行結果例如以下:
22doOnSubscribe在線程RxNewThreadScheduler-1中 11doOnSubscribe在線程RxIoScheduler-3中 00doOnSubscribe在線程RxNewThreadScheduler-2中 map1在線程RxNewThreadScheduler-2中 map2在線程RxIoScheduler-2中 onNext在線程RxIoScheduler-2中依據代碼和運行結果我總結例如以下:
今天就分析到這里,假設有問題請大家反饋交流。
轉載于:https://www.cnblogs.com/blfbuaa/p/7383739.html
總結
以上是生活随笔為你收集整理的RxJava中的doOnSubscribe默认运行线程分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 类的定义、成员定义修饰符
- 下一篇: [改善Java代码]非稳定排序推荐使用L