多线程—线程池Executor框架及四种常用线程池
池化技術應用:線程池、數據庫連接池、http連接池等等。
池化技術的思想主要是為了減少每次獲取資源的消耗,提高對資源的利用率。
使用線程池的好處:
- 降低資源消耗:通過重復利用已創建的線程降低線程創建和銷毀造成的消耗。
- 提高響應速度:當任務到達時,可以不需要等待線程創建就能立即執行。
- 管理線程:線程是稀缺資源,如果無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的分配,監控和調優。
Executor框架
綠色實線箭頭是繼承,虛線是接口實現
?
Executor接口是Executor框架的一個最基本的接口,Executor框架的大部分類都直接或間接地實現了此接口。
Executor接口只有一個execute(Runnable command)方法。
public void execute(Runnable r) {new Thread(r).start(); }ExecutorService接口繼承了Executor接口,該接口用于管理線程。
public interface ExecutorService extends Executor { // 請求關閉、發生超時或者當前線程中斷,無論哪一個首先發生之后,都將導致阻塞,直到所有任務完成執行。 boolean awaitTermination(long timeout, TimeUnit unit);// 執行給定的任務,當所有任務完成時,返回保持任務狀態和結果的 Future 列表。 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks);// 執行給定的任務,當所有任務完成或超時期滿時,返回保持任務狀態和結果的 Future 列表。 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit);// 任務列表里只要有一個任務完成了,就立即返回。而且一旦正常或異常返回后,則取消尚未完成的任務。 <T> T invokeAny(Collection<? extends Callable<T>> tasks);// 執行給定的任務,如果在給定的超時期滿前某個任務已成功完成(也就是未拋出異常),則返回其結果。 <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit);// 如果此執行程序已關閉,則返回 true。 boolean isShutdown();// 如果關閉后所有任務都已完成,則返回 true。先調用 shutdown 或 shutdownNow,否則 isTerminated 永不為 true。 boolean isTerminated();// 啟動一次順序關閉,執行以前提交的任務,但不接受新任務。 void shutdown();// 通過調用interrupt試圖停止所有正在執行的活動任務,暫停處理正在等待的任務,并返回等待執行的任務列表。 List<Runnable> shutdownNow();// 提交一個返回值的任務用于執行,返回一個表示任務的未決結果的 Future,該Future的get方法在成功完成時將會返回給定的結果。 <T> Future<T> submit(Callable<T> task);Future<?> submit(Runnable task);<T> Future<T> submit(Runnable task, T result); }execute和submit方法
- execute,執行一個任務,沒有返回值。
- submit,提交一個線程任務,有返回值。
- submit(Callable<T> task)能獲取到它的返回值,通過future.get()獲取(阻塞直到任務執行完)。
- submit(Runnable task, T result)能通過傳入的載體result間接獲得線程的返回值。
接下來介紹線程池主要實現類ThreadPoolExecutor
ThreadPoolExecutor構造函數
//五個參數的構造函數 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) //第六個參數 ThreadFactory threadFactory//第七個參數 RejectedExecutionHandler handlerThreadPoolExecutor類的構造函數有4個,前面5個參數是必須的,第6和7個參數同這5個參數又構成了3個構造函數。
int corePoolSize
- 線程池中核心線程數最大值,線程池新建線程的時候,如果當前線程總數小于corePoolSize,則新建的是核心線程,如果超過corePoolSize,進入任務隊列。
- 核心線程默認情況下會一直存活在線程池中,即使這個核心線程是閑置狀態。如果指定ThreadPoolExecutor的allowCoreThreadTimeOut這個屬性為true,那么閑置狀態的核心線程,超過一定時間(keepAliveTime),就會被銷毀掉。
int maximumPoolSize
- 該線程池中線程總數最大值。如果等待隊列滿了,創建非核心線程。
- 線程總數 = 核心線程數 + 非核心線程數。
long keepAliveTime
- 該線程池中非核心線程閑置超時時長。
- 一個非核心線程,如果不干活(閑置狀態)的時長超過這個參數所設定的時長,就會被銷毀掉。
- 如果設置allowCoreThreadTimeOut = true,則會作用于核心線程。
TimeUnit unit
- keepAliveTime的時間單位。
BlockingQueue<Runnable> workQueue
- 該線程池中的任務隊列,維護著等待執行的Runnable對象。
- 當所有的核心線程都在干活時,新添加的任務會被添加到這個隊列中等待處理,如果隊列滿了,則新建非核心線程執行任務。
隊列的三種通用策略詳解:
直接提交?SynchronousQueue
將任務直接提交給線程而不保存它們。如果所有線程都在工作,就新建一個線程來處理這個任務,所以通常要求maximumPoolSizes設置為Integer.MAX_VALUE,即無限大,以避免拒絕新提交的任務。
無界隊列?LinkedBlockingQueue
將導致在所有核心線程都在忙時新任務在隊列中等待,創建的線程就不會超過 corePoolSize,maximumPoolSize 的值也就沒意義了。
有界隊列?ArrayBlockingQueue
可以限定隊列的長度,接收到任務的時候,如果沒有達到corePoolSize的值,則新建線程(核心線程)執行任務,如果達到了,則入隊等候,如果隊列已滿,則新建線程(非核心線程)執行任務,又如果總線程數到了maximumPoolSize,并且隊列也滿了,則發生錯誤。
延時隊列?DelayQueue
傳進去的任務必須先實現Delayed接口。這個隊列接收到任務時,首先先入隊,只有達到了指定的延時時間,才會執行任務。
ThreadFactory threadFactory
用于創建新線程,是一個接口,new他的時候需要實現他的Thread newThread(Runnable r)方法,一般不用。
RejectedExecutionHandler handler
用于拋出異常,一般不用。
- ThreadPoolExecutor.AbortPolicy()? 拋出java.util.concurrent.RejectedExecutionException異常,默認。
- ThreadPoolExecutor.CallerRunsPolicy()? 重試添加當前的任務,他會自動重復調用execute()方法。?
- ThreadPoolExecutor.DiscardOldestPolicy()? 拋棄舊的任務??
- ThreadPoolExecutor.DiscardPolicy()? 拋棄當前的任務。?
如何配置線程池
CPU密集型任務
CPU核心數+1,CPU 密集型任務使得CPU使用率很高,內存、硬盤、網絡占用的時間少于cpu本身計算的時間,這時應配置盡可能小的線程避免線程之間頻繁的切換消耗資源。?
IO密集型任務
2*CPU核心數,O密集型任務CPU使用率并不高,當線程發出請求后,由于不占用cpu資源,可以阻塞等待,因此可以讓CPU在等待IO的時候有其他線程去處理別的任務,充分利用CPU時間。?
混合型任務
可以將任務分成IO密集型和CPU密集型任務,然后分別用不同的線程池去處理。
線程池執行策略
- 線程數量未達到corePoolSize,則新建一個核心線程執行任務
- 線程數量達到了corePools,則將任務移入隊列等待
- 隊列已滿,新建線程(非核心線程)執行任務
- 隊列已滿,總線程數又達到了maximumPoolSize,就會由RejectedExecutionHandler拋出異常
常用的四種線程池
CachedThreadPool 緩存線程池
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); cachedThreadPool.execute(new Runnable(){public void run() {};})特點:
- 核心線程數為0,線程數無限制
- 有空閑線程則復用空閑線程,若無空閑線程則新建線程
- 空閑線程只會等60s
- 直接提交隊列
FixedThreadPool?定長線程池
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}ExecutorService fixedThreadPool = Executors.newFixedThreadPool(int theads);fixedThreadPool.execute(new Runnable(){public void run() {};});?特點:
- 創建時給定核心線程數,所有線程都是核心線程。
- 線程空閑就回收,核心線程默認不回收。
- 阻塞隊列無界
SingleThreadExecutor?單線程化的線程池
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();singleThreadExecutor.execute(new Runnable(){public void run() {};}); static class FinalizableDelegatedExecutorServiceextends DelegatedExecutorService {FinalizableDelegatedExecutorService(ExecutorService executor) {super(executor);}protected void finalize() {super.shutdown();}}加了個finalize方法保證線程池的關閉,DelegatedExecutorService是繼承AbstractExecutorService的一個類。
和newFixedThreadPool(1)的區別
封裝成FinalizableDelegatedExecutorService類,這個類就是對ExecutorService進行了一個包裝,防止暴露出不該被暴露的方法,然后加上了finalize方法保證線程池的關閉。
特點:
- 線程池只有一個核心線程
- 線程空閑就回收,核心線程默認不回收。
- 阻塞隊列無界
ScheduledThreadPool?定長周期線程池:
ScheduledThreadPool是一個能實現定時、周期性任務的線程池。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);}ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(int theads);scheduledThreadPool.schedule(new Runnable() {public void run() {System.out.println("延遲1秒執行");}}, 1, TimeUnit.SECONDS);scheduledThreadPool.scheduleAtFixedRate(new Runnable() {public void run() {System.out.println("延遲1秒后每3秒執行一次");}}, 1, 3, TimeUnit.SECONDS); public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());}ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,實現了ScheduledExecutorService接口,該接口定義了schedule等任務調度的方法。ScheduledThreadPoolExecutor有兩個重要的內部類:DelayedWorkQueue和ScheduledFutureTask。DelayeddWorkQueue是一個阻塞隊列,而ScheduledFutureTask繼承自FutureTask,并且實現了Delayed接口。
特點:
- 給定核心線程數,定時、周期性處理任務
- 線程空閑就回收
- 阻塞隊列無界
線程池為什么能維持線程不釋放,隨時運行各種任務?
總結就是:如果隊列中沒有任務時,核心線程會一直阻塞在獲取任務的方法,直到返回任務。
//重點:poll會一直阻塞直到超過keepAliveTime或者獲取到任務 //take 會一直阻塞直到獲取到任務 //在沒有任務的時候 如果沒有特別設置allowCoreThreadTimeOut,我們的核心線程會一直阻塞在這里Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();所以也解釋了workQueues為什么要是BlockingQueue
ArrayBlockingQueue的take方法
public E take() throws InterruptedException {final ReentrantLock lock = this.lock; //加鎖lock.lockInterruptibly();try {while (count == 0)notEmpty.await(); //隊列為空時,將使這個線程進入阻塞狀態,直到被其他線程喚醒時取出元素return dequeue(); //消費對頭中的元素} finally {lock.unlock();}}可以看到當隊列內沒有任務時,調用await方法掛起線程。await方法是ConditionObject的方法,內部調用了LockSupport類的park方法將線程掛起。可以看這里。
總結
以上是生活随笔為你收集整理的多线程—线程池Executor框架及四种常用线程池的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 开发人员眼中最好的代码编辑器是谁?
- 下一篇: 13个好习惯 教你健康一整年