日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

Rx2.0后台开发分享

發(fā)布時(shí)間:2023/12/4 编程问答 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Rx2.0后台开发分享 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

Rxjava2.x

  • 微軟的一個(gè)函數(shù)庫(kù),Rx是一個(gè)編程模型,模板是提供一致的編程接口,幫助開(kāi)發(fā)者更方便的處理異步數(shù)據(jù)流,現(xiàn)在Rx已經(jīng)支持幾乎全部的流行編程語(yǔ)言。比較流行的有Rxjava,RxJs,Rx.NET,社區(qū)網(wǎng)站:http://reactivex.io/
  • Rx使用觀察者模式
    • 使用觀察者模式監(jiān)聽(tīng):RX可以訂閱任何可觀察的數(shù)據(jù)流并且執(zhí)行操作
    • 組合:RX使用查詢式的操作符和變換數(shù)據(jù)流
    • 創(chuàng)建:Rx可以方便的創(chuàng)建事件流和數(shù)據(jù)流
  • 簡(jiǎn)化代碼
    • 函數(shù)式風(fēng)格:對(duì)可觀察數(shù)據(jù)流使用無(wú)副作用的輸入輸出函數(shù),避免程序中錯(cuò)綜復(fù)雜的狀態(tài)
    • 簡(jiǎn)化代碼:Rx的操作符通常可以將復(fù)雜難題簡(jiǎn)單化為很少的代碼
    • 異步錯(cuò)誤處理: 傳統(tǒng)的try/catch沒(méi)有辦法處理異步計(jì)算,Rx提供了合適的錯(cuò)誤處理機(jī)制
    • 輕松使用并發(fā):Rx的Observables和Schedulers讓開(kāi)發(fā)者可以擺脫底層線程同步和各種并發(fā)問(wèn)題。
  • jar包maven倉(cāng)庫(kù)地址
<dependency><groupId>io.reactivex.rxjava2</groupId><artifactId>rxjava</artifactId><version>2.2.6</version> </dependency>
  • 一個(gè)簡(jiǎn)單的例子:

    public void myTestObservable(){Observable.fromIterable(Lists.newArrayList(1,2,3,4,5)).filter(integer -> {return integer > 2;}).subscribe(integer -> {System.out.println(Thread.currentThread().getName() + " : "+ integer );});}
    • 在Rxjava中,一個(gè)實(shí)現(xiàn)了Observer接口的對(duì)象可以訂閱(subscribe)一個(gè)Observable類的實(shí)例。訂閱者(Subscriber)對(duì)Observable發(fā)射(emit)的任何數(shù)據(jù)或數(shù)據(jù)序列作出響應(yīng)。這種模式簡(jiǎn)化了并發(fā)的操作,因?yàn)樗恍枰枞却齇bservable發(fā)射數(shù)據(jù),而是創(chuàng)建一個(gè)處于待命狀態(tài)的觀察者哨兵,哨兵在未來(lái)某個(gè)時(shí)刻響應(yīng)Observable的通知。

  • Observable

    • 如上圖是Observable發(fā)射數(shù)據(jù)的一個(gè)流程圖,
    • 時(shí)間線 左邊 ----右邊, 各個(gè)不同形狀標(biāo)識(shí)Observable上的元素
    • 最后垂直線表示Observable成功執(zhí)行
    • 向下虛線箭標(biāo)識(shí)數(shù)據(jù)被發(fā)射出去
    • 盒子表示各種操作符來(lái)對(duì)對(duì)應(yīng)數(shù)據(jù)進(jìn)行處理
    • 第二條時(shí)間線也是一個(gè)Observable,不過(guò)是轉(zhuǎn)換之后的
    • 當(dāng)轉(zhuǎn)換時(shí)候出現(xiàn)錯(cuò)誤將會(huì)并不會(huì)終止,他會(huì)用一個(gè)error事件替代這個(gè)位置
  • Subscribe

    • 觀察者通過(guò)SubScribe操作關(guān)聯(lián)Observable
  • Observer

    • 觀察者,決定了事件觸發(fā)的時(shí)候?qū)⒂性趺礃拥男袨?/li>
    void onSubscribe(@NonNull Disposable d); void onNext(@NonNull T t); void onError(@NonNull Throwable e); void onComplete();
    • onNext事件,被觀察者每發(fā)射一個(gè)數(shù)據(jù)都會(huì)調(diào)onNext事件,相當(dāng)于異步任務(wù)中的回調(diào)接口,相當(dāng)于Feature的get獲取,只不過(guò)onNext不是阻塞的就是一個(gè)哨兵模式,每次發(fā)射數(shù)據(jù),獲取立即獲取對(duì)應(yīng)結(jié)果,然后執(zhí)行之后的邏輯
    • onCompleted事件,表示事件隊(duì)列完成,Rxjava不僅把每一個(gè)事件單獨(dú)處理,還會(huì)把他看做一個(gè)隊(duì)列,Rxjava規(guī)定,當(dāng)不會(huì)再發(fā)射新的元素觸發(fā)onNext事件時(shí)候,需要觸發(fā)onCompleted事件作為結(jié)束標(biāo)志。
    • onError事件,事件隊(duì)列處理中出現(xiàn)異常時(shí)候,onError會(huì)被觸發(fā),可以在onError中統(tǒng)一處理異常情況
    • onSubScribe事件,表示元素開(kāi)始發(fā)射,相當(dāng)于所有元素執(zhí)行之前的一個(gè)預(yù)處理位置。
  • Schedulers

    • 默認(rèn)情況下Observable和Observer執(zhí)行過(guò)程是在同一個(gè)線程執(zhí)行如上面最簡(jiǎn)單例子,如果想要切換線程在不同線程執(zhí)行可以用SubscribeOn(),observeOn()。
    • Rxjava提供了幾種線調(diào)度器
