日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

Python paho-mqtt消息队列

發布時間:2025/3/15 python 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Python paho-mqtt消息队列 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

官方文檔

https://docs.emqx.io/broker/v3/cn/

安裝

pip install paho-mqtt

?

服務接收測試:

mosquitto_sub -h 192.168.203.13 -u honey -P honey -t secevent/# -v mosquitto_sub -h 192.168.203.13 -u honey -P honey -t hserver/# -v

客戶端代碼:

#!/usr/bin/env python # --*-- coding:UTF-8 --*--import time import sys import paho.mqtt.client as mqttmq_send = "secevent/P0001/events" mq_send_heartbeat = "hserver/P0001/status"def mq_init():def on_connect(client, userdata, flags, rc):logger.debug("mqtt connect:" + str(rc))if rc == 0:logger.info("Mq connect success.")else:logger.error("Mq connect faild.")def on_message(client, userdata, msg):logger.debug(msg.tocic + " " + str(msg.payload))try:global_par.mq = mqtt.Client()global_par.mq.loop_start()global_par.mq.on_connect = on_connectglobal_par.mq.on_message = on_messageglobal_par.mq.username_pw_set("honey", "honey")global_par.mq.connect("192.168.203.13", 1883)mq.publish(mq_send, "aaaaa", 2)mq.publish(mq_send_heartbeat, "bbbbb", 2, retain=True) #保留最后一條記錄except Exception as e:logger.error("Mq init error:{0}".format(e))

?

一、Client模塊

與MQTT代理(broker)進行通信的主要類。

(一)使用流程

  • 使用connect()/connect_async()?連接MQTT代理
  • 頻繁的調用loop()來維持與MQTT代理之間的流量?
    • 或者使用loop_start()來設置一個線程為你調用loop()
    • 或者在一個阻塞的函數中調用loop_forever()來為你調用loop()
  • 使用subscribe()訂閱一個主題(topic)并接受消息(messages)
  • 使用publish()來發送消息
  • 使用disconnect()來斷開與MQTT代理的連接

(二)回調(Callbacks)

1.基本概念

使用回調處理從MQTT代理返回的數據,要使用回調需要先定義回調函數然后將其指派給客戶端實例(client)。?
例如:

# 定義一個回調函數 def on_connect(client, userdata, flags, rc):print("Connection returned " + str(rc))# 將回調函數指派給客戶端實例 client.on_connect = on_connect

所有的回調函數都有client和userdata參數。?
client是調用回調的客戶端實例;?
userdata可以使任何類型的用戶數據,可以在創建新客戶端實例時設置或者使用user_data_set(userdata)設置。

2.回調種類

(1)on_connect()

當代理響應連接請求時調用。?
on_connect(client, userdata, flags, rc):?
flags是一個包含代理回復的標志的字典;?
rc的值決定了連接成功或者不成功:

值連接情況
0連接成功
1協議版本錯誤
2無效的客戶端標識
3服務器無法使用
4錯誤的用戶名或密碼
5未經授權

(2)on_disconnect()

當與代理斷開連接時調用

on_disconnect(client, userdata, rc):

rc參數表示斷開狀態。?
如果MQTT_ERR_SUCCESS(0),回調被調用以響應disconnect()調用。 如果以任何其他值斷開連接是意外的,例如可能出現網絡錯誤。

(3)on_message()

on_message(client, userdata, message):

當收到關于客戶訂閱的主題的消息時調用。?
message是一個描述所有消息參數的MQTTMessage。

(4)on_publish()

當使用使用publish()發送的消息已經傳輸到代理時被調用。

對于Qos級別為1和2的消息,這意味著已經完成了與代理的握手。?
對于Qos級別為0的消息,這只意味著消息離開了客戶端。?
mid變量與從相應的publish()返回的mid變量匹配,以允許跟蹤傳出的消息。

此回調很重要,因為即使publish()調用返回,但并不總意味著消息已發送。

(5)on_subscribe()

當代理響應訂閱請求時被調用。

on_subscribe(client, userdata, mid, granted_qos):

mid變量匹配從相應的subscri?
be()返回的mid變量。?
‘granted_qos’變量是一個整數列表,它提供了代理為每個不同的訂閱請求授予的QoS級別。

(6)on_unsubscribe()

當代理響應取消訂閱請求時調用。

on_unsubscribe(client, userdata, mid):

mid匹配從相應的unsubscribe()調用返回的中間變量。

(7)on_log()

當客戶端有日志信息時調用

on_log(client, userdata, level, buf):

