多线程与高并发(六):线程池可用的各种高并发容器详解:CopyOnWriteList,BlockingQueue等
容器
物理結(jié)構(gòu):數(shù)組、鏈表
邏輯結(jié)構(gòu):很多
Queue主要是為高并發(fā)準(zhǔn)備的。
Vector Hashtable
Vector Hashtable 自帶鎖,有很多設(shè)計(jì)上不完善的地方,現(xiàn)在基本上不用。
測試Hashtable的性能:用100的線程,先put進(jìn)去1000000個(gè)數(shù),再get 1000000個(gè)數(shù)
直接使用Hashmap,多線程時(shí)會出現(xiàn)問題
package com.mashibing.juc.c_023_02_FromHashtableToCHM;import java.util.HashMap; import java.util.UUID;public class T02_TestHashMap {static HashMap<UUID, UUID> m = new HashMap<>();static int count = Constants.COUNT;static UUID[] keys = new UUID[count];static UUID[] values = new UUID[count];static final int THREAD_COUNT = Constants.THREAD_COUNT;static {for (int i = 0; i < count; i++) {keys[i] = UUID.randomUUID();values[i] = UUID.randomUUID();}}static class MyThread extends Thread {int start;int gap = count/THREAD_COUNT;public MyThread(int start) {this.start = start;}@Overridepublic void run() {for(int i=start; i<start+gap; i++) {m.put(keys[i], values[i]);}}}public static void main(String[] args) {long start = System.currentTimeMillis();Thread[] threads = new Thread[THREAD_COUNT];for(int i=0; i<threads.length; i++) {threads[i] =new MyThread(i * (count/THREAD_COUNT));}for(Thread t : threads) {t.start();}for(Thread t : threads) {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}}long end = System.currentTimeMillis();System.out.println(end - start);System.out.println(m.size());} }使用SynchronizedHashMap,效率與直接使用Hashtable區(qū)別不是很大
package com.mashibing.juc.c_023_02_FromHashtableToCHM;import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID;public class T03_TestSynchronizedHashMap {static Map<UUID, UUID> m = Collections.synchronizedMap(new HashMap<UUID, UUID>());static int count = Constants.COUNT;static UUID[] keys = new UUID[count];static UUID[] values = new UUID[count];static final int THREAD_COUNT = Constants.THREAD_COUNT;static {for (int i = 0; i < count; i++) {keys[i] = UUID.randomUUID();values[i] = UUID.randomUUID();}}static class MyThread extends Thread {int start;int gap = count/THREAD_COUNT;public MyThread(int start) {this.start = start;}@Overridepublic void run() {for(int i=start; i<start+gap; i++) {m.put(keys[i], values[i]);}}}public static void main(String[] args) {long start = System.currentTimeMillis();Thread[] threads = new Thread[THREAD_COUNT];for(int i=0; i<threads.length; i++) {threads[i] =new MyThread(i * (count/THREAD_COUNT));}for(Thread t : threads) {t.start();}for(Thread t : threads) {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}}long end = System.currentTimeMillis();System.out.println(end - start);System.out.println(m.size());//-----------------------------------start = System.currentTimeMillis();for (int i = 0; i < threads.length; i++) {threads[i] = new Thread(()->{for (int j = 0; j < 10000000; j++) {m.get(keys[10]);}});}for(Thread t : threads) {t.start();}for(Thread t : threads) {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}}end = System.currentTimeMillis();System.out.println(end - start);} }使用ConcurrentHashMap,插入的效率與前面的Hashtable差不多,但是讀取的效率非常高。
package com.mashibing.juc.c_023_02_FromHashtableToCHM;import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap;public class T04_TestConcurrentHashMap {static Map<UUID, UUID> m = new ConcurrentHashMap<>();static int count = Constants.COUNT;static UUID[] keys = new UUID[count];static UUID[] values = new UUID[count];static final int THREAD_COUNT = Constants.THREAD_COUNT;static {for (int i = 0; i < count; i++) {keys[i] = UUID.randomUUID();values[i] = UUID.randomUUID();}}static class MyThread extends Thread {int start;int gap = count/THREAD_COUNT;public MyThread(int start) {this.start = start;}@Overridepublic void run() {for(int i=start; i<start+gap; i++) {m.put(keys[i], values[i]);}}}public static void main(String[] args) {long start = System.currentTimeMillis();Thread[] threads = new Thread[THREAD_COUNT];for(int i=0; i<threads.length; i++) {threads[i] =new MyThread(i * (count/THREAD_COUNT));}for(Thread t : threads) {t.start();}for(Thread t : threads) {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}}long end = System.currentTimeMillis();System.out.println(end - start);System.out.println(m.size());//-----------------------------------start = System.currentTimeMillis();for (int i = 0; i < threads.length; i++) {threads[i] = new Thread(()->{for (int j = 0; j < 10000000; j++) {m.get(keys[10]);}});}for(Thread t : threads) {t.start();}for(Thread t : threads) {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}}end = System.currentTimeMillis();System.out.println(end - start);} }最早的容器Vector是自帶鎖的,但是你整個(gè)操作調(diào)用了兩個(gè)原子方法的話,整體并不是原子的。你還需要在外面加sync
/*** 有N張火車票,每張票都有一個(gè)編號* 同時(shí)有10個(gè)窗口對外售票* 請寫一個(gè)模擬程序* <p>* 分析下面的程序可能會產(chǎn)生哪些問題?* 重復(fù)銷售?超量銷售?* <p>* 使用Vector或者Collections.synchronizedXXX* 分析一下,這樣能解決問題嗎?* <p>* 就算操作A和B都是同步的,但A和B組成的復(fù)合操作也未必是同步的,仍然需要自己進(jìn)行同步* 就像這個(gè)程序,判斷size和進(jìn)行remove必須是一整個(gè)的原子操作** @author 馬士兵*/ package com.mashibing.juc.c_024_FromVectorToQueue;import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit;public class TicketSeller3 {static List<String> tickets = new LinkedList<>();static {for (int i = 0; i < 1000; i++) tickets.add("票 編號:" + i);}public static void main(String[] args) {for (int i = 0; i < 10; i++) {new Thread(() -> {while (true) {synchronized (tickets) {if (tickets.size() <= 0) break;try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("銷售了--" + tickets.remove(0));}}}).start();}} }使用Queue:ConcurrentLinkedQueue,里面很多方法是CAS實(shí)現(xiàn)的
/*** 有N張火車票,每張票都有一個(gè)編號* 同時(shí)有10個(gè)窗口對外售票* 請寫一個(gè)模擬程序* 分析下面的程序可能會產(chǎn)生哪些問題?* 重復(fù)銷售?超量銷售?* 使用Vector或者Collections.synchronizedXXX* 分析一下,這樣能解決問題嗎?* 就算操作A和B都是同步的,但A和B組成的復(fù)合操作也未必是同步的,仍然需要自己進(jìn)行同步* 就像這個(gè)程序,判斷size和進(jìn)行remove必須是一整個(gè)的原子操作* 使用ConcurrentQueue提高并發(fā)性*/ package com.mashibing.juc.c_024_FromVectorToQueue;import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue;public class TicketSeller4 {static Queue<String> tickets = new ConcurrentLinkedQueue<>();static {for (int i = 0; i < 1000; i++) tickets.add("票 編號:" + i);}public static void main(String[] args) {for (int i = 0; i < 10; i++) {new Thread(() -> {while (true) {String s = tickets.poll();if (s == null) break;else System.out.println("銷售了--" + s);}}).start();}} }多線程常用的容器
ConcurrentHashMap
里面用的是CAS操作,而CAS在Tree操作的時(shí)候太復(fù)雜了,所以不存在ConcurrentTreeMap,為了排序,換了跳表的結(jié)構(gòu)代替Tree結(jié)構(gòu)
跳表
- 底層是鏈表
- 拿出關(guān)鍵元素新開一層
- 在查找的時(shí)候,從上往下查
- CAS的實(shí)現(xiàn)難度比TreeMap容易很多
- 查找操作的時(shí)間復(fù)雜度比鏈表快很多
CopyOnWriteArrayList
本質(zhì)上和ReadWrite是一個(gè)思路
寫時(shí)復(fù)制,適用于讀線程多,寫線程少的情況。(讀的時(shí)候不加鎖),寫的時(shí)候copy一個(gè)新的,寫完之后把舊的指針指向新的。
寫的效率比較低,因?yàn)槭菙?shù)組,每次寫的時(shí)候都要復(fù)制。
add操作的源碼如下:
示例
代碼T05-T09:BlockingQueue講解
Queue和List的區(qū)別是什么?
- 提供了很多在多線程訪問下比較友好的API
- offer,peek,pool
BlockingQueue的特點(diǎn)是什么?
BlockingQueue的優(yōu)勢在于,增加了更多API,比如put,take
或者阻塞,或者指定時(shí)間等待
實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模型,也是多線程里面最重要的一個(gè)模型,也是MQ的基礎(chǔ)——MQ的本質(zhì),就是一個(gè)大型的生產(chǎn)者、消費(fèi)者模型
LinkedBlockingQueue
LinkedBlockingQueue,是用鏈表實(shí)現(xiàn)的BlockingQueue
阻塞使用await實(shí)現(xiàn)的,底層應(yīng)該是park
Queue常用接口
LinkedBlockingQueue示例
DelayQueue
是BlockingQueue的一種,是一種阻塞的隊(duì)列。
需要實(shí)現(xiàn)compareTo方法
需要指定等待時(shí)間
用來按時(shí)間進(jìn)行任務(wù)調(diào)度
PriorityQueue
內(nèi)部進(jìn)行了排序,底層是一個(gè)二叉樹(小頂堆)的結(jié)構(gòu)
package com.mashibing.juc.c_025;import java.util.PriorityQueue;public class T07_01_PriorityQueque {public static void main(String[] args) {PriorityQueue<String> q = new PriorityQueue<>();q.add("c");q.add("e");q.add("a");q.add("d");q.add("z");for (int i = 0; i < 5; i++) {System.out.println(q.poll());}} } 輸出: a c d e zSynchronousQueue
容量為0,不能往里裝東西,只有有一個(gè)線程等著的時(shí)候,才能把東西遞到這個(gè)線程手里,是用來一個(gè)線程給另外一個(gè)線程傳數(shù)據(jù)的。
本質(zhì)和Exchanger比較相似,也是需要兩個(gè)線程同步對接,否則都會阻塞著。
在線程池里面,線程之間進(jìn)行任務(wù)調(diào)度的時(shí)候,經(jīng)常會用到。
package com.mashibing.juc.c_025;import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue;public class T08_SynchronusQueue { //容量為0public static void main(String[] args) throws InterruptedException {BlockingQueue<String> strs = new SynchronousQueue<>();new Thread(() -> {try {System.out.println(strs.take());} catch (InterruptedException e) {e.printStackTrace();}}).start();strs.put("aaa"); //阻塞等待消費(fèi)者消費(fèi)//strs.put("bbb");//strs.add("aaa");System.out.println(strs.size());} }TransferQueue
裝完,阻塞等著,有線程把它取走,再離開
要先開啟消費(fèi)者線程,再往里面transfer,要不然就阻塞了~
場景1:要求某件任務(wù)有一個(gè)結(jié)果(比如一個(gè)訂單等付款完成之后,確認(rèn)有線程去處理它了,再給客戶反饋)
場景2:確認(rèn)收錢完成之后,才能把商品取走,比如面對面付款
示例:
package com.mashibing.juc.c_025;import java.util.concurrent.LinkedTransferQueue;public class T09_TransferQueue {public static void main(String[] args) throws InterruptedException {LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();new Thread(() -> {try {System.out.println(strs.take());} catch (InterruptedException e) {e.printStackTrace();}}).start();strs.transfer("aaa");//strs.put("aaa");/*new Thread(() -> {try {System.out.println(strs.take());} catch (InterruptedException e) {e.printStackTrace();}}).start();*/} }經(jīng)典的交替打印面試題可以用 TransferQueue 實(shí)現(xiàn)
package com.mashibing.juc.c_026_00_interview.A1B2C3;import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.TransferQueue;public class T13_TransferQueue {public static void main(String[] args) {char[] aI = "1234567".toCharArray();char[] aC = "ABCDEFG".toCharArray();TransferQueue<Character> queue = new LinkedTransferQueue<Character>();new Thread(() -> {try {for (char c : aI) {System.out.print(queue.take());queue.transfer(c);}} catch (InterruptedException e) {e.printStackTrace();}}, "t1").start();new Thread(() -> {try {for (char c : aC) {queue.transfer(c);System.out.print(queue.take());}} catch (InterruptedException e) {e.printStackTrace();}}, "t2").start();} }下節(jié)課預(yù)習(xí)
- Callable
- Future, Completable Future
總結(jié)
以上是生活随笔為你收集整理的多线程与高并发(六):线程池可用的各种高并发容器详解:CopyOnWriteList,BlockingQueue等的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JVM从入门到精通(五): Java运行
- 下一篇: JVM从入门到精通(六):JVM调优必备