日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Java并发编程之线程池ThreadPoolExecutor解析

發布時間:2025/3/11 java 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java并发编程之线程池ThreadPoolExecutor解析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

線程池存在的意義

平常使用線程即new Thread()然后調用start()方法去啟動這個線程,但是在頻繁的業務情況下如果在生產環境大量的創建Thread對象是則會浪費資源,不僅增加GC回收壓力,并且還浪費了時間,創建線程是需要花時間的;

線程池的存在就是降低頻繁的創建線程,降低資源的消耗以及創建時間的浪費,并且可以同一管理。

ThreadPoolExecutor

在JDK中所有的線程池的父類就是ThreadPoolExecutor,以下是它的構造方法

/*** Creates a new {@code ThreadPoolExecutor} with the given initial* parameters.** @param corePoolSize the number of threads to keep in the pool, even* if they are idle, unless {@code allowCoreThreadTimeOut} is set* @param maximumPoolSize the maximum number of threads to allow in the* pool* @param keepAliveTime when the number of threads is greater than* the core, this is the maximum time that excess idle threads* will wait for new tasks before terminating.* @param unit the time unit for the {@code keepAliveTime} argument* @param workQueue the queue to use for holding tasks before they are* executed. This queue will hold only the {@code Runnable}* tasks submitted by the {@code execute} method.* @param threadFactory the factory to use when the executor* creates a new thread* @param handler the handler to use when execution is blocked* because the thread bounds and queue capacities are reached* @throws IllegalArgumentException if one of the following holds:<br>* {@code corePoolSize < 0}<br>* {@code keepAliveTime < 0}<br>* {@code maximumPoolSize <= 0}<br>* {@code maximumPoolSize < corePoolSize}* @throws NullPointerException if {@code workQueue}* or {@code threadFactory} or {@code handler} is null*/public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}

int corePoolSize?:線程池中核心線程數,小于corePoolSize?,就會創建新線程,等于corePoolSize?,這個任務就會保存到BlockingQueue,如果調用prestartAllCoreThreads()方法就會一次性的啟動corePoolSize個數的線程。

int maximumPoolSize: 允許的最大線程數,BlockingQueue也滿了,小于maximumPoolSize時候就會再次創建新的線程

long keepAliveTime:線程空閑下來后,存活的時間,這個參數只在大于corePoolSize才有用

TimeUnit unit:存活時間的單位值

BlockingQueue<Runnable> workQueue:保存任務的阻塞隊列

ThreadFactory threadFactory:創建線程的工廠,給新建的線程賦予名字

RejectedExecutionHandler handler:飽和策略

? ? ? ? ? AbortPolicy :直接拋出異常,默認;

? ? ? ? ?CallerRunsPolicy:用調用者所在的線程來執行任務

? ? ? ? ?DiscardOldestPolicy:丟棄阻塞隊列里最老的任務,隊列里最靠前的任務

? ? ? ? ?DiscardPolicy :當前任務直接丟棄

也可以實現自己的飽和策略,實現RejectedExecutionHandler接口即可

實現基本原理

主要是依賴BlockingQueue<Runnable>隊列和HashSet<Worker>實現的,Worker繼承了Runnable以及AQS的一個內部類,所以這個類具體等待并且開啟線程的功能

在提交Runnable可執行的線程時,

當前線程數小于corePoolSize??的時候,僅僅是將Runnable添加到HashSet<Worker>當中,并且執行start()方法,調用的是runWorker()方法

當前線程數大于或等于corePoolSize??的時候,會將Runnable添加到workerQueue隊列中等待并且會添加一個null的Runnable到addWorker()方法當中。如果隊列滿了offer失敗就會執相應的reject拒絕策略。

public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}

在addWorker()方法當中,如果Runnable為空的話,會直接返回false,否則將創建一個Worker對象并且啟動它,在runWorker中,首先執行完后傳輸過來的Runnable對象中的run(),然后循環去workerQueue隊列使用take方法拿等待隊列中的Runnable對象,并且執行相應的run()方法。

final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}

關閉線程池的方法:

shutdownNow():設置線程池的狀態,還會嘗試停止正在運行或者暫停任務的線程

shutdown()設置線程池的狀態,只會中斷所有沒有執行任務的線程

工作機制

合理配置線程池

根據任務的性質來:計算密集型(CPU),IO密集型,混合型

計算密集型:加密,大數分解,正則……., 線程數適當小一點,最大推薦:機器的Cpu核心數+1,為什么+1,防止頁缺失,(機器的Cpu核心=Runtime.getRuntime().availableProcessors();)

IO密集型:讀取文件,數據庫連接,網絡通訊, 線程數適當大一點,機器的Cpu核心數*2,

混合型:盡量拆分,IO密集型>>計算密集型,拆分意義不大,IO密集型~計算密集型

隊列的選擇上,應該使用有界,無界隊列可能會導致內存溢出

Executors預定義的線程池

FixedThreadPool:創建固定線程數量的,適用于負載較重的服務器,使用了無界隊列

SingleThreadPoolExecutor:創建單個線程,需要順序保證執行任務,不會有多個線程活動,使用了無界隊列

CachedThreadPool:會根據需要來創建新線程的,執行很多短期異步任務的程序,使用了SynchronousQueue
WorkStealingPool(JDK7以后): 基于ForkJoinPool實現

Executor框架

還有一個是定時器,待會兒再說吧

總結

以上是生活随笔為你收集整理的Java并发编程之线程池ThreadPoolExecutor解析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。