Java并发编程实战————Executor框架与任务执行
引言
本篇博客介紹通過“執行任務”的機制來設計應用程序時需要掌握的一些知識。所有的內容均提煉自《Java并發編程實戰》中第六章的內容。
大多數并發應用程序都是圍繞“任務執行”來構造的:任務通常是一些抽象的且離散的工作單元。
當圍繞“任務執行”來設計應用程序結構時,第一步,就是要找出清晰的任務邊界。在理想情況下,各個任務之間是相互獨立的:任務并不依賴于其他任務的狀態、結果或邊界效應。
大多數服務器應用程序提供了一種自然的任務邊界選擇方式:以獨立的客戶請求為邊界。
不好的例子:串行與“為每個任務創建線程”
在書中6.1節,介紹了由最簡單的串行執行任務到為每個任務創建一個線程這兩種執行任務的方式。應該說這兩種方式都是不可取的。
這一節主要是為了引出下一節介紹的“任務執行框架”。
其中“串行執行任務”的缺點是在一般的服務器應用程序中,無法提高吞吐率或快速響應性。
而“為每個任務創建線程”的方式的問題在于可能導致:高性能開銷、高資源消耗、影響穩定性。
【重點】在工作或面試中也會遇到這個極富針對性的問題,即大量創建線程會存在哪些問題?
1、高性能開銷:創建和銷毀都需要一定的代價,創建過程需要時間,延遲處理請求,也需要jvm和操作系統提供一些輔助操作。
2、高資源消耗:活躍的線程會消耗系統資源,尤其是內存。當可運行的線程數量多余可用處理器的數量,那么會有大量空閑的線程占用內存,不僅給垃圾回收帶來壓力,在競爭CPU的時候還將產生額外的性能開銷。
3、影響穩定性:大量線程占用內存,內存不足,導致可能拋出OutOfMemoryError,系統崩潰。
線程數量的限制
書中在這里簡單引出一個概念:穩定性。
根據前后文的聯系,這里具體指的是:應用程序不會因為線程過多而拋出OutOfMemoryError異常。
為了達到這種穩定性,在可創建線程數量上存在一個限制。這個限制受平臺以及多個因素影響,包括JVM啟動參數、Thread構造函數中請求的棧大小、底層操作系統對線程的限制等。例如,在32位機器上,其中一個主要的限制因素是線程棧的地址空間。每個線程都維護兩個執行棧,一個用于Java代碼,另一個用于原生代碼。
通常,JVM在默認情況下會生成一個復合棧,大小約0.5M~1M(這個值可以通過JVM標志 -Xss或通過Thread的構造函數來修改),那么:線程數量 ≈ 2^32(bit) / 0.5(MB) ≈幾千或幾萬。
因此,在一定范圍內,增加線程可以提高系統的吞吐率,但如果超出這個范圍,再創建更多的線程只會降低程序的執行速度。
Executor接口
public?interface?Executor?{??void?execute(Runnable?command);?? }??Executor是一個非常簡單的接口,只有一個execute(Runnable) 方法,它是其他的靈活且強大的異步任務框架的基礎。通過這種方式,用Runnable來表示任務,可以將任務的提交過程與執行過程解耦。
Executor本身就是基于生產者消費者,提交任務相當于生產者,執行任務相當于消費者,因此,如果要在程序中實現一個生產者-消費者的設計,那么最簡單的方式通常就是使用Executor。
什么是執行策略?
執行策略,定義了任務執行的“what、where、when、how”等方面,主要是描述根據不同的資源而選擇不同的執行方式,一個最優執行策略應當是與硬件資源最匹配的。
線程池
先來看一下四種常用線程池的創建:
ExecutorService?newFixedThreadPool?=?Executors.newFixedThreadPool(10);?? ExecutorService?newCachedThreadPool?=?Executors.newCachedThreadPool();?? ScheduledExecutorService?newScheduledThreadPool?=?Executors.newScheduledThreadPool(10);?? ExecutorService?newSingleThreadExecutor?=?Executors.newSingleThreadExecutor();其中:ExecutorService extends Executor,ScheduledExecutorService extends ExecutorService 。?
1、newFixedThreadPool(int) :創建一個定額線程池,每提交一個任務創建一個線程,達到數量限制后不再增加,這時線程池的規模將不再變化(如果某個線程由于發生了未預期的異常而結束,那么線程池會補充一個新的線程)
2、NewCachedThreadPool() : 創建一個可緩存的線程池,線程池的規模不存在任何限制,當線程多余任務時,回收空閑線程;當任務增加時,創建新線程。
3、NewSingleThreadExecutor:單線程的Executor,如果這個線程異常結束,會創建另一個線程來替代。NewSingleThreadExecutor能確保依照任務在隊列中的順序串行執行(例如FIFO、LIFO、優先級)。
4、NewScheduleThreadPool:創建一個固定長度的線程池,而且以延遲或定時的方式來執行任務,類似于Timer。
Executor的生命周期
?JVM只有在所有(非守護)線程全部終止后才會退出,無法正確地關閉Executor,JVM將無法結束。
Executor以異步的方式來執行任務,導致了提交任務的狀態不是立即可見的,即有些任務可能已經完成,有些可能正在執行,還有些可能正在隊列中等待執行。
ExecutorSevice接口就是為了解決執行服務的生命周期問題,擴展了Executor接口。它添加了一些用于聲明周期管理的方法(同時還有一些用于任務提交的便利方法):
public?interface?ExecutorService?extends?Executor?{??void?shutdown();??List<Runnable>?shutdownNow();??boolean?isShutdown();??boolean?isTerminated();??boolean?awaitTermination(long?timeout,?TimeUnit?unit)??throws?InterruptedException;??//?......其他用于任務提交的便利方法?? }??這五個方法是聲明周期管理的方法,其余的都是與任務提交相關的方法,比如,可以提交比較大的集合Callable對象的方法:
invokeAll(Collection<? extends Callable<T>> tasks)【重點】ExecutorService的三種狀態:運行、關閉、已終止 。
ExecutorService在初始創建時處于運行狀態。shutdown()方法將執行平緩的關閉過程:不再接受新的任務,同時等待已經提交的任務執行完成——包括那些還未開始執行的任務。shutdownNow()方法將執行粗暴的關閉方式:它將嘗試取消所有運行中的任務,并且不再啟動隊列中尚未開始執行的任務。
延遲任務與周期任務
Timer類負責管理延遲任務以及周期任務,但它本身存在缺陷,因此通常要用ScheduleThreadPoolExecutor的構造函數或newScheduleThreadPool工廠方法來創建該類對象。
Timer的缺陷在于,Timer在執行所有定時任務時只會創建一個線程。如果某個任務的執行時間過長,那么將破壞其他TimerTask的定時精確性。
Timer還有一個問題就是,Timer線程不會捕獲異常,當TimerTask拋出未檢查異常時將終止定時線程。Timer也不會恢復線程的執行,而是會錯誤地任務整個Timer都被取消了。這就造成:已經被調度但尚未執行的TimerTask將不會再執行,新的任務也不會被調度。稱之為“線程泄漏”。
【重點】生命周期小結
Runnable和Callable等任務的生命周期:創建、提交、開始、完成、取消。
Future表示的就是一個任務的生命周期。
Thread的生命周期:創建、就緒、運行、阻塞、死亡(或結束)。
ExecutorService的生命周期(因為它繼承自Executor,因此也是Executor的生命周期):創建、運行、關閉、已終止。
Callable與Future
callable
Runnable有一個局限性是沒有返回值,也沒辦法拋出受檢異常。對于某些異步獲得結果的任務無法勝任,Callable應運而生。
它是Runnable的升級版,既可以使用Callable<Void>來達到Runnable一樣的效果,同時也可以使用Callable<T> 來指定返回結果。
創建Callable的方式有兩種:構造函數、靜態的封裝方法。
Callable<String>?callableTask?=?new?Callable<String>()?{??@Override??public?String?call()?throws?Exception?{??return?"this?is?a?callable?task....";??}?? };?Java 8 style:
Callable<String>?callableTask?=?()?->?{??return?"this?is?a?callable?task....";?? };靜態方法:Executors.callable(Runnable task, T result):
Callable<String>?call?=?Executors.callable(()?->?{System.out.println("this?is?a?runnable?task..."); },?"done!");Future
future表示一個任務的生命周期。主要提供了一些方法用于判斷任務處于哪個階段,還可以獲取任務的結果甚至是取消任務。它本身還有一層隱含意義是,任務的生命周期只能前進,不能后退,當一個任務處于“完成”狀態,就永遠停留在“完成”狀態上。這一點和ExecutorService的生命周期一樣。
Future接口:
public?interface?Future<V>?{??boolean?cancel(boolean?mayInterruptIfRunning);??boolean?isCancelled();??boolean?isDone();??V?get()?throws?InterruptedException,?ExecutionException;??V?get(long?timeout,?TimeUnit?unit)??throws?InterruptedException,?ExecutionException,?TimeoutException;?? }創建Future的方式通常是使用ExecutorService的submit()方法獲取返回值。如果想通過構造器的方式顯式地創建一個任務的生命周期管理對象,可以使用FutureTask。
FutureTask<String>?runnFutureTask?=?new?FutureTask<String>(runnable,?"done!");?? FutureTask<String> callFutureTask = new FutureTask<>(callable);FutureTask類實現了Runnable和Future兩個接口。
(說明:FutureTask是Java 5加入的類,Java 6又為它補充了一個新的RunnableFuture接口,Runnable接口和Future接口被提升到了RunnableFuture接口上,這更像是一種重構手段,我個人認為在實際開發中用途可能不及直接使用FutureTask)
由于FutureTask實現了Runnable接口,因此可以將它提交給Executor來執行,或者直接調用它的run方法。
是的,FutureTask的run()方法可以直接執行任務,而不需要什么start。
Future.get
get()方法的行為取決于任務的狀態(尚未開始、正在運行、已完成)如果任務已經完成,那么get會立即返回或拋出一個Exception;如果任務沒有完成,那么get將阻塞直到任務完成。如果任務拋出異常,那么get將該異常封裝成ExecutionException并重新拋出,可以通過getCause來進一步獲得被封裝的初始異常。如果任務被取消,那么get將拋出CancellationException。
異構任務并行化存在的局限
A與B兩個完全不同的任務通過并行方式可以實現小幅度的性能提升,但是如果想大幅度的提升存在一定的困難。因此,得出一個結論是,只有當大量相互獨立且同構的任務可以并發進行處理時,才能體現出真正的性能提升。
CompletionService與它的子類ExecutorCompletionService
CompletionService是Executor與BlockingQueue的融合。
回顧一下BlockingQueue的一些特性:
BlockingQueue接口是Queue的子接口,有兩個最主要的實現,LinkedBlockingQueue(無界隊列)和ArrayBlockingQueue(有界隊列)。take()或poll()方法都是BlockingQueue的取頭元素的方法,唯一不同的是當沒有可用的頭元素時,take會無限期等待(阻塞),poll可以設置一個超時時間,一旦超時,將返回null。
CompletionService是在任務執行的功能上加入了隊列的特性,很明顯是用于處理一批允許有返回值的任務。
用法:創建一個CompletionService(ExecutorCompletionService對象)。【ExecutorCompletionService的構造器允許我們傳入一個ExecutorService(用于采取不同的執行策略)和一個BlockingQueue(該參數可選,默認LinkedBlockingQueue)】然后可以將一組Callable任務提交給CompletionService來執行,然后使用類似隊列操作的take或poll方法來獲取已完成的結果,這些結果會在完成時被封裝為Future。
【擴展】ExecutorCompletionService的實現很簡單。首先通過構造函數創建一個BlockingQueue來保存計算結果,然后當計算完成時,調用FutureTask的done方法,放入隊列。展開:當提交某個任務時,該任務將首先包裝為一個QueueingFuture,這是FutureTask【回顧:FutureTask實現了Future、Runnable】的一個子類,QueueingFuture改寫了FutureTask的done方法——將結果放入BlockingQueue中。take和poll方法委托給BlockingQueue方法,這些方法會在得到結果之前阻塞。
為任務設置時限
有時候,如果某個任務無法在指定時間內完成,那么將不再需要它的結果,此時可以放棄這個任務。Future.get中支持這種需求:當結果可用時,它將立即返回,如果在指定時限內沒有計算出結果,那么拋出TimeoutException。
在使用時限任務時需要注意,當這些任務超市后應該立即停止,從而避免為繼續計算一個不再使用的結果而浪費計算資源。
【使用Future.get為單個任務設置時限,如果希望對一組任務設置計算時限,比如前面介紹的CompletionService,那么可以使用poll方法來設置執行時間】
invokeAll方法
ExecutorServie接口中有兩個重載的invokeAll方法:
<T>?List<Future<T>>?invokeAll(Collection<??extends?Callable<T>>?tasks)??throws?InterruptedException;?? <T>?List<Future<T>>?invokeAll(Collection<??extends?Callable<T>>?tasks,??long?timeout,?TimeUnit?unit)??throws?InterruptedException;invokeAll方法支持將多個任務提交到一個ExecutorService并獲得結果。invokeAll方法的參數為一組任務,并返回一組Future。invokeAll按照任務集合中迭代器的順序將所有的Future添加到返回的集合中,從而使調用者能夠將各個Future與其表示的Callable關聯起來。
當所有任務都執行完畢時,或者調用線程被中斷時,又或者超過指定時限時,invokeAll都會返回。當超過指定時限,任何還未完成的任務都會取消。當invokeAll返回后,每個任務要么正常完成,要么被取消,而客戶端代碼可以調用get或isCancelled來判斷究竟是何種情況。
第六章小結
通過圍繞任務執行來設計應用程序,可以簡化開發過程,并有助于實現并發。
Executor框架將任務提交與執行策略解耦開來,同時還支持多種不同類型的執行策略。當需要創建線程來執行任務時,可以考慮使用Executor。
要想在將應用程序分解為不同的任務時獲得最大的好處,必須定義清晰的任務邊界。某些應用程序中存在著比較明顯的任務邊界,而在其他一些程序中則需要進一步分析才能揭示出粒度更細的并行性。
總結
以上是生活随笔為你收集整理的Java并发编程实战————Executor框架与任务执行的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: LeetCode算法入门- Genera
- 下一篇: 当面试官问我————Java是值传递还是