調(diào)度器類型效果
Schedulers.newThread();為每一個(gè)任務(wù)創(chuàng)建一個(gè)新線程
Schedulers.computation();用于計(jì)算任務(wù),如時(shí)間循環(huán)和回調(diào)處理,不要用于IO操作,默認(rèn)線程數(shù)等于處理器數(shù)量
Schedulers.io();用于IO密集型任務(wù),如異步阻塞IO操作,這個(gè)調(diào)度器的線程池會(huì)根據(jù)需要增長(zhǎng);對(duì)應(yīng)普通計(jì)算任務(wù),一般用上面這個(gè),Schedulers.io默認(rèn)是一個(gè)CachedThreadScheduler,很像一個(gè)有線程緩存的新線程調(diào)度器。
Schedulers.single();擁有一個(gè)線程單例的線程池,所有任務(wù)都在這一個(gè)線程中執(zhí)行,當(dāng)線程中有任務(wù)執(zhí)行時(shí)候,他的任務(wù)將會(huì)按照先進(jìn)先出的順序依次執(zhí)行。
Schedulers.trampoline();Creates and returns a Scheduler that queues work on the current thread to be executed after the current work completes. 在當(dāng)前線程立即執(zhí)行任務(wù),如果當(dāng)前線程有任務(wù)在執(zhí)行,則會(huì)將其暫停,等插入進(jìn)來(lái)的任務(wù)執(zhí)行完之后,再將未完成的任務(wù)接著執(zhí)行。
  • 來(lái)一個(gè)完整的例子來(lái)解釋一下線程切換:
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {for (Integer integer : intList) {System.out.println(Thread.currentThread().getName() + " : send");emitter.onNext(integer);}emitter.onComplete();}}).subscribeOn(Schedulers.computation()).observeOn(Schedulers.newThread()).flatMap(new Function<Integer, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Integer integer) throws Exception {return Observable.just(integer).subscribeOn(Schedulers.computation()).filter(new Predicate<Integer>() {@Overridepublic boolean test(Integer integer) throws Exception {System.out.println(Thread.currentThread().getName() + ": filter one integer: "+ integer);return integer > 2;}});}}).observeOn(Schedulers.io()).subscribe(new Consumer<Object>() {@Overridepublic void accept(Object o) throws Exception {System.out.println(Thread.currentThread().getName()+ " : onSubscribe");}});

