日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

简述JAVA线程调度的原理,Rxjava原理(二)--线程调度

發(fā)布時(shí)間:2023/12/9 编程问答 44 豆豆
生活随笔 收集整理的這篇文章主要介紹了 简述JAVA线程调度的原理,Rxjava原理(二)--线程调度 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

1. 創(chuàng)建線程池和線程管理策略分析

// 在開發(fā)中使用Rxjava來完成線程切換會(huì)調(diào)用到以下方法(還有幾個(gè)就不一一列舉了,原理一樣的),那么就從這里開始分析

Schedulers.io()

Schedulers.computation()

Schedulers.newThread()

AndroidSchedulers.mainThread()

當(dāng)我們調(diào)用以上方法中的任意一個(gè),都會(huì)調(diào)到Schedulers類中,Schedulers使用策略模式封裝了所有線程切換策略(因此后面以io()分析)。

// 1. Schedulers類中,靜態(tài)創(chuàng)建IOTask(),當(dāng)調(diào)用Schedulers.io()的時(shí)候,就是返回這個(gè)Callable.

static {

SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());

IO = RxJavaPlugins.initIoScheduler(new IOTask());

TRAMPOLINE = TrampolineScheduler.instance();

NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());

}

// 2.創(chuàng)建IoScheduler

static final class IOTask implements Callable {

@Override

public Scheduler call() throws Exception {

return IoHolder.DEFAULT;

}

}

static final class IoHolder {

static final Scheduler DEFAULT = new IoScheduler();

}

// 3.創(chuàng)建線程池

public IoScheduler(ThreadFactory threadFactory) {

this.threadFactory = threadFactory;

this.pool = new AtomicReference(NONE);

start();

}

public void start() {

// CachedWorkerPool任務(wù)池,里面持有任務(wù)隊(duì)列和線程池

CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);

if (!pool.compareAndSet(NONE, update)) {

update.shutdown();

}

}

//4. CachedWorkerPool構(gòu)造方法中創(chuàng)建線程池,并且暴露get()提供需要執(zhí)行的任務(wù)

static final class CachedWorkerPool implements Runnable {

private final long keepAliveTime;

private final ConcurrentLinkedQueue expiringWorkerQueue;

final CompositeDisposable allWorkers;

private final ScheduledExecutorService evictorService;

private final Future> evictorTask;

private final ThreadFactory threadFactory;

CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {

......

if (unit != null) {

// 創(chuàng)建線程池

evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);

task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);

}

......

}

ThreadWorker get() {

.....

while (!expiringWorkerQueue.isEmpty()) {

// 任務(wù)隊(duì)列不為空,從隊(duì)列中取一個(gè)并返回

ThreadWorker threadWorker = expiringWorkerQueue.poll();

if (threadWorker != null) {

return threadWorker;

}

}

// 如果任務(wù)隊(duì)列是空的,就創(chuàng)建一個(gè)并返回

ThreadWorker w = new ThreadWorker(threadFactory);

allWorkers.add(w);

return w;

}

......

}

用一張圖可能說明得比較清楚一些。

Schedulers調(diào)度過程.png

2. Rxjava上游任務(wù)在子線程中執(zhí)行分析

// 上游線程切換使用過程

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter e) throws Exception {

e.onNext("JackOu");

}

})

// ObservableCreate.subscribeOn

.subscribeOn(Schedulers.io())

// ObservableSubscribeOn.subscribe

.subscribe(new Observer() {

......

@Override

public void onNext(String s) {

}

......

});

從上面使用過程的代碼看下面的圖,分析Rxjava封裝任務(wù)和拋任務(wù)到線程池的過程。

上游任務(wù)在線程池執(zhí)行流程圖.png

當(dāng)我們一訂閱(調(diào)用subscribe(Observer)方法)的時(shí)候,Rxjava將會(huì)把上游需要執(zhí)行的任務(wù)和下游的觀察者經(jīng)過層層包裹,包裹好之后,就會(huì)得到一個(gè)Scheduler.Worker任務(wù)對(duì)象。當(dāng)調(diào)用發(fā)射器的onNext的方式的時(shí)候,結(jié)合第一小節(jié)的圖片,ObservableSubscribeOn就會(huì)將任務(wù)拋到線程池執(zhí)行,在子線程中執(zhí)行任務(wù)并且返回,從而完成線程切換功能。

