日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程语言 > java >内容正文

java

响应式编程RxJava (一)

發(fā)布時(shí)間:2025/3/18 java 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 响应式编程RxJava (一) 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

1.什么是RxJava? 1.1什么是響應(yīng)式編程? 是一種基于異步數(shù)據(jù)流概念的編程模式(異步數(shù)據(jù)流編程) 數(shù)據(jù)流 ->河流(被觀測(cè)、被過(guò)濾、被操作)

1.2響應(yīng)式編程的設(shè)計(jì)原則是: 保持?jǐn)?shù)據(jù)的不變性 沒(méi)有共享 阻塞是有害的

1.3在我們的Java里面提供了解決方案 - RxJava? RxJava:Reactive Extensions Java(Java響應(yīng)式編程) 響應(yīng)式編程最初誕生.Net里面 iOS開(kāi)發(fā)中也有響應(yīng)式編程(block)

// 傳統(tǒng)寫(xiě)法:加載文件 // new Thread() { // @Override // public void run() { // super.run(); // for (File folder : folders) { // File[] files = folder.listFiles(); // for (File file : files) { // if (file.getName().endsWith(".png")) { // final Bitmap bitmap = getBitmapFromFile(file); // // 更新UI線程 // runOnUiThread(new Runnable() { // @Override // public void run() { // imageCollectorView.addImage(bitmap); // } // }); // } // } // } // } // }.start(); 復(fù)制代碼

RxJava寫(xiě)法

File[] folders = new File[10];Observable.from(folders)//便利.flatMap(new Func1<File, Observable<File>>() {@Overridepublic Observable<File> call(File file) {return Observable.from(file.listFiles());}})//過(guò)濾.filter(new Func1<File, Boolean>() {@Overridepublic Boolean call(File file) {//條件return file.getName().endsWith(".png");}})//加載圖片.map(new Func1<File, Bitmap>() {@Overridepublic Bitmap call(File file) {return getBitmapFromFile(file);}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())//更新UI.subscribe(new Action1<Bitmap>() {@Overridepublic void call(Bitmap bitmap) {imageCollectorView.addImage(bitmap);}}); 復(fù)制代碼

文件數(shù)組 flatMap:相當(dāng)于我們手動(dòng)的起嵌套循環(huán) 隊(duì)列數(shù)據(jù)結(jié)構(gòu) 你會(huì)發(fā)現(xiàn)以下這個(gè)簡(jiǎn)單的案例有哪些優(yōu)勢(shì) 第一點(diǎn):你不需要考慮線程問(wèn)題 第二點(diǎn):你不要關(guān)心如何更新UI線程,如何調(diào)用

2.RxJava整體架構(gòu)設(shè)計(jì)?

整體架構(gòu)設(shè)計(jì) -> 主要觀察者模式同時(shí)里面還采用其他的設(shè)計(jì)模式 代理模式、迭代器模式、Builder設(shè)計(jì)模式(構(gòu)建者模式)整體RxJava框架,角色劃分:Observable :被觀察者Observer : 觀察者Subscrible : 訂閱Subjects : 科目Observable 和 Subjects 是兩個(gè)“生產(chǎn)“實(shí)體,Observer和Subscrible是兩個(gè)“消費(fèi)”實(shí)體熱Observables 和冷Observables從發(fā)射物的角度來(lái)看,有兩種不同的Observables:熱的和冷的。一個(gè)“熱”的Observable典型的只要一創(chuàng)建完就開(kāi)始發(fā)射數(shù)據(jù)。因此所有后續(xù)訂閱它的觀察者可能從序列中間得某個(gè)位置開(kāi)始接收數(shù)據(jù)(有一些數(shù)據(jù)錯(cuò)過(guò)了)。一個(gè)“冷”的Observable會(huì)一直等待,知道由觀察者訂閱它才開(kāi)始發(fā)射數(shù)據(jù),因此這個(gè)觀察者可以確保會(huì)收到整個(gè)數(shù)據(jù)序列。熱和冷熱:主動(dòng)場(chǎng)景:容器中目前只有一個(gè)觀察者,向所有的觀察者發(fā)送3條數(shù)據(jù),因?yàn)闊酧bservables一旦創(chuàng)建就立馬發(fā)送消息,假設(shè)我現(xiàn)在發(fā)送到了第二條數(shù)據(jù),突然之后增加了一個(gè)觀察者,這個(gè)時(shí)候,第二個(gè)觀察者就收不到之前的消息。 冷:被動(dòng)場(chǎng)景:容器中目前只有1個(gè)觀察者,因?yàn)槔銸bservables一旦創(chuàng)建就會(huì)等待觀察者訂閱,一定有觀察者訂閱了,我立馬將所有的消息發(fā)送給這個(gè)觀察者(訂閱人) 復(fù)制代碼

