日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

每秒钟承载600万订单级别的无锁并行计算框架 Disruptor学习

發布時間:2023/12/10 编程问答 50 豆豆
生活随笔 收集整理的這篇文章主要介紹了 每秒钟承载600万订单级别的无锁并行计算框架 Disruptor学习 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1.來源

Disruptor是英國外匯交易公司LMAX開發的一個高性能隊列,研發的初衷是解決內部的內存隊列的延遲問題,而不是分布式隊列。基于Disruptor開發的系統單線程能支撐每秒600萬訂單,2010年在QCon演講后,獲得了業界關注。

2.應用背景和介紹

據目前資料顯示:應用Disruptor的知名項目有如下的一些:Storm, Camel, Log4j2,還有目前的美團點評技術團隊也有很多不少的應用,或者說有一些借鑒了它的設計機制。
Disruptor是一個高性能的線程間異步通信的框架,即在同一個JVM進程中的多線程間消息傳遞。
?

Disruptor是英國外匯交易公司LMAX開發的一個高性能隊列,研發的初衷是解決內存隊列的延遲問題。與Kafka、RabbitMQ用于服務間的消息隊列不同,disruptor一般用于線程間消息的傳遞。基于Disruptor開發的系統單線程能支撐每秒600萬訂單。

disruptor是用于一個JVM中多個線程之間的消息隊列,作用與ArrayBlockingQueue有相似之處,但是disruptor從功能、性能都遠好于ArrayBlockingQueue,當多個線程之間傳遞大量數據或對性能要求較高時,可以考慮使用disruptor作為ArrayBlockingQueue的替代者。
?官方也對disruptor和ArrayBlockingQueue的性能在不同的應用場景下做了對比,目測性能只有有5~10倍左右的提升。

隊列

隊列是屬于一種數據結構,隊列采用的FIFO(first in firstout),新元素(等待進入隊列的元素)總是被插入到尾部,而讀取的時候總是從頭部開始讀取。在計算中隊列一般用來做排隊(如線程池的等待排隊,鎖的等待排隊),用來做解耦(生產者消費者模式),異步等等

在jdk中的隊列都實現了java.util.Queue接口,在隊列中又分為兩類,一類是線程不安全的,ArrayDeque,LinkedList等等,還有一類都在java.util.concurrent包下屬于線程安全,而在我們真實的環境中,我們的機器都是屬于多線程,當多線程對同一個隊列進行排隊操作的時候,如果使用線程不安全會出現,覆蓋數據,數據丟失等無法預測的事情,所以我們這個時候只能選擇線程安全的隊列。
其次還剩下ArrayBlockingQueue,LinkedBlockingQueue兩個隊列,他們兩個都是用ReentrantLock控制的線程安全,他們兩個的區別一個是數組,一個是鏈表,在隊列中,一般獲取這個隊列元素之后緊接著會獲取下一個元素,或者一次獲取多個隊列元素都有可能,而數組在內存中地址是連續的,在操作系統中會有緩存的優化(下面也會介紹緩存行),所以訪問的速度會略勝一籌,我們也會盡量去選擇ArrayBlockingQueue。而事實證明在很多第三方的框架中,比如早期的log4j異步,都是選擇的ArrayBlockingQueue。

在jdk中提供的線程安全的隊列下面簡單列舉部分隊列:\

?

?

我們可以看見,我們無鎖的隊列是無界的,有鎖的隊列是有界的,這里就會涉及到一個問題,我們在真正的線上環境中,無界的隊列,對我們系統的影響比較大,有可能會導致我們內存直接溢出,所以我們首先得排除無界隊列,當然并不是無界隊列就沒用了,只是在某些場景下得排除。其次還剩下ArrayBlockingQueue,LinkedBlockingQueue兩個隊列,他們兩個都是用ReentrantLock控制的線程安全,他們兩個的區別一個是數組,一個是鏈表。
(LinkedBlockingQueue 其實也是有界隊列,但是不設置大小時就時Integer.MAX_VALUE),ArrayBlockingQueue,LinkedBlockingQueue也有自己的弊端,就是性能比較低,為什么jdk會增加一些無鎖的隊列,其實就是為了增加性能,很苦惱,又需要無鎖,又需要有界,答案就是Disruptor

