日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

并发队列-无界非阻塞队列 ConcurrentLinkedQueue 原理探究

發(fā)布時(shí)間:2025/4/14 编程问答 48 豆豆
生活随笔 收集整理的這篇文章主要介紹了 并发队列-无界非阻塞队列 ConcurrentLinkedQueue 原理探究 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

并發(fā)隊(duì)列-無界非阻塞隊(duì)列 ConcurrentLinkedQueue 原理探究

http://www.importnew.com/25668.html


一、 前言

常用的并發(fā)隊(duì)列有阻塞隊(duì)列和非阻塞隊(duì)列,前者使用鎖實(shí)現(xiàn),后者則使用CAS非阻塞算法實(shí)現(xiàn),使用非阻塞隊(duì)列一般性能比較好,下面就看看常用的非阻塞ConcurrentLinkedQueue是如何使用CAS實(shí)現(xiàn)的。

二、 ConcurrentLinkedQueue類圖結(jié)構(gòu)

如圖ConcurrentLinkedQueue中有兩個(gè)volatile類型的Node節(jié)點(diǎn)分別用來存在列表的首尾節(jié)點(diǎn),其中head節(jié)點(diǎn)存放鏈表第一個(gè)item為null的節(jié)點(diǎn),tail則并不是總指向最后一個(gè)節(jié)點(diǎn)。Node節(jié)點(diǎn)內(nèi)部則維護(hù)一個(gè)變量item用來存放節(jié)點(diǎn)的值,next用來存放下一個(gè)節(jié)點(diǎn),從而鏈接為一個(gè)單向無界列表。

123public ConcurrentLinkedQueue() {????head = tail = new Node<E>(null);}

如上代碼初始化時(shí)候會構(gòu)建一個(gè)item為NULL的空節(jié)點(diǎn)作為鏈表的首尾節(jié)點(diǎn)。

三、offer操作

offer操作是在鏈表末尾添加一個(gè)元素,下面看看實(shí)現(xiàn)原理。

