日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

线程池工作原理

發布時間:2024/9/30 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 线程池工作原理 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

線程池工作原理

線程池狀態的切換:

線程池關鍵類的uml圖:

線程池就是把任務提交和任務執行解耦。

首先看一下線程池的使用:

public static void main(String args[]) throws InterruptedException {ExecutorService es = Executors.newFixedThreadPool(10);//1,創建線程池es.submit(()->{System.out.println("執行任務");});//2,提交任務es.shutdown();//3,線程池關閉 }

跟進源碼:

1,創建線程池:

(可以看出來只是對線程池對象ThreadPoolExecutor屬性的賦值)

public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); }public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler); }public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler; }

幾個重要的參數:

基本大小,corePoolSize 基本的工作線程數。

最大大小 maximumPoolSize 當工作線程大于core,且workQueue也滿了的情況下(arrayQueue),可以創建的最大工作線程。

保持存活時間 keepAliveTime 阻塞隊列poll時的延時值,即如果線程在這個時間內仍然拿不到可以工作的任務,則殺死線程。

阻塞隊列 workQueue 工作大于等于coreSize時用來阻塞的阻塞隊列

線程工廠 threadFactory 生成新線程的工廠類

拒絕策略 rejectedExecutionHandler 線程池停止或者滿了的時候的拒絕策略類

2,提交任務

提交任務的關鍵思路

submit時如果狀態合適,會創建執行線程,去執行任務。
或者是運行狀態下,工作線程已經大于等于coreSize的線程,則會放入到阻塞隊列,等待有空閑下來的線程來執行。

public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);//創建一個 FutureTaskexecute(ftask);//提交任務return ftask;//返回future }public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();//提交一個任務,如果工作線程數小于coreSize,則直接加入一個worker。加入失敗則繼續if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}//二次檢查,如果線程池是運行狀態,則把command加入阻塞隊列中。if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();//如果不是運行狀態,從queue去除。if (! isRunning(recheck) && remove(command))reject(command);//如果是運行,且沒有工作線程,增加一個worker。else if (workerCountOf(recheck) == 0)addWorker(null, false);}//如果不能加入,則拒絕。(隊列已滿,嘗試再加入,如果加入失敗,則拒絕)else if (!addWorker(command, false))reject(command); }private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.//判斷是否狀態不為running 并且 (狀態不為SHUTDOWN || firstTask不為null || workQueue為空)//如果滿足,直接return。不做任何事情if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);//如果工作線程已經大于最大數量或者 大于core或max,則return。if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//cas增加工作線程數,增加成功跳出大循環if (compareAndIncrementWorkerCount(c))break retry;//增加失敗,繼續判斷狀態,繼續循環c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//創建新的Workerw = new Worker(firstTask);//線程工廠創建出一個有著傳入任務firstTask的線程final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());//如果線程池為running或者為shutdown并且firstTask為null(關閉前提交任務)if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {//如果t已經啟動,則拋出異常。if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();//workers增加線程wworkers.add(w);int s = workers.size();//if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}//如果加入到workers,則啟動。if (workerAdded) {t.start();workerStarted = true;}}} finally {//如果啟動失敗,沒有調用start方法時,則原子操作減去workCount,從workers移除。if (! workerStarted)addWorkerFailed(w);}return workerStarted; }

2.1 worker的執行

public void run() {runWorker(this);}final void runWorker(Worker w) {Thread wt = Thread.currentThread();//獲取worker里面的任務。Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {//如果task為null時則去workqueue里去取任務,直到取完,則關閉這個工作線程。while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interrupt//如果線程池正在關閉,確保線程被 interrupted,如果沒有,確保線程沒有被 interrupted,這個需要二次檢查防止調用shutdownNow 時正在interrupt。//判斷如果狀態已經是stop,或者 當前線程已經被打斷并且線程狀態已經是stop并且workers還沒有被關閉。 調用interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//模板方法,前置調用beforeExecute(wt, task);Throwable thrown = null;try {//調用run方法,執行任務。task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {//模板方法,后置調用afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {//執行結束后操作processWorkerExit(w, completedAbruptly);} } //private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.//如果已經關閉了且任務阻塞隊列為空,則減去工作線程數。不返回任務。(因為沒任務)if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//如果工作線程數已經大于最大數目并且 (工作線程數大于1或者工作隊列為空),cas減去工作線程數,返回。if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//判斷是否容許延時取出,如果allowCoreThreadTimeOut為真或者 工作線程數大于coreSize,則延時取出,否則一直等待取出。Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();//自旋等待r不為null的任務if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}} }private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();//如果執行失敗,則減去worker數目final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;//從workers中移除需要執行的任務wworkers.remove(w);} finally {mainLock.unlock();}如果執行完成時線程池正在關閉,工作線程數為0,則把線程池狀態置為終止terminatetryTerminate();int c = ctl.get();if (runStateLessThan(c, STOP)) {//正常執行完是false,則進入。if (!completedAbruptly) {//min為0或者coreSize。int min = allowCoreThreadTimeOut ? 0 : corePoolSize;//如果min為0且阻塞隊列不為空。則min置為1。if (min == 0 && ! workQueue.isEmpty())min = 1;如果工作線程數大于min,則不需要再增加工作線程,如果比min還小,則會走到下面的addWorker。if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false);} }

3,線程池關閉

shutdown的操作

1判斷shutdown權限

2修改線程池狀態,不再接受新任務

3interrupt所有線程

4修改狀態到terminal

public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//檢查當前線程是否有能修改所有工作線程的權限checkShutdownAccess();//自旋 + cas 去修改線程池的狀態。advanceRunState(SHUTDOWN);//嘗試interrupt所有工作線程t.interrupt() interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate(); }final void tryTerminate() {//自旋,如果失敗繼續執行。for (;;) {int c = ctl.get();//如果是running或者已經是tidying或者terminate,則不用做操作。if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;//如果工作線程不為0,則嘗試interrupt后返回。if (workerCountOf(c) != 0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//將狀態置為tidying然后調用模板方法 terminated,之后將狀態置為terminated。if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {terminated();} finally {ctl.set(ctlOf(TERMINATED, 0));termination.signalAll();}return;}} finally {mainLock.unlock();}// else retry on failed CAS} }

總結

以上是生活随笔為你收集整理的线程池工作原理的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。