level變量給出了消息的嚴重性,并且將是MQTT_LOG_INFO,MQTT_LOG_NOTICE,MQTT_LOG_WARNING,MQTT_LOG_ERR和MQTT_LOG_DEBUG中的一個。?
buf變量用于存儲信息。

(三)方法

1.構造函數Client()

Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp") 參數含義
client_id連接到代理時使用的唯一客戶端ID字符串。 如果client_id長度為零或無,則會隨機生成一個。 在這種情況下,clean_session參數必須為True。
clean_session一個決定客戶端類型的布爾值。 如果為True,那么代理將在其斷開連接時刪除有關此客戶端的所有信息。 如果為False,則客戶端是持久客戶端,當客戶端斷開連接時,訂閱信息和排隊消息將被保留。
userdata用戶定義的任何類型的數據作為userdata參數傳遞給回調函數。 它可能會在稍后使用user_data_set()函數進行更新。
protocol用于此客戶端的MQTT協議的版本。 可以是MQTTv31或MQTTv311。
transport設置為“websockets”通過WebSockets發送MQTT。 保留默認的“tcp”使用原始TCP。

示例:

import paho.mqtt.client as mqtt client = mqtt.Client()

2.reinitialise()

reinitialise(client_id="", clean_session=True, userdata=None)

reinitialise()函數將客戶端重置為其開始狀態,就像它剛剛創建一樣。 它采用與Client()構造函數相同的參數。?
示例:

client.reinitialise()

3.選項函數

這些函數表示可以在客戶端上設置以修改其行為的選項。 在大多數情況下,這必須在連接到代理之前完成。

(1)max_inflight_messages_set()

max_inflight_messages_set(self, inflight)

設置QoS> 0的消息的最大數量,該消息一次可以部分通過其網絡流量。默認為20.增加此值將消耗更多內存,但可以增加吞吐量。

(2)max_queued_messages_set()

max_queued_messages_set(self, queue_size)

設置傳出消息隊列中可等待處理的具有QoS> 0的傳出消息的最大數量。默認為0表示無限制。 當隊列已滿時,任何其他傳出的消息都將被丟棄。

(3)message_retry_set()

message_retry_set(retry)

如果代理沒有響應,設置在重發QoS> 0的消息之前以秒為單位的時間。默認設置為5秒,通常不需要更改。

(4)ws_set_options()

ws_set_options(self, path="/mqtt", headers=None)

設置websocket連接選項。 只有在transport =“websockets”被傳入Client()構造函數時才會使用這些選項。

參數含義
path代理使用的mqtt路徑
headers可以是一個字典,指定應該附加到標準websocket頭部的額外頭部列表,也可以是可調用的正常websocket頭部并返回帶有一組頭部以連接到代理的新字典。

必須在調用connect()之前調用。

(4)tls_set()

tls_set(ca_certs=None, certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED,tls_version=ssl.PROTOCOL_TLS, ciphers=None)

配置網絡加密和身份驗證選項。 啟用SSL / TLS支持。

參數含義
ca_certs證書頒發機構證書文件的字符串路徑,該證書文件將被視為受此客戶端信任。
certfile, keyfile分別指向PEM編碼的客戶端證書和私鑰的字符串。 如果這些參數不是None,那么它們將用作基于TLS的身份驗證的客戶端信息。 對此功能的支持取決于代理。
cert_reqs定義客戶對經紀人施加的證書要求。 默認情況下,這是ssl.CERT_REQUIRED,這意味著代理必須提供證書。 有關此參數的更多信息,請參閱ssl pydoc。
tls_version指定要使用的SSL / TLS協議的版本。 默認情況下(如果python版本支持它),檢測到最高的TLS版本。
ciphers指定哪些加密密碼可供此連接使用的字符串,或者使用None來使用默認值。 有關更多信息,請參閱ssl pydoc。

必須在調用connect()之前調用。

(5)tls_set_context()

配置網絡加密和認證上下文。 啟用SSL / TLS支持。

tls_set_context(context=None) 參數含義
context一個ssl.SSLContext對象。

必須在調用connect()之前調用。

(6)tls_insecure_set()

配置服務器證書中服務器主機名的驗證。

tls_insecure_set(value)

如果value設置為True,則不可能保證您連接的主機不模擬您的服務器。 這在初始服務器測試中可能很有用,但是,惡意的第三方通過可以DNS欺騙模擬您的服務器。

  • 請勿在真實系統中使用此功能。 將值設置為True意味著使用加密沒有意義。
  • 必須在connect*()之前和tls_set()或tls_set_context()之后調用。

(7)enable_logger()

使用標準的Python日志包啟用日志記錄。 這可以與on_log回調方法同時使用