這個(gè)代碼看起來(lái)比較長(zhǎng),也可以這么寫:

Observable.create(emitter -> {intList.forEach(intTemp -> emitter.onNext(intTemp));emitter.onComplete();}).subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation()).flatMap(intStr -> Observable.just(intStr).subscribeOn(Schedulers.computation()).filter(filterInt -> Integer.valueOf(filterInt.toString()) > 2)).observeOn(Schedulers.computation()).subscribe(intTemp -> System.out.println(intTemp));
  • 第一個(gè)subscribeOn指定被觀察對(duì)象發(fā)射的線程,使用的computation模型
  • 第一個(gè)observeOn指定之后的flatMap操作符切換到另外線程中執(zhí)行
  • 最后的observeOn指定觀察者哨兵消費(fèi)數(shù)據(jù)的線程,會(huì)有如下結(jié)果

  • Observable的這種異步切換線程方式從整體流程來(lái)看還是同步的方式,他必須先Observable發(fā)射數(shù)據(jù)-----操作符change-----消費(fèi)數(shù)據(jù)并不是每次發(fā)射一個(gè)數(shù)據(jù)的同時(shí)進(jìn)行change接著消費(fèi)的并行實(shí)現(xiàn),因此Rxjava提供了另外一個(gè)并行的方式,如下案例
public static void flowableDemo() throws InterruptedException {Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {for (int i = 1; i < 100; i++) {System.out.println(Thread.currentThread().getName() + " 發(fā)射數(shù)據(jù)");emitter.onNext(i);}emitter.onComplete();}}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()).observeOn(Schedulers.newThread()).filter(new Predicate<Integer>() {@Overridepublic boolean test(Integer integer) throws Exception {System.out.println(Thread.currentThread().getName() + " 過(guò)濾發(fā)射數(shù)據(jù)");return integer > 0;}}).observeOn(Schedulers.newThread()).subscribe(new Subscriber<Object>() {public void onSubscribe(Subscription subscription) {System.out.println("取出n條數(shù)據(jù)");subscription.request(3);}public void onNext(Object o) {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName());System.out.println("消費(fèi)數(shù)據(jù):" + o);}public void onError(Throwable throwable) {System.out.println(throwable.getMessage());}public void onComplete() {System.out.println("onComplete");}});}

  • 用Flowable不僅僅是對(duì)每個(gè)模塊進(jìn)行了線程的切換,在同時(shí)也是并行的執(zhí)行了整個(gè)流程
  • 我覺(jué)得在異步編程方面Rxjava的確比原始的Thread ,Runnable這類操作要方便的多,通過(guò)幾個(gè)操作符就可以達(dá)到異步的目的,這也是Rxjava的一個(gè)優(yōu)勢(shì),而在我們工作中我們一般都是用架構(gòu)組的異步框架也可以做到很優(yōu)雅的異步編程,比起Rxjava而言只是會(huì)多創(chuàng)建一個(gè)異步類而已,那么我們來(lái)對(duì)比一下兩種異步操作,我用之前的批量關(guān)注接口做測(cè)試
  • 原來(lái)的異步方式:
public void batchFollowPush(UserInfo userInfo, List<UserInfo> objectUserInfos, Integer source) {List<Boolean> batchFollowResult = new ArrayList<>();Future<Boolean> pushFuture = null;for (UserInfo objectUserInfo : objectUserInfos) {try {pushFuture = (Future<Boolean>)pushAsync.asyncFollowAndStoreMoment(userInfo, objectUserInfo, source);batchFollowResult.add(pushFuture.get());} catch (Exception e) {if (pushFuture != null) {pushFuture.cancel(true);}throw new RuntimeException(e);}}log.info("batchFollow result:{}", JSON.toJSONString(batchFollowResult));}//異步類中方法public Object asyncFollowAndStoreMoment(UserInfo userInfo, UserInfo objectUserInfo, Integer source) {// 關(guān)注交互限制Optional<InteractError> interactErrorOptional = personFacade.interactRestrict(userInfo, objectUserInfo);if (interactErrorOptional.isPresent()) {return Boolean.FALSE;}int result = wooerFacade.addOrUpdatePush2(userInfo.getMemberID(), objectUserInfo.getMemberID(), 1, source, HeadUaUtils.getPlatform());Boolean isTrue = result > 0;if (isTrue == null || !isTrue) {return Boolean.FALSE;}limitedPushFacade.followPush(userInfo, objectUserInfo);MomentListStoreParam momentListStoreParam =new MomentListStoreParam(Arrays.asList(Long.valueOf(objectUserInfo.getMemberID())),userInfo.getMemberID().longValue());log.info("batchFollow afterFilter param:{}", JSON.toJSONString(momentListStoreParam));momentManagerFacade.storeMomentMongoByMomentId(momentListStoreParam);return Boolean.TRUE;}
  • RX方式
public IResponse batchFollowRx(BatchFollowForm batchFollowForm, UserInfo myUserInfo) {log.info("batchFollow param:{}", JSON.toJSONString(batchFollowForm));BusinessAssert.isNotNull(batchFollowForm, CommonError.ARGS_EMPTY);List<Long> objectIDs = batchFollowForm.getObjectIDs();List<UserInfo> objectUserInfo = coreUserService.getList(objectIDs, UserInfo.class);Flowable.create(new FlowableOnSubscribe<UserInfo>() {@Overridepublic void subscribe(FlowableEmitter<UserInfo> emitter) throws Exception {for (UserInfo info : objectUserInfo) {emitter.onNext(info);}}}, BackpressureStrategy.ERROR).parallel().runOn(Schedulers.io()).filter(userInfo -> {Optional<InteractError> interactErrorOptional = personFacade.interactRestrict(myUserInfo, userInfo);if (interactErrorOptional.isPresent()) {return Boolean.FALSE;}Boolean isTrue =wooerFacade.addOrUpdatePush2(myUserInfo.getMemberID(), userInfo.getMemberID(),1, batchFollowForm.getSource(), HeadUaUtils.getPlatform()) > 0;if (isTrue == null || !isTrue) {return Boolean.FALSE;}return Boolean.TRUE;}).runOn(Schedulers.computation()).sequential().subscribe(new Consumer<UserInfo>() {@Overridepublic void accept(UserInfo userInfo) throws Exception {limitedPushFacade.followPush(myUserInfo, userInfo);MomentListStoreParam momentListStoreParam =new MomentListStoreParam(Arrays.asList(Long.valueOf(userInfo.getMemberID())),userInfo.getMemberID().longValue());log.info("batchFollow afterFilter param:{}", JSON.toJSONString(momentListStoreParam));momentManagerFacade.storeMomentMongoByMomentId(momentListStoreParam);}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {log.error("batch follow error exception:{}", throwable.getMessage());}});return MessageResponse.success(FOLLOW_SUCCESS);}
  • 測(cè)試批量關(guān)注接口,相同容器環(huán)境在同一個(gè)namespace上,并且相同條件的男性賬號(hào),關(guān)注同一批13個(gè)異性用戶:

    • 普通callback異步方式:

    • Rxjava方式

  • backPressure

    • 以上的例子中每次都用到了BackpressureStrategy操作符這個(gè)是Rxjava2.x后為了解決背壓?jiǎn)栴}的一個(gè)操作符,所謂背壓,即異步的情況下發(fā)射數(shù)據(jù)的速度大于消費(fèi)數(shù)據(jù)的速度帶來(lái)的問(wèn)題。
    • BackPressure現(xiàn)象說(shuō)明:Flowable無(wú)限的生產(chǎn)事件,但是SubScribe消費(fèi)的速度很慢,導(dǎo)致事件堆積,當(dāng)堆積到一定程度將會(huì)造成OOM,我們模擬一下這種情況。