3. Rxjava下游任務(wù)在主線程中執(zhí)行分析

3.1 創(chuàng)建AndroidSchedulers.mainThread的過程

如第一節(jié)Schedulers的創(chuàng)建流程一樣,當(dāng)調(diào)用AndroidSchedulers.mainThread()之后,最終會(huì)創(chuàng)建HandlerScheduler。

// 1.創(chuàng)建HandlerScheduler,并且傳入MainLooper

public final class AndroidSchedulers {

private static final class MainHolder {

// 創(chuàng)建HandlerScheduler

static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));

}

private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(

new Callable() {

@Override public Scheduler call() throws Exception {

return MainHolder.DEFAULT;

}

});

public static Scheduler mainThread() {

return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);

}

}

// 2.當(dāng)創(chuàng)建任務(wù)的時(shí)候,創(chuàng)建HandlerWorker

final class HandlerScheduler extends Scheduler {

private final Handler handler;

HandlerScheduler(Handler handler) {

this.handler = handler;

}

@Override

public Worker createWorker() {

return new HandlerWorker(handler);

}

}

// 3.當(dāng)執(zhí)行任務(wù)的時(shí)候

private static final class HandlerWorker extends Worker {

private final Handler handler;

HandlerWorker(Handler handler) {

this.handler = handler;

}

@Override

public Disposable schedule(Runnable run, long delay, TimeUnit unit) {

......

// 包裝任務(wù)

run = RxJavaPlugins.onSchedule(run);

ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

// 創(chuàng)建Message包裝任務(wù)

Message message = Message.obtain(handler, scheduled);

message.obj = this;

// 發(fā)送任務(wù)到MainLooper中,該任務(wù)就在主線程中執(zhí)行了

handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

......

return scheduled;

}

}

其實(shí)真正將任務(wù)放在主線程中執(zhí)行就是上面三個(gè)步驟,但是Rxjava增加了很多其他功能,例如解除訂閱(將任務(wù)包裝在Disposable中),增加hook功能(在任務(wù)外面在包裝了ScheduledRunnable)等等,其最內(nèi)層的本質(zhì)就是我們需要執(zhí)行的任務(wù)。細(xì)化的包裹情況如下圖:

主線程執(zhí)行任務(wù).png

4.多個(gè)線程切換,以哪個(gè)為準(zhǔn)

如下面代碼,我們作死得切換線程,那么哪些線程會(huì)最終執(zhí)行我們的任務(wù)呢

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter e) throws Exception {

e.onNext("JackOu");

}

})

.subscribeOn(Schedulers.io()) // 上游切換,靠近上游的生效

.subscribeOn(Schedulers.newThread())

.subscribeOn(Schedulers.computation())

.observeOn(Schedulers.io())

.observeOn(Schedulers.computation())

.observeOn(AndroidSchedulers.mainThread()) // 下游切換,靠近下游的生效

.subscribe(new Observer() {

......

@Override

public void onNext(String s) {

}

......

});

我們可以從第二節(jié)和第三節(jié)看出,當(dāng)我們每調(diào)用一次subscribeOn方法,上游就會(huì)多包裝一層Scheduler,在訂閱之后,解包裹的時(shí)候越靠近“待執(zhí)行任務(wù)”的subscribeOn越后解包,所以最靠近任務(wù)的subscribeOn調(diào)用會(huì)是最終被執(zhí)行,也就是最終被執(zhí)行的線程。

因此我們可以總結(jié)得到:

總結(jié)一: 在多次調(diào)用線程切換的時(shí)候,第一次調(diào)用subscribeOn的線程切換會(huì)是最后執(zhí)行任務(wù)的線程;最后調(diào)用observeOn切換的線程會(huì)是最后執(zhí)行的線程。

總結(jié)二:從調(diào)用關(guān)系來看,越靠近上游的線程切換,將是最終執(zhí)行任務(wù)的線程;越靠近下游的線程切換,將是最終執(zhí)行任務(wù)的線程。

總結(jié)

以上是生活随笔為你收集整理的简述JAVA线程调度的原理,Rxjava原理(二)--线程调度的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。