RxJava初级解析(一)
扔物線大大的文章確實(shí)寫的牛 扔物線,看了他的文章受益匪淺,文中很多會(huì)引用到他的一些分析,沒有看過他的文章的建議先看一下。
一.概述
先簡單介紹一下RxJava的思想
RxJava 有四個(gè)基本概念:Observable (可觀察者,即被觀察者)、 Observer (觀察者)、 subscribe (訂閱)、事件。Observable 和 Observer 通過 subscribe() 方法實(shí)現(xiàn)訂閱關(guān)系,從而 Observable 可以在需要的時(shí)候發(fā)出事件來通知 Observer。
概念介紹
1.Observable(可觀察者,即被觀察者)
事件的觸發(fā)者。
2.Observer/Subscriber(觀察者)
事件的產(chǎn)生者
3.subscribe(訂閱)
可被觀察者和觀察者之間的橋梁
4.事件
產(chǎn)生的事件
5.總結(jié)
舉個(gè)例子:我是一名讀者雜志會(huì)員,我想訂閱讀者期刊,當(dāng)我訂閱之后,讀者工作室就會(huì)每個(gè)月給我發(fā)一本讀者雜志。在這個(gè)事件中,我就是一名被觀察者,讀者工作室就是觀察者,因?yàn)槲乙坏┊a(chǎn)生訂閱這件事,就會(huì)觸發(fā)讀者工作室的一系列動(dòng)作。
二.實(shí)例解析
1.最簡單的例子
observable(可被觀察者)
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("test1");subscriber.onNext("test2");subscriber.onCompleted();}});subscriber/observer(觀察者)
private Subscriber<String> subscribe = new Subscriber<String>() {@Overridepublic void onCompleted() {Log.e("HP", "onCompleted");}@Overridepublic void onError(Throwable e) {}@Overridepublic void onNext(String s) {Log.e("HP", s);} };subscribe(訂閱)
observable.subscribe(subscribe);執(zhí)行以上代碼會(huì)打印出如下結(jié)果
test1 test2 onCompleted這樣一個(gè)最簡單的RxJava代碼就完成了。
為什么在call方法中調(diào)用onNext(),onCompleted()會(huì)觸發(fā)subscriber/observer中對(duì)應(yīng)的方法呢?接下來一起看一下源碼,看看是如何訂閱成功的。
進(jìn)入到Observable.create()方法
public final static <T> Observable<T> create(OnSubscribe<T> f) {return new Observable<T>(hook.onCreate(f));} protected Observable(OnSubscribe<T> f) {this.onSubscribe = f; }這里創(chuàng)建了一個(gè)Observable對(duì)象,同時(shí)通過構(gòu)造函數(shù)將 f 賦值給Observalbe類中的onSubscribe
進(jìn)入到subscribe方法
public final Subscription subscribe(Subscriber<? super T> subscriber) {return Observable.subscribe(subscriber, this);}private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {// validate and proceedif (subscriber == null) {throw new IllegalArgumentException("observer can not be null");}if (observable.onSubscribe == null) {throw new IllegalStateException("onSubscribe function can not be null.");/** the subscribe function can also be overridden but generally that's not the appropriate approach* so I won't mention that in the exception*/}// new Subscriber so onStart itsubscriber.onStart();/** See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls* to user code from within an Observer"*/// if not already wrappedif (!(subscriber instanceof SafeSubscriber)) {// assign to `observer` so we return the protected versionsubscriber = new SafeSubscriber<T>(subscriber);}// The code below is exactly the same an unsafeSubscribe but not used because it would // add a significant depth to already huge call stacks.try {// allow the hook to intercept and/or decoratehook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);return hook.onSubscribeReturn(subscriber);} catch (Throwable e) {// special handling for certain Throwable/Error/Exception typesExceptions.throwIfFatal(e);// if an unhandled error occurs executing the onSubscribe we will propagate ittry {subscriber.onError(hook.onSubscribeError(e));} catch (Throwable e2) {Exceptions.throwIfFatal(e2);// if this happens it means the onError itself failed (perhaps an invalid function implementation)// so we are unable to propagate the error correctly and will just throwRuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);// TODO could the hook be the cause of the error in the on error handling.hook.onSubscribeError(r);// TODO why aren't we throwing the hook's return value.throw r;}return Subscriptions.unsubscribed();}}我們看到
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);其中hook.onSubscribeStart返回的是create()方法中創(chuàng)建的OnSubscribe對(duì)象,即上文中提到的 onSubscribe ,在這里調(diào)用OnSubscribe 中的call方法,將subscriber/observer傳遞到call方法中,所以我們?cè)赾all中調(diào)用onNext(),onError(),onCompleted()會(huì)觸發(fā)observer中對(duì)應(yīng)的方法,從而達(dá)到了事件通知的效果。
2.訂閱Action
觀察者除了我們的subscriber/observer之外,還可以是Action
再看一個(gè)例子
定義一個(gè)action
再看輸出結(jié)果
test1 test2不出所料,輸出結(jié)果與預(yù)期的相同。
進(jìn)入subscribe方法看以看究竟
原來在subscribe方法中將我們的action轉(zhuǎn)換成了subscribe。
所以通過action你可以自定義一些實(shí)現(xiàn)onNext(),onError(),onComplete()方法的action,這些action通常是可以作為公用的操作。
這些僅僅是RxJava最基礎(chǔ)用法的一個(gè)解析。
總結(jié)
以上是生活随笔為你收集整理的RxJava初级解析(一)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: hdu1010 dfs+路径剪枝
- 下一篇: Java Junit