生活随笔
收集整理的這篇文章主要介紹了
Rx2.0后台开发分享
小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
Rxjava2.x
微軟的一個(gè)函數(shù)庫(kù),Rx是一個(gè)編程模型,模板是提供一致的編程接口,幫助開(kāi)發(fā)者更方便的處理異步數(shù)據(jù)流,現(xiàn)在Rx已經(jīng)支持幾乎全部的流行編程語(yǔ)言。比較流行的有Rxjava,RxJs,Rx.NET,社區(qū)網(wǎng)站:http://reactivex.io/ Rx使用觀察者模式 使用觀察者模式監(jiān)聽(tīng):RX可以訂閱任何可觀察的數(shù)據(jù)流并且執(zhí)行操作 組合:RX使用查詢式的操作符和變換數(shù)據(jù)流 創(chuàng)建:Rx可以方便的創(chuàng)建事件流和數(shù)據(jù)流 簡(jiǎn)化代碼 函數(shù)式風(fēng)格:對(duì)可觀察數(shù)據(jù)流使用無(wú)副作用的輸入輸出函數(shù),避免程序中錯(cuò)綜復(fù)雜的狀態(tài) 簡(jiǎn)化代碼:Rx的操作符通常可以將復(fù)雜難題簡(jiǎn)單化為很少的代碼 異步錯(cuò)誤處理: 傳統(tǒng)的try/catch沒(méi)有辦法處理異步計(jì)算,Rx提供了合適的錯(cuò)誤處理機(jī)制 輕松使用并發(fā):Rx的Observables和Schedulers讓開(kāi)發(fā)者可以擺脫底層線程同步和各種并發(fā)問(wèn)題。 jar包maven倉(cāng)庫(kù)地址
< dependency> < groupId> io.reactivex.rxjava2
</ groupId> < artifactId> rxjava
</ artifactId> < version> 2.2.6
</ version>
</ dependency>
一個(gè)簡(jiǎn)單的例子:
public void myTestObservable ( ) { Observable . fromIterable ( Lists . newArrayList ( 1 , 2 , 3 , 4 , 5 ) ) . filter ( integer -> { return integer > 2 ; } ) . subscribe ( integer -> { System . out. println ( Thread . currentThread ( ) . getName ( ) + " : " + integer ) ; } ) ; }
在Rxjava中,一個(gè)實(shí)現(xiàn)了Observer接口的對(duì)象可以訂閱(subscribe)一個(gè)Observable類的實(shí)例。訂閱者(Subscriber)對(duì)Observable發(fā)射(emit)的任何數(shù)據(jù)或數(shù)據(jù)序列作出響應(yīng)。這種模式簡(jiǎn)化了并發(fā)的操作,因?yàn)樗恍枰枞却齇bservable發(fā)射數(shù)據(jù),而是創(chuàng)建一個(gè)處于待命狀態(tài)的觀察者哨兵,哨兵在未來(lái)某個(gè)時(shí)刻響應(yīng)Observable的通知。
Observable
如上圖是Observable發(fā)射數(shù)據(jù)的一個(gè)流程圖, 時(shí)間線 左邊 ----右邊, 各個(gè)不同形狀標(biāo)識(shí)Observable上的元素 最后垂直線表示Observable成功執(zhí)行 向下虛線箭標(biāo)識(shí)數(shù)據(jù)被發(fā)射出去 盒子表示各種操作符來(lái)對(duì)對(duì)應(yīng)數(shù)據(jù)進(jìn)行處理 第二條時(shí)間線也是一個(gè)Observable,不過(guò)是轉(zhuǎn)換之后的 當(dāng)轉(zhuǎn)換時(shí)候出現(xiàn)錯(cuò)誤將會(huì)并不會(huì)終止,他會(huì)用一個(gè)error事件替代這個(gè)位置 Subscribe
觀察者通過(guò)SubScribe操作關(guān)聯(lián)Observable Observer
觀察者,決定了事件觸發(fā)的時(shí)候?qū)⒂性趺礃拥男袨?/li> void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
onNext事件,被觀察者每發(fā)射一個(gè)數(shù)據(jù)都會(huì)調(diào)onNext事件,相當(dāng)于異步任務(wù)中的回調(diào)接口,相當(dāng)于Feature的get獲取,只不過(guò)onNext不是阻塞的就是一個(gè)哨兵模式,每次發(fā)射數(shù)據(jù),獲取立即獲取對(duì)應(yīng)結(jié)果,然后執(zhí)行之后的邏輯 onCompleted事件,表示事件隊(duì)列完成,Rxjava不僅把每一個(gè)事件單獨(dú)處理,還會(huì)把他看做一個(gè)隊(duì)列,Rxjava規(guī)定,當(dāng)不會(huì)再發(fā)射新的元素觸發(fā)onNext事件時(shí)候,需要觸發(fā)onCompleted事件作為結(jié)束標(biāo)志。 onError事件,事件隊(duì)列處理中出現(xiàn)異常時(shí)候,onError會(huì)被觸發(fā),可以在onError中統(tǒng)一處理異常情況 onSubScribe事件,表示元素開(kāi)始發(fā)射,相當(dāng)于所有元素執(zhí)行之前的一個(gè)預(yù)處理位置。 Schedulers
默認(rèn)情況下Observable和Observer執(zhí)行過(guò)程是在同一個(gè)線程執(zhí)行如上面最簡(jiǎn)單例子,如果想要切換線程在不同線程執(zhí)行可以用SubscribeOn(),observeOn()。 Rxjava提供了幾種線調(diào)度器
調(diào)度器類型效果 Schedulers.newThread(); 為每一個(gè)任務(wù)創(chuàng)建一個(gè)新線程 Schedulers.computation(); 用于計(jì)算任務(wù),如時(shí)間循環(huán)和回調(diào)處理,不要用于IO操作,默認(rèn)線程數(shù)等于處理器數(shù)量 Schedulers.io(); 用于IO密集型任務(wù),如異步阻塞IO操作,這個(gè)調(diào)度器的線程池會(huì)根據(jù)需要增長(zhǎng);對(duì)應(yīng)普通計(jì)算任務(wù),一般用上面這個(gè),Schedulers.io默認(rèn)是一個(gè)CachedThreadScheduler,很像一個(gè)有線程緩存的新線程調(diào)度器。 Schedulers.single(); 擁有一個(gè)線程單例的線程池,所有任務(wù)都在這一個(gè)線程中執(zhí)行,當(dāng)線程中有任務(wù)執(zhí)行時(shí)候,他的任務(wù)將會(huì)按照先進(jìn)先出的順序依次執(zhí)行。 Schedulers.trampoline(); Creates and returns a Scheduler that queues work on the current thread to be executed after the current work completes. 在當(dāng)前線程立即執(zhí)行任務(wù),如果當(dāng)前線程有任務(wù)在執(zhí)行,則會(huì)將其暫停,等插入進(jìn)來(lái)的任務(wù)執(zhí)行完之后,再將未完成的任務(wù)接著執(zhí)行。
來(lái)一個(gè)完整的例子來(lái)解釋一下線程切換:
Observable . create ( new ObservableOnSubscribe < Integer > ( ) { @Override public void subscribe ( ObservableEmitter < Integer > emitter
) throws Exception { for ( Integer integer
: intList
) { System . out
. println ( Thread . currentThread ( ) . getName ( ) + " : send" ) ; emitter
. onNext ( integer
) ; } emitter
. onComplete ( ) ; } } ) . subscribeOn ( Schedulers . computation ( ) ) . observeOn ( Schedulers . newThread ( ) ) . flatMap ( new Function < Integer , ObservableSource < ? > > ( ) { @Override public ObservableSource < ? > apply ( Integer integer
) throws Exception { return Observable . just ( integer
) . subscribeOn ( Schedulers . computation ( ) ) . filter ( new Predicate < Integer > ( ) { @Override public boolean test ( Integer integer
) throws Exception { System . out
. println ( Thread . currentThread ( ) . getName ( ) + ": filter one integer: " + integer
) ; return integer
> 2 ; } } ) ; } } ) . observeOn ( Schedulers . io ( ) ) . subscribe ( new Consumer < Object > ( ) { @Override public void accept ( Object o
) throws Exception { System . out
. println ( Thread . currentThread ( ) . getName ( ) + " : onSubscribe" ) ; } } ) ;
這個(gè)代碼看起來(lái)比較長(zhǎng),也可以這么寫:
Observable . create ( emitter
-> { intList
. forEach ( intTemp
-> emitter
. onNext ( intTemp
) ) ; emitter
. onComplete ( ) ; } ) . subscribeOn ( Schedulers . computation ( ) ) . observeOn ( Schedulers . computation ( ) ) . flatMap ( intStr
-> Observable . just ( intStr
) . subscribeOn ( Schedulers . computation ( ) ) . filter ( filterInt
-> Integer . valueOf ( filterInt
. toString ( ) ) > 2 ) ) . observeOn ( Schedulers . computation ( ) ) . subscribe ( intTemp
-> System . out
. println ( intTemp
) ) ;
第一個(gè)subscribeOn指定被觀察對(duì)象發(fā)射的線程,使用的computation模型 第一個(gè)observeOn指定之后的flatMap操作符切換到另外線程中執(zhí)行 最后的observeOn指定觀察者哨兵消費(fèi)數(shù)據(jù)的線程,會(huì)有如下結(jié)果
Observable的這種異步切換線程方式從整體流程來(lái)看還是同步的方式,他必須先Observable發(fā)射數(shù)據(jù)-----操作符change-----消費(fèi)數(shù)據(jù)并不是每次發(fā)射一個(gè)數(shù)據(jù)的同時(shí)進(jìn)行change接著消費(fèi)的并行實(shí)現(xiàn),因此Rxjava提供了另外一個(gè)并行的方式,如下案例
public static void flowableDemo ( ) throws InterruptedException { Flowable . create ( new FlowableOnSubscribe < Integer > ( ) { @Override public void subscribe ( FlowableEmitter < Integer > emitter
) throws Exception { for ( int i
= 1 ; i
< 100 ; i
++ ) { System . out
. println ( Thread . currentThread ( ) . getName ( ) + " 發(fā)射數(shù)據(jù)" ) ; emitter
. onNext ( i
) ; } emitter
. onComplete ( ) ; } } , BackpressureStrategy . ERROR
) . subscribeOn ( Schedulers . io ( ) ) . observeOn ( Schedulers . newThread ( ) ) . filter ( new Predicate < Integer > ( ) { @Override public boolean test ( Integer integer
) throws Exception { System . out
. println ( Thread . currentThread ( ) . getName ( ) + " 過(guò)濾發(fā)射數(shù)據(jù)" ) ; return integer
> 0 ; } } ) . observeOn ( Schedulers . newThread ( ) ) . subscribe ( new Subscriber < Object > ( ) { public void onSubscribe ( Subscription subscription
) { System . out
. println ( "取出n條數(shù)據(jù)" ) ; subscription
. request ( 3 ) ; } public void onNext ( Object o
) { try { Thread . sleep ( 2000 ) ; } catch ( InterruptedException e
) { e
. printStackTrace ( ) ; } System . out
. println ( Thread . currentThread ( ) . getName ( ) ) ; System . out
. println ( "消費(fèi)數(shù)據(jù):" + o
) ; } public void onError ( Throwable throwable
) { System . out
. println ( throwable
. getMessage ( ) ) ; } public void onComplete ( ) { System . out
. println ( "onComplete" ) ; } } ) ; }
用Flowable不僅僅是對(duì)每個(gè)模塊進(jìn)行了線程的切換,在同時(shí)也是并行的執(zhí)行了整個(gè)流程 我覺(jué)得在異步編程方面Rxjava的確比原始的Thread ,Runnable這類操作要方便的多,通過(guò)幾個(gè)操作符就可以達(dá)到異步的目的,這也是Rxjava的一個(gè)優(yōu)勢(shì),而在我們工作中我們一般都是用架構(gòu)組的異步框架也可以做到很優(yōu)雅的異步編程,比起Rxjava而言只是會(huì)多創(chuàng)建一個(gè)異步類而已,那么我們來(lái)對(duì)比一下兩種異步操作,我用之前的批量關(guān)注接口做測(cè)試 原來(lái)的異步方式:
public void batchFollowPush ( UserInfo userInfo
, List < UserInfo > objectUserInfos
, Integer source
) { List < Boolean > batchFollowResult
= new ArrayList < > ( ) ; Future < Boolean > pushFuture
= null ; for ( UserInfo objectUserInfo
: objectUserInfos
) { try { pushFuture
= ( Future < Boolean > ) pushAsync
. asyncFollowAndStoreMoment ( userInfo
, objectUserInfo
, source
) ; batchFollowResult
. add ( pushFuture
. get ( ) ) ; } catch ( Exception e
) { if ( pushFuture
!= null ) { pushFuture
. cancel ( true ) ; } throw new RuntimeException ( e
) ; } } log
. info ( "batchFollow result:{}" , JSON
. toJSONString ( batchFollowResult
) ) ; } public Object asyncFollowAndStoreMoment ( UserInfo userInfo
, UserInfo objectUserInfo
, Integer source
) { Optional < InteractError > interactErrorOptional
= personFacade
. interactRestrict ( userInfo
, objectUserInfo
) ; if ( interactErrorOptional
. isPresent ( ) ) { return Boolean . FALSE
; } int result
= wooerFacade
. addOrUpdatePush2 ( userInfo
. getMemberID ( ) , objectUserInfo
. getMemberID ( ) , 1 , source
, HeadUaUtils . getPlatform ( ) ) ; Boolean isTrue
= result
> 0 ; if ( isTrue
== null || ! isTrue
) { return Boolean . FALSE
; } limitedPushFacade
. followPush ( userInfo
, objectUserInfo
) ; MomentListStoreParam momentListStoreParam
= new MomentListStoreParam ( Arrays . asList ( Long . valueOf ( objectUserInfo
. getMemberID ( ) ) ) , userInfo
. getMemberID ( ) . longValue ( ) ) ; log
. info ( "batchFollow afterFilter param:{}" , JSON
. toJSONString ( momentListStoreParam
) ) ; momentManagerFacade
. storeMomentMongoByMomentId ( momentListStoreParam
) ; return Boolean . TRUE
; }
public IResponse batchFollowRx ( BatchFollowForm batchFollowForm
, UserInfo myUserInfo
) { log
. info ( "batchFollow param:{}" , JSON
. toJSONString ( batchFollowForm
) ) ; BusinessAssert . isNotNull ( batchFollowForm
, CommonError . ARGS_EMPTY
) ; List < Long > objectIDs
= batchFollowForm
. getObjectIDs ( ) ; List < UserInfo > objectUserInfo
= coreUserService
. getList ( objectIDs
, UserInfo . class ) ; Flowable . create ( new FlowableOnSubscribe < UserInfo > ( ) { @Override public void subscribe ( FlowableEmitter < UserInfo > emitter
) throws Exception { for ( UserInfo info
: objectUserInfo
) { emitter
. onNext ( info
) ; } } } , BackpressureStrategy . ERROR
) . parallel ( ) . runOn ( Schedulers . io ( ) ) . filter ( userInfo
-> { Optional < InteractError > interactErrorOptional
= personFacade
. interactRestrict ( myUserInfo
, userInfo
) ; if ( interactErrorOptional
. isPresent ( ) ) { return Boolean . FALSE
; } Boolean isTrue
= wooerFacade
. addOrUpdatePush2 ( myUserInfo
. getMemberID ( ) , userInfo
. getMemberID ( ) , 1 , batchFollowForm
. getSource ( ) , HeadUaUtils . getPlatform ( ) ) > 0 ; if ( isTrue
== null || ! isTrue
) { return Boolean . FALSE
; } return Boolean . TRUE
; } ) . runOn ( Schedulers . computation ( ) ) . sequential ( ) . subscribe ( new Consumer < UserInfo > ( ) { @Override public void accept ( UserInfo userInfo
) throws Exception { limitedPushFacade
. followPush ( myUserInfo
, userInfo
) ; MomentListStoreParam momentListStoreParam
= new MomentListStoreParam ( Arrays . asList ( Long . valueOf ( userInfo
. getMemberID ( ) ) ) , userInfo
. getMemberID ( ) . longValue ( ) ) ; log
. info ( "batchFollow afterFilter param:{}" , JSON
. toJSONString ( momentListStoreParam
) ) ; momentManagerFacade
. storeMomentMongoByMomentId ( momentListStoreParam
) ; } } , new Consumer < Throwable > ( ) { @Override public void accept ( Throwable throwable
) throws Exception { log
. error ( "batch follow error exception:{}" , throwable
. getMessage ( ) ) ; } } ) ; return MessageResponse . success ( FOLLOW_SUCCESS
) ; }
public static void oomDemo ( ) { Observable . create ( new ObservableOnSubscribe < Integer > ( ) { @Override public void subscribe ( ObservableEmitter < Integer > emitter
) throws Exception { for ( int i
= 0 ; ; i
++ ) { System . out
. println ( Thread . currentThread ( ) . getName ( ) + " onNext : " + i
) ; emitter
. onNext ( i
) ; } } } ) . subscribeOn ( Schedulers . io ( ) ) . observeOn ( Schedulers . single ( ) ) . subscribe ( new Consumer < Integer > ( ) { @Override public void accept ( Integer integer
) throws Exception { Thread . sleep ( 2000 ) ; System . out
. println ( Thread . currentThread ( ) . getName ( ) + " consumer : " + integer
) ; } } ) ; }
讓發(fā)射數(shù)據(jù)在多個(gè)線程中執(zhí)行,讓消費(fèi)數(shù)據(jù)在一個(gè)線程中執(zhí)行并且每?jī)擅氩畔M(fèi)一個(gè),這樣會(huì)導(dǎo)致發(fā)射的數(shù)據(jù)不斷的累積在內(nèi)存中,最終可能會(huì)導(dǎo)致oom,我們通過(guò)內(nèi)存信息來(lái)看他執(zhí)行之后一段時(shí)間的堆內(nèi)存信息
PSYoiungGen 年輕態(tài)區(qū),總共1024k,使用了511k eden區(qū)域是新對(duì)象區(qū),已經(jīng)被沾滿 from和to區(qū)域 大學(xué)是一樣,在gc時(shí)候會(huì)遍歷from或者to區(qū)域,將不能清除的拷貝到另外一個(gè)區(qū),然后清除本區(qū)域留下的,然后循環(huán) paroldGen 老年代區(qū)域也已經(jīng)被占滿 這種狀態(tài)下Observable因內(nèi)存不夠已經(jīng)oom,停止運(yùn)行了,只有消費(fèi)線程在消費(fèi)數(shù)據(jù)。
io. reactivex. exceptions. UndeliverableException: The exception could not be delivered
to the consumer because it has already canceled
/ disposed the flow or the exception has nowhere
to go to begin with. Further reading
: https
: / / github
. com
/ ReactiveX / RxJava / wiki
/ What 's
- different
- in
- 2.0 #error
- handling
| java. lang. OutOfMemoryError: GC overhead limit exceededat
io. reactivex. plugins. RxJavaPlugins. onError ( RxJavaPlugins . java
: 367 ) at
io. reactivex. internal. schedulers. ScheduledRunnable. run ( ScheduledRunnable . java
: 69 ) at
io. reactivex. internal. schedulers. ScheduledRunnable. call ( ScheduledRunnable . java
: 57 ) at
java. util. concurrent. FutureTask. run ( FutureTask . java
: 266 ) at
java. util. concurrent. ScheduledThreadPoolExecutor$
ScheduledFutureTask . access$
201 ( ScheduledThreadPoolExecutor . java
: 180 ) at
java. util. concurrent. ScheduledThreadPoolExecutor$
ScheduledFutureTask . run ( ScheduledThreadPoolExecutor . java
: 293 )
我們用一個(gè)Flowable的例子來(lái)看他如何解決這個(gè)oom的問(wèn)題:
public static void oomDemoFix ( ) { Flowable . create ( new FlowableOnSubscribe < Integer > ( ) { @Override public void subscribe ( FlowableEmitter < Integer > emitter
) throws Exception { for ( int i
= 0 ; ; i
++ ) { System . out
. println ( Thread . currentThread ( ) . getName ( ) + " onNext : " + i
) ; emitter
. onNext ( i
) ; } } } , BackpressureStrategy . ERROR
) . subscribeOn ( Schedulers . io ( ) ) . observeOn ( Schedulers . io ( ) ) . subscribe ( new Subscriber < Integer > ( ) { @Override public void onSubscribe ( Subscription subscription
) { subscription
. request ( 50 ) ; } @Override public void onNext ( Integer integer
) { System . out
. println ( Thread . currentThread ( ) . getName ( ) + "消費(fèi)數(shù)據(jù): " + integer
) ; } @Override public void onError ( Throwable throwable
) { } @Override public void onComplete ( ) { } } ) ; }
我們?cè)趧?chuàng)建Flowable的時(shí)候增加了一個(gè)參數(shù) BackpressureStrategy.ERROR,這個(gè)參數(shù)指定了在處理背壓?jiǎn)栴}時(shí)候執(zhí)行的一個(gè)策略,當(dāng)內(nèi)存滿時(shí)候接下來(lái)繼續(xù)發(fā)射的數(shù)據(jù)將會(huì)拋出MissingBackpressureException 異常,其余的策略稍等介紹 還有另外一個(gè)不同 onSubscribe中傳遞的不是Disposable變成了Subscription,而且還執(zhí)行了這句代碼subscription.request(50)。因?yàn)樵贔lowable中采用了一個(gè)新的思路,響應(yīng)獲取發(fā)射數(shù)據(jù)的方法來(lái)解決流量不均勻而造成的oom的問(wèn)題,也就是我要消費(fèi)多少我就取多少,這里我們從發(fā)射數(shù)據(jù)中取出了50條。其他的還是會(huì)存儲(chǔ)在內(nèi)存中。
Flowable中主要有這幾個(gè)策略
BackpressureStrategy.ERROR:如果緩存池(默認(rèn)128)溢出會(huì)立刻拋異常MissingBackpressureexception BackpressureStrategy.BUFFER:RxJava中有一個(gè)默認(rèn)能夠存儲(chǔ)128個(gè)事件的緩存池,可以調(diào)節(jié)大小,生產(chǎn)者生產(chǎn)事件,并且將處理不了的事件緩存。(謹(jǐn)慎使用,因?yàn)橐蕾噷?duì)消費(fèi)者消費(fèi)能力,耗內(nèi)存) BackpressureStrategy.DROP:消費(fèi)不了就丟棄,比如先生產(chǎn)200個(gè),并沒(méi)有消費(fèi),而是在緩存,然后消費(fèi)者request(200),但是緩存只有128個(gè),所以其他的事件都丟棄了。 BackpressureStrategy.LATEST:和DROP功能基本一致,處理不了丟棄,區(qū)別在于LATEST能夠讓消費(fèi)者接收到生產(chǎn)者產(chǎn)生的最后一個(gè)事件。 BackpressureStrategy.MISSING:生產(chǎn)的事件沒(méi)有進(jìn)行緩存和丟棄,下游接收到的事件必須進(jìn)行消費(fèi)或者處理!
感覺(jué)這些都是緩兵之計(jì),能否按照我的消費(fèi)能力來(lái)發(fā)射數(shù)據(jù)呢,這樣才完美。
Rxjava2.x后有一個(gè)FlowableEmitter 這個(gè)接口:
public static void fix ( Flowable flowable
) { flowable
. observeOn ( Schedulers . computation ( ) ) . subscribe ( new Subscriber < Integer > ( ) { @Override public void onSubscribe ( Subscription subscription
) { subscription
. request ( 20 ) ; } @Override public void onNext ( Integer integer
) { System . out
. println ( "消費(fèi)數(shù)據(jù): " + 100 ) ; } @Override public void onError ( Throwable throwable
) { } @Override public void onComplete ( ) { } } ) ; } public static Flowable flowableEmitterDemo ( ) { Flowable flowable
= Flowable . create ( new FlowableOnSubscribe < Integer > ( ) { @Override public void subscribe ( FlowableEmitter < Integer > emitter
) throws Exception { while ( emitter
. requested ( ) > 0 ) { System . out
. println ( "下游處理能力:" + emitter
. requested ( ) ) ; emitter
. onNext ( 100 ) ; } } } , BackpressureStrategy . ERROR
) . subscribeOn ( Schedulers . io ( ) ) ; return flowable
; }
我可以在Flowable發(fā)射數(shù)據(jù)之前通過(guò)requested來(lái)獲取下游Subscriber的消費(fèi)能力,依據(jù)這個(gè)來(lái)進(jìn)行數(shù)據(jù)的發(fā)射,這樣既可以控制發(fā)射以及消費(fèi)數(shù)據(jù)的速度,也能夠避免數(shù)據(jù)的丟失
現(xiàn)在我們看下開(kāi)始時(shí)候從官網(wǎng)摘抄的Rx的幾個(gè)優(yōu)點(diǎn):
首先函數(shù)式風(fēng)格,這種編程模式和常規(guī)的方式比較的確簡(jiǎn)化了不少代碼比如第一個(gè)案例,但是感覺(jué)我們用stream表達(dá)式加上lambda也可以達(dá)到這種效果,而且對(duì)于map,flatmap,filter等這些操作符對(duì)于沒(méi)有函數(shù)式編程的人來(lái)說(shuō)并不好理解不覺(jué)得這是優(yōu)勢(shì)
簡(jiǎn)化代碼,這點(diǎn)主要體現(xiàn)在異步編程模式時(shí)候,不管和我們java中的異步編程用的Thread和Runnable相比,還是我們框架中的異步編程框架比較的確代碼都更加簡(jiǎn)單,只需要通過(guò)幾個(gè)異步線程切換的操作符便可以達(dá)到目的,但是缺點(diǎn)也很明顯,需要引入新的jar,新的技術(shù)對(duì)不熟悉這塊技術(shù)的同事并不友好有一定學(xué)習(xí)成本不利于維護(hù)。
異步錯(cuò)誤處理,輕松使用并發(fā):通過(guò)onError捕獲異常信息,通過(guò)操作法切換線程,的確也是優(yōu)勢(shì)所在。
在之前的實(shí)踐中還有這種業(yè)務(wù)模型下使用Rxjava會(huì)更具優(yōu)勢(shì),當(dāng)我們需要從多個(gè)網(wǎng)絡(luò)環(huán)境來(lái)獲取各自信息從中篩選出符合我們預(yù)期的并對(duì)其進(jìn)行組合,我們可以通過(guò)Rxjava的豐富的操作符以及異步操作來(lái)完成。來(lái)一個(gè)簡(jiǎn)單的例子
public static Flowable getIOData1 ( ) { return Flowable . create ( new FlowableOnSubscribe < Integer > ( ) { @Override public void subscribe ( @NonNull FlowableEmitter < Integer > flowableEmitter
) throws Exception { for ( int i
= 0 ; i
< 10 ; i
++ ) { flowableEmitter
. onNext ( i
) ; } System . out
. println ( Thread . currentThread ( ) . getName ( ) ) ; } } , BackpressureStrategy . DROP
) . subscribeOn ( Schedulers . io ( ) ) . filter ( temp
-> temp
> 2 ) ; } public static Flowable getIOData2 ( ) { return Flowable . create ( new FlowableOnSubscribe < Integer > ( ) { @Override public void subscribe ( @NonNull FlowableEmitter < Integer > flowableEmitter
) throws Exception { for ( int i
= 10 ; i
< 21 ; i
++ ) { flowableEmitter
. onNext ( i
) ; } System . out
. println ( Thread . currentThread ( ) . getName ( ) ) ; } } , BackpressureStrategy . DROP
) . subscribeOn ( Schedulers . io ( ) ) . filter ( temp
-> temp
> 12 ) ; } public static Flowable getIOData3 ( ) { return Flowable . create ( new FlowableOnSubscribe < Integer > ( ) { @Override public void subscribe ( @NonNull FlowableEmitter < Integer > flowableEmitter
) throws Exception { for ( int i
= 20 ; i
< 30 ; i
++ ) { flowableEmitter
. onNext ( i
) ; } System . out
. println ( Thread . currentThread ( ) . getName ( ) ) ; } } , BackpressureStrategy . DROP
) . subscribeOn ( Schedulers . io ( ) ) . filter ( temp
-> temp
> 22 ) ; } public static Flowable getIOData4 ( ) { return Flowable . create ( new FlowableOnSubscribe < Integer > ( ) { @Override public void subscribe ( @NonNull FlowableEmitter < Integer > flowableEmitter
) throws Exception { for ( int i
= 30 ; i
< 41 ; i
++ ) { flowableEmitter
. onNext ( i
) ; } System . out
. println ( Thread . currentThread ( ) . getName ( ) ) ; } } , BackpressureStrategy . DROP
) . subscribeOn ( Schedulers . io ( ) ) . filter ( temp
-> temp
> 32 ) ; } public static void mergeDemo ( ) { Flowable . merge ( getIOData1 ( ) , getIOData2 ( ) , getIOData3 ( ) , getIOData4 ( ) ) . map ( temp
-> "onNext" + temp
) . flatMap ( new Function ( ) { @Override public Object apply ( @NonNull Object o
) throws Exception { return Flowable . just ( o
) . subscribeOn ( Schedulers . io ( ) ) ; } } ) . subscribeOn ( Schedulers . newThread ( ) ) . observeOn ( Schedulers . computation ( ) ) . subscribe ( new Subscriber ( ) { @Override public void onSubscribe ( Subscription subscription
) { subscription
. request ( Long . MAX_VALUE
) ; } @Override public void onNext ( Object o
) { System . out
. println ( "onNext: " + o
) ; } @Override public void onError ( Throwable throwable
) { } @Override public void onComplete ( ) { } } ) ; }
我們定義N個(gè)Flowable用異步方式分別請(qǐng)求各個(gè)第三方接口來(lái)獲取對(duì)應(yīng)的數(shù)據(jù)并且用filter過(guò)濾出我們需要的信息,然后通過(guò)merge操作法將所有獲取到的數(shù)據(jù)組合到同一個(gè)Flowable中,進(jìn)行統(tǒng)一的封裝處理以及之后的一些業(yè)務(wù)操作。 如果我們用傳統(tǒng)的方式,我們不得不定義N個(gè)變量來(lái)獲取四個(gè)異步線程數(shù)據(jù),然后等都獲取完畢之后,在分別對(duì)四個(gè)變量中保存的信息進(jìn)行篩選,之后通過(guò)邏輯操作合并到一起,和rxjava相比顯然要遜色很多。 這種方式就是Flowable通過(guò)內(nèi)置操作符對(duì)自身發(fā)射的數(shù)據(jù)在空間維度上重新組織,或者和其他Flowable發(fā)射的數(shù)據(jù)一起在空間維度上進(jìn)行重新組織,是的觀察者的邏輯變得更加的簡(jiǎn)單直觀因?yàn)橹苯涌床僮鞣湍苤谰唧w做了什么,不需要關(guān)心數(shù)據(jù)從哪里來(lái)這部分由Flowable屏蔽了,從而使得觀察者更加專注于下游邏輯。
RxJava的響應(yīng)式優(yōu)勢(shì)只有在異步邏輯占主導(dǎo)時(shí)才會(huì)體現(xiàn)出來(lái).
wiki地址:https://github.com/ReactiveX/RxJava/wiki reactivex官網(wǎng):http://reactivex.io/
創(chuàng)作挑戰(zhàn)賽 新人創(chuàng)作獎(jiǎng)勵(lì)來(lái)咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)
總結(jié)
以上是生活随笔 為你收集整理的Rx2.0后台开发分享 的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
如果覺(jué)得生活随笔 網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔 推薦給好友。