线程池工作原理
線程池工作原理
線程池狀態(tài)的切換:
線程池關(guān)鍵類的uml圖:
線程池就是把任務(wù)提交和任務(wù)執(zhí)行解耦。
首先看一下線程池的使用:
public static void main(String args[]) throws InterruptedException {ExecutorService es = Executors.newFixedThreadPool(10);//1,創(chuàng)建線程池es.submit(()->{System.out.println("執(zhí)行任務(wù)");});//2,提交任務(wù)es.shutdown();//3,線程池關(guān)閉 }跟進(jìn)源碼:
1,創(chuàng)建線程池:
(可以看出來只是對線程池對象ThreadPoolExecutor屬性的賦值)
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); }public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler); }public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler; }幾個重要的參數(shù):
基本大小,corePoolSize 基本的工作線程數(shù)。
最大大小 maximumPoolSize 當(dāng)工作線程大于core,且workQueue也滿了的情況下(arrayQueue),可以創(chuàng)建的最大工作線程。
保持存活時間 keepAliveTime 阻塞隊列poll時的延時值,即如果線程在這個時間內(nèi)仍然拿不到可以工作的任務(wù),則殺死線程。
阻塞隊列 workQueue 工作大于等于coreSize時用來阻塞的阻塞隊列
線程工廠 threadFactory 生成新線程的工廠類
拒絕策略 rejectedExecutionHandler 線程池停止或者滿了的時候的拒絕策略類
2,提交任務(wù)
提交任務(wù)的關(guān)鍵思路:
submit時如果狀態(tài)合適,會創(chuàng)建執(zhí)行線程,去執(zhí)行任務(wù)。
或者是運行狀態(tài)下,工作線程已經(jīng)大于等于coreSize的線程,則會放入到阻塞隊列,等待有空閑下來的線程來執(zhí)行。
2.1 worker的執(zhí)行
public void run() {runWorker(this);}final void runWorker(Worker w) {Thread wt = Thread.currentThread();//獲取worker里面的任務(wù)。Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {//如果task為null時則去workqueue里去取任務(wù),直到取完,則關(guān)閉這個工作線程。while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interrupt//如果線程池正在關(guān)閉,確保線程被 interrupted,如果沒有,確保線程沒有被 interrupted,這個需要二次檢查防止調(diào)用shutdownNow 時正在interrupt。//判斷如果狀態(tài)已經(jīng)是stop,或者 當(dāng)前線程已經(jīng)被打斷并且線程狀態(tài)已經(jīng)是stop并且workers還沒有被關(guān)閉。 調(diào)用interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//模板方法,前置調(diào)用beforeExecute(wt, task);Throwable thrown = null;try {//調(diào)用run方法,執(zhí)行任務(wù)。task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {//模板方法,后置調(diào)用afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {//執(zhí)行結(jié)束后操作processWorkerExit(w, completedAbruptly);} } //private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.//如果已經(jīng)關(guān)閉了且任務(wù)阻塞隊列為空,則減去工作線程數(shù)。不返回任務(wù)。(因為沒任務(wù))if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//如果工作線程數(shù)已經(jīng)大于最大數(shù)目并且 (工作線程數(shù)大于1或者工作隊列為空),cas減去工作線程數(shù),返回。if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//判斷是否容許延時取出,如果allowCoreThreadTimeOut為真或者 工作線程數(shù)大于coreSize,則延時取出,否則一直等待取出。Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();//自旋等待r不為null的任務(wù)if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}} }private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();//如果執(zhí)行失敗,則減去worker數(shù)目final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;//從workers中移除需要執(zhí)行的任務(wù)wworkers.remove(w);} finally {mainLock.unlock();}如果執(zhí)行完成時線程池正在關(guān)閉,工作線程數(shù)為0,則把線程池狀態(tài)置為終止terminatetryTerminate();int c = ctl.get();if (runStateLessThan(c, STOP)) {//正常執(zhí)行完是false,則進(jìn)入。if (!completedAbruptly) {//min為0或者coreSize。int min = allowCoreThreadTimeOut ? 0 : corePoolSize;//如果min為0且阻塞隊列不為空。則min置為1。if (min == 0 && ! workQueue.isEmpty())min = 1;如果工作線程數(shù)大于min,則不需要再增加工作線程,如果比min還小,則會走到下面的addWorker。if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false);} }3,線程池關(guān)閉
shutdown的操作
1判斷shutdown權(quán)限
2修改線程池狀態(tài),不再接受新任務(wù)
3interrupt所有線程
4修改狀態(tài)到terminal
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//檢查當(dāng)前線程是否有能修改所有工作線程的權(quán)限checkShutdownAccess();//自旋 + cas 去修改線程池的狀態(tài)。advanceRunState(SHUTDOWN);//嘗試interrupt所有工作線程t.interrupt() interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate(); }final void tryTerminate() {//自旋,如果失敗繼續(xù)執(zhí)行。for (;;) {int c = ctl.get();//如果是running或者已經(jīng)是tidying或者terminate,則不用做操作。if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;//如果工作線程不為0,則嘗試interrupt后返回。if (workerCountOf(c) != 0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//將狀態(tài)置為tidying然后調(diào)用模板方法 terminated,之后將狀態(tài)置為terminated。if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {terminated();} finally {ctl.set(ctlOf(TERMINATED, 0));termination.signalAll();}return;}} finally {mainLock.unlock();}// else retry on failed CAS} }總結(jié)
- 上一篇: Redis 常用操作命令
- 下一篇: 深入浅出理解锁之—— AbstractQ