日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程语言 > java >内容正文

java

Java并发编程笔记之ConcurrentLinkedQueue源码探究

發(fā)布時(shí)間:2023/12/1 java 56 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java并发编程笔记之ConcurrentLinkedQueue源码探究 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

JDK 中基于鏈表的非阻塞無(wú)界隊(duì)列 ConcurrentLinkedQueue 原理剖析,ConcurrentLinkedQueue 內(nèi)部是如何使用 CAS 非阻塞算法來(lái)保證多線程下入隊(duì)出隊(duì)操作的線程安全?

ConcurrentLinkedQueue是線程安全的無(wú)界非阻塞隊(duì)列,其底層數(shù)據(jù)結(jié)構(gòu)是使用單向鏈表實(shí)現(xiàn),入隊(duì)和出隊(duì)操作是使用我們經(jīng)常提到的CAS來(lái)保證線程安全的。

我們首先看一下ConcurrentLinkedQueue的類(lèi)圖結(jié)構(gòu)先,好有一個(gè)內(nèi)部邏輯有一個(gè)大概的印象,如下圖所示:

可以清楚的看到ConcurrentLinkedQueue內(nèi)部的隊(duì)列是使用單向鏈表方式實(shí)現(xiàn),類(lèi)中兩個(gè)volatile 類(lèi)型的Node 節(jié)點(diǎn)分別用來(lái)存放隊(duì)列的首位節(jié)點(diǎn)。

首先我們先來(lái)看一下ConcurrentLinkedQueue的構(gòu)造函數(shù),如下:

public ConcurrentLinkedQueue() {head = tail = new Node<E>(null); }

通過(guò)無(wú)參構(gòu)造函數(shù)可知默認(rèn)頭尾節(jié)點(diǎn)都是指向 item 為 null 的哨兵節(jié)點(diǎn)。

Node節(jié)點(diǎn)內(nèi)部則維護(hù)一個(gè)volatile 修飾的變量item 用來(lái)存放節(jié)點(diǎn)的值,next用來(lái)存放鏈表的下一個(gè)節(jié)點(diǎn),從而鏈接為一個(gè)單向無(wú)界鏈表,這就是單向無(wú)界的根本原因。如下圖:

?

接下來(lái)看ConcurrentLinkedQueue 主要關(guān)注入隊(duì),出隊(duì),獲取隊(duì)列元素的方法的源碼,如下所示:

1.首先看入隊(duì)方法offer,offer 操作是在隊(duì)列末尾添加一個(gè)元素,如果傳遞的參數(shù)是 null 則拋出 NPE 異常,否者由于 ConcurrentLinkedQueue 是無(wú)界隊(duì)列該方法一直會(huì)返回 true。另外由于使用 CAS 無(wú)阻塞算法,該方法不會(huì)阻塞調(diào)用線程,其源碼如下:

?

public boolean offer(E e) {//(1)e為null則拋出空指針異常 checkNotNull(e);//(2)構(gòu)造Node節(jié)點(diǎn)final Node<E> newNode = new Node<E>(e);//(3)從尾節(jié)點(diǎn)進(jìn)行插入for (Node<E> t = tail, p = t;;) {Node<E> q = p.next;//(4)如果q==null說(shuō)明p是尾節(jié)點(diǎn),則執(zhí)行插入if (q == null) {//(5)使用CAS設(shè)置p節(jié)點(diǎn)的next節(jié)點(diǎn)if (p.casNext(null, newNode)) {//(6)cas成功,則說(shuō)明新增節(jié)點(diǎn)已經(jīng)被放入鏈表,然后設(shè)置當(dāng)前尾節(jié)點(diǎn)if (p != t)casTail(t, newNode); // Failure is OK.return true;}}else if (p == q)//(7)//多線程操作時(shí)候,由于poll操作移除元素后有可能會(huì)把head變?yōu)樽砸?#xff0c;然后head的next變?yōu)樾耯ead,所以這里需要//重新找新的head,因?yàn)樾碌膆ead后面的節(jié)點(diǎn)才是正常的節(jié)點(diǎn)。p = (t != (t = tail)) ? t : head;else//(8) 尋找尾節(jié)點(diǎn)p = (p != t && t != (t = tail)) ? t : q;} }

?類(lèi)圖結(jié)構(gòu)時(shí)候談到構(gòu)造隊(duì)列時(shí)候參構(gòu)造函數(shù)創(chuàng)建了一個(gè) item 為 null 的哨兵節(jié)點(diǎn),并且 head 和 tail 都是指向這個(gè)節(jié)點(diǎn),下面通過(guò)圖形結(jié)合來(lái)講解下 offer 操作的代碼實(shí)現(xiàn)。

  1.首先看一下,當(dāng)一個(gè)線程調(diào)用offer(item)時(shí)候情況:首先代碼(1)對(duì)傳參判斷空檢查,如果為null 則拋出空指針異常,然后代碼(2)則使用item作為構(gòu)造函數(shù)參數(shù)創(chuàng)建一個(gè)新的節(jié)點(diǎn),

代碼(3)從隊(duì)列尾部節(jié)點(diǎn)開(kāi)始循環(huán),目的是從隊(duì)列尾部添加元素。如下圖:

?

上圖是執(zhí)行代碼(4)時(shí)候隊(duì)列的情況,這時(shí)候節(jié)點(diǎn) p , t ,head ,tail 同時(shí)指向了item為null的哨兵節(jié)點(diǎn),由于哨兵節(jié)點(diǎn)的next節(jié)點(diǎn)為null,所以這里q指向也是null。

代碼(4)發(fā)現(xiàn)q==null? 則執(zhí)行代碼(5),通過(guò)CAS原子操作判斷p 節(jié)點(diǎn)的next節(jié)點(diǎn)是否為null,如果為null 則使用節(jié)點(diǎn)newNode替換p 的next節(jié)點(diǎn),

然后執(zhí)行代碼(6),由于 p == t ,所以沒(méi)有設(shè)置尾部節(jié)點(diǎn),然后退出offer方法,這時(shí)候隊(duì)列的狀態(tài)圖如下:

?

上面講解的是一個(gè)線程調(diào)用offer方法的情況下,如果多個(gè)線程同時(shí)調(diào)用,就會(huì)存在多個(gè)線程同時(shí)執(zhí)行到代碼(5),假設(shè)線程A調(diào)用offer(item1),

線程B調(diào)用offer(item2),線程 A 和線程B同時(shí)到 p.casNext(null,newNode)。而CAS的比較并設(shè)置操作是原子性的,假設(shè)線程A先執(zhí)行了比較設(shè)置操作,

則發(fā)現(xiàn)當(dāng)前P的next節(jié)點(diǎn)確實(shí)是null ,則會(huì)原子性更新next節(jié)點(diǎn)為newNode,這時(shí)候線程B 也會(huì)判斷p 的next節(jié)點(diǎn)是否為null,結(jié)果發(fā)現(xiàn)不是null,(因?yàn)榫€程 A 已經(jīng)設(shè)置了 p 的 next 為 newNode)則會(huì)跳到代碼(3),

然后執(zhí)行到代碼(4)的時(shí)候的隊(duì)列分布圖如下:

?根據(jù)這個(gè)狀態(tài)圖可知線程B會(huì)執(zhí)行代碼(8),然后q 賦值給了p,這個(gè)時(shí)候狀態(tài)圖為:

然后線程B再次跳轉(zhuǎn)到代碼(3)執(zhí)行,當(dāng)執(zhí)行到代碼(4)時(shí)候隊(duì)列狀態(tài)圖為:

由于這時(shí)候q == null ,所以線程B 會(huì)執(zhí)行步驟(5),通過(guò)CAS操作判斷 當(dāng)前p的next 節(jié)點(diǎn)是否為null ,不是則再次循環(huán)后嘗試,是則使用newNode替換,假設(shè)CAS成功了,那么執(zhí)行步驟(6),

由于 p != t 所以設(shè)置tail節(jié)點(diǎn)為newNode ,然后退出offer方法。這時(shí)候隊(duì)列的狀態(tài)圖為:

到現(xiàn)在為止,offer代碼在執(zhí)行路徑現(xiàn)在就差步驟(7)還沒(méi)有執(zhí)行過(guò),其實(shí)這個(gè)要在執(zhí)行poll操作才會(huì)出現(xiàn)的,這里先看一下執(zhí)行poll操作后可能會(huì)存在的一種情況,如下圖所示:

下面分析下當(dāng)隊(duì)列處于這種狀態(tài)調(diào)用offer添加元素代碼執(zhí)行到代碼(4)的時(shí)候的隊(duì)列狀態(tài)圖,如下:

由于q節(jié)點(diǎn)不為空并且p==q 所以執(zhí)行代碼(7),因?yàn)?t == tail所以p 被賦值為head ,然后進(jìn)入循環(huán),循環(huán)后執(zhí)行到代碼(4)的時(shí)候的隊(duì)列狀態(tài)圖,如下:

由于 q ==null,所以執(zhí)行代碼(5),進(jìn)行CAS操作,如果當(dāng)前沒(méi)有其他線程執(zhí)行offer操作,則CAS操作會(huì)成功,p的next節(jié)點(diǎn)被設(shè)置為新增節(jié)點(diǎn),然后執(zhí)行代碼(6),

由于p != t 所以設(shè)置新節(jié)點(diǎn)為隊(duì)列尾節(jié)點(diǎn),現(xiàn)在隊(duì)列狀態(tài)圖,如下:

在這里的自引用的節(jié)點(diǎn)會(huì)被垃圾回收掉,可見(jiàn)offer操作里面關(guān)鍵步驟是代碼(5)通過(guò)原子CAS操作來(lái)進(jìn)行控制同時(shí)只有一個(gè)線程可以追加元素到隊(duì)列末尾,進(jìn)行cas競(jìng)爭(zhēng)失敗的線程,

則會(huì)通過(guò)循環(huán)一次次嘗試進(jìn)行cas操作,知道cas成功才會(huì)返回,也就是通過(guò)使用無(wú)限循環(huán)里面不斷進(jìn)行CAS嘗試方式來(lái)替代阻塞算法掛起調(diào)用線程,相比阻塞算法,這是使用CPU資源換取阻塞帶來(lái)的開(kāi)銷(xiāo)。

?

  2.poll操作,poll 操作是在隊(duì)列頭部獲取并且移除一個(gè)元素,如果隊(duì)列為空則返回 null,我們首先看改方法的源碼,如下:

public E poll() {//(1) goto標(biāo)記 restartFromHead://(2)無(wú)限循環(huán)for (;;) {for (Node<E> h = head, p = h, q;;) {//(3)保存當(dāng)前節(jié)點(diǎn)值E item = p.item;//(4)當(dāng)前節(jié)點(diǎn)有值則cas變?yōu)閚ullif (item != null && p.casItem(item, null)) {//(5)cas成功標(biāo)志當(dāng)前節(jié)點(diǎn)以及從鏈表中移除if (p != h) updateHead(h, ((q = p.next) != null) ? q : p);return item;}//(6)當(dāng)前隊(duì)列為空則返回nullelse if ((q = p.next) == null) {updateHead(h, p);return null;}//(7)自引用了,則重新找新的隊(duì)列頭節(jié)點(diǎn)else if (p == q)continue restartFromHead;else//(8)p = q;}}}   final void updateHead(Node<E> h, Node<E> p) {if (h != p && casHead(h, p))h.lazySetNext(h);}

poll操作是從隊(duì)頭獲取元素,所以代碼(2)內(nèi)層循環(huán)是從head節(jié)點(diǎn)開(kāi)始迭代,代碼(3)獲取當(dāng)前隊(duì)頭的節(jié)點(diǎn),當(dāng)隊(duì)列一開(kāi)始為空的時(shí)候隊(duì)列狀態(tài)為:

由于head 節(jié)點(diǎn)指向的item 為null 的哨兵節(jié)點(diǎn),所以會(huì)執(zhí)行到代碼(6),假設(shè)這個(gè)過(guò)程沒(méi)有線程調(diào)用offer,則此時(shí)q等于null? ,如下圖:

所以執(zhí)行updateHead方法,由于h 等于 p所以沒(méi)有設(shè)置頭節(jié)點(diǎn),poll方法直接返回null。

假設(shè)執(zhí)行到代碼(6)的時(shí)候已經(jīng)有其他線程調(diào)用了offer 方法成功添加了一個(gè)元素到隊(duì)列,這時(shí)候q執(zhí)行的是新增元素的節(jié)點(diǎn),這時(shí)候隊(duì)列狀態(tài)圖為:

所以代碼(6)判斷結(jié)果為false,然后會(huì)轉(zhuǎn)向代碼(7)執(zhí)行,而此時(shí)p不等于q,所以轉(zhuǎn)向代碼(8)執(zhí)行,執(zhí)行結(jié)果是p指向了節(jié)點(diǎn)q,此時(shí)的隊(duì)列狀態(tài)如下:

然后程序轉(zhuǎn)向代碼(3)執(zhí)行,p現(xiàn)在指向的元素值不為null,則執(zhí)行p.casItem(item, null)?通過(guò) CAS 操作嘗試設(shè)置 p 的 item 值為 null,

如果此時(shí)沒(méi)有其他線程進(jìn)行poll操作,CAS成功則執(zhí)行代碼(5),由于此時(shí) p != h ,所以設(shè)置頭節(jié)點(diǎn)為p,poll然后返回被從隊(duì)列移除的節(jié)點(diǎn)值item。此時(shí)隊(duì)列狀態(tài)為:

這個(gè)狀態(tài)就是前面提到offer操作的時(shí)候,offer代碼的執(zhí)行路徑(7)執(zhí)行的前提狀態(tài)。

假如現(xiàn)在一個(gè)線程調(diào)用了poll操作,則在執(zhí)行代碼(4)的時(shí)候的隊(duì)列狀態(tài)為:

可以看到這時(shí)候執(zhí)行代碼(6)返回null。

現(xiàn)在poll的代碼還有個(gè)分支(7)還沒(méi)有被執(zhí)行過(guò),那么什么時(shí)候會(huì)執(zhí)行呢?假設(shè)線程A執(zhí)行poll操作的時(shí)候,當(dāng)前的隊(duì)列狀態(tài),如下:

那么執(zhí)行p.casItem(item, null)?通過(guò) CAS 操作嘗試設(shè)置 p 的 item 值為 null。

假設(shè) CAS 設(shè)置成功則標(biāo)示該節(jié)點(diǎn)從隊(duì)列中移除了,此時(shí)隊(duì)列狀態(tài)為:

然后由于p != h,所以會(huì)執(zhí)行updateHead 方法,假如線程A執(zhí)行updateHead前,另外一個(gè)線程B開(kāi)始poll操作,這時(shí)候線程B的p指向head節(jié)點(diǎn),

但是還沒(méi)有執(zhí)行到代碼(6),這時(shí)候隊(duì)列狀態(tài)為:

然后線程A執(zhí)行?updateHead 操作,執(zhí)行完畢后線程 A 退出,這時(shí)候隊(duì)列狀態(tài)為:

然后線程B繼續(xù)執(zhí)行代碼(6)q=p.next由于該節(jié)點(diǎn)是自引用節(jié)點(diǎn)所以p==q,所以會(huì)執(zhí)行代碼(7)跳到外層循環(huán)restartFromHead,重新獲取當(dāng)前隊(duì)列隊(duì)頭 head, 現(xiàn)在狀態(tài)為:

?

總結(jié):poll元素移除一個(gè) 元素的時(shí)候,只是簡(jiǎn)單的使用CAS操作把當(dāng)前節(jié)點(diǎn)的item值設(shè)置為null,然后通過(guò)重新設(shè)置頭節(jié)點(diǎn)讓該元素從隊(duì)列里面摘除,

被摘除的節(jié)點(diǎn)就成了孤立節(jié)點(diǎn),這個(gè)節(jié)點(diǎn)會(huì)被在GC的時(shí)候會(huì)被回收掉。另外,執(zhí)行分支中如果發(fā)現(xiàn)頭節(jié)點(diǎn)被修改了要跳到外層循環(huán)重新獲取新的頭節(jié)點(diǎn)。

?

  3.peek操作,peek 操作是獲取隊(duì)列頭部一個(gè)元素(只不獲取不移除),如果隊(duì)列為空則返回 null,其源碼如下:

public E peek() {//(1) restartFromHead:for (;;) {for (Node<E> h = head, p = h, q;;) {//(2)E item = p.item;//(3)if (item != null || (q = p.next) == null) {updateHead(h, p);return item;}//(4)else if (p == q)continue restartFromHead;else//(5)p = q;}} }

代碼結(jié)構(gòu)與poll操作類(lèi)似,不同于代碼(3)的使用只是少了castItem 操作,其實(shí)這很正常,因?yàn)閜eek只是獲取隊(duì)列頭元素值,并不清空其值,

根據(jù)前面我們知道第一次執(zhí)行 offer 后 head 指向的是哨兵節(jié)點(diǎn)(也就是 item 為 null 的節(jié)點(diǎn)),那么第一次peek的時(shí)候,代碼(3)中會(huì)發(fā)現(xiàn)item==null,

然后會(huì)執(zhí)行 q = p.next, 這時(shí)候 q 節(jié)點(diǎn)指向的才是隊(duì)列里面第一個(gè)真正的元素或者如果隊(duì)列為 null 則 q 指向 null。

?

當(dāng)隊(duì)列為空的時(shí)候,隊(duì)列狀態(tài)圖,如下:

這時(shí)候執(zhí)行updateHead 由于 h 節(jié)點(diǎn)等于 p 節(jié)點(diǎn)所以不進(jìn)行任何操作,然后 peek 操作會(huì)返回 null。

當(dāng)隊(duì)列中至少有一個(gè)元素的時(shí)候(假如只有一個(gè)),這時(shí)候隊(duì)列狀態(tài)為:

這時(shí)候執(zhí)行代碼(5)這時(shí)候 p 指向了 q 節(jié)點(diǎn),然后執(zhí)行代碼(3)這時(shí)候隊(duì)列狀態(tài)為:

執(zhí)行代碼(3)發(fā)現(xiàn) item 不為 null,則執(zhí)行 updateHead 方法,由于 h!=p, 所以設(shè)置頭結(jié)點(diǎn),設(shè)置后隊(duì)列狀態(tài)為:

可以看到其實(shí)就是剔除了哨兵節(jié)點(diǎn)。

?

總結(jié):peek操作代碼與poll操作類(lèi)似,只是前者只獲取隊(duì)列頭元素,但是并不從隊(duì)列里面刪除,而后者獲取后需要從隊(duì)列里面刪除,另外,在第一次調(diào)用peek操作的時(shí)候,

會(huì)刪除哨兵節(jié)點(diǎn),并讓隊(duì)列的head節(jié)點(diǎn)指向隊(duì)列里面第一個(gè)元素或者null。

?

  4.size方法,獲取當(dāng)前隊(duì)列元素個(gè)數(shù),在并發(fā)環(huán)境下不是很有用,因?yàn)?CAS 沒(méi)有加鎖所以從調(diào)用 size 函數(shù)到返回結(jié)果期間有可能增刪元素,導(dǎo)致統(tǒng)計(jì)的元素個(gè)數(shù)不精確。源碼如下:

public int size() {int count = 0;for (Node<E> p = first(); p != null; p = succ(p))if (p.item != null)// 最大返回Integer.MAX_VALUEif (++count == Integer.MAX_VALUE)break;return count; } //獲取第一個(gè)隊(duì)列元素(哨兵元素不算),沒(méi)有則為null Node<E> first() {restartFromHead:for (;;) {for (Node<E> h = head, p = h, q;;) {boolean hasItem = (p.item != null);if (hasItem || (q = p.next) == null) {updateHead(h, p);return hasItem ? p : null;}else if (p == q)continue restartFromHead;elsep = q;}} } //獲取當(dāng)前節(jié)點(diǎn)的next元素,如果是自引入節(jié)點(diǎn)則返回真正頭節(jié)點(diǎn) final Node<E> succ(Node<E> p) {Node<E> next = p.next;return (p == next) ? head : next; }

?

  5.remove方法,如果隊(duì)列里面存在該元素則刪除給元素,如果存在多個(gè)則刪除第一個(gè),并返回 true,否者返回 false。源碼如下:

public boolean remove(Object o) {//查找元素為空,直接返回falseif (o == null) return false;Node<E> pred = null;for (Node<E> p = first(); p != null; p = succ(p)) {E item = p.item;//相等則使用cas值null,同時(shí)一個(gè)線程成功,失敗的線程循環(huán)查找隊(duì)列中其它元素是否有匹配的。if (item != null &&o.equals(item) &&p.casItem(item, null)) {//獲取next元素Node<E> next = succ(p);//如果有前驅(qū)節(jié)點(diǎn),并且next不為空則鏈接前驅(qū)節(jié)點(diǎn)到next,if (pred != null && next != null)pred.casNext(p, next);return true;}pred = p;}return false; }

?

ConcurrentLinkedQueue 底層使用單向鏈表數(shù)據(jù)結(jié)構(gòu)來(lái)保存隊(duì)列元素,每個(gè)元素被包裝為了一個(gè) Node 節(jié)點(diǎn),隊(duì)列是靠頭尾節(jié)點(diǎn)來(lái)維護(hù)的,創(chuàng)建隊(duì)列時(shí)候頭尾節(jié)點(diǎn)指向一個(gè) item 為 null 的哨兵節(jié)點(diǎn),

第一次 peek 或者 first 時(shí)候會(huì)把 head 指向第一個(gè)真正的隊(duì)列元素。由于使用非阻塞 CAS 算法,沒(méi)有加鎖,所以獲取 size 的時(shí)候有可能進(jìn)行了 offer,poll 或者 remove 操作,導(dǎo)致獲取的元素個(gè)數(shù)不精確,所以在并發(fā)情況下 size 函數(shù)不是很有用。

?

  • JDK 中基于鏈表的非阻塞無(wú)界隊(duì)列 ConcurrentLinkedQueue 原理剖析,ConcurrentLinkedQueue 內(nèi)部是如何使用 CAS 非阻塞算法來(lái)保證多線程下入隊(duì)出隊(duì)操作的線程安全?

?

轉(zhuǎn)載于:https://www.cnblogs.com/huangjuncong/p/9196240.html

總結(jié)

以上是生活随笔為你收集整理的Java并发编程笔记之ConcurrentLinkedQueue源码探究的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。