mqtt 发送消息过多_阿里云MQTT服务端注解式消息处理分发与同步调用实践小结
一、前言
前段時(shí)間公司預(yù)研了設(shè)備app端與服務(wù)端的交互方案,出于多方面考量最終選用了阿里云的微服務(wù)隊(duì)列MQTT方案,基于此方案,本人主要實(shí)踐有:
1. 封裝了RocketMQ實(shí)現(xiàn)MQTT訂閱與發(fā)布的實(shí)現(xiàn)細(xì)節(jié);
2. 實(shí)現(xiàn)了注解式分發(fā)處理,可利用如MqttController, MqttTopicMapping等相關(guān)自定義注解的方式來(lái)統(tǒng)一訂閱MQTT的Topic以及消息處理的分發(fā);
3. 使用了一套請(qǐng)求和響應(yīng)的同步機(jī)制來(lái)達(dá)到PUB/SUB異步通信的偽同步調(diào)用。
Github 地址點(diǎn)此鏈接
二、RocketMQ的接入細(xì)節(jié)
1. 為什么服務(wù)端要使用RocketMQ接入
阿里云微消息隊(duì)列MQTT是在以消息隊(duì)列 RocketMQ 為核心存儲(chǔ)的基礎(chǔ)上,實(shí)現(xiàn)更適合移動(dòng)互聯(lián)網(wǎng)和IoT領(lǐng)域的無(wú)狀態(tài)網(wǎng)關(guān),兩者之間具備天然的數(shù)據(jù)互通性。MQTT實(shí)例本身并不提供消息數(shù)據(jù)持久化功能,消息數(shù)據(jù)持久化需要搭配后端的消息存儲(chǔ)實(shí)例來(lái)使用。因此現(xiàn)階段每一個(gè)阿里云MQTT實(shí)例都必須配套一個(gè)消息存儲(chǔ)實(shí)例,即RocketMQ實(shí)例來(lái)提供消息數(shù)據(jù)持久化功能,因此他們之間可以說(shuō)是消息互通的,即可用RocketMQ訂閱的方式來(lái)消費(fèi)用MQTT協(xié)議發(fā)布的消息,同理也可用 MQTT協(xié)議訂閱的方式來(lái)消費(fèi)RocketMQ發(fā)布的消息。
幫助文檔也給出了以下兩種產(chǎn)品的區(qū)別說(shuō)明:
微消息隊(duì)列MQTT基于MQTT協(xié)議實(shí)現(xiàn),單個(gè)客戶(hù)端的處理能力較弱。因此,微消息隊(duì)列MQTT適用于擁有大量在線客戶(hù)端(很多企業(yè)設(shè)備端過(guò)萬(wàn),甚至上百萬(wàn)),但每個(gè)客戶(hù)端消息較少的場(chǎng)景。相比之下,消息隊(duì)列RocketMQ是面向服務(wù)端的消息引擎,主要用于服務(wù)組件之間的解耦、異步通知、削峰填谷等,服務(wù)器規(guī)模較小(極少企業(yè)服務(wù)器規(guī)模過(guò)萬(wàn)),但需要大量的消息處理,吞吐量要求高。因此,消息隊(duì)列RocketMQ適用于服務(wù)端進(jìn)行大批量的數(shù)據(jù)處理和分析的場(chǎng)景。
基于以上區(qū)別,官方也推薦在移動(dòng)端設(shè)備上使用微消息隊(duì)列MQTT,而在服務(wù)端應(yīng)用中則使用消息隊(duì)列RocketMQ,具體則可以通過(guò) MQTT SDK 以公網(wǎng)訪問(wèn)方式來(lái)實(shí)現(xiàn)設(shè)備間的通信,通過(guò)MQ SDK以?xún)?nèi)網(wǎng)方式來(lái)實(shí)現(xiàn)服務(wù)端通信。
2. RocketMQ如何對(duì)接
RocketMQ與MQTT在消息結(jié)構(gòu)和一些屬性字段上都有一定的映射關(guān)系,具體內(nèi)容(摘自幫助文檔)如下。
微消息隊(duì)列MQTT使用MQTT協(xié)議接入,而消息隊(duì)列RocketMQ使用的是私有協(xié)議,因此,兩者的關(guān)鍵概念存在如下映射關(guān)系。
MQ與MQTT消息結(jié)構(gòu)映射關(guān)系如上圖所示,MQTT協(xié)議中Topic是多級(jí)結(jié)構(gòu),而消息隊(duì)列RocketMQ的Topic 僅有一級(jí),因此,MQTT中的一級(jí)Topic映射到消息隊(duì)列RocketMQ的Topic,而二級(jí)和三級(jí)Topic則映射到消息隊(duì)列RocketMQ的消息屬性(Properties)中。
消息隊(duì)列 RocketMQ 協(xié)議中的消息(Message)可以擁有自定義屬性(Properties),而MQTT協(xié)議目前的版本不支持屬性,但為了方便對(duì)MQTT協(xié)議中的Header信息和設(shè)備信息進(jìn)行溯源,MQTT的部分信息將被映射到 RocketMQ的消息屬性中,方便使用消息隊(duì)列RocketMQ的SDK接入的用戶(hù)獲取。
目前,微消息隊(duì)列MQTT和消息隊(duì)列RocketMQ支持的屬性字段映射表如下圖所示。使用消息隊(duì)列RocketMQ的SDK的應(yīng)用和使用消息隊(duì)列MQTT的SDK的應(yīng)用進(jìn)行交互時(shí),可以通過(guò)讀寫(xiě)這些屬性字段來(lái)達(dá)到信息獲取或者設(shè)置的目的。
屬性字段映射關(guān)系3. RocketMQ對(duì)MQTT消息訂閱的實(shí)現(xiàn)
Properties properties = new Properties(); // 在控制臺(tái)創(chuàng)建的Group ID properties.put(PropertyKeyConst.GROUP_ID, "xxx"); // 阿里云AccessKey properties.put(PropertyKeyConst.AccessKey, "xxx"); // 阿里云SecretKey properties.put(PropertyKeyConst.SecretKey, "xxx"); // 在RocketMQ控制臺(tái)的實(shí)例基本信息中可查看到的TCP協(xié)議接入點(diǎn) properties.put(PropertyKeyConst.NAMESRV_ADDR,"xxx"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("topic", "*", new MessageListener() { //訂閱全部 Tagpublic Action consume(Message message, ConsumeContext context) {//獲得mqtt消息中的第一級(jí)topicString mqttFirstTopic = message.getTopic();//獲得mqtt消息中除去1級(jí)后的所有topicString mqttSecondTopic = message.getUserProperties(PropertyKeyConst.MqttSecondTopic);//獲得mqtt消息中的messageIdString messageId = message.getUserProperties("UNIQ_KEY");//獲得mqtt消息中的消息體String messageBody = new String(message.getBody());//...return Action.CommitMessage;}}); consumer.start();實(shí)現(xiàn)主要注意2點(diǎn):
- 這邊的 MQ 只需要訂閱 MQTT 的一級(jí) Topic 。如果 MQTT 會(huì)發(fā)布2個(gè) Topic 的消息 robot/alarm 和 robot/task/test ,則在此處只需要訂閱 robot 這個(gè)第一級(jí)Topic即可。
- MQTT 的一些屬性字段可以從 RocketMQ 消息 Message 的 userProperties 字段中獲得,比如上面代碼中通過(guò) message.getUserProperties(PropertyKeyConst.MqttSecondTopic); 可以獲得 MQTT 中的 除去1級(jí)后的所有 Topic 字符串,如上述舉例的2個(gè) Topic 可分別獲得 /alarm 和 /task/test。 具體能夠獲得哪些字段可以參考上一節(jié)的屬性字段映射表,也可自行查看 PropertyKeyConst 類(lèi)中定義的一些字符串常量來(lái)大概知曉。
使用阿里云MQTT控制臺(tái)發(fā)送一個(gè)MQTT消息,如圖所示:
MQTT控制臺(tái)發(fā)送消在程序中加一個(gè)斷點(diǎn)獲得當(dāng)前Message對(duì)象的字段如下:
Message消息體上圖可看到userProperties中的一些值,比如qoslevel,mqttSecondTopic等,這些字段都可以在PropertyKeyConst 類(lèi)中找到對(duì)應(yīng)的字符串常量,但是UNIQ_KEY,cleansessionflag等PropertyKeyConst 類(lèi)中并沒(méi)有對(duì)應(yīng)的字符串常量,這邊暫時(shí)就message.getUserProperties("UNIQ_KEY")這樣使用自定義字符量來(lái)獲得。
4. RocketMQ對(duì)MQTT消息發(fā)布的實(shí)現(xiàn)
Properties properties = new Properties(); // 在控制臺(tái)創(chuàng)建的Group ID properties.put(PropertyKeyConst.GROUP_ID, "xxx"); // 阿里云AccessKey properties.put(PropertyKeyConst.AccessKey, "xxx"); // 阿里云SecretKey properties.put(PropertyKeyConst.SecretKey, "xxx"); // 在RocketMQ控制臺(tái)的實(shí)例基本信息中可查看到的TCP協(xié)議接入點(diǎn) properties.put(PropertyKeyConst.NAMESRV_ADDR,"xxx"); //設(shè)置發(fā)送超時(shí)時(shí)間,單位毫秒 properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000"); Producer producer = ONSFactory.createProducer(properties); // 在發(fā)送消息前,必須調(diào)用 start 方法來(lái)啟動(dòng) Producer,只需調(diào)用一次即可 producer.start();//發(fā)送一個(gè)mqtt消息 String parentTopic = topic.substring(0, topic.indexOf("/")); String subTopic = topic.substring(topic.indexOf("/")); Message msg = new Message(parentTopic, "", message.getBytes()); msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, subTopic); msg.putUserProperties(PropertyKeyConst.MqttQOS, qos); msg.putUserProperties("cleansessionflag", "" + cleanSessionFlag); SendResult result = producer.send(msg);- 該代碼僅實(shí)現(xiàn)了普通消息的同步發(fā)送,若需發(fā)送順序消息、延時(shí)消息等,可參考SDK幫助文檔創(chuàng)建不同的Producer實(shí)現(xiàn)即可。
- 上述代碼將需要發(fā)送的MQTT全量Topic拆分成1級(jí)與2級(jí),1級(jí)Topic設(shè)置為MQ中的Topic參數(shù),2級(jí)Topic字符串則設(shè)為userProperties中PropertyKeyConst.MqttSecondTopic的,其他屬相如qoslevel和cleansessionflag等也是通過(guò)userProperties的相關(guān)字段來(lái)設(shè)置。
三、注解式分發(fā)處理的實(shí)現(xiàn)
1. 前置知識(shí)點(diǎn)
1.1 BeanPostProcessor
BeanPostProcessor是Spring IOC容器給我們提供的一個(gè)擴(kuò)展接口。BeanPostProcessor接口定義了兩個(gè)方法:
public interface BeanPostProcessor {// 前置處理Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException;// 后置處理Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException; }Spring中Bean的整個(gè)生命周期如圖所示:
Bean生命周期postProcessBeforeInitialization()方法與postProcessAfterInitialization()分別對(duì)應(yīng)圖中前置處理和后置處理兩個(gè)步驟將執(zhí)行的方法。這兩個(gè)方法中都傳入了bean對(duì)象實(shí)例的引用,為擴(kuò)展容器的對(duì)象實(shí)例化過(guò)程提供了很大便利,在這兒幾乎可以對(duì)傳入的實(shí)例執(zhí)行任何操作。
可以看到,Spring容器通過(guò)BeanPostProcessor給了我們一個(gè)機(jī)會(huì)對(duì)Spring管理的bean進(jìn)行再加工,注解、AOP等功能的實(shí)現(xiàn)均大量使用了BeanPostProcessor。通過(guò)實(shí)現(xiàn)BeanPostProcessor的接口,在其中處理方法中判斷bean對(duì)象上是否有自定義的一些注解,如果有,則可以對(duì)這個(gè)bean實(shí)例繼續(xù)進(jìn)行其他操作,這也是本例中使用該接口要實(shí)現(xiàn)的主要目的。
1.2 ApplicationListener
在IOC的容器的啟動(dòng)過(guò)程,當(dāng)所有的bean都已經(jīng)處理完成之后,spring ioc容器會(huì)有一個(gè)發(fā)布事件的動(dòng)作。從 AbstractApplicationContext 的源碼中就可以看出:
protected void finishRefresh() {// Initialize lifecycle processor for this context.initLifecycleProcessor();// Propagate refresh to lifecycle processor first.getLifecycleProcessor().onRefresh();// Publish the final event.publishEvent(new ContextRefreshedEvent(this));// Participate in LiveBeansView MBean, if active.LiveBeansView.registerApplicationContext(this); }因此當(dāng)所有的bean都初始化完成并被成功裝載后會(huì)觸發(fā)ContextRefreshedEvent事件。
ApplicationListener是spring中用來(lái)監(jiān)聽(tīng)事件(ApplicationEvent)的傳遞,每個(gè)實(shí)現(xiàn)了ApplicationListener接口的bean都會(huì)收到ApplicationEvent對(duì)象的通知,每個(gè)ApplicationListener可根據(jù)事件類(lèi)型只接收處理自己感興趣的事件,因此利用實(shí)現(xiàn)ApplicationListener的接口可以收到監(jiān)聽(tīng)ContextRefreshedEvent動(dòng)作,然后可以寫(xiě)自己的一些處理邏輯,比如初始化環(huán)境,準(zhǔn)備測(cè)試數(shù)據(jù)、加載一些數(shù)據(jù)到內(nèi)存等等。用法如下:
@Component public class TestApplicationListener implements ApplicationListener<ContextRefreshedEvent>{@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {//todo:一些處理邏輯}}1.3 反射
Java反射機(jī)制是在運(yùn)行狀態(tài)中,對(duì)于任意一個(gè)類(lèi),都能夠知道這個(gè)類(lèi)的所有屬性和方法;對(duì)于任意一個(gè)對(duì)象,都能夠調(diào)用它的任意一個(gè)方法和屬性;這種動(dòng)態(tài)獲取的信息以及動(dòng)態(tài)調(diào)用對(duì)象的方法的功能稱(chēng)為java語(yǔ)言的反射機(jī)制。
在Java中,Class類(lèi)與java.lang.reflect類(lèi)庫(kù)一起對(duì)反射技術(shù)進(jìn)行了全力的支持。獲取Class對(duì)象有三種方式:
- 通過(guò)實(shí)例對(duì)象獲得:Class<?> class = object.getClass();
- 通過(guò)類(lèi)名獲得:Class<?> class = ClassName.class;
- 通過(guò)類(lèi)名全路徑獲得:Class<?> class = Class.forName("類(lèi)名全路徑");
反射包中常用的類(lèi)主要有
- Constructor,表示的類(lèi)的構(gòu)造方法信息,利用它可以在運(yùn)行時(shí)動(dòng)態(tài)創(chuàng)建對(duì)象
- Field,表示類(lèi)的成員變量信息,通過(guò)它可以在運(yùn)行時(shí)動(dòng)態(tài)修改成員變量的屬性值(包含private)
- Method,表示類(lèi)的成員方法信息,通過(guò)它可以動(dòng)態(tài)調(diào)用對(duì)象的方法(包含private)
下面說(shuō)明一下本例中用到的一些反射api:
//獲得Class對(duì)象 Class clazz= obj.getClass();//判斷注解B是否在此A上 boolean isAnnotation= A.isAnnotationPresent(B.class);//獲得該clazz上的注解對(duì)象 B b=clazz.getAnnotation(B.class));//獲得本類(lèi)以及父類(lèi)或者父接口中所有的公共方法 Method[] methods=clazz.getMethods();//獲取方法上的所有參數(shù) Parameter[] parameters = method.getParameters();//執(zhí)行某對(duì)象的方法,owner為該對(duì)象,paramValues為入?yún)?shù)組,method為Method對(duì)象 method.invoke(owner, paramValues);2. 整體實(shí)現(xiàn)思路
3. 實(shí)現(xiàn)細(xì)節(jié)
3.1 自定義注解
@MqttController在類(lèi)上使用,其中parentTopic值為需要監(jiān)聽(tīng)的1級(jí)Topic,其中使用@Component可以使其注解的類(lèi)實(shí)例化為為Bean對(duì)象放入到Spring容器中,基于此才能在利用BeanPostProcessor中獲得其對(duì)象。
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Component public @interface MqttController {/*** 監(jiān)聽(tīng)的父topic** @return 監(jiān)聽(tīng)的父topic*/String parentTopic(); }@MqttTopicMapping在方法上使用,其中subTopic的值為需要訂閱的子Topic,與1級(jí)Topic共同組成MQTT的Topic
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface MqttTopicMapping {/*** 訂閱的子topic,默認(rèn)可以只訂閱1級(jí)topic** @return 訂閱的子topic*/String subTopic() default ""; }MqttMessageBody在方法參數(shù)上使用,使得參數(shù)自動(dòng)獲得messageBody對(duì)象
@Target(ElementType.PARAMETER) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface MqttMessageBody {}MqttMessageId在方法參數(shù)上使用,使得參數(shù)自動(dòng)獲得messageId的值
@Target(ElementType.PARAMETER) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface MqttMessageId {}自定義注解的使用示例如下:
@Slf4j @MqttController(parentTopic = "robot1") public class MqttRobot1 {@MqttTopicMappingpublic void dealFirstTopic() {log.info("MqttRobot1.dealAlarm 收到消息啦,只處理了一級(jí)topic");}@MqttTopicMapping(subTopic = "alarm")public void dealAlarm(@MqttMessageId String messageId, @MqttMessageBody AlarmVo alarmVo) {log.info("MqttRobot1.dealAlarm 收到消息啦");log.info("messageId:{}", messageId);log.info("alarmVo:{}", alarmVo);}@MqttTopicMapping(subTopic = "task")public void dealTask() {log.info("MqttRobot1.dealTask 收到消息啦");} }3.2 提取Method和Bean對(duì)象
在MqttHandlerFactory類(lèi)中定義以下幾個(gè)容器,分別存儲(chǔ)mqtt處理類(lèi)的bean,mqtt處理方法以及parentTopic列表
/*** 用于存儲(chǔ)mqtt處理類(lèi)的bean,key為parentTopic/subTopic*/ private static Map<String, Object> mqttControllers = new HashMap<>();/*** 用于存儲(chǔ)mqtt處理方法,key為parentTopic/subTopic*/ private static Map<String, Method> mqttHandlers = new HashMap<>();/*** 存儲(chǔ)parentTopic列表*/ private static Set<String> parentTopicSet = new HashSet<>();利用BeanPostProcessor接口來(lái)處理實(shí)現(xiàn)了自定義注解的Bean對(duì)象。
具體代碼及注釋如下:
@Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {Class beanClazz = bean.getClass();//使用的MqttController注解的bean對(duì)象if (beanClazz.isAnnotationPresent(MqttController.class)) {//獲得MqttController的注解值//存儲(chǔ)parentTopic列表String parentTopic = ((MqttController) beanClazz.getAnnotation(MqttController.class)).parentTopic();MqttHandlerFactory.getParentTopicSet().add(parentTopic);for (Method method : beanClazz.getMethods()) {//獲得MqttTopicMapping的Method對(duì)象if (method.isAnnotationPresent(MqttTopicMapping.class)) {//獲得MqttTopicMapping的注解值String subTopic = method.getAnnotation(MqttTopicMapping.class).subTopic();String realTopic;if ("".equals(subTopic)) {realTopic = parentTopic + "/";} else {realTopic = (parentTopic + "/" + subTopic + "/").replaceAll("/+", "/");}if (null != MqttHandlerFactory.getMqttHandler(realTopic)) {throw new MqttBeansException(bean.getClass().getSimpleName() + " topic 重復(fù)定義,值為" + realTopic);}//存儲(chǔ)mqtt處理類(lèi)的beanMqttHandlerFactory.registerMqttHandler(realTopic, method);//存儲(chǔ)mqtt處理方法MqttHandlerFactory.registerMqttController(realTopic, bean);log.info("MqttHandler Mapped "{}" onto {}", realTopic, method.toString());}}}return bean; }3.3 mq的消息訂閱與處理
實(shí)現(xiàn)ApplicationListener接口在所有Bean對(duì)象加載完之后根據(jù)前面記錄的parentTopicSet作為所有需要訂閱的1級(jí)Topic開(kāi)始訂閱。
在訂閱消息處理中從message信息可以獲得其對(duì)應(yīng)的1級(jí)Topic與2級(jí)Topic,將其處理成MQTT的全Toic并從前面記錄的mqttHandlers,mqttControllers中獲得對(duì)應(yīng)的Method對(duì)象及Bean對(duì)象,從message信息中提取對(duì)應(yīng)的messageId級(jí)messageBody并設(shè)置為使用了@MqttMessageBody和@MqttMessageId的注解的參數(shù)中,利用反射method.invoke(MqttHandlerFactory.getMqttController(mqttTopic), paramValues);來(lái)實(shí)現(xiàn)方法的調(diào)用。
具體代碼及注釋如下:
@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {//...//設(shè)置mqConsumer實(shí)例化所需propertiesConsumer mqConsumer = ONSFactory.createConsumer(properties);Set<String> parentTopicSet = MqttHandlerFactory.getParentTopicSet();if (parentTopicSet.size() == 0) {log.warn("當(dāng)前應(yīng)用并未有任何topic訂閱");}//根據(jù)parentTopic和subTopic訂閱parentTopicSet.forEach(parentTopic -> {log.info("Add a new rocketMq subscription,topic:{}", parentTopic);mqConsumer.subscribe(parentTopic, "*", (message, context) -> {log.debug("MqReceive Message: " + message);//獲得topicString mqttFirstTopic = message.getTopic();String mqttSecondTopic = message.getUserProperties(PropertyKeyConst.MqttSecondTopic);if (null == mqttSecondTopic) {//只有1級(jí)topic的情況mqttSecondTopic = "/";}if (!"/".equals(mqttSecondTopic.substring(mqttSecondTopic.length() - 1))) {mqttSecondTopic += "/";}String mqttTopic = mqttFirstTopic + mqttSecondTopic;Method method = MqttHandlerFactory.getMqttHandler(mqttTopic);if (null == method) {log.warn("當(dāng)前沒(méi)有處理該topic的handler,topic:{}", mqttTopic);return Action.CommitMessage;} else {//獲得mqtt的一些數(shù)據(jù)String messageId = message.getUserProperties("UNIQ_KEY");String messageBody = new String(message.getBody());//處理入?yún)arameter[] parameters = method.getParameters();Object[] paramValues = new Object[parameters.length];for (int i = 0; i < parameters.length; i++) {if (parameters[i].isAnnotationPresent(MqttMessageId.class)) {//@MqttMessageId注解的參數(shù)paramValues[i] = messageId;} else if (parameters[i].isAnnotationPresent(MqttMessageBody.class)) {//@MqttMessageBody注解的參數(shù)Class parameterClazz = parameters[i].getType();try {paramValues[i] = JSONObject.parseObject(messageBody, parameterClazz);} catch (Exception e) {log.error("mqttMessageBody 格式錯(cuò)誤,messageId:{},messageBody:{}", messageId, messageBody);// return Action.ReconsumeLater;return Action.CommitMessage;}} else {//自己定義的一些參數(shù)就給null把paramValues[i] = null;}}try {method.invoke(MqttHandlerFactory.getMqttController(mqttTopic), paramValues);} catch (Exception e) {log.error("處理失敗啦");}}return Action.CommitMessage;});});mqConsumer.start();log.info("MqConsumer Started");}四、MQTT同步調(diào)用的實(shí)現(xiàn)
MQTT協(xié)議是基于PUB/SUB的異步通信模式,不適用于服務(wù)端同步控制設(shè)備端返回結(jié)果的場(chǎng)景。通過(guò)制定一套請(qǐng)求和響應(yīng)的同步機(jī)制,可以無(wú)需改動(dòng)MQTT協(xié)議來(lái)達(dá)到同步調(diào)用的目的。
1. 整體實(shí)現(xiàn)思路
MQTT的同步調(diào)用實(shí)際上是使用了兩個(gè)異步調(diào)用完成的,即生產(chǎn)者調(diào)用消費(fèi)者的同時(shí),自己也作為消費(fèi)者等待某一隊(duì)列的返回消息,消費(fèi)者接受到生產(chǎn)者的消息同時(shí),也作為消息發(fā)送者發(fā)送一消息給生產(chǎn)者。
具體同步調(diào)用機(jī)制示意如下:
同步調(diào)用示意圖首先服務(wù)端和設(shè)備端服務(wù)端都訂閱了相關(guān)的Topic,服務(wù)端發(fā)起同步調(diào)用即發(fā)布一個(gè)示意需同步返回的message到指定request Topic,設(shè)備端接收到該message后處理完業(yè)務(wù)邏輯則會(huì)將調(diào)用結(jié)果發(fā)布一個(gè)返回message到request消息體中攜帶的response Topic中,最后服務(wù)端接收到設(shè)備端返回的message可以從消息體中獲得其調(diào)用結(jié)果。
整個(gè)調(diào)用過(guò)程中客戶(hù)端需要做的工作有:
服務(wù)端需要做的工作有:
服務(wù)端處理異步為同步調(diào)用的邏輯借鑒了Dubbo底層將Netty的異步調(diào)用轉(zhuǎn)化成同步的方式,下面在實(shí)現(xiàn)細(xì)節(jié)中會(huì)具體闡述。
2. 實(shí)現(xiàn)細(xì)節(jié)
MQTT同步調(diào)用的代碼如下:
public String publishSync(String topic, String qos, boolean cleanSessionFlag, Object data, int timeout)throws MqttRemoteException {String parentTopic = topic.substring(0, topic.indexOf("/"));String subTopic = topic.substring(topic.indexOf("/"));String mId = UUID.randomUUID().toString().replaceAll("-", "");MqttMessage mqttMessage = new MqttMessage(mId, replyParentTopic + "/" + mId, data);Message msg = new Message(parentTopic, "", JSON.toJSONString(mqttMessage).getBytes());msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, subTopic);msg.putUserProperties(PropertyKeyConst.MqttQOS, qos);msg.putUserProperties("cleansessionflag", "" + cleanSessionFlag);MqttFuture mqttFuture = new MqttFuture(mqttMessage, timeout);try {producer.send(msg);} catch (ONSClientException e) {mqttFuture.cancel();throw e;}return mqttFuture.get(); }前部分就是正常RocketMQ發(fā)布MQTT消息的代碼,但是發(fā)送的消息是自定義的MqttMessage,同時(shí)這邊在調(diào)用producer.send(msg);前先構(gòu)建了一個(gè)MqttFuture對(duì)象然后發(fā)送完消息后使用mqttFuture.get();來(lái)獲得同步調(diào)用的結(jié)果。
MqttMessage定義如下,其中syncFlag表示該消息是否需要同步返回,mId表示該消息的唯一id,用于本地判斷消息返回具體映射哪個(gè)同步調(diào)用的key,replyTopic表示該消息需要返回消息的Topic,data則是具體業(yè)務(wù)數(shù)據(jù)。
@Data public class MqttMessage implements Serializable {private static final long serialVersionUID = 6648680154051903549L;/*** 是否需要同步返回,默認(rèn)為true*/private boolean syncFlag = true;/*** 生成的id,用uuid生成把*/private final String mId;/*** 客戶(hù)端返回消息的Topic*/private final String replyTopic;/*** 發(fā)送數(shù)據(jù)*/private Object data;public MqttMessage(String mId, String replyTopic, Object data) {this.mId = mId;this.replyTopic = replyTopic;this.data = data;} }MqttFuture對(duì)象則是用來(lái)處理同步調(diào)用的邏輯,每一個(gè)MqttFuture對(duì)象都有有一個(gè)mId作為唯一標(biāo)識(shí),發(fā)送的message消息體,調(diào)用返回結(jié)果MqttResponse(包括正常mqtt返回或者超時(shí)等異常返回結(jié)果)和同步調(diào)用超時(shí)時(shí)間,還有一個(gè)鎖及其創(chuàng)建的Condition,用來(lái)處理線程的等待與通知,后面會(huì)對(duì)其具體邏輯進(jìn)行分析。每創(chuàng)建一個(gè)新的MqttFuture對(duì)象都會(huì)將其放入到存儲(chǔ)MqttFuture的Map中,key即為該消息的mId。
@Slf4j @Data public class MqttFuture {public static final int DEFAULT_TIMEOUT = 1000;public static final Map<String, MqttFuture> FUTURES = new ConcurrentHashMap<>();private final Lock lock = new ReentrantLock();private final Condition done = lock.newCondition();/*** 唯一id*/private final String mId;/*** 發(fā)送的message消息體*/private final MqttMessage message;/*** 設(shè)置的同步調(diào)用超時(shí)時(shí)間*/private final int timeout;/*** 等待開(kāi)始時(shí)間*/private final long start = System.currentTimeMillis();/*** 返回結(jié)果*/private volatile MqttResponse response;public MqttFuture(MqttMessage message, int timeout) {this.message = message;this.timeout = timeout;this.mId = message.getMId();FUTURES.put(mId, this);}//...調(diào)用返回結(jié)果MqttResponse定義如下,如果是正常成功返回則返回狀態(tài)是OK且會(huì)有對(duì)應(yīng)的消息體messageResult,如果超時(shí)或者其他異常情況則會(huì)返回對(duì)應(yīng)的錯(cuò)誤消息errorMessage。
@Data public class MqttResponse {/*** ok狀態(tài),正常返回result,否則返回errorMessage*/public static final Integer OK = 20000;/*** 客戶(hù)端超時(shí)未處理*/public static final Integer TIMEOUT = 40001;/*** 服務(wù)端主動(dòng)取消*/public static final Integer CANCEL = 40002;private Integer mStatus = OK;/*** request生成的messageId*/private String mId;/*** 收到的消息體*/private String messageResult;/*** 狀態(tài)不是成功返回的錯(cuò)誤信息*/private String errorMessage;public MqttResponse(String mId) {this.mId = mId;} }在上述MQ同步調(diào)用代碼中,若調(diào)用producer.send(msg);同步發(fā)送mqtt消息失敗的話,則會(huì)調(diào)用mqttFuture.cancel();來(lái)取消該MqttFture對(duì)象,代碼如下,這邊主要是設(shè)置了異常的相應(yīng)response避過(guò)將該MqttFuture對(duì)象從存儲(chǔ)MqttFuture的Map中移除。
@Slf4j @Data public class MqttFuture {//...public void cancel() {MqttResponse errorResult = new MqttResponse(mId);errorResult.setMStatus(MqttResponse.CANCEL);errorResult.setErrorMessage("主動(dòng)請(qǐng)求取消");response = errorResult;FUTURES.remove(mId);}//...若調(diào)用producer.send(msg);同步發(fā)送mqtt消息成功的話則會(huì)調(diào)用mqttFuture.get();來(lái)獲得其同步調(diào)用的結(jié)果,具體代碼如下,首先判斷是否調(diào)用已完成(有響應(yīng)結(jié)果mqttResponse,包括正常獲得返回結(jié)果或者超時(shí)等其他異常的情況),若完成的話則直接返回調(diào)用結(jié)果或者拋出相應(yīng)的異常,若沒(méi)有調(diào)用完成則獲得鎖并循環(huán)判斷是否調(diào)用完成,沒(méi)有的話則調(diào)用done.await(timeout, TimeUnit.MILLISECONDS);來(lái)實(shí)現(xiàn)該線程的超時(shí)等待直至其他線程調(diào)用該Condition的signal方法將其喚醒。
@Slf4j @Data public class MqttFuture {//...public String get() throws MqttRemoteException {return this.get(timeout);}public String get(int timeout) throws MqttRemoteException {if (timeout <= 0) {timeout = DEFAULT_TIMEOUT;}if (!isDone()) {long start = System.currentTimeMillis();lock.lock();try {while (!isDone()) {done.await(timeout, TimeUnit.MILLISECONDS);if (isDone() || System.currentTimeMillis() - start > timeout) {break;}}} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}if (!isDone()) {throw new MqttTimeoutException("Waiting client-side response timeout");}}if (response == null) {throw new IllegalStateException("response cannot be null");}if (response.getMStatus().equals(MqttResponse.OK)) {return response.getMessageResult();}if (response.getMStatus().equals(MqttResponse.TIMEOUT)) {throw new MqttTimeoutException("Waiting client-side response timeout");}throw new MqttRemoteException(response.getErrorMessage());}/*** 判斷是否有response結(jié)果** @return 是否返回結(jié)果*/public boolean isDone() {return response != null;}//...將上述等待喚醒的代碼如下,只需調(diào)用下面的received方法并傳入相應(yīng)的MqttResponse結(jié)果即可,該方法會(huì)在后面講的判斷調(diào)用超時(shí)和正常mqtt消息結(jié)果返回的情況中調(diào)用。
@Slf4j @Data public class MqttFuture {//...private void doReceived(MqttResponse res) {lock.lock();try {response = res;done.signal();} finally {lock.unlock();}}public static void received(MqttResponse response) {MqttFuture future = FUTURES.remove(response.getMId());if (future != null) {future.doReceived(response);} else {log.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response);}}//...后臺(tái)會(huì)開(kāi)啟一個(gè)線程來(lái)掃描超時(shí)任務(wù),主要是遍歷上述存儲(chǔ)MqttFuture的Map,如果對(duì)應(yīng)的MqttFuture并沒(méi)有完成(獲得對(duì)應(yīng)的response)且調(diào)用時(shí)間已超過(guò)設(shè)置的超時(shí)時(shí)間,則設(shè)置一個(gè)超時(shí)異常的MqttResponse并調(diào)用MqttFuture.received(timeoutResponse);來(lái)喚醒上述代碼中的done.await(timeout, TimeUnit.MILLISECONDS);的線程等待。該線程每30s會(huì)遍歷一個(gè)存儲(chǔ)MqttFuture的Map。
@Slf4j @Data public class MqttFuture {//...private static class RemotingInvocationTimeoutScan implements Runnable {@Overridepublic void run() {while (true) {try {for (MqttFuture future : FUTURES.values()) {if (future == null || future.isDone()) {continue;}if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {//當(dāng)前mqtt請(qǐng)求已超時(shí)MqttResponse timeoutResponse = new MqttResponse(future.getMId());timeoutResponse.setMStatus(MqttResponse.TIMEOUT);MqttFuture.received(timeoutResponse);}}//每30ms掃一次Thread.sleep(30);} catch (Throwable e) {log.error("Exception when scan the timeout invocation of remoting.", e);}}}}static {Thread th = new Thread(new RemotingInvocationTimeoutScan(), "MqttResponseTimeoutScanTimer");th.setDaemon(true);th.start();}//...為了處理正常的mqtt的消息返回,除了使用上一節(jié)中講到的自定義注解MqttController來(lái)訂閱相關(guān)的Topic,還需要訂閱特殊的一個(gè)Topic來(lái)處理mqtt同步調(diào)用的返回消息,訂閱代碼如下,根據(jù)收到的mId和messageBody設(shè)置對(duì)應(yīng)的MqttResponse并調(diào)用MqttFuture.received(response);來(lái)喚醒上述代碼中的done.await(timeout, TimeUnit.MILLISECONDS);的線程等待。
//訂閱偽同步請(qǐng)求回復(fù) log.info("Add a new rocketMq subscription,topic:{}", replyParentTopic); mqConsumer.subscribe(replyParentTopic, "*", (message, context) -> {String mqttSecondTopic = message.getUserProperties(PropertyKeyConst.MqttSecondTopic);if (null == mqttSecondTopic) {mqttSecondTopic = "";}String mId = mqttSecondTopic.replaceAll("/", "");String messageBody = new String(message.getBody());MqttResponse response = new MqttResponse(mId);response.setMessageResult(messageBody);MqttFuture.received(response);return Action.CommitMessage; });五、demo使用
1. 項(xiàng)目結(jié)構(gòu)
由于使用了springboot框架來(lái)實(shí)現(xiàn)該demo,所以項(xiàng)目結(jié)構(gòu)如下:
demo項(xiàng)目結(jié)其中mqtt工具包目錄如下:
mqtt工具包結(jié)構(gòu)2. mqtt工具包的使用
2.1 在yml配置中添加相關(guān)配置
配置示例如下,其中xxx改為自己使用的即可
ali:mqtt:accessKey: xxxsecretKey: xxxgroupId: xxxnamesrvAddr: xxxsendMsgTimeoutMillis: 3000#消費(fèi)者線程固定位50個(gè)consumeThreadNums: 50 # 用于同步調(diào)用返回發(fā)送的topicreplyParentTopic: xxx2.2 添加工具包中的MqttConfig
@Import({ MqttConfig.class}) @Configuration public class MqttConfigure {}2.3 自定義注解的使用
@Slf4j @MqttController(parentTopic = "robot1") public class MqttRobot1 {@MqttTopicMappingpublic void dealFirstTopic() {log.info("MqttRobot1.dealAlarm 收到消息啦,只處理了一級(jí)topic");}@MqttTopicMapping(subTopic = "alarm")public void dealAlarm(@MqttMessageId String messageId, @MqttMessageBody AlarmVo alarmVo) {log.info("MqttRobot1.dealAlarm 收到消息啦");log.info("messageId:{}", messageId);log.info("alarmVo:{}", alarmVo);}@MqttTopicMapping(subTopic = "task")public void dealTask() {log.info("MqttRobot1.dealTask 收到消息啦");} }2.4 測(cè)試同步調(diào)用,模擬MQTT客戶(hù)端消息返回代碼
mqtt客戶(hù)端實(shí)現(xiàn)代碼示例參考阿里云官方demo https://github.com/AliwareMQ/lmq-demo
其中xxx的地方都改成自己的即可,下面代碼中mqttClient2.publish(replyTopic, message);即將結(jié)果發(fā)送到replyTopic中。
public class MqttClientTest {public static void main(String[] args) throws Exception {String instanceId = "xxx";String endPoint = "xxx";String accessKey = "xxx";String secretKey = "xxx";String clientId = "xxx";final String parentTopic = "xxx";//這邊需自定義mqtt客戶(hù)端topic,final String mq4IotTopic = parentTopic + "/" + "xxx" + "/xxx";final int qosLevel = 0;ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey,clientId);final MemoryPersistence memoryPersistence = new MemoryPersistence();final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);final MqttClient mqttClient2 = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);/*** 客戶(hù)端設(shè)置好發(fā)送超時(shí)時(shí)間,防止無(wú)限阻塞*/mqttClient.setTimeToWait(5000);final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());mqttClient.setCallback(new MqttCallbackExtended() {@Overridepublic void connectComplete(boolean reconnect, String serverURI) {/*** 客戶(hù)端連接成功后就需要盡快訂閱需要的 topic*/System.out.println("connect success");executorService.submit(new Runnable() {@Overridepublic void run() {try {final String topicFilter[] = { mq4IotTopic };final int[] qos = { qosLevel };mqttClient.subscribe(topicFilter, qos);} catch (MqttException e) {e.printStackTrace();}}});}@Overridepublic void connectionLost(Throwable throwable) {throwable.printStackTrace();}@Overridepublic void messageArrived(String s, MqttMessage mqttMessage) throws Exception {System.out.println("receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));JSONObject jsonObject = JSON.parseObject(new String(mqttMessage.getPayload()));String mId = jsonObject.getString("mId");String replyTopic = jsonObject.getString("replyTopic");String result = mId + "回復(fù)啦";MqttMessage message = new MqttMessage(result.getBytes());message.setQos(qosLevel);//這邊會(huì)將結(jié)果發(fā)送到replyTopic中mqttClient2.publish(replyTopic, message);System.out.println("發(fā)送啦");}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);}});mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());mqttClient2.connect(connectionOptionWrapper.getMqttConnectOptions());Thread.sleep(Long.MAX_VALUE);} }3. demo體驗(yàn)
啟動(dòng)項(xiàng)目可以在控制臺(tái)看到有如下日志:
MQTT處理Topic映射日志MQTT訂閱Topic日志從日志中可以看出程序自動(dòng)處理了自定義注解的Mqtt消息處理的映射,并根據(jù)mqtt的1級(jí)Topic進(jìn)行了RocketMQ的相關(guān)訂閱。
提供了一個(gè)測(cè)試MQTT消息簡(jiǎn)單發(fā)送接口如下:
@GetMapping("/publish") public String doPublish(@RequestParam("topic") String topic, @RequestParam("message") String message) {try {return mqttClient.publish(topic, message);} catch (ONSClientException e) {return e.getMessage();} }使用 http://localhost:8080/mqtt/publish?topic=robot1/alarm&message={id:"1",code:"heheh"}來(lái)調(diào)用該接口。
可以在控制臺(tái)看到如下日志打印:
MQTT訂閱的消息日志由于本demo中使用自定義注解訂閱了該Topic,所以調(diào)用該接口發(fā)送消息之后也會(huì)被本demo成功接收,并分發(fā)到對(duì)應(yīng)的處理函數(shù)中,因此調(diào)用了該接口后可以在控制臺(tái)看到如上日志打印,可以看到MQTT消息成功發(fā)布,訂閱到該消息并實(shí)現(xiàn)了MQTT消息的處理分發(fā)。
提供了一個(gè)測(cè)試MQTT同步調(diào)用的接口如下:
@GetMapping("/publish-sync")public String publishSync(@RequestParam("topic") String topic, @RequestParam("message") String message) {try {return mqttClient.publishSync(topic, message, 5000);} catch (MqttRemoteException e) {return e.getMessage();} }使用http://localhost:8080/mqtt/publish-sync?topic=robot1/alarm/GID_ROBOT@@@DEVICEID_001&message=hehehe來(lái)調(diào)用接口。
可以看到下圖的結(jié)果:
MQTT同步調(diào)用超時(shí)這是MQTT同步調(diào)用超時(shí)的情況,因?yàn)榇藭r(shí)還沒(méi)有開(kāi)啟對(duì)應(yīng)的MQTT客戶(hù)端,因此發(fā)送的MQTT消息并沒(méi)有客戶(hù)端進(jìn)行回應(yīng),所以出現(xiàn)了調(diào)用超時(shí)的情況,如果運(yùn)行上述模擬MQTT客戶(hù)端消息返回的代碼后再次調(diào)用該接口,可以看到同步調(diào)用成功返回了對(duì)應(yīng)的結(jié)果,如圖所示。
MQTT同步調(diào)用成功返回源碼地址如下,僅供學(xué)習(xí)參考
DavidDingXu/panda-mqtt?github.com六、參考
- 阿里云微消息隊(duì)列 MQTT幫助文檔
- 阿里云消息隊(duì)列 RocketMQ幫助文檔
本文原創(chuàng),歡迎轉(zhuǎn)載,轉(zhuǎn)載請(qǐng)注明出處,如有不正確的地方懇請(qǐng)各位看官指正。
總結(jié)
以上是生活随笔為你收集整理的mqtt 发送消息过多_阿里云MQTT服务端注解式消息处理分发与同步调用实践小结的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: ip地址转换pta题目_PTA「实验2-
- 下一篇: vcenter服务器修改ip,vcent