日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

[并发编程] - Executor框架#ThreadPoolExecutor源码解读03

發布時間:2025/3/21 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 [并发编程] - Executor框架#ThreadPoolExecutor源码解读03 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

  • Pre
  • execute源碼分析
    • addWorker()解讀
    • Worker解讀


Pre

[并發編程] - Executor框架#ThreadPoolExecutor源碼解讀02
說了一堆結論性的東西,作為開發人員著實是不過癮,那這里我們就來剖根問底來看下線程池是如何工作的。


execute源碼分析

ThreadPoolExecutor te = new ThreadPoolExecutor(5,10,500,TimeUnit.SECONDS,new ArrayBlockingQueue(5));for (int i = 0; i < 6; i++) {te.submit(()->{System.out.println("i m task :"+Thread.currentThread().getName());});}

使用ThreadPoolExecutor 自定義了一個線程池

參數對應如下

int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue

調用了 AbstractExecutorService#submit

public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}

最核心的方法 execute ,由子類ThredPoolExecutor實現

public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** clt記錄著runState和workerCount*/int c = ctl.get();/** workerCountOf方法取出低29位的值,表示當前活動的線程數;* 如果當前活動線程數小于corePoolSize,則新建一個線程放從入線程池中;* 并把任務添加到該線程中。*/if (workerCountOf(c) < corePoolSize) {/** addWorker中的第二個參數表示限制添加線程的數量是根據corePoolSize來判斷還是maximumPoolSize來判斷;* 如果為true,根據corePoolSize來判斷;* 如果為false,則根據maximumPoolSize來判斷*/if (addWorker(command, true))return;/** 如果添加失敗,則重新獲取ctl值*/c = ctl.get();}/** 如果當前線程池是運行狀態并且任務添加到隊列成功*/if (isRunning(c) && workQueue.offer(command)) {// 重新獲取ctl值int recheck = ctl.get();// 再次判斷線程池的運行狀態,如果不是運行狀態,由于之前已經把command添加到workQueue中了,// 這時需要移除該command// 執行過后通過handler使用拒絕策略對該任務進行處理,整個方法返回if (! isRunning(recheck) && remove(command))reject(command);/** 獲取線程池中的有效線程數,如果數量是0,則執行addWorker方法* 這里傳入的參數表示:* 1. 第一個參數為null,表示在線程池中創建一個線程,但不去啟動;* 2. 第二個參數為false,將線程池的有限線程數量的上限設置為maximumPoolSize,添加線程時根據maximumPoolSize來判斷;* 如果判斷workerCount大于0,則直接返回,在workQueue中新增的command會在將來的某個時刻被執行。*/else if (workerCountOf(recheck) == 0)addWorker(null, false);}/** 如果執行到這里,有兩種情況:* 1. 線程池已經不是RUNNING狀態;* 2. 線程池是RUNNING狀態,但workerCount >= corePoolSize并且workQueue已滿。* 這時,再次調用addWorker方法,但第二個參數傳入為false,將線程池的有限線程數量的上限設置為maximumPoolSize;* 如果失敗則拒絕該任務*/else if (!addWorker(command, false))reject(command); }

主要的流程,注釋中也寫的很清楚了

/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/

簡單來說,在執行execute()方法時如果狀態一直是RUNNING時,的執行過程如下:

  • 如果workerCount < corePoolSize,則創建并啟動一個線程來執行新提交的任務;
  • 如果workerCount >= corePoolSize,且線程池內的阻塞隊列未滿,則將任務添加到該阻塞隊列中;
  • 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且線程池內的阻塞隊列已滿,則創建并啟動一個線程來執行新提交的任務;
  • 如果workerCount >= maximumPoolSize,并且線程池內的阻塞隊列已滿, 則根據拒絕策略來處理該任務, 默認的處理方式是直接拋異常。
  • Note : 這里要注意一下addWorker(null, false);,也就是創建一個線程,但并沒有傳入任務,因為任務已經被添加到workQueue中了,所以worker在執行的時候,會直接從workQueue中獲取任務。所以,在workerCountOf(recheck) == 0時執行addWorker(null, false);也是為了保證線程池在RUNNING狀態下必須要有一個線程來執行任務。


    addWorker()解讀

    private boolean addWorker(Runnable firstTask, boolean core) {}

    addWorker方法的主要工作是在線程池中創建一個新的線程并執行,

    • firstTask參數 用于指定新增的線程執行的第一個任務,
    • core參數為true表示在新增線程時會判斷當前活動線程數是否少于corePoolSize,false表示新增線程前需要判斷當前活動線程數是否少于maximumPoolSize
    private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();// 獲取運行狀態int rs = runStateOf(c);/** 這個if判斷* 如果rs >= SHUTDOWN,則表示此時不再接收新任務;* 接著判斷以下3個條件,只要有1個不滿足,則返回false:* 1. rs == SHUTDOWN,這時表示關閉狀態,不再接受新提交的任務,但卻可以繼續處理阻塞隊列中已保存的任務* 2. firsTask為空* 3. 阻塞隊列不為空* * 首先考慮rs == SHUTDOWN的情況* 這種情況下不會接受新提交的任務,所以在firstTask不為空的時候會返回false;* 然后,如果firstTask為空,并且workQueue也為空,則返回false,* 因為隊列中已經沒有任務了,不需要再添加線程了*/// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {// 獲取線程數int wc = workerCountOf(c);// 如果wc超過CAPACITY,也就是ctl的低29位的最大值(二進制是29個1),返回false;// 這里的core是addWorker方法的第二個參數,如果為true表示根據corePoolSize來比較,// 如果為false則根據maximumPoolSize來比較。// if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// 嘗試增加workerCount,如果成功,則跳出第一個for循環if (compareAndIncrementWorkerCount(c))break retry;// 如果增加workerCount失敗,則重新獲取ctl的值c = ctl.get(); // Re-read ctl// 如果當前的運行狀態不等于rs,說明狀態已被改變,返回第一個for循環繼續執行if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 根據firstTask來創建Worker對象w = new Worker(firstTask);// 每一個Worker對象都會創建一個線程final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {int rs = runStateOf(ctl.get());// rs < SHUTDOWN表示是RUNNING狀態;// 如果rs是RUNNING狀態或者rs是SHUTDOWN狀態并且firstTask為null,向線程池中添加線程。// 因為在SHUTDOWN時不會在添加新的任務,但還是會執行workQueue中的任務if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();// workers是一個HashSetworkers.add(w);int s = workers.size();// largestPoolSize記錄著線程池中出現過的最大線程數量if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {// 啟動線程t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted; }

    Worker解讀

    addWorker中多次提到了這個Work這個類, 其實就是 線程池中的每一個線程被封裝成一個Worker對象,ThreadPool維護的其實就是一組Worker對象

    ThreadPoolExector中內部類 Worker

    private final class Worker extends AbstractQueuedSynchronizer implements Runnable{private static final long serialVersionUID = 6138294804551838833L;final Thread thread;Runnable firstTask;volatile long completedTasks;/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*/Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker */public void run() {runWorker(this);}// Lock methods//// The value 0 represents the unlocked state.// The value 1 represents the locked state.protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}

    繼承了AQS,并實現了Runnable接口 ,
    兩個比較重要的屬性

  • firstTask用它來保存傳入的任務;
  • thread是在調用構造方法時通過ThreadFactory來創建的線程,是用來處理任務的線程。
  • 在調用構造方法時,需要把任務傳入,這里通過getThreadFactory().newThread(this);來新建一個線程,newThread方法傳入的參數是this,因為Worker本身繼承了Runnable接口,也就是一個線程,所以一個Worker對象在啟動的時候會調用Worker類中的run方法。

    Worker繼承了AQS,使用AQS來實現獨占鎖的功能。為什么不使用ReentrantLock來實現呢?可以看到tryAcquire方法,它是不允許重入的,而ReentrantLock是允許重入的

    總結

    以上是生活随笔為你收集整理的[并发编程] - Executor框架#ThreadPoolExecutor源码解读03的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。