手写Java线程池_超详细解说_绝对能运行_代码超详细注释
線程池
問題背景
只是單純使用 new Thread(runnable).start(); 的方式創(chuàng)建線程, 將會(huì)導(dǎo)致嚴(yán)重的程序性能問題: 1.線程創(chuàng)建, 銷毀需要消耗很大的系統(tǒng)資源; 2.虛擬機(jī)創(chuàng)建線程的數(shù)量是有限的; 2.線程調(diào)度切換也將使程序性能下降; 針對(duì)這些問題, 對(duì)線程數(shù)量進(jìn)行管理, 有效地重復(fù)利用線程, 將會(huì)很好地提高程序性能.
線程池原理
使用隊(duì)列創(chuàng)建一定數(shù)量的線程, 當(dāng)有任務(wù)的時(shí)候, 使用隊(duì)列中線程執(zhí)行任務(wù)(如果任務(wù)過多, 就將其放入任務(wù)隊(duì)列, 進(jìn)入等待執(zhí)行狀態(tài)), 任務(wù)執(zhí)行完就自動(dòng)回收線程隊(duì)列中的線程(任務(wù)過少或者任務(wù)數(shù)量小于線程數(shù)量, 超出的線程將會(huì)銷毀, 做到線程隊(duì)列具有伸縮性);
根據(jù)上面描述, 我們自己的線程池將具有一下特點(diǎn):
1.內(nèi)部使用隊(duì)列來管理線程, 管理提交的任務(wù).
2.控制線程數(shù)量, 做到線程隊(duì)列具有良好的伸縮性.
3.當(dāng)任務(wù)數(shù)過多, 或者任務(wù)隊(duì)列已經(jīng)飽和, 將使用任務(wù)拒絕策略, 告訴對(duì)應(yīng)的任務(wù)提交者.
4.使用線程工廠定制線程隊(duì)列中, 每個(gè)線程的名字, 狀態(tài), 是否為守護(hù)線程等等.
線程池類圖結(jié)構(gòu)
任務(wù)隊(duì)列, 隊(duì)列使用limit限制提交任務(wù)的大小, 實(shí)現(xiàn)RunnableQueue接口(RunnableQueue接口負(fù)責(zé): 1.接收用戶提交的任務(wù); 2.獲取任務(wù)隊(duì)列中的任務(wù); 3.查看任務(wù)隊(duì)列大小), LinkedRunnableQueue實(shí)現(xiàn)RunnableQueue中的方法, 并且針對(duì)用戶提交不同的任務(wù)以及線程池種類(ThreadPool)的不同, 決定是否執(zhí)行拒絕策略(拒絕策略具有多個(gè), 拒絕方式取決于用戶自定義, 在線程池內(nèi)部具有默認(rèn)的拒絕策略實(shí)現(xiàn));
實(shí)現(xiàn)Runnable接口, 在run方法中獲取RunnableQueue中的任務(wù), 然后執(zhí)行RunnQueue中的任務(wù), InternalTask中的run方法是一個(gè)while循環(huán)循環(huán)結(jié)束條件取決于是否關(guān)閉該線程(關(guān)閉線程據(jù)需要設(shè)置flag變量, 當(dāng)flage為false, 線程run方法結(jié)束, 自動(dòng)結(jié)束生命), 而不是當(dāng)前用戶提交的任務(wù)是否執(zhí)行完!!!; InternalTask主要是對(duì)RunnableQueue的一種封裝; stop方法主要是設(shè)置線程flag(flag主要判斷當(dāng)前線程是否關(guān)閉)
線程池原型:
1.實(shí)現(xiàn)Runnable接口(此處注明, BasicThreadPool是繼承Thread, 但是Thread內(nèi)容太多了不能很好地在UML圖中顯示, 所以我就把他刪除了, 只留下了實(shí)現(xiàn)Runnable接口), 因?yàn)榫€程池自身執(zhí)行也需要一個(gè)線程, 所以繼承Thread, 這樣可以在BasicThreadPool的構(gòu)造方法中執(zhí)行start(), run方法中執(zhí)行創(chuàng)建線程的操作(線程池內(nèi)部執(zhí)行任務(wù)的線程); 創(chuàng)建線程取決于線程池設(shè)置的最大線程數(shù), 核心線程數(shù), 初始化線程數(shù), 用戶提交的任務(wù)數(shù);
2.實(shí)現(xiàn)ThreadPool接口(該接口主要用于定義線程池的基本操作, 比如執(zhí)行任務(wù), 獲取線程池的一些基本屬性) ;
3.內(nèi)部具有2個(gè)內(nèi)部類(ThreadTask負(fù)責(zé)對(duì)InternalTask進(jìn)行封裝, DefaultThreadFactory主要定義默認(rèn)的線程創(chuàng)建方式), 不同的線程池中擁有不同的默認(rèn)創(chuàng)建方式, 因此將線程創(chuàng)建方式設(shè)置為內(nèi)部類;
4.在BasicThreadPool中使用newThread方法創(chuàng)建線程(這些線程用于執(zhí)行ThreadTask中的任務(wù));
5.線程池原型中具有2個(gè)隊(duì)列, 第一個(gè)是剛才上面提的RunnQueue(負(fù)責(zé)執(zhí)行的任務(wù)), 第二個(gè)是ThreadQueue(負(fù)責(zé)存儲(chǔ)創(chuàng)建的每一個(gè)線程, 使用ArrayQueue實(shí)現(xiàn), 這樣很好地維護(hù)管理了線程, 做到資源重用)
6.removeThread方法: 刪除多余的線程, 當(dāng)用戶提交的任務(wù)數(shù)量小于線程池中創(chuàng)建的線程數(shù)量, 那么就刪除一定數(shù)量的線程, 這樣才不會(huì)浪費(fèi)線程資源.
7.在構(gòu)造方法中設(shè)置基本屬性, 以及當(dāng)前線程池的拒絕策略.
每個(gè)接口, 類的詳細(xì)定義
ThreadPool(interface 定義線程池基本操作)
package com.concurrent.customthreadpool;/*** 線程池接口* @author regotto*/ public interface ThreadPool {/*** 執(zhí)行提交的Runnable任務(wù)* @param runnable*/void execute(Runnable runnable);/*** 關(guān)閉線程池*/void shutdown();/*** 獲得線程池初始化大小* @return initSize*/int getInitSize();/*** 獲得線程池最大線程數(shù)* @return maxSize*/int getMaxSize();/*** 獲取線程池核心線程數(shù)* @return coreSize*/int getCoreSize();/*** 獲取線程池中用于緩存任務(wù)隊(duì)列的大小* @return queueSize*/int getQueueSize();/*** 獲取線程池中國(guó)活躍的線程數(shù)量* @return activeCount*/int getActiveCount();/*** 查看線程池是否shutdown* @return boolan*/boolean isShutdown();}RunnableQueue(interface 任務(wù)隊(duì)列)
package com.concurrent.customthreadpool;/*** 存放提交的Runnable, 使用BlockedQueue, 設(shè)置limit* @author regotto*/ public interface RunnableQueue {/*** 緩存提交到線程池中的任務(wù)* @param runnable*/void offer(Runnable runnable);/*** 從緩存中獲取Runnable任務(wù)* 如果沒有任務(wù), 調(diào)用者線程掛起, 在某些特定的時(shí)候拋出中斷異常* @throws InterruptedException* @return runnable*/Runnable take() throws InterruptedException;/*** 緩沖區(qū)大小* @return size*/int size();}LinkedRunnableQueue(class 對(duì)RunnableQueue的封裝, 用戶提交任務(wù), 線程執(zhí)行任務(wù), 此過程使用生產(chǎn)者-消費(fèi)者模式實(shí)現(xiàn))
package com.concurrent.customthreadpool;import java.util.LinkedList;/*** 線程池的內(nèi)部線程隊(duì)列, 緩沖區(qū)* @author regotto*/ public class LinkedRunnableQueue implements RunnableQueue{/*** limit: 限制當(dāng)前runnableList中還能存放多少內(nèi)容* denyPolicy: 拒絕策略* runnableList: 存放runnable的緩沖區(qū)* threadPool: 線程池*/private final int limit;private final RunnableDenyPolicy denyPolicy;private final LinkedList<Runnable> runnableList = new LinkedList<>();private final ThreadPool threadPool;public LinkedRunnableQueue(int limit, RunnableDenyPolicy denyPolicy, ThreadPool threadPool){this.limit = limit;this.denyPolicy = denyPolicy;this.threadPool = threadPool;}@Overridepublic void offer(Runnable runnable) {synchronized (runnableList) {if (runnableList.size() >= limit) {//用戶提交的任務(wù)大于限制條件, 執(zhí)行對(duì)應(yīng)的拒絕策略System.out.println(runnableList.size() + " >= " + limit + " execute deny policy");denyPolicy.reject(runnable, threadPool);} else {//添加任務(wù)到任務(wù)隊(duì)列尾部, 有任務(wù)存在, 喚醒剛才wait的線程runnableList.addLast(runnable);runnableList.notifyAll();}}}@Overridepublic Runnable take() throws InterruptedException{synchronized (runnableList) {while (runnableList.isEmpty()) {try {//從RunnableQueue中取出任務(wù), 如果任務(wù)為空, 使當(dāng)前線程waitrunnableList.wait();} catch (InterruptedException e) {throw e;}}//移除任務(wù)緩沖區(qū)的第一個(gè)return runnableList.removeFirst();}}@Overridepublic int size() {synchronized (runnableList) {return runnableList.size();}} }InternalTask(class 對(duì)RunnableQueue中任務(wù)的執(zhí)行)
package com.concurrent.customthreadpool;/*** 用于線程池內(nèi)部, 獲取runnableQueue中的runnable* @author regotto*/ public class InternalTask implements Runnable {private final RunnableQueue runnableQueue;private volatile boolean running = true;public InternalTask(RunnableQueue runnableQueue){this.runnableQueue = runnableQueue;}@Overridepublic void run() {//如果線程沒有關(guān)閉, 就讓該線程死循環(huán), 處理每一個(gè)提交的任務(wù)while (running && !Thread.currentThread().isInterrupted()){try {//處于中斷時(shí)候的線程不做處理//獲取RunnableQueue中任務(wù), 然后執(zhí)行Runnable take = runnableQueue.take();System.out.println("runnableQueue.take(): " + take.toString());take.run();} catch (InterruptedException e) {running = false;break;}}}/*** 停止當(dāng)前任務(wù), 設(shè)置其running為false, 在shutdown中處理*/public void stop(){this.running = false;}}RunnableDenyPolicy(interface 任務(wù)拒絕策略)
package com.concurrent.customthreadpool;/*** 當(dāng)任務(wù)數(shù)提交超過緩沖區(qū)limit, 執(zhí)行對(duì)應(yīng)的任務(wù)拒絕策略* @author regotto*/ @FunctionalInterface public interface RunnableDenyPolicy {/*** 對(duì)提交到threadPool的runnable是否執(zhí)行reject* @param runnable* @param threadPool*/void reject(Runnable runnable, ThreadPool threadPool);/*** 該策略使用空方法直接丟棄任務(wù)*/class DiscardDenyPolicy implements RunnableDenyPolicy {@Overridepublic void reject(Runnable runnable, ThreadPool threadPool) {System.out.println(runnable + "不做處理");}}/*** 該策略拋出一個(gè)RunnableDenyException*/class AbortDenyPolicy implements RunnableDenyPolicy {@Overridepublic void reject(Runnable runnable, ThreadPool threadPool) {throw new RunnableDenyException("The" + runnable + "will be abort");}}/***該策略Runnable給提交者所在的線程中運(yùn)行, 不加入到線程中*/class RunnerDenyPolicy implements RunnableDenyPolicy{@Overridepublic void reject(Runnable runnable, ThreadPool threadPool) {if (threadPool.isShutdown()) {runnable.run();}}}}RunnableDenyException(class 處理RunnableDenyPolicy拋出的運(yùn)行時(shí)異常)
package com.concurrent.customthreadpool;public class RunnableDenyException extends RuntimeException {public RunnableDenyException(String message) {super(message);} }ThreadFactory(interface 定義創(chuàng)建線程的接口)
package com.concurrent.customthreadpool;/*** 創(chuàng)建線程接口, 定制線程屬于哪一個(gè)group, 是否為守護(hù)線程, 優(yōu)先級(jí), 名字等* @author regotto*/ public interface ThreadFactory {/*** 創(chuàng)建定制化線程* @param runnable* @return thread*/Thread createThread(Runnable runnable);}BasicThreadPool(class 線程池的實(shí)現(xiàn))
package com.concurrent.customthreadpool;import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger;/*** 默認(rèn)的自定義的線程池, 內(nèi)部使用Queue進(jìn)行維護(hù)* @author regotto*/ public class BasicThreadPool extends Thread implements ThreadPool{/**initSize: 初始化線程數(shù)* maxSize: 線程池最大線程數(shù)* coreSize: 線程核心數(shù)* activeCount: 當(dāng)前活躍線程數(shù)* threadFactory: 線程工廠, 配置線程創(chuàng)建需要的參數(shù)* runnableQueue: 任務(wù)隊(duì)列* isShutdown: 是否關(guān)閉線程池* threadQueue: 工作線程隊(duì)列* DEFAULT_THREAD_FACTORY: 默認(rèn)的線程工廠* keepAliveTime: 線程存活時(shí)間*/private final int initSize;private final int maxSize;private final int coreSize;private int activeCount;private final ThreadFactory threadFactory;private final RunnableQueue runnableQueue;private volatile boolean isShutdown = false;private Queue<ThreadTask> threadQueue = new ArrayDeque<>();private final static RunnableDenyPolicy DEFAULT_DENY_POLICY = new RunnableDenyPolicy.DiscardDenyPolicy();private final static ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory();private final static long DEFAULT_KEEP_ALIVE_TIME = 10;private final long keepAliveTime;private final TimeUnit timeUnit;public BasicThreadPool(int initSize, int maxSize, int coreSize, int queueSize) {this(initSize, maxSize, coreSize, DEFAULT_THREAD_FACTORY, queueSize, DEFAULT_DENY_POLICY,DEFAULT_KEEP_ALIVE_TIME, TimeUnit.SECONDS);}public BasicThreadPool(int initSize, int maxSize, int coreSize, ThreadFactory threadFactory, int queueSize,RunnableDenyPolicy denyPolicy, long keepAliveTime, TimeUnit timeUnit) {this.initSize = initSize;this.maxSize = maxSize;this.coreSize = coreSize;this.threadFactory = threadFactory;this.runnableQueue = new LinkedRunnableQueue(queueSize, denyPolicy, this);this.keepAliveTime = keepAliveTime;this.timeUnit = timeUnit;this.init();}/*** 初始化線程池, 創(chuàng)建initThread*/private void init() {start();for (int i = 0; i < initSize; ++i) {newThread();}}/*** 創(chuàng)建線程添加到線程隊(duì)列, 然后用該線程執(zhí)行ThreadTask任務(wù)(層層封裝, 封裝用戶提交的任務(wù))*/private void newThread() {InternalTask internalTask = new InternalTask(runnableQueue);//使用自定義的線程工廠創(chuàng)建線程Thread thread = this.threadFactory.createThread(internalTask);ThreadTask threadTask = new ThreadTask(thread, internalTask);System.out.println(threadTask.thread.getName() + "被添加");//添加到線程隊(duì)列threadQueue.offer(threadTask);this.activeCount++;//被添加后的線程執(zhí)行startthread.start();}@Overridepublic void execute(Runnable runnable) {if (this.isShutdown) {throw new IllegalStateException("The thread pool id destroy");}//將用戶提交的任務(wù)放到runnableQueue中, 等待線程隊(duì)列中線程執(zhí)行this.runnableQueue.offer(runnable);}private void removeThread() {//ArrayDeque的remove就是removeFirstThreadTask threadTask = threadQueue.remove();//設(shè)置當(dāng)前線程flag, 在InternalTask中跳出循環(huán)自動(dòng)結(jié)束線程生命threadTask.internalTask.stop();this.activeCount--;}@Overridepublic void run() {while (!isShutdown && !isInterrupted()) {try {timeUnit.sleep(keepAliveTime);} catch (InterruptedException e) {isShutdown = true;break;}synchronized (this) {if (isShutdown) {break;}//當(dāng)前隊(duì)列中有任務(wù)還沒有處理, 且activeCount < coreSizeif (runnableQueue.size() > 0 && activeCount < coreSize) {//此處i曾寫做i=0,導(dǎo)致多創(chuàng)建了一個(gè)線程,在沒有任務(wù)的時(shí)候該線程一直保持wait//因?yàn)殛P(guān)閉pool,該線程沒有add到threadQueue,導(dǎo)致Interrupt失敗,最終導(dǎo)致線程一直運(yùn)行中for (int i = initSize; i < coreSize; ++i) {newThread();}//防止后面的if判斷創(chuàng)建線程數(shù)超過coreSize, 在coreSize還沒有滿的時(shí)候, 只執(zhí)行當(dāng)前的ifcontinue;}//當(dāng)上面if中創(chuàng)建的線程數(shù)不足的時(shí)候, 就擴(kuò)大線程池線程數(shù), 直到maxSizeif (runnableQueue.size() > 0 && activeCount < maxSize) {for (int i = coreSize; i < maxSize; ++i) {newThread();}}//當(dāng)沒有任務(wù), 但是activeCount線程數(shù)超出coreSize大小, 回收超出coreSize的線程if (runnableQueue.size() == 0 && activeCount > coreSize) {for (int i = coreSize; i < activeCount; ++i) {removeThread();}}}}}@Overridepublic void shutdown() {synchronized (this) {if (!isShutdown) {isShutdown = true;System.out.println("threadQueue size:" + threadQueue.size());threadQueue.forEach(threadTask -> {//調(diào)用internalTask中stop, 設(shè)置當(dāng)前線程運(yùn)行標(biāo)志為falsethreadTask.internalTask.stop();//設(shè)置線程中斷狀態(tài)threadTask.thread.interrupt();System.out.println(threadTask.thread.getName());});System.out.println("threadQueue中線程已經(jīng)關(guān)閉");//當(dāng)前線程池自己也要關(guān)閉this.interrupt();}}}@Overridepublic int getInitSize() {if (isShutdown) {throw new IllegalStateException("The thread pool is destroy");}return this.initSize;}@Overridepublic int getMaxSize() {if (isShutdown) {throw new IllegalStateException("The thread pool is destroy");}return this.maxSize;}@Overridepublic int getCoreSize() {if (isShutdown) {throw new IllegalStateException("The thread pool is destroy");}return this.coreSize;}@Overridepublic int getQueueSize() {if (isShutdown) {throw new IllegalStateException("The thread pool is destroy");}return this.runnableQueue.size();}@Overridepublic int getActiveCount() {if (isShutdown) {throw new IllegalStateException("The thread pool is destroy");}return this.activeCount;}@Overridepublic boolean isShutdown() {return this.isShutdown;}/*** 內(nèi)部類, 定義自己默認(rèn)的線程工廠*/private static class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger GROUP_COUNTER = new AtomicInteger(1);//設(shè)置線程組private static final ThreadGroup GROUP = new ThreadGroup("MyThreadPool-" + GROUP_COUNTER.getAndDecrement());private static final AtomicInteger COUNTER = new AtomicInteger(0);@Overridepublic Thread createThread(Runnable runnable) {//創(chuàng)建定制化的線程return new Thread(GROUP, runnable, " thread-pool-" + COUNTER.getAndDecrement());}}/*** 封裝InternalTask, 與每次創(chuàng)建的線程綁定在一起*/private class ThreadTask {Thread thread;InternalTask internalTask;ThreadTask(Thread thread, InternalTask internalTask){this.thread = thread;this.internalTask = internalTask;}} }ThreadPoolTest(class 測(cè)試類)
package com.concurrent.customthreadpool;import java.util.concurrent.TimeUnit;/*** 用于測(cè)試自定的線程池* @author regotto*/ public class ThreadPoolTest {public static void main(String[] args) {//初始化線程數(shù):2, 最大線程數(shù):6, 核心線程數(shù):4, 任務(wù)隊(duì)列大小:1000final BasicThreadPool threadPool = new BasicThreadPool(2, 6, 4, 1000);//創(chuàng)建20個(gè)任務(wù)提交進(jìn)行執(zhí)行for (int i = 0; i < 20; ++i) {threadPool.execute(() -> {try {TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName() + "is running and done.");} catch (InterruptedException e) {e.printStackTrace();}});}//此處用于測(cè)試線程池運(yùn)行時(shí)基本信息狀態(tài) // for (int j = 0; j < 1000; ++j) { // System.out.println("getActiveCount: " + threadPool.getActiveCount()); // System.out.println("getQueueSize: " + threadPool.getQueueSize()); // System.out.println("getCoreSize: " + threadPool.getCoreSize()); // System.out.println("getMaxSize: " + threadPool.getMaxSize()); // try { // TimeUnit.SECONDS.sleep(3); // } catch (InterruptedException e) { // e.printStackTrace(); // } // }try {TimeUnit.SECONDS.sleep(25);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("shutdown");//測(cè)試線程池shutdown功能threadPool.shutdown();} }總結(jié)
上面有錯(cuò), 還請(qǐng)指出, 如果認(rèn)為我寫的還不錯(cuò), 還請(qǐng)點(diǎn)個(gè)贊, 多多支持一下, O(∩_∩)O~~
總結(jié)
以上是生活随笔為你收集整理的手写Java线程池_超详细解说_绝对能运行_代码超详细注释的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: sql 整改措施 注入_SQL注入的漏洞
- 下一篇: auto.js停止所有线程_Java多线