日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

[转载] Java线程池框架源码分析

發布時間:2024/6/21 java 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 [转载] Java线程池框架源码分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

轉載自http://www.linuxidc.com/Linux/2014-11/108791.htm

相關類Executor,Executors,AbstractExecutorService,ExecutorService

Executor:整個線程池執行者框架的頂層接口。定義了一個execute方法,整個線程執行者框架的核心方法。

public interface Executor {

? ? void execute(Runnable command);
}

ExecutorService:這是一個接口它繼承自Executor,定義了shutdown,shutdownNow,awaitTermination,submit,invokeAll等方法。

AbstractExecutorService:實現了ExecutorService接口中的submit,invokeAll的方法。

? public Future<?> submit(Runnable task) {
? ? ? ? if (task == null) throw new NullPointerException();
? ? ? ? RunnableFuture<Void> ftask = newTaskFor(task, null);
? ? ? ? execute(ftask);
? ? ? ? return ftask;
? ? }

? ? public <T> Future<T> submit(Runnable task, T result) {
? ? ? ? if (task == null) throw new NullPointerException();
? ? ? ? RunnableFuture<T> ftask = newTaskFor(task, result);
? ? ? ? execute(ftask);
? ? ? ? return ftask;
? ? }


? ? public <T> Future<T> submit(Callable<T> task) {
? ? ? ? if (task == null) throw new NullPointerException();
? ? ? ? RunnableFuture<T> ftask = newTaskFor(task);
? ? ? ? execute(ftask);
? ? ? ? return ftask;
? ? }

在這里,所有submit方法提交的任務最終還是調用了execute方法,execute是接口Executor中定義的方法,AbstractExecutorService沒有實現它,

需要子類去實現這個方法,ThreadPoolExecutor繼承了AbstractExecutorService,它實現了execute方法。ScheduledThreadPoolExecutor繼承自

ThreadPoolExecutor,并覆蓋了ThreadPoolExecutor的execute方法。這個方法是線程執行框者架的核心邏輯,不同的線程池執行者有不同的實現邏輯。

AbstractExecutorService的功能較為簡單,實現了不同參數的submit,invokeAll方法。

ThreadPoolExecutor線程池執行者:它有一個核心的成員變量:

private final HashSet<Worker> workers = new HashSet<Worker>();

workers可以看做是ThreadPoolExecutor中用于運行任務的線程池。

worker是一個封裝了一個Thread對象并實現了Runnable接口的類。封裝Thread很容易理解,因為它要利用Thread去運行execute方法提交過來的runnable任務,但是為什么會繼承runnable接口呢?

下面是剔除了部分代碼的Worker源碼:

? private final class Worker
? ? ? ? extends AbstractQueuedSynchronizer
? ? ? ? implements Runnable
? ? {
? ? ? final Thread thread;
? ? ? ??
? ? ? ? Runnable firstTask;

? ? ? ? Worker(Runnable firstTask) {
? ? ? ? ? ? setState(-1);?
? ? ? ? ? ? this.firstTask = firstTask;
? ? ? ? ? ? this.thread = getThreadFactory().newThread(this);
? ? ? ? }

? ? ? ? public void run() {
? ? ? ? ? ? runWorker(this);
? ? ? ? }


? ? }

Worker是ThreadPoolExecutor的一個內部類,Worker本身實現了Runnable接口,并封裝了一個Thread對象,最后在構造方法中獲取了一個Runnable對象,這個對象就是ThreadPoolExecutor通過execute提交過來的目標任務。

跟蹤runWorker(this)方法:

?final void runWorker(Worker w) {
? ? ? ? Thread wt = Thread.currentThread();
? ? ? ? Runnable task = w.firstTask;
? ? ? ? w.firstTask = null;
? ? ? ? w.unlock();?
? ? ? ? boolean completedAbruptly = true;
? ? ? ? try {
? ? ? ? ? ? while (task != null || (task = getTask()) != null) {
? ? ? ? ? ? ? ? w.lock();

? ? ? ? ? ? ? ? if ((runStateAtLeast(ctl.get(), STOP) ||
? ? ? ? ? ? ? ? ? ? (Thread.interrupted() &&
? ? ? ? ? ? ? ? ? ? ? runStateAtLeast(ctl.get(), STOP))) &&
? ? ? ? ? ? ? ? ? ? !wt.isInterrupted())
? ? ? ? ? ? ? ? ? ? wt.interrupt();
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? beforeExecute(wt, task);
? ? ? ? ? ? ? ? ? ? Throwable thrown = null;
? ? ? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? ? ? task.run();//在這里直接調用了目標任務的run方法,并沒有將它傳給Thread對象。
? ? ? ? ? ? ? ? ? ? } catch (RuntimeException x) {
? ? ? ? ? ? ? ? ? ? ? ? thrown = x; throw x;
? ? ? ? ? ? ? ? ? ? } catch (Error x) {
? ? ? ? ? ? ? ? ? ? ? ? thrown = x; throw x;
? ? ? ? ? ? ? ? ? ? } catch (Throwable x) {
? ? ? ? ? ? ? ? ? ? ? ? thrown = x; throw new Error(x);
? ? ? ? ? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? ? ? ? ? afterExecute(task, thrown);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? ? ? task = null;
? ? ? ? ? ? ? ? ? ? w.completedTasks++;
? ? ? ? ? ? ? ? ? ? w.unlock();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? completedAbruptly = false;
? ? ? ? } finally {
? ? ? ? ? ? processWorkerExit(w, completedAbruptly);
? ? ? ? }
? ? }

回過頭來在看看Worker的構造方法:

Worker(Runnable firstTask) {
? ? ? ? setState(-1);?
? ? ? ? this.firstTask = firstTask;
? ? ? ? this.thread = getThreadFactory().newThread(this);
? ? }

它將自己傳給了自己的成員變量thread。目標任務被執行的步驟可能就是:Worker的成員變量thread啟動后調用worker的run方法,worker的run方法中將自己傳給runWorker,runWorker在調用目標執行對象的run方法。

那么thread是何時被執行的呢?

下面看看ThreadPoolExecutor中的一個其他方法:

? private boolean addWorker(Runnable firstTask, boolean core) {
? ? ? ......
? ? ? ? try {
? ? ? ? ? ? final ReentrantLock mainLock = this.mainLock;
? ? ? ? ? ? w = new Worker(firstTask);
? ? ? ? ? ? final Thread t = w.thread;//這里初始化一個Worker對象w,在將w的成員變量thread付給t
? ? ? ? ? ? if (t != null) {
? ? ? ? ? ? ? ? mainLock.lock();
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? int c = ctl.get();
? ? ? ? ? ? ? ? ? ? int rs = runStateOf(c);

? ? ? ? ? ? ? ? ? ? if (rs < SHUTDOWN ||
? ? ? ? ? ? ? ? ? ? ? ? (rs == SHUTDOWN && firstTask == null)) {
? ? ? ? ? ? ? ? ? ? ? ? if (t.isAlive())?
? ? ? ? ? ? ? ? ? ? ? ? ? ? throw new IllegalThreadStateException();
? ? ? ? ? ? ? ? ? ? ? ? workers.add(w);
? ? ? ? ? ? ? ? ? ? ? ? int s = workers.size();
? ? ? ? ? ? ? ? ? ? ? ? if (s > largestPoolSize)
? ? ? ? ? ? ? ? ? ? ? ? ? ? largestPoolSize = s;
? ? ? ? ? ? ? ? ? ? ? ? workerAdded = true;
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? ? ? mainLock.unlock();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? if (workerAdded) {
? ? ? ? ? ? ? ? ? ? t.start();//在這里調用t的start方法。
? ? ? ? ? ? ? ? ? ? workerStarted = true;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? } finally {
? ? ? ? ? ? if (! workerStarted)
? ? ? ? ? ? ? ? addWorkerFailed(w);
? ? ? ? }
? ? ? ? return workerStarted;
? ? }

這里為什么會設計的這么繞,我想主要是Worker不僅封裝了一個thread,而且對目標任務進行了封裝,在運行封裝過后的目標任務前,addWorker可以做一些相關操作。

這里僅僅介紹了ThreadPoolExecutor的線程池,那么這個線程池是如何被維護的,下面介紹幾個關鍵的參數。

private volatile int corePoolSize;
? private volatile int maximumPoolSize;
? private final BlockingQueue<Runnable> workQueue;

這三個是ThreadPoolExecutor的成員變量,其中workQueue跟縣城池沒有關系。workQueue是一個線程安全的阻塞隊列。

corePoolSize是線程池的核心大小,maximumPoolSize是線程池的最大大小。

當提交新任務時,如果ThreadPoolExecutor中有線程在運行,并且線程的數量小于corePoolSize,那么就會有新的線程被創建。如果當前運行的線程數大于corePoolSize,就會放到緩存隊列workQueue中。如果緩沖隊列也滿了,就繼續創建線程,直到線程的數量達到maximumPoolSize

?public void execute(Runnable command) {
? ? ? ? if (command == null)
? ? ? ? ? ? throw new NullPointerException();

? ? ? ? //判斷如果當前運行的線程數小于 corePoolSize,添加新的線程(addWorker會添加一個新的線程,上面有介紹),方法直接返回。
? ? ? ? int c = ctl.get();
? ? ? ? if (workerCountOf(c) < corePoolSize) {
? ? ? ? ? ? if (addWorker(command, true))
? ? ? ? ? ? ? ? return;
? ? ? ? ? ? c = ctl.get();
? ? ? ? }


? ? ? ? //如果當前的運行的線程數大于或等于corePoolSize則新的任務會放到緩存隊列中。
? ? ? ? if (isRunning(c) && workQueue.offer(command)) {
? ? ? ? ? ? int recheck = ctl.get();
? ? ? ? ? ? if (! isRunning(recheck) && remove(command))
? ? ? ? ? ? ? ? reject(command);
? ? ? ? ? ? else if (workerCountOf(recheck) == 0)
? ? ? ? ? ? ? ? addWorker(null, false);
? ? ? ? }
? ? ? ? else if (!addWorker(command, false))
? ? ? ? //最后緩沖隊列添加失敗,則會繼續添加線程。如果添加新的線程失敗,則拒絕這個任務。
? ? ? ? ? ? reject(command);

? ? }

還有些其他的參數:

private volatile ThreadFactory threadFactory //線程的工廠函數。
?private volatile RejectedExecutionHandler handler;//任務拒絕的處理類。
?private volatile long keepAliveTime;//任務等待的是將。

ThreadPoolExecutor有幾個構造方法來初始化這些參數。Executors類將這些參數簡化了來獲得一個ExecutorService的引用。

public static ExecutorService newFixedThreadPool(int nThreads) {
? ? ? ? return new ThreadPoolExecutor(nThreads, nThreads,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 0L, TimeUnit.MILLISECONDS,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new LinkedBlockingQueue<Runnable>());
? ? }

? ? ? public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
? ? ? ? return new ThreadPoolExecutor(nThreads, nThreads,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 0L, TimeUnit.MILLISECONDS,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new LinkedBlockingQueue<Runnable>(),
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? threadFactory);
? ? }

? ? public static ExecutorService newCachedThreadPool() {
? ? ? ? return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 60L, TimeUnit.SECONDS,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new SynchronousQueue<Runnable>());
? ? }

? ? public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
? ? ? ? return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 60L, TimeUnit.SECONDS,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new SynchronousQueue<Runnable>(),
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? threadFactory);
? ? }

