大数据处理系列之(一)Java线程池使用
前言:最近在做分布式海量數(shù)據(jù)處理項目,使用到了java的線程池,所以搜集了一些資料對它的使用做了一下總結(jié)和探究,
前面介紹的東西大多都是從網(wǎng)上搜集整理而來。文中最核心的東西在于后面兩節(jié)無界隊列線程池和有界隊列線程池的實例
使用以及線上問題處理方案。????????????????????????????????
?
1.? 為什么要用線程池?
??????在Java中,如果每當(dāng)一個請求到達就創(chuàng)建一個新線程,開銷是相當(dāng)大的。在實際使用中,每個請求創(chuàng)建新線程的服務(wù)器
在創(chuàng)建和銷毀線程上花費的時間和消耗的系統(tǒng)資源,甚至可能要比花在實際處理實際的用戶請求的時間和資源要多的多。除
了創(chuàng)建和銷毀線程的開銷之外,活動的線程也需要消耗系統(tǒng)資源。如果在一個JVM中創(chuàng)建太多的線程,可能會導(dǎo)致系統(tǒng)由于
過度消耗內(nèi)存或者“切換過度”而導(dǎo)致系統(tǒng)資源不足。為了防止資源不足,服務(wù)器應(yīng)用程序需要一些辦法來限制任何給定時刻
處理的請求數(shù)目,盡可能減少創(chuàng)建和銷毀線程的次數(shù),特別是一些資源耗費比較大的線程的創(chuàng)建和銷毀,盡量利用已有對象
來進行服務(wù),這就是“池化資源”技術(shù)產(chǎn)生的原因。
??? ?線程池主要用來解決線程生命周期開銷問題和資源不足問題,通過對多個任務(wù)重用線程,線程創(chuàng)建的開銷被分攤到多個任
務(wù)上了,而且由于在請求到達時線程已經(jīng)存在,所以消除了創(chuàng)建所帶來的延遲。這樣,就可以立即請求服務(wù),使應(yīng)用程序響
應(yīng)更快。另外,通過適當(dāng)?shù)恼{(diào)整線程池中的線程數(shù)據(jù)可以防止出現(xiàn)資源不足的情況。
????? 網(wǎng)上找來的這段話,清晰的描述了為什么要使用線程池,使用線程池有哪些好處。工程項目中使用線程池的場景比比皆是。
本文關(guān)注的重點是如何在實戰(zhàn)中來使用好線程池這一技術(shù),來滿足海量數(shù)據(jù)大并發(fā)用戶請求的場景。
?
2. ThreadPoolExecutor類
?????? Java中的線程池技術(shù)主要用的是ThreadPoolExecutor 這個類。先來看這個類的構(gòu)造函數(shù),
ThreadPoolExecutor(int?corePoolSize, int?maximumPoolSize, long?keepAliveTime, TimeUnit?unit,
BlockingQueue<Runnable>?workQueue, ThreadFactory?threadFactory, RejectedExecutionHandler?handler)?
??? corePoolSize? ?????線程池維護線程的最少數(shù)量
??? maximumPoolSize ???線程池維護線程的最大數(shù)量?
??? keepAliveTime????? 線程池維護線程所允許的空閑時間??
??? workQueue? ????????任務(wù)隊列,用來存放我們所定義的任務(wù)處理線程
??? threadFactory ?????線程創(chuàng)建工廠
??? handler??????????? 線程池對拒絕任務(wù)的處理策略
??? ?ThreadPoolExecutor 將根據(jù) corePoolSize和 maximumPoolSize?設(shè)置的邊界自動調(diào)整池大小。當(dāng)新任務(wù)在方法
execute(Runnable) 中提交時, 如果運行的線程少于 corePoolSize,則創(chuàng)建新線程來處理請求,即使其他輔助線程是
空閑的。如果運行的線程多于 corePoolSize 而少于 maximumPoolSize,則僅當(dāng)隊列滿時才創(chuàng)建新線程。 如果設(shè)置的
corePoolSize 和 maximumPoolSize 相同,則創(chuàng)建了固定大小的線程池。
???? ThreadPoolExecutor是Executors類的實現(xiàn),Executors類里面提供了一些靜態(tài)工廠,生成一些常用的線程池,主
要有以下幾個:
???? newSingleThreadExecutor:創(chuàng)建一個單線程的線程池。這個線程池只有一個線程在工作,也就是相當(dāng)于單線程串行執(zhí)行
所有任務(wù)。如果這個唯一的線程因為異常結(jié)束,那么會有一個新的線程來替代它。此線程池保證所有任務(wù)的執(zhí)行順序按照任
務(wù)的提交順序執(zhí)行。 ?
???? newFixedThreadPool:創(chuàng)建固定大小的線程池。每次提交一個任務(wù)就創(chuàng)建一個線程,直到線程達到線程池的最大大小。線
程池的大小一旦達到最大值就會保持不變,如果某個線程因為執(zhí)行異常而結(jié)束,那么線程池會補充一個新線程。
???? newCachedThreadPool:創(chuàng)建一個可緩存的線程池。如果線程池的大小超過了處理任務(wù)所需要的線程,那么就會回收部分
空閑(60秒不執(zhí)行任務(wù))的線程,當(dāng)任務(wù)數(shù)增加時,此線程池又可以智能的添加新線程來處理任務(wù)。此線程池不會對線程池
大小做限制,線程池大小完全依賴于操作系統(tǒng)(或者說JVM)能夠創(chuàng)建的最大線程大小。
????? 在實際的項目中,我們會使用得到比較多的是newFixedThreadPool,創(chuàng)建固定大小的線程池,但是這個方法在真實的線上
環(huán)境中還是會有很多問題,這個將會在下面一節(jié)中詳細講到。
????? 當(dāng)任務(wù)源源不斷的過來,而我們的系統(tǒng)又處理不過來的時候,我們要采取的策略是拒絕服務(wù)。RejectedExecutionHandler接
口提供了拒絕任務(wù)處理的自定義方法的機會。在ThreadPoolExecutor中已經(jīng)包含四種處理策略。
????? 1)CallerRunsPolicy:線程調(diào)用運行該任務(wù)的 execute 本身。此策略提供簡單的反饋控制機制,能夠減緩新任務(wù)的提交速度。
???????? ?public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
???????????? if (!e.isShutdown()) {
??????????????? ?r.run();
??????????? }
??????? }
這個策略顯然不想放棄執(zhí)行任務(wù)。但是由于池中已經(jīng)沒有任何資源了,那么就直接使用調(diào)用該execute的線程本身來執(zhí)行。
???? 2)AbortPolicy:處理程序遭到拒絕將拋出運行時 RejectedExecutionException
??????? ?public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
??????????? ? throw new RejectedExecutionException();
??????? }
?這種策略直接拋出異常,丟棄任務(wù)。
????? 3)DiscardPolicy:不能執(zhí)行的任務(wù)將被刪除
??????????public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
?? 這種策略和AbortPolicy幾乎一樣,也是丟棄任務(wù),只不過他不拋出異常。
???? 4)DiscardOldestPolicy:如果執(zhí)行程序尚未關(guān)閉,則位于工作隊列頭部的任務(wù)將被刪除,然后重試執(zhí)行程序(如果再次失敗,
則重復(fù)此過程)
??????? public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
??????????? if (!e.isShutdown()) {
??????????????? e.getQueue().poll();
??????????????? e.execute(r);
??????????? }
??????? }
????? 該策略就稍微復(fù)雜一些,在pool沒有關(guān)閉的前提下首先丟掉緩存在隊列中的最早的任務(wù),然后重新嘗試運行該任務(wù)。這個策略
需要適當(dāng)小心。
?
3.? ThreadPoolExecutor無界隊列使用
?? public class ThreadPool {
??????? private final static String poolName = "mypool";
??????? static private ThreadPool threadFixedPool = new ThreadPool(2);
?????? private ExecutorService executor;
????? static public ThreadPool getFixedInstance() {
?????????? return threadFixedPool;
?????? }
??? private ThreadPool(int num) {
?????????? executor = Executors.newFixedThreadPool(num, new DaemonThreadFactory(poolName));
}
public void execute(Runnable r) {
?????????? executor.execute(r);
}
public static void main(String[] params) {
?????????? class MyRunnable implements Runnable {
??????????????????? public void run() {
???????????????????????????? System.out.println("OK!");
???????????????????????????? try {
?????????????????????????????????????? Thread.sleep(10);
???????????????????????????? } catch (InterruptedException e) {
?????????????????????????????????????? e.printStackTrace();
???????????????????????????? }
??????????????????? }
?????????? }
?????????? for (int i = 0; i < 10; i++) {
???????????? ThreadPool.getFixedInstance().execute(new MyRunnable());
?????????? }
?????????? try {
??????????????????? Thread.sleep(2000);
??????????????????? System.out.println("Process end.");
?????????? } catch (InterruptedException e) {
??????????????????? e.printStackTrace();
?????????? }
}
}
?????? 在這段代碼中,我們發(fā)現(xiàn)我們用到了Executors.newFixedThreadPool()函數(shù),這個函數(shù)的實現(xiàn)是這樣子的:
return?new?ThreadPoolExecutor(nThreads,?nThreads,?0L,?TimeUnit.MILLISECONDS,new?LinkedBlockingQueue<Runnable>());?
?????? 它實際上是創(chuàng)建了一個無界隊列的固定大小的線程池。執(zhí)行這段代碼,我們發(fā)現(xiàn)所有的任務(wù)都正常處理了。但是在真實的線上環(huán)
境中會存在這樣的一個問題,前端的用戶請求源源不斷的過來,后端的處理線程如果處理時間變長,無法快速的將用戶請求處理
完返回結(jié)果給前端,那么任務(wù)隊列中將堵塞大量的請求。這些請求在前端都是有超時時間設(shè)置的,假設(shè)請求是通過套接字過來,
當(dāng)我們的后端處理進程處理完一個請求后,從隊列中拿下一個任務(wù),發(fā)現(xiàn)這個任務(wù)的套接字已經(jīng)無效了,這是因為在用戶端已經(jīng)
超時,將套接字建立的連接關(guān)閉了。這樣一來我們這邊的處理程序再去讀取套接字時,就會發(fā)生I/0 Exception. 惡性循環(huán),導(dǎo)致我
們所有的處理服務(wù)線程讀的都是超時的套接字,所有的請求過來都拋I/O異常,這樣等于我們整個系統(tǒng)都掛掉了,已經(jīng)無法對外提供
正常的服務(wù)了。
???? 對于海量數(shù)據(jù)的處理,現(xiàn)在業(yè)界都是采用集群系統(tǒng)來進行處理,當(dāng)請求的數(shù)量不斷加大的時候,我們可以通過增加處理節(jié)點,反正現(xiàn)
在硬件設(shè)備相對便宜。但是要保證系統(tǒng)的可靠性和穩(wěn)定性,在程序方面我們還是可以進一步的優(yōu)化的,我們下一節(jié)要講述的就是針對
線上出現(xiàn)的這類問題的一種處理策略。
?
4.???ThreadPoolExecutor有界隊列使用
public class ThreadPool {
???????? private final static String poolName = "mypool";
???????? static private ThreadPool threadFixedPool = null;
???????? public ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2);
???????? private ExecutorService executor;
?
???????? static public ThreadPool getFixedInstance() {
?????????????????? return threadFixedPool;
???????? }
???????? private ThreadPool(int num) {
?????????????????? executor = new ThreadPoolExecutor(2, 4,60,TimeUnit.SECONDS, queue,new DaemonThreadFactory
(poolName), new ThreadPoolExecutor.AbortPolicy());
???????? }
???????? public void execute(Runnable r) {
?????????????????? executor.execute(r);
???????? }
????????
???????? public static void main(String[] params) {
?????????????????? class MyRunnable implements Runnable {
??????????????????????????? public void run() {
???????????????????????????????????? System.out.println("OK!");
???????????????????????????????????? try {
?????????????????????????????????????????????? Thread.sleep(10);
???????????????????????????????????? } catch (InterruptedException e) {
?????????????????????????????????????????????? e.printStackTrace();
???????????????????????????????????? }
??????????????????????????? }
?????????????????? }
?????????????????? int count = 0;
?????????????????? for (int i = 0; i < 10; i++) {
??????????????????????????? try {
???????????????????????????????????? ThreadPool.getFixedInstance().execute(new MyRunnable());
??????????????????????????? } catch (RejectedExecutionException e) {
???????????????????????????????????? e.printStackTrace();
???????????????????????????????????? count++;
??????????????????????????? }
?????????????????? }
?????????????????? try {
??????????????????????????? log.info("queue size:" + ThreadPool.getFixedInstance().queue.size());
??????????????????????????? Thread.sleep(2000);
?????????????????? } catch (InterruptedException e) {
??????????????????????????? e.printStackTrace();
?????????????????? }
?????????????????? System.out.println("Reject task: " + count);
???????? }
}
?????? 首先我們來看下這段代碼幾個重要的參數(shù),corePoolSize 為2,maximumPoolSize為4,任務(wù)隊列大小為2,每個任務(wù)平
均處理時間為10ms,一共有10個并發(fā)任務(wù)。
????? 執(zhí)行這段代碼,我們會發(fā)現(xiàn),有4個任務(wù)失敗了。這里就驗證了我們在上面提到有界隊列時候線程池的執(zhí)行順序。當(dāng)新任務(wù)在
方法 execute(Runnable) 中提交時, 如果運行的線程少于 corePoolSize,則創(chuàng)建新線程來處理請求。 如果運行的線程多于
corePoolSize 而少于 maximumPoolSize,則僅當(dāng)隊列滿時才創(chuàng)建新線程,如果此時線程數(shù)量達到maximumPoolSize,并且隊
列已經(jīng)滿,就會拒絕繼續(xù)進來的請求。
??? 現(xiàn)在我們調(diào)整一下代碼中的幾個參數(shù),將并發(fā)任務(wù)數(shù)改為200,執(zhí)行結(jié)果Reject task: 182,說明有18個任務(wù)成功了,線程處理
完一個請求后會接著去處理下一個過來的請求。在真實的線上環(huán)境中,會源源不斷的有新的請求過來,當(dāng)前的被拒絕了,但只要線
程池線程把當(dāng)下的任務(wù)處理完之后還是可以處理下一個發(fā)送過來的請求。
???? 通過有界隊列可以實現(xiàn)系統(tǒng)的過載保護,在高壓的情況下,我們的系統(tǒng)處理能力不會變?yōu)?,還能正常對外進行服務(wù),雖然有些服
務(wù)可能會被拒絕,至于如何減少被拒絕的數(shù)量以及對拒絕的請求采取何種處理策略我將會在下一篇文章《系統(tǒng)的過載保護》中繼續(xù)
闡述。
?
參考文獻:
轉(zhuǎn)載于:https://www.cnblogs.com/cstar/archive/2012/06/14/2549494.html
總結(jié)
以上是生活随笔為你收集整理的大数据处理系列之(一)Java线程池使用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: DataGridView数据验证Cell
- 下一篇: 关于Javascript闭包的理解