enable_logger(logger=None)

如果指定了記錄器,那么將使用該logging.Logger對象,否則將自動創建一個。?
按照以下映射將Paho日志記錄級別轉換為標準日志級別:

Pahologging
MQTT_LOG_ERRligging.ERROR
MQTT_LOG_WARNINGlogging.WARNING
MQTT_LOG_NOTICElogging.INFO (no direct equivalent)
MQTT_LOG_INFOlogging.INFO
MQTT_LOG_DEBUGlogging.DEBUG

(8)disable_logger()

使用標準python日志包禁用日志記錄。 這對on_log回調沒有影響。

disable_logger()

(9)username_pw_set()

為代理認證設置一個用戶名和一個可選的密碼。必須在connect*()之前調用。

username_pw_set(username, password=None)

(10)user_data_set()

設置在生成事件時將傳遞給回調的私人用戶數據。 將其用于您自己的目的以支持您的應用程序。

user_data_set(userdata)

(11)will_set()

設置要發送給代理的遺囑。 如果客戶端斷開而沒有調用disconnect(),代理將代表它發布消息。

will_set(topic, payload=None, qos=0, retain=False) 參數含義
topic該遺囑消息發布的主題
payload該消息將作為遺囑發送
qos用于遺囑的服務質量等級
retain如果設置為True,遺囑消息將被設置為該主題的“最后已知良好”/保留消息。

如果qos不是0,1或2,或者主題為None或字符串長度為零,則引發ValueError。

(11)reconnect_delay_set()

客戶端將自動重試連接。 在每次嘗試之間,它會在min_delay和max_delay之間等待幾秒鐘。

reconnect_delay_set(min_delay=1, max_delay=120)

當連接丟失時,最初重新連接嘗試延遲min_delay秒。 延遲在隨后的嘗試到中增加一倍。當連接完成時(例如收到CONNACK,而不僅僅是TCP連接建立),延遲重置為min_delay。

4.connect()

connect()函數將客戶端連接到代理。 這是一個阻塞函數。

connect(host, port=1883, keepalive=60, bind_address="") 參數含義
host遠程代理的主機名或IP地址
port要連接的服務器主機的網絡端口。 默認為1883
keepalive與代理通信之間允許的最長時間段(以秒為單位)。 如果沒有其他消息正在交換,則它將控制客戶端向代理發送ping消息的速率
bind_address假設存在多個接口,將綁定此客戶端的本地網絡接口的IP地址

5.connect_async()

與loop_start()一起使用以非阻塞方式連接。 直到調用loop_start()之前,連接才會完成。

connect_async(host, port=1883, keepalive=60, bind_address="")

6.connect_srv()

使用SRV DNS查找連接到代理以獲取代理地址。

connect_srv(domain, keepalive=60, bind_address="") 參數含義
domain該DNS域搜索SRV記錄。 如果無,請嘗試確定本地域名。

7.reconnect()

使用先前提供的詳細信息重新連接到經紀商。 在調用此函數之前,您必須先調用connect *()。

reconnect()

8.disconnect()

干凈地從代理斷開連接。 使用disconnect()不會導致代理發送遺囑消息。

disconnect()

9.loop()

定期調用處理網絡事件。

loop(timeout=1.0, max_packets=1)

此調用在select()中等待,直到網絡套接字可用于讀取或寫入(如果適用),然后處理傳入/傳出數據。

參數含義
timeout此方法最多可阻塞timeout秒
max_packetsmax_packets參數已過時,應保留為未設置狀態。

示例:

run = True while run:client.loop()

10.loop_start() / loop_stop()

這些功能實現了到網絡循環的線程接口。

loop_start() loop_stop(force=False)

在connect*()之前或之后調用loop_start()一次,會在后臺運行一個線程來自動調用loop()。這釋放了可能阻塞的其他工作的主線程。這個調用也處理重新連接到代理。?
調用loop_stop()來停止后臺線程。

force參數目前被忽略。?
示例:

client.connect("iot.eclipse.org") client.loop_start()while True:temperature = sensor.blocking_read()client.publish("paho/temperature", temperature)

11.loop_forever()

這是網絡循環的阻塞形式,直到客戶端調用disconnect()時才會返回。它會自動處理重新連接。

loop_forever(timeout=1.0, max_packets=1, retry_first_connection=False)

除了使用connect_async時的第一次連接嘗試以外,請使用retry_first_connection = True使其重試第一個連接。這可能會導致客戶端連接到一個不存在的主機的情況

12.publish()

從客戶端發送消息給代理。

publish(topic, payload=None, qos=0, retain=False)

