spring boot 整合 谷歌guava的EventBus 實現單機版的消息發布訂閱
大型分布式系統,直接用mq解耦,那么單機系統怎么辦,可以考慮用EventBus
用EventBus的好處也是異步解耦,和mq的類似,可以勉強認為是單機版的mq
先解釋,后附示例代碼(jdk1.8)
EventBus 處理的事情類似觀察者模式,基于事件驅動,觀察者們監聽自己感興趣的特定事件,進行相應的處理。
流程是這樣的:
1、先自定義一個注解@EventBusListener,所有異步實現的類都加這個注解,代表自己是一個監聽器
2、再定義一個實現類,就用這個@EventBusListener注解,代表自己是監聽器
類中方法用@Subscribe注解,方法入參類和后面寫的發送異步或同步消息的類保持一致即可,這個類也叫消息體,是這個消息的注意內容,比如訂單號id,消息體最好是自己定義的不同的類
3、核心的類是EventBusCenter,負責將帶有@EventBusListener注解的bean注冊成一個監聽器,并提供發送異步或同步消息的方法入口
4、業務service中注入EventBusCenter,并調用發送異步或同步消息的方法,方法入參是消息體,不同的消息體對應不同的發布訂閱,訂閱的時候也是根據這個消息體來區分不同類型的消息的
簡單的源碼解讀
核心類是:com.google.common.eventbus.SubscriberRegistry
里面有個靜態變量:subscriberMethodsCache
類在初始化的時候,會給這個靜態變量復制,拿到所有的@Subscribe注釋的方法和對應類的映射關系
然后我們代碼中會注冊監聽器,就把方法和監聽器對應上了
至此,方法的入參類型和方法和類的對應關系就知道了,key就是方法參數類型,value就是一組對應的方法和類
用的時候,調用asyncEventBus.post(event);
入參event就是真正的消息體類型,然后會根據這個類型去上面找對應的方法和類,然后獲取bean,調用
所以說,這個發布訂閱,就是通過消息體類型去唯一識別的方法的。
對比mq的主題概念,這個消息體類型,就可以看成是主題的意思。
下面附示例代碼
模擬OrderService中訂單創建后,發送訂單創建的異步事件,再發送訂單修改的異步事件,再發送訂單修改的同步事件
訂閱端是OrderChangeListener和OrderChangeListener02兩個訂閱
OrderChangeListener訂閱了訂單創建和訂單修改事件
OrderChangeListener02訂閱了訂單創建事件
執行流程是:
啟動springboot,注冊bean的時候遇到EventBusCenter,開始注冊OrderChangeListener和OrderChangeListener02為監聽器
啟動springboot后立即執行FistRun類,里面直接調用訂單創建方法,發布訂單創建和修改的消息
OrderChangeListener和OrderChangeListener02消費消息,完
<dependency><groupId>org
.projectlombok
</groupId
><artifactId>lombok
</artifactId
><optional>true</optional
>
</dependency
>
<dependency><groupId>com
.google
.guava
</groupId
><artifactId>guava
</artifactId
><version>22.0</version
>
</dependency
>
@Target({ElementType
.TYPE
})
@Retention(RetentionPolicy
.RUNTIME
)
public @
interface EventBusListener {
}
@Component
public class EventBusCenter {private EventBus syncEventBus
= new EventBus();private AsyncEventBus asyncEventBus
= new AsyncEventBus(Executors
.newCachedThreadPool());public void postSync(Object event
) {syncEventBus
.post(event
);}public void postAsync(Object event
) {asyncEventBus
.post(event
);}@PostConstructpublic void init() {List
<Object> listeners
= SpringContextUtils
.getBeansWithAnnotation(EventBusListener
.class);for (Object listener
: listeners
) {asyncEventBus
.register(listener
);syncEventBus
.register(listener
);}}
}
@Component
public class SpringContextUtils implements BeanFactoryPostProcessor {private static ConfigurableListableBeanFactory beanFactory
;@Overridepublic void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory
) throws BeansException
{SpringContextUtils
.beanFactory
= configurableListableBeanFactory
;}public static <T> T
getBean(String name
) throws BeansException
{return (T
) beanFactory
.getBean(name
);}public static <T> T
getBean(Class
<T> clz
) throws BeansException
{T result
= beanFactory
.getBean(clz
);return result
;}public static <T> List
<T> getBeansOfType(Class
<T> type
) {return beanFactory
.getBeansOfType(type
).entrySet().stream().map(entry
-> entry
.getValue()).collect(Collectors
.toList());}public static List
<Object> getBeansWithAnnotation(Class
<? extends Annotation> annotationType
) {Map
<String, Object> beansWithAnnotation
= beanFactory
.getBeansWithAnnotation(annotationType
);return beansWithAnnotation
.entrySet().stream().map(entry
-> entry
.getValue()).collect(Collectors
.toList());
}
}
@Data
public class OrderCreatedEvent {private long orderId
;private long userId
;public OrderCreatedEvent(long orderId
, long userId
) {this.setOrderId(orderId
);this.setUserId(userId
);}
}
@Data
public class OrderChangeEvent {private long orderId
;private long userId
;public OrderChangeEvent(long orderId
, long userId
) {this.setOrderId(orderId
);this.setUserId(userId
);}
}
@Component
@EventBusListener
@Slf4j
public class OrderChangeListener {@Subscribepublic void created(OrderCreatedEvent event
) throws InterruptedException
{long orderId
= event
.getOrderId();Thread
.sleep(300);log
.info("訂單創建監聽,發送短信,orderId=" + orderId
);}@Subscribepublic void change(OrderChangeEvent event
) throws InterruptedException
{long orderId
= event
.getOrderId();Thread
.sleep(200);log
.info("訂單修改監聽,物流變化,orderId=" + orderId
);}
}
@Component
@EventBusListener
@Slf4j
public class OrderChangeListener02 {@Subscribepublic void created(OrderCreatedEvent event
) {long orderId
= event
.getOrderId();long userId
= event
.getUserId();log
.info("訂單創建監聽02,修改庫存,orderId=" + orderId
);}}
@Service
@Slf4j
public class OrderService {@Autowiredprivate EventBusCenter eventBusCenter
;public void createOrder() throws InterruptedException
{eventBusCenter
.postAsync(new OrderCreatedEvent(1L
, 1L
));System
.out
.println("發送異步事件,訂單創建");eventBusCenter
.postAsync(new OrderChangeEvent(1L
, 1L
));System
.out
.println("發送異步事件,訂單修改");Thread
.sleep(500);try {System
.out
.println("發送同步事件,訂單修改,開始");eventBusCenter
.postSync(new OrderChangeEvent(1L
, 1L
));System
.out
.println("發送同步事件,訂單修改,結束");} catch (Exception e
) {log
.error("發送同步事件,抓異常");}}
}
@Component
@Slf4j
@Order(1)
public class FistRun implements CommandLineRunner {@Autowiredprivate OrderService orderService
;@Overridepublic void run(String
... args
) throws Exception
{log
.info("FistRun start===============");orderService
.createOrder();}
}
總結
以上是生活随笔為你收集整理的spring boot 整合 谷歌guava的EventBus 实现单机版的消息发布订阅的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。