public static void oomDemo(){Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {for (int i=0;;i++){System.out.println(Thread.currentThread().getName() + " onNext : "+ i);emitter.onNext(i);}}}).subscribeOn(Schedulers.io()).observeOn(Schedulers.single()).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {Thread.sleep(2000);System.out.println(Thread.currentThread().getName() + " consumer : "+ integer);}});}
  • 讓發(fā)射數(shù)據(jù)在多個(gè)線程中執(zhí)行,讓消費(fèi)數(shù)據(jù)在一個(gè)線程中執(zhí)行并且每?jī)擅氩畔M(fèi)一個(gè),這樣會(huì)導(dǎo)致發(fā)射的數(shù)據(jù)不斷的累積在內(nèi)存中,最終可能會(huì)導(dǎo)致oom,我們通過(guò)內(nèi)存信息來(lái)看他執(zhí)行之后一段時(shí)間的堆內(nèi)存信息

  • PSYoiungGen 年輕態(tài)區(qū),總共1024k,使用了511k
  • eden區(qū)域是新對(duì)象區(qū),已經(jīng)被沾滿
  • from和to區(qū)域 大學(xué)是一樣,在gc時(shí)候會(huì)遍歷from或者to區(qū)域,將不能清除的拷貝到另外一個(gè)區(qū),然后清除本區(qū)域留下的,然后循環(huán)
  • paroldGen 老年代區(qū)域也已經(jīng)被占滿
  • 這種狀態(tài)下Observable因內(nèi)存不夠已經(jīng)oom,停止運(yùn)行了,只有消費(fèi)線程在消費(fèi)數(shù)據(jù)。
io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.lang.OutOfMemoryError: GC overhead limit exceededat io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:69)at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
  • 我們用一個(gè)Flowable的例子來(lái)看他如何解決這個(gè)oom的問(wèn)題:
public static void oomDemoFix(){Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {for (int i=0;;i++){System.out.println(Thread.currentThread().getName() + " onNext : "+ i);emitter.onNext(i);}}}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()).observeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription subscription) {subscription.request(50);}@Overridepublic void onNext(Integer integer) {System.out.println(Thread.currentThread().getName() + "消費(fèi)數(shù)據(jù): "+ integer);}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onComplete() {}});}
  • 我們?cè)趧?chuàng)建Flowable的時(shí)候增加了一個(gè)參數(shù) BackpressureStrategy.ERROR,這個(gè)參數(shù)指定了在處理背壓?jiǎn)栴}時(shí)候執(zhí)行的一個(gè)策略,當(dāng)內(nèi)存滿時(shí)候接下來(lái)繼續(xù)發(fā)射的數(shù)據(jù)將會(huì)拋出MissingBackpressureException 異常,其余的策略稍等介紹
  • 還有另外一個(gè)不同 onSubscribe中傳遞的不是Disposable變成了Subscription,而且還執(zhí)行了這句代碼subscription.request(50)。因?yàn)樵贔lowable中采用了一個(gè)新的思路,響應(yīng)獲取發(fā)射數(shù)據(jù)的方法來(lái)解決流量不均勻而造成的oom的問(wèn)題,也就是我要消費(fèi)多少我就取多少,這里我們從發(fā)射數(shù)據(jù)中取出了50條。其他的還是會(huì)存儲(chǔ)在內(nèi)存中。