3.RxJava基本API? 第一個(gè)案例:如何創(chuàng)建Observables?

subscribe 相關(guān)源碼:

public final Subscription subscribe(final Observer<? super T> observer) {if (observer instanceof Subscriber) {return subscribe((Subscriber<? super T>)observer);}if (observer == null) {throw new NullPointerException("observer is null");}return subscribe(new ObserverSubscriber<T>(observer));}static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {// validate and proceedif (subscriber == null) {throw new IllegalArgumentException("subscriber can not be null");}if (observable.onSubscribe == null) {throw new IllegalStateException("onSubscribe function can not be null.");/** the subscribe function can also be overridden but generally that's not the appropriate approach* so I won't mention that in the exception*/}// new Subscriber so onStart itsubscriber.onStart();/** See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls* to user code from within an Observer"*/// if not already wrappedif (!(subscriber instanceof SafeSubscriber)) {// assign to `observer` so we return the protected versionsubscriber = new SafeSubscriber<T>(subscriber);}// The code below is exactly the same an unsafeSubscribe but not used because it would// add a significant depth to already huge call stacks.try {// allow the hook to intercept and/or decorateRxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);return RxJavaHooks.onObservableReturn(subscriber);} catch (Throwable e) {// special handling for certain Throwable/Error/Exception typesExceptions.throwIfFatal(e);// in case the subscriber can't listen to exceptions anymoreif (subscriber.isUnsubscribed()) {RxJavaHooks.onError(RxJavaHooks.onObservableError(e));} else {// if an unhandled error occurs executing the onSubscribe we will propagate ittry {subscriber.onError(RxJavaHooks.onObservableError(e));} catch (Throwable e2) {Exceptions.throwIfFatal(e2);// if this happens it means the onError itself failed (perhaps an invalid function implementation)// so we are unable to propagate the error correctly and will just throwRuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);// TODO could the hook be the cause of the error in the on error handling.RxJavaHooks.onObservableError(r);// TODO why aren't we throwing the hook's return value.throw r; // NOPMD}}return Subscriptions.unsubscribed();}}public class SafeSubscriber<T> extends Subscriber<T> {private final Subscriber<? super T> actual;boolean done;public SafeSubscriber(Subscriber<? super T> actual) {super(actual);this.actual = actual;}/*** Notifies the Subscriber that the {@code Observable} has finished sending push-based notifications.* <p>* The {@code Observable} will not call this method if it calls {@link #onError}.*/@Overridepublic void onCompleted() {if (!done) {done = true;try {actual.onCompleted();} catch (Throwable e) {// we handle here instead of another method so we don't add stacks to the frame// which can prevent it from being able to handle StackOverflowExceptions.throwIfFatal(e);RxJavaHooks.onError(e);throw new OnCompletedFailedException(e.getMessage(), e);} finally { // NOPMDtry {// Similarly to onError if failure occurs in unsubscribe then Rx contract is broken// and we throw an UnsubscribeFailureException.unsubscribe();} catch (Throwable e) {RxJavaHooks.onError(e);throw new UnsubscribeFailedException(e.getMessage(), e);}}}}/*** Notifies the Subscriber that the {@code Observable} has experienced an error condition.* <p>* If the {@code Observable} calls this method, it will not thereafter call {@link #onNext} or* {@link #onCompleted}.** @param e* the exception encountered by the Observable*/@Overridepublic void onError(Throwable e) {// we handle here instead of another method so we don't add stacks to the frame// which can prevent it from being able to handle StackOverflowExceptions.throwIfFatal(e);if (!done) {done = true;_onError(e);}}/*** Provides the Subscriber with a new item to observe.* <p>* The {@code Observable} may call this method 0 or more times.* <p>* The {@code Observable} will not call this method again after it calls either {@link #onCompleted} or* {@link #onError}.** @param t* the item emitted by the Observable*/@Overridepublic void onNext(T t) {try {if (!done) {actual.onNext(t);}} catch (Throwable e) {// we handle here instead of another method so we don't add stacks to the frame// which can prevent it from being able to handle StackOverflowExceptions.throwOrReport(e, this);}}/*** The logic for {@code onError} without the {@code isFinished} check so it can be called from within* {@code onCompleted}.** @see <a href="https://github.com/ReactiveX/RxJava/issues/630">the report of this bug</a>*/@SuppressWarnings("deprecation")protected void _onError(Throwable e) { // NOPMDRxJavaPlugins.getInstance().getErrorHandler().handleError(e);try {actual.onError(e);} catch (OnErrorNotImplementedException e2) { // NOPMD/** onError isn't implemented so throw** https://github.com/ReactiveX/RxJava/issues/198** Rx Design Guidelines 5.2** "when calling the Subscribe method that only has an onNext argument, the OnError behavior* will be to rethrow the exception on the thread that the message comes out from the observable* sequence. The OnCompleted behavior in this case is to do nothing."*/try {unsubscribe();} catch (Throwable unsubscribeException) {RxJavaHooks.onError(unsubscribeException);throw new OnErrorNotImplementedException("Observer.onError not implemented and error while unsubscribing.", new CompositeException(Arrays.asList(e, unsubscribeException))); // NOPMD}throw e2;} catch (Throwable e2) {/** throw since the Rx contract is broken if onError failed** https://github.com/ReactiveX/RxJava/issues/198*/RxJavaHooks.onError(e2);try {unsubscribe();} catch (Throwable unsubscribeException) {RxJavaHooks.onError(unsubscribeException);throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.", new CompositeException(Arrays.asList(e, e2, unsubscribeException)));}throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2)));}// if we did not throw above we will unsubscribe here, if onError failed then unsubscribe happens in the catchtry {unsubscribe();} catch (Throwable unsubscribeException) {RxJavaHooks.onError(unsubscribeException);throw new OnErrorFailedException(unsubscribeException);}}/*** Returns the {@link Subscriber} underlying this {@code SafeSubscriber}.** @return the {@link Subscriber} that was used to create this {@code SafeSubscriber}*/public Subscriber<? super T> getActual() {return actual;} }復(fù)制代碼

subscriber 實(shí)際上就是Observer

RxJava基本使用 源碼分析 Observable創(chuàng)建原理分析: 第一步:調(diào)用Observable.create()方法 第二步:添加觀察者訂閱監(jiān)聽(tīng)Observable.OnSubscrible 第三步:在Observable.create方法中創(chuàng)建被觀察者new Observable(hook.onCreate(f)); 第四步:在Observable類構(gòu)造方法中保存了觀察者訂閱監(jiān)聽(tīng)

訂閱觀察者原理分析: 第一步:注冊(cè)觀察者監(jiān)聽(tīng)observable.subscribe(new Observer()) 第二步:在Observable類中調(diào)用了 public final Subscription subscribe(final Observer<? super T> observer) { if (observer instanceof Subscriber) { return subscribe((Subscriber<? super T>)observer); } if (observer == null) { throw new NullPointerException("observer is null"); } return subscribe(new ObserverSubscriber(observer)); } 方法中注冊(cè)觀察者 第三步:在Observable類中調(diào)用了 public final Subscription subscribe(Subscriber<? super T> subscriber) { return Observable.subscribe(subscriber, this); }方法 第四步:調(diào)用了Observable.subscribe(subscriber, this);方法 第五步:在 Observable.subscribe方法中調(diào)用了監(jiān)聽(tīng)觀察者訂閱的回調(diào)接口 RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);

