日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

spring boot 整合 谷歌guava的EventBus 实现单机版的消息发布订阅

發布時間:2024/7/23 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spring boot 整合 谷歌guava的EventBus 实现单机版的消息发布订阅 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

spring boot 整合 谷歌guava的EventBus 實現單機版的消息發布訂閱

大型分布式系統,直接用mq解耦,那么單機系統怎么辦,可以考慮用EventBus
用EventBus的好處也是異步解耦,和mq的類似,可以勉強認為是單機版的mq

先解釋,后附示例代碼(jdk1.8)

EventBus 處理的事情類似觀察者模式,基于事件驅動,觀察者們監聽自己感興趣的特定事件,進行相應的處理。
流程是這樣的:
1、先自定義一個注解@EventBusListener,所有異步實現的類都加這個注解,代表自己是一個監聽器
2、再定義一個實現類,就用這個@EventBusListener注解,代表自己是監聽器
類中方法用@Subscribe注解,方法入參類和后面寫的發送異步或同步消息的類保持一致即可,這個類也叫消息體,是這個消息的注意內容,比如訂單號id,消息體最好是自己定義的不同的類
3、核心的類是EventBusCenter,負責將帶有@EventBusListener注解的bean注冊成一個監聽器,并提供發送異步或同步消息的方法入口
4、業務service中注入EventBusCenter,并調用發送異步或同步消息的方法,方法入參是消息體,不同的消息體對應不同的發布訂閱,訂閱的時候也是根據這個消息體來區分不同類型的消息的

簡單的源碼解讀

核心類是:com.google.common.eventbus.SubscriberRegistry
里面有個靜態變量:subscriberMethodsCache
類在初始化的時候,會給這個靜態變量復制,拿到所有的@Subscribe注釋的方法和對應類的映射關系
然后我們代碼中會注冊監聽器,就把方法和監聽器對應上了
至此,方法的入參類型和方法和類的對應關系就知道了,key就是方法參數類型,value就是一組對應的方法和類

用的時候,調用asyncEventBus.post(event);
入參event就是真正的消息體類型,然后會根據這個類型去上面找對應的方法和類,然后獲取bean,調用

所以說,這個發布訂閱,就是通過消息體類型去唯一識別的方法的。
對比mq的主題概念,這個消息體類型,就可以看成是主題的意思。

下面附示例代碼

模擬OrderService中訂單創建后,發送訂單創建的異步事件,再發送訂單修改的異步事件,再發送訂單修改的同步事件
訂閱端是OrderChangeListener和OrderChangeListener02兩個訂閱
OrderChangeListener訂閱了訂單創建和訂單修改事件
OrderChangeListener02訂閱了訂單創建事件

執行流程是:

啟動springboot,注冊bean的時候遇到EventBusCenter,開始注冊OrderChangeListener和OrderChangeListener02為監聽器
啟動springboot后立即執行FistRun類,里面直接調用訂單創建方法,發布訂單創建和修改的消息
OrderChangeListener和OrderChangeListener02消費消息,完

<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional> </dependency> <dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>22.0</version> </dependency> @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface EventBusListener { } @Component public class EventBusCenter {// 管理同步事件private EventBus syncEventBus = new EventBus();// 管理異步事件private AsyncEventBus asyncEventBus = new AsyncEventBus(Executors.newCachedThreadPool());public void postSync(Object event) {syncEventBus.post(event);}public void postAsync(Object event) {asyncEventBus.post(event);}@PostConstructpublic void init() {// 獲取所有帶有 @EventBusListener 的 bean,將他們注冊為監聽者List<Object> listeners = SpringContextUtils.getBeansWithAnnotation(EventBusListener.class);for (Object listener : listeners) {asyncEventBus.register(listener);syncEventBus.register(listener);}} } @Component public class SpringContextUtils implements BeanFactoryPostProcessor {private static ConfigurableListableBeanFactory beanFactory;@Overridepublic void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {SpringContextUtils.beanFactory = configurableListableBeanFactory;}public static <T> T getBean(String name) throws BeansException {return (T) beanFactory.getBean(name);}public static <T> T getBean(Class<T> clz) throws BeansException {T result = beanFactory.getBean(clz);return result;}public static <T> List<T> getBeansOfType(Class<T> type) {return beanFactory.getBeansOfType(type).entrySet().stream().map(entry -> entry.getValue()).collect(Collectors.toList());}// 上面的例子用到了這個public static List<Object> getBeansWithAnnotation(Class<? extends Annotation> annotationType) {Map<String, Object> beansWithAnnotation = beanFactory.getBeansWithAnnotation(annotationType);// java 8 的寫法,將 map 的 value 收集起來到一個 list 中return beansWithAnnotation.entrySet().stream().map(entry -> entry.getValue()).collect(Collectors.toList());// java 7 // List<Object> result = new ArrayList<>(); // for (Map.Entry<String, Object> entry : beansWithAnnotation.entrySet()) { // result.add(entry.getValue()); // } // return result;} } @Data public class OrderCreatedEvent {private long orderId;private long userId;public OrderCreatedEvent(long orderId, long userId) {this.setOrderId(orderId);this.setUserId(userId);} } @Data public class OrderChangeEvent {private long orderId;private long userId;public OrderChangeEvent(long orderId, long userId) {this.setOrderId(orderId);this.setUserId(userId);} } @Component @EventBusListener @Slf4j public class OrderChangeListener {@Subscribepublic void created(OrderCreatedEvent event) throws InterruptedException {long orderId = event.getOrderId();Thread.sleep(300);log.info("訂單創建監聽,發送短信,orderId=" + orderId);}@Subscribepublic void change(OrderChangeEvent event) throws InterruptedException {long orderId = event.getOrderId();Thread.sleep(200);log.info("訂單修改監聽,物流變化,orderId=" + orderId);} } @Component @EventBusListener @Slf4j public class OrderChangeListener02 {@Subscribepublic void created(OrderCreatedEvent event) {long orderId = event.getOrderId();long userId = event.getUserId();// 訂單創建成功后的各種操作,如發短信、發郵件等等。// 注意,事件可以被訂閱多次,也就是說可以有很多方法監聽 OrderCreatedEvent 事件,// 所以沒必要在一個方法中處理發短信、發郵件、更新庫存等log.info("訂單創建監聽02,修改庫存,orderId=" + orderId);}} @Service @Slf4j public class OrderService {@Autowiredprivate EventBusCenter eventBusCenter;public void createOrder() throws InterruptedException {// 創建訂單// 發送異步事件eventBusCenter.postAsync(new OrderCreatedEvent(1L, 1L));System.out.println("發送異步事件,訂單創建");eventBusCenter.postAsync(new OrderChangeEvent(1L, 1L));System.out.println("發送異步事件,訂單修改");//發送同步事件Thread.sleep(500);try {System.out.println("發送同步事件,訂單修改,開始");eventBusCenter.postSync(new OrderChangeEvent(1L, 1L));System.out.println("發送同步事件,訂單修改,結束");} catch (Exception e) {log.error("發送同步事件,抓異常");}} } @Component @Slf4j @Order(1) public class FistRun implements CommandLineRunner {@Autowiredprivate OrderService orderService;@Overridepublic void run(String... args) throws Exception {log.info("FistRun start===============");orderService.createOrder();} }

總結

以上是生活随笔為你收集整理的spring boot 整合 谷歌guava的EventBus 实现单机版的消息发布订阅的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。