日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 >

java 线程池 源码_java线程池源码分析

發(fā)布時(shí)間:2025/3/21 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java 线程池 源码_java线程池源码分析 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

我們?cè)陉P(guān)閉線程池的時(shí)候會(huì)使用shutdown()和shutdownNow(),那么問題來了:

這兩個(gè)方法又什么區(qū)別呢?

他們背后的原理是什么呢?

線程池中線程超過了coresize后會(huì)怎么操作呢?

為了解決這些疑問我們需要分析java線程池的原理。

1 基本使用

1.1 繼承關(guān)系

平常我們?cè)趧?chuàng)建線程池經(jīng)常使用的方式如下:

ExecutorService executorService = Executors.newFixedThreadPool(5);

看下newFixedThreadPool源碼, 其實(shí)Executors是個(gè)工廠類,內(nèi)部是new了一個(gè)ThreadPoolExecuto:

public static ExecutorService newFixedThreadPool(int nThreads) {

return new ThreadPoolExecutor(nThreads, nThreads,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue());

}

參數(shù)的意義就不介紹了,網(wǎng)上有很多內(nèi)容,看源碼注釋也可以明白。

線程池中類的繼承關(guān)系如下:

2 源碼分析

2.1 入口

將一個(gè)Runnable放到線程池執(zhí)行有兩種方式,一個(gè)是調(diào)用ThreadPoolExecutor#submit,一個(gè)是調(diào)用ThreadPoolExecutor#execute。其實(shí)submit是將Runnable封裝成了一個(gè)RunnableFuture,然后再調(diào)用execute,最終調(diào)用的還是execute,所以我們這里就只從ThreadPoolExecutor#execute開始分析。

2.2 ctl和線程池狀態(tài)

ThreadPoolExecutor中有個(gè)重要的屬性是ctl

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 高3位表示狀態(tài),低29位表示線程池中線程的多少

private static final int COUNT_BITS = Integer.SIZE - 3; // 32-3 = 29

private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 左移29為減1,即最終得到為高3位為0,低29位為1的數(shù)字,作為掩碼,是二進(jìn)制運(yùn)算中常用的方法

private static final int RUNNING = -1 << COUNT_BITS; // 高三位111

private static final int SHUTDOWN = 0 << COUNT_BITS; // 高三位000

private static final int STOP = 1 << COUNT_BITS; // 高三位001

private static final int TIDYING = 2 << COUNT_BITS; // 高三位010

private static final int TERMINATED = 3 << COUNT_BITS; // 高三位011

// Packing and unpacking ctl

private static int runStateOf(int c) { return c & ~CAPACITY; } // 保留高3位,即計(jì)算線程池狀態(tài)

private static int workerCountOf(int c) { return c & CAPACITY; } // 保留低29位, 即計(jì)算線程數(shù)量

private static int ctlOf(int rs, int wc) { return rs | wc; } // 求ctl

ThreadPoolExecutor中使用32位Integer來表示線程池的狀態(tài)和線程的數(shù)量,其中高3位表示狀態(tài),低29位表示數(shù)量。如果對(duì)二進(jìn)制運(yùn)行不熟悉可以參考:二進(jìn)制運(yùn)算。從上也可以看出線程池有五種狀態(tài),我們關(guān)心前3中狀態(tài)

RUNNING 接收task和處理queue中的task

SHUTDOWN 不再接收新的task,但是會(huì)處理完正在運(yùn)行的task和queue中的task,不會(huì)interrupt正在執(zhí)行的task,其實(shí)調(diào)用shutdown后線程池處于該狀態(tài)

STOP 不再接收新的task,也不處理queue中的task,同時(shí)正在運(yùn)行的線程會(huì)被interrupt。調(diào)用shutdownNow后線程池會(huì)處于該狀態(tài)。

2.3 execute

明白了ctl和線程池的狀態(tài)后我們來具體看下execute的處理邏輯

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

int c = ctl.get();

if (workerCountOf(c) < corePoolSize) { // 線程數(shù)量小于coresize,那么就調(diào)用addWorker

if (addWorker(command, true)) // 這里知道,返回true就不往下走了

return;

c = ctl.get();

}

// 不滿足上述條件,即線程數(shù)量 >= coreSize,或者addWorker返回fasle,那么走下面的邏輯

if (isRunning(c) && workQueue.offer(command)) { // 可以看到是往blockingqueue中放task

int recheck = ctl.get();

if (! isRunning(recheck) && remove(command))

reject(command);

else if (workerCountOf(recheck) == 0)

addWorker(null, false);

}

