日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

emq数据储存到mysql_EMQ X 规则引擎系列(三)存储消息到 InfluxDB 时序数据库

發(fā)布時間:2024/1/23 数据库 43 豆豆
生活随笔 收集整理的這篇文章主要介紹了 emq数据储存到mysql_EMQ X 规则引擎系列(三)存储消息到 InfluxDB 时序数据库 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

前言

InfluxDB 是一個用于存儲和分析時間序列數(shù)據(jù)的開源數(shù)據(jù)庫,內(nèi)置 HTTP API,類 SQL 語句的支持和無結(jié)構(gòu)的特性對使用者而言都非常友好。它強大的數(shù)據(jù)吞吐能力以及穩(wěn)定的性能表現(xiàn)使其非常適合 IoT 領(lǐng)域。

通過 EMQ X 消息引擎,我們可以自定義 Template 文件,然后將 Json 格式的 MQTT 消息轉(zhuǎn)換為 Measurement 寫入 InfluxDB:

場景介紹

該場景需要將 EMQ X 指定主題下且滿足條件的消息存儲到 InfluxDB 時序數(shù)據(jù)庫。為了便于后續(xù)分析檢索,消息內(nèi)容需要進行拆分存儲。

該場景下客戶端上報數(shù)據(jù)如下:

Topic:data/sensor

Payload:

{

"location": "bedroom",

"data": {

"temperature": 25,

"humidity": 46.4,

"pm2_5": 0.5

}

}

準(zhǔn)備工作

數(shù)據(jù)庫安裝及初始化

創(chuàng)建 db 數(shù)據(jù)庫并開放 8089 UDP 端口。

$ docker pull influxdb

$ git clone -b v1.0.0 https://github.com/palkan/influx_udp.git

$ cd influx_udp

$ docker run --name=influxdb --rm -d -p 8086:8086 -p 8089:8089/udp -v ${PWD}/files/influxdb.conf:/etc/influxdb/influxdb.conf:ro -e INFLUXDB_DB=db influxdb:latest

配置說明

創(chuàng)建資源

打開 EMQ X Dashboard,進入左側(cè)菜單的 資源 頁面,點擊 新建 按鈕,選擇 InfluxDB 資源類型并完成相關(guān)配置進行資源創(chuàng)建。

創(chuàng)建規(guī)則

進入左側(cè)菜單的 規(guī)則 頁面,點擊 新建 按鈕,進行規(guī)則創(chuàng)建。觸發(fā)事件 選擇 message.publish,即在 EMQ X 收到 PUBLISH 消息時觸發(fā)該規(guī)則進行數(shù)據(jù)處理。

選定觸發(fā)事件后,我們可在界面上看到可選字段及示例 SQL:

篩選所需字段

規(guī)則引擎使用 SQL 語句過濾和處理數(shù)據(jù)。例如前文提到的場景中我們需要將 payload 中的字段提取出來使用,則可以通過 payload. 實現(xiàn)。同時我們僅僅期望處理 data/sensor 主題,那么可以在 WHERE 子句中使用主題通配符 =~ 對 topic 進行篩選:topic =~ 'data/sensor', 最終我們得到 SQL 如下:

SELECT

payload.location as location,

payload.data.temperature as temperature,

payload.data.humidity as humidity,

payload.data.pm2_5 as pm2_5

FROM

"message.publish"

WHERE

topic =~ 'data/sensor'

SQL 測試

借助 SQL 測試功能,我們可以快速確認剛剛填寫的 SQL 語句是否能達到我們的目的。首先填寫用于測試的 payload 等數(shù)據(jù)如下:

然后點擊 測試 按鈕,得到以下輸出結(jié)果,與預(yù)期相符。

{

"humidity": 46.4,

"location": "bedroom",

"pm2_5": 0.5,

"temperature": 25

}

添加響應(yīng)動作,存儲消息到 InfluxDB

SQL 條件輸入輸出無誤后,我們繼續(xù)添加響應(yīng)動作,配置寫入 SQL 語句,將篩選結(jié)果存儲到 InfluxDB。

點擊響應(yīng)動作中的 添加 按鈕,選擇動作 保存數(shù)據(jù)到 InfluxDB,選取剛剛創(chuàng)建的 InfluxDB 資源,再按照實際需求將 ${fieldName} 填寫到 Field Keys, Tag Keys 和 Timestamp Key 中,Measurement 表示將數(shù)據(jù)寫入 InfluxDB 時使用的 Measurement,最后點擊 新建 按鈕完成規(guī)則創(chuàng)建。

測試

預(yù)期結(jié)果

我們成功創(chuàng)建了一條規(guī)則,包含一個處理動作,動作期望效果如下:

客戶端向 data/sensor 主題上報消息時,該消息將命中規(guī)則,規(guī)則列表中 已命中 數(shù)字將會增加 1;

InfluxDB 的 db 數(shù)據(jù)庫中將會增加一條數(shù)據(jù),數(shù)據(jù)內(nèi)容與處理后的消息內(nèi)容一致。

使用 Dashboard 中的 Websocket 工具測試

切換到 工具 --> Websocket 頁面,使用任意 Client ID 連接到 EMQ X,連接成功后在 消息 卡片中發(fā)送如下消息:

Topic:data/sensor

Payload:

{

"location": "bedroom",

"data": {

"temperature": 25,

"humidity": 46.4,

"pm2_5": 0.5

}

}

點擊 發(fā)送 按鈕,發(fā)送成功后可以看到當(dāng)前規(guī)則已命中次數(shù)已經(jīng)變?yōu)?1。

然后檢查 InfluxDB,新的 data point 是否添加成功:

$ docker exec -it influxdb influx

> use db

Using database db

> select * from "sensor_data"

name: sensor_data

time humidity location pm2_5 temperature

---- -------- -------- ----- -----------

1561535778444457348 46.4 bedroom 0.5 25

至此,我們通過規(guī)則引擎實現(xiàn)了存儲消息到 InfluxDB 數(shù)據(jù)庫的業(yè)務(wù)開發(fā)。

在閱讀該教程之前,假定你已經(jīng)了解 MQTT、EMQ X 的簡單知識。

超強干貨來襲 云風(fēng)專訪:近40年碼齡,通宵達旦的技術(shù)人生

總結(jié)

以上是生活随笔為你收集整理的emq数据储存到mysql_EMQ X 规则引擎系列(三)存储消息到 InfluxDB 时序数据库的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。