日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

2021年大数据Spark(五十二):Structured Streaming 事件时间窗口分析

發布時間:2023/11/28 生活经验 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 2021年大数据Spark(五十二):Structured Streaming 事件时间窗口分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

事件時間窗口分析

時間概念

???????event-time

???????延遲數據處理

???????延遲數據

???????Watermarking 水位

???????官方案例演示


事件時間窗口分析

在SparkStreaming中窗口統計分析:Window Operation(設置窗口大小WindowInterval和滑動大小SlideInterval),按照Streaming 流式應用接收數據的時間進行窗口設計的,其實是不符合實際應用場景的。

例如,在物聯網數據平臺中,每個設備產生的數據,其中包含數據產生的時間,然而數據需要經過一系列采集傳輸才能被流式計算框架處理:SparkStreaming,此過程需要時間的,再按照處理時間來統計業務的時候,準確性降低,存在不合理性。

在結構化流Structured Streaming中窗口數據統計時間是基于數據本身事件時間EventTime字段統計,更加合理性,官方文檔:

http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html#window-operations-on-event-time

?

???????時間概念

在Streaming流式數據處理中,按照時間處理數據,其中時間有三種概念:

1)、事件時間EventTime,表示數據本身產生的時間,該字段在數據本身中;

2)、注入時間IngestionTime,表示數據到達流式系統時間,簡而言之就是流式處理系統接收到數據的時間;

3)、處理時間ProcessingTime,表示數據被流式系統真正開始計算操作的時間。

?

不同流式計算框架支持時間不一樣,

SparkStreaming框架僅僅支持處理時間ProcessTime,

StructuredStreaming支持事件時間和處理時間,

Flink框架支持三種時間數據操作,

實際項目中往往針對【事件時間EventTime】進行數據處理操作,更加合理化。

?

???????event-time

基于事件時間窗口聚合操作:基于窗口的聚合(例如每分鐘事件數)只是事件時間列上特殊類型的分組和聚合,其中每個時間窗口都是一個組,并且每一行可以屬于多個窗口/組。

事件時間EventTime是嵌入到數據本身中的時間,數據實際真實產生的時間。例如,如果希望獲得每分鐘由物聯網設備生成的事件數,那么可能希望使用生成數據的時間(即數據中的事件時間event time),而不是Spark接收數據的時間(receive time/archive time)。

這個事件時間很自然地用這個模型表示,設備中的每個事件(Event)都是表中的一行(Row),而事件時間(Event Time)是行中的一列值(Column Value)。

因此,這種基于事件時間窗口的聚合查詢既可以在靜態數據集(例如,從收集的設備事件日志中)上定義,也可以在數據流上定義,從而使用戶的使用更加容易。

修改詞頻統計程序,數據流包含每行數據以及生成每行行的時間。希望在10分鐘的窗口內對單詞進行計數,每5分鐘更新一次,如下圖所示:

?

單詞在10分鐘窗口【12:00-12:10、12:05-12:15、12:10-12:20】等之間接收的單詞中計數。注意,【12:00-12:10】表示處理數據的事件時間為12:00之后但12:10之前的數據。思考一下,12:07的一條數據,應該增加對應于兩個窗口12:00-12:10和12:05-12:15的計數。

基于事件時間窗口統計有兩個參數索引:分組鍵(如單詞)和窗口(事件時間字段)。

?

?

  • event-time 窗口生成

Structured Streaming中如何依據EventTime事件時間生成窗口的呢?查看類TimeWindowing源碼中生成窗口規則:

org.apache.spark.sql.catalyst.analysis.TimeWindowing// 窗口個數/* 最大的窗口數 = 向上取整(窗口長度/滑動步長) */maxNumOverlapping <- ceil(windowDuration / slideDuration)for (i <- 0 until maxNumOverlapping)/**timestamp是event-time 傳進的時間戳startTime是window窗口參數,默認是0 second 從時間的0s含義:event-time從1970年...有多少個滑動步長,如果說浮點數會向上取整*/windowId <- ceil((timestamp - startTime) / slideDuration)/**windowId * slideDuration ?向上取能整除滑動步長的時間(i - maxNumOverlapping) * slideDuration 每一個窗口開始時間相差一個步長*/windowStart <- windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTimewindowEnd <- windowStart + windowDurationreturn windowStart, windowEnd

