日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程语言 > java >内容正文

java

一起来造一个RxJava,揭秘RxJava的实现原理

發(fā)布時(shí)間:2025/3/21 java 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 一起来造一个RxJava,揭秘RxJava的实现原理 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

RxJava是一個(gè)神奇的框架,用法很簡(jiǎn)單,但內(nèi)部實(shí)現(xiàn)有點(diǎn)復(fù)雜,代碼邏輯有點(diǎn)繞。我讀源碼時(shí),確實(shí)有點(diǎn)似懂非懂的感覺。網(wǎng)上關(guān)于RxJava源碼分析的文章,源碼貼了一大堆,代碼邏輯繞來(lái)繞去的,讓人看得云里霧里的。既然用拆輪子的方式來(lái)分析源碼比較難啃,不如換種方式,以造輪子的方式,將源碼中與性能、兼容性、擴(kuò)展性有關(guān)的代碼剔除,留下核心代碼帶大家揭秘RxJava的實(shí)現(xiàn)原理。

什么是響應(yīng)式編程

首先,我們需要明確,RxJava是Reactive Programming在Java中的一種實(shí)現(xiàn)。什么是響應(yīng)式編程??
用一個(gè)字來(lái)概括就是流(Stream)。Stream 就是一個(gè)按時(shí)間排序的 Events 序列,它可以放射三種不同的 Events:(某種類型的)Value、Error 或者一個(gè)” Completed” Signal。通過(guò)分別為 Value、Error、”Completed”定義事件處理函數(shù),我們將會(huì)異步地捕獲這些 Events。基于觀察者模式,事件流將從上往下,從訂閱源傳遞到觀察者。

至于使用Rx框架的優(yōu)點(diǎn),它可以避免回調(diào)嵌套,更優(yōu)雅地切換線程實(shí)現(xiàn)異步處理數(shù)據(jù)。配合一些操作符,可以讓處理事件流的代碼更加簡(jiǎn)潔,邏輯更加清晰。

搭建大體的框架

要造一座房子,首先要把大體的框架搭好。在RxJava里面,有兩個(gè)必不可少的角色:Subscriber(觀察者) 和 Observable(訂閱源)。

Subscriber(觀察者)

Subsribler在RxJava里面是一個(gè)抽象類,它實(shí)現(xiàn)了Observer接口。

?

public interface Observer<T> {void onCompleted();void onError(Throwable t);void onNext(T var1); }

?

為了盡可能的簡(jiǎn)單,將Subscriber簡(jiǎn)化如下:

?

public abstract class Subscriber<T> implements Observer<T> {public void onStart() {} }


Observable(訂閱源)

?

Observable(訂閱源)在RxJava里面是一個(gè)大而雜的類,擁有很多工廠方法和各式各樣的操作符。每個(gè)Observable里面有一個(gè)OnSubscribe對(duì)象,只有一個(gè)方法(void call(Subscriber<? super T> subscriber);),用來(lái)產(chǎn)生數(shù)據(jù)流,這是典型的命令模式。

public class Observable<T> {final OnSubscribe<T> onSubscribe;private Observable(OnSubscribe<T> onSubscribe) {this.onSubscribe = onSubscribe;}public static <T> Observable<T> create(OnSubscribe<T> onSubscribe) {return new Observable<T>(onSubscribe);}public void subscribe(Subscriber<? super T> subscriber) {subscriber.onStart();onSubscribe.call(subscriber);}public interface OnSubscribe<T> {void call(Subscriber<? super T> subscriber);} }

?

?

?

實(shí)踐

到此,一個(gè)小型的RxJava的雛形就出來(lái)了。不信?我們來(lái)實(shí)踐一下吧

Observable.create(new Observable.OnSubscribe<Integer>() {@Overridepublic void call(Subscriber<? super Integer> subscriber) {for (int i = 0; i < 10; i++) {subscriber.onNext(i);}}}).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {}@Overridepublic void onError(Throwable t) {}@Overridepublic void onNext(Integer var1) {System.out.println(var1);}});

?

?

?

添加操作符

其實(shí),強(qiáng)大的RxJava的核心原理并沒有想象中那么復(fù)雜和神秘,運(yùn)用的就是典型的觀察者模式。有了基本雛形之后,我們繼續(xù)為這個(gè)框架添磚加瓦吧。RxJava之所以強(qiáng)大好用,與其擁有豐富靈活的操作符是分不開的。那么我們就試著為這個(gè)框架添加一個(gè)最常用的操作符:map。