Flowable中主要有這幾個(gè)策略

  • BackpressureStrategy.ERROR:如果緩存池(默認(rèn)128)溢出會(huì)立刻拋異常MissingBackpressureexception
  • BackpressureStrategy.BUFFER:RxJava中有一個(gè)默認(rèn)能夠存儲(chǔ)128個(gè)事件的緩存池,可以調(diào)節(jié)大小,生產(chǎn)者生產(chǎn)事件,并且將處理不了的事件緩存。(謹(jǐn)慎使用,因?yàn)橐蕾噷?duì)消費(fèi)者消費(fèi)能力,耗內(nèi)存)
  • BackpressureStrategy.DROP:消費(fèi)不了就丟棄,比如先生產(chǎn)200個(gè),并沒(méi)有消費(fèi),而是在緩存,然后消費(fèi)者request(200),但是緩存只有128個(gè),所以其他的事件都丟棄了。
  • BackpressureStrategy.LATEST:和DROP功能基本一致,處理不了丟棄,區(qū)別在于LATEST能夠讓消費(fèi)者接收到生產(chǎn)者產(chǎn)生的最后一個(gè)事件。
  • BackpressureStrategy.MISSING:生產(chǎn)的事件沒(méi)有進(jìn)行緩存和丟棄,下游接收到的事件必須進(jìn)行消費(fèi)或者處理!

感覺(jué)這些都是緩兵之計(jì),能否按照我的消費(fèi)能力來(lái)發(fā)射數(shù)據(jù)呢,這樣才完美。

  • Rxjava2.x后有一個(gè)FlowableEmitter 這個(gè)接口:
public static void fix(Flowable flowable){flowable.observeOn(Schedulers.computation()).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription subscription) {subscription.request(20);}@Overridepublic void onNext(Integer integer) {System.out.println("消費(fèi)數(shù)據(jù): "+ 100);}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onComplete() {}});}public static Flowable flowableEmitterDemo(){Flowable flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {while (emitter.requested() > 0){System.out.println("下游處理能力:"+ emitter.requested());emitter.onNext(100);}}}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io());return flowable;}
  • 我可以在Flowable發(fā)射數(shù)據(jù)之前通過(guò)requested來(lái)獲取下游Subscriber的消費(fèi)能力,依據(jù)這個(gè)來(lái)進(jìn)行數(shù)據(jù)的發(fā)射,這樣既可以控制發(fā)射以及消費(fèi)數(shù)據(jù)的速度,也能夠避免數(shù)據(jù)的丟失

現(xiàn)在我們看下開(kāi)始時(shí)候從官網(wǎng)摘抄的Rx的幾個(gè)優(yōu)點(diǎn):

  • 首先函數(shù)式風(fēng)格,這種編程模式和常規(guī)的方式比較的確簡(jiǎn)化了不少代碼比如第一個(gè)案例,但是感覺(jué)我們用stream表達(dá)式加上lambda也可以達(dá)到這種效果,而且對(duì)于map,flatmap,filter等這些操作符對(duì)于沒(méi)有函數(shù)式編程的人來(lái)說(shuō)并不好理解不覺(jué)得這是優(yōu)勢(shì)

  • 簡(jiǎn)化代碼,這點(diǎn)主要體現(xiàn)在異步編程模式時(shí)候,不管和我們java中的異步編程用的Thread和Runnable相比,還是我們框架中的異步編程框架比較的確代碼都更加簡(jiǎn)單,只需要通過(guò)幾個(gè)異步線程切換的操作符便可以達(dá)到目的,但是缺點(diǎn)也很明顯,需要引入新的jar,新的技術(shù)對(duì)不熟悉這塊技術(shù)的同事并不友好有一定學(xué)習(xí)成本不利于維護(hù)。

  • 異步錯(cuò)誤處理,輕松使用并發(fā):通過(guò)onError捕獲異常信息,通過(guò)操作法切換線程,的確也是優(yōu)勢(shì)所在。

  • 在之前的實(shí)踐中還有這種業(yè)務(wù)模型下使用Rxjava會(huì)更具優(yōu)勢(shì),當(dāng)我們需要從多個(gè)網(wǎng)絡(luò)環(huán)境來(lái)獲取各自信息從中篩選出符合我們預(yù)期的并對(duì)其進(jìn)行組合,我們可以通過(guò)Rxjava的豐富的操作符以及異步操作來(lái)完成。來(lái)一個(gè)簡(jiǎn)單的例子

