生活随笔
收集整理的這篇文章主要介紹了
Eclipse Paho MQTT客户端Java源码分析
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
Eclipse Paho MQTT客戶端Java源碼分析
- 一、如何創建MQTT客戶端
- 1.1 定義連接配置
- 1.2 設置回調
- 1.3 開啟連接、訂閱及推送
- 1.4 MQTT消息類型
- 二、到底誰在干活
- 2.1 從connect說起
- 2.2 接著run
- 2.3 執行
- 三、安全機制
- 四、封裝成工具類
一、如何創建MQTT客戶端
就像搭積木一樣創建客戶端
1.1 定義連接配置
負責類:MqttConnectOptions
職責:設置連接的用戶名、密碼、心跳、超時、重連等參數
源代碼:
public static MqttConnectOptions getMqttConnectOptions(String userName
,String password
) {MqttConnectOptions options
= new MqttConnectOptions();options
.setCleanSession(true);options
.setUserName(userName
);options
.setPassword(password
.toCharArray());options
.setConnectionTimeout(10);options
.setKeepAliveInterval(20);options
.setAutomaticReconnect(true);options
.setWill("willTopic", WILL_DATA
, 2, false);return options
;}
1.2 設置回調
負責類:MqttCallbackExtended
職責:通過回調監聽數據通道的不同事件,包含連接成功、連接丟失、發送成功、收到數據
源代碼:
@Slf4jprivate static class BtcMqttCallback implements MqttCallbackExtended{
@Overridepublic void connectionLost(Throwable cause
) {if (cause
!= null){log
.error("連接丟失",cause
);}}public void deliveryComplete(IMqttDeliveryToken token
) {try {token
.waitForCompletion();log
.info("發送MQTT數據成功,消息是{}: {} : {}",token
.getTopics(),token
.getMessage().getQos(),token
.getMessage());} catch (MqttException e
) {e
.printStackTrace();}}public void messageArrived(String topic
, MqttMessage message
) {String msg
= new String(message
.getPayload(), Charset.forName("UTF-8"));System.out
.println("messageArrived() topic:" + topic
);System.out
.println(msg
);}@Overridepublic void connectComplete(boolean reconnect
, String serverURI
) {log
.info("連接成功: {}, 服務器地址是: {}",reconnect
, serverURI
);}}
1.3 開啟連接、訂閱及推送
負責類:MqttClient
職責:負責通道的建立、數據的訂閱以及數據的推送
源代碼:
public static MqttClient createNewMqttClient(String url
,String clientId
,String password
){MqttConnectOptions options
= getMqttConnectOptions("root",password
);MqttClient mqttClient
= null;try {mqttClient
= new MqttClient(url
,clientId
);mqttClient
.setCallback(new BtcMqttCallback());mqttClient
.connect(options
);} catch (MqttException e
) {e
.printStackTrace();}return mqttClient
;}
1.4 MQTT消息類型
負責類:MqttWireMessage
職責:MQTT消息實體類,種類共計0x0F種
源代碼:
public static final byte MESSAGE_TYPE_CONNECT
= 1;
public static final byte MESSAGE_TYPE_CONNACK
= 2;
public static final byte MESSAGE_TYPE_PUBLISH
= 3;
public static final byte MESSAGE_TYPE_PUBACK
= 4;
public static final byte MESSAGE_TYPE_PUBREC
= 5;
public static final byte MESSAGE_TYPE_PUBREL
= 6;
public static final byte MESSAGE_TYPE_PUBCOMP
= 7;
public static final byte MESSAGE_TYPE_SUBSCRIBE
= 8;
public static final byte MESSAGE_TYPE_SUBACK
= 9;
public static final byte MESSAGE_TYPE_UNSUBSCRIBE
= 10;
public static final byte MESSAGE_TYPE_UNSUBACK
= 11;
public static final byte MESSAGE_TYPE_PINGREQ
= 12;
public static final byte MESSAGE_TYPE_PINGRESP
= 13;
public static final byte MESSAGE_TYPE_DISCONNECT
= 14;
private static final String PACKET_NAMES
[] = { "reserved", "CONNECT", "CONNACK", "PUBLISH","PUBACK", "PUBREC", "PUBREL", "PUBCOMP", "SUBSCRIBE", "SUBACK","UNSUBSCRIBE", "UNSUBACK", "PINGREQ", "PINGRESP", "DISCONNECT" };
對應的MQTT消息類型為:
二、到底誰在干活
2.1 從connect說起
從源碼來看,最后負責通信的職責類是ClientComms,該類就是抽象的數據通道,從MqttClient.connect()出發,一步一步進入到ClientComms。
該類ConnectBG是ClientComms私有類,實現了Runnable接口,主要工作都在熟知的run方法里。
2.2 接著run
進入到ConnectBG的run方法里可以看到,網絡模塊以及MQTT數據的接收、發送和事件回調分別起了一個任務,都提交到默認為10個線程的線程池ExecutorService類中執行。由ConnectBG類名可知,將這些事件任務都放到后臺執行,防止阻塞主線程,如socket創建就很費時。
receiver
= new CommsReceiver(clientComms
, clientState
, tokenStore
, networkModule
.getInputStream());
receiver
.start("MQTT Rec: "+getClient().getClientId(), executorService
);
sender
= new CommsSender(clientComms
, clientState
, tokenStore
, networkModule
.getOutputStream());
sender
.start("MQTT Snd: "+getClient().getClientId(), executorService
);
callback
.start("MQTT Call: "+getClient().getClientId(), executorService
);
2.3 執行
通過回調方式對通信事件處理,底層執行類是CommsCallback,直接看其run方法中的邏輯。
主要是看其中的handleActionComplete(MqttToken token)方法,進而進入ireActionEvent(token)方法里。
if ( mqttCallback
!= null && token
instanceof MqttDeliveryToken && token
.isComplete()) {mqttCallback
.deliveryComplete((MqttDeliveryToken) token
);}fireActionEvent(token
);
我們會看到當數據發送事件成功時,會觸發該事件的回調執行并攜帶執行結果的狀態IMqttToken。
三、安全機制
3.1 重連機制
MqttClient的重連采用退避的方式每次重連的時間都會加倍,最初會等待 1 秒,對于每次失敗的重新連接嘗試,延遲將加倍,直到最大值 2 分鐘。
3.2 心跳機制
3.3 超時機制
四、封裝成工具類
思路是:
每次收到客戶端上傳到數據中心的消息,就創建一個MqttClient對象,并將其添加到并發列表Vector中,通過對客戶端連接狀態的判斷,進行數據的處理。
總結
以上是生活随笔為你收集整理的Eclipse Paho MQTT客户端Java源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。