那么RxJava是如何實(shí)現(xiàn)操作符的呢?其實(shí),每調(diào)用一次操作符的方法,就相當(dāng)于在上層數(shù)據(jù)源和下層觀察者之間橋接了一個(gè)新的Observable。橋接的Observable內(nèi)部會(huì)實(shí)例化有新的OnSuscribe和Subscriber。OnSuscribe負(fù)責(zé)接受目標(biāo)Subscriber傳來(lái)的訂閱請(qǐng)求,并調(diào)用源Observable.OnSubscribe的subscribe方法。源Observable.OnSubscribe將Event往下發(fā)送給橋接Observable.Subscriber,最終橋接Observable.Subscriber將Event做相應(yīng)處理后轉(zhuǎn)發(fā)給目標(biāo)Subscriber。流程如下圖所示:

接著,我們用代碼實(shí)現(xiàn)這一過(guò)程。在Observable類里面添加如下代碼:

public <R> Observable<R> map(Transformer<? super T, ? extends R> transformer) {return create(new OnSubscribe<R>() { // 生成一個(gè)橋接的Observable和 OnSubscribe@Overridepublic void call(Subscriber<? super R> subscriber) {Observable.this.subscribe(new Subscriber<T>() { // 訂閱上層的Observable@Overridepublic void onCompleted() {subscriber.onCompleted();}@Overridepublic void onError(Throwable t) {subscriber.onError(t);}@Overridepublic void onNext(T var1) {// 將上層的onSubscribe發(fā)送過(guò)來(lái)的Event,通過(guò)轉(zhuǎn)換和處理,轉(zhuǎn)發(fā)給目標(biāo)的subscribersubscriber.onNext(transformer.call(var1));}});}});}public interface Transformer<T, R> {R call(T from);}

?

?

?

map操作符的作用是將T類型的Event轉(zhuǎn)化成R類型,轉(zhuǎn)化策略抽象成Transformer<T, R>(RxJava中用的是Func1<T, R>,但為了便于理解,起了一個(gè)有意義的名字)這一個(gè)函數(shù)接口,由外部傳入。

上面代碼中使用到一些泛型的通配符,有些地方使用了super,有些地方使用了extends,其實(shí)這是有講究的,傳給Transformer#call方法的參數(shù)是T類型的,那么call方法的參數(shù)類型可以聲明成是T的父類,Transformer#call方法的返回值要求是R類型的,那么它的返回值類型應(yīng)該聲明成R的子類。如果大家不能理解,也可以不用在意這些細(xì)節(jié)。

那么我們一起來(lái)測(cè)試一下吧。

Observable.create(new Observable.OnSubscribe<Integer>() {@Overridepublic void call(Subscriber<? super Integer> subscriber) {for (int i = 0; i < 10; i++) {subscriber.onNext(i);}}}).map(new Observable.Transformer<Integer, String>() {@Overridepublic String call(Integer from) {return "maping " + from;}}).subscribe(new Subscriber<String>() {@Overridepublic void onNext(String var1) {System.out.println(var1);}@Overridepublic void onCompleted() {}@Overridepublic void onError(Throwable t) {}});

?

?

?

但是,我們看到map()方法內(nèi)內(nèi)部類有點(diǎn)多,代碼缺少拓展性和可讀性,我們應(yīng)該進(jìn)行適當(dāng)?shù)刂貥?gòu),將主要的邏輯抽離成獨(dú)立的模塊,并保證模塊間盡量解耦,否則Observable只會(huì)越來(lái)越臃腫。

public <R> Observable<R> map(Transformer<? super T, ? extends R> transformer) {return create(new MapOnSubscribe<T, R>(this, transformer));} public class MapOnSubscribe<T, R> implements Observable.OnSubscribe<R> {final Observable<T> source;final Observable.Transformer<? super T, ? extends R> transformer;public MapOnSubscribe(Observable<T> source, Observable.Transformer<? super T, ? extends R> transformer) {this.source = source;this.transformer = transformer;}@Overridepublic void call(Subscriber<? super R> subscriber) {source.subscribe(new MapSubscriber<R, T>(subscriber, transformer));} } public class MapSubscriber<T, R> extends Subscriber<R> {final Subscriber<? super T> actual;final Observable.Transformer<? super R, ? extends T> transformer;public MapSubscriber(Subscriber<? super T> actual, Observable.Transformer<? super R, ? extends T> transformer) {this.actual = actual;this.transformer = transformer;}@Overridepublic void onCompleted() {actual.onCompleted();}@Overridepublic void onError(Throwable t) {actual.onError(t);}@Overridepublic void onNext(R var1) {actual.onNext(transformer.call(var1));} }

?

?

?

添加線程切換功能

RxJava中最激動(dòng)人心的功能是異步處理,能夠自如地切換線程。利用?subscribeOn()?結(jié)合?observeOn()?來(lái)實(shí)現(xiàn)線程控制,讓事件的產(chǎn)生和消費(fèi)發(fā)生在不同的線程。?observeOn()?可以多次調(diào)用,實(shí)現(xiàn)了線程的多次切換,最終目標(biāo)Subscriber的執(zhí)行線程與最后一次observeOn()的調(diào)用有關(guān)。但subscribeOn()?多次調(diào)用只有第一個(gè)subscribeOn()?起作用。為什么呢?因?yàn)?observeOn()?作用的是Subscriber,而subscribeOn()?作用的是OnSubscribe。

