日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

线程池解析

發布時間:2024/9/30 编程问答 20 豆豆
生活随笔 收集整理的這篇文章主要介紹了 线程池解析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

線程池解析

java線程對應內核線程,創建與銷毀需要切換到內核態,是系統開銷較大的操作,因此需要設法復用線程。線程池就是一個線程緩存,負責對線程統一分配,調優與監控,線程池的優勢:

提高性能:重用線程減少線程創建消亡的開銷
提高響應速度:任務到達時可以立即執行,不需要再去創建線程
方便管理線程:便于統一分配、調控和監控

執行Runnable任務流程

簡單說處理任務的優先級為:核心線程、任務隊列、最大線程,如果三者都滿了,使用handler處理被拒絕的任務。

具體如何實現的呢?

/** * 在將來某個時間執行該任務Runnable,可能是新建一個線程或者復用線程池已有的線程。 * 如果一個任務不能提交執行,要么是因為線程池shutdown了,要么是容量達到上限,此時執行拒絕策略 */ public void execute(Runnable command) {if (command == null)throw new NullPointerException();// AtomicInteger原子變量,前3位表示狀態,后29位表示線程數int c = ctl.get();if (workerCountOf(c) < corePoolSize) {// 1. addWorker方法里面會原子檢查當前running State和工作線程數目,防止并發時錯誤的多創建,// 也就是如果有別的線程添加任務達到corePoolSize,addWorker會返回false表示創建失敗if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {// 2. 即使任務成功的入隊,仍然需要再次檢查是否應該增加線程,因為此時從last checking可能線程都死亡或者線程池shutdownint recheck = ctl.get();// 所以我們需要再次檢查線程池狀態,如果已經shutdown需要移除任務并拒絕該任務if (! isRunning(recheck) && remove(command))reject(command);// 如果線程都已死亡就新建一個else if (workerCountOf(recheck) == 0)addWorker(null, false);// 如果入隊失敗,再嘗試新建線程。如果失敗說明線程池shutdown,或者飽和達到maximumPoolSize,應該執行拒絕策略}else if (!addWorker(command, false))reject(command); }

另外很重要的方法addWorker(),注意調用此方法是用來新建線程

/** * 檢查是否可以根據當前池狀態和給定界限(corePoolSize或maximumPoolSize)添加新的工作線程。 * 如果可能,將創建并啟動一個worker,將firstTask作為其第一個任務運行。 * 如果池已stopped或shutdown中,或者threadfactory創建線程失敗,則此方法返回 false。 * 如果線程創建失敗,或者threadfactory返回null,或者由于異常(通常是Thread.start()中的 OutOfMemoryError),我們會干凈利落地回滾。 * * @firstTask – 新線程應該首先運行的任務(如果沒有,則為null)。當線程少于corePoolSize時(在這種情況下我們總是啟動一個), * 或者當隊列已滿(在這種情況下我們必須繞過隊列)時,使用初始的第一個任務(在方法 execute() 中創建工作線程. * 最初空閑線程通常通過 prestartCoreThread 創建或替換其他垂死的工人。 */ private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||// check線程數是否超過限制wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 創建新的線程w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {// 獲取鎖!!final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();// 添加workerworkers.add(w);// 更新 池的最大值 int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {// 添加成功就啟動線程t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted; }

如何循環取任務執行的?

注意Worker實現了AQS,防止正在等待任務的線程被中斷

final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// 這里除了執行當前task,只要從隊列中能取getTask()就會不停的循環while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {// 沒有任務了就退出邏輯:停止線程,但會保證維持corePoolSizeprocessWorkerExit(w, completedAbruptly);} }

線程池狀態

線程池狀態是通過一個AtomicInteger的前3位表示的

  • Running:表示能接受新任務及處理隊列中的任務
  • Shutdown:表示不接受新任務,但是會處理隊列中的任務
  • stop:表示不接受新任務,不處理隊列中的任務,并且中斷正在處理的任務
  • Tidying:表示所有的任務已經終止,ctl記錄的“workerCount”為0,將會執行terminated()的鉤子方法
  • Terminated:terminated()方法執行完畢

轉換過程如下:

SHUTDOWN -> TIDYING:線程池線程數為空,工作隊列為空

STOP -> TIDYING:只要線程池線程數為空即可

TIDYING -> TERMINATED:terminated() 的鉤子方法執行完后

線程池中的阻塞工作隊列

阻塞隊列是隊列的常見應用。常見的BlockingQueue主要有三種實現:

SynchronousQueue是不緩存任務,相當于一個中轉站

ArrayBlockingQueue是一個用數組實現的有界阻塞隊列,必須設置容量。

LinkedBlockingQueue是一個用鏈表實現的阻塞隊列,容量可以選擇進行設置,不設置的話最大長度為Integer.MAX_VALUE,將是一個無邊界的阻塞隊列。對于一個無邊界隊列來說,是可以不斷的向隊列中加入任務的,這種情況下就有可能因為任務過多而導致內存溢出問題。newFixedThreadPool中創建的就是未指定容量的LinkedBlockingQueue。

任務拒絕策略

當最大線程滿后,會使用handler來處理被拒絕的任務,先看下 JDK 定義的 拒絕策略接口

public interface RejectedExecutionHandler {void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }

接口定義很明確,當觸發拒絕策略時,線程池會調用你設置的具體的策略,將當前提交的任務以及線程池實例本身傳遞給你處理,具體作何處理,不同場景會有不同的考慮,下面看 JDK 為我們內置了默認的四種處理策略為:

  • AbortPolicy 拋出異常。
public static class AbortPolicy implements RejectedExecutionHandler {public AbortPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());} }

