JDK并发包1-1
https://www.jianshu.com/p/ef342bc21f7e多線程的控制并不是一個非常簡單的事情,一般意義上呢,最基本的多線程控制,用synchronized關(guān)鍵字,和object.wait,和object.notify,這些操作,那這個在之前的課程當(dāng)中呢,也已經(jīng)又給大家介紹,我們在這里會介紹一些更加高級的工具,這些高級的工具呢,他首先在功能上要比synchronized要高級一些,其實在使用上面來說呢,它會封裝一些更加常用的一些場景,使大家使用更加快捷,同時寫JDK開發(fā)包的呢,都是一些比較厲害的人物,所以相對來講,他們的實現(xiàn)呢,會比大家自己去實現(xiàn)一個類似的功能呢,性能上和效果上要好的多,首先第一個我們想要給大家介紹的呢,是ReentrantLock,是重入鎖,它是synchronized的一個替代品,或者說它是一個增強版,synchronized關(guān)鍵字,特點是使用簡單,但是功能上是比較薄弱的,因為他只能做到說,多個線程進(jìn)行臨界區(qū)訪問的時候呢,不能進(jìn)入臨界區(qū)的線程進(jìn)行一個等待,這個等待是一個死等,只有前面的線程離開臨界區(qū)以后呢,才能進(jìn)去,但是ReentranLock給我們提供了更多的選擇性,我們在JDK1.5之前呢,JAVA虛擬機對synchronized的優(yōu)化并不會充分,RenntrantLock的性能要好于synchronized關(guān)鍵字,但是在做了充分優(yōu)化之后,現(xiàn)在的JDK的版本當(dāng)中,其實兩者性能是不相上下的,所以如果只是一個簡單功能的實現(xiàn)呢,沒有必要刻意去追求比較高級的功能,ReentrantLock我們主要看哪些方面,他除了實現(xiàn)普通的鎖的功能之外,他還實現(xiàn)了比如可中斷,可限時,公平鎖,可重入,這些特點,什么是可重入呢,ReentrantLock對于同一個線程來講,否則會出現(xiàn)一個線程把自己給卡死
package com.learn.thread;import java.util.concurrent.locks.ReentrantLock;public class ReentrantLockDemo implements Runnable{public static ReentrantLock lock=new ReentrantLock();public static int i=0;@Overridepublic void run() {for(int j=0;j<100000;j++) {lock.lock();try {i++;}finally {lock.unlock();}}}public static void main(String[] args) throws InterruptedException {ReentrantLockDemo demo=new ReentrantLockDemo();Thread t1=new Thread(demo);Thread t2=new Thread(demo);t1.start();t2.start();t1.join();t2.join();System.out.println(i);}
}
我們在多個線程要對i做++操作,大家可以知道,直接對這個變量做++操作呢,一定不是線程安全的,所以我們加鎖,這個做就是ReentrantLock,我們現(xiàn)在有兩個線程,都會對他去做一個加加操作,我們每一次只允許一個對他做++,所以我們每次在做加之前,我們會做一個lock操作,在這個鎖上做一個lock操作,離開之后要做unlock操作,這個寫法也是使用ReentantLock的一個基本范式,必須這樣寫,為什么要把unlock寫到finally里面呢,防止發(fā)生意外,導(dǎo)致你這個鎖沒有釋放掉,你這個鎖沒有釋放掉后果是很嚴(yán)重的,會導(dǎo)致你這個添加的線程沒法進(jìn)來,萬一你在執(zhí)行過程當(dāng)中,給拋了個異常,異常你又沒有處理,那這個時候后果就比較嚴(yán)重,你寫finally里面呢,不管里面是否有異常發(fā)生,發(fā)生什么情況,finally在我這個程序退出之前呢,總是會給執(zhí)行一下的,在這個地方把鎖給釋放掉,相對于synchronized來講呢,對synchronized來說,他的這個鎖的釋放呢,是虛擬機完成的自動的動作,我們只要把synchronized的括號給括起來,就可以了,語法上檢查是通過的,那synchronized就能保證鎖能夠被釋放掉,ReentrantLock是由程序決定是在什么時候釋放鎖,那從這點來講,它提供了鎖的一個靈活性,在任何場景下來釋放,但是同樣道理,靈活性付出的代價呢,你要格外小心,不能忘記把鎖給釋放掉,如果你忘了釋放這個地方,其他的就永遠(yuǎn)進(jìn)不來了,各自加了這么多次,它是沒有任何不安全性發(fā)生,如果是不安全,那么會小于這個數(shù)字,這里是一個重入,如果說你不幸很意外的,在線程中兩次對這個鎖加鎖,如果兩次對這個鎖進(jìn)行加鎖之后呢,你許可的數(shù)量就變成2,如果出現(xiàn)這種情況,那么你必須對這個鎖釋放兩次,大家可能會覺得有點奇怪,從我們這個直觀意義上呢,我一個線程取一次鎖,這是很自然的事情,我不當(dāng)心的情況,我又再取得一次,這不是什么大的問題,當(dāng)我不需要使用這個鎖的時候呢,把它釋放掉,那我也只需要執(zhí)行一次,那也就是可以了,如果我們在這種情況之下,沃我們只執(zhí)行一次unlock會是什么結(jié)果,我們可以看到,這個程序我已經(jīng)開啟來了,開了很長時間了,但是沒有停止,因為有一個線程卡死在里邊了,所以他是不會把這個i給打印出來的,這個時候是會有一個線程在等待的現(xiàn)象,是因為你前面的線程只釋放了一次unlock,導(dǎo)致其他線程就進(jìn)不來了,對于重入鎖來講,lock了幾次,你就必須要釋放幾次,如果我們lock了兩次,我們就釋放兩次,那就沒有問題了,這是重入鎖的一個特點,如果你一個線程拿了兩個許可,你得釋放兩次,下面我們來看一個比較好的功能,叫做可中斷,重入鎖它是可以被中斷的,他不像synchronized關(guān)鍵字一樣,對中斷是沒有響應(yīng)的,對中斷沒有響應(yīng)的一個后果呢,如果你發(fā)生了死鎖,或者長期等待的情況,不一定死鎖產(chǎn)生了長期等待的情況,你前一個線程因為某一些原因沒有完成某些操作,導(dǎo)致后面的線程要做一個長期的等待,那么你在長期等待過程當(dāng)中,我們想看到的一個現(xiàn)象,我們希望這個線程停下來,這個時候一個可行的辦法呢,發(fā)送一個中斷信號,讓這個線程停下來,重入鎖就提供這個功能說,我在 加鎖的同時,可以去響應(yīng)你的中斷,如果我發(fā)生了死鎖,如果發(fā)生了一些意外發(fā)生的情況,我在一個鎖上卡了很久,那我還有一個辦法把你這個線程給喚醒,不至于永久性的卡死下去
package com.learn.thread;import java.util.concurrent.locks.ReentrantLock;public class ReentrantLockInterruptDemo implements Runnable{public static ReentrantLock lock1=new ReentrantLock();public static ReentrantLock lock2=new ReentrantLock();int lock;public ReentrantLockInterruptDemo(int lock) {this.lock=lock;}@Overridepublic void run() {try {if(lock==1) {lock1.lockInterruptibly();try {Thread.sleep(500);}catch (InterruptedException e) {e.printStackTrace();}lock2.lockInterruptibly();}else {lock2.lockInterruptibly();try {Thread.sleep(500);}catch (InterruptedException e) {e.printStackTrace();}lock1.lockInterruptibly();}}catch (InterruptedException e) {e.printStackTrace();}finally {if(lock1.isHeldByCurrentThread()) {lock1.unlock();}if(lock2.isHeldByCurrentThread()) {lock2.unlock();}System.out.println(Thread.currentThread().getId()+":線程退出");}}public static void main(String[] args) throws InterruptedException {ReentrantLockInterruptDemo r1=new ReentrantLockInterruptDemo(1);ReentrantLockInterruptDemo r2=new ReentrantLockInterruptDemo(2);Thread t1=new Thread(r1);Thread t2=new Thread(r2);t1.start();t2.start();Thread.sleep(1000);DeadlockChecker.check();}}
package com.learn.thread;import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;/** 檢查死鎖 */
public class DeadlockChecker {private final static ThreadMXBean mbean = ManagementFactory.getThreadMXBean();final static Runnable deadLockChecker = new Runnable() {@Overridepublic void run() {while(true){long[] deadLockedThreadIds = mbean.findDeadlockedThreads();if(deadLockedThreadIds != null){ThreadInfo[] threadInfos = mbean.getThreadInfo(deadLockedThreadIds);for(Thread t : Thread.getAllStackTraces().keySet()){for(ThreadInfo ti : threadInfos){if(ti.getThreadId() == t.getId()){t.interrupt();}}}}try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}}}};public static void check() {Thread t = new Thread(deadLockChecker);t.setDaemon(true);t.start();}
}
lockInterruptibly表示一個可中斷的加鎖,只有這樣去加鎖,他才會響應(yīng)中斷,我們?nèi)ド暾堃粋€鎖,除非當(dāng)前線程被中斷,中斷之后他就會拋出一個異常,那我們來看一下這個程序,這個程序有兩個鎖,我想在這個程序當(dāng)中去構(gòu)造一個死鎖的現(xiàn)象,線程我們也開兩個,這里是一個實例變量,不是一個靜態(tài)變量,我兩個實例可以賦值兩個不同的值,那當(dāng)我lock等于1的時候呢,這樣線程1鎖了1,線程2先鎖了2,同時線程1申請lock2,是很明顯的一個死鎖,對于這個死鎖來講,如果你使用lock方法呢,就不太有辦法去幫他解開了,但是如果你使用Interruptbly,那你就可以把兩個線程當(dāng)一個中斷,這個程序依然可以順利的結(jié)束,死鎖必然會產(chǎn)生一個無限期等待
package com.learn.thread;import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;public class TimedLock implements Runnable{public static ReentrantLock lock=new ReentrantLock();@Overridepublic void run() {try {if(lock.tryLock(5, TimeUnit.SECONDS)) {Thread.sleep(6000);}else {System.out.println("get lock failed");}}catch (InterruptedException e) {e.printStackTrace();}finally {if(lock.isHeldByCurrentThread())lock.unlock();}}public static void main(String[] args) {TimedLock ins=new TimedLock();Thread t1=new Thread(ins);Thread t2=new Thread(ins);t1.start();t2.start();}}
釋放占用的鎖,使得別人可以獲得鎖,trylock有兩個參數(shù),準(zhǔn)備在這個鎖上等多久,比如這里就等5秒中,第二個是單位,這里是5秒,如果你要改成5毫秒,那這里就改成毫秒,另外一個是公平鎖,ReentrantLock內(nèi)部是支持公平鎖的,參數(shù)是fair,/*** Creates an instance of {@code ReentrantLock} with the* given fairness policy.** @param fair {@code true} if this lock should use a fair ordering policy*/
public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();
}什么叫公平鎖呢,我這個鎖可以保證線程先來先到,后來者后得,在一般意義上呢,我們這個鎖是不公平的,我先來申請鎖的線程,未必會先拿到鎖,我后面來的線程未必后拿到鎖,可能我后面的鎖運氣好一點,我反而先拿到鎖,有可能會拿到某些線程,拿不到鎖,產(chǎn)生饑餓現(xiàn)象,公平鎖是不會有問題,對于公平鎖來說,先到的線程他一定會先拿到鎖,后到的線程會后拿到鎖,公平鎖雖然不會產(chǎn)生饑餓,但是公平鎖的性能呢,是要比非公平鎖性能差很多,因為公平鎖還要處理一個排隊的問題,所以如果說沒有特別需求,我們不一定要去使用公平的狀態(tài),默認(rèn)情況下這個鎖是非公平的,那這里是ReentrantLock所提供的一些功能,下面我們來看一下和重入鎖相關(guān)的一個概念condition,ReentrantLock和Condition之間的關(guān)系呢,如同synchronized和object.wait,object.notify,之間的關(guān)系,之前我們已經(jīng)有了比較清楚地了解,你要去wait或者notify一個線程,那你就要獲得這個monitor監(jiān)視器的所有權(quán),如果你沒有得到monitor的所有權(quán),不能做notify,那么同樣的道理,我們condition是相當(dāng)于說,在某一個鎖上面,去做這個wait和notify,你也要去獲得這個鎖,Condition什么意思呢,如果之前把wait和notify清楚的話,Condition意思是和他一樣的,但是不同的是它是和ReentrantLock一起使用,而這兩個是和synchronized一起使用,一個是monitor,一個是ReentrantLock,那接口也是非常非常類似,一個是wait和notify,一個是await,等待在Condition上,signal通知,通知等待在Condition上的線程,也有signalAll,這個也notifyAll相同的意思,比較強大的是說,wait也是有一個等待時間public final void wait(long timeout, int nanos) throws InterruptedException {if (timeout < 0) {throw new IllegalArgumentException("timeout value is negative");}if (nanos < 0 || nanos > 999999) {throw new IllegalArgumentException("nanosecond timeout value out of range");}if (nanos > 0) {timeout++;}wait(timeout);
}https://www.cnblogs.com/ten951/p/6212127.html
package com.learn.thread;import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;public class ReenterLockCondition implements Runnable {public static ReentrantLock lock = new ReentrantLock();public static Condition condition = lock.newCondition();/*** When an object implementing interface <code>Runnable</code> is used* to create a thread, starting the thread causes the object's* <code>run</code> method to be called in that separately executing* thread.* <p>* The general contract of the method <code>run</code> is that it may* take any action whatsoever.** @see Thread#run()*/@Overridepublic void run() {try {lock.lock();condition.await();System.out.println("Thread is going on");} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}public static void main(String[] args) throws InterruptedException {ReenterLockCondition r1 = new ReenterLockCondition();Thread t1 = new Thread(r1);t1.start();Thread.sleep(2000);lock.lock();condition.signal();lock.unlock();}
}
使得當(dāng)前線程被掛起,等待操作,信號量是什么呢,對于我們這個鎖來講,它是互斥的,排他的,它是exclusive的,我進(jìn)去了,沒有人再能進(jìn)去,它是絕對嚴(yán)格的保護(hù),當(dāng)我有一個線程進(jìn)了這個區(qū)間之后呢,另外不可能再有線程能夠進(jìn)入到這個區(qū)間里面去,信號量是一個許可為一的,信號量是我允許若干個線程,進(jìn)入到這個區(qū)間里面來,臨界區(qū)里面來,但是超過我許可范圍的線程呢,必須等待,他可以認(rèn)為是一個廣義上的鎖,也可以認(rèn)為是一個共享鎖,他可以有多個線程共享去使用這個臨界區(qū),比如我們這個信號量當(dāng)中,假使有10個許可,每一個許可可以分配給10個線程,當(dāng)然一個線程也可以那兩三個許可,你可以根據(jù)業(yè)務(wù)的需求,每個線程可以拿幾個許可,比如10個線程,信號量允許多個線程進(jìn)入臨界區(qū),許可數(shù)量唯一的時候就相當(dāng)于一把鎖,我要處理多個請求,如果系統(tǒng)的負(fù)載有限,只能同時處理10個請求的任務(wù),超過10個我們就沒有能力處理,這個時候我們就可以使用信號量去控制,當(dāng)有10個線程進(jìn)來的時候呢,那么我就給他做執(zhí)行,超過10個我就讓他做等待,這個是一個非常簡單的使用系統(tǒng)做這個功能控制,信號量是一個共享鎖
/*** Acquires a permit from this semaphore, blocking until one is* available, or the thread is {@linkplain Thread#interrupt interrupted}.** <p>Acquires a permit, if one is available and returns immediately,* reducing the number of available permits by one.** <p>If no permit is available then the current thread becomes* disabled for thread scheduling purposes and lies dormant until* one of two things happens:* <ul>* <li>Some other thread invokes the {@link #release} method for this* semaphore and the current thread is next to be assigned a permit; or* <li>Some other thread {@linkplain Thread#interrupt interrupts}* the current thread.* </ul>** <p>If the current thread:* <ul>* <li>has its interrupted status set on entry to this method; or* <li>is {@linkplain Thread#interrupt interrupted} while waiting* for a permit,* </ul>* then {@link InterruptedException} is thrown and the current thread's* interrupted status is cleared.** @throws InterruptedException if the current thread is interrupted*/
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}也可以讓一個線程拿多個許可/*** Acquires the given number of permits from this semaphore,* blocking until all are available,* or the thread is {@linkplain Thread#interrupt interrupted}.** <p>Acquires the given number of permits, if they are available,* and returns immediately, reducing the number of available permits* by the given amount.** <p>If insufficient permits are available then the current thread becomes* disabled for thread scheduling purposes and lies dormant until* one of two things happens:* <ul>* <li>Some other thread invokes one of the {@link #release() release}* methods for this semaphore, the current thread is next to be assigned* permits and the number of available permits satisfies this request; or* <li>Some other thread {@linkplain Thread#interrupt interrupts}* the current thread.* </ul>** <p>If the current thread:* <ul>* <li>has its interrupted status set on entry to this method; or* <li>is {@linkplain Thread#interrupt interrupted} while waiting* for a permit,* </ul>* then {@link InterruptedException} is thrown and the current thread's* interrupted status is cleared.* Any permits that were to be assigned to this thread are instead* assigned to other threads trying to acquire permits, as if* permits had been made available by a call to {@link #release()}.** @param permits the number of permits to acquire* @throws InterruptedException if the current thread is interrupted* @throws IllegalArgumentException if {@code permits} is negative*/
public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();sync.acquireSharedInterruptibly(permits);
}申請給定量許可,阻塞直到所有的許可可用https://www.cnblogs.com/ten951/p/6212132.html
package com.learn.thread;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;public class SemapDemo implements Runnable {final Semaphore semp = new Semaphore(5);//允許五個許可/*** When an object implementing interface <code>Runnable</code> is used* to create a thread, starting the thread causes the object's* <code>run</code> method to be called in that separately executing* thread.* <p>* The general contract of the method <code>run</code> is that it may* take any action whatsoever.** @see Thread#run()*/@Overridepublic void run() {try {semp.acquire();Thread.sleep(2000);System.out.println(Thread.currentThread().getId() + ":done!");semp.release();} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) {ExecutorService exec = Executors.newFixedThreadPool(20);//容量為20的線程池final SemapDemo demo = new SemapDemo();for (int i = 0; i < 20; i++) {exec.submit(demo);}}
}
許可的釋放最好是寫到finally里面,萬一你忘記釋放了,你的許可就憑空蒸發(fā)掉了,永遠(yuǎn)就沒有了,現(xiàn)在我有20個線程,但是我許可數(shù)量是5個,信號量構(gòu)造的時候你可以指定許可,因此前5個線程去拿的時候呢,必然可以一下子拿到,進(jìn)來就打印這個done,但是你后面的線程,進(jìn)來的時候你未必可以馬上拿到,因為我這個線程要休眠兩秒鐘,這是一個比較長的時間,線程的提交是很快的,我一下子把20個線程提交上來了,但是我前5個在里面待2秒,在2秒鐘之內(nèi)我只能做5個,另外的15個被等待,等待完了之后呢,后面5個進(jìn)來之后呢,前5個一下子做完了,過兩秒鐘在一起刷一下,這個程序沒有結(jié)束,是因為這個線程池沒有結(jié)束掉,所有的線程不是Daemon線程,所以不會結(jié)束,事實上工作已經(jīng)完成了,一個線程可以拿若干個許可,每個線程我給他拿兩個許可,這樣我們可以更加靈活的去共享我們的資源,應(yīng)該分配給誰,信號量也是堆資源的一種分配,下面我們來看一下ReadWriteLock,讀寫鎖,為什么會有讀寫鎖這種東西呢,對于重入鎖來講,我們傳統(tǒng)的synchronized來說,他的鎖是部分線程功能的,有些時候讀和寫是兩種不同的操作,因此我們可以想象一下,如果在你讀這個數(shù)據(jù)當(dāng)中,你不分青紅皂白都去加鎖,那對這個性能是有很大殺傷力的,如果你都去讀的,我們可以以一種比較開放的姿態(tài)去看這個問題,都是讀的線程你就不應(yīng)該加鎖,大家都應(yīng)該能進(jìn)去,但是如果你有寫的線程發(fā)生,寫的事件發(fā)生,那在這種情況下,因為寫有可能修改數(shù)據(jù),修改數(shù)據(jù)會導(dǎo)致你讀到的數(shù)據(jù)呢,會發(fā)生不一致,因此當(dāng)有寫發(fā)生的時候呢,我們才有加鎖這個操作,因此從功能上面講,我們將這個鎖進(jìn)行功能上的劃分,使得我們性能有很大的提高,并行度能夠提高,畢竟有一點,加鎖之后,并行度是1,也就是一次只有一個線程能夠進(jìn)去,這完全不符合我們高并發(fā)的一個概念,高并發(fā)應(yīng)該是一次有好多線程在跑,我一次只能跑一個線程,那只是傳統(tǒng)意義上的并發(fā),ReadWriteLock才有一點點高并發(fā)的意思在里面,我允許你很多線程一起做read,我們講過一個無等待的阻塞,顯然我們的ReentrantLock,我們的Synchronized,都是阻塞的并行,無等待的并發(fā),它會把這個線程掛起,ReadWriteLock如果沒有write線程發(fā)生,所有的read線程都是無等待的并發(fā),JDK5提供的一個具有讀寫分離的一個鎖,我們看讀寫鎖的情況
我寫到一半,你會讀到不一致的情況,所以讀寫之間還是要做一些互斥的操作,下面我們來看一下CountDownLatch,從名字上可以看到,他就是一個倒數(shù)計時器,10,9,8,7,6,...0,是一個倒數(shù),這個可以用到一個什么地方呢,比如一個非常典型的場景呢,就是在發(fā)射火箭的時候,在發(fā)射火箭之前,可能會進(jìn)行一個各項的檢查,是不是符合發(fā)射的條件,那么每一項檢查呢,都可以看做有一個單獨的線程去執(zhí)行的,那么在這個檢查的過程呢,每一個線程都有自己的檢查任務(wù),如果我們一共有10個檢查項,假設(shè)我們有10個檢查項的話,每當(dāng)一個線程完成自己的檢查任務(wù)之后呢,他就做一個countdown,自己任務(wù)就完成了,到達(dá)了他的一個執(zhí)行目標(biāo),當(dāng)所有的線程都完成任務(wù)了呢,我們的計時器就會清零,一共有10個線程去做這個事情,他每一個做完之后呢,他就會countdown,減到0之后呢,最終等待在countdown上的主線程,比如以火箭發(fā)射為例呢,那么主線程就是火箭發(fā)射本身,火箭發(fā)射發(fā)現(xiàn)所有的任務(wù)都完成了,這個時候await就會返回,不會再去等待,就返回了,就可以執(zhí)行后面的一些事情,還有一個示意圖就是這樣子,主線程會在這個臨界線上做一個等待,等待發(fā)射
其他檢查任務(wù)可能就分別執(zhí)行,過程可能也需要花費一些時間,并不會馬上就做完,因此主線程就在這里做等待,所有的檢查任務(wù)全部都到臨界點,全部都執(zhí)行完畢之后,主線程才會到這個點上全部執(zhí)行,所以countdownLatch可以簡單的看成是一個柵欄,整個線程按照時間的執(zhí)行上面呢,畫了一條線,使得所有的線程都要到了那個點為止,我們的主線程,他才能夠繼續(xù)往下走
package com.learn.thread;import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class CountDownLatchDemo implements Runnable {static final CountDownLatch end = new CountDownLatch(10);static final CountDownLatchDemo demo = new CountDownLatchDemo();/*** When an object implementing interface <code>Runnable</code> is used* to create a thread, starting the thread causes the object's* <code>run</code> method to be called in that separately executing* thread.* <p>* The general contract of the method <code>run</code> is that it may* take any action whatsoever.** @see Thread#run()*/@Overridepublic void run() {try {Thread.sleep(new Random().nextInt(10) * 1000);System.out.println("check complete");end.countDown();//完成 可以減1} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) throws InterruptedException {ExecutorService exec = Executors.newFixedThreadPool(10);for (int i = 0; i < 10; i++) {exec.submit(demo);}end.await();//主線程等待所有10個線程完成任務(wù) 才能繼續(xù)執(zhí)行System.out.println("Fire!");exec.shutdown();}
}
我們有10個線程,有10個檢查任務(wù),是需要在主線程開始之前,如果沒有跑完主線程會一直等待,在模擬一個任務(wù)或者檢查,檢查完畢之后會打印一個檢查完畢,然后countdown,表示自己完成了,做幾個countdown之后,程序啟動之后我們開啟10個線程,每個線程都會去做這個run,等到10個都跑完之后,這個countdown減掉之后呢,我們這個await才會返回,返回之后就發(fā)射火箭,因為每個線程都睡了隨機的時間,在這個線程完成之前,是不會做這個發(fā)射操作的,所有10個線程都完成了,那就做這個發(fā)射操作,所以這個應(yīng)用場景,其實在我們的實際業(yè)務(wù)當(dāng)中是非常普遍的,要等待他的準(zhǔn)備業(yè)務(wù)完成之后才能操作,那么你這個準(zhǔn)備業(yè)務(wù)怎么去通知主線程,我們就可以用countdown去通知,我都可以包裝在countdownlatch上面,下面是CyclicBarrier,循環(huán)柵欄,和CountDownLatch是非常相像的,Cyclic是循環(huán),CountDownLatch只是一次計數(shù),Cyclic他可以反復(fù)的使用,他可以一批一批的去執(zhí)行,比如我要做10個線程,我第二批10個線程到了之后呢,我主線程再工作一次,第三個10個線程到了之后呢,我主線程還可以再工作一次,他就是循環(huán)的一個姿態(tài),主要接口也非常的相像,等待所有的參與者都到達(dá)了之后,我才能夠繼續(xù)往下執(zhí)行
當(dāng)有一個士兵到了你不能叫做集合完成,當(dāng)有一個士兵到達(dá)了之后,要等待其他的士兵全部到達(dá),才叫集合完畢,所以這個士兵會等待所有的士兵到達(dá),到達(dá)完畢之后呢,下達(dá)任務(wù),所有士兵都會分別去執(zhí)行自己的任務(wù),當(dāng)有一個士兵執(zhí)行完成呢,我們并不能表明我們總體任務(wù)完成,我要等待所有的任務(wù)都執(zhí)行完成了,我們才能說這個任務(wù)完成了,其實他們是可以復(fù)用同一個Cyclic的,這里10個等待完畢,這里10個任務(wù)完成,都用同一個實例https://www.cnblogs.com/ten951/p/6212160.html
package com.learn.thread;import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;public class CyclicBarrierDemo {public static class Soldier implements Runnable {private String soldier;private final CyclicBarrier cyclic;public Soldier(CyclicBarrier cyclic, String soldier) {this.soldier = soldier;this.cyclic = cyclic;}/*** When an object implementing interface <code>Runnable</code> is used* to create a thread, starting the thread causes the object's* <code>run</code> method to be called in that separately executing* thread.* <p>* The general contract of the method <code>run</code> is that it may* take any action whatsoever.** @see Thread#run()*/@Overridepublic void run() {try {//等待所有士兵到齊cyclic.await();doWork();//等待所有士兵完成工作cyclic.await();} catch (InterruptedException e) {//在等待過程中,線程被中斷e.printStackTrace();} catch (BrokenBarrierException e) {//表示當(dāng)前CyclicBarrier已經(jīng)損壞.系統(tǒng)無法等到所有線程到齊了.e.printStackTrace();}}void doWork() {try {Thread.sleep(Math.abs(new Random().nextInt() % 10000));} catch (InterruptedException e) {e.printStackTrace();}System.out.println(soldier + ":任務(wù)完成");}}public static class BarrierRun implements Runnable {boolean flag;int N;public BarrierRun(boolean flag, int N) {this.flag = flag;this.N = N;}/*** When an object implementing interface <code>Runnable</code> is used* to create a thread, starting the thread causes the object's* <code>run</code> method to be called in that separately executing* thread.* <p>* The general contract of the method <code>run</code> is that it may* take any action whatsoever.** @see Thread#run()*/@Overridepublic void run() {if (flag) {System.out.println("司令:[士兵" + N + "個,任務(wù)完成!]");} else {System.out.println("司令:[士兵" + N + "個,集合完畢!]");flag = true;}}}public static void main(String[] args) {final int N = 10;Thread[] allSoldier = new Thread[N];boolean flag = false;CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));//設(shè)置屏障點,主要為了執(zhí)行這個方法System.out.println("集合隊伍! ");for (int i = 0; i < N; i++) {System.out.println("士兵" + i + "報道! ");allSoldier[i] = new Thread(new Soldier(cyclic, "士兵" + i));allSoldier[i].start();}}
}
一旦調(diào)用lockSupport就會被掛起了,unpark可以繼續(xù)往下執(zhí)行,他和suspend一樣,都是掛起,但是有個本質(zhì)的不同,Support是可以毫無顧忌的去使用,但是suspend不是,是不建議使用的,LockSupport他有一個什么好處呢,他的思想有點類似于信號量的思想,我內(nèi)部會產(chǎn)生一個許可的東西,那我park的時候呢,unpark就是去申請這個許可,因此他有一個特點是說,如果不幸的發(fā)生unpark,發(fā)現(xiàn)發(fā)生在park之前,那我這個park并不會把這個線程給阻塞,這個和suspend是不一樣的,如果resume發(fā)生在suspend之前,現(xiàn)在還是會被掛起,但是如果unpark發(fā)生在park之前,那我park是掛不住的
package com.learn.thread;import java.util.concurrent.locks.LockSupport;public class LockSupportDemo {public static Object u = new Object();static ChangeObjectThread t1 = new ChangeObjectThread("t1");static ChangeObjectThread t2 = new ChangeObjectThread("t2");public static class ChangeObjectThread extends Thread {public ChangeObjectThread(String name) {super.setName(name);}public void run() {synchronized (u) {System.out.println("in" + getName());LockSupport.park();}}}public static void main(String[] args) throws InterruptedException {t1.start();Thread.sleep(100);t2.start();LockSupport.unpark(t1);LockSupport.unpark(t2);t1.join();t2.join();}
}
就算我unpark發(fā)生在park之前,相當(dāng)于許可被拿掉了,不會吧線程給阻塞住,unpark是使得一個許可可用,park會使得線程掛起,什么時候可以讓這個線程繼續(xù)往下走呢,第一種是調(diào)用unpark,第二種是中斷,大部分wait函數(shù),他在中斷之后呢,都會拋出一個中斷異常,但是你要注意的是,park不會,他不會拋出中斷異常,這個操作是沒有異常拋出的,但是他會響應(yīng)中斷,如果中斷發(fā)生,這個park會立即返回,另外線程中斷了當(dāng)前線程,如果說這個地方被中斷了,但是從Park本身是不可以判斷是否被中斷的,park有這么一個特點,能夠響應(yīng)中斷,但是不拋出異常,JDK內(nèi)部是使用的非常廣泛的,它是一個比較底層的原理操作,把這個線程給掛起,內(nèi)部也是使用了unsafe這個類,/*** Disables the current thread for thread scheduling purposes unless the* permit is available.** <p>If the permit is available then it is consumed and the call* returns immediately; otherwise the current thread becomes disabled* for thread scheduling purposes and lies dormant until one of three* things happens:** <ul>** <li>Some other thread invokes {@link #unpark unpark} with the* current thread as the target; or** <li>Some other thread {@linkplain Thread#interrupt interrupts}* the current thread; or** <li>The call spuriously (that is, for no reason) returns.* </ul>** <p>This method does <em>not</em> report which of these caused the* method to return. Callers should re-check the conditions which caused* the thread to park in the first place. Callers may also determine,* for example, the interrupt status of the thread upon return.*/
public static void park() {UNSAFE.park(false, 0L);
}有些地方會直接調(diào)用unsafe的park,看一下重入鎖ReentrantLock基本的實現(xiàn)思路
重入鎖它是一個應(yīng)用級的東西,不是一個系統(tǒng)級的東西,LockSupport它是一個系統(tǒng)級的東西,它是調(diào)用了一些native的API,重入鎖本身它是一個應(yīng)用級實現(xiàn),有一些他調(diào)用了LockSupport,直接的實現(xiàn)是JAVA的實現(xiàn),他的實現(xiàn)有三個內(nèi)容是比較重要的,第一個是CAS狀態(tài),無鎖的操作我們在前面也有介紹,CAS狀態(tài)做什么事情呢,判斷這個鎖到底有沒有被人占用,比如0沒有被占用,1表示被占用了,鎖的本質(zhì)內(nèi)部是一個CAS操作,用它來修改某一個變量,這個變量能不能修改成功,第二個是等待隊列,如果我沒有拿到這個鎖,那我就進(jìn)入一個等待的隊列,多個線程要排隊,內(nèi)部必須要維護(hù)一個等待隊列,把所有等待在這個鎖上的線程呢,都給保存起來,等待在隊列上的線程呢,類似LockSupport當(dāng)中的park操作,只要我進(jìn)入了等待隊列的線程呢,我都要park把它掛起,我什么時候把它unpark呢,讓他繼續(xù)執(zhí)行呢,當(dāng)我前面的線程把鎖unlock的時候,我就從等待隊列當(dāng)中挑一個出來,來做unpark操作,這個就是ReentrantLock實現(xiàn)的一個根本,/*** Acquires the lock.** <p>Acquires the lock if it is not held by another thread and returns* immediately, setting the lock hold count to one.** <p>If the current thread already holds the lock then the hold* count is incremented by one and the method returns immediately.** <p>If the lock is held by another thread then the* current thread becomes disabled for thread scheduling* purposes and lies dormant until the lock has been acquired,* at which time the lock hold count is set to one.*/
public void lock() {sync.lock();
}這里有兩個實現(xiàn),一個公平的,一個是非公平的,我們來看一下非公平的實現(xiàn)/*** Performs {@link Lock#lock}. The main reason for subclassing* is to allow fast path for nonfair version.*/
abstract void lock();/*** Sync object for non-fair locks*/
static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;/*** Performs lock. Try immediate barge, backing up to normal* acquire on failure.*/final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}
}首先是compareAndSetState比較設(shè)置CAS操作,acquire這個是鎖申請,/*** Acquires in exclusive mode, ignoring interrupts. Implemented* by invoking at least once {@link #tryAcquire},* returning on success. Otherwise the thread is queued, possibly* repeatedly blocking and unblocking, invoking {@link* #tryAcquire} until success. This method can be used* to implement method {@link Lock#lock}.** @param arg the acquire argument. This value is conveyed to* {@link #tryAcquire} but is otherwise uninterpreted and* can represent anything you like.*/
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}加到等待隊列當(dāng)中去/*** Creates and enqueues node for current thread and given mode.** @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared* @return the new node*/
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;
}等待隊列的節(jié)點是Node,Node是對線程的包裝,節(jié)點可以把等待的線程信息都拿出來
?
總結(jié)