消息將會發送給代理,并隨后從代理發送到訂閱匹配主題的任何客戶端。

參數含義
topic該消息發布的主題
payload要發送的實際消息。如果沒有給出,或設置為無,則將使用零長度消息。 傳遞int或float將導致有效負載轉換為表示該數字的字符串。 如果你想發送一個真正的int / float,使用struct.pack()來創建你需要的負載
qos服務的質量級別
retain如果設置為True,則該消息將被設置為該主題的“最后已知良好”/保留的消息

返回以下屬性和方法的MQTTMessageInfo:?
rc:發布的結果。

內容含義
MQTT_ERR_SUCCESS成功
MQTT_ERR_NO_CONN客戶端當前未連接
MQTT_ERR_QUEUE_SIZE當使用max_queued_messages_set來指示消息既不排隊也不發送。

mid:發布請求的消息ID。?
如果mid已定義,則可以通過檢查on_publish()回調中的mid來跟蹤發布請求。?
wait_for_publish():函數將阻塞,直到消息發布。 如果消息未排隊(rc == MQTT_ERR_QUEUE_SIZE),它將引發ValueError。?
is_published:如果消息已發布,is_published返回True。 如果消息未排隊(rc == MQTT_ERR_QUEUE_SIZE),它將引發ValueError。

如果主題為無,長度為零或無效(包含通配符),qos不是0,1或2之一,或者有效負載長度大于268435455字節,則會引發ValueError。

13.subscribe()

subscribe(topic, qos=0)

訂閱一個或多個主題。?
這個函數可以用三種不同的方式調用:

(1)簡單的字符串和整數

subscribe("my/topic", 2) 參數值
topic一個字符串,指定要訂閱的訂閱主題
qos期望的服務質量等級。 默認為0。

(2)字符串和整數元組

subscribe(("my/topic", 1)) 參數值
topic(topic,qos)的元組。 主題和qos都必須存在于元組中。
qos沒有使用

(3)字符串和整數元組的列表

這允許在單個SUBSCRIPTION命令中使用多個主題訂閱,這比使用多個訂閱subscribe()更有效。

subscribe([("my/topic", 0), ("another/topic", 2)]) 參數值
topic格式元組列表(topic,qos)。 topic和qos都必須出現在所有的元組中。
qos沒有使用

該函數返回一個元組(result,mid)。

如果qos不是0,1或2,或者主題為None或字符串長度為零,或者topic不是字符串,元組或列表,則引發ValueError。

14.unsubscribe()

取消訂閱一個或多個主題。

unsubscribe(topic) 參數含義
topic主題的單個字符串或者字符串列表

返回一個元組(result, mid)

15.外部事件循環支持

(1)loop_read()

loop_read(max_packets=1)

當套接字準備好讀取時調用。 max_packets已過時,應保持未設置狀態。

(2)loop_write()

loop_write(max_packets=1)

當套接字準備好寫入時調用。 max_packets已過時,應保持未設置狀態。

(3)loop_misc()

loop_misc()

每隔幾秒呼叫一次以處理消息重試和ping。

(4)socket()

socket()

返回客戶端中使用的套接字對象,以允許與其他事件循環進行交互。

(5)want_write()

want_write()

如果有數據等待寫入,則返回true,以允許將客戶端與其他事件循環連接。

16.全局輔助函數

client模塊還提供了一些全局幫助函數。?
(1)topic_matches_sub(sub,topic)可用于檢查主題是否與預訂匹配。?
(2)connack_string(connack_code)返回與CONNACK結果關聯的錯誤字符串。?
(3)error_string(mqtt_errno)返回與Paho MQTT錯誤號關聯的錯誤字符串。

?

二、Publish模塊

該模塊提供了一些幫助功能,可以以一次性方式直接發布消息。換句話說,它們對于您想要發布給代理的單個/多個消息然后斷開與其他任何必需的連接的情況非常有用。

(一)方法

1.Single

將一條消息發布給代理,然后徹底斷開連接。

