java.util.concurrent包API学习笔记
生活随笔
收集整理的這篇文章主要介紹了
java.util.concurrent包API学习笔记
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
newFixedThreadPool
創建一個固定大小的線程池。
shutdown():用于關閉啟動線程,如果不調用該語句,jvm不會關閉。
awaitTermination():用于等待子線程結束,再繼續執行下面的代碼。該例中我設置一直等著子線程結束。Java代碼 收藏代碼
public class Test { public static void main(String[] args) throws IOException, InterruptedException { ExecutorService service = Executors.newFixedThreadPool(2); for (int i = 0; i < 4; i++) { Runnable run = new Runnable() { @Override public void run() { System.out.println("thread start"); } }; service.execute(run); } service.shutdown(); service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); System.out.println("all thread complete"); }
} 輸出:
thread start
thread start
thread start
thread start
all thread complete
newScheduledThreadPool
這個先不說,我喜歡用spring quartz.
CyclicBarrier
假設有只有的一個場景:每個線程代表一個跑步運動員,當運動員都準備好后,才一起出發,只要有一個人沒有準備好,大家都等待.Java代碼 收藏代碼
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; class Runner implements Runnable { private CyclicBarrier barrier; private String name; public Runner(CyclicBarrier barrier, String name) { super(); this.barrier = barrier; this.name = name; } @Override public void run() { try { Thread.sleep(1000 * (new Random()).nextInt(8)); System.out.println(name + " 準備OK."); barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println(name + " Go!!"); }
} public class Race { public static void main(String[] args) throws IOException, InterruptedException { CyclicBarrier barrier = new CyclicBarrier(3); ExecutorService executor = Executors.newFixedThreadPool(3); executor.submit(new Thread(new Runner(barrier, "zhangsan"))); executor.submit(new Thread(new Runner(barrier, "lisi"))); executor.submit(new Thread(new Runner(barrier, "wangwu"))); executor.shutdown(); } } 輸出:
wangwu 準備OK.
zhangsan 準備OK.
lisi 準備OK.
lisi Go!!
zhangsan Go!!
wangwu Go!!
ThreadPoolExecutornewFixedThreadPool生成一個固定的線程池,顧名思義,線程池的線程是不會釋放的,即使它是Idle。這就會產生性能問題,比如如果線程池的大小為200,當全部使用完畢后,所有的線程會繼續留在池中,相應的內存和線程切換(while(true)+sleep循環)都會增加。如果要避免這個問題,就必須直接使用ThreadPoolExecutor()來構造。可以像Tomcat的線程池一樣設置“最大線程數”、“最小線程數”和“空閑線程keepAlive的時間”。ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) corePoolSize:池中所保存的線程數,包括空閑線程(非最大同時干活的線程數)。如果池中線程數多于 corePoolSize,則這些多出的線程在空閑時間超過 keepAliveTime 時將會終止。
maximumPoolSize:線程池中最大線程數
keepAliveTime:線程空閑回收的時間
unit:keepAliveTime的單位
workQueue:保存任務的隊列,可以如下選擇:無界隊列: new LinkedBlockingQueue<Runnable>();有界隊列: new ArrayBlockingQueue<Runnable>(8);你不想讓客戶端無限的請求吃光你的CPU和內存吧,那就用有界隊列
handler:當提交任務數大于隊列size會拋出RejectedExecutionException,可選的值為:ThreadPoolExecutor.CallerRunsPolicy 等待隊列空閑
ThreadPoolExecutor.DiscardPolicy:丟棄要插入隊列的任務
ThreadPoolExecutor.DiscardOldestPolicy:刪除隊頭的任務
關于corePoolSize和maximumPoolSize:Java官方Docs寫道:
當新任務在方法 execute(java.lang.Runnable) 中提交時,如果運行的線程少于 corePoolSize,則創建新線程來處理請求(即使存在空閑線程)。如果運行的線程多于 corePoolSize 而少于 maximumPoolSize,則僅當隊列(queue)滿時才創建新線程。如果設置的 corePoolSize 和 maximumPoolSize 相同,則創建了固定大小的線程池。如果將 maximumPoolSize 設置為基本的無界值(如 Integer.MAX_VALUE),則允許池適應任意數量的并發任務。Java代碼 收藏代碼
public class Test { public static void main(String[] args) { BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(); ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 1, TimeUnit.DAYS, queue); for (int i = 0; i < 20; i++) { final int index = i; executor.execute(new Runnable() { public void run() { try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(String.format("thread %d finished", index)); } }); } executor.shutdown(); }
} 原子變量(Atomic )
并發庫中的BlockingQueue是一個比較好玩的類,顧名思義,就是阻塞隊列。該類主要提供了兩個方法put()和take(),前者將一個對象放到隊列中,如果隊列已經滿了,就等待直到有空閑節點;后者從head取一個對象,如果沒有對象,就等待直到有可取的對象。下面的例子比較簡單,一個讀線程,用于將要處理的文件對象添加到阻塞隊列中,另外四個寫線程用于取出文件對象,為了模擬寫操作耗時長的特點,特讓線程睡眠一段隨機長度的時間。另外,該Demo也使用到了線程池和原子整型(AtomicInteger),AtomicInteger可以在并發情況下達到原子化更新,避免使用了synchronized,而且性能非常高。由于阻塞隊列的put和take操作會阻塞,為了使線程退出,在隊列中添加了一個“標識”,算法中也叫“哨兵”,當發現這個哨兵后,寫線程就退出。Java代碼 收藏代碼
import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger; public class Test { static long randomTime() { return (long) (Math.random() * 1000); } public static void main(String[] args) { // 能容納100個文件 final BlockingQueue<File> queue = new LinkedBlockingQueue<File>(100); // 線程池 final ExecutorService exec = Executors.newFixedThreadPool(5); final File root = new File("D:\\dist\\blank"); // 完成標志 final File exitFile = new File(""); // 讀個數 final AtomicInteger rc = new AtomicInteger(); // 寫個數 final AtomicInteger wc = new AtomicInteger(); // 讀線程 Runnable read = new Runnable() { public void run() { scanFile(root); scanFile(exitFile); } public void scanFile(File file) { if (file.isDirectory()) { File[] files = file.listFiles(new FileFilter() { public boolean accept(File pathname) { return pathname.isDirectory() || pathname.getPath().endsWith(".log"); } }); for (File one : files) scanFile(one); } else { try { int index = rc.incrementAndGet(); System.out.println("Read0: " + index + " " + file.getPath()); queue.put(file); } catch (InterruptedException e) { } } } }; exec.submit(read); // 四個寫線程 for (int index = 0; index < 4; index++) { // write thread final int num = index; Runnable write = new Runnable() { String threadName = "Write" + num; public void run() { while (true) { try { Thread.sleep(randomTime()); int index = wc.incrementAndGet(); File file = queue.take(); // 隊列已經無對象 if (file == exitFile) { // 再次添加"標志",以讓其他線程正常退出 queue.put(exitFile); break; } System.out.println(threadName + ": " + index + " " + file.getPath()); } catch (InterruptedException e) { } } } }; exec.submit(write); } exec.shutdown(); } } CountDownLatch從名字可以看出,CountDownLatch是一個倒數計數的鎖,當倒數到0時觸發事件,也就是開鎖,其他人就可以進入了。在一些應用場合中,需要等待某個條件達到要求后才能做后面的事情;同時當線程都完成后也會觸發事件,以便進行后面的操作。
CountDownLatch最重要的方法是countDown()和await(),前者主要是倒數一次,后者是等待倒數到0,如果沒有到達0,就只有阻塞等待了。
一個CountDouwnLatch實例是不能重復使用的,也就是說它是一次性的,鎖一經被打開就不能再關閉使用了,如果想重復使用,請考慮使用CyclicBarrier。
下面的例子簡單的說明了CountDownLatch的使用方法,模擬了100米賽跑,10名選手已經準備就緒,只等裁判一聲令下。當所有人都到達終點時,比賽結束。Java代碼 收藏代碼
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; public class Test { public static void main(String[] args) throws InterruptedException { // 開始的倒數鎖 final CountDownLatch begin = new CountDownLatch(1); // 結束的倒數鎖 final CountDownLatch end = new CountDownLatch(10); // 十名選手 final ExecutorService exec = Executors.newFixedThreadPool(10); for (int index = 0; index < 10; index++) { final int NO = index + 1; Runnable run = new Runnable() { public void run() { try { begin.await(); Thread.sleep((long) (Math.random() * 10000)); System.out.println("No." + NO + " arrived"); } catch (InterruptedException e) { } finally { end.countDown(); } } }; exec.submit(run); } System.out.println("Game Start"); begin.countDown(); end.await(); System.out.println("Game Over"); exec.shutdown(); } } 使用Callable和Future實現線程等待和多線程返回值假設在main線程啟動一個線程,然后main線程需要等待子線程結束后,再繼續下面的操作,我們會通過join方法阻塞main線程,代碼如下:Java代碼 收藏代碼
Runnable runnable = ...;
Thread t = new Thread(runnable);
t.start();
t.join();
...... 通過JDK1.5線程池管理的線程可以使用Callable和Future實現(join()方法無法應用到在線程池線程)Java代碼 收藏代碼
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; public class Test { public static void main(String[] args) throws InterruptedException, ExecutionException { System.out.println("start main thread"); final ExecutorService exec = Executors.newFixedThreadPool(5); Callable<String> call = new Callable<String>() { public String call() throws Exception { System.out.println(" start new thread."); Thread.sleep(1000 * 5); System.out.println(" end new thread."); return "some value."; } }; Future<String> task = exec.submit(call); Thread.sleep(1000 * 2); task.get(); // 阻塞,并待子線程結束, exec.shutdown(); exec.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); System.out.println("end main thread"); } } Java代碼 收藏代碼
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; /**
* 多線程返回值測試
*/
public class ThreadTest { public static void main(String[] args) throws InterruptedException, ExecutionException { System.out.println("start main thread"); int threadCount = 5; final ExecutorService exec = Executors.newFixedThreadPool(threadCount); List<Future<Integer>> tasks = new ArrayList<Future<Integer>>(); for (int i = 0; i < threadCount; i++) { Callable<Integer> call = new Callable<Integer>() { public Integer call() throws Exception { Thread.sleep(1000); return 1; } }; tasks.add(exec.submit(call)); } long total = 0; for (Future<Integer> future : tasks) { total += future.get(); } exec.shutdown(); System.out.println("total: " + total); System.out.println("end main thread"); }
} CompletionService
這個東西的使用上很類似上面的example,不同的是,它會首先取完成任務的線程。下面的參考文章里,專門提到這個,大家有興趣可以看下,例子:Java代碼 收藏代碼
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; public class Test { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService exec = Executors.newFixedThreadPool(10); CompletionService<String> serv = new ExecutorCompletionService<String>(exec); for (int index = 0; index < 5; index++) { final int NO = index; Callable<String> downImg = new Callable<String>() { public String call() throws Exception { Thread.sleep((long) (Math.random() * 10000)); return "Downloaded Image " + NO; } }; serv.submit(downImg); } Thread.sleep(1000 * 2); System.out.println("Show web content"); for (int index = 0; index < 5; index++) { Future<String> task = serv.take(); String img = task.get(); System.out.println(img); } System.out.println("End"); // 關閉線程池 exec.shutdown(); }
} Semaphore信號量拿到信號量的線程可以進入代碼,否則就等待。通過acquire()和release()獲取和釋放訪問許可。下面的例子只允許5個線程同時進入執行acquire()和release()之間的代碼Java代碼 收藏代碼
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore; public class Test { public static void main(String[] args) { // 線程池 ExecutorService exec = Executors.newCachedThreadPool(); // 只能5個線程同時訪問 final Semaphore semp = new Semaphore(5); // 模擬20個客戶端訪問 for (int index = 0; index < 20; index++) { final int NO = index; Runnable run = new Runnable() { public void run() { try { // 獲取許可 semp.acquire(); System.out.println("Accessing: " + NO); Thread.sleep((long) (Math.random() * 10000)); // 訪問完后,釋放 semp.release(); } catch (InterruptedException e) { } } }; exec.execute(run); } // 退出線程池 exec.shutdown(); } } 參考:
jdk1.5中的線程池使用簡介
http://www.java3z.com/cwbwebhome/article/article2/2875.html
CAS原理
http://www.blogjava.net/syniii/archive/2010/11/18/338387.html?opt=admin
jdk1.5中java.util.concurrent包編寫多線程
http://hi.baidu.com/luotoo/blog/item/b895c3c2d650591e0ef47731.html
ExecutorSerive vs CompletionService
http://www.coderanch.com/t/491704/threads/java/ExecutorSerive-vs-CompletionService-- end --
轉載于:https://www.cnblogs.com/IamThat/p/4341125.html
總結
以上是生活随笔為你收集整理的java.util.concurrent包API学习笔记的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 求一个好听的小孩名字!
- 下一篇: clock函数返回负值~ (转)