RabbitMQ封装实战
先說下背景:上周開始給項目添加曾經沒有過的消息中間件。雖然說,一路到頭非常容易,直接google,萬事不愁~可是生活遠不僅是眼前的“茍且”。首先是想使用其他項目使用過的一套對mq封裝的框架,融合進來。雖然折騰了上周六周日兩天,總算吧老框架融進項目中了,可是周一來公司和大數據哥們兒一聯調發現,收不到數據!所以沒辦法,當場使用原生那一套擼了個版本出來~可是,可是,可是,俗話說得好:生命在于折騰!在上周末融合老框架的時候,我把源碼讀了遍,發現了很多很好的封裝思想,Ok,這周末總算閑了下來,我就運用這個思想,封裝一個輕量級的唄,說干就干!
主要思想
說到封裝,我想,應該主要是要盡可能減小用戶使用的復雜度,盡量少的進行配置,書寫,甚至能盡量少的引入第三發或是原生類庫。所以在這種想法之下,這套框架的精髓主要在以下幾點:
- 使用注解,減少用戶配置
- 將不同的生產者消費者的初始化方式統一
- 初次注冊生產者或者消費者的時候,進行隊列的自動注冊
- 再統一的初始化方式中,使用動態代理的方式,代理到具體的生產者或是消費者的發送接收方法
在這種模式下,我們不用過多的配置,直接建立一個接口,接口上面使用注解聲明隊列的名稱,然后使用同一的Bean進行初始化,就齊活了!
統一初始化Bean的實現
不說啥,直接上代碼:
public class RabbitMQProducerFactoryBean<T> extends RabbitMQProducerInterceptor implements FactoryBean<T> {private Logger logger = LoggerFactory.getLogger(getClass());private Class<?> serviceInterface;@Autowiredprivate ConnectionFactory rabbitConnectionFactory;@Value("${mq.queue.durable}")private String durable;@Value("${mq.queue.exclusive}")private String exclusive;@Value("${mq.queue.autoDelete}")private String autoDelete;@SuppressWarnings("unchecked")/**這個方法很特殊,繼承自FactoryBean,就是說管理權歸屬IoC容器。每次注冊一個隊列的時候,并且注入到具體的service中使用的時候,就會調用這個getObject方法。所以,對于使用本類初始化的bean,其類型并非本類,而是本類的屬性serviceInterface類型,因為最終getObject的結果是返回了一個動態代理,代理到了serviceInterface。**/@Overridepublic T getObject() throws Exception {//初始化if (getQueueName() != null) {logger.info("指定的目標列隊名[{}],覆蓋接口定義。", getQueueName());} else {RPCQueueName name = serviceInterface.getAnnotation(RPCQueueName.class);if (name == null)throw new IllegalArgumentException("接口" + serviceInterface.getCanonicalName() + "沒有指定@RPCQueueName");setQueueName(name.value());}//創建隊列declareQueue();logger.info("建立MQ客戶端代理接口[{}],目標隊列[{}]。", serviceInterface.getCanonicalName(), getQueueName());return (T) Proxy.newProxyInstance(getClass().getClassLoader(), new Class<?>[]{serviceInterface}, this);//動態代理到目標接口}private void declareQueue() {Connection connection = rabbitConnectionFactory.createConnection();Channel channel = connection.createChannel(true);try {channel.queueDeclare(getQueueName(), Boolean.valueOf(durable), Boolean.valueOf(exclusive), Boolean.valueOf(autoDelete), null);logger.info("注冊隊列成功!");} catch (IOException e) {logger.warn("隊列注冊失敗", e);}} ......}public class RabbitMQProducerInterceptor implements InvocationHandler {private Logger logger = LoggerFactory.getLogger(getClass());private String queueName;@Autowiredprivate AmqpTemplate amqpTemplate;@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {Object sendObj;Class<?>[] parameterTypes = method.getParameterTypes();String methodName = method.getName();boolean isSendOneJson = Objects.nonNull(args) && args.length == 1 && (args[0] instanceof String);if (isSendOneJson) {sendObj = args[0];logger.info("發送單一json字符串消息:{}", (String) sendObj);} else {sendObj = new RemoteInvocation(methodName, parameterTypes, args);logger.info("發送封裝消息體:{}", JSONSerializeUtil.jsonSerializerNoType(sendObj));}logger.info("發送異步消息到[{}],方法名為[{}]", queueName, method.getName());//異步方式使用,同時要告知服務端不要發送響應amqpTemplate.convertAndSend(queueName, sendObj);return null;}...... }下面是核心的配置文件
<?xml version="1.0" encoding="UTF-8" standalone="no"?> <beans default-lazy-init="false"xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:p="http://www.springframework.org/schema/p" xmlns:rabbit="http://www.springframework.org/schema/rabbit"xmlns:task="http://www.springframework.org/schema/task"xsi:schemaLocation="http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsdhttp://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.1.xsdhttp://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"><rabbit:connection-factory id="rabbitConnectionFactory"host="${mq.host}" port="${mq.port}" virtual-host="${mq.vhost}"username="${mq.username}" password="${mq.password}" /><!-- 供自動創建隊列 --><rabbit:admin connection-factory="rabbitConnectionFactory" /><rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory"/><!-- 創建生產者 --><bean id="sendMsg" class="com.example.demo.RabbitMQProducerFactoryBean"><property name="serviceInterface" value="com.example.demo.ISendMsg" /></bean></beans>說明:每次要使用mq,直接導入這個基本配置,和基礎jar包即可。對于配置文件中的生產者聲明,已經直接簡化到三行,這一部分可以單獨創建一個類似于producer-config.xml專門的配置文件。
附屬類
這里主要就是涉及一個注解類:
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) public @interface RPCQueueName {String value(); }說明:主要用于隊列名稱的聲明。可以拓展的再建立其他的注解類,并在RabbitMQProducerFactoryBean中進行具體的邏輯實現。對于未來功能添加,起到了非常好的解耦效果。
具體的接口:
@RPCQueueName("test.demo.ISendMsg") public interface ISendMsg {void sendMsg(String msg); }說明:這樣,就聲明了個隊列名叫test.demo.ISendMsg的生產者,每次講IsendMsg注入到要發送消息的Service里面,直接調用sendMsg即可向注解聲明的隊列發送消息了。
恩,開源
寫了個springboot的小demo:
github地址
接下來我會更新消費者的封裝,今天先放一放,出去動動。。哈哈
轉載于:https://www.cnblogs.com/1024Community/p/8688753.html
總結
以上是生活随笔為你收集整理的RabbitMQ封装实战的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Vue-router路由基础总结(一)
- 下一篇: Play! Framework 系列(四