日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

并发编程实践之公平有界阻塞队列实现

發(fā)布時間:2024/8/23 编程问答 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 并发编程实践之公平有界阻塞队列实现 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

簡介:?JUC 工具包是 JAVA 并發(fā)編程的利器。本文講述在沒有 JUC 工具包幫助下,借助原生的 JAVA 同步原語, 如何實現(xiàn)一個公平有界的阻塞隊列。希望你也能在文后體會到并發(fā)編程的復(fù)雜之處,以及 JUC 工具包的強。

作者 | 李新然
來源 | 阿里技術(shù)公眾號

一 背景

JUC 工具包是 JAVA 并發(fā)編程的利器。

本文講述在沒有 JUC 工具包幫助下,借助原生的 JAVA 同步原語, 如何實現(xiàn)一個公平有界的阻塞隊列。

希望你也能在文后體會到并發(fā)編程的復(fù)雜之處,以及 JUC 工具包的強大。

二 方法

本文使用到的基本工具:

  • 同步監(jiān)聽器 synchronized ,方法基本和代碼塊級別;
  • Object 基礎(chǔ)類的 wait, notify, notifyAll;
  • 基于以上基礎(chǔ)工具,實現(xiàn)公平有界的阻塞隊列,此處:

  • 將公平的定義限定為 FIFO ,也就是先阻塞等待的請求,先解除等待;
  • 并不保證解除等待后執(zhí)行 Action 的先后順序;
  • 確保隊列的大小始終不超過設(shè)定的容量;但阻塞等待的請求數(shù)不做限制;
  • 三 實現(xiàn)

    1 基礎(chǔ)版本

    首先,考慮在非并發(fā)場景下,借助 ADT 實現(xiàn)一個基礎(chǔ)版本

    interface Queue {boolean offer(Object obj);Object poll();} class FairnessBoundedBlockingQueue implements Queue {// 當(dāng)前大小protected int size;// 容量protected final int capacity;// 頭指針,empty: head.next == tail == nullprotected Node head;// 尾指針protected Node tail;public FairnessBoundedBlockingQueue(int capacity) {this.capacity = capacity;this.head = new Node(null);this.tail = head;this.size = 0;}// 如果隊列已滿,通過返回值標(biāo)識public boolean offer(Object obj) {if (size < capacity) {Node node = new Node(obj);tail.next = node;tail = node;++size;return true;}return false;}// 如果隊列為空,head.next == null;返回空元素public Object poll() {if (head.next != null) {Object result = head.next.value;head.next.value = null;head = head.next; // 丟棄頭結(jié)點--size;return result;}return null;}class Node {Object value;Node next;Node(Object obj) {this.value = obj;next = null;}} }

    以上

  • 定義支持隊列的兩個基礎(chǔ)接口, poll 和 offer;
  • 隊列的實現(xiàn),采用經(jīng)典實現(xiàn);
  • 考慮在隊列空的情況下, poll 返回為空,非阻塞;
  • 隊列在滿的情況下, offer 返回 false ,入隊不成功,無異常;
  • 需要注意的一點:在出隊時,本文通過遷移頭結(jié)點的方式實現(xiàn),避免修改尾結(jié)點。
    在下文實現(xiàn)并發(fā)版本時,會看到此處的用意。

    2 并發(fā)版本

    如果在并發(fā)場景下,上述的實現(xiàn)面臨一些問題,同時未實現(xiàn)給定的一些需求。

    通過添加 synchronized ,保證并發(fā)條件下的線程安全問題。

    注意此處做同步的原因是為了保證類的不變式。

    并發(fā)問題

    在并發(fā)場景下,基礎(chǔ)版本的實現(xiàn)面臨的問題包括:原子性,可見性和指令重排的問題。

    參考 JMM 的相關(guān)描述。

    并發(fā)問題,最簡單的解決方法是:通過 synchronized 加鎖,一次性解決問題。

    // 省略接口定義 class BoundedBlockingQueue implements Queue {// 當(dāng)前大小protected int size;// 容量protected final int capacity;// 頭指針,empty: head.next == tail == nullprotected Node head;// 尾指針protected Node tail;public BoundedBlockingQueue(int capacity) {this.capacity = capacity;this.head = new Node(null);this.tail = head;this.size = 0;}// 如果隊列已滿,通過返回值標(biāo)識public synchronized boolean offer(Object obj) {if (size < capacity) {Node node = new Node(obj);tail.next = node;tail = node;++size;return true;}return false;}// 如果隊列為空,head.next == null;返回空元素public synchronized Object poll() {if (head.next != null) {Object result = head.next.value;head.next.value = null;head = head.next; // 丟棄頭結(jié)點--size;return result;}return null;}// 省略 Node 的定義 }

    以上,簡單粗暴的加 synchronized 可以解決問題,但會引入新的問題:系統(tǒng)活性問題(此問題下文會解決)。

    同時,簡單加 synchronized 同步是無法實現(xiàn)阻塞等待;即

  • 如果隊列為空,那么出隊的動作還是會立即返回,返回為空;
  • 如果隊列已滿,那么入隊動作還是會立即返回,返回操作不成功;
  • 實現(xiàn)阻塞等待,需要借助 JAVA 中的 PV 原語:wait, notify, notifyAll 。

    參考:JDK 中對 wait, notify, notifyAll 的相關(guān)描述。

    衛(wèi)式方法

    阻塞等待,可以通過簡單的衛(wèi)式方法來實現(xiàn),此問題本質(zhì)上可以抽象為:

  • 任何一個方法都需要在滿足一定條件下才可以執(zhí)行;
  • 執(zhí)行方法前需要首先校驗不變式,然后執(zhí)行變更;
  • 在執(zhí)行完成后,校驗是否滿足后驗不變式;
  • WHEN(condition) Object action(Object arg) {checkPreCondition();doAction(arg);checkPostCondition(); }

    此種抽象 Ada 在語言層面上實現(xiàn)。在 JAVA 中,借助 wait, notify, notifyAll 可以翻譯為:

    // 當(dāng)前線程 synchronized Object action(Object arg) {while(!condition) {wait();}// 前置條件,不變式checkPreCondition();doAction();// 后置條件,不變式checkPostCondition(); }// 其他線程 synchronized Object notifyAction(Object arg) {notifyAll(); }

    需要注意:

  • 通常會采用 notifyAll 發(fā)送通知,而非 notify ;因為如果當(dāng)前線程收到 notify 通知后被中斷,那么系統(tǒng)將一直等待下去。
  • 如果使用了 notifyAll 那么衛(wèi)式語句必須放在 while 循環(huán)中;因為線程喚醒后,執(zhí)行條件已經(jīng)不滿足,雖然當(dāng)前線程持有互斥鎖。
  • 衛(wèi)式條件的所有變量,有任何變更都需要發(fā)送 notifyAll 不然面臨系統(tǒng)活性問題
  • 據(jù)此,不難實現(xiàn)簡單的阻塞版本的有界隊列,如下

    interface Queue {boolean offer(Object obj) throws InterruptedException;Object poll() throws InterruptedException;} class FairnessBoundedBlockingQueue implements Queue {// 當(dāng)前大小protected int size;// 容量protected final int capacity;// 頭指針,empty: head.next == tail == nullprotected Node head;// 尾指針protected Node tail;public FairnessBoundedBlockingQueue(int capacity) {this.capacity = capacity;this.head = new Node(null);this.tail = head;this.size = 0;}// 如果隊列已滿,通過返回值標(biāo)識public synchronized boolean offer(Object obj) throws InterruptedException {while (size < capacity) {wait();}Node node = new Node(obj);tail.next = node;tail = node;++size;notifyAll(); // 可以出隊return true;}// 如果隊列為空,阻塞等待public synchronized Object poll() throws InterruptedException {while (head.next == null) {wait();}Object result = head.next.value;head.next.value = null;head = head.next; // 丟棄頭結(jié)點--size;notifyAll(); // 可以入隊return result;}// 省略 Node 的定義 }

    以上,實現(xiàn)了阻塞等待,但也引入了更大的性能問題

  • 入隊和出隊動作阻塞等待同一把鎖,惡性競爭;
  • 當(dāng)隊列變更時,所有阻塞線程被喚醒,大量的線程上下文切換,競爭同步鎖,最終可能只有一個線程能執(zhí)行;
  • 需要注意的點:

  • 阻塞等待 wait 會拋出中斷異常。關(guān)于異常的問題下文會處理;
  • 接口需要支持拋出中斷異常;
  • 隊里變更需要 notifyAll 避免線程中斷或異常,丟失消息;
  • 3 鎖拆分優(yōu)化

    以上第一個問題,可以通過鎖拆分來解決,即:定義兩把鎖,讀鎖和寫鎖;讀寫分離。

    // 省略接口定義 class FairnessBoundedBlockingQueue implements Queue {// 容量protected final int capacity;// 頭指針,empty: head.next == tail == nullprotected Node head;// 尾指針protected Node tail;// guard: canPollCount, headprotected final Object pollLock = new Object();protected int canPollCount;// guard: canOfferCount, tailprotected final Object offerLock = new Object();protected int canOfferCount;public FairnessBoundedBlockingQueue(int capacity) {this.capacity = capacity;this.canPollCount = 0;this.canOfferCount = capacity;this.head = new Node(null);this.tail = head;}// 如果隊列已滿,通過返回值標(biāo)識public boolean offer(Object obj) throws InterruptedException {synchronized(offerLock) {while(canOfferCount <= 0) {offerLock.wait();}Node node = new Node(obj);tail.next = node;tail = node;canOfferCount--;}synchronized(pollLock) {++canPollCount;pollLock.notifyAll();}return true;}// 如果隊列為空,阻塞等待public Object poll() throws InterruptedException {Object result = null;synchronized(pollLock) {while(canPollCount <= 0) {pollLock.wait();}result = head.next.value;head.next.value = null;head = head.next;canPollCount--;}synchronized(offerLock) {canOfferCount++;offerLock.notifyAll();}return result;}// 省略 Node 定義 }

    以上

  • 定義了兩把鎖, pollLock 和 offerLock 拆分出隊和入隊競爭;
  • 入隊鎖同步的變量為:callOfferCount 和 tail;
  • 出隊鎖同步的變量為:canPollCount 和 head;
  • 出隊的動作:首先拿到 pollLock 衛(wèi)式等待后,完成出隊動作;然后拿到 offerLock 發(fā)送通知,解除入隊的等待線程。
  • 入隊的動作:首先拿到 offerLock 衛(wèi)式等待后,完成入隊的動作;然后拿到 pollLock 發(fā)送通知,解除出隊的等待線程。
  • 以上實現(xiàn)

  • 確保通過入隊鎖和出隊鎖,分別保證入隊和出隊的原子性;
  • 出隊動作,通過特別的實現(xiàn),確保出隊只會變更 head ,避免獲取 offerLock;
  • 通過 offerLock.notifyAll 和 pollLock.notifyAll 解決讀寫競爭的問題;
  • 但上述實現(xiàn)還有未解決的問題:

    當(dāng)有多個入隊線程等待時,一次出隊的動作會觸發(fā)所有入隊線程競爭,大量的線程上下文切換,最終只有一個線程能執(zhí)行。

    即,還有 讀與讀 和 寫與寫 之間的競爭問題。

    4 狀態(tài)追蹤解除競爭

    此處可以通過狀態(tài)追蹤,解除讀與讀之間和寫與寫之間的競爭問題

    class FairnessBoundedBlockingQueue implements Queue {// 容量protected final int capacity;// 頭指針,empty: head.next == tail == nullprotected Node head;// 尾指針protected Node tail;// guard: canPollCount, headprotected final Object pollLock = new Object();protected int canPollCount;protected int waitPollCount;// guard: canOfferCount, tailprotected final Object offerLock = new Object();protected int canOfferCount;protected int waitOfferCount;public FairnessBoundedBlockingQueue(int capacity) {this.capacity = capacity;this.canPollCount = 0;this.canOfferCount = capacity;this.waitPollCount = 0;this.waitOfferCount = 0;this.head = new Node(null);this.tail = head;}// 如果隊列已滿,通過返回值標(biāo)識public boolean offer(Object obj) throws InterruptedException {synchronized(offerLock) {while(canOfferCount <= 0) {waitOfferCount++;offerLock.wait();waitOfferCount--;}Node node = new Node(obj);tail.next = node;tail = node;canOfferCount--;}synchronized(pollLock) {++canPollCount;if (waitPollCount > 0) {pollLock.notify();}}return true;}// 如果隊列為空,阻塞等待public Object poll() throws InterruptedException {Object result;synchronized(pollLock) {while(canPollCount <= 0) {waitPollCount++;pollLock.wait();waitPollCount--;}result = head.next.value;head.next.value = null;head = head.next;canPollCount--;}synchronized(offerLock) {canOfferCount++;if (waitOfferCount > 0) {offerLock.notify();}}return result;}// 省略 Node 的定義 }

    以上

  • 通過 waitOfferCount 和 waitPollCount 的狀態(tài)追蹤解決 讀寫內(nèi)部的競爭問題;
  • 當(dāng)隊列變更時,根據(jù)追蹤的狀態(tài),決定是否派發(fā)消息,觸發(fā)線程阻塞狀態(tài)解除;
  • 但,上述的實現(xiàn)在某些場景下會運行失敗,面臨活性問題,考慮

    情況一:

  • 初始狀態(tài)隊列為空 線程 A 執(zhí)行出隊動作,被阻塞在 pollLock , 此時 waitPollCount==1;
  • 此時線程 A 在執(zhí)行 wait 時被中斷,拋出異常, waitPollCount==1 并未被重置;
  • 阻塞隊列為空,但 waitPollCount==1 類狀態(tài)異常;
  • 情況二:

  • 初始狀態(tài)隊列為空 線程 A B 執(zhí)行出隊動作,被阻塞在 pollLock , 此時 waitPollCount==2;
  • 線程 C 執(zhí)行入隊動作,可以立即執(zhí)行,執(zhí)行完成后,觸發(fā) pollLock 解除一個線程等待 notify;
  • 觸發(fā)的線程在 JVM 實現(xiàn)中是隨機的,假設(shè)線程 A 被解除阻塞;
  • 假設(shè)線程 A 在阻塞過程中已被中斷,阻塞解除后 JVM 檢查 interrupted 狀態(tài),拋出 InterruptedException 異常;
  • 此時隊列中有一個元素,但線程 A 仍阻塞在 pollLock 中,且一直阻塞下去;
  • 以上為解除阻塞消息丟失的例子,問題的根源在與異常處理。

    5 解決異常問題

    解決線程中斷退出的問題,線程校驗中斷狀態(tài)的場景

  • JVM 通常只會在有限的幾個場景檢測線程的中斷狀態(tài), wait, Thread.join, Thread.sleep;
  • JVM 在檢測到線程中斷狀態(tài) Thread.interrupted() 后,會清除中斷標(biāo)志,拋出 InterruptedException;
  • 通常為了保證線程對中斷及時響應(yīng), run 方法中需要自主檢測中斷標(biāo)志,中斷線程,特別是對中斷比較敏感需要保持類的不變式的場景;
  • class FairnessBoundedBlockingQueue implements Queue {// 容量protected final int capacity;// 頭指針,empty: head.next == tail == nullprotected Node head;// 尾指針protected Node tail;// guard: canPollCount, head, waitPollCountprotected final Object pollLock = new Object();protected int canPollCount;protected int waitPollCount;// guard: canOfferCount, tail, waitOfferCountprotected final Object offerLock = new Object();protected int canOfferCount;protected int waitOfferCount;public FairnessBoundedBlockingQueue(int capacity) {this.capacity = capacity;this.canPollCount = 0;this.canOfferCount = capacity;this.waitPollCount = 0;this.waitOfferCount = 0;this.head = new Node(null);this.tail = head;}// 如果隊列已滿,通過返回值標(biāo)識public boolean offer(Object obj) throws InterruptedException {if (Thread.interrupted()) {throw new InterruptedException(); // 線程已中斷,直接退出即可,防止中斷線程競爭鎖}synchronized(offerLock) {while(canOfferCount <= 0) {waitOfferCount++;try {offerLock.wait();} catch (InterruptedException e) {// 觸發(fā)其他線程offerLock.notify();throw e;} finally {waitOfferCount--;}}Node node = new Node(obj);tail.next = node;tail = node;canOfferCount--;}synchronized(pollLock) {++canPollCount;if (waitPollCount > 0) {pollLock.notify();}}return true;}// 如果隊列為空,阻塞等待public Object poll() throws InterruptedException {if (Thread.interrupted()) {throw new InterruptedException();}Object result = null;synchronized(pollLock) {while(canPollCount <= 0) {waitPollCount++;try {pollLock.wait();} catch (InterruptedException e) {pollLock.notify();throw e;} finally {waitPollCount--;}}result = head.next.value;head.next.value = 0;// ignore head;head = head.next;canPollCount--;}synchronized(offerLock) {canOfferCount++;if (waitOfferCount > 0) {offerLock.notify();}}return result;}// 省略 Node 的定義 }

    以上

  • 當(dāng)?shù)却€程中斷退出時,捕獲中斷異常,通過 pollLock.notify 和 offerLock.notify 轉(zhuǎn)發(fā)消息;
  • 通過在 finally 中恢復(fù)狀態(tài)追蹤變量;
  • 通過狀態(tài)變量追蹤可以解決讀與讀之間和寫與寫之間的鎖競爭問題。

    以下考慮如果解決讀與讀之間和寫與寫之間的公平性問題。

    6 解決公平性

    公平性的問題的解決需要將狀態(tài)變量的追蹤轉(zhuǎn)換為:請求監(jiān)視器追蹤。

  • 每個請求對應(yīng)一個監(jiān)視器;
  • 通過內(nèi)部維護一個 FIFO 隊列,實現(xiàn)公平性;
  • 在隊列狀態(tài)變更時,釋放隊列中的監(jiān)視器;
  • 以上邏輯可以統(tǒng)一抽象為

    boolean needToWait; synchronized(this) {needToWait = calculateNeedToWait();if (needToWait) {enqueue(monitor); // 請求對應(yīng)的monitor} } if (needToWait) {monitor.doWait(); }

    需要注意

  • monitor.doWait() 需要在 this 的衛(wèi)式語句之外,因為如果在內(nèi)部, monitor.doWait 并不會釋放 this鎖;
  • calculateNeedToWait() 需要在 this 的守衛(wèi)之內(nèi)完成,避免同步問題;
  • 需要考慮中斷異常的問題;
  • 基于以上的邏輯抽象,實現(xiàn)公平隊列

    // 省略接口定義 class FairnessBoundedBlockingQueue implements Queue {// 容量protected final int capacity;// 頭指針,empty: head.next == tail == nullprotected Node head;// 尾指針protected Node tail;// guard: canPollCount, head, pollQueueprotected final Object pollLock = new Object();protected int canPollCount;// guard: canOfferCount, tail, offerQueueprotected final Object offerLock = new Object();protected int canOfferCount;protected final WaitQueue pollQueue = new WaitQueue();protected final WaitQueue offerQueue = new WaitQueue();public FairnessBoundedBlockingQueue(int capacity) {this.capacity = capacity;this.canOfferCount = capacity;this.canPollCount = 0;this.head = new Node(null);this.tail = head;}// 如果隊列已滿,通過返回值標(biāo)識public boolean offer(Object obj) throws InterruptedException {if (Thread.interrupted()) {throw new InterruptedException(); // 線程已中斷,直接退出即可,防止中斷線程競爭鎖}WaitNode wait = null;synchronized(offerLock) {// 在有阻塞請求或者隊列為空時,阻塞等待if (canOfferCount <= 0 || !offerQueue.isEmpty()) {wait = new WaitNode();offerQueue.enq(wait);} else {// continue.}}try {if (wait != null) {wait.doWait();}if (Thread.interrupted()) {throw new InterruptedException();}} catch (InterruptedException e) {offerQueue.doNotify();throw e;}// 確保此時線程狀態(tài)正常,以下不會校驗中斷synchronized(offerLock) {Node node = new Node(obj);tail.next = node;tail = node;canOfferCount--;}synchronized(pollLock) {++canPollCount;pollQueue.doNotify();}return true;}// 如果隊列為空,阻塞等待public Object poll() throws InterruptedException {if (Thread.interrupted()) {throw new InterruptedException();}Object result = null;WaitNode wait = null;synchronized(pollLock) {// 在有阻塞請求或者隊列為空時,阻塞等待if (canPollCount <= 0 || !pollQueue.isEmpty()) {wait = new WaitNode();pollQueue.enq(wait);} else {// ignore}}try {if (wait != null) {wait.doWait();}if (Thread.interrupted()) {throw new InterruptedException();}} catch (InterruptedException e) {// 傳遞消息pollQueue.doNotify();throw e;}// 以下不會檢測線程中斷狀態(tài)synchronized(pollLock) {result = head.next.value;head.next.value = 0;// ignore head;head = head.next;canPollCount--;}synchronized(offerLock) {canOfferCount++;offerQueue.doNotify();}return result;}class WaitQueue {WaitNode head;WaitNode tail;WaitQueue() {head = new WaitNode();tail = head;}synchronized void doNotify() {for(;;) {WaitNode node = deq();if (node == null) {break;} else if (node.doNotify()) {// 此處確保NOTIFY成功break;} else {// ignore, and retry.}}}synchronized boolean isEmpty() {return head.next == null;}synchronized void enq(WaitNode node) {tail.next = node;tail = tail.next;}synchronized WaitNode deq() {if (head.next == null) {return null;}WaitNode res = head.next;head = head.next;if (head.next == null) {tail = head; // 為空,遷移tail節(jié)點}return res;}}class WaitNode {boolean released;WaitNode next;WaitNode() {released = false;next = null;}synchronized void doWait() throws InterruptedException {try {while (!released) {wait();} } catch (InterruptedException e) {if (!released) {released = true;throw e;} else {// 如果是NOTIFY之后收到中斷的信號,不能拋出異常;需要做RELAY處理Thread.currentThread().interrupt();}}}synchronized boolean doNotify() {if (!released) {released = true;notify();// 明確釋放了一個線程,返回truereturn true;} else {// 沒有釋放新的線程,返回falsereturn false;}}}// 省略 Node 的定義 }

    以上

  • 核心是替換狀態(tài)追蹤變量為同步節(jié)點, WaitNode;
  • WaitNode 通過簡單的同步隊列組織實現(xiàn) FIFO 協(xié)議,每個線程等待各自的 WaitNode 監(jiān)視器;
  • WaitNode 內(nèi)部維持 released 狀態(tài),標(biāo)識線程阻塞狀態(tài)是否被釋放,主要是為了處理中斷的問題;
  • WaitQueue 本身是全同步的,由于已解決了讀寫競爭已經(jīng)讀寫內(nèi)部競爭的問題, WaitQueue 同步并不會造成問題;
  • WaitQueue 是無界隊列,是一個潛在的問題;但由于其只做同步的追蹤,而且追蹤的通常是線程,通常并不是問題;
  • 最終的公平有界隊列實現(xiàn),無論是入隊還是出隊,首先衛(wèi)式語句判定是否需要入隊等待,如果入隊等待,通過公平性協(xié)議等待;
  • 當(dāng)信號釋放時,借助讀寫鎖同步更新隊列;最后同樣借助讀寫鎖,觸發(fā)隊列更新消息;

    7 等待時間的問題

    并發(fā)場景下,等待通常會設(shè)置為限時等待 TIMED_WAITING ,避免死鎖或損失系統(tǒng)活性;

    實現(xiàn)同步隊列的限時等待,并沒想象的那么困難

    class TimeoutException extends InterruptedException {}class WaitNode {boolean released;WaitNode next;WaitNode() {released = false;next = null;}synchronized void doWait(long milliSeconds) throws InterruptedException {try {long startTime = System.currentTimeMillis();long toWait = milliSeconds;for (;;) {wait(toWait);if (released) {return;}long now = System.currentTimeMillis();toWait = toWait - (now - startTime);if (toWait <= 0) {throw new TimeoutException();}}} catch (InterruptedException e) {if (!released) {released = true;throw e;} else {// 如果已經(jīng)釋放信號量,此處不拋出異常;但恢復(fù)中斷狀態(tài)Thread.currentThread().interrupt();}}}synchronized boolean doNotify() {if (!released) {released = true;notify();return true;} else {return false;}}

    由于所有的等待都阻塞在 WaitNode 監(jiān)視器,以上

    • 首先定義超時異常,此處只是為了方便異常處理,繼承 InterruptedException;
    • 此處依賴于 wait(long timeout) 的超時等待實現(xiàn),這通常不是問題;

    最后,將 WaitNode 超時等待的邏輯,帶入到 FairnessBoundedBlockingQueue 實現(xiàn)中,即可。

    四 總結(jié)

    本文通過一步步迭代,最終借助 JAVA 同步原語實現(xiàn)初版的公平有界隊列。迭代實現(xiàn)過程中可以看到以下幾點:

  • 觀念的轉(zhuǎn)變,將調(diào)用一個類的方法思維轉(zhuǎn)換為:在滿足一定條件下方法才可以調(diào)用,在調(diào)用前需要滿足不變式,調(diào)用后滿足不變式;由于并發(fā)的問題很難測試,通常要采用衛(wèi)式表達證明并發(fā)的正確性;
  • 在迭代實現(xiàn)中會看到很多模式,比如,讀寫分離時,其實可以抽象為讀鎖和寫鎖;就得到了一個抽象的 Lock 的定義;比如,讀寫狀態(tài)追蹤,可以采用 Exchanger 抽象表達;
  • 另外,本文的實現(xiàn)遠(yuǎn)非完善,還需要考慮支持 Iterator 遍歷、狀態(tài)查詢及數(shù)據(jù)遷移等操作;
  • 最后,相信大家再看 JUC 的工具包實現(xiàn),定有不一樣的體會。

    原文鏈接
    本文為阿里云原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。

    總結(jié)

    以上是生活随笔為你收集整理的并发编程实践之公平有界阻塞队列实现的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。