日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java多线程学习-java.util.concurrent详解

發布時間:2024/4/17 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java多线程学习-java.util.concurrent详解 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

http://janeky.iteye.com/category/124727

java多線程學習-java.util.concurrent詳解(一) Latch/Barrier

  • 博客分類:
  • java多線程
多線程Java編程threadJDK Java1.5提供了一個非常高效實用的多線程包:java.util.concurrent, 提供了大量高級工具,可以幫助開發者編寫高效、易維護、結構清晰的Java多線程程序。從這篇blog起,我將跟大家一起共同學習這些新的Java多線程構件

1. CountDownLatch
??? 我們先來學習一下JDK1.5 API中關于這個類的詳細介紹:
“一個同步輔助類,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待。 用給定的計數 初始化 CountDownLatch。由于調用了 countDown() 方法,所以在當前計數到達零之前,await 方法會一直受阻塞。之后,會釋放所有等待的線程,await 的所有后續調用都將立即返回。這種現象只出現一次——計數無法被重置。如果需要重置計數,請考慮使用 CyclicBarrier?!?

??? 這就是說,CountDownLatch可以用來管理一組相關的線程執行,只需在主線程中調用CountDownLatch 的await方法(一直阻塞),讓各個線程調用countDown方法。當所有的線程都只需完countDown了,await也順利返回,不再阻塞了。在這樣情況下尤其適用:將一個任務分成若干線程執行,等到所有線程執行完,再進行匯總處理。

??? 下面我舉一個非常簡單的例子。假設我們要打印1-100,最后再輸出“Ok“。1-100的打印順序不要求統一,只需保證“Ok“是在最后出現即可。

??? 解決方案:我們定義一個CountDownLatch,然后開10個線程分別打印(n-1)*10+1至(n-1)*10+10。主線程中調用await方法等待所有線程的執行完畢,每個線程執行完畢后都調用countDown方法。最后再await返回后打印“Ok”。

