从串行线程封闭到对象池、线程池
今天講一個(gè)牛逼而實(shí)用的概念,串行線程封閉。對(duì)象池是串行線程封閉的典型應(yīng)用場(chǎng)景;線程池糅合了對(duì)象池技術(shù),但核心實(shí)現(xiàn)不依賴于對(duì)象池,很容易產(chǎn)生誤會(huì)。
本文從串行線程封閉和對(duì)象池入手,最后通過(guò)源碼分析線程池的核心原理,厘清對(duì)象池與線程池之間的誤會(huì)。
線程封閉與串行線程封閉
線程封閉
線程封閉是一種常見的線程安全設(shè)計(jì)策略:僅在固定的一個(gè)線程內(nèi)訪問對(duì)象,不對(duì)其他線程共享。
使用線程封閉技術(shù),對(duì)象O始終只對(duì)一個(gè)線程T1可見,“單線程”中自然不存在線程安全的問題。
ThreadLocal是常用的線程安全工具。線程封閉在Servlet及高層的web框架Spring等中應(yīng)用不少。
https://monkeysayhi.github.io/2016/11/27/源碼%7CThreadLocal的實(shí)現(xiàn)原理/
串行線程封閉
線程封閉雖然好用,卻限制了對(duì)象的共享。串行線程封閉改進(jìn)了這一點(diǎn):對(duì)象O只能由單個(gè)線程T1擁有,但可以通過(guò)安全的發(fā)布對(duì)象O來(lái)轉(zhuǎn)移O的所有權(quán);在轉(zhuǎn)移所有權(quán)后,也只有另一個(gè)線程T2能獲得這個(gè)O的所有權(quán),并且發(fā)布O的T1不會(huì)再訪問O。
所謂“所有權(quán)”,指修改對(duì)象的權(quán)利。
相對(duì)于線程封閉,串行線程封閉使得任意時(shí)刻,最多僅有一個(gè)線程擁有對(duì)象的所有權(quán)。當(dāng)然,這不是絕對(duì)的,只要線程T1事實(shí)不會(huì)再修改對(duì)象O,那么就相當(dāng)于僅有T2擁有對(duì)象的所有權(quán)。串行線層封閉讓對(duì)象變得可以共享(雖然只能串行的擁有所有權(quán)),靈活性得到大大提高;相對(duì)的,要共享對(duì)象就涉及安全發(fā)布的問題,依靠BlockingQueue等同步工具很容易實(shí)現(xiàn)這一點(diǎn)。
對(duì)象池是串行線程封閉的經(jīng)典應(yīng)用場(chǎng)景,如數(shù)據(jù)庫(kù)連接池等。
對(duì)象池
對(duì)象池利用了串行封閉:將對(duì)象O“借給”一個(gè)請(qǐng)求線程T1,T1使用完再交還給對(duì)象池,并保證“未擅自發(fā)布該對(duì)象”且“以后不再使用”;對(duì)象池收回O后,等T2來(lái)借的時(shí)候再把它借給T2,完成對(duì)象所有權(quán)的傳遞。
猴子擼了一個(gè)簡(jiǎn)化版的線程池,用戶只需要覆寫newObject()方法:
public abstract class AbstractObjectPool<T> {
? protected final int min;
? protected final int max;
? protected final List<T> usings = new LinkedList<>();
? protected final List<T> buffer = new LinkedList<>();
? private volatile boolean inited = false;
? public AbstractObjectPool(int min, int max) {
? ? this.min = min;
? ? this.max = max;
? ? if (this.min < 0 || this.min > this.max) {
? ? ? throw new IllegalArgumentException(String.format(
? ? ? ? ? "need 0 <= min <= max <= Integer.MAX_VALUE, given min: %s, max: %s", this.min, this.max));
? ? }
? }
? public void init() {
? ? for (int i = 0; i < min; i++) {
? ? ? buffer.add(newObject());
? ? }
? ? inited = true;
? }
? protected void checkInited() {
? ? if (!inited) {
? ? ? throw new IllegalStateException("not inited");
? ? }
? }
? abstract protected T newObject();
? public synchronized T getObject() {
? ? checkInited();
? ? if (usings.size() == max) {
? ? ? return null;
? ? }
? ? if (buffer.size() == 0) {
? ? ? T newObj = newObject();
? ? ? usings.add(newObj);
? ? ? return newObj;
? ? }
? ? T oldObj = buffer.remove(0);
? ? usings.add(oldObj);
? ? return oldObj;
? }
? public synchronized void freeObject(T obj) {
? ? checkInited();
? ? if (!usings.contains(obj)) {
? ? ? throw new IllegalArgumentException(String.format("obj not in using queue: %s", obj));
? ? }
? ? usings.remove(usings.indexOf(obj));
? ? buffer.add(obj);
? }
}
AbstractObjectPool具有以下特性:
支持設(shè)置最小、最大容量
對(duì)象一旦申請(qǐng)就不再釋放,避免了GC
雖然很簡(jiǎn)單,但大可以用于一些時(shí)間敏感、資源充裕的場(chǎng)景。如果時(shí)間進(jìn)一步敏感,可將getObject()、freeObject()改寫為并發(fā)程度更高的版本,但記得保證安全發(fā)布安全回收;如果資源不那么充裕,可以適當(dāng)增加對(duì)象回收策略。
可以看到,一個(gè)對(duì)象池的基本行為包括:
創(chuàng)建對(duì)象newObject()
借取對(duì)象getObject()
歸還對(duì)象freeObject()
典型的對(duì)象池有各種連接池、常量池等,應(yīng)用非常多,模型也大同小異,不做解析。令人迷惑的是線程池,很容易讓人誤以為線程池的核心原理也是對(duì)象池,下面來(lái)追一遍源碼。
線程池
首先擺出結(jié)論:線程池糅合了對(duì)象池模型,但核心原理是生產(chǎn)者-消費(fèi)者模型。
繼承結(jié)構(gòu)如下:
用戶可以將Runnable(或Callables)實(shí)例提交給線程池,線程池會(huì)異步執(zhí)行該任務(wù),返回響應(yīng)的結(jié)果(完成/返回值)。
猴子最喜歡的是submit(Callable<T> task)方法。我們從該方法入手,逐步深入函數(shù)棧,探究線程池的實(shí)現(xiàn)原理。
submit()
submit()方法在ExecutorService接口中定義,AbstractExecutorService實(shí)現(xiàn),ThreadPoolExecutor直接繼承。
public abstract class AbstractExecutorService implements ExecutorService {
...
? ? public <T> Future<T> submit(Callable<T> task) {
? ? ? ? if (task == null) throw new NullPointerException();
? ? ? ? RunnableFuture<T> ftask = newTaskFor(task);
? ? ? ? execute(ftask);
? ? ? ? return ftask;
? ? }
...
AbstractExecutorService#newTaskFor()創(chuàng)建一個(gè)RunnableFuture類型的FutureTask。
核心是execute()方法。
execute()
execute()方法在Executor接口中定義,ThreadPoolExecutor實(shí)現(xiàn)。
public class ThreadPoolExecutor extends AbstractExecutorService {
...
? ? public void execute(Runnable command) {
? ? ? ? if (command == null)
? ? ? ? ? ? throw new NullPointerException();
? ? ? ? int c = ctl.get();
? ? ? ? if (workerCountOf(c) < corePoolSize) {
? ? ? ? ? ? if (addWorker(command, true))
? ? ? ? ? ? ? ? return;
? ? ? ? ? ? c = ctl.get();
? ? ? ? }
? ? ? ? if (isRunning(c) && workQueue.offer(command)) {
? ? ? ? ? ? int recheck = ctl.get();
? ? ? ? ? ? if (! isRunning(recheck) && remove(command))
? ? ? ? ? ? ? ? reject(command);
? ? ? ? ? ? else if (workerCountOf(recheck) == 0)
? ? ? ? ? ? ? ? addWorker(null, false);
? ? ? ? }
? ? ? ? else if (!addWorker(command, false))
? ? ? ? ? ? reject(command);
? ? }
...
}
我們暫且忽略線程池的池化策略。關(guān)注一個(gè)最簡(jiǎn)單的場(chǎng)景,看能不能先回答一個(gè)問題:線程池中的任務(wù)如何執(zhí)行?
核心是addWorker()方法。以8行的參數(shù)為例,此時(shí),線程池中的線程數(shù)未達(dá)到最小線程池大小corePoolSize,通常可以直接在9行返回。
addWorker()
簡(jiǎn)化如下:
public class ThreadPoolExecutor extends AbstractExecutorService {
...
? ? private boolean addWorker(Runnable firstTask, boolean core) {
? ? ? ? boolean workerStarted = false;
? ? ? ? boolean workerAdded = false;
? ? ? ? Worker w = null;
? ? ? ? try {
? ? ? ? ? ? w = new Worker(firstTask);
? ? ? ? ? ? final Thread t = w.thread;
? ? ? ? ? ? if (t != null) {
? ? ? ? ? ? ? ? final ReentrantLock mainLock = this.mainLock;
? ? ? ? ? ? ? ? mainLock.lock();
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? int rs = runStateOf(ctl.get());
? ? ? ? ? ? ? ? ? ? if (rs < SHUTDOWN) {
? ? ? ? ? ? ? ? ? ? ? ? workers.add(w);
? ? ? ? ? ? ? ? ? ? ? ? workerAdded = true;
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? ? ? mainLock.unlock();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? if (workerAdded) {
? ? ? ? ? ? ? ? ? ? t.start();
? ? ? ? ? ? ? ? ? ? workerStarted = true;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? } finally {
? ? ? ? ? ? if (! workerStarted)
? ? ? ? ? ? ? ? addWorkerFailed(w);
? ? ? ? }
? ? ? ? return workerStarted;
? ? }
...
}
我去掉了很多用于管理線程池、維護(hù)線程安全的代碼。假設(shè)線程池未關(guān)閉,worker(即w,下同)添加成功,則必然能夠?qū)orker添加至workers中。workers是一個(gè)HashSet:
private final HashSet<Worker> workers = new HashSet<Worker>();
哪里是對(duì)象池?
如果說(shuō)與對(duì)象池有關(guān),那么workers即相當(dāng)于示例代碼中的using,應(yīng)用了對(duì)象池模型;只不過(guò)這里的using是一直增長(zhǎng)的,直到達(dá)到最大線程池大小maximumPoolSize。
但是很明顯,線程池并沒有將線程發(fā)布出去,workers也僅僅完成using“保存線程”的功能。那么,線程池中的任務(wù)如何執(zhí)行呢?跟線程池有沒有關(guān)系?
哪里又不是?
注意9、17、24行:
9行將我們提交到線程池的firstTask封裝入一個(gè)worker。
17行將worker加入workers,維護(hù)起來(lái)
24行則啟動(dòng)了worker中的線程t
核心在與這三行,但線程池并沒有直接在addWorker()中啟動(dòng)任務(wù)firstTask,代之以啟動(dòng)一個(gè)worker。最終任務(wù)必然被啟動(dòng),那么我們繼續(xù)看Worker如何啟動(dòng)這個(gè)任務(wù)。
Worker
Worker實(shí)現(xiàn)了Runnable接口:
private final class Worker
? ? extends AbstractQueuedSynchronizer
? ? implements Runnable
{
...
? ? Worker(Runnable firstTask) {
? ? ? ? setState(-1); // inhibit interrupts until runWorker
? ? ? ? this.firstTask = firstTask;
? ? ? ? this.thread = getThreadFactory().newThread(this);
? ? }
? ? /** Delegates main run loop to outer runWorker? */
? ? public void run() {
? ? ? ? runWorker(this);
? ? }
...
}
為什么要將構(gòu)造Worker時(shí)的參數(shù)命名為firstTask?因?yàn)楫?dāng)且僅當(dāng)需要建立新的Worker以執(zhí)行任務(wù)task時(shí),才會(huì)調(diào)用構(gòu)造函數(shù)。因此,任務(wù)task對(duì)于新Worker而言,是第一個(gè)任務(wù)firstTask。
Worker的實(shí)現(xiàn)非常簡(jiǎn)單:將自己作為Runable實(shí)例,構(gòu)造時(shí)在內(nèi)部創(chuàng)建并持有一個(gè)線程thread。Thread和Runable的使用大家很熟悉了,核心是Worker的run方法,它直接調(diào)用了runWorker()方法。
runWorker()
敲黑板!!!
重頭戲來(lái)了。簡(jiǎn)化如下:
public class ThreadPoolExecutor extends AbstractExecutorService {
...
? ? final void runWorker(Worker w) {
? ? ? ? Thread wt = Thread.currentThread();
? ? ? ? Runnable task = w.firstTask;
? ? ? ? w.firstTask = null;
? ? ? ? w.unlock(); // allow interrupts
? ? ? ? boolean completedAbruptly = true;
? ? ? ? try {
? ? ? ? ? ? while (task != null || (task = getTask()) != null) {
? ? ? ? ? ? ? ? w.lock();
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? beforeExecute(wt, task);
? ? ? ? ? ? ? ? ? ? Throwable thrown = null;
? ? ? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? ? ? task.run();
? ? ? ? ? ? ? ? ? ? } catch (RuntimeException x) {
? ? ? ? ? ? ? ? ? ? ? ? thrown = x; throw x;
? ? ? ? ? ? ? ? ? ? } catch (Error x) {
? ? ? ? ? ? ? ? ? ? ? ? thrown = x; throw x;
? ? ? ? ? ? ? ? ? ? } catch (Throwable x) {
? ? ? ? ? ? ? ? ? ? ? ? thrown = x; throw new Error(x);
? ? ? ? ? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? ? ? ? ? afterExecute(task, thrown);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? ? ? task = null;
? ? ? ? ? ? ? ? ? ? w.completedTasks++;
? ? ? ? ? ? ? ? ? ? w.unlock();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? completedAbruptly = false;
? ? ? ? } finally {
? ? ? ? ? ? processWorkerExit(w, completedAbruptly);
? ? ? ? }
? ? }
...
}
我們?cè)谇懊鎸⒁獔?zhí)行的任務(wù)賦值給firstTask,5-6行首先取出任務(wù)task,并將firstTask置為null。因?yàn)榻酉聛?lái)要執(zhí)行task,firstTask字段就沒有用了。
重點(diǎn)是10-31行的while循環(huán)。下面分情況討論。
case1:第一次進(jìn)入循環(huán),task不為null
case1對(duì)應(yīng)前面作出的諸多假設(shè)。
第一次進(jìn)入循環(huán)時(shí),task==firstTask,不為null,使10行布爾短路直接進(jìn)入循環(huán);從而16行執(zhí)行的是firstTask的run()方法;異常處理不表;最后,finally代碼塊中,task會(huì)被置為null,導(dǎo)致下一輪循環(huán)會(huì)進(jìn)入case2。
case2:非第一次進(jìn)入循環(huán),task為null
case2是更普遍的情況,也就是線程池的核心。
case1中,task被置為了null,使10行布爾表達(dá)式執(zhí)行第二部分(task = getTask()) != null(getTask()稍后再講,它返回一個(gè)用戶已提交的任務(wù))。假設(shè)task得到了一個(gè)已提交的任務(wù),從而16行執(zhí)行的是新獲得的任務(wù)task的run()方法。后同case1,最后task仍然會(huì)被置為null,以后循環(huán)都將進(jìn)入case2。
GETTASK()
任務(wù)從哪來(lái)呢?簡(jiǎn)化如下:
public class ThreadPoolExecutor extends AbstractExecutorService {
...
? ? private Runnable getTask() {
? ? ? ? boolean timedOut = false;
? ? ? ? for (;;) {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? Runnable r = timed ?
? ? ? ? ? ? ? ? ? ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
? ? ? ? ? ? ? ? ? ? workQueue.take();
? ? ? ? ? ? ? ? if (r != null)
? ? ? ? ? ? ? ? ? ? return r;
? ? ? ? ? ? ? ? timedOut = true;
? ? ? ? ? ? } catch (InterruptedException retry) {
? ? ? ? ? ? ? ? timedOut = false;
? ? ? ? ? ? }
? ? ? ? }
? ? }
...
}
我們先看最簡(jiǎn)單的,19-28行。
首先,workQueue是一個(gè)線程安全的BlockingQueue,大部分時(shí)候使用的實(shí)現(xiàn)類是LinkedBlockingQueue:
private final BlockingQueue<Runnable> workQueue;
假設(shè)timed為false,則調(diào)用阻塞的take()方法,返回的r一定不是null,從而12行退出,將任務(wù)交給了某個(gè)worker線程。
一個(gè)小細(xì)節(jié)有點(diǎn)意思:前面每個(gè)worker線程runWorker()方法時(shí),在循環(huán)中加鎖粒度在worker級(jí)別,直接使用的lock同步;但因?yàn)槊恳粋€(gè)woker都會(huì)調(diào)用getTask(),考慮到性能因素,源碼中g(shù)etTask()中使用樂觀的CAS+SPIN實(shí)現(xiàn)無(wú)鎖同步。
關(guān)于樂觀鎖和CAS,可以參考https://monkeysayhi.github.io/2017/10/22/源碼%7C并發(fā)一枝花之ConcurrentLinkedQueue【偽】/
workQueue中的元素從哪來(lái)呢?這就要回顧execute()方法了。
EXECUTE()
public class ThreadPoolExecutor extends AbstractExecutorService {
...
? ? public void execute(Runnable command) {
? ? ? ? if (command == null)
? ? ? ? ? ? throw new NullPointerException();
? ? ? ? int c = ctl.get();
? ? ? ? if (workerCountOf(c) < corePoolSize) {
? ? ? ? ? ? if (addWorker(command, true))
? ? ? ? ? ? ? ? return;
? ? ? ? ? ? c = ctl.get();
? ? ? ? }
? ? ? ? if (isRunning(c) && workQueue.offer(command)) {
? ? ? ? ? ? int recheck = ctl.get();
? ? ? ? ? ? if (! isRunning(recheck) && remove(command))
? ? ? ? ? ? ? ? reject(command);
? ? ? ? ? ? else if (workerCountOf(recheck) == 0)
? ? ? ? ? ? ? ? addWorker(null, false);
? ? ? ? }
? ? ? ? else if (!addWorker(command, false))
? ? ? ? ? ? reject(command);
? ? }
...
}
前面以8行的參數(shù)為例,此時(shí),線程池中的線程數(shù)未達(dá)到最小線程池大小corePoolSize,通常可以直接在9行返回。進(jìn)入8行的條件是“當(dāng)前worker數(shù)小于最小線程池大小corePoolSize”。
如果不滿足,會(huì)繼續(xù)執(zhí)行到12行。isRunning(c)判斷線程池是否未關(guān)閉,我們關(guān)注未關(guān)閉的情況;則會(huì)繼續(xù)執(zhí)行布爾表達(dá)式的第二部分workQueue.offer(command),嘗試將任務(wù)command放入隊(duì)列workQueue。
workQueue.offer()的行為取決于線程池持有的BlockingQueue實(shí)例。
Executors.newFixedThreadPool()、Executors.newSingleThreadExecutor()創(chuàng)建的線程池使用LinkedBlockingQueue,而Executors.newCachedThreadPool()創(chuàng)建的線程池則使用SynchronousQueue。
以LinkedBlockingQueue為例,創(chuàng)建時(shí)不配置容量,即創(chuàng)建為無(wú)界隊(duì)列,則LinkedBlockingQueue#offer()永遠(yuǎn)返回true,從而進(jìn)入12-18行。
更細(xì)節(jié)的內(nèi)容不必關(guān)心了,當(dāng)workQueue.offer()返回true時(shí),已經(jīng)將任務(wù)command放入了隊(duì)列workQueue。當(dāng)未來(lái)的某個(gè)時(shí)刻,某worker執(zhí)行完某一個(gè)任務(wù)之后,會(huì)從workQueue中再取出一個(gè)任務(wù)繼續(xù)執(zhí)行,直到線程池關(guān)閉,直到海枯石爛。
CachedThreadPool是一種無(wú)界線程池,使用SynchronousQueue能進(jìn)一步提升性能,簡(jiǎn)化代碼結(jié)構(gòu)。留給讀者分析。
CASE2小結(jié)
可以看到,實(shí)際上,線程池的核心原理與對(duì)象池模型無(wú)關(guān),而是生產(chǎn)者-消費(fèi)者模型:
-
生產(chǎn)者(調(diào)用submit()或execute()方法)將任務(wù)task放入隊(duì)列
-
消費(fèi)者(worker線程)循環(huán)從隊(duì)列中取出任務(wù)處理任務(wù)(執(zhí)行task.run())
鉤子方法
回到runWorker()方法,在執(zhí)行任務(wù)的過(guò)程中,線程池保留了一些鉤子方法,如beforeExecute()、afterExecute()。用戶可以在實(shí)現(xiàn)自己的線程池時(shí),可以通過(guò)覆寫鉤子方法為線程池添加功能。
但猴子不認(rèn)為鉤子方法是一種好的設(shè)計(jì)。因?yàn)殂^子方法大多依賴于源碼實(shí)現(xiàn),那么除非了解源碼或API聲明絕對(duì)的嚴(yán)謹(jǐn)正確,否則很難正確使用鉤子方法。等發(fā)生錯(cuò)誤時(shí)再去了解實(shí)現(xiàn),可能就太晚了。說(shuō)到底,還是不要使用類似extends這種表達(dá)“擴(kuò)展”語(yǔ)義的語(yǔ)法來(lái)實(shí)現(xiàn)繼承,詳見Java中如何恰當(dāng)?shù)谋磉_(dá)“繼承”與“擴(kuò)展”的語(yǔ)義?。
當(dāng)然,鉤子方法也是極其方便的。權(quán)衡看待。
總結(jié)
相對(duì)于線程封閉,串行線程封閉離用戶的距離更近一些,簡(jiǎn)單靈活,實(shí)用性強(qiáng),很容易掌握。而線程封閉更多淪為單純的設(shè)計(jì)策略,單純使用線程封閉的場(chǎng)景不多。
線程池與串行線程封閉、對(duì)象池的關(guān)系不大,但經(jīng)常被混為一談;沒看過(guò)源碼的很難想到其實(shí)現(xiàn)方案,面試時(shí)也能立分高下。
線程池的實(shí)現(xiàn)很有意思。在追源碼之前,猴子一直以為線程池就是把線程存起來(lái),用的時(shí)候取出來(lái)執(zhí)行任務(wù);看了源碼才知道實(shí)現(xiàn)如此之妙,簡(jiǎn)潔優(yōu)雅效率高。
源碼才是最好的老師。
來(lái)源:ImportNew
總結(jié)
以上是生活随笔為你收集整理的从串行线程封闭到对象池、线程池的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 用画小狗的方法来解释Java中的值传递
- 下一篇: 这本造价500万的“黑科技”日历,用37