日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

使用 Python MQTT 客户端 Paho-MQTT 的初学者指南

發布時間:2023/12/14 python 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用 Python MQTT 客户端 Paho-MQTT 的初学者指南 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Paho-MQTT是由Eclipse基金會開發的開源Python MQTT客戶端。Paho-MQTT可以在任何支持Python的設備上運行。在本教程中,我們將使用 Paho 構建一個 MQTT 客戶端。我將把庫的每個功能添加到客戶端程序中,并解釋它是如何工作的。在本教程結束時,您將對庫的工作原理有一個基本的了解。

如果您不熟悉 MQTT,最好先學習我的上一篇《MQTT基礎知識及工作原理》

0. 安裝 Python MQTT 客戶端 Paho-MQTT

Paho MQTT 需要 Python 版本 3.4+。要進行安裝,請打開主機或終端,然后輸入:

pip install paho-mqtt

1. 建立與 MQTT 代理的連接

本教程將使用 Eclipse 提供的公共 MQTT 代理。

警告: 不要在生產環境中使用公有 MQTT 代理。

要在PC上運行代理,您可以安裝mosquitto:
在Windows和Ubuntu/Debian上安裝Mosquitto

建立與 MQTT 代理的連接

import paho.mqtt.client as mqtt broker_url = "mqtt.eclipse.org" broker_port = 1883 client = mqtt.Client() client.connect(broker_url, broker_port)

