线程池的使用(线程池重点解析)
我們有兩種常見的創(chuàng)建線程的方法,一種是繼承Thread類,一種是實現(xiàn)Runnable的接口,Thread類其實也是實現(xiàn)了Runnable接口。但是我們創(chuàng)建這兩種線程在運行結(jié)束后都會被虛擬機(jī)銷毀,如果線程數(shù)量多的話,頻繁的創(chuàng)建和銷毀線程會大大浪費時間和效率,更重要的是浪費內(nèi)存,因為正常來說線程執(zhí)行完畢后死亡,線程對象變成垃圾!那么有沒有一種方法能讓線程運行完后不立即銷毀,而是讓線程重復(fù)使用,繼續(xù)執(zhí)行其他的任務(wù)哪?我們使用線程池就能很好地解決這個問題。
? ? ? ?我們接下來重點說明線程池類家族有哪些,線程池創(chuàng)建線程完成任務(wù)的實現(xiàn)原理是什么以及線程池的一些特性來進(jìn)行分析。此文參考了https://www.cnblogs.com/dolphin0520/p/3932921.html
一.線程池家族
? ? ?線程池的最上層接口是Executor,這個接口定義了一個核心方法execute(Runnabel command),這個方法最后被ThreadPoolExecutor類實現(xiàn),這個方法是用來傳入任務(wù)的。而且ThreadPoolExecutor是線程池的核心類,此類的構(gòu)造方法如下:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue);public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);構(gòu)造方法的參數(shù)及意義:
corePoolSize:核心線程池的大小,如果核心線程池有空閑位置,這是新的任務(wù)就會被核心線程池新建一個線程執(zhí)行,執(zhí)行完畢后不會銷毀線程,線程會進(jìn)入緩存隊列等待再次被運行。
maximunPoolSize:線程池能創(chuàng)建最大的線程數(shù)量。如果核心線程池和緩存隊列都已經(jīng)滿了,新的任務(wù)進(jìn)來就會創(chuàng)建新的線程來執(zhí)行。但是數(shù)量不能超過maximunPoolSize,否側(cè)會采取拒絕接受任務(wù)策略,我們下面會具體分析。
keepAliveTime:非核心線程能夠空閑的最長時間,超過時間,線程終止。這個參數(shù)默認(rèn)只有在線程數(shù)量超過核心線程池大小時才會起作用。只要線程數(shù)量不超過核心線程大小,就不會起作用。
unit:時間單位,和keepAliveTime配合使用。
workQueue:緩存隊列,用來存放等待被執(zhí)行的任務(wù)。
threadFactory:線程工廠,用來創(chuàng)建線程,一般有三種選擇策略。
ArrayBlockingQueue; LinkedBlockingQueue; SynchronousQueue;handler:拒絕處理策略,線程數(shù)量大于最大線程數(shù)就會采用拒絕處理策略,四種策略為
ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。 ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。 ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程) ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)?
Executor接口有一個子接口ExecutorService,ExecutorService的實現(xiàn)類為AbstracExecutorService,而ThreadPoolExcutor正是AbstrcExecutorService的子類。
ThreadPoolExecutor還有兩個常用的方法shutdown和submit,兩者都用來關(guān)閉線程池,但是后者有一個結(jié)果返回。
?
?
二.線程池實現(xiàn)原理
線程池圖:
1.線程池狀態(tài)
線程池和線程一樣擁有自己的狀態(tài),在ThreadPoolExecutor類中定義了一個volatile變量runState來表示線程池的狀態(tài),線程池有四種狀態(tài),分別為RUNNING、SHURDOWN、STOP、TERMINATED。
線程池創(chuàng)建后處于RUNNING狀態(tài)。
調(diào)用shutdown后處于SHUTDOWN狀態(tài),線程池不能接受新的任務(wù),會等待緩沖隊列的任務(wù)完成。
調(diào)用shutdownNow后處于STOP狀態(tài),線程池不能接受新的任務(wù),并嘗試終止正在執(zhí)行的任務(wù)。
當(dāng)線程池處于SHUTDOWN或STOP狀態(tài),并且所有工作線程已經(jīng)銷毀,任務(wù)緩存隊列已經(jīng)清空或執(zhí)行結(jié)束后,線程池被設(shè)置為TERMINATED狀態(tài)。
2.線程池任務(wù)的執(zhí)行
當(dāng)執(zhí)行execute(Runnable command)方法后,傳入了一個任務(wù),我們看一下execute方法的實現(xiàn)原理。
public void execute(Runnable command) {if (command == null)throw new NullPointerException();if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {if (runState == RUNNING && workQueue.offer(command)) {if (runState != RUNNING || poolSize == 0)ensureQueuedTaskHandled(command);}else if (!addIfUnderMaximumPoolSize(command))reject(command); } }整個方法的執(zhí)行過程是這樣的,首先判斷任務(wù)是否為空,空拋空指針異常,否則執(zhí)行下一個判斷,如果目前線程的數(shù)量小于核心線程池大小,就執(zhí)行addIfUnderCorePollSize(command)方法,在核心線程池創(chuàng)建新的線程,并且執(zhí)行這個任務(wù)。
我們看這個方法的具體實現(xiàn):
private boolean addIfUnderCorePoolSize(Runnable firstTask) {Thread t = null;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (poolSize < corePoolSize && runState == RUNNING)t = addThread(firstTask); //創(chuàng)建線程去執(zhí)行firstTask任務(wù) } finally {mainLock.unlock();}if (t == null)return false;t.start();return true; }這個方法首先是獲取線程池的鎖,參考別人的博客,說是和線程池狀態(tài)有關(guān),沒有搞懂......,后面又進(jìn)行一次判斷,判斷線程池線程數(shù)量和核心線程池的比較,前面execute()已經(jīng)判斷過,這里為什么還要進(jìn)行判斷哪?因為我們執(zhí)行完Execute()中的判斷后,可能有新的任務(wù)進(jìn)來了,并且為這個任務(wù)在核心線程池創(chuàng)建了新的線程去執(zhí)行,如果剛好這是核心線程池滿了,那么就不能再加入新的縣城到核心線程池了。這種可能性是存在的,因為你不知道cpu什么時間分配給誰,所以我們要加一個判斷,至于線程池狀態(tài)為什么也要判斷,也是因為可能有其他線程執(zhí)行了shutdown或者shutdownNow方法,導(dǎo)致線程池狀態(tài)不是RUNNING,那么線程池就停止接收新的任務(wù),也就不會創(chuàng)建新的線程去執(zhí)行這個任務(wù)了。
t=addThread(firstTask);這句代碼至關(guān)重要,我們看方法的實現(xiàn)代碼:
private Thread addThread(Runnable firstTask) {Worker w = new Worker(firstTask);Thread t = threadFactory.newThread(w); //創(chuàng)建一個線程,執(zhí)行任務(wù) if (t != null) {w.thread = t; //將創(chuàng)建的線程的引用賦值為w的成員變量 workers.add(w); //將當(dāng)前任務(wù)添加到任務(wù)集int nt = ++poolSize; //當(dāng)前線程數(shù)加1 if (nt > largestPoolSize)largestPoolSize = nt;}return t; }這個方法返回類型是Thread,所以我們可以新建一個線程并執(zhí)行任務(wù),之后將線程對象返回給外面的線程對象,然后執(zhí)行t.start(),我們看到有一個Worker對象接收了任務(wù),我們看Worker類的實現(xiàn):
private final class Worker implements Runnable {private final ReentrantLock runLock = new ReentrantLock();private Runnable firstTask;volatile long completedTasks;Thread thread;Worker(Runnable firstTask) {this.firstTask = firstTask;}boolean isActive() {return runLock.isLocked();}void interruptIfIdle() {final ReentrantLock runLock = this.runLock;if (runLock.tryLock()) {try {if (thread != Thread.currentThread())thread.interrupt();} finally {runLock.unlock();}}}void interruptNow() {thread.interrupt();}private void runTask(Runnable task) {final ReentrantLock runLock = this.runLock;runLock.lock();try {if (runState < STOP &&Thread.interrupted() &&runState >= STOP)boolean ran = false;beforeExecute(thread, task); //beforeExecute方法是ThreadPoolExecutor類的一個方法,沒有具體實現(xiàn),用戶可以根據(jù)//自己需要重載這個方法和后面的afterExecute方法來進(jìn)行一些統(tǒng)計信息,比如某個任務(wù)的執(zhí)行時間等 try {task.run();ran = true;afterExecute(task, null);++completedTasks;} catch (RuntimeException ex) {if (!ran)afterExecute(task, ex);throw ex;}} finally {runLock.unlock();}}public void run() {try {Runnable task = firstTask;firstTask = null;while (task != null || (task = getTask()) != null) {runTask(task);task = null;}} finally {workerDone(this); //當(dāng)任務(wù)隊列中沒有任務(wù)時,進(jìn)行清理工作 }} }這個類實現(xiàn)了Runnable接口,所以會有run()方法,我們看到run中執(zhí)行的還是傳入的任務(wù),其實相當(dāng)于調(diào)用傳入任務(wù)對象的run方法,我們之所以費力氣將任務(wù)對象加到Worker類中去執(zhí)行,是因為這個線程執(zhí)行之后會進(jìn)入阻塞隊列等待被執(zhí)行,這個線程的生命并沒有結(jié)束,這也正是我們使用線程池的最大原因。我們用一個Set集合存儲Worker,這樣不會有重復(fù)的任務(wù)被存儲,firstTask被執(zhí)行完后進(jìn)入緩存隊列,而這個新創(chuàng)建的線程就一直從緩存隊列中拿到任務(wù)去執(zhí)行。這個方法為getTask(),所以我們來看看線程如何從緩存隊列拿到任務(wù)。
Runnable getTask() {for (;;) {try {int state = runState;if (state > SHUTDOWN)return null;Runnable r;if (state == SHUTDOWN) // Help drain queuer = workQueue.poll();else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //如果線程數(shù)大于核心池大小或者允許為核心池線程設(shè)置空閑時間,//則通過poll取任務(wù),若等待一定的時間取不到任務(wù),則返回nullr = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);elser = workQueue.take();if (r != null)return r;if (workerCanExit()) { //如果沒取到任務(wù),即r為null,則判斷當(dāng)前的worker是否可以退出if (runState >= SHUTDOWN) // Wake up othersinterruptIdleWorkers(); //中斷處于空閑狀態(tài)的workerreturn null;}// Else retry} catch (InterruptedException ie) {// On interruption, re-check runState}} }我們看到如果核心線程池中創(chuàng)建的線程想要拿到緩存隊列中的任務(wù),先要判斷線程池的狀態(tài),如果STOP或者TERMINATED,返回NULL,如果是RUNNING或者SHUTDOWN,則從緩存隊列中拿到任務(wù)去執(zhí)行。
這就是核心線程池執(zhí)行任務(wù)的原理。
那么如果線程數(shù)量超過核心線程池大小哪?我們回到executor()方法,如果發(fā)生這種情況,處理方式是?
if (runState == RUNNING && workQueue.offer(command))這段代碼意思是,如果線程數(shù)量超過核心線程池大小,先進(jìn)行線程池狀態(tài)的判斷,如果是RUNNING,則將新進(jìn)來的線程加入緩存隊列。如果失敗,往往是因為緩存隊列滿了或者線程池狀態(tài)不是RUNNING,就直接創(chuàng)建新的線程去執(zhí)行任務(wù),調(diào)用addIfUnderMaximumPoolSize(command),就會新創(chuàng)建線程,但是這個縣城不是核心線程池中的,是臨時擴(kuò)展的,要保證線程數(shù)最大不超過線程池大小?maximumPoolSize,如果超過執(zhí)行?reject(command);操作,拒絕接受新的任務(wù)。
還有如果任務(wù)已經(jīng)加入緩存隊列成功還要繼續(xù)進(jìn)行判斷
if (runState != RUNNING || poolSize == 0)這是為了防止在將任務(wù)加入緩存隊列的同時其他線程調(diào)用shutdown或者shutdownNow方法,所以采取了保護(hù)措施
ensureQueuedTaskHandled(command)?
我們看addIfUnderMaximumPoolSize的實現(xiàn)方法:
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {Thread t = null;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (poolSize < maximumPoolSize && runState == RUNNING)t = addThread(firstTask);} finally {mainLock.unlock();}if (t == null)return false;t.start();return true; }這個方法和addIfUnderCorePoolSize基本一樣,只是方法中判斷條件改變了,這個方法是在緩沖隊列滿了并且線程池狀態(tài)是在RUNNING狀態(tài)下才會執(zhí)行,里面的判斷條件是線程池數(shù)量小于線程池最大容量,并且線程池狀態(tài)是RUNNING。
?
我們進(jìn)行總結(jié):
- 如果當(dāng)前線程池中的線程數(shù)目小于corePoolSize,則每來一個任務(wù),就會創(chuàng)建一個線程去執(zhí)行這個任務(wù);
- 如果當(dāng)前線程池中的線程數(shù)目>=corePoolSize,則每來一個任務(wù),會嘗試將其添加到任務(wù)緩存隊列當(dāng)中,若添加成功,則該任務(wù)會等待空閑線程將其取出去執(zhí)行;若添加失敗(一般來說是任務(wù)緩存隊列已滿),則會嘗試創(chuàng)建新的線程去執(zhí)行這個任務(wù);
- 如果當(dāng)前線程池中的線程數(shù)目達(dá)到maximumPoolSize,則會采取任務(wù)拒絕策略進(jìn)行處理;
- 如果線程池中的線程數(shù)量大于 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止,直至線程池中的線程數(shù)目不大于corePoolSize;如果允許為核心池中的線程設(shè)置存活時間,那么核心池中的線程空閑時間超過keepAliveTime,線程也會被終止。
三.線程池使用示例
? ? ??
package cn.yqg.java;public class Task implements Runnable{private int num;public Task(int num) {this.num=num;}@Overridepublic void run() {System.out.println("正在執(zhí)行任務(wù) "+num);try {Thread.currentThread().sleep(4000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("線程"+num+"執(zhí)行完畢");} } package cn.yqg.java;import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;public class Test4 {public static void main(String[] args) {ThreadPoolExecutor pool=new ThreadPoolExecutor(5,10,200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5));for(int i=0;i<15;i++) {Task task=new Task(i);pool.execute(task);System.out.println("線程池中線程數(shù)目:"+pool.getPoolSize()+",隊列中等待執(zhí)行的任務(wù)數(shù)目:"+pool.getQueue().size()+",已執(zhí)行玩別的任務(wù)數(shù)目:"+pool.getCompletedTaskCount());}pool.shutdown();} }一種可能情況:
正在執(zhí)行任務(wù) 0 線程池中線程數(shù)目:1,隊列中等待執(zhí)行的任務(wù)數(shù)目:0,已執(zhí)行玩別的任務(wù)數(shù)目:0 線程池中線程數(shù)目:2,隊列中等待執(zhí)行的任務(wù)數(shù)目:0,已執(zhí)行玩別的任務(wù)數(shù)目:0 線程池中線程數(shù)目:3,隊列中等待執(zhí)行的任務(wù)數(shù)目:0,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行任務(wù) 1 正在執(zhí)行任務(wù) 2 線程池中線程數(shù)目:4,隊列中等待執(zhí)行的任務(wù)數(shù)目:0,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行任務(wù) 3 線程池中線程數(shù)目:5,隊列中等待執(zhí)行的任務(wù)數(shù)目:0,已執(zhí)行玩別的任務(wù)數(shù)目:0 線程池中線程數(shù)目:5,隊列中等待執(zhí)行的任務(wù)數(shù)目:1,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行任務(wù) 4 線程池中線程數(shù)目:5,隊列中等待執(zhí)行的任務(wù)數(shù)目:2,已執(zhí)行玩別的任務(wù)數(shù)目:0 線程池中線程數(shù)目:5,隊列中等待執(zhí)行的任務(wù)數(shù)目:3,已執(zhí)行玩別的任務(wù)數(shù)目:0 線程池中線程數(shù)目:5,隊列中等待執(zhí)行的任務(wù)數(shù)目:4,已執(zhí)行玩別的任務(wù)數(shù)目:0 線程池中線程數(shù)目:5,隊列中等待執(zhí)行的任務(wù)數(shù)目:5,已執(zhí)行玩別的任務(wù)數(shù)目:0 線程池中線程數(shù)目:6,隊列中等待執(zhí)行的任務(wù)數(shù)目:5,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行任務(wù) 10 線程池中線程數(shù)目:7,隊列中等待執(zhí)行的任務(wù)數(shù)目:5,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行任務(wù) 11 線程池中線程數(shù)目:8,隊列中等待執(zhí)行的任務(wù)數(shù)目:5,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行任務(wù) 12 線程池中線程數(shù)目:9,隊列中等待執(zhí)行的任務(wù)數(shù)目:5,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行任務(wù) 13 線程池中線程數(shù)目:10,隊列中等待執(zhí)行的任務(wù)數(shù)目:5,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行任務(wù) 14 線程2執(zhí)行完畢 線程1執(zhí)行完畢 線程4執(zhí)行完畢 線程3執(zhí)行完畢 正在執(zhí)行任務(wù) 8 線程0執(zhí)行完畢 正在執(zhí)行任務(wù) 9 正在執(zhí)行任務(wù) 7 正在執(zhí)行任務(wù) 6 正在執(zhí)行任務(wù) 5 線程12執(zhí)行完畢 線程13執(zhí)行完畢 線程11執(zhí)行完畢 線程10執(zhí)行完畢 線程14執(zhí)行完畢 線程7執(zhí)行完畢 線程9執(zhí)行完畢 線程8執(zhí)行完畢 線程5執(zhí)行完畢 線程6執(zhí)行完畢從執(zhí)行結(jié)果可以看出,當(dāng)線程池中線程的數(shù)目大于5時,便將任務(wù)放入任務(wù)緩存隊列里面,當(dāng)任務(wù)緩存隊列滿了之后,便創(chuàng)建新的線程。如果上面程序中,將for循環(huán)中改成執(zhí)行超過15個任務(wù),就會拋出任務(wù)拒絕異常了。
不過并不提倡我們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個靜態(tài)方法來創(chuàng)建線程池:
Executors.newCachedThreadPool(); //創(chuàng)建一個緩沖池,緩沖池容量大小為Integer.MAX_VALUE Executors.newSingleThreadExecutor(); //創(chuàng)建容量為1的緩沖池 Executors.newFixedThreadPool(int); //創(chuàng)建固定容量大小的緩沖池 public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>()); }從它們的具體實現(xiàn)來看,它們實際上也是調(diào)用了ThreadPoolExecutor,只不過參數(shù)都已配置好了。
newFixedThreadPool創(chuàng)建的線程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
newSingleThreadExecutor將corePoolSize和maximumPoolSize都設(shè)置為1,也使用的LinkedBlockingQueue;
newCachedThreadPool將corePoolSize設(shè)置為0,將maximumPoolSize設(shè)置為Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務(wù)就創(chuàng)建線程運行,當(dāng)線程空閑超過60秒,就銷毀線程。
實際中,如果Executors提供的三個靜態(tài)方法能滿足要求,就盡量使用它提供的三個方法,因為自己去手動配置ThreadPoolExecutor的參數(shù)有點麻煩,要根據(jù)實際任務(wù)的類型和數(shù)量來進(jìn)行配置。
另外,如果ThreadPoolExecutor達(dá)不到要求,可以自己繼承ThreadPoolExecutor類進(jìn)行重寫。
from:?https://www.cnblogs.com/zzuli/p/9386463.html
總結(jié)
以上是生活随笔為你收集整理的线程池的使用(线程池重点解析)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java并发:线程池详解(ThreadP
- 下一篇: MySql 里的IFNULL、NULLI