日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Java Review - 并发编程_ConcurrentLinkedQueue原理源码剖析

發布時間:2025/3/21 java 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java Review - 并发编程_ConcurrentLinkedQueue原理源码剖析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

  • 概述
  • ConcurrentLinkedQueue
  • 核心方法&源碼解讀
    • offer
    • add
    • poll
    • peek
    • size
    • remove
    • contains
  • 總結


概述

JDK中提供了一系列場景的并發安全隊列。總的來說,按照實現方式的不同可分為阻塞隊列和非阻塞隊列,

  • 阻塞隊列使用鎖實現
  • 非阻塞隊列則使用CAS非阻塞算法實現


ConcurrentLinkedQueue

ConcurrentLinkedQueue是線程安全的無界非阻塞隊列,其底層數據結構使用單向鏈表實現,對于入隊和出隊操作使用CAS來實現線程安全。

【類圖】

ConcurrentLinkedQueue內部的隊列使用單向鏈表方式實現,

其中有兩個volatile類型的Node節點分別用來存放隊列的首、尾節點。

從下面的無參構造函數可知,默認頭、尾節點都是指向item為null的哨兵節點。 新元素會被插入隊列末尾,出隊時從隊列頭部獲取一個元素。

在Node節點內部則維護一個使用volatile修飾的變量item,用來存放節點的值;next用來存放鏈表的下一個節點,從而鏈接為一個單向無界鏈表。其內部則使用UNSafe工具類提供的CAS算法來保證出入隊時操作鏈表的原子性。


核心方法&源碼解讀

下面我們介紹ConcurrentLinkedQueue的幾個主要方法的實現原理。

offer

在鏈表末尾添加一個元素

/*** Inserts the specified element at the tail of this queue.* As the queue is unbounded, this method will never return {@code false}.** @return {@code true} (as specified by {@link Queue#offer})* @throws NullPointerException if the specified element is null*/ public boolean offer(E e) {//1 e為null則拋出空指針異常checkNotNull(e);//2 構造Node節點構造函數內部調用unsafe.putObject,后面統一講final Node<E> newNode = new Node<E>(e);//3 從尾節點插入for (Node<E> t = tail, p = t;;) {Node<E> q = p.next;// 4 如果q=null說明p是尾節點則插入if (q == null) {// 5 使用cas設置p節點的next節點 if (p.casNext(null, newNode)) {// 6 cas成功說明新增節點已經被放入鏈表,然后設置當前尾節點(包含head,1,3,5.。。個節點為尾節點)if (p != t) // hop two nodes at a timecasTail(t, newNode); // Failure is OK.return true;}// Lost CAS race to another thread; re-read next}else if (p == q)// 7 //多線程操作時候,由于poll時候會把老的head變為自引用,然后head的next變為新head,所以這里需要//重新找新的head,因為新的head后面的節點才是激活的節點p = (t != (t = tail)) ? t : head;else// 8 尋找尾節點 p = (p != t && t != (t = tail)) ? t : q;} }
  • 首先看當一個線程調用offer(item)時的情況。首先代碼(1)對傳參進行空檢查,如果為null則拋出NPE異常,否則執行代碼(2)并使用item作為構造函數參數創建一個新的節點,然后代碼(3)從隊列尾部節點開始循環,打算從隊列尾部添加元素,當執行到代碼(4)時隊列狀態如下所示。

這時候節點p、t、head、tail同時指向了item為null的哨兵節點,由于哨兵節點的next節點為null,所以這里q也指向null。

  • q==null則執行代碼(5),通過CAS原子操作判斷p節點的next節點是否為null,如果為null則使用節點newNode替換p的next節點,然后執行代碼(6),這里由于p==t所以沒有設置尾部節點,然后退出offer方法,這時候隊列的狀態如下圖所示

(2)上面是一個線程調用offer方法的情況,如果多個線程同時調用,就會存在多個線程同時執行到代碼(5)的情況。假設線程A調用offer(item1),線程B調用offer(item2),同時執行到代碼(5)p.casNext(null, newNode)。

由于CAS的比較設置操作是原子性的,所以這里假設線程A先執行了比較設置操作,發現當前p的next節點確實是null,則會原子性地更新next節點為 item1,這時候線程B也會判斷p的next節點是否為null,結果發現不是null(因為線程A已經設置了p的next節點為 item1),則會跳到代碼(3),然后執行到代碼(4),這時候的隊列分布如下圖所示。

根據上面的狀態圖可知線程B接下來會執行代碼(8),然后把q賦給了p,這時候隊列狀態如下圖所示。

然后線程B再次跳轉到代碼(3)執行,當執行到代碼(4)時隊列狀態如下圖所示


由于這時候q==null,所以線程B會執行代碼(5),通過CAS操作判斷當前p的next節點是否是null,不是則再次循環嘗試,是則使用item2替換。假設CAS成功了,那么執行代碼(6),由于p!=t,所以設置tail節點為item2,然后退出offer方法。這時候隊列分布如下圖所示。

分析到現在,就差代碼(7)還沒走過,其實這一步要在執行poll操作后才會執行。這里先來看一下執行poll操作后可能會存在的一種情況,如下圖所示。

下面分析當隊列處于這種狀態時調用offer添加元素,執行到代碼(4)時的狀態圖,如下


這里由于q節點不為空并且pq所以執行代碼(7),由于ttail所以p被賦值為head,然后重新循環,循環后執行到代碼(4),這時候隊列狀態如下圖所示。

這時候由于q==null,所以執行代碼(5)進行CAS操作,如果當前沒有其他線程執行offer操作,則CAS操作會成功,p的next節點被設置為新增節點。然后執行代碼(6),由于p!=t所以設置新節點為隊列的尾部節點,現在隊列狀態如圖

需要注意的是,這里自引用的節點會被垃圾回收掉。

可見,offer操作中的關鍵步驟是代碼(5),通過原子CAS操作來控制某時只有一個線程可以追加元素到隊列末尾。進行CAS競爭失敗的線程會通過循環一次次嘗試進行CAS操作,直到CAS成功才會返回,也就是通過使用無限循環不斷進行CAS嘗試方式來替代阻塞算法掛起調用線程。相比阻塞算法,這是使用CPU資源換取阻塞所帶來的開銷。


add

add操作是在鏈表末尾添加一個元素,其實在內部調用的還是offer操作’\

/*** Inserts the specified element at the tail of this queue.* As the queue is unbounded, this method will never throw* {@link IllegalStateException} or return {@code false}.** @return {@code true} (as specified by {@link Collection#add})* @throws NullPointerException if the specified element is null*/public boolean add(E e) {return offer(e);}


poll

poll操作是在隊列頭部獲取并移除一個元素,如果隊列為空則返回null。

public E poll() {// 1. goto標記restartFromHead:// 2 無限循環for (;;) {for (Node<E> h = head, p = h, q;;) {// 3 保存當前節點E item = p.item;// 4 當前item有值,則CAS變為nullif (item != null && p.casItem(item, null)) {// Successful CAS is the linearization point// for item to be removed from this queue.// 5 cas成功則標記當前節點并從鏈表中移除if (p != h) // hop two nodes at a timeupdateHead(h, ((q = p.next) != null) ? q : p);return item;}// 6 當前隊列為空則返回nullelse if ((q = p.next) == null) {updateHead(h, p);return null;}// 7 如果當前節點被自己引用,則重新查找新的隊列頭節點else if (p == q)continue restartFromHead;else // 8 p = q;}}}

poll方法在移除一個元素時,只是簡單地使用CAS操作把當前節點的item值設置為null,然后通過重新設置頭節點將該元素從隊列里面移除,被移除的節點就成了孤立節點,這個節點會在垃圾回收時被回收掉。另外,如果在執行分支中發現頭節點被修改了,要跳到外層循環重新獲取新的頭節點。


peek

peek操作是獲取隊列頭部一個元素(只獲取不移除),如果隊列為空則返回null

public E peek() {// 1 restartFromHead:for (;;) {// 2 for (Node<E> h = head, p = h, q;;) {E item = p.item;// 3 if (item != null || (q = p.next) == null) {updateHead(h, p);return item;}// 4 else if (p == q)continue restartFromHead;else// 5 p = q;}}}

Peek操作的代碼結構與poll操作類似,不同之處在于代碼(3)中少了castItem操作。

其實這很正常,因為peek只是獲取隊列頭元素值,并不清空其值。根據前面的介紹我們知道第一次執行offer后head指向的是哨兵節點(也就是item為null的節點),那么第一次執行peek時在代碼(3)中會發現item==null,然后執行q=p.next,這時候q節點指向的才是隊列里面第一個真正的元素,或者如果隊列為null則q指向null。

總結:peek操作的代碼與poll操作類似,只是前者只獲取隊列頭元素但是并不從隊列里將它刪除,而后者獲取后需要從隊列里面將它刪除。

另外,在第一次調用peek操作時,會刪除哨兵節點,并讓隊列的head節點指向隊列里面第一個元素或者null。


size

計算當前隊列元素個數,在并發環境下不是很有用,因為CAS沒有加鎖,所以從調用size函數到返回結果期間有可能增刪元素,導致統計的元素個數不精確

/*** Returns the number of elements in this queue. If this queue* contains more than {@code Integer.MAX_VALUE} elements, returns* {@code Integer.MAX_VALUE}.** <p>Beware that, unlike in most collections, this method is* <em>NOT</em> a constant-time operation. Because of the* asynchronous nature of these queues, determining the current* number of elements requires an O(n) traversal.* Additionally, if elements are added or removed during execution* of this method, the returned result may be inaccurate. Thus,* this method is typically not very useful in concurrent* applications.** @return the number of elements in this queue*/public int size() {int count = 0;for (Node<E> p = first(); p != null; p = succ(p))if (p.item != null)// Collection.size() spec says to max outif (++count == Integer.MAX_VALUE)break;return count;} // 獲取第一個元素,哨兵元素不算,沒有則為nullNode<E> first() {restartFromHead:for (;;) {for (Node<E> h = head, p = h, q;;) {boolean hasItem = (p.item != null);if (hasItem || (q = p.next) == null) {updateHead(h, p);return hasItem ? p : null;}else if (p == q)continue restartFromHead;elsep = q;}}}


remove

如果隊列里面存在該元素則刪除該元素,如果存在多個則刪除第一個,并返回true,否則返回false。

/*** Removes a single instance of the specified element from this queue,* if it is present. More formally, removes an element {@code e} such* that {@code o.equals(e)}, if this queue contains one or more such* elements.* Returns {@code true} if this queue contained the specified element* (or equivalently, if this queue changed as a result of the call).** @param o element to be removed from this queue, if present* @return {@code true} if this queue changed as a result of the call*/public boolean remove(Object o) {//查找元素為空,直接返回falseif (o == null) return false;Node<E> pred = null;for (Node<E> p = first(); p != null; p = succ(p)) {E item = p.item;//相等則使用cas值null,同時一個線程成功,失敗的線程循環查找隊列中其他元素是否有匹配的。if (item != null &&o.equals(item) &&p.casItem(item, null)) {//獲取next元素Node<E> next = succ(p);//如果有前驅節點,并且next不為空則鏈接前驅節點到next,if (pred != null && next != null)pred.casNext(p, next);return true;}pred = p;}return false; }


contains

判斷隊列里面是否含有指定對象,由于是遍歷整個隊列,所以像size 操作一樣結果也不是那么精確,有可能調用該方法時元素還在隊列里面,但是遍歷過程中其他線程才把該元素刪除了,那么就會返回false。

/*** Returns {@code true} if this queue contains the specified element.* More formally, returns {@code true} if and only if this queue contains* at least one element {@code e} such that {@code o.equals(e)}.** @param o object to be checked for containment in this queue* @return {@code true} if this queue contains the specified element*/public boolean contains(Object o) {if (o == null) return false;for (Node<E> p = first(); p != null; p = succ(p)) {E item = p.item;if (item != null && o.equals(item))return true;}return false;}

總結

ConcurrentLinkedQueue的底層使用單向鏈表數據結構來保存隊列元素,每個元素被包裝成一個Node節點。隊列是靠頭、尾節點來維護的,創建隊列時頭、尾節點指向一個item為null的哨兵節點。

第一次執行peek或者first操作時會把head指向第一個真正的隊列元素。由于使用非阻塞CAS算法,沒有加鎖,所以在計算size時有可能進行了offer、poll或者remove操作,導致計算的元素個數不精確,所以在并發情況下size函數不是很有用。

如下圖所示,入隊、出隊都是操作使用volatile修飾的tail、head節點,要保證在多線程下出入隊線程安全,只需要保證這兩個Node操作的可見性和原子性即可。由于volatile本身可以保證可見性,所以只需要保證對兩個變量操作的原子性即可。

offer操作是在tail后面添加元素,也就是調用tail.casNext方法,而這個方法使用的是CAS操作,只有一個線程會成功,然后失敗的線程會循環,重新獲取tail,再執行casNext方法。poll操作也通過類似CAS的算法保證出隊時移除節點操作的原子性

總結

以上是生活随笔為你收集整理的Java Review - 并发编程_ConcurrentLinkedQueue原理源码剖析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。