Android—RxJava库知识
RXJAVA:一個在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程序的庫。
優(yōu)點:異步,邏輯簡潔易懂。
程序要求:將一個給出的目錄數(shù)組 File[] folders 中每個目錄下的 png 圖片都加載出來并顯示在 imageCollectorView 中。
例子:
new Thread() {@Overridepublic void run() {super.run();for (File folder : folders) {File[] files = folder.listFiles();for (File file : files) {if (file.getName().endsWith(".png")) {final Bitmap bitmap = getBitmapFromFile(file);getActivity().runOnUiThread(new Runnable() {@Overridepublic void run() {imageCollectorView.addImage(bitmap);}});}}}} }.start();RxJava實現(xiàn):?
Observable.from(folders).flatMap(new Func1<File, Observable<File>>() {@Overridepublic Observable<File> call(File file) {return Observable.from(file.listFiles());}}).filter(new Func1<File, Boolean>() {@Overridepublic Boolean call(File file) {return file.getName().endsWith(".png");}}).map(new Func1<File, Bitmap>() {@Overridepublic Bitmap call(File file) {return getBitmapFromFile(file);}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Bitmap>() {@Overridepublic void call(Bitmap bitmap) {imageCollectorView.addImage(bitmap);}});RxJava 3個基本要素:Observable (可觀察者,即被觀察者)、 Observer (觀察者)、 subscribe (訂閱)。
Observable 和 Observer 通過 subscribe() 方法實現(xiàn)訂閱關(guān)系,從而 Observable 可以在需要的時候發(fā)出事件來通知 Observer。
實現(xiàn)基本步驟:
1、創(chuàng)建 Observer
Observer 即觀察者,它決定事件觸發(fā)的時候?qū)⒂性鯓拥男袨椤?RxJava 中的 Observer接口的實現(xiàn)方式:
Observer<String> observer = new Observer<String>() {@Overridepublic void onNext(String s) {Log.d(tag, "Item: " + s);}@Overridepublic void onCompleted() {Log.d(tag, "Completed!");}@Overridepublic void onError(Throwable e) {Log.d(tag, "Error!");} };- onCompleted(): 事件隊列完結(jié)。RxJava 不僅把每個事件單獨處理,還會把它們看做一個隊列。RxJava 規(guī)定,當(dāng)不會再有新的 onNext() 發(fā)出時,需要觸發(fā) onCompleted() 方法作為標(biāo)志。
- onError(): 事件隊列異常。在事件處理過程中出異常時,onError() 會被觸發(fā),同時隊列自動終止,不允許再有事件發(fā)出。
- 在一個正確運行的事件序列中, onCompleted() 和 onError() 有且只有一個,并且是事件序列中的最后一個。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在隊列中調(diào)用了其中一個,就不應(yīng)該再調(diào)用另一個。
除了 Observer 接口之外,RxJava 還內(nèi)置了一個實現(xiàn)了 Observer 的抽象類:Subscriber。 Subscriber 對 Observer 接口進(jìn)行了一些擴(kuò)展,但他們的基本使用方式是完全一樣的。它們的區(qū)別對于使用者來說主要有兩點:
2、創(chuàng)建 Observable
Observable 即被觀察者,它決定什么時候觸發(fā)事件以及觸發(fā)怎樣的事件。 RxJava 使用 create() 方法來創(chuàng)建一個 Observable ,并為它定義事件觸發(fā)規(guī)則:
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("Hello");subscriber.onNext("Hi");subscriber.onNext("Aloha");subscriber.onCompleted();} });RxJava 還提供了一些方法用來快捷創(chuàng)建事件隊列,例如:
- just(T...): 將傳入的參數(shù)依次發(fā)送出來。
- from(T[]) / from(Iterable<? extends T>) : 將傳入的數(shù)組或 Iterable 拆分成具體對象后,依次發(fā)送出來。
上面 just(T...) 的例子和 from(T[]) 的例子,都和之前的 create(OnSubscribe) 的例子是等價的。
3、Subscribe (訂閱)
創(chuàng)建了 Observable 和 Observer 之后,再用 subscribe() 方法將它們聯(lián)結(jié)起來,整條鏈子就可以工作了。
observable.subscribe(observer); // 或者: observable.subscribe(subscriber);觀察者模式本身的目的就是『后臺處理,前臺回調(diào)』的異步機(jī)制,因此異步對于 RxJava 是至關(guān)重要的。而要實現(xiàn)異步,則需要用到 RxJava 的另一個概念:Scheduler?。
除了 subscribe(Observer) 和 subscribe(Subscriber) ,subscribe() 還支持不完整定義的回調(diào),RxJava 會自動根據(jù)定義創(chuàng)建出 Subscriber 。形式如下:
Action1<String> onNextAction = new Action1<String>() {// onNext()@Overridepublic void call(String s) {Log.d(tag, s);} }; Action1<Throwable> onErrorAction = new Action1<Throwable>() {// onError()@Overridepublic void call(Throwable throwable) {// Error handling} }; Action0 onCompletedAction = new Action0() {// onCompleted()@Overridepublic void call() {Log.d(tag, "completed");} };// 自動創(chuàng)建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義onNext()、 onError() 和 onCompleted()observable.subscribe(onNextAction, onErrorAction, onCompletedAction);Action0 ,Action1 是 RxJava 的接口,它們只有一個方法 call,Action0 的這個call()方法是無參無返回值的;而Action1的方法有一個參數(shù),為call(T param)。由于onCompleted() 方法也是無參無返回值的,對應(yīng)Action0 可以被當(dāng)成一個包裝對象,onNext(T obj) 和 onError(Throwable error) 也是單參數(shù)無返回值的,對應(yīng)Action1。將這三個方法的內(nèi)容打包起來將自己作為一個參數(shù)傳入 subscribe() 以實現(xiàn)不完整定義的回調(diào)。根據(jù)參數(shù)的數(shù)量X還有ActionX。
String[] names = ...; Observable.from(names).subscribe(new Action1<String>() {@Overridepublic void call(String name) {Log.d(tag, name);}});3. 線程控制 —— Scheduler (一)
RxJava 已經(jīng)內(nèi)置了幾個 Scheduler :
- Schedulers.immediate(): 直接在當(dāng)前線程運行,相當(dāng)于不指定線程。這是默認(rèn)的 Scheduler。
- Schedulers.newThread(): 總是啟用新線程,并在新線程執(zhí)行操作。
- Schedulers.io(): I/O 操作(讀寫文件、讀寫數(shù)據(jù)庫、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler。行為模式和 newThread() 差不多,區(qū)別在于 io() 的內(nèi)部實現(xiàn)是是用一個無數(shù)量上限的線程池,可以重用空閑的線程,因此多數(shù)情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免創(chuàng)建不必要的線程。
- Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數(shù)。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。
- Android 還有一個專用的 AndroidSchedulers.mainThread(),它指定的操作將在 Android 主線程運行。
可以使用 subscribeOn() 和 observeOn() 兩個方法來對線程進(jìn)行控制了。
- subscribeOn(): 指定 subscribe() 所發(fā)生的線程,即 Observable.OnSubscribe 被激活時所處的線程?;蛘呓凶鍪录a(chǎn)生的線程。 只能有一個。
- ?observeOn(): 指定 Subscriber 所運行在的線程?;蛘呓凶鍪录M的線程??捎卸鄠€。
由于 subscribeOn(Schedulers.io()) 的指定,被創(chuàng)建的事件的內(nèi)容 1、2、3、4 將會在 IO 線程發(fā)出;而由于 observeOn(AndroidScheculers.mainThread()) 的指定,因此 subscriber 數(shù)字的打印將發(fā)生在主線程 。
這兩句經(jīng)常會使用到,一般在IO線程獲取網(wǎng)絡(luò)數(shù)據(jù),主線程將數(shù)據(jù)更新到界面。『后臺線程取數(shù)據(jù),主線程顯示』的程序策略。
4、RxJava 的操作符
map():數(shù)據(jù)類型轉(zhuǎn)換
將String轉(zhuǎn)換為Bitmap對象
Observable.just("images/logo.png") // 輸入類型 String.map(new Func1<String, Bitmap>() {@Overridepublic Bitmap call(String filePath) { // 參數(shù)類型 Stringreturn getBitmapFromPath(filePath); // 返回類型 Bitmap}}).subscribe(new Action1<Bitmap>() {@Overridepublic void call(Bitmap bitmap) { // 參數(shù)類型 BitmapshowBitmap(bitmap);}});Func1 的類。和 Action1 非常相似,也是 RxJava 的一個接口,用于包裝含有一個參數(shù)的方法。 Func1 和 Action 的區(qū)別在于, Func1 包裝的是有返回值的方法。另外,和 ActionX 一樣, FuncX 也有多個,用于不同參數(shù)個數(shù)的方法。FuncX 和 ActionX 的區(qū)別在 FuncX 包裝的是有返回值的方法。
flatMap():假設(shè)有一個數(shù)據(jù)結(jié)構(gòu)『學(xué)生』,現(xiàn)在需要打印出一組學(xué)生的名字,可以用map()將Student類型轉(zhuǎn)換為String。
但是如果要打印出學(xué)生的課程,則需要flatMap(),因為課程有多個。
Student[] students = ...; Subscriber<Course> subscriber = new Subscriber<Course>() {@Overridepublic void onNext(Course course) {Log.d(tag, course.getName());}... }; Observable.from(students).flatMap(new Func1<Student, Observable<Course>>() {@Overridepublic Observable<Course> call(Student student) {return Observable.from(student.getCourses());}}).subscribe(subscriber);我們也可以用map轉(zhuǎn)換并返回List<Course>,然后在onNext里遍歷,但是不夠?qū)嵱谩?/p>
concat():concat操作符連接多個Observable一起輸出,第一個Observable發(fā)射的所有數(shù)據(jù)在第二個Observable發(fā)射的任何數(shù)據(jù)前面,以此類推。
first():只發(fā)射第一個滿足某個條件的數(shù)據(jù)
三級緩存:
- 首先檢查內(nèi)存是否有緩存
- 然后檢查文件緩存中是否有
- 最后才從網(wǎng)絡(luò)中取
任何一步一旦發(fā)現(xiàn)數(shù)據(jù)后面的操作都不執(zhí)行
在rxjava中為我們提供了兩個解決這個問題的操作符,分別是:concat和first
concat將3個Observable連在一起,first則如果發(fā)射第一個滿足條件的Observable
final Observable<String> memory = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {if (memoryCache != null) {subscriber.onNext(memoryCache);} else {subscriber.onCompleted();}}});Observable<String> disk = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {String cachePref = rxPreferences.getString("cache").get();if (!TextUtils.isEmpty(cachePref)) {subscriber.onNext(cachePref);} else {subscriber.onCompleted();}}});Observable<String> network = Observable.just("network");//依次檢查memory、disk、network Observable.concat(memory, disk, network).first().subscribeOn(Schedulers.newThread()).subscribe(s -> {memoryCache = "memory";System.out.println("--------------subscribe: " + s);});skip(int i):跳過Observable發(fā)射的前i項數(shù)據(jù)
repeat(int i):重復(fù)發(fā)送i次
interval(int i,TimeUnit.SECONDS) :每個i秒發(fā)送一次
take(int i):只取前幾個
takelat(int i):只取后幾個
?
轉(zhuǎn)載自:
RxJava 詳解
Rx系列之RxJava初識
Rx系列之RxJava操作符
Rx系列之Rxjava操作符進(jìn)階-使用場景
總結(jié)
以上是生活随笔為你收集整理的Android—RxJava库知识的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JIRA 5.0.1 发布
- 下一篇: Android—Bitmap图片大小计算