private Observable<String> observableString;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_simple2);// 創(chuàng)建一個(gè)被觀察者// 配置回調(diào)接口---OnSubscribe// 為什么要配置?// 監(jiān)聽(tīng)觀察者訂閱,一旦有觀察者訂閱了,立馬回調(diào)改接口observableString = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> observer) {Log.i("main", "回到了");//訪問(wèn)請(qǐng)求// 所以在這個(gè)方法里面我們可以干一些事情// 進(jìn)行數(shù)據(jù)通信(說(shuō)白了就是通知觀察者)for (int i = 0; i < 5; i++) {observer.onNext("第" + i + "個(gè)數(shù)據(jù)");}//訪問(wèn)完成// 當(dāng)我們的數(shù)據(jù)傳遞完成observer.onCompleted();}});}public void click(View v) {// 觀察者訂閱// 回調(diào)原理:// 核心代碼:// hook.onSubscribeStart(observable,// observable.onSubscribe).call(subscriber);observableString.subscribe(new Observer<String>() {@Overridepublic void onCompleted() {Log.i("main", "---onCompleted---");}@Overridepublic void onError(Throwable e) {System.out.println("Oh,no! Something wrong happened!");}@Overridepublic void onNext(String item) {// 接受數(shù)據(jù)Log.i("main", "觀察者接收到了數(shù)據(jù): " + item);}});}結(jié)果輸出 08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: 回到了 08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: 觀察者接收到了數(shù)據(jù): 第0個(gè)數(shù)據(jù) 08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: 觀察者接收到了數(shù)據(jù): 第1個(gè)數(shù)據(jù) 08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: 觀察者接收到了數(shù)據(jù): 第2個(gè)數(shù)據(jù) 08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: 觀察者接收到了數(shù)據(jù): 第3個(gè)數(shù)據(jù) 08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: 觀察者接收到了數(shù)據(jù): 第4個(gè)數(shù)據(jù) 08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: ---onCompleted--- 復(fù)制代碼