12345678910111213141516171819202122232425262728293031323334public boolean offer(E e) {????//e為null則拋出空指針異常????checkNotNull(e);???//構(gòu)造Node節(jié)點(diǎn)構(gòu)造函數(shù)內(nèi)部調(diào)用unsafe.putObject,后面統(tǒng)一講????final Node<E> newNode = new Node<E>(e);????//從尾節(jié)點(diǎn)插入????for (Node<E> t = tail, p = t;;) {????????Node<E> q = p.next;????????//如果q=null說明p是尾節(jié)點(diǎn)則插入????????if (q == null) {????????????//cas插入(1)????????????if (p.casNext(null, newNode)) {????????????????//cas成功說明新增節(jié)點(diǎn)已經(jīng)被放入鏈表,然后設(shè)置當(dāng)前尾節(jié)點(diǎn)(包含head,1,3,5.。。個(gè)節(jié)點(diǎn)為尾節(jié)點(diǎn))????????????????if (p != t) // hop two nodes at a time????????????????????casTail(t, newNode);? // Failure is OK.????????????????return true;????????????}????????????// Lost CAS race to another thread; re-read next????????}????????else if (p == q)//(2)????????????//多線程操作時(shí)候,由于poll時(shí)候會把老的head變?yōu)樽砸?#xff0c;然后head的next變?yōu)樾耯ead,所以這里需要????????????//重新找新的head,因?yàn)樾碌膆ead后面的節(jié)點(diǎn)才是激活的節(jié)點(diǎn)????????????p = (t != (t = tail)) ? t : head;????????else????????????// 尋找尾節(jié)點(diǎn)(3)????????????p = (p != t && t != (t = tail)) ? t : q;????}}

從構(gòu)造函數(shù)知道一開始有個(gè)item為null的哨兵節(jié)點(diǎn),并且head和tail都是指向這個(gè)節(jié)點(diǎn),然后當(dāng)一個(gè)線程調(diào)用offer時(shí)候首先

如圖首先查找尾節(jié)點(diǎn),q==null,p就是尾節(jié)點(diǎn),所以執(zhí)行p.casNext通過cas設(shè)置p的next為新增節(jié)點(diǎn),這時(shí)候p==t所以不重新設(shè)置尾節(jié)點(diǎn)為當(dāng)前新節(jié)點(diǎn)。由于多線程可以調(diào)用offer方法,所以可能兩個(gè)線程同時(shí)執(zhí)行到了(1)進(jìn)行cas,那么只有一個(gè)會成功(假如線程1成功了),成功后的鏈表為:

失敗的線程會循環(huán)一次這時(shí)候指針為:

這時(shí)候會執(zhí)行(3)所以p=q,然后在循環(huán)后指針位置為:

所以沒有其他線程干擾的情況下會執(zhí)行(1)執(zhí)行cas把新增節(jié)點(diǎn)插入到尾部,沒有干擾的情況下線程2 cas會成功,然后去更新尾節(jié)點(diǎn)tail,由于p!=t所以更新。這時(shí)候鏈表和指針為:

假如線程2cas時(shí)候線程3也在執(zhí)行,那么線程3會失敗,循環(huán)一次后,線程3的節(jié)點(diǎn)狀態(tài)為:

這時(shí)候p!=t ;并且t的原始值為told,t的新值為tnew ,所以told!=tnew,所以 p=tnew=tail;

然后在循環(huán)一下后節(jié)點(diǎn)狀態(tài):

q==null所以執(zhí)行(1)。

現(xiàn)在就差p==q這個(gè)分支還沒有走,這個(gè)要在執(zhí)行poll操作后才會出現(xiàn)這個(gè)情況。poll后會存在下面的狀態(tài)

這個(gè)時(shí)候添加元素時(shí)候指針分布為:

所以會執(zhí)行(2)分支 結(jié)果 p=head
然后循環(huán),循環(huán)后指針分布:

所以執(zhí)行(1),然后p!=t所以設(shè)置tail節(jié)點(diǎn)。現(xiàn)在分布圖:

自引用的節(jié)點(diǎn)會被垃圾回收掉。

四、 add操作

add操作是在鏈表末尾添加一個(gè)元素,下面看看實(shí)現(xiàn)原理。
其實(shí)內(nèi)部調(diào)用的還是offer

123public boolean add(E e) {????return offer(e);}

五、poll操作

poll操作是在鏈表頭部獲取并且移除一個(gè)元素,下面看看實(shí)現(xiàn)原理。

123456789101112131415161718192021222324252627282930313233343536public E poll() {????restartFromHead:????//死循環(huán)????for (;;) {????????//死循環(huán)????????for (Node<E> h = head, p = h, q;;) {????????????//保存當(dāng)前節(jié)點(diǎn)值????????????E item = p.item;????????????//當(dāng)前節(jié)點(diǎn)有值則cas變?yōu)閚ull(1)????????????if (item != null && p.casItem(item, null)) {????????????????//cas成功標(biāo)志當(dāng)前節(jié)點(diǎn)以及從鏈表中移除????????????????if (p != h) // 類似tail間隔2設(shè)置一次頭節(jié)點(diǎn)(2)????????????????????updateHead(h, ((q = p.next) != null) ? q : p);????????????????return item;????????????}????????????//當(dāng)前隊(duì)列為空則返回null(3)????????????else if ((q = p.next) == null) {????????????????updateHead(h, p);????????????????return null;????????????}????????????//自引用了,則重新找新的隊(duì)列頭節(jié)點(diǎn)(4)????????????else if (p == q)????????????????continue restartFromHead;????????????else//(5)????????????????p = q;????????}????}}????final void updateHead(Node<E> h, Node<E> p) {????????if (h != p && casHead(h, p))????????????h.lazySetNext(h);????}

當(dāng)隊(duì)列為空時(shí)候:

可知執(zhí)行(3)這時(shí)候有兩種情況,第一沒有其他線程添加元素時(shí)候(3)結(jié)果為true然后因?yàn)閔!=p為false所以直接返回null。第二在執(zhí)行q=p.next前,其他線程已經(jīng)添加了一個(gè)元素到隊(duì)列,這時(shí)候(3)返回false,然后執(zhí)行(5)p=q,然后循環(huán)后節(jié)點(diǎn)分布:

這時(shí)候執(zhí)行(1)分支,進(jìn)行cas把當(dāng)前節(jié)點(diǎn)值值為null,同時(shí)只有一個(gè)線程會成功,cas成功 標(biāo)示該節(jié)點(diǎn)從隊(duì)列中移除了,然后p!=h,調(diào)用updateHead方法,參數(shù)為h,p;h!=p所以把p變?yōu)楫?dāng)前鏈表head節(jié)點(diǎn),然后h節(jié)點(diǎn)的next指向自己。現(xiàn)在狀態(tài)為:

cas失敗 后 會再次循環(huán),這時(shí)候分布圖為:

這時(shí)候執(zhí)行(3)返回null.

現(xiàn)在還有個(gè)分支(4)沒有執(zhí)行過,那么什么時(shí)候會執(zhí)行那?