single(topic, payload=None, qos=0, retain=False, hostname="localhost",port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None,protocol=mqtt.MQTTv311, transport="tcp") 參數含義
topic唯一必需的參數必須是負載將發布到的主題字符串。
payload要發布的有效載荷。 如果“”或None,零長度的有效載荷將被發布
qos發布時使用的qos默認為0
retain設置消息保留(True)或不(False)
hostname一個包含要連接的代理地址的字符串。 默認為localhost
port要連接到代理的端口。 默認為1883
client_id要使用的MQTT客戶端ID。 如果“”或None,Paho庫會自動生成客戶端ID
keepalive客戶端的存活超時值。 默認為60秒
will一個包含客戶端遺囑參數的字典,will = {‘topic’: “<topic>”, ‘payload’:”<payload”>, ‘qos’:<qos>, ‘retain’:<retain>}.
auth一個包含客戶端驗證參數的字典,auth = {‘username’:”<username>”, ‘password’:”<password>”}
tls一個包含客戶端的TLS配置參數的字典,dict = {‘ca_certs’:”<ca_certs>”, ‘certfile’:”<certfile>”, ‘keyfile’:”<keyfile>”, ‘tls_version’:”<tls_version>”, ‘ciphers’:”<ciphers”>}
protocol選擇要使用的MQTT協議的版本。 使用MQTTv31或MQTTv311。
transport設置為“websockets”通過WebSockets發送MQTT。 保留默認的“tcp”使用原始TCP。

示例:

import paho.mqtt.publish as publishpublish.single("paho/test/single", "payload", hostname="iot.eclipse.org")

2.Multiple

將多條消息發布給代理,然后干凈地斷開連接。

multiple(msgs, hostname="localhost", port=1883, client_id="", keepalive=60,will=None, auth=None, tls=None, protocol=mqtt.MQTTv311, transport="tcp") 參數含義
msgs要發布的消息列表。 每條消息是一個字典或一個元組。msg = {‘topic’:”<topic>”, ‘payload’:”<payload>”, ‘qos’:<qos>, ‘retain’:<retain>}或(“<topic>”, “<payload>”, qos, retain)

有關hostname,port,client_id,keepalive,will,auth,tls,protocol,transport的描述,請參閱single()。?
示例:

import paho.mqtt.publish as publishmsgs = [{'topic':"paho/test/multiple", 'payload':"multiple 1"},("paho/test/multiple", "multiple 2", 0, False)] publish.multiple(msgs, hostname="iot.eclipse.org")

三、Subscribe模塊

該模塊提供了一些幫助功能,以允許直接訂閱和處理消息。

(1)方法

1.Simple

訂閱一組主題并返回收到的消息。 這是一個阻塞函數。

simple(topics, qos=0, msg_count=1, retained=False, hostname="localhost",port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None,protocol=mqtt.MQTTv311) 參數含義
topics唯一需要的參數是客戶端將訂閱的主題字符串。 如果需要訂閱多個主題,這可以是字符串或字符串列表
qos訂閱時使用的qos默認為0
msg_count從代理檢索的消息數量。 默認為1.如果為1,則返回一個MQTTMessage對象。 如果> 1,則返回MQTTMessages列表
retained設置為True以考慮保留的消息,將其設置為False以忽略具有保留標志設置的消息
hostname一個包含要連接的代理地址的字符串。 默認為localhost
port要連接到代理的端口。 默認為1883
client_id要使用的MQTT客戶端ID。 如果“”或None,Paho庫會自動生成客戶端ID
keepalive客戶端的存活超時值。 默認為60秒。
will一個包含客戶端遺囑參數的字典,will = {‘topic’: “<topic>”, ‘payload’:”<payload”>, ‘qos’:<qos>, ‘retain’:<retain>}.
auth一個包含客戶端驗證參數的字典,auth = {‘username’:”<username>”, ‘password’:”<password>”}
tls一個包含客戶端的TLS配置參數的字典,dict = {‘ca_certs’:”<ca_certs>”, ‘certfile’:”<certfile>”, ‘keyfile’:”<keyfile>”, ‘tls_version’:”<tls_version>”, ‘ciphers’:”<ciphers”>}
protocol選擇要使用的MQTT協議的版本。 使用MQTTv31或MQTTv311。

2.Callback

訂閱一組主題并使用用戶提供的回叫處理收到的消息。

callback(callback, topics, qos=0, userdata=None, hostname="localhost",port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None,protocol=mqtt.MQTTv311) 參數含義
callback一個“on_message”回調將被用于每個收到的消息
topics客戶端將訂閱的主題字符串。 如果需要訂閱多個主題,這可以是字符串或字符串列表。
qos訂閱時使用的qos默認為0
userdata用戶提供的對象將在收到消息時傳遞給on_message回調函數

有關hostname,port,client_id,keepalive,will,auth,tls,protocol的描述,請參閱simple()。?
示例:

import paho.mqtt.subscribe as subscribedef on_message_print(client, userdata, message):print("%s %s" % (message.topic, message.payload))subscribe.callback(on_message_print, "paho/test/callback", hostname="iot.eclipse.org"

參考資料:https://pypi.python.org/pypi/paho-mqtt

總結

以上是生活随笔為你收集整理的Python paho-mqtt消息队列的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。