[转载] Java线程池框架源码分析
轉載自http://www.linuxidc.com/Linux/2014-11/108791.htm
相關類Executor,Executors,AbstractExecutorService,ExecutorService
Executor:整個線程池執行者框架的頂層接口。定義了一個execute方法,整個線程執行者框架的核心方法。
public interface Executor {
? ? void execute(Runnable command);
}
ExecutorService:這是一個接口它繼承自Executor,定義了shutdown,shutdownNow,awaitTermination,submit,invokeAll等方法。
AbstractExecutorService:實現了ExecutorService接口中的submit,invokeAll的方法。
? public Future<?> submit(Runnable task) {
? ? ? ? if (task == null) throw new NullPointerException();
? ? ? ? RunnableFuture<Void> ftask = newTaskFor(task, null);
? ? ? ? execute(ftask);
? ? ? ? return ftask;
? ? }
? ? public <T> Future<T> submit(Runnable task, T result) {
? ? ? ? if (task == null) throw new NullPointerException();
? ? ? ? RunnableFuture<T> ftask = newTaskFor(task, result);
? ? ? ? execute(ftask);
? ? ? ? return ftask;
? ? }
? ? public <T> Future<T> submit(Callable<T> task) {
? ? ? ? if (task == null) throw new NullPointerException();
? ? ? ? RunnableFuture<T> ftask = newTaskFor(task);
? ? ? ? execute(ftask);
? ? ? ? return ftask;
? ? }
在這里,所有submit方法提交的任務最終還是調用了execute方法,execute是接口Executor中定義的方法,AbstractExecutorService沒有實現它,
需要子類去實現這個方法,ThreadPoolExecutor繼承了AbstractExecutorService,它實現了execute方法。ScheduledThreadPoolExecutor繼承自
ThreadPoolExecutor,并覆蓋了ThreadPoolExecutor的execute方法。這個方法是線程執行框者架的核心邏輯,不同的線程池執行者有不同的實現邏輯。
AbstractExecutorService的功能較為簡單,實現了不同參數的submit,invokeAll方法。
ThreadPoolExecutor線程池執行者:它有一個核心的成員變量:
private final HashSet<Worker> workers = new HashSet<Worker>();
workers可以看做是ThreadPoolExecutor中用于運行任務的線程池。
worker是一個封裝了一個Thread對象并實現了Runnable接口的類。封裝Thread很容易理解,因為它要利用Thread去運行execute方法提交過來的runnable任務,但是為什么會繼承runnable接口呢?
下面是剔除了部分代碼的Worker源碼:
? private final class Worker
? ? ? ? extends AbstractQueuedSynchronizer
? ? ? ? implements Runnable
? ? {
? ? ? final Thread thread;
? ? ? ??
? ? ? ? Runnable firstTask;
? ? ? ? Worker(Runnable firstTask) {
? ? ? ? ? ? setState(-1);?
? ? ? ? ? ? this.firstTask = firstTask;
? ? ? ? ? ? this.thread = getThreadFactory().newThread(this);
? ? ? ? }
? ? ? ? public void run() {
? ? ? ? ? ? runWorker(this);
? ? ? ? }
? ? }
Worker是ThreadPoolExecutor的一個內部類,Worker本身實現了Runnable接口,并封裝了一個Thread對象,最后在構造方法中獲取了一個Runnable對象,這個對象就是ThreadPoolExecutor通過execute提交過來的目標任務。
跟蹤runWorker(this)方法:
?final void runWorker(Worker w) {
? ? ? ? Thread wt = Thread.currentThread();
? ? ? ? Runnable task = w.firstTask;
? ? ? ? w.firstTask = null;
? ? ? ? w.unlock();?
? ? ? ? boolean completedAbruptly = true;
? ? ? ? try {
? ? ? ? ? ? while (task != null || (task = getTask()) != null) {
? ? ? ? ? ? ? ? w.lock();
? ? ? ? ? ? ? ? if ((runStateAtLeast(ctl.get(), STOP) ||
? ? ? ? ? ? ? ? ? ? (Thread.interrupted() &&
? ? ? ? ? ? ? ? ? ? ? runStateAtLeast(ctl.get(), STOP))) &&
? ? ? ? ? ? ? ? ? ? !wt.isInterrupted())
? ? ? ? ? ? ? ? ? ? wt.interrupt();
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? beforeExecute(wt, task);
? ? ? ? ? ? ? ? ? ? Throwable thrown = null;
? ? ? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? ? ? task.run();//在這里直接調用了目標任務的run方法,并沒有將它傳給Thread對象。
? ? ? ? ? ? ? ? ? ? } catch (RuntimeException x) {
? ? ? ? ? ? ? ? ? ? ? ? thrown = x; throw x;
? ? ? ? ? ? ? ? ? ? } catch (Error x) {
? ? ? ? ? ? ? ? ? ? ? ? thrown = x; throw x;
? ? ? ? ? ? ? ? ? ? } catch (Throwable x) {
? ? ? ? ? ? ? ? ? ? ? ? thrown = x; throw new Error(x);
? ? ? ? ? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? ? ? ? ? afterExecute(task, thrown);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? ? ? task = null;
? ? ? ? ? ? ? ? ? ? w.completedTasks++;
? ? ? ? ? ? ? ? ? ? w.unlock();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? completedAbruptly = false;
? ? ? ? } finally {
? ? ? ? ? ? processWorkerExit(w, completedAbruptly);
? ? ? ? }
? ? }
回過頭來在看看Worker的構造方法:
Worker(Runnable firstTask) {
? ? ? ? setState(-1);?
? ? ? ? this.firstTask = firstTask;
? ? ? ? this.thread = getThreadFactory().newThread(this);
? ? }
它將自己傳給了自己的成員變量thread。目標任務被執行的步驟可能就是:Worker的成員變量thread啟動后調用worker的run方法,worker的run方法中將自己傳給runWorker,runWorker在調用目標執行對象的run方法。
那么thread是何時被執行的呢?
下面看看ThreadPoolExecutor中的一個其他方法:
? private boolean addWorker(Runnable firstTask, boolean core) {
? ? ? ......
? ? ? ? try {
? ? ? ? ? ? final ReentrantLock mainLock = this.mainLock;
? ? ? ? ? ? w = new Worker(firstTask);
? ? ? ? ? ? final Thread t = w.thread;//這里初始化一個Worker對象w,在將w的成員變量thread付給t
? ? ? ? ? ? if (t != null) {
? ? ? ? ? ? ? ? mainLock.lock();
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? int c = ctl.get();
? ? ? ? ? ? ? ? ? ? int rs = runStateOf(c);
? ? ? ? ? ? ? ? ? ? if (rs < SHUTDOWN ||
? ? ? ? ? ? ? ? ? ? ? ? (rs == SHUTDOWN && firstTask == null)) {
? ? ? ? ? ? ? ? ? ? ? ? if (t.isAlive())?
? ? ? ? ? ? ? ? ? ? ? ? ? ? throw new IllegalThreadStateException();
? ? ? ? ? ? ? ? ? ? ? ? workers.add(w);
? ? ? ? ? ? ? ? ? ? ? ? int s = workers.size();
? ? ? ? ? ? ? ? ? ? ? ? if (s > largestPoolSize)
? ? ? ? ? ? ? ? ? ? ? ? ? ? largestPoolSize = s;
? ? ? ? ? ? ? ? ? ? ? ? workerAdded = true;
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? ? ? mainLock.unlock();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? if (workerAdded) {
? ? ? ? ? ? ? ? ? ? t.start();//在這里調用t的start方法。
? ? ? ? ? ? ? ? ? ? workerStarted = true;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? } finally {
? ? ? ? ? ? if (! workerStarted)
? ? ? ? ? ? ? ? addWorkerFailed(w);
? ? ? ? }
? ? ? ? return workerStarted;
? ? }
這里為什么會設計的這么繞,我想主要是Worker不僅封裝了一個thread,而且對目標任務進行了封裝,在運行封裝過后的目標任務前,addWorker可以做一些相關操作。
這里僅僅介紹了ThreadPoolExecutor的線程池,那么這個線程池是如何被維護的,下面介紹幾個關鍵的參數。
private volatile int corePoolSize;
? private volatile int maximumPoolSize;
? private final BlockingQueue<Runnable> workQueue;
這三個是ThreadPoolExecutor的成員變量,其中workQueue跟縣城池沒有關系。workQueue是一個線程安全的阻塞隊列。
corePoolSize是線程池的核心大小,maximumPoolSize是線程池的最大大小。
當提交新任務時,如果ThreadPoolExecutor中有線程在運行,并且線程的數量小于corePoolSize,那么就會有新的線程被創建。如果當前運行的線程數大于corePoolSize,就會放到緩存隊列workQueue中。如果緩沖隊列也滿了,就繼續創建線程,直到線程的數量達到maximumPoolSize
?public void execute(Runnable command) {
? ? ? ? if (command == null)
? ? ? ? ? ? throw new NullPointerException();
? ? ? ? //判斷如果當前運行的線程數小于 corePoolSize,添加新的線程(addWorker會添加一個新的線程,上面有介紹),方法直接返回。
? ? ? ? int c = ctl.get();
? ? ? ? if (workerCountOf(c) < corePoolSize) {
? ? ? ? ? ? if (addWorker(command, true))
? ? ? ? ? ? ? ? return;
? ? ? ? ? ? c = ctl.get();
? ? ? ? }
? ? ? ? //如果當前的運行的線程數大于或等于corePoolSize則新的任務會放到緩存隊列中。
? ? ? ? if (isRunning(c) && workQueue.offer(command)) {
? ? ? ? ? ? int recheck = ctl.get();
? ? ? ? ? ? if (! isRunning(recheck) && remove(command))
? ? ? ? ? ? ? ? reject(command);
? ? ? ? ? ? else if (workerCountOf(recheck) == 0)
? ? ? ? ? ? ? ? addWorker(null, false);
? ? ? ? }
? ? ? ? else if (!addWorker(command, false))
? ? ? ? //最后緩沖隊列添加失敗,則會繼續添加線程。如果添加新的線程失敗,則拒絕這個任務。
? ? ? ? ? ? reject(command);
? ? }
還有些其他的參數:
private volatile ThreadFactory threadFactory //線程的工廠函數。
?private volatile RejectedExecutionHandler handler;//任務拒絕的處理類。
?private volatile long keepAliveTime;//任務等待的是將。
ThreadPoolExecutor有幾個構造方法來初始化這些參數。Executors類將這些參數簡化了來獲得一個ExecutorService的引用。
public static ExecutorService newFixedThreadPool(int nThreads) {
? ? ? ? return new ThreadPoolExecutor(nThreads, nThreads,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 0L, TimeUnit.MILLISECONDS,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new LinkedBlockingQueue<Runnable>());
? ? }
? ? ? public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
? ? ? ? return new ThreadPoolExecutor(nThreads, nThreads,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 0L, TimeUnit.MILLISECONDS,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new LinkedBlockingQueue<Runnable>(),
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? threadFactory);
? ? }
? ? public static ExecutorService newCachedThreadPool() {
? ? ? ? return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 60L, TimeUnit.SECONDS,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new SynchronousQueue<Runnable>());
? ? }
? ? public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
? ? ? ? return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 60L, TimeUnit.SECONDS,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new SynchronousQueue<Runnable>(),
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? threadFactory);
? ? }
這四個方法中前兩個的核心線程數和最大線程數相同,所有可運行的線程數是固定的,<=nThreads。當任務數大于nThreads時,就是放入緩沖隊列中。? 后兩個方法中,線程數是無邊界的,核心線程數是0,最大線程數是整型的最大值,然后如果有線程60秒內沒有任務運行的話就銷毀。每次有新的任務來,都會創建新的線程或使用以前創建的線程(60秒內沒有任務運行的線程)。你可能有疑問,既然核心線程數是0,那么所有的任務不是都放到隊里里了嗎?那么現在就來看看SynchronousQueue這個隊里,可以看看這里的介紹?http://www.linuxidc.com/Linux/2014-11/108792.htm?。
回過頭來看看任務提交方法的源碼:
? public void execute(Runnable command) {
? ? ? ? if (command == null)
? ? ? ? ? ? throw new NullPointerException();
? ? ? ? int c = ctl.get();
? ? ? ? if (workerCountOf(c) < corePoolSize) {
? ? ? ? ? ? if (addWorker(command, true))
? ? ? ? ? ? ? ? return;
? ? ? ? ? ? c = ctl.get();
? ? ? ? }
? ? ? ? if (isRunning(c) && workQueue.offer(command)) {//這里是在往隊列里方任務,如果不成功就會添加Worker(封裝了線程對象)
? ? ? ? ? ? int recheck = ctl.get();
? ? ? ? ? ? if (! isRunning(recheck) && remove(command))
? ? ? ? ? ? ? ? reject(command);
? ? ? ? ? ? else if (workerCountOf(recheck) == 0)
? ? ? ? ? ? ? ? addWorker(null, false);
? ? ? ? }
? ? ? ? else if (!addWorker(command, false))
? ? ? ? ? ? reject(command);
? ? }
上面鏈接里提到:offer()往queue里放一個element后立即返回,如果碰巧這個element被另一個thread取走了,offer方法返回true,認為offer成功;否則返回false。
試想一下,第一次提交任務的時候,核心線程數為0,此時沒有線程所以沒有線程從workQueue中取東西,所以這里的workQueue.offer(command)會返回false,那么就會通過addWorker(command, false)創建一個新的線程。
轉載于:https://www.cnblogs.com/scott19820130/p/4730763.html
總結
以上是生活随笔為你收集整理的[转载] Java线程池框架源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 在MAC下搭建JSP开发环境
- 下一篇: java美元兑换,(Java实现) 美元