日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Java中expecial,RxJava 学习笔记 (一)

發布時間:2024/9/3 java 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java中expecial,RxJava 学习笔记 (一) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

作者: 一字馬胡

轉載標志 【2017-12-13】

更新日志

日期

更新內容

備注

2017-12-13

RxJava學習筆記系列

系列筆記 (一)

2017-12-15

增加系列筆記(二)

2017-12-15 21:36

考慮到RxJava很大程度上用于android開發中,而我自身不是移動開發者,所以暫時將RxJava學習筆記系列掛起,在未來需要使用RxJava的時候再繼續學習,并且結合實際的應用來學習會收獲更多

掛起

導入

其實在很早以前就接觸過RxJava,并且當時學習RxJava還有一個產出:JSwitcher,這是一個基于RxJava的實驗性框架,對于該框架的介紹可以參考下面的描述:

JSwitcher is a Convenient tool to switch schedule base on RxJava, and Jswitcher also implement a sample Version Observer/Observable, you can learn how RxJava works from the sample codes. it's easy to switch to Another schedule from current schedule. you just need to care about your bussiness, using 'switchTo' Operator to switch to the longing schedlue when you want to do the work on the suitable schedule. There are some especial schedules for you, like I/O Bound Schedule, Cpu Bound Schedule, And Single Schedule, etc, if you want to create an extra schedule by yourself, it's ok for JSwitcher, and it's very easy to do this .And the most important thing is the jswitch support 'chain operator', that means you can switch to a schedule, then fit on this schedule some works, then you can do switch operator continue from current position, or you can just fit another work on current schedule, and jswitcher has terminal operator like 'waitAndShutdown', after do the operator, you can not do 'chain operator' anymore. and the jswitcher will wait some time and shutdown all of schedule.

該框架將RxJava的核心部分抽離出來并做了一些簡化處理,說到這里,需要提及一下,將一個復雜框架中的某部分抽象出來看似很簡單,但是實際操作起來還是有一些困難的,并且在實際操作的過程中為了不涉及過多外圍的內容時常需要簡化,就是將一些依賴外圍的核心部分中的某些內容拋棄,但是最為主要的骨架不能丟掉,這樣操作下來會對整個框架有一定的了解。如果上面的描述激起了你的興趣,可以實際去閱讀JSwitcher框架代碼,也可以作為快速入門RxJava的學習材料,但是該框架存在一些不確定性以及一些待研究正確性的點,所以不宜在實際項目中應用。

JSwitcher的核心功能是實現線程池的切換,并且支持按任務性質(I/O,Compute)來劃分線程池,切換到合適的線程池可以提交任務,具體的使用可以參考下面的例子:

SwitcherFitter.switcherFitter()

.switchToIoSchedule() //switch to i/o bound schedule

.switchToSingleSchedule() //switch to single schedule

.fit(normalRunner, future1, true) //do the normal runner at current schedule

.switchToComputeSchedule() // switch to cpu bound schedule

.fit(normalRunner, future2, true) // do

.fit(timeoutRunner, future3, true) // do

.switchToSingleSchedule() //switch

.switchToSingleSchedule() //switch

.fit(timeoutRunner, future4, true) //do

.awaitFuturesCompletedOrTimeout(100,

completableFutures, timeoutFutures, 10) //wait for the future

.switchToComputeSchedule() //switch

.fit(() -> {

System.out.println("i am a tester->" + Thread.currentThread().getName());

}) // do the stupid work

.waitAndShutdown(1000); //wait and shutdown !

關于JSwitcher的設計,可以參考下面的圖片:

本文作為學習RxJava的學習筆記的第一篇文章,會從RxJava的一些核心概念出發,并且從實際的例子來梳理RxJava的實現原理,當然,為了閱讀的流暢性,每一篇文章不會涉及太多的內容。需要說明的一點是,本文乃至本系列的所有文章都是基于RxJava2,RxJava目前有兩個版本,一個是RxJava1,一個是RxJava2,據說兩個版本間的差別還是很大的,介于我的學習都是基于RxJava2的,并且沒有接觸過RxJava1,所以本系列文章不會涉及RxJava1與RxJava2的對比內容,所有內容都是基于RxJava2的。

Observer和Observable

學習RxJava之前,你需要了解什么是Reactive,我的理解是應該要和傳統的代碼進行對比學習,我們一般寫代碼都是命令式的,我們希望做什么就做什么,比如我們想下載一張圖片,然后判斷圖片是否下載成功,如果成功了就展示出來,如果沒有下載成功則使用兜底圖片進行展示,如果沒有兜底圖片則不展示。下面是這個功能的偽代碼實現:

Image img = EntryDownloadHelper.downloadImageByUrl(url, timeout)

if img is null

then

if FALLBACK_IMG != null

