让人抓头的Java并发(四) 阻塞队列--CPU飙升排查案例
在上一篇文章中給大家介紹了牛批的AQS,大致講解了JUC中同步的思路。本來還沒想好這一篇應該寫點什么,剛好上周某個同事的代碼出現問題,排查后發現是使用阻塞隊列不當導致的,所以本篇決定介紹下阻塞隊列。
真實案例分析
錯誤案例:
說來也是挺巧的,那天一位同事iMac換了Macbook Pro。然后像往常一樣啟動了各個服務,過了會電腦風扇瘋狂工作發出響聲,由于平常iMac上IDEA項目開的比較多占用較多內存時間長了也會卡頓,所以他并沒有在意。但是之后一直是這樣我們便覺得很奇怪,然后打開了他的活動監視器,發現某個Java進程竟然占用了百分之九十的CPU,然后確認是哪一個項目,最后通過jstack查看該項目中的線程情況,定位到了某個自定義線程,然后查看代碼發現如下:
MyThreadPool.exportEnclosurePool.execute(() -> {while (true) {BlockingQueue<EnclosureRequest> blockingQueue = requestQueue.getBlockingQueue();while (!blockingQueue.isEmpty()) {System.out.println("開始消費");EnclosureRequest one = null;try {one = blockingQueue.take();ossService.exportEnclosureToLocalServer(one.getEnclosureList(), one.getSobId(), one.getUserUuid(), one.getUserName(), one.getTmpFileName(), one.getZipUuidList());} catch (Exception e) {e.printStackTrace();}}} }復制代碼該同事的需求是做一個隊列化附件導出的功能,因此他選擇了生產者消費者模式,采用阻塞隊列來實現;但是由于對此不太熟悉,所以寫出了這段有問題的代碼,導致死循環;萬幸的是這段代碼在測試分支上被我們發現了并沒有上正式。正確的消費者代碼實現如下:
正確實現:
MyThreadPool.exportEnclosurePool.execute(() -> {BlockingQueue<EnclosureRequest> blockingQueue = requestQueue.getBlockingQueue();while (true) {try {EnclosureRequest one = blockingQueue.take();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("開始消費");ossService.exportEnclosureToLocalServer(one.getEnclosureList(), one.getSobId(), one.getUserUuid(), one.getUserName(), one.getTmpFileName(), one.getZipUuidList());} } 復制代碼阻塞隊列簡介
阻塞隊列是一個插入和移除方法支持附加操作的隊列;- 支持阻塞的插入方法:當阻塞隊列滿時,隊列會阻塞插入元素的線程,直到隊列不為滿。
- 支持阻塞的移除方法:當阻塞隊列為空時,獲取隊列元素的線程會被阻塞直到隊列不為空。
四種處理方式:
| 拋出異常 | add(e) | remove() |
| 返回boolean值 | offer(e) | poll() |
| 阻塞 | put(e) | take() |
| 超時退出 | offer(e,time,unit) | poll(time,unit) |
?小提示: 如果是無界阻塞隊列,隊列不可能出現滿的情況,所以使用put()方法永遠不會被阻塞,使用offer()方法永遠返回true
Java中的阻塞列隊介紹
- ArrayBlockingQueue:基于數組的有界阻塞隊列,支持配置公平性策略。
- LinkedBlockingQueue:基于鏈表的無界(默認Integer.MAX_VALUE)阻塞隊列,Executors中newFixedThreadPool()和newSingleThreadExecutor()使用的工作隊列,所以不推薦使用Executors。
- LinkedBlockingDeque:基于鏈表的無界(默認Integer.MAX_VALUE)雙向阻塞隊列
- LinkedTransferQueue:基于鏈表的無界阻塞隊列,該隊列提供transfer(e)方法,如果有消費者正在等待則直接把元素給消費者,否者將元素放在隊列的tail節點并阻塞到該元素被消費。
- PriorityBlockingQueue:支持優先級排序的無界阻塞隊列,默認情況下采用自然順序升序排序,也可以通過類重寫compareTo()方法來指定元素排序規則,或者初始化隊列時指定構造參數Comparator來排序。
- DelayQueue:使用PriorityQueue實現的無界延時阻塞隊列。
- SynchronousQueue:不存儲元素的阻塞隊列,每一個put操作必須阻塞到一個take操作發生,否則不能繼續添加元素。支持配置公平性策略。
阻塞隊列(LinkedBlockingQueue)實現原理分析
LinkedBlockingQueue是一個由成員變量Node組成的單鏈表結構,默認容量為Integer的最大值,其內部還有兩把ReentrantLock鎖putLock、takeLock用于保證插入和刪除的線程安全(其他阻塞隊列中使用一個ReentrantLock鎖),兩個Condition等待隊列notEmpty、notFull用于存放take()和put()阻塞的線程。這里我簡單分析下它兩個比較重要的方法put()和take()。
源碼分析
/*** 由Node節點組成單鏈表結構*/ static class Node<E> {E item;Node<E> next;Node(E x) { item = x; } } /** 用于移除操作的鎖 */ private final ReentrantLock takeLock = new ReentrantLock();/** 阻塞于take的等待隊列 */ private final Condition notEmpty = takeLock.newCondition();/** 用于插入操作的鎖 */ private final ReentrantLock putLock = new ReentrantLock();/** 阻塞于put的等待隊列 */ private final Condition notFull = putLock.newCondition();/*** 不指定容量默認是Integer的最大值*/ public LinkedBlockingQueue() {this(Integer.MAX_VALUE); }/*** 阻塞式插入元素(隊列為滿則阻塞)*/ public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;// 獲取插入鎖(響應中斷)putLock.lockInterruptibly();try {// 如果當前隊列長度到達容量上限則當前線程釋放鎖加入不為滿等待隊列中while (count.get() == capacity) {notFull.await();}// 將元素加入隊尾enqueue(node);// 當前隊列長度加一(返回值是加一之前)c = count.getAndIncrement();// 如果加入后隊列長度小于容量上限則通知不為滿等待隊列中的線程if (c + 1 < capacity)notFull.signal();} finally {// 釋放鎖putLock.unlock();}// 如果在插入元素之前隊列為空則通知不為空等待隊列中的線程if (c == 0)signalNotEmpty(); } /*** 阻塞式移除元素(隊列為空則阻塞)*/ public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;// 獲取移除鎖(響應中斷)takeLock.lockInterruptibly();try {// 如果當前隊列為空則當前線程釋放鎖加入不為空等待隊列while (count.get() == 0) {notEmpty.await();}// 移除隊頭元素x = dequeue();c = count.getAndDecrement();// 如果移除之后還有元素則通知不為空等待隊列中的線程if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}// 如果移除元素之前到達容量上線則通知不為滿等待隊列中的線程if (c == capacity)signalNotFull();return x;}復制代碼圖解分析
需要注意的是put()操作將元素加入隊列后釋放鎖是在判斷容量是否小于上限通知notFull等待隊列之后,通知notEmpty隊列之前需要先獲取takeLock,take()操作同理。
?小提示: LinkedBlockingQueue的put()和take()方法中和其他阻塞隊列有個很大的區別。其他阻塞隊列每次put()和take()都會去通知相應的等待隊列,但是LinkedBlockingQueue只有在put前是空的去通知notEmpty,take前是滿的去通知notFull等待隊列,并且put后未滿去通知notFull等待隊列,take后未空去通知notEmpty等待隊列。關于這點我個人的理解是由于LinkedBlockingQueue里分讀寫鎖,如果每次take都通知notFull的話,需要另外去獲取putLock產生競爭;用已經獲取putLock的線程去喚醒notFull等待隊列中線程減少了鎖的競爭。其他阻塞隊列中只有一把鎖,所以通知不需要另外競爭鎖。當然這只是我個人的看法而已,希望有了解的小伙伴指教。
總結
阻塞隊列在并發中很重要,前面介紹的線程池中就用到了阻塞隊列,生產者消費者模型也是可以用阻塞隊列實現,到此已經介紹了AQS、阻塞隊列、線程池,希望你們能關聯起來理解加深印象。
往期文章:
- 讓人抓頭的Java并發(三) 強大的AQS!
- 讓人抓頭的Java并發(二) 線程池ThreadPoolExecutor分析
- 讓人抓頭的Java并發(一) 輕松認識多線程
歡迎同樣有感興趣的小伙伴一起探討
轉載于:https://juejin.im/post/5d2801066fb9a07ed524cbab
總結
以上是生活随笔為你收集整理的让人抓头的Java并发(四) 阻塞队列--CPU飙升排查案例的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 手动设计简单的Token验证
- 下一篇: Linux下关机、重启