這里借用扔物線的圖:

簡(jiǎn)單地調(diào)用一個(gè)方法就可以完成線程的切換,很神奇對(duì)吧。RxJava是如何實(shí)現(xiàn)的呢?除了橋接Observable以外,RxJava還用到一個(gè)很關(guān)鍵的類—Scheduler(調(diào)度器)。文檔中給Scheduler的定義是:A Scheduler is an object that schedules units of work.,也就是進(jìn)行任務(wù)的調(diào)度的一個(gè)東西。Scheduler里面有一個(gè)重要的抽象方法:

public abstract Worker createWorker();

?

?

?

Worker是Scheduler的內(nèi)部類,它是具體任務(wù)的執(zhí)行者。當(dāng)要提交任務(wù)給Worker執(zhí)行需要調(diào)用Worker的schedule(Action0 aciton)方法。

public abstract Subscription schedule(Action0 action);

?

?

?

要獲得一個(gè)Scheduler并不需要我們?nèi)ew,一般是調(diào)用Schedulers的工廠方法。

public final class Schedulers {private final Scheduler computationScheduler;private final Scheduler ioScheduler;private final Scheduler newThreadScheduler;public static Scheduler io() {return RxJavaHooks.onIOScheduler(getInstance().ioScheduler);}public static Scheduler computation() {return RxJavaHooks.onComputationScheduler(getInstance().computationScheduler);}... }

?

?

?

具體的Scheduler的實(shí)現(xiàn)類就不帶大家一起看了,但我們需要知道,能做到線程切換的關(guān)鍵Worker的schedule方法,因?yàn)樗鼤?huì)把傳過(guò)來(lái)的任務(wù)放入線程池,或新線程中執(zhí)行,這取決于具體Scheduler的實(shí)現(xiàn)。

自定義Scheduler

那么,下面我們先來(lái)自定義一個(gè)簡(jiǎn)單的Scheduler和Worker。