// 如果不滿足上述條件,即blockingqueue也放不進(jìn)去,那么就走下面的邏輯

else if (!addWorker(command, false))

reject(command);

}

從上面的代碼我們可以看到線程池處理線程的基本思路是: 如果線程數(shù)量小于coresize那么就執(zhí)行task,否則就放到queue中,如果queue也放不下就走下面addWorker,如果也失敗了,那么就調(diào)用reject策略。當(dāng)然還涉及一些細(xì)節(jié),需要進(jìn)一步分析。

2.4 addWorker

execute中反復(fù)調(diào)用的是addWorker

private boolean addWorker(Runnable firstTask, boolean core) {

retry:

for (;;) {

int c = ctl.get();

int rs = runStateOf(c); // 計(jì)算線程池狀態(tài)

// Check if queue empty only if necessary.

if (rs >= SHUTDOWN && // 先忽略

! (rs == SHUTDOWN &&

firstTask == null &&

! workQueue.isEmpty()))

return false;

for (;;) {

int wc = workerCountOf(c); // 線程數(shù)量

if (wc >= CAPACITY ||

wc >= (core ? corePoolSize : maximumPoolSize)) // 可見如果超過了運(yùn)行的最大線程數(shù)量則返回false

return false;

if (compareAndIncrementWorkerCount(c)) // 如果成功,線程數(shù)量肯定加1

break retry;

c = ctl.get(); // Re-read ctl

if (runStateOf(c) != rs)

continue retry;

// else CAS failed due to workerCount change; retry inner loop

}

}

boolean workerStarted = false;

boolean workerAdded = false;

Worker w = null;

try {

final ReentrantLock mainLock = this.mainLock;

w = new Worker(firstTask); // 將task封裝成了Worker

final Thread t = w.thread; // 來獲取worker的thread

if (t != null) {

mainLock.lock();

try {

// Recheck while holding lock.

// Back out on ThreadFactory failure or if

// shut down before lock acquired.

int c = ctl.get();

int rs = runStateOf(c);

if (rs < SHUTDOWN ||

(rs == SHUTDOWN && firstTask == null)) {

if (t.isAlive()) // precheck that t is startable

throw new IllegalThreadStateException();

workers.add(w); // 將worker添加到hashset中報(bào)存,關(guān)閉的時(shí)候要使用

int s = workers.size();

if (s > largestPoolSize)

largestPoolSize = s;

workerAdded = true;

}

} finally {

mainLock.unlock();

}

if (workerAdded) { // 經(jīng)過一些檢查, 啟動(dòng)了work的thread

t.start();

workerStarted = true;

}

}

} finally {

if (! workerStarted)

addWorkerFailed(w); // 如果線程啟動(dòng)失敗,則將線程數(shù)減1

}

return workerStarted;

}

上面的代碼看起來比較復(fù)雜,但是如果我們忽略具體的細(xì)節(jié),從大致思路上看,其實(shí)也比較簡單。上面代碼的主要思路就是:除了一些狀態(tài)檢查外,首先將線程數(shù)量加1,然后將runnable分裝成一個(gè)worker,去啟動(dòng)worker線程,如果啟動(dòng)失敗則再將線程數(shù)量減1。返回false的原因可能是線程數(shù)量大于允許的數(shù)量。所以addWorker調(diào)用成功,則會(huì)啟動(dòng)一個(gè)work線程,且線程池中線程數(shù)量加1

2.5 worker

woker是線程池中真正的線程實(shí)體。線程池中的線程不是自定義的Runnable實(shí)現(xiàn)的線程,而是woker線程,worker在run方法里調(diào)用了自定義的Runnable的run方法。

Worker繼承了AQS,并實(shí)現(xiàn)了runnable接口:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

Worker(Runnable firstTask) {

setState(-1); // inhibit interrupts until runWorker

this.firstTask = firstTask;

this.thread = getThreadFactory().newThread(this); // 這個(gè)時(shí)候回頭看看addWorker中t.start(), 就明白了啟動(dòng)的實(shí)際是一個(gè)Woker線程,而不是用戶定義的Runnable

}

public void run() {

runWorker(this);

}

}

Worker中firstTask存儲(chǔ)了用戶定義的Runnable,thread是以他自身為參數(shù)的Thread對(duì)象。getThreadFactory()默認(rèn)返回是Executors#DefaultThreadFactory,用來新建線程,并定義了線程名稱的前綴等:

static class DefaultThreadFactory implements ThreadFactory {

private static final AtomicInteger poolNumber = new AtomicInteger(1);

private final ThreadGroup group;

private final AtomicInteger threadNumber = new AtomicInteger(1);

private final String namePrefix;

DefaultThreadFactory() {

SecurityManager s = System.getSecurityManager();

group = (s != null) ? s.getThreadGroup() :

Thread.currentThread().getThreadGroup();

namePrefix = "pool-" +

poolNumber.getAndIncrement() +

"-thread-"; //

}

public Thread newThread(Runnable r) { // 調(diào)用后新建一個(gè)線程

Thread t = new Thread(group, r,

namePrefix + threadNumber.getAndIncrement(),

0);

if (t.isDaemon())

t.setDaemon(false);

if (t.getPriority() != Thread.NORM_PRIORITY)

t.setPriority(Thread.NORM_PRIORITY);

return t;

}

}

2.6 runWoker

Worker的run方法調(diào)用了runWorker,并將自身作為參數(shù)傳了進(jìn)去,下面看看問題的關(guān)鍵:runWorker:

final void runWorker(Worker w) {

Thread wt = Thread.currentThread();

Runnable task = w.firstTask;

w.firstTask = null;

w.unlock(); // allow interrupts

boolean completedAbruptly = true;

try {

while (task != null || (task = getTask()) != null) { // 注意這里的while循環(huán),這里很關(guān)鍵。這里注意,如果兩個(gè)條件都滿足了,那么線程就結(jié)束了

w.lock(); // 注意worker繼承了AQS,相當(dāng)于自己實(shí)現(xiàn)了鎖,這個(gè)在關(guān)閉線程的時(shí)候有用

if ((runStateAtLeast(ctl.get(), STOP) ||

(Thread.interrupted() &&

runStateAtLeast(ctl.get(), STOP))) &&

!wt.isInterrupted())

wt.interrupt();

try {

beforeExecute(wt, task);

Throwable thrown = null;

try {

task.run(); // 僅僅是回調(diào)了Runnable的run方法

} catch (RuntimeException x) {

thrown = x; throw x;

} catch (Error x) {

thrown = x; throw x;

} catch (Throwable x) {

thrown = x; throw new Error(x);

} finally {

afterExecute(task, thrown);

}

} finally {

task = null; // 重點(diǎn),task執(zhí)行完后就被置位null

w.completedTasks++;

w.unlock();

}

}

completedAbruptly = false;

} finally {

processWorkerExit(w, completedAbruptly); // 注意while循環(huán)結(jié)束后worker線程就結(jié)束了

}

}

runWorker中有個(gè)while循環(huán),while中判斷條件為(task != null || (task = getTask()) != null)。假設(shè)我們按照正常的邏輯,即task != null,則會(huì)調(diào)用task.run方法,執(zhí)行完run方法后然后在finally中task被置為null;接著又進(jìn)入while循環(huán)判斷,這次task == null,所以不符合第一個(gè)判斷條件,則會(huì)繼續(xù)判斷 task == getTask()) != null。我們來看下getTask做了什么。

2.7 getTask

private Runnable getTask() {

boolean timedOut = false; // Did the last poll() time out?

retry:

for (;;) {

int c = ctl.get();

int rs = runStateOf(c);

// Check if queue empty only if necessary.

if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 當(dāng)調(diào)用shutdown()方法的時(shí)候,線程狀態(tài)就為shutdown了; 當(dāng)調(diào)用shutdownow()的時(shí)候,線程狀態(tài)就為stop了

decrementWorkerCount();

return null;

}

boolean timed; // Are workers subject to culling?

for (;;) { // 通過死循環(huán)設(shè)置狀態(tài)

int wc = workerCountOf(c);

// 設(shè)置允許core線程timeout或者線程數(shù)量大于coresize,則允許線程超時(shí)

timed = allowCoreThreadTimeOut || wc > corePoolSize;

// 如果線程數(shù)量 <= 最大線程數(shù) 且 沒有超時(shí)和允許超時(shí) 則跳出死循環(huán)

if (wc <= maximumPoolSize && ! (timedOut && timed))

break;

if (compareAndDecrementWorkerCount(c))

return null;

c = ctl.get(); // Re-read ctl

if (runStateOf(c) != rs)

continue retry;

// else CAS failed due to workerCount change; retry inner loop

}

try {

// 這里是關(guān)鍵,如果允許超時(shí)則調(diào)用poll從queue中取出task,否則就調(diào)用take可阻塞的獲取task

Runnable r = timed ?

workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

workQueue.take();

if (r != null) // 獲取到task則返回,然后runWorker的while循環(huán)就繼續(xù)執(zhí)行,并調(diào)用task的run方法

return r;

timedOut = true; // 否則設(shè)置為timeOut,繼續(xù)循環(huán),但是下次循環(huán)會(huì)走到if (compareAndDecrementWorkerCount(c)) 處,并返回null。

} catch (InterruptedException retry) {

timedOut = false;

}

}

}

