Java - concurrent包详解
我們都知道,在JDK1.5之前,Java中要進(jìn)行業(yè)務(wù)并發(fā)時(shí),通常需要有程序員獨(dú)立完成代碼實(shí)現(xiàn),當(dāng)然也有一些開(kāi)源的框架提供了這些功能,但是這些依然沒(méi)有JDK自帶的功能使用起來(lái)方便。而當(dāng)針對(duì)高質(zhì)量Java多線程并發(fā)程序設(shè)計(jì)時(shí),為防止死蹦等現(xiàn)象的出現(xiàn),比如使用java之前的wait()、notify()和synchronized等,每每需要考慮性能、死鎖、公平性、資源管理以及如何避免線程安全性方面帶來(lái)的危害等諸多因素,往往會(huì)采用一些較為復(fù)雜的安全策略,加重了程序員的開(kāi)發(fā)負(fù)擔(dān).萬(wàn)幸的是,在JDK1.5出現(xiàn)之后,Sun大神(Doug Lea)終于為我們這些可憐的小程序員推出了java.util.concurrent工具包以簡(jiǎn)化并發(fā)完成。開(kāi)發(fā)者們借助于此,將有效的減少競(jìng)爭(zhēng)條件(race conditions)和死鎖線程。concurrent包很好的解決了這些問(wèn)題,為我們提供了更實(shí)用的并發(fā)程序模型。
Executor :具體Runnable任務(wù)的執(zhí)行者。
ExecutorService :一個(gè)線程池管理者,其實(shí)現(xiàn)類有多種,我會(huì)介紹一部分。我們能把Runnable,Callable提交到池中讓其調(diào)度。
Semaphore :一個(gè)計(jì)數(shù)信號(hào)量
ReentrantLock :一個(gè)可重入的互斥鎖定 Lock,功能類似synchronized,但要強(qiáng)大的多。
Future :是與Runnable,Callable進(jìn)行交互的接口,比如一個(gè)線程執(zhí)行結(jié)束后取返回的結(jié)果等等,還提供了cancel終止線程。
BlockingQueue :阻塞隊(duì)列。
CompletionService : ExecutorService的擴(kuò)展,可以獲得線程執(zhí)行結(jié)果的
CountDownLatch :一個(gè)同步輔助類,在完成一組正在其他線程中執(zhí)行的操作之前,它允許一個(gè)或多個(gè)線程一直等待。
CyclicBarrier :一個(gè)同步輔助類,它允許一組線程互相等待,直到到達(dá)某個(gè)公共屏障點(diǎn)
Future :Future 表示異步計(jì)算的結(jié)果。
ScheduledExecutorService :一個(gè) ExecutorService,可安排在給定的延遲后運(yùn)行或定期執(zhí)行的命令。
接下來(lái)逐一介紹
Executors主要方法說(shuō)明
newFixedThreadPool(固定大小線程池)
創(chuàng)建一個(gè)可重用固定線程集合的線程池,以共享的無(wú)界隊(duì)列方式來(lái)運(yùn)行這些線程(只有要請(qǐng)求的過(guò)來(lái),就會(huì)在一個(gè)隊(duì)列里等待執(zhí)行)。如果在關(guān)閉前的執(zhí)行期間由于失敗而導(dǎo)致任何線程終止,那么一個(gè)新線程將代替它執(zhí)行后續(xù)的任務(wù)(如果需要)。
newCachedThreadPool(無(wú)界線程池,可以進(jìn)行自動(dòng)線程回收)
創(chuàng)建一個(gè)可根據(jù)需要?jiǎng)?chuàng)建新線程的線程池,但是在以前構(gòu)造的線程可用時(shí)將重用它們。對(duì)于執(zhí)行很多短期異步任務(wù)的程序而言,這些線程池通常可提高程序性能。調(diào)用 execute 將重用以前構(gòu)造的線程(如果線程可用)。如果現(xiàn)有線程沒(méi)有可用的,則創(chuàng)建一個(gè)新線程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。因此,長(zhǎng)時(shí)間保持空閑的線程池不會(huì)使用任何資源。注意,可以使用 ThreadPoolExecutor 構(gòu)造方法創(chuàng)建具有類似屬性但細(xì)節(jié)不同(例如超時(shí)參數(shù))的線程池。
newSingleThreadExecutor(單個(gè)后臺(tái)線程)
創(chuàng)建一個(gè)使用單個(gè) worker 線程的 Executor,以無(wú)界隊(duì)列方式來(lái)運(yùn)行該線程。(注意,如果因?yàn)樵陉P(guān)閉前的執(zhí)行期間出現(xiàn)失敗而終止了此單個(gè)線程,那么如果需要,一個(gè)新線程將代替它執(zhí)行后續(xù)的任務(wù))。可保證順序地執(zhí)行各個(gè)任務(wù),并且在任意給定的時(shí)間不會(huì)有多個(gè)線程是活動(dòng)的。與其他等效的 newFixedThreadPool(1) 不同,可保證無(wú)需重新配置此方法所返回的執(zhí)行程序即可使用其他的線程。
這些方法返回的都是ExecutorService對(duì)象,這個(gè)對(duì)象可以理解為就是一個(gè)線程池。
這個(gè)線程池的功能還是比較完善的。可以提交任務(wù)submit()可以結(jié)束線程池shutdown()。
雖然打印了一些信息,但是看的不是非常清晰,這個(gè)線程池是如何工作的,我們來(lái)將休眠的時(shí)間調(diào)長(zhǎng)10倍。
Thread.sleep((int)(Math.random()*10000));
再來(lái)看,會(huì)清楚看到只能執(zhí)行4個(gè)線程。當(dāng)執(zhí)行完一個(gè)線程后,才會(huì)又執(zhí)行一個(gè)新的線程,也就是說(shuō),我們將所有的線程提交后,線程池會(huì)等待執(zhí)行完最后shutdown。我們也會(huì)發(fā)現(xiàn),提交的線程被放到一個(gè)“無(wú)界隊(duì)列里”。這是一個(gè)有序隊(duì)列(BlockingQueue,這個(gè)下面會(huì)說(shuō)到)。
另外它使用了Executors的靜態(tài)函數(shù)生成一個(gè)固定的線程池,顧名思義,線程池的線程是不會(huì)釋放的,即使它是Idle。
這就會(huì)產(chǎn)生性能問(wèn)題,比如如果線程池的大小為200,當(dāng)全部使用完畢后,所有的線程會(huì)繼續(xù)留在池中,相應(yīng)的內(nèi)存和線程切換(while(true)+sleep循環(huán))都會(huì)增加。
如果要避免這個(gè)問(wèn)題,就必須直接使用ThreadPoolExecutor()來(lái)構(gòu)造。可以像通用的線程池一樣設(shè)置“最大線程數(shù)”、“最小線程數(shù)”和“空閑線程keepAlive的時(shí)間”。
這個(gè)就是線程池基本用法。
Semaphore
一個(gè)計(jì)數(shù)信號(hào)量。從概念上講,信號(hào)量維護(hù)了一個(gè)許可集合。如有必要,在許可可用前會(huì)阻塞每一個(gè) acquire(),然后再獲取該許可。每個(gè) release() 添加一個(gè)許可,從而可能釋放一個(gè)正在阻塞的獲取者。但是,不使用實(shí)際的許可對(duì)象,Semaphore 只對(duì)可用許可的號(hào)碼進(jìn)行計(jì)數(shù),并采取相應(yīng)的行動(dòng)。
Semaphore 通常用于限制可以訪問(wèn)某些資源(物理或邏輯的)的線程數(shù)目。例如,下面的類使用信號(hào)量控制對(duì)內(nèi)容池的訪問(wèn):
這里是一個(gè)實(shí)際的情況,大家排隊(duì)上廁所,廁所只有兩個(gè)位置,來(lái)了10個(gè)人需要排隊(duì)。
ReentrantLock
一個(gè)可重入的互斥鎖定 Lock,它具有與使用 synchronized 方法和語(yǔ)句所訪問(wèn)的隱式監(jiān)視器鎖定相同的一些基本行為和語(yǔ)義,但功能更強(qiáng)大。
ReentrantLock 將由最近成功獲得鎖定,并且還沒(méi)有釋放該鎖定的線程所擁有。當(dāng)鎖定沒(méi)有被另一個(gè)線程所擁有時(shí),調(diào)用 lock 的線程將成功獲取該鎖定并返回。如果當(dāng)前線程已經(jīng)擁有該鎖定,此方法將立即返回。可以使用 isHeldByCurrentThread() 和 getHoldCount() 方法來(lái)檢查此情況是否發(fā)生。
此類的構(gòu)造方法接受一個(gè)可選的公平參數(shù)。
當(dāng)設(shè)置為 true時(shí),在多個(gè)線程的爭(zhēng)用下,這些鎖定傾向于將訪問(wèn)權(quán)授予等待時(shí)間最長(zhǎng)的線程。否則此鎖定將無(wú)法保證任何特定訪問(wèn)順序。
與采用默認(rèn)設(shè)置(使用不公平鎖定)相比,使用公平鎖定的程序在許多線程訪問(wèn)時(shí)表現(xiàn)為很低的總體吞吐量(即速度很慢,常常極其慢),但是在獲得鎖定和保證鎖定分配的均衡性時(shí)差異較小。不過(guò)要注意的是,公平鎖定不能保證線程調(diào)度的公平性。因此,使用公平鎖定的眾多線程中的一員可能獲得多倍的成功機(jī)會(huì),這種情況發(fā)生在其他活動(dòng)線程沒(méi)有被處理并且目前并未持有鎖定時(shí)。還要注意的是,未定時(shí)的 tryLock 方法并沒(méi)有使用公平設(shè)置。因?yàn)榧词蛊渌€程正在等待,只要該鎖定是可用的,此方法就可以獲得成功。
建議總是 立即實(shí)踐,使用 try 塊來(lái)調(diào)用 lock,在之前/之后的構(gòu)造中,最典型的代碼如下:
我的例子:
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.ReentrantLock; public class MyReentrantLock extends Thread{ TestReentrantLock lock; private int id; public MyReentrantLock(int i,TestReentrantLock test){this.id=i;this.lock=test; } public void run(){lock.print(id); } public static void main(String args[]){ExecutorService service=Executors.newCachedThreadPool();TestReentrantLock lock=new TestReentrantLock();for(int i=0;i<10;i++){service.submit(new MyReentrantLock(i,lock));}service.shutdown(); } } class TestReentrantLock{ private ReentrantLock lock=new ReentrantLock(); public void print(int str){try{lock.lock();System.out.println(str+"獲得");Thread.sleep((int)(Math.random()*1000));}catch(Exception e){e.printStackTrace();}finally{System.out.println(str+"釋放");lock.unlock();} } }BlockingQueue
支持兩個(gè)附加操作的 Queue,這兩個(gè)操作是:檢索元素時(shí)等待隊(duì)列變?yōu)榉强?#xff0c;以及存儲(chǔ)元素時(shí)等待空間變得可用。
BlockingQueue 不接受 null 元素。試圖 add、put 或 offer 一個(gè) null 元素時(shí),某些實(shí)現(xiàn)會(huì)拋出 NullPointerException。null 被用作指示 poll 操作失敗的警戒值。
BlockingQueue 可以是限定容量的。它在任意給定時(shí)間都可以有一個(gè) remainingCapacity,超出此容量,便無(wú)法無(wú)阻塞地 put 額外的元素。
沒(méi)有任何內(nèi)部容量約束的 BlockingQueue 總是報(bào)告 Integer.MAX_VALUE 的剩余容量。
BlockingQueue 實(shí)現(xiàn)主要用于生產(chǎn)者-使用者隊(duì)列,但它另外還支持 Collection 接口。因此,舉例來(lái)說(shuō),使用 remove(x) 從隊(duì)列中移除任意一個(gè)元素是有可能的。
然而,這種操作通常不 會(huì)有效執(zhí)行,只能有計(jì)劃地偶爾使用,比如在取消排隊(duì)信息時(shí)。
BlockingQueue 實(shí)現(xiàn)是線程安全的。所有排隊(duì)方法都可以使用內(nèi)部鎖定或其他形式的并發(fā)控制來(lái)自動(dòng)達(dá)到它們的目的。
然而,大量的 Collection 操作(addAll、containsAll、retainAll 和 removeAll)沒(méi)有 必要自動(dòng)執(zhí)行,除非在實(shí)現(xiàn)中特別說(shuō)明。
因此,舉例來(lái)說(shuō),在只添加了 c 中的一些元素后,addAll(c) 有可能失敗(拋出一個(gè)異常)。
BlockingQueue 實(shí)質(zhì)上不 支持使用任何一種“close”或“shutdown”操作來(lái)指示不再添加任何項(xiàng)。
這種功能的需求和使用有依賴于實(shí)現(xiàn)的傾向。例如,一種常用的策略是:對(duì)于生產(chǎn)者,插入特殊的 end-of-stream 或 poison 對(duì)象,并根據(jù)使用者獲取這些對(duì)象的時(shí)間來(lái)對(duì)它們進(jìn)行解釋。
下面的例子演示了這個(gè)阻塞隊(duì)列的基本功能。
———————執(zhí)行結(jié)果—————–
{0} in queue!
{1} in queue!
{2} in queue!
{3} in queue!
0 has take!
{4} in queue!
1 has take!
{6} in queue!
2 has take!
{7} in queue!
3 has take!
{8} in queue!
4 has take!
{5} in queue!
6 has take!
{9} in queue!
7 has take!
8 has take!
5 has take!
9 has take!
CompletionService
將生產(chǎn)新的異步任務(wù)與使用已完成任務(wù)的結(jié)果分離開(kāi)來(lái)的服務(wù)。生產(chǎn)者 submit 執(zhí)行的任務(wù)。使用者 take 已完成的任務(wù),
并按照完成這些任務(wù)的順序處理它們的結(jié)果。例如,CompletionService 可以用來(lái)管理異步 IO ,執(zhí)行讀操作的任務(wù)作為程序或系統(tǒng)的一部分提交,
然后,當(dāng)完成讀操作時(shí),會(huì)在程序的不同部分執(zhí)行其他操作,執(zhí)行操作的順序可能與所請(qǐng)求的順序不同。
通常,CompletionService 依賴于一個(gè)單獨(dú)的 Executor 來(lái)實(shí)際執(zhí)行任務(wù),在這種情況下,
CompletionService 只管理一個(gè)內(nèi)部完成隊(duì)列。ExecutorCompletionService 類提供了此方法的一個(gè)實(shí)現(xiàn)。
CountDownLatch
一個(gè)同步輔助類,在完成一組正在其他線程中執(zhí)行的操作之前,它允許一個(gè)或多個(gè)線程一直等待。
用給定的計(jì)數(shù) 初始化 CountDownLatch。由于調(diào)用了 countDown() 方法,所以在當(dāng)前計(jì)數(shù)到達(dá)零之前,await 方法會(huì)一直受阻塞。
之后,會(huì)釋放所有等待的線程,await 的所有后續(xù)調(diào)用都將立即返回。這種現(xiàn)象只出現(xiàn)一次——計(jì)數(shù)無(wú)法被重置。如果需要重置計(jì)數(shù),請(qǐng)考慮使用 CyclicBarrier。
CountDownLatch 是一個(gè)通用同步工具,它有很多用途。將計(jì)數(shù) 1 初始化的 CountDownLatch 用作一個(gè)簡(jiǎn)單的開(kāi)/關(guān)鎖存器,
或入口:在通過(guò)調(diào)用 countDown() 的線程打開(kāi)入口前,所有調(diào)用 await 的線程都一直在入口處等待。
用 N 初始化的 CountDownLatch 可以使一個(gè)線程在 N 個(gè)線程完成某項(xiàng)操作之前一直等待,或者使其在某項(xiàng)操作完成 N 次之前一直等待。
CountDownLatch 的一個(gè)有用特性是,它不要求調(diào)用 countDown 方法的線程等到計(jì)數(shù)到達(dá)零時(shí)才繼續(xù),
而在所有線程都能通過(guò)之前,它只是阻止任何線程繼續(xù)通過(guò)一個(gè) await。
一下的例子是別人寫(xiě)的,非常形象。
CountDownLatch最重要的方法是countDown()和await(),前者主要是倒數(shù)一次,后者是等待倒數(shù)到0,如果沒(méi)有到達(dá)0,就只有阻塞等待了。
CyclicBarrier
一個(gè)同步輔助類,它允許一組線程互相等待,直到到達(dá)某個(gè)公共屏障點(diǎn) (common barrier point)。
在涉及一組固定大小的線程的程序中,這些線程必須不時(shí)地互相等待,此時(shí) CyclicBarrier 很有用。因?yàn)樵?barrier 在釋放等待線程后可以重用,所以稱它為循環(huán) 的 barrier。
CyclicBarrier 支持一個(gè)可選的 Runnable 命令,在一組線程中的最后一個(gè)線程到達(dá)之后(但在釋放所有線程之前),
該命令只在每個(gè)屏障點(diǎn)運(yùn)行一次。若在繼續(xù)所有參與線程之前更新共享狀態(tài),此屏障操作 很有用。
示例用法:下面是一個(gè)在并行分解設(shè)計(jì)中使用 barrier 的例子,很經(jīng)典的旅行團(tuán)例子:
CyclicBarrier最重要的屬性就是參與者個(gè)數(shù),另外最要方法是await()。當(dāng)所有線程都調(diào)用了await()后,就表示這些線程都可以繼續(xù)執(zhí)行,否則就會(huì)等待。
Future
Future 表示異步計(jì)算的結(jié)果。它提供了檢查計(jì)算是否完成的方法,以等待計(jì)算的完成,并檢索計(jì)算的結(jié)果。
計(jì)算完成后只能使用 get 方法來(lái)檢索結(jié)果,如有必要,計(jì)算完成前可以阻塞此方法。取消則由 cancel 方法來(lái)執(zhí)行。
還提供了其他方法,以確定任務(wù)是正常完成還是被取消了。一旦計(jì)算完成,就不能再取消計(jì)算。
如果為了可取消性而使用 Future但又不提供可用的結(jié)果,則可以聲明 Future
這樣我們就把concurrent包下比較重要的功能都已經(jīng)總結(jié)完了,希望對(duì)我們理解能有幫助。
總結(jié)
以上是生活随笔為你收集整理的Java - concurrent包详解的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 科创板新股如何申购
- 下一篇: 银行卡bcss.38是什么意思