這四個方法中前兩個的核心線程數和最大線程數相同,所有可運行的線程數是固定的,<=nThreads。當任務數大于nThreads時,就是放入緩沖隊列中。? 后兩個方法中,線程數是無邊界的,核心線程數是0,最大線程數是整型的最大值,然后如果有線程60秒內沒有任務運行的話就銷毀。每次有新的任務來,都會創建新的線程或使用以前創建的線程(60秒內沒有任務運行的線程)。你可能有疑問,既然核心線程數是0,那么所有的任務不是都放到隊里里了嗎?那么現在就來看看SynchronousQueue這個隊里,可以看看這里的介紹?http://www.linuxidc.com/Linux/2014-11/108792.htm?。

回過頭來看看任務提交方法的源碼:

? public void execute(Runnable command) {
? ? ? ? if (command == null)
? ? ? ? ? ? throw new NullPointerException();
? ? ? ? int c = ctl.get();
? ? ? ? if (workerCountOf(c) < corePoolSize) {
? ? ? ? ? ? if (addWorker(command, true))
? ? ? ? ? ? ? ? return;
? ? ? ? ? ? c = ctl.get();
? ? ? ? }
? ? ? ? if (isRunning(c) && workQueue.offer(command)) {//這里是在往隊列里方任務,如果不成功就會添加Worker(封裝了線程對象)
? ? ? ? ? ? int recheck = ctl.get();
? ? ? ? ? ? if (! isRunning(recheck) && remove(command))
? ? ? ? ? ? ? ? reject(command);
? ? ? ? ? ? else if (workerCountOf(recheck) == 0)
? ? ? ? ? ? ? ? addWorker(null, false);
? ? ? ? }
? ? ? ? else if (!addWorker(command, false))
? ? ? ? ? ? reject(command);
? ? }

上面鏈接里提到:offer()往queue里放一個element后立即返回,如果碰巧這個element被另一個thread取走了,offer方法返回true,認為offer成功;否則返回false。

試想一下,第一次提交任務的時候,核心線程數為0,此時沒有線程所以沒有線程從workQueue中取東西,所以這里的workQueue.offer(command)會返回false,那么就會通過addWorker(command, false)創建一個新的線程。

轉載于:https://www.cnblogs.com/scott19820130/p/4730763.html

總結

以上是生活随笔為你收集整理的[转载] Java线程池框架源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。