并发编程实践之公平有界阻塞队列实现
簡介:?JUC 工具包是 JAVA 并發(fā)編程的利器。本文講述在沒有 JUC 工具包幫助下,借助原生的 JAVA 同步原語, 如何實現(xiàn)一個公平有界的阻塞隊列。希望你也能在文后體會到并發(fā)編程的復(fù)雜之處,以及 JUC 工具包的強。
作者 | 李新然
來源 | 阿里技術(shù)公眾號
一 背景
JUC 工具包是 JAVA 并發(fā)編程的利器。
本文講述在沒有 JUC 工具包幫助下,借助原生的 JAVA 同步原語, 如何實現(xiàn)一個公平有界的阻塞隊列。
希望你也能在文后體會到并發(fā)編程的復(fù)雜之處,以及 JUC 工具包的強大。
二 方法
本文使用到的基本工具:
基于以上基礎(chǔ)工具,實現(xiàn)公平有界的阻塞隊列,此處:
三 實現(xiàn)
1 基礎(chǔ)版本
首先,考慮在非并發(fā)場景下,借助 ADT 實現(xiàn)一個基礎(chǔ)版本
interface Queue {boolean offer(Object obj);Object poll();} class FairnessBoundedBlockingQueue implements Queue {// 當(dāng)前大小protected int size;// 容量protected final int capacity;// 頭指針,empty: head.next == tail == nullprotected Node head;// 尾指針protected Node tail;public FairnessBoundedBlockingQueue(int capacity) {this.capacity = capacity;this.head = new Node(null);this.tail = head;this.size = 0;}// 如果隊列已滿,通過返回值標(biāo)識public boolean offer(Object obj) {if (size < capacity) {Node node = new Node(obj);tail.next = node;tail = node;++size;return true;}return false;}// 如果隊列為空,head.next == null;返回空元素public Object poll() {if (head.next != null) {Object result = head.next.value;head.next.value = null;head = head.next; // 丟棄頭結(jié)點--size;return result;}return null;}class Node {Object value;Node next;Node(Object obj) {this.value = obj;next = null;}} }以上
需要注意的一點:在出隊時,本文通過遷移頭結(jié)點的方式實現(xiàn),避免修改尾結(jié)點。
在下文實現(xiàn)并發(fā)版本時,會看到此處的用意。
2 并發(fā)版本
如果在并發(fā)場景下,上述的實現(xiàn)面臨一些問題,同時未實現(xiàn)給定的一些需求。
通過添加 synchronized ,保證并發(fā)條件下的線程安全問題。
注意此處做同步的原因是為了保證類的不變式。
并發(fā)問題
在并發(fā)場景下,基礎(chǔ)版本的實現(xiàn)面臨的問題包括:原子性,可見性和指令重排的問題。
參考 JMM 的相關(guān)描述。
并發(fā)問題,最簡單的解決方法是:通過 synchronized 加鎖,一次性解決問題。
// 省略接口定義 class BoundedBlockingQueue implements Queue {// 當(dāng)前大小protected int size;// 容量protected final int capacity;// 頭指針,empty: head.next == tail == nullprotected Node head;// 尾指針protected Node tail;public BoundedBlockingQueue(int capacity) {this.capacity = capacity;this.head = new Node(null);this.tail = head;this.size = 0;}// 如果隊列已滿,通過返回值標(biāo)識public synchronized boolean offer(Object obj) {if (size < capacity) {Node node = new Node(obj);tail.next = node;tail = node;++size;return true;}return false;}// 如果隊列為空,head.next == null;返回空元素public synchronized Object poll() {if (head.next != null) {Object result = head.next.value;head.next.value = null;head = head.next; // 丟棄頭結(jié)點--size;return result;}return null;}// 省略 Node 的定義 }以上,簡單粗暴的加 synchronized 可以解決問題,但會引入新的問題:系統(tǒng)活性問題(此問題下文會解決)。
同時,簡單加 synchronized 同步是無法實現(xiàn)阻塞等待;即
實現(xiàn)阻塞等待,需要借助 JAVA 中的 PV 原語:wait, notify, notifyAll 。
參考:JDK 中對 wait, notify, notifyAll 的相關(guān)描述。
衛(wèi)式方法
阻塞等待,可以通過簡單的衛(wèi)式方法來實現(xiàn),此問題本質(zhì)上可以抽象為:
此種抽象 Ada 在語言層面上實現(xiàn)。在 JAVA 中,借助 wait, notify, notifyAll 可以翻譯為:
// 當(dāng)前線程 synchronized Object action(Object arg) {while(!condition) {wait();}// 前置條件,不變式checkPreCondition();doAction();// 后置條件,不變式checkPostCondition(); }// 其他線程 synchronized Object notifyAction(Object arg) {notifyAll(); }需要注意:
據(jù)此,不難實現(xiàn)簡單的阻塞版本的有界隊列,如下
interface Queue {boolean offer(Object obj) throws InterruptedException;Object poll() throws InterruptedException;} class FairnessBoundedBlockingQueue implements Queue {// 當(dāng)前大小protected int size;// 容量protected final int capacity;// 頭指針,empty: head.next == tail == nullprotected Node head;// 尾指針protected Node tail;public FairnessBoundedBlockingQueue(int capacity) {this.capacity = capacity;this.head = new Node(null);this.tail = head;this.size = 0;}// 如果隊列已滿,通過返回值標(biāo)識public synchronized boolean offer(Object obj) throws InterruptedException {while (size < capacity) {wait();}Node node = new Node(obj);tail.next = node;tail = node;++size;notifyAll(); // 可以出隊return true;}// 如果隊列為空,阻塞等待public synchronized Object poll() throws InterruptedException {while (head.next == null) {wait();}Object result = head.next.value;head.next.value = null;head = head.next; // 丟棄頭結(jié)點--size;notifyAll(); // 可以入隊return result;}// 省略 Node 的定義 }以上,實現(xiàn)了阻塞等待,但也引入了更大的性能問題
需要注意的點:
3 鎖拆分優(yōu)化
以上第一個問題,可以通過鎖拆分來解決,即:定義兩把鎖,讀鎖和寫鎖;讀寫分離。
// 省略接口定義 class FairnessBoundedBlockingQueue implements Queue {// 容量protected final int capacity;// 頭指針,empty: head.next == tail == nullprotected Node head;// 尾指針protected Node tail;// guard: canPollCount, headprotected final Object pollLock = new Object();protected int canPollCount;// guard: canOfferCount, tailprotected final Object offerLock = new Object();protected int canOfferCount;public FairnessBoundedBlockingQueue(int capacity) {this.capacity = capacity;this.canPollCount = 0;this.canOfferCount = capacity;this.head = new Node(null);this.tail = head;}// 如果隊列已滿,通過返回值標(biāo)識public boolean offer(Object obj) throws InterruptedException {synchronized(offerLock) {while(canOfferCount <= 0) {offerLock.wait();}Node node = new Node(obj);tail.next = node;tail = node;canOfferCount--;}synchronized(pollLock) {++canPollCount;pollLock.notifyAll();}return true;}// 如果隊列為空,阻塞等待public Object poll() throws InterruptedException {Object result = null;synchronized(pollLock) {while(canPollCount <= 0) {pollLock.wait();}result = head.next.value;head.next.value = null;head = head.next;canPollCount--;}synchronized(offerLock) {canOfferCount++;offerLock.notifyAll();}return result;}// 省略 Node 定義 }以上
以上實現(xiàn)
但上述實現(xiàn)還有未解決的問題:
當(dāng)有多個入隊線程等待時,一次出隊的動作會觸發(fā)所有入隊線程競爭,大量的線程上下文切換,最終只有一個線程能執(zhí)行。
即,還有 讀與讀 和 寫與寫 之間的競爭問題。
4 狀態(tài)追蹤解除競爭
此處可以通過狀態(tài)追蹤,解除讀與讀之間和寫與寫之間的競爭問題
class FairnessBoundedBlockingQueue implements Queue {// 容量protected final int capacity;// 頭指針,empty: head.next == tail == nullprotected Node head;// 尾指針protected Node tail;// guard: canPollCount, headprotected final Object pollLock = new Object();protected int canPollCount;protected int waitPollCount;// guard: canOfferCount, tailprotected final Object offerLock = new Object();protected int canOfferCount;protected int waitOfferCount;public FairnessBoundedBlockingQueue(int capacity) {this.capacity = capacity;this.canPollCount = 0;this.canOfferCount = capacity;this.waitPollCount = 0;this.waitOfferCount = 0;this.head = new Node(null);this.tail = head;}// 如果隊列已滿,通過返回值標(biāo)識public boolean offer(Object obj) throws InterruptedException {synchronized(offerLock) {while(canOfferCount <= 0) {waitOfferCount++;offerLock.wait();waitOfferCount--;}Node node = new Node(obj);tail.next = node;tail = node;canOfferCount--;}synchronized(pollLock) {++canPollCount;if (waitPollCount > 0) {pollLock.notify();}}return true;}// 如果隊列為空,阻塞等待public Object poll() throws InterruptedException {Object result;synchronized(pollLock) {while(canPollCount <= 0) {waitPollCount++;pollLock.wait();waitPollCount--;}result = head.next.value;head.next.value = null;head = head.next;canPollCount--;}synchronized(offerLock) {canOfferCount++;if (waitOfferCount > 0) {offerLock.notify();}}return result;}// 省略 Node 的定義 }以上
但,上述的實現(xiàn)在某些場景下會運行失敗,面臨活性問題,考慮
情況一:
情況二:
以上為解除阻塞消息丟失的例子,問題的根源在與異常處理。
5 解決異常問題
解決線程中斷退出的問題,線程校驗中斷狀態(tài)的場景
以上
通過狀態(tài)變量追蹤可以解決讀與讀之間和寫與寫之間的鎖競爭問題。
以下考慮如果解決讀與讀之間和寫與寫之間的公平性問題。
6 解決公平性
公平性的問題的解決需要將狀態(tài)變量的追蹤轉(zhuǎn)換為:請求監(jiān)視器追蹤。
以上邏輯可以統(tǒng)一抽象為
boolean needToWait; synchronized(this) {needToWait = calculateNeedToWait();if (needToWait) {enqueue(monitor); // 請求對應(yīng)的monitor} } if (needToWait) {monitor.doWait(); }需要注意
基于以上的邏輯抽象,實現(xiàn)公平隊列
// 省略接口定義 class FairnessBoundedBlockingQueue implements Queue {// 容量protected final int capacity;// 頭指針,empty: head.next == tail == nullprotected Node head;// 尾指針protected Node tail;// guard: canPollCount, head, pollQueueprotected final Object pollLock = new Object();protected int canPollCount;// guard: canOfferCount, tail, offerQueueprotected final Object offerLock = new Object();protected int canOfferCount;protected final WaitQueue pollQueue = new WaitQueue();protected final WaitQueue offerQueue = new WaitQueue();public FairnessBoundedBlockingQueue(int capacity) {this.capacity = capacity;this.canOfferCount = capacity;this.canPollCount = 0;this.head = new Node(null);this.tail = head;}// 如果隊列已滿,通過返回值標(biāo)識public boolean offer(Object obj) throws InterruptedException {if (Thread.interrupted()) {throw new InterruptedException(); // 線程已中斷,直接退出即可,防止中斷線程競爭鎖}WaitNode wait = null;synchronized(offerLock) {// 在有阻塞請求或者隊列為空時,阻塞等待if (canOfferCount <= 0 || !offerQueue.isEmpty()) {wait = new WaitNode();offerQueue.enq(wait);} else {// continue.}}try {if (wait != null) {wait.doWait();}if (Thread.interrupted()) {throw new InterruptedException();}} catch (InterruptedException e) {offerQueue.doNotify();throw e;}// 確保此時線程狀態(tài)正常,以下不會校驗中斷synchronized(offerLock) {Node node = new Node(obj);tail.next = node;tail = node;canOfferCount--;}synchronized(pollLock) {++canPollCount;pollQueue.doNotify();}return true;}// 如果隊列為空,阻塞等待public Object poll() throws InterruptedException {if (Thread.interrupted()) {throw new InterruptedException();}Object result = null;WaitNode wait = null;synchronized(pollLock) {// 在有阻塞請求或者隊列為空時,阻塞等待if (canPollCount <= 0 || !pollQueue.isEmpty()) {wait = new WaitNode();pollQueue.enq(wait);} else {// ignore}}try {if (wait != null) {wait.doWait();}if (Thread.interrupted()) {throw new InterruptedException();}} catch (InterruptedException e) {// 傳遞消息pollQueue.doNotify();throw e;}// 以下不會檢測線程中斷狀態(tài)synchronized(pollLock) {result = head.next.value;head.next.value = 0;// ignore head;head = head.next;canPollCount--;}synchronized(offerLock) {canOfferCount++;offerQueue.doNotify();}return result;}class WaitQueue {WaitNode head;WaitNode tail;WaitQueue() {head = new WaitNode();tail = head;}synchronized void doNotify() {for(;;) {WaitNode node = deq();if (node == null) {break;} else if (node.doNotify()) {// 此處確保NOTIFY成功break;} else {// ignore, and retry.}}}synchronized boolean isEmpty() {return head.next == null;}synchronized void enq(WaitNode node) {tail.next = node;tail = tail.next;}synchronized WaitNode deq() {if (head.next == null) {return null;}WaitNode res = head.next;head = head.next;if (head.next == null) {tail = head; // 為空,遷移tail節(jié)點}return res;}}class WaitNode {boolean released;WaitNode next;WaitNode() {released = false;next = null;}synchronized void doWait() throws InterruptedException {try {while (!released) {wait();} } catch (InterruptedException e) {if (!released) {released = true;throw e;} else {// 如果是NOTIFY之后收到中斷的信號,不能拋出異常;需要做RELAY處理Thread.currentThread().interrupt();}}}synchronized boolean doNotify() {if (!released) {released = true;notify();// 明確釋放了一個線程,返回truereturn true;} else {// 沒有釋放新的線程,返回falsereturn false;}}}// 省略 Node 的定義 }以上
當(dāng)信號釋放時,借助讀寫鎖同步更新隊列;最后同樣借助讀寫鎖,觸發(fā)隊列更新消息;
7 等待時間的問題
并發(fā)場景下,等待通常會設(shè)置為限時等待 TIMED_WAITING ,避免死鎖或損失系統(tǒng)活性;
實現(xiàn)同步隊列的限時等待,并沒想象的那么困難
class TimeoutException extends InterruptedException {}class WaitNode {boolean released;WaitNode next;WaitNode() {released = false;next = null;}synchronized void doWait(long milliSeconds) throws InterruptedException {try {long startTime = System.currentTimeMillis();long toWait = milliSeconds;for (;;) {wait(toWait);if (released) {return;}long now = System.currentTimeMillis();toWait = toWait - (now - startTime);if (toWait <= 0) {throw new TimeoutException();}}} catch (InterruptedException e) {if (!released) {released = true;throw e;} else {// 如果已經(jīng)釋放信號量,此處不拋出異常;但恢復(fù)中斷狀態(tài)Thread.currentThread().interrupt();}}}synchronized boolean doNotify() {if (!released) {released = true;notify();return true;} else {return false;}}由于所有的等待都阻塞在 WaitNode 監(jiān)視器,以上
- 首先定義超時異常,此處只是為了方便異常處理,繼承 InterruptedException;
- 此處依賴于 wait(long timeout) 的超時等待實現(xiàn),這通常不是問題;
最后,將 WaitNode 超時等待的邏輯,帶入到 FairnessBoundedBlockingQueue 實現(xiàn)中,即可。
四 總結(jié)
本文通過一步步迭代,最終借助 JAVA 同步原語實現(xiàn)初版的公平有界隊列。迭代實現(xiàn)過程中可以看到以下幾點:
最后,相信大家再看 JUC 的工具包實現(xiàn),定有不一樣的體會。
原文鏈接
本文為阿里云原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的并发编程实践之公平有界阻塞队列实现的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Apache Flink 在京东的实践与
- 下一篇: IplImage* cvmat* mat