Java 并发编程——Executor框架和线程池原理
Java 并發(fā)編程系列文章
Java 并發(fā)基礎(chǔ)——線程安全性
Java 并發(fā)編程——Callable+Future+FutureTask
java 并發(fā)編程——Thread 源碼重新學(xué)習(xí)
java并發(fā)編程——通過ReentrantLock,Condition實(shí)現(xiàn)銀行存取款
Java并發(fā)編程——BlockingQueue
Java 并發(fā)編程——Executor框架和線程池原理
?
Eexecutor作為靈活且強(qiáng)大的異步執(zhí)行框架,其支持多種不同類型的任務(wù)執(zhí)行策略,提供了一種標(biāo)準(zhǔn)的方法將任務(wù)的提交過程和執(zhí)行過程解耦開發(fā),基于生產(chǎn)者-消費(fèi)者模式,其提交任務(wù)的線程相當(dāng)于生產(chǎn)者,執(zhí)行任務(wù)的線程相當(dāng)于消費(fèi)者,并用Runnable來表示任務(wù),Executor的實(shí)現(xiàn)還提供了對(duì)生命周期的支持,以及統(tǒng)計(jì)信息收集,應(yīng)用程序管理機(jī)制和性能監(jiān)視等機(jī)制。
下面這段代碼中將多個(gè)任務(wù)放到了線程池中執(zhí)行:
static class MyRunnable implements Runnable{@Overridepublic void run() {for(int i=0;i<100;i++)System.out.println(Thread.currentThread().getName()+": "+i);} }static class MyThread extends Thread {public MyThread(String in){super(in);}@Overridepublic void run(){System.out.println(Thread.currentThread().getName());} }public static void main(String[] args) {ExecutorService pool3 = Executors.newFixedThreadPool(2);pool3.execute(new MyThread("t1"));pool3.execute(new MyThread("t2"));pool3.execute(new MyThread("t3"));pool3.execute(new MyThread("t4"));pool3.shutdown();ExecutorService pool2 = Executors.newFixedThreadPool(2);pool2.submit(new MyRunnable());pool2.submit(new MyRunnable());pool2.shutdownNow(); }Executor:一個(gè)接口,其定義了一個(gè)接收Runnable對(duì)象的方法executor,其方法簽名為executor(Runnable command),
ExecutorService:是一個(gè)比Executor使用更廣泛的子類接口,其提供了生命周期管理的方法,以及可跟蹤一個(gè)或多個(gè)異步任務(wù)執(zhí)行狀況返回Future的方法
AbstractExecutorService:ExecutorService執(zhí)行方法的默認(rèn)實(shí)現(xiàn)
ScheduledExecutorService:一個(gè)可定時(shí)調(diào)度任務(wù)的接口
ScheduledThreadPoolExecutor:ScheduledExecutorService的實(shí)現(xiàn),一個(gè)可定時(shí)調(diào)度任務(wù)的線程池
ThreadPoolExecutor:線程池,可以通過調(diào)用Executors以下靜態(tài)工廠方法來創(chuàng)建線程池并返回一個(gè)ExecutorService對(duì)象:
1. Executor 接口
Executor是java.util.concurrent 包下的一個(gè)接口。 public interface Executor {/*** Executes the given command at some time in the future. The command* may execute in a new thread, in a pooled thread, or in the calling* thread, at the discretion of the {@code Executor} implementation.** @param command the runnable task* @throws RejectedExecutionException if this task cannot be* accepted for execution* @throws NullPointerException if command is null*/void execute(Runnable command); }該接口十分簡單,只有一個(gè)執(zhí)行的方法 execute() 方法。
2. ExecutorService接口????????????????????????????????????????????????????????????????????????????????????
為了充分理解ExecutorService接口建議先了解:Java 并發(fā)編程——Callable+Future+FutureTask
public interface ExecutorService extends Executor;
void shutdown();
啟動(dòng)一次順序關(guān)閉,執(zhí)行以前提交的任務(wù),但不接受新任務(wù)。
?
List<Runnable>? shutdownNow()
試圖停止所有正在執(zhí)行的活動(dòng)任務(wù),暫停處理正在等待的任務(wù),并返回等待執(zhí)行的任務(wù)列表。
?
boolean isShutdown()
*
* @return {@code true} if this executor has been shut down
*/
boolean isTerminated()
@return {@code true} if all tasks have completed following shut down boolean isTerminated()只有當(dāng)shutdown()或者shutdownnow()被調(diào)用,而且所有任務(wù)都執(zhí)行完成后才會(huì)返回true。
?
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
* request, or the timeout occurs, or the current thread is
* interrupted, whichever happens first.
?
<T> Future<T> submit(Callable<T> task)<T> Future<T> submit(Runnable task, T result)Future<?Future> submit(Runnable task)
都是提交一個(gè)任務(wù)等待執(zhí)行,只不過第二個(gè)函數(shù)Future.get()返回值為result。第三個(gè)函數(shù)Future.get()返回值為null<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
同時(shí)提交多個(gè)任務(wù),返回第一個(gè)執(zhí)行完成的結(jié)果。
?
ExecutorService中execute()函數(shù)和submit()函數(shù)的區(qū)別(開頭線程池使用的代碼中使用了這兩種方法執(zhí)行一個(gè)任務(wù))
1. 接收的參數(shù)不一樣,execute接口只能接收Runnable向,而submit接口可以接收多種類型的對(duì)象
2.返回值不同,execute沒有返回值,而submit返回一個(gè)Future對(duì)象
3.Exception處理
There is a difference when looking at exception handling. If your tasks throws an exception and if it was submitted with execute this exception will go to the uncaught exception handler (when you don't have provided one explicitly, the default one will just print the stack trace to System.err). If you submitted the task with submit any thrown exception, checked or not, is then part of the task's return status. For a task that was submitted with submit and that terminates with an exception, the Future.get will rethrow this exception, wrapped in an ExecutionException.
?
3. AbstractExecutorService???????????????????????????????????????????????????????????????????????????? ?
通過這個(gè)類的名字可能就大概知道了這個(gè)類的作用,它實(shí)現(xiàn)了ExecutorService中的部分方法,對(duì)于繼承它的類可以減少實(shí)現(xiàn)的代碼。該抽象類中并沒有存放任務(wù)或者線程的數(shù)組或者Collection所以,對(duì)線程隊(duì)列的具體管理AbstractExecutorService類并不涉及。
下面看一下里面幾個(gè)關(guān)鍵接口的實(shí)現(xiàn):
當(dāng)向線程池中提交一個(gè)任務(wù)時(shí),它實(shí)際上還是調(diào)用execute接口去執(zhí)行的,然后將執(zhí)行的結(jié)果(如果有的話)一個(gè)Future對(duì)象返回給任務(wù)提交方。
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask;}execute()的實(shí)現(xiàn)并沒有放在當(dāng)前的抽象類中實(shí)現(xiàn),而是讓子類去實(shí)現(xiàn)。
再看一下invokeAll的執(zhí)行過程:
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException {if (tasks == null)throw new NullPointerException();ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());try { // 將tasks轉(zhuǎn)換成Futrues類型 for (Callable<T> t : tasks) {RunnableFuture<T> f = newTaskFor(t);futures.add(f);execute(f);} ? // 執(zhí)行future中的get函數(shù) for (int i = 0, size = futures.size(); i < size; i++) {Future<T> f = futures.get(i);if (!f.isDone()) {try { f.get(); }catch (CancellationException ignore) {}catch (ExecutionException ignore) {}}}return futures;} catch (Throwable t) {cancelAll(futures);throw t;}}其它函數(shù)的實(shí)現(xiàn)原理基本相同,參考源碼。
4.?ThreadPoolExecutor???????????????????????????????????????????????????????????????????????????????????
public class ThreadPoolExecutor extends AbstractExecutorService
為什么需要線程池
使用線程的時(shí)候就去創(chuàng)建一個(gè)線程,這樣實(shí)現(xiàn)起來非常簡便,但是就會(huì)有一個(gè)問題:如果并發(fā)的線程數(shù)量很多,并且每個(gè)線程都是執(zhí)行一個(gè)時(shí)間很短的任務(wù)就結(jié)束了,這樣頻繁創(chuàng)建線程就會(huì)大大降低系統(tǒng)的效率,因?yàn)轭l繁創(chuàng)建線程和銷毀線程需要時(shí)間。
構(gòu)造方法
ThreadPoolExecutor是線程池的真正實(shí)現(xiàn),他通過構(gòu)造方法的一系列參數(shù),來構(gòu)成不同配置的線程池。常用的構(gòu)造方法有下面四個(gè):
ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)構(gòu)造方法參數(shù)說明
-
corePoolSize
核心線程數(shù),默認(rèn)情況下核心線程會(huì)一直存活,即使處于閑置狀態(tài)也不會(huì)受存keepAliveTime限制。除非將allowCoreThreadTimeOut設(shè)置為true。
?
-
maximumPoolSize
線程池所能容納的最大線程數(shù)。超過這個(gè)數(shù)的線程將被阻塞。當(dāng)任務(wù)隊(duì)列為沒有設(shè)置大小的LinkedBlockingDeque時(shí),這個(gè)值無效。
?
-
keepAliveTime
非核心線程的閑置超時(shí)時(shí)間,超過這個(gè)時(shí)間就會(huì)被回收。(當(dāng)線程數(shù)沒有超過核心線程數(shù)時(shí),這個(gè)時(shí)間沒有任何意義)
?
-
unit
指定keepAliveTime的單位,如TimeUnit.SECONDS。當(dāng)將allowCoreThreadTimeOut設(shè)置為true時(shí)對(duì)corePoolSize生效。
?
-
workQueue
線程池中的任務(wù)隊(duì)列.
常用的有三種隊(duì)列,SynchronousQueue,LinkedBlockingDeque,ArrayBlockingQueue。
?
-
threadFactory
線程工廠,提供創(chuàng)建新線程的功能。ThreadFactory是一個(gè)接口,只有一個(gè)方法。
當(dāng)線程池中的資源已經(jīng)全部使用,添加新線程被拒絕時(shí),會(huì)調(diào)用RejectedExecutionHandler的rejectedExecution方法。
下圖為線程池的主要結(jié)構(gòu):
一定要注意一個(gè)概念,即存在于線程池中容器的一定是Thread對(duì)象,而不是你要求運(yùn)行的任務(wù)(所以叫線程池而不叫任務(wù)池也不叫對(duì)象池);你要求運(yùn)行的任務(wù)將被線程池分配給某一個(gè)空閑的Thread運(yùn)行。
下面這例子很好的說明了ThreadpoolExecutor的用法:
public class TestThreadPoolExecutor {static class MyTask implements Runnable {private int taskNum;public MyTask(int num) {this.taskNum = num;}@Overridepublic void run() {System.out.println("正在執(zhí)行task "+taskNum);try {Thread.currentThread().sleep(4000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("task "+taskNum+"執(zhí)行完畢");}}public static void main(String[] args) {// 核心線程數(shù)為 5// 最大線程數(shù)為 10(最多同時(shí)運(yùn)行10個(gè)線程)// 非核心線程沒有任務(wù)執(zhí)行時(shí),最多等待200ms// 等待隊(duì)列ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,200,TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(5));for(int i=0;i<15;i++){executor.submit(new MyTask(i));System.out.println("線程池中線程數(shù)目:"+executor.getPoolSize()+",隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:"+executor.getQueue().size()+",已執(zhí)行玩別的任務(wù)數(shù)目:"+executor.getCompletedTaskCount());}} }執(zhí)行結(jié)果如下:
public class TestThreadPoolExecutor {static class MyTask implements Runnable {private int taskNum;public MyTask(int num) {this.taskNum = num;}@Overridepublic void run() {System.out.println("正在執(zhí)行task "+taskNum);try {Thread.currentThread().sleep(4000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("task "+taskNum+"執(zhí)行完畢");}}public static void main(String[] args) {// 核心線程數(shù)為 5// 最大線程數(shù)為 10(最多同時(shí)運(yùn)行10個(gè)線程)// 非核心線程沒有任務(wù)執(zhí)行時(shí),最多等待200ms// 等待隊(duì)列ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,200,TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(5));for(int i=0;i<15;i++){executor.submit(new MyTask(i));System.out.println("線程池中線程數(shù)目:"+executor.getPoolSize()+",隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:"+executor.getQueue().size()+",已執(zhí)行玩別的任務(wù)數(shù)目:"+executor.getCompletedTaskCount());}} } View Code這個(gè)例子很好的說明了線程池的執(zhí)行邏輯:當(dāng)核心線程池為滿時(shí),開啟線程執(zhí)行提交的任務(wù),當(dāng)核心線程池滿時(shí),將任務(wù)放到等待隊(duì)列中等待執(zhí)行。當(dāng)隊(duì)列已滿時(shí),開啟線程去執(zhí)行任務(wù)(沒有達(dá)到最大線程數(shù)10),當(dāng)開啟線程執(zhí)行完成后執(zhí)行等待隊(duì)列中的線程。
線程池中 核心線程、線程隊(duì)列、最大線程的運(yùn)行準(zhǔn)則:
1、首先可以通過線程池提供的submit()方法或者execute()方法,要求線程池執(zhí)行某個(gè)任務(wù)。線程池收到這個(gè)要求執(zhí)行的任務(wù)后,會(huì)有幾種處理情況:
1.1、如果當(dāng)前線程池中運(yùn)行的線程數(shù)量還沒有達(dá)到corePoolSize大小時(shí),線程池會(huì)創(chuàng)建一個(gè)新的線程運(yùn)行你的任務(wù),無論之前已經(jīng)創(chuàng)建的線程是否處于空閑狀態(tài)。
1.2、如果當(dāng)前線程池中運(yùn)行的線程數(shù)量已經(jīng)達(dá)到設(shè)置的corePoolSize大小,線程池會(huì)把你的這個(gè)任務(wù)加入到等待隊(duì)列中。直到某一個(gè)的線程空閑了,線程池會(huì)根據(jù)設(shè)置的等待隊(duì)列規(guī)則,從隊(duì)列中取出一個(gè)新的任務(wù)執(zhí)行。
1.3、如果根據(jù)隊(duì)列規(guī)則,這個(gè)任務(wù)無法加入等待隊(duì)列。這時(shí)線程池就會(huì)創(chuàng)建一個(gè)“非核心線程”直接運(yùn)行這個(gè)任務(wù)。注意,如果這種情況下任務(wù)執(zhí)行成功,那么當(dāng)前線程池中的線程數(shù)量一定大于corePoolSize。
1.4、如果這個(gè)任務(wù),無法被“核心線程”直接執(zhí)行,又無法加入等待隊(duì)列,又無法創(chuàng)建“非核心線程”直接執(zhí)行,且你沒有為線程池設(shè)置RejectedExecutionHandler。這時(shí)線程池會(huì)拋出RejectedExecutionException異常,即線程池拒絕接受這個(gè)任務(wù)。(實(shí)際上拋出RejectedExecutionException異常的操作,是ThreadPoolExecutor線程池中一個(gè)默認(rèn)的RejectedExecutionHandler實(shí)現(xiàn):AbortPolicy)
2、一旦線程池中某個(gè)線程完成了任務(wù)的執(zhí)行,它就會(huì)試圖到任務(wù)等待隊(duì)列中拿去下一個(gè)等待任務(wù)(所有的等待任務(wù)都實(shí)現(xiàn)了BlockingQueue接口,按照接口字面上的理解,這是一個(gè)可阻塞的隊(duì)列接口),它會(huì)調(diào)用等待隊(duì)列的poll()方法,并停留在哪里。
3、當(dāng)線程池中的線程超過你設(shè)置的corePoolSize參數(shù),說明當(dāng)前線程池中有所謂的“非核心線程”。那么當(dāng)某個(gè)線程處理完任務(wù)后,如果等待keepAliveTime時(shí)間后仍然沒有新的任務(wù)分配給它,那么這個(gè)線程將會(huì)被回收。線程池回收線程時(shí),對(duì)所謂的“核心線程”和“非核心線程”是一視同仁的,直到線程池中線程的數(shù)量等于你設(shè)置的corePoolSize參數(shù)時(shí),回收過程才會(huì)停止。
這里關(guān)于線程池的具體實(shí)現(xiàn)先留個(gè)坑,到時(shí)候再填。
5. 線程池管理工具 Executors????????????????????????????????????????????????????????????????????????????
Executors是線程池創(chuàng)建和使用的工具類,它的所有方法都是static的。
- 生成一個(gè)固定大小的線程池:
最大線程數(shù)設(shè)置為與核心線程數(shù)相等,此時(shí) keepAliveTime 設(shè)置為 0(因?yàn)檫@里它是沒用的,即使不為 0,線程池默認(rèn)也不會(huì)回收 corePoolSize 內(nèi)的線程),任務(wù)隊(duì)列采用 LinkedBlockingQueue,無界隊(duì)列。
過程分析:剛開始,每提交一個(gè)任務(wù)都創(chuàng)建一個(gè) worker,當(dāng) worker 的數(shù)量達(dá)到 nThreads 后,不再創(chuàng)建新的線程,而是把任務(wù)提交到 LinkedBlockingQueue 中,而且之后線程數(shù)始終為 nThreads。
- 生成只有一個(gè)線程的固定線程池,這個(gè)更簡單,和上面的一樣,只要設(shè)置線程數(shù)為 1 就可以了:
- 生成一個(gè)需要的時(shí)候就創(chuàng)建新的線程,同時(shí)可以復(fù)用之前創(chuàng)建的線程(如果這個(gè)線程當(dāng)前沒有任務(wù))的線程池:
核心線程數(shù)為 0,最大線程數(shù)為 Integer.MAX_VALUE,keepAliveTime 為 60 秒,任務(wù)隊(duì)列采用 SynchronousQueue。
這種線程池對(duì)于任務(wù)可以比較快速地完成的情況有比較好的性能。如果線程空閑了 60 秒都沒有任務(wù),那么將關(guān)閉此線程并從線程池中移除。所以如果線程池空閑了很長時(shí)間也不會(huì)有問題,因?yàn)殡S著所有的線程都會(huì)被關(guān)閉,整個(gè)線程池不會(huì)占用任何的系統(tǒng)資源。
過程分析:我把 execute 方法的主體黏貼過來,讓大家看得明白些。鑒于 corePoolSize 是 0,那么提交任務(wù)的時(shí)候,直接將任務(wù)提交到隊(duì)列中,由于采用了 SynchronousQueue,所以如果是第一個(gè)任務(wù)提交的時(shí)候,offer 方法肯定會(huì)返回 false,因?yàn)榇藭r(shí)沒有任何 worker 對(duì)這個(gè)任務(wù)進(jìn)行接收,那么將進(jìn)入到最后一個(gè)分支來創(chuàng)建第一個(gè) worker。之后再提交任務(wù)的話,取決于是否有空閑下來的線程對(duì)任務(wù)進(jìn)行接收,如果有,會(huì)進(jìn)入到第二個(gè) if 語句塊中,否則就是和第一個(gè)任務(wù)一樣,進(jìn)到最后的 else if 分支。
?
參考:
https://www.cnblogs.com/MOBIN/p/5436482.html
ExectorService 中invokeeAll 接口說明: https://blog.csdn.net/baidu_23086307/article/details/51740852
線程池使用: https://blog.csdn.net/qq_25806863/article/details/71126867
https://javadoop.com/2017/09/05/java-thread-pool/#toc6
https://blog.csdn.net/lipc_/article/details/52025993
轉(zhuǎn)載于:https://www.cnblogs.com/NeilZhang/p/8955126.html
《新程序員》:云原生和全面數(shù)字化實(shí)踐50位技術(shù)專家共同創(chuàng)作,文字、視頻、音頻交互閱讀總結(jié)
以上是生活随笔為你收集整理的Java 并发编程——Executor框架和线程池原理的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: c++学习4 -- 输入输出
- 下一篇: Java程序员必备的Intellij插件