简述JAVA线程调度的原理,Rxjava原理(二)--线程调度
1. 創(chuàng)建線程池和線程管理策略分析
// 在開發(fā)中使用Rxjava來完成線程切換會(huì)調(diào)用到以下方法(還有幾個(gè)就不一一列舉了,原理一樣的),那么就從這里開始分析
Schedulers.io()
Schedulers.computation()
Schedulers.newThread()
AndroidSchedulers.mainThread()
當(dāng)我們調(diào)用以上方法中的任意一個(gè),都會(huì)調(diào)到Schedulers類中,Schedulers使用策略模式封裝了所有線程切換策略(因此后面以io()分析)。
// 1. Schedulers類中,靜態(tài)創(chuàng)建IOTask(),當(dāng)調(diào)用Schedulers.io()的時(shí)候,就是返回這個(gè)Callable.
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
// 2.創(chuàng)建IoScheduler
static final class IOTask implements Callable {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
// 3.創(chuàng)建線程池
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference(NONE);
start();
}
public void start() {
// CachedWorkerPool任務(wù)池,里面持有任務(wù)隊(duì)列和線程池
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}
//4. CachedWorkerPool構(gòu)造方法中創(chuàng)建線程池,并且暴露get()提供需要執(zhí)行的任務(wù)
static final class CachedWorkerPool implements Runnable {
private final long keepAliveTime;
private final ConcurrentLinkedQueue expiringWorkerQueue;
final CompositeDisposable allWorkers;
private final ScheduledExecutorService evictorService;
private final Future> evictorTask;
private final ThreadFactory threadFactory;
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
......
if (unit != null) {
// 創(chuàng)建線程池
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
}
......
}
ThreadWorker get() {
.....
while (!expiringWorkerQueue.isEmpty()) {
// 任務(wù)隊(duì)列不為空,從隊(duì)列中取一個(gè)并返回
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// 如果任務(wù)隊(duì)列是空的,就創(chuàng)建一個(gè)并返回
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
......
}
用一張圖可能說明得比較清楚一些。
Schedulers調(diào)度過程.png
2. Rxjava上游任務(wù)在子線程中執(zhí)行分析
// 上游線程切換使用過程
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
e.onNext("JackOu");
}
})
// ObservableCreate.subscribeOn
.subscribeOn(Schedulers.io())
// ObservableSubscribeOn.subscribe
.subscribe(new Observer() {
......
@Override
public void onNext(String s) {
}
......
});
從上面使用過程的代碼看下面的圖,分析Rxjava封裝任務(wù)和拋任務(wù)到線程池的過程。
上游任務(wù)在線程池執(zhí)行流程圖.png
當(dāng)我們一訂閱(調(diào)用subscribe(Observer)方法)的時(shí)候,Rxjava將會(huì)把上游需要執(zhí)行的任務(wù)和下游的觀察者經(jīng)過層層包裹,包裹好之后,就會(huì)得到一個(gè)Scheduler.Worker任務(wù)對(duì)象。當(dāng)調(diào)用發(fā)射器的onNext的方式的時(shí)候,結(jié)合第一小節(jié)的圖片,ObservableSubscribeOn就會(huì)將任務(wù)拋到線程池執(zhí)行,在子線程中執(zhí)行任務(wù)并且返回,從而完成線程切換功能。
3. Rxjava下游任務(wù)在主線程中執(zhí)行分析
3.1 創(chuàng)建AndroidSchedulers.mainThread的過程
如第一節(jié)Schedulers的創(chuàng)建流程一樣,當(dāng)調(diào)用AndroidSchedulers.mainThread()之后,最終會(huì)創(chuàng)建HandlerScheduler。
// 1.創(chuàng)建HandlerScheduler,并且傳入MainLooper
public final class AndroidSchedulers {
private static final class MainHolder {
// 創(chuàng)建HandlerScheduler
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
}
// 2.當(dāng)創(chuàng)建任務(wù)的時(shí)候,創(chuàng)建HandlerWorker
final class HandlerScheduler extends Scheduler {
private final Handler handler;
HandlerScheduler(Handler handler) {
this.handler = handler;
}
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
}
// 3.當(dāng)執(zhí)行任務(wù)的時(shí)候
private static final class HandlerWorker extends Worker {
private final Handler handler;
HandlerWorker(Handler handler) {
this.handler = handler;
}
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
......
// 包裝任務(wù)
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
// 創(chuàng)建Message包裝任務(wù)
Message message = Message.obtain(handler, scheduled);
message.obj = this;
// 發(fā)送任務(wù)到MainLooper中,該任務(wù)就在主線程中執(zhí)行了
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
......
return scheduled;
}
}
其實(shí)真正將任務(wù)放在主線程中執(zhí)行就是上面三個(gè)步驟,但是Rxjava增加了很多其他功能,例如解除訂閱(將任務(wù)包裝在Disposable中),增加hook功能(在任務(wù)外面在包裝了ScheduledRunnable)等等,其最內(nèi)層的本質(zhì)就是我們需要執(zhí)行的任務(wù)。細(xì)化的包裹情況如下圖:
主線程執(zhí)行任務(wù).png
4.多個(gè)線程切換,以哪個(gè)為準(zhǔn)
如下面代碼,我們作死得切換線程,那么哪些線程會(huì)最終執(zhí)行我們的任務(wù)呢
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
e.onNext("JackOu");
}
})
.subscribeOn(Schedulers.io()) // 上游切換,靠近上游的生效
.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread()) // 下游切換,靠近下游的生效
.subscribe(new Observer() {
......
@Override
public void onNext(String s) {
}
......
});
我們可以從第二節(jié)和第三節(jié)看出,當(dāng)我們每調(diào)用一次subscribeOn方法,上游就會(huì)多包裝一層Scheduler,在訂閱之后,解包裹的時(shí)候越靠近“待執(zhí)行任務(wù)”的subscribeOn越后解包,所以最靠近任務(wù)的subscribeOn調(diào)用會(huì)是最終被執(zhí)行,也就是最終被執(zhí)行的線程。
因此我們可以總結(jié)得到:
總結(jié)一: 在多次調(diào)用線程切換的時(shí)候,第一次調(diào)用subscribeOn的線程切換會(huì)是最后執(zhí)行任務(wù)的線程;最后調(diào)用observeOn切換的線程會(huì)是最后執(zhí)行的線程。
總結(jié)二:從調(diào)用關(guān)系來看,越靠近上游的線程切換,將是最終執(zhí)行任務(wù)的線程;越靠近下游的線程切換,將是最終執(zhí)行任務(wù)的線程。
總結(jié)
以上是生活随笔為你收集整理的简述JAVA线程调度的原理,Rxjava原理(二)--线程调度的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 寒假万恶之源3:抓老鼠啊~亏了还是赚了?
- 下一篇: plsql 查看表空间使用情况