2021年大数据Spark(五十二):Structured Streaming 事件时间窗口分析
目錄
事件時(shí)間窗口分析
時(shí)間概念
???????event-time
???????延遲數(shù)據(jù)處理
???????延遲數(shù)據(jù)
???????Watermarking 水位
???????官方案例演示
事件時(shí)間窗口分析
在SparkStreaming中窗口統(tǒng)計(jì)分析:Window Operation(設(shè)置窗口大小WindowInterval和滑動(dòng)大小SlideInterval),按照Streaming 流式應(yīng)用接收數(shù)據(jù)的時(shí)間進(jìn)行窗口設(shè)計(jì)的,其實(shí)是不符合實(shí)際應(yīng)用場(chǎng)景的。
例如,在物聯(lián)網(wǎng)數(shù)據(jù)平臺(tái)中,每個(gè)設(shè)備產(chǎn)生的數(shù)據(jù),其中包含數(shù)據(jù)產(chǎn)生的時(shí)間,然而數(shù)據(jù)需要經(jīng)過(guò)一系列采集傳輸才能被流式計(jì)算框架處理:SparkStreaming,此過(guò)程需要時(shí)間的,再按照處理時(shí)間來(lái)統(tǒng)計(jì)業(yè)務(wù)的時(shí)候,準(zhǔn)確性降低,存在不合理性。
在結(jié)構(gòu)化流Structured Streaming中窗口數(shù)據(jù)統(tǒng)計(jì)時(shí)間是基于數(shù)據(jù)本身事件時(shí)間EventTime字段統(tǒng)計(jì),更加合理性,官方文檔:
http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html#window-operations-on-event-time
?
???????時(shí)間概念
在Streaming流式數(shù)據(jù)處理中,按照時(shí)間處理數(shù)據(jù),其中時(shí)間有三種概念:
1)、事件時(shí)間EventTime,表示數(shù)據(jù)本身產(chǎn)生的時(shí)間,該字段在數(shù)據(jù)本身中;
2)、注入時(shí)間IngestionTime,表示數(shù)據(jù)到達(dá)流式系統(tǒng)時(shí)間,簡(jiǎn)而言之就是流式處理系統(tǒng)接收到數(shù)據(jù)的時(shí)間;
3)、處理時(shí)間ProcessingTime,表示數(shù)據(jù)被流式系統(tǒng)真正開(kāi)始計(jì)算操作的時(shí)間。
?
不同流式計(jì)算框架支持時(shí)間不一樣,
SparkStreaming框架僅僅支持處理時(shí)間ProcessTime,
StructuredStreaming支持事件時(shí)間和處理時(shí)間,
Flink框架支持三種時(shí)間數(shù)據(jù)操作,
實(shí)際項(xiàng)目中往往針對(duì)【事件時(shí)間EventTime】進(jìn)行數(shù)據(jù)處理操作,更加合理化。
?
???????event-time
基于事件時(shí)間窗口聚合操作:基于窗口的聚合(例如每分鐘事件數(shù))只是事件時(shí)間列上特殊類(lèi)型的分組和聚合,其中每個(gè)時(shí)間窗口都是一個(gè)組,并且每一行可以屬于多個(gè)窗口/組。
事件時(shí)間EventTime是嵌入到數(shù)據(jù)本身中的時(shí)間,數(shù)據(jù)實(shí)際真實(shí)產(chǎn)生的時(shí)間。例如,如果希望獲得每分鐘由物聯(lián)網(wǎng)設(shè)備生成的事件數(shù),那么可能希望使用生成數(shù)據(jù)的時(shí)間(即數(shù)據(jù)中的事件時(shí)間event time),而不是Spark接收數(shù)據(jù)的時(shí)間(receive time/archive time)。
這個(gè)事件時(shí)間很自然地用這個(gè)模型表示,設(shè)備中的每個(gè)事件(Event)都是表中的一行(Row),而事件時(shí)間(Event Time)是行中的一列值(Column Value)。
因此,這種基于事件時(shí)間窗口的聚合查詢(xún)既可以在靜態(tài)數(shù)據(jù)集(例如,從收集的設(shè)備事件日志中)上定義,也可以在數(shù)據(jù)流上定義,從而使用戶(hù)的使用更加容易。
修改詞頻統(tǒng)計(jì)程序,數(shù)據(jù)流包含每行數(shù)據(jù)以及生成每行行的時(shí)間。希望在10分鐘的窗口內(nèi)對(duì)單詞進(jìn)行計(jì)數(shù),每5分鐘更新一次,如下圖所示:
?
單詞在10分鐘窗口【12:00-12:10、12:05-12:15、12:10-12:20】等之間接收的單詞中計(jì)數(shù)。注意,【12:00-12:10】表示處理數(shù)據(jù)的事件時(shí)間為12:00之后但12:10之前的數(shù)據(jù)。思考一下,12:07的一條數(shù)據(jù),應(yīng)該增加對(duì)應(yīng)于兩個(gè)窗口12:00-12:10和12:05-12:15的計(jì)數(shù)。
基于事件時(shí)間窗口統(tǒng)計(jì)有兩個(gè)參數(shù)索引:分組鍵(如單詞)和窗口(事件時(shí)間字段)。
?
?
- event-time 窗口生成
Structured Streaming中如何依據(jù)EventTime事件時(shí)間生成窗口的呢?查看類(lèi)TimeWindowing源碼中生成窗口規(guī)則:
org.apache.spark.sql.catalyst.analysis.TimeWindowing// 窗口個(gè)數(shù)/* 最大的窗口數(shù) = 向上取整(窗口長(zhǎng)度/滑動(dòng)步長(zhǎng)) */maxNumOverlapping <- ceil(windowDuration / slideDuration)for (i <- 0 until maxNumOverlapping)/**timestamp是event-time 傳進(jìn)的時(shí)間戳startTime是window窗口參數(shù),默認(rèn)是0 second 從時(shí)間的0s含義:event-time從1970年...有多少個(gè)滑動(dòng)步長(zhǎng),如果說(shuō)浮點(diǎn)數(shù)會(huì)向上取整*/windowId <- ceil((timestamp - startTime) / slideDuration)/**windowId * slideDuration ?向上取能整除滑動(dòng)步長(zhǎng)的時(shí)間(i - maxNumOverlapping) * slideDuration 每一個(gè)窗口開(kāi)始時(shí)間相差一個(gè)步長(zhǎng)*/windowStart <- windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTimewindowEnd <- windowStart + windowDurationreturn windowStart, windowEnd
?
將【(event-time向上取?能整除?滑動(dòng)步長(zhǎng)的時(shí)間)?-?(最大窗口數(shù)×滑動(dòng)步長(zhǎng))】作為"初始窗口"的開(kāi)始時(shí)間,然后按照窗口滑動(dòng)寬度逐漸向時(shí)間軸前方推進(jìn),直到某個(gè)窗口不再包含該event-time 為止,最終以"初始窗口"與"結(jié)束窗口"之間的若干個(gè)窗口作為最終生成的 event-time 的時(shí)間窗口。
?
每個(gè)窗口的起始時(shí)間start與結(jié)束時(shí)間end都是前閉后開(kāi)(左閉右開(kāi))的區(qū)間,因此初始窗口和結(jié)束窗口都不會(huì)包含 event-time,最終不會(huì)被使用。假設(shè)數(shù)據(jù)為【2019-08-14 10:50:00, dog】,按照上述規(guī)則計(jì)算窗口示意圖如下:
?
得到窗口如下:
?
???????延遲數(shù)據(jù)處理
Structed Streaming與Spark Streaming相比一大特性就是支持基于數(shù)據(jù)中的時(shí)間戳的數(shù)據(jù)處理。也就是在處理數(shù)據(jù)時(shí),可以對(duì)記錄中的eventTime事件時(shí)間字段進(jìn)行考慮。因?yàn)閑ventTime更好的代表數(shù)據(jù)本身的信息,且可以借助eventTime處理比預(yù)期晚到達(dá)的數(shù)據(jù),但是需要有一個(gè)限度(閾值),不能一直等,應(yīng)該要設(shè)定最多等多久。
???????延遲數(shù)據(jù)
在很多流計(jì)算系統(tǒng)中,數(shù)據(jù)延遲到達(dá)(the events arrives late to the application)的情況很常見(jiàn),并且很多時(shí)候是不可控的,因?yàn)楹芏鄷r(shí)候是外圍系統(tǒng)自身問(wèn)題造成的。Structured Streaming可以保證一條舊的數(shù)據(jù)進(jìn)入到流上時(shí),依然可以基于這些“遲到”的數(shù)據(jù)重新計(jì)算并更新計(jì)算結(jié)果。
?
????上圖中在12:04(即事件時(shí)間)生成的單詞可能在12:11被應(yīng)用程序接收,此時(shí),應(yīng)用程序應(yīng)使用時(shí)間12:04而不是12:11更新窗口12:00-12:10的舊計(jì)數(shù)。但是會(huì)出現(xiàn)如下兩個(gè)問(wèn)題:
?問(wèn)題一:延遲數(shù)據(jù)計(jì)算是否有價(jià)值
- 如果某些數(shù)據(jù),延遲很長(zhǎng)時(shí)間(如30分鐘)才到達(dá)流式處理系統(tǒng),數(shù)據(jù)還需要再次計(jì)算嗎?計(jì)算的結(jié)果還有價(jià)值嗎?原因在于流式處理系統(tǒng)處理數(shù)據(jù)關(guān)鍵核心在于實(shí)時(shí)性;
- 實(shí)踐表明,流計(jì)算關(guān)注的是近期數(shù)據(jù),更新一個(gè)很早之前的狀態(tài)往往已經(jīng)不再具有很大的業(yè)務(wù)價(jià)值;
?問(wèn)題二:以前狀態(tài)保存浪費(fèi)資源
- 實(shí)時(shí)統(tǒng)計(jì)來(lái)說(shuō),如果保存很久以前的數(shù)據(jù)狀態(tài),很多時(shí)候沒(méi)有作用的,反而浪費(fèi)大量資源;
- Spark 2.1引入的watermarking允許用戶(hù)指定延遲數(shù)據(jù)的閾值,也允許引擎清除掉舊的狀態(tài)。即根據(jù)watermark機(jī)制來(lái)設(shè)置和判斷消息的有效性,如可以獲取消息本身的時(shí)間戳,然后根據(jù)該時(shí)間戳來(lái)判斷消息的到達(dá)是否延遲(亂序)以及延遲的時(shí)間是否在容忍的范圍內(nèi)(延遲的數(shù)據(jù)是否處理)。
?
???????Watermarking 水位
水位watermarking官方定義:
lets the engine automatically track the current event time in the data and attempt to clean up old state accordingly.
通過(guò)指定event-time列(上一批次數(shù)據(jù)中EventTime最大值)和預(yù)估事件的延遲時(shí)間上限(Threshold)來(lái)定義一個(gè)查詢(xún)的水位線(xiàn)watermark。翻譯:讓Spark SQL引擎自動(dòng)追蹤數(shù)據(jù)中當(dāng)前事件時(shí)間EventTime,依據(jù)規(guī)則清除舊的狀態(tài)數(shù)據(jù)。
Watermark = MaxEventTime - Threshod
1:執(zhí)行第一批次數(shù)據(jù)時(shí),Watermarker為0,所以此批次中所有數(shù)據(jù)都參與計(jì)算;
2:Watermarker值只能逐漸增加,不能減少;
3:Watermark機(jī)制主要解決處理聚合延遲數(shù)據(jù)和減少內(nèi)存中維護(hù)的聚合狀態(tài);
4:設(shè)置Watermark以后,輸出模式OutputMode只能是Append和Update;
如下方式設(shè)置閾值Threshold,計(jì)算每批次數(shù)據(jù)執(zhí)行時(shí)的水位Watermark:
?
看一下官方案例:詞頻統(tǒng)計(jì)WordCount,設(shè)置閾值Threshold為10分鐘,每5分鐘觸發(fā)執(zhí)行一次。
?
- 延遲到達(dá)但沒(méi)有超過(guò)watermark:(12:08, dog)
在12:20觸發(fā)執(zhí)行窗口(12:10-12:20)數(shù)據(jù)中,(12:08, dog) 數(shù)據(jù)是延遲數(shù)據(jù),閾值Threshold設(shè)定為10分鐘,此時(shí)水位線(xiàn)【W(wǎng)atermark = 12:14 - 10m = 12:04】,因?yàn)?span style="color:#ff0000;">12:14是上個(gè)窗口(12:05-12:15)中接收到的最大的事件時(shí)間,代表目標(biāo)系統(tǒng)最后時(shí)刻的狀態(tài),由于12:08在12:04之后,因此被視為“雖然遲到但尚且可以接收”的數(shù)據(jù)而被更新到了結(jié)果表中,也就是(12:00 - 12:10, dog, 2)和(12:05 - 12:11, dog, 3)。
?
- ?超出watermark:(12:04, donkey)
在12:25觸發(fā)執(zhí)行窗口(12:15-12:25)數(shù)據(jù)中,(12:04, donkey)數(shù)據(jù)是延遲數(shù)據(jù),上個(gè)窗口中接收到最大的事件時(shí)間為12:21,此時(shí)水位線(xiàn)【W(wǎng)atermark = 12:21 - 10m = 12:11】,而(12:04, ?donkey)比這個(gè)值還要早,說(shuō)明它”太舊了”,所以不會(huì)被更新到結(jié)果表中了。
?
???設(shè)置水位線(xiàn)Watermark以后,不同輸出模式OutputMode,結(jié)果輸出不一樣:
- ?Update模式:總是傾向于“盡可能早”的將處理結(jié)果更新到sink,當(dāng)出現(xiàn)遲到數(shù)據(jù)時(shí),早期的某個(gè)計(jì)算結(jié)果將會(huì)被更新;
- ?Append模式:推遲計(jì)算結(jié)果的輸出到一個(gè)相對(duì)較晚的時(shí)刻,確保結(jié)果是穩(wěn)定的,不會(huì)再被更新,比如:12:00 - 12:10窗口的處理結(jié)果會(huì)等到watermark更新到12:11之后才會(huì)寫(xiě)入到sink。
如果用于接收處理結(jié)果的sink不支持更新操作,則只能選擇Append模式。
?
???????官方案例演示
編寫(xiě)代碼,演示官方案例,如下幾點(diǎn)注意:
1、該outputMode為update模式,即只會(huì)輸出那些有更新的數(shù)據(jù)!!
2、官網(wǎng)案例該開(kāi)窗窗口長(zhǎng)度為10 min,步長(zhǎng)5 min,水印為eventtime-10 min,但是測(cè)試的時(shí)候用秒
3、官網(wǎng)案例trigger(Trigger.ProcessingTime("5 minutes")),但是測(cè)試的時(shí)候用秒
測(cè)試數(shù)據(jù):
2019-10-10 12:00:07,dog2019-10-10 12:00:08,owl2019-10-10 12:00:14,dog2019-10-10 12:00:09,cat2019-10-10 12:00:15,cat2019-10-10 12:00:08,dog2019-10-10 12:00:13,owl2019-10-10 12:00:21,owl2019-10-10 12:00:04,donkey ?--丟失2019-10-10 12:00:17,owl ????--不丟失
具體案例代碼如下:
package cn.itcast.structedstreamingimport java.sql.Timestampimport org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}/*** 基于Structured Streaming 讀取TCP Socket讀取數(shù)據(jù),事件時(shí)間窗口統(tǒng)計(jì)詞頻,將結(jié)果打印到控制臺(tái)* 每5秒鐘統(tǒng)計(jì)最近10秒內(nèi)的數(shù)據(jù)(詞頻:WordCount),設(shè)置水位Watermark時(shí)間為10秒* 2019-10-10 12:00:07,dog* 2019-10-10 12:00:08,owl** 2019-10-10 12:00:14,dog* 2019-10-10 12:00:09,cat** 2019-10-10 12:00:15,cat* 2019-10-10 12:00:08,dog* 2019-10-10 12:00:13,owl* 2019-10-10 12:00:21,owl** 2019-10-10 12:00:04,donkey ?--丟失* 2019-10-10 12:00:17,owl ????--不丟失*/
object StructuredWindow {def main(args: Array[String]): Unit = {// 1. 構(gòu)建SparkSession實(shí)例對(duì)象,傳遞sparkConf參數(shù)val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]").config("spark.sql.shuffle.partitions", "3").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import org.apache.spark.sql.functions._import spark.implicits._// 2. 使用SparkSession從TCP Socket讀取流式數(shù)據(jù)val inputStreamDF: DataFrame = spark.readStream.format("socket").option("host", "node1").option("port", 9999).load()// 3. 針對(duì)獲取流式DStream進(jìn)行詞頻統(tǒng)計(jì)val resultStreamDF = inputStreamDF.as[String].filter(StringUtils.isNotBlank(_))// 將每行數(shù)據(jù)進(jìn)行分割單詞: 2019-10-12 09:00:02,cat dog.flatMap(line => {val arr = line.trim.split(",")val timestampStr: String = arr(0)val wordsStr: String = arr(1)wordsStr.split("\\s+")//(時(shí)間戳,單詞).map((Timestamp.valueOf(timestampStr), _))})// 設(shè)置列的名稱(chēng).toDF("timestamp", "word")// TODO:設(shè)置水位Watermark.withWatermark("timestamp", "10 seconds")// TODO:設(shè)置基于事件時(shí)間(event time)窗口?-> time, 每5秒統(tǒng)計(jì)最近10秒內(nèi)數(shù)據(jù).groupBy(window($"timestamp", "10 seconds", "5 seconds"),$"word").count()// 按照窗口字段降序排序//.orderBy($"window")/*root|-- window: struct (nullable = true)| ???|-- start: timestamp (nullable = true)| ???|-- end: timestamp (nullable = true)|-- word: string (nullable = true)|-- count: long (nullable = false)*///resultStreamDF.printSchema()// 4. 將計(jì)算的結(jié)果輸出,打印到控制臺(tái)val query: StreamingQuery = resultStreamDF.writeStream.outputMode(OutputMode.Update()).format("console").option("numRows", "100").option("truncate", "false").trigger(Trigger.ProcessingTime("5 seconds")).start()query.awaitTermination()query.stop()}
}
?
總結(jié)
以上是生活随笔為你收集整理的2021年大数据Spark(五十二):Structured Streaming 事件时间窗口分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 2021年大数据Spark(五十一):S
- 下一篇: 2021年大数据Spark(五十三):S