高并发之并发容器,了解多少(从入门到超神)
點擊上方“好好學java”,選擇“置頂公眾號”
優秀學習資源、干貨第一時間送達!
?精彩內容?
java實戰練習項目教程
2018微服務資源springboot、springcloud、docker、dubbo實戰等傾心分享
2018年java架構師全套學習教程
最新大數據培訓完整視頻教程
2018年java最新全套培訓學習教程
一、ConcurrentHashMap
在上面已經提到過ConcurrentHashMap,ConcurrentHashMap相比Hashtable能夠進一步提高并發性,其原理圖如下:
HashMap,Hashtable與ConcurrentHashMap都是實現的哈希表數據結構,在隨機讀取的時候效率很高。Hashtable實現同步是利用synchronized關鍵字進行鎖定的,其是針對整張哈希表進行鎖定的,即每次鎖住整張表讓線程獨占,在線程安全的背后是巨大的浪費。ConcurrentHashMap和Hashtable主要區別就是圍繞著鎖的粒度進行區別以及如何區鎖定。
上圖中,左邊是Hashtable的實現方式,可以看到鎖住整個哈希表;而右邊則是ConcurrentHashMap的實現方式,單獨鎖住每一個桶(segment).ConcurrentHashMap將哈希表分為16個桶(默認值),諸如get(),put(),remove()等常用操作只鎖當前需要用到的桶,而size()才鎖定整張表。原來只能一個線程進入,現在卻能同時接受16個寫線程并發進入(寫線程需要鎖定,而讀線程幾乎不受限制),并發性的提升是顯而易見的。
而在迭代時,ConcurrentHashMap使用了不同于傳統集合的快速失敗迭代器(fast-fail iterator)的另一種迭代方式,稱為弱一致迭代器。在這種迭代方式中,當iterator被創建后集合再發生改變就不再是拋出ConcurrentModificationException,取而代之的是在改變時實例化出新的數據從而不影響原有的數據,iterator完成后再將頭指針替換為新的數據,這樣iterator線程可以使用原來老的數據,而寫線程也可以并發的完成改變,更重要的,這保證了多個線程并發執行的連續性和擴展性,是性能提升的關鍵。
我們在上面闡述了ConcurrentHashMap的使用特點和原理,分別在同樣的一個高并發場景下,測試不同的方式產生的延時(ms):
Map<String,?String>?map?=?new?ConcurrentHashMap<>();//483 Map<String,?String>?map?=?new?ConcurrentSkipListMap<>();?//高并發并且排序?559 Map<String,?String>?map?=?new?Hashtable<>();?//499? Map<String,?String>?map?=Collections.synchronizedMap(new?HashMap<>());?//?530 Map<String,?String>?map?=Collections.synchronizedMap(new?TreeMap());?//905以ConcurrentLinkedQueue為例,他實現了Queue接口,實例化方式如下:
Queue<String>?strs?=?new?ConcurrentLinkedQueue<>();添加元素的方法:offer()
取出隊頭的方法:poll()
判斷隊列長度:size()
對于雙端隊列,使用ConcurrentLinkedDeque類型來實現.
下面我們再看一個具體的實例:
public?class?T01_ConcurrentMap?{public?static?void?main(String[]?args)?{Map<String,?String>?map?=?new?ConcurrentHashMap<String,?String>();//Map<String,?String>?map?=?new?ConcurrentSkipListMap<String,?String>();?//高并發并且排序//Map<String,?String>?map?=?new?Hashtable<>();//Map<String,?String>?map?=?new?HashMap<String,?String>();Random?random?=?new?Random();Thread[]?threads?=?new?Thread[100];CountDownLatch?latch?=?new?CountDownLatch(threads.length);long?start?=?System.currentTimeMillis();for?(int?i?=?0;?i?<?threads.length;?i++)?{threads[i]?=?new?Thread(()->{for(int?j=0;?j<10000;j++)?map.put("a"?+?random.nextInt(100000),?"a"?+?random.nextInt(100000));latch.countDown();});}Arrays.asList(threads).forEach(t->t.start());try?{latch.await();}?catch?(InterruptedException?e)?{e.printStackTrace();}long?end?=?System.currentTimeMillis();System.out.println(end-start);} }啟動100個線程,向圖中添加100000個元素,分別使用Hashtable,HashMap,ConcurrentHashMap,ConcurrentSkipListMap定義map,判斷程序完成的時間。最終發現,ConcurrentHashMap要比HashMap效率高,ConcurrentHashMap是將大鎖分成若干小鎖,實現多個線程共同運行,所以,效率有很大差距。ConcurrentSkipListMap較ConcurrentHashMap除了實現高并發外還能夠排序。
參考:
http://blog.csdn.net/sunxianghuang/article/details/52221913 http://www.educity.cn/java/498061.html二、ConcurrentQueue
與ConcurrentHashMap相同,ConcurrentQueue也是通過同樣的方式來提高并發性能的。
我們在同步容器中提到過火車票問題:
有N張火車票,每張票都有一個編號,同時有10個窗口對外售票,寫一個模擬程序。
在上述問題中,也可以使用ConcurrentQueue進一步提高并發性:
static?Queue<String>?tickets?=?new?ConcurrentLinkedQueue<>();具體的代碼是這樣的:
public?class?TicketSeller4?{static?Queue<String>?tickets?=?new?ConcurrentLinkedQueue<>();static?{for(int?i=0;?i<1000;?i++)?tickets.add("票編號:"?+?i);}public?static?void?main(String[]?args)?{for(int?i=0;?i<10;?i++)?{new?Thread(()->{while(true)?{String?s?=?tickets.poll();if(s?==?null)?break;else?System.out.println("銷售了--"?+?s);}}).start();}} }這里面通過ConcurrentLinkedQueue的poll()方法來實現獲取容器成員的。用這個類型可以進一步提高并發性。
具體基本操作實例
public?class?T04_ConcurrentQueue?{public?static?void?main(String[]?args)?{Queue<String>?strings?=?new?ConcurrentLinkedQueue<String>();for?(int?i?=?0;?i?<?10;?i++)?{strings.offer("a"?+?i);?//相當于add,?放進隊列}System.out.println(strings);System.out.println(strings.size());System.out.println(strings.poll());?//取出并移除掉System.out.println(strings.size());System.out.println(strings.peek());?//取出,不會移除。相當于get(0)System.out.println(strings.size());} }三、CopyOnWriteArrayList
寫時復制容器,即copy-on-write,在多線程環境下,寫時效率低,讀時效率高,適合寫少讀多的環境。對比測試幾種情況:
List<String>?lists?=?new?ArrayList<>(); //這個會出并發問題!報錯:ArrayIndexOutOfBoundsException List<String>?lists?=?new?Vector();//111?ms List<String>?lists?=?new?CopyOnWriteArrayList<>();//5230?ms //測試核心代碼: Runnable?task?=?new?Runnable()?{ @Override public?void?run()?{for(int?i=0;?i<1000;?i++)?lists.add("a"?+?????r.nextInt(10000)); } }; //多線程向該容器中不斷加入數據。從JDK 5開始Java并發包里提供了兩個使用CopyOnWrite機制實現的并發容器,它們是CopyOnWriteArrayList和CopyOnWriteArraySet。
當我們往一個容器添加元素的時候,不直接往當前容器添加,而是先將當前容器進行Copy,復制出一個新的容器,然后向新的容器里添加元素,添加完元素之后,再將原容器的引用指向新的容器。這樣做的好處是我們可以對CopyOnWrite容器進行并發的讀,而不需要加鎖,因為在當前讀的容器中不會添加任何元素。所以CopyOnWrite容器是一種讀寫分離的思想,讀和寫對應不同的容器。
四、BlockingQueue
這種并發容器,會自動實現阻塞式的生產者/消費者模式。使用隊列解耦合,在實現異步事物的時候很有用。下面的例子,實現了阻塞隊列:
LinkedBlockingQueue static?BlockingQueue<String>?strs?=?new?LinkedBlockingQueue<>(10); strs.put("a"?+?i);?//加入隊列,如果滿了,就會等待 strs.take();?//取出隊列元素,如果空了,就會等待在實例化時,可以指定具體的隊列容量。
在加入成員的時候,除了使用put方法還可以使用其他方法:
五、ArrayBlockingQueue
static?BlockingQueue<String>?strs?=?new?ArrayBlockingQueue<>(10);對象的方法和上面的BlockingQueue是一樣的,用法也是一樣的。
二者的區別主要是:
LinkedBlockingQueue是一個單向鏈表實現的阻塞隊列,在鏈表一頭加入元素,如果隊列滿,就會阻塞,另一頭取出元素,如果隊列為空,就會阻塞。
LinkedBlockingQueue內部使用ReentrantLock實現插入鎖(putLock)和取出鎖(takeLock)。
相比于數組實現的ArrayBlockingQueue的有界情況,我們稱之為有界隊列,LinkedBlockingQueue可認為是無界隊列。當然,也可以向上面那樣指定隊列容量,但是這個參數常常是省略的,多用于任務隊列。
六、LinkedBlockingQueue實例
public?class?T05_LinkedBlockingQueue?{private?static?BlockingQueue<String>?strings?=?new?LinkedBlockingQueue<String>();private?static?Random?r?=?new?Random();public?static?void?main(String[]?args)?{new?Thread(()->{for?(int?i?=?0;?i?<?100;?i++)?{try?{strings.put("a"?+?i);?//如果滿了,就會等待TimeUnit.SECONDS.sleep(r.nextInt(10));}?catch?(Exception?e)?{e.printStackTrace();}}},?"p1").start();for?(int?i?=?0;?i?<?5;?i++)?{new?Thread(()->{for(;;){try?{System.out.println(Thread.currentThread().getName()?+?"take?-"?+?strings.take());?//如果空了,就會等待}?catch?(Exception?e)?{e.printStackTrace();}?}},"c"?+?i).start();}} }LinkedBlockingQueue是使用鏈表是實現的阻塞式容器。
七、DelayQueue
DelayQueue也是一個BlockingQueue,其特化的參數是Delayed。
Delayed擴展了Comparable接口,比較的基準為延時的時間值,Delayed接口的實現類getDelay()的返回值應為固定值(final).DelayQueue內部是使用PriorityQueue實現的,即:
可以說,DelayQueue是一個使用優先隊列(PriorityQueue)實現的BlockingQueue,優先隊列的比較基準值是時間。這是一個無界的BlockingQueue,用于放置實現了Delayed接口的對象,其中的對象只能在其到期時才能從隊列中取走。這種隊列是有序的,即隊頭對象的延遲到期時間最長。但是要注意的是,不能將null元素放置到這種隊列中。
Delayed,一種混合風格的接口,用來標記那些應該在給定延遲時間之后執行的對象。此接口的實現類必須重寫一個 compareTo() 方法,該方法提供與此接口的 getDelay()方法一致的排序。
DelayQueue存儲的對象是實現了Delayed接口的對象,在這個對象中,需要重寫compareTo()和getDelay()方法,例如:
因此,當我們在main()函數中,向該隊列加入元素后再取出元素的過程,就會存在延時,可以這樣驗證:
long?now?=?System.currentTimeMillis(); MyTask?t1?=?new?MyTask(now?+?1000); MyTask?t2?=?new?MyTask(now?+?2000); MyTask?t3?=?new?MyTask(now?+?1500); MyTask?t4?=?new?MyTask(now?+?2500); MyTask?t5?=?new?MyTask(now?+?500); tasks.put(t1); tasks.put(t2); tasks.put(t3); tasks.put(t4); tasks.put(t5); System.out.println(tasks); for(int?i=0;?i<5;?i++)?{ System.out.println(tasks.take()); }注意:為了方便查看到效果,可以重寫toString()函數,來保證打印出來的結果有意義:
例如:
DelayQueue可以用在諸如用監控線程來輪詢是否有超時任務出現,來處理某些具有等待時延的情況,這樣,可以避免由于數量巨大造成的輪詢效率差的問題。例如:
關閉空閑連接:服務器中,有很多客戶端的連接,空閑一段時間之后需要關閉他們。
緩存:緩存中的對象,超過了空閑時間,需要從緩存中移出。
任務超時處理:在網絡協議滑動窗口請求應答式交互時,處理超時未響應的請求。
實例:
public?class?T07_DelayQueue?{private?static?BlockingQueue<MyTask>?tasks?=?new?DelayQueue<>();private?static?Random?r?=?new?Random();static?class?MyTask?implements?Delayed{long?runningTime;public?MyTask(long?rt)?{this.runningTime?=?rt;}@Overridepublic?int?compareTo(Delayed?o)?{if?(this.getDelay(TimeUnit.MILLISECONDS)?<?o.getDelay(TimeUnit.MICROSECONDS))?{return?-1;}else?if?(this.getDelay(TimeUnit.MILLISECONDS)?>?o.getDelay(TimeUnit.MILLISECONDS))?{return?1;}else?{return?0;}}@Overridepublic?long?getDelay(TimeUnit?unit)?{return?unit.convert(runningTime?-?System.currentTimeMillis(),?TimeUnit.MILLISECONDS);}@Overridepublic?String?toString()?{return?""?+?runningTime;}public?static?void?main(String[]?args)?throws?InterruptedException?{long?now?=?System.currentTimeMillis();MyTask?t1?=?new?MyTask(now?+?1000);MyTask?t2?=?new?MyTask(now?+?2000);MyTask?t3?=?new?MyTask(now?+?1500);MyTask?t4?=?new?MyTask(now?+?2500);MyTask?t5?=?new?MyTask(now?+?500);tasks.put(t1);tasks.put(t2);tasks.put(t3);tasks.put(t4);tasks.put(t5);System.out.println(tasks);for?(int?i?=?0;?i?<?5;?i++)?{System.out.println(tasks.take());}}} }八、LinkedTransferQueue
TransferQueue是一個繼承了BlockingQueue的接口,并且增加若干新的方法。LinkedTransferQueue是TransferQueue接口的實現類,其定義為一個無界的隊列,具有先進先出(FIFO)的特性。
TransferQueue接口含有下面幾個重要方法:
transfer(E e)
若當前存在一個正在等待獲取的消費者線程,即立刻移交之;否則,會插入當前元素e到隊列尾部,并且等待進入阻塞狀態,到有消費者線程取走該元素。
tryTransfer(E e)
若當前存在一個正在等待獲取的消費者線程(使用take()或者poll()函數),使用該方法會即刻轉移/傳輸對象元素e;若不存在,則返回false,并且不進入隊列。這是一個不阻塞的操作。
tryTransfer(E e,long timeout,TimeUnit unit)
若當前存在一個正在等待獲取的消費者線程,會立即傳輸給它;否則將插入元素e到隊列尾部,并且等待被消費者線程獲取消費掉;若在指定的時間內元素e無法被消費者線程獲取,則返回false,同時該元素被移除。
hasWaitingConsumer()
判斷是否存在消費者線程。
getWaitingConsumerCount()
獲取所有等待獲取元素的消費線程數量。
size()
因為隊列的異步特性,檢測當前隊列的元素個數需要逐一迭代,無法保證原子性,可能會得到一個不太準確的結果,尤其是在遍歷時有可能隊列發生更改。
使用方法:
LinkedTransferQueue<String>?strs?=?new?LinkedTransferQueue<>();//實例化如果當前沒有消費者線程(存在take方法的線程):
strs.transfer("aaa");該方法會一直阻塞在這里,知道有消費者線程存在。
而如果使用傳統的put()方法來加入元素的話,則不會發生阻塞現象。
同樣,獲取隊列中元素的方法take()也是阻塞在這里等待獲取新的元素的。
九、SynchronousQueue
SynchronousQueue也是一種BlockingQueue,是一種無緩沖的等待隊列。所以,在某次添加元素后必須等待其他線程取走后才能繼續添加;可以認為SynchronousQueue是一個緩存值為0的阻塞隊列(也可以認為是1),它的isEmpty()方法永遠返回是true,remainingCapacity()方法永遠返回是0.
remove()和removeAll()方法永遠返回是false,iterator()方法永遠返回空,peek()方法永遠返回null.
在使用put()方法時,會一直阻塞在這里,等待被消費:
實例:
public?class?T09_Synchronized?{public?static?void?main(String[]?args)?throws?InterruptedException?{BlockingQueue<String>?strings?=?new?SynchronousQueue<String>();new?Thread(()->{try?{System.out.println(strings.take());}?catch?(Exception?e)?{e.printStackTrace();}}).start();strings.put("aaa");?//阻塞等待消費者消費//strings.add("aaa");System.out.println(strings.size());} }參考資料
https://blog.csdn.net/qq_34707744/article/details/79746622 https://blog.csdn.net/wang7807564/article/details/80048576推薦閱讀
1.?解密微信小程序登錄全過程(ssm實現)
2.?springmvc入門
3.?servlet就是這么簡單
4.?重溫javaweb過濾器filter
附上熱門QQ群,存放資源和歷史資料,2000容量(低門檻付費群),長按二維碼入群
總結
以上是生活随笔為你收集整理的高并发之并发容器,了解多少(从入门到超神)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 解密微信小程序Java登录流程(ssm实
- 下一篇: VS2015提示gets未定义