Java 多线程 —— 常用并发容器
引言
本博客基于常用的并發容器,簡單概括其基本特性和簡單使用,并不涉及較深層次的原理分析和全面的場景用法。
適合對不了解并發容器的同學,工作中遇到類似的場景,能夠對文中提到的并發容器留有簡單印象就好。
一、ConcurrentHashMap
下面的程序中,切換任意Map的實現方式,如TreeMap、HashTable、ConcurrentHashMap等,運行程序,觀察執行結果:
public class T01_ConcurrentMap {public static void main(String[] args) { // Map<String, String> map = new TreeMap<>(); // Map<String, String> map = new Hashtable<>();Map<String, String> map = new ConcurrentHashMap<>(); // Map<String, String> map = new ConcurrentSkipListMap<>(); // Map<String, String> map = new HashMap<>();Random rdm = new Random();Thread[] ths = new Thread[100];CountDownLatch latch = new CountDownLatch(ths.length);long start = System.currentTimeMillis();for (int i = 0; i < ths.length; i++) {ths[i] = new Thread(() -> {for (int j = 0; j < 10000; j++)map.put("a" + rdm.nextInt(100000), "a" + rdm.nextInt(100000));latch.countDown();});}Arrays.asList(ths).forEach(t -> t.start());try {latch.await();} catch (InterruptedException e) {e.printStackTrace();}long end = System.currentTimeMillis();System.out.println(end - start);} }ConcurrentHashMap和HashTable,前者在執行效率方面要比后者高,因為HashTable在put每個對象的時候,都需要鎖定整個Map對象;而ConcurrentHashMap的實現方式是通過“分段鎖”,即將鎖的作用范圍細化,在put的時候,只鎖定對應的一段,從而提高效率。
ConcurrentSkipListMap跳表,不僅支持并發,而且是有序的,因為提供了排序功能,在性能方面可能不及Concurrent-HashMap。
二、CopyOnWriteList寫時復制容器
切換下面程序中List的實現方式(ArrayList、Vector、CopyOnWriteArrayList)觀察執行結果。
public class T02_CopyOnWriteList {public static void main(String[] args) {// List<String> lists = new ArrayList<>();// 會產生并發問題// List<String> lists = new Vector<>();List<String> lists = new CopyOnWriteArrayList<>();Random rdm = new Random();Thread[] ths = new Thread[100];for (int i = 0; i < ths.length; i++) {Runnable task = () -> {for (int j = 0; j < 1000; j++) {lists.add("a" + rdm.nextInt(10000));}};ths[i] = new Thread(task);}runAndComputeTime(ths);System.out.println(lists.size());}private static void runAndComputeTime(Thread[] ths) {long t1 = System.currentTimeMillis();Arrays.asList(ths).forEach(t -> t.start());Arrays.asList(ths).forEach(t -> {try {t.join();} catch (Exception e) {e.printStackTrace();}});long t2 = System.currentTimeMillis();System.out.println(t2 - t1);} }CopyOnWriteList寫的效率非常低,讀的效率非常高。這是因為在向寫時復制容器中添加一個元素的時候會將整個容器復制一份,再對復制后的容器進行插入;而讀的時候不需要加鎖。因此,這種容器一般用于極少修改,而讀取頻繁的應用場景。
三、Collections.synchronizedXx
Collections是一個容器工具類,通過類似synchronizedList(List<T> list)、synchronizedMap(Map<K,V> m)等命名形式的靜態方法,可以返回一個加了鎖的對應容器。實際上,通過這種方式獲得的同步容器僅僅是將普通的非線程安全的容器的方法進行synchronized封裝:
使用方法非常簡單,即傳入的非同步容器,返回一個同步容器:
List<String> strs = new ArrayList<>(); List<String> syncStrs = Collections.synchronizedList(strs);?四、ConcurrentLinkedQueue
線程安全的并發隊列,就可以使用ConcurrentLinkedQueue,是“尾進頭出”的并發隊列;另外還有一個ConcurrentLinkedDeque 是并發的雙端隊列,在實際開發中用處也非常大。
執行下面的程序,觀察輸出結果,重點理解個別方法的含義:
public static void main(String[] args) {Queue<String> strs = new ConcurrentLinkedQueue<>();Deque<String> names = new ConcurrentLinkedDeque<>();for (int i = 0; i < 10; i++) {strs.offer("S" + i); // addnames.offer("N" + i);}System.out.println("output#1 : " + strs);System.out.println("output#2 : " + strs.size());System.out.println("output#3 : " + strs.poll()); // FIFO,容器中的刪除System.out.println("output#4 : " + strs.size());System.out.println("output#5 : " + strs.peek()); // 取出,容器中的不刪System.out.println("output#6 : " + strs.size());System.out.println("output#7 : " + names);System.out.println("output#8 : " + names.pollLast()); // LIFOSystem.out.println("output#9 : " + names);System.out.println("output#10 : " + names.peekFirst()); // FIFOSystem.out.println("output#11 : " + names);}執行結果:
?五、LinkedBlockingQueue
使用LinkedBlockingQueue非常適合解決生產者-消費者模式的問題。
public class T05_LinkedBlockingQueue {static BlockingQueue<String> strs = new LinkedBlockingQueue<>();static Random rdm = new Random();public static void main(String[] args) {new Thread(() -> {for (int i = 0; i < 100; i++) {try {strs.put("a" + i);TimeUnit.MILLISECONDS.sleep(rdm.nextInt(1000));} catch (Exception e) {e.printStackTrace();}}}, "p1").start();for (int i = 0; i < 5; i++) {new Thread(() -> {for (;;) {try {System.out.println(Thread.currentThread().getName()+ " take " + strs.take());// 如果空了,就會等待} catch (Exception e) {e.printStackTrace();}}}, "c" + i).start();}} }Queue在高并發的情況下,通常可以使用兩種隊列:ConcurrentLinkedQueue 和 BlockingQueue(接口)。
BlockingQueue是阻塞式隊列,它是一個接口,可以根據有界或無界等實際需要考慮使用它的相關實現類,如ArrayBlockingQueue或上面代碼中用到的LinkedBlockingQueue。
六、ArrayBlockingQueue
ArrayBlockingQueue是一個有界隊列,在構造時可以指定元素個數,隊列會自行維護元素的個數:
public class T06_ArrayBlockingQueue {static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10);static Random rdm = new Random();public static void main(String[] args) throws InterruptedException {for (int i = 0; i < 10; i++) {strs.put("a" + i);}// strs.put("aaa");// 線程阻塞,知道有空閑位置 // strs.add("aaa"); // output:java.lang.IllegalStateException: Queue full // strs.offer("aaa"); // output:[a0, a1, a2, a3, a4, a5, a6, a7, a8, a9]strs.offer("aaa", 3, TimeUnit.SECONDS);// 阻塞,但可以指定超時時間System.out.println(strs);} }注意觀察此容器為我們提供的不同場景下可以選擇的添加元素的方式,執行結果見代碼注釋部分。
七、DelayQueue
DelayQueue也是一種無界隊列,容器中的每個元素會記錄一個倒計時,等待時間結束后才可以被消費者取出。可以用于執行定時任務。
public class T07_DelayQueue {static BlockingQueue<MyTask> tasks = new java.util.concurrent.DelayQueue<>();static class MyTask implements Delayed {long runningTime;MyTask(long rt) {runningTime = rt;}@Overridepublic int compareTo(Delayed o) {if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {return -1;} else if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {return 1;} else {return 0;}}@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}@Overridepublic String toString() {return String.valueOf(runningTime);}}public static void main(String[] args) throws InterruptedException {long now = System.currentTimeMillis();MyTask t1 = new MyTask(now + 1000);MyTask t2 = new MyTask(now + 2000);MyTask t3 = new MyTask(now + 1500);MyTask t4 = new MyTask(now + 2500);MyTask t5 = new MyTask(now + 500);tasks.put(t1);tasks.put(t2);tasks.put(t3);tasks.put(t4);tasks.put(t5);System.out.println(tasks);for (int i = 0; i < 5; i++) {System.out.println(tasks.take());}} }八、TransferQueue
TransferQueue提供了一種特殊的方法叫做transfer(), 這個方法在有消費者等待消費的時候,不會將元素放入隊列中,而是直接傳遞給消費者。因此這種方法非常適用于高并發的情況下。 但是,當沒有消費者的時候,那么transfer就會阻塞,而其他類似的方法如put(),add()都不會阻塞。在實時消息處理中用到的比較多, 例如Netty。
public class T08_TransferQueue {public static void main(String[] args) throws InterruptedException {LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();new Thread(() -> {try {System.out.println(strs.take());} catch (Exception e) {e.printStackTrace();}}).start();strs.transfer("aaa");new Thread(() -> {try {System.out.println(strs.take());} catch (Exception e) {e.printStackTrace();}}).start();} }九、SynchronusQueue
同步隊列 SynchronousQueue是一種特殊的TransferQueue。 它的容量為0,是沒有容量的隊列,在消費者正在等待的時候,必須使用put(實際上內部使用的是transfer),放入的任何元素 都必須直接交給消費者,而不能放入容器中。
public class T09_SynchronousQueue { // 容量為0public static void main(String[] args) throws InterruptedException {BlockingQueue<String> strs = new SynchronousQueue<>();new Thread(() -> {try {System.out.println(strs.take());} catch (Exception e) {e.printStackTrace();}}).start();// strs.put("aaa"); // 阻塞等待消費者消費strs.add("aaa");System.out.println(strs.size());}}總結
綜上,是對一些常用容器的介紹和案例展示,重點要理解它們的應用區別和使用場景,最起碼要有個大概印象,在遇到類似問題的時候考慮使用它們。
ConcurrentHashMap
CopyOnWriteList
Collections.synchronizedXx
ConcurrentLinkedQueue
BlockingQueue
? ? LinkedBQ
? ? ArrayBQ
? ? TransferQueue
? ? SynchronusQueue
DelayQueue執行定時任務
鳴謝
《馬士兵老師高并發編程系列--第三部分》
總結
以上是生活随笔為你收集整理的Java 多线程 —— 常用并发容器的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Exception和Error深入分析~
- 下一篇: Java常用设计模式————单例模式