忽略掉具體細(xì)節(jié),getTask的整體思路是: 從blockqueue中拿去task,如果queue中沒有task則分兩種情況:

如果允許超時(shí)則調(diào)用poll(keepAliveTime, TimeUnit.NANOSECONDS),在規(guī)定時(shí)間沒有返回了則getTask返回null,runWorker結(jié)束while循環(huán),work線程結(jié)束。當(dāng)線程數(shù)量大于coresize且blockqueue滿的時(shí)候且小于maxsize的時(shí)候,新創(chuàng)建的線程便是走這個(gè)邏輯;或者允許core線程超時(shí)的時(shí)候也是走這個(gè)邏輯

如果不允許超時(shí),則會(huì)一直阻塞直到blockqueue中有了新的task。take方法阻塞則表示worker線程也阻塞,也就是在沒有task執(zhí)行的情況下,worker線程便會(huì)阻塞等待。core線程走的就是這個(gè)邏輯。

這個(gè)時(shí)候回頭再看下runWorker,如果task != null,那么就會(huì)執(zhí)行task的run方法,執(zhí)行完后task就會(huì)為被置為null,再次進(jìn)入while循環(huán)執(zhí)行g(shù)etTask阻塞在這里了。通過這種方式保留住了線程。如果while循環(huán)結(jié)束了,那么worker線程也就結(jié)束了。

2.8 再看addWorker

分析到這里我們?cè)賮砜聪耡ddWoker。addWorker可以將第一個(gè)參數(shù)設(shè)置為null。例如ThreadPoolExecutor#prestartAllCoreThreads:

public int prestartAllCoreThreads() {

int n = 0;

while (addWorker(null, true)) // addWorker第一個(gè)參數(shù)是null

++n;

return n;

}

經(jīng)過前面的分析,我們知道addWoker用來啟動(dòng)一個(gè)worker線程,worker線程調(diào)用runWorker來執(zhí)行,而runWorker中有個(gè)while循環(huán),判斷條件是(task != null || (task = getTask()) != null)。因?yàn)槲覀儌魅氲膖ask為null,所以就會(huì)判斷task = getTask()) != null,而getTask就是去blockqueue中拿去數(shù)據(jù),如果沒有任務(wù)就會(huì)阻塞住。這個(gè)時(shí)候就是一個(gè)阻塞的線程在等待task的到來了。所以傳入?yún)?shù)為null表示創(chuàng)建一個(gè)空的線程,什么都不執(zhí)行。

2.9 再看execute

已經(jīng)知道了線程池內(nèi)部的大概工作情況,我們?cè)賮砜聪氯绻衏ore線程都創(chuàng)建好了且處于空置狀態(tài),這個(gè)時(shí)候新放入一個(gè)線程的執(zhí)行流程。

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

int c = ctl.get();

if (workerCountOf(c) < corePoolSize) { // core線程都創(chuàng)建好了,所以判斷條件不滿足

if (addWorker(command, true))

return;

c = ctl.get();

}

// 會(huì)走到這里,會(huì)通過offer往blockingqueue里放置一個(gè)task。這個(gè)時(shí)候阻塞的core線程會(huì)通過blockingqueue的take拿到task執(zhí)行,類似一個(gè)生產(chǎn)者消費(fèi)者的情況

if (isRunning(c) && workQueue.offer(command)) {

int recheck = ctl.get();

if (! isRunning(recheck) && remove(command))

reject(command);

else if (workerCountOf(recheck) == 0)

addWorker(null, false);

}

// 如果blockingqueue添加失敗,則創(chuàng)建線程直到maxsize

else if (!addWorker(command, false))

reject(command);

}

可見,線程和execute通過blockingqueue來通信,而不是其他方式,execute往blockingqueue中放置task,線程通過take來獲取。整體線程池的邏輯如下圖

2.10 shutdown

這個(gè)時(shí)候我們終于可以來看看shutdown和shutdownNow了

看下shutdown

public void shutdown() {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

checkShutdownAccess();

advanceRunState(SHUTDOWN); // 重點(diǎn),將線程狀態(tài)置為shutdown,這樣getTask等workqueue為空后就返回null了

interruptIdleWorkers(); // 重點(diǎn)

onShutdown(); // 什么都沒做

} finally {

mainLock.unlock();

}

