java并发编程——线程池的工作原理与源码解读
2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>
線程池的簡(jiǎn)單介紹
基于多核CPU的發(fā)展,使得多線程開發(fā)日趨流行。然而線程的創(chuàng)建和銷毀,都涉及到系統(tǒng)調(diào)用,比較消耗系統(tǒng)資源,所以就引入了線程池技術(shù),避免頻繁的線程創(chuàng)建和銷毀。
在Java用有一個(gè)Executors工具類,可以為我們創(chuàng)建一個(gè)線程池,其本質(zhì)就是new了一個(gè)ThreadPoolExecutor對(duì)象。
建議使用較為方便的 Executors 工廠方法來創(chuàng)建線程池。
- Executors.newCachedThreadPool()(無界線程池,可以進(jìn)行自動(dòng)線程回收)
- Executors.newFixedThreadPool(int)(固定大小線程池)
- Executors.newSingleThreadExecutor()(單個(gè)后臺(tái)線程)。
- Executors.newScheduledThreadPool() (支持計(jì)劃任務(wù)的線程池)
ThreadPoolExecutor工作原理介紹
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}線程池的執(zhí)行流程圖
任務(wù)被提交到線程池,會(huì)先判斷當(dāng)前線程數(shù)量是否小于corePoolSize,如果小于則創(chuàng)建線程來執(zhí)行提交的任務(wù),否則將任務(wù)放入workQueue隊(duì)列,如果workQueue滿了,則判斷當(dāng)前線程數(shù)量是否小于maximumPoolSize,如果小于則創(chuàng)建線程執(zhí)行任務(wù),否則就會(huì)調(diào)用handler,以表示線程池拒絕接收任務(wù)。
線程池使用介紹
newScheduledThreadPool的使用示例
public class SchedulePoolDemo {public static void main(String[] args){ScheduledExecutorService service = Executors.newScheduledThreadPool(10);//如果前面的任務(wù)沒有完成, 調(diào)度也不會(huì)啟動(dòng)service.scheduleAtFixedRate(()->{try {Thread.sleep(2000);// 每?jī)擅氪蛴∫淮?System.out.println(System.currentTimeMillis()/1000);} catch (InterruptedException e) {e.printStackTrace();}}, 0, 2, TimeUnit.SECONDS);} }潛在宕機(jī)風(fēng)險(xiǎn)
使用Executors來創(chuàng)建要注意潛在宕機(jī)風(fēng)險(xiǎn).其返回的線程池對(duì)象的弊端如下:
- FixedThreadPool和SingleThreadPoolPool : 允許的請(qǐng)求隊(duì)列長(zhǎng)度為 Integer.MAX_VALUE,可能會(huì)堆積大量的請(qǐng)求,從而導(dǎo)致 OOM.
- CachedThreadPool和ScheduledThreadPool : 允許的創(chuàng)建線程數(shù)量為 Integer.MAX_VALUE,可能會(huì)創(chuàng)建大量的線程,從而導(dǎo)致 OOM.
綜上所述, 在可能有大量請(qǐng)求的線程池場(chǎng)景中, 更推薦自定義ThreadPoolExecutor來創(chuàng)建線程池, 具體構(gòu)造函數(shù)配置如下:
線程池大小配置
一般根據(jù)任務(wù)類型進(jìn)行區(qū)分, 假設(shè)CPU為N核
- CPU密集型任務(wù)需要減少線程數(shù)量, 降低線程之間切換造成的開銷, 可配置線程池大小為N + 1.
- IO密集型任務(wù)則可以加大線程數(shù)量, 可配置線程池大小為 N * 2.
- 混合型任務(wù)則可以拆分為CPU密集型與IO密集型, 獨(dú)立配置.
自定義阻塞隊(duì)列BlockingQueue
主要存放等待執(zhí)行的線程, ThreadPoolExecutor中支持自定義該隊(duì)列來實(shí)現(xiàn)不同的排隊(duì)隊(duì)列.
- ArrayBlockingQueue:先進(jìn)先出隊(duì)列,創(chuàng)建時(shí)指定大小, 有界;
- LinkedBlockingQueue:使用鏈表實(shí)現(xiàn)的先進(jìn)先出隊(duì)列,默認(rèn)大小為Integer.MAX_VALUE;
- SynchronousQueue:不保存提交的任務(wù), 數(shù)據(jù)也不會(huì)緩存到隊(duì)列中, 用于生產(chǎn)者和消費(fèi)者互等對(duì)方, 一起離開.
- PriorityBlockingQueue: 支持優(yōu)先級(jí)的隊(duì)列
回調(diào)接口
線程池提供了一些回調(diào)方法, 具體使用如下所示.
ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>()) {@Overrideprotected void beforeExecute(Thread t, Runnable r) {System.out.println("準(zhǔn)備執(zhí)行任務(wù): " + r.toString());}@Overrideprotected void afterExecute(Runnable r, Throwable t) {System.out.println("結(jié)束任務(wù): " + r.toString());}@Overrideprotected void terminated() {System.out.println("線程池退出");}};可以在回調(diào)接口中, 對(duì)線程池的狀態(tài)進(jìn)行監(jiān)控, 例如任務(wù)執(zhí)行的最長(zhǎng)時(shí)間, 平均時(shí)間, 最短時(shí)間等等, 還有一些其他的屬性如下:
- taskCount:線程池需要執(zhí)行的任務(wù)數(shù)量.
- completedTaskCount:線程池在運(yùn)行過程中已完成的任務(wù)數(shù)量.小于或等于taskCount.
- largestPoolSize:線程池曾經(jīng)創(chuàng)建過的最大線程數(shù)量.通過這個(gè)數(shù)據(jù)可以知道線程池是否滿過.如等于線程池的最大大小,則表示線程池曾經(jīng)滿了.
- getPoolSize:線程池的線程數(shù)量.如果線程池不銷毀的話,池里的線程不會(huì)自動(dòng)銷毀,所以這個(gè)大小只增不減.
- getActiveCount:獲取活動(dòng)的線程數(shù).
自定義拒絕策略
線程池滿負(fù)荷運(yùn)轉(zhuǎn)后, 因?yàn)闀r(shí)間空間的問題, 可能需要拒絕掉部分任務(wù)的執(zhí)行.
jdk提供了RejectedExecutionHandler接口, 并內(nèi)置了幾種線程拒絕策略
- AbortPolicy: 直接拒絕策略, 拋出異常.
- CallerRunsPolicy: 調(diào)用者自己執(zhí)行任務(wù)策略.
- DiscardOldestPolicy: 舍棄最老的未執(zhí)行任務(wù)策略. 使用方式也很簡(jiǎn)單, 直接傳參給ThreadPool
自定義ThreadFactory
線程工廠用于創(chuàng)建池里的線程. 例如在工廠中都給線程setDaemon(true), 這樣程序退出的時(shí)候, 線程自動(dòng)退出.或者統(tǒng)一指定線程優(yōu)先級(jí), 設(shè)置名稱等等.
class NamedThreadFactory implements ThreadFactory {private static final AtomicInteger threadIndex = new AtomicInteger(0);private final String baseName;private final boolean daemon;public NamedThreadFactory(String baseName) {this(baseName, true);}public NamedThreadFactory(String baseName, boolean daemon) {this.baseName = baseName;this.daemon = daemon;}public Thread newThread(Runnable runnable) {Thread thread = new Thread(runnable, this.baseName + "-" + threadIndex.getAndIncrement());thread.setDaemon(this.daemon);return thread;} }關(guān)閉線程池
跟直接new Thread不一樣, 局部變量的線程池, 需要手動(dòng)關(guān)閉, 不然會(huì)導(dǎo)致線程泄漏問題.默認(rèn)提供兩種方式關(guān)閉線程池.- shutdown: 等所有任務(wù), 包括阻塞隊(duì)列中的執(zhí)行完, 才會(huì)終止, 但是不會(huì)接受新任務(wù). - shutdownNow: 立即終止線程池, 打斷正在執(zhí)行的任務(wù), 清空隊(duì)列.ThreadPoolExecutor源碼分析
ThreadPoolExecutor中ctl屬性介紹
ctl是ThreadPoolExecutor的一個(gè)重要屬性,它記錄著ThreadPoolExecutor的線程數(shù)量和線程狀態(tài)。
//Integer有32位,其中前三位用于記錄線程狀態(tài),后29位用于記錄線程的數(shù)量. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //表示用于記錄線程數(shù)量的位數(shù) private static final int COUNT_BITS = Integer.SIZE - 3; //將1左移COUNT_BITS位再減1,表示能表示的最大線程數(shù) private static final int CAPACITY = (1 << COUNT_BITS) - 1; //用ctl前三位分別表示線程池的狀態(tài) //(前三位為111)接受新任務(wù)并且處理已經(jīng)進(jìn)入隊(duì)列的任務(wù) private static final int RUNNING = -1 << COUNT_BITS; //(前三位為000)不接受新任務(wù),但是處理已經(jīng)進(jìn)入隊(duì)列的任務(wù) private static final int SHUTDOWN = 0 << COUNT_BITS; //(前三位001)不接受新任務(wù),不處理已經(jīng)進(jìn)入隊(duì)列的任務(wù),并且中斷正在執(zhí)行的任務(wù) private static final int STOP = 1 << COUNT_BITS; //(前三位010)所有任務(wù)執(zhí)行完成,workerCount為0。線程轉(zhuǎn)到了狀態(tài)TIDYING會(huì)執(zhí)行terminated()鉤子方法 private static final int TIDYING = 2 << COUNT_BITS; //(前三位011)任務(wù)已經(jīng)執(zhí)行完成 private static final int TERMINATED = 3 << COUNT_BITS; //狀態(tài)值就是只關(guān)心前三位的值,所以把后29位清0 private static int runStateOf(int c) { return c & ~CAPACITY; }//線程數(shù)量就是只關(guān)心后29位的值,所以把前3位清0 private static int workerCountOf(int c) { return c & CAPACITY; }//兩個(gè)數(shù)相或 private static int ctlOf(int rs, int wc) { return rs | wc; }execute()方法解析
public void execute(Runnable command) {if (command == null) throw new NullPointerException();int c = ctl.get();//判斷當(dāng)前活躍線程數(shù)是否小于corePoolSizeif (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))//調(diào)用addWorker創(chuàng)建線程執(zhí)行任務(wù)return;c = ctl.get();}//如果不小于corePoolSize,則將任務(wù)添加到workQueue隊(duì)列。if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();//再次獲取ctl的狀態(tài)//如果不在運(yùn)行狀態(tài)了,那么就從隊(duì)列中移除任務(wù)if (! isRunning(recheck) && remove(command))reject(command);//如果在運(yùn)行階段,但是Worker數(shù)量為0,調(diào)用addWorker方法else if (workerCountOf(recheck) == 0)addWorker(null, false);}//嘗試創(chuàng)建非核心線程如果創(chuàng)建失敗就會(huì)調(diào)用reject拒絕接受任務(wù)。else if (!addWorker(command, false))reject(command);} //調(diào)用handler的rejectedExecution(command,this)方法。handler是RejectedExecutionHandler接口,默認(rèn)實(shí)現(xiàn)是AbortPolicy final void reject(Runnable command) {handler.rejectedExecution(command, this); }addWorker()方法解析
addWorker方法用于創(chuàng)建線程,并且通過core參數(shù)表示該線程是否是核心線程,如果返回true則表示創(chuàng)建成功,否則失敗。addWorker的代碼如下所示:
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);//得到線程池的運(yùn)行狀態(tài)// rs>=SHUTDOWN為false,即線程池處于RUNNING狀態(tài).// rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()這個(gè)條件為true,也就意味著三個(gè)條件同時(shí)滿足,即線程池狀態(tài)為SHUTDOWN且firstTask為null且隊(duì)列不為空,這種情況為處理隊(duì)列中剩余任務(wù)。上面提到過當(dāng)處于SHUTDOWN狀態(tài)時(shí),不接受新任務(wù),但是會(huì)處理完隊(duì)列里面的任務(wù)。如果firstTask不為null,那么就屬于添加新任務(wù);如果firstTask為null,并且隊(duì)列為空,那么就不需要再處理了。if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||//如果創(chuàng)建的是非核心線程(core=false)時(shí),則需要判斷當(dāng)前線程數(shù)wc>=maximumPoolSize,如果返回false,創(chuàng)建非核心線程失敗。//如果創(chuàng)建的是核心線程(core=true)時(shí),則需要判斷當(dāng)前線程數(shù)wc>=corePoolSize,如果返回false,創(chuàng)建核心線程失敗。wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))//worker+1執(zhí)行成功,那么跳出外循環(huán)break retry;c = ctl.get();if (runStateOf(c) != rs)//再次判斷當(dāng)前狀態(tài),如果新獲取的狀態(tài)和當(dāng)前狀態(tài)不一致,則再次進(jìn)入外循環(huán)continue retry;// else CAS failed due to workerCount change; retry inner loop}}/* 一旦跳出外循環(huán),表示可以創(chuàng)建創(chuàng)建線程,這里具體是Worker對(duì)象,Worker實(shí)現(xiàn)了Runnable接口并且繼承AbstractQueueSynchronizer,內(nèi)部維持一個(gè)Runnable的隊(duì)列。try塊中主要就是創(chuàng)建Worker對(duì)象,然后將其保存到workers中,workers是一個(gè)HashSet,表示工作線程的集合。然后如果添加成功,則開啟Worker所在的線程。如果開啟線程失敗,則調(diào)用addWorkerFailed方法,addWokerFailed用于回滾worker線程的創(chuàng)建。 */boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//以firstTask作為Worker的第一個(gè)任務(wù)創(chuàng)建Workerw = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();//對(duì)整個(gè)線程池加鎖try {int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();//啟動(dòng)啟動(dòng)這個(gè)線程workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}addWorkerFailed()方法解析
private void addWorkerFailed(Worker w) {//對(duì)整個(gè)線程成績(jī)加鎖final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//移除Worker對(duì)象if (w != null)workers.remove(w);//減小worker數(shù)量decrementWorkerCount();//檢查termination狀態(tài)tryTerminate();} finally {mainLock.unlock();}}addWorkerFailed首先從workers集合中移除線程,然后將wokerCount減1,最后檢查終結(jié)。
tryTerminate()方法解析
tryTerminate()方法用于檢查是否有必要將線程池狀態(tài)轉(zhuǎn)移到TERMINATED。
final void tryTerminate() {for (;;) {int c = ctl.get();/*狀態(tài)判斷,如果有符合以下條件之一。則跳出循環(huán)(1)線程池處于RUNNING狀態(tài)(2)線程池狀態(tài)處于TIDYING狀態(tài)(3)線程池狀態(tài)處于SHUTDOWN狀態(tài)并且隊(duì)列不為空 如果不滿足上述的情況,那么目前狀態(tài)屬于SHUTDOWN切隊(duì)列為空,或者狀態(tài)屬于STOP,那么調(diào)用interruptIdleWorkers方法停止一個(gè)Worker線程,然后退出。*/if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;if (workerCountOf(c) != 0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;} /* 如果沒有退出循環(huán)的話,那么就首先將狀態(tài)設(shè)置成TIDYING,然后調(diào)用terminated方法,最后設(shè)置狀態(tài)為TERMINATED。terminated方法是個(gè)空實(shí)現(xiàn),用于當(dāng)線程池終結(jié)時(shí)處理一些事情。 */final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {terminated();} finally {ctl.set(ctlOf(TERMINATED, 0));termination.signalAll();}return;}} finally {mainLock.unlock();}// else retry on failed CAS}}構(gòu)造函數(shù)Worker(firstTask)解析
Worker繼承自AbstractQueuedSynchronizer并實(shí)現(xiàn)Runnbale接口。AbstractQueuedSynchronizer提供了一個(gè)實(shí)現(xiàn)阻塞鎖和其他同步工具,比如信號(hào)量、事件等依賴于等待隊(duì)列的框架。Worker的構(gòu)造方法中會(huì)使用threadFactory構(gòu)造線程變量并持有run方法調(diào)用了runWorker方法,將線程委托給主循環(huán)線程。
Worker(Runnable firstTask) {setState(-1);this.firstTask = firstTask;//設(shè)置該線程的this.thread = getThreadFactory().newThread(this);//創(chuàng)建一個(gè)線程 }//當(dāng)我我們啟動(dòng)一個(gè)線程時(shí)就會(huì)觸發(fā)Worker中的此方法 public void run() {runWorker(this); }runWorker()方法解析
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;//首次任務(wù)是創(chuàng)建Worker時(shí)添加的任務(wù)w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {//線程調(diào)用runWoker,會(huì)while循環(huán)調(diào)用getTask方法從workerQueue里讀取任務(wù),然后執(zhí)行任務(wù)。只要getTask方法不返回null,此線程就不會(huì)退出。while (task != null || (task = getTask()) != null) {w.lock();//對(duì)Worker加鎖//如果線程池停止了,那么中斷線程if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);//空實(shí)現(xiàn),任務(wù)運(yùn)行之前的操作Throwable thrown = null;try {task.run();//執(zhí)行任務(wù)} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);//空實(shí)現(xiàn),任務(wù)運(yùn)行之后的操作}} finally {task = null;//任務(wù)執(zhí)行完畢后,將task設(shè)置為nullw.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}getTask()方法解析
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);//必要時(shí)檢查隊(duì)列是否為空if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);//判斷是否允許超時(shí),wc>corePoolSize則是判斷當(dāng)前線程數(shù)是否大于corePoolSize。boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//如果當(dāng)前線程數(shù)大于corePoolSize,//則會(huì)調(diào)用workQueue的poll方法獲取任務(wù),超時(shí)時(shí)間是keepAliveTime。//如果超過keepAliveTime時(shí)長(zhǎng),poll返回了null,//上邊提到的while循序就會(huì)退出,線程也就執(zhí)行完了。//如果當(dāng)前線程數(shù)小于corePoolSize,//則會(huì)調(diào)用workQueue的take方法阻塞當(dāng)前線程,不會(huì)退出Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}參考地址:
- http://www.cnblogs.com/qingquanzi/p/8146638.html
- https://blog.csdn.net/qq_19431333/article/details/59030892
- https://www.cnblogs.com/xdecode/p/9119794.html
轉(zhuǎn)載于:https://my.oschina.net/cqqcqqok/blog/2049249
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)總結(jié)
以上是生活随笔為你收集整理的java并发编程——线程池的工作原理与源码解读的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【Infragistics教程】在jav
- 下一篇: 自定义按钮 图片标题位置随意放置