MQTT协议笔记之mqtt.io项目TCP协议支持
前言
MQTT定義了物聯網傳輸協議,其標準傾向于原始TCP實現。構建于TCP的上層協議堆棧,諸如HTTP等,在空間上多了一些處理路徑,稍微耗費了CPU和內存,雖看似微乎其微,但對很多處理能力不足的嵌入式設備而言,選擇原始的TCP卻是最好的選擇。
但單純TCP不是所有物件聯網的最佳選擇,提供構建與TCP基礎之上的傳統的HTTP通信支持,尤其是瀏覽器、性能富裕的桌面涉及領域,還是企業最 可信賴、最可控的傳輸方式之一。支持多種多樣的連接通道,讓目前所有一切皆可聯網,除了原始TCP Socket,還要支持構建于其之上的HTTP、HTML5 Websocket,就很有必要。
mqtt.io,Pub/Sub中間件,也可以稱之為推送服務器,涵蓋所有主流桌面系統、瀏覽器平臺,并且傾斜 于移動互聯網,以及物聯網的廣闊適應天地。使用一句英文概括可能更為合適:"Make everything connect”,讓所有物件都可連接。其業務目標,可用下圖概括:
mqtt.io致力于做下一代支持所有主流桌面平臺、所有主流瀏覽器、所有可聯網物件都可以聯網的PUB/SUB消息推送系統。
構建此系統,在于降低傳統企業各自分散的推送系統,統一運營,統一管理,節省人員、運維開支。
注意事項
依賴
數據流轉
解碼器
用于轉換二進制流到JAVA對象的過程:
| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051 | package io.mqtt.handler.coder; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; import java.io.ByteArrayInputStream; import java.util.List; import org.meqantt.message.Message; import org.meqantt.message.MessageInputStream; public class MqttMessageNewDecoder extends MessageToMessageDecoder<ByteBuf> { @Override public void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception { if (buf.readableBytes() < 2) { return; } buf.markReaderIndex(); buf.readByte(); // read away header int msgLength = 0; int multiplier = 1; int digit; int lengthSize = 0; do { lengthSize++; digit = buf.readByte(); msgLength += (digit & 0x7f) * multiplier; multiplier *= 128; if ((digit & 0x80) > 0 && !buf.isReadable()) { buf.resetReaderIndex(); return; } } while ((digit & 0x80) > 0); if (buf.readableBytes() < msgLength) { buf.resetReaderIndex(); return; } byte[] data = new byte[1 + lengthSize + msgLength]; buf.resetReaderIndex(); buf.readBytes(data); MessageInputStream mis = new MessageInputStream( new ByteArrayInputStream(data)); Message msg = mis.readMessage(); mis.close(); out.add(msg); } } |
?
編碼器
對所有要寫入網卡緩沖區的JAVA對象轉換成二進制:
| 12345678910111213141516171819202122232425 | package io.mqtt.handler.coder; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageEncoder; import java.util.List; import org.meqantt.message.Message; @Sharable public class MqttMessageNewEncoder extends MessageToMessageEncoder<Object> { @Override protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception { if (!(msg instanceof Message)) { return; } byte[] data = ((Message) msg).toBytes(); out.add(Unpooled.wrappedBuffer(data)); } } |
?
借助于mqtt-library項目,編解碼不復雜。
MQTT的消息處理
?
| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 | package io.mqtt.handler; import io.mqtt.processer.ConnectProcesser; import io.mqtt.processer.DisConnectProcesser; import io.mqtt.processer.PingReqProcesser; import io.mqtt.processer.Processer; import io.mqtt.processer.PublishProcesser; import io.mqtt.processer.SubscribeProcesser; import io.mqtt.processer.UnsubscribeProcesser; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.ReadTimeoutException; import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.meqantt.message.ConnAckMessage; import org.meqantt.message.ConnAckMessage.ConnectionStatus; import org.meqantt.message.DisconnectMessage; import org.meqantt.message.Message; import org.meqantt.message.Message.Type; import org.meqantt.message.PingRespMessage; public class MqttMessageHandler extends ChannelInboundHandlerAdapter { private static PingRespMessage PINGRESP = new PingRespMessage(); private static final Map<Message.Type, Processer> processers; static { Map<Message.Type, Processer> map = new HashMap<Message.Type, Processer>( 6); map.put(Type.CONNECT, new ConnectProcesser()); map.put(Type.PUBLISH, new PublishProcesser()); map.put(Type.SUBSCRIBE, new SubscribeProcesser()); map.put(Type.UNSUBSCRIBE, new UnsubscribeProcesser()); map.put(Type.PINGREQ, new PingReqProcesser()); map.put(Type.DISCONNECT, new DisConnectProcesser()); processers = Collections.unmodifiableMap(map); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) throws Exception { try { if (e.getCause() instanceof ReadTimeoutException) { ctx.write(PINGRESP).addListener( ChannelFutureListener.CLOSE_ON_FAILURE); } else { ctx.channel().close(); } } catch (Throwable t) { t.printStackTrace(); ctx.channel().close(); } e.printStackTrace(); } @Override public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception { Message msg = (Message) obj; Processer p = processers.get(msg.getType()); if (p == null) { return; } Message rmsg = p.proc(msg, ctx); if (rmsg == null) { return; } if (rmsg instanceof ConnAckMessage && ((ConnAckMessage) rmsg).getStatus() != ConnectionStatus.ACCEPTED) { ctx.write(rmsg).addListener(ChannelFutureListener.CLOSE); } else if (rmsg instanceof DisconnectMessage) { ctx.write(rmsg).addListener(ChannelFutureListener.CLOSE); } else { ctx.write(rmsg).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } } |
?
更具體的可以查看項目。
小結
簡單介紹了一個簡單的不能再簡單的MQTT Server,只具有最基本的QoS 0類型的消息訂閱等。
后面,對HTML 5 Websocket,會在現有基礎代碼之上,不做多大改動,增加對MQTT Over WebSocket的支持。
總結
以上是生活随笔為你收集整理的MQTT协议笔记之mqtt.io项目TCP协议支持的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: MyEclipse移动开发教程:移动We
- 下一篇: jquery easyui datagr