Java线程池介绍
根據(jù)摩爾定律(Moore’s law),集成電路晶體管的數(shù)量差不多每?jī)赡昃蜁?huì)翻一倍。但是晶體管數(shù)量指數(shù)級(jí)的增長(zhǎng)不一定會(huì)導(dǎo)致 CPU 性能的指數(shù)級(jí)增長(zhǎng)。處理器制造商花了很多年來提高時(shí)鐘頻率和指令并行。在新一代的處理器上,單線程程序的執(zhí)行速率確實(shí)有所提高。但是,時(shí)鐘頻率不可能無限制地提高,如處理器 AMD FX-9590 的時(shí)鐘頻率達(dá)到5 GHz,這已經(jīng)非常困難了。如今處理器制造商更喜歡采用多核處理器(multi-core processors)。擁有4核的智能手機(jī)已經(jīng)非常普遍,更不用提手提電腦和臺(tái)式機(jī)。結(jié)果,軟件不得不采用多線程的方式,以便能夠更好的使用硬件。線程池可以幫助程序員更好地利用多核 CPU。
?
線程池
?
好的軟件設(shè)計(jì)不建議手動(dòng)創(chuàng)建和銷毀線程。線程的創(chuàng)建和銷毀是非常耗 CPU 和內(nèi)存的,因?yàn)檫@需要 JVM 和操作系統(tǒng)的參與。64位 JVM 默認(rèn)線程棧是大小1 MB。這就是為什么說在請(qǐng)求頻繁時(shí)為每個(gè)小的請(qǐng)求創(chuàng)建線程是一種資源的浪費(fèi)。線程池可以根據(jù)創(chuàng)建時(shí)選擇的策略自動(dòng)處理線程的生命周期。重點(diǎn)在于:在資源(如內(nèi)存、CPU)充足的情況下,線程池沒有明顯的優(yōu)勢(shì),否則沒有線程池將導(dǎo)致服務(wù)器奔潰。有很多的理由可以解釋為什么沒有更多的資源。例如,在拒絕服務(wù)(denial-of-service)攻擊時(shí)會(huì)引起的許多線程并行執(zhí)行,從而導(dǎo)致線程饑餓(thread starvation)。除此之外,手動(dòng)執(zhí)行線程時(shí),可能會(huì)因?yàn)楫惓?dǎo)致線程死亡,程序員必須記得處理這種異常情況。
?
即使在你的應(yīng)用中沒有顯式地使用線程池,但是像 Tomcat、Undertow這樣的web服務(wù)器,都大量使用了線程池。所以了解線程池是如何工作的,怎樣調(diào)整,對(duì)系統(tǒng)性能優(yōu)化非常有幫助。
?
線程池可以很容易地通過 Executors 工廠方法來創(chuàng)建。JDK 中實(shí)現(xiàn) ExecutorService 的類有:
?
-
ForkJoinPool
-
ThreadPoolExecutor
-
ScheduledThreadPoolExecutor
?
這些類都實(shí)現(xiàn)了線程池的抽象。下面的一小段代碼展示了 ExecutorService 的生命周期:
?
1 public List<Future<T>> executeTasks(Collection<Callable<T>> tasks) { 2 3 // create an ExecutorService 4 // 創(chuàng)建 ExecutorService 5 final ExecutorService executorService = Executors.newSingleThreadExecutor(); 6 7 // execute all tasks 8 // 執(zhí)行所有任務(wù) 9 final List<Future<T>> executedTasks = executorService.invokeAll(tasks); 10 11 // shutdown the ExecutorService after all tasks have completed 12 // 所有任務(wù)執(zhí)行完后關(guān)閉 ExecutorService 13 executorService.shutdown(); 14 15 return executedTasks; 16 17 }?
?
首先,創(chuàng)建一個(gè)最簡(jiǎn)單的 ExecutorService —— 一個(gè)單線程的執(zhí)行器(executor)。它用一個(gè)線程來處理所有的任務(wù)。當(dāng)然,你也可以通過各種方式自定義 ExecutorService,或者使用 Executors 類的工程方法來創(chuàng)建 ExecutorService:
?
newCachedThreadPool() :創(chuàng)建一個(gè) ExecutorService,該 ExecutorService 根據(jù)需要來創(chuàng)建線程,可以重復(fù)利用已存在的線程來執(zhí)行任務(wù)。
?
newFixedThreadPool(int numberOfThreads) :創(chuàng)建一個(gè)可重復(fù)使用的、固定線程數(shù)量的 ExecutorService。
?
newScheduledThreadPool(int corePoolSize):根據(jù)時(shí)間計(jì)劃,延遲給定時(shí)間后創(chuàng)建 ExecutorService(或者周期性地創(chuàng)建 ExecutorService)。
?
newSingleThreadExecutor():創(chuàng)建單個(gè)工作線程 ExecutorService。
?
newSingleThreadScheduledExecutor():根據(jù)時(shí)間計(jì)劃延遲創(chuàng)建單個(gè)工作線程 ExecutorService(或者周期性的創(chuàng)建)。
?
newWorkStealingPool():創(chuàng)建一個(gè)擁有多個(gè)任務(wù)隊(duì)列(以便減少連接數(shù))的 ExecutorService。
?
在上面這個(gè)例子里,所有的任務(wù)都只執(zhí)行一次,你也可以使用其他方法來執(zhí)行任務(wù):
?
-
void execute(Runnable)
-
Future submit(Callable)
-
Future submit(Runnable)
?
最后,關(guān)閉 executorService。Shutdown() 是一個(gè)非阻塞式方法。調(diào)用該方法后,ExecutorService 進(jìn)入“關(guān)閉模式(shutdown mode)”,在該模式下,之前提交的任務(wù)都會(huì)執(zhí)行完成,但是不會(huì)接收新的任務(wù)。如果想要等待任務(wù)執(zhí)行完成,需要調(diào)用 awaitTermination() 方法。
?
ExecutorService 是一個(gè)非常有用的工具,可以幫助我們很方便執(zhí)行所有的任務(wù)。它的好處在什么地方呢?我們不需要手動(dòng)創(chuàng)建工作線程。一個(gè)工作線程就是 ExecutorService 內(nèi)部使用的線程。值得注意的是,ExecutorService 管理線程的生命周期。它可以在負(fù)載增加的時(shí)候增加工作線程。另一方面,在一定周期內(nèi),它也可以減少空閑的線程。當(dāng)我們使用線程池的時(shí)候,我們就不再需要考慮線程本身。我們只需要考慮異步處理的任務(wù)。此外,當(dāng)出現(xiàn)不可預(yù)期的異常時(shí),我們不再需要重復(fù)創(chuàng)建線程,我們也不需要擔(dān)心當(dāng)一個(gè)線程執(zhí)行完任務(wù)后的重復(fù)使用問題。最后,一個(gè)任務(wù)提交以后,我們可以獲取一個(gè)未來結(jié)果的抽象——Future。當(dāng)然,在 Java 8中,我們可以使用更優(yōu)秀的 CompletableFuture,如何將一個(gè) Future 轉(zhuǎn)換為 CompletableFuture 已超出了本文所討論的范圍。但是請(qǐng)記住,只有提交的任務(wù)是一個(gè) Callable 時(shí),Future 才有意義,因?yàn)?Callable 有輸出結(jié)果,而 Runnable 沒有。
?
內(nèi)部組成
?
每個(gè)線程池由幾個(gè)模塊組成:
?
-
一個(gè)任務(wù)隊(duì)列,
-
一個(gè)工作線程的集合,
-
一個(gè)線程工廠,
-
管理線程狀態(tài)的元數(shù)據(jù)。
?
ExecutorService 接口有很多實(shí)現(xiàn),我們重點(diǎn)關(guān)注一下最常用的 ThreadPoolExecutor。實(shí)際上,newCachedThreadPool()、newFixedThreadPool() 和 newSingleThreadExecutor() 三個(gè)方法返回的都是 ThreadPoolExecutor 類的實(shí)例。如果要手動(dòng)創(chuàng)建一個(gè)ThreadPoolExecutor 類的實(shí)例,至少需要5個(gè)參數(shù):
?
-
int corePoolSize:線程池保存的線程數(shù)量。
-
int maximumPoolSize:線程的最大數(shù)量。
-
long keepAlive and TimeUnit unit:超出 corePoolSize 大小后,線程空閑的時(shí)間到達(dá)給定時(shí)間后將會(huì)關(guān)閉。
-
BlockingQueue workQueue:提交的任務(wù)將被放置在該隊(duì)列中等待執(zhí)行。
-
thread-pool
?
?
阻塞隊(duì)列
?
LinkedBlockingQueue 是調(diào)用 Executors 類中的方法生成 ThreadPoolExecutor 實(shí)例時(shí)使用的默認(rèn)隊(duì)列,PriorityBlockingQueue 實(shí)際上也是一個(gè)BlockingQueue,不過,根據(jù)設(shè)定的優(yōu)先級(jí)來處理任務(wù)也是一個(gè)棘手的問題。首先,提交一個(gè) Runnable 或 Callable 任務(wù),該任務(wù)被包裝成一個(gè) RunnableFuture,然后添加到隊(duì)列中,ProrityBlockingQueue 比較每個(gè)對(duì)象來決定執(zhí)行的優(yōu)先權(quán)(比較對(duì)象是包裝后的RunnableFuture而不是任務(wù)的內(nèi)容)。不僅如此,當(dāng) corePoolSize 大于1并且工作線程空閑時(shí),ThreadPoolExecutor 可能會(huì)根據(jù)插入順序來執(zhí)行,而不是 PriorityBlockingQueue 所期望的優(yōu)先級(jí)順序。
?
默認(rèn)情況下,ThreadPoolExecutor 的工作隊(duì)列(workQueue)是沒有邊界的。通常這是沒問題的,但是請(qǐng)記住,沒有邊界的工作隊(duì)列可能導(dǎo)致應(yīng)用出現(xiàn)內(nèi)存溢出(out of memory)錯(cuò)誤。如果要限制任務(wù)隊(duì)列的大小,可以設(shè)置 RejectionExecutionHandler。你可以自定義處理器或者從4個(gè)已有處理器(默認(rèn)AbortPolicy)中選擇一個(gè):
?
-
CallerRunsPolicy
-
AbortPolicy
-
DiscardPolicy
-
DiscardOldestPolicy
?
線程工廠
?
線程工廠通常用于創(chuàng)建自定義的線程。例如,你可以增加自定義的 Thread.UncaughtExceptionHandler 或者設(shè)置線程名稱。在下面的例子中,使用線程名稱和線程的序號(hào)來記錄未捕獲的異常。
?
1 public class LoggingThreadFactory implements ThreadFactory { 2 3 private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); 4 private static final String THREAD_NAME_PREFIX = "worker-thread-"; 5 private final AtomicInteger threadCreationCounter = new AtomicInteger(); 6 7 @Override 8 public Thread newThread(Runnable task) { 9 10 int threadNumber = threadCreationCounter.incrementAndGet(); 11 Thread workerThread = new Thread(task, THREAD_NAME_PREFIX + threadNumber); 12 13 workerThread.setUncaughtExceptionHandler(thread, throwable -> logger.error("Thread {} {}", thread.getName(), throwable)); 14 15 return workerThread; 16 17 } 18 }?
?
生產(chǎn)者消費(fèi)者實(shí)例
?
生產(chǎn)者消費(fèi)者是一種常見的同步多線程處理問題。在這個(gè)例子中,我們使用 ExecutorService 解決此問題。但是,這不是解決該問題的教科書例子。我們的目標(biāo)是演示線程池來處理所有的同步問題,從而程序員可以集中精力去實(shí)現(xiàn)業(yè)務(wù)邏輯。
?
Producer 定期的從數(shù)據(jù)庫(kù)獲取新的數(shù)據(jù)來創(chuàng)建任務(wù),并將任務(wù)提交給 ExecutorService。ExecutorService 管理的線程池中的一個(gè)工作線程代表一個(gè) Consumer,用于處理業(yè)務(wù)任務(wù)(如計(jì)算價(jià)格并返回給客戶)。
?
首先,我們使用 Spring 來配置:
?
1 @Configuration 2 public class ProducerConsumerConfiguration { 3 4 @Bean 5 public ExecutorService executorService() { 6 7 // single consumer 8 return Executors.newSingleThreadExecutor(); 9 } 10 11 // other beans such as a data source, a scheduler, etc. 12 13 }?
?
然后,建立一個(gè) Consumer 及一個(gè) ConsumerFactory。該工程方法通過生產(chǎn)者調(diào)用來創(chuàng)建一個(gè)任務(wù),在未來的某一個(gè)時(shí)間點(diǎn),會(huì)有一個(gè)工作線程執(zhí)行該任務(wù)。
?
1 public class Consumer implements Runnable { 2 3 private final BusinessTask businessTask; 4 private final BusinessLogic businessLogic; 5 6 public Consumer(BusinessTask businessTask, BusinessLogic businessLogic) { 7 8 this.businessTask = businessTask; 9 this.businessLogic = businessLogic; 10 11 } 12 13 @Override 14 public void run() { 15 16 businessLogic.processTask(businessTask); 17 } 18 19 } 20 21 @Component 22 public class ConsumerFactory { 23 24 private final BusinessLogic businessLogic; 25 26 public ConsumerFactory(BusinessLogic businessLogic) { 27 this.businessLogic = businessLogic; 28 } 29 30 public Consumer newConsumer(BusinessTask businessTask) { 31 return new Consumer(businessTask, businessLogic); 32 } 33 34 }?
?
最后,有一個(gè) Producer 類,用于從數(shù)據(jù)庫(kù)中獲取數(shù)據(jù)并創(chuàng)建業(yè)務(wù)任務(wù)。在這個(gè)例子中,我們假定 fetchData() 是通過 scheduler 周期性調(diào)用的。
?
1 @Component 2 public class Producer { 3 4 private final DataRepository dataRepository; 5 private final ExecutorService executorService; 6 private final ConsumerFactory consumerFactory; 7 8 @Autowired 9 public Producer(DataRepository dataRepository, ExecutorService executorService, 10 11 ConsumerFactory consumerFactory) { 12 13 this.dataRepository = dataRepository; 14 this.executorService = executorService; 15 this.consumerFactory = consumerFactory; 16 17 } 18 19 public void fetchAndSubmitForProcessing() { 20 21 List<Data> data = dataRepository.fetchNew(); 22 23 data.stream() 24 // create a business task from data fetched from the database 25 .map(BusinessTask::fromData) 26 // create a consumer for each business task 27 .map(consumerFactory::newConsumer) 28 // submit the task for further processing in the future (submit is a non-blocking method) 29 .forEach(executorService::submit); 30 31 } 32 }?
?
非常感謝 ExecutorService,這樣我們就可以集中精力實(shí)現(xiàn)業(yè)務(wù)邏輯,我們不需要擔(dān)心同步問題。上面的演示代碼只用了一個(gè)生產(chǎn)者和一個(gè)消費(fèi)者。但是,很容易擴(kuò)展為多個(gè)生產(chǎn)者和多個(gè)消費(fèi)者的情況。
?
總結(jié)
?
JDK 5 誕生于2004年,提供很多有用的并發(fā)工具,ExecutorService 類就是其中的一個(gè)。線程池通常應(yīng)用于服務(wù)器的底層(如 Tomcat 和 Undertow)。當(dāng)然,線程池也不僅僅局限于服務(wù)器環(huán)境。在任何密集并行(embarrassingly parallel)難題中它們都非常有用。由于現(xiàn)在越來越多的軟件運(yùn)行于多核系統(tǒng)上,線程池就更值得關(guān)注了。
轉(zhuǎn)載于:https://www.cnblogs.com/kangye1014/p/4993961.html
總結(jié)
- 上一篇: hdu 1233 最小生成树
- 下一篇: 漫谈Java IO之 Netty与NIO