?

將【(event-time向上取?能整除?滑動步長的時間)?-?(最大窗口數×滑動步長)】作為"初始窗口"的開始時間,然后按照窗口滑動寬度逐漸向時間軸前方推進,直到某個窗口不再包含該event-time 為止,最終以"初始窗口"與"結束窗口"之間的若干個窗口作為最終生成的 event-time 的時間窗口。

?

每個窗口的起始時間start結束時間end都是前閉后開(左閉右開)的區間,因此初始窗口和結束窗口都不會包含 event-time,最終不會被使用。假設數據為【2019-08-14 10:50:00, dog】,按照上述規則計算窗口示意圖如下:

?

得到窗口如下:

?

???????延遲數據處理

Structed Streaming與Spark Streaming相比一大特性就是支持基于數據中的時間戳的數據處理。也就是在處理數據時,可以對記錄中的eventTime事件時間字段進行考慮。因為eventTime更好的代表數據本身的信息,且可以借助eventTime處理比預期晚到達的數據,但是需要有一個限度(閾值),不能一直等,應該要設定最多等多久。

???????延遲數據

在很多流計算系統中,數據延遲到達(the events arrives late to the application)的情況很常見,并且很多時候是不可控的,因為很多時候是外圍系統自身問題造成的。Structured Streaming可以保證一條舊的數據進入到流上時,依然可以基于這些“遲到”的數據重新計算并更新計算結果

?

????上圖中在12:04(即事件時間)生成的單詞可能在12:11被應用程序接收,此時,應用程序應使用時間12:04而不是12:11更新窗口12:00-12:10的舊計數。但是會出現如下兩個問題:

?問題一:延遲數據計算是否有價值

  • 如果某些數據,延遲很長時間(如30分鐘)才到達流式處理系統,數據還需要再次計算嗎?計算的結果還有價值嗎?原因在于流式處理系統處理數據關鍵核心在于實時性
  • 實踐表明,流計算關注的是近期數據,更新一個很早之前的狀態往往已經不再具有很大的業務價值;

?問題二:以前狀態保存浪費資源

  • 實時統計來說,如果保存很久以前的數據狀態,很多時候沒有作用的,反而浪費大量資源
  • Spark 2.1引入的watermarking允許用戶指定延遲數據的閾值,也允許引擎清除掉舊的狀態。即根據watermark機制來設置和判斷消息的有效性,如可以獲取消息本身的時間戳,然后根據該時間戳來判斷消息的到達是否延遲(亂序)以及延遲的時間是否在容忍的范圍內(延遲的數據是否處理)。

?

???????Watermarking 水位

水位watermarking官方定義:


lets the engine automatically track the current event time in the data and attempt to clean up old state accordingly.

通過指定event-time列(上一批次數據中EventTime最大值)和預估事件的延遲時間上限(Threshold)來定義一個查詢的水位線watermark。翻譯:讓Spark SQL引擎自動追蹤數據中當前事件時間EventTime,依據規則清除舊的狀態數據

Watermark = MaxEventTime - Threshod

1:執行第一批次數據時,Watermarker為0,所以此批次中所有數據都參與計算;

2:Watermarker值只能逐漸增加,不能減少;

3:Watermark機制主要解決處理聚合延遲數據和減少內存中維護的聚合狀態;

4:設置Watermark以后,輸出模式OutputMode只能是Append和Update;

如下方式設置閾值Threshold,計算每批次數據執行時的水位Watermark:

?

看一下官方案例:詞頻統計WordCount,設置閾值Threshold為10分鐘,每5分鐘觸發執行一次。

?

  • 延遲到達但沒有超過watermark:(12:08, dog)

在12:20觸發執行窗口(12:10-12:20)數據中,(12:08, dog) 數據是延遲數據,閾值Threshold設定為10分鐘,此時水位線【Watermark = 12:14 - 10m = 12:04】,因為12:14是上個窗口(12:05-12:15)中接收到的最大的事件時間,代表目標系統最后時刻的狀態,由于12:08在12:04之后,因此被視為“雖然遲到但尚且可以接收”的數據而被更新到了結果表中,也就是(12:00 - 12:10, dog, 2)和(12:05 - 12:11, dog, 3)

?

  • ?超出watermark:(12:04, donkey)

在12:25觸發執行窗口(12:15-12:25)數據中,(12:04, donkey)數據是延遲數據,上個窗口中接收到最大的事件時間為12:21,此時水位線【Watermark = 12:21 - 10m = 12:11】,而(12:04, ?donkey)比這個值還要早,說明它”太舊了”,所以不會被更新到結果表中了。

?

???設置水位線Watermark以后,不同輸出模式OutputMode,結果輸出不一樣:

  • ?Update模式:總是傾向于“盡可能早”的將處理結果更新到sink,當出現遲到數據時,早期的某個計算結果將會被更新;
  • ?Append模式:推遲計算結果的輸出到一個相對較晚的時刻,確保結果是穩定的,不會再被更新,比如:12:00 - 12:10窗口的處理結果會等到watermark更新到12:11之后才會寫入到sink。

如果用于接收處理結果的sink不支持更新操作,則只能選擇Append模式。

?

???????官方案例演示

編寫代碼,演示官方案例,如下幾點注意:

1、該outputMode為update模式,即只會輸出那些有更新的數據!!

2、官網案例該開窗窗口長度為10 min,步長5 min,水印為eventtime-10 min,但是測試的時候用秒

3、官網案例trigger(Trigger.ProcessingTime("5 minutes")),但是測試的時候用秒

測試數據:

2019-10-10 12:00:07,dog2019-10-10 12:00:08,owl2019-10-10 12:00:14,dog2019-10-10 12:00:09,cat2019-10-10 12:00:15,cat2019-10-10 12:00:08,dog2019-10-10 12:00:13,owl2019-10-10 12:00:21,owl2019-10-10 12:00:04,donkey ?--丟失2019-10-10 12:00:17,owl ????--不丟失

具體案例代碼如下:

package cn.itcast.structedstreamingimport java.sql.Timestampimport org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}/*** 基于Structured Streaming 讀取TCP Socket讀取數據,事件時間窗口統計詞頻,將結果打印到控制臺* 每5秒鐘統計最近10秒內的數據(詞頻:WordCount),設置水位Watermark時間為10秒* 2019-10-10 12:00:07,dog* 2019-10-10 12:00:08,owl** 2019-10-10 12:00:14,dog* 2019-10-10 12:00:09,cat** 2019-10-10 12:00:15,cat* 2019-10-10 12:00:08,dog* 2019-10-10 12:00:13,owl* 2019-10-10 12:00:21,owl** 2019-10-10 12:00:04,donkey ?--丟失* 2019-10-10 12:00:17,owl ????--不丟失*/
object StructuredWindow {def main(args: Array[String]): Unit = {// 1. 構建SparkSession實例對象,傳遞sparkConf參數val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]").config("spark.sql.shuffle.partitions", "3").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import org.apache.spark.sql.functions._import spark.implicits._// 2. 使用SparkSession從TCP Socket讀取流式數據val inputStreamDF: DataFrame = spark.readStream.format("socket").option("host", "node1").option("port", 9999).load()// 3. 針對獲取流式DStream進行詞頻統計val resultStreamDF = inputStreamDF.as[String].filter(StringUtils.isNotBlank(_))// 將每行數據進行分割單詞: 2019-10-12 09:00:02,cat dog.flatMap(line => {val arr = line.trim.split(",")val timestampStr: String = arr(0)val wordsStr: String = arr(1)wordsStr.split("\\s+")//(時間戳,單詞).map((Timestamp.valueOf(timestampStr), _))})// 設置列的名稱.toDF("timestamp", "word")// TODO:設置水位Watermark.withWatermark("timestamp", "10 seconds")// TODO:設置基于事件時間(event time)窗口?-> time, 每5秒統計最近10秒內數據.groupBy(window($"timestamp", "10 seconds", "5 seconds"),$"word").count()// 按照窗口字段降序排序//.orderBy($"window")/*root|-- window: struct (nullable = true)| ???|-- start: timestamp (nullable = true)| ???|-- end: timestamp (nullable = true)|-- word: string (nullable = true)|-- count: long (nullable = false)*///resultStreamDF.printSchema()// 4. 將計算的結果輸出,打印到控制臺val query: StreamingQuery = resultStreamDF.writeStream.outputMode(OutputMode.Update()).format("console").option("numRows", "100").option("truncate", "false").trigger(Trigger.ProcessingTime("5 seconds")).start()query.awaitTermination()query.stop()}
}

?

總結

以上是生活随笔為你收集整理的2021年大数据Spark(五十二):Structured Streaming 事件时间窗口分析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 午夜写真片福利电影网 | 看免费黄色片 | 成人精品久久久午夜福利 | 在线免费观看黄视频 | 中文字幕一区二区三区精彩视频 | 在线视频日韩欧美 | 日韩精品一二 | 求av网站 | 中文字幕av久久 | 久久精品2 | 免费毛片小视频 | 免费av一级片| 久久性视频| 成人自拍视频在线观看 | 欧美成人综合 | 久久国产一级 | 91成人黄色 | 国产一二在线观看 | 永久在线 | 麻豆免费在线视频 | 亚洲不卡在线视频 | 欧美一区二区三区小说 | 国产亲伦免费视频播放 | 久久免费手机视频 | 国产又粗又大又硬 | 精品一区二区三区视频 | 色欲国产精品一区二区 | 郑艳丽三级 | 亚洲性生活 | 豆花av在线 | 日韩一级在线播放 | 国产欧美日韩在线 | 黄在线免费观看 | 久久精品无码一区二区三区 | 中文日韩字幕 | 91热热| 亚洲va国产天堂va久久 en | 久久午夜鲁丝片午夜精品 | 亚洲综合大片69999 | 国产免费黄色录像 | 国产成人精品影院 | 欧美福利电影 | 精品不卡一区二区三区 | 欧美日韩不卡一区二区三区 | 性生活一区| 天堂视频免费在线观看 | 成人福利免费视频 | 国产伦理一区二区三区 | 国产精品夫妻自拍 | 欧美亚洲天堂网 | 久久这里有精品 | 精品三级av | 你懂的国产在线 | 久久中文在线 | 国产午夜在线播放 | 日本午夜电影 | 欧美日韩中文字幕在线播放 | www在线免费观看 | 免费观看日批视频 | 久草在在线视频 | 国内成人自拍视频 | 麻豆蜜桃wwww精品无码 | 成人午夜视频精品一区 | 99在线小视频 | 91久久精品国产91久久 | 白丝校花扒腿让我c | 99精品国自产在线 | 岛国一区 | 日本午夜网站 | 午夜色网 | 99热这里只有精品在线观看 | mm131丰满少妇人体欣赏图 | 射射综合网 | 少妇又色又紧又爽又刺激视频 | 毛片网站在线播放 | 巨乳中文字幕 | 国产一区午夜 | 99资源站| 久久视频免费观看 | youjizzcom日本| 中文精品一区二区三区 | 国产精品视频久久久 | 青青草草 | 日本精品久久久久中文字幕 | 椎名空在线观看 | 公妇借种乱htp109cc | 91小宝寻花一区二区三区 | 国产欧美一区二区三区视频在线观看 | 97视频播放 | 国产一区二区三区三州 | 青青青手机视频在线观看 | 粗大黑人巨茎大战欧美成人 | 欧美激情精品久久久久久 | 成年人视频在线播放 | 久久视频免费 | 粗暴video蹂躏hd | 日韩成人专区 | 国产成人自拍在线 | 欧美91视频 |