日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Java 线程池框架核心代码分析

發(fā)布時間:2023/12/3 java 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java 线程池框架核心代码分析 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

轉載自?Java 線程池框架核心代碼分析

前言

多線程編程中,為每個任務分配一個線程是不現(xiàn)實的,線程創(chuàng)建的開銷和資源消耗都是很高的。線程池應運而生,成為我們管理線程的利器。Java 通過Executor接口,提供了一種標準的方法將任務的提交過程和執(zhí)行過程解耦開來,并用Runnable表示任務。

下面,我們來分析一下 Java 線程池框架的實現(xiàn)ThreadPoolExecutor。

下面的分析基于JDK1.7

生命周期

ThreadPoolExecutor中,使用CAPACITY的高3位來表示運行狀態(tài),分別是:

  • RUNNING:接收新任務,并且處理任務隊列中的任務
  • SHUTDOWN:不接收新任務,但是處理任務隊列的任務
  • STOP:不接收新任務,不處理任務隊列,同時中斷所有進行中的任務
  • TIDYING:所有任務已經(jīng)被終止,工作線程數(shù)量為 0,到達該狀態(tài)會執(zhí)行terminated()
  • TERMINATED:terminated()執(zhí)行完畢
  • ThreadPoolExecutor中用原子類來表示狀態(tài)位

    1private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    線程池模型

    核心參數(shù)

    • corePoolSize:最小存活的工作線程數(shù)量(如果設置allowCoreThreadTimeOut,那么該值為 0)
    • maximumPoolSize:最大的線程數(shù)量,受限于CAPACITY
    • keepAliveTime:對應線程的存活時間,時間單位由TimeUnit指定
    • workQueue:工作隊列,存儲待執(zhí)行的任務
    • RejectExecutionHandler:拒絕策略,線程池滿后會觸發(fā)

    線程池的最大容量:CAPACITY中的前三位用作標志位,也就是說工作線程的最大容量為(2^29)-1

    四種模型

    • CachedThreadPool:一個可緩存的線程池,如果線程池的當前規(guī)模超過了處理需求時,那么將回收空閑的線程,當需求增加時,則可以添加新的線程,線程池的規(guī)模不存在任何的限制。
    • FixedThreadPool:一個固定大小的線程池,提交一個任務時就創(chuàng)建一個線程,直到達到線程池的最大數(shù)量,這時線程池的大小將不再變化。
    • SingleThreadPool:一個單線程的線程池,它只有一個工作線程來執(zhí)行任務,可以確保按照任務在隊列中的順序來串行執(zhí)行,如果這個線程異常結束將創(chuàng)建一個新的線程來執(zhí)行任務。
    • ScheduledThreadPool:一個固定大小的線程池,并且以延遲或者定時的方式來執(zhí)行任務,類似于Timer。

    執(zhí)行任務 execute

    核心邏輯:

    1. 當前線程數(shù)量 < corePoolSize,直接開啟新的核心線程執(zhí)行任務addWorker(command, true)
    2. 當前線程數(shù)量 >= corePoolSize,且任務加入工作隊列成功

    • 檢查線程池當前狀態(tài)是否處于RUNNING
    • 如果否,則拒絕該任務
    • 如果是,判斷當前線程數(shù)量是否為 0,如果為 0,就增加一個工作線程。

    3. 開啟普通線程執(zhí)行任務addWorker(command, false),開啟失敗就拒絕該任務

    從上面的分析可以總結出線程池運行的四個階段:

  • poolSize < corePoolSize 且隊列為空,此時會新建線程來處理提交的任務
  • poolSize == corePoolSize,此時提交的任務進入工作隊列,工作線程從隊列中獲取任務執(zhí)行,此時隊列不為空且未滿。
  • poolSize == corePoolSize,并且隊列已滿,此時也會新建線程來處理提交的任務,但是poolSize < maxPoolSize
  • poolSize == maxPoolSize,并且隊列已滿,此時會觸發(fā)拒絕策略
  • 拒絕策略

    前面我們提到任務無法執(zhí)行會被拒絕,RejectedExecutionHandler是處理被拒絕任務的接口。下面是四種拒絕策略。

    • AbortPolicy:默認策略,終止任務,拋出RejectedException
    • CallerRunsPolicy:在調用者線程執(zhí)行當前任務,不拋異常
    • DiscardPolicy: 拋棄策略,直接丟棄任務,不拋異常
    • DiscardOldersPolicy:拋棄最老的任務,執(zhí)行當前任務,不拋異常

    線程池中的 Worker

    Worker繼承了AbstractQueuedSynchronizer和Runnable,前者給Worker提供鎖的功能,后者執(zhí)行工作線程的主要方法runWorker(Worker w)(從任務隊列撈任務執(zhí)行)。Worker 引用存在workers集合里面,用mainLock守護。

    12private final ReentrantLock mainLock = new ReentrantLock();private final HashSet<Worker> workers = new HashSet<Worker>();

    核心函數(shù) runWorker

    下面是簡化的邏輯,注意:每個工作線程的run都執(zhí)行下面的函數(shù)

    12345678910111213final void runWorker(Worker w) {????Thread wt = Thread.currentThread();????Runnable task = w.firstTask;????w.firstTask = null;????while (task != null || (task = getTask()) != null) {????????w.lock();????????beforeExecute(wt, task);????????task.run();????????afterExecute(task, thrown);????????w.unlock();????}????processWorkerExit(w, completedAbruptly);}
  • 從getTask()中獲取任務
  • 鎖住 worker
  • 執(zhí)行beforeExecute(wt, task),這是ThreadPoolExecutor提供給子類的擴展方法
  • 運行任務,如果該worker有配置了首次任務,則先執(zhí)行首次任務且只執(zhí)行一次。
  • 執(zhí)行afterExecute(task, thrown);
  • 解鎖 worker
  • 如果獲取到的任務為 null,關閉 worker
  • 獲取任務 getTask

    線程池內部的任務隊列是一個阻塞隊列,具體實現(xiàn)在構造時傳入。

    1private final BlockingQueue<Runnable> workQueue;

    getTask()從任務隊列中獲取任務,支持阻塞和超時等待任務,四種情況會導致返回null,讓worker關閉。

  • 現(xiàn)有的線程數(shù)量超過最大線程數(shù)量
  • 線程池處于STOP狀態(tài)
  • 線程池處于SHUTDOWN狀態(tài)且工作隊列為空
  • 線程等待任務超時,且線程數(shù)量超過保留線程數(shù)量
  • 核心邏輯:根據(jù)timed在阻塞隊列上超時等待或者阻塞等待任務,等待任務超時會導致工作線程被關閉。

    1234timed = allowCoreThreadTimeOut || wc > corePoolSize;Runnable r = timed ?????workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :????workQueue.take();

    在以下兩種情況下等待任務會超時:

  • 允許核心線程等待超時,即allowCoreThreadTimeOut(true)
  • 當前線程是普通線程,此時wc > corePoolSize
  • 工作隊列使用的是BlockingQueue,這里就不展開了,后面再寫一篇詳細的分析。

    總結

    • ThreadPoolExecutor基于生產者-消費者模式,提交任務的操作相當于生產者,執(zhí)行任務的線程相當于消費者。
    • Executors提供了四種基于ThreadPoolExecutor構造線程池模型的方法,除此之外,我們還可以直接繼承ThreadPoolExecutor,重寫beforeExecute和afterExecute方法來定制線程池任務執(zhí)行過程。
    • 使用有界隊列還是無界隊列需要根據(jù)具體情況考慮,工作隊列的大小和線程的數(shù)量也是需要好好考慮的。
    • 拒絕策略推薦使用CallerRunsPolicy,該策略不會拋棄任務,也不會拋出異常,而是將任務回退到調用者線程中執(zhí)行。

    總結

    以上是生活随笔為你收集整理的Java 线程池框架核心代码分析的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內容還不錯,歡迎將生活随笔推薦給好友。