日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

在美团呆了7年的架构师带你解读Disruptor系列并发框架

發布時間:2024/1/18 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 在美团呆了7年的架构师带你解读Disruptor系列并发框架 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

前言

理解Disruptor的最佳方式是,將其與一些容易理解和目的相似的東西比較。這里的參照物就是java里的阻塞隊列(BlockingQueue)。

與BlockingQueue的異同:
同:目的相同,都是為了在同一進程的線程間傳輸數據。
異:對消費者多播事件;預分配事件內存;可選無鎖。

核心概念

  • Ring Buffer : 曾經的核心。自從3.0以上,環形緩沖器只作為Disruptor存儲和更新數據(事件)的容器。對于一些高級用法,可以完全替換為用戶提供的容器。
  • Sequence:Disruptor使用Sequence作為一種確定特定組件位置的方法。每個消費者(EventProcessor)都維護一個Sequence,Disruptor自己也是一樣。大部分并發代碼以來這些Sequence值的移動,因此Sequence支持AtomicLong的當前許多特性。事實上,兩者唯一的區別是Sequence包含了附加功能來防止Sequence和其他值的偽共享。
  • Sequencer:Disruptor的真正核心。此接口的兩個實現(單生產者和多生產者)都實現了用于在生產者和消費者間快速準確傳遞數據的并發算法。
  • Sequence Barrier:由Sequencer產生,持有Sequencer的主要發布Sequence和任意獨立消費者的Sequence的索引。它包含判斷是否有可供消費者處理的可用事件的邏輯。
  • Wait Strategy:等待策略決定了一個消費者如何等待生產者發布到Disruptor的事件。
  • Event:生產者傳遞給消費者的數據單元。用戶自定義。
  • EventProcessor:處理Disruptor事件的主要循環,擁有消費者的Sequence。有一個BatchEventProcessor包含了一個事件循環的高效實現,會在事件可用時回調用戶提供的EventHandler接口實現。
  • EventHandler:用戶實現接口,代表Disruptor的一個消費者。
  • Producer:用戶調用Disruptor進行入隊的代碼。在框架中沒有代碼表示。

多播事件

這是queue和Disruptor最大的行為區別。隊列中的一個事件只能被一個消費者消費,而Disruptor中的時間會發布給所有消費者。這是由于Disruptor意圖處理同一數據的獨立并行處理操作(譯注:類似JMS的topic模式)。比如LMAX中同一數據需要進行記錄日志、復制和業務邏輯操作。當然,在Disruptor中同時并行處理不同事件可以使用WorkerPool(譯注:類似JMS的queue模式中的多消費者實現)。但需要注意的是,由于這種特性并非是Disruptor的首要工作,所以使用WorkerPool可能并不是最高效的做法。
查看上圖,三個消費者JournalConsumer、ReplicationConsumer和ApplicationConsumer將會以相同順序接收Disruptor所有可用消息。這實現了這些消費者的并行工作。

消費者依賴圖

為了支持并發處理的真實世界應用,很有必要支持消費者間的協調工作。回顧上圖,在日志記錄和復制消費者完成工作前,有必要阻止業務邏輯消費者的進一步工作。我們稱這個概念為gating,更準確的說是這種行為的超集稱為gating。Gating發生在兩個地方:第一用來保證生產者不能超過消費者。這通過調用RingBuffer.addGatingConsumers()把相關消費者添加到Disruptor實現。第二,先前提到的情況是通過從必須首先完成其處理的組件構造包含序列的SequenceBarrier來實現的。
回顧圖1,有三個消費者監聽RingBuffer的事件。在這個例子中,有一個依賴圖。ApplicationConsumer依賴JournalConsumer和ReplicationConsumer。這意味著JournalConsumer和ReplicationConsumer可以相互并行運行。依賴關系可以從ApplicationConsumer的SequenceBarrier和JournalConsumer及ReplicationConsumer的Sequence觀察到。同時引起注意的是Sequencer和下游消費者的關系。它的一個角色是保證發布不會環繞RingBuffer。為了做到這點,下游消費者的Sequence不能小于RingBuffer的Sequence。然而,使用依賴圖會發生一個有趣的優化。由于ApplicationConsumer Sequence保證小于等于JournalConsumer和ReplicationConsumer(由依賴關系保證),Sequencer只需要觀察ApplicationConsumer的Sequence。從廣義上來說,Sequencer只需要關注依賴樹種葉子節點的消費者Sequence。

事件預分配

Disruptor的一個目標是可以用于低延遲環境中。在低延遲系統中,有必要減少或消除內存分配。在Java系統中,目標是減少由于垃圾回收造成的停頓次數(在低延遲的C/C++系統中,重內存分配會由于內存分配器的征用也可能導致問題)。
為了支持這個目標,用戶可以預分配Disruptor中事件的存儲。用戶提供的EventFactory會在Disruptor中RingBuffer每個條目構建時調用。當發布新數據到Disruptor時,有API供用戶調用來持有構建出的對象,這樣可以調用對象的方法或更新對象屬性。在正確實現下,Disruptor保證這些操作操作是并發安全的。

可選的無鎖

對低延遲的渴望造就的另一個實現細節是無鎖算法在Disruptor中的大量使用。所有內存可見性和正確性保證使用內存屏障和/或CAS操作實現。真正使用鎖的場景只有一個,那就是使用BlockingWatiStrategy。這樣做只為了使用Condition讓消費線程可以在等待新事件到達前進行park操作。許多低延遲系統使用忙等待(busy-wait)來避免使用Condition可能導致的抖動,然而一些系統的忙等待操作會導致性能的急劇下降,尤其是CPU資源被嚴重制約時。比方說在虛擬環境下的web服務器。

入門指南

基本的事件生產和消費

從簡單的事件開始:

public class LongEvent {private long value;public void set(long value){this.value = value;} }

為了讓Disruptor能夠預分配事件,我們需要提供一個EventFactory完成構建:

import com.lmax.disruptor.EventFactory;public class LongEventFactory implements EventFactory<LongEvent> {public LongEvent newInstance(){return new LongEvent();} }

事件定義好后,需要創建消費者處理這些事件。這里只做簡單的打印:

import com.lmax.disruptor.EventHandler;public class LongEventHandler implements EventHandler<LongEvent> {public void onEvent(LongEvent event, long sequence, boolean endOfBatch){System.out.println("Event: " + event);} }

我們還需要一個事件的生產源,舉個例子,假定數據是來自某種I/O設備,如網絡或文件的字節緩沖(ByteBuffer)。

import com.lmax.disruptor.RingBuffer;public class LongEventProducer {private final RingBuffer<LongEvent> ringBuffer;public LongEventProducer(RingBuffer<LongEvent> ringBuffer){this.ringBuffer = ringBuffer;}public void onData(ByteBuffer bb){long sequence = ringBuffer.next(); // Grab the next sequencetry{LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor// for the sequenceevent.set(bb.getLong(0)); // Fill with data}finally{ringBuffer.publish(sequence);}} }

可以發現,相比使用簡單的queue,事件的發布更具有相關性。這是由于需要事件預分配。事件發布需要(最低)2階段方式,先聲明環形緩沖器中的槽位,再發布可用數據。同時也需要把發布過程使用try/finally塊包裹起來。如果聲明了環形緩沖的一個槽位(通過調用RingBuffer.next())然后必須發布這個序列。如果沒有這么做,會導致Disruptor的狀態損壞(corruption)。特別地,在多生產者的情況下,這將會導致消費者阻塞,只能通過重啟解決。

使用3.x版本的Translator

Disruptor3.0提供了一種富Lambda風格的API,旨在幫助開發者屏蔽直接操作RingBuffer的復雜性,所以3.0以上版本發布消息更好的辦法是通過事件發布者(Event Publisher)或事件翻譯器(Event Translator)API。如下

import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.EventTranslatorOneArg;public class LongEventProducerWithTranslator {private final RingBuffer<LongEvent> ringBuffer;public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer){this.ringBuffer = ringBuffer;}private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =new EventTranslatorOneArg<LongEvent, ByteBuffer>(){public void translateTo(LongEvent event, long sequence, ByteBuffer bb){event.set(bb.getLong(0));}};public void onData(ByteBuffer bb){ringBuffer.publishEvent(TRANSLATOR, bb);} }

這種方法另一個好處是翻譯器代碼可以放到一個單獨的類中,以便于更容易進行單元測試。Disruptor提供了一些用于翻譯器的不同的接口(EventTranslator,EventTranslatorOneArg,EventTranslatorTwoArg,等)。這樣做的原因是,允許翻譯器表示為靜態類,或以非捕獲lambda表達式(使用java8時)作為翻譯方法參數,通過調用RingBuffer上的翻譯器進行傳遞。
最后一步是把上面這些步驟統一到一起。可以手工把這些組件都組裝到一起,但還是有點復雜,所以引入了DSL來簡化構建。盡管通過DSL的方式不能使用有些復雜選項,但這種方式還是適合絕大多數場景。

import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.RingBuffer; import java.nio.ByteBuffer; import java.util.concurrent.Executor; import java.util.concurrent.Executors;public class LongEventMain {public static void main(String[] args) throws Exception{// Executor that will be used to construct new threads for consumersExecutor executor = Executors.newCachedThreadPool();// The factory for the eventLongEventFactory factory = new LongEventFactory();// Specify the size of the ring buffer, must be power of 2.int bufferSize = 1024;// Construct the DisruptorDisruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, executor);// Connect the handlerdisruptor.handleEventsWith(new LongEventHandler());// Start the Disruptor, starts all threads runningdisruptor.start();// Get the ring buffer from the Disruptor to be used for publishing.RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();LongEventProducer producer = new LongEventProducer(ringBuffer);ByteBuffer bb = ByteBuffer.allocate(8);for (long l = 0; true; l++){bb.putLong(0, l);producer.onData(bb);Thread.sleep(1000);}} }

使用Java8

Disruptor API的設計影響之一是Java 8將依靠功能接口的概念作為Java Lambdas的類型聲明。 Disruptor API中的大多數接口定義符合功能接口的要求,因此可以使用Lambda而不是自定義類,這樣可以減少所需的重復代碼(boiler place)。

import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.RingBuffer; import java.nio.ByteBuffer; import java.util.concurrent.Executor; import java.util.concurrent.Executors;public class LongEventMain {public static void main(String[] args) throws Exception{// Executor that will be used to construct new threads for consumersExecutor executor = Executors.newCachedThreadPool();// Specify the size of the ring buffer, must be power of 2.int bufferSize = 1024;// Construct the DisruptorDisruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);// Connect the handlerdisruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event));// Start the Disruptor, starts all threads runningdisruptor.start();// Get the ring buffer from the Disruptor to be used for publishing.RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();ByteBuffer bb = ByteBuffer.allocate(8);for (long l = 0; true; l++){bb.putLong(0, l);ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);Thread.sleep(1000);}} }

注意有一些類(如handler,translator)不再需要了。還要注意用于publishEvent()的lambda是如何引用傳入的參數的。如果使用如下代碼代替:

ByteBuffer bb = ByteBuffer.allocate(8); for (long l = 0; true; l++) {bb.putLong(0, l);ringBuffer.publishEvent((event, sequence) -> event.set(bb.getLong(0)));Thread.sleep(1000); }

這會創建一個capturing lambda,意味著需要實例化一個對象來持有ByteBuffer bb變量,通過調用publishEvent()來傳遞lambda。這樣會創建額外不必須的垃圾,所以如果需要低GC壓力就需要傳遞參數給lambda。
使用這種方法引用可以代替匿名的lamdba,以這種方式重寫這個例子是可能的。

import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.RingBuffer; import java.nio.ByteBuffer; import java.util.concurrent.Executor; import java.util.concurrent.Executors;public class LongEventMain {public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch){System.out.println(event);}public static void translate(LongEvent event, long sequence, ByteBuffer buffer){event.set(buffer.getLong(0));}public static void main(String[] args) throws Exception{// Executor that will be used to construct new threads for consumersExecutor executor = Executors.newCachedThreadPool();// Specify the size of the ring buffer, must be power of 2.int bufferSize = 1024;// Construct the DisruptorDisruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);// Connect the handlerdisruptor.handleEventsWith(LongEventMain::handleEvent);// Start the Disruptor, starts all threads runningdisruptor.start();// Get the ring buffer from the Disruptor to be used for publishing.RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();ByteBuffer bb = ByteBuffer.allocate(8);for (long l = 0; true; l++){bb.putLong(0, l);ringBuffer.publishEvent(LongEventMain::translate, bb);Thread.sleep(1000);}} }

基本調優選項

使用上述的方法可以在最廣泛的部署場景中工作正常。然而,如果你能夠確定Disruptor將要運行的硬件和軟件環境,就可以調整參數提升性能。主要有以下兩種調優方式:單vs.多生產者和替換等待策略。

單vs.多生產者

提高并發系統性能的最佳方法之一就是遵守單作者原則(Single Writer Principle https://mechanical-sympathy.blogspot.tw/2011/09/single-writer-principle.html,這適用于Disruptor。如果你的情況是只有一個線程會在Disruptor中發布事件,那就可以利用此功能獲得額外的性能提升。

public class LongEventMain {public static void main(String[] args) throws Exception{//.....// Construct the Disruptor with a SingleProducerSequencerDisruptor<LongEvent> disruptor = new Disruptor(factory, bufferSize, ProducerType.SINGLE, new BlockingWaitStrategy(), executor);//.....} }

OneToOne 性能測試(https://github.com/LMAX-Exchange/disruptor/blob/master/src/perftest/java/com/lmax/disruptor/sequenced/OneToOneSequencedThroughputTest.java)可以說明這種技術能夠提升多少性能。以下測試使用i7 Sandy Bridge MacBook Air。
多生產者

Run 0, Disruptor=26,553,372 ops/sec
Run 1, Disruptor=28,727,377 ops/sec
Run 2, Disruptor=29,806,259 ops/sec
Run 3, Disruptor=29,717,682 ops/sec
Run 4, Disruptor=28,818,443 ops/sec
Run 5, Disruptor=29,103,608 ops/sec
Run 6, Disruptor=29,239,766 ops/sec

單生產者

Run 0, Disruptor=89,365,504 ops/sec
Run 1, Disruptor=77,579,519 ops/sec
Run 2, Disruptor=78,678,206 ops/sec
Run 3, Disruptor=80,840,743 ops/sec
Run 4, Disruptor=81,037,277 ops/sec
Run 5, Disruptor=81,168,831 ops/sec
Run 6, Disruptor=81,699,346 ops/sec

替換等待策略

BlockingWaitStategy

Disruptor默認的等待策略是BlockingWaitStategy。在BlockingWaitStategy內部使用一個典型的鎖和條件(a typical lock and condition)變量處理線程喚醒。BlockingWaitStategy是可用等待策略中最慢的,但也是在CPU使用上最保守的,同時也將在最廣泛的部署選項中提供最一致的行為。然而,再說一次,了解部署系統可以獲得額外的性能提升。

SleepingWaitStrategy

類似BlockingWaitStategy,SleepingWaitStrategy也試圖在CPU使用上保持保守,這通過一個忙等待(busy wait loop)循環實現,但在循環中間會調用LockSupport.parkNanos(1)。在一個典型的Linux系統,這樣會暫停線程大概60μs(譯注1μs=1000ns)。但它的好處是生產者線程除了增加響應的計數器外,不需要采取任何行動,而且不需要給條件變量發信號的成本(cost of signalling a condition variable)。然而,生產者和消費者轉移事件的平均延遲會增加。這種方式最好工作在不需要低延遲,但對生產者線程影響最小的情況下。一個常見的使用場景是異步日志。

YieldingWaitStrategy

可用于低延遲系統的兩種等待策略其中之一,這種策略通過消耗CPU時鐘周期來達到優化延遲的目的。這種策略使用忙循環(busy spin)等待正確的序號到達。在循環內部,Thread.yield()將被調用,來允許其他排隊中的線程運行。當需要很高的性能,而且事件處理者EventHandler的線程數少于CPU邏輯核心數時(比如使用超線程時),推薦使用這種策略。

BusySpinWaitStrategy

這種策略有最高的性能,但也有最高的部署邊境限制。這種等待策略應該只用于事件處理者線程小于CPU物理核心數。

清除環形緩沖的對象

使用Disruptor傳輸數據時,對象的存活周期有可能比預期更長。為了避免發生這種情況,有必要在事件處理完畢后做清理。如果有一個事件處理器,在這個事件處理器中做清理就足夠了。如果有一個事件處理鏈,那就可能會在鏈尾需要一個特定的處理器來清理這個對象。

class ObjectEvent<T> {T val;void clear(){val = null;} }public class ClearingEventHandler<T> implements EventHandler<ObjectEvent<T>> {public void onEvent(ObjectEvent<T> event, long sequence, boolean endOfBatch){// Failing to call clear here will result in the// object associated with the event to live until// it is overwritten once the ring buffer has wrapped// around to the beginning.event.clear();} }public static void main(String[] args) {Disruptor<ObjectEvent<String>> disruptor = new Disruptor<>(() -> ObjectEvent<String>(), bufferSize, executor);disruptor.handleEventsWith(new ProcessingEventHandler()).then(new ClearingObjectHandler()); }

總結

以上是生活随笔為你收集整理的在美团呆了7年的架构师带你解读Disruptor系列并发框架的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。