具體代碼如下(本代碼參考了JDK示例代碼):
Java代碼 ?
  • import?java.util.concurrent.CountDownLatch; ??
  • /** ?
  • ?*?示例:CountDownLatch的使用舉例 ?
  • ?*?Mail:?ken@iamcoding.com ?
  • ?*?@author?janeky ?
  • ?*/??
  • public?class?TestCountDownLatch?{ ??
  • ????private?static?final?int?N?=?10; ??
  • ??
  • ????public?static?void?main(String[]?args)?throws?InterruptedException?{ ??
  • ????????CountDownLatch?doneSignal?=?new?CountDownLatch(N); ??
  • ????????CountDownLatch?startSignal?=?new?CountDownLatch(1);//開始執行信號 ??
  • ??
  • ????????for?(int?i?=?1;?i?<=?N;?i++)?{ ??
  • ????????????new?Thread(new?Worker(i,?doneSignal,?startSignal)).start();//線程啟動了 ??
  • ????????} ??
  • ????????System.out.println("begin------------"); ??
  • ????????startSignal.countDown();//開始執行啦 ??
  • ????????doneSignal.await();//等待所有的線程執行完畢 ??
  • ????????System.out.println("Ok"); ??
  • ??
  • ????} ??
  • ??
  • ????static?class?Worker?implements?Runnable?{ ??
  • ????????private?final?CountDownLatch?doneSignal; ??
  • ????????private?final?CountDownLatch?startSignal; ??
  • ????????private?int?beginIndex; ??
  • ??
  • ????????Worker(int?beginIndex,?CountDownLatch?doneSignal, ??
  • ????????????????CountDownLatch?startSignal)?{ ??
  • ????????????this.startSignal?=?startSignal; ??
  • ????????????this.beginIndex?=?beginIndex; ??
  • ????????????this.doneSignal?=?doneSignal; ??
  • ????????} ??
  • ??
  • ????????public?void?run()?{ ??
  • ????????????try?{ ??
  • ????????????????startSignal.await();?//等待開始執行信號的發布 ??
  • ????????????????beginIndex?=?(beginIndex?-?1)?*?10?+?1; ??
  • ????????????????for?(int?i?=?beginIndex;?i?<=?beginIndex?+?10;?i++)?{ ??
  • ????????????????????System.out.println(i); ??
  • ????????????????} ??
  • ????????????}?catch?(InterruptedException?e)?{ ??
  • ????????????????e.printStackTrace(); ??
  • ????????????}?finally?{ ??
  • ????????????????doneSignal.countDown(); ??
  • ????????????} ??
  • ????????} ??
  • ????} ??
  • }??
  • import java.util.concurrent.CountDownLatch; /*** 示例:CountDownLatch的使用舉例* Mail: ken@iamcoding.com* @author janeky*/ public class TestCountDownLatch {private static final int N = 10;public static void main(String[] args) throws InterruptedException {CountDownLatch doneSignal = new CountDownLatch(N);CountDownLatch startSignal = new CountDownLatch(1);//開始執行信號for (int i = 1; i <= N; i++) {new Thread(new Worker(i, doneSignal, startSignal)).start();//線程啟動了}System.out.println("begin------------");startSignal.countDown();//開始執行啦doneSignal.await();//等待所有的線程執行完畢System.out.println("Ok");}static class Worker implements Runnable {private final CountDownLatch doneSignal;private final CountDownLatch startSignal;private int beginIndex;Worker(int beginIndex, CountDownLatch doneSignal,CountDownLatch startSignal) {this.startSignal = startSignal;this.beginIndex = beginIndex;this.doneSignal = doneSignal;}public void run() {try {startSignal.await(); //等待開始執行信號的發布beginIndex = (beginIndex - 1) * 10 + 1;for (int i = beginIndex; i <= beginIndex + 10; i++) {System.out.println(i);}} catch (InterruptedException e) {e.printStackTrace();} finally {doneSignal.countDown();}}} }

    ??? 總結:CounDownLatch對于管理一組相關線程非常有用。上述示例代碼中就形象地描述了兩種使用情況。第一種是計算器為1,代表了兩種狀態,開關。第二種是計數器為N,代表等待N個操作完成。今后我們在編寫多線程程序時,可以使用這個構件來管理一組獨立線程的執行。

    2. CyclicBarrier
    ??? 我們先來學習一下JDK1.5 API中關于這個類的詳細介紹:
    ??? “一個同步輔助類,它允許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier 很有用。因為該 barrier 在釋放等待線程后可以重用,所以稱它為循環 的 barrier。
    ??? CyclicBarrier 支持一個可選的 Runnable 命令,在一組線程中的最后一個線程到達之后(但在釋放所有線程之前),該命令只在每個屏障點運行一次。若在繼續所有參與線程之前更新共享狀態,此屏障操作 很有用。

    ??? 我們在學習CountDownLatch的時候就提到了CyclicBarrier。兩者究竟有什么聯系呢?引用[JCIP]中的描述“The key difference is that with a barrier, all the threads must come together at a barrier point at the same time in order to proceed. Latches are for waiting for events; barriers are for waiting for other threads。CyclicBarrier等待所有的線程一起完成后再執行某個動作。這個功能CountDownLatch也同樣可以實現。但是CountDownLatch更多時候是在等待某個事件的發生。在CyclicBarrier中,所有的線程調用await方法,等待其他線程都執行完。

    ??? 舉一個很簡單的例子,今天晚上我們哥們4個去Happy。就互相通知了一下:晚上八點準時到xx酒吧門前集合,不見不散!。有個哥們住的近,早早就到了。有的事務繁忙,剛好踩點到了。無論怎樣,先來的都不能獨自行動,只能等待所有人

    代碼如下(參考了網上給的一些教程)
    Java代碼 ?
  • import?java.util.Random; ??
  • import?java.util.concurrent.BrokenBarrierException; ??
  • import?java.util.concurrent.CyclicBarrier; ??
  • import?java.util.concurrent.ExecutorService; ??
  • import?java.util.concurrent.Executors; ??
  • ??
  • public?class?TestCyclicBarrier?{ ??
  • ??
  • ????public?static?void?main(String[]?args)?{ ??
  • ???? ??
  • ????????ExecutorService?exec?=?Executors.newCachedThreadPool();????? ??
  • ????????final?Random?random=new?Random(); ??
  • ???????? ??
  • ????????final?CyclicBarrier?barrier=new?CyclicBarrier(4,new?Runnable(){ ??
  • ????????????@Override??
  • ????????????public?void?run()?{ ??
  • ????????????????System.out.println("大家都到齊了,開始happy去"); ??
  • ????????????}}); ??
  • ???????? ??
  • ????????for(int?i=0;i<4;i++){ ??
  • ????????????exec.execute(new?Runnable(){ ??
  • ????????????????@Override??
  • ????????????????public?void?run()?{ ??
  • ????????????????????try?{ ??
  • ????????????????????????Thread.sleep(random.nextInt(1000)); ??
  • ????????????????????}?catch?(InterruptedException?e)?{ ??
  • ????????????????????????e.printStackTrace(); ??
  • ????????????????????} ??
  • ????????????????????System.out.println(Thread.currentThread().getName()+"到了,其他哥們呢"); ??
  • ????????????????????try?{ ??
  • ????????????????????????barrier.await();//等待其他哥們 ??
  • ????????????????????}?catch?(InterruptedException?e)?{ ??
  • ????????????????????????e.printStackTrace(); ??
  • ????????????????????}?catch?(BrokenBarrierException?e)?{ ??
  • ????????????????????????e.printStackTrace(); ??
  • ????????????????????} ??
  • ????????????????}}); ??
  • ????????} ??
  • ????????exec.shutdown(); ??
  • ????} ??
  • ??
  • }??
  • import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;public class TestCyclicBarrier {public static void main(String[] args) {ExecutorService exec = Executors.newCachedThreadPool(); final Random random=new Random();final CyclicBarrier barrier=new CyclicBarrier(4,new Runnable(){@Overridepublic void run() {System.out.println("大家都到齊了,開始happy去");}});for(int i=0;i<4;i++){exec.execute(new Runnable(){@Overridepublic void run() {try {Thread.sleep(random.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName()+"到了,其他哥們呢");try {barrier.await();//等待其他哥們} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}});}exec.shutdown();}}

    ??? 關于await方法要特別注意一下,它有可能在阻塞的過程中由于某些原因被中斷

    ??? 總結:CyclicBarrier就是一個柵欄,等待所有線程到達后再執行相關的操作。barrier 在釋放等待線程后可以重用。

    更多的Java編程資料,歡迎訪問我的blog:http://janeky.iteye.com,希望能夠與你有更多的交流

    未完待續
    • src.rar (1.3 KB)
    • 下載次數: 24

      java多線程學習-java.util.concurrent詳解(二)Semaphore/FutureTask/Exchanger

      • 博客分類:
      • java多線程
      Java多線程Exchangethread算法 前一篇文章 http://janeky.iteye.com/category/124727
      我們學習了java.util.concurrent的CountDownLatch和CyclicBarrier
      今天我們繼續共同來探討其他的多線程組件
      -----------------------------------------------------------------------------
      3. Semaphore
      ??? 我們先來學習一下JDK1.5 API中關于這個類的詳細介紹:
      “一個計數信號量。從概念上講,信號量維護了一個許可集。如有必要,在許可可用前會阻塞每一個 acquire(),然后再獲取該許可。每個 release() 添加一個許可,從而可能釋放一個正在阻塞的獲取者。但是,不使用實際的許可對象,Semaphore 只對可用許可的號碼進行計數,并采取相應的行動?!?

      ??? 我們一般用它來控制某個對象的線程訪問對象

      ??? 例如,對于某個容器,我們規定,最多只能容納n個線程同時操作
      使用信號量來模擬實現


      具體代碼如下(參考 [JCIP])
      Java代碼 ?
    • import?java.util.Collections; ??
    • import?java.util.HashSet; ??
    • import?java.util.Set; ??
    • import?java.util.concurrent.ExecutorService; ??
    • import?java.util.concurrent.Executors; ??
    • import?java.util.concurrent.Semaphore; ??
    • ??
    • public?class?TestSemaphore?{ ??
    • ??
    • ????public?static?void?main(String[]?args)?{ ??
    • ????????ExecutorService?exec?=?Executors.newCachedThreadPool(); ??
    • ????????TestSemaphore?t?=?new?TestSemaphore(); ??
    • ????????final?BoundedHashSet<String>?set?=?t.getSet(); ??
    • ??
    • ????????for?(int?i?=?0;?i?<?3;?i++)?{//三個線程同時操作add ??
    • ????????????exec.execute(new?Runnable()?{ ??
    • ????????????????public?void?run()?{ ??
    • ????????????????????try?{ ??
    • ????????????????????????set.add(Thread.currentThread().getName()); ??
    • ????????????????????}?catch?(InterruptedException?e)?{ ??
    • ????????????????????????e.printStackTrace(); ??
    • ????????????????????} ??
    • ????????????????} ??
    • ????????????}); ??
    • ????????} ??
    • ??
    • ????????for?(int?j?=?0;?j?<?3;?j++)?{//三個線程同時操作remove ??
    • ????????????exec.execute(new?Runnable()?{ ??
    • ????????????????public?void?run()?{ ??
    • ????????????????????set.remove(Thread.currentThread().getName()); ??
    • ????????????????} ??
    • ????????????}); ??
    • ????????} ??
    • ????????exec.shutdown(); ??
    • ????} ??
    • ??
    • ????public?BoundedHashSet<String>?getSet()?{ ??
    • ????????return?new?BoundedHashSet<String>(2);//定義一個邊界約束為2的線程 ??
    • ????} ??
    • ??
    • ????class?BoundedHashSet<T>?{ ??
    • ????????private?final?Set<T>?set; ??
    • ????????private?final?Semaphore?semaphore; ??
    • ??
    • ????????public?BoundedHashSet(int?bound)?{ ??
    • ????????????this.set?=?Collections.synchronizedSet(new?HashSet<T>()); ??
    • ????????????this.semaphore?=?new?Semaphore(bound,?true); ??
    • ????????} ??
    • ??
    • ????????public?void?add(T?o)?throws?InterruptedException?{ ??
    • ????????????semaphore.acquire();//信號量控制可訪問的線程數目 ??
    • ????????????set.add(o); ??
    • ????????????System.out.printf("add:%s%n",o); ??
    • ????????} ??
    • ??
    • ????????public?void?remove(T?o)?{ ??
    • ????????????if?(set.remove(o)) ??
    • ????????????????semaphore.release();//釋放掉信號量 ??
    • ????????????System.out.printf("remove:%s%n",o); ??
    • ????????} ??
    • ????} ??
    • }??
    • import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore;public class TestSemaphore {public static void main(String[] args) {ExecutorService exec = Executors.newCachedThreadPool();TestSemaphore t = new TestSemaphore();final BoundedHashSet<String> set = t.getSet();for (int i = 0; i < 3; i++) {//三個線程同時操作addexec.execute(new Runnable() {public void run() {try {set.add(Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}}});}for (int j = 0; j < 3; j++) {//三個線程同時操作removeexec.execute(new Runnable() {public void run() {set.remove(Thread.currentThread().getName());}});}exec.shutdown();}public BoundedHashSet<String> getSet() {return new BoundedHashSet<String>(2);//定義一個邊界約束為2的線程}class BoundedHashSet<T> {private final Set<T> set;private final Semaphore semaphore;public BoundedHashSet(int bound) {this.set = Collections.synchronizedSet(new HashSet<T>());this.semaphore = new Semaphore(bound, true);}public void add(T o) throws InterruptedException {semaphore.acquire();//信號量控制可訪問的線程數目set.add(o);System.out.printf("add:%s%n",o);}public void remove(T o) {if (set.remove(o))semaphore.release();//釋放掉信號量System.out.printf("remove:%s%n",o);}} }

      ??? 總結:Semaphore通常用于對象池的控制

      4.FutureTask
      ??? 我們先來學習一下JDK1.5 API中關于這個類的詳細介紹:

      ??? “取消的異步計算。利用開始和取消計算的方法、查詢計算是否完成的方法和獲取計算結果的方法,此類提供了對 Future 的基本實現。僅在計算完成時才能獲取結果;如果計算尚未完成,則阻塞 get 方法。一旦計算完成,就不能再重新開始或取消計算。
      可使用 FutureTask 包裝 Callable 或 Runnable 對象。因為 FutureTask 實現了 Runnable,所以可將 FutureTask 提交給 Executor 執行。
      除了作為一個獨立的類外,此類還提供了 protected 功能,這在創建自定義任務類時可能很有用。 “

      ??? 應用舉例:我們的算法中有一個很耗時的操作,在編程的是,我們希望將它獨立成一個模塊,調用的時候當做它是立刻返回的,并且可以隨時取消的

      具體代碼如下(參考 [JCIP])
      Java代碼 ?
    • import?java.util.concurrent.Callable; ??
    • import?java.util.concurrent.ExecutionException; ??
    • import?java.util.concurrent.ExecutorService; ??
    • import?java.util.concurrent.Executors; ??
    • import?java.util.concurrent.FutureTask; ??
    • ??
    • public?class?TestFutureTask?{ ??
    • ??
    • ????public?static?void?main(String[]?args)?{ ??
    • ????????ExecutorService?exec=Executors.newCachedThreadPool(); ??
    • ???????? ??
    • ????????FutureTask<String>?task=new?FutureTask<String>(new?Callable<String>(){//FutrueTask的構造參數是一個Callable接口 ??
    • ????????????@Override??
    • ????????????public?String?call()?throws?Exception?{ ??
    • ????????????????return?Thread.currentThread().getName();//這里可以是一個異步操作 ??
    • ????????????}}); ??
    • ???????????? ??
    • ????????????try?{ ??
    • ????????????????exec.execute(task);//FutureTask實際上也是一個線程 ??
    • ????????????????String?result=task.get();//取得異步計算的結果,如果沒有返回,就會一直阻塞等待 ??
    • ????????????????System.out.printf("get:%s%n",result); ??
    • ????????????}?catch?(InterruptedException?e)?{ ??
    • ????????????????e.printStackTrace(); ??
    • ????????????}?catch?(ExecutionException?e)?{ ??
    • ????????????????e.printStackTrace(); ??
    • ????????????} ??
    • ????} ??
    • ??
    • }??
    • import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask;public class TestFutureTask {public static void main(String[] args) {ExecutorService exec=Executors.newCachedThreadPool();FutureTask<String> task=new FutureTask<String>(new Callable<String>(){//FutrueTask的構造參數是一個Callable接口@Overridepublic String call() throws Exception {return Thread.currentThread().getName();//這里可以是一個異步操作}});try {exec.execute(task);//FutureTask實際上也是一個線程String result=task.get();//取得異步計算的結果,如果沒有返回,就會一直阻塞等待System.out.printf("get:%s%n",result);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}}

      ??? 總結:FutureTask其實就是新建了一個線程單獨執行,使得線程有一個返回值,方便程序的編寫

      5. Exchanger
      ??? 我們先來學習一下JDK1.5 API中關于這個類的詳細介紹:
      ??? “可以在pair中對元素進行配對和交換的線程的同步點。每個線程將條目上的某個方法呈現給 exchange 方法,與伙伴線程進行匹配,并且在返回時接收其伙伴的對象。Exchanger 可能被視為 SynchronousQueue 的雙向形式。Exchanger 可能在應用程序(比如遺傳算法和管道設計)中很有用。 “

      ??? 應用舉例:有兩個緩存區,兩個線程分別向兩個緩存區fill和take,當且僅當一個滿了,兩個緩存區交換

      ??? 代碼如下(參考了網上給的示例?? http://hi.baidu.com/webidea/blog/item/2995e731e53ad5a55fdf0e7d.html)
      Java代碼 ?
    • import?java.util.ArrayList; ??
    • import?java.util.concurrent.Exchanger; ??
    • ??
    • public?class?TestExchanger?{ ??
    • ??
    • ????public?static?void?main(String[]?args)?{ ??
    • ????????final?Exchanger<ArrayList<Integer>>?exchanger?=?new?Exchanger<ArrayList<Integer>>(); ??
    • ????????final?ArrayList<Integer>?buff1?=?new?ArrayList<Integer>(10); ??
    • ????????final?ArrayList<Integer>?buff2?=?new?ArrayList<Integer>(10); ??
    • ??
    • ????????new?Thread(new?Runnable()?{ ??
    • ????????????@Override??
    • ????????????public?void?run()?{ ??
    • ????????????????ArrayList<Integer>?buff?=?buff1; ??
    • ????????????????try?{ ??
    • ????????????????????while?(true)?{ ??
    • ????????????????????????if?(buff.size()?>=?10)?{ ??
    • ????????????????????????????buff?=?exchanger.exchange(buff);//開始跟另外一個線程交互數據 ??
    • ????????????????????????????System.out.println("exchange?buff1"); ??
    • ????????????????????????????buff.clear(); ??
    • ????????????????????????} ??
    • ????????????????????????buff.add((int)(Math.random()*100)); ??
    • ????????????????????????Thread.sleep((long)(Math.random()*1000)); ??
    • ????????????????????} ??
    • ????????????????}?catch?(InterruptedException?e)?{ ??
    • ????????????????????e.printStackTrace(); ??
    • ????????????????} ??
    • ????????????} ??
    • ????????}).start(); ??
    • ???????? ??
    • ????????new?Thread(new?Runnable(){ ??
    • ????????????@Override??
    • ????????????public?void?run()?{ ??
    • ????????????????ArrayList<Integer>?buff=buff2; ??
    • ????????????????while(true){ ??
    • ????????????????????try?{ ??
    • ????????????????????????for(Integer?i:buff){ ??
    • ????????????????????????????System.out.println(i); ??
    • ????????????????????????} ??
    • ????????????????????????Thread.sleep(1000); ??
    • ????????????????????????buff=exchanger.exchange(buff);//開始跟另外一個線程交換數據 ??
    • ????????????????????????System.out.println("exchange?buff2"); ??
    • ????????????????????}?catch?(InterruptedException?e)?{ ??
    • ????????????????????????e.printStackTrace(); ??
    • ????????????????????} ??
    • ????????????????} ??
    • ????????????}}).start(); ??
    • ????} ??
    • }??
    • import java.util.ArrayList; import java.util.concurrent.Exchanger;public class TestExchanger {public static void main(String[] args) {final Exchanger<ArrayList<Integer>> exchanger = new Exchanger<ArrayList<Integer>>();final ArrayList<Integer> buff1 = new ArrayList<Integer>(10);final ArrayList<Integer> buff2 = new ArrayList<Integer>(10);new Thread(new Runnable() {@Overridepublic void run() {ArrayList<Integer> buff = buff1;try {while (true) {if (buff.size() >= 10) {buff = exchanger.exchange(buff);//開始跟另外一個線程交互數據System.out.println("exchange buff1");buff.clear();}buff.add((int)(Math.random()*100));Thread.sleep((long)(Math.random()*1000));}} catch (InterruptedException e) {e.printStackTrace();}}}).start();new Thread(new Runnable(){@Overridepublic void run() {ArrayList<Integer> buff=buff2;while(true){try {for(Integer i:buff){System.out.println(i);}Thread.sleep(1000);buff=exchanger.exchange(buff);//開始跟另外一個線程交換數據System.out.println("exchange buff2");} catch (InterruptedException e) {e.printStackTrace();}}}}).start();} }

      ??? 總結:Exchanger在特定的使用場景比較有用(兩個伙伴線程之間的數據交互)
      ----------------------------------------------------------------------------------
      更多的java多線程資料,歡迎訪問 http://janeky.iteye.com/category/124727

      java多線程學習-java.util.concurrent詳解(三)ScheduledThreadPoolExecutor

      • 博客分類:
      • java多線程
      Java多線程Blog 前一篇blog http://janeky.iteye.com/category/124727我們學習了java多線程的信號量/FutureTask
      ----------------------------------------------------------------------------------

      6. ScheduledThreadPoolExecutor
      ??? 我們先來學習一下JDK1.5 API中關于這個類的詳細介紹:

      ??? "可另行安排在給定的延遲后運行命令,或者定期執行命令。需要多個輔助線程時,或者要求 ThreadPoolExecutor 具有額外的靈活性或功能時,此類要優于 Timer。
      ??? 一旦啟用已延遲的任務就執行它,但是有關何時啟用,啟用后何時執行則沒有任何實時保證。按照提交的先進先出 (FIFO) 順序來啟用那些被安排在同一執行時間的任務。

      ??? 雖然此類繼承自 ThreadPoolExecutor,但是幾個繼承的調整方法對此類并無作用。特別是,因為它作為一個使用 corePoolSize 線程和一個無界隊列的固定大小的池,所以調整 maximumPoolSize 沒有什么效果。"

      ??? 在JDK1.5之前,我們關于定時/周期操作都是通過Timer來實現的。但是Timer有以下幾種危險[JCIP]

      a. Timer是基于絕對時間的。容易受系統時鐘的影響。
      b. Timer只新建了一個線程來執行所有的TimeTask。所有TimeTask可能會相關影響
      c. Timer不會捕獲TimerTask的異常,只是簡單地停止。這樣勢必會影響其他TimeTask的執行。

      ??? 如果你是使用JDK1.5以上版本,建議用ScheduledThreadPoolExecutor代替Timer。它基本上解決了上述問題。它采用相對時間,用線程池來執行TimerTask,會出來TimerTask異常。

      ??? 下面通過一個簡單的實例來闡述ScheduledThreadPoolExecutor的使用。
      ??
      ? ? 我們定期讓定時器拋異常
      ??? 我們定期從控制臺打印系統時間


      代碼如下(參考了網上的一些代碼,在此表示感謝)
      Java代碼 ?
    • import?java.util.concurrent.ScheduledThreadPoolExecutor; ??
    • import?java.util.concurrent.TimeUnit; ??
    • ??
    • ??
    • public?class?TestScheduledThreadPoolExecutor?{ ??
    • ???? ??
    • ????public?static?void?main(String[]?args)?{ ??
    • ????????ScheduledThreadPoolExecutor?exec=new?ScheduledThreadPoolExecutor(1); ??
    • ???????? ??
    • ????????exec.scheduleAtFixedRate(new?Runnable(){//每隔一段時間就觸發異常 ??
    • ????????????@Override??
    • ????????????public?void?run()?{ ??
    • ????????????????throw?new?RuntimeException(); ??
    • ????????????}},?1000,?5000,?TimeUnit.MILLISECONDS); ??
    • ???????? ??
    • ????????exec.scheduleAtFixedRate(new?Runnable(){//每隔一段時間打印系統時間,證明兩者是互不影響的 ??
    • ????????????@Override??
    • ????????????public?void?run()?{ ??
    • ????????????????System.out.println(System.nanoTime()); ??
    • ????????????}},?1000,?2000,?TimeUnit.MILLISECONDS); ??
    • ????} ??
    • ??
    • }??
    • import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit;public class TestScheduledThreadPoolExecutor {public static void main(String[] args) {ScheduledThreadPoolExecutor exec=new ScheduledThreadPoolExecutor(1);exec.scheduleAtFixedRate(new Runnable(){//每隔一段時間就觸發異常@Overridepublic void run() {throw new RuntimeException();}}, 1000, 5000, TimeUnit.MILLISECONDS);exec.scheduleAtFixedRate(new Runnable(){//每隔一段時間打印系統時間,證明兩者是互不影響的@Overridepublic void run() {System.out.println(System.nanoTime());}}, 1000, 2000, TimeUnit.MILLISECONDS);}}

      總結:是時候把你的定時器換成 ScheduledThreadPoolExecutor了

      --------------------------------------------------------------------
      更多的java多線程資料,歡迎訪問 http://janeky.iteye.com/category/124727

      ?

      java多線程學習-java.util.concurrent詳解(四) BlockingQueue

      • 博客分類:
      • java多線程
      Java多線程thread生活數據結構 前面一篇blog http://janeky.iteye.com/category/124727
      我們主要探討了ScheduledThreadPoolExecutor定時器的使用
      ---------------------------------------------------------------------------------
      7.BlockingQueue
      ??? “支持兩個附加操作的 Queue,這兩個操作是:獲取元素時等待隊列變為非空,以及存儲元素時等待空間變得可用?!?

      ??? 這里我們主要討論BlockingQueue的最典型實現:LinkedBlockingQueue 和ArrayBlockingQueue。兩者的不同是底層的數據結構不夠,一個是鏈表,另外一個是數組。
      ???
      ??? 后面將要單獨解釋其他類型的BlockingQueue和SynchronousQueue

      ??? BlockingQueue的經典用途是 生產者-消費者模式

      ??? 代碼如下:
      Java代碼 ?
    • import?java.util.Random; ??
    • import?java.util.concurrent.BlockingQueue; ??
    • import?java.util.concurrent.LinkedBlockingQueue; ??
    • ??
    • public?class?TestBlockingQueue?{ ??
    • ??
    • ????public?static?void?main(String[]?args)?{ ??
    • ????????final?BlockingQueue<Integer>?queue=new?LinkedBlockingQueue<Integer>(3); ??
    • ????????final?Random?random=new?Random(); ??
    • ???????? ??
    • ????????class?Producer?implements?Runnable{ ??
    • ????????????@Override??
    • ????????????public?void?run()?{ ??
    • ????????????????while(true){ ??
    • ????????????????????try?{ ??
    • ????????????????????????int?i=random.nextInt(100); ??
    • ????????????????????????queue.put(i);//當隊列達到容量時候,會自動阻塞的 ??
    • ????????????????????????if(queue.size()==3) ??
    • ????????????????????????{ ??
    • ????????????????????????????System.out.println("full"); ??
    • ????????????????????????} ??
    • ????????????????????}?catch?(InterruptedException?e)?{ ??
    • ????????????????????????e.printStackTrace(); ??
    • ????????????????????} ??
    • ????????????????} ??
    • ????????????} ??
    • ????????} ??
    • ???????? ??
    • ????????class?Consumer?implements?Runnable{ ??
    • ????????????@Override??
    • ????????????public?void?run()?{ ??
    • ????????????????while(true){ ??
    • ????????????????????try?{ ??
    • ????????????????????????queue.take();//當隊列為空時,也會自動阻塞 ??
    • ????????????????????????Thread.sleep(1000); ??
    • ????????????????????}?catch?(InterruptedException?e)?{ ??
    • ????????????????????????e.printStackTrace(); ??
    • ????????????????????} ??
    • ????????????????} ??
    • ????????????} ??
    • ????????} ??
    • ???????? ??
    • ????????new?Thread(new?Producer()).start(); ??
    • ????????new?Thread(new?Consumer()).start(); ??
    • ????} ??
    • ??
    • }??
    • import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue;public class TestBlockingQueue {public static void main(String[] args) {final BlockingQueue<Integer> queue=new LinkedBlockingQueue<Integer>(3);final Random random=new Random();class Producer implements Runnable{@Overridepublic void run() {while(true){try {int i=random.nextInt(100);queue.put(i);//當隊列達到容量時候,會自動阻塞的if(queue.size()==3){System.out.println("full");}} catch (InterruptedException e) {e.printStackTrace();}}}}class Consumer implements Runnable{@Overridepublic void run() {while(true){try {queue.take();//當隊列為空時,也會自動阻塞Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}}new Thread(new Producer()).start();new Thread(new Consumer()).start();}}
      ??? 總結:BlockingQueue使用時候特別注意take 和 put

      8. DelayQueue

      我們先來學習一下JDK1.5 API中關于這個類的詳細介紹:
      ??? “它是包含Delayed 元素的一個無界阻塞隊列,只有在延遲期滿時才能從中提取元素。該隊列的頭部 是延遲期滿后保存時間最長的 Delayed 元素。如果延遲都還沒有期滿,則隊列沒有頭部,并且 poll 將返回 null。當一個元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一個小于等于 0 的值時,將發生到期。即使無法使用 take 或 poll 移除未到期的元素,也不會將這些元素作為正常元素對待。例如,size 方法同時返回到期和未到期元素的計數。此隊列不允許使用 null 元素?!?

      ??? 在現實生活中,很多DelayQueue的例子。就拿上海的SB會來說明,很多國家地區的開館時間不同。你很早就來到園區,然后急急忙忙地跑到一些心儀的館區,發現有些還沒開,你吃了閉門羹。

      ??? 仔細研究DelayQueue,你會發現它其實就是一個PriorityQueue的封裝(按照delay時間排序),里面的元素都實現了Delayed接口,相關操作需要判斷延時時間是否到了。

      ??? 在實際應用中,有人拿它來管理跟實際相關的緩存、session等

      ?? 下面我就通過 “上海SB會的例子來闡述DelayQueue的用法”

      代碼如下:
      Java代碼 ?
    • import?java.util.Random; ??
    • import?java.util.concurrent.DelayQueue; ??
    • import?java.util.concurrent.Delayed; ??
    • import?java.util.concurrent.TimeUnit; ??
    • ??
    • public?class?TestDelayQueue?{ ??
    • ??
    • ????private?class?Stadium?implements?Delayed ??
    • ????{ ??
    • ????????long?trigger; ??
    • ???????? ??
    • ????????public?Stadium(long?i){ ??
    • ????????????trigger=System.currentTimeMillis()+i; ??
    • ????????} ??
    • ???????? ??
    • ????????@Override??
    • ????????public?long?getDelay(TimeUnit?arg0)?{ ??
    • ????????????long?n=trigger-System.currentTimeMillis(); ??
    • ????????????return?n; ??
    • ????????} ??
    • ??
    • ????????@Override??
    • ????????public?int?compareTo(Delayed?arg0)?{ ??
    • ????????????return?(int)(this.getDelay(TimeUnit.MILLISECONDS)-arg0.getDelay(TimeUnit.MILLISECONDS)); ??
    • ????????} ??
    • ???????? ??
    • ????????public?long?getTriggerTime(){ ??
    • ????????????return?trigger; ??
    • ????????} ??
    • ???????? ??
    • ????} ??
    • ????public?static?void?main(String[]?args)throws?Exception?{ ??
    • ????????Random?random=new?Random(); ??
    • ????????DelayQueue<Stadium>?queue=new?DelayQueue<Stadium>(); ??
    • ????????TestDelayQueue?t=new?TestDelayQueue(); ??
    • ???????? ??
    • ????????for(int?i=0;i<5;i++){ ??
    • ????????????queue.add(t.new?Stadium(random.nextInt(30000))); ??
    • ????????} ??
    • ????????Thread.sleep(2000); ??
    • ???????? ??
    • ????????while(true){ ??
    • ????????????Stadium?s=queue.take();//延時時間未到就一直等待 ??
    • ????????????if(s!=null){ ??
    • ????????????????System.out.println(System.currentTimeMillis()-s.getTriggerTime());//基本上是等于0 ??
    • ????????????} ??
    • ????????????if(queue.size()==0) ??
    • ????????????????break; ??
    • ????????} ??
    • ????} ??
    • }??
    • import java.util.Random; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit;public class TestDelayQueue {private class Stadium implements Delayed{long trigger;public Stadium(long i){trigger=System.currentTimeMillis()+i;}@Overridepublic long getDelay(TimeUnit arg0) {long n=trigger-System.currentTimeMillis();return n;}@Overridepublic int compareTo(Delayed arg0) {return (int)(this.getDelay(TimeUnit.MILLISECONDS)-arg0.getDelay(TimeUnit.MILLISECONDS));}public long getTriggerTime(){return trigger;}}public static void main(String[] args)throws Exception {Random random=new Random();DelayQueue<Stadium> queue=new DelayQueue<Stadium>();TestDelayQueue t=new TestDelayQueue();for(int i=0;i<5;i++){queue.add(t.new Stadium(random.nextInt(30000)));}Thread.sleep(2000);while(true){Stadium s=queue.take();//延時時間未到就一直等待if(s!=null){System.out.println(System.currentTimeMillis()-s.getTriggerTime());//基本上是等于0}if(queue.size()==0)break;}} }

      ??? 總結:適用于需要延時操作的隊列管理


      9. SynchronousQueue
      ??? 我們先來學習一下JDK1.5 API中關于這個類的詳細介紹:

      ??? “一種阻塞隊列,其中每個插入操作必須等待另一個線程的對應移除操作 ,反之亦然。同步隊列沒有任何內部容量,甚至連一個隊列的容量都沒有。不能在同步隊列上進行 peek,因為僅在試圖要移除元素時,該元素才存在;除非另一個線程試圖移除某個元素,否則也不能(使用任何方法)插入元素;也不能迭代隊列,因為其中沒有元素可用于迭代。隊列的頭 是嘗試添加到隊列中的首個已排隊插入線程的元素;如果沒有這樣的已排隊線程,則沒有可用于移除的元素并且 poll() 將會返回 null。對于其他 Collection 方法(例如 contains),SynchronousQueue 作為一個空 collection。此隊列不允許 null 元素。
      ??? 同步隊列類似于 CSP 和 Ada 中使用的 rendezvous 信道。它非常適合于傳遞性設計,在這種設計中,在一個線程中運行的對象要將某些信息、事件或任務傳遞給在另一個線程中運行的對象,它就必須與該對象同步。 “

      ??? 看起來很有意思吧。隊列竟然是沒有內部容量的。這個隊列其實是BlockingQueue的一種實現。每個插入操作必須等待另一個線程的對應移除操作,反之亦然。它給我們提供了在線程之間交換單一元素的極輕量級方法

      ?? 應用舉例:我們要在多個線程中傳遞一個變量。

      ?? 代碼如下(其實就是生產者消費者模式)
      Java代碼 ?
    • import?java.util.Arrays; ??
    • import?java.util.List; ??
    • import?java.util.concurrent.BlockingQueue; ??
    • import?java.util.concurrent.SynchronousQueue; ??
    • ??
    • public?class?TestSynchronousQueue?{ ??
    • ??
    • ????class?Producer?implements?Runnable?{ ??
    • ????????private?BlockingQueue<String>?queue; ??
    • ????????List<String>?objects?=?Arrays.asList("one",?"two",?"three"); ??
    • ??
    • ????????public?Producer(BlockingQueue<String>?q)?{ ??
    • ????????????this.queue?=?q; ??
    • ????????} ??
    • ??
    • ????????@Override??
    • ????????public?void?run()?{ ??
    • ????????????try?{ ??
    • ????????????????for?(String?s?:?objects)?{ ??
    • ????????????????????queue.put(s);//?產生數據放入隊列中 ??
    • ????????????????????System.out.printf("put:%s%n",s); ??
    • ????????????????} ??
    • ????????????????queue.put("Done");//?已完成的標志 ??
    • ????????????}?catch?(InterruptedException?e)?{ ??
    • ????????????????e.printStackTrace(); ??
    • ????????????} ??
    • ????????} ??
    • ????} ??
    • ??
    • ????class?Consumer?implements?Runnable?{ ??
    • ????????private?BlockingQueue<String>?queue; ??
    • ??
    • ????????public?Consumer(BlockingQueue<String>?q)?{ ??
    • ????????????this.queue?=?q; ??
    • ????????} ??
    • ??
    • ????????@Override??
    • ????????public?void?run()?{ ??
    • ????????????String?obj?=?null; ??
    • ????????????try?{ ??
    • ????????????????while?(!((obj?=?queue.take()).equals("Done")))?{ ??
    • ????????????????????System.out.println(obj);//從隊列中讀取對象 ??
    • ????????????????????Thread.sleep(3000);?????//故意sleep,證明Producer是put不進去的 ??
    • ????????????????} ??
    • ????????????}?catch?(InterruptedException?e)?{ ??
    • ????????????????e.printStackTrace(); ??
    • ????????????} ??
    • ????????} ??
    • ????} ??
    • ??
    • ????public?static?void?main(String[]?args)?{ ??
    • ????????BlockingQueue<String>?q=new?SynchronousQueue<String>(); ??
    • ????????TestSynchronousQueue?t=new?TestSynchronousQueue(); ??
    • ????????new?Thread(t.new?Producer(q)).start(); ??
    • ????????new?Thread(t.new?Consumer(q)).start(); ??
    • ????} ??
    • ??
    • }??
    • import java.util.Arrays; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue;public class TestSynchronousQueue {class Producer implements Runnable {private BlockingQueue<String> queue;List<String> objects = Arrays.asList("one", "two", "three");public Producer(BlockingQueue<String> q) {this.queue = q;}@Overridepublic void run() {try {for (String s : objects) {queue.put(s);// 產生數據放入隊列中System.out.printf("put:%s%n",s);}queue.put("Done");// 已完成的標志} catch (InterruptedException e) {e.printStackTrace();}}}class Consumer implements Runnable {private BlockingQueue<String> queue;public Consumer(BlockingQueue<String> q) {this.queue = q;}@Overridepublic void run() {String obj = null;try {while (!((obj = queue.take()).equals("Done"))) {System.out.println(obj);//從隊列中讀取對象Thread.sleep(3000); //故意sleep,證明Producer是put不進去的}} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) {BlockingQueue<String> q=new SynchronousQueue<String>();TestSynchronousQueue t=new TestSynchronousQueue();new Thread(t.new Producer(q)).start();new Thread(t.new Consumer(q)).start();}}

      ?? 總結:SynchronousQueue主要用于單個元素在多線程之間的傳遞

      ------------------------------------------------------------
      更多的Java多線程資料,歡迎訪問 http://janeky.iteye.com/category/124727

      ?

      總結

      以上是生活随笔為你收集整理的java多线程学习-java.util.concurrent详解的全部內容,希望文章能夠幫你解決所遇到的問題。

      如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。