【实践案例】Databricks 数据洞察在美的暖通与楼宇的应用实践
簡(jiǎn)介: 獲取更詳細(xì)的 Databricks 數(shù)據(jù)洞察相關(guān)信息,可至產(chǎn)品詳情頁(yè)查看:https://www.aliyun.com/product/bigdata/spark
作者
美的暖通與樓宇事業(yè)部 先行研究中心智能技術(shù)部
?
美的暖通 IoT 數(shù)據(jù)平臺(tái)建設(shè)背景
美的暖通與樓宇事業(yè)部(以下簡(jiǎn)稱美的暖通)是美的集團(tuán)旗下五大板塊之一,產(chǎn)品覆蓋多聯(lián)機(jī)組、大型冷水機(jī)組、單元機(jī)、機(jī)房空調(diào)、扶梯、直梯、貨梯以及樓宇自控軟件和建筑弱電集成解決方案,遠(yuǎn)銷海內(nèi)外200多個(gè)國(guó)家。當(dāng)前事業(yè)部設(shè)備數(shù)據(jù)上云僅停留在數(shù)據(jù)存儲(chǔ)層面,缺乏挖掘數(shù)據(jù)價(jià)值的平臺(tái),造成大量數(shù)據(jù)荒廢,并且不斷消耗存儲(chǔ)資源,增加存儲(chǔ)費(fèi)用和維護(hù)成本。另一方面,現(xiàn)有數(shù)據(jù)驅(qū)動(dòng)應(yīng)用缺乏部署平臺(tái),難以產(chǎn)生實(shí)際價(jià)值。因此,急需統(tǒng)一通用的 IoT 數(shù)據(jù)平臺(tái),以支持設(shè)備運(yùn)行數(shù)據(jù)的快速分析和建模。
?
我們的 IoT 數(shù)據(jù)平臺(tái)建設(shè)基于阿里云 Databricks 數(shù)據(jù)洞察全托管 Spark 產(chǎn)品,以下是整體業(yè)務(wù)架構(gòu)圖。在本文后面的章節(jié),我們將就IoT數(shù)據(jù)平臺(tái)建設(shè)技術(shù)選型上的一些思考,以及 Spark 技術(shù)棧尤其是 Delta Lake 場(chǎng)景的應(yīng)用實(shí)踐做一下分享。
?
?
選擇 Spark & Delta Lake
在數(shù)據(jù)平臺(tái)計(jì)算引擎層技術(shù)選型上,由于我們數(shù)據(jù)團(tuán)隊(duì)剛剛成立,前期的架構(gòu)選型我們做了很多的調(diào)研,綜合各個(gè)方面考慮,希望選擇一個(gè)成熟且統(tǒng)一的平臺(tái):既能夠支持?jǐn)?shù)據(jù)處理、數(shù)據(jù)分析場(chǎng)景,也能夠很好地支撐數(shù)據(jù)科學(xué)場(chǎng)景。加上團(tuán)隊(duì)成員對(duì) Python 及 Spark 的經(jīng)驗(yàn)豐富,所以,從一開(kāi)始就將目標(biāo)鎖定到了 Spark 技術(shù)棧。
?
選擇 Databricks 數(shù)據(jù)洞察 Delta Lake
通過(guò)與阿里云計(jì)算平臺(tái)團(tuán)隊(duì)進(jìn)行多方面的技術(shù)交流以及實(shí)際的概念驗(yàn)證,我們最終選擇了阿里云 Databricks 數(shù)據(jù)洞察產(chǎn)品。作為 Spark 引擎的母公司,其商業(yè)版 Spark 引擎,全托管 Spark 技術(shù)棧,統(tǒng)一的數(shù)據(jù)工程和數(shù)據(jù)科學(xué)等,都是我們決定選擇 Databricks 數(shù)據(jù)洞察的重要原因。
?
具體來(lái)看,Databricks 數(shù)據(jù)洞察提供的核心優(yōu)勢(shì)如下:
- Saas 全托管 Spark:免運(yùn)維,無(wú)需關(guān)注底層資源情況,降低運(yùn)維成本,聚焦分析業(yè)務(wù)
- 完整 Spark 技術(shù)棧集成:一站式集成 Spark 引擎和 Delta Lake 數(shù)據(jù)湖,100%兼容開(kāi)源 Spark 社區(qū)版;Databricks 做商業(yè)支持,最快體驗(yàn) Spark 最新版本特性
- 總成本降低:商業(yè)版本 Spark 及 Delta Lake 性能優(yōu)勢(shì)顯著;同時(shí)基于計(jì)算存儲(chǔ)分離架構(gòu),存儲(chǔ)依托阿里云 OSS 對(duì)象存儲(chǔ),借助阿里云 JindoFS 緩存層加速;能夠有效降低集群總體使用成本
- 高品質(zhì)支持以及SLA保障:阿里云和 Databricks 提供覆蓋 Spark 全棧的技術(shù)支持;提供商業(yè)化 SLA 保障與7*24小時(shí) Databricks 專家支持服務(wù)
?
IoT 數(shù)據(jù)平臺(tái)整體架構(gòu)
整體的架構(gòu)如上圖所示。
?
我們接入的 IoT 數(shù)據(jù)分為兩部分,歷史存量數(shù)據(jù)和實(shí)時(shí)數(shù)據(jù)。目前,歷史存量數(shù)據(jù)是通過(guò) Spark SQL 以天為單位從不同客戶關(guān)系數(shù)據(jù)庫(kù)批量導(dǎo)入 Delta Lake 表中;實(shí)時(shí)數(shù)據(jù)通過(guò) IoT 平臺(tái)采集到云 Kafka ,經(jīng)由 Spark Structured Streaming 消費(fèi)后實(shí)時(shí)寫入到 Delta Lake 表中。在這個(gè)過(guò)程中,我們將實(shí)時(shí)數(shù)據(jù)和歷史數(shù)據(jù)都 sink 到同一張 Delta 表里,這種批流一體操作可大大簡(jiǎn)化我們的 ETL 流程(參考后面的案例部分)。數(shù)據(jù)管道下游,我們對(duì)接數(shù)據(jù)分析及數(shù)據(jù)科學(xué)工作流。
?
IoT 數(shù)據(jù)采集:從 Little Data 到 Big Data
作為 IoT 場(chǎng)景的典型應(yīng)用,美的暖通最核心的數(shù)據(jù)均來(lái)自 IoT 終端設(shè)備。在整個(gè) IoT 環(huán)境下,分布著無(wú)數(shù)個(gè)終端傳感器。從小的維度看,傳感器產(chǎn)生的數(shù)據(jù)本身屬于 Small Data(或者稱為 Little Data)。當(dāng)把所有傳感器連接成一個(gè)大的 IoT 網(wǎng)絡(luò),產(chǎn)生自不同傳感器的數(shù)據(jù)經(jīng)由 Gateway 與云端相連接,并最終在云端形成 Big Data 。
?
在我們的場(chǎng)景下,IoT 平臺(tái)本身會(huì)對(duì)不同協(xié)議的數(shù)據(jù)進(jìn)行初步解析,通過(guò)定制的硬件網(wǎng)絡(luò)設(shè)備將解析后的半結(jié)構(gòu)化 JSON 數(shù)據(jù)經(jīng)由網(wǎng)絡(luò)發(fā)送到云 Kafka。云 Kafka 扮演了整個(gè)數(shù)據(jù)管道的入口。
?
數(shù)據(jù)入湖:Delta Lake
IoT 場(chǎng)景下的數(shù)據(jù)有如下幾個(gè)特點(diǎn):
- 時(shí)序數(shù)據(jù):傳感器產(chǎn)生的數(shù)據(jù)記錄中包含時(shí)間相關(guān)的信息,數(shù)據(jù)本身具有時(shí)間屬性,因此不同的數(shù)據(jù)之間可能存在一定的相關(guān)性。利用 as-of-join 將不同時(shí)間序列數(shù)據(jù) join 到一起是下游數(shù)據(jù)預(yù)測(cè)分析的基礎(chǔ)
- 數(shù)據(jù)的實(shí)時(shí)性:傳感器實(shí)時(shí)生成數(shù)據(jù)并以最低延遲的方式傳輸?shù)綌?shù)據(jù)管道,觸發(fā)規(guī)則引擎,生成告警和事件,通知相關(guān)工作人員。
- 數(shù)據(jù)體量巨大:IoT 網(wǎng)絡(luò)環(huán)境下遍布各地的成千上萬(wàn)臺(tái)設(shè)備及其傳感器再通過(guò)接入服務(wù)將海量的數(shù)據(jù)歸集到平臺(tái)
- 數(shù)據(jù)協(xié)議多樣:通常在 IoT 平臺(tái)接入的不同種類設(shè)備中,上傳數(shù)據(jù)協(xié)議種類多樣,數(shù)據(jù)編碼格式不統(tǒng)一
?
IoT 數(shù)據(jù)上述特點(diǎn)給數(shù)據(jù)處理、數(shù)據(jù)分析及數(shù)據(jù)科學(xué)等帶來(lái)了諸多挑戰(zhàn),慶幸的是,這些挑戰(zhàn)借助 Spark 和 Delta Lake 都可以很好地應(yīng)對(duì)。Delta Lake 提供了 ACID 事務(wù)保證,支持增量更新數(shù)據(jù)表以及流批同時(shí)寫數(shù)據(jù)。借助 Spark Structed Streaming 可以實(shí)現(xiàn) IoT 時(shí)序數(shù)據(jù)實(shí)時(shí)入湖。
?
以下是 Delta Lake 經(jīng)典的三級(jí)數(shù)據(jù)表架構(gòu)。具體到美的暖通 IoT 數(shù)據(jù)場(chǎng)景,我們針對(duì)每一層級(jí)的數(shù)據(jù)表分別做了如下定義:
?
?
- Bronze 表:存儲(chǔ)原生數(shù)據(jù)(Raw Data),數(shù)據(jù)經(jīng)由 Spark Structed Streaming 從 Kafka 消費(fèi)下來(lái)后 upsert 進(jìn) Delta Lake 表,該表作為唯一的真實(shí)數(shù)據(jù)表 ?(Single Source of Truth)
- Silver表:該表是在對(duì) Bronze 表的數(shù)據(jù)進(jìn)行加工處理的基礎(chǔ)上生成的中間表,在美的暖通的場(chǎng)景下,數(shù)據(jù)加工處理的步驟涉及到一些復(fù)雜的時(shí)序數(shù)據(jù)計(jì)算邏輯,這些邏輯都包裝在了 Pandas UDF 里提供給 Spark 計(jì)算使用
- Gold 表:Silver 表的數(shù)據(jù)施加 Schema 約束并做進(jìn)一步清洗后的數(shù)據(jù)匯入 Gold 表,該表提供給下游的 Ad Hoc 查詢分析及數(shù)據(jù)科學(xué)使用
?
數(shù)據(jù)分析:Ad-Hoc 查詢
我們內(nèi)部在開(kāi)源 Superset 基礎(chǔ)上定制了內(nèi)部版本的 SQL 查詢與數(shù)據(jù)可視化平臺(tái),通過(guò) PyHive 連接到 Databricks 數(shù)據(jù)洞察 Spark Thrift Server 服務(wù),可以將 SQL 提交到集群上。商業(yè)版本的 thrift server 在可用性及性能方面都做了增強(qiáng),Databricks 數(shù)據(jù)洞察針對(duì) JDBC 連接安全認(rèn)證提供了基于 LDAP 的用戶認(rèn)證實(shí)現(xiàn)。借助 Superset ,數(shù)據(jù)分析師及數(shù)據(jù)科學(xué)家可以快速高效的對(duì) Delta Lake 表進(jìn)行數(shù)據(jù)探索。
?
數(shù)據(jù)科學(xué):Workspace
樓宇能耗預(yù)測(cè)與設(shè)備故障診斷預(yù)測(cè)是美的暖通 IoT 大數(shù)據(jù)平臺(tái)建設(shè)的兩個(gè)主要業(yè)務(wù)目標(biāo)。在 IoT 數(shù)據(jù)管道下游,需要對(duì)接機(jī)器學(xué)習(xí)平臺(tái)。現(xiàn)階段為了更快速方便地支撐起數(shù)據(jù)科學(xué)場(chǎng)景,我們將 Databricks 數(shù)據(jù)洞察集群與阿里云數(shù)據(jù)開(kāi)發(fā)平臺(tái) DDC 打通。DDC 集成了在數(shù)據(jù)科學(xué)場(chǎng)景下更友好的 Jupyter Notebook ,通過(guò)在 Jupyter 上使用 PySpark ,可以將作業(yè)跑到 Databricks 數(shù)據(jù)洞察集群上;同時(shí),也可以借助 Apache Airflow 對(duì)作業(yè)進(jìn)行調(diào)度。同時(shí),考慮到機(jī)器學(xué)習(xí)模型構(gòu)建、迭代訓(xùn)練、指標(biāo)檢測(cè)、部署等基本環(huán)節(jié),我們也在探索 MLOps ,目前這部分工作還在籌備中。
?
典型應(yīng)用場(chǎng)景介紹
Delta Lake 數(shù)據(jù)入湖(批流一體)
?
使用 UDF 函數(shù)定義流數(shù)據(jù)寫入 Delta Lake 的 Merge 規(guī)則
%spark import org.apache.spark.sql._ import io.delta.tables._// Function to upsert `microBatchOutputDF` into Delta table using MERGE def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {// Set the dataframe to view namemicroBatchOutputDF.createOrReplaceTempView("updates")// Use the view name to apply MERGE// NOTE: You have to use the SparkSession that has been used to define the `updates` dataframemicroBatchOutputDF.sparkSession.sql(s"""MERGE INTO delta_{table_name} tUSING updates sON s.uuid = t.uuidWHEN MATCHED THEN UPDATE SET t.device_id = s.device_id,t.indoor_temperature = s.indoor_temperature,t.ouoor_temperature = s.ouoor_temperature,t.chiller_temperature = s.chiller_temperature,t.electricity = s.electricity,t.protocal_version = s.protocal_version,t.dt=s.dt,t.update_time=current_timestamp()WHEN NOT MATCHED THEN INSERT (t.uuid,t.device_id,t.indoor_temperature,t.ouoor_temperature ,t.chiller_temperature ,t.electricity,t.protocal_version,t.dt,t.create_time,t.update_time)values (s.uuid,s.device_id,s.indoor_temperature,s.ouoor_temperature,s.chiller_temperature,s.electricity,s.protocal_version ,s.dt,current_timestamp(),current_timestamp())""") }?
使用 Spark Structured Streaming 實(shí)時(shí)流寫入 Delta Lake
?
%sparkimport org.apache.spark.sql.functions._ import org.apache.spark.sql.streaming.Triggerdef getquery(checkpoint_dir:String,tableName:String,servers:String,topic:String ) {var streamingInputDF = spark.readStream.format("kafka").option("kafka.bootstrap.servers", servers).option("subscribe", topic) .option("startingOffsets", "latest") .option("minPartitions", "10") .option("failOnDataLoss", "true").load() val resDF=streamingInputDF.select(col("value").cast("string")).withColumn("newMessage",split(col("value"), " ")).filter(col("newMessage").getItem(7).isNotNull).select(col("newMessage").getItem(0).as("uuid"),col("newMessage").getItem(1).as("device_id"),col("newMessage").getItem(2).as("indoor_temperature"),col("newMessage").getItem(3).as("ouoor_temperature"),col("newMessage").getItem(4).as("chiller_temperature"),col("newMessage").getItem(5).as("electricity"),col("newMessage").getItem(6).as("protocal_version")).withColumn("dt",date_format(current_date(),"yyyyMMdd")) val query = resDF.writeStream.format("delta").option("checkpointLocation", checkpoint_dir).trigger(Trigger.ProcessingTime("60 seconds")) // 執(zhí)行流處理時(shí)間間隔.foreachBatch(upsertToDelta _) //引用upsertToDelta函數(shù).outputMode("update")query.start() }?
數(shù)據(jù)災(zāi)備:Deep Clone
由于 Delta Lake 的數(shù)據(jù)僅接入實(shí)時(shí)數(shù)據(jù),對(duì)于存量歷史數(shù)據(jù)我們是通過(guò) SparkSQL 一次性 Sink Delta Lake 的表中,這樣我們流和批處理時(shí)只維護(hù)一張 Delta 表,所以我們只在最初對(duì)這兩部分?jǐn)?shù)據(jù)做一次 Merge 操作。同時(shí)為了保證數(shù)據(jù)的高安全,我們使用 Databricks Deep Clone 來(lái)做數(shù)據(jù)災(zāi)備,每天會(huì)定時(shí)更新來(lái)維護(hù)一張從表以備用。對(duì)于每日新增的數(shù)據(jù),使用 Deep Clone 同樣只會(huì)對(duì)新數(shù)據(jù) Insert 對(duì)需要更新的數(shù)據(jù) Update 操作,這樣可以大大提高執(zhí)行效率。
?
CREATE OR REPLACE TABLE delta.delta_{table_name}_cloneDEEP CLONE delta.delta_{table_name};?
性能優(yōu)化:OPTIMIZE & Z-Ordering
在流處理場(chǎng)景下會(huì)產(chǎn)生大量的小文件,大量小文件的存在會(huì)嚴(yán)重影響數(shù)據(jù)系統(tǒng)的讀性能。Delta Lake 提供了 OPTIMIZE 命令,可以將小文件進(jìn)行合并壓縮,另外,針對(duì) Ad-Hoc 查詢場(chǎng)景,由于涉及對(duì)單表多個(gè)維度數(shù)據(jù)的查詢,我們借助 Delta Lake 提供的 Z-Ordering 機(jī)制,可以有效提升查詢的性能。從而極大提升讀取表的性能。DeltaLake 本身提供了 Auto Optimize 選項(xiàng),但是會(huì)犧牲少量寫性能,增加數(shù)據(jù)寫入 delta 表的延遲。相反,執(zhí)行 OPTIMIZE 命令并不會(huì)影響寫的性能,因?yàn)?Delta Lake 本身支持 MVCC,支持 OPTIMIZE 的同時(shí)并發(fā)執(zhí)行寫操作。因此,我們采用定期觸發(fā)執(zhí)行 OPTIMIZE 的方案,每小時(shí)通過(guò) OPTIMIZE 做一次合并小文件操作,同時(shí)執(zhí)行 VACCUM 來(lái)清理過(guò)期數(shù)據(jù)文件:
OPTIMIZE delta.delta_{table_name} ZORDER by device_id, indoor_temperature; set spark.databricks.delta.retentionDurationCheck.enabled = false; VACUUM delta.delta_{table_name} RETAIN 1 HOURS;?
另外,針對(duì) Ad-Hoc 查詢場(chǎng)景,由于涉及對(duì)單表多個(gè)維度數(shù)據(jù)的查詢,我們借助 Delta Lake 提供的 Z-Ordering 機(jī)制,可以有效提升查詢的性能。
?
總結(jié)與展望
我們基于阿里云 Databricks 數(shù)據(jù)洞察產(chǎn)品提供的商業(yè)版 Spark 及 Delta Lake 技術(shù)棧快速構(gòu)建了 IoT 數(shù)據(jù)處理平臺(tái),Databricks 數(shù)據(jù)洞察全托管免運(yùn)維、商業(yè)版本引擎性能上的優(yōu)勢(shì)以及計(jì)算/存儲(chǔ)分離的架構(gòu),為我們節(jié)省了總體成本。同時(shí),Databricks 數(shù)據(jù)洞察產(chǎn)品自身提供的豐富特性,也極大提升了我們數(shù)據(jù)團(tuán)隊(duì)的生產(chǎn)力,為數(shù)據(jù)分析業(yè)務(wù)的快速開(kāi)展交付奠定了基礎(chǔ)。未來(lái),美的暖通希望與阿里云 Databricks 數(shù)據(jù)洞察團(tuán)隊(duì)針對(duì) IoT 場(chǎng)景輸出更多行業(yè)先進(jìn)解決方案。
原文鏈接
本文為阿里云原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的【实践案例】Databricks 数据洞察在美的暖通与楼宇的应用实践的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 全球边缘计算大会:阿里云资深技术专家李克
- 下一篇: 友盟+《小程序用户增长白皮书》:从五个角