日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) >

Java Review - 并发编程_ConcurrentLinkedQueue原理源码剖析

發(fā)布時(shí)間:2025/3/21 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java Review - 并发编程_ConcurrentLinkedQueue原理源码剖析 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

文章目錄

  • 概述
  • ConcurrentLinkedQueue
  • 核心方法&源碼解讀
    • offer
    • add
    • poll
    • peek
    • size
    • remove
    • contains
  • 總結(jié)


概述

JDK中提供了一系列場(chǎng)景的并發(fā)安全隊(duì)列??偟膩?lái)說(shuō),按照實(shí)現(xiàn)方式的不同可分為阻塞隊(duì)列和非阻塞隊(duì)列,

  • 阻塞隊(duì)列使用鎖實(shí)現(xiàn)
  • 非阻塞隊(duì)列則使用CAS非阻塞算法實(shí)現(xiàn)


ConcurrentLinkedQueue

ConcurrentLinkedQueue是線程安全的無(wú)界非阻塞隊(duì)列,其底層數(shù)據(jù)結(jié)構(gòu)使用單向鏈表實(shí)現(xiàn),對(duì)于入隊(duì)和出隊(duì)操作使用CAS來(lái)實(shí)現(xiàn)線程安全。

【類圖】

ConcurrentLinkedQueue內(nèi)部的隊(duì)列使用單向鏈表方式實(shí)現(xiàn),

其中有兩個(gè)volatile類型的Node節(jié)點(diǎn)分別用來(lái)存放隊(duì)列的首、尾節(jié)點(diǎn)。

從下面的無(wú)參構(gòu)造函數(shù)可知,默認(rèn)頭、尾節(jié)點(diǎn)都是指向item為null的哨兵節(jié)點(diǎn)。 新元素會(huì)被插入隊(duì)列末尾,出隊(duì)時(shí)從隊(duì)列頭部獲取一個(gè)元素。

在Node節(jié)點(diǎn)內(nèi)部則維護(hù)一個(gè)使用volatile修飾的變量item,用來(lái)存放節(jié)點(diǎn)的值;next用來(lái)存放鏈表的下一個(gè)節(jié)點(diǎn),從而鏈接為一個(gè)單向無(wú)界鏈表。其內(nèi)部則使用UNSafe工具類提供的CAS算法來(lái)保證出入隊(duì)時(shí)操作鏈表的原子性。


核心方法&源碼解讀

下面我們介紹ConcurrentLinkedQueue的幾個(gè)主要方法的實(shí)現(xiàn)原理。

offer

在鏈表末尾添加一個(gè)元素

/*** Inserts the specified element at the tail of this queue.* As the queue is unbounded, this method will never return {@code false}.** @return {@code true} (as specified by {@link Queue#offer})* @throws NullPointerException if the specified element is null*/ public boolean offer(E e) {//1 e為null則拋出空指針異常checkNotNull(e);//2 構(gòu)造Node節(jié)點(diǎn)構(gòu)造函數(shù)內(nèi)部調(diào)用unsafe.putObject,后面統(tǒng)一講final Node<E> newNode = new Node<E>(e);//3 從尾節(jié)點(diǎn)插入for (Node<E> t = tail, p = t;;) {Node<E> q = p.next;// 4 如果q=null說(shuō)明p是尾節(jié)點(diǎn)則插入if (q == null) {// 5 使用cas設(shè)置p節(jié)點(diǎn)的next節(jié)點(diǎn) if (p.casNext(null, newNode)) {// 6 cas成功說(shuō)明新增節(jié)點(diǎn)已經(jīng)被放入鏈表,然后設(shè)置當(dāng)前尾節(jié)點(diǎn)(包含head,1,3,5.。。個(gè)節(jié)點(diǎn)為尾節(jié)點(diǎn))if (p != t) // hop two nodes at a timecasTail(t, newNode); // Failure is OK.return true;}// Lost CAS race to another thread; re-read next}else if (p == q)// 7 //多線程操作時(shí)候,由于poll時(shí)候會(huì)把老的head變?yōu)樽砸?#xff0c;然后head的next變?yōu)樾耯ead,所以這里需要//重新找新的head,因?yàn)樾碌膆ead后面的節(jié)點(diǎn)才是激活的節(jié)點(diǎn)p = (t != (t = tail)) ? t : head;else// 8 尋找尾節(jié)點(diǎn) p = (p != t && t != (t = tail)) ? t : q;} }
  • 首先看當(dāng)一個(gè)線程調(diào)用offer(item)時(shí)的情況。首先代碼(1)對(duì)傳參進(jìn)行空檢查,如果為null則拋出NPE異常,否則執(zhí)行代碼(2)并使用item作為構(gòu)造函數(shù)參數(shù)創(chuàng)建一個(gè)新的節(jié)點(diǎn),然后代碼(3)從隊(duì)列尾部節(jié)點(diǎn)開始循環(huán),打算從隊(duì)列尾部添加元素,當(dāng)執(zhí)行到代碼(4)時(shí)隊(duì)列狀態(tài)如下所示。

這時(shí)候節(jié)點(diǎn)p、t、head、tail同時(shí)指向了item為null的哨兵節(jié)點(diǎn),由于哨兵節(jié)點(diǎn)的next節(jié)點(diǎn)為null,所以這里q也指向null。

  • q==null則執(zhí)行代碼(5),通過(guò)CAS原子操作判斷p節(jié)點(diǎn)的next節(jié)點(diǎn)是否為null,如果為null則使用節(jié)點(diǎn)newNode替換p的next節(jié)點(diǎn),然后執(zhí)行代碼(6),這里由于p==t所以沒有設(shè)置尾部節(jié)點(diǎn),然后退出offer方法,這時(shí)候隊(duì)列的狀態(tài)如下圖所示

(2)上面是一個(gè)線程調(diào)用offer方法的情況,如果多個(gè)線程同時(shí)調(diào)用,就會(huì)存在多個(gè)線程同時(shí)執(zhí)行到代碼(5)的情況。假設(shè)線程A調(diào)用offer(item1),線程B調(diào)用offer(item2),同時(shí)執(zhí)行到代碼(5)p.casNext(null, newNode)。

由于CAS的比較設(shè)置操作是原子性的,所以這里假設(shè)線程A先執(zhí)行了比較設(shè)置操作,發(fā)現(xiàn)當(dāng)前p的next節(jié)點(diǎn)確實(shí)是null,則會(huì)原子性地更新next節(jié)點(diǎn)為 item1,這時(shí)候線程B也會(huì)判斷p的next節(jié)點(diǎn)是否為null,結(jié)果發(fā)現(xiàn)不是null(因?yàn)榫€程A已經(jīng)設(shè)置了p的next節(jié)點(diǎn)為 item1),則會(huì)跳到代碼(3),然后執(zhí)行到代碼(4),這時(shí)候的隊(duì)列分布如下圖所示。

根據(jù)上面的狀態(tài)圖可知線程B接下來(lái)會(huì)執(zhí)行代碼(8),然后把q賦給了p,這時(shí)候隊(duì)列狀態(tài)如下圖所示。

然后線程B再次跳轉(zhuǎn)到代碼(3)執(zhí)行,當(dāng)執(zhí)行到代碼(4)時(shí)隊(duì)列狀態(tài)如下圖所示


由于這時(shí)候q==null,所以線程B會(huì)執(zhí)行代碼(5),通過(guò)CAS操作判斷當(dāng)前p的next節(jié)點(diǎn)是否是null,不是則再次循環(huán)嘗試,是則使用item2替換。假設(shè)CAS成功了,那么執(zhí)行代碼(6),由于p!=t,所以設(shè)置tail節(jié)點(diǎn)為item2,然后退出offer方法。這時(shí)候隊(duì)列分布如下圖所示。

分析到現(xiàn)在,就差代碼(7)還沒走過(guò),其實(shí)這一步要在執(zhí)行poll操作后才會(huì)執(zhí)行。這里先來(lái)看一下執(zhí)行poll操作后可能會(huì)存在的一種情況,如下圖所示。

下面分析當(dāng)隊(duì)列處于這種狀態(tài)時(shí)調(diào)用offer添加元素,執(zhí)行到代碼(4)時(shí)的狀態(tài)圖,如下


這里由于q節(jié)點(diǎn)不為空并且pq所以執(zhí)行代碼(7),由于ttail所以p被賦值為head,然后重新循環(huán),循環(huán)后執(zhí)行到代碼(4),這時(shí)候隊(duì)列狀態(tài)如下圖所示。

這時(shí)候由于q==null,所以執(zhí)行代碼(5)進(jìn)行CAS操作,如果當(dāng)前沒有其他線程執(zhí)行offer操作,則CAS操作會(huì)成功,p的next節(jié)點(diǎn)被設(shè)置為新增節(jié)點(diǎn)。然后執(zhí)行代碼(6),由于p!=t所以設(shè)置新節(jié)點(diǎn)為隊(duì)列的尾部節(jié)點(diǎn),現(xiàn)在隊(duì)列狀態(tài)如圖

需要注意的是,這里自引用的節(jié)點(diǎn)會(huì)被垃圾回收掉。

可見,offer操作中的關(guān)鍵步驟是代碼(5),通過(guò)原子CAS操作來(lái)控制某時(shí)只有一個(gè)線程可以追加元素到隊(duì)列末尾。進(jìn)行CAS競(jìng)爭(zhēng)失敗的線程會(huì)通過(guò)循環(huán)一次次嘗試進(jìn)行CAS操作,直到CAS成功才會(huì)返回,也就是通過(guò)使用無(wú)限循環(huán)不斷進(jìn)行CAS嘗試方式來(lái)替代阻塞算法掛起調(diào)用線程。相比阻塞算法,這是使用CPU資源換取阻塞所帶來(lái)的開銷。


add

add操作是在鏈表末尾添加一個(gè)元素,其實(shí)在內(nèi)部調(diào)用的還是offer操作’\

/*** Inserts the specified element at the tail of this queue.* As the queue is unbounded, this method will never throw* {@link IllegalStateException} or return {@code false}.** @return {@code true} (as specified by {@link Collection#add})* @throws NullPointerException if the specified element is null*/public boolean add(E e) {return offer(e);}


poll

poll操作是在隊(duì)列頭部獲取并移除一個(gè)元素,如果隊(duì)列為空則返回null。

public E poll() {// 1. goto標(biāo)記restartFromHead:// 2 無(wú)限循環(huán)for (;;) {for (Node<E> h = head, p = h, q;;) {// 3 保存當(dāng)前節(jié)點(diǎn)E item = p.item;// 4 當(dāng)前item有值,則CAS變?yōu)閚ullif (item != null && p.casItem(item, null)) {// Successful CAS is the linearization point// for item to be removed from this queue.// 5 cas成功則標(biāo)記當(dāng)前節(jié)點(diǎn)并從鏈表中移除if (p != h) // hop two nodes at a timeupdateHead(h, ((q = p.next) != null) ? q : p);return item;}// 6 當(dāng)前隊(duì)列為空則返回nullelse if ((q = p.next) == null) {updateHead(h, p);return null;}// 7 如果當(dāng)前節(jié)點(diǎn)被自己引用,則重新查找新的隊(duì)列頭節(jié)點(diǎn)else if (p == q)continue restartFromHead;else // 8 p = q;}}}

poll方法在移除一個(gè)元素時(shí),只是簡(jiǎn)單地使用CAS操作把當(dāng)前節(jié)點(diǎn)的item值設(shè)置為null,然后通過(guò)重新設(shè)置頭節(jié)點(diǎn)將該元素從隊(duì)列里面移除,被移除的節(jié)點(diǎn)就成了孤立節(jié)點(diǎn),這個(gè)節(jié)點(diǎn)會(huì)在垃圾回收時(shí)被回收掉。另外,如果在執(zhí)行分支中發(fā)現(xiàn)頭節(jié)點(diǎn)被修改了,要跳到外層循環(huán)重新獲取新的頭節(jié)點(diǎn)。


peek

peek操作是獲取隊(duì)列頭部一個(gè)元素(只獲取不移除),如果隊(duì)列為空則返回null

public E peek() {// 1 restartFromHead:for (;;) {// 2 for (Node<E> h = head, p = h, q;;) {E item = p.item;// 3 if (item != null || (q = p.next) == null) {updateHead(h, p);return item;}// 4 else if (p == q)continue restartFromHead;else// 5 p = q;}}}

Peek操作的代碼結(jié)構(gòu)與poll操作類似,不同之處在于代碼(3)中少了castItem操作。

其實(shí)這很正常,因?yàn)閜eek只是獲取隊(duì)列頭元素值,并不清空其值。根據(jù)前面的介紹我們知道第一次執(zhí)行offer后head指向的是哨兵節(jié)點(diǎn)(也就是item為null的節(jié)點(diǎn)),那么第一次執(zhí)行peek時(shí)在代碼(3)中會(huì)發(fā)現(xiàn)item==null,然后執(zhí)行q=p.next,這時(shí)候q節(jié)點(diǎn)指向的才是隊(duì)列里面第一個(gè)真正的元素,或者如果隊(duì)列為null則q指向null。

總結(jié):peek操作的代碼與poll操作類似,只是前者只獲取隊(duì)列頭元素但是并不從隊(duì)列里將它刪除,而后者獲取后需要從隊(duì)列里面將它刪除。

另外,在第一次調(diào)用peek操作時(shí),會(huì)刪除哨兵節(jié)點(diǎn),并讓隊(duì)列的head節(jié)點(diǎn)指向隊(duì)列里面第一個(gè)元素或者null。


size

計(jì)算當(dāng)前隊(duì)列元素個(gè)數(shù),在并發(fā)環(huán)境下不是很有用,因?yàn)镃AS沒有加鎖,所以從調(diào)用size函數(shù)到返回結(jié)果期間有可能增刪元素,導(dǎo)致統(tǒng)計(jì)的元素個(gè)數(shù)不精確

/*** Returns the number of elements in this queue. If this queue* contains more than {@code Integer.MAX_VALUE} elements, returns* {@code Integer.MAX_VALUE}.** <p>Beware that, unlike in most collections, this method is* <em>NOT</em> a constant-time operation. Because of the* asynchronous nature of these queues, determining the current* number of elements requires an O(n) traversal.* Additionally, if elements are added or removed during execution* of this method, the returned result may be inaccurate. Thus,* this method is typically not very useful in concurrent* applications.** @return the number of elements in this queue*/public int size() {int count = 0;for (Node<E> p = first(); p != null; p = succ(p))if (p.item != null)// Collection.size() spec says to max outif (++count == Integer.MAX_VALUE)break;return count;} // 獲取第一個(gè)元素,哨兵元素不算,沒有則為nullNode<E> first() {restartFromHead:for (;;) {for (Node<E> h = head, p = h, q;;) {boolean hasItem = (p.item != null);if (hasItem || (q = p.next) == null) {updateHead(h, p);return hasItem ? p : null;}else if (p == q)continue restartFromHead;elsep = q;}}}


remove

如果隊(duì)列里面存在該元素則刪除該元素,如果存在多個(gè)則刪除第一個(gè),并返回true,否則返回false。

/*** Removes a single instance of the specified element from this queue,* if it is present. More formally, removes an element {@code e} such* that {@code o.equals(e)}, if this queue contains one or more such* elements.* Returns {@code true} if this queue contained the specified element* (or equivalently, if this queue changed as a result of the call).** @param o element to be removed from this queue, if present* @return {@code true} if this queue changed as a result of the call*/public boolean remove(Object o) {//查找元素為空,直接返回falseif (o == null) return false;Node<E> pred = null;for (Node<E> p = first(); p != null; p = succ(p)) {E item = p.item;//相等則使用cas值null,同時(shí)一個(gè)線程成功,失敗的線程循環(huán)查找隊(duì)列中其他元素是否有匹配的。if (item != null &&o.equals(item) &&p.casItem(item, null)) {//獲取next元素Node<E> next = succ(p);//如果有前驅(qū)節(jié)點(diǎn),并且next不為空則鏈接前驅(qū)節(jié)點(diǎn)到next,if (pred != null && next != null)pred.casNext(p, next);return true;}pred = p;}return false; }


contains

判斷隊(duì)列里面是否含有指定對(duì)象,由于是遍歷整個(gè)隊(duì)列,所以像size 操作一樣結(jié)果也不是那么精確,有可能調(diào)用該方法時(shí)元素還在隊(duì)列里面,但是遍歷過(guò)程中其他線程才把該元素刪除了,那么就會(huì)返回false。

/*** Returns {@code true} if this queue contains the specified element.* More formally, returns {@code true} if and only if this queue contains* at least one element {@code e} such that {@code o.equals(e)}.** @param o object to be checked for containment in this queue* @return {@code true} if this queue contains the specified element*/public boolean contains(Object o) {if (o == null) return false;for (Node<E> p = first(); p != null; p = succ(p)) {E item = p.item;if (item != null && o.equals(item))return true;}return false;}

總結(jié)

ConcurrentLinkedQueue的底層使用單向鏈表數(shù)據(jù)結(jié)構(gòu)來(lái)保存隊(duì)列元素,每個(gè)元素被包裝成一個(gè)Node節(jié)點(diǎn)。隊(duì)列是靠頭、尾節(jié)點(diǎn)來(lái)維護(hù)的,創(chuàng)建隊(duì)列時(shí)頭、尾節(jié)點(diǎn)指向一個(gè)item為null的哨兵節(jié)點(diǎn)。

第一次執(zhí)行peek或者first操作時(shí)會(huì)把head指向第一個(gè)真正的隊(duì)列元素。由于使用非阻塞CAS算法,沒有加鎖,所以在計(jì)算size時(shí)有可能進(jìn)行了offer、poll或者remove操作,導(dǎo)致計(jì)算的元素個(gè)數(shù)不精確,所以在并發(fā)情況下size函數(shù)不是很有用。

如下圖所示,入隊(duì)、出隊(duì)都是操作使用volatile修飾的tail、head節(jié)點(diǎn),要保證在多線程下出入隊(duì)線程安全,只需要保證這兩個(gè)Node操作的可見性和原子性即可。由于volatile本身可以保證可見性,所以只需要保證對(duì)兩個(gè)變量操作的原子性即可。

offer操作是在tail后面添加元素,也就是調(diào)用tail.casNext方法,而這個(gè)方法使用的是CAS操作,只有一個(gè)線程會(huì)成功,然后失敗的線程會(huì)循環(huán),重新獲取tail,再執(zhí)行casNext方法。poll操作也通過(guò)類似CAS的算法保證出隊(duì)時(shí)移除節(jié)點(diǎn)操作的原子性

總結(jié)

以上是生活随笔為你收集整理的Java Review - 并发编程_ConcurrentLinkedQueue原理源码剖析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。