给 Android 开发人员的 RxJava 具体解释
前言
我從去年開(kāi)始使用 RxJava 。到如今一年多了。
今年加入了 Flipboard 后,看到 Flipboard 的 Android 項(xiàng)目也在使用 RxJava 。并且使用的場(chǎng)景越來(lái)越多 。
而近期這幾個(gè)月。我也發(fā)現(xiàn)國(guó)內(nèi)越來(lái)越多的人開(kāi)始提及 RxJava 。
有人說(shuō)『RxJava 真是太好用了』,有人說(shuō)『RxJava 真是太難用了』,另外很多其它的人表示:我真的百度了也谷歌了。但我還是想問(wèn): RxJava 究竟是什么?
鑒于 RxJava 眼下這種既火爆又神奇的現(xiàn)狀,而我又在一年的使用過(guò)程中對(duì) RxJava 有了一些理解,我決定寫下這篇文章來(lái)對(duì) RxJava 做一個(gè)相對(duì)詳細(xì)的、針對(duì) Android 開(kāi)發(fā)人員的介紹。
這篇文章的目的有兩個(gè):1. 給對(duì) RxJava 感興趣的人一些入門的指引2. 給正在使用 RxJava 但仍然心存疑惑的人一些更深入的解析
- RxJava 究竟是什么
- RxJava 好在哪
- API 介紹和原理簡(jiǎn)析
- 1. 概念:擴(kuò)展的觀察者模式
- 觀察者模式
- RxJava 的觀察者模式
- 2. 基本實(shí)現(xiàn)
- 1) 創(chuàng)建 Observer
- 2) 創(chuàng)建 Observable
- 3) Subscribe (訂閱)
- 4) 場(chǎng)景演示樣例
- a. 打印字符串?dāng)?shù)組
- b. 由 id 取得圖片并顯示
- 3. 線程控制 —— Scheduler (一)
- 1) Scheduler 的 API (一)
- 2) Scheduler 的原理 (一)
- 4. 變換
- 1) API
- 2) 變換的原理:lift()
- 3) compose: 對(duì) Observable 總體的變換
- 5. 線程控制:Scheduler (二)
- 1) Scheduler 的 API (二)
- 2) Scheduler 的原理(二)
- 3) 延伸:doOnSubscribe()
- 1. 概念:擴(kuò)展的觀察者模式
- RxJava 的適用場(chǎng)景和使用方式
- 1. 與 Retrofit 的結(jié)合
- 2. RxBinding
- 3. 各種異步操作
- 4. RxBus
- 最后
- 關(guān)于作者:
- 為什么寫這個(gè)?
在正文開(kāi)始之前的最后。放上 GitHub 鏈接和引入依賴的 gradle 代碼: Github:
https://github.com/ReactiveX/RxJava
https://github.com/ReactiveX/RxAndroid
引入依賴:
compile 'io.reactivex:rxjava:1.0.14'
compile 'io.reactivex:rxandroid:1.0.1'
(版本號(hào)號(hào)是文章公布時(shí)的最新穩(wěn)定版)
另外,感謝 RxJava 核心成員流火楓林的技術(shù)支持和內(nèi)測(cè)讀者代碼家、鮑永章、drakeet、馬琳、有時(shí)放縱、程序亦非猿、大頭鬼、XZoomEye、席德雨、TCahead、Tiiime、Ailurus、宅學(xué)長(zhǎng)、妖孽、大大大大大臣哥、NicodeLee的幫助,以及周伯通招聘的贊助。
RxJava 究竟是什么
一個(gè)詞:異步。
RxJava 在 GitHub 主頁(yè)上的自我介紹是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一個(gè)在 Java VM 上使用可觀測(cè)的序列來(lái)組成異步的、基于事件的程序的庫(kù))。這就是 RxJava ,概括得非常精準(zhǔn)。
然而,對(duì)于剛開(kāi)始學(xué)習(xí)的人來(lái)說(shuō),這太難看懂了。由于它是一個(gè)『總結(jié)』,而剛開(kāi)始學(xué)習(xí)的人更須要一個(gè)『引言』。
事實(shí)上, RxJava 的本質(zhì)能夠壓縮為異步這一個(gè)詞。
說(shuō)到根上。它就是一個(gè)實(shí)現(xiàn)異步操作的庫(kù),而別的定語(yǔ)都是基于這之上的。
RxJava 好在哪
換句話說(shuō),『相同是做異步,為什么人們用它,而不用現(xiàn)成的 AsyncTask / Handler / XXX / ... ?』
一個(gè)詞:簡(jiǎn)潔。
異步操作非常關(guān)鍵的一點(diǎn)是程序的簡(jiǎn)潔性,由于在調(diào)度過(guò)程比較復(fù)雜的情況下。異步代碼經(jīng)常會(huì)既難寫也難被讀懂。 Android 創(chuàng)造的 AsyncTask 和Handler ,事實(shí)上都是為了讓異步代碼更加簡(jiǎn)潔。RxJava 的優(yōu)勢(shì)也是簡(jiǎn)潔,但它的簡(jiǎn)潔的與眾不同之處在于。隨著程序邏輯變得越來(lái)越復(fù)雜,它依舊能夠保持簡(jiǎn)潔。
假設(shè)有這樣一個(gè)需求:界面上有一個(gè)自己定義的視圖 imageCollectorView 。它的作用是顯示多張圖片,并能使用 addImage(Bitmap) 方法來(lái)隨意添加顯示的圖片。如今須要程序?qū)⒁粋€(gè)給出的文件夾數(shù)組 File[] folders 中每一個(gè)文件夾下的 png 圖片都載入出來(lái)并顯示在 imageCollectorView 中。須要注意的是,由于讀取圖片的這一過(guò)程較為耗時(shí),須要放在后臺(tái)運(yùn)行。而圖片的顯示則必須在 UI 線程運(yùn)行。
經(jīng)常使用的實(shí)現(xiàn)方式有多種,我這里貼出當(dāng)中一種:
new Thread() {@Overridepublic void run() {super.run();for (File folder : folders) {File[] files = folder.listFiles();for (File file : files) {if (file.getName().endsWith(".png")) {final Bitmap bitmap = getBitmapFromFile(file);getActivity().runOnUiThread(new Runnable() {@Overridepublic void run() {imageCollectorView.addImage(bitmap);}});}}}} }.start();而假設(shè)使用 RxJava 。實(shí)現(xiàn)方式是這種:
Observable.from(folders).flatMap(new Func1<File, Observable<File>>() {@Overridepublic Observable<File> call(File file) {return Observable.from(file.listFiles());}}).filter(new Func1<File, Boolean>() {@Overridepublic Boolean call(File file) {return file.getName().endsWith(".png");}}).map(new Func1<File, Bitmap>() {@Overridepublic Bitmap call(File file) {return getBitmapFromFile(file);}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Bitmap>() {@Overridepublic void call(Bitmap bitmap) {imageCollectorView.addImage(bitmap);}});那位說(shuō)話了:『你這代碼明明變多了啊!
簡(jiǎn)潔個(gè)毛啊!』大兄弟你消消氣。我說(shuō)的是邏輯的簡(jiǎn)潔。不是單純的代碼量少(邏輯簡(jiǎn)潔才是提升讀寫代碼速度的必殺技對(duì)不?)。
觀察一下你會(huì)發(fā)現(xiàn), RxJava 的這個(gè)實(shí)現(xiàn),是一條從上到下的鏈?zhǔn)秸{(diào)用,沒(méi)有不論什么嵌套,這在邏輯的簡(jiǎn)潔性上是具有優(yōu)勢(shì)的。當(dāng)需求變得復(fù)雜時(shí),這種優(yōu)勢(shì)將更加明顯(試想假設(shè)還要求僅僅選取前 10 張圖片,常規(guī)方式要怎么辦?假設(shè)有很多其它這樣那樣的要求呢?再試想,在這一大堆需求實(shí)現(xiàn)完兩個(gè)月之后須要改功能,當(dāng)你翻回這里看到自己當(dāng)初寫下的那一片迷之縮進(jìn),你能保證自己將迅速看懂。而不是對(duì)著代碼又一次捋一遍思路?)。
另外。假設(shè)你的 IDE 是 Android Studio 。事實(shí)上每次打開(kāi)某個(gè) Java 文件的時(shí)候,你會(huì)看到被自己主動(dòng) Lambda 化的預(yù)覽,這將讓你更加清晰地看到程序邏輯:
Observable.from(folders).flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) }).filter((Func1) (file) -> { file.getName().endsWith(".png") }).map((Func1) (file) -> { getBitmapFromFile(file) }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });假設(shè)你習(xí)慣使用 Retrolambda ,你也能夠直接把代碼寫成上面這種簡(jiǎn)潔的形式。
而假設(shè)你看到這里還不知道什么是 Retrolambda ,我不建議你如今就去學(xué)習(xí)它。原因有兩點(diǎn):1. Lambda 是把雙刃劍,它讓你的代碼簡(jiǎn)潔的同一時(shí)候。減少了代碼的可讀性,因此同一時(shí)候?qū)W習(xí) RxJava 和 Retrolambda 可能會(huì)讓你忽略 RxJava 的一些技術(shù)細(xì)節(jié)。2. Retrolambda 是 Java 6/7 對(duì) Lambda 表達(dá)式的非官方兼容方案,它的向后兼容性和穩(wěn)定性是無(wú)法保障的,因此對(duì)于企業(yè)項(xiàng)目,使用 Retrolambda 是有風(fēng)險(xiǎn)的。所以。與非常多 RxJava 的推廣者不同,我并不推薦在學(xué)習(xí) RxJava 的同一時(shí)候一起學(xué)習(xí) Retrolambda。
事實(shí)上,我個(gè)人盡管非常贊賞 Retrolambda,但我從來(lái)不用它。
在Flipboard 的 Android 代碼中。有一段邏輯非常復(fù)雜。包括了多次內(nèi)存操作、本地文件操作和網(wǎng)絡(luò)操作,對(duì)象分分合合。線程間相互配合相互等待,一會(huì)兒排成人字,一會(huì)兒排成一字。
假設(shè)使用常規(guī)的方法來(lái)實(shí)現(xiàn),肯定是要寫得欲仙欲死,然而在使用 RxJava 的情況下。依舊僅僅是一條鏈?zhǔn)秸{(diào)用就完畢了。它非常長(zhǎng)。但非常清晰。
所以, RxJava 好在哪?就好在簡(jiǎn)潔,好在那把什么復(fù)雜邏輯都能穿成一條線的簡(jiǎn)潔。
API 介紹和原理簡(jiǎn)析
這個(gè)我就做不到一個(gè)詞說(shuō)明了……由于這一節(jié)的主要內(nèi)容就是一步步地說(shuō)明 RxJava 究竟如何做到了異步。如何做到了簡(jiǎn)潔。
1. 概念:擴(kuò)展的觀察者模式
RxJava 的異步實(shí)現(xiàn),是通過(guò)一種擴(kuò)展的觀察者模式來(lái)實(shí)現(xiàn)的。
觀察者模式
先簡(jiǎn)述一下觀察者模式,已經(jīng)熟悉的能夠跳過(guò)這一段。
觀察者模式面向的需求是:A 對(duì)象(觀察者)對(duì) B 對(duì)象(被觀察者)的某種變化高度敏感,須要在 B 變化的一瞬間做出反應(yīng)。舉個(gè)樣例。新聞里喜聞樂(lè)見(jiàn)的警察抓小偷。警察須要在小偷伸手作案的時(shí)候?qū)嵤┳ゲ丁?/span>
在這個(gè)樣例里,警察是觀察者,小偷是被觀察者,警察須要時(shí)刻盯著小偷的一舉一動(dòng),才干保證不會(huì)漏過(guò)不論什么瞬間。
程序的觀察者模式和這種真正的『觀察』略有不同,觀察者不須要時(shí)刻盯著被觀察者(比如 A 不須要每過(guò) 2ms 就檢查一次 B 的狀態(tài)),而是採(cǎi)用注冊(cè)(Register)或者稱為訂閱(Subscribe)的方式,告訴被觀察者:我須要你的某某狀態(tài),你要在它變化的時(shí)候通知我。
Android 開(kāi)發(fā)中一個(gè)比較典型的樣例是點(diǎn)擊監(jiān)聽(tīng)器 OnClickListener 。對(duì)設(shè)置 OnClickListener 來(lái)說(shuō)。 View 是被觀察者, OnClickListener 是觀察者。二者通過(guò) setOnClickListener() 方法達(dá)成訂閱關(guān)系。訂閱之后用戶點(diǎn)擊按鈕的瞬間。Android Framework 就會(huì)將點(diǎn)擊事件發(fā)送給已經(jīng)注冊(cè)的 OnClickListener 。採(cǎi)取這樣被動(dòng)的觀察方式,既省去了重復(fù)檢索狀態(tài)的資源消耗。也能夠得到最高的反饋速度。
當(dāng)然,這也得益于我們能夠隨意定制自己程序中的觀察者和被觀察者,而警察叔叔明顯無(wú)法要求小偷『你在作案的時(shí)候務(wù)必通知我』。
OnClickListener 的模式大致例如以下圖:
如圖所看到的,通過(guò) setOnClickListener() 方法,Button 持有 OnClickListener 的引用(這一過(guò)程沒(méi)有在圖上畫(huà)出)。當(dāng)用戶點(diǎn)擊時(shí),Button 自己主動(dòng)調(diào)用 OnClickListener 的 onClick() 方法。
另外。假設(shè)把這張圖中的概念抽象出來(lái)(Button -> 被觀察者、OnClickListener -> 觀察者、setOnClickListener() -> 訂閱,onClick() -> 事件)。就由專用的觀察者模式(比如僅僅用于監(jiān)聽(tīng)控件點(diǎn)擊)轉(zhuǎn)變成了通用的觀察者模式。
例如以下圖:
而 RxJava 作為一個(gè)工具庫(kù),使用的就是通用形式的觀察者模式。
RxJava 的觀察者模式
RxJava 有四個(gè)基本概念:Observable (可觀察者。即被觀察者)、 Observer (觀察者)、 subscribe (訂閱)、事件。
Observable 和 Observer 通過(guò) subscribe() 方法實(shí)現(xiàn)訂閱關(guān)系。從而 Observable 能夠在須要的時(shí)候發(fā)出事件來(lái)通知 Observer。
與傳統(tǒng)觀察者模式不同, RxJava 的事件回調(diào)方法除了普通事件 onNext() (相當(dāng)于 onClick() / onEvent())之外。還定義了兩個(gè)特殊的事件:onCompleted() 和 onError()。
- onCompleted(): 事件隊(duì)列完結(jié)。
RxJava 不僅把每一個(gè)事件單獨(dú)處理,還會(huì)把它們看做一個(gè)隊(duì)列。RxJava 規(guī)定。當(dāng)不會(huì)再有新的 onNext() 發(fā)出時(shí),須要觸發(fā) onCompleted() 方法作為標(biāo)志。
- onError(): 事件隊(duì)列異常。在事件處理過(guò)程中出異常時(shí),onError() 會(huì)被觸發(fā),同一時(shí)候隊(duì)列自己主動(dòng)終止。不同意再有事件發(fā)出。
- 在一個(gè)正確運(yùn)行的事件序列中, onCompleted() 和 onError() 有且僅僅有一個(gè)。并且是事件序列中的最后一個(gè)。須要注意的是,onCompleted() 和 onError() 二者也是相互排斥的,即在隊(duì)列中調(diào)用了當(dāng)中一個(gè)。就不應(yīng)該再調(diào)用還有一個(gè)。
RxJava 的觀察者模式大致例如以下圖:
2. 基本實(shí)現(xiàn)
基于以上的概念。 RxJava 的基本實(shí)現(xiàn)主要有三點(diǎn):
1) 創(chuàng)建 Observer
Observer 即觀察者,它決定事件觸發(fā)的時(shí)候?qū)⒂腥绾蔚男袨椤?RxJava 中的 Observer 接口的實(shí)現(xiàn)方式:
Observer<String> observer = new Observer<String>() {@Overridepublic void onNext(String s) {Log.d(tag, "Item: " + s);}@Overridepublic void onCompleted() {Log.d(tag, "Completed!");}@Overridepublic void onError(Throwable e) {Log.d(tag, "Error!");} };除了 Observer 接口之外,RxJava 還內(nèi)置了一個(gè)實(shí)現(xiàn)了 Observer 的抽象類:Subscriber。 Subscriber 對(duì) Observer 接口進(jìn)行了一些擴(kuò)展。但他們的基本使用方式是全然一樣的:
Subscriber<String> subscriber = new Subscriber<String>() {@Overridepublic void onNext(String s) {Log.d(tag, "Item: " + s);}@Overridepublic void onCompleted() {Log.d(tag, "Completed!");}@Overridepublic void onError(Throwable e) {Log.d(tag, "Error!");} };不僅基本使用方式一樣,實(shí)質(zhì)上,在 RxJava 的 subscribe 過(guò)程中,Observer 也總是會(huì)先被轉(zhuǎn)換成一個(gè) Subscriber 再使用。
所以假設(shè)你僅僅想使用基本功能,選擇 Observer 和 Subscriber 是全然一樣的。它們的差別對(duì)于使用者來(lái)說(shuō)主要有兩點(diǎn):
它會(huì)在 subscribe 剛開(kāi)始,而事件還未發(fā)送之前被調(diào)用,能夠用于做一些準(zhǔn)備工作,比如數(shù)據(jù)的清零或重置。
這是一個(gè)可選方法。默認(rèn)情況下它的實(shí)現(xiàn)為空。
須要注意的是,假設(shè)對(duì)準(zhǔn)備工作的線程有要求(比如彈出一個(gè)顯示運(yùn)行進(jìn)度的對(duì)話框。這必須在主線程運(yùn)行), onStart() 就不適用了。由于它總是在 subscribe 所發(fā)生的線程被調(diào)用,而不能指定線程。要在指定的線程來(lái)做準(zhǔn)備工作,能夠使用 doOnSubscribe() 方法,詳細(xì)能夠在后面的文中看到。
在這種方法被調(diào)用后。Subscriber 將不再接收事件。一般在這種方法調(diào)用前。能夠使用 isUnsubscribed() 先推斷一下?tīng)顟B(tài)。
unsubscribe() 這種方法非常重要。由于在 subscribe() 之后。 Observable 會(huì)持有 Subscriber 的引用,這個(gè)引用假設(shè)不能及時(shí)被釋放,將有內(nèi)存泄露的風(fēng)險(xiǎn)。所以最好保持一個(gè)原則:要在不再使用的時(shí)候盡快在合適的地方(比如 onPause() onStop() 等方法中)調(diào)用 unsubscribe() 來(lái)解除引用關(guān)系,以避免內(nèi)存泄露的發(fā)生。
2) 創(chuàng)建 Observable
Observable 即被觀察者,它決定什么時(shí)候觸發(fā)事件以及觸發(fā)如何的事件。 RxJava 使用 create() 方法來(lái)創(chuàng)建一個(gè) Observable 。并為它定義事件觸發(fā)規(guī)則:
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("Hello");subscriber.onNext("Hi");subscriber.onNext("Aloha");subscriber.onCompleted();} });能夠看到。這里傳入了一個(gè) OnSubscribe 對(duì)象作為參數(shù)。
OnSubscribe 會(huì)被存儲(chǔ)在返回的 Observable 對(duì)象中,它的作用相當(dāng)于一個(gè)計(jì)劃表,當(dāng) Observable 被訂閱的時(shí)候。OnSubscribe 的 call() 方法會(huì)自己主動(dòng)被調(diào)用,事件序列就會(huì)按照設(shè)定依次觸發(fā)(對(duì)于上面的代碼,就是觀察者Subscriber 將會(huì)被調(diào)用三次 onNext() 和一次 onCompleted())。
這樣,由被觀察者調(diào)用了觀察者的回調(diào)方法。就實(shí)現(xiàn)了由被觀察者向觀察者的事件傳遞,即觀察者模式。
這個(gè)樣例非常easy:事件的內(nèi)容是字符串。而不是一些復(fù)雜的對(duì)象;事件的內(nèi)容是已經(jīng)定好了的,而不像有的觀察者模式一樣是待確定的(比如網(wǎng)絡(luò)請(qǐng)求的結(jié)果在請(qǐng)求返回之前是未知的);全部事件在一瞬間被全部發(fā)送出去,而不是夾雜一些確定或不確定的時(shí)間間隔或者經(jīng)過(guò)某種觸發(fā)器來(lái)觸發(fā)的。總之。這個(gè)樣例看起來(lái)毫無(wú)實(shí)用價(jià)值。
但這是為了便于說(shuō)明,實(shí)質(zhì)上僅僅要你想,各種各樣的事件發(fā)送規(guī)則你都能夠自己來(lái)寫。至于詳細(xì)怎么做。后面都會(huì)講到。但如今不行。
僅僅有把基礎(chǔ)原理先說(shuō)明確了,上層的運(yùn)用才干更easy說(shuō)清晰。
create() 方法是 RxJava 最主要的創(chuàng)造事件序列的方法。
基于這種方法。 RxJava 還提供了一些方法用來(lái)快捷創(chuàng)建事件隊(duì)列,比如:
- just(T...): 將傳入的參數(shù)依次發(fā)送出來(lái)。
- from(T[]) / from(Iterable<? extends T>) : 將傳入的數(shù)組或 Iterable 拆分成詳細(xì)對(duì)象后,依次發(fā)送出來(lái)。
上面 just(T...) 的樣例和 from(T[]) 的樣例,都和之前的 create(OnSubscribe) 的樣例是等價(jià)的。
3) Subscribe (訂閱)
創(chuàng)建了 Observable 和 Observer 之后,再用 subscribe() 方法將它們聯(lián)結(jié)起來(lái),整條鏈子就能夠工作了。代碼形式非常easy:
observable.subscribe(observer); // 或者: observable.subscribe(subscriber);有人可能會(huì)注意到。 subscribe() 這種方法有點(diǎn)怪:它看起來(lái)是『observalbe 訂閱了 observer / subscriber』而不是『observer / subscriber 訂閱了 observalbe』。這看起來(lái)就像『雜志訂閱了讀者』一樣顛倒了對(duì)象關(guān)系。
這讓人讀起來(lái)有點(diǎn)別扭。只是假設(shè)把 API 設(shè)計(jì)成 observer.subscribe(observable) / subscriber.subscribe(observable) ,盡管更加符合思維邏輯,但對(duì)流式 API 的設(shè)計(jì)就造成影響了,比較起來(lái)明顯是得不償失的。
Observable.subscribe(Subscriber) 的內(nèi)部實(shí)現(xiàn)是這種(僅核心代碼):
// 注意:這不是 subscribe() 的源代碼。而是將源代碼中與性能、兼容性、擴(kuò)展性有關(guān)的代碼剔除后的核心代碼。 // 假設(shè)須要看源代碼,能夠去 RxJava 的 GitHub 倉(cāng)庫(kù)下載。 public Subscription subscribe(Subscriber subscriber) { subscriber.onStart(); onSubscribe.call(subscriber); return subscriber; }能夠看到,subscriber() 做了3件事:
在這里。事件發(fā)送的邏輯開(kāi)始運(yùn)行。從這也能夠看出。在 RxJava 中, Observable 并非在創(chuàng)建的時(shí)候就立即開(kāi)始發(fā)送事件,而是在它被訂閱的時(shí)候,即當(dāng) subscribe() 方法運(yùn)行的時(shí)候。
整個(gè)過(guò)程中對(duì)象間的關(guān)系例如以下圖:
或者能夠看動(dòng)圖:
除了 subscribe(Observer) 和 subscribe(Subscriber) 。subscribe() 還支持不完整定義的回調(diào)。RxJava 會(huì)自己主動(dòng)依據(jù)定義創(chuàng)建出 Subscriber 。形式例如以下:
Action1<String> onNextAction = new Action1<String>() {// onNext()@Overridepublic void call(String s) {Log.d(tag, s);} }; Action1<Throwable> onErrorAction = new Action1<Throwable>() {// onError()@Overridepublic void call(Throwable throwable) {// Error handling} }; Action0 onCompletedAction = new Action0() {// onCompleted()@Overridepublic void call() {Log.d(tag, "completed");} };// 自己主動(dòng)創(chuàng)建 Subscriber ,并使用 onNextAction 來(lái)定義 onNext() observable.subscribe(onNextAction); // 自己主動(dòng)創(chuàng)建 Subscriber ,并使用 onNextAction 和 onErrorAction 來(lái)定義 onNext() 和 onError() observable.subscribe(onNextAction, onErrorAction); // 自己主動(dòng)創(chuàng)建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 來(lái)定義 onNext()、 onError() 和 onCompleted() observable.subscribe(onNextAction, onErrorAction, onCompletedAction);簡(jiǎn)單解釋一下這段代碼中出現(xiàn)的 Action1 和 Action0。 Action0 是 RxJava 的一個(gè)接口,它僅僅有一個(gè)方法 call(),這種方法是無(wú)參無(wú)返回值的。由于 onCompleted() 方法也是無(wú)參無(wú)返回值的,因此 Action0 能夠被當(dāng)成一個(gè)包裝對(duì)象。將 onCompleted() 的內(nèi)容打包起來(lái)將自己作為一個(gè)參數(shù)傳入 subscribe() 以實(shí)現(xiàn)不完整定義的回調(diào)。這樣事實(shí)上也能夠看做將 onCompleted() 方法作為參數(shù)傳進(jìn)了 subscribe(),相當(dāng)于其它某些語(yǔ)言中的『閉包』。
Action1 也是一個(gè)接口,它相同僅僅有一個(gè)方法 call(T param),這種方法也無(wú)返回值,但有一個(gè)參數(shù);與 Action0 同理。由于 onNext(T obj) 和 onError(Throwable error) 也是單參數(shù)無(wú)返回值的。因此 Action1 能夠?qū)? onNext(obj) 和 onError(error) 打包起來(lái)傳入 subscribe() 以實(shí)現(xiàn)不完整定義的回調(diào)。事實(shí)上,盡管 Action0 和 Action1 在 API 中使用最廣泛。但 RxJava 是提供了多個(gè) ActionX 形式的接口 (比如 Action2, Action3) 的,它們能夠被用以包裝不同的無(wú)返回值的方法。
注:正如前面所提到的。Observer 和 Subscriber 具有相同的角色,并且 Observer 在 subscribe() 過(guò)程中最終會(huì)被轉(zhuǎn)換成 Subscriber 對(duì)象。因此,從這里開(kāi)始,后面的描寫敘述我將用 Subscriber 來(lái)取代 Observer 。這樣更加嚴(yán)謹(jǐn)。
4) 場(chǎng)景演示樣例
以下舉兩個(gè)樣例:
為了把原理用更清晰的方式表述出來(lái)。本文中挑選的都是功能盡可能簡(jiǎn)單的樣例。以至于有些演示樣例代碼看起來(lái)會(huì)有『畫(huà)蛇添足』『明明不用 RxJava 能夠更簡(jiǎn)便地解決這個(gè)問(wèn)題』的感覺(jué)。當(dāng)你看到這種情況,不要認(rèn)為是由于 RxJava 太啰嗦,而是由于在過(guò)早的時(shí)候舉出真實(shí)場(chǎng)景的樣例并不利于原理的解析,因此我刻意挑選了簡(jiǎn)單的情景。
a. 打印字符串?dāng)?shù)組
將字符串?dāng)?shù)組 names 中的全部字符串依次打印出來(lái):
String[] names = ...; Observable.from(names).subscribe(new Action1<String>() {@Overridepublic void call(String name) {Log.d(tag, name);}});b. 由 id 取得圖片并顯示
由指定的一個(gè) drawable 文件 id drawableRes 取得圖片,并顯示在 ImageView 中。并在出現(xiàn)異常的時(shí)候打印 Toast 報(bào)錯(cuò):
int drawableRes = ...; ImageView imageView = ...; Observable.create(new OnSubscribe<Drawable>() {@Overridepublic void call(Subscriber<? super Drawable> subscriber) { Drawable drawable = getTheme().getDrawable(drawableRes)); subscriber.onNext(drawable); subscriber.onCompleted(); } }).subscribe(new Observer<Drawable>() { @Override public void onNext(Drawable drawable) { imageView.setImageDrawable(drawable); } @Override public void onCompleted() { } @Override public void onError(Throwable e) { Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show(); } });正如上面兩個(gè)樣例這樣。創(chuàng)建出 Observable 和 Subscriber ,再用 subscribe() 將它們串起來(lái),一次 RxJava 的基本使用就完畢了。非常easy。
然而,
在 RxJava 的默認(rèn)規(guī)則中。事件的發(fā)出和消費(fèi)都是在同一個(gè)線程的。也就是說(shuō)。假設(shè)僅僅用上面的方法,實(shí)現(xiàn)出來(lái)的僅僅是一個(gè)同步的觀察者模式。觀察者模式本身的目的就是『后臺(tái)處理,前臺(tái)回調(diào)』的異步機(jī)制,因此異步對(duì)于 RxJava 是至關(guān)重要的。而要實(shí)現(xiàn)異步。則須要用到 RxJava 的還有一個(gè)概念: Scheduler 。
3. 線程控制 —— Scheduler (一)
在不指定線程的情況下, RxJava 遵循的是線程不變的原則,即:在哪個(gè)線程調(diào)用 subscribe()。就在哪個(gè)線程生產(chǎn)事件;在哪個(gè)線程生產(chǎn)事件,就在哪個(gè)線程消費(fèi)事件。
假設(shè)須要切換線程,就須要用到 Scheduler (調(diào)度器)。
1) Scheduler 的 API (一)
在RxJava 中,Scheduler ——調(diào)度器,相當(dāng)于線程控制器,RxJava 通過(guò)它來(lái)指定每一段代碼應(yīng)該運(yùn)行在什么樣的線程。
RxJava 已經(jīng)內(nèi)置了幾個(gè) Scheduler ,它們已經(jīng)適合大多數(shù)的使用場(chǎng)景:
- Schedulers.immediate(): 直接在當(dāng)前線程運(yùn)行,相當(dāng)于不指定線程。這是默認(rèn)的 Scheduler。
- Schedulers.newThread(): 總是啟用新線程。并在新線程運(yùn)行操作。
- Schedulers.io(): I/O 操作(讀寫文件、讀寫數(shù)據(jù)庫(kù)、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler。
行為模式和 newThread() 幾乎相同。差別在于 io() 的內(nèi)部實(shí)現(xiàn)是是用一個(gè)無(wú)數(shù)量上限的線程池,能夠重用空暇的線程,因此多數(shù)情況下 io() 比 newThread() 更有效率。
不要把計(jì)算工作放在 io() 中,能夠避免創(chuàng)建不必要的線程。
- Schedulers.computation(): 計(jì)算所使用的 Scheduler。
這個(gè)計(jì)算指的是 CPU 密集型計(jì)算。即不會(huì)被 I/O 等操作限制性能的操作,比如圖形的計(jì)算。這個(gè) Scheduler 使用的固定的線程池,大小為 CPU 核數(shù)。
不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時(shí)間會(huì)浪費(fèi) CPU。
- 另外。 Android 還有一個(gè)專用的 AndroidSchedulers.mainThread()。它指定的操作將在 Android 主線程運(yùn)行。
有了這幾個(gè) Scheduler ,就能夠使用 subscribeOn() 和 observeOn() 兩個(gè)方法來(lái)對(duì)線程進(jìn)行控制了。* subscribeOn(): 指定 subscribe() 所發(fā)生的線程。即 Observable.OnSubscribe 被激活時(shí)所處的線程。或者叫做事件產(chǎn)生的線程。
* observeOn(): 指定 Subscriber 所運(yùn)行在的線程。
或者叫做事件消費(fèi)的線程。
文字?jǐn)⑹隹倸w難理解,上代碼:
Observable.just(1, 2, 3, 4).subscribeOn(Schedulers.io()) // 指定 subscribe() 發(fā)生在 IO 線程.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調(diào)發(fā)生在主線程.subscribe(new Action1<Integer>() {@Overridepublic void call(Integer number) {Log.d(tag, "number:" + number);}});上面這段代碼中。由于 subscribeOn(Schedulers.io()) 的指定,被創(chuàng)建的事件的內(nèi)容 1、2、3、4 將會(huì)在 IO 線程發(fā)出。而由于 observeOn(AndroidScheculers.mainThread()) 的指定,因此 subscriber 數(shù)字的打印將發(fā)生在主線程 。
事實(shí)上,這種在 subscribe() 之前寫上兩句 subscribeOn(Scheduler.io()) 和 observeOn(AndroidSchedulers.mainThread()) 的使用方式非經(jīng)常見(jiàn),它適用于多數(shù)的 『后臺(tái)線程取數(shù)據(jù),主線程顯示』的程序策略。
而前面提到的由圖片 id 取得圖片并顯示的樣例。假設(shè)也加上這兩句:
int drawableRes = ...; ImageView imageView = ...; Observable.create(new OnSubscribe<Drawable>() {@Overridepublic void call(Subscriber<? super Drawable> subscriber) { Drawable drawable = getTheme().getDrawable(drawableRes)); subscriber.onNext(drawable); subscriber.onCompleted(); } }) .subscribeOn(Schedulers.io()) // 指定 subscribe() 發(fā)生在 IO 線程 .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調(diào)發(fā)生在主線程 .subscribe(new Observer<Drawable>() { @Override public void onNext(Drawable drawable) { imageView.setImageDrawable(drawable); } @Override public void onCompleted() { } @Override public void onError(Throwable e) { Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show(); } });那么,載入圖片將會(huì)發(fā)生在 IO 線程,而設(shè)置圖片則被設(shè)定在了主線程。這就意味著,即使載入圖片耗費(fèi)了幾十甚至幾百毫秒的時(shí)間,也不會(huì)造成絲毫界面的卡頓。
2) Scheduler 的原理 (一)
RxJava 的 Scheduler API 非常方便,也非常奇妙(加了一句話就把線程切換了,怎么做到的?并且 subscribe() 不是最外層直接調(diào)用的方法嗎,它居然也能被指定線程?)。然而 Scheduler 的原理須要放在后面講。由于它的原理是以下一節(jié)《變換》的原理作為基礎(chǔ)的。
好吧這一節(jié)事實(shí)上我屁也沒(méi)說(shuō),僅僅是為了讓你安心,讓你知道我不是忘了講原理,而是把它放在了更合適的地方。
4. 變換
最終要到牛逼的地方了,無(wú)論你激動(dòng)不激動(dòng),反正我是激動(dòng)了。
RxJava 提供了對(duì)事件序列進(jìn)行變換的支持。這是它的核心功能之中的一個(gè),也是大多數(shù)人說(shuō)『RxJava 真是太好用了』的最大原因。
所謂變換,就是將事件序列中的對(duì)象或整個(gè)序列進(jìn)行加工處理,轉(zhuǎn)換成不同的事件或事件序列。概念說(shuō)著總是模糊難懂的,來(lái)看 API。
1) API
首先看一個(gè) map() 的樣例:
Observable.just("images/logo.png") // 輸入類型 String.map(new Func1<String, Bitmap>() {@Overridepublic Bitmap call(String filePath) { // 參數(shù)類型 Stringreturn getBitmapFromPath(filePath); // 返回類型 Bitmap}}).subscribe(new Action1<Bitmap>() {@Overridepublic void call(Bitmap bitmap) { // 參數(shù)類型 BitmapshowBitmap(bitmap);}});這里出現(xiàn)了一個(gè)叫做 Func1 的類。它和 Action1 非常類似,也是 RxJava 的一個(gè)接口,用于包裝含有一個(gè)參數(shù)的方法。 Func1 和 Action 的差別在于, Func1 包裝的是有返回值的方法。
另外,和 ActionX 一樣。 FuncX 也有多個(gè)。用于不同參數(shù)個(gè)數(shù)的方法。FuncX 和 ActionX 的差別在 FuncX 包裝的是有返回值的方法。
能夠看到,map() 方法將參數(shù)中的 String 對(duì)象轉(zhuǎn)換成一個(gè) Bitmap 對(duì)象后返回,而在經(jīng)過(guò) map() 方法后,事件的參數(shù)類型也由 String 轉(zhuǎn)為了 Bitmap。
這種直接變換對(duì)象并返回的。是最常見(jiàn)的也最easy理解的變換。只是 RxJava 的變換遠(yuǎn)不止這樣。它不僅能夠針對(duì)事件對(duì)象,還能夠針對(duì)整個(gè)事件隊(duì)列,這使得 RxJava 變得非常靈活。我列舉幾個(gè)經(jīng)常使用的變換:
-
map(): 事件對(duì)象的直接變換,詳細(xì)功能上面已經(jīng)介紹過(guò)。它是 RxJava 最經(jīng)常使用的變換。
map() 的示意圖:
-
flatMap(): 這是一個(gè)非常實(shí)用但非常難理解的變換,因此我決定花多些篇幅來(lái)介紹它。
首先假設(shè)這么一種需求:假設(shè)有一個(gè)數(shù)據(jù)結(jié)構(gòu)『學(xué)生』。如今須要打印出一組學(xué)生的名字。實(shí)現(xiàn)方式非常easy:
非常easy。那么再假設(shè):假設(shè)要打印出每一個(gè)學(xué)生所須要修的全部課程的名稱呢?(需求的差別在于。每一個(gè)學(xué)生僅僅有一個(gè)名字,但卻有多個(gè)課程。)首先能夠這樣實(shí)現(xiàn):
Student[] students = ...; Subscriber<Student> subscriber = new Subscriber<Student>() {@Overridepublic void onNext(Student student) {List<Course> courses = student.getCourses();for (int i = 0; i < courses.size(); i++) {Course course = courses.get(i);Log.d(tag, course.getName());}}... }; Observable.from(students).subscribe(subscriber);依舊非常easy。
那么假設(shè)我不想在 Subscriber 中使用 for 循環(huán)。而是希望 Subscriber 中直接傳入單個(gè)的 Course 對(duì)象呢(這對(duì)于代碼復(fù)用非常重要)?用 map() 顯然是不行的,由于 map() 是一對(duì)一的轉(zhuǎn)化,而我如今的要求是一對(duì)多的轉(zhuǎn)化。那怎么才干把一個(gè) Student 轉(zhuǎn)化成多個(gè) Course 呢?
這個(gè)時(shí)候,就須要用 flatMap() 了:
Student[] students = ...; Subscriber<Course> subscriber = new Subscriber<Course>() {@Overridepublic void onNext(Course course) {Log.d(tag, course.getName());}... }; Observable.from(students).flatMap(new Func1<Student, Observable<Course>>() {@Overridepublic Observable<Course> call(Student student) {return Observable.from(student.getCourses());}}).subscribe(subscriber);從上面的代碼能夠看出。 flatMap() 和 map() 有一個(gè)相同點(diǎn):它也是把傳入的參數(shù)轉(zhuǎn)化之后返回還有一個(gè)對(duì)象。但須要注意,和 map() 不同的是。 flatMap() 中返回的是個(gè) Observable 對(duì)象,并且這個(gè) Observable 對(duì)象并非被直接發(fā)送到了 Subscriber 的回調(diào)方法中。 flatMap() 的原理是這種:1. 使用傳入的事件對(duì)象創(chuàng)建一個(gè) Observable 對(duì)象;2. 并不發(fā)送這個(gè) Observable, 而是將它激活,于是它開(kāi)始發(fā)送事件;3. 每一個(gè)創(chuàng)建出來(lái)的 Observable 發(fā)送的事件,都被匯入同一個(gè) Observable ,而這個(gè) Observable 負(fù)責(zé)將這些事件統(tǒng)一交給 Subscriber 的回調(diào)方法。這三個(gè)步驟,把事件拆成了兩級(jí),通過(guò)一組新創(chuàng)建的 Observable 將初始的對(duì)象『鋪平』之后通過(guò)統(tǒng)一路徑分發(fā)了下去。而這個(gè)『鋪平』就是 flatMap() 所謂的 flat。
flatMap() 示意圖:
擴(kuò)展:由于能夠在嵌套的 Observable 中加入異步代碼。 flatMap() 也經(jīng)常使用于嵌套的異步操作,比如嵌套的網(wǎng)絡(luò)請(qǐng)求。演示樣例代碼(Retrofit + RxJava):
networkClient.token() // 返回 Observable<String>,在訂閱時(shí)請(qǐng)求 token。并在響應(yīng)后發(fā)送 token.flatMap(new Func1<String, Observable<Messages>>() {@Overridepublic Observable<Messages> call(String token) {// 返回 Observable<Messages>,在訂閱時(shí)請(qǐng)求消息列表,并在響應(yīng)后發(fā)送請(qǐng)求到的消息列表return networkClient.messages();}}).subscribe(new Action1<Messages>() {@Overridepublic void call(Messages messages) {// 處理顯示消息列表showMessages(messages);}});傳統(tǒng)的嵌套請(qǐng)求須要使用嵌套的 Callback 來(lái)實(shí)現(xiàn)。而通過(guò) flatMap() ,能夠把嵌套的請(qǐng)求寫在一條鏈中,從而保持程序邏輯的清晰。
- throttleFirst(): 在每次事件觸發(fā)后的一定時(shí)間間隔內(nèi)丟棄新的事件。經(jīng)常使用作去抖動(dòng)過(guò)濾。比如按鈕的點(diǎn)擊監(jiān)聽(tīng)器:RxView.clickEvents(button) // RxBinding 代碼,后面的文章有解釋.throttleFirst(500, TimeUnit.MILLISECONDS) // 設(shè)置防抖間隔為 500ms.subscribe(subscriber);媽媽再也不怕我的用戶手抖點(diǎn)開(kāi)兩個(gè)重復(fù)的界面啦。
此外。 RxJava 還提供非常多便捷的方法來(lái)實(shí)現(xiàn)事件序列的變換。這里就不一一舉例了。
2) 變換的原理:lift()
這些變換盡管功能各有不同,但實(shí)質(zhì)上都是針對(duì)事件序列的處理和再發(fā)送。
而在 RxJava 的內(nèi)部。它們是基于同一個(gè)基礎(chǔ)的變換方法: lift(Operator)。
首先看一下 lift() 的內(nèi)部實(shí)現(xiàn)(僅核心代碼):
// 注意:這不是 lift() 的源代碼。而是將源代碼中與性能、兼容性、擴(kuò)展性有關(guān)的代碼剔除后的核心代碼。 // 假設(shè)須要看源代碼,能夠去 RxJava 的 GitHub 倉(cāng)庫(kù)下載。 public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) { return Observable.create(new OnSubscribe<R>() { @Override public void call(Subscriber subscriber) { Subscriber newSubscriber = operator.call(subscriber); newSubscriber.onStart(); onSubscribe.call(newSubscriber); } }); }這段代碼非常有意思:它生成了一個(gè)新的 Observable 并返回,并且創(chuàng)建新 Observable 所用的參數(shù) OnSubscribe 的回調(diào)方法 call() 中的實(shí)現(xiàn)居然看起來(lái)和前面講過(guò)的 Observable.subscribe() 一樣。然而它們并不一樣喲~不一樣的地方關(guān)鍵就在于第二行 onSubscribe.call(subscriber) 中的 onSubscribe 所指代的對(duì)象不同(高能預(yù)警:接下來(lái)的幾句話可能會(huì)導(dǎo)致身體的嚴(yán)重不適)——
- subscribe() 中這句話的 onSubscribe 指的是 Observable 中的 onSubscribe 對(duì)象,這個(gè)沒(méi)有問(wèn)題。但是 lift() 之后的情況就復(fù)雜了點(diǎn)。
- 當(dāng)含有 lift() 時(shí):
1.lift() 創(chuàng)建了一個(gè) Observable 后,加上之前的原始 Observable,已經(jīng)有兩個(gè) Observable 了;
2.而相同地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了兩個(gè) OnSubscribe;
3.當(dāng)用戶調(diào)用經(jīng)過(guò) lift() 后的 Observable 的 subscribe() 的時(shí)候,使用的是 lift() 所返回的新的 Observable ,于是它所觸發(fā)的 onSubscribe.call(subscriber)。也是用的新 Observable 中的新 OnSubscribe。即在 lift() 中生成的那個(gè) OnSubscribe;
4.而這個(gè)新 OnSubscribe 的 call() 方法中的 onSubscribe ,就是指的原始 Observable 中的原始 OnSubscribe ,在這個(gè) call() 方法里。新 OnSubscribe 利用 operator.call(subscriber) 生成了一個(gè)新的 Subscriber(Operator 就是在這里,通過(guò)自己的 call() 方法將新 Subscriber 和原始 Subscriber 進(jìn)行關(guān)聯(lián)。并插入自己的『變換』代碼以實(shí)現(xiàn)變換)。然后利用這個(gè)新 Subscriber 向原始 Observable 進(jìn)行訂閱。
這樣就實(shí)現(xiàn)了 lift() 過(guò)程,有點(diǎn)像一種代理機(jī)制,通過(guò)事件攔截和處理實(shí)現(xiàn)事件序列的變換。
精簡(jiǎn)掉細(xì)節(jié)的話,也能夠這么說(shuō):在 Observable 運(yùn)行了 lift(Operator) 方法之后,會(huì)返回一個(gè)新的 Observable,這個(gè)新的 Observable 會(huì)像一個(gè)代理一樣。負(fù)責(zé)接收原始的 Observable 發(fā)出的事件,并在處理后發(fā)送給 Subscriber。
假設(shè)你更喜歡具象思維,能夠看圖:
或者能夠看動(dòng)圖:
兩次和多次的 lift() 同理,例如以下圖:
舉一個(gè)詳細(xì)的 Operator 的實(shí)現(xiàn)。以下這是一個(gè)將事件中的 Integer 對(duì)象轉(zhuǎn)換成 String 的樣例,僅供參考:
observable.lift(new Observable.Operator<String, Integer>() {@Overridepublic Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) { // 將事件序列中的 Integer 對(duì)象轉(zhuǎn)換為 String 對(duì)象 return new Subscriber<Integer>() { @Override public void onNext(Integer integer) { subscriber.onNext("" + integer); } @Override public void onCompleted() { subscriber.onCompleted(); } @Override public void onError(Throwable e) { subscriber.onError(e); } }; } });講述 lift() 的原理僅僅是為了讓你更好地了解 RxJava 。從而能夠更好地使用它。然而無(wú)論你是否理解了 lift() 的原理,RxJava 都不建議開(kāi)發(fā)人員自己定義 Operator 來(lái)直接使用 lift(),而是建議盡量使用已有的 lift() 包裝方法(如 map() flatMap() 等)進(jìn)行組合來(lái)實(shí)現(xiàn)需求,由于直接使用 lift() 非常easy發(fā)生一些難以發(fā)現(xiàn)的錯(cuò)誤。
3) compose: 對(duì) Observable 總體的變換
除了 lift() 之外, Observable 還有一個(gè)變換方法叫做 compose(Transformer)。它和 lift() 的差別在于。 lift() 是針對(duì)事件項(xiàng)和事件序列的,而 compose() 是針對(duì) Observable 自身進(jìn)行變換。
舉個(gè)樣例,假設(shè)在程序中有多個(gè) Observable ,并且他們都須要應(yīng)用一組相同的 lift() 變換。你能夠這么寫:
observable1.lift1().lift2().lift3().lift4().subscribe(subscriber1); observable2.lift1().lift2().lift3().lift4().subscribe(subscriber2); observable3.lift1().lift2().lift3().lift4().subscribe(subscriber3); observable4.lift1().lift2().lift3().lift4().subscribe(subscriber1);你認(rèn)為這樣太不軟件project了,于是你改成了這樣:
private Observable liftAll(Observable observable) {return observable.lift1().lift2().lift3().lift4(); } ... liftAll(observable1).subscribe(subscriber1); liftAll(observable2).subscribe(subscriber2); liftAll(observable3).subscribe(subscriber3); liftAll(observable4).subscribe(subscriber4);可讀性、可維護(hù)性都提高了。但是 Observable 被一個(gè)方法包起來(lái),這種方式對(duì)于 Observale 的靈活性似乎還是增添了那么點(diǎn)限制。
怎么辦?這個(gè)時(shí)候。就應(yīng)該用 compose() 來(lái)攻克了:
public class LiftAllTransformer implements Observable.Transformer<Integer, String> {@Overridepublic Observable<String> call(Observable<Integer> observable) {return observable.lift1().lift2().lift3().lift4();} } ... Transformer liftAll = new LiftAllTransformer(); observable1.compose(liftAll).subscribe(subscriber1); observable2.compose(liftAll).subscribe(subscriber2); observable3.compose(liftAll).subscribe(subscriber3); observable4.compose(liftAll).subscribe(subscriber4);像上面這樣,使用 compose() 方法,Observable 能夠利用傳入的 Transformer 對(duì)象的 call 方法直接對(duì)自身進(jìn)行處理,也就不必被包在方法的里面了。
compose() 的原理比較簡(jiǎn)單,不附圖嘍。
5. 線程控制:Scheduler (二)
除了靈活的變換,RxJava 還有一個(gè)牛逼的地方,就是線程的自由控制。
1) Scheduler 的 API (二)
前面講到了,能夠利用 subscribeOn() 結(jié)合 observeOn() 來(lái)實(shí)現(xiàn)線程控制。讓事件的產(chǎn)生和消費(fèi)發(fā)生在不同的線程。
但是在了解了 map() flatMap() 等變換方法后。有些好事的(事實(shí)上就是當(dāng)初剛接觸 RxJava 時(shí)的我)就問(wèn)了:能不能多切換幾次線程?
答案是:能。
由于 observeOn() 指定的是 Subscriber 的線程。而這個(gè) Subscriber 并非(嚴(yán)格說(shuō)應(yīng)該為『不一定是』。但這里最好還是理解為『不是』)subscribe() 參數(shù)中的 Subscriber 。而是 observeOn() 運(yùn)行時(shí)的當(dāng)前 Observable 所相應(yīng)的 Subscriber 。即它的直接下級(jí) Subscriber 。換句話說(shuō)。observeOn() 指定的是它之后的操作所在的線程。
因此假設(shè)有多次切換線程的需求,僅僅要在每一個(gè)想要切換線程的位置調(diào)用一次 observeOn() 就可以。上代碼:
Observable.just(1, 2, 3, 4) // IO 線程,由 subscribeOn() 指定.subscribeOn(Schedulers.io()).observeOn(Schedulers.newThread()).map(mapOperator) // 新線程。由 observeOn() 指定.observeOn(Schedulers.io()).map(mapOperator2) // IO 線程,由 observeOn() 指定.observeOn(AndroidSchedulers.mainThread) .subscribe(subscriber); // Android 主線程,由 observeOn() 指定如上,通過(guò) observeOn() 的多次調(diào)用,程序?qū)崿F(xiàn)了線程的多次切換。
只是,不同于 observeOn() , subscribeOn() 的位置放在哪里都能夠,但它是僅僅能調(diào)用一次的。
又有好事的(事實(shí)上還是當(dāng)初的我)問(wèn)了:假設(shè)我非要調(diào)用多次 subscribeOn() 呢?會(huì)有什么效果?
這個(gè)問(wèn)題先放著,我們還是從 RxJava 線程控制的原理說(shuō)起吧。
2) Scheduler 的原理(二)
事實(shí)上。 subscribeOn() 和 observeOn() 的內(nèi)部實(shí)現(xiàn),也是用的 lift()。
詳細(xì)看圖(不同顏色的箭頭表示不同的線程):
subscribeOn() 原理圖:
observeOn() 原理圖:
從圖中能夠看出,subscribeOn() 和 observeOn() 都做了線程切換的工作(圖中的 "schedule..." 部位)。
不同的是。 subscribeOn() 的線程切換發(fā)生在 OnSubscribe 中,即在它通知上一級(jí) OnSubscribe 時(shí),這時(shí)事件還沒(méi)有開(kāi)始發(fā)送。因此 subscribeOn() 的線程控制能夠從事件發(fā)出的開(kāi)端就造成影響;而 observeOn() 的線程切換則發(fā)生在它內(nèi)建的 Subscriber 中,即發(fā)生在它即將給下一級(jí) Subscriber 發(fā)送事件時(shí),因此 observeOn() 控制的是它后面的線程。
最后,我用一張圖來(lái)解釋當(dāng)多個(gè) subscribeOn() 和 observeOn() 混合使用時(shí)。線程調(diào)度是怎么發(fā)生的(由于圖中對(duì)象較多。相對(duì)于上面的圖對(duì)結(jié)構(gòu)做了一些簡(jiǎn)化調(diào)整):
圖中共同擁有 5 處含有對(duì)事件的操作。由圖中能夠看出。①和②兩處受第一個(gè) subscribeOn() 影響。運(yùn)行在紅色線程;③和④處受第一個(gè) observeOn() 的影響。運(yùn)行在綠色線程;⑤處受第二個(gè) onserveOn() 影響,運(yùn)行在紫色線程。而第二個(gè) subscribeOn() ,由于在通知過(guò)程中線程就被第一個(gè) subscribeOn() 截?cái)?#xff0c;因此對(duì)整個(gè)流程并沒(méi)有不論什么影響。這里也就回答了前面的問(wèn)題:當(dāng)使用了多個(gè) subscribeOn() 的時(shí)候,僅僅有第一個(gè) subscribeOn() 起作用。
3) 延伸:doOnSubscribe()
然而,盡管超過(guò)一個(gè)的 subscribeOn() 對(duì)事件處理的流程沒(méi)有影響。但在流程之前卻是能夠利用的。
在前面講 Subscriber 的時(shí)候。提到過(guò) Subscriber 的 onStart() 能夠用作流程開(kāi)始前的初始化。然而 onStart() 由于在 subscribe() 發(fā)生時(shí)就被調(diào)用了。因此不能指定線程。而是僅僅能運(yùn)行在 subscribe() 被調(diào)用時(shí)的線程。
這就導(dǎo)致假設(shè) onStart() 中含有對(duì)線程有要求的代碼(比如在界面上顯示一個(gè) ProgressBar,這必須在主線程運(yùn)行)。將會(huì)有線程非法的風(fēng)險(xiǎn)。由于有時(shí)你無(wú)法預(yù)測(cè) subscribe() 將會(huì)在什么線程運(yùn)行。
而與 Subscriber.onStart() 相相應(yīng)的。有一個(gè)方法 Observable.doOnSubscribe() 。它和 Subscriber.onStart() 相同是在 subscribe() 調(diào)用后并且在事件發(fā)送前運(yùn)行,但差別在于它能夠指定線程。
默認(rèn)情況下。 doOnSubscribe() 運(yùn)行在 subscribe() 發(fā)生的線程;而假設(shè)在 doOnSubscribe() 之后有 subscribeOn() 的話,它將運(yùn)行在離它近期的 subscribeOn() 所指定的線程。
演示樣例代碼:
Observable.create(onSubscribe).subscribeOn(Schedulers.io()).doOnSubscribe(new Action0() {@Overridepublic void call() {progressBar.setVisibility(View.VISIBLE); // 須要在主線程運(yùn)行}}).subscribeOn(AndroidSchedulers.mainThread()) // 指定主線程.observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);如上,在 doOnSubscribe()的后面跟一個(gè) subscribeOn() 。就能指定準(zhǔn)備工作的線程了。
RxJava 的適用場(chǎng)景和使用方式
1. 與 Retrofit 的結(jié)合
Retrofit 是 Square 的一個(gè)著名的網(wǎng)絡(luò)請(qǐng)求庫(kù)。
沒(méi)實(shí)用過(guò) Retrofit 的能夠選擇跳過(guò)這一小節(jié)也沒(méi)關(guān)系,我舉的每種場(chǎng)景都僅僅是個(gè)樣例。并且樣例之間并無(wú)前后關(guān)聯(lián),僅僅是個(gè)拋磚引玉的作用,所以你跳過(guò)這里看別的場(chǎng)景也能夠的。
Retrofit 除了提供了傳統(tǒng)的 Callback 形式的 API,還有 RxJava 版本號(hào)的 Observable 形式 API。以下我用對(duì)照的方式來(lái)介紹 Retrofit 的 RxJava 版 API 和傳統(tǒng)版本號(hào)的差別。
以獲取一個(gè) User 對(duì)象的接口作為樣例。使用Retrofit 的傳統(tǒng) API,你能夠用這種方式來(lái)定義請(qǐng)求:
@GET("/user") public void getUser(@Query("userId") String userId, Callback<User> callback);在程序的構(gòu)建過(guò)程中。 Retrofit 會(huì)把自己主動(dòng)把方法實(shí)現(xiàn)并生成代碼。然后開(kāi)發(fā)人員就能夠利用以下的方法來(lái)獲取特定用戶并處理響應(yīng):
getUser(userId, new Callback<User>() {@Overridepublic void success(User user) {userView.setUser(user);}@Overridepublic void failure(RetrofitError error) {// Error handling...} };而使用 RxJava 形式的 API。定義相同的請(qǐng)求是這種:
@GET("/user") public Observable<User> getUser(@Query("userId") String userId);使用的時(shí)候是這種:
getUser(userId).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<User>() {@Overridepublic void onNext(User user) {userView.setUser(user);}@Overridepublic void onCompleted() {}@Overridepublic void onError(Throwable error) {// Error handling...}});看到差別了嗎?
當(dāng) RxJava 形式的時(shí)候,Retrofit 把請(qǐng)求封裝進(jìn) Observable ,在請(qǐng)求結(jié)束后調(diào)用 onNext() 或在請(qǐng)求失敗后調(diào)用 onError()。
對(duì)照來(lái)看, Callback 形式和 Observable 形式長(zhǎng)得不太一樣,但本質(zhì)都幾乎相同。并且在細(xì)節(jié)上 Observable 形式似乎還比 Callback 形式要差點(diǎn)。那 Retrofit 為什么還要提供 RxJava 的支持呢?
由于它好用啊!
從這個(gè)樣例看不出來(lái)是由于這僅僅是最簡(jiǎn)單的情況。
而一旦情景復(fù)雜起來(lái), Callback 形式立即就會(huì)開(kāi)始讓人頭疼。比方:
假設(shè)這么一種情況:你的程序取到的 User 并不應(yīng)該直接顯示,而是須要先與數(shù)據(jù)庫(kù)中的數(shù)據(jù)進(jìn)行比對(duì)和修正后再顯示。
使用 Callback 方式大概能夠這么寫:
getUser(userId, new Callback<User>() {@Overridepublic void success(User user) {processUser(user); // 嘗試修正 User 數(shù)據(jù)userView.setUser(user);}@Overridepublic void failure(RetrofitError error) {// Error handling...} };有問(wèn)題嗎?
非常簡(jiǎn)便。但不要這樣做。為什么?由于這樣做會(huì)影響性能。數(shù)據(jù)庫(kù)的操作非常重。一次讀寫操作花費(fèi) 10~20ms 是非經(jīng)常見(jiàn)的。這種耗時(shí)非常easy造成界面的卡頓。
所以通常情況下,假設(shè)能夠的話一定要避免在主線程中處理數(shù)據(jù)庫(kù)。
所以為了提升性能。這段代碼能夠優(yōu)化一下:
getUser(userId, new Callback<User>() {@Overridepublic void success(User user) {new Thread() {@Overridepublic void run() {processUser(user); // 嘗試修正 User 數(shù)據(jù)runOnUiThread(new Runnable() { // 切回 UI 線程@Overridepublic void run() {userView.setUser(user);}});}).start();}@Overridepublic void failure(RetrofitError error) {// Error handling...} };性能問(wèn)題解決,但……這代碼實(shí)在是太亂了,迷之縮進(jìn)啊!雜亂的代碼往往不僅僅是美觀問(wèn)題,由于代碼越亂往往就越難讀懂,而假設(shè)項(xiàng)目中充斥著雜亂的代碼,無(wú)疑會(huì)減少代碼的可讀性,造成團(tuán)隊(duì)開(kāi)發(fā)效率的減少和出錯(cuò)率的升高。
這時(shí)候,假設(shè)用 RxJava 的形式,就好辦多了。 RxJava 形式的代碼是這種:
getUser(userId).doOnNext(new Action1<User>() {@Overridepublic void call(User user) {processUser(user);}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<User>() {@Overridepublic void onNext(User user) {userView.setUser(user);}@Overridepublic void onCompleted() {}@Overridepublic void onError(Throwable error) {// Error handling...}});后臺(tái)代碼和前臺(tái)代碼全都寫在一條鏈中,明顯清晰了非常多。
再舉一個(gè)樣例:假設(shè) /user 接口并不能直接訪問(wèn),而須要填入一個(gè)在線獲取的 token ,代碼應(yīng)該怎么寫?
Callback 方式。能夠使用嵌套的 Callback:
@GET("/token") public void getToken(Callback<String> callback);@GET("/user") public void getUser(@Query("token") String token, @Query("userId") String userId, Callback<User> callback);...getToken(new Callback<String>() {@Overridepublic void success(String token) {getUser(token, userId, new Callback<User>() {@Overridepublic void success(User user) {userView.setUser(user);}@Overridepublic void failure(RetrofitError error) {// Error handling...}};}@Overridepublic void failure(RetrofitError error) {// Error handling...} });倒是沒(méi)有什么性能問(wèn)題,但是迷之縮進(jìn)毀一生,你懂我也懂。做過(guò)大項(xiàng)目的人應(yīng)該更懂。
而使用 RxJava 的話。代碼是這種:
@GET("/token") public Observable<String> getToken();@GET("/user") public Observable<User> getUser(@Query("token") String token, @Query("userId") String userId);...getToken().flatMap(new Func1<String, Observable<User>>() {@Overridepublic Observable<User> onNext(String token) {return getUser(token, userId);}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<User>() {@Overridepublic void onNext(User user) {userView.setUser(user);}@Overridepublic void onCompleted() {}@Overridepublic void onError(Throwable error) {// Error handling...}});用一個(gè) flatMap() 就搞定了邏輯,依舊是一條鏈。
看著就非常爽。是吧?
2016/03/31 更新,加上我寫的一個(gè) Sample 項(xiàng)目:
rengwuxian RxJava Samples
好。Retrofit 部分就到這里。
2. RxBinding
RxBinding 是 Jake Wharton 的一個(gè)開(kāi)源庫(kù),它提供了一套在 Android 平臺(tái)上的基于 RxJava 的 Binding API。
所謂 Binding,就是類似設(shè)置 OnClickListener 、設(shè)置 TextWatcher 這種注冊(cè)綁定對(duì)象的 API。
舉個(gè)設(shè)置點(diǎn)擊監(jiān)聽(tīng)的樣例。
使用 RxBinding 。能夠把事件監(jiān)聽(tīng)用這種方法來(lái)設(shè)置:
Button button = ...; RxView.clickEvents(button) // 以 Observable 形式來(lái)反饋點(diǎn)擊事件.subscribe(new Action1<ViewClickEvent>() {@Overridepublic void call(ViewClickEvent event) {// Click handling}});看起來(lái)除了形式變了沒(méi)什么差別,實(shí)質(zhì)上也是這樣。甚至假設(shè)你看一下它的源代碼,你會(huì)發(fā)現(xiàn)它連實(shí)現(xiàn)都沒(méi)什么驚喜:它的內(nèi)部是直接用一個(gè)包裹著的 setOnClickListener() 來(lái)實(shí)現(xiàn)的。然而。僅僅這一個(gè)形式的改變。卻恰好就是 RxBinding 的目的:擴(kuò)展性。通過(guò) RxBinding 把點(diǎn)擊監(jiān)聽(tīng)轉(zhuǎn)換成 Observable 之后,就有了對(duì)它進(jìn)行擴(kuò)展的可能。
擴(kuò)展的方式有非常多,依據(jù)需求而定。
一個(gè)樣例是前面提到過(guò)的 throttleFirst() ,用于去抖動(dòng),也就是消除手抖導(dǎo)致的高速連環(huán)點(diǎn)擊:
RxView.clickEvents(button).throttleFirst(500, TimeUnit.MILLISECONDS).subscribe(clickAction);假設(shè)想對(duì) RxBinding 有很多其它了解。能夠去它的 GitHub 項(xiàng)目 以下看看。
3. 各種異步操作
前面舉的 Retrofit 和 RxBinding 的樣例,是兩個(gè)能夠提供現(xiàn)成的 Observable 的庫(kù)。而假設(shè)你有某些異步操作無(wú)法用這些庫(kù)來(lái)自己主動(dòng)生成 Observable。也全然能夠自己寫。比如數(shù)據(jù)庫(kù)的讀寫、大圖片的載入、文件壓縮/解壓等各種須要放在后臺(tái)工作的耗時(shí)操作。都能夠用 RxJava 來(lái)實(shí)現(xiàn)。有了之前幾章的樣例。這里應(yīng)該不用再舉例了。
4. RxBus
RxBus 名字看起來(lái)像一個(gè)庫(kù),但它并非一個(gè)庫(kù),而是一種模式,它的思想是使用 RxJava 來(lái)實(shí)現(xiàn)了 EventBus ,而讓你不再須要使用 Otto 或者 GreenRobot 的 EventBus。至于什么是 RxBus,能夠看這篇文章。順便說(shuō)一句,Flipboard 已經(jīng)用 RxBus 替換掉了 Otto ,眼下為止沒(méi)有不良反應(yīng)。
最后
對(duì)于 Android 開(kāi)發(fā)人員來(lái)說(shuō)。 RxJava 是一個(gè)非常難上手的庫(kù),由于它對(duì)于 Android 開(kāi)發(fā)人員來(lái)說(shuō)有太多陌生的概念了。
但是它真的非常牛逼。因此,我寫了這篇《給 Android 開(kāi)發(fā)人員的 RxJava 詳細(xì)解釋》。希望能給始終搞不明確什么是 RxJava 的人一些入門的指引,或者能讓正在使用 RxJava 但仍然心存疑惑的人看到一些更深入的解析。無(wú)論如何。僅僅要能給各位同為 Android project師的你們提供一些幫助。這篇文章的目的就達(dá)到了。
再次感謝對(duì)這篇文章的產(chǎn)出提供支持的各位:
技術(shù)支持:流火楓林
內(nèi)測(cè)讀者:代碼家、鮑永章、drakeet、馬琳、有時(shí)放縱、程序亦非猿、大頭鬼、XZoomEye、席德雨、TCahead、Tiiime、Ailurus、宅學(xué)長(zhǎng)、妖孽、大大大大大臣哥、NicodeLee
贊助方:周伯通招聘是他們讓我的文章能夠以不那么丑陋的樣子出如今大家面前。
關(guān)于作者
朱凱(扔物線),Flipboard 北京 Android project師。
微博:扔物線
GitHub:rengwuxian
為什么寫這個(gè)?
與兩三年前的境況不同,中國(guó)如今已經(jīng)不缺0基礎(chǔ) Android project師。但中級(jí)和高級(jí)project師嚴(yán)重供不應(yīng)求。
因此我決定從今天開(kāi)始不定期地公布我的技術(shù)分享。僅僅希望能夠和大家共同提升,通過(guò)我們的成長(zhǎng)來(lái)解決一點(diǎn)點(diǎn)國(guó)內(nèi)互聯(lián)網(wǎng)公司人才稀缺的困境。也提升各位技術(shù)黨的收入。
所以,不僅要寫這篇,我還會(huì)寫很多其它。
至于內(nèi)容的定位,我計(jì)劃僅僅定位真正的干貨,一些邊邊角角的小技巧和炫酷的黑科技應(yīng)該都不會(huì)寫,總之希望每篇文章都能幫讀者提升真正的實(shí)力。
轉(zhuǎn)載于:https://www.cnblogs.com/llguanli/p/8327435.html
總結(jié)
以上是生活随笔為你收集整理的给 Android 开发人员的 RxJava 具体解释的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: (转)计算机网络基础知识总结
- 下一篇: Android 9.0新特性