日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

disruptor实现细节及源码分析

發(fā)布時(shí)間:2025/7/14 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 disruptor实现细节及源码分析 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

disruptor實(shí)現(xiàn)細(xì)節(jié)及源碼分析

一、?????背景介紹

???? Disruptor它是一個(gè)開源的并發(fā)框架,并獲得?2011 Duke’s?程序框架創(chuàng)新獎(jiǎng),能夠在無鎖的情況下實(shí)現(xiàn)網(wǎng)絡(luò)的Queue并發(fā)操作。

???? 說明:下文所有內(nèi)容基于disruptor3.34版本。

二、?????應(yīng)用場景

???? 在消費(fèi)者--生產(chǎn)者模式中或發(fā)布訂閱模式中使用。

????具有以下特點(diǎn):

  • 無鎖的設(shè)計(jì)及CAS式的原子訪問。

  • 預(yù)分配存儲(chǔ)空間,避免垃圾回收帶來的資源消耗。

    三、?????核心對象

  • ????????RingBuffer:環(huán)形的一個(gè)數(shù)據(jù)結(jié)構(gòu),對象初始化時(shí),會(huì)使用事件Event進(jìn)行填充。Buffer的大小必須是2的冪次方,方便移位操作。

    ?????????

    ????????Event:無指定具體接口,用戶自己實(shí)現(xiàn),可以攜帶任何業(yè)務(wù)數(shù)據(jù)。

    ?????????

    ????????EventFactory:產(chǎn)生事件Event的工廠,由用戶自己實(shí)現(xiàn)。

    ?????????

    ????????EventTranslator:事件發(fā)布的回調(diào)接口,由用戶實(shí)現(xiàn),負(fù)責(zé)將業(yè)務(wù)參數(shù)設(shè)置到事件中。

    ?????????

    ????????Sequencer:序列產(chǎn)生器,也是協(xié)調(diào)生產(chǎn)者和消費(fèi)者及實(shí)現(xiàn)高并發(fā)的核心。有MultiProducerSequencer SingleProducerSequencer兩個(gè)實(shí)現(xiàn)類。

    ?????????

    ????????SequenceBarrier:擁有RingBuffer的發(fā)布事件Sequence引用和消費(fèi)者依賴的Sequence引用。決定消費(fèi)者消費(fèi)可消費(fèi)的Sequence

    ?????????

    ????????EventHandler:事件的處理者,由用戶自己實(shí)現(xiàn)。

    ?????????

    ????????EventProcessor:事件的處理器,單獨(dú)在一個(gè)線程中運(yùn)行。

    ?????????

    ????????WorkHandler:事件的處理者,由用戶自己實(shí)現(xiàn)。

    ?????????

    ????????WorkProcessor:事件的處理器,單獨(dú)在一個(gè)線程中運(yùn)行。

    ?????????

    ????????WorkerPool:一組WorkProcessor的處理。

    ?????????

    ????????WaitStrategy:在消費(fèi)者比生產(chǎn)者快時(shí),消費(fèi)者處理器的等待策略。

    ?

    四、?????簡單示例

    • 定義業(yè)務(wù)數(shù)據(jù)類:

    public?class?MyData?{private?long?value;public?MyData(long?value){this.value?=?value;}public?long?getValue()?{return?value;}public?void?setValue(long?value)?{this.value?=?value;}public?String?toString(){StringBuffer?sb?=?new?StringBuffer();sb.append("value=").append(value);return?sb.toString();} }

    • 定義事件類:

    public?class?MyEvent?{private?MyData?data;public?MyData?getData()?{return?data;}public?void?setData(MyData?data)?{this.data?=?data;}}

    • 定義事件處理類:

    public?class?MyEventHandler?implements?EventHandler<MyEvent>{@Overridepublic?void?onEvent(MyEvent?event,?long?sequence,?boolean?endOfBatch)throws?Exception?{System.out.println("事件處理:"+event.getData());}}

    • 定義事件工廠類:

    public?class?MyFactory?implements?EventFactory<MyEvent>{@Overridepublic?MyEvent?newInstance()?{return?new?MyEvent();}}
    • 定義事件發(fā)布輔助類:

    public?class?MyEventTranslatorOneArg?implements?EventTranslatorOneArg<MyEvent,MyData>{@Overridepublic?void?translateTo(MyEvent?event,?long?sequence,?MyData?data)?{System.out.println("發(fā)布事件:"+data);event.setData(data);}}

    • 主類:

    public?class?MyMainTest?{public?static?void?main(String[]args){Disruptor<MyEvent>?disruptor?=?newDisruptor<MyEvent>(new?MyFactory(),//事件工廠128,?//必須為2的冪次方new?ThreadFactory(){//線程工廠@Overridepublic?Thread?newThread(Runnable?runnable)?{returnnew?Thread(runnable);}},ProducerType.SINGLE,//指定生產(chǎn)者為一個(gè)或多個(gè)newYieldingWaitStrategy());//等待策略//指定處理器disruptor.handleEventsWith(newMyEventHandler());disruptor.start();//發(fā)布事件MyEventTranslatorOneArg?translator?=?newMyEventTranslatorOneArg();for(int?i?=?0;?i?<?10;?i++){disruptor.publishEvent(translator,?new?MyData(i));}disruptor.shutdown();} }

    五、?????實(shí)現(xiàn)原理及源碼分析

    • RingBuffer的實(shí)現(xiàn):

    ? ? 封裝了一個(gè)對象數(shù)組,RingBuffer實(shí)例化時(shí),用Event填充。生產(chǎn)者和消費(fèi)者通過對序列(long的原子操作封裝)取模計(jì)算獲取對象數(shù)組中Event

    ?

    public?E?get(long?sequence){return?elementAt(sequence); }protected?final?E?elementAt(long?sequence){return?(E)?UNSAFE.getObject(entries,?REF_ARRAY_BASE?+?((sequence?&?indexMask)?<<?REF_ELEMENT_SHIFT));}


    • 單個(gè)生產(chǎn)者的實(shí)現(xiàn):

    ? ? 保存有所有消費(fèi)者當(dāng)前消費(fèi)的前一個(gè)序列值,在取下一個(gè)要發(fā)布的序列時(shí),檢查要發(fā)布的序列是否覆蓋所有消費(fèi)者正在處理的最小序列。如果未覆蓋,則獲取可發(fā)布的游標(biāo)值,如果覆蓋(說明緩存已經(jīng)滿了),則自旋等待,直到可以發(fā)布。發(fā)布事件時(shí)則先發(fā)布,后指定當(dāng)前游標(biāo)為發(fā)布的序列值。

    ?

    public?long?next(int?n){if?(n?<?1){thrownew?IllegalArgumentException("n?must?be?>?0");}//當(dāng)前生產(chǎn)者發(fā)布的的最大序列l(wèi)ong?nextValue?=?this.nextValue;long?nextSequence?=?nextValue?+?n;//要發(fā)布的最大序列l(wèi)ong?wrapPoint?=?nextSequence?-?bufferSize;//覆蓋點(diǎn)long?cachedGatingSequence?=?this.cachedValue;//消費(fèi)者中處理序列最小的前一個(gè)序列//緩存已滿??或者處理器處理異常時(shí)if?(wrapPoint?>?cachedGatingSequence?||cachedGatingSequence?>?nextValue){long?minSequence;//等待直到有可用的緩存while?(wrapPoint?>?(minSequence?=?Util.getMinimumSequence(gatingSequences,nextValue))){LockSupport.parkNanos(1L);//?TODO:?Use?waitStrategy?to?spin?}this.cachedValue?=?minSequence;}//更新當(dāng)前生產(chǎn)者發(fā)布的的最大序列this.nextValue?=?nextSequence;return?nextSequence;}


    • 多個(gè)生產(chǎn)者的實(shí)現(xiàn):

    ????保存有所有消費(fèi)者當(dāng)前消費(fèi)的前一個(gè)序列值,并維護(hù)一個(gè)和RingBuffer一樣大小的數(shù)組,在取下一個(gè)要發(fā)布的序列時(shí),檢查要發(fā)布的序列是否覆蓋所有消費(fèi)者正在處理的最小序列。如果未覆蓋,則先發(fā)布,后指定當(dāng)前游標(biāo)為發(fā)布的序列值,如果未覆蓋,則獲取可發(fā)布的游標(biāo)值,如果覆蓋(說明緩存已經(jīng)滿了),則自旋等待,直到可以發(fā)布。一個(gè)生產(chǎn)者獲取可發(fā)布的序列后,立即更新當(dāng)前游標(biāo)。發(fā)布事件時(shí)生產(chǎn)者每發(fā)布一個(gè)序列,則記錄到數(shù)組指定位置。

    public?long?next(int?n){if?(n?<?1){thrownew?IllegalArgumentException("n?must?be?>?0");}long?current;long?next;do{//當(dāng)前游標(biāo)current?=?cursor.get();//要發(fā)布的游標(biāo)next?=?current?+?n;//覆蓋點(diǎn)long?wrapPoint?=?next?-?bufferSize;//消費(fèi)者中處理序列最小的前一個(gè)序列l(wèi)ong?cachedGatingSequence?=?gatingSequenceCache.get();//緩存已滿??或者處理器處理異常時(shí)if?(wrapPoint?>?cachedGatingSequence?||cachedGatingSequence?>?current){long?gatingSequence?=?Util.getMinimumSequence(gatingSequences,?current);if?(wrapPoint?>?gatingSequence){LockSupport.parkNanos(1);//?TODO,?should?we?spin?based?on?the?waitstrategy?//緩存滿時(shí),繼續(xù)再次嘗試continue;}//更新當(dāng)前生產(chǎn)者發(fā)布的的最大序列g(shù)atingSequenceCache.set(gatingSequence);}elseif?(cursor.compareAndSet(current,?next)){//成功獲取到發(fā)布序列并設(shè)置當(dāng)前游標(biāo)成功時(shí)跳出循環(huán)break;}}while?(true);return?next;}

    • 消費(fèi)者的實(shí)現(xiàn):

    ????消費(fèi)者保持一個(gè)自己的序列,每次累加后nextSequence,去獲取可訪問的最大序列。對于一個(gè)生產(chǎn)者,就是nextSequenceRingBuffer當(dāng)前游標(biāo)的序列。對于多個(gè)生產(chǎn)者,就是nextSequenceRingBuffer當(dāng)前游標(biāo)之間,最大的連續(xù)的序列集。

    public?long?waitFor(finallong?sequence)throws?AlertException,InterruptedException,?TimeoutException{checkAlert();//獲取最大的可消費(fèi)的序列,依賴等待策略long?availableSequence?=?waitStrategy.waitFor(sequence,?cursorSequence,?dependentSequence,?this);if?(availableSequence?<?sequence){return?availableSequence;}return?sequencer.getHighestPublishedSequence(sequence,availableSequence);}

    ? ? ?一個(gè)生產(chǎn)者:

    public?long?getHighestPublishedSequence(long?lowerBound,?long?availableSequence){//?返回最大序列availableSequencereturn?availableSequence; }

    ? ? ?多個(gè)生產(chǎn)者:

    public?boolean?isAvailable(long?sequence){int?index?=?calculateIndex(sequence);int?flag?=?calculateAvailabilityFlag(sequence);long?bufferAddress?=?(index?*?SCALE)?+?BASE;//相應(yīng)位置上的值相等,說明已經(jīng)發(fā)布該序列returnUNSAFE.getIntVolatile(availableBuffer,bufferAddress)?==?flag;}@Overridepublic?long?getHighestPublishedSequence(long?lowerBound,?long?availableSequence){//從數(shù)組中找出未發(fā)布序列,即由小到大連續(xù)的發(fā)布序列for?(long?sequence?=?lowerBound;?sequence?<=?availableSequence;sequence++){if?(!isAvailable(sequence)){//返回未發(fā)布序列的前一個(gè)序列return?sequence?-?1;}}return?availableSequence;}


    • 等待策略:

    ????????消費(fèi)者在緩存中沒有可以消費(fèi)的事件時(shí),采取的等待策略:

    ?????????

    ????????BlockingWaitStrategy:通過線程阻塞的方式,等待生產(chǎn)者喚醒

    ?????????

    ????????BusySpinWaitStrategy:線程一直自旋等待,比較耗CPU

    ?????????

    ????????LiteBlockingWaitStrategy:通過線程阻塞的方式,等待生產(chǎn)者喚醒,比BlockingWaitStrategy要輕,某些情況下可以減少阻塞的次數(shù)。

    ?????????

    ????????PhasedBackoffWaitStrategy:根據(jù)指定的時(shí)間段參數(shù)和指定的等待策略決定采用哪種等待策略。

    ?????????

    ????????SleepingWaitStrategy:可通過參數(shù)設(shè)置,使線程通過Thread.yield()主動(dòng)放棄執(zhí)行,通過線程調(diào)度器重新調(diào)度;或一直自旋等待。

    ????????TimeoutBlockingWaitStrategy:通過參數(shù)設(shè)置阻塞時(shí)間,如果超時(shí)則拋出異常。

    ?????????

    ????????YieldingWaitStrategy 通過Thread.yield()主動(dòng)放棄執(zhí)行,通過線程調(diào)度器重新調(diào)度。

    ????????


    轉(zhuǎn)載于:https://blog.51cto.com/11246272/1745472

    總結(jié)

    以上是生活随笔為你收集整理的disruptor实现细节及源码分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。