public class Scheduler {final Executor executor;public Scheduler(Executor executor) {this.executor = executor;}public Worker createWorker() {return new Worker(executor);}public static class Worker {final Executor executor;public Worker(Executor executor) {this.executor = executor;}// 這里接受的是Runnable而不是Action0,其實(shí)這沒什么關(guān)系,主要是懶得自定義函數(shù)式接口了。public void schedule(Runnable runnable) {executor.execute(runnable);}} }

?

?

?

為了達(dá)到高仿效果,我們也提供相應(yīng)的工廠方法。

public class Schedulers {private static final Scheduler ioScheduler = new Scheduler(Executors.newSingleThreadExecutor());public static Scheduler io() {return ioScheduler;} }

?

?

?

實(shí)現(xiàn)subscribeOn

subscribeOn是作用于上層OnSubscribe的,可以讓OnSubscribe的call方法在新線程中執(zhí)行。

因此,在Observable類里面,添加如下代碼:

public Observable<T> subscribeOn(Scheduler scheduler) {return Observable.create(new OnSubscribe<T>() {@Overridepublic void call(Subscriber<? super T> subscriber) {subscriber.onStart();// 將事件的生產(chǎn)切換到新的線程。scheduler.createWorker().schedule(new Runnable() {@Overridepublic void run() {Observable.this.onSubscribe.call(subscriber);}});}});}

?

?

?

測(cè)試一下:

Observable.create(new Observable.OnSubscribe<Integer>() {@Overridepublic void call(Subscriber<? super Integer> subscriber) {System.out.println("OnSubscribe@ "+Thread.currentThread().getName()); //new Threadsubscriber.onNext(1);}}).subscribeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() {...@Overridepublic void onNext(Integer var1) {System.out.println("Subscriber@ "+Thread.currentThread().getName()); // new ThreadSystem.out.println(var1);}});

?

?

?

實(shí)現(xiàn)observeOn

subscribeOn是作用于下層Subscriber的,需要讓下層Subscriber的事件處理方法放到新線程中執(zhí)行。

為此,在Observable類里面,添加如下代碼:

public Observable<T> observeOn(Scheduler scheduler) {return Observable.create(new OnSubscribe<T>() {@Overridepublic void call(Subscriber<? super T> subscriber) {subscriber.onStart();Scheduler.Worker worker = scheduler.createWorker();Observable.this.onSubscribe.call(new Subscriber<T>() {@Overridepublic void onCompleted() {worker.schedule(new Runnable() {@Overridepublic void run() {subscriber.onCompleted();}});}@Overridepublic void onError(Throwable t) {worker.schedule(new Runnable() {@Overridepublic void run() {subscriber.onError(t);}});}@Overridepublic void onNext(T var1) {worker.schedule(new Runnable() {@Overridepublic void run() {subscriber.onNext(var1);}});}});}});}

?

?

?

測(cè)試一下:

Observable.create(new Observable.OnSubscribe<Integer>() {@Overridepublic void call(Subscriber<? super Integer> subscriber) {System.out.println("OnSubscribe@ " + Thread.currentThread().getName()); // mainsubscriber.onNext(1);}}).observeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() {...@Overridepublic void onNext(Integer var1) {System.out.println("Subscriber@ " + Thread.currentThread().getName()); // new ThreadSystem.out.println(var1);}});

?

?

?

在Android中切換線程

經(jīng)過(guò)以上實(shí)踐,我們終于知道了RxJava線程切換的核心原理了。下面我們順便來(lái)看看Android里面是如何進(jìn)行線程切換的。

首先找到AndroidSchedulers,發(fā)現(xiàn)一個(gè)Scheduler的具體實(shí)現(xiàn)類:LooperScheduler。

private AndroidSchedulers() {...mainThreadScheduler = new LooperScheduler(Looper.getMainLooper());...}/** A {@link Scheduler} which executes actions on the Android UI thread. */public static Scheduler mainThread() {return getInstance().mainThreadScheduler;}

?

?

?

LooperScheduler的代碼很清晰,內(nèi)部持有一個(gè)Handler,用于線程的切換。在Worker的schedule(Action0 action,...)方法中,將action通過(guò)Handler切換到所綁定的線程中執(zhí)行。

class LooperScheduler extends Scheduler {private final Handler handler;LooperScheduler(Looper looper) {handler = new Handler(looper);}LooperScheduler(Handler handler) {this.handler = handler;}@Overridepublic Worker createWorker() {return new HandlerWorker(handler);}static class HandlerWorker extends Worker {private final Handler handler;...@Overridepublic Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {...action = hook.onSchedule(action);ScheduledAction scheduledAction = new ScheduledAction(action, handler);Message message = Message.obtain(handler, scheduledAction);message.obj = this; // Used as token for unsubscription operation.handler.sendMessageDelayed(message, unit.toMillis(delayTime));...return scheduledAction;}@Overridepublic Subscription schedule(final Action0 action) {return schedule(action, 0, TimeUnit.MILLISECONDS);}}static final class ScheduledAction implements Runnable, Subscription {private final Action0 action;private final Handler handler;private volatile boolean unsubscribed;...@Override public void run() {try {action.call();} ...}...} }

?

?

?

結(jié)語(yǔ)

就這樣,以上用代碼演示了RxJava一些核心功能是如何實(shí)現(xiàn)的,希望能給大家?guī)?lái)不一樣的啟發(fā)。但這只是一個(gè)小小的Demo,離真正能運(yùn)用于工程的Rx框架還差太遠(yuǎn)。這也讓我們明白到,一個(gè)健壯的框架,需要考慮太多東西,比如代碼的可拓展性和可讀性,性能優(yōu)化,可測(cè)試性,兼容性,極端情況等等。但有時(shí)要想深入理解一個(gè)復(fù)雜框架的實(shí)現(xiàn)原理,就需要?jiǎng)冸x這些細(xì)節(jié)代碼,多關(guān)注主干的調(diào)用邏輯,化繁為簡(jiǎn)。

Demo代碼可到Github獲取:https://github.com/TellH/RxJavaDemo/tree/master/src/my_rxjava

參考&拓展

  • https://mp.weixin.qq.com/s?__biz=MzI1MTA1MzM2Nw==&mid=2649796857&idx=1&sn=ed8325aeddac7fd2bd81a0717c010e98&scene=1&srcid=0817o3Xzkx4ILR6FKaR1M9LX#rd
  • https://gank.io/post/560e15be2dca930e00da1083
  • https://zhuanlan.zhihu.com/p/22338235

總結(jié)

以上是生活随笔為你收集整理的一起来造一个RxJava,揭秘RxJava的实现原理的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。