mica-mqtt 1.0.2 发布,完善 stater 和 example
一、簡介
mica mqtt
mica-mqtt基于t-io實現(xiàn)的簡單、低延遲、高性能的 mqtt 物聯(lián)網(wǎng)開源組件。使用詳見 mica-mqtt gitee 源碼mica-mqtt-example模塊。
mica-mqtt: 基于 t-io 實現(xiàn)的低延遲、高性能的 mqtt 組件。 記得右上角點個star 關(guān)注更新!
二、功能
支持 MQTT v3.1、v3.1.1 以及 v5.0 協(xié)議。
支持 MQTT client 客戶端。
支持 MQTT server 服務(wù)端。
支持 MQTT 遺囑消息。
支持 MQTT 保留消息。
支持自定義消息(mq)處理轉(zhuǎn)發(fā)實現(xiàn)集群。
MQTT 客戶端 阿里云 mqtt 連接 demo。
支持 GraalVM 編譯成本機(jī)可執(zhí)行程序。
支持 Spring boot 項目快速接入(mica-mqtt-spring-boot-starter)。
mica-mqtt-spring-boot-starter 支持對接 Prometheus + Grafana。
三、待辦
添加 websocket 支持(已預(yù)研成功)。
優(yōu)化處理 mqtt session,以及支持 v5.0
四、更新記錄
文檔添加集群處理步驟說明,添加遺囑消息、保留消息的使用場景。
✨ 去除演示中的 qos2 參數(shù),性能損耗大避免誤用。
✨ 遺囑、保留消息內(nèi)部消息轉(zhuǎn)發(fā)抽象。
✨ 添加 mica-mqtt-spring-boot-example 。感謝 wsq( @冷月宮主 )pr。
✨ mica-mqtt-spring-boot-starter 支持客戶端接入和服務(wù)端優(yōu)化。感謝 wsq( @冷月宮主 )pr。
✨ mica-mqtt-spring-boot-starter 服務(wù)端支持指標(biāo)收集??蓪?Prometheus + Grafana 監(jiān)控。
✨ mqtt server 接受連接時,先判斷該 clientId 是否存在其它連接,有則解綁并關(guān)閉其他連接。
⬆️ 升級 mica-auto 到 2.1.3 修復(fù) ide 多模塊增量編譯問題。
五、Spring boot 快速接入
5.1 添加依賴
<dependency>
<groupId>net.dreamlu</groupId>
<artifactId>mica-mqtt-spring-boot-starter</artifactId>
<version>1.0.2</version>
</dependency>
5.2 服務(wù)端 yml 配置
mqtt:
server:
enabled: true # 是否開啟,默認(rèn):true
ip: 127.0.0.1 # 服務(wù)端 ip 默認(rèn):127.0.0.1
port: 5883 # 端口,默認(rèn):1883
name: Mica-Mqtt-Server # 名稱,默認(rèn):Mica-Mqtt-Server
buffer-allocator: HEAP # 堆內(nèi)存和堆外內(nèi)存,默認(rèn):堆內(nèi)存
heartbeat-timeout: 120000 # 心跳超時,單位毫秒,默認(rèn): 1000 * 120
read-buffer-size: 8092 # 接收數(shù)據(jù)的 buffer size,默認(rèn):8092
max-bytes-in-message: 8092 # 消息解析最大 bytes 長度,默認(rèn):8092
debug: true # 如果開啟 prometheus 指標(biāo)收集建議關(guān)閉
5.3 服務(wù)端可實現(xiàn)接口(注冊成 Spring Bean 即可)
|
接口 |
是否必須 |
說明 |
|
IMqttServerAuthHandler |
是 |
用于客戶端認(rèn)證 |
|
IMqttMessageListener |
是 |
消息監(jiān)聽 |
|
IMqttConnectStatusListener |
是 |
連接狀態(tài)監(jiān)聽 |
|
IMqttSessionManager |
否 |
session 管理 |
|
IMqttMessageStore |
集群是,單機(jī)否 |
遺囑和保留消息存儲 |
|
AbstractMqttMessageDispatcher |
集群是,單機(jī)否 |
消息轉(zhuǎn)發(fā),(遺囑、保留消息轉(zhuǎn)發(fā)) |
|
IpStatListener |
否 |
t-io ip 狀態(tài)監(jiān)聽 |
5.4 服務(wù)端自定義配置(可選)
@Configuration(proxyBeanMethods = false)
public class MqttServerCustomizerConfiguration {
@Bean
public MqttServerCustomizer activeRecordPluginCustomizer() {
return new MqttServerCustomizer() {
@Override
public void customize(MqttServerCreator creator) {
// 此處可自定義配置 creator,會覆蓋 yml 中的配置
System.out.println("----------------MqttServerCustomizer-----------------");
}
};
}
}
5.5 MqttServerTemplate 使用示例
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.ByteBuffer;
/**
* @author wsq
*/
@Service
public class ServerService {
@Autowired
private MqttServerTemplate server;
public boolean publish(String body) {
server.publishAll("/test/123", ByteBuffer.wrap(body.getBytes()));
return true;
}
}
5.6 基于 mq 消息廣播集群處理
實現(xiàn)IMqttConnectStatusListener處理設(shè)備狀態(tài)存儲。
實現(xiàn)IMqttMessageListener將消息轉(zhuǎn)發(fā)到 mq,業(yè)務(wù)按需處理 mq 消息。
實現(xiàn)IMqttMessageStore存儲遺囑和保留消息。
實現(xiàn)AbstractMqttMessageDispatcher將消息發(fā)往 mq,mq 再廣播回 mqtt 集群,mqtt 將消息發(fā)送到設(shè)備。
業(yè)務(wù)消息發(fā)送到 mq,mq 廣播到 mqtt 集群,mqtt 將消息發(fā)送到設(shè)備。
5.7 Prometheus + Grafana 監(jiān)控對接
得益于t-io良好的設(shè)計,監(jiān)控指標(biāo)直接對接的t-iostat,目前支持下列指標(biāo),后期會不斷完善。
|
支持得指標(biāo) |
說明 |
|
mqtt_connections_accepted |
共接受過連接數(shù) |
|
mqtt_connections_closed |
關(guān)閉過的連接數(shù) |
|
mqtt_connections_size |
當(dāng)前連接數(shù) |
|
mqtt_messages_handled_packets |
已處理消息數(shù) |
|
mqtt_messages_handled_bytes |
已處理消息字節(jié)數(shù) |
|
mqtt_messages_received_packets |
已接收消息數(shù) |
|
mqtt_messages_received_bytes |
已處理消息字節(jié)數(shù) |
|
mqtt_messages_send_packets |
已發(fā)送消息數(shù) |
|
mqtt_messages_send_bytes |
已發(fā)送消息字節(jié)數(shù) |
關(guān)于
mica-mqtt-spring-boot-starter客戶端等更多使用方式請查看 gitee
mica-mqtt-spring-boot-starter 模塊 readme 文檔。
六、普通 java 項目接入
6.1 maven 依賴
<dependency>
<groupId>net.dreamlu</groupId>
<artifactId>mica-mqtt-core</artifactId>
<version>1.0.2</version>
</dependency>
6.2 mica-mqtt 客戶端
// 初始化 mqtt 客戶端
MqttClient client = MqttClient.create()
.ip("127.0.0.1")
.port(1883) // 默認(rèn):1883
.username("admin")
.password("123456")
.version(MqttVersion.MQTT_5) // 默認(rèn):3_1_1
.clientId("xxxxxx") // 默認(rèn):MICA-MQTT- 前綴和 36進(jìn)制的納秒數(shù)
.connect(); // 連接
// 消息訂閱,同類方法 subxxx
client.subQos0("/test/#", (topic, payload) -> {
logger.info(topic + ' ' + ByteBufferUtil.toString(payload));
});
// 取消訂閱
client.unSubscribe("/test/#");
// 發(fā)送消息
client.publish("/test/client", ByteBuffer.wrap("mica最牛皮".getBytes(StandardCharsets.UTF_8)));
// 斷開連接
client.disconnect();
// 重連
client.reconnect();
// 停止
client.stop();
6.3 mica-mqtt 服務(wù)端
// 注意:為了能接受更多鏈接(降低內(nèi)存),請?zhí)砑?jvm 參數(shù) -Xss129k
MqttServer mqttServer = MqttServer.create()
// 默認(rèn):127.0.0.1
.ip("127.0.0.1")
// 默認(rèn):1883
.port(1883)
// 默認(rèn)為: 8092(mqtt 默認(rèn)最大消息大?。瑸榱私档蛢?nèi)存可以減小小此參數(shù),如果消息過大 t-io 會嘗試解析多次(建議根據(jù)實際業(yè)務(wù)情況而定)
.readBufferSize(512)
// 自定義認(rèn)證
.authHandler((clientId, userName, password) -> true)
// 消息監(jiān)聽
.messageListener((clientId, topic, mqttQoS, payload) -> {
logger.info("clientId:{} topic:{} mqttQoS:{} message:{}", clientId, topic, mqttQoS, ByteBufferUtil.toString(payload));
})
// ssl 配置
.useSsl("", "", "")
// 自定義客戶端上下線監(jiān)聽
.connectStatusListener(new IMqttConnectStatusListener() {
@Override
public void online(String clientId) {
}
@Override
public void offline(String clientId) {
}
})
// 自定義消息轉(zhuǎn)發(fā),可用 mq 廣播實現(xiàn)集群化處理
.messageDispatcher(new IMqttMessageDispatcher() {
@Override
public void config(MqttServer mqttServer) {
}
@Override
public boolean send(Message message) {
return false;
}
@Override
public boolean send(String clientId, Message message) {
return false;
}
})
.debug() // 開啟 t-io debug 信息日志
.start();
// 發(fā)送給某個客戶端
mqttServer.publish("clientId","/test/123", ByteBuffer.wrap("mica最牛皮".getBytes()));
// 發(fā)送給所有在線監(jiān)聽這個 topic 的客戶端
mqttServer.publishAll("/test/123", ByteBuffer.wrap("mica最牛皮".getBytes()));
// 停止服務(wù)
mqttServer.stop();
七、效果演示
mica-mqtt-example 效果演示
JUST DO IT!
總結(jié)
以上是生活随笔為你收集整理的mica-mqtt 1.0.2 发布,完善 stater 和 example的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 纯CSS3实现Material Desi
- 下一篇: vnetclient.exe是什么