异步CDI事件
幾天前,在我們的常規(guī)代碼審查中,我的一位同事提出了一個(gè)問(wèn)題,即如果可能,一次同時(shí)調(diào)用CDI觀察者(這樣的方法帶有參數(shù)@Observes的方法)會(huì)發(fā)生什么?用于不同的事件實(shí)例。 換句話說(shuō),在產(chǎn)生少量事件之后,以下方法是否可能同時(shí)被多個(gè)線程處理:
考慮一下之后,我決定運(yùn)行一些測(cè)試并在本文中描述結(jié)果。
最初的結(jié)果:發(fā)生了CDI事件以同步模式觸發(fā),這讓我有些驚訝。 為什么?
到目前為止,我是這樣看的:CDI觀察者允許我非常干凈地將事件生產(chǎn)者與事件消費(fèi)者分開(kāi),因此我沒(méi)有任何硬編碼的偵聽(tīng)器注冊(cè),維護(hù)偵聽(tīng)器列表并手動(dòng)通知它們。 CDI容器為我做一切。
因此,如果我們將生產(chǎn)者與消費(fèi)者完全分開(kāi),我認(rèn)為存在某種事件總線運(yùn)行在專門(mén)的線程執(zhí)行程序池中,該池負(fù)責(zé)注冊(cè)事件與調(diào)用的觀察者方法之間的中介。 我想我是基于其他事件/偵聽(tīng)器解決方案(例如Google Guava EventBus)的這一假設(shè)。 它們使您有機(jī)會(huì)定義是否要使用同步(默認(rèn), EventBus )或異步事件分派器( AsyncEventBus) 。
而且,如果EJB既是生產(chǎn)者又是消費(fèi)者,那么我認(rèn)為它具有與異步EJB調(diào)用相同的功能。 異步事件觀察器唯一可能的JTA事務(wù)屬性是: REQUIRED , REQUIRES_NEW或NOT_SUPPORTED 。
現(xiàn)在,這就是我期望的所有工作方式,這似乎與當(dāng)前狀態(tài)大不相同 。 現(xiàn)實(shí)生活表明CDI事件是同步的。
使異步事件在CDI 1.1中可用存在一個(gè)問(wèn)題,但是我不確定此功能的當(dāng)前狀態(tài)如何,并且在CDI 1.1(Java EE 7的一部分)中沒(méi)有找到有關(guān)此功能的信息。
讓我們看看如何獨(dú)自處理它。
目錄
默認(rèn)同步事件
讓我們從顯示問(wèn)題的基本示例開(kāi)始。 看一下代碼–首先,CDI Bean生產(chǎn)者:
@Path("/produce") public class EventGenerator {@Injectprivate Logger logger;@Injectprivate Event<MyEvent> events;@Path("/cdiBean/{eventsNum}")@GETpublic String generateEvents(@PathParam("eventsNum") int numberOfEventsToGenerate) {for (int i = 0; i < numberOfEventsToGenerate; i++) {MyEvent event = new MyEvent(i);logger.info("Generating Event: " + event);events.fire(event);}return "Finished. Generated " + numberOfEventsToGenerate + " events.";} }MyEvent只是一些事件對(duì)象,在這里并不是很重要。 它存儲(chǔ)我們?cè)趯?shí)例化時(shí)傳遞的事件序列號(hào)。
消費(fèi)者是一個(gè)非常簡(jiǎn)單的CDI Bean:
public class EventConsumer {@Injectprivate Logger logger;public void consumeEvent(@Observes MyEvent myEvent) throws InterruptedException {logger.info("Receiving event: " + myEvent);TimeUnit.MILLISECONDS.sleep(500);} }請(qǐng)注意,我已經(jīng)插入了一個(gè)線程睡眠來(lái)模擬一些長(zhǎng)時(shí)間運(yùn)行的事件接收器進(jìn)程。
現(xiàn)在,讓我們通過(guò)調(diào)用EventProducer公開(kāi)的REST命令來(lái)運(yùn)行此示例。 結(jié)果(運(yùn)行JBoss EAP 6.1 Alpha )將類似于以下內(nèi)容:
14:15:59,196 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [ seqNo = 0 ] 14:15:59,197 [com.piotrnowicki.EventConsumer](http- / 127.0 .0.1:8080-1)接收事件:MyEvent [ seqNo = 0 ] 14:15:59,697 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [ seqNo = 1 ] 14 :15:59,698 [com.piotrnowicki.EventConsumer](http- / 127.0.0.1:8080-1)接收事件:MyEvent [ seqNo = 1 ] 14:16:00,199 [com.piotrnowicki.EventGenerator](http- / 127.0。 0.1:8080-1)生成事件:MyEvent [ seqNo = 2 ] 14:16:00,200 [com.piotrnowicki.EventConsumer](http- / 127.0.0.1:8080-1)接收事件:MyEvent [ seqNo = 2 ]
它顯示了CDI事件的同步性質(zhì)–事件的產(chǎn)生和使用發(fā)生在同一線程中,一個(gè)接一個(gè)地發(fā)生。
那么,如何使用CDI實(shí)現(xiàn)異步事件?
解決方案1 ??– CDI生產(chǎn)者和Singleton EJB作為接收者
生產(chǎn)者堅(jiān)持使用–純CDI bean:
@Path("/produce") public class EventGenerator {@Path("/cdiBean/{eventsNum}")@GETpublic String generateEvents(@PathParam("eventsNum") int numberOfEventsToGenerate) { ... } }現(xiàn)在,如果您將接收器變成@Singleton EJB,并將observes方法標(biāo)記為@Asynchronous,如下所示:
@Singleton public class EventConsumer {@Asynchronouspublic void consumeEvent(@Observes MyEvent myEvent) throws InterruptedException { ... } }您將得到以下結(jié)果:
14:21:19,341 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [seqNo = 0] 14:21:19,343 [com.piotrnowicki.EventGenerator](http- / 127.0 .0.1:8080-1)生成事件:MyEvent [seqNo = 1] 14:21:19,343 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [seqNo = 2] 14 :21: 19,347 [com.piotrnowicki.EventConsumer](EJB默認(rèn)– 2)接收事件:MyEvent [seqNo = 1] 14:21: 19,848 [com.piotrnowicki.EventConsumer](EJB默認(rèn)– 1)接收事件:MyEvent [seqNo = 0] 14:21: 20,350 [com.piotrnowicki.EventConsumer](EJB默認(rèn)值– 3)接收事件:MyEvent [seqNo = 2]
事件是一個(gè)接一個(gè)地產(chǎn)生的,并且是在單獨(dú)的線程中產(chǎn)生的。SingletonEJB一次又一次地為它們提供服務(wù)(請(qǐng)查看事件處理的時(shí)間。)這是因?yàn)镾ingleton EJB的每個(gè)業(yè)務(wù)方法都具有隱式寫(xiě)鎖定。 因此,這是:
異步: 是
線程安全的觀察者方法: 是
解決方案2 –使用Singleton EJB作為具有讀取鎖定的接收器
這種方法與解決方案1非常相似,但是,由于所有事件處理都是并行進(jìn)行的,因此它為您提供了更高的吞吐量。
我們的生產(chǎn)者保持不變–它是一個(gè)CDI bean:
@Path("/produce") public class EventGenerator {@Path("/cdiBean/{eventsNum}")@GETpublic String generateEvents(@PathParam("eventsNum") int numberOfEventsToGenerate) { ... } }我們的使用者將@Lock(READ)添加到其@Lock(READ)方法中; 這使得能夠同時(shí)處理多個(gè)事件的魔力:
@Singleton public class EventConsumer {@Asynchronous@Lock(LockType.READ)public void consumeEvent(@Observes MyEvent myEvent) throws InterruptedException { ... } }結(jié)果就是這樣:
14:24:44,202 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [seqNo = 0] 14:24:44,204 [com.piotrnowicki.EventGenerator](http- / 127.0 .0.1:8080-1)生成事件:MyEvent [seqNo = 1] 14:24:44,205 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [seqNo = 2] 14 :24: 44,207 [com.piotrnowicki.EventConsumer](EJB默認(rèn)– 4)接收事件:MyEvent [seqNo = 0] 14:24: 44,207 [com.piotrnowicki.EventConsumer](EJB默認(rèn)– 6)接收事件:MyEvent [seqNo = 2] 14:24: 44,207 [com.piotrnowicki.EventConsumer](EJB默認(rèn)值– 5)接收事件:MyEvent [seqNo = 1]
同時(shí)服務(wù)事件的不同線程為您提供更大的吞吐量。 因此,這是:
異步: 是
線程安全的觀察者方法: 否
解決方案3 – EJB生產(chǎn)者和CDI使用者
CDI允許您觀察特定交易階段的事件。 您可以使用@Observes(during=TransactionPhase...)指定它。 在我們的情況下,我們希望CDI堆疊所有這些事件并僅在事務(wù)結(jié)束后才調(diào)用觀察者。 為此,我們只需將以上屬性添加到我們的CDI Bean觀察器中:
public class EventConsumer { public void consumeEvent(@Observes(during = TransactionPhase.AFTER_COMPLETION) MyEvent myEvent) { ... } }現(xiàn)在,我們只需要確保EventGenerator方法中有正在運(yùn)行的事務(wù)EventGenerator 。 我們可以通過(guò)將CDI Bean轉(zhuǎn)換為@Stateless EJB并使用其隱式REQUIRED TransactionAttribute來(lái)快速完成此操作:
@Stateless @Path("/produce") public class EventGenerator {@Path("/cdiBean/{eventsNum}")@GETpublic String generateEvents(@PathParam("eventsNum") int numberOfEventsToGenerate) { ... } }這是我們可能最終得到的結(jié)果:
14:39:06,776 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [seqNo = 0] 14:39:06,776 [com.piotrnowicki.EventGenerator](http- / 127.0 .0.1:8080-1)生成事件:MyEvent [seqNo = 1] 14:39:06,776 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [seqNo = 2] 14 :39: 06,778 [com.piotrnowicki.EventConsumer](http- / 127.0.0.1:8080-1)接收事件:MyEvent [seqNo = 2] 14:39: 07,279 [com.piotrnowicki.EventConsumer](http- / 127.0。 0.1:8080-1)接收事件:MyEvent [seqNo = 0] 14:39: 07,780 [com.piotrnowicki.EventConsumer](http- / 127.0.0.1:8080-1)
EJB EventGenerator啟動(dòng)事務(wù),并且只有在事務(wù)完成之后,才會(huì)以序列化的方式調(diào)用CDI bean觀察器。
異步: 是
線程安全的觀察者方法: 是
解決方案4 – EJB生產(chǎn)者和EJB使用者
這與解決方案3非常相似。我們的生成器保持不變(無(wú)狀態(tài)EJB):
@Stateless @Path("/produce") public class EventGenerator {@Path("/cdiBean/{eventsNum}")@GETpublic String generateEvents(@PathParam("eventsNum") int numberOfEventsToGenerate) { ... } }現(xiàn)在對(duì)EventConsumer進(jìn)行了更改:
@Singleton public class EventConsumer {@Asynchronous@Lock(LockType.READ)public void consumeEvent(@Observes(during = TransactionPhase.AFTER_COMPLETION) MyEvent myEvent) throws InterruptedException { ... } }結(jié)果可能如下:
14:44:09,363 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [seqNo = 0] 14:44:09,464 [com.piotrnowicki.EventGenerator](http- / 127.0 .0.1:8080-1)生成事件:MyEvent [seqNo = 1] 14:44:09,564 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [seqNo = 2] 14 :44: 09,670 [com.piotrnowicki.EventConsumer](EJB默認(rèn)– 8)接收事件:MyEvent [seqNo = 2] 14: 44 : 09,670 [com.piotrnowicki.EventConsumer](EJB默認(rèn)– 2)接收事件:MyEvent [seqNo = 1] 14:44: 09,670 [com.piotrnowicki.EventConsumer](EJB默認(rèn)– 1)接收事件:MyEvent [seqNo = 0]
我們?cè)谶@里使用了兩個(gè)功能–一個(gè)是事件使用者方法是異步的,第二個(gè)是在生產(chǎn)者事務(wù)完成之前不會(huì)通知使用者。 這給我們:
異步: 是
線程安全的觀察者方法: 否
解決方案4與解決方案2
這兩個(gè)解決方案似乎是相同的。 它們僅與消費(fèi)者的注釋不同: @Observes與@Observes(during = TransactionPhase.AFTER_COMPLETION) 。 此外,對(duì)于我們的測(cè)試用例,它們的行為相同: 它們是異步的,并且多個(gè)線程可以同時(shí)處理事件接收器 。 但是,它們之間有一個(gè)很大的區(qū)別。
在我們的測(cè)試案例中,我們一個(gè)接一個(gè)地觸發(fā)事件。 想象一下,事件觸發(fā)之間還有其他操作。 在這種情況下:
- 解決方案2( @Observes )將在第一個(gè)事件觸發(fā)后立即開(kāi)始處理事件,
- 解決方案4( @Observes(during = TransactionPhase.AFTER_COMPLETION) )將在事務(wù)完成后立即開(kāi)始處理,因此將觸發(fā)所有事件。
這顯示了這種情況的可能結(jié)果:
解決方案2( @Observes )
15:01:34,318 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [ seqNo = 0 ] 15:01:34,320 [com.piotrnowicki.EventConsumer](EJB默認(rèn)– 3 )接收事件:MyEvent [ seqNo = 0 ] 15:01:34,419 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [ seqNo = 1 ] 15:01:34,420 [com .piotrnowicki.EventConsumer](EJB默認(rèn)– 6)接收事件:MyEvent [ seqNo = 1 ] 15:01:34,520 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [ seqNo = 2 ] 15:01:34,521 [com.piotrnowicki.EventConsumer](EJB默認(rèn)值– 9)接收事件:MyEvent [ seqNo = 2 ]
解決方案4( @Observes(during = TransactionPhase.AFTER_COMPLETION) )
15:00:41,126 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [ seqNo = 0 ] 15:00:41,226 [com.piotrnowicki.EventGenerator](http- / 127.0 .0.1:8080-1)生成事件:MyEvent [ seqNo = 1 ] 15:00:41,326 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [ seqNo = 2 ] 15 :00:41,432 [com.piotrnowicki.EventConsumer](EJB默認(rèn)– 10)接收事件:MyEvent [ seqNo = 2 ] 15:00:41,432 [com.piotrnowicki.EventConsumer](EJB默認(rèn)– 4)接收事件:MyEvent [ seqNo = 1 ] 15:00:41,432 [com.piotrnowicki.EventConsumer](EJB默認(rèn)值– 5)接收事件:MyEvent [ seqNo = 0 ]
解決方案5 – EJB生產(chǎn)者和CDI使用者II
到目前為止,我們已經(jīng)嘗試使接收器異步。 也有相反的方法–我們可以使事件生成器異步 。 我們可以通過(guò)將生產(chǎn)者標(biāo)記為@Stateless并調(diào)用自己的異步方法來(lái)觸發(fā)事件來(lái)實(shí)現(xiàn):
@Stateless @Path("/produce") public class EventGenerator {// ...@Resourceprivate SessionContext sctx;@Path("/cdiBean/{eventsNum}")@GETpublic String generateEvents(@PathParam("eventsNum") int numberOfEventsToGenerate) {for (int i = 0; i < numberOfEventsToGenerate; i++) {sctx.getBusinessObject(EventGenerator.class).fireEvent(new MyEvent(i));}return "Finished. Generated " + numberOfEventsToGenerate + " events.";}@Asynchronouspublic void fireEvent(final MyEvent event) {events.fire(event);} } 使用SessionContext仔細(xì)研究EJB自動(dòng)引用。 在這種情況下,這是必需的,因?yàn)槲覀兿M萜鞣峙晌覀兊姆椒ㄕ{(diào)用并添加它的異步性質(zhì)。 我們不希望使之成為本地呼叫,所以我們拒絕使用隱含的this對(duì)象。
另一方面,事件使用者是純CDI bean:
結(jié)果可能如下:
00:40:32,820 [com.piotrnowicki.EventGenerator](EJB默認(rèn)– 2)正在生成事件:MyEvent [seqNo = 1] 00:40:32,820 [com.piotrnowicki.EventGenerator](EJB默認(rèn)– 3)正在生成事件:MyEvent [ SEQNO = 2] 00:40:32820 [com.piotrnowicki.EventGenerator](EJB默認(rèn)- 1)產(chǎn)生事件:MyEvent [SEQNO = 0] 00:40:32821 [com.piotrnowicki.EventConsumer](EJB默認(rèn)- 1)接收事件:MyEvent [seqNo = 0] 00 : 40 : 32,821 [com.piotrnowicki.EventConsumer](EJB默認(rèn)– 2)接收事件:MyEvent [seqNo = 1] 00 : 40 : 32,821 [com.piotrnowicki.EventConsumer](EJB默認(rèn)– 3)接收事件:MyEvent [seqNo = 2]
異步: 是
線程安全的觀察者方法: 否
解決方案6 –使用JMS進(jìn)行CDI
這是Juliano Viana在他的博客上提出的解決方案。 它使用JMS作為事件總線。 生成CDI事件,然后由負(fù)責(zé)將該事件放入JMS主題/隊(duì)列的某個(gè)類獲取。 從主題/隊(duì)列中獲取消息的MDB正在生成一個(gè)調(diào)用實(shí)際接收者的事件。 這不僅為您提供了事件的異步傳遞,而且還為其添加了事務(wù)性質(zhì)。 例如,如果事件接收者無(wú)法處理該消息–它可以回滾該事務(wù),并且隊(duì)列將確保該消息將被重新發(fā)送(也許您的事件處理器下次將能夠處理此事件?)
結(jié)論
CDI 1.0不支持異步事件生成。 CDI 1.1似乎也沒(méi)有這種支持。
但是,這并不意味著您無(wú)法實(shí)現(xiàn)異步處理。 已經(jīng)存在基于EJB 3.1或現(xiàn)有CDI觀察器屬性的現(xiàn)有解決方案。 您還應(yīng)該能夠編寫(xiě)可移植的CDI擴(kuò)展 ,以將此功能添加到代碼中。
翻譯自: https://www.javacodegeeks.com/2013/05/asynchronous-cdi-events.html
總結(jié)
- 上一篇: 2023 东京电玩展直播时间表公布,将提
- 下一篇: Maven部署到Nexus