自定义线程池内置线程池的使用 ThreadPoolExecutor和Executorservice 示例与注意事项
文章目錄
- 線程池介紹
- 自己設計一個線程池
- 1.設計ThreadPool類:
- 2.設計工作隊列
- 3.實現自己設計的線程池
- 用java的ThreadPoolExecutor自定義線程池
- 自定義線程池-參數設計分析
- 自定義線程池-實現步驟示例
- Exectors創建內置線程池
- ExecutorService介紹和示例
- Scheduledexecutorservice
線程池介紹
自JDK1.5起,utils包提供了ExecutorService線程池的實現,主要目的是為了重復利用線程,提高系統效率。我們知道Thread是一個重量級的資源,創建、啟動以及銷毀都是比較耗費系統資源的,因此對線程的重復利用一種是非常好的程序設計習慣,加之系統中可創建的線程數量是有限的,線程數量和系統性能是一種拋物線的關系,也就是說當線程數量達到某個數值的時候,性能反倒會降低很多,因此對線程的管理,尤其是數量的控制更能直接決定程序的性能。
所謂線程池,通俗的理解就是有一個池子,里面存放著已經創建好的線程,當有任務提交給線程池執行時,池子中的某個線程會主動執行該任務。如果池子中的線程數量不夠應付數量眾多的任務時,則需要自動擴充新的線程到池子中,但是該數量是有限的,就好比池塘的水界線一樣。當任務比較少的時候,池子中的線程能夠自動回收,釋放資源。
為了能夠異步地提交任務和緩存未被處理的任務,需要有一個任務隊列,
通過上面的描述可知,一個完整的線程池應該具備如下要素。
???任務隊列:用于緩存提交的任務。
???線程數量管理功能:一個線程池必須能夠很好地管理和控制線程數量,可通過如下三個參數來實現,比如創建線程池時初始的線程數量init;線程池自動擴充時最大的線程數量max;在線程池空閑時需要釋放線程但是也要維護一定數量的活躍數量或者核心數量core。
有了這三個參數,就能夠很好地控制線程池中的線程數量,將其維護在一個合理的范圍之內,
三者之間的關系是init<=core<=max。
???任務拒絕策略:如果線程數量已達到上限且任務隊列已達到上限且任務隊列已滿,則需要有相應的拒絕策略來通知任務提交者。
???線程工廠:主要用于個性化定制線程,比如將線程設置為守護線程以及設置線程名稱等。
???QueueSize:任務隊列主要存放提交的Runnable,但是為了防止內存溢出,需要有limit數量對其進行控制。
???Keepedalive時間:該時間主要決定線程各個重要參數自動維護的時間間隔。
自己設計一個線程池
在這里實現一個比較簡單的ThreadPool,雖然比較簡單,但是該有的功能基本上都具備,對讀者學習和掌握JUC中的ExecutorService也有一定的幫助
線程池工作過程
a) 如果正在運行的線程數量小于 corePoolSize,那么馬上創建線程運行這個任務;
b) 如果正在運行的線程數量大于或等于 corePoolSize,那么將這個任務放入隊列;
c) 如果這時候隊列滿了,而且正在運行的線程數量小于 maximumPoolSize,那么還是要創建非核心線程立刻運行這個任務;
d) 如果隊列滿了,而且正在運行的線程數量大于或等于 maximumPoolSize,那么線程池會拋出異常RejectExecutionException。
1.設計ThreadPool類:
public interface ThreadPool {//提交任務到線程池void execute(Runnable runnable);//關閉線程池void shutdown();//獲取線程池的初始化大小int getInitSize();//獲取線程池最大的線程數int getMaxSize();//獲取線程池的核心線程數量int getCoreSize();//獲取線程池中用于緩存任務隊列的大小int getQueueSize();//獲取線程池中活躍線程的數量int getActiveCount();//查看線程池是否已經被shutdownboolean isShutdown(); }ThreadFactory提供了創建線程的接口,以便于個性化地定制Thread,比如Thread應該被加到哪個Group中,優先級、線程名字以及是否為守護線程等,
2.設計工作隊列
RunanbleQueue主要用于存放提交的Runnable,該Runnable是一個BlockedQueue,并且有limit的限制
package com.wangwenjun.concurrent.chapter08;//任務隊列,主要用于緩存提交到線程池中的任務 public interface RunnableQueue {//當有新的任務進來時首先會offer到隊列中void offer(Runnable runnable);//工作線程通過take方法獲取RunnableRunnable take();//獲取任務隊列中任務的數量int size(); }自定義阻塞隊列LinkedRunnableQueue的示例:
import java.util.LinkedList;public class LinkedRunnableQueue implements RunnableQueue {//任務隊列的最大容量,在構造時傳入private final int limit;//若任務隊列中的任務已經滿了,則需要執行拒絕策略 private final DenyPolicy denyPolicy;//存放任務的隊列private final LinkedList<Runnable> runnableList = new LinkedList<>();private final ThreadPool threadPool;public LinkedRunnableQueue(int limit, DenyPolicy denyPolicy, ThreadPool threadPool){this.limit = limit;this.denyPolicy = denyPolicy;this.threadPool = threadPool;} } // 在LinkedRunnableQueue中有幾個重要的屬性,第一個是limit,也就是Runnable隊列的上限;當提交的Runnable數量達到limit上限時,則會調用DenyPolicy的reject方法;runnableList是一個雙向循環列表,用于存放Runnable任務@Override public void offer(Runnable runnable) {synchronized (runnableList) {if (runnableList.size() >= limit){//無法容納新的任務時執行拒絕策略denyPolicy.reject(runnable, threadPool);} else{//將任務加入到隊尾,并且喚醒阻塞中的線程runnableList.addLast(runnable);runnableList.notifyAll();}} }//offer方法是一個同步方法,如果隊列數量達到了上限,則會執行拒絕策略,否則會將runnable存放至隊列中,同時喚醒take任務的線程: @Override public Runnable take() throws InterruptedException {synchronized (runnableList){while (runnableList.isEmpty()){try{//如果任務隊列中沒有可執行任務,則當前線程將會掛起,進入runnableList關聯的monitor waitset中等待喚醒(新的任務加入)runnableList.wait();} catch (InterruptedException e){//被中斷時需要將該異常拋出throw e;}}//從任務隊列頭部移除一個任務return runnableList.removeFirst();} }//take方法也是同步方法,線程不斷從隊列中獲取Runnable任務,當隊列為空的時候工作線程會陷入阻塞,有可能在阻塞的過程中被中斷,為了傳遞中斷信號需要在catch語句塊中將異常拋出以通知上游(InternalTask),示例代碼如下: @Override public int size() {synchronized (runnableList){//返回當前任務隊列中的任務數return runnableList.size();} //其中,size方法用于返回runnableList的任務個數。 }3.實現自己設計的線程池
public class BasicThreadPool extends Thread implements ThreadPool {//初始化線程數量private final int initSize;//線程池最大線程數量private final int maxSize;//線程池核心線程數量private final int coreSize;//當前活躍的線程數量private int activeCount;//創建線程所需的工廠private final ThreadFactory threadFactory;//任務隊列private final RunnableQueue runnableQueue;//線程池是否已經被shutdownprivate volatile boolean isShutdown = false;//工作線程隊列private final Queue<ThreadTask> threadQueue = new ArrayDeque<>();private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy();private final static ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory();private final long keepAliveTime;private final TimeUnit timeUnit;//構造時需要傳遞的參數:初始的線程數量,最大的線程數量,核心線程數量,任務隊列的最大數量public BasicThreadPool(int initSize, int maxSize, int coreSize,int queueSize){this(initSize, maxSize, coreSize, DEFAULT_THREAD_FACTORY,queueSize, DEFAULT_DENY_POLICY, 10, TimeUnit.SECONDS);}//構造線程池時需要傳入的參數,該構造函數需要的參數比較多public BasicThreadPool(int initSize, int maxSize, int coreSize,ThreadFactory threadFactory, int queueSize, DenyPolicy denyPolicy, long keepAliveTime, TimeUnit timeUnit){this.initSize = initSize;this.maxSize = maxSize;this.coreSize = coreSize;this.threadFactory = threadFactory;this.runnableQueue = new LinkedRunnableQueue(queueSize, denyPolicy, this);this.keepAliveTime = keepAliveTime;this.timeUnit = timeUnit;this.init();}//初始化時,先創建initSize個線程private void init(){start();for (int i = 0; i < initSize; i++){newThread();}} //提交任務非常簡單,只是將Runnable插入runnableQueue中即可@Override public void execute(Runnable runnable) {if (this.isShutdown)throw new IllegalStateException("The thread pool is destroy");//提交任務只是簡單地往任務隊列中插入Runnablethis.runnableQueue.offer(runnable); } ,線程池自動維護代碼如下: private void newThread() {//創建任務線程,并且啟動InternalTask internalTask = new InternalTask(runnableQueue);Thread thread = this.threadFactory.createThread(internalTask);ThreadTask threadTask = new ThreadTask(thread, internalTask);threadQueue.offer(threadTask);this.activeCount++;thread.start();} private void removeThread() {//從線程池中移除某個線程ThreadTask threadTask = threadQueue.remove();threadTask.internalTask.stop();this.activeCount--; } @Override public void run() {//run方法繼承自Thread,主要用于維護線程數量,比如擴容、回收等工作while (!isShutdown && !isInterrupted()){try{timeUnit.sleep(keepAliveTime);} catch (InterruptedException e){isShutdown = true;break;}synchronized (this){if (isShutdown)break;//當前的隊列中有任務尚未處理,并且activeCount< coreSize則繼續擴容if (runnableQueue.size() > 0&& activeCount < coreSize){for (int i = initSize; i < coreSize; i++){newThread();}//continue的目的在于不想讓線程的擴容直接達到maxsizecontinue;} //當前的隊列中有任務尚未處理,并且activeCount< maxSize則繼續擴容if (runnableQueue.size() > 0&& activeCount < maxSize){for (int i = coreSize; i < maxSize; i++){newThread();}}//如果任務隊列中沒有任務,則需要回收,回收至coreSize即可if (runnableQueue.size()==0&& activeCount > coreSize){for (int i = coreSize; i < activeCount; i++){removeThread();}}}} }//ThreadTask只是InternalTask和Thread的一個組合 private static class ThreadTask {public ThreadTask(Thread thread, InternalTask internalTask){this.thread = thread;this.internalTask = internalTask;}Thread thread;InternalTask internalTask; }自己設計的線程池測試代碼
import java.util.concurrent.TimeUnit;public class ThreadPoolTest {public static void main(String[] args) throws InterruptedException{ //定義線程池,初始化線程數為2,核心線程數為4,最大線程數為6,任務隊列最多允許1000個任務for (int i = 0; i < 20; i++)threadPool.execute(() ->{try{TimeUnit.SECONDS.sleep(10); System.out.println(Thread.currentThread().getName() + " is running and done.");} catch (InterruptedException e){e.printStackTrace();}});for (; ; ){//不斷輸出線程池的信息System.out.println("getActiveCount:" + threadPool.getActiveCount());System.out.println("getQueueSize:" + threadPool.getQueueSize());System.out.println("getCoreSize:" + threadPool.getCoreSize());System.out.println("getMaxSize:" + threadPool.getMaxSize());System.out.println("======================================");TimeUnit.SECONDS.sleep(5);}} }用java的ThreadPoolExecutor自定義線程池
自定義線程池需要用到ThreadPoolExecutor,這個類提供ExecutorService執行方法的默認實現。 此類使用newTaskFor返回的RunnableFuture實現submit 、 invokeAny和invokeAll方法,默??認為該包中提供的FutureTask類。
2.1.1:ThreadPoolExecutor部分源碼
構造方法:public ThreadPoolExecutor(int corePoolSize,/核心線程數量int maximumPoolSize,//最大線程數long keepAliveTime,/最大空閑時間TimeUnit unit,時間單位BlockingQueue<Runnable>workQueue,/任務隊列ThreadFactory threadFactory,/線程工廠RejectedExecutionHandler handler/∥飽和處理機制){…}自定義線程池-參數設計分析
◆通過觀察Java中的內置線程池參數講解和線程池工作流程總結我們不難發現要設計一個好的線程池,就必須合理的設置線程池的4個參數那到底該如何合理的設計4個參數的值呢?我們起往下看.
4個參數的設計
1:核心線程數( corepoolsize)
核心線程數的設計需要依據任務的處理時間可和每秒產生的任務數量來確定例如執行個任務常要0.1秒系統百分之80的時間每秒都會產生100個任務那么要想在1秒內處理完這100個任務就需要10個線程此時我們就可以設計核心線程數為10,當然實時情兄不可能這么平均所以我們般按照8020原則設計即可,既技照百分之80的情況設計核心線程數剩下的百分之20可以利用最大線程數處理
2:任務隊列長度( workqueue)
任務隊列長度一般設計為線程數/單個任務執行時可2即可,如上面的場中核心線程數設計為10單個任務執行時可為
0.1秒.則隊列長度可以設計為200
3:最大線程數(maximumPoolSize)
最大線程數的設計除了需要參照核心線程數的條件外,還需要參照系統每秒產生的最大任務數決定例如:上述環境中,如果系統每秒最大產生的任務是1000個,那么最大線程數=(最大任務數-任務隊列長度)*單個任務執行時間:既:最大線程數=(1000-200)*0.1=80個:
4:最大空閑時間(keepAliveTime)
這個參數的設計完全參考系統運行環境和硬件壓力設定沒有固定的參考值用戶可以根據經驗和系統產生任務的時間間隔合理設置一個值即可;
自定義線程池-實現步驟示例
1編寫任務類( My Task),實現 Runnable接口
2編寫線程類( My Worker)用于執行任務需要持有所有任務;
3編寫線程池類( Mythread Pool),包含提交任務執行任務的能力;
4編寫測試類( Mytest)創建線程池對象提交多個任務
例子:
public static void main(String[] args)throws ExecutionException, InterruptedException {// ① 創建ThreadPoolExecutor,7個構造參數 ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 30,TimeUnit.SECONDS,new ArrayBlockingQueue<>(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardPolicy());// ② 提交執行異步任務,不關注返回值 executor.execute(() -> System.out.println(" execute the runnable task"));// ③ 提交執行異步任務,關注返回值 Future<String> future = executor.submit(() -> " Execute the callable task and this is the result");// ④獲取并輸出callable任務的返回值System.out.println(future.get()); }Exectors創建內置線程池
注:《阿里巴巴Java開發手冊》中強制線程池不允許使用 Executors 去創建,而是通過ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險
Executors 返回線程池對象的弊端如下:
FixedThreadPool 和 SingleThreadExecutor : 允許請求的隊列長度為Integer.MAX_VALUE,可能堆積大量的請求,從而導致OOM。
CachedThreadPool 和 ScheduledThreadPool : 允許創建的線程數量為Integer.MAX_VALUE ,可能會創建大量線程,從而導致OOM。
ExecutorService介紹和示例
Executors其實是個工具類,里面提供了好多靜態方法,這些方法根據用戶選擇返回不同的線程池實例。 ThreadPoolExecutor繼承了AbstractExecutorService,成員變量ctl是一個Integer的原子變量,用來記錄線程池狀態和線程池中線程個數,類似于ReentrantReadWriteLock使用一個變量來保存兩種信息。
獲取ExecutorServicei可以利用JDK中的Executors類中的靜態方法,常用獲取方式如下:
???static ExecutorService newCachedThreadPool(創建一個默認的線程池對象,里面的線程可重用,且在第一次使用時才創建static ExecutorService ,最多線程個數為Integer.MAX_VALUE,并且阻塞隊列為同步隊列。keeyAliveTime=60說明只要當前線程在60s內空閑則回收。這個類型的特殊之處在于,加入同步隊列的任務會被馬上執行,同步隊列里面最多只有一個任務。
???static ExecutorService newFixedThreadPool(int n Threads)創建一個可重用固定線程數的線程池并且阻塞隊列長度為Integer.MAX_VALUE。keeyAliveTime=0說明只要線程個數比核心線程個數多并且當前空閑則回收。
static ExecutorService newFixedThreadPool(int n
???static ExecutorService newSingleThreadExecutor()創建一個使用單個worker線程的Executor,以無界隊列方式來運行該線程。并且阻塞隊列長度為Integer.MAX_VALUE。keeyAliveTime=0說明只要線程個數比核心線程個數多并且當前空閑則回收。
上面三個構造方法都有帶ThreadFactory的重載方法,用于自定義線程創建的方式。
例子:
//創建一個線程池 ExecutorService pool = Executors.newFixedThreadPool(taskSize); // 創建多個有返回值的任務 List<Future> list = new ArrayList<Future>(); for (int i = 0; i < taskSize; i++) { Callable c = new MyCallable(i + " "); // 執行任務并獲取Future 對象 Future f = pool.submit(c); list.add(f); } // 關閉線程池 pool.shutdown(); // 獲取所有并發任務的運行結果 for (Future f : list) { // 從Future 對象上獲取任務的返回值,并輸出到控制臺 System.out.println("res:" + f.get().toString());Scheduledexecutorservice
Scheduledexecutorservice,是 ExecutorService的子接口,具備了延遲運行或定期執行任務的能力,常用獲取方式如下
static Scheduledexecutorservice newscheduled Threadpool(int corepoolsize創建一個可重用固定線程數的線程池且允許延遲運行或定期執行任務
static Scheduledexecutorservice newscheduledthread Pool(int corepoolsize, Threadfactory threadfactory)
創建一個可重用固定線程數的線程池且線程池中的所有線程都使用 Thread Factory來創建,且允許延遲運行或定期執行任務;
static Scheduledexecutorservice newsinglethreadscheduledexecutor(Threadfactory threadfactory)創建一個單線程執行程序,它可安排在給定延退后運行命令或者定期地執行。
例子:
ScheduledExecutorService scheduledThreadPool= Executors.newScheduledThreadPool(3); scheduledThreadPool.schedule(newRunnable(){@Overridepublic void run() { System.out.println("延遲三秒"); } }, 3, TimeUnit.SECONDS); scheduledThreadPool.scheduleAtFixedRate(newRunnable(){ @Override public void run() { System.out.println("延遲1秒后每三秒執行一次");} },1,3,TimeUnit.SECONDS);總結
以上是生活随笔為你收集整理的自定义线程池内置线程池的使用 ThreadPoolExecutor和Executorservice 示例与注意事项的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 用redis+jwt保存在线用户和获得在
- 下一篇: redis+aop防重复提交