public static Flowable getIOData1(){return Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull FlowableEmitter<Integer> flowableEmitter) throws Exception {for (int i = 0; i < 10; i++) {flowableEmitter.onNext(i);}System.out.println(Thread.currentThread().getName());}}, BackpressureStrategy.DROP).subscribeOn(Schedulers.io()).filter(temp -> temp > 2);}public static Flowable getIOData2(){return Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull FlowableEmitter<Integer> flowableEmitter) throws Exception {for (int i = 10; i < 21; i++) {flowableEmitter.onNext(i);}System.out.println(Thread.currentThread().getName());}}, BackpressureStrategy.DROP).subscribeOn(Schedulers.io()).filter(temp -> temp > 12);}public static Flowable getIOData3(){return Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull FlowableEmitter<Integer> flowableEmitter) throws Exception {for (int i = 20; i < 30; i++) {flowableEmitter.onNext(i);}System.out.println(Thread.currentThread().getName());}}, BackpressureStrategy.DROP).subscribeOn(Schedulers.io()).filter(temp -> temp > 22);}public static Flowable getIOData4(){return Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull FlowableEmitter<Integer> flowableEmitter) throws Exception {for (int i = 30; i < 41; i++) {flowableEmitter.onNext(i);}System.out.println(Thread.currentThread().getName());}}, BackpressureStrategy.DROP).subscribeOn(Schedulers.io()).filter(temp -> temp > 32);}public static void mergeDemo(){Flowable.merge(getIOData1(), getIOData2(), getIOData3(), getIOData4()).map(temp -> "onNext"+ temp).flatMap(new Function() {@Overridepublic Object apply(@NonNull Object o) throws Exception {return Flowable.just(o).subscribeOn(Schedulers.io());}}).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.computation()).subscribe(new Subscriber() {@Overridepublic void onSubscribe(Subscription subscription) {subscription.request(Long.MAX_VALUE);}@Overridepublic void onNext(Object o) {System.out.println("onNext: "+ o);}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onComplete() {}});}
  • 我們定義N個(gè)Flowable用異步方式分別請(qǐng)求各個(gè)第三方接口來(lái)獲取對(duì)應(yīng)的數(shù)據(jù)并且用filter過(guò)濾出我們需要的信息,然后通過(guò)merge操作法將所有獲取到的數(shù)據(jù)組合到同一個(gè)Flowable中,進(jìn)行統(tǒng)一的封裝處理以及之后的一些業(yè)務(wù)操作。
  • 如果我們用傳統(tǒng)的方式,我們不得不定義N個(gè)變量來(lái)獲取四個(gè)異步線程數(shù)據(jù),然后等都獲取完畢之后,在分別對(duì)四個(gè)變量中保存的信息進(jìn)行篩選,之后通過(guò)邏輯操作合并到一起,和rxjava相比顯然要遜色很多。
  • 這種方式就是Flowable通過(guò)內(nèi)置操作符對(duì)自身發(fā)射的數(shù)據(jù)在空間維度上重新組織,或者和其他Flowable發(fā)射的數(shù)據(jù)一起在空間維度上進(jìn)行重新組織,是的觀察者的邏輯變得更加的簡(jiǎn)單直觀因?yàn)橹苯涌床僮鞣湍苤谰唧w做了什么,不需要關(guān)心數(shù)據(jù)從哪里來(lái)這部分由Flowable屏蔽了,從而使得觀察者更加專注于下游邏輯。

RxJava的響應(yīng)式優(yōu)勢(shì)只有在異步邏輯占主導(dǎo)時(shí)才會(huì)體現(xiàn)出來(lái).

  • wiki地址:https://github.com/ReactiveX/RxJava/wiki
  • reactivex官網(wǎng):http://reactivex.io/
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來(lái)咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)

總結(jié)

以上是生活随笔為你收集整理的Rx2.0后台开发分享的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。