Mqtt 客户端 java API 教程
本文介紹MQTT消息,使用Eclipse Paho 庫作為MQTT java客戶端發送、接收消息。
MQTT 介紹
MQTT (MQ Telemetry Transport) 是一種消息協議,用于解決需要簡單、輕量方法在低能耗設備間傳輸數據,如在工業領域。隨著物聯網(IoT)設備的日益普及,MQTT的使用也越來越多,以致于OASIS宣布將MQTT(消息隊列遙測傳輸)作為新興的物聯網消息傳遞協議的首選標準。
該協議支持單一消息傳遞模式:發布-訂閱模式??蛻舳税l送的每個消息都包含一個關聯的“主題”,消息服務器使用該主題將消息路由到訂閱的客戶端。主題名稱可以是簡單的字符串,如“oiltemp”或類似路徑的字符串“motor/1/rpm”。
消費者為了接收消息,需用其明確的主題名稱或包含支持通配符的字符串訂閱一個或多個主題(“#”表示多級主題,“+”表示單級主題)。
依賴庫
需要Paho 庫的 Maven 依賴:
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.0</version> </dependency>準備客戶端
要使用Paho 庫,手續需要實現IMattClient 接口,用于從MQTT服務端接收或發送消息。該接口包括所有方法,如建立連接、發送或接收消息等。
Paho 默認提供了兩個IMattClient 接口的實現,一個異步客戶端MqttAsyncClient、一個同步MqttClient。本文聚焦同步版本,它的語義相對簡單。準備客戶端需要兩個步驟,第一實例化MqttClient類,第二連接至服務器。下面詳細說明。
創建MqttClient實例
下面代碼片段顯示如何創建IMqttClient同步實例:
String publisherId = UUID.randomUUID().toString(); IMqttClient publisher = new MqttClient("tcp://iot.eclipse.org:1883",publisherId);上面使用最簡單的構造函數,一個服務端地址參數,另一個客戶端表示(需唯一)。這里使用UUID確保每個客戶端不重復,實際應用中ID命名應該有一定意義。
Paho還提供了其他的構造函數,我們可以使用它們來定制用于存儲未確認消息的持久性機制和/或用于運行協議引擎實現所需的后臺任務的ScheduledExecutorService。代碼中的服務器地址是Paho項目托管的公共MQTT代理,它允許任何有互聯網連接的人測試客戶機,而不需要任何身份驗證。
連接MQTT服務器
前面定義的MqttClient 實例并沒有連接至服務器,我們需要調用connect方法,可以傳入MqttConnectOptions實例作為參數,指定協議的選項,如指定用戶明和密碼、session恢復模式,重連接等。連接代碼如下:
public class MqttUtils {String publisherId = "t001";String url = "tcp://localhost:1883";IMqttClient mqttClient;public MqttUtils() {MqttConnectOptions options = new MqttConnectOptions();options.setAutomaticReconnect(true);options.setCleanSession(true);options.setConnectionTimeout(10);options.setUserName("admin");options.setPassword("a123".toCharArray());try{mqttClient = new MqttClient(url, this.publisherId);mqttClient.connect(options);}catch (Exception e){e.printStackTrace();}} }上面連接選項解釋如下:
- 客戶端在遇到網絡問題時會自動重新連接
- 丟棄上一次運行中未發送的消息
- 連接超時設置為10秒
發送消息
使用已經連接的MqttClient發送消息非常簡單。我們使用publish()方法的一個變體將有效負載(總是一個字節數組)發送到給定的主題,使用以下服務質量選項之一:
- 0 -“最多一次”語義,也稱為“發了就忘了”。當可以接受消息丟失時使用此選項,因為它不需要任何形式的確認或持久性
- 1 -“至少一次”語義。當消息丟失不可接受且您的訂閱者可以處理副本時,請使用此選項
- 2 -“恰好一次”語義。當消息丟失不可接受且訂閱者無法處理副本時,請使用此選項
在我們的示例中,EngineTemperatureSensor 類扮演模擬傳感器的角色,每當我們調用它的call()方法時,它都會產生一個新的溫度讀數。
這個類實現了Callable接口,所以我們可以很容易地將它與java.util.concurrent包中可用的ExecutorService實現類一起使用:
MqttMessage封裝有效負載(消息體)、請求的服務質量以及為消息保留的標志。此標志指示代理應該保留此消息,直到訂閱者使用該消息為止。利用該特性可以實現當新的訂閱者連接至服務器時(可能是同一客戶端斷開了連接),會立刻接收到保留消息。
接收消息
為了接收服務器消息,需要使用subscribe()方法,可以指定下列參數:
- 一個或多個主題過濾器
- 服務質量QoS
- 回調方法用于處理接收消息
在下面的示例中,我們將展示如何向現有的IMqttClient實例添加消息偵聽器,以接收來自給定主題的消息。我們使用CountDownLatch作為回調和主執行線程之間的同步機制,每當有新消息到達時,就遞減它。
在示例代碼中使用了不同的IMqttClient實例來接收消息,這樣做只是為了更清楚哪個客戶端做什么。這不是Paho的限制-如果你愿意,你可以使用同一客戶端來發布和接收消息:
CountDownLatch receivedSignal = new CountDownLatch(10); subscriber.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> {byte[] payload = msg.getPayload();// ... payload handling omittedreceivedSignal.countDown(); }); receivedSignal.await(1, TimeUnit.MINUTES);上面調用subscribe()方法的subscriber變量將IMqttMessageListener實例作為它的第二個參數。我們使用簡單的lambda函數來處理有效負載并減少計數器。如果在指定的時間窗口(1分鐘)內沒有足夠的消息到達,await()方法將拋出異常。
在使用Paho時,我們不需要顯式地確認收到消息。如果回調正常返回,Paho假定它是成功的消費,并向服務器發送一個確認。
如果回調拋出異常,則客戶端將被關閉。請注意,這將導致在QoS級別為0時發送的任何消息丟失。當客戶端重新連接并再次訂閱主題時,以QoS級別1或2發送的消息將被服務器重發。
完整的測試接收代碼如下:
@Test public void whenSendMultipleMessages_thenSuccess() throws Exception {String publisherId = UUID.randomUUID().toString();MqttClient publisher = new MqttClient("tcp://iot.eclipse.org:1883",publisherId);String subscriberId = UUID.randomUUID().toString();MqttClient subscriber = new MqttClient("tcp://iot.eclipse.org:1883",subscriberId);MqttConnectOptions options = new MqttConnectOptions();options.setAutomaticReconnect(true);options.setCleanSession(true);options.setConnectionTimeout(10);publisher.connect(options); subscriber.connect(options);CountDownLatch receivedSignal = new CountDownLatch(10);subscriber.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> {byte[] payload = msg.getPayload();log.info("[I82] Message received: topic={}, payload={}", topic, new String(payload));receivedSignal.countDown();});Callable<Void> target = new EngineTemperatureSensor(publisher);ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();executor.scheduleAtFixedRate(() -> {try {target.call();}catch(Exception ex) {throw new RuntimeException(ex);}}, 1, 1, TimeUnit.SECONDS);receivedSignal.await(1, TimeUnit.MINUTES);executor.shutdown();assertTrue(receivedSignal.getCount() == 0 , "Countdown should be zero");log.info("[I105] Success !"); }總結
在本文中,我們演示了如何使用Eclipse Paho提供的庫在Java應用程序中添加對MQTT協議的支持。該庫處理所有低級協議細節,讓我們專注于解決方案的業務方面,同時留出良好的空間來定制其內部特性,例如消息持久性。
總結
以上是生活随笔為你收集整理的Mqtt 客户端 java API 教程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java整人代码大全_整人代码大全.do
- 下一篇: 李宏毅机器学习Homework1(代码简