线程池工作原理
線程池工作原理
線程池狀態的切換:
線程池關鍵類的uml圖:
線程池就是把任務提交和任務執行解耦。
首先看一下線程池的使用:
public static void main(String args[]) throws InterruptedException {ExecutorService es = Executors.newFixedThreadPool(10);//1,創建線程池es.submit(()->{System.out.println("執行任務");});//2,提交任務es.shutdown();//3,線程池關閉 }跟進源碼:
1,創建線程池:
(可以看出來只是對線程池對象ThreadPoolExecutor屬性的賦值)
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); }public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler); }public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler; }幾個重要的參數:
基本大小,corePoolSize 基本的工作線程數。
最大大小 maximumPoolSize 當工作線程大于core,且workQueue也滿了的情況下(arrayQueue),可以創建的最大工作線程。
保持存活時間 keepAliveTime 阻塞隊列poll時的延時值,即如果線程在這個時間內仍然拿不到可以工作的任務,則殺死線程。
阻塞隊列 workQueue 工作大于等于coreSize時用來阻塞的阻塞隊列
線程工廠 threadFactory 生成新線程的工廠類
拒絕策略 rejectedExecutionHandler 線程池停止或者滿了的時候的拒絕策略類
2,提交任務
提交任務的關鍵思路:
submit時如果狀態合適,會創建執行線程,去執行任務。
或者是運行狀態下,工作線程已經大于等于coreSize的線程,則會放入到阻塞隊列,等待有空閑下來的線程來執行。
2.1 worker的執行
public void run() {runWorker(this);}final void runWorker(Worker w) {Thread wt = Thread.currentThread();//獲取worker里面的任務。Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {//如果task為null時則去workqueue里去取任務,直到取完,則關閉這個工作線程。while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interrupt//如果線程池正在關閉,確保線程被 interrupted,如果沒有,確保線程沒有被 interrupted,這個需要二次檢查防止調用shutdownNow 時正在interrupt。//判斷如果狀態已經是stop,或者 當前線程已經被打斷并且線程狀態已經是stop并且workers還沒有被關閉。 調用interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//模板方法,前置調用beforeExecute(wt, task);Throwable thrown = null;try {//調用run方法,執行任務。task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {//模板方法,后置調用afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {//執行結束后操作processWorkerExit(w, completedAbruptly);} } //private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.//如果已經關閉了且任務阻塞隊列為空,則減去工作線程數。不返回任務。(因為沒任務)if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//如果工作線程數已經大于最大數目并且 (工作線程數大于1或者工作隊列為空),cas減去工作線程數,返回。if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//判斷是否容許延時取出,如果allowCoreThreadTimeOut為真或者 工作線程數大于coreSize,則延時取出,否則一直等待取出。Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();//自旋等待r不為null的任務if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}} }private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();//如果執行失敗,則減去worker數目final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;//從workers中移除需要執行的任務wworkers.remove(w);} finally {mainLock.unlock();}如果執行完成時線程池正在關閉,工作線程數為0,則把線程池狀態置為終止terminatetryTerminate();int c = ctl.get();if (runStateLessThan(c, STOP)) {//正常執行完是false,則進入。if (!completedAbruptly) {//min為0或者coreSize。int min = allowCoreThreadTimeOut ? 0 : corePoolSize;//如果min為0且阻塞隊列不為空。則min置為1。if (min == 0 && ! workQueue.isEmpty())min = 1;如果工作線程數大于min,則不需要再增加工作線程,如果比min還小,則會走到下面的addWorker。if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false);} }3,線程池關閉
shutdown的操作
1判斷shutdown權限
2修改線程池狀態,不再接受新任務
3interrupt所有線程
4修改狀態到terminal
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//檢查當前線程是否有能修改所有工作線程的權限checkShutdownAccess();//自旋 + cas 去修改線程池的狀態。advanceRunState(SHUTDOWN);//嘗試interrupt所有工作線程t.interrupt() interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate(); }final void tryTerminate() {//自旋,如果失敗繼續執行。for (;;) {int c = ctl.get();//如果是running或者已經是tidying或者terminate,則不用做操作。if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;//如果工作線程不為0,則嘗試interrupt后返回。if (workerCountOf(c) != 0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//將狀態置為tidying然后調用模板方法 terminated,之后將狀態置為terminated。if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {terminated();} finally {ctl.set(ctlOf(TERMINATED, 0));termination.signalAll();}return;}} finally {mainLock.unlock();}// else retry on failed CAS} }總結
- 上一篇: Redis 常用操作命令
- 下一篇: 深入浅出理解锁之—— AbstractQ