JDK并发包
JDK5之后引進(jìn)了并發(fā)包java.util.concurrent,讓并發(fā)的開發(fā)更加可控,更加簡單。所以有必要好好學(xué)習(xí)下,下面從同步控制、并發(fā)容器、線程池三部分來詳細(xì)了解它。
1. 各種同步控制工具的使用
1.1 ReentrantLock(重用鎖)
1)與synchronized的區(qū)別是,它需要手動申請鎖與解鎖,在 finally 塊中釋放鎖,而synchronized是JVM自動處理的。可控性上ReentrantLock更強(qiáng)。
由于ReentrantLock是重入鎖,所以可以反復(fù)得到相同的一把鎖,它有一個與鎖相關(guān)的獲取計(jì)數(shù)器,如果擁有鎖的某個線程再次得到鎖,那么獲取計(jì)數(shù)器就加1,然后鎖需要被釋放兩次才能獲得真正釋放(重入鎖)。
注:synchronized 也是可重入的,當(dāng)線程進(jìn)入由線程已經(jīng)擁有的監(jiān)控器保護(hù)的 synchronized 塊,就允許線程繼續(xù)進(jìn)行,當(dāng)線程退出第二個(或者后續(xù)) synchronized 塊的時候,不釋放鎖,只有線程退出它進(jìn)入的監(jiān)控器保護(hù)的第一個synchronized 塊時,才釋放鎖,例如:
在帶有鎖的方法1里面調(diào)用了帶有鎖的方法2,這時在方法1獲得的鎖還是沒有釋放的,其它線程還是訪問不了方法1,即使方法2結(jié)束了,直至方法1的鎖釋放,鎖才會真正釋放。假如有多個線程同時來調(diào)用方法1,那輸出結(jié)果還是按順序輸出:1,2,1,2...
2)還有與synchronized不同的是,ReentrantLock對中斷是有響應(yīng)的。普通的lock.lock()是不能響應(yīng)中斷的,lock.lockInterruptibly()能夠響應(yīng)中斷。
3)可限時,超時不能獲得鎖,就返回false,不會永久等待構(gòu)成死鎖,使用lock.tryLock(long timeout, TimeUnit unit)來實(shí)現(xiàn)可限時鎖,參數(shù)為時間和單位。無法獲得后就直接退出了。
1.2 Condition
Condition與ReentrantLock的關(guān)系就類似于synchronized與Object.wait()/signal()
await()方法會使當(dāng)前線程等待,同時釋放當(dāng)前鎖,當(dāng)其他線程中使用signal()時或者signalAll()方法時,線 程會重新獲得鎖并繼續(xù)執(zhí)行。或者當(dāng)線程被中斷時,也能跳出等待。這和Object.wait()方法很相似。
awaitUninterruptibly()方法與await()方法基本相同,但是它并不會再等待過程中響應(yīng)中斷。 singal()方法用于喚醒一個在等待中的線程。相對的singalAll()方法會喚醒所有在等待中的線程。這和Obejct.notify()方法很類似。
1.3.Semaphore
對于鎖來說,它是互斥的排他的。意思就是,只要我獲得了鎖,沒人能再獲得了。
而對于Semaphore來說,它允許多個線程同時進(jìn)入臨界區(qū)。可以認(rèn)為它是一個共享鎖,但是共享的額度是有限制的,額度用完了,其他沒有拿到額度的線程還是要阻塞在臨界區(qū)外。當(dāng)額度為1時,就相等于lock。??
常用方法有:
semaphore.acquire();//申請?jiān)S可,當(dāng)然一個線程也可以一次申請多個許可acquire(int permits),誰拿到令牌(acquire)就可以去執(zhí)行了,如果沒有令牌則需要等待。
semaphore.release();//執(zhí)行完畢,一定要?dú)w還(release)令牌,否則令牌會被很快用光,別的線程就無法獲得令牌而執(zhí)行下去了。
1.4 ReadWriteLock
ReadWriteLock是區(qū)分功能的鎖。讀和寫是兩種不同的功能,讀-讀不互斥,讀-寫互斥,寫-寫互斥。
這樣的設(shè)計(jì)是并發(fā)量提高了,又保證了數(shù)據(jù)安全。
使用方式是:
private static ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock(); private static Lock readLock = readWriteLock.readLock(); private static Lock writeLock = readWriteLock.writeLock();1.5 CountDownLatch
倒數(shù)計(jì)時器,等待所有檢查線程全部完工后,再執(zhí)行。每個任務(wù)完成后調(diào)用countDown(),計(jì)數(shù)器就會減1,當(dāng)計(jì)數(shù)為0的時候,await()阻塞后面的方法會繼續(xù)執(zhí)行。
1.6 CyclicBarrier
和CountDownLatch相似,也是等待某些線程都做完以后再執(zhí)行。與CountDownLatch區(qū)別在于這個計(jì)數(shù)器可以反復(fù)使用。比如,假設(shè)我們將計(jì)數(shù)器設(shè)置為10。那么湊齊第一批1 0個線程后,計(jì)數(shù)器就會歸零,然后接著湊齊下一批10個線程。并且每次完成一批線程后會觸發(fā)一個動作
?CyclicBarrier(int parties, Runnable barrierAction)//barrierAction就是當(dāng)計(jì)數(shù)器一次計(jì)數(shù)完成后,系統(tǒng)會執(zhí)行的動作
也是通過await()來阻塞主線程等待任務(wù)全部完成
2. 并發(fā)容器
2.1?ConcurrentHashMap
我們知道HashMap不是一個線程安全的容器,最簡單的方式使HashMap變成線程安全就是使用Collections.synchronizedMap,它是對HashMap的一個包裝,如:
Collections.synchronizedMap(new HashMap())
同理對于List,Set也提供了相似方法。
但是這種方式只適合于并發(fā)量比較小的情況,它會將HashMap包裝在里面,然后將HashMap的每個操作都加上synchronized。
下面來看下ConcurrentHashMap是如何實(shí)現(xiàn)的:
在?ConcurrentHashMap內(nèi)部有一個Segment段,它將大的HashMap切分成若干個段(小的HashMap),然后讓數(shù)據(jù)在每一段上Hash,這樣多個線程在不同段上的Hash操作一定是線程安全的,所以只需要同步同一個段上的線程就可以了,這樣實(shí)現(xiàn)了鎖的分離,大大增加了并發(fā)量。
在使用ConcurrentHashMap.size時會比較麻煩,因?yàn)樗y(tǒng)計(jì)每個段的數(shù)據(jù)和,在這個時候,要把每一個段都加上鎖,然后再做數(shù)據(jù)統(tǒng)計(jì)。這個就是把鎖分離后的小小弊端,但是size方法應(yīng)該是不會被高頻率調(diào)用的方法。
2.2?BlockingQueue
BlockingQueue不是一個高性能的容器。但是它是一個非常好的共享數(shù)據(jù)的容器。是典型的生產(chǎn)者和消費(fèi)者的實(shí)現(xiàn)。
它在內(nèi)部實(shí)現(xiàn)了同步的隊(duì)列,實(shí)現(xiàn)方式采用的是我們第2種await() / signal()方法。它可以在生成對象時指定容量大小。它用于阻塞操作的是put()和take()方法。
put()方法:類似于我們上面的生產(chǎn)者線程,容量達(dá)到最大時,自動阻塞。
take()方法:類似于我們上面的消費(fèi)者線程,容量為0時,自動阻塞。
3.線程池
線程池的作用就是將線程進(jìn)行復(fù)用,減少創(chuàng)建與銷毀線程的開銷。
ThreadPoolExecutor是線程池的一個重要實(shí)現(xiàn)。
而Executors是一個工廠類。
新提交一個任務(wù)時的處理流程很明顯:
1、如果線程池的當(dāng)前大小還沒有達(dá)到基本大小(poolSize < corePoolSize),那么就新增加一個線程處理新提交的任務(wù);
2、如果當(dāng)前大小已經(jīng)達(dá)到了基本大小,就將新提交的任務(wù)提交到阻塞隊(duì)列排隊(duì),等候處理workQueue.offer(command);
3、如果隊(duì)列容量已達(dá)上限,并且當(dāng)前大小poolSize沒有達(dá)到maximumPoolSize,那么就新增線程來處理任務(wù);
4、如果隊(duì)列已滿,并且當(dāng)前線程數(shù)目也已經(jīng)達(dá)到上限,那么意味著線程池的處理能力已經(jīng)達(dá)到了極限,此時需要拒絕新增加的任務(wù)。至于如何拒絕處理新增
? ? 的任務(wù),取決于線程池的飽和策略RejectedExecutionHandler。
3.1.線程池的種類
- new FixedThreadPool 固定數(shù)量的線程池,線程池中的線程數(shù)量是固定的,不會改變。
- new SingleThreadExecutor 單一線程池,線程池中只有一個線程。
- new CachedThreadPool 緩存線程池,線程池中的線程數(shù)量不固定,會根據(jù)需求的大小進(jìn)行改變。
- new ScheduledThreadPool?計(jì)劃任務(wù)調(diào)度的線程池,用于執(zhí)行計(jì)劃任務(wù),比如每隔5分鐘怎么樣
3.2.拒絕策略
有時候,任務(wù)非常繁重,導(dǎo)致系統(tǒng)負(fù)載太大。在上面說過,當(dāng)任務(wù)量越來越大時,任務(wù)都將放到FixedThreadPool的阻塞隊(duì)列中,導(dǎo)致內(nèi)存消耗太大,最終導(dǎo)致內(nèi)存溢出。這樣的情況是應(yīng)該要避免的。因此當(dāng)我們發(fā)現(xiàn)線程數(shù)量要超過最大線程數(shù)量時,我們應(yīng)該放棄一些任務(wù)。丟棄時,我們應(yīng)該把任務(wù)記下來,而不是直接丟掉。
共有以上4種策略。
AbortPolicy:如果不能接受任務(wù)了,則拋出異常。
CallerRunsPolicy:如果不能接受任務(wù)了,則讓調(diào)用的線程去完成。
DiscardOldestPolicy:如果不能接受任務(wù)了,則丟棄最老的一個任務(wù),由一個隊(duì)列來維護(hù)。
DiscardPolicy:如果不能接受任務(wù)了,則丟棄任務(wù)。
在ThreadPoolExecutor的另一個構(gòu)造函數(shù)中,可傳入一個handler,而handler就是拒絕策略的實(shí)現(xiàn),它會告訴我們,如果任務(wù)不能執(zhí)行了,該怎么做.
3.3 ForkJoin
Fork/Join框架是Java7提供了的一個用于并行執(zhí)行任務(wù)的框架, 是一個把大任務(wù)分割成若干個小任務(wù),最終匯總每個小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的框架。有點(diǎn)類似工作流里面的分支合并概念
參考文章:
https://my.oschina.net/hosee/blog/485121
http://www.importnew.com/21288.html
https://my.oschina.net/hosee/blog/614319
3.4 Future
Future<V>代表一個異步執(zhí)行的操作,通過get()方法可以獲得操作的結(jié)果,如果異步操作還沒有完成,則get()會使當(dāng)前線程阻塞。FutureTask<V>實(shí)現(xiàn)了Future<V>和Runable<V>
3.5 Callable
一個有返回值的操作
3.6?CompletionService
CompletionService對ExecutorService進(jìn)行了包裝,內(nèi)部維護(hù)一個保存Future對象的BlockingQueue。只有當(dāng)這個Future對象狀態(tài)是結(jié)束的時候,才會加入到這個Queue中,take()方法其實(shí)就是Producer-Consumer中的Consumer。它會從Queue中取出Future對象,如果Queue是空的,就會阻塞在那里,直到有完成的Future對象加入到Queue中。所以,先完成的必定先被取出。這樣就減少了不必要的等待時間。
與迭代了FutureTask的數(shù)組的區(qū)別是,CompletionService 任務(wù)完成后就把其結(jié)果加到result中,而不用依次等待每個任務(wù)完成。
轉(zhuǎn)載于:https://www.cnblogs.com/ptw-share/p/6681353.html
總結(jié)
- 上一篇: 从1~N中任选出三个数,最小公倍数最大
- 下一篇: html5的消息通知