生活随笔
收集整理的這篇文章主要介紹了
disruptor笔记之六:常见场景
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
歡迎訪問我的GitHub
這里分類和匯總了欣宸的全部原創(含配套源碼):https://github.com/zq2599/blog_demos
《disruptor筆記》系列鏈接
快速入門Disruptor類分析環形隊列的基礎操作(不用Disruptor類)事件消費知識點小結事件消費實戰常見場景等待策略知識點補充(終篇)
本篇概覽
- 本文是《disruptor筆記》系列的第六篇,主要內容是將一些常用的消費模式做匯總,后續日常開發中如果有需要就能拿來即用;
- 以下是常用的模式:
多個消費者獨立消費,前文已實現,本篇跳過多個消費者共同消費,前文已實現,本篇跳過既有獨立消費,也有共同消費,前文已實現,本篇跳過多個生產者和多個獨立消費者:
C1、C2獨立消費,C3依賴C1和C2
C1獨立消費,C2和C3也獨立消費,但依賴C1,C4依賴C2和C3:
C1和C2獨立消費,C3和C4也是獨立消費,但C3和C4都依賴C1和C2,然后C5依賴C3和C4:
C1和C2共同消費,C3和C4也是共同消費,但C3和C4都依賴C1和C2,然后C5依賴C3和C4:
C1和C2共同消費,C3和C4獨立消費,但C3和C4都依賴C1和C2,然后C5依賴C3和C4:
C1和C2獨立消費,C3和C4是共同消費,但C3和C4都依賴C1和C2,然后C5依賴C3和C4:
關于本篇代碼
- 為了省事兒,本次不會新建工程,而是直接使用前文的consume-mode模塊,因此,下面這些類直接就直接使用了,無需重寫代碼:
事件定義:OrderEvent事件工廠:OrderEventFactory事件生產者:OrderEventProducer用在獨立消費場景的事件消費者:MailEventHandler用在共同消費場景的事件消費者:MailWorkHandler
源碼下載
- 本篇實戰中的完整源碼可在GitHub下載到,地址和鏈接信息如下表所示(https://github.com/zq2599/blog_demos):
名稱鏈接備注
| 項目主頁 | https://github.com/zq2599/blog_demos | 該項目在GitHub上的主頁 |
| git倉庫地址(https) | https://github.com/zq2599/blog_demos.git | 該項目源碼的倉庫地址,https協議 |
| git倉庫地址(ssh) | git@github.com:zq2599/blog_demos.git | 該項目源碼的倉庫地址,ssh協議 |
- 這個git項目中有多個文件夾,本次實戰的源碼在disruptor-tutorials文件夾下,如下圖紅框所示:
- disruptor-tutorials是個父工程,里面有多個module,本篇實戰的module是consume-mode,如下圖紅框所示:
多個生產者和多個獨立消費者
咱們即將實現下圖的邏輯:
- 前面幾篇文章所有實戰的生產者都只有一個,到了本篇,為了讓consume-mode模塊的代碼能夠支持多生產者,咱們要對功能業務的抽象父類做以下兩處改動:
init方法原本為private型,現在為了能讓子類重此方法,將其改為protected類型;增加名為publishWithProducer2的方法,可見內部只有拋出異常,要想其正常工作,需要子類自己來實現:
public void publishWithProducer2(String value
) throws Exception {throw new Exception("父類未實現此方法,請在子類中重寫此方法后再調用");
}
- 為了實現多生產者功能,新增MultiProducerServiceImpl.java,有幾處要注意的地方稍后會提到:
package com.bolingcavalry.service.impl;import com.bolingcavalry.service.*;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import lombok.Setter;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;@Service("multiProducerService")
public class MultiProducerServiceImpl extends ConsumeModeService {@Setterprotected OrderEventProducer producer2
;@PostConstruct@Overrideprotected void init() {disruptor
= new Disruptor<>(new OrderEventFactory(),BUFFER_SIZE
,new CustomizableThreadFactory("event-handler-"),ProducerType.MULTI
,new BlockingWaitStrategy());disruptorOperate();disruptor
.start();setProducer(new OrderEventProducer(disruptor
.getRingBuffer()));setProducer2(new OrderEventProducer(disruptor
.getRingBuffer()));}@Overrideprotected void disruptorOperate() {MailEventHandler c1
= new MailEventHandler(eventCountPrinter
);MailEventHandler c2
= new MailEventHandler(eventCountPrinter
);disruptor
.handleEventsWith(c1
, c2
);}@Overridepublic void publishWithProducer2(String value
) throws Exception {producer2
.onData(value
);}
}
重寫父類的init方法,主要是實例化Disruptor的時候,多傳入兩個參數:ProducerType.MULTI表示生產類型是多生產者,BlockingWaitStrategy是等待策略,之前的代碼中咱們沒有傳此參數時,默認的就是BlockingWaitStrategyinit方法中還執行了setProducer2方法,設置成員變量producer2重寫publishWithProducer2方法,調用成員變量producer2發表事件重寫disruptorOperate方法,里面設置了兩個獨立消費者
- 驗證上述代碼的方式依舊是單元測試,打開ConsumeModeServiceTest.java,新增以下代碼,可見新增了兩個線程同時執行發布事件的操作:
@Autowired@Qualifier("multiProducerService")ConsumeModeService multiProducerService
;@Testpublic void testMultiProducerService() throws InterruptedException {log
.info("start testMultiProducerService");CountDownLatch countDownLatch
= new CountDownLatch(1);int expectEventCount
= EVENT_COUNT
*4;multiProducerService
.setCountDown(countDownLatch
, expectEventCount
);new Thread(() -> {for(int i
=0;i
<EVENT_COUNT
;i
++) {log
.info("publich {}", i
);multiProducerService
.publish(String.valueOf(i
));}}).start();new Thread(() -> {for(int i
=0;i
<EVENT_COUNT
;i
++) {log
.info("publishWithProducer2 {}", i
);try {multiProducerService
.publishWithProducer2(String.valueOf(i
));} catch (Exception e
) {e
.printStackTrace();}}}).start();countDownLatch
.await();assertEquals(expectEventCount
, multiProducerService
.eventCount());}
C1、C2獨立消費,C3依賴C1和C2
- 邏輯圖如下:
- 實現代碼如下,非常簡單,依賴關系用then即可實現:
package com.bolingcavalry.service.impl;import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import com.bolingcavalry.service.MailWorkHandler;
import com.bolingcavalry.service.SmsEventHandler;
import org.springframework.stereotype.Service;@Service("scene5")
public class Scene5 extends ConsumeModeService {@Overrideprotected void disruptorOperate() {MailEventHandler c1
= new MailEventHandler(eventCountPrinter
);MailEventHandler c2
= new MailEventHandler(eventCountPrinter
);MailEventHandler c3
= new MailEventHandler(eventCountPrinter
);disruptor
.handleEventsWith(c1
, c2
).then(c3
);}
}
@Autowired@Qualifier("scene5")Scene5 scene5
;@Testpublic void testScene5
() throws InterruptedException {log
.info("start testScene5");testConsumeModeService(scene5
,EVENT_COUNT
,EVENT_COUNT
* 3);}
- 為了節省篇幅,測試結果就不貼了,要注意的是,每個事件都一定是C1和C2先消費過,才會被C3消費到;
C1獨立消費,C2和C3也獨立消費,但依賴C1,C4依賴C2和C3
package com.bolingcavalry.service.impl;import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import org.springframework.stereotype.Service;@Service("scene6")
public class Scene6 extends ConsumeModeService {@Overrideprotected void disruptorOperate() {MailEventHandler c1
= new MailEventHandler(eventCountPrinter
);MailEventHandler c2
= new MailEventHandler(eventCountPrinter
);MailEventHandler c3
= new MailEventHandler(eventCountPrinter
);MailEventHandler c4
= new MailEventHandler(eventCountPrinter
);disruptor
.handleEventsWith(c1
).then(c2
, c3
).then(c4
);}
}
@Autowired@Qualifier("scene6")Scene6 scene6
;@Testpublic void testScene6
() throws InterruptedException {log
.info("start testScene6");testConsumeModeService(scene6
,EVENT_COUNT
,EVENT_COUNT
* 4);}
C1和C2獨立消費,C3和C4也是獨立消費,但C3和C4都依賴C1和C2,然后C5依賴C3和C4
package com.bolingcavalry.service.impl;import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import org.springframework.stereotype.Service;@Service("scene7")
public class Scene7 extends ConsumeModeService {@Overrideprotected void disruptorOperate() {MailEventHandler c1
= new MailEventHandler(eventCountPrinter
);MailEventHandler c2
= new MailEventHandler(eventCountPrinter
);MailEventHandler c3
= new MailEventHandler(eventCountPrinter
);MailEventHandler c4
= new MailEventHandler(eventCountPrinter
);MailEventHandler c5
= new MailEventHandler(eventCountPrinter
);disruptor
.handleEventsWith(c1
, c2
).then(c3
, c4
).then(c5
);}
}
@Autowired@Qualifier("scene7")Scene7 scene7
;@Testpublic void testScene7
() throws InterruptedException {log
.info("start testScene7");testConsumeModeService(scene7
,EVENT_COUNT
,EVENT_COUNT
* 5);}
C1和C2共同消費,C3和C4也是共同消費,但C3和C4都依賴C1和C2,然后C5依賴C3和C4
package com.bolingcavalry.service.impl;import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import com.bolingcavalry.service.MailWorkHandler;
import org.springframework.stereotype.Service;
@Service("scene8")
public class Scene8 extends ConsumeModeService {@Overrideprotected void disruptorOperate() {MailWorkHandler c1
= new MailWorkHandler(eventCountPrinter
);MailWorkHandler c2
= new MailWorkHandler(eventCountPrinter
);MailWorkHandler c3
= new MailWorkHandler(eventCountPrinter
);MailWorkHandler c4
= new MailWorkHandler(eventCountPrinter
);MailWorkHandler c5
= new MailWorkHandler(eventCountPrinter
);disruptor
.handleEventsWithWorkerPool(c1
, c2
).thenHandleEventsWithWorkerPool(c3
, c4
).thenHandleEventsWithWorkerPool(c5
);}
}
@Autowired@Qualifier("scene8")Scene8 scene8
;@Testpublic void testScene8
() throws InterruptedException {log
.info("start testScene8");testConsumeModeService(scene8
,EVENT_COUNT
,EVENT_COUNT
* 3);}
C1和C2共同消費,C3和C4獨立消費,但C3和C4都依賴C1和C2,然后C5依賴C3和C4
package com.bolingcavalry.service.impl;import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import com.bolingcavalry.service.MailWorkHandler;
import org.springframework.stereotype.Service;@Service("scene9")
public class Scene9 extends ConsumeModeService {@Overrideprotected void disruptorOperate() {MailWorkHandler c1
= new MailWorkHandler(eventCountPrinter
);MailWorkHandler c2
= new MailWorkHandler(eventCountPrinter
);MailEventHandler c3
= new MailEventHandler(eventCountPrinter
);MailEventHandler c4
= new MailEventHandler(eventCountPrinter
);MailEventHandler c5
= new MailEventHandler(eventCountPrinter
);disruptor
.handleEventsWithWorkerPool(c1
, c2
).then(c3
, c4
).then(c5
);}
}
@Autowired@Qualifier("scene9")Scene9 scene9
;@Testpublic void testScene9
() throws InterruptedException {log
.info("start testScene9");testConsumeModeService(scene9
,EVENT_COUNT
,EVENT_COUNT
* 4);}
C1和C2獨立消費,C3和C4是共同消費,但C3和C4都依賴C1和C2,然后C5依賴C3和C4
package com.bolingcavalry.service.impl;import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import com.bolingcavalry.service.MailWorkHandler;
import org.springframework.stereotype.Service;@Service("scene10")
public class Scene10 extends ConsumeModeService {@Overrideprotected void disruptorOperate() {MailEventHandler c1
= new MailEventHandler(eventCountPrinter
);MailEventHandler c2
= new MailEventHandler(eventCountPrinter
);MailWorkHandler c3
= new MailWorkHandler(eventCountPrinter
);MailWorkHandler c4
= new MailWorkHandler(eventCountPrinter
);MailEventHandler c5
= new MailEventHandler(eventCountPrinter
);disruptor
.handleEventsWith(c1
, c2
).thenHandleEventsWithWorkerPool(c3
, c4
).then(c5
);}
}
@Testpublic void testScene10
() throws InterruptedException {log
.info("start testScene10");testConsumeModeService(scene10
,EVENT_COUNT
,EVENT_COUNT
* 4);}
- 至此,一些常見場景的代碼已完成,希望本文能給您一些參考,幫您更得心應手的用好這個優秀的工具;
你不孤單,欣宸原創一路相伴
Java系列Spring系列Docker系列kubernetes系列數據庫+中間件系列DevOps系列
總結
以上是生活随笔為你收集整理的disruptor笔记之六:常见场景的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。