java 线程池 源码_java线程池源码分析
我們?cè)陉P(guān)閉線程池的時(shí)候會(huì)使用shutdown()和shutdownNow(),那么問題來了:
這兩個(gè)方法又什么區(qū)別呢?
他們背后的原理是什么呢?
線程池中線程超過了coresize后會(huì)怎么操作呢?
為了解決這些疑問我們需要分析java線程池的原理。
1 基本使用
1.1 繼承關(guān)系
平常我們?cè)趧?chuàng)建線程池經(jīng)常使用的方式如下:
ExecutorService executorService = Executors.newFixedThreadPool(5);
看下newFixedThreadPool源碼, 其實(shí)Executors是個(gè)工廠類,內(nèi)部是new了一個(gè)ThreadPoolExecuto:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
參數(shù)的意義就不介紹了,網(wǎng)上有很多內(nèi)容,看源碼注釋也可以明白。
線程池中類的繼承關(guān)系如下:
2 源碼分析
2.1 入口
將一個(gè)Runnable放到線程池執(zhí)行有兩種方式,一個(gè)是調(diào)用ThreadPoolExecutor#submit,一個(gè)是調(diào)用ThreadPoolExecutor#execute。其實(shí)submit是將Runnable封裝成了一個(gè)RunnableFuture,然后再調(diào)用execute,最終調(diào)用的還是execute,所以我們這里就只從ThreadPoolExecutor#execute開始分析。
2.2 ctl和線程池狀態(tài)
ThreadPoolExecutor中有個(gè)重要的屬性是ctl
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 高3位表示狀態(tài),低29位表示線程池中線程的多少
private static final int COUNT_BITS = Integer.SIZE - 3; // 32-3 = 29
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 左移29為減1,即最終得到為高3位為0,低29位為1的數(shù)字,作為掩碼,是二進(jìn)制運(yùn)算中常用的方法
private static final int RUNNING = -1 << COUNT_BITS; // 高三位111
private static final int SHUTDOWN = 0 << COUNT_BITS; // 高三位000
private static final int STOP = 1 << COUNT_BITS; // 高三位001
private static final int TIDYING = 2 << COUNT_BITS; // 高三位010
private static final int TERMINATED = 3 << COUNT_BITS; // 高三位011
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; } // 保留高3位,即計(jì)算線程池狀態(tài)
private static int workerCountOf(int c) { return c & CAPACITY; } // 保留低29位, 即計(jì)算線程數(shù)量
private static int ctlOf(int rs, int wc) { return rs | wc; } // 求ctl
ThreadPoolExecutor中使用32位Integer來表示線程池的狀態(tài)和線程的數(shù)量,其中高3位表示狀態(tài),低29位表示數(shù)量。如果對(duì)二進(jìn)制運(yùn)行不熟悉可以參考:二進(jìn)制運(yùn)算。從上也可以看出線程池有五種狀態(tài),我們關(guān)心前3中狀態(tài)
RUNNING 接收task和處理queue中的task
SHUTDOWN 不再接收新的task,但是會(huì)處理完正在運(yùn)行的task和queue中的task,不會(huì)interrupt正在執(zhí)行的task,其實(shí)調(diào)用shutdown后線程池處于該狀態(tài)
STOP 不再接收新的task,也不處理queue中的task,同時(shí)正在運(yùn)行的線程會(huì)被interrupt。調(diào)用shutdownNow后線程池會(huì)處于該狀態(tài)。
2.3 execute
明白了ctl和線程池的狀態(tài)后我們來具體看下execute的處理邏輯
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // 線程數(shù)量小于coresize,那么就調(diào)用addWorker
if (addWorker(command, true)) // 這里知道,返回true就不往下走了
return;
c = ctl.get();
}
// 不滿足上述條件,即線程數(shù)量 >= coreSize,或者addWorker返回fasle,那么走下面的邏輯
if (isRunning(c) && workQueue.offer(command)) { // 可以看到是往blockingqueue中放task
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果不滿足上述條件,即blockingqueue也放不進(jìn)去,那么就走下面的邏輯
else if (!addWorker(command, false))
reject(command);
}
從上面的代碼我們可以看到線程池處理線程的基本思路是: 如果線程數(shù)量小于coresize那么就執(zhí)行task,否則就放到queue中,如果queue也放不下就走下面addWorker,如果也失敗了,那么就調(diào)用reject策略。當(dāng)然還涉及一些細(xì)節(jié),需要進(jìn)一步分析。
2.4 addWorker
execute中反復(fù)調(diào)用的是addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); // 計(jì)算線程池狀態(tài)
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && // 先忽略
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c); // 線程數(shù)量
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) // 可見如果超過了運(yùn)行的最大線程數(shù)量則返回false
return false;
if (compareAndIncrementWorkerCount(c)) // 如果成功,線程數(shù)量肯定加1
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask); // 將task封裝成了Worker
final Thread t = w.thread; // 來獲取worker的thread
if (t != null) {
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); // 將worker添加到hashset中報(bào)存,關(guān)閉的時(shí)候要使用
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) { // 經(jīng)過一些檢查, 啟動(dòng)了work的thread
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w); // 如果線程啟動(dòng)失敗,則將線程數(shù)減1
}
return workerStarted;
}
上面的代碼看起來比較復(fù)雜,但是如果我們忽略具體的細(xì)節(jié),從大致思路上看,其實(shí)也比較簡單。上面代碼的主要思路就是:除了一些狀態(tài)檢查外,首先將線程數(shù)量加1,然后將runnable分裝成一個(gè)worker,去啟動(dòng)worker線程,如果啟動(dòng)失敗則再將線程數(shù)量減1。返回false的原因可能是線程數(shù)量大于允許的數(shù)量。所以addWorker調(diào)用成功,則會(huì)啟動(dòng)一個(gè)work線程,且線程池中線程數(shù)量加1
2.5 worker
woker是線程池中真正的線程實(shí)體。線程池中的線程不是自定義的Runnable實(shí)現(xiàn)的線程,而是woker線程,worker在run方法里調(diào)用了自定義的Runnable的run方法。
Worker繼承了AQS,并實(shí)現(xiàn)了runnable接口:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // 這個(gè)時(shí)候回頭看看addWorker中t.start(), 就明白了啟動(dòng)的實(shí)際是一個(gè)Woker線程,而不是用戶定義的Runnable
}
public void run() {
runWorker(this);
}
}
Worker中firstTask存儲(chǔ)了用戶定義的Runnable,thread是以他自身為參數(shù)的Thread對(duì)象。getThreadFactory()默認(rèn)返回是Executors#DefaultThreadFactory,用來新建線程,并定義了線程名稱的前綴等:
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-"; //
}
public Thread newThread(Runnable r) { // 調(diào)用后新建一個(gè)線程
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
2.6 runWoker
Worker的run方法調(diào)用了runWorker,并將自身作為參數(shù)傳了進(jìn)去,下面看看問題的關(guān)鍵:runWorker:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { // 注意這里的while循環(huán),這里很關(guān)鍵。這里注意,如果兩個(gè)條件都滿足了,那么線程就結(jié)束了
w.lock(); // 注意worker繼承了AQS,相當(dāng)于自己實(shí)現(xiàn)了鎖,這個(gè)在關(guān)閉線程的時(shí)候有用
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); // 僅僅是回調(diào)了Runnable的run方法
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null; // 重點(diǎn),task執(zhí)行完后就被置位null
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly); // 注意while循環(huán)結(jié)束后worker線程就結(jié)束了
}
}
runWorker中有個(gè)while循環(huán),while中判斷條件為(task != null || (task = getTask()) != null)。假設(shè)我們按照正常的邏輯,即task != null,則會(huì)調(diào)用task.run方法,執(zhí)行完run方法后然后在finally中task被置為null;接著又進(jìn)入while循環(huán)判斷,這次task == null,所以不符合第一個(gè)判斷條件,則會(huì)繼續(xù)判斷 task == getTask()) != null。我們來看下getTask做了什么。
2.7 getTask
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 當(dāng)調(diào)用shutdown()方法的時(shí)候,線程狀態(tài)就為shutdown了; 當(dāng)調(diào)用shutdownow()的時(shí)候,線程狀態(tài)就為stop了
decrementWorkerCount();
return null;
}
boolean timed; // Are workers subject to culling?
for (;;) { // 通過死循環(huán)設(shè)置狀態(tài)
int wc = workerCountOf(c);
// 設(shè)置允許core線程timeout或者線程數(shù)量大于coresize,則允許線程超時(shí)
timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果線程數(shù)量 <= 最大線程數(shù) 且 沒有超時(shí)和允許超時(shí) 則跳出死循環(huán)
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
try {
// 這里是關(guān)鍵,如果允許超時(shí)則調(diào)用poll從queue中取出task,否則就調(diào)用take可阻塞的獲取task
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null) // 獲取到task則返回,然后runWorker的while循環(huán)就繼續(xù)執(zhí)行,并調(diào)用task的run方法
return r;
timedOut = true; // 否則設(shè)置為timeOut,繼續(xù)循環(huán),但是下次循環(huán)會(huì)走到if (compareAndDecrementWorkerCount(c)) 處,并返回null。
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
忽略掉具體細(xì)節(jié),getTask的整體思路是: 從blockqueue中拿去task,如果queue中沒有task則分兩種情況:
如果允許超時(shí)則調(diào)用poll(keepAliveTime, TimeUnit.NANOSECONDS),在規(guī)定時(shí)間沒有返回了則getTask返回null,runWorker結(jié)束while循環(huán),work線程結(jié)束。當(dāng)線程數(shù)量大于coresize且blockqueue滿的時(shí)候且小于maxsize的時(shí)候,新創(chuàng)建的線程便是走這個(gè)邏輯;或者允許core線程超時(shí)的時(shí)候也是走這個(gè)邏輯
如果不允許超時(shí),則會(huì)一直阻塞直到blockqueue中有了新的task。take方法阻塞則表示worker線程也阻塞,也就是在沒有task執(zhí)行的情況下,worker線程便會(huì)阻塞等待。core線程走的就是這個(gè)邏輯。
這個(gè)時(shí)候回頭再看下runWorker,如果task != null,那么就會(huì)執(zhí)行task的run方法,執(zhí)行完后task就會(huì)為被置為null,再次進(jìn)入while循環(huán)執(zhí)行g(shù)etTask阻塞在這里了。通過這種方式保留住了線程。如果while循環(huán)結(jié)束了,那么worker線程也就結(jié)束了。
2.8 再看addWorker
分析到這里我們?cè)賮砜聪耡ddWoker。addWorker可以將第一個(gè)參數(shù)設(shè)置為null。例如ThreadPoolExecutor#prestartAllCoreThreads:
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true)) // addWorker第一個(gè)參數(shù)是null
++n;
return n;
}
經(jīng)過前面的分析,我們知道addWoker用來啟動(dòng)一個(gè)worker線程,worker線程調(diào)用runWorker來執(zhí)行,而runWorker中有個(gè)while循環(huán),判斷條件是(task != null || (task = getTask()) != null)。因?yàn)槲覀儌魅氲膖ask為null,所以就會(huì)判斷task = getTask()) != null,而getTask就是去blockqueue中拿去數(shù)據(jù),如果沒有任務(wù)就會(huì)阻塞住。這個(gè)時(shí)候就是一個(gè)阻塞的線程在等待task的到來了。所以傳入?yún)?shù)為null表示創(chuàng)建一個(gè)空的線程,什么都不執(zhí)行。
2.9 再看execute
已經(jīng)知道了線程池內(nèi)部的大概工作情況,我們?cè)賮砜聪氯绻衏ore線程都創(chuàng)建好了且處于空置狀態(tài),這個(gè)時(shí)候新放入一個(gè)線程的執(zhí)行流程。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // core線程都創(chuàng)建好了,所以判斷條件不滿足
if (addWorker(command, true))
return;
c = ctl.get();
}
// 會(huì)走到這里,會(huì)通過offer往blockingqueue里放置一個(gè)task。這個(gè)時(shí)候阻塞的core線程會(huì)通過blockingqueue的take拿到task執(zhí)行,類似一個(gè)生產(chǎn)者消費(fèi)者的情況
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果blockingqueue添加失敗,則創(chuàng)建線程直到maxsize
else if (!addWorker(command, false))
reject(command);
}
可見,線程和execute通過blockingqueue來通信,而不是其他方式,execute往blockingqueue中放置task,線程通過take來獲取。整體線程池的邏輯如下圖
2.10 shutdown
這個(gè)時(shí)候我們終于可以來看看shutdown和shutdownNow了
看下shutdown
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN); // 重點(diǎn),將線程狀態(tài)置為shutdown,這樣getTask等workqueue為空后就返回null了
interruptIdleWorkers(); // 重點(diǎn)
onShutdown(); // 什么都沒做
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 線程沒有中斷 且 獲取到worker的鎖
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt(); // 調(diào)用interrup,中斷線程
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
shutdown的核心方法在interruptIdleWorkers里,這里可以看到在t.interrupt的時(shí)候有個(gè)判斷添加,一個(gè)是線程沒有設(shè)置中斷標(biāo)記,第二個(gè)是獲取到worker的鎖,我們注意下第二個(gè)條件?;仡^看下runWorker,while中執(zhí)行task的run方法的時(shí)候,會(huì)先獲取到worker線程的鎖,所以如果線程正在執(zhí)行task的run方法,則shutdown的時(shí)候會(huì)獲取鎖失敗,也就不會(huì)中斷線程了。這里可以得出結(jié)論:shutdown不會(huì)中斷正在執(zhí)行的線程。
如果blockingqueu中有task還沒執(zhí)行完呢? 這個(gè)時(shí)候while中的take并不會(huì)阻塞,也不會(huì)被中斷,shutdown中也沒有清空blockingqueue的操作。所以可以得出結(jié)論:shutdown會(huì)等blockingqueue中的task執(zhí)行完成再關(guān)閉??梢哉fshutdown是一種比較溫柔的關(guān)閉方式了。
如果core線程都阻塞在take方法上了,即沒有正在執(zhí)行的task了,那么這個(gè)時(shí)候 t.interrupt則會(huì)中斷take方法,worker線程的while循環(huán)結(jié)束,worker線程結(jié)束。當(dāng)所有的worker線程都結(jié)束后線程池就關(guān)閉了
總結(jié)下就是: shutdown會(huì)把它被調(diào)用前放到線程池中的task全部執(zhí)行完。
2.11 shutdownNow
再來看下shutdownNow
public List shutdownNow() {
List tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP); // 重點(diǎn),將線程狀態(tài)置為stop
interruptWorkers(); // 重點(diǎn)
tasks = drainQueue(); // 重點(diǎn)
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { // 沒有去獲取woker的鎖
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
private List drainQueue() {
BlockingQueue q = workQueue;
List taskList = new ArrayList();
q.drainTo(taskList); // 將blockingqueue中的task清空
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
從上面的代碼可以看出:
shutdownNow不會(huì)去獲取worker的鎖,所以shutdownNow會(huì)導(dǎo)致正在運(yùn)行的task也被中斷
shutdownNow會(huì)將blockingqueue中的task清空,所以在blockingqueue中的task也不會(huì)被執(zhí)行
總結(jié)就是shutdownNow比較粗暴,調(diào)用他后,他會(huì)將所有之前提交的任務(wù)都interrupt,且將blockingqueue中的task清空
另外就是不論是shutdown還是shutdownNow都是調(diào)用Thread的interrupt()方法。如果task不響應(yīng)中斷或者忽略中斷標(biāo)記,那么這個(gè)線程就不會(huì)被終止。例如在run中執(zhí)行以下邏輯
poolExecutor.execute(new Runnable() {
@Override
public void run() {
while (true) {
System.out.println("b");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.printf("不處理"); // 忽略中斷
}
}
}
});
運(yùn)行結(jié)果是,即使調(diào)用了shutdownNow也終止不了線程運(yùn)行
b
0
不處理b
b
b
b
b
....
3 總結(jié)
線程通過while循環(huán)不停的從blockingqueue中獲取task來保留線程,避免重復(fù)重建線程
4 參考
總結(jié)
以上是生活随笔為你收集整理的java 线程池 源码_java线程池源码分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python朋友圈表白_情人节「告白生成
- 下一篇: java的8中数据类型_java 8种基