http://janeky.iteye.com/category/124727
java多線程學習-java.util.concurrent詳解(一) Latch/Barrier 多線程Java編程threadJDK
Java1.5提供了一個非常高效實用的多線程包:java.util.concurrent, 提供了大量高級工具,可以幫助開發者編寫高效、易維護、結構清晰的Java多線程程序。從這篇blog起,我將跟大家一起共同學習這些新的Java多線程構件
1. CountDownLatch
??? 我們先來學習一下JDK1.5 API中關于這個類的詳細介紹:
“一個同步輔助類,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待。 用給定的計數 初始化 CountDownLatch。由于調用了 countDown() 方法,所以在當前計數到達零之前,await 方法會一直受阻塞。之后,會釋放所有等待的線程,await 的所有后續調用都將立即返回。這種現象只出現一次——計數無法被重置。如果需要重置計數,請考慮使用 CyclicBarrier?!?
??? 這就是說,CountDownLatch可以用來管理一組相關的線程執行,只需在主線程中調用CountDownLatch 的await方法(一直阻塞),讓各個線程調用countDown方法。當所有的線程都只需完countDown了,await也順利返回,不再阻塞了。在這樣情況下尤其適用:將一個任務分成若干線程執行,等到所有線程執行完,再進行匯總處理。
??? 下面我舉一個非常簡單的例子。
假設我們要打印1-100,最后再輸出“Ok“。1-100的打印順序不要求統一,只需保證“Ok“是在最后出現即可。 ??? 解決方案:我們定義一個CountDownLatch,然后開10個線程分別打印(n-1)*10+1至(n-1)*10+10。主線程中調用await方法等待所有線程的執行完畢,每個線程執行完畢后都調用countDown方法。最后再await返回后打印“Ok”。
具體代碼如下(本代碼參考了JDK示例代碼):
Java代碼 ?
import ?java.util.concurrent.CountDownLatch; ??? ? ? ? ?? public ?class ?TestCountDownLatch?{ ??????private ?static ?final ?int ?N?=?10 ; ?? ?? ????public ?static ?void ?main(String[]?args)?throws ?InterruptedException?{ ?? ????????CountDownLatch?doneSignal?=?new ?CountDownLatch(N); ?? ????????CountDownLatch?startSignal?=?new ?CountDownLatch(1 );?? ?? ????????for ?(int ?i?=?1 ;?i?<=?N;?i++)?{ ?? ????????????new ?Thread(new ?Worker(i,?doneSignal,?startSignal)).start();?? ????????} ?? ????????System.out.println("begin------------" ); ?? ????????startSignal.countDown(); ?? ????????doneSignal.await(); ?? ????????System.out.println("Ok" ); ?? ?? ????} ?? ?? ????static ?class ?Worker?implements ?Runnable?{ ?? ????????private ?final ?CountDownLatch?doneSignal; ?? ????????private ?final ?CountDownLatch?startSignal; ?? ????????private ?int ?beginIndex; ?? ?? ????????Worker(int ?beginIndex,?CountDownLatch?doneSignal, ?? ????????????????CountDownLatch?startSignal)?{ ?? ????????????this .startSignal?=?startSignal; ?? ????????????this .beginIndex?=?beginIndex; ?? ????????????this .doneSignal?=?doneSignal; ?? ????????} ?? ?? ????????public ?void ?run()?{ ?? ????????????try ?{ ?? ????????????????startSignal.await();??? ????????????????beginIndex?=?(beginIndex?-?1 )?*?10 ?+?1 ; ?? ????????????????for ?(int ?i?=?beginIndex;?i?<=?beginIndex?+?10 ;?i++)?{ ?? ????????????????????System.out.println(i); ?? ????????????????} ?? ????????????}?catch ?(InterruptedException?e)?{ ?? ????????????????e.printStackTrace(); ?? ????????????}?finally ?{ ?? ????????????????doneSignal.countDown(); ?? ????????????} ?? ????????} ?? ????} ?? }?? import java.util.concurrent.CountDownLatch;
/*** 示例:CountDownLatch的使用舉例* Mail: ken@iamcoding.com* @author janeky*/
public class TestCountDownLatch {private static final int N = 10;public static void main(String[] args) throws InterruptedException {CountDownLatch doneSignal = new CountDownLatch(N);CountDownLatch startSignal = new CountDownLatch(1);//開始執行信號for (int i = 1; i <= N; i++) {new Thread(new Worker(i, doneSignal, startSignal)).start();//線程啟動了}System.out.println("begin------------");startSignal.countDown();//開始執行啦doneSignal.await();//等待所有的線程執行完畢System.out.println("Ok");}static class Worker implements Runnable {private final CountDownLatch doneSignal;private final CountDownLatch startSignal;private int beginIndex;Worker(int beginIndex, CountDownLatch doneSignal,CountDownLatch startSignal) {this.startSignal = startSignal;this.beginIndex = beginIndex;this.doneSignal = doneSignal;}public void run() {try {startSignal.await(); //等待開始執行信號的發布beginIndex = (beginIndex - 1) * 10 + 1;for (int i = beginIndex; i <= beginIndex + 10; i++) {System.out.println(i);}} catch (InterruptedException e) {e.printStackTrace();} finally {doneSignal.countDown();}}}
}
??? 總結:CounDownLatch對于管理一組相關線程非常有用。上述示例代碼中就形象地描述了兩種使用情況。第一種是計算器為1,代表了兩種狀態,開關。第二種是計數器為N,代表等待N個操作完成。今后我們在編寫多線程程序時,可以使用這個構件來管理一組獨立線程的執行。
2. CyclicBarrier
??? 我們先來學習一下JDK1.5 API中關于這個類的詳細介紹:
??? “一個同步輔助類,它允許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier 很有用。因為該 barrier 在釋放等待線程后可以重用,所以稱它為循環 的 barrier。
??? CyclicBarrier 支持一個可選的 Runnable 命令,在一組線程中的最后一個線程到達之后(但在釋放所有線程之前),該命令只在每個屏障點運行一次。若在繼續所有參與線程之前更新共享狀態,此屏障操作 很有用。
??? 我們在學習CountDownLatch的時候就提到了CyclicBarrier。兩者究竟有什么聯系呢?引用[JCIP]中的描述“The key difference is that with a barrier, all the threads must come together at a barrier point at the same time in order to proceed. Latches are for waiting for events; barriers are for waiting for other threads。CyclicBarrier等待所有的線程一起完成后再執行某個動作。這個功能CountDownLatch也同樣可以實現。但是CountDownLatch更多時候是在等待某個事件的發生。在CyclicBarrier中,所有的線程調用await方法,等待其他線程都執行完。
??? 舉一個很簡單的例子,
今天晚上我們哥們4個去Happy。就互相通知了一下:晚上八點準時到xx酒吧門前集合,不見不散!。有個哥們住的近,早早就到了。有的事務繁忙,剛好踩點到了。無論怎樣,先來的都不能獨自行動,只能等待所有人 代碼如下(參考了網上給的一些教程)
Java代碼 ?
import ?java.util.Random; ??import ?java.util.concurrent.BrokenBarrierException; ??import ?java.util.concurrent.CyclicBarrier; ??import ?java.util.concurrent.ExecutorService; ??import ?java.util.concurrent.Executors; ???? public ?class ?TestCyclicBarrier?{ ???? ????public ?static ?void ?main(String[]?args)?{ ?? ???? ?? ????????ExecutorService?exec?=?Executors.newCachedThreadPool();????? ?? ????????final ?Random?random=new ?Random(); ?? ???????? ?? ????????final ?CyclicBarrier?barrier=new ?CyclicBarrier(4 ,new ?Runnable(){ ?? ????????????@Override ?? ????????????public ?void ?run()?{ ?? ????????????????System.out.println("大家都到齊了,開始happy去" ); ?? ????????????}}); ?? ???????? ?? ????????for (int ?i=0 ;i<4 ;i++){ ?? ????????????exec.execute(new ?Runnable(){ ?? ????????????????@Override ?? ????????????????public ?void ?run()?{ ?? ????????????????????try ?{ ?? ????????????????????????Thread.sleep(random.nextInt(1000 )); ?? ????????????????????}?catch ?(InterruptedException?e)?{ ?? ????????????????????????e.printStackTrace(); ?? ????????????????????} ?? ????????????????????System.out.println(Thread.currentThread().getName()+"到了,其他哥們呢" ); ?? ????????????????????try ?{ ?? ????????????????????????barrier.await();?? ????????????????????}?catch ?(InterruptedException?e)?{ ?? ????????????????????????e.printStackTrace(); ?? ????????????????????}?catch ?(BrokenBarrierException?e)?{ ?? ????????????????????????e.printStackTrace(); ?? ????????????????????} ?? ????????????????}}); ?? ????????} ?? ????????exec.shutdown(); ?? ????} ?? ?? }?? import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class TestCyclicBarrier {public static void main(String[] args) {ExecutorService exec = Executors.newCachedThreadPool(); final Random random=new Random();final CyclicBarrier barrier=new CyclicBarrier(4,new Runnable(){@Overridepublic void run() {System.out.println("大家都到齊了,開始happy去");}});for(int i=0;i<4;i++){exec.execute(new Runnable(){@Overridepublic void run() {try {Thread.sleep(random.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName()+"到了,其他哥們呢");try {barrier.await();//等待其他哥們} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}});}exec.shutdown();}}
??? 關于await方法要特別注意一下,它有可能在阻塞的過程中由于某些原因被中斷
??? 總結:CyclicBarrier就是一個柵欄,等待所有線程到達后再執行相關的操作。barrier 在釋放等待線程后可以重用。
更多的Java編程資料,歡迎訪問我的blog:http://janeky.iteye.com,希望能夠與你有更多的交流
未完待續
java多線程學習-java.util.concurrent詳解(二)Semaphore/FutureTask/Exchanger Java多線程Exchangethread算法
前一篇文章 http://janeky.iteye.com/category/124727
我們學習了java.util.concurrent的CountDownLatch和CyclicBarrier
今天我們繼續共同來探討其他的多線程組件
-----------------------------------------------------------------------------
3. Semaphore
??? 我們先來學習一下JDK1.5 API中關于這個類的詳細介紹:
“一個計數信號量。從概念上講,信號量維護了一個許可集。如有必要,在許可可用前會阻塞每一個 acquire(),然后再獲取該許可。每個 release() 添加一個許可,從而可能釋放一個正在阻塞的獲取者。但是,不使用實際的許可對象,Semaphore 只對可用許可的號碼進行計數,并采取相應的行動?!?
??? 我們一般用它來控制某個對象的線程訪問對象
??? 例如
,對于某個容器,我們規定,最多只能容納n個線程同時操作 使用信號量來模擬實現 具體代碼如下(參考 [JCIP])
Java代碼 ?
import ?java.util.Collections; ??import ?java.util.HashSet; ??import ?java.util.Set; ??import ?java.util.concurrent.ExecutorService; ??import ?java.util.concurrent.Executors; ??import ?java.util.concurrent.Semaphore; ???? public ?class ?TestSemaphore?{ ???? ????public ?static ?void ?main(String[]?args)?{ ?? ????????ExecutorService?exec?=?Executors.newCachedThreadPool(); ?? ????????TestSemaphore?t?=?new ?TestSemaphore(); ?? ????????final ?BoundedHashSet<String>?set?=?t.getSet(); ?? ?? ????????for ?(int ?i?=?0 ;?i?<?3 ;?i++)?{?? ????????????exec.execute(new ?Runnable()?{ ?? ????????????????public ?void ?run()?{ ?? ????????????????????try ?{ ?? ????????????????????????set.add(Thread.currentThread().getName()); ?? ????????????????????}?catch ?(InterruptedException?e)?{ ?? ????????????????????????e.printStackTrace(); ?? ????????????????????} ?? ????????????????} ?? ????????????}); ?? ????????} ?? ?? ????????for ?(int ?j?=?0 ;?j?<?3 ;?j++)?{?? ????????????exec.execute(new ?Runnable()?{ ?? ????????????????public ?void ?run()?{ ?? ????????????????????set.remove(Thread.currentThread().getName()); ?? ????????????????} ?? ????????????}); ?? ????????} ?? ????????exec.shutdown(); ?? ????} ?? ?? ????public ?BoundedHashSet<String>?getSet()?{ ?? ????????return ?new ?BoundedHashSet<String>(2 );?? ????} ?? ?? ????class ?BoundedHashSet<T>?{ ?? ????????private ?final ?Set<T>?set; ?? ????????private ?final ?Semaphore?semaphore; ?? ?? ????????public ?BoundedHashSet(int ?bound)?{ ?? ????????????this .set?=?Collections.synchronizedSet(new ?HashSet<T>()); ?? ????????????this .semaphore?=?new ?Semaphore(bound,?true ); ?? ????????} ?? ?? ????????public ?void ?add(T?o)?throws ?InterruptedException?{ ?? ????????????semaphore.acquire();?? ????????????set.add(o); ?? ????????????System.out.printf("add:%s%n" ,o); ?? ????????} ?? ?? ????????public ?void ?remove(T?o)?{ ?? ????????????if ?(set.remove(o)) ?? ????????????????semaphore.release();?? ????????????System.out.printf("remove:%s%n" ,o); ?? ????????} ?? ????} ?? }?? import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;public class TestSemaphore {public static void main(String[] args) {ExecutorService exec = Executors.newCachedThreadPool();TestSemaphore t = new TestSemaphore();final BoundedHashSet<String> set = t.getSet();for (int i = 0; i < 3; i++) {//三個線程同時操作addexec.execute(new Runnable() {public void run() {try {set.add(Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}}});}for (int j = 0; j < 3; j++) {//三個線程同時操作removeexec.execute(new Runnable() {public void run() {set.remove(Thread.currentThread().getName());}});}exec.shutdown();}public BoundedHashSet<String> getSet() {return new BoundedHashSet<String>(2);//定義一個邊界約束為2的線程}class BoundedHashSet<T> {private final Set<T> set;private final Semaphore semaphore;public BoundedHashSet(int bound) {this.set = Collections.synchronizedSet(new HashSet<T>());this.semaphore = new Semaphore(bound, true);}public void add(T o) throws InterruptedException {semaphore.acquire();//信號量控制可訪問的線程數目set.add(o);System.out.printf("add:%s%n",o);}public void remove(T o) {if (set.remove(o))semaphore.release();//釋放掉信號量System.out.printf("remove:%s%n",o);}}
}
??? 總結:Semaphore通常用于對象池的控制
4.FutureTask
??? 我們先來學習一下JDK1.5 API中關于這個類的詳細介紹:
??? “取消的異步計算。利用開始和取消計算的方法、查詢計算是否完成的方法和獲取計算結果的方法,此類提供了對 Future 的基本實現。僅在計算完成時才能獲取結果;如果計算尚未完成,則阻塞 get 方法。一旦計算完成,就不能再重新開始或取消計算。
可使用 FutureTask 包裝 Callable 或 Runnable 對象。因為 FutureTask 實現了 Runnable,所以可將 FutureTask 提交給 Executor 執行。
除了作為一個獨立的類外,此類還提供了 protected 功能,這在創建自定義任務類時可能很有用。 “
??? 應用舉例:
我們的算法中有一個很耗時的操作,在編程的是,我們希望將它獨立成一個模塊,調用的時候當做它是立刻返回的,并且可以隨時取消的 具體代碼如下(參考 [JCIP])
Java代碼 ?
import ?java.util.concurrent.Callable; ??import ?java.util.concurrent.ExecutionException; ??import ?java.util.concurrent.ExecutorService; ??import ?java.util.concurrent.Executors; ??import ?java.util.concurrent.FutureTask; ???? public ?class ?TestFutureTask?{ ???? ????public ?static ?void ?main(String[]?args)?{ ?? ????????ExecutorService?exec=Executors.newCachedThreadPool(); ?? ???????? ?? ????????FutureTask<String>?task=new ?FutureTask<String>(new ?Callable<String>(){?? ????????????@Override ?? ????????????public ?String?call()?throws ?Exception?{ ?? ????????????????return ?Thread.currentThread().getName();?? ????????????}}); ?? ???????????? ?? ????????????try ?{ ?? ????????????????exec.execute(task);?? ????????????????String?result=task.get();?? ????????????????System.out.printf("get:%s%n" ,result); ?? ????????????}?catch ?(InterruptedException?e)?{ ?? ????????????????e.printStackTrace(); ?? ????????????}?catch ?(ExecutionException?e)?{ ?? ????????????????e.printStackTrace(); ?? ????????????} ?? ????} ?? ?? }?? import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;public class TestFutureTask {public static void main(String[] args) {ExecutorService exec=Executors.newCachedThreadPool();FutureTask<String> task=new FutureTask<String>(new Callable<String>(){//FutrueTask的構造參數是一個Callable接口@Overridepublic String call() throws Exception {return Thread.currentThread().getName();//這里可以是一個異步操作}});try {exec.execute(task);//FutureTask實際上也是一個線程String result=task.get();//取得異步計算的結果,如果沒有返回,就會一直阻塞等待System.out.printf("get:%s%n",result);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}}
??? 總結:FutureTask其實就是新建了一個線程單獨執行,使得線程有一個返回值,方便程序的編寫
5. Exchanger
??? 我們先來學習一下JDK1.5 API中關于這個類的詳細介紹:
??? “可以在pair中對元素進行配對和交換的線程的同步點。每個線程將條目上的某個方法呈現給 exchange 方法,與伙伴線程進行匹配,并且在返回時接收其伙伴的對象。Exchanger 可能被視為 SynchronousQueue 的雙向形式。Exchanger 可能在應用程序(比如遺傳算法和管道設計)中很有用。 “
??? 應用舉例:
有兩個緩存區,兩個線程分別向兩個緩存區fill和take,當且僅當一個滿了,兩個緩存區交換 ??? 代碼如下(參考了網上給的示例?? http://hi.baidu.com/webidea/blog/item/2995e731e53ad5a55fdf0e7d.html)
Java代碼 ?
import ?java.util.ArrayList; ??import ?java.util.concurrent.Exchanger; ???? public ?class ?TestExchanger?{ ???? ????public ?static ?void ?main(String[]?args)?{ ?? ????????final ?Exchanger<ArrayList<Integer>>?exchanger?=?new ?Exchanger<ArrayList<Integer>>(); ?? ????????final ?ArrayList<Integer>?buff1?=?new ?ArrayList<Integer>(10 ); ?? ????????final ?ArrayList<Integer>?buff2?=?new ?ArrayList<Integer>(10 ); ?? ?? ????????new ?Thread(new ?Runnable()?{ ?? ????????????@Override ?? ????????????public ?void ?run()?{ ?? ????????????????ArrayList<Integer>?buff?=?buff1; ?? ????????????????try ?{ ?? ????????????????????while ?(true )?{ ?? ????????????????????????if ?(buff.size()?>=?10 )?{ ?? ????????????????????????????buff?=?exchanger.exchange(buff);?? ????????????????????????????System.out.println("exchange?buff1" ); ?? ????????????????????????????buff.clear(); ?? ????????????????????????} ?? ????????????????????????buff.add((int )(Math.random()*100 )); ?? ????????????????????????Thread.sleep((long )(Math.random()*1000 )); ?? ????????????????????} ?? ????????????????}?catch ?(InterruptedException?e)?{ ?? ????????????????????e.printStackTrace(); ?? ????????????????} ?? ????????????} ?? ????????}).start(); ?? ???????? ?? ????????new ?Thread(new ?Runnable(){ ?? ????????????@Override ?? ????????????public ?void ?run()?{ ?? ????????????????ArrayList<Integer>?buff=buff2; ?? ????????????????while (true ){ ?? ????????????????????try ?{ ?? ????????????????????????for (Integer?i:buff){ ?? ????????????????????????????System.out.println(i); ?? ????????????????????????} ?? ????????????????????????Thread.sleep(1000 ); ?? ????????????????????????buff=exchanger.exchange(buff);?? ????????????????????????System.out.println("exchange?buff2" ); ?? ????????????????????}?catch ?(InterruptedException?e)?{ ?? ????????????????????????e.printStackTrace(); ?? ????????????????????} ?? ????????????????} ?? ????????????}}).start(); ?? ????} ?? }?? import java.util.ArrayList;
import java.util.concurrent.Exchanger;public class TestExchanger {public static void main(String[] args) {final Exchanger<ArrayList<Integer>> exchanger = new Exchanger<ArrayList<Integer>>();final ArrayList<Integer> buff1 = new ArrayList<Integer>(10);final ArrayList<Integer> buff2 = new ArrayList<Integer>(10);new Thread(new Runnable() {@Overridepublic void run() {ArrayList<Integer> buff = buff1;try {while (true) {if (buff.size() >= 10) {buff = exchanger.exchange(buff);//開始跟另外一個線程交互數據System.out.println("exchange buff1");buff.clear();}buff.add((int)(Math.random()*100));Thread.sleep((long)(Math.random()*1000));}} catch (InterruptedException e) {e.printStackTrace();}}}).start();new Thread(new Runnable(){@Overridepublic void run() {ArrayList<Integer> buff=buff2;while(true){try {for(Integer i:buff){System.out.println(i);}Thread.sleep(1000);buff=exchanger.exchange(buff);//開始跟另外一個線程交換數據System.out.println("exchange buff2");} catch (InterruptedException e) {e.printStackTrace();}}}}).start();}
}
??? 總結:Exchanger在特定的使用場景比較有用(兩個伙伴線程之間的數據交互)
----------------------------------------------------------------------------------
更多的java多線程資料,歡迎訪問 http://janeky.iteye.com/category/124727
java多線程學習-java.util.concurrent詳解(三)ScheduledThreadPoolExecutor Java多線程Blog
前一篇blog http://janeky.iteye.com/category/124727我們學習了java多線程的信號量/FutureTask
----------------------------------------------------------------------------------
6. ScheduledThreadPoolExecutor
??? 我們先來學習一下JDK1.5 API中關于這個類的詳細介紹:
??? "可另行安排在給定的延遲后運行命令,或者定期執行命令。需要多個輔助線程時,或者要求 ThreadPoolExecutor 具有額外的靈活性或功能時,此類要優于 Timer。
??? 一旦啟用已延遲的任務就執行它,但是有關何時啟用,啟用后何時執行則沒有任何實時保證。按照提交的先進先出 (FIFO) 順序來啟用那些被安排在同一執行時間的任務。
??? 雖然此類繼承自 ThreadPoolExecutor,但是幾個繼承的調整方法對此類并無作用。特別是,因為它作為一個使用 corePoolSize 線程和一個無界隊列的固定大小的池,所以調整 maximumPoolSize 沒有什么效果。"
??? 在JDK1.5之前,我們關于定時/周期操作都是通過Timer來實現的。但是Timer有以下幾種危險[JCIP]
a. Timer是基于絕對時間的。容易受系統時鐘的影響。
b. Timer只新建了一個線程來執行所有的TimeTask。所有TimeTask可能會相關影響
c. Timer不會捕獲TimerTask的異常,只是簡單地停止。這樣勢必會影響其他TimeTask的執行。
??? 如果你是使用JDK1.5以上版本,建議用ScheduledThreadPoolExecutor代替Timer。它基本上解決了上述問題。它采用相對時間,用線程池來執行TimerTask,會出來TimerTask異常。
??? 下面通過一個簡單的實例來闡述ScheduledThreadPoolExecutor的使用。
??
?
? 我們定期讓定時器拋異常 ??? 我們定期從控制臺打印系統時間 代碼如下(參考了網上的一些代碼,在此表示感謝)
Java代碼 ?
import ?java.util.concurrent.ScheduledThreadPoolExecutor; ??import ?java.util.concurrent.TimeUnit; ???? ?? public ?class ?TestScheduledThreadPoolExecutor?{ ?????? ?? ????public ?static ?void ?main(String[]?args)?{ ?? ????????ScheduledThreadPoolExecutor?exec=new ?ScheduledThreadPoolExecutor(1 ); ?? ???????? ?? ????????exec.scheduleAtFixedRate(new ?Runnable(){?? ????????????@Override ?? ????????????public ?void ?run()?{ ?? ????????????????throw ?new ?RuntimeException(); ?? ????????????}},?1000 ,?5000 ,?TimeUnit.MILLISECONDS); ?? ???????? ?? ????????exec.scheduleAtFixedRate(new ?Runnable(){?? ????????????@Override ?? ????????????public ?void ?run()?{ ?? ????????????????System.out.println(System.nanoTime()); ?? ????????????}},?1000 ,?2000 ,?TimeUnit.MILLISECONDS); ?? ????} ?? ?? }?? import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class TestScheduledThreadPoolExecutor {public static void main(String[] args) {ScheduledThreadPoolExecutor exec=new ScheduledThreadPoolExecutor(1);exec.scheduleAtFixedRate(new Runnable(){//每隔一段時間就觸發異常@Overridepublic void run() {throw new RuntimeException();}}, 1000, 5000, TimeUnit.MILLISECONDS);exec.scheduleAtFixedRate(new Runnable(){//每隔一段時間打印系統時間,證明兩者是互不影響的@Overridepublic void run() {System.out.println(System.nanoTime());}}, 1000, 2000, TimeUnit.MILLISECONDS);}}
總結:是時候把你的定時器換成 ScheduledThreadPoolExecutor了
--------------------------------------------------------------------
更多的java多線程資料,歡迎訪問 http://janeky.iteye.com/category/124727
?
java多線程學習-java.util.concurrent詳解(四) BlockingQueue Java多線程thread生活數據結構
前面一篇blog http://janeky.iteye.com/category/124727
我們主要探討了ScheduledThreadPoolExecutor定時器的使用
---------------------------------------------------------------------------------
7.BlockingQueue
??? “支持兩個附加操作的 Queue,這兩個操作是:獲取元素時等待隊列變為非空,以及存儲元素時等待空間變得可用?!?
??? 這里我們主要討論BlockingQueue的最典型實現:LinkedBlockingQueue 和ArrayBlockingQueue。兩者的不同是底層的數據結構不夠,一個是鏈表,另外一個是數組。
???
??? 后面將要單獨解釋其他類型的BlockingQueue和SynchronousQueue
???
BlockingQueue的經典用途是 生產者-消費者模式 ??? 代碼如下:
Java代碼 ?
import ?java.util.Random; ??import ?java.util.concurrent.BlockingQueue; ??import ?java.util.concurrent.LinkedBlockingQueue; ???? public ?class ?TestBlockingQueue?{ ???? ????public ?static ?void ?main(String[]?args)?{ ?? ????????final ?BlockingQueue<Integer>?queue=new ?LinkedBlockingQueue<Integer>(3 ); ?? ????????final ?Random?random=new ?Random(); ?? ???????? ?? ????????class ?Producer?implements ?Runnable{ ?? ????????????@Override ?? ????????????public ?void ?run()?{ ?? ????????????????while (true ){ ?? ????????????????????try ?{ ?? ????????????????????????int ?i=random.nextInt(100 ); ?? ????????????????????????queue.put(i);?? ????????????????????????if (queue.size()==3 ) ?? ????????????????????????{ ?? ????????????????????????????System.out.println("full" ); ?? ????????????????????????} ?? ????????????????????}?catch ?(InterruptedException?e)?{ ?? ????????????????????????e.printStackTrace(); ?? ????????????????????} ?? ????????????????} ?? ????????????} ?? ????????} ?? ???????? ?? ????????class ?Consumer?implements ?Runnable{ ?? ????????????@Override ?? ????????????public ?void ?run()?{ ?? ????????????????while (true ){ ?? ????????????????????try ?{ ?? ????????????????????????queue.take();?? ????????????????????????Thread.sleep(1000 ); ?? ????????????????????}?catch ?(InterruptedException?e)?{ ?? ????????????????????????e.printStackTrace(); ?? ????????????????????} ?? ????????????????} ?? ????????????} ?? ????????} ?? ???????? ?? ????????new ?Thread(new ?Producer()).start(); ?? ????????new ?Thread(new ?Consumer()).start(); ?? ????} ?? ?? }?? import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;public class TestBlockingQueue {public static void main(String[] args) {final BlockingQueue<Integer> queue=new LinkedBlockingQueue<Integer>(3);final Random random=new Random();class Producer implements Runnable{@Overridepublic void run() {while(true){try {int i=random.nextInt(100);queue.put(i);//當隊列達到容量時候,會自動阻塞的if(queue.size()==3){System.out.println("full");}} catch (InterruptedException e) {e.printStackTrace();}}}}class Consumer implements Runnable{@Overridepublic void run() {while(true){try {queue.take();//當隊列為空時,也會自動阻塞Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}}new Thread(new Producer()).start();new Thread(new Consumer()).start();}}
??? 總結:BlockingQueue使用時候特別注意take 和 put
8. DelayQueue
我們先來學習一下JDK1.5 API中關于這個類的詳細介紹:
??? “它是包含Delayed 元素的一個無界阻塞隊列,只有在延遲期滿時才能從中提取元素。該隊列的頭部 是延遲期滿后保存時間最長的 Delayed 元素。如果延遲都還沒有期滿,則隊列沒有頭部,并且 poll 將返回 null。當一個元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一個小于等于 0 的值時,將發生到期。即使無法使用 take 或 poll 移除未到期的元素,也不會將這些元素作為正常元素對待。例如,size 方法同時返回到期和未到期元素的計數。此隊列不允許使用 null 元素?!?
??? 在現實生活中,很多DelayQueue的例子。就拿上海的SB會來說明,很多國家地區的開館時間不同。你很早就來到園區,然后急急忙忙地跑到一些心儀的館區,發現有些還沒開,你吃了閉門羹。
??? 仔細研究DelayQueue,你會發現它其實就是一個PriorityQueue的封裝(按照delay時間排序),里面的元素都實現了Delayed接口,相關操作需要判斷延時時間是否到了。
??? 在實際應用中,有人拿它來管理跟實際相關的緩存、session等
??
下面我就通過 “上海SB會的例子來闡述DelayQueue的用法” 代碼如下:
Java代碼 ?
import ?java.util.Random; ??import ?java.util.concurrent.DelayQueue; ??import ?java.util.concurrent.Delayed; ??import ?java.util.concurrent.TimeUnit; ???? public ?class ?TestDelayQueue?{ ???? ????private ?class ?Stadium?implements ?Delayed ?? ????{ ?? ????????long ?trigger; ?? ???????? ?? ????????public ?Stadium(long ?i){ ?? ????????????trigger=System.currentTimeMillis()+i; ?? ????????} ?? ???????? ?? ????????@Override ?? ????????public ?long ?getDelay(TimeUnit?arg0)?{ ?? ????????????long ?n=trigger-System.currentTimeMillis(); ?? ????????????return ?n; ?? ????????} ?? ?? ????????@Override ?? ????????public ?int ?compareTo(Delayed?arg0)?{ ?? ????????????return ?(int )(this .getDelay(TimeUnit.MILLISECONDS)-arg0.getDelay(TimeUnit.MILLISECONDS)); ?? ????????} ?? ???????? ?? ????????public ?long ?getTriggerTime(){ ?? ????????????return ?trigger; ?? ????????} ?? ???????? ?? ????} ?? ????public ?static ?void ?main(String[]?args)throws ?Exception?{ ?? ????????Random?random=new ?Random(); ?? ????????DelayQueue<Stadium>?queue=new ?DelayQueue<Stadium>(); ?? ????????TestDelayQueue?t=new ?TestDelayQueue(); ?? ???????? ?? ????????for (int ?i=0 ;i<5 ;i++){ ?? ????????????queue.add(t.new ?Stadium(random.nextInt(30000 ))); ?? ????????} ?? ????????Thread.sleep(2000 ); ?? ???????? ?? ????????while (true ){ ?? ????????????Stadium?s=queue.take();?? ????????????if (s!=null ){ ?? ????????????????System.out.println(System.currentTimeMillis()-s.getTriggerTime());?? ????????????} ?? ????????????if (queue.size()==0 ) ?? ????????????????break ; ?? ????????} ?? ????} ?? }?? import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;public class TestDelayQueue {private class Stadium implements Delayed{long trigger;public Stadium(long i){trigger=System.currentTimeMillis()+i;}@Overridepublic long getDelay(TimeUnit arg0) {long n=trigger-System.currentTimeMillis();return n;}@Overridepublic int compareTo(Delayed arg0) {return (int)(this.getDelay(TimeUnit.MILLISECONDS)-arg0.getDelay(TimeUnit.MILLISECONDS));}public long getTriggerTime(){return trigger;}}public static void main(String[] args)throws Exception {Random random=new Random();DelayQueue<Stadium> queue=new DelayQueue<Stadium>();TestDelayQueue t=new TestDelayQueue();for(int i=0;i<5;i++){queue.add(t.new Stadium(random.nextInt(30000)));}Thread.sleep(2000);while(true){Stadium s=queue.take();//延時時間未到就一直等待if(s!=null){System.out.println(System.currentTimeMillis()-s.getTriggerTime());//基本上是等于0}if(queue.size()==0)break;}}
}
??? 總結:適用于需要延時操作的隊列管理
9. SynchronousQueue
??? 我們先來學習一下JDK1.5 API中關于這個類的詳細介紹:
??? “一種阻塞隊列,其中每個插入操作必須等待另一個線程的對應移除操作 ,反之亦然。同步隊列沒有任何內部容量,甚至連一個隊列的容量都沒有。不能在同步隊列上進行 peek,因為僅在試圖要移除元素時,該元素才存在;除非另一個線程試圖移除某個元素,否則也不能(使用任何方法)插入元素;也不能迭代隊列,因為其中沒有元素可用于迭代。隊列的頭 是嘗試添加到隊列中的首個已排隊插入線程的元素;如果沒有這樣的已排隊線程,則沒有可用于移除的元素并且 poll() 將會返回 null。對于其他 Collection 方法(例如 contains),SynchronousQueue 作為一個空 collection。此隊列不允許 null 元素。
??? 同步隊列類似于 CSP 和 Ada 中使用的 rendezvous 信道。它非常適合于傳遞性設計,在這種設計中,在一個線程中運行的對象要將某些信息、事件或任務傳遞給在另一個線程中運行的對象,它就必須與該對象同步。 “
??? 看起來很有意思吧。隊列竟然是沒有內部容量的。這個隊列其實是BlockingQueue的一種實現。每個插入操作必須等待另一個線程的對應移除操作,反之亦然。它給我們提供了在線程之間交換單一元素的極輕量級方法
??
應用舉例:我們要在多個線程中傳遞一個變量。 ?? 代碼如下(其實就是生產者消費者模式)
Java代碼 ?
import ?java.util.Arrays; ??import ?java.util.List; ??import ?java.util.concurrent.BlockingQueue; ??import ?java.util.concurrent.SynchronousQueue; ???? public ?class ?TestSynchronousQueue?{ ???? ????class ?Producer?implements ?Runnable?{ ?? ????????private ?BlockingQueue<String>?queue; ?? ????????List<String>?objects?=?Arrays.asList("one" ,?"two" ,?"three" ); ?? ?? ????????public ?Producer(BlockingQueue<String>?q)?{ ?? ????????????this .queue?=?q; ?? ????????} ?? ?? ????????@Override ?? ????????public ?void ?run()?{ ?? ????????????try ?{ ?? ????????????????for ?(String?s?:?objects)?{ ?? ????????????????????queue.put(s);?? ????????????????????System.out.printf("put:%s%n" ,s); ?? ????????????????} ?? ????????????????queue.put("Done" );?? ????????????}?catch ?(InterruptedException?e)?{ ?? ????????????????e.printStackTrace(); ?? ????????????} ?? ????????} ?? ????} ?? ?? ????class ?Consumer?implements ?Runnable?{ ?? ????????private ?BlockingQueue<String>?queue; ?? ?? ????????public ?Consumer(BlockingQueue<String>?q)?{ ?? ????????????this .queue?=?q; ?? ????????} ?? ?? ????????@Override ?? ????????public ?void ?run()?{ ?? ????????????String?obj?=?null ; ?? ????????????try ?{ ?? ????????????????while ?(!((obj?=?queue.take()).equals("Done" )))?{ ?? ????????????????????System.out.println(obj);?? ????????????????????Thread.sleep(3000 );??????? ????????????????} ?? ????????????}?catch ?(InterruptedException?e)?{ ?? ????????????????e.printStackTrace(); ?? ????????????} ?? ????????} ?? ????} ?? ?? ????public ?static ?void ?main(String[]?args)?{ ?? ????????BlockingQueue<String>?q=new ?SynchronousQueue<String>(); ?? ????????TestSynchronousQueue?t=new ?TestSynchronousQueue(); ?? ????????new ?Thread(t.new ?Producer(q)).start(); ?? ????????new ?Thread(t.new ?Consumer(q)).start(); ?? ????} ?? ?? }?? import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;public class TestSynchronousQueue {class Producer implements Runnable {private BlockingQueue<String> queue;List<String> objects = Arrays.asList("one", "two", "three");public Producer(BlockingQueue<String> q) {this.queue = q;}@Overridepublic void run() {try {for (String s : objects) {queue.put(s);// 產生數據放入隊列中System.out.printf("put:%s%n",s);}queue.put("Done");// 已完成的標志} catch (InterruptedException e) {e.printStackTrace();}}}class Consumer implements Runnable {private BlockingQueue<String> queue;public Consumer(BlockingQueue<String> q) {this.queue = q;}@Overridepublic void run() {String obj = null;try {while (!((obj = queue.take()).equals("Done"))) {System.out.println(obj);//從隊列中讀取對象Thread.sleep(3000); //故意sleep,證明Producer是put不進去的}} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) {BlockingQueue<String> q=new SynchronousQueue<String>();TestSynchronousQueue t=new TestSynchronousQueue();new Thread(t.new Producer(q)).start();new Thread(t.new Consumer(q)).start();}}
?? 總結:SynchronousQueue主要用于單個元素在多線程之間的傳遞
------------------------------------------------------------
更多的Java多線程資料,歡迎訪問 http://janeky.iteye.com/category/124727
?
總結
以上是生活随笔 為你收集整理的java多线程学习-java.util.concurrent详解 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。