不執行此任務,而且直接拋出一個運行時異常 RejectedExecutionException,為java線程池默認的阻塞策略。切記會中斷調用者的處理過程,因此需要try catch,否則程序會直接退出。

  • DiscardPolicy 直接靜悄悄的丟棄這個任務,不觸發任何動作。這個策略基本不會使用
public static class DiscardPolicy implements RejectedExecutionHandler {public DiscardPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {} }
  • DiscardOldestPolicy 丟棄隊列最前面(最舊)的任務,然后重新嘗試執行任務(重復此過程)。
public static class DiscardOldestPolicy implements RejectedExecutionHandler {public DiscardOldestPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}} }
  • CallerRunsPolicy 由調用線程處理該任務 。
public static class CallerRunsPolicy implements RejectedExecutionHandler {public CallerRunsPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}} }

功能:當觸發拒絕策略時,只要線程池沒有關閉,就由提交任務的當前線程處理。

使用場景:一般在不允許失敗的、對性能要求不高、并發量較小的場景下使用,因為線程池一般情況下不會關閉,也就是提交的任務一定會被運行,但是由于是調用者線程自己執行的,當多次提交任務時,就會阻塞后續任務執行,性能和效率自然就慢了。

下面我們看下第三方框架中都有哪些拒絕策略

Dubbo 中的線程拒絕策略

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);private final String threadName;private final URL url;private static volatile long lastPrintTime = 0;private static Semaphore guard = new Semaphore(1);public AbortPolicyWithReport(String threadName, URL url) {this.threadName = threadName;this.url = url;}@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor e) {String msg = String.format("Thread pool is EXHAUSTED!" +" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),url.getProtocol(), url.getIp(), url.getPort());logger.warn(msg);dumpJStack();throw new RejectedExecutionException(msg);}private void dumpJStack() {//省略實現} }

可以看到,當dubbo的工作線程觸發了線程拒絕后,主要做了三個事情,原則就是盡量讓使用者清楚觸發線程拒絕策略的真實原因

  • 輸出了一條警告級別的日志,日志內容為線程池的詳細設置參數,以及線程池當前的狀態,還有當前拒絕任務的一些詳細信息。可以說,這條日志,使用dubbo的有過生產運維經驗的或多或少是見過的,這個日志簡直就是日志打印的典范,其他的日志打印的典范還有spring。得益于這么詳細的日志,可以很容易定位到問題所在
  • 輸出當前線程堆棧詳情,這個太有用了,當你通過上面的日志信息還不能定位問題時,案發現場的dump線程上下文信息就是你發現問題的救命稻草
  • 繼續拋出拒絕執行異常,使本次任務失敗,這個繼承了JDK默認拒絕策略的特性

Netty 中的線程池拒絕策略

private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {NewThreadRunsPolicy() {super();}public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {try {final Thread t = new Thread(r, "Temporary task executor");t.start();} catch (Throwable e) {throw new RejectedExecutionException("Failed to start a new thread", e);}} }

Netty新建了一個線程來處理的。所以,Netty的實現相較于調用者執行策略的使用面就可以擴展到支持高效率高性能的場景了。但是也要注意一點,Netty的實現里,在創建線程時未做任何的判斷約束,也就是說只要系統還有資源就會創建新的線程來處理,直到new不出新的線程了,才會拋創建線程失敗的異常

PinPoint 中的線程池拒絕策略

public class RejectedExecutionHandlerChain implements RejectedExecutionHandler {private final RejectedExecutionHandler[] handlerChain;//通過靜態方法來返回一個對象public static RejectedExecutionHandler build(List<RejectedExecutionHandler> chain) {Objects.requireNonNull(chain, "handlerChain must not be null");RejectedExecutionHandler[] handlerChain = chain.toArray(new RejectedExecutionHandler[0]);return new RejectedExecutionHandlerChain(handlerChain);}private RejectedExecutionHandlerChain(RejectedExecutionHandler[] handlerChain) {this.handlerChain = Objects.requireNonNull(handlerChain, "handlerChain must not be null");}@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {for (RejectedExecutionHandler rejectedExecutionHandler : handlerChain) {rejectedExecutionHandler.rejectedExecution(r, executor);}} }

pinpoint的拒絕策略實現很有特點,和其他的實現都不同。他定義了一個拒絕策略鏈,包裝了一個拒絕策略列表,當觸發拒絕策略時,會將策略鏈中的rejectedExecution依次執行一遍

ActiveMQ 中的線程池拒絕策略

new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {try {executor.getQueue().offer(r, 60, TimeUnit.SECONDS);} catch (InterruptedException e) {throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");}throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");} });

activeMq中的策略屬于最大努力執行任務型,當觸發拒絕策略時,在嘗試一分鐘的時間重新將任務塞進任務隊列,當一分鐘超時還沒成功時,就拋出異常。這種方式感覺可以用于這樣的場景,比如接受設備上報的消息大部分情況下都是比較少能處理的過來,每個任務處理時間不長,只有很小概率消息任務數量增加,就可以阻塞一段時間,等待其他任務完成。

BlockingQueue的核心方法:
放入數據:
  offer(anObject):表示如果可能的話,將anObject加到BlockingQueue里,即如果BlockingQueue可以容納,
    則返回true,否則返回false.(本方法不阻塞當前執行方法的線程)
  offer(E o, long timeout, TimeUnit unit),可以設定等待的時間,如果在指定的時間內,還不能往隊列中
    加入BlockingQueue,則返回失敗。
  put(anObject):把anObject加到BlockingQueue里,如果BlockQueue沒有空間,則調用此方法的線程被阻塞
    直到BlockingQueue里面有空間再繼續.
獲取數據:
  poll(time):取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數規定的時間,
    取不到時返回null;
  poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的對象,如果在指定時間內,
    隊列一旦有數據可取,則立即返回隊列中的數據。否則直到時間超時還沒有數據可取,返回失敗。
  take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到
    BlockingQueue有新的數據被加入;
  drainTo():一次性從BlockingQueue獲取所有可用的數據對象(還可以指定獲取數據的個數),
    通過該方法,可以提升獲取數據效率;不需要多次分批加鎖或釋放鎖。

Java提供的幾種線程池

  • FixedThreadPool(int n)

    new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());核心線程和最大線程都是n,使用LinkedBlockingQueue無界隊列。所以當請求越來越多無法及時處理時,就會不斷堆積,容易造成內存溢出OOM。

  • SingleThreadExecutor 單個工作線程的線程池

    new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())等同于FixedThreadPool(1),只有一個工作線程。保證提交的任務按順序執行,所以可以用來解決面試經常問到的如何控制多個線程順序打印’ABCABC’問題如何控制多個線程的執行順序

  • CachedThreadPool

    底層是`return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());,synchronousQueue是直接交換隊列,不緩存任務。這個線程池會利用已有的線程,沒有就新建,適合用來處理處理時間較短的異步任務,但是如果任務很多,會不斷創建線程直到超出系統限制(如發生線程內存分配不足)。如果線程超過60s沒有被使用,會被回收。

  • ScheduledThreadPool(int corePoolSize)

    可以用來執行一些簡單的延遲或按固定頻率的任務。 延遲5秒后執行任務scheduledExecutorService.schedule(thread,5, TimeUnit.SECONDS);
    在1s后執行一次,然后每隔3秒執行任務#scheduledExecutorService.scheduleAtFixedRate(thread,1,3,TimeUnit.SECONDS);

  • WorkStealingPool

    return new ForkJoinPool(parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);,JDK8加入的,返回一個工作竊取線程池ForkJoinPool,會維護足夠的線程以支持給定的并行度級別,并且可以使用多個隊列來減少爭用。 并行度級別對應于主動參與或可用于參與任務處理的最大線程數。 線程的實際數量可能會動態增長和收縮。 工作竊取池不保證提交任務的執行順序

    適合線程會產生子任務的,線程會把產生的子任務放進自己的工作隊列,其他線程可以幫助這個線程執行。

Spring提供的線程池ThreadPoolTaskExecutor,方便我們自定義線程池:

@Configuration @EnableAsync//開啟異步任務的支持 public class TaskExecutorConfig {@Bean(value = "businessEventProcessTaskExecutor", destroyMethod = "destroy")public ThreadPoolTaskExecutor getAsyncExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// 獲取JAVA虛擬機的可用處理器數量。IO密集型建議核心線程數是該值2倍;計算密集型建議核心線程數是該值1倍int processorNum = Runtime.getRuntime().availableProcessors();// 設置核心線程數量。若池中的實際線程數小于該值,無論其中是否有空閑的線程,都會產生新的線程executor.setCorePoolSize(processorNum * 2);// 設置最大線程數量executor.setMaxPoolSize(processorNum * 4);// 設置阻塞任務隊列大小executor.setQueueCapacity(100);// 線程名稱前綴executor.setThreadNamePrefix("demo-");// 設置線程池中任務的等待時間,若超過等待時間仍未銷毀則強制銷毀,以確保應用最后能夠被關閉,而不是阻塞住executor.setAwaitTerminationSeconds(AWAIT_TERMINATION_SECONDS);// 設置拒絕策略,當線程池阻塞隊列已滿時對新任務的處理。調節機制,即飽和時回退主線程執行executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;} }

線程池如何設置線程數:
耗時IO密集型:如讀寫數據庫、文件、網絡通信了,建議核心線程數是處理器數量的數倍,因為IO操作相對于CPU來說比較慢,為了不讓CPU過多的等待空耗;
計算密集型:建議核心線程數是處理器數量的1-2倍,因為CPU基本一直在工作。

源碼使用舉例

總結

以上是生活随笔為你收集整理的线程池解析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。