Disruptor

Disruptor是英國外匯交易公司LMAX開發的一個高性能隊列,并且是一個開源的并發框架,并獲得2011Duke’s程序框架創新獎。能夠在無鎖的情況下實現網絡的Queue并發操作,基于Disruptor開發的系統單線程能支撐每秒600萬訂單。目前,包括Apache Storm、Camel、Log4j2等等知名的框架都在內部集成了Disruptor用來替代jdk的隊列,以此來獲得高性能。

為什么這么牛逼?

在Disruptor中有三大殺器:

  • CAS
  • 消除偽共享
  • RingBuffer

?

3.1.1鎖和CAS

我們ArrayBlockingQueue為什么會被拋棄的一點,就是因為用了重量級lock鎖,在我們加鎖過程中我們會把鎖掛起,解鎖后,又會把線程恢復,這一過程會有一定的開銷,并且我們一旦沒有獲取鎖,這個線程就只能一直等待,這個線程什么事也不能做。

CAS(compare and swap),顧名思義先比較在交換,一般是比較是否是老的值,如果是的進行交換設置,大家熟悉樂觀鎖的人都知道CAS可以用來實現樂觀鎖,CAS中沒有線程的上下文切換,減少了不必要的開銷
而我們的Disruptor也是基于CAS。

3.1.2偽共享

到了偽共享就不得不說計算機CPU緩存,緩存大小是CPU的重要指標之一,而且緩存的結構和大小對CPU速度的影響非常大,CPU內緩存的運行頻率極高,一般是和處理器同頻運作,工作效率遠遠大于系統內存和硬盤。實際工作時,CPU往往需要重復讀取同樣的數據塊,而緩存容量的增大,可以大幅度提升CPU內部讀取數據的命中率,而不用再到內存或者硬盤上尋找,以此提高系統性能。但是從CPU芯片面積和成本的因素來考慮,緩存都很小。

CPU緩存可以分為一級緩存,二級緩存,如今主流CPU還有三級緩存,甚至有些CPU還有四級緩存。每一級緩存中所儲存的全部數據都是下一級緩存的一部分,這三種緩存的技術難度和制造成本是相對遞減的,所以其容量也是相對遞增的。

每一次你聽見intel發布新的cpu什么,比如i7-7700k,8700k,都會對cpu緩存大小進行優化,感興趣可以自行下來搜索,這些的發布會或者發布文章。

Martin和Mike的 QConpresentation演講中給出了一些每個緩存時間:

緩存行

在cpu的多級緩存中,并不是以獨立的項來保存的,而是類似一種pageCahe的一種策略,以緩存行來保存,而緩存行的大小通常是64字節,在Java中Long是8個字節,所以可以存儲8個Long,舉個例子,你訪問一個long的變量的時候,他會把幫助再加載7個,我們上面說為什么選擇數組不選擇鏈表,也就是這個原因,在數組中可以依靠緩沖行得到很快的訪問。

?

緩存行是萬能的嗎?NO,因為他依然帶來了一個缺點,我在這里舉個例子說明這個缺點,可以想象有個數組隊列,ArrayQueue,他的數據結構如下:

?

對于maxSize是我們一開始就定義好的,數組的大小,對于currentIndex,是標志我們當前隊列的位置,這個變化比較快,可以想象你訪問maxSize的時候,是不是把currentIndex也加載進來了,這個時候,其他線程更新currentIndex,就會把cpu中的緩存行置位無效,請注意這是CPU規定的,他并不是只吧currentIndex置位無效,如果此時又繼續訪問maxSize他依然得繼續從內存中讀取,但是MaxSize卻是我們一開始定義好的,我們應該訪問緩存即可,但是卻被我們經常改變的currentIndex所影響。


Padding的魔法

為了解決上面緩存行出現的問題,在Disruptor中采用了Padding的方式

?

其中的Value就被其他一些無用的long變量給填充了。這樣你修改Value的時候,就不會影響到其他變量的緩存行。

最后順便一提,在jdk8中提供了@Contended的注解,當然一般來說只允許Jdk中內部,如果你自己使用那就得配置Jvm參數 -RestricContentended = fase,將限制這個注解置位取消。很多文章分析了ConcurrentHashMap,但是都把這個注解給忽略掉了,在ConcurrentHashMap中就使用了這個注解,在ConcurrentHashMap每個桶都是單獨的用計數器去做計算,而這個計數器由于時刻都在變化,所以被用這個注解進行填充緩存行優化,以此來增加性能。



作者:tracy_668
鏈接:https://www.jianshu.com/p/bad7b4b44e48
來源:簡書
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。

?

下面的例子是測試利用cache line的特性和不利用cache line的特性的效果對比.


?

什么是偽共享

ArrayBlockingQueue有三個成員變量:

這三個變量很容易放到一個緩存行中, 但是之間修改沒有太多的關聯. 所以每次修改, 都會使之前緩存的數據失效, 從而不能完全達到共享的效果.

如上圖所示, 當生產者線程put一個元素到ArrayBlockingQueue時, putIndex會修改, 從而導致消費者線程的緩存中的緩存行無效, 需要從主存中重新讀取.

這種無法充分使用緩存行特性的現象, 稱為偽共享

3.1.3RingBuffer

ringbuffer到底是什么
它是一個環(首尾相接的環),你可以把它用做在不同上下文(線程)間傳遞數據的buffer。


?

基本來說,ringbuffer擁有一個序號,這個序號指向數組中下一個可用的元素。(如下圖右邊的圖片表示序號,這個序號指向數組的索引4的位置

?

?

隨著你不停地填充這個buffer(可能也會有相應的讀取),這個序號會一直增長,直到繞過這個環。

?

要找到數組中當前序號指向的元素,可以通過sequence & (array length-1) = array index,比如一共有8槽,3&(8-1)=3,HashMap就是用這個方式來定位數組元素的,這種方式比取模的速度更快。

常用的隊列之間的區別

  • 沒有尾指針。只維護了一個指向下一個可用位置的序號。
  • 不刪除buffer中的數據,也就是說這些數據一直存放在buffer中,直到新的數據覆蓋他們

ringbuffer采用這種數據結構原因

  • 因為它是數組,所以要比鏈表快,數組內元素的內存地址的連續性存儲的。這是對CPU緩存友好的—也就是說,在硬件級別,數組中的元素是會被預加載的,因此在ringbuffer當中,cpu無需時不時去主存加載數組中的下一個元素。因為只要一個元素被加載到緩存行,其他相鄰的幾個元素也會被加載進同一個緩存行。
  • 其次,你可以為數組預先分配內存,使得數組對象一直存在(除非程序終止)。這就意味著不需要花大量的時間用于垃圾回收。此外,不像鏈表那樣,需要為每一個添加到其上面的對象創造節點對象—對應的,當刪除節點時,需要執行相應的內存清理操作。


如何從Ringbuffer讀取

消費者(Consumer)是一個想從Ring Buffer里讀取數據的線程,它可以訪問ConsumerBarrier對象——這個對象由RingBuffer創建并且代表消費者與RingBuffer進行交互。就像Ring Buffer顯然需要一個序號才能找到下一個可用節點一樣,消費者也需要知道它將要處理的序號——每個消費者都需要找到下一個它要訪問的序號。在上面的例子中,消費者處理完了Ring Buffer里序號8之前(包括8)的所有數據,那么它期待訪問的下一個序號是9。

消費者可以調用ConsumerBarrier對象的waitFor()方法,傳遞它所需要的下一個序號.

?

final long availableSeq = consumerBarrier.waitFor(nextSequence);

ConsumerBarrier返回RingBuffer的最大可訪問序號——在上面的例子中是12。ConsumerBarrier有一個WaitStrategy方法來決定它如何等待這個序號.

接下來

接下來,消費者會一直逛來逛去,等待更多數據被寫入 Ring Buffer。并且,寫入數據后消費者會收到通知——節點 9,10,11 和 12 已寫入。現在序號 12 到了,消費者可以指示 ConsumerBarrier 去拿這些序號里的數據了。



?

在Disruptor中采用了數組的方式保存了我們的數據,上面我們也介紹了采用數組保存我們訪問時很好的利用緩存,但是在Disruptor中進一步選擇采用了環形數組進行保存數據,也就是RingBuffer。在這里先說明一下環形數組并不是真正的環形數組,在RingBuffer中是采用取余的方式進行訪問的,比如數組大小為 10,0訪問的是數組下標為0這個位置,其實10,20等訪問的也是數組的下標為0的這個位置。

實際上,在這些框架中取余并不是使用%運算,都是使用的&與運算,這就要求你設置的大小一般是2的N次方也就是,10,100,1000等等,這樣減去1的話就是,1,11,111,就能很好的使用index & (size -1),這樣利用位運算就增加了訪問速度。
如果在Disruptor中你不用2的N次方進行大小設置,他會拋出buffersize必須為2的N次方異常。


  • Producer會向這個RingBuffer中填充元素,填充元素的流程是首先從RingBuffer讀取下一個Sequence,之后在這個Sequence位置的槽填充數據,之后發布。
  • Consumer消費RingBuffer中的數據,通過SequenceBarrier來協調不同的Consumer的消費先后順序,以及獲取下一個消費位置Sequence。
  • Producer在RingBuffer寫滿時,會從頭開始繼續寫替換掉以前的數據。但是如果有SequenceBarrier指向下一個位置,則不會覆蓋這個位置,阻塞到這個位置被消費完成。Consumer同理,在所有Barrier被消費完之后,會阻塞到有新的數據進來。

Disruptor的設計方案

Disruptor通過以下設計來解決隊列速度慢的問題:

  • 環形數組結構
    為了避免垃圾回收, 采用數組而非鏈表. 同時, 數組對處理器的緩存機制更加友好.
  • 元素位置定位
    數組長度2^n, 通過位運算, 加快定位的速度. 下標采取遞增的形式. 不用擔心index溢出的問題. index是long類型, 即使100萬QPS的處理速度, 也需要30萬年才能用完.
  • 無鎖設計
    每個生產者或者消費者線程, 會先申請可以操作的元素在數組中的位置, 申請到之后, 直接在該位置寫入或者讀取數據.

下面忽略數組的環形結構, 介紹一下如何實現無鎖設計. 整個過程通過原子變量CAS, 保證操作的線程安全.

一個生產者

生產者單線程寫數據的流程比較簡單:

  • 申請寫入m個元素;
  • 若是有m個元素可以寫入, 則返回最大的序列號. 這兒主要判斷是否會覆蓋未讀的元素
  • 若是返回的正確, 則生產者開始寫入元素.

  • 多個生產者
    多個生產者的情況下, 會遇到“如何防止多個線程重復寫同一個元素”的問題. Disruptor的解決方法是, 每個線程獲取不同的一段數組空間進行操作. 這個通過CAS很容易達到. 只需要在分配元素的時候, 通過CAS判斷一下這段空間是否已經分配出去即可.

    但是會遇到一個新問題: 如何防止讀取的時候, 讀到還未寫的元素. Disruptor在多個生產者的情況下, 引入了一個與Ring Buffer大小相同的buffer: available Buffer. 當某個位置寫入成功的時候, 便把availble Buffer相應的位置置位, 標記為寫入成功. 讀取的時候, 會遍歷available Buffer, 來判斷元素是否已經就緒.

    讀數據
    生產者多線程寫入的情況會復雜很多:

  • 申請讀取到序號n;
  • 若writer cursor >= n, 這時仍然無法確定連續可讀的最大下標. 從reader cursor開始讀取available Buffer, 一直查到第一個不可用的元素, 然后返回最大連續可讀元素的位置;
  • 消費者讀取元素.
  • 如下圖所示, 讀線程讀到下標為2的元素, 三個線程Writer1/Writer2/Writer3正在向RingBuffer相應位置寫數據, 寫線程被分配到的最大元素下標是11.
    讀線程申請讀取到下標從3到11的元素, 判斷writer cursor>=11. 然后開始讀取availableBuffer, 從3開始, 往后讀取, 發現下標為7的元素沒有生產成功, 于是WaitFor(11)返回6.

    然后, 消費者讀取下標從3到6共計4個元素.



    ?

    ?

    寫數據
    多個生產者寫入的時候:

  • 申請寫入m個元素;
  • 若是有m個元素可以寫入, 則返回最大的序列號. 每個生產者會被分配一段獨享的空間;
  • 生產者寫入元素, 寫入元素的同時設置available Buffer里面相應的位置, 以標記自己哪些位置是已經寫入成功的.
    如下圖所示, Writer1和Writer2兩個線程寫入數組, 都申請可寫的數組空間. Writer1被分配了下標3到下表5的空間, Writer2被分配了下標6到下標9的空間.
  • Writer1寫入下標3位置的元素, 同時把available Buffer相應位置置位, 標記已經寫入成功, 往后移一位, 開始寫下標4位置的元素. Writer2同樣的方式. 最終都寫入完成.



    ?

    防止不同生產者對同一段空間寫入的代碼, 如下所示:

    通過do/while循環的條件cursor.compareAndSet(current, next), 來判斷每次申請的空間是否已經被其他生產者占據. 假如已經被占據, 該函數會返回失敗, While循環重新執行, 申請寫入空間.

    消費者的流程與生產者非常類似, 這兒就不多描述了. Disruptor通過精巧的無鎖設計實現了在高并發情形下的高性能.



    3.2Disruptor怎么使用

    package concurrent;import sun.misc.Contended;import java.util.concurrent.ThreadFactory;import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType;/*** @Description:* @Created on 2019-10-04*/ public class DisruptorTest {public static void main(String[] args) throws Exception {// 隊列中的元素class Element {@Contendedprivate String value;public String getValue() {return value;}public void setValue(String value) {this.value = value;}}// 生產者的線程工廠ThreadFactory threadFactory = new ThreadFactory() {int i = 0;@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "simpleThread" + String.valueOf(i++));}};// RingBuffer生產工廠,初始化RingBuffer的時候使用EventFactory<Element> factory = new EventFactory<Element>() {@Overridepublic Element newInstance() {return new Element();}};// 處理Event的handlerEventHandler<Element> handler = new EventHandler<Element>() {@Overridepublic void onEvent(Element element, long sequence, boolean endOfBatch) throws InterruptedException {System.out.println("Element: " + Thread.currentThread().getName() + ": " + element.getValue() + ": " + sequence); // Thread.sleep(10000000);}};// 阻塞策略BlockingWaitStrategy strategy = new BlockingWaitStrategy();// 指定RingBuffer的大小int bufferSize = 8;// 創建disruptor,采用單生產者模式Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);// 設置EventHandlerdisruptor.handleEventsWith(handler);// 啟動disruptor的線程disruptor.start();for (int i = 0; i < 10; i++) {disruptor.publishEvent((element, sequence) -> {System.out.println("之前的數據" + element.getValue() + "當前的sequence" + sequence);element.setValue("我是第" + sequence + "個");});}} }

    ?

    在Disruptor中有幾個比較關鍵的:

    • ThreadFactory:這是一個線程工廠,用于我們Disruptor中生產、消費的時候需要的線程。
    • EventFactory:事件工廠,用于產生我們隊列元素的工廠。在Disruptor中,他會在初始化的時候直接填充滿RingBuffer,一次到位。
    • EventHandler:用于處理Event的handler,這里一個EventHandler可以看做是一個消費者,但是多個EventHandler他們都是獨立消費的隊列。
    • WorkHandler:也是用于處理Event的handler,和上面區別在于,多個消費者都是共享同一個隊列。
    • WaitStrategy:等待策略,在Disruptor中有多種策略,來決定消費者在消費時,如果沒有數據采取的策略是什么?下面簡單列舉一下Disruptor中的部分策略
  • BlockingWaitStrategy:通過線程阻塞的方式,等待生產者喚醒,被喚醒后,再循環檢查依賴的sequence是否已經消費。
  • BusySpinWaitStrategy:線程一直自旋等待,可能比較耗cpu
  • YieldingWaitStrategy:嘗試100次,然后Thread.yield()讓出cpu

  • ?

    總結

    以上是生活随笔為你收集整理的每秒钟承载600万订单级别的无锁并行计算框架 Disruptor学习的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。