observableString.subscribe 中 RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber); 調(diào)用call方法

另一種方式自動(dòng)發(fā)送

private Observable<String> observableString;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_simple2);List<String> items = new ArrayList<String>();items.add("Kpioneer");items.add("Xpioneer");items.add("haocai");items.add("Huhu");// 框架本身提供了這樣的API// from: 一旦當(dāng)你有觀察者注冊(cè),立馬發(fā)送消息序列// 框架內(nèi)部實(shí)現(xiàn)// 框架內(nèi)部調(diào)用create方法// 迭代器模式// OnSubscribeFromIterable類專門(mén)用于遍歷集合// OnSubscribeFromArray類專門(mén)用于遍歷數(shù)組observableString = Observable.from(items);}public void click(View v) {observableString.subscribe(new Observer<String>() {@Overridepublic void onCompleted() {Log.i("main", "---onCompleted---");}@Overridepublic void onError(Throwable e) {System.out.println("Oh,no! Something wrong happened!");}@Overridepublic void onNext(String item) {// 接受數(shù)據(jù)Log.i("main", "觀察者接收到了數(shù)據(jù): " + item);}});}結(jié)果輸出08-02 14:38:14.517 32289-32289/com.haocai.architect.rxjava I/main: 觀察者接收到了數(shù)據(jù): Kpioneer 08-02 14:38:14.517 32289-32289/com.haocai.architect.rxjava I/main: 觀察者接收到了數(shù)據(jù): Xpioneer 08-02 14:38:14.517 32289-32289/com.haocai.architect.rxjava I/main: 觀察者接收到了數(shù)據(jù): haocai 08-02 14:38:14.517 32289-32289/com.haocai.architect.rxjava I/main: 觀察者接收到了數(shù)據(jù): Huhu 08-02 14:38:14.517 32289-32289/com.haocai.architect.rxjava I/main: ---onCompleted--- 復(fù)制代碼/*** Copyright 2014 Netflix, Inc.** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/ package rx.internal.operators;import java.util.Iterator; import java.util.concurrent.atomic.AtomicLong;import rx.*; import rx.Observable.OnSubscribe; import rx.exceptions.Exceptions;/*** Converts an {@code Iterable} sequence into an {@code Observable}.* <p>* ![](http://upload-images.jianshu.io/upload_images/1824809-fa9342290145e00e.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)* <p>* You can convert any object that supports the Iterable interface into an Observable that emits each item in* the object, with the {@code toObservable} operation.* @param <T> the value type of the items*/ public final class OnSubscribeFromIterable<T> implements OnSubscribe<T> {final Iterable<? extends T> is;public OnSubscribeFromIterable(Iterable<? extends T> iterable) {if (iterable == null) {throw new NullPointerException("iterable must not be null");}this.is = iterable;}@Overridepublic void call(final Subscriber<? super T> o) {Iterator<? extends T> it;boolean b;try {it = is.iterator();b = it.hasNext();} catch (Throwable ex) {Exceptions.throwOrReport(ex, o);return;}if (!o.isUnsubscribed()) {if (!b) {o.onCompleted();} else {o.setProducer(new IterableProducer<T>(o, it));}}}static final class IterableProducer<T> extends AtomicLong implements Producer {/** */private static final long serialVersionUID = -8730475647105475802L;// 具體的觀察者private final Subscriber<? super T> o;// 具體的數(shù)據(jù)private final Iterator<? extends T> it;IterableProducer(Subscriber<? super T> o, Iterator<? extends T> it) {this.o = o;this.it = it;}@Overridepublic void request(long n) {if (get() == Long.MAX_VALUE) {// already started with fast-pathreturn;}if (n == Long.MAX_VALUE && compareAndSet(0, Long.MAX_VALUE)) {fastPath();} elseif (n > 0 && BackpressureUtils.getAndAddRequest(this, n) == 0L) {slowPath(n);}}void slowPath(long n) {// backpressure is requestedfinal Subscriber<? super T> o = this.o;final Iterator<? extends T> it = this.it;long r = n;long e = 0;for (;;) {while (e != r) {if (o.isUnsubscribed()) {return;}T value;try {value = it.next();} catch (Throwable ex) {Exceptions.throwOrReport(ex, o);return;}o.onNext(value);if (o.isUnsubscribed()) {return;}boolean b;try {b = it.hasNext();} catch (Throwable ex) {Exceptions.throwOrReport(ex, o);return;}if (!b) {if (!o.isUnsubscribed()) {o.onCompleted();}return;}e++;}r = get();if (e == r) {r = BackpressureUtils.produced(this, e);if (r == 0L) {break;}e = 0L;}}}void fastPath() {// fast-path without backpressurefinal Subscriber<? super T> o = this.o;final Iterator<? extends T> it = this.it;for (;;) {if (o.isUnsubscribed()) {return;}T value;try {value = it.next();} catch (Throwable ex) {Exceptions.throwOrReport(ex, o);return;}o.onNext(value);if (o.isUnsubscribed()) {return;}boolean b;try {b = it.hasNext();} catch (Throwable ex) {Exceptions.throwOrReport(ex, o);return;}if (!b) {if (!o.isUnsubscribed()) {o.onCompleted();}return;}}}}} 復(fù)制代碼

總結(jié)

以上是生活随笔為你收集整理的响应式编程RxJava (一)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。