2021年大数据Spark(五十二):Structured Streaming 事件时间窗口分析
目錄
事件時間窗口分析
時間概念
???????event-time
???????延遲數據處理
???????延遲數據
???????Watermarking 水位
???????官方案例演示
事件時間窗口分析
在SparkStreaming中窗口統計分析:Window Operation(設置窗口大小WindowInterval和滑動大小SlideInterval),按照Streaming 流式應用接收數據的時間進行窗口設計的,其實是不符合實際應用場景的。
例如,在物聯網數據平臺中,每個設備產生的數據,其中包含數據產生的時間,然而數據需要經過一系列采集傳輸才能被流式計算框架處理:SparkStreaming,此過程需要時間的,再按照處理時間來統計業務的時候,準確性降低,存在不合理性。
在結構化流Structured Streaming中窗口數據統計時間是基于數據本身事件時間EventTime字段統計,更加合理性,官方文檔:
http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html#window-operations-on-event-time
?
???????時間概念
在Streaming流式數據處理中,按照時間處理數據,其中時間有三種概念:
1)、事件時間EventTime,表示數據本身產生的時間,該字段在數據本身中;
2)、注入時間IngestionTime,表示數據到達流式系統時間,簡而言之就是流式處理系統接收到數據的時間;
3)、處理時間ProcessingTime,表示數據被流式系統真正開始計算操作的時間。
?
不同流式計算框架支持時間不一樣,
SparkStreaming框架僅僅支持處理時間ProcessTime,
StructuredStreaming支持事件時間和處理時間,
Flink框架支持三種時間數據操作,
實際項目中往往針對【事件時間EventTime】進行數據處理操作,更加合理化。
?
???????event-time
基于事件時間窗口聚合操作:基于窗口的聚合(例如每分鐘事件數)只是事件時間列上特殊類型的分組和聚合,其中每個時間窗口都是一個組,并且每一行可以屬于多個窗口/組。
事件時間EventTime是嵌入到數據本身中的時間,數據實際真實產生的時間。例如,如果希望獲得每分鐘由物聯網設備生成的事件數,那么可能希望使用生成數據的時間(即數據中的事件時間event time),而不是Spark接收數據的時間(receive time/archive time)。
這個事件時間很自然地用這個模型表示,設備中的每個事件(Event)都是表中的一行(Row),而事件時間(Event Time)是行中的一列值(Column Value)。
因此,這種基于事件時間窗口的聚合查詢既可以在靜態數據集(例如,從收集的設備事件日志中)上定義,也可以在數據流上定義,從而使用戶的使用更加容易。
修改詞頻統計程序,數據流包含每行數據以及生成每行行的時間。希望在10分鐘的窗口內對單詞進行計數,每5分鐘更新一次,如下圖所示:
?
單詞在10分鐘窗口【12:00-12:10、12:05-12:15、12:10-12:20】等之間接收的單詞中計數。注意,【12:00-12:10】表示處理數據的事件時間為12:00之后但12:10之前的數據。思考一下,12:07的一條數據,應該增加對應于兩個窗口12:00-12:10和12:05-12:15的計數。
基于事件時間窗口統計有兩個參數索引:分組鍵(如單詞)和窗口(事件時間字段)。
?
?
- event-time 窗口生成
Structured Streaming中如何依據EventTime事件時間生成窗口的呢?查看類TimeWindowing源碼中生成窗口規則:
org.apache.spark.sql.catalyst.analysis.TimeWindowing// 窗口個數/* 最大的窗口數 = 向上取整(窗口長度/滑動步長) */maxNumOverlapping <- ceil(windowDuration / slideDuration)for (i <- 0 until maxNumOverlapping)/**timestamp是event-time 傳進的時間戳startTime是window窗口參數,默認是0 second 從時間的0s含義:event-time從1970年...有多少個滑動步長,如果說浮點數會向上取整*/windowId <- ceil((timestamp - startTime) / slideDuration)/**windowId * slideDuration ?向上取能整除滑動步長的時間(i - maxNumOverlapping) * slideDuration 每一個窗口開始時間相差一個步長*/windowStart <- windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTimewindowEnd <- windowStart + windowDurationreturn windowStart, windowEnd
?
將【(event-time向上取?能整除?滑動步長的時間)?-?(最大窗口數×滑動步長)】作為"初始窗口"的開始時間,然后按照窗口滑動寬度逐漸向時間軸前方推進,直到某個窗口不再包含該event-time 為止,最終以"初始窗口"與"結束窗口"之間的若干個窗口作為最終生成的 event-time 的時間窗口。
?
每個窗口的起始時間start與結束時間end都是前閉后開(左閉右開)的區間,因此初始窗口和結束窗口都不會包含 event-time,最終不會被使用。假設數據為【2019-08-14 10:50:00, dog】,按照上述規則計算窗口示意圖如下:
?
得到窗口如下:
?
???????延遲數據處理
Structed Streaming與Spark Streaming相比一大特性就是支持基于數據中的時間戳的數據處理。也就是在處理數據時,可以對記錄中的eventTime事件時間字段進行考慮。因為eventTime更好的代表數據本身的信息,且可以借助eventTime處理比預期晚到達的數據,但是需要有一個限度(閾值),不能一直等,應該要設定最多等多久。
???????延遲數據
在很多流計算系統中,數據延遲到達(the events arrives late to the application)的情況很常見,并且很多時候是不可控的,因為很多時候是外圍系統自身問題造成的。Structured Streaming可以保證一條舊的數據進入到流上時,依然可以基于這些“遲到”的數據重新計算并更新計算結果。
?
????上圖中在12:04(即事件時間)生成的單詞可能在12:11被應用程序接收,此時,應用程序應使用時間12:04而不是12:11更新窗口12:00-12:10的舊計數。但是會出現如下兩個問題:
?問題一:延遲數據計算是否有價值
- 如果某些數據,延遲很長時間(如30分鐘)才到達流式處理系統,數據還需要再次計算嗎?計算的結果還有價值嗎?原因在于流式處理系統處理數據關鍵核心在于實時性;
- 實踐表明,流計算關注的是近期數據,更新一個很早之前的狀態往往已經不再具有很大的業務價值;
?問題二:以前狀態保存浪費資源
- 實時統計來說,如果保存很久以前的數據狀態,很多時候沒有作用的,反而浪費大量資源;
- Spark 2.1引入的watermarking允許用戶指定延遲數據的閾值,也允許引擎清除掉舊的狀態。即根據watermark機制來設置和判斷消息的有效性,如可以獲取消息本身的時間戳,然后根據該時間戳來判斷消息的到達是否延遲(亂序)以及延遲的時間是否在容忍的范圍內(延遲的數據是否處理)。
?
???????Watermarking 水位
水位watermarking官方定義:
lets the engine automatically track the current event time in the data and attempt to clean up old state accordingly.
通過指定event-time列(上一批次數據中EventTime最大值)和預估事件的延遲時間上限(Threshold)來定義一個查詢的水位線watermark。翻譯:讓Spark SQL引擎自動追蹤數據中當前事件時間EventTime,依據規則清除舊的狀態數據。
Watermark = MaxEventTime - Threshod
1:執行第一批次數據時,Watermarker為0,所以此批次中所有數據都參與計算;
2:Watermarker值只能逐漸增加,不能減少;
3:Watermark機制主要解決處理聚合延遲數據和減少內存中維護的聚合狀態;
4:設置Watermark以后,輸出模式OutputMode只能是Append和Update;
如下方式設置閾值Threshold,計算每批次數據執行時的水位Watermark:
?
看一下官方案例:詞頻統計WordCount,設置閾值Threshold為10分鐘,每5分鐘觸發執行一次。
?
- 延遲到達但沒有超過watermark:(12:08, dog)
在12:20觸發執行窗口(12:10-12:20)數據中,(12:08, dog) 數據是延遲數據,閾值Threshold設定為10分鐘,此時水位線【Watermark = 12:14 - 10m = 12:04】,因為12:14是上個窗口(12:05-12:15)中接收到的最大的事件時間,代表目標系統最后時刻的狀態,由于12:08在12:04之后,因此被視為“雖然遲到但尚且可以接收”的數據而被更新到了結果表中,也就是(12:00 - 12:10, dog, 2)和(12:05 - 12:11, dog, 3)。
?
- ?超出watermark:(12:04, donkey)
在12:25觸發執行窗口(12:15-12:25)數據中,(12:04, donkey)數據是延遲數據,上個窗口中接收到最大的事件時間為12:21,此時水位線【Watermark = 12:21 - 10m = 12:11】,而(12:04, ?donkey)比這個值還要早,說明它”太舊了”,所以不會被更新到結果表中了。
?
???設置水位線Watermark以后,不同輸出模式OutputMode,結果輸出不一樣:
- ?Update模式:總是傾向于“盡可能早”的將處理結果更新到sink,當出現遲到數據時,早期的某個計算結果將會被更新;
- ?Append模式:推遲計算結果的輸出到一個相對較晚的時刻,確保結果是穩定的,不會再被更新,比如:12:00 - 12:10窗口的處理結果會等到watermark更新到12:11之后才會寫入到sink。
如果用于接收處理結果的sink不支持更新操作,則只能選擇Append模式。
?
???????官方案例演示
編寫代碼,演示官方案例,如下幾點注意:
1、該outputMode為update模式,即只會輸出那些有更新的數據!!
2、官網案例該開窗窗口長度為10 min,步長5 min,水印為eventtime-10 min,但是測試的時候用秒
3、官網案例trigger(Trigger.ProcessingTime("5 minutes")),但是測試的時候用秒
測試數據:
2019-10-10 12:00:07,dog2019-10-10 12:00:08,owl2019-10-10 12:00:14,dog2019-10-10 12:00:09,cat2019-10-10 12:00:15,cat2019-10-10 12:00:08,dog2019-10-10 12:00:13,owl2019-10-10 12:00:21,owl2019-10-10 12:00:04,donkey ?--丟失2019-10-10 12:00:17,owl ????--不丟失
具體案例代碼如下:
package cn.itcast.structedstreamingimport java.sql.Timestampimport org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}/*** 基于Structured Streaming 讀取TCP Socket讀取數據,事件時間窗口統計詞頻,將結果打印到控制臺* 每5秒鐘統計最近10秒內的數據(詞頻:WordCount),設置水位Watermark時間為10秒* 2019-10-10 12:00:07,dog* 2019-10-10 12:00:08,owl** 2019-10-10 12:00:14,dog* 2019-10-10 12:00:09,cat** 2019-10-10 12:00:15,cat* 2019-10-10 12:00:08,dog* 2019-10-10 12:00:13,owl* 2019-10-10 12:00:21,owl** 2019-10-10 12:00:04,donkey ?--丟失* 2019-10-10 12:00:17,owl ????--不丟失*/
object StructuredWindow {def main(args: Array[String]): Unit = {// 1. 構建SparkSession實例對象,傳遞sparkConf參數val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]").config("spark.sql.shuffle.partitions", "3").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import org.apache.spark.sql.functions._import spark.implicits._// 2. 使用SparkSession從TCP Socket讀取流式數據val inputStreamDF: DataFrame = spark.readStream.format("socket").option("host", "node1").option("port", 9999).load()// 3. 針對獲取流式DStream進行詞頻統計val resultStreamDF = inputStreamDF.as[String].filter(StringUtils.isNotBlank(_))// 將每行數據進行分割單詞: 2019-10-12 09:00:02,cat dog.flatMap(line => {val arr = line.trim.split(",")val timestampStr: String = arr(0)val wordsStr: String = arr(1)wordsStr.split("\\s+")//(時間戳,單詞).map((Timestamp.valueOf(timestampStr), _))})// 設置列的名稱.toDF("timestamp", "word")// TODO:設置水位Watermark.withWatermark("timestamp", "10 seconds")// TODO:設置基于事件時間(event time)窗口?-> time, 每5秒統計最近10秒內數據.groupBy(window($"timestamp", "10 seconds", "5 seconds"),$"word").count()// 按照窗口字段降序排序//.orderBy($"window")/*root|-- window: struct (nullable = true)| ???|-- start: timestamp (nullable = true)| ???|-- end: timestamp (nullable = true)|-- word: string (nullable = true)|-- count: long (nullable = false)*///resultStreamDF.printSchema()// 4. 將計算的結果輸出,打印到控制臺val query: StreamingQuery = resultStreamDF.writeStream.outputMode(OutputMode.Update()).format("console").option("numRows", "100").option("truncate", "false").trigger(Trigger.ProcessingTime("5 seconds")).start()query.awaitTermination()query.stop()}
}
?
總結
以上是生活随笔為你收集整理的2021年大数据Spark(五十二):Structured Streaming 事件时间窗口分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Spark(五十一):S
- 下一篇: 2021年大数据Spark(五十三):S