常用并发工具类(线程池)
文章目錄
- 概述
- ThreadPoolExecutor
- ThreadPoolExecutor 的主要屬性
- Worker 主要屬性
- 線程池的狀態(tài)
- 線程池的狀態(tài)流轉(zhuǎn)
- 線程池提交任務(wù)的執(zhí)行流程
- 線程數(shù)量的設(shè)置
- 線程池的種類
- FixedThreadPool
- CachedThreadPool
- SingleThreadExecutor
- ScheduledThreadPoolExecutor
- SingleThreadScheduledExecutor
- ForkJoinPool
- work-stealing
- ForkJoinPool 注意事項(xiàng)
概述
線程池,是資源池化思想的一種實(shí)現(xiàn)。線程是一種寶貴且有限的 CPU 資源,一個線程的創(chuàng)建跟銷毀的成本是比較高的。
所以創(chuàng)建線程池,主要有以下兩個目的:
- 復(fù)用線程:單個線程創(chuàng)建使用完畢后,可以不用立馬銷毀,而是把這個線程放入到線程池中,等待下次執(zhí)行任務(wù)
- 管理線程:線程是一種寶貴且有限的 CPU 資源,其數(shù)量并不是無上限的
- 可以通過線程池來限制創(chuàng)建線程的數(shù)量,也可以通過線程池來決定何時銷毀冗余創(chuàng)建的線程
- 在線程池中存在多個線程時,可以通過線程池來協(xié)調(diào)每個線程的任務(wù)執(zhí)行情況,從而避免出現(xiàn)線程處于空閑狀態(tài),造成線程資源的浪費(fèi)
ThreadPoolExecutor
ThreadPoolExecutor 是一個通用的線程池的實(shí)現(xiàn),類圖如下所示:
可以看到,ThreadPoolExecutor 主要實(shí)現(xiàn)了 Executor、ExecutorService、AbstractExecutorService
ThreadPoolExecutor 的主要屬性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private final BlockingQueue<Runnable> workQueue;private final ReentrantLock mainLock = new ReentrantLock();/*** Set containing all worker threads in pool. Accessed only when* holding mainLock.*/private final HashSet<Worker> workers = new HashSet<Worker>();/*** Wait condition to support awaitTermination*/private final Condition termination = mainLock.newCondition();/*** Tracks largest attained pool size. Accessed only under* mainLock.*/private int largestPoolSize;/*** Counter for completed tasks. Updated only on termination of* worker threads. Accessed only under mainLock.*/private long completedTaskCount;private volatile ThreadFactory threadFactory;/*** Handler called when saturated or shutdown in execute.*/private volatile RejectedExecutionHandler handler;/*** Timeout in nanoseconds for idle threads waiting for work.* Threads use this timeout when there are more than corePoolSize* present or if allowCoreThreadTimeOut. Otherwise they wait* forever for new work.*/private volatile long keepAliveTime;/*** If false (default), core threads stay alive even when idle.* If true, core threads use keepAliveTime to time out waiting* for work.*/private volatile boolean allowCoreThreadTimeOut;/*** Core pool size is the minimum number of workers to keep alive* (and not allow to time out etc) unless allowCoreThreadTimeOut* is set, in which case the minimum is zero.*/private volatile int corePoolSize;/*** Maximum pool size. Note that the actual maximum is internally* bounded by CAPACITY.*/private volatile int maximumPoolSize;/*** The default rejected execution handler*/private static final RejectedExecutionHandler defaultHandler =new AbortPolicy();- 內(nèi)置一個 AtomicInteger,高 3 位用于表示線程池當(dāng)前狀態(tài),低 29 位用于表示線程個數(shù)
- 內(nèi)置一個阻塞隊(duì)列,用于核心線程都處于執(zhí)行狀態(tài)后保存任務(wù)
- 內(nèi)置一個 ReentrantLock 獨(dú)占鎖,以及一個由鎖對象創(chuàng)建出來的條件對象 Condition termination
- 一個去重集合 HashSet<Worker> workers,用于保存線程
- 歷史出現(xiàn)過的最多線程數(shù)量 int largestPoolSize
- 已完成的任務(wù)數(shù)量 long completedTaskCount
- 由 volatile 關(guān)鍵字修飾的線程組 ThreadFactory threadFactory
- 由 volatile 關(guān)鍵字修飾的拒絕策略 RejectedExecutionHandler handler
- 由 volatile 關(guān)鍵字修飾的空閑線程最大存活時間 long keepAliveTime
- 由 volatile 關(guān)鍵字修飾的是否允許核心線程超時 boolean allowCoreThreadTimeOut
- 由 volatile 關(guān)鍵字修飾的核心線程數(shù)量 int corePoolSize
- 由 volatile 關(guān)鍵字修飾的最大線程數(shù)量 int maximumPoolSize
- 默認(rèn)的拒絕策略 new AbortPolicy(),即默認(rèn)的拒絕策略是拋出異常
Worker 主要屬性
Worker 即工作線程。Worker 繼承了 AQS 并實(shí)現(xiàn)了 Runnable 接口,主要有以下幾個屬性:
- Thread thread:線程對象
- Runnable firstTask:首次執(zhí)行的任務(wù)
線程池的狀態(tài)
線程池一共有 5 個狀態(tài),分別是:
- RUNNING:運(yùn)行狀態(tài),接受新任務(wù)并且處理阻塞隊(duì)列里的任務(wù)
- SHUTDOWN:標(biāo)記關(guān)閉狀態(tài),拒絕新任務(wù),但是處理正在執(zhí)行和阻塞隊(duì)列里的任務(wù)
- STOP:關(guān)閉狀態(tài),拒絕新任務(wù)并且清空阻塞隊(duì)列,同時會中斷所有線程
- TIDYING:清場狀態(tài),線程池和阻塞隊(duì)列都為空,將要調(diào)用 terminated() 方法
線程池的狀態(tài)流轉(zhuǎn)
- 線程池一旦被創(chuàng)建,就是 RUNNING 狀態(tài),可以正常地執(zhí)行任務(wù),以及接受新任務(wù)
- 在 RUNNING 狀態(tài)調(diào)用 shutdown() 方法后,線程池就會切換到 SHUTDOWN 狀態(tài),標(biāo)記關(guān)閉狀態(tài)下
- 遍歷線程池,當(dāng)前所有空閑線程都會被調(diào)用 interrupt() 方法設(shè)置中斷( 被設(shè)置中斷的線程將會從線程池中移除)
- 不再接受新任務(wù),新提交的任務(wù)將直接丟棄
- 當(dāng)前正在執(zhí)行任務(wù)的線程把正在執(zhí)行的任務(wù)繼續(xù)完成,且這些線程將會繼續(xù)從阻塞隊(duì)列中獲取任務(wù)執(zhí)行
- 在 RUNNING 或者 SHUTDOWN 狀態(tài)調(diào)用 shutdownNow() 方法后,線程池就會切換到 STOP 狀態(tài)
- 不再接受新任務(wù)
- 遍歷線程池,調(diào)用每個線程的 interrupt() 方法設(shè)置中斷
- 遍歷阻塞隊(duì)列,移除所有任務(wù)
- 在 SHUTDOWN 或者 STOP 狀態(tài)持續(xù)一段時間后,當(dāng)線程池中沒有線程,并且阻塞隊(duì)列也為空后,線程池將會切換到 TIDYING 狀態(tài)
- TIDYING 狀態(tài)將會做最后的收尾工作,確保所有資源都被釋放
線程池提交任務(wù)的執(zhí)行流程
- 當(dāng)有新任務(wù)提交時,判斷當(dāng)前核心線程數(shù)是否小于最大核心線程數(shù)
- 如果小于最大核心線程數(shù),則創(chuàng)建一個新線程執(zhí)行任務(wù)
- 如果大于核心線程數(shù),則嘗試將任務(wù)提交到阻塞隊(duì)列中
- 如果阻塞隊(duì)列未滿,則將任務(wù)提交到阻塞隊(duì)列中
- 如果阻塞隊(duì)列已滿,則嘗試開啟新線程
- 如果當(dāng)前線程數(shù)量小于最大線程數(shù),則創(chuàng)建一個新線程執(zhí)行任務(wù)
- 如果當(dāng)前線程數(shù)量等于最大線程數(shù),則執(zhí)行拒絕策略
- AbortPolicy:直接拋出異常
- CallerRunsPolicy:由提交任務(wù)的線程執(zhí)行
- DiscardPolicy:直接丟棄任務(wù)
- DiscardOldestPolicy:將阻塞隊(duì)列隊(duì)頭的任務(wù)取出并丟棄,然后重試提交
線程數(shù)量的設(shè)置
線程數(shù)量的經(jīng)驗(yàn)計(jì)算公式為:
線程數(shù) = CPU 核心數(shù) *(1 + 平均等待時間 / 平均工作時間)線程池的種類
FixedThreadPool
FixedThreadPool,即固定數(shù)量線程池,特點(diǎn)是核心線程數(shù)量 = 最大線程數(shù)量,且阻塞隊(duì)列容量較大甚至無界。
- 固定數(shù)量,意味著線程不會頻繁地創(chuàng)建和銷毀,在線程數(shù)量達(dá)到核心線程數(shù)量后,在執(zhí)行任務(wù)過程中沒有發(fā)生異常的情況下,線程數(shù)量將不會再發(fā)生變化
- 阻塞隊(duì)列容量非常大,代表著任務(wù)可以被無條件地,一直不斷地添加進(jìn)來,可能會引發(fā) GC 問題
- 當(dāng)任務(wù)的執(zhí)行速度遠(yuǎn)小于任務(wù)的添加速度時,可能會導(dǎo)致阻塞隊(duì)列中的節(jié)點(diǎn)數(shù)量特別巨大,吃光堆內(nèi)存空間,導(dǎo)致 OOM
FixedThreadExecutor 適用于計(jì)算密集型任務(wù), 確保 CPU 在長時間被單個工作線程使用的情況下, 盡可能少地創(chuàng)建、分配、銷毀線程, 即適用于長期且數(shù)量可控的任務(wù)。
CachedThreadPool
CachedThreadPool,即緩存線程池,特點(diǎn)是
- 核心線程數(shù)量為 0,意味著在線程池空閑時,不會占用任何的線程資源
- 最大線程數(shù)非常巨大,意味著可以創(chuàng)建非常多的線程,可能會導(dǎo)致系統(tǒng)線程資源耗盡的問題
- 線程有較短的最大存活時間(如 60s),意味著每個工作線程的存活時間比較短
- 阻塞隊(duì)列不存儲元素,意味著每次有新任務(wù)提交時,要么可以立馬被一個空閑線程執(zhí)行,要么可以立馬創(chuàng)建一個新線程執(zhí)行去執(zhí)行
CachedThreadPool 適用于存在數(shù)量多且耗時少的任務(wù)場景,由于未限制線程最大數(shù)量,在任務(wù)的執(zhí)行速度遠(yuǎn)小于任務(wù)的添加速度時,有可能會導(dǎo)致系統(tǒng)線程資源耗盡或引發(fā) OOM。
SingleThreadExecutor
SingleThreadExecutor,即單線程線程池,特點(diǎn)是:
- 核心線程數(shù)量 = 最大線程數(shù)量 = 1,意味著線程池中最多只會有一個線程,意味著所有提交的任務(wù)都會被串行化執(zhí)行,任意時刻不會有同時兩個任務(wù)在被同時執(zhí)行
- 阻塞隊(duì)列容量非常大甚至無界,代表著任務(wù)可以被無條件地,一直不斷地添加進(jìn)來,可能會引發(fā) GC 問題
SingleThreadExecutor 適用于多個任務(wù)需要串行化執(zhí)行的場景,由于阻塞隊(duì)列無界,還是有可能引發(fā) OOM。
注意,Executors.newSingleThreadExecutor() 并不等價(jià)于 Executors.newFixedThreadPool(1),Executors 中的源碼為:
可以看到,newSingleThreadExecutor() 方法創(chuàng)建出來的 ThreadPoolExecutor 對象被包了一層 FinalizableDelegatedExecutorService,所以這兩者不等價(jià)
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor,是定時任務(wù)線程池,可以使用它來實(shí)現(xiàn)定時任務(wù),主要的構(gòu)造方法為:
public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());}可以看到,ScheduledThreadPoolExecutor 的特點(diǎn)是:
- 最大線程數(shù)無界
- 空閑線程的最大存活時間為 0,即在線程池不會存儲空閑線程
- 阻塞隊(duì)列 DelayedWorkQueue 是一個底層數(shù)據(jù)結(jié)構(gòu)為堆的延遲阻塞隊(duì)列,無界
SingleThreadScheduledExecutor
SingleThreadScheduledExecutor 即單線程定時任務(wù)線程池,可以保證所有定時任務(wù)都會由一個線程串行執(zhí)行
ForkJoinPool
ForkJoinPool,是一個用于管理 ForkJoinWrokerThread 的線程池,ForkJoin 是一種基于分治思想的并發(fā)編程框架,通過將任務(wù)分解后使用多個線程并行計(jì)算,最后合并所有子任務(wù)的計(jì)算結(jié)果得到最終的計(jì)算結(jié)果。
ForkJoinPool 線程池可以把一個大任務(wù)拆分成小任務(wù)并行執(zhí)行,但是任務(wù)類必須繼承自 RecursiveTask 或者 RecursiveAction
work-stealing
工作竊取算法(work-stealing)算法是指某個線程從其他隊(duì)列里竊取任務(wù)來執(zhí)行。主要實(shí)現(xiàn)的思路有一下幾個關(guān)鍵點(diǎn):
- 每個工作者線程都會對應(yīng)一個雙端任務(wù)隊(duì)列
- 當(dāng)某個工作者線程自己的任務(wù)隊(duì)列中的任務(wù)執(zhí)行完畢之后,將會從其他工作者線程的任務(wù)隊(duì)列里竊取任務(wù)出來執(zhí)行
- 工作者線程從自己的任務(wù)隊(duì)列的頭部來獲取任務(wù)(removeFirst)
- 工作者線程執(zhí)行任務(wù)竊取時,將從其他工作者線程的任務(wù)隊(duì)列的尾部來獲取任務(wù)(removeLast)
ForkJoinPool 注意事項(xiàng)
- 任務(wù)類必須繼承自 RecursiveTask 或者 RecursiveAction
- 子任務(wù)間沒有相互依賴
- 任務(wù)最好不要包含阻塞 IO 操作
總結(jié)
以上是生活随笔為你收集整理的常用并发工具类(线程池)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: nfine框架 上传文件_MVC之Str
- 下一篇: SVN服务器部署并实现双机同步及禁止普通