tryTerminate();

}

private void interruptIdleWorkers() {

interruptIdleWorkers(false);

}

private void interruptIdleWorkers(boolean onlyOne) {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

for (Worker w : workers) {

Thread t = w.thread;

// 線程沒有中斷 且 獲取到worker的鎖

if (!t.isInterrupted() && w.tryLock()) {

try {

t.interrupt(); // 調(diào)用interrup,中斷線程

} catch (SecurityException ignore) {

} finally {

w.unlock();

}

}

if (onlyOne)

break;

}

} finally {

mainLock.unlock();

}

}

shutdown的核心方法在interruptIdleWorkers里,這里可以看到在t.interrupt的時(shí)候有個(gè)判斷添加,一個(gè)是線程沒有設(shè)置中斷標(biāo)記,第二個(gè)是獲取到worker的鎖,我們注意下第二個(gè)條件?;仡^看下runWorker,while中執(zhí)行task的run方法的時(shí)候,會(huì)先獲取到worker線程的鎖,所以如果線程正在執(zhí)行task的run方法,則shutdown的時(shí)候會(huì)獲取鎖失敗,也就不會(huì)中斷線程了。這里可以得出結(jié)論:shutdown不會(huì)中斷正在執(zhí)行的線程。

如果blockingqueu中有task還沒執(zhí)行完呢? 這個(gè)時(shí)候while中的take并不會(huì)阻塞,也不會(huì)被中斷,shutdown中也沒有清空blockingqueue的操作。所以可以得出結(jié)論:shutdown會(huì)等blockingqueue中的task執(zhí)行完成再關(guān)閉??梢哉fshutdown是一種比較溫柔的關(guān)閉方式了。

如果core線程都阻塞在take方法上了,即沒有正在執(zhí)行的task了,那么這個(gè)時(shí)候 t.interrupt則會(huì)中斷take方法,worker線程的while循環(huán)結(jié)束,worker線程結(jié)束。當(dāng)所有的worker線程都結(jié)束后線程池就關(guān)閉了

總結(jié)下就是: shutdown會(huì)把它被調(diào)用前放到線程池中的task全部執(zhí)行完。

2.11 shutdownNow

再來看下shutdownNow

public List shutdownNow() {

List tasks;

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

checkShutdownAccess();

advanceRunState(STOP); // 重點(diǎn),將線程狀態(tài)置為stop

interruptWorkers(); // 重點(diǎn)

tasks = drainQueue(); // 重點(diǎn)

} finally {

mainLock.unlock();

}

tryTerminate();

return tasks;

}

private void interruptWorkers() {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

for (Worker w : workers)

w.interruptIfStarted();

} finally {

mainLock.unlock();

}

}

void interruptIfStarted() {

Thread t;

if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { // 沒有去獲取woker的鎖

try {

t.interrupt();

} catch (SecurityException ignore) {

}

}

}

private List drainQueue() {

BlockingQueue q = workQueue;

List taskList = new ArrayList();

q.drainTo(taskList); // 將blockingqueue中的task清空

if (!q.isEmpty()) {

for (Runnable r : q.toArray(new Runnable[0])) {

if (q.remove(r))

taskList.add(r);

}

}

return taskList;

}

從上面的代碼可以看出:

shutdownNow不會(huì)去獲取worker的鎖,所以shutdownNow會(huì)導(dǎo)致正在運(yùn)行的task也被中斷

shutdownNow會(huì)將blockingqueue中的task清空,所以在blockingqueue中的task也不會(huì)被執(zhí)行

總結(jié)就是shutdownNow比較粗暴,調(diào)用他后,他會(huì)將所有之前提交的任務(wù)都interrupt,且將blockingqueue中的task清空

另外就是不論是shutdown還是shutdownNow都是調(diào)用Thread的interrupt()方法。如果task不響應(yīng)中斷或者忽略中斷標(biāo)記,那么這個(gè)線程就不會(huì)被終止。例如在run中執(zhí)行以下邏輯

poolExecutor.execute(new Runnable() {

@Override

public void run() {

while (true) {

System.out.println("b");

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

System.out.printf("不處理"); // 忽略中斷

}

}

}

});

運(yùn)行結(jié)果是,即使調(diào)用了shutdownNow也終止不了線程運(yùn)行

b

0

不處理b

b

b

b

b

....

3 總結(jié)

線程通過while循環(huán)不停的從blockingqueue中獲取task來保留線程,避免重復(fù)重建線程

4 參考

總結(jié)

以上是生活随笔為你收集整理的java 线程池 源码_java线程池源码分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。