java concurrent包介绍及使用
2019獨角獸企業重金招聘Python工程師標準>>>
說一說java的concurrent包1-concurrent包簡介
前面一個系列的文章都在圍繞hash展開,今天準備先說下concurrent包,這個系列可能會以使用場景說明為主,concurrent包本身的代碼分析可能比較少; 我在這方面的實踐經驗較為有限,有錯誤歡迎批評指正?
不過前一個系列并未結束,還有一些文章沒有放出來,歡迎關注核桃博客?concurrent包是jdk1.5引入的重要的包,主要代碼由大牛Doug Lea完成,其實是在jdk1.4時代,由于java語言內置對多線程編程的支持比較基礎和有限,所以他寫了這個,因為實在太過于優秀,所以被加入到jdk之中;?
通常所說的concurrent包基本有3個package組成?
java.util.concurrent:提供大部分關于并發的接口和類,如BlockingQueue,Callable,ConcurrentHashMap,ExecutorService, Semaphore等?
java.util.concurrent.atomic:提供所有原子操作的類, 如AtomicInteger, AtomicLong等;?
java.util.concurrent.locks:提供鎖相關的類, 如Lock, ReentrantLock, ReadWriteLock, Condition等;?
concurrent包的優點:?
1. 首先,功能非常豐富,諸如線程池(ThreadPoolExecutor),CountDownLatch等并發編程中需要的類已經有現成的實現,不需要自己去實現一套; 畢竟jdk1.4對多線程編程的主要支持幾乎就只有Thread, Runnable,synchronized等?
2. concurrent包里面的一些操作是基于硬件級別的CAS(compare and swap),就是在cpu級別提供了原子操作,簡單的說就可以提供無阻塞、無鎖定的算法; 而現代cpu大部分都是支持這樣的算法的;
說一說java的concurrent包2-等待多個線程完成執行的CountDownLatch?
前面一篇說了concurrent包的基本結構,接下來首先看一下一個非常有用的類,CountDownLatch, 可以用來在一個線程中等待多個線程完成任務的類;?
前面一篇說了concurrent包的基本結構,接下來首先看一下一個非常有用的類,CountDownLatch, 可以用來在一個線程中等待多個線程完成任務的類;?通常的使用場景是,某個主線程接到一個任務,起了n個子線程去完成,但是主線程需要等待這n個子線程都完成任務了以后才開始執行某個操作;?
下面是一段演示代碼?
Java代碼??
@Test?? public?void?demoCountDown()?? {??int?count?=?10;??final?CountDownLatch?l?=?new?CountDownLatch(count);??for(int?i?=?0;?i?<?count;?++i)??{??final?int?index?=?i;??new?Thread(new?Runnable()?{??@Override??public?void?run()?{??try?{??Thread.currentThread().sleep(20?*?1000);??}?catch?(InterruptedException?e)?{??e.printStackTrace();??}??System.out.println("thread?"?+?index?+?"?has?finished...");??l.countDown();??}??}).start();??}??try?{??l.await();??}?catch?(InterruptedException?e)?{??e.printStackTrace();??}??System.out.println("now?all?threads?have?finished");??}運行的結果?thread 1 has finished...?
thread 3 has finished...?
thread 4 has finished...?
thread 6 has finished...?
thread 8 has finished...?
thread 0 has finished...?
thread 7 has finished...?
thread 9 has finished...?
thread 2 has finished...?
thread 5 has finished...?
now all threads have finished?
前面10個線程的執行完成順序會變化,但是最后一句始終會等待前面10個線程都完成之后才會執行
說一說java的concurrent包3-線程安全并且無阻塞的Atomic類?
有了CountDownLatch,涉及到多線程同步的演示就比較容易了,接下來我們看下Atomic相關的類, 比如AtomicLong, AtomicInteger等這些;?
有了CountDownLatch,涉及到多線程同步的演示就比較容易了,接下來我們看下Atomic相關的類, 比如AtomicLong, AtomicInteger等這些;?簡單的說,這些類都是線程安全的,支持無阻塞無鎖定的?
Java代碼??
set()??
get()??
getAndSet()??
getAndIncrement()??
getAndDecrement()??
getAndAdd()??
等操作?
下面是一個測試代碼?
Java代碼??
package?com.hetaoblog.concurrent.test;??import?java.util.concurrent.CountDownLatch;?? import?java.util.concurrent.atomic.AtomicLong;??import?org.junit.Test;?? /**?*?*?by?http://www.hetaoblog.com?*?@author?hetaoblog?*?*/?? public?class?AtomicTest?{??@Test??public?void?testAtomic()??{??final?int?loopcount?=?10000;??int?threadcount?=?10;??final?NonSafeSeq?seq1?=?new?NonSafeSeq();??final?SafeSeq?seq2?=?new?SafeSeq();??final?CountDownLatch?l?=?new?CountDownLatch(threadcount);??for(int?i?=?0;?i?<?threadcount;?++i)??{??final?int?index?=?i;??new?Thread(new?Runnable()?{??@Override??public?void?run()?{??for(int?j?=?0;?j?<?loopcount;?++j)??{??seq1.inc();??seq2.inc();??}??System.out.println("finished?:?"?+?index);??l.countDown();??}??}).start();??}??try?{??l.await();??}?catch?(InterruptedException?e)?{??e.printStackTrace();??}??System.out.println("both?have?finished....");??System.out.println("NonSafeSeq:"?+?seq1.get());??System.out.println("SafeSeq?with?atomic:?"?+?seq2.get());??}?? }??class?NonSafeSeq{??private?long?count?=?0;??public?void?inc()??{??count++;??}??public?long??get()??{??return?count;??}?? }??class?SafeSeq{??private?AtomicLong?count??=?new?AtomicLong(0);??public?void?inc()??{??count.incrementAndGet();??}??public?long?get()??{??return?count.longValue();??}?? }其中NonSafeSeq是作為對比的類,直接放一個private long count不是線程安全的,而SafeSeq里面放了一個AtomicLong,是線程安全的;可以直接調用incrementAndGet來增加?運行代碼,可以得到類似這樣的結果?
finished : 1?
finished : 0?
finished : 3?
finished : 2?
finished : 5?
finished : 4?
finished : 6?
finished : 8?
finished : 9?
finished : 7?
both have finished....?
NonSafeSeq:91723?
SafeSeq with atomic: 100000?
可以看到,10個線程,每個線程運行了10,000次,理論上應該有100,000次增加,使用了普通的long是非線程安全的,而使用了AtomicLong是線程安全的;?
注意,這個例子也說明,雖然long本身的單個設置是原子的,要么成功要么不成功,但是諸如count++這樣的操作就不是線程安全的;因為這包括了讀取和寫入兩步操作;
說一說java的concurrent包4--可以代替synchronized關鍵字的ReentrantLock?
在jdk 1.4時代,線程間的同步主要依賴于synchronized關鍵字,本質上該關鍵字是一個對象鎖,可以加在不同的instance上或者class上,從使用的角度則分別可以加在非靜態方法,靜態方法,以及直接synchronized(MyObject)這樣的用法;?
在jdk 1.4時代,線程間的同步主要依賴于synchronized關鍵字,本質上該關鍵字是一個對象鎖,可以加在不同的instance上或者class上,從使用的角度則分別可以加在非靜態方法,靜態方法,以及直接synchronized(MyObject)這樣的用法;?concurrent包提供了一個可以替代synchronized關鍵字的ReentrantLock,?
簡單的說你可以new一個ReentrantLock, 然后通過lock.lock和lock.unlock來獲取鎖和釋放鎖;注意必須將unlock放在finally塊里面,?
reentrantlock的好處?
1. 是更好的性能,?
2. 提供同一個lock對象上不同condition的信號通知?
3. 還提供lockInterruptibly這樣支持響應中斷的加鎖過程,意思是說你試圖去加鎖,但是當前鎖被其他線程hold住,然后你這個線程可以被中斷;?
簡單的一個例子:?
Java代碼??
package?com.hetaoblog.concurrent.test;??import?java.util.concurrent.CountDownLatch;?? import?java.util.concurrent.locks.ReentrantLock;??import?org.junit.Test;??public?class?ReentrantLockDemo?{??@Test??public?void?demoLock()??{??final?int?loopcount?=?10000;??int?threadcount?=?10;??final?SafeSeqWithLock?seq?=?new?SafeSeqWithLock();??final?CountDownLatch?l?=?new?CountDownLatch(threadcount);??for(int?i?=?0;?i?<?threadcount;?++i)??{??final?int?index?=?i;??new?Thread(new?Runnable()?{??@Override??public?void?run()?{??for(int?j?=?0;?j?<?loopcount;?++j)??{??seq.inc();??}??System.out.println("finished?:?"?+?index);??l.countDown();??}??}).start();??}??try?{??l.await();??}?catch?(InterruptedException?e)?{??e.printStackTrace();??}??System.out.println("both?have?finished....");??System.out.println("SafeSeqWithLock:"?+?seq.get());??}?? }??class?SafeSeqWithLock{??private?long?count?=?0;??private?ReentrantLock?lock?=?new?ReentrantLock();??public?void?inc()??{??lock.lock();??try{??count++;??}??finally{??lock.unlock();??}??}??public?long?get()??{??return?count;??}?? }同樣以前面的類似Sequence的類舉例,通過對inc操作加鎖,保證了線程安全;?當然,這里get()我沒有加鎖,對于這樣直接讀取返回原子類型的函數,我認為不加鎖是沒問題的,相當于返回最近成功操作的值;?
運行結果類似這樣,?
finished : 7?
finished : 2?
finished : 6?
finished : 1?
finished : 5?
finished : 3?
finished : 0?
finished : 9?
finished : 8?
finished : 4?
both have finished....?
SafeSeqWithLock:100000
說一說java的concurrent包5--讀寫鎖ReadWriteLock?
concurrent包里面還提供了一個非常有用的鎖,讀寫鎖ReadWriteLock?
concurrent包里面還提供了一個非常有用的鎖,讀寫鎖ReadWriteLock?下面是ReadWriteLock接口的說明:?
A ReadWriteLock maintains a pair of associated locks, one for read-only operations and one for writing. The read lock may be held simultaneously by multiple reader threads, so long as there are no writers. The write lock is exclusive.?
意思是說讀鎖可以有很多個鎖同時上鎖,只要當前沒有寫鎖;?
寫鎖是排他的,上了寫鎖,其他線程既不能上讀鎖,也不能上寫鎖;同樣,需要上寫鎖的前提是既沒有讀鎖,也沒有寫鎖;?
兩個寫鎖不能同時獲得無需說明,下面一段程序說明下上了讀鎖以后,其他線程需要上寫鎖也無法獲得?
Java代碼??
@Test?? public?void?testRWLock_getw_onr()?? {??ReentrantReadWriteLock?lock?=?new?ReentrantReadWriteLock();??final?Lock?rlock?=?lock.readLock();??final?Lock?wlock?=?lock.writeLock();??final?CountDownLatch?l??=?new?CountDownLatch(2);??//?start?r?thread??new?Thread(new?Runnable()?{??@Override??public?void?run()?{??System.out.println(new?Date()?+?"now?to?get?rlock");??rlock.lock();??try?{??Thread.currentThread().sleep(20?*?1000);??}?catch?(InterruptedException?e)?{??e.printStackTrace();??}??System.out.println(new?Date()?+?"now?to?unlock?rlock");??rlock.unlock();??l.countDown();??}??}).start();??//?start?w?thread??new?Thread(new?Runnable()?{??@Override??public?void?run()?{??System.out.println(new?Date()?+?"now?to?get?wlock");??wlock.lock();??System.out.println(new?Date()?+?"now?to?unlock?wlock");??wlock.unlock();??l.countDown();??}??}).start();??try?{??l.await();??}?catch?(InterruptedException?e)?{??e.printStackTrace();??}??System.out.println(new?Date()?+?"finished");?? }這代碼在我機器上打印的結果是, 也就是試圖獲得寫鎖的線程只有當另外一個線程將讀鎖釋放了以后才可以獲得?Tue Feb 28 23:18:13 CST 2012now to get rlock?
Tue Feb 28 23:18:13 CST 2012now to get wlock?
Tue Feb 28 23:18:33 CST 2012now to unlock rlock?
Tue Feb 28 23:18:33 CST 2012now to unlock wlock?
Tue Feb 28 23:18:33 CST 2012finished?
ReadWriteLock的實現是ReentrantReadWriteLock,?
有趣的是,在一個線程中,讀鎖不能直接升級為寫鎖,但是寫鎖可以降級為讀鎖;?
這意思是,如果你已經有了讀鎖,再去試圖獲得寫鎖,將會無法獲得, 一直堵住了;?
但是如果你有了寫鎖,再去試圖獲得讀鎖,沒問題;?
下面是一段降級的代碼,?
Java代碼??
@Test?? public?void?testRWLock_downgrade()?? {??ReentrantReadWriteLock?lock?=?new?ReentrantReadWriteLock();??Lock?rlock?=?lock.readLock();??Lock?wlock?=?lock.writeLock();??System.out.println("now?to?get?wlock");??wlock.lock();??System.out.println("now?to?get?rlock");??rlock.lock();??System.out.println("now?to?unlock?wlock");??wlock.unlock();??System.out.println("now?to?unlock?rlock");??rlock.unlock();??System.out.println("finished");??}可以正常打印出?now to get wlock?
now to get rlock?
now to unlock wlock?
now to unlock rlock?
finished?
下面是一段升級的代碼,?
Java代碼??
@Test??public?void?testRWLock_upgrade()??{??ReentrantReadWriteLock?lock?=?new?ReentrantReadWriteLock();??Lock?rlock?=?lock.readLock();??Lock?wlock?=?lock.writeLock();??System.out.println("now?to?get?rlock");??rlock.lock();??System.out.println("now?to?get?wlock");??wlock.lock();??System.out.println("now?to?unlock?wlock");??wlock.unlock();??System.out.println("now?to?unlock?rlock");??rlock.unlock();??System.out.println("finished");??}只能打印出下面兩句,后面就一直掛住了?now to get rlock?
now to get wlock
說一說java的concurrent包6–java里面的線程基礎類Thread?
有網友建議我在介紹concurrent包之前先介紹下jdk1.5之前的多線程知識,這是個相當不錯的想法, 這篇就先介紹下Thread類;?
有網友建議我在介紹concurrent包之前先介紹下jdk1.5之前的多線程知識,這是個相當不錯的想法, 這篇就先介紹下Thread類;?Thread類是java中的線程,幾乎所有的多線程都在Thread這個類的基礎之后展開;?
下面介紹這個類的基本用法,Thread類的最基本函數就是run函數?
public void run()?
簡單的說來,基本的創建一個完成自己功能的線程可以繼承Thread類,然后override這個run方法, 如下所示?
Java代碼??
public?class?ThreadDemo?{??@Test??public?void?testThread()??{??SimpleThread?t?=?new?SimpleThread();??t.start();??}???}?? class?SimpleThread?extends?Thread{??@Override??public?void?run()?{??System.out.println(?Thread.currentThread().getName()?+?"?is?running??");??}?? }通常在run方法里面實現自己要做的功能,這里簡單的打印了了一句話, 運行結果是?Thread-0 is running?
啟動一個線程就是new一個自己的Thread對象,然后調用其中的start方法啟動這個線程;注意, run()方法運行結束之后這個線程的生命周期就結束了;?
上面舉的例子是說啟動一個線程就去完成一個任務,有的時候我們需要一個線程始終在跑,定期執行一些任務,然后在某個時刻停止這個線程的運行; 那么可以有類似下面的一段代碼:?
Java代碼??
public?class?ThreadDemo?{??public?static?void?main(String[]?args)??{??PeriodicalRunningThread?t?=?new?PeriodicalRunningThread();??t.start();??System.out.println("main?thread?is?going?to?sleep...");??try?{??Thread.currentThread().sleep(20?*?1000);??}?catch?(InterruptedException?e)?{??e.printStackTrace();??}??System.out.println(new?Date()?+?"?now?to?stop?PeriodicalRunningThread");??t.setRunning(false);??}??}???class?PeriodicalRunningThread?extends?Thread{??private?volatile?boolean?running?=?true;??@Override??public?void?run()?{??while(running)??{??System.out.println(new?Date()?+?"?"?+?Thread.currentThread().getName()?+??"?is?running?"?+?new?Date());??try?{??Thread.currentThread().sleep(5?*?1000);??}?catch?(InterruptedException?e)?{??e.printStackTrace();??}??}??System.out.println(new?Date()?+?"?"?+?Thread.currentThread().getName()?+?"?will?end");??}??public?void?setRunning(boolean?running)?{??this.running?=?running;??}??}這段代碼的打印結果是:?main thread is going to sleep…?
Wed Feb 29 21:10:39 CST 2012 Thread-0 is running Wed Feb 29 21:10:39 CST 2012?
Wed Feb 29 21:10:44 CST 2012 Thread-0 is running Wed Feb 29 21:10:44 CST 2012?
Wed Feb 29 21:10:49 CST 2012 Thread-0 is running Wed Feb 29 21:10:49 CST 2012?
Wed Feb 29 21:10:54 CST 2012 Thread-0 is running Wed Feb 29 21:10:54 CST 2012?
Wed Feb 29 21:10:59 CST 2012 now to stop PeriodicalRunningThread?
Wed Feb 29 21:10:59 CST 2012 Thread-0 will end?
這里通過一個volatile的boolean值來作為標識表示這個線程的停止;?
關于這里的volatile關鍵字的使用,如有興趣可以先看這個,核桃博客也會在這個系列的后續文章中對這個關鍵字做說明?
http://www.ibm.com/developerworks/cn/java/j-jtp06197.html?
這樣,在這個running標識為true的時候,該線程一直在跑,但是完成一段任務后會sleep一段時間,然后繼續執行;
說一說java的concurrent包7–Thread和Runnable?
這篇還是Thread和Runnable的基礎?
這篇還是Thread和Runnable的基礎?在前面一篇的代碼里面已經介紹了Thread類的其他幾個常用的方法,?
1. sleep函數,作用是讓當前線程sleep一段時間,單位以毫秒計算;?
public static void sleep(long millis)?
2. 靜態方法Thread.currentThread(), 得到當前線程?
public static Thread currentThread()?
3. getName方法,得到當前線程名稱?
public final String getName()?
這個名稱可以在構造Thread的時候傳入, 也可以通過setName()方法設置;這個在多線程調試的時候是比較有用的,設置當前線程名,然后在log4j的輸出字符串格式里面加入%t,就可以在日志中打印當前線程名稱,方便看到當前的日志是從哪里來的;?
現在介紹下多線程里面另外一個重要的接口Runnable, 這個接口表示可以被一個線程執行的任務,事實上Thread類也實現了這個Runnable接口;?
這個接口只有一個函數, 實現者只要在里面調用代碼就可以了?
void run()?
同時, Thread類有個構造函數是傳入一個Runnable實現的;?
常用的一個用法就是通過匿名內部類來創建線程執行簡單任務,避免寫太多的類,外部需要的變量可以通過加final修飾符后傳入, 代碼例子如下:?
Java代碼??
public?static?void?testThreadWithRunnable()?? {??final?String?word?=?"hello,world";??new?Thread(new?Runnable()?{??@Override??public?void?run()?{??System.out.println(word);??}??}).start();?? }??public?static?void?main(String[]?args)?? {??//periodicalThreadTest();??testThreadWithRunnable();??}上面的代碼會打印?hello,world
說一說java的concurrent包8–用在一個lock上的多個Condition?
concurrent系列的前一篇說到說一說java的concurrent包7–thread和runnable,現在繼續,今天介紹下Condtion這個接口,可以用在一個lock上的多個不同的情況;?在jdk的線程同步代碼中,無論的synchronized關鍵字,或者是lock上的await/signal等,都只能在一個鎖上做同步通知;?
假設有3個線程,要對一個資源做同步,一般只能有一個鎖來做同步通知操作,那么通知的時候無法做到精確的通知3個線程中的某一個的;?
因為你調用了wait()/notify()的時候,具體的調度是jvm決定的;?
但是有的時候的確需要需要對一個鎖做多種不同情況的精確通知, 比如一個緩存,滿了和空了是兩種不同的情況,可以分別通知取數據的線程和放數據的線程;?
Condition的基本使用如下:?
* Condition是個接口,基本的方法就是await()和signal()方法;?
* Condition依賴于Lock接口,生成一個Condition的基本代碼是lock.newCondition()?
* 調用Condition的await()和signal()方法,都必須在lock保護之內,就是說必須在lock.lock()和lock.unlock之間才可以?
* 和Object.wait()方法一樣,每次調用Condition的await()方法的時候,當前線程就自動釋放了對當前鎖的擁有權?
當然,Condition其實是個接口,上面說的這幾點,在實現Condition的時候可以自由控制一點;但是jdk的javadoc說了,如果有啥特別的實現,必須要清楚的說明的;?
下一節我會結合具體的代碼來介紹下Condition的使用;
說一說java的concurrent包9–Condition的代碼例子BoundedBuffer?
面說了Condition的基本含義,今天這篇說下Condition的一個代碼例子;?javadoc里面對Condition有一個絕佳的例子,BoundedBuffer類,就是一個線程安全的有界限的緩存;非常巧妙的利用了Condition,根據來通知不同的線程做不同的事情;?
下面先看下具體代碼:?
Java代碼??
class?BoundedBuffer?{??final?Lock?lock?=?new?ReentrantLock();??final?Condition?notFull??=?lock.newCondition();???final?Condition?notEmpty?=?lock.newCondition();???final?Object[]?items?=?new?Object[100];??int?putptr,?takeptr,?count;??public?void?put(Object?x)?throws?InterruptedException?{??lock.lock();??try?{??while?(count?==?items.length)???notFull.await();??items[putptr]?=?x;???if?(++putptr?==?items.length)?putptr?=?0;??++count;??notEmpty.signal();??}?finally?{??lock.unlock();??}??}??public?Object?take()?throws?InterruptedException?{??lock.lock();??try?{??while?(count?==?0)???notEmpty.await();??Object?x?=?items[takeptr];???if?(++takeptr?==?items.length)?takeptr?=?0;??--count;??notFull.signal();??return?x;??}?finally?{??lock.unlock();??}??}???}代碼意思不復雜,一個有界的buffer,里面是個數組,可以往里面放數據和取數據;?由于該buffer被多個線程共享,所以每次放和取操作的時候都用一個lock保護起來;?
每次取數據(take)的時候,?
a. 如果當前個數是0(用一個count計數), 那么就調用notEmpty.await等待,鎖就釋放了;?
b. 取數據的索引專門有一個,每次向前一步; 如果到頭了就從0開始循環使用?
c.如果有數據,那就取一個數據,將count減1,同時調用notfull.signal(),?
每次放數據(put)的時候?
a.如果count和length相等,也就是滿了,那就調用notFull.await等待,釋放了鎖; 等待有一些take()調用完成之后才會進入?
b. 放數據也有一個索引putptr, 放入數據; 如果到頭了也從0開始循環使用?
c. 調用notempty.signal(); 如果有線程在take()的時候await住了,那么就會被通知到,可以繼續進行操作
說一說java的concurrent包10–Condition和BoundedBuffer的測試代碼
前面一篇說了Condition和BoundedBuffer的基本代碼,下面寫一個簡單的程序測試下這個BoundedBuffer;?
前面一篇說了Condition和BoundedBuffer的基本代碼,下面寫一個簡單的程序測試下這個BoundedBuffer;?這段程序的目的是測試先put()后take()的操作,?
1. 我將BoundedBuffer的大小設置成5,同時在每次進入notFull和notEmpty的await()的時候打印一下表示當前線程正在等待;?
2. 先開啟10個線程做put()操作,預計有5個線程可以完成,另外5個會進入等待?
3. 主線程sleep10秒中,然后啟動10個線程做take()操作;?
這個時候,首先第一個take()必然成功完成,在這之前等待的5個put()線程都不會被喚醒, 接下來的事情就不好說了;?
剩下的5個put()線程和9個take()線程中的任何一個都可能會被jvm調度;?
比如可能出現?
a. 開始take()的時候,有5個連續的take()線程完成操作; 然后又進入put()和take()交替的情況?
b. 第一個take()之后,立刻會有一個put()線程被notFull().signal()喚醒; 然后繼續有take()和put()交替的情況;?
其中take()線程也可能進入notEmpty.await()操作;?
但是任何時候,未完成的take()線程始終>=未完成的put()線程, 這個也是很自然的;?
Java代碼??
package?com.hetaoblog.concurrent.test;??import?java.util.Date;??import?java.util.concurrent.CountDownLatch;??import?java.util.concurrent.locks.Condition;??import?java.util.concurrent.locks.Lock;??import?java.util.concurrent.locks.ReentrantLock;??import?org.junit.Test;??public?class?BoundedBufferTest?{??@Test??public?void?testPutTake()??{??final?BoundedBuffer?bb?=?new?BoundedBuffer();??int?count?=?10;??final?CountDownLatch?c?=?new?CountDownLatch(count?*?2);??System.out.println(new?Date()?+?"?now?try?to?call?put?for?"?+?count?);??for(int?i?=?0;?i?<?count?;?++i)??{??final?int?index?=?i;??try?{??Thread?t?=?new?Thread(new?Runnable()?{??@Override??public?void?run()?{??try?{??bb.put(index);??System.out.println(new?Date()?+?"??put?finished:??"?+?index);??}?catch?(InterruptedException?e)?{??e.printStackTrace();??}??c.countDown();??}??});??t.start();??}?catch?(Exception?e)?{??e.printStackTrace();??}??}??try?{??System.out.println(new?Date()?+?"?main?thread?is?going?to?sleep?for?10?seconds");??Thread.sleep(10?*?1000);??}?catch?(InterruptedException?e1)?{??e1.printStackTrace();??}??System.out.println(new?Date()?+?"?now?try?to?take?for?count:?"?+?count);??for(int?i?=0;?i?<?count;?++i)??{??Thread?t=?new?Thread(new?Runnable()?{??@Override??public?void?run()?{??try?{??Object?o?=?bb.take();??System.out.println(new?Date()?+?"?take?get:?"?+?o);??}?catch?(InterruptedException?e)?{??e.printStackTrace();??}??c.countDown();??}??});??t.start();??}??try?{??System.out.println(new?Date()?+?":?main?thread?is?to?wait?for?all?threads");??c.await();??}?catch?(InterruptedException?e)?{??e.printStackTrace();??}??System.out.println(new?Date()?+?"?all?threads?finished");??}??}??class?BoundedBuffer?{??final?Lock?lock?=?new?ReentrantLock();??final?Condition?notFull??=?lock.newCondition();???final?Condition?notEmpty?=?lock.newCondition();???final?Object[]?items?=?new?Object[5];??int?putptr,?takeptr,?count;??public?void?put(Object?x)?throws?InterruptedException?{??lock.lock();??try?{??while?(count?==?items.length)???{??System.out.println(new?Date()?+?"?put??is?to?wait....");??notFull.await();??}??items[putptr]?=?x;???if?(++putptr?==?items.length)?putptr?=?0;??++count;??notEmpty.signal();??}?finally?{??lock.unlock();??}??}??public?Object?take()?throws?InterruptedException?{??lock.lock();??try?{??while?(count?==?0)??{??System.out.println(new?Date()?+?"?take?is?going?to?wait..");??notEmpty.await();??}??Object?x?=?items[takeptr];???if?(++takeptr?==?items.length)?takeptr?=?0;??--count;??notFull.signal();??return?x;??}?finally?{??lock.unlock();??}??}???}下面是這段程序在我機器上的運行結果:?這是其中一個執行結果,正好對應前面說的情況a, 5個take()先完成;這里出現了take()線程調用notEmpty.await()的情況?
Thu Mar 15 21:15:13 CST 2012 now try to call put for 10?
Thu Mar 15 21:15:13 CST 2012 put finished: 0?
Thu Mar 15 21:15:13 CST 2012 put finished: 2?
Thu Mar 15 21:15:13 CST 2012 put finished: 3?
Thu Mar 15 21:15:13 CST 2012 put finished: 1?
Thu Mar 15 21:15:13 CST 2012 main thread is going to sleep for 10 seconds?
Thu Mar 15 21:15:13 CST 2012 put finished: 4?
Thu Mar 15 21:15:13 CST 2012 put is to wait....?
Thu Mar 15 21:15:13 CST 2012 put is to wait....?
Thu Mar 15 21:15:13 CST 2012 put is to wait....?
Thu Mar 15 21:15:13 CST 2012 put is to wait....?
Thu Mar 15 21:15:13 CST 2012 put is to wait....?
Thu Mar 15 21:15:23 CST 2012 now try to take for count: 10?
Thu Mar 15 21:15:23 CST 2012 take get: 3?
Thu Mar 15 21:15:23 CST 2012 take get: 2?
Thu Mar 15 21:15:23 CST 2012 take get: 1?
Thu Mar 15 21:15:23 CST 2012 take get: 0?
Thu Mar 15 21:15:23 CST 2012 take get: 4?
Thu Mar 15 21:15:23 CST 2012 put finished: 5?
Thu Mar 15 21:15:23 CST 2012: main thread is to wait for all threads?
Thu Mar 15 21:15:23 CST 2012 take is going to wait..?
Thu Mar 15 21:15:23 CST 2012 take get: 5?
Thu Mar 15 21:15:23 CST 2012 put finished: 6?
Thu Mar 15 21:15:23 CST 2012 put finished: 8?
Thu Mar 15 21:15:23 CST 2012 put finished: 7?
Thu Mar 15 21:15:23 CST 2012 put finished: 9?
Thu Mar 15 21:15:23 CST 2012 take get: 6?
Thu Mar 15 21:15:23 CST 2012 take get: 7?
Thu Mar 15 21:15:23 CST 2012 take get: 8?
Thu Mar 15 21:15:23 CST 2012 take get: 9?
Thu Mar 15 21:15:23 CST 2012 all threads finished?
這是另一個執行結果:?
Thu Mar 15 21:02:49 CST 2012 now try to call put for 10?
Thu Mar 15 21:02:49 CST 2012 put finished: 3?
Thu Mar 15 21:02:49 CST 2012 put finished: 1?
Thu Mar 15 21:02:49 CST 2012 put finished: 0?
Thu Mar 15 21:02:49 CST 2012 put finished: 2?
Thu Mar 15 21:02:49 CST 2012 put finished: 4?
Thu Mar 15 21:02:49 CST 2012 put is to wait....?
Thu Mar 15 21:02:49 CST 2012 put is to wait....?
Thu Mar 15 21:02:49 CST 2012 put is to wait....?
Thu Mar 15 21:02:49 CST 2012 main thread is going to sleep for 10 seconds?
Thu Mar 15 21:02:49 CST 2012 put is to wait....?
Thu Mar 15 21:02:49 CST 2012 put is to wait....?
Thu Mar 15 21:02:59 CST 2012 now try to take for count: 10?
Thu Mar 15 21:02:59 CST 2012 take get: 1?
Thu Mar 15 21:02:59 CST 2012 take get: 0?
Thu Mar 15 21:02:59 CST 2012 take get: 3?
Thu Mar 15 21:02:59 CST 2012 take get: 4?
Thu Mar 15 21:02:59 CST 2012: main thread is to wait for all threads?
Thu Mar 15 21:02:59 CST 2012 take is going to wait..?
Thu Mar 15 21:02:59 CST 2012 take is going to wait..?
Thu Mar 15 21:02:59 CST 2012 put finished: 5?
Thu Mar 15 21:02:59 CST 2012 take get: 2?
Thu Mar 15 21:02:59 CST 2012 take get: 5?
Thu Mar 15 21:02:59 CST 2012 take is going to wait..?
Thu Mar 15 21:02:59 CST 2012 take is going to wait..?
Thu Mar 15 21:02:59 CST 2012 put finished: 7?
Thu Mar 15 21:02:59 CST 2012 put finished: 6?
Thu Mar 15 21:02:59 CST 2012 put finished: 8?
Thu Mar 15 21:02:59 CST 2012 put finished: 9?
Thu Mar 15 21:02:59 CST 2012 take get: 7?
Thu Mar 15 21:02:59 CST 2012 take get: 6?
Thu Mar 15 21:02:59 CST 2012 take get: 8?
Thu Mar 15 21:02:59 CST 2012 take get: 9?
Thu Mar 15 21:02:59 CST 2012 all threads finished?
執行結果2:?
Thu Mar 15 21:14:30 CST 2012 now try to call put for 10?
Thu Mar 15 21:14:30 CST 2012 main thread is going to sleep for 10 seconds?
Thu Mar 15 21:14:30 CST 2012 put finished: 8?
Thu Mar 15 21:14:30 CST 2012 put finished: 6?
Thu Mar 15 21:14:30 CST 2012 put finished: 2?
Thu Mar 15 21:14:30 CST 2012 put finished: 0?
Thu Mar 15 21:14:30 CST 2012 put finished: 4?
Thu Mar 15 21:14:30 CST 2012 put is to wait....?
Thu Mar 15 21:14:30 CST 2012 put is to wait....?
Thu Mar 15 21:14:30 CST 2012 put is to wait....?
Thu Mar 15 21:14:30 CST 2012 put is to wait....?
Thu Mar 15 21:14:30 CST 2012 put is to wait....?
Thu Mar 15 21:14:40 CST 2012 now try to take for count: 10?
Thu Mar 15 21:14:40 CST 2012 take get: 8?
Thu Mar 15 21:14:40 CST 2012 take get: 6?
Thu Mar 15 21:14:40 CST 2012 take get: 4?
Thu Mar 15 21:14:40 CST 2012 take get: 2?
Thu Mar 15 21:14:40 CST 2012: main thread is to wait for all threads?
Thu Mar 15 21:14:40 CST 2012 take get: 0?
Thu Mar 15 21:14:40 CST 2012 take is going to wait..?
Thu Mar 15 21:14:40 CST 2012 take is going to wait..?
Thu Mar 15 21:14:40 CST 2012 take is going to wait..?
Thu Mar 15 21:14:40 CST 2012 put finished: 1?
Thu Mar 15 21:14:40 CST 2012 put finished: 5?
Thu Mar 15 21:14:40 CST 2012 put finished: 3?
Thu Mar 15 21:14:40 CST 2012 put finished: 9?
Thu Mar 15 21:14:40 CST 2012 take get: 1?
Thu Mar 15 21:14:40 CST 2012 put finished: 7?
Thu Mar 15 21:14:40 CST 2012 take get: 5?
Thu Mar 15 21:14:40 CST 2012 take get: 3?
Thu Mar 15 21:14:40 CST 2012 take get: 7?
Thu Mar 15 21:14:40 CST 2012 take get: 9?
Thu Mar 15 21:14:40 CST 2012 all threads finished?
在幾次不同的執行中,始終可以觀察到任何時候,未完成的take()線程數>= 未完成的put()線程; 在未完成的線程數相等的情況下,即使jvm首先調度到了take()線程,也會進入notEmpty.await()釋放鎖,進入等待
說一說java的concurrent包11–Condition和BoundedBuffer的測試代碼2?
前面一篇說了一個Condition和BoundedBuffer的測試代碼例子,前面測試的是先put()再take()的操作,這篇說一下先take()再put()的操作;?
前面一篇說了一個Condition和BoundedBuffer的測試代碼例子,前面測試的是先put()再take()的操作,這篇說一下先take()再put()的操作;?當然,必須先要說明的是,這篇和前面這篇在打印日志的時候其實是有錯誤的,這個錯誤在前面一篇并不明顯,不會導致明顯的問題;?
但是同樣的原因導致現在這個先take()再put()的操作會出現明顯的錯誤,看上去會顯得不可思議;?
具體情況留到下一篇詳細說明,這里先上測試目的,測試代碼和運行結果;?
同時說明多線程編程需要非常謹慎,否則極易出錯?
測試目的:?
1. 我將BoundedBuffer的大小設置成5,同時在每次進入notFull和notEmpty的await()的時候打印一下表示當前線程正在等待;?
2. 先開啟10個線程做take()操作,由于開始BoundedBuffer里面沒有東西,所以10個線程全部調用await進入等待?
3. 主線程sleep10秒中,然后啟動10個線程做put()操作;?
在第一個put()完成之后,接下來應該會有部分put()線程和take()線程先后完成;?
理論上,?
a. 任何一個元素的put()都會發生在take()之前;?
b. 如果X表示某個操作成功的次數,在X(put)-X(take)<5的時候,put線程不會進入等待狀態?
下面是測試代碼:?
Java代碼??
????@Test??public?void?testTakePut()??{??final?BoundedBuffer?bb?=?new?BoundedBuffer();??int?count?=?10;??final?CountDownLatch?c?=?new?CountDownLatch(count?*?2);??System.out.println(new?Date()?+?"?first?try?to?call?take?for?count:?"?+?count);??for(int?i?=0;?i?<?count;?++i)??{??final?int?index?=?i;??Thread?t=?new?Thread(new?Runnable()?{??@Override??public?void?run()?{??try?{??Thread.currentThread().setName("?TAKE?"?+?index);??Object?o?=?bb.take();??System.out.println(new?Date()?+?"?"?+?"?take?get:?"?+?o?);??}?catch?(InterruptedException?e)?{??e.printStackTrace();??}??c.countDown();??}??});??t.start();??}??try?{??System.out.println(new?Date()?+?"?main?thread?is?going?to?sleep?for?10?seconds");??Thread.sleep(10?*?1000);??}?catch?(InterruptedException?e1)?{??e1.printStackTrace();??}??System.out.println(new?Date()?+?"?now?try?to?call?put?for?"?+?count?);??for(int?i?=?0;?i?<?count?;?++i)??{??final?int?index?=?i;??try?{??Thread?t?=?new?Thread(new?Runnable()?{??@Override??public?void?run()?{??Thread.currentThread().setName("?PUT?"?+?index);??try?{??bb.put(index);??System.out.println(new?Date()?+?"?"?+?"??put?finished:??"?+?index?);??}?catch?(InterruptedException?e)?{??e.printStackTrace();??}??c.countDown();??}??});??t.start();??}?catch?(Exception?e)?{??e.printStackTrace();??}??}??try?{??System.out.println(new?Date()?+?":?main?thread?is?to?wait?for?all?threads");??c.await();??}?catch?(InterruptedException?e)?{??e.printStackTrace();??}??System.out.println(new?Date()?+?"?all?threads?finished");??}??class?BoundedBuffer?{??final?Lock?lock?=?new?ReentrantLock();??final?Condition?notFull??=?lock.newCondition();???final?Condition?notEmpty?=?lock.newCondition();???final?Object[]?items?=?new?Object[5];??int?putptr,?takeptr,?count;??public?void?put(Object?x)?throws?InterruptedException?{??lock.lock();??try?{??while?(count?==?items.length)???{??System.out.println(new?Date()?+?"?"?+?Thread.currentThread().getName()?+?"?put??is?to?wait....:?"?+?System.currentTimeMillis());????notFull.await();??}??items[putptr]?=?x;???if?(++putptr?==?items.length)?putptr?=?0;??++count;??notEmpty.signal();??}?finally?{??lock.unlock();??}??}??public?Object?take()?throws?InterruptedException?{??lock.lock();??try?{??while?(count?==?0)??{??System.out.println(new?Date()?+?"?"?+?Thread.currentThread().getName()?+?"?take?is?going?to?wait..?"?+?System.currentTimeMillis());????notEmpty.await();??}??Object?x?=?items[takeptr];???if?(++takeptr?==?items.length)?takeptr?=?0;??--count;??notFull.signal();??return?x;??}?finally?{??lock.unlock();??}??}???}運行結果1:?Fri Mar 16 20:50:10 CST 2012 first try to call take for count: 10?
Fri Mar 16 20:50:10 CST 2012 TAKE 0 take is going to wait..?
Fri Mar 16 20:50:10 CST 2012 TAKE 1 take is going to wait..?
Fri Mar 16 20:50:10 CST 2012 TAKE 2 take is going to wait..?
Fri Mar 16 20:50:10 CST 2012 TAKE 3 take is going to wait..?
Fri Mar 16 20:50:10 CST 2012 TAKE 5 take is going to wait..?
Fri Mar 16 20:50:10 CST 2012 main thread is going to sleep for 10 seconds?
Fri Mar 16 20:50:10 CST 2012 TAKE 4 take is going to wait..?
Fri Mar 16 20:50:10 CST 2012 TAKE 7 take is going to wait..?
Fri Mar 16 20:50:10 CST 2012 TAKE 6 take is going to wait..?
Fri Mar 16 20:50:10 CST 2012 TAKE 9 take is going to wait..?
Fri Mar 16 20:50:10 CST 2012 TAKE 8 take is going to wait..?
Fri Mar 16 20:50:20 CST 2012 now try to call put for 10?
Fri Mar 16 20:50:20 CST 2012: main thread is to wait for all threads?
Fri Mar 16 20:50:20 CST 2012 PUT 7 put finished: 7?
Fri Mar 16 20:50:20 CST 2012 PUT 9 put finished: 9?
Fri Mar 16 20:50:20 CST 2012 PUT 8 put finished: 8?
Fri Mar 16 20:50:20 CST 2012 PUT 3 put is to wait....?
Fri Mar 16 20:50:20 CST 2012 PUT 1 put is to wait....?
Fri Mar 16 20:50:20 CST 2012 PUT 5 put finished: 5?
Fri Mar 16 20:50:20 CST 2012 PUT 4 put is to wait....?
Fri Mar 16 20:50:20 CST 2012 TAKE 0 take get: 8?
Fri Mar 16 20:50:20 CST 2012 TAKE 2 take get: 9?
Fri Mar 16 20:50:20 CST 2012 TAKE 3 take get: 0?
Fri Mar 16 20:50:20 CST 2012 TAKE 5 take get: 6?
Fri Mar 16 20:50:20 CST 2012 TAKE 4 take get: 5?
Fri Mar 16 20:50:20 CST 2012 PUT 2 put finished: 2?
Fri Mar 16 20:50:20 CST 2012 PUT 3 put finished: 3?
Fri Mar 16 20:50:20 CST 2012 PUT 1 put finished: 1?
Fri Mar 16 20:50:20 CST 2012 TAKE 7 take get: 2?
Fri Mar 16 20:50:20 CST 2012 TAKE 6 take get: 3?
Fri Mar 16 20:50:20 CST 2012 TAKE 9 take get: 1?
Fri Mar 16 20:50:20 CST 2012 TAKE 8 take get: 4?
Fri Mar 16 20:50:20 CST 2012 PUT 6 put finished: 6?
Fri Mar 16 20:50:20 CST 2012 PUT 0 put finished: 0?
Fri Mar 16 20:50:20 CST 2012 PUT 4 put finished: 4?
Fri Mar 16 20:50:20 CST 2012 TAKE 1 take get: 7?
Fri Mar 16 20:50:20 CST 2012 all threads finished?
注意到紅色部分:?
第一個加為紅色是因為按照打印結果,put()只完成了3次,就開始有put()進入等待了,而BoundedBuffer的大小是5,理論上應該沒有滿的!?
第二個加為紅色是因為元素4竟然先被take,然后再被put!?顯然程序有地方出錯了!具體原因分析,歡迎關注核桃博客:)
轉載于:https://my.oschina.net/u/1185331/blog/502350
總結
以上是生活随笔為你收集整理的java concurrent包介绍及使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 孕妇梦到被猫咬了手是怎么回事
- 下一篇: 新RSS reader