日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

[源码]解析 SynchronousQueue 上界,下界.. 数据保存和数据传递. 堵塞队列. 有无频繁await?...

發(fā)布時(shí)間:2025/3/14 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 [源码]解析 SynchronousQueue 上界,下界.. 数据保存和数据传递. 堵塞队列. 有无频繁await?... 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
簡(jiǎn)析SynchronousQueue。LinkedBlockingQueue(兩個(gè)locker,更快),ArrayBlockingQueue(一個(gè)locker,讀寫都競(jìng)爭(zhēng)) 三者都是blockingQueue. 對(duì)于blockingQueue的堵塞和非堵塞方法對(duì)注記方案: * oppo(oppo手機(jī))是一對(duì),offer和poll不堵塞 * ppt是一對(duì).put和take都堵塞. 解析源碼之前先實(shí)戰(zhàn)看下SynchronousQueue.
public?static?void?main(String[]?args)?throws?InterruptedException?{ ??????????final?SynchronousQueue<Long>?workQueue?=?new?SynchronousQueue<Long>(); ????????boolean?offer?=?workQueue.offer(2L);?//?offer不能放入,前面無(wú)堵塞線程,本身也不堵塞不會(huì)放入到堵塞線程隊(duì)列中 ????????System.out.println("main?thread:?offer="?+?offer); ????????Long?poll?=?workQueue.poll();?// 不能放poll出元素為null,前面無(wú)堵塞線程,本身也不堵塞不會(huì)放入到堵塞線程隊(duì)列中 ????????System.out.println("main?thread:?poll="?+?poll); ? ? ? ? ExecutorService?newCachedThreadPool?=?Executors.newFixedThreadPool(4);?// 內(nèi)部源代碼實(shí)現(xiàn)是:new?ThreadPoolExecutor(0,Integer.MAX_VALUE,?60L,?TimeUnit.SECONDS,new?SynchronousQueue<Runnable>());? ? ? ? ????????System.out.println("/**=====================以下是隊(duì)列是?take類型========*/"); ????????newCachedThreadPool.execute(new?Runnable()?{ ????????????@Override ????????????public?void?run()?{ ????????????????try?{ ????????????????????System.out ????????????????????????????.println("take?thread?1:?begin?take?and?thread?will?be?blocked?by?call??park(await)?method"); ????????????????????Long?take?=?workQueue.take(); ????????????????????System.out ????????????????????????????.println("take?thread?1:???take?suceffull?take?object=" ????????????????????????????????????+?take); ????????????????}?catch?(InterruptedException?e1)?{ ????????????????????//?TODO?Auto-generated?catch?block ????????????????????e1.printStackTrace(); ????????????????} ????????????} ????????}); ????????newCachedThreadPool.execute(new?Runnable()?{ ????????????@Override ????????????public?void?run()?{ ????????????????try?{ ????????????????????System.out ????????????????????????????.println("take?thread?2:?begin?take?and?thread?will?be?blocked?by?call??park(await)?method"); ????????????????????Long?take?=?workQueue.take(); ????????????????????System.out ????????????????????????????.println("take?thread?2:???take?suceffull?take?object=" ????????????????????????????????????+?take); ????????????????}?catch?(InterruptedException?e1)?{ ????????????????????//?TODO?Auto-generated?catch?block ????????????????????e1.printStackTrace(); ????????????????} ????????????} ????????}); ????????Thread.sleep(1000); ?????????poll?=?workQueue.poll(); ????????System.out.println("main?thread:?隊(duì)列是take類型.?同類型操作分兩種:1.堵塞take會(huì)堵塞,增加隊(duì)列中?2.非堵塞操作poll失敗.?Queue接口poll失敗,??poll="?+?poll); ?????????offer?=?workQueue.offer(2L);??//? ????????System.out.println("main?thread:?隊(duì)列是take類型.?非同類型無(wú)論堵塞不堵塞都成功.?非同類型非堵塞操作Queue接口?offer?成功??"?+?offer); ????????Thread.sleep(2000); ????????long?object?=?123L; ????????System.out.println("put?thead:?begin?put?"?+?object); ????????try?{ ????????????workQueue.put(object); ????????}?catch?(InterruptedException?e)?{ ????????????e.printStackTrace(); ????????} ????????System.out ????????????????.println("put?thead:?隊(duì)列是take類型.?非同類型無(wú)論堵塞不堵塞都成功.??blockingQueue?堵塞接口?put調(diào)用未堵塞,調(diào)用成功?SynchronousQueue?will?unpark(notify)?the?take?thread?"); ???? ????????System.out.println("/**=====================以下是?Put類型========*/"); ???? ????????System.out.println("/*先堵塞兩個(gè)put*/"); ????????newCachedThreadPool.execute(new?Runnable()?{ ????????????@Override ????????????public?void?run()?{ ????????????????long?object?=?123L; ????????????????System.out.println("put?thead?1:?begin?put?"?+?object); ????????????????try?{ ????????????????????workQueue.put(object); ????????????????}?catch?(InterruptedException?e)?{ ????????????????????e.printStackTrace(); ????????????????} ????????????????System.out ????????????????????????.println("put?thead?1:?finish?put?sucefully?,?SynchronousQueue?will?unpark(notify)?the?take?thread?"); ????????????} ????????}); ????????newCachedThreadPool.execute(new?Runnable()?{ ????????????@Override ????????????public?void?run()?{ ????????????????long?object?=?123L; ????????????????System.out.println("put?thead?2:?begin?put?"?+?object); ????????????????try?{ ????????????????????workQueue.put(object); ????????????????}?catch?(InterruptedException?e)?{ ????????????????????e.printStackTrace(); ????????????????} ????????????????System.out ????????????????????????.println("put?thead?2:?finish?put?sucefully?,?SynchronousQueue?will?unpark(notify)?the?take?thread?"); ????????????} ????????}); ????????Thread.sleep(2000); ????????System.out.println("/*試圖offer*/"); ?????????offer?=?workQueue.offer(2L);??//? ????????????System.out.println("main?thread:?隊(duì)列是put類型.??同類型操作分兩種:1.堵塞put會(huì)堵塞,增加隊(duì)列中?2.非堵塞操作offer失敗.??非堵塞操作Queue接口?offer?成功??"?+?offer); ???????????? ??????????????poll?=?workQueue.poll(); ????????????System.out.println("main?thread:?隊(duì)列是put類型.?非同類型無(wú)論堵塞不堵塞都成功.??poll="?+?poll); ???????????? ????Long?take?=?workQueue.take(); ????System.out ????????????.println("main?thread:?隊(duì)列是put類型.?非同類型無(wú)論堵塞不堵塞都成功.?BlockingQueue堵塞接口take調(diào)用未堵塞,?take?suceffull?take?object=" ????????????????????+?take); ????????newCachedThreadPool.shutdown(); ?????}? ? ?? 輸出: main?thread:?offer=false main?thread:?poll=null /**=====================以下是隊(duì)列是?take類型========*/ take?thread?1:?begin?take?and?thread?will?be?blocked?by?call??park(await)?method take?thread?2:?begin?take?and?thread?will?be?blocked?by?call??park(await)?method main?thread:?隊(duì)列是take類型.?同類型操作分兩種:1.堵塞take會(huì)堵塞,增加隊(duì)列中?2.非堵塞操作poll失敗.?Queue接口poll失敗,??poll=null main?thread:?隊(duì)列是take類型.?非同類型無(wú)論堵塞不堵塞都成功.?非同類型非堵塞操作Queue接口?offer?成功?

?true

take?thread?2:???take?suceffull?take?object=2 put?thead:?begin?put?123 take?thread?1:???take?suceffull?take?object=123 put?thead:?隊(duì)列是take類型.?非同類型無(wú)論堵塞不堵塞都成功.??blockingQueue?堵塞接口?put調(diào)用未堵塞,調(diào)用成功?SynchronousQueue?will?unpark(notify)?the?take?thread? /**=====================以下是?Put類型========*/ /*先堵塞兩個(gè)put*/ put?thead?1:?begin?put?123 put?thead?2:?begin?put?123 /*試圖offer*/ main?thread:?隊(duì)列是put類型.??同類型操作分兩種:1.堵塞put會(huì)堵塞,增加隊(duì)列中?2.非堵塞操作offer失敗.??非堵塞操作Queue接口?offer?成功??false main?thread:?隊(duì)列是put類型.?非同類型無(wú)論堵塞不堵塞都成功.??poll=123 main?thread:?隊(duì)列是put類型.?非同類型無(wú)論堵塞不堵塞都成功.?BlockingQueue堵塞接口take調(diào)用未堵塞,?take?suceffull?take?object=123 put?thead?2:?finish?put?sucefully?,?SynchronousQueue?will?unpark(notify)?the?take?thread? put?thead?1:?finish?put?sucefully?,?SynchronousQueue?will?unpark(notify)?the?take?thread?

依據(jù)上面的代碼能夠簡(jiǎn)單總結(jié)SynchronousQueue 的queue特點(diǎn): 1. queue有三種類型:?空類型,take以及put類型, 分別說(shuō)明. 1.1. 空類型時(shí), take和put都會(huì)被堵塞,?非堵塞offer和poll都不會(huì)堵塞. 無(wú)法成功操作. 1.2.?隊(duì)列是某種類型時(shí), ? ? ??同類型操作:?分兩種:?1.堵塞method會(huì)堵塞,增加堵塞隊(duì)列中?2.非堵塞method操作失敗 . ? ? ?非同類型:?無(wú)論堵塞不堵塞method都成功. 詳細(xì)說(shuō)來(lái): 1.2.1. 隊(duì)列是take類型時(shí),?.? 同類型操作:堵塞take操作會(huì)堵塞,非堵塞 poll()失敗,? 非同類型:?異類的put和offer可以成功. 1.2.2.?隊(duì)列是put類型.,? ? ??? ??? ??同類型操作:堵塞put操作會(huì)被堵塞,非堵塞offer操作失敗.? ? ??? ??? ??非同類型:?異類的take,poll會(huì)成功2. 關(guān)于隊(duì)列長(zhǎng)度,界: 一個(gè)線程堵塞隊(duì)列要么全是take線程,要么全是put線程.?后來(lái)的互補(bǔ)的操作將會(huì)匹配對(duì)頭的線程.使退出隊(duì)列. 1. 里面沒(méi)有不論什么數(shù)據(jù).調(diào)用offer()無(wú)法成功,返回flase,表示填充失敗.不會(huì)被堵塞. 2. 先take,被堵塞,直到有一個(gè)線程來(lái)offer. 兩個(gè)不同的互補(bǔ)碰撞發(fā)生匹配完畢(fullfill). 之前的take的線程被喚醒獲得offer的提供的數(shù)據(jù). 再來(lái)解析SynchronousQueue 源碼, SynchronousQueue?源碼凝視中關(guān)鍵術(shù)語(yǔ)解析: ?Node類型:?一共分層兩種. 一種是 isDate=true. (相應(yīng)offer , put 函數(shù)) 一種是isDate=false (相應(yīng) take函數(shù)) dual queue:dual的含義就好理解了,由于僅僅有兩類,能夠當(dāng)isDate=true和isDate=false遇到時(shí)會(huì)匹配.可翻譯為成雙的,對(duì)偶的. 對(duì)偶隊(duì)列. same mode:?同樣的模式(isDate都=true,或者isDate都=false).比方take產(chǎn)生的Node和前面已經(jīng)放入到隊(duì)列中的take動(dòng)作Node就屬于同一個(gè)模式 complementary :互補(bǔ)的.比方先take,放到隊(duì)列中.后面來(lái)一個(gè)offer動(dòng)作就是complementary (互補(bǔ)).反之亦然. fulfill:?字面英文翻譯,完畢.詳細(xì)到算法里的含義是一個(gè)動(dòng)作和之前的complementary(譯為互補(bǔ))的動(dòng)作得到匹配. ============= SynchronousQueue以下的一個(gè)部分凝視部分翻譯. /* ??*?A?dual?queue?(and?similarly?stack)?is?one?that?at?any?given ?????*?time?either?holds?"data"?--?items?provided?by?put?operations, ?????*?or?"requests"?--?slots?representing?take?operations,?or?is ?????*?empty.?A?call?to?"fulfill"?(i.e.,?a?call?requesting?an?item ?????*?from?a?queue?holding?data?or?vice?versa)?dequeues?a ?????*?complementary?node.??The?most?interesting?feature?of?these ?????*?queues?is?that?any?operation?can?figure?out?which?mode?the ?????*?queue?is?in,?and?act?accordingly?without?needing?locks. 不論什么一個(gè)操作 put/take都會(huì)產(chǎn)生一個(gè)節(jié)點(diǎn),抓住數(shù)據(jù)(假設(shè)是take,數(shù)據(jù)為null). 一個(gè)實(shí)現(xiàn)"匹配"的調(diào)用將會(huì)將互補(bǔ)的節(jié)點(diǎn)退出隊(duì)列. 最有趣的是不論什么一個(gè)操作都能指出眼下的隊(duì)列處于何種類型(注:隊(duì)列里要么全是take線程,要么全是put線程).而且不須要鎖. ? ? ? ?? ??//?以下的凝視比較了java實(shí)現(xiàn)的算法和借鑒的算法(見(jiàn)凝視中網(wǎng)址)有何差別 ? ? ??*?The?algorithms?here?differ?from?the?versions?in?the?above?paper ?????*?in?extending?them?for?use?in?synchronous?queues,?as?well?as ?????*?dealing?with?cancellation.?The?main?differences?include:
?????*??1.?The?original?algorithms?used?bit-marked?pointers,?but ?????*?????the?ones?here?use?mode?bits?in?nodes,?leading?to?a?number ?????*?????of?further?adaptations. ?????*??2.?SynchronousQueues?must?block?threads?waiting?to?become ?????*?????fulfilled. 必須等待匹配的線程.?fulfilled-完畢的意思. ?????*??3.?Support?for?cancellation?via?timeout?and?interrupts, ?????*?????including?cleaning?out?cancelled?nodes/threads ?????*?????from?lists?to?avoid?garbage?retention?and?memory?depletion. //可以取消或者中斷 ? ? ?* =============再來(lái)看看SynchronousQueue.TransferQueue.transfer以下的凝視.========= /** 當(dāng)隊(duì)列是空,或者是同一種Mode時(shí),直接放入到列隊(duì)尾.不會(huì)完畢(fullfill) * 2. If queue apparently contains waiting items, and this 我一開始沒(méi)理解的一點(diǎn)是: 當(dāng)一個(gè)head是 isDate=false , tail是isDate=true時(shí), 一個(gè)線程進(jìn)來(lái)的操作是isDate=false時(shí). 不會(huì)進(jìn)入①,進(jìn)入②后看代碼又無(wú)法和head完畢匹配(fullfill). 后來(lái)想明確了,這樣的情況不會(huì)發(fā)生.由于tail是isDate=true,這個(gè)會(huì)與head完畢匹配(fullfill).換句話說(shuō). 隊(duì)列里tail和head肯定是same mode.所以當(dāng)①推斷失敗,進(jìn)入②后, /** ?????????*?Puts?or?takes?an?item. ?????????*/ ????????Object?transfer(Object?e,?boolean?timed,?long?nanos)?{ ????????????/*?Basic?algorithm?is?to?loop?trying?to?take?either?of ?????????????*?two?actions: ?????????????* ?????????????*?1.?If?queue?apparently?empty?or?holding?same-mode?nodes, ?????????????*????try?to?add?node?to?queue?of?waiters,?wait?to?be ?????????????*????fulfilled?(or?cancelled)?and?return?matching?item. ?????????????* ??//?same-mode指的是take還是put.. 假設(shè)空或者是同類型,那么就放入隊(duì)列堵塞等待 ?????????????*?2.?If?queue?apparently?contains?waiting?items,?and?this ?????????????*????call?is?of?complementary?mode,?try?to?fulfill?by?CAS'ing ?????????????*????item?field?of?waiting?node?and?dequeuing?it,?and?then ?????????????*????returning?matching?item. ?????????????* ??// 假設(shè)是不同,剛好是互補(bǔ)(complementary)節(jié)點(diǎn),那么剛好可以匹配(fulfill?) ?????????????*?In?each?case,?along?the?way,?check?for?and?try?to?help ?????????????*?advance?head?and?tail?on?behalf?of?other?stalled/slow ?????????????*?threads. ?????????????* ?????????????*?The?loop?starts?off?with?a?null?check?guarding?against ?????????????*?seeing?uninitialized?head?or?tail?values.?This?never ?????????????*?happens?in?current?SynchronousQueue,?but?could?if ?????????????*?callers?held?non-volatile/final?ref?to?the ?????????????*?transferer.?The?check?is?here?anyway?because?it?places ?????????????*?null?checks?at?top?of?loop,?which?is?usually?faster ?????????????*?than?having?them?implicitly?interspersed. ?????????????*/ ????????????QNode?s?=?null;?//?constructed/reused?as?needed ????????????boolean?isData?=?(e?!=?null); ????????????for?(;;)?{ ????????????????QNode?t?=?tail; ????????????????QNode?h?=?head; ????????????????if?(t?==?null?||?h?==?null)?????????//?saw?uninitialized?value ????????????????????continue;???????????????????????//?spin ????????????????if?(h?==?t?||?t.isData?==?isData)?{?//?empty?or?same-mode?隊(duì)列里假設(shè)都是take線程.此線程還是take調(diào)用,進(jìn)入該分支 ????????????????????QNode?tn?=?t.next; ????????????????????if?(t?!=?tail)??????????????????//?inconsistent?read ????????????????????????continue; ????????????????????if?(tn?!=?null)?{???????????????//?lagging?tail ????????????????????????advanceTail(t,?tn); ????????????????????????continue; ????????????????????} ????????????????????if?(timed?&&?nanos?<=?0)????????//?can't?wait ????????????????????????return?null; ????????????????????if?(s?==?null) ????????????????????????s?=?new?QNode(e,?isData); ????????????????????if?(!t.casNext(null,?s))????????//?failed?to?link?in ????????????????????????continue; ????????????????????advanceTail(t,?s);??????????????//?swing?tail?and?wait ????????????????????Object?x?=?awaitFulfill(s,?e,?timed,?nanos); ????????????????????if?(x?==?s)?{???????????????????//?wait?was?cancelled ????????????????????????clean(t,?s); ????????????????????????return?null; ????????????????????} ????????????????????if?(!s.isOffList())?{???????????//?not?already?unlinked ????????????????????????advanceHead(t,?s);??????????//?unlink?if?head ????????????????????????if?(x?!=?null)??????????????//?and?forget?fields ????????????????????????????s.item?=?s; ????????????????????????s.waiter?=?null; ????????????????????} ????????????????????return?(x?!=?null)?

?x?:?e;

????????????????}?else?{????????????????????????????//?complementary-mode ????????????????????QNode?m?=?h.next;???????????????//?node?to?fulfill//進(jìn)入該分支,由于整個(gè)隊(duì)列的節(jié)點(diǎn)類型都一樣,肯定能和head完畢匹配(fulfill) ????????????????????if?(t?!=?tail?||?m?==?null?||?h?!=?head) ????????????????????????continue;???????????????????//?inconsistent?read ????????????????????Object?x?=?m.item; ????????????????????if?(isData?==?(x?!=?null)?||????//?m?already?fulfilled ????????????????????????x?==?m?||???????????????????//?m?cancelled ????????????????????????!m.casItem(x,?e))?{?????????//?lost?CAS??//這一步非常非常重要,成功把被匹配線程的數(shù)據(jù)節(jié)點(diǎn)改成了自己的節(jié)點(diǎn),實(shí)現(xiàn)了傳輸數(shù)據(jù) ????????????????????????advanceHead(h,?m);??????????//?dequeue?and?retry? ????????????????????????continue; ????????????????????} ????????????????????advanceHead(h,?m);??????????????//?successfully?fulfilled?// 完畢匹配,將被匹配的線程退出原隊(duì)列 ????????????????????LockSupport.unpark(m.waiter); ??// 喚醒被匹配的線程 ????????????????????return?(x?!=?null)?

?x?:?e;

????????????????} ????????????} ????????}

0.堵塞有幾種?

?1. lock獲取不到鎖堵塞. 2. 獲取到鎖可是await堵塞.然后又釋放鎖(見(jiàn) <[源代碼]Condition的原理,簡(jiǎn)單案例(ArrayBlockingQueue),復(fù)雜案例(LinkedBlockingQueue).>) 1.?LinkedBlockingQueue 和?SynchronousQueue是否有界,上界,下界范圍是多少?? 2.?LinkedBlockingQueue?和?SynchronousQueue 的有幾種堵塞線程? 前者一個(gè)線程堵塞隊(duì)列(封裝在reentrantLock內(nèi),使用者不知),一個(gè)條件隊(duì)列(封裝在ConditionObject內(nèi),使用者不知). 條件隊(duì)列signal后把線程會(huì)放到堵塞隊(duì)列里,見(jiàn)wiz<Condition的原理,簡(jiǎn)單案例(ArrayBlockingQueue),復(fù)雜案例(LinkedBlockingQueue).> . 后者僅僅有一個(gè)線程堵塞隊(duì)列. 其各自的堵塞隊(duì)列數(shù)據(jù)結(jié)構(gòu)有何不同?前者無(wú)數(shù)據(jù),后者有數(shù)據(jù).? 2.?LinkedBlockingQueue?和?SynchronousQueue怎樣在線程之間傳遞數(shù)據(jù)?

前者獲取鎖后放置到數(shù)據(jù)隊(duì)列中,然后unpark鎖.后者將數(shù)據(jù)傳遞給線程節(jié)點(diǎn)上的數(shù)據(jù)引用.使讀線程解鎖后能讀取到.盡管SynchronousQueue沒(méi)有了數(shù)據(jù)隊(duì)列,用每一個(gè)線程持有一個(gè)數(shù)據(jù)替代了.

3.?LinkedBlockingQueue.先offer,再poll 和?SynchronousQueue?先offer,再poll有何差別??? 4.?LinkedBlockingQueue 先put,再take?和?SynchronousQueue?先put,再take有何差別? 前者能夠先放在數(shù)據(jù)隊(duì)列上.后者沒(méi)有地方來(lái)接受他的數(shù)據(jù),必須等待到有一個(gè)take線程產(chǎn)生的數(shù)據(jù)節(jié)點(diǎn)來(lái)接受數(shù)據(jù).

LinkedBlockingQueue,ArrayBlockingQueue: 有數(shù)據(jù)隊(duì)列和堵塞線程隊(duì)列 1. 數(shù)據(jù)隊(duì)列有最大長(zhǎng)度,有界,默認(rèn)是Integer.Max;? 2. 數(shù)據(jù)隊(duì)列達(dá)到上界,下界時(shí),對(duì)對(duì)應(yīng)的堵塞方法有影響. LinkedBlockingQueue 兩個(gè)locker,更復(fù)雜.?ArrayBlockingQueue一個(gè)locker,兩個(gè)Condition. SynchronousQueue : 最重要的差別:? 外在: 一個(gè)有容量>=1,依賴緩沖區(qū)線程之間交換數(shù)據(jù). 一個(gè)無(wú)容量須要線程時(shí)時(shí)產(chǎn)生數(shù)據(jù)節(jié)點(diǎn)來(lái)接受傳遞數(shù)據(jù). 內(nèi)部:數(shù)據(jù)讀取匹配的方式不同:?BlockingQueue與已存放在隊(duì)列上的數(shù)據(jù)配對(duì),?SynchronousQueue是與已堵塞的線程配對(duì)(一個(gè)線程id相應(yīng)著一個(gè)數(shù)據(jù),這是?SynchronousQueue特有的特點(diǎn)) 對(duì)于LinkedBlockingQueue。ArrayBlockingQueue,有數(shù)據(jù)隊(duì)列,也有線程堵塞隊(duì)列?.??數(shù)據(jù)配對(duì)即可 ? ??? ? 對(duì)于SynchronousQueue?,無(wú)數(shù)據(jù)隊(duì)列,僅僅有線程堵塞隊(duì)列/stack.?與已堵塞的線程配對(duì)即可. 要理解上面這句話.這幾個(gè)問(wèn)題思考下. 1.?LinkedBlockingQueue 和?SynchronousQueue是否有界? ?前者有上界,下界>=1. 后者上界,下界都是0.(無(wú)緩沖層就無(wú)法傳遞數(shù)據(jù),將數(shù)據(jù)巧妙地保存在了由線程調(diào)用時(shí)產(chǎn)生的節(jié)點(diǎn)上(和線程同生共死).) 2.?LinkedBlockingQueue?和?SynchronousQueue 的有幾種堵塞線程? 其各自的隊(duì)列數(shù)據(jù)結(jié)構(gòu)有何不同? ? ? 總結(jié): 前者堵塞隊(duì)列上無(wú)數(shù)據(jù),后者堵塞隊(duì)列上有數(shù)據(jù)和操作類型.兩者在堵塞時(shí)都利用堆棧的局部變量來(lái)臨時(shí)保存數(shù)據(jù)和傳遞數(shù)據(jù). 前者的利用排它鎖,堵塞數(shù)據(jù)隊(duì)列上無(wú)數(shù)據(jù), AbstractQueueSyncronizer( 可能是公平也可能是非公平鎖,插入瞬間非公平 ), 堵塞時(shí)將數(shù)據(jù)保存在內(nèi)存堆棧局部變量上,每次獲得鎖后將數(shù)據(jù)傳遞給數(shù)據(jù)隊(duì)列,? ? ? ?后者獲得匹配后,綜合匹配的線程數(shù)據(jù),返回非null數(shù)據(jù). 并改動(dòng)匹配線程的數(shù)據(jù)且喚醒被匹配的堵塞線程.被匹配的堵塞線程依據(jù)其堵塞隊(duì)列上的新數(shù)據(jù)和原線程內(nèi)存堆棧上的局部變量數(shù)據(jù)(重點(diǎn),難點(diǎn))綜合返回非null數(shù)據(jù).?詳細(xì)見(jiàn)源碼凝視.
第二個(gè)問(wèn)題引出的另外一個(gè)話題是因?yàn)?strong>SynchronousQueue?已經(jīng)沒(méi)有了數(shù)據(jù)隊(duì)列緩沖區(qū),導(dǎo)致SynchronousQueue 中繼承Queue的put方法的語(yǔ)義都量變到質(zhì)變了. ? ? ? BlockingQueue 繼承自Queue的put ? ? ? ? javaDoc : ??Inserts the specified element into this queue, waiting if necessary forspaceto become available.
? ? ? SynchronousQueue??繼承自Queue的put javaDoc : ??Adds the specified element to this queue, waiting if necessary foranother thread to receive it. 數(shù)據(jù)存放不同: 因?yàn)?span style="font-family:'Microsoft YaHei';font-size:15px;line-height:30px;">?LinkedBlockingQueue。ArrayBlockingQueue代碼實(shí)現(xiàn)上通過(guò)數(shù)據(jù)隊(duì)列轉(zhuǎn)發(fā)數(shù)據(jù)的. 故這兩者不能設(shè)置queye長(zhǎng)度的最大值為0. 兩者通過(guò)堆內(nèi)存?zhèn)鬟f,notifyAll堵塞線程.?假設(shè)數(shù)據(jù)隊(duì)列是0,數(shù)據(jù)就無(wú)法傳遞了. SynchronousQueue 無(wú)數(shù)據(jù)隊(duì)列,那么數(shù)據(jù)怎樣傳輸呢??代碼實(shí)現(xiàn)上其數(shù)據(jù)是直接通過(guò)堆內(nèi)存直接傳遞給堵塞線程. 線程1被堵塞將數(shù)據(jù)同一時(shí)候存放在線程堆棧上的局部變量以及和線程id綁定的隊(duì)列節(jié)點(diǎn)(是field屬性,狀態(tài))上.?線程2來(lái)匹配,會(huì)改變?cè)氯€程的堆內(nèi)存的值,使得原堵塞線程可以獲取兩份數(shù)據(jù).這兩份數(shù)據(jù)中肯定有份是生產(chǎn)者提供的數(shù)據(jù),一份是消費(fèi)者偽造的假數(shù)據(jù),通過(guò)標(biāo)示為推斷終于得到生產(chǎn)者得到的數(shù)據(jù). 對(duì)于SynchronousQueue? ?最大值是0,也沒(méi)有其它線程生成數(shù)據(jù)節(jié)點(diǎn),put時(shí)無(wú)法存放數(shù)據(jù),?讓put一開始就進(jìn)入了堵塞.? 對(duì)于LinkedBlockingQueue,ArrayBlockingQueue, put僅僅有在數(shù)據(jù)隊(duì)列滿了才會(huì)堵塞.
應(yīng)用場(chǎng)景: 以Executors.newCachedThreadPool()為例,CachedThreadPool特點(diǎn): 有任務(wù)時(shí)可以無(wú)限制生成線程,無(wú)任務(wù)時(shí)也可以高速回收線程. 用線程不斷生成替代了緩沖隊(duì)列.?該javadoc上,明白說(shuō)明了適合大批量的小任務(wù). 不適合一下子大量,一下子又無(wú)數(shù)據(jù). 不太適合生產(chǎn)者生產(chǎn)速率動(dòng)蕩變化,每一個(gè)任務(wù)都非常長(zhǎng)的場(chǎng)景. </pre><pre name="code" class="java">public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } 利用了SynchronousQueue.offer和take的調(diào)用配合實(shí)現(xiàn)了CachedThreadPool();相比blockingQueue長(zhǎng)處:?Syncronize一個(gè)堵塞隊(duì)列.+ 死循環(huán)cas(compare and swap),不會(huì)頻繁的線程掛起和喚醒(park和unpark) ? ?和new ThreadPoolExecutor時(shí)設(shè)置coreSize=0,linkedblockingQueue 容量=1 的差別是后者維護(hù)一個(gè)size=1的堵塞隊(duì)列,隊(duì)列常常在滿和空之間切換,須要頻繁的線程掛起和喚醒(park和unpark) ? ? ?? 強(qiáng)烈建議打開eclipse相應(yīng)的源碼,.jdk1.6 里的src.zip 源碼包1 public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { 5 if (runState == RUNNING && workQueue.offer(command)) { 6 if (runState != RUNNING || poolSize == 0) 7 ensureQueuedTaskHandled(command); 8 } 9 else if (!addIfUnderMaximumPoolSize(command)) 10 reject(command); // is shutdown or saturated 11 } 12 }
? ?剛開始execute(runable),queue.offer()失敗,產(chǎn)生新的線程運(yùn)行任務(wù).一直不停的offer,產(chǎn)生新的線程.到后面老的線程運(yùn)行完成會(huì)調(diào)用take()第一次execute(runable),第5行queue.offer()失敗,進(jìn)入第9行產(chǎn)生max配額的新線程運(yùn)行任務(wù)runnable.后面不停的execute(),一直不停的offer失敗,產(chǎn)生max配額的新線程去運(yùn)行runnable.直到第一次的線程運(yùn)行任務(wù)完成后會(huì)調(diào)用SynchronousQueue.take(). 這樣假設(shè)再有execute(),第5行就能匹配成功新的offer就能配對(duì)成功,runnable實(shí)例被老的線程獲取運(yùn)行,不會(huì)去新建線程.這個(gè)實(shí)現(xiàn)了動(dòng)態(tài)的線程池.所以java才說(shuō)適合大批量的小的異步任務(wù)(These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks.) Executors.固定大小的線程的優(yōu)點(diǎn)是.線程資源是有限的,每一個(gè)線程512k?-Xss???? 每一個(gè)線程的Stack大小,默認(rèn)堆棧.避免有些無(wú)限制增加線程池的問(wèn)題.沒(méi)有提供可配置的BlockingQueue容量大小.


轉(zhuǎn)載于:https://www.cnblogs.com/zsychanpin/p/7010142.html

總結(jié)

以上是生活随笔為你收集整理的[源码]解析 SynchronousQueue 上界,下界.. 数据保存和数据传递. 堵塞队列. 有无频繁await?...的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。