日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

线程池源码分析

發布時間:2024/2/28 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 线程池源码分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

為什么要用線程池?

  • 降低系統資源消耗。

  • 提高線程可控性。

  • 如何創建使用線程池?

    JDK8提供了五種創建線程池的方法:

    1.創建一個定長線程池,可控制線程最大并發數,超出的線程會在隊列中等待。

    public?static?ExecutorService?newFixedThreadPool(int?nThreads)?{return?new?ThreadPoolExecutor(nThreads,?nThreads,0L,?TimeUnit.MILLISECONDS,new?LinkedBlockingQueue<Runnable>()); }

    2.(JDK8新增)會根據所需的并發數來動態創建和關閉線程。能夠合理的使用CPU進行對任務進行并發操作,所以適合使用在很耗時的任務。

    注意返回的是ForkJoinPool對象。

    public?static?ExecutorService?newWorkStealingPool(int?parallelism)?{return?new?ForkJoinPool(parallelism,ForkJoinPool.defaultForkJoinWorkerThreadFactory,null,?true); }

    什么是ForkJoinPool:

    public?ForkJoinPool(int?parallelism,ForkJoinWorkerThreadFactory?factory,UncaughtExceptionHandler?handler,boolean?asyncMode)?{this(checkParallelism(parallelism),checkFactory(factory),handler,asyncMode???FIFO_QUEUE?:?LIFO_QUEUE,"ForkJoinPool-"?+?nextPoolId()?+?"-worker-");checkPermission();}

    使用一個無限隊列來保存需要執行的任務,可以傳入線程的數量;不傳入,則默認使用當前計算機中可用的cpu數量;使用分治法來解決問題,使用fork()和join()來進行調用。

    3.創建一個可緩存的線程池,可靈活回收空閑線程,若無可回收,則新建線程。

    public?static?ExecutorService?newCachedThreadPool()?{return?new?ThreadPoolExecutor(0,?Integer.MAX_VALUE,60L,?TimeUnit.SECONDS,new?SynchronousQueue<Runnable>()); }

    4.創建一個單線程的線程池。

    public?static?ExecutorService?newSingleThreadExecutor()?{return?new?FinalizableDelegatedExecutorService(new?ThreadPoolExecutor(1,?1,0L,?TimeUnit.MILLISECONDS,new?LinkedBlockingQueue<Runnable>())); }

    5.創建一個定長線程池,支持定時及周期性任務執行。

    public?static?ScheduledExecutorService?newScheduledThreadPool(int?corePoolSize)?{return?new?ScheduledThreadPoolExecutor(corePoolSize); }

    execute方法:

    public void execute(Runnable command) {if (command == null)throw new NullPointerException();// clt記錄著runState和workerCountint c = ctl.get();//workerCountOf方法取出低29位的值,表示當前活動的線程數//然后拿線程數和 核心線程數做比較if (workerCountOf(c) < corePoolSize) {// 如果活動線程數<核心線程數// 添加到//addWorker中的第二個參數表示限制添加線程的數量是根據corePoolSize來判斷還是maximumPoolSize來判斷if (addWorker(command, true))// 如果成功則返回return;// 如果失敗則重新獲取 runState和 workerCountc = ctl.get();}// 如果當前線程池是運行狀態并且任務添加到隊列成功if (isRunning(c) && workQueue.offer(command)) {// 重新獲取 runState和 workerCountint recheck = ctl.get();// 如果不是運行狀態并且 if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)//第一個參數為null,表示在線程池中創建一個線程,但不去啟動// 第二個參數為false,將線程池的有限線程數量的上限設置為maximumPoolSizeaddWorker(null, false);}//再次調用addWorker方法,但第二個參數傳入為false,將線程池的有限線程數量的上限設置為maximumPoolSizeelse if (!addWorker(command, false))//如果失敗則拒絕該任務reject(command); }

    總結一下它的工作流程:

  • 當workerCount > corePoolSize,創建線程執行任務。

  • 當workerCount <= corePoolSize&&阻塞隊列workQueue未滿,把新的任務放入阻塞隊列。

  • 當workQueue已滿,并且workerCount >= corePoolSize,并且workerCount < maximumPoolSize,創建線程執行任務。

  • 當workQueue已滿,workerCount >= maximumPoolSize,采取拒絕策略,默認拒絕策略是直接拋異常。

  • addWorker方法?主要工作是在線程池中創建一個新的線程并執行

    private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();// 獲取運行狀態int rs = runStateOf(c);// Check if queue empty only if necessary.// 如果狀態值 >= SHUTDOWN (不接新任務&不處理隊列任務)// 并且 如果 !(rs為SHUTDOWN 且 firsTask為空 且 阻塞隊列不為空)if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))// 返回falsereturn false;for (;;) {//獲取線程數wcint wc = workerCountOf(c);// 如果wc大與容量 || core如果為true表示根據corePoolSize來比較,否則為maximumPoolSizeif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// 增加workerCount(原子操作)if (compareAndIncrementWorkerCount(c))// 如果增加成功,則跳出break retry;// wc增加失敗,則再次獲取runStatec = ctl.get(); // Re-read ctl// 如果當前的運行狀態不等于rs,說明狀態已被改變,返回重新執行if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 根據firstTask來創建Worker對象w = new Worker(firstTask);// 根據worker創建一個線程final Thread t = w.thread;if (t != null) {// new一個鎖final ReentrantLock mainLock = this.mainLock;// 加鎖mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.// 獲取runStateint rs = runStateOf(ctl.get());// 如果rs小于SHUTDOWN(處于運行)或者(rs=SHUTDOWN && firstTask == null)// firstTask == null證明只新建線程而不執行任務if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// 如果t活著就拋異常if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();// 否則加入worker(HashSet)//workers包含池中的所有工作線程。僅在持有mainLock時訪問。workers.add(w);// 獲取工作線程數量int s = workers.size();//largestPoolSize記錄著線程池中出現過的最大線程數量if (s > largestPoolSize)// 如果 s比它還要大,則將s賦值給它largestPoolSize = s;// worker的添加工作狀態改為true workerAdded = true;}} finally {mainLock.unlock();}// 如果worker的添加工作完成if (workerAdded) {// 啟動線程t.start();// 修改線程啟動狀態workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}// 返回線啟動狀態return workerStarted;

    為什么需要持有mainLock?

    因為workers是HashSet類型的,不能保證線程安全。

    那w = new Worker(firstTask);如何理解呢

    Worker.java

    private?final?class?Workerextends?AbstractQueuedSynchronizerimplements?Runnable

    可以看到它繼承了AQS并發框架還實現了Runnable。證明它還是一個線程任務類。那我們調用t.start()事實上就是調用了該類重寫的run方法。

    Worker為什么不使用ReentrantLock來實現呢?

    tryAcquire方法它是不允許重入的,而ReentrantLock是允許重入的。對于線程來說,如果線程正在執行是不允許其它鎖重入進來的。

    線程只需要兩個狀態,一個是獨占鎖,表明正在執行任務;一個是不加鎖,表明是空閑狀態。

    public?void?run()?{runWorker(this); }

    run方法又調用了runWorker方法:

    final?void?runWorker(Worker?w)?{//?拿到當前線程Thread?wt?=?Thread.currentThread();//?拿到當前任務Runnable?task?=?w.firstTask;//?將Worker.firstTask置空?并且釋放鎖w.firstTask?=?null;w.unlock();?//?allow?interruptsboolean?completedAbruptly?=?true;try?{//?如果task或者getTask不為空,則一直循環while?(task?!=?null?||?(task?=?getTask())?!=?null)?{//?加鎖w.lock();//?If?pool?is?stopping,?ensure?thread?is?interrupted;//?if?not,?ensure?thread?is?not?interrupted.??This//?requires?a?recheck?in?second?case?to?deal?with//?shutdownNow?race?while?clearing?interrupt//??return?ctl.get()?>=?stop?//?如果線程池狀態>=STOP?或者?(線程中斷且線程池狀態>=STOP)且當前線程沒有中斷//?其實就是保證兩點://?1.?線程池沒有停止//?2.?保證線程沒有中斷if?((runStateAtLeast(ctl.get(),?STOP)?||(Thread.interrupted()?&&runStateAtLeast(ctl.get(),?STOP)))?&&!wt.isInterrupted())//?中斷當前線程wt.interrupt();try?{//?空方法beforeExecute(wt,?task);Throwable?thrown?=?null;try?{//?執行run方法(Runable對象)task.run();}?catch?(RuntimeException?x)?{thrown?=?x;?throw?x;}?catch?(Error?x)?{thrown?=?x;?throw?x;}?catch?(Throwable?x)?{thrown?=?x;?throw?new?Error(x);}?finally?{afterExecute(task,?thrown);}}?finally?{//?執行完后,?將task置空,?完成任務++,?釋放鎖task?=?null;w.completedTasks++;w.unlock();}}completedAbruptly?=?false;}?finally?{//?退出工作processWorkerExit(w,?completedAbruptly);}

    總結一下runWorker方法的執行過程:

  • while循環中,不斷地通過getTask()方法從workerQueue中獲取任務

  • 如果線程池正在停止,則中斷線程。否則調用3.

  • 調用task.run()執行任務;

  • 如果task為null則跳出循環,執行processWorkerExit()方法,銷毀線程workers.remove(w);

  • 這個流程圖非常經典:

    ?

    總結

    以上是生活随笔為你收集整理的线程池源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。