then img = FALLBACK_IMG

if img != null

then

ShowEntryHelper.showImage(img, height, weight)

看起來很熟悉并且很容易理解,那什么是Reactive的呢?如果使用RxJava來重寫上面的代碼,則代碼看起來像下面這樣:

String imgUrl = "xxx";

Image img = null;

Image FALLBACK_IMG = "xxx";

int timeout = 1000;

int height = 100;

int weight = 200;

Observable.create(new ObservableOnSubscribe() {

public void subscribe(ObservableEmitter e) throws Exception {

if (imgUrl == null || imgUrl.isEmpty()) {

e.onNext(FALLBACK_IMG);

} else {

img = EntryDownloadHelper.downloadImageByUrl(imgUrl, timeout);

if (img == null) {

e.onNext(FALLBACK_IMG);

} else {

e.onNext(img);

}

}

e. onComplete();

}

}).subscribe(new Observer() {

public void onSubscribe(Disposable disposable) {

}

public void onNext(Image s) {

if (s != null) {

ShowEntryHelper.showImage(img, height, weight);

}

}

public void onError(Throwable throwable) {

System.out.println("onError:" + throwable);

}

public void onComplete() {

}

});

這只是一個簡單的小例子,并沒有什么使用價值,并且需要說明的一點是,RxJava更適合用于移動應用的開發,所以如果是做移動開發的話,學習RxJava的價值會更大,但是在一些其他的開發過程中也會使用到RxJava。

在上面的例子中,出現了兩個比較關鍵的對象,ObServer和Observable,RxJava在實現Reactive的時候使用了觀察者設計模式,Observable是被觀察者,可以叫數據源,也可以叫做生產者,反正就是負責生產數據,并且將數據推送出去的東西,而ObServer是觀察者對象,它會綁定到一個Observable上,并且觀察Observable的行為,當ObServable觸發事件的時候,ObServer會接收到事件,并且對相應的事件作出相應。所以可以將ObServer叫做事件的接收者,也可以叫做事件的消費者。有了觀察者和被觀察者,需要將兩個角色聯系起來,也就是上面所說到的將Observer綁定到Observable上,這個時候就需要使用Observable的subscribe方法,叫做訂閱,下面會詳細講解Observable是如何將事件傳遞給Observer的。

學習一個新技術最開始需要做的就是寫一個demo,并且運行起來,然后再繼續學習下去。下面首先寫一個RxJava的demo,下面的分析將會基于該demo:

Observable.create(new ObservableOnSubscribe() {

public void subscribe(ObservableEmitter e) throws Exception {

e.onNext("test");

e.onComplete();

}

}).subscribe(new Observer() {

public void onSubscribe(Disposable disposable) {

System.out.println("onSubscribe");

}

public void onNext(String s) {

System.out.println("onNext:" + s);

}

public void onError(Throwable throwable) {

System.out.println("onError:" + throwable);

}

public void onComplete() {

System.out.println("onComplete:");

}

});

首先需要創建一個Observable,可以使用Observable的靜態方法create,當然可以直接new一個Observable對象,并且實現Observable的方法來實現,就像下面這樣:

Observable observable = new Observable() {

@Override

protected void subscribeActual(Observer super String> observer) {

observer.onNext("ok");

observer.onComplete();

}

};

現在,Observable已經有了,下面就需要在該Observable上綁定一個Observer,就像上面的例子一樣,使用Observable的subscribe方法,需要說明的一點是,可以在Observable做非常豐富的聚合操作,可以對Observable進行一系列聚合操作(比如map,filter等操作)之后再綁定Observer,但是本文不會涉及這些操作的內容,這些內容將在下一篇該系列的文章中出現。

目前Observable有六個subscribe方法供Observer選擇:

public final Disposable subscribe()

public final Disposable subscribe(Consumer super T> onNext)

public final Disposable subscribe(Consumer super T> onNext, Consumer super Throwable> onError)

public final Disposable subscribe(Consumer super T> onNext, Consumer super Throwable> onError,Action onComplete)

public final Disposable subscribe(Consumer super T> onNext, Consumer super Throwable> onError, Action onComplete, Consumer super Disposable> onSubscribe)

public final void subscribe(Observer super T> observer)

可以選擇這六個中的任意一個來綁定Observer,本文以一個看起來較為簡單的subscribe方法來分析,也就是上面例子中使用的版本:

public final void subscribe(Observer super T> observer)

下面展示了該方法的詳細實現細節:

public final void subscribe(Observer super T> observer) {

ObjectHelper.requireNonNull(observer, "observer is null");

try {

observer = RxJavaPlugins.onSubscribe(this, observer);

ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

subscribeActual(observer);

} catch (NullPointerException e) { // NOPMD

throw e;

} catch (Throwable e) {

Exceptions.throwIfFatal(e);

// can't call onError because no way to know if a Disposable has been set or not

// can't call onSubscribe because the call might have set a Subscription already

RxJavaPlugins.onError(e);

NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");

npe.initCause(e);

throw npe;

}

}

看起來代碼很多,但是核心代碼就一句:subscribeActual(observer),然后:

/**

* Operator implementations (both source and intermediate) should implement this method that

* performs the necessary business logic.

*

There is no need to call any of the plugin hooks on the current Observable instance or

* the Subscriber.

* @param observer the incoming Observer, never null

*/

protected abstract void subscribeActual(Observer super T> observer);

再看一下new一個Observable的代碼:

Observable observable = new Observable() {

@Override

protected void subscribeActual(Observer super String> observer) {

// XXX

}

};

也就是說,subscribe方法中會調用Observable的subscribeActual方法,并且將subscribe的參數(也就是綁定到該Observable的Observer)傳遞給subscribeActual,然后,我們在subscribeActual方法里面對subscribeActual的參數observer的操作實際上就是直接調用了Observer的方法,所以Observer當然會對響應相應的事件。

這個理解起來不太困難,下面看一下使用Observable的create靜態方法來創建Observable的時候是怎么講一個Observer綁定到一個create出來的Observable上的,回頭看下面的代碼:

Observable.create(new ObservableOnSubscribe() {

public void subscribe(ObservableEmitter e) throws Exception {

// XXX

}

}).subscribe(new Observer() {

// XXX

});

這個看起來好像不能像上面那種情況一樣理解,因為create的參數是new一個ObservableOnSubscribe對象,現在先來看一下create方法的具體實現細節:

public static Observable create(ObservableOnSubscribe source) {

ObjectHelper.requireNonNull(source, "source is null");

return RxJavaPlugins.onAssembly(new ObservableCreate(source));

}

可以看到,create方法返回的是一個ObservableCreate對象,并且將我們的Observable對象傳遞給了ObservableCreate,這里使用了包裝模式,將Observable包裝成了ObservableCreate對象。在ObservableCreate類中找到了subscribeActual的實現,而這個subscribeActual正是實現了Observable的subscribeActual。所以包裝需要包裝徹底啊。下面是ObservableCreate類的subscribeActual的具體實現:

@Override

protected void subscribeActual(Observer super T> observer) {

CreateEmitter parent = new CreateEmitter(observer);

observer.onSubscribe(parent);

try {

source.subscribe(parent);

} catch (Throwable ex) {

Exceptions.throwIfFatal(ex);

parent.onError(ex);

}

}

在subscribeActual內部,又對Observer做了一次包裝,將Observer對象包裝成了CreateEmitter對象,為什么呢?因為在create方法的參數中我們new的Observable是一個ObservableOnSubscribe類型的對象,而ObservableOnSubscribe的subscribe的參數需要是CreateEmitter類型的,那我們new出來的ObservableOnSubscribe到哪去了呢?看下面的構造函數:

final ObservableOnSubscribe source;

public ObservableCreate(ObservableOnSubscribe source) {

this.source = source;

}

可以看到,我們new出來的ObservableOnSubscribe被保存在source字段中,在來看ObservableCreate類的subscribeActual方法,其中有關鍵的一句話:source.subscribe(parent),source是Observable,parent是Observer,只是Observer和Observable都是被包裝了一層的。如果想具體了解到底是怎么包裝的,可以參考CreateEmitter類,也可以借助這個機會學習一下包裝模式,還是比較有用的。

本文是對RxJava學習筆記系列的第一篇文章,內容淺顯易懂,沒有涉及太多的內容,主要分析了一下RxJava中的兩個重要的對象,Observable和Observer,并且梳理了一下一個Observer是如何綁定到一個Observable上的,當然,這是學習RxJava的基礎內容,如果對這一部分內容都不清楚的話,還需要繼續學習一下,本文涉及到兩個設計模式,一個是觀察者模式,一個是包裝模式,結合具體的例子來看還是很好理解的。本文開頭還介紹了一下JSwitcher,對于學習RxJava還是比較有幫助的。下面簡單做一下RxJava學習筆記系列的文章計劃:

《RxJava學習筆記 (一)》 : 了解RxJava中的Observable和Observer,并且明白如何實現訂閱

《RxJava學習筆記 (二)》 : RxJava中Observable豐富的聚合操作支持的學習筆記

《RxJava學習筆記 (三)》 : RxJava2中的線程切換學習筆記

《RxJava學習筆記 (四)》 : RxJava Flowable學習

暫時定這幾部分內容,在總結過程中如果發現還有什么內容需要補充的時候會進行補充更新。

掃碼入群

總結

以上是生活随笔為你收集整理的Java中expecial,RxJava 学习笔记 (一)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。