emqx 使用端口_数据传输、存储、展现,EMQ X + TDengine 搭建 MQTT 物联网数据可视化平台...
物聯(lián)網(wǎng)數(shù)據(jù)采集涉及到大量設(shè)備接入、海量的時(shí)序數(shù)據(jù)傳輸,EMQ X 消息中間件與 TDengine 大數(shù)據(jù)平臺(tái)的組合技術(shù)棧完全能夠勝任場(chǎng)景中的海量時(shí)間序列監(jiān)測(cè)數(shù)據(jù)的傳輸、存儲(chǔ)和計(jì)算。
數(shù)據(jù)入庫后,往往需要其他方式如數(shù)據(jù)可視化系統(tǒng)將數(shù)據(jù)按照規(guī)則統(tǒng)計(jì)、展現(xiàn)出來,實(shí)現(xiàn)數(shù)據(jù)的監(jiān)控、指標(biāo)統(tǒng)計(jì)等業(yè)務(wù)需求,以便充分發(fā)揮數(shù)據(jù)的價(jià)值,TDengine 搭配開源軟件 Grafana 可以快速搭建物聯(lián)網(wǎng)數(shù)據(jù)可視化平臺(tái)。
上述整套方案無需代碼開發(fā),涉及的產(chǎn)品均能提供開源軟件、企業(yè)服務(wù)、云端 SaaS 服務(wù)不同層次的交付模式,能夠根據(jù)項(xiàng)目需求實(shí)現(xiàn)免費(fèi)版或企業(yè)版私有化落地以及云端部署。
方案介紹? ?EMQ X 簡(jiǎn)介
EMQ X(https://www.emqx.io/cn/)?是基于高并發(fā)的 Erlang/OTP 語言平臺(tái)開發(fā),支持百萬級(jí)連接和分布式集群架構(gòu),發(fā)布訂閱模式的開源 MQTT 消息服務(wù)器。EMQ X 內(nèi)置了大量開箱即用的功能,其開源版 EMQ X Broker 及企業(yè)版 EMQ X Enterprise 均支持通過規(guī)則引擎將設(shè)備消息存儲(chǔ)到 TDengine。
? ?TDengine 是什么
TDengine 是濤思數(shù)據(jù)專為物聯(lián)網(wǎng)、車聯(lián)網(wǎng)、工業(yè)互聯(lián)網(wǎng)、IT 運(yùn)維等設(shè)計(jì)和優(yōu)化的大數(shù)據(jù)平臺(tái)。除核心的快 10 倍以上的時(shí)序數(shù)據(jù)庫功能外,還提供緩存、數(shù)據(jù)訂閱、流式計(jì)算等功能,最大程度減少研發(fā)和運(yùn)維的復(fù)雜度,且核心代碼,包括集群功能全部開源。
TDengine 提供社區(qū)版、企業(yè)版和云服務(wù)版,安裝/使用教程詳見 TDengine 使用文檔 (https://www.taosdata.com/cn/documentation20)。
? ?Grafana 簡(jiǎn)介
Grafana 是一個(gè)跨平臺(tái)、開源的度量分析和可視化工具,可以查詢處理各類數(shù)據(jù)源中的數(shù)據(jù),進(jìn)行可視化的展示。它可以快速靈活創(chuàng)建的客戶端圖表,面板插件有許多不同方式的可視化指標(biāo)和日志,官方庫中具有豐富的儀表盤插件,比如熱圖、折線圖、圖表等多種展示方式;支持 Graphite,TDengine、InfluxDB,OpenTSDB,Prometheus,Elasticsearch,CloudWatch和 KairosDB 等數(shù)據(jù)源,支持?jǐn)?shù)據(jù)項(xiàng)獨(dú)立/混合查詢展示;可以創(chuàng)建自定義告警規(guī)則并通知到其他消息處理服務(wù)或組件中。
業(yè)務(wù)場(chǎng)景本文模擬物聯(lián)網(wǎng)環(huán)境數(shù)據(jù)采集場(chǎng)景,假設(shè)現(xiàn)有一定數(shù)據(jù)的環(huán)境數(shù)據(jù)采集點(diǎn),所有采集點(diǎn)數(shù)據(jù)均通過 MQTT 協(xié)議傳輸至采集平臺(tái)(MQTT Publish),主題設(shè)計(jì)如下:
sensor/data傳感器發(fā)送的數(shù)據(jù)格式為 JSON,數(shù)據(jù)包括傳感器采集的溫度、濕度、噪聲音量、PM10、PM2.5、二氧化硫、二氧化氮、一氧化碳、傳感器 ID、區(qū)域、采集時(shí)間等數(shù)據(jù)。
{ "temperature": 30, "humidity" : 20, "volume": 44.5, "PM10": 23, "pm25": 61, "SO2": 14, "NO2": 4, "CO": 5, "id": "10-c6-1f-1a-1f-47", "area": 1, "ts": 1596157444170}現(xiàn)在需要實(shí)時(shí)存儲(chǔ)以便在后續(xù)任意時(shí)間查看數(shù)據(jù),提出以下的需求:
每個(gè)設(shè)備按照每 5 秒鐘一次的頻率進(jìn)行數(shù)據(jù)上報(bào),數(shù)據(jù)庫需存儲(chǔ)每條數(shù)據(jù)以供后續(xù)回溯分析;
通過可視化系統(tǒng)查看任意區(qū)域、任意時(shí)間區(qū)間內(nèi)的指標(biāo)數(shù)據(jù),如平均值、最大值、最小值。
本文所用各個(gè)組件均有 Docker 鏡像,除 EMQ X 需要修改少數(shù)配置為了便于操作使用下載安裝外,TDengine 與 Grafana 均使用 Docker 搭建。
安裝包資源與使用教程參照各自官網(wǎng):
EMQ X:EMQ 官網(wǎng) https://www.emqx.io
TDengine:濤思數(shù)據(jù)官網(wǎng) https://www.taosdata.com/cn/
Grafana:Grafana 官網(wǎng) https://grafana.com/
? ?安裝 EMQ X
如果您是 EMQ X 新手用戶,推薦通過 EMQ X 文檔 (https://docs.emqx.io) 快速上手
訪問 EMQ 官網(wǎng) (https://www.emqx.io/downloads) 下載適合您操作系統(tǒng)的安裝包,本文截稿時(shí) EMQ X 開源版最新版本為 v4.1.2,下載 zip 包的啟動(dòng)步驟如下 :
## 解壓下載好的安裝包unzip emqx-macosx-v4.1.1.zipcd emqx## 以 console 模式啟動(dòng) EMQ X 方便調(diào)試./bin/emqx console啟動(dòng)成功后瀏覽器訪問 http://127.0.0.1:18083 訪問 EMQ X 管理控制臺(tái) Dashboard,使用 admin public 默認(rèn)用戶名密碼完成初次登錄。
? ?安裝 TDengine
為了方便測(cè)試使用通過 Docker 進(jìn)行安裝(需映射網(wǎng)絡(luò)端口),也可以使用安裝包的方式進(jìn)行安裝:
## 拉取并啟動(dòng)容器docker run -d --name tdengine -p 6030-6041:6030-6041 tdengine/tdengine:latest## 啟動(dòng)后檢查容器運(yùn)行狀態(tài)docker ps -a? ?Grafana 安裝
使用以下命令通過 Docker 安裝并啟動(dòng) Grafana:
docker run -d --name=grafana -p 3000:3000 grafana/grafana啟動(dòng)成功后瀏覽器訪問 http://127.0.0.1:3000 訪問 Grafana 可視化面板,使用 admin admin 默認(rèn)用戶名密碼完成初次登錄,登錄后按照提示修改密碼使用新密碼登錄進(jìn)入主界面:
配置 EMQ X 存儲(chǔ)數(shù)據(jù)到 TDengine? ?TDengine 創(chuàng)建數(shù)據(jù)庫與數(shù)據(jù)表
進(jìn)入TDengine Docker 容器:
docker exec -it tdengine bash創(chuàng)建 “test” 數(shù)據(jù)庫:
taoscreate database test;創(chuàng)建 sensor_data 表,關(guān)于 TDengine 數(shù)據(jù)結(jié)構(gòu)以及 SQL 命令參見 TAOS SQL (https://www.taosdata.com/cn/documentation20/taos-sql/#表管理) :
use test;CREATE TABLE sensor_data ( ts timestamp, temperature float, humidity float, volume float, PM10 float, pm25 float, SO2 float, NO2 float, CO float, sensor_id NCHAR(255), area TINYINT, coll_time timestamp);? ?配置 EMQ X 規(guī)則引擎
打開 EMQ X Dashboared,進(jìn)入 規(guī)則引擎 -> 規(guī)則 頁面,點(diǎn)擊 創(chuàng)建 按鈕進(jìn)入創(chuàng)建頁面。
規(guī)則 SQL
規(guī)則 SQL 用于 EMQ X 消息以及事件篩選,以下 SQL 表示從 sensor/data 主題篩選出 payload 數(shù)據(jù):
SELECT payloadFROM "sensor/data"使用SQL 測(cè)試功能,輸入測(cè)試數(shù)據(jù)進(jìn)行篩選結(jié)果測(cè)試,測(cè)試有結(jié)果且輸出內(nèi)容如下,標(biāo)明 SQL 編寫正確:
{ "payload": "{\"temperature\":30,\"humidity\":20,\"volume\":44.5,\"PM10\":23,\"pm2.5\":61,\"SO2\":14,\"NO2\":4,\"CO\":5,\"id\":\"10-c6-1f-1a-1f-47\",\"area\":1,\"ts\":1596157444170}"}響應(yīng)動(dòng)作
為支持各種不同類型平臺(tái)的開發(fā),TDengine 提供符合 REST 設(shè)計(jì)標(biāo)準(zhǔn)的 API。通過 RESTful Connector (https://www.taosdata.com/cn/documentation20/connector/#RESTful-Connector)?提供了最簡(jiǎn)單的連接方式,即使用 HTTP 請(qǐng)求攜帶認(rèn)證信息與要執(zhí)行的 SQL 操作 TDengine。
使用 EMQ X 開源版中的發(fā)送到 Web 服務(wù)即可通過 RESTful Connector 寫入數(shù)據(jù)到 TDengine。即將到來的 EMQ X 企業(yè)版 4.1.1 版本將提供原生更高性能的寫入 Connector。
發(fā)送到 Web 服務(wù)需要兩個(gè)數(shù)據(jù),一個(gè)是關(guān)聯(lián)資源,另一個(gè)是消息內(nèi)容模板。
關(guān)聯(lián)資源:HTTP 服務(wù)器配置信息,此處為 TDengine 的 RESTful Connector
消息內(nèi)容模板:此處為攜帶數(shù)據(jù)的 INSERT SQL,注意我們應(yīng)當(dāng)在 SQL 中指定數(shù)據(jù)庫名,字符類型也要用單引號(hào)括起來, 消息內(nèi)容模板為:
創(chuàng)建過程
點(diǎn)擊響應(yīng)動(dòng)作下的添加按鈕,在彈出框內(nèi)選擇 發(fā)送數(shù)據(jù)到 Web 服務(wù),點(diǎn)擊 新建資源 新建一個(gè) WebHook 資源。
資源類型選擇 Webhook,請(qǐng)求 URL 填寫 http://127.0.0.1:6041/rest/sql,請(qǐng)求方法選擇 POST,還需添加 Authorization 請(qǐng)求頭作為認(rèn)證信息。
Authorization 的值為 Basic + TDengine 的 {username}:{password} 經(jīng)過 Base64 編碼之后的字符串, 例如 root:taosdata 編碼后為 cm9vdDp0YW9zZGF0YQ==,實(shí)際填入的值為:Basic cm9vdDp0YW9zZGF0YQ==
在響應(yīng)動(dòng)作創(chuàng)建頁面選擇新建的資源,并填入消息模板內(nèi)容即可。
生成模擬數(shù)據(jù)以下腳本模擬了 10000 個(gè)設(shè)備在過去 24 小時(shí)內(nèi)、每隔 5 秒鐘上報(bào)一條模擬數(shù)據(jù)并發(fā)送到 EMQ X 的場(chǎng)景。
總數(shù)據(jù)量:24 * 3600 / 5 * 10000 = 1.72 億條
消息 TPS:2000
讀者安裝 Node.js ,按需修改配置參數(shù)后可以通過以下命令啟動(dòng):
npm install mqtt mockjs --save --registry=https://registry.npm.taobao.orgnode mock.js附:模擬生成數(shù)據(jù)并發(fā)送到 EMQ X 代碼,請(qǐng)根據(jù)集群性能調(diào)整相關(guān)參數(shù)
// mock.jsconst mqtt = require('mqtt')const Mock = require('mockjs')const EMQX_SERVER = 'mqtt://localhost:1883'const CLIENT_NUM = 10000const STEP = 5000 // 模擬采集時(shí)間間隔 msconst AWAIT = 5000 // 每次發(fā)送完后休眠時(shí)間,防止消息速率過快 msconst CLIENT_POOL = []startMock()function sleep(timer = 100) { return new Promise(resolve => { setTimeout(resolve, timer) })}async function startMock() { const now = Date.now() for (let i = 0; i < CLIENT_NUM; i++) { const client = await createClient(`mock_client_${i}`) CLIENT_POOL.push(client) } // last 24h every 5s const last = 24 * 3600 * 1000 for (let ts = now - last; ts <= now; ts += STEP) { for (const client of CLIENT_POOL) { const mockData = generateMockData() const data = { ...mockData, id: client.clientId, area: 0, ts, } client.publish('sensor/data', JSON.stringify(data)) } const dateStr = new Date(ts).toLocaleTimeString() console.log(`${dateStr} send success.`) await sleep(AWAIT) } console.log(`Done, use ${(Date.now() - now) / 1000}s`)}/** * Init a virtual mqtt client * @param {string} clientId ClientID */function createClient(clientId) { return new Promise((resolve, reject) => { const client = mqtt.connect(EMQX_SERVER, { clientId, }) client.on('connect', () => { console.log(`client ${clientId} connected`) resolve(client) }) client.on('reconnect', () => { console.log('reconnect') }) client.on('error', (e) => { console.error(e) reject(e) }) })}/*** Generate mock data*/function generateMockData() { return { "temperature": parseFloat(Mock.Random.float(22, 100).toFixed(2)), "humidity": parseFloat(Mock.Random.float(12, 86).toFixed(2)), "volume": parseFloat(Mock.Random.float(20, 200).toFixed(2)), "PM10": parseFloat(Mock.Random.float(0, 300).toFixed(2)), "pm25": parseFloat(Mock.Random.float(0, 300).toFixed(2)), "SO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)), "NO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)), "CO": parseFloat(Mock.Random.float(0, 50).toFixed(2)), "area": Mock.Random.integer(0, 20), "ts": 1596157444170, }}可視化配置組件安裝完成,模擬數(shù)據(jù)寫入成功后,按照 Grafana 可視化界面的操作指引,完成業(yè)務(wù)所需數(shù)據(jù)可視化配置。
? ?添加數(shù)據(jù)源(Add data source)
添加數(shù)據(jù)源,即顯示的數(shù)據(jù)源信息。選取 TDengine 類型數(shù)據(jù)源,輸入連接參數(shù)進(jìn)行配置,默認(rèn)情況下,關(guān)鍵配置信息如下:
? ?添加儀表盤(New Dashboard)
添加好數(shù)據(jù)源后,添加需要顯示的數(shù)據(jù)儀表盤信息。儀表盤為多個(gè)可視化面板的集合,點(diǎn)擊 New Dashboard 后,選擇 + Query 通過查詢來添加數(shù)據(jù)面板。
創(chuàng)建面板需要四個(gè)步驟,分別是 Queries(查詢)、Visualization(可視化)、General(圖表配置)、Alert(告警),創(chuàng)建時(shí)間
? ?平均值面板
使用 Grafana 的可視化查詢構(gòu)建工具,查詢出所有設(shè)備的平均值。
以下 SQL 按照指定時(shí)間段($form $to)、指定時(shí)間間隔($interval),查詢出數(shù)據(jù)中關(guān)鍵指標(biāo)的平均值:
select avg(temperature), avg(humidity), avg(volume), avg(PM10), avg(pm25), avg(SO2), avg(NO2), avg(CO) from test.sensor_data where coll_time >= $from and coll_time < $to interval($interval)Visualization 默認(rèn)不做更改,General 里面修改面板名稱為 歷史平均值,如果需要對(duì)業(yè)務(wù)進(jìn)行監(jiān)控告警,可以在 Alert 里編排告警規(guī)則,此處僅做可視化展示,不使用此功能。
完成創(chuàng)建后,點(diǎn)擊左上角返回按鈕,該 Dashboard 里成功添加一個(gè)數(shù)據(jù)面板。點(diǎn)擊頂部導(dǎo)航欄保存圖標(biāo),輸入 Dashboard 名稱完成 Dashboard 的創(chuàng)建。
? ?最大值、最小值面板
繼續(xù)點(diǎn)擊 Dashboard 的 Add panel 按鈕,添加最大值、最小值圖表。操作步驟同添加平均值,僅對(duì)查詢中 SELECT 統(tǒng)計(jì)方法字段做出調(diào)整,調(diào)整為 AVG 函數(shù)為 MAX 與 MIN:
select max(temperature), max(humidity), max(volume), max(PM10), max(pm25), max(SO2), max(NO2), max(CO), min(temperature), min(humidity), min(volume), min(PM10), min(pm25), min(SO2), min(NO2), min(CO) from test.sensor_data where coll_time >= $from and coll_time < $to interval($interval)? ?儀表盤效果
保存儀表盤,拖拽調(diào)整每個(gè)數(shù)據(jù)面板大小、位置,最終得到一個(gè)視覺效果較好的數(shù)據(jù)儀表盤。儀表盤右上角可以選擇時(shí)間區(qū)間、自動(dòng)刷新時(shí)間,此時(shí)設(shè)備持續(xù)發(fā)送數(shù)據(jù)采集數(shù)據(jù),儀表盤數(shù)據(jù)值會(huì)有所變動(dòng),實(shí)現(xiàn)了比較好的可視化效果。
總結(jié)至此我們借助 EMQ X + TDengine 完成了物聯(lián)網(wǎng)數(shù)據(jù)傳輸、存儲(chǔ)、展現(xiàn)整個(gè)流程的系統(tǒng)搭建,讀者可以了解到 EMQ X 豐富的拓展能力與 TDengine 完備的大數(shù)據(jù)平臺(tái)特性在物聯(lián)網(wǎng)數(shù)據(jù)采集中的應(yīng)用。深入學(xué)習(xí)掌握 Grafana 的其他功能后,用戶可以定制出更完善的數(shù)據(jù)可視化乃至監(jiān)控告警系統(tǒng)。
?點(diǎn)擊"閱讀原文" ,了解更多
↓↓↓
總結(jié)
以上是生活随笔為你收集整理的emqx 使用端口_数据传输、存储、展现,EMQ X + TDengine 搭建 MQTT 物联网数据可视化平台...的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 过滤敏感词
- 下一篇: c++ 遍历所有点且距离最短_编程小白暑