关于spring boot集成MQTT的一写新人问题
這幾天弄了下mqtt ,發(fā)現(xiàn)有很多問(wèn)題,網(wǎng)上搜不到什么解決辦法,所以自己記錄下來(lái),也讓初識(shí)mqtt的人少走一些坑,關(guān)于我寫的不對(duì)的也希望看到的人能指出來(lái)互相學(xué)習(xí)下
安裝
說(shuō)到mqtt,首先肯定要安裝了,安裝什么的地址:http://activemq.apache.org/ap...
我本地是Windows的環(huán)境,所以裝的是Windows版本,這里是第一個(gè)注意的地方,因?yàn)楹竺媸褂玫臅r(shí)候windows和linux的有一些不同
下載完成之后就是解壓安裝了,這里解壓完成之后進(jìn)入bin目錄下,自己用cmd或者直接進(jìn)去在此處打開命令窗口也行,然后運(yùn)行apollo.cmd 創(chuàng)建一個(gè)服務(wù)實(shí)例我的實(shí)例名稱是mybroker所以命令是 apollo.cmd create mybroker,這個(gè)名稱自己可以隨便指定
創(chuàng)建完實(shí)例后發(fā)現(xiàn)bin 目錄下多了一個(gè)文件夾,這個(gè)文件夾就是你實(shí)例名稱,進(jìn)入文件夾運(yùn)行
.apollo-broker.cmd run 命令
這樣就啟動(dòng)成功了
啟動(dòng)成功可以去http://localhost:61680/console/index.html看看,登錄賬號(hào)和密碼在mybrokeretcusers.properties文件中找到輸入就可以進(jìn)去了
頁(yè)面上有連接信息和訂閱主題的一些對(duì)應(yīng)信息,有興趣的自己看下,后面也會(huì)講到的
使用
安裝成功接下來(lái)就是使用了,首先創(chuàng)建一個(gè)maven工程,引入配置
<!--mqtt--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency>由于我們后面處理訂閱消息的消費(fèi)者打印的日志是用了slf4j為了方便也引入了lombok的配置 :
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>引入完成以后就可以開始準(zhǔn)備開始使用mqtt了
這里為了方便維護(hù)和配置我把一些配置參數(shù)放在了properties文件里面:
這里我遇到了一個(gè)坑,專門注釋了,就是訂閱端訂閱消息的id 和 發(fā)布端發(fā)布消息的id 一定不能一樣,這樣會(huì)導(dǎo)致mqtt識(shí)別到兩個(gè)一樣的id,消息一發(fā)就斷開連接了,訂閱端總是收不到消息,這個(gè)問(wèn)題我找了好長(zhǎng)時(shí)間都不知道問(wèn)題出在哪,剛接觸的很容易搞錯(cuò),第二個(gè)問(wèn)題就是mqtt的服務(wù)器連接地址,在Windows和linux下tcp的端口是不一樣的,在啟動(dòng)的apollo的日志中可以看出來(lái)
監(jiān)聽的tcp端口是61613,看別人很多的demo上都是1883,如果一直連不上,原因可能是因?yàn)檫@個(gè)
接下來(lái)就是spring.mqtt.default.topic 配置了,這個(gè)是mqtt訂閱和推送的消息主題,既然你想發(fā)消息那么訂閱消息的主題和發(fā)布消息的主題一致才能收到消息,和rabbitmq一樣
然后就是客戶端
@Configuration @IntegrationComponentScan @Slf4j public class MqttSenderConfig {@Value("${spring.mqtt.username}")private String username;@Value("${spring.mqtt.password}")private String password;@Value("${spring.mqtt.url}")private String hostUrl;@Value("${spring.mqtt.client.id}")private String clientId;@Value("${spring.mqtt.default.topic}")private String defaultTopic;@Beanpublic MqttConnectOptions getMqttConnectOptions(){MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();mqttConnectOptions.setUserName(username);mqttConnectOptions.setPassword(password.toCharArray());mqttConnectOptions.setServerURIs(new String[]{hostUrl});mqttConnectOptions.setKeepAliveInterval(2);return mqttConnectOptions;}@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(getMqttConnectOptions());return factory;}@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());messageHandler.setAsync(true);messageHandler.setDefaultTopic(defaultTopic);return messageHandler;}@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}}這里有點(diǎn)問(wèn)題,如果你是復(fù)制我的代碼的話MessageHandler 這個(gè)類是沒(méi)有的需要自己手動(dòng)導(dǎo)包,看了源碼發(fā)現(xiàn)這里需要的是一個(gè)消息處理的handler需要是org.springframework.messaging.MessageHandler的實(shí)現(xiàn),直接導(dǎo)入這個(gè)包就行了
@Component @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MsgWriter {void sendToMqtt(String data);void sendToMqtt(String payload,@Header(MqttHeaders.TOPIC) String topic);void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);}這個(gè)是消息發(fā)送接口,需要發(fā)送消息的時(shí)候直接調(diào)用就行了,提供了幾個(gè)重載方法payload或者data是發(fā)送消息的內(nèi)容
topic是消息發(fā)送的主題,這里可以自己靈活定義,也可以使用默認(rèn)的主題,就是配置文件的主題,qos是mqtt 對(duì)消息處理的幾種機(jī)制分為0,1,2 其中0表示的是訂閱者沒(méi)收到消息不會(huì)再次發(fā)送,消息會(huì)丟失,1表示的是會(huì)嘗試重試,一直到接收到消息,但這種情況可能導(dǎo)致訂閱者收到多次重復(fù)消息,2相比多了一次去重的動(dòng)作,確保訂閱者收到的消息有一次
當(dāng)然,這三種模式下的性能肯定也不一樣,qos=0是最好的,2是最差的 ,有興趣的可以去詳細(xì)了解我在這不多贅述
上面就完成了消息的發(fā)送,可以去http://localhost:61680/console/index.html看看消息的記錄,這里我寫了一個(gè)接口調(diào)用sendToMqtt方法發(fā)送一條消息
會(huì)看到收到有兩個(gè)主題,我的是因?yàn)槲矣嗛喠藘蓚€(gè)主題所以上面顯示的是兩個(gè),我的剛才發(fā)布消息的主題是too所以打開會(huì)看到too有消息送達(dá)過(guò)來(lái)
如果你還沒(méi)寫訂閱方的話consumers是沒(méi)有的,現(xiàn)在顯示我發(fā)了7條消息,證明發(fā)送成功了
接下來(lái)就是訂閱方,為了方便我就直接寫在啟動(dòng)類上了,沒(méi)有用到所有的配置
@SpringBootApplication @EnableAutoConfiguration public class MytestApplication {public static void main(String[] args) {SpringApplication.run(MytestApplication.class, args);}@Value("${spring.mqtt.server.id}")private String serverId;@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setServerURIs("tcp://localhost:61613");factory.setUserName("admin");factory.setPassword("password");return factory;}// consumer 訂閱者監(jiān)聽消息@Beanpublic IntegrationFlow mqttInFlow() {return IntegrationFlows.from(mqttInbound()).transform(p -> p + ", received from MQTT").handle(logger()).get();}private LoggingHandler logger() {LoggingHandler loggingHandler = new LoggingHandler("INFO");loggingHandler.setLoggerName("siSample");return loggingHandler;}@Beanpublic MessageProducerSupport mqttInbound() {MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(serverId,mqttClientFactory(), "too");adapter.setCompletionTimeout(5000);adapter.setConverter(new DefaultPahoMessageConverter());adapter.setQos(1);return adapter;}}這里訂閱的主題可以指定,我訂閱的是剛才發(fā)的too主題,還有訂閱方的id 別和發(fā)送方的id 一樣
重新啟動(dòng)項(xiàng)目,發(fā)送消息,會(huì)發(fā)現(xiàn)控制臺(tái)已經(jīng)打印出消息
代表訂閱方已經(jīng)成功收到消息,同時(shí)
也顯示消息訂閱方和記錄,至此一個(gè)完整的消息發(fā)送和訂閱完成,比較簡(jiǎn)單,但是一不留神很容易出現(xiàn)問(wèn)題,希望能幫助到新入門的人
總結(jié)
以上是生活随笔為你收集整理的关于spring boot集成MQTT的一写新人问题的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Windows Api如何创建一个快捷方
- 下一篇: 深入理解springMVC