日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

disruptor笔记之六:常见场景

發布時間:2024/3/24 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 disruptor笔记之六:常见场景 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

歡迎訪問我的GitHub

這里分類和匯總了欣宸的全部原創(含配套源碼):https://github.com/zq2599/blog_demos

《disruptor筆記》系列鏈接

  • 快速入門
  • Disruptor類分析
  • 環形隊列的基礎操作(不用Disruptor類)
  • 事件消費知識點小結
  • 事件消費實戰
  • 常見場景
  • 等待策略
  • 知識點補充(終篇)
  • 本篇概覽

    • 本文是《disruptor筆記》系列的第六篇,主要內容是將一些常用的消費模式做匯總,后續日常開發中如果有需要就能拿來即用;
    • 以下是常用的模式:
  • 多個消費者獨立消費,前文已實現,本篇跳過
  • 多個消費者共同消費,前文已實現,本篇跳過
  • 既有獨立消費,也有共同消費,前文已實現,本篇跳過
  • 多個生產者和多個獨立消費者:
  • C1、C2獨立消費,C3依賴C1和C2
  • C1獨立消費,C2和C3也獨立消費,但依賴C1,C4依賴C2和C3
  • C1和C2獨立消費,C3和C4也是獨立消費,但C3和C4都依賴C1和C2,然后C5依賴C3和C4
  • C1和C2共同消費,C3和C4也是共同消費,但C3和C4都依賴C1和C2,然后C5依賴C3和C4
  • C1和C2共同消費,C3和C4獨立消費,但C3和C4都依賴C1和C2,然后C5依賴C3和C4
  • C1和C2獨立消費,C3和C4是共同消費,但C3和C4都依賴C1和C2,然后C5依賴C3和C4
  • 關于本篇代碼

    • 為了省事兒,本次不會新建工程,而是直接使用前文的consume-mode模塊,因此,下面這些類直接就直接使用了,無需重寫代碼:
  • 事件定義:OrderEvent
  • 事件工廠:OrderEventFactory
  • 事件生產者:OrderEventProducer
  • 用在獨立消費場景的事件消費者:MailEventHandler
  • 用在共同消費場景的事件消費者:MailWorkHandler
  • 源碼下載

    • 本篇實戰中的完整源碼可在GitHub下載到,地址和鏈接信息如下表所示(https://github.com/zq2599/blog_demos):
    名稱鏈接備注
    項目主頁https://github.com/zq2599/blog_demos該項目在GitHub上的主頁
    git倉庫地址(https)https://github.com/zq2599/blog_demos.git該項目源碼的倉庫地址,https協議
    git倉庫地址(ssh)git@github.com:zq2599/blog_demos.git該項目源碼的倉庫地址,ssh協議
    • 這個git項目中有多個文件夾,本次實戰的源碼在disruptor-tutorials文件夾下,如下圖紅框所示:
    • disruptor-tutorials是個父工程,里面有多個module,本篇實戰的module是consume-mode,如下圖紅框所示:

    多個生產者和多個獨立消費者

    咱們即將實現下圖的邏輯:

    • 前面幾篇文章所有實戰的生產者都只有一個,到了本篇,為了讓consume-mode模塊的代碼能夠支持多生產者,咱們要對功能業務的抽象父類做以下兩處改動:
  • init方法原本為private型,現在為了能讓子類重此方法,將其改為protected類型;
  • 增加名為publishWithProducer2的方法,可見內部只有拋出異常,要想其正常工作,需要子類自己來實現:
  • public void publishWithProducer2(String value) throws Exception {throw new Exception("父類未實現此方法,請在子類中重寫此方法后再調用"); }
    • 為了實現多生產者功能,新增MultiProducerServiceImpl.java,有幾處要注意的地方稍后會提到:
    package com.bolingcavalry.service.impl;import com.bolingcavalry.service.*; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import lombok.Setter; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct;@Service("multiProducerService") public class MultiProducerServiceImpl extends ConsumeModeService {/*** 第二個生產者*/@Setterprotected OrderEventProducer producer2;@PostConstruct@Overrideprotected void init() {// 實例化disruptor = new Disruptor<>(new OrderEventFactory(),BUFFER_SIZE,new CustomizableThreadFactory("event-handler-"),// 生產類型是多生產者ProducerType.MULTI,// BlockingWaitStrategy是默認的等待策略new BlockingWaitStrategy());// 留給子類實現具體的事件消費邏輯disruptorOperate();// 啟動disruptor.start();// 第一個生產者setProducer(new OrderEventProducer(disruptor.getRingBuffer()));// 第二個生產者setProducer2(new OrderEventProducer(disruptor.getRingBuffer()));}@Overrideprotected void disruptorOperate() {// 一號消費者MailEventHandler c1 = new MailEventHandler(eventCountPrinter);// 二號消費者MailEventHandler c2 = new MailEventHandler(eventCountPrinter);// 調用handleEventsWithWorkerPool,表示創建的多個消費者以共同消費的模式消費disruptor.handleEventsWith(c1, c2);}@Overridepublic void publishWithProducer2(String value) throws Exception {producer2.onData(value);} }
    • 上述代碼有以下幾處要注意:
  • 重寫父類的init方法,主要是實例化Disruptor的時候,多傳入兩個參數:ProducerType.MULTI表示生產類型是多生產者,BlockingWaitStrategy是等待策略,之前的代碼中咱們沒有傳此參數時,默認的就是BlockingWaitStrategy
  • init方法中還執行了setProducer2方法,設置成員變量producer2
  • 重寫publishWithProducer2方法,調用成員變量producer2發表事件
  • 重寫disruptorOperate方法,里面設置了兩個獨立消費者
    • 驗證上述代碼的方式依舊是單元測試,打開ConsumeModeServiceTest.java,新增以下代碼,可見新增了兩個線程同時執行發布事件的操作:
    @Autowired@Qualifier("multiProducerService")ConsumeModeService multiProducerService;@Testpublic void testMultiProducerService() throws InterruptedException {log.info("start testMultiProducerService");CountDownLatch countDownLatch = new CountDownLatch(1);// 兩個生產者,每個生產100個事件,一共生產兩百個事件// 兩個獨立消費者,每人消費200個事件,因此一共消費400個事件int expectEventCount = EVENT_COUNT*4;// 告訴service,等消費到400個消息時,就執行countDownLatch.countDown方法multiProducerService.setCountDown(countDownLatch, expectEventCount);// 啟動一個線程,用第一個生產者生產事件new Thread(() -> {for(int i=0;i<EVENT_COUNT;i++) {log.info("publich {}", i);multiProducerService.publish(String.valueOf(i));}}).start();// 再啟動一個線程,用第二個生產者生產事件new Thread(() -> {for(int i=0;i<EVENT_COUNT;i++) {log.info("publishWithProducer2 {}", i);try {multiProducerService.publishWithProducer2(String.valueOf(i));} catch (Exception e) {e.printStackTrace();}}}).start();// 當前線程開始等待,前面的service.setCountDown方法已經告訴過service,// 等消費到expectEventCount個消息時,就執行countDownLatch.countDown方法// 千萬注意,要調用await方法,而不是wait方法!countDownLatch.await();// 消費的事件總數應該等于發布的事件數assertEquals(expectEventCount, multiProducerService.eventCount());}
    • 測試結果如下,測試通過,符合預期:

    C1、C2獨立消費,C3依賴C1和C2

    • 邏輯圖如下:
    • 實現代碼如下,非常簡單,依賴關系用then即可實現:
    package com.bolingcavalry.service.impl;import com.bolingcavalry.service.ConsumeModeService; import com.bolingcavalry.service.MailEventHandler; import com.bolingcavalry.service.MailWorkHandler; import com.bolingcavalry.service.SmsEventHandler; import org.springframework.stereotype.Service;@Service("scene5") public class Scene5 extends ConsumeModeService {@Overrideprotected void disruptorOperate() {MailEventHandler c1 = new MailEventHandler(eventCountPrinter);MailEventHandler c2 = new MailEventHandler(eventCountPrinter);MailEventHandler c3 = new MailEventHandler(eventCountPrinter);disruptor// C1、C2獨立消費.handleEventsWith(c1, c2)// C3依賴C1和C2.then(c3);} }
    • 單元測試代碼:
    @Autowired@Qualifier("scene5")Scene5 scene5;@Testpublic void testScene5 () throws InterruptedException {log.info("start testScene5");testConsumeModeService(scene5,EVENT_COUNT,// 三個獨立消費者,一共消費300個事件EVENT_COUNT * 3);}
    • 為了節省篇幅,測試結果就不貼了,要注意的是,每個事件都一定是C1和C2先消費過,才會被C3消費到;

    C1獨立消費,C2和C3也獨立消費,但依賴C1,C4依賴C2和C3

    • 邏輯圖如下:
    • 實現代碼如下:
    package com.bolingcavalry.service.impl;import com.bolingcavalry.service.ConsumeModeService; import com.bolingcavalry.service.MailEventHandler; import org.springframework.stereotype.Service;@Service("scene6") public class Scene6 extends ConsumeModeService {@Overrideprotected void disruptorOperate() {MailEventHandler c1 = new MailEventHandler(eventCountPrinter);MailEventHandler c2 = new MailEventHandler(eventCountPrinter);MailEventHandler c3 = new MailEventHandler(eventCountPrinter);MailEventHandler c4 = new MailEventHandler(eventCountPrinter);disruptor// C1.handleEventsWith(c1)// C2和C3也獨立消費.then(c2, c3)// C4依賴C2和C3.then(c4);} }
    • 單元測試代碼:
    @Autowired@Qualifier("scene6")Scene6 scene6;@Testpublic void testScene6 () throws InterruptedException {log.info("start testScene6");testConsumeModeService(scene6,EVENT_COUNT,// 四個獨立消費者,一共消費400個事件EVENT_COUNT * 4);}

    C1和C2獨立消費,C3和C4也是獨立消費,但C3和C4都依賴C1和C2,然后C5依賴C3和C4

    • 邏輯圖如下:
    • 實現代碼如下:
    package com.bolingcavalry.service.impl;import com.bolingcavalry.service.ConsumeModeService; import com.bolingcavalry.service.MailEventHandler; import org.springframework.stereotype.Service;@Service("scene7") public class Scene7 extends ConsumeModeService {@Overrideprotected void disruptorOperate() {MailEventHandler c1 = new MailEventHandler(eventCountPrinter);MailEventHandler c2 = new MailEventHandler(eventCountPrinter);MailEventHandler c3 = new MailEventHandler(eventCountPrinter);MailEventHandler c4 = new MailEventHandler(eventCountPrinter);MailEventHandler c5 = new MailEventHandler(eventCountPrinter);disruptor// C1和C2獨立消費.handleEventsWith(c1, c2)// C3和C4也是獨立消費,但C3和C4都依賴C1和C2.then(c3, c4)// 然后C5依賴C3和C4.then(c5);} }
    • 單元測試代碼:
    @Autowired@Qualifier("scene7")Scene7 scene7;@Testpublic void testScene7 () throws InterruptedException {log.info("start testScene7");testConsumeModeService(scene7,EVENT_COUNT,// 五個獨立消費者,一共消費500個事件EVENT_COUNT * 5);}

    C1和C2共同消費,C3和C4也是共同消費,但C3和C4都依賴C1和C2,然后C5依賴C3和C4

    • 邏輯圖如下:
    • 實現代碼如下:
    package com.bolingcavalry.service.impl;import com.bolingcavalry.service.ConsumeModeService; import com.bolingcavalry.service.MailEventHandler; import com.bolingcavalry.service.MailWorkHandler; import org.springframework.stereotype.Service;/*** @author will (zq2599@gmail.com)* @version 1.0* @description: C1和C2共同消費,C3和C4也是共同消費,但C3和C4都依賴C1和C2,然后C5依賴C3和C4* @date 2021/5/23 11:05*/ @Service("scene8") public class Scene8 extends ConsumeModeService {@Overrideprotected void disruptorOperate() {MailWorkHandler c1 = new MailWorkHandler(eventCountPrinter);MailWorkHandler c2 = new MailWorkHandler(eventCountPrinter);MailWorkHandler c3 = new MailWorkHandler(eventCountPrinter);MailWorkHandler c4 = new MailWorkHandler(eventCountPrinter);MailWorkHandler c5 = new MailWorkHandler(eventCountPrinter);disruptor// C1和C2共同消費.handleEventsWithWorkerPool(c1, c2)// C3和C4也是獨立消費,但C3和C4都依賴C1和C2.thenHandleEventsWithWorkerPool(c3, c4)// 然后C5依賴C3和C4.thenHandleEventsWithWorkerPool(c5);} }
    • 單元測試代碼:
    @Autowired@Qualifier("scene8")Scene8 scene8;@Testpublic void testScene8 () throws InterruptedException {log.info("start testScene8");testConsumeModeService(scene8,EVENT_COUNT,// C1和C2共同消費,C3和C4共同消費,C5雖然只是一個,但也是共同消費模式,// 也就是一共有三組消費者,所以一共消費300個事件EVENT_COUNT * 3);}

    C1和C2共同消費,C3和C4獨立消費,但C3和C4都依賴C1和C2,然后C5依賴C3和C4

    • 邏輯圖如下:
    • 實現代碼如下:
    package com.bolingcavalry.service.impl;import com.bolingcavalry.service.ConsumeModeService; import com.bolingcavalry.service.MailEventHandler; import com.bolingcavalry.service.MailWorkHandler; import org.springframework.stereotype.Service;@Service("scene9") public class Scene9 extends ConsumeModeService {@Overrideprotected void disruptorOperate() {MailWorkHandler c1 = new MailWorkHandler(eventCountPrinter);MailWorkHandler c2 = new MailWorkHandler(eventCountPrinter);MailEventHandler c3 = new MailEventHandler(eventCountPrinter);MailEventHandler c4 = new MailEventHandler(eventCountPrinter);MailEventHandler c5 = new MailEventHandler(eventCountPrinter);disruptor// C1和C2共同消費.handleEventsWithWorkerPool(c1, c2)// C3和C4獨立消費,但C3和C4都依賴C1和C2.then(c3, c4)// 然后C5依賴C3和C4.then(c5);} }
    • 單元測試代碼:
    @Autowired@Qualifier("scene9")Scene9 scene9;@Testpublic void testScene9 () throws InterruptedException {log.info("start testScene9");testConsumeModeService(scene9,EVENT_COUNT,// C1和C2共同消費(100個事件),// C3和C4獨立消費(200個事件),// C5獨立消費(100個事件),// 所以一共消費400個事件EVENT_COUNT * 4);}

    C1和C2獨立消費,C3和C4是共同消費,但C3和C4都依賴C1和C2,然后C5依賴C3和C4

    • 邏輯圖如下:
    • 實現代碼如下:
    package com.bolingcavalry.service.impl;import com.bolingcavalry.service.ConsumeModeService; import com.bolingcavalry.service.MailEventHandler; import com.bolingcavalry.service.MailWorkHandler; import org.springframework.stereotype.Service;@Service("scene10") public class Scene10 extends ConsumeModeService {@Overrideprotected void disruptorOperate() {MailEventHandler c1 = new MailEventHandler(eventCountPrinter);MailEventHandler c2 = new MailEventHandler(eventCountPrinter);MailWorkHandler c3 = new MailWorkHandler(eventCountPrinter);MailWorkHandler c4 = new MailWorkHandler(eventCountPrinter);MailEventHandler c5 = new MailEventHandler(eventCountPrinter);disruptor// C1和C2共同消費.handleEventsWith(c1, c2)// C3和C4是共同消費,但C3和C4都依賴C1和C2.thenHandleEventsWithWorkerPool(c3, c4)// 然后C5依賴C3和C4.then(c5);} }
    • 單元測試代碼:
    @Testpublic void testScene10 () throws InterruptedException {log.info("start testScene10");testConsumeModeService(scene10,EVENT_COUNT,// C1和C2獨立消費(200個事件),// C3和C4共同消費(100個事件),// C5獨立消費(100個事件),// 所以一共消費400個事件EVENT_COUNT * 4);}
    • 至此,一些常見場景的代碼已完成,希望本文能給您一些參考,幫您更得心應手的用好這個優秀的工具;

    你不孤單,欣宸原創一路相伴

  • Java系列
  • Spring系列
  • Docker系列
  • kubernetes系列
  • 數據庫+中間件系列
  • DevOps系列
  • 總結

    以上是生活随笔為你收集整理的disruptor笔记之六:常见场景的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。