這時(shí)候執(zhí)行(1)分支,進(jìn)行cas把當(dāng)前節(jié)點(diǎn)值值為null,同時(shí)只有一個(gè)線程A會成功,cas成功 標(biāo)示該節(jié)點(diǎn)從隊(duì)列中移除了,然后p!=h,調(diào)用updateHead方法,假如執(zhí)行updateHead前另外一個(gè)線程B開始poll這時(shí)候它p指向?yàn)樵瓉淼膆ead節(jié)點(diǎn),然后當(dāng)前線程A執(zhí)行updateHead這時(shí)候B線程鏈表狀態(tài)為:

所以會執(zhí)行(4)重新跳到外層循環(huán),獲取當(dāng)前head,現(xiàn)在狀態(tài)為:

六、peek操作

peek操作是獲取鏈表頭部一個(gè)元素(只讀取不移除),下面看看實(shí)現(xiàn)原理。
代碼與poll類似,只是少了castItem.并且peek操作會改變head指向,offer后head指向哨兵節(jié)點(diǎn),第一次peek后head會指向第一個(gè)真的節(jié)點(diǎn)元素。

12345678910111213141516public E peek() {????restartFromHead:????for (;;) {????????for (Node<E> h = head, p = h, q;;) {????????????E item = p.item;????????????if (item != null || (q = p.next) == null) {????????????????updateHead(h, p);????????????????return item;????????????}????????????else if (p == q)????????????????continue restartFromHead;????????????else????????????????p = q;????????}????}}

七、size操作

獲取當(dāng)前隊(duì)列元素個(gè)數(shù),在并發(fā)環(huán)境下不是很有用,因?yàn)槭褂肅AS沒有加鎖所以從調(diào)用size函數(shù)到返回結(jié)果期間有可能增刪元素,導(dǎo)致統(tǒng)計(jì)的元素個(gè)數(shù)不精確。

123456789101112131415161718192021222324252627282930313233public int size() {????int count = 0;????for (Node<E> p = first(); p != null; p = succ(p))????????if (p.item != null)????????????// 最大返回Integer.MAX_VALUE????????????if (++count == Integer.MAX_VALUE)????????????????break;????return count;}//獲取第一個(gè)隊(duì)列元素(哨兵元素不算),沒有則為nullNode<E> first() {????restartFromHead:????for (;;) {????????for (Node<E> h = head, p = h, q;;) {????????????boolean hasItem = (p.item != null);????????????if (hasItem || (q = p.next) == null) {????????????????updateHead(h, p);????????????????return hasItem ? p : null;????????????}????????????else if (p == q)????????????????continue restartFromHead;????????????else????????????????p = q;????????}????}}//獲取當(dāng)前節(jié)點(diǎn)的next元素,如果是自引入節(jié)點(diǎn)則返回真正頭節(jié)點(diǎn)final Node<E> succ(Node<E> p) {????Node<E> next = p.next;????return (p == next) ? head : next;}

八、remove操作

如果隊(duì)列里面存在該元素則刪除給元素,如果存在多個(gè)則刪除第一個(gè),并返回true,否者返回false

12345678910111213141516171819202122232425public boolean remove(Object o) {????//查找元素為空,直接返回false????if (o == null) return false;????Node<E> pred = null;????for (Node<E> p = first(); p != null; p = succ(p)) {????????E item = p.item;????????//相等則使用cas值null,同時(shí)一個(gè)線程成功,失敗的線程循環(huán)查找隊(duì)列中其他元素是否有匹配的。????????if (item != null &&????????????o.equals(item) &&????????????p.casItem(item, null)) {????????????//獲取next元素????????????Node<E> next = succ(p);????????????//如果有前驅(qū)節(jié)點(diǎn),并且next不為空則鏈接前驅(qū)節(jié)點(diǎn)到next,????????????if (pred != null && next != null)????????????????pred.casNext(p, next);????????????return true;????????}????????pred = p;????}????return false;}

九、contains操作

判斷隊(duì)列里面是否含有指定對象,由于是遍歷整個(gè)隊(duì)列,所以類似size 不是那么精確,有可能調(diào)用該方法時(shí)候元素還在隊(duì)列里面,但是遍歷過程中才把該元素刪除了,那么就會返回false.

123456789public boolean contains(Object o) {????if (o == null) return false;????for (Node<E> p = first(); p != null; p = succ(p)) {????????E item = p.item;????????if (item != null && o.equals(item))????????????return true;????}????return false;}

十、開源框架中使用

Tomcat中NioEndPoint中的每個(gè)poller里面就維護(hù)一個(gè)ConcurrentLinkedQueue<Runnable>用來作為緩沖存放任務(wù)。

10.1 Acceptor線程

accept線程作用是接受客戶端發(fā)來的連接請求并放入到事件隊(duì)列。

看下代碼:

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071protected class Acceptor extends AbstractEndpoint.Acceptor {????????@Override????????public void run() {????????????int errorDelay = 0;????????????// 一直循環(huán)直到接收到shutdown命令????????????while (running) {????????????????...????????????????if (!running) {????????????????????break;????????????????}????????????????state = AcceptorState.RUNNING;????????????????try {????????????????????//如果達(dá)到max connections個(gè)請求則等待????????????????????countUpOrAwaitConnection();????????????????????SocketChannel socket = null;????????????????????try {????????????????????????// 從TCP緩存獲取一個(gè)完成三次握手的套接字,沒有則阻塞????????????????????????// socket????????????????????????socket = serverSock.accept();????????????????????} catch (IOException ioe) {????????????????????????...????????????????????}????????????????????// Successful accept, reset the error delay????????????????????errorDelay = 0;???????????????????if (running && !paused) {????????????????????????if (!setSocketOptions(socket)) {????????????????????????????countDownConnection();????????????????????????????closeSocket(socket);????????????????????????}????????????????????} else {????????????????????????countDownConnection();????????????????????????closeSocket(socket);????????????????????}???????????????????....????????????????} catch (SocketTimeoutException sx) {????????????????????// Ignore: Normal condition????????????????....????????????}????????????state = AcceptorState.ENDED;????????}????}?protected boolean setSocketOptions(SocketChannel socket) {????????// Process the connection????????try {????????????//disable blocking, APR style, we are gonna be polling it???????????...????????????getPoller0().register(channel);????????} catch (Throwable t) {???????????...????????????return false;????????}????????return true;}public void register(final NioChannel socket) {???...????addEvent(r);}public void addEvent(Runnable event) {????events.offer(event);????...}

10.2 Poll線程

poll線程作用是從事件隊(duì)列里面獲取事件把鏈接套接字加入selector,并且監(jiān)聽socket事件進(jìn)行處理。

12345678910111213141516171819202122232425262728293031323334353637383940414243public void run() {????while (true) {????????try {????????????...????????????if (close) {???????????????...????????????} else {????????????????hasEvents = events();????????????}????????????try {????????????????...????????????} catch ( NullPointerException x ) {...????????????}????????????Iterator<SelectionKey> iterator =????????????????keyCount > 0 ? selector.selectedKeys().iterator() : null;????????????// 遍歷所有注冊的channel對感興趣的事件處理????????????while (iterator != null && iterator.hasNext()) {????????????????SelectionKey sk = iterator.next();????????????????KeyAttachment attachment = (KeyAttachment)sk.attachment();????????????????if (attachment == null) {????????????????????iterator.remove();????????????????} else {????????????????????attachment.access();????????????????????iterator.remove();????????????????????processKey(sk, attachment);????????????????}????????????}//while????????????//process timeouts????????????timeout(keyCount,hasEvents);????????????if ( oomParachute > 0 && oomParachuteData == null ) checkParachute();????????} catch (OutOfMemoryError oom) {????????????...????????}????}//while????synchronized (this) {????????this.notifyAll();????}????stopLatch.countDown();}
123456789101112131415161718192021222324252627282930313233343536373839public boolean events() {????????????boolean result = false;????????????//從隊(duì)列獲取任務(wù)并執(zhí)行????????????Runnable r = null;????????????while ( (r = events.poll()) != null ) {????????????????result = true;????????????????try {????????????????????r.run();????????????????????if ( r instanceof PollerEvent ) {????????????????????????((PollerEvent)r).reset();????????????????????????eventCache.offer((PollerEvent)r);????????????????????}????????????????} catch ( Throwable x ) {????????????????????log.error("",x);????????????????}????????????}????????????return result;????????}//如配置線程池則請求交給線程池處理。public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {????try {????????KeyAttachment attachment = (KeyAttachment)socket.getAttachment();????????if (attachment == null) {????????????return false;????????}????????attachment.setCometNotify(false); //will get reset upon next reg????????SocketProcessor sc = processorCache.poll();????????if ( sc == null ) sc = new SocketProcessor(socket,status);????????else sc.reset(socket,status);????????if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc);????????else sc.run();????} catch (RejectedExecutionException rx) {???????...????}????return true;}

十一、有意思的問題

10.1 一個(gè)判斷的執(zhí)行結(jié)果分析

offer中有個(gè) 判斷 t != (t = tail)假如 t=node1;tail=node2;并且node1!=node2那么這個(gè)判斷是true還是false那,答案是true,這個(gè)判斷是看當(dāng)前t是不是和tail相等,相等則返回true否者為false,但是無論結(jié)果是啥執(zhí)行后t的值都是tail。

下面從字節(jié)碼來分析下為啥?

  • 一個(gè)例子
123456789public static void main(String[] args)? {????int t = 2;????int tail = 3;????System.out.println(t != (t = tail));}

結(jié)果為:true;

  • 字節(jié)碼文件:

字節(jié)碼命令介紹可參考: http://blog.csdn.net/web_code/article/details/12164733

一開始棧為空

  • 第0行指令作用是把值2入棧棧頂元素為2

  • 第1行指令作用是將棧頂int類型值保存到局部變量t中。

  • 第2行指令作用是把值3入棧棧頂元素為3

  • 第3行指令作用是將棧頂int類型值保存到局部變量tail中。

  • 第4調(diào)用打印命令
  • 第7行指令作用是把變量t中的值入棧

  • 第8行指令作用是把變量tail中的值入棧

  • 現(xiàn)在棧里面元素為3,2并且3位棧頂
  • 第9行指令作用是當(dāng)前棧頂元素入棧,所以現(xiàn)在棧內(nèi)容3,3,2

  • 第10行指令作用是把棧頂元素存放到t,現(xiàn)在棧內(nèi)容3,2

  • 第11行指令作用是判斷棧頂兩個(gè)元素值,相等則跳轉(zhuǎn) 18。由于現(xiàn)在棧頂嚴(yán)肅為3,2不相等所以返回true.
  • 第14行指令作用是把1入棧。

然后回頭分析下!=是雙目運(yùn)算符,應(yīng)該是首先把左邊的操作數(shù)入棧,然后在去計(jì)算了右側(cè)操作數(shù)。

10.2 Node的構(gòu)造函數(shù)

另外對于每個(gè)節(jié)點(diǎn)Node在構(gòu)造時(shí)候使用UnSafe.putObject設(shè)置item替代了直接對volatile的賦值,這個(gè)是為了性能考慮?為啥不直接賦值那,看看類注解怎么說:

123Node(E item) {????UNSAFE.putObject(this, itemOffset, item);}

When constructing a Node (before enqueuing it) we avoid paying for a volatile write to item by using Unsafe.putObject instead of a normal write. This allows the cost of enqueue to be”one-and-a-half”
CASes.

也就是說當(dāng)構(gòu)造Node節(jié)點(diǎn)時(shí)候(這時(shí)候節(jié)點(diǎn)還沒有放入隊(duì)列鏈表)為了避免正常的寫volatile變量的開銷 使用了Unsafe.putObject代替。這使元素進(jìn)隊(duì)列僅僅花費(fèi)1.5個(gè)cas操作的耗時(shí)。這個(gè)是說使用Unsafe.putObject比直接給volatile變量賦值更高效?目前還沒有查到相關(guān)資料。

十二、總結(jié)

ConcurrentLinkedQueue使用CAS非阻塞算法實(shí)現(xiàn)使用CAS解決了當(dāng)前節(jié)點(diǎn)與next節(jié)點(diǎn)之間的安全鏈接和對當(dāng)前節(jié)點(diǎn)值的賦值。由于使用CAS沒有使用鎖,所以獲取size的時(shí)候有可能進(jìn)行offer,poll或者remove操作,導(dǎo)致獲取的元素個(gè)數(shù)不精確,所以在并發(fā)情況下size函數(shù)不是很有用。另外第一次peek或者first時(shí)候會把head指向第一個(gè)真正的隊(duì)列元素。

下面總結(jié)下如何實(shí)現(xiàn)線程安全的,可知入隊(duì)出隊(duì)函數(shù)都是操作volatile變量:head,tail。所以要保證隊(duì)列線程安全只需要保證對這兩個(gè)Node操作的可見性和原子性,由于volatile本身保證可見性,所以只需要看下多線程下如果保證對著兩個(gè)變量操作的原子性(CAS)

對于offer操作是在tail后面添加元素,也就是調(diào)用tail.casNext方法,而這個(gè)方法是使用的CAS操作,只有一個(gè)線程會成功,然后失敗的線程會循環(huán)一下,重新獲取tail,然后執(zhí)行casNext方法。對于poll也是這樣的。


轉(zhuǎn)載于:https://www.cnblogs.com/silyvin/p/9106630.html

總結(jié)

以上是生活随笔為你收集整理的并发队列-无界非阻塞队列 ConcurrentLinkedQueue 原理探究的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。