【多线程】ThreadPoolExecutor 类的使用详解
ThreadPoolExecutor 構(gòu)造方法
ThreadPoolExecutor共4個(gè)構(gòu)造方法:
咱們直接看參數(shù)最多的7個(gè)參數(shù)分別代表:
一個(gè)任務(wù)通過(guò) execute(Runnable)方法被添加到線(xiàn)程池,任務(wù)就是一個(gè) Runnable類(lèi)型的對(duì)象,任務(wù)的執(zhí)行方法就是 Runnable類(lèi)型對(duì)象的run()方法。
一個(gè)任務(wù)通過(guò)execute(Runnable)方法欲添加到線(xiàn)程池時(shí)
- 如果此時(shí)線(xiàn)程池中的數(shù)量小于corePoolSize,即使線(xiàn)程池中的線(xiàn)程都處于空閑狀態(tài),也要?jiǎng)?chuàng)建新的線(xiàn)程來(lái)處理被添加的任務(wù)。
- 如果此時(shí)線(xiàn)程池中的數(shù)量等于 corePoolSize,但是緩沖隊(duì)列 workQueue未滿(mǎn),那么任務(wù)被放入緩沖隊(duì)列。
- 如果此時(shí)線(xiàn)程池中的數(shù)量大于corePoolSize,緩沖隊(duì)列workQueue滿(mǎn),并且線(xiàn)程池中的數(shù)量小于maximumPoolSize,建新的線(xiàn)程來(lái)處理被添加的任務(wù)。
- 如果此時(shí)線(xiàn)程池中的數(shù)量大于corePoolSize,緩沖隊(duì)列workQueue滿(mǎn),并且線(xiàn)程池中的數(shù)量等于maximumPoolSize,那么通過(guò) handler所指定的策略來(lái)處理此任務(wù)。
處理任務(wù)的優(yōu)先級(jí)為
- 核心線(xiàn)程corePoolSize、任務(wù)隊(duì)列workQueue、最大線(xiàn)程maximumPoolSize,如果三者都滿(mǎn)了,使用handler處理被拒絕的任務(wù)。
當(dāng)線(xiàn)程池中的線(xiàn)程數(shù)量大于 corePoolSize 時(shí),如果某線(xiàn)程空閑時(shí)間超過(guò) keepAliveTime,線(xiàn)程將被終止。這樣,線(xiàn)程池可以動(dòng)態(tài)的調(diào)整池中的線(xiàn)程數(shù)。
workQueue任務(wù)隊(duì)列
任務(wù)隊(duì)列,被添加到線(xiàn)程池中,但尚未被執(zhí)行的任務(wù);它一般分為直接提交隊(duì)列、有界任務(wù)隊(duì)列、無(wú)界任務(wù)隊(duì)列、優(yōu)先任務(wù)隊(duì)列幾種;
直接切換
設(shè)置為SynchronousQueue隊(duì)列,SynchronousQueue是一個(gè)特殊的BlockingQueue,它沒(méi)有容量,沒(méi)執(zhí)行一個(gè)插入操作就會(huì)阻塞,需要再執(zhí)行一個(gè)刪除操作才會(huì)被喚醒,反之每一個(gè)刪除操作也都要等待對(duì)應(yīng)的插入操作。
使用SynchronousQueue隊(duì)列,提交的任務(wù)不會(huì)被保存,總是會(huì)馬上提交執(zhí)行。如果用于執(zhí)行任務(wù)的線(xiàn)程數(shù)量小于maximumPoolSize,則嘗試創(chuàng)建新的進(jìn)程,如果達(dá)到maximumPoolSize設(shè)置的最大值,則根據(jù)你設(shè)置的handler執(zhí)行拒絕策略。因此這種方式你提交的任務(wù)不會(huì)被緩存起來(lái),而是會(huì)被馬上執(zhí)行,在這種情況下,你需要對(duì)你程序的并發(fā)量有個(gè)準(zhǔn)確的評(píng)估,才能設(shè)置合適的maximumPoolSize數(shù)量,否則很容易就會(huì)執(zhí)行拒絕策略
無(wú)界隊(duì)列
一般使用基于鏈表的阻塞隊(duì)列LinkedBlockingQueue。使用無(wú)界任務(wù)隊(duì)列,線(xiàn)程池的任務(wù)隊(duì)列可以無(wú)限制的添加新的任務(wù),而線(xiàn)程池創(chuàng)建的最大線(xiàn)程數(shù)量就是你corePoolSize設(shè)置的數(shù)量,也就是說(shuō)在這種情況下maximumPoolSize這個(gè)參數(shù)是無(wú)效的,哪怕你的任務(wù)隊(duì)列中緩存了很多未執(zhí)行的任務(wù),當(dāng)線(xiàn)程池的線(xiàn)程數(shù)達(dá)到corePoolSize后,就不會(huì)再增加了;若后續(xù)有新的任務(wù)加入,則直接進(jìn)入隊(duì)列等待,當(dāng)使用這種任務(wù)隊(duì)列模式時(shí),一定要注意你任務(wù)提交與處理之間的協(xié)調(diào)與控制,不然會(huì)出現(xiàn)隊(duì)列中的任務(wù)由于無(wú)法及時(shí)處理導(dǎo)致一直增長(zhǎng),直到最后資源耗盡的問(wèn)題。
使用有界隊(duì)列
一般使用ArrayBlockingQueue。使用該方式可以將線(xiàn)程池的最大線(xiàn)程數(shù)量限制為maximumPoolSize,這樣能夠降低資源的消耗,但同時(shí)這種方式也使得線(xiàn)程池對(duì)線(xiàn)程的調(diào)度變得更困難,因?yàn)榫€(xiàn)程池和隊(duì)列的容量都是有限的值,所以要想使線(xiàn)程池處理任務(wù)的吞吐率達(dá)到一個(gè)相對(duì)合理的范圍,又想使線(xiàn)程調(diào)度相對(duì)簡(jiǎn)單,并且還要盡可能的降低線(xiàn)程池對(duì)資源的消耗,就需要合理的設(shè)置這兩個(gè)數(shù)量。
使用ArrayBlockingQueue有界任務(wù)隊(duì)列,若有新的任務(wù)需要執(zhí)行時(shí),線(xiàn)程池會(huì)創(chuàng)建新的線(xiàn)程,直到創(chuàng)建的線(xiàn)程數(shù)量達(dá)到corePoolSize時(shí),則會(huì)將新的任務(wù)加入到等待隊(duì)列中。若等待隊(duì)列已滿(mǎn),即超過(guò)ArrayBlockingQueue初始化的容量,則繼續(xù)創(chuàng)建線(xiàn)程,直到線(xiàn)程數(shù)量達(dá)到maximumPoolSize設(shè)置的最大線(xiàn)程數(shù)量,若大于maximumPoolSize,則執(zhí)行拒絕策略。在這種情況下,線(xiàn)程數(shù)量的上限與有界任務(wù)隊(duì)列的狀態(tài)有直接關(guān)系,如果有界隊(duì)列初始容量較大或者沒(méi)有達(dá)到超負(fù)荷的狀態(tài),線(xiàn)程數(shù)將一直維持在corePoolSize以下,反之當(dāng)任務(wù)隊(duì)列已滿(mǎn)時(shí),則會(huì)以maximumPoolSize為最大線(xiàn)程數(shù)上限。
- 如果要想降低系統(tǒng)資源的消耗(包括CPU的使用率,操作系統(tǒng)資源的消耗,上下文環(huán)境切換的開(kāi)銷(xiāo)等), 可以設(shè)置較大的隊(duì)列容量和較小的線(xiàn)程池容量, 但這樣也會(huì)降低線(xiàn)程處理任務(wù)的吞吐量。
- 如果提交的任務(wù)經(jīng)常發(fā)生阻塞,那么可以考慮通過(guò)調(diào)用 setMaximumPoolSize() 方法來(lái)重新設(shè)定線(xiàn)程池的容量。
- 如果隊(duì)列的容量設(shè)置的較小,通常需要將線(xiàn)程池的容量設(shè)置大一點(diǎn),這樣CPU的使用率會(huì)相對(duì)的高一些。但如果線(xiàn)程池的容量設(shè)置的過(guò)大,則在提交的任務(wù)數(shù)量太多的情況下,并發(fā)量會(huì)增加,那么線(xiàn)程之間的調(diào)度就是一個(gè)要考慮的問(wèn)題,因?yàn)檫@樣反而有可能降低處理任務(wù)的吞吐量。
ThreadPoolExecutor內(nèi)部有實(shí)現(xiàn)4個(gè)拒絕策略,默認(rèn)為AbortPolicy策略
- CallerRunsPolicy:由調(diào)用execute方法提交任務(wù)的線(xiàn)程來(lái)執(zhí)行這個(gè)任務(wù)
- AbortPolicy:拋出異常RejectedExecutionException拒絕提交任務(wù)
- DiscardPolicy:直接拋棄任務(wù),不做任何處理
- DiscardOldestPolicy:去除任務(wù)隊(duì)列中的第一個(gè)任務(wù),重新提交
ThreadPoolExecutor 使用
1,當(dāng)池中正在運(yùn)行的線(xiàn)程數(shù)(包括空閑線(xiàn)程數(shù))小于corePoolSize時(shí),新建線(xiàn)程執(zhí)行任務(wù)
public static void main(String[] args) {ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));// 任務(wù)1pool.execute(() -> {try {Thread.sleep(3 * 1000);System.out.println("--helloWorld_001--" + Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}});//任務(wù)2pool.execute(() -> System.out.println("--helloWorld_002--" + Thread.currentThread().getName()));}運(yùn)行結(jié)果:
–helloWorld_001–pool-1-thread-1
–helloWorld_002–pool-1-thread-2
結(jié)論:線(xiàn)程1 結(jié)束后 沒(méi)有繼續(xù)線(xiàn)程1 而是啟動(dòng)線(xiàn)程2
2,當(dāng)池中正在運(yùn)行的線(xiàn)程數(shù)(包括空閑線(xiàn)程數(shù))大于等于corePoolSize時(shí),新插入的任務(wù)進(jìn)入workQueue排隊(duì)(如果workQueue長(zhǎng)度允許),等待空閑線(xiàn)程來(lái)執(zhí)行。
public static void main(String[] args) {ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));// 任務(wù)1pool.execute(() -> {try {Thread.sleep(3 * 1000);System.out.println("--helloWorld_001--" + Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}});// 任務(wù)2pool.execute(() -> {try {Thread.sleep(5 * 1000);System.out.println("--helloWorld_002--" + Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}});// 任務(wù)3pool.execute(() -> System.out.println("--helloWorld_003--" + Thread.currentThread().getName()));}運(yùn)行結(jié)果:
–helloWorld_001–pool-1-thread-1
–helloWorld_003–pool-1-thread-1
–helloWorld_002–pool-1-thread-2
結(jié)論:任務(wù)2在運(yùn)行過(guò)程中,任務(wù)3啟動(dòng)不會(huì)新建線(xiàn)程,因?yàn)橛幸粋€(gè)隊(duì)列是空的,maximumPoolSize=3這個(gè)參數(shù)不起作用。
3,當(dāng)隊(duì)列里的任務(wù)達(dá)到上限,并且池中正在進(jìn)行的線(xiàn)程小于maxinumPoolSize,對(duì)于新加入的任務(wù),新建線(xiàn)程。
public static void main(String[] args) {ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));// 任務(wù)1pool.execute(() -> {try {Thread.sleep(3 * 1000);System.out.println("--helloWorld_001--" + Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}});// 任務(wù)2pool.execute(() -> {try {Thread.sleep(5 * 1000);System.out.println("--helloWorld_002--" + Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}});// 任務(wù)3pool.execute(() -> System.out.println("--helloWorld_003--" + Thread.currentThread().getName()));// 任務(wù)4pool.execute(() -> System.out.println("--helloWorld_004--" + Thread.currentThread().getName()));}運(yùn)行結(jié)果:
–helloWorld_004–pool-1-thread-3
–helloWorld_003–pool-1-thread-3
–helloWorld_001–pool-1-thread-1
–helloWorld_002–pool-1-thread-2
結(jié)果:任務(wù)1,2啟動(dòng)后 任務(wù)3在隊(duì)列 , 隊(duì)列就滿(mǎn)了, 由于正在進(jìn)行的線(xiàn)程數(shù)是2 < maximumPoolSize,只能新建一個(gè)線(xiàn)程了 然后任務(wù)4就進(jìn)了新線(xiàn)程-3,任務(wù)4結(jié)束,隊(duì)列里的任務(wù)3在線(xiàn)程3 進(jìn)行。
4,隊(duì)列里的任務(wù)達(dá)到上限,并且池中正在運(yùn)行的線(xiàn)程等于maximumPoolSize,對(duì)于新加入的任務(wù),執(zhí)行拒絕策略(線(xiàn)程池默認(rèn)的策略是拋異常)。
public static void main(String[] args) {ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));// 任務(wù)1pool.execute(() -> {try {Thread.sleep(3 * 1000);System.out.println("--helloWorld_001--" + Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}});// 任務(wù)2pool.execute(() -> {try {Thread.sleep(5 * 1000);System.out.println("--helloWorld_002--" + Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}});// 任務(wù)3pool.execute(() -> System.out.println("--helloWorld_003--" + Thread.currentThread().getName()));// 任務(wù)4pool.execute(() -> {try {Thread.sleep(2 * 1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("--helloWorld_004--" + Thread.currentThread().getName());});// 任務(wù)5pool.execute(() -> System.out.println("--helloWorld_005--" + Thread.currentThread().getName()));}運(yùn)行結(jié)果:
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task cs.wy.Thread.ThreadPoolExecutorTest$$Lambda$5/999966131@7699a589 rejected from java.util.concurrent.ThreadPoolExecutor@58372a00[Running, pool size = 3, active threads = 3, queued tasks = 1, completed tasks = 0]at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)at cs.wy.Thread.ThreadPoolExecutorTest.main(ThreadPoolExecutorTest.java:40) --helloWorld_004--pool-1-thread-3 --helloWorld_003--pool-1-thread-3 --helloWorld_001--pool-1-thread-1 --helloWorld_002--pool-1-thread-2結(jié)論:隊(duì)列達(dá)到上限,線(xiàn)程池達(dá)到最大值,故拋出異常。
關(guān)閉線(xiàn)程
分為兩種方式:
//平緩關(guān)閉,不允許新的線(xiàn)程加入,正在運(yùn)行的都跑完即可關(guān)閉。 pool.shutdown(); //暴力關(guān)閉。不允許新的線(xiàn)程加入,且直接停到正在進(jìn)行的線(xiàn)程。 pool.shutdownNow();總結(jié)
以上是生活随笔為你收集整理的【多线程】ThreadPoolExecutor 类的使用详解的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 【多线程】Synchronized及实现
- 下一篇: 【多线程】ThreadPoolExecu