這是怎么回事?

  • 我們導入 paho 庫,并將代理地址設置為 ,將端口號設置為 。iot.eclipse.org1883
  • 1883是 MQTT 中所有未加密連接的缺省端口號。

  • 我們創建一個 MQTT 客戶端對象并調用它。我們將在下一節中看到有關 paho 客戶端對象的更多信息。client

  • 接下來,我們使用代理的地址和端口號調用函數。connect()

  • 如果連接成功,該函數將返回 0。connect()

    讓我們分解一下客戶端對象:

    1.1 客戶端對象

    該對象創建 MQTT 客戶機。它需要4個可選參數:client()

    client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")

    client_id是客戶端在連接到代理時給出的唯一字符串。如果您未提供客戶端 ID,代理將向客戶端分配一個客戶端 ID。

    每個客戶端必須具有唯一的客戶端 ID。代理使用客戶機 ID 來唯一標識每個用戶。如果使用相同的客戶端 ID 連接第二個客戶端,則第一個客戶端將斷開連接。

    clean_session是默認設置為True的布爾值。

    如果設置為False,則代理存儲有關客戶端的信息。
    如果設置為True,代理將刪除有關客戶端的所有存儲信息。
    要了解干凈會話,請參閱: MQTT 中的干凈會話說明

    userdata是可以作為參數發送到回調的數據。有關回調的更多信息,請參見第 4 節。

    protocol可以是其中之一,也可以取決于您要使用的版本。MQTTv31MQTTv311

    transport缺省值為 。如果要通過WebSockets發送消息,請設置為 。tcpwebsockets

    為了干凈地斷開與代理的連接,我們可以使用函數。disconnect()

    2. 發布消息

    要發布消息,我們使用該函數。該函數采用 4 個參數:publish()

    publish(topic, payload=None, qos=0, retain=False)

    topic是包含主題名稱的字符串。

    主題名稱區分大小寫。

    payload是一個字符串,其中包含將發布到主題的消息。訂閱該主題的任何客戶端都將看到有效負載消息。

    這些是可選參數:

    qos為 0、1 或 2。它默認為 0。服務質量是保證收到消息的級別。
    要了解不同的 MQTT 服務級別質量,請參閱此帖子
    : MQTT QoS 級別說明

    retain是默認為False的布爾值。如果設置為 True,則它告訴代理將該主題上的該消息存儲為"最后一條好消息"。
    要了解 MQTT 中的消息保留,請參閱以下內容:MQTT 保留的消息

    將發布函數添加到我們的代碼中:

    import paho.mqtt.client as mqtt broker_url = "mqtt.eclipse.org" broker_port = 1883 client = mqtt.Client() client.connect(broker_url, broker_port) client.publish(topic="TestingTopic", payload="TestingPayload", qos=0, retain=False)要檢查消息是否已成功發布到某個主題,我們需要一個訂閱該主題的客戶端。## 3. 訂閱主題要訂閱主題,我們需要該函數。該函數采用 2 個參數subscribe()```powershell subscribe("topicName", qos=0)

    topic是包含主題名稱的字符串。

    注意:主題名稱區分大小寫。

    qos為 0、1 或 2。它將降級為 0。

    讓我們修改我們的程序以訂閱要發布到的主題:

    import paho.mqtt.client as mqtt broker_url = "mqtt.eclipse.org" broker_port = 1883 client = mqtt.Client() client.connect(broker_url, broker_port) client.subscribe("TestingTopic", qos=1) client.publish(topic="TestingTopic", payload="TestingPayload", qos=1, retain=False) 注意:如果您希望客戶端訂閱多個主題,則可以將它們放在元組列表中。示例:client.subscribe(['topicName1', 1),('topicName2', 1)])元組的格式為 [(主題名稱,QoS 級別)]

    當您執行此程序時,您會注意到已發布的消息仍未顯示在控制臺/終端上。

    訂閱某個主題會告訴代理向您發送發布到該主題的消息。我們已經訂閱了該主題,但我們需要一個回調函數來處理這些消息。

  • 回調
    回調是在事件發生時執行的函數。在 paho 中,這些事件包括連接、斷開連接、訂閱、取消訂閱、發布、接收消息、日志記錄。
  • 在對程序實現回調之前,我們需要首先了解程序如何調用這些回調。為此,我們將使用 paho 中可用的不同循環函數。

    4.1 循環函數

    循環是客戶端對象的函數。當客戶端收到消息時,該消息將存儲在接收緩沖區中。當要將消息從客戶端發送到代理時,該消息將存儲在發送緩沖區中。循環函數用于處理緩沖區中的任何消息并調用相應的回調函數。它們還將嘗試在斷開連接時重新連接到代理。

    大多數循環函數都是異步運行的,這意味著當調用循環函數時,它將在單獨的線程上運行。

    有 3 種類型的循環函數:

    4.1.1 loop_forever():這是一個阻塞循環函數。這意味著循環函數將繼續運行,并且在調用此函數后無法執行任何其他行。如果要無限期地運行程序,并且程序中有單個客戶端構造函數,請使用此功能。

    import paho.mqtt.client as mqtt broker_url = "mqtt.eclipse.org" broker_port = 1883 client = mqtt.Client() client.connect(broker_url, broker_port) client.subscribe("TestingTopic", qos=1) client.publish(topic="TestingTopic", payload="TestingPayload", qos=1, retain=False) client.loop_forever()

    4.1.2 loop_start()/loop_stop(): 這是一個非阻塞循環函數,這意味著你可以調用這個函數,并在函數調用后繼續執行代碼。顧名思義,loop_start() 用于啟動循環函數,loop_stop() 用于停止循環函數。如果需要在同一程序中創建多個客戶端對象,則可以使用 loop_start()。

    import paho.mqtt.client as mqtt broker_url = "mqtt.eclipse.org" broker_port = 1883 client = mqtt.Client() client.connect(broker_url, broker_port) client.subscribe("TestingTopic", qos=1) client.publish(topic="TestingTopic", payload="TestingPayload", qos=1, retain=False) client.loop_start() #Some Executable Code Here client.loop_stop() #Client Loop Stops After loop_stop() is called

    4.1.3 loop():這是一個阻塞循環函數。loop() 和 loop_forever() 之間的區別在于,如果調用 loop() 函數,則必須手動處理重新連接,這與后者不同。loop_forever() & loop_start() 函數將在斷開連接時自動嘗試重新連接到代理。除非在特殊情況下,否則不建議使用 loop()。

    現在回到回調。在下一節中,我們將在程序中實現每個回調。

    4.2 on_connect

    每次客戶端連接/重新連接到代理時都會調用回調。讓我們將回調添加到我們的程序中。on_connect()

    import paho.mqtt.client as mqtt broker_url = "mqtt.eclipse.org" broker_port = 1883 def on_connect(client, userdata, flags, rc):print("Connected With Result Code: {}".format(rc)) def on_disconnect(client, userdata, rc):print("Client Got Disconnected") client = mqtt.Client() client.on_connect = on_connect client.connect(broker_url, broker_port) client.subscribe("TestingTopic", qos=1) client.publish(topic="TestingTopic", payload="TestingPayload", qos=1, retain=False) client.loop_forever()

    必須將回調分配給客戶端對象。如果不是,則回調將不會執行

    這是怎么回事?

    我們創建on_connect回調函數。它需要4個參數:客戶端對象,用戶數據,標志,rc。
    對象。client
    userdata是在客戶端構造函數中聲明的自定義數據。如果要將自定義數據傳遞到回調中,則需要它。
    flags是一個字典對象,用于檢查是否已將客戶端對象的 clean 會話設置為 True 或 False。
    rc這是result code,用于檢查連接狀態。不同的結果代碼是:

    0:連接成功 1:連接被拒絕 – 協議不正確 版本 2:連接被拒絕 – 客戶端標識符無效 3:連接被拒絕 – 服務器不可用 4:連接被拒絕 – 用戶名或密碼錯誤 5:連接被拒絕 – 未授權 6-255:當前未使用。

    該程序的輸出是:

    Connected With Result Code 0

    這意味著連接成功。

    4.3 on_disconnect
    當客戶端與代理斷開連接時,將調用回調。on_disconnect()

    import paho.mqtt.client as mqtt broker_url = "mqtt.eclipse.org" broker_port = 1883 def on_connect(client, userdata, flags, rc):print("Connected With Result Code " (rc)) def on_disconnect(client, userdata, rc):print("Client Got Disconnected") client = mqtt.Client() client.on_connect = on_connect client.connect(broker_url, broker_port) client.subscribe("TestingTopic", qos=1) client.publish(topic="TestingTopic", payload="TestingPayload", qos=1, retain=False) client.loop_forever()

    不要忘記將回調函數分配給客戶端對象!
    client.on_disconnect = on_disconnect

    運行該程序,斷開互聯網連接,然后查看是否打印了消息"客戶端已斷開連接"。此外,重新連接到互聯網以查看客戶端是否重新連接到代理。

    可以將多個客戶端對象附加到單個回調函數。當您的程序連接到多個代理時,這很有用。

    4.4 on_message

    回到盡管訂閱了主題但仍未顯示消息的問題:回調用于處理發布到已訂閱主題的消息。on_message()

    在我們的程序中,我們需要做3件事:1.
    訂閱我們要發布的主題。
    2. 使用回調處理已發布的消息。在我們的程序中,我們將簡單地打印消息。
    3. 將回調函數分配給客戶端對象的屬性。on_message

    讓我們創建另一個 mqtt 客戶端。此新程序將向已訂閱現有程序的主題發布消息。

    sub.py(接收客戶端)

    import paho.mqtt.client as mqtt broker_url = "mqtt.eclipse.org" broker_port = 1883 def on_connect(client, userdata, flags, rc):print("Connected With Result Code "+rc) def on_message(client, userdata, message):print("Message Recieved: "+message.payload.decode()) client = mqtt.Client() client.on_connect = on_connect client.on_message = on_message client.connect(broker_url, broker_port) client.subscribe("TestingTopic", qos=1) client.publish(topic="TestingTopic", payload="TestingPayload", qos=1, retain=False) client.loop_forever()

    pub.py(發布客戶端)

    import paho.mqtt.client as mqtt broker_url = "mqtt.eclipse.org" broker_port = 1883 client = mqtt.Client() client.connect(broker_url, broker_port) client.publish(topic="TestingTopic", payload="TestingPayload", qos=1, retain=False)

    on_message() 回調有 3 個參數:、 & 。我已經解釋了本節中的前兩個。
    該對象有 4 個屬性:、 、 、 。clientuserdatamessageon_connect()messagetopicpayloadqosretain

    如果您注意到對象的每個屬性也是我們在客戶端函數中使用的參數。messagepublish()

    先運行,然后運行 。輸出將顯示消息和主題。sub.pypub.py

    如果郵件未打印出來,請檢查是否:

    您已將該函數添加到客戶端對象的屬性中。on_message()
    如果您已訂閱 了 中的回調中的主題。on_connect()sub.py
    如果已分別在 和 函數中正確輸入主題名稱。subscribe()publish()sub.pypub.py
    如果要在函數中打印消息。on_message()
    如何組織和處理消息?

    如果您從不同的主題收到多條消息,并且需要以不同的方式處理每個主題的消息,那么我們必須對消息進行排序。

    執行此操作的一種方法是使用該屬性檢查將消息發布到哪個主題。然后,您可以創建 if 條件來相應地處理消息。message.topic

    更好的方法是使用對象的功能。這將創建多個回調來處理來自不同主題的消息。
    參數采用主題名稱,參數采用將處理該主題消息的回調的名稱。message_callback_add(sub, callback)clientsubcallback

    例:

    sub.py

    import paho.mqtt.client as mqtt broker_url = "mqtt.eclipse.org" broker_port = 1883 def on_connect(client, userdata, flags, rc):print("Connected With Result Code " (rc)) def on_message_from_kitchen(client, userdata, message):print("Message Recieved from Kitchen: "+message.payload.decode()) def on_message_from_bedroom(client, userdata, message):print("Message Recieved from Bedroom: "+message.payload.decode()) def on_message(client, userdata, message):print("Message Recieved from Others: "+message.payload.decode()) client = mqtt.Client() client.on_connect = on_connect #To Process Every Other Message client.on_message = on_message client.connect(broker_url, broker_port) client.subscribe("TestingTopic", qos=1) client.subscribe("KitchenTopic", qos=1) client.subscribe("BedroomTopic", qos=1) client.message_callback_add("KitchenTopic", on_message_from_kitchen) client.message_callback_add("BedroomTopic", on_message_from_bedroom) client.loop_forever()

    在向其添加回調之前,請確保您已訂閱這兩個主題。

    默認情況下,來自其他主題的消息將由 on_message() 處理

    您可以通過發布到"KitchenTopic"和"臥室主題"并查看它是否單獨處理來嘗試上述程序。

    4.5 on_publish

    執行函數時調用該函數。這將返回一個元組 。on_publish()publish()(result, mid)

    result是錯誤代碼。錯誤代碼 0 表示消息已成功發布。

    mid代表消息 ID。它是一個整數,是客戶端分配的唯一消息標識符。如果使用 QoS 級別 1 或 2,則客戶端循環將使用 標識尚未發送的消息。mid

    4.6 on_subscribe()/on_unsubscribe()

    當客戶端訂閱主題時,將調用回調。例:on_subscribe()on_subscribe(client, userdata, mid, granted_qos)

    當客戶端取消訂閱某個主題時,將調用回調。例:on_unsubscribe()on_unsubscribe(client, userdata, mid)

    mid是函數中討論的消息 id。on_publish()

    granted_qos是訂閱時該主題的 qos 級別。如果需要,您可以在程序中嘗試一下。

    5. 其他有用的 Paho-MQTT 功能

    Paho還有一些有用的函數,可以在程序中使用它們來執行函數,而不必創建客戶端構造函數并經歷創建回調的麻煩。

    5.1 單() / 多() 發布

    單個或多個函數用于將單個消息或多條消息發布到代理上的主題,而無需創建客戶機對象。

    single(topic, payload=None, qos=0, retain=False, hostname=broker_url, port=broker_port, client_id="", keepalive=60, will=None, auth=None, tls=None, protocol=mqtt.MQTTv311, transport="tcp")

    topic是唯一必需的參數。這是一個包含主題名稱的字符串。

    auth是包含用戶名和密碼的字典(如果代理需要)。(eclipse 代理不需要身份驗證)。
    例:auth = {‘username’:“username”, ‘password’:""}

    tls如果我們使用 TLS/SSL 加密,則需要。
    例:dict = {‘ca_certs’:"", ‘certfile’:"", ‘keyfile’:"", ‘tls_version’:"", ‘ciphers’:"<ciphers">}

    transport接受 2 個值:和 。如果您不想使用websockets,請將其設置為 。websocketstcptcp

    其他參數類似于客戶端對象中的參數。

    multiple(msgs, hostname=broker_url, port=broker_port, client_id="", keepalive=60, will=None, auth=None, tls=None, protocol=mqtt.MQTTv311, transport="tcp")

    msgs是必需的參數。它包含要發布的消息的列表。每條消息都可以是字典或元組。

    字典必須采用以下格式:
    msg = {‘topic’:"< topicname >", ‘payload’:"", ‘qos’:‘0’, ‘retain’:‘False’}

    元組必須采用以下格式:
    ("< topicname >", “< payload >”, qos, retain)

    在這兩種格式中,名稱都必須存在。topic

    5.2 簡單() / 回調()

    simple 是一個阻止函數,它訂閱一個主題或一個主題列表,并返回發布到該主題的消息。

    simple(topics, qos=0, msg_count=1, retained=False, hostname=“localhost”, port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None, protocol=mqtt.MQTTv311)

    topics是唯一必需的參數。這可以是單個主題的字符串,也可以是多個主題的列表。

    msg_count是在斷開與代理的連接之前必須返回的消息數。對于 msg_count > 1,該函數將返回消息列表。

    其他參數已經在 single() / multiple() 部分中進行了解釋。

    callback與 相似,唯一的區別是它需要一個額外的參數,即回調。簡單函數僅返回來自主題的消息,回調函數將返回的消息發送到任何函數進行處理。simple
    callback(callback, topics, qos=0, userdata=None, hostname=“iot.eclipse.org”, port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None, protocol=mqtt.MQTTv311)

    6. Will_set()

    這是一個非常有用的功能。當客戶端連接到代理時,它會告訴代理,如果它斷開連接,它必須向主題發布消息。它是客戶端構造函數的一個屬性。

    client.will_set(topic, payload=None, qos=0, retain=False)

    要了解如何以及何時在MQTT中使用Last Will & Testament,請參閱這篇文章
    :MQTT Last Will and Testament(用示例解釋)

    您的反饋對我來說非常重要。如果本教程對您有所幫助,或者,如果文章中的某些部分有錯誤或解釋不正確,請在下面的評論中告訴我。

    總結

    以上是生活随笔為你收集整理的使用 Python MQTT 客户端 Paho-MQTT 的初学者指南的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。