2021 Big Data Spark (53): Structured Streaming Deduplication
Published: 2023/11/28 · 豆豆 · 生活随笔
Table of Contents
Streaming Deduplication
Introduction
Requirements
Code Demonstration
Streaming Deduplication
Introduction
The most typical use case for deduplication in real-time streaming applications is website UV (unique visitor) counting:
1. Real-time UV counting, for example daily site UV;
2. UV over a recent window (for example, the last hour), for which a Watermark can be set.
Structured Streaming can deduplicate a stream either with or without a Watermark:
1. Without a Watermark: there is no bound on how late a duplicate record may arrive, so the query keeps all past records as state for deduplication;
2. With a Watermark: there is a bound on how late a duplicate may arrive, so the query can drop old state according to the watermark.
The official documentation provides an example covering both cases.
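A sketch of that official usage, reconstructed from the Spark Structured Streaming programming guide (the `guid` and `eventTime` column names come from the guide; `streamingDf` stands in for any streaming DataFrame with those columns):

```scala
// `streamingDf` is assumed to be a streaming DataFrame with columns
// `guid` and `eventTime`; the socket source here is just one way to get one.
val streamingDf = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
  .selectExpr("value AS guid", "current_timestamp() AS eventTime")

// Without watermark: every guid ever seen is kept as state for deduplication.
val dedupNoWatermark = streamingDf.dropDuplicates("guid")

// With watermark: the watermark column must be included in dropDuplicates,
// which lets Spark discard state older than the watermark delay.
val dedupWithWatermark = streamingDf
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates("guid", "eventTime")
```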
Requirements
Deduplicate the website's user log data by userId and eventType, then count the results.
The data is as follows:
{"eventTime": "2016-01-10 10:01:50","eventType": "browse","userID":"1"}
{"eventTime": "2016-01-10 10:01:50","eventType": "click","userID":"1"}
{"eventTime": "2016-01-10 10:01:55","eventType": "browse","userID":"1"}
{"eventTime": "2016-01-10 10:01:55","eventType": "click","userID":"1"}
{"eventTime": "2016-01-10 10:01:50","eventType": "browse","userID":"1"}
{"eventTime": "2016-01-10 10:01:50","eventType": "click","userID":"1"}
{"eventTime": "2016-01-10 10:02:00","eventType": "click","userID":"1"}
{"eventTime": "2016-01-10 10:01:50","eventType": "browse","userID":"1"}
{"eventTime": "2016-01-10 10:01:50","eventType": "click","userID":"1"}
{"eventTime": "2016-01-10 10:01:51","eventType": "click","userID":"1"}
{"eventTime": "2016-01-10 10:01:50","eventType": "browse","userID":"1"}
{"eventTime": "2016-01-10 10:01:50","eventType": "click","userID":"3"}
{"eventTime": "2016-01-10 10:01:51","eventType": "click","userID":"2"}
Code Demonstration
package cn.itcast.structedstreaming

import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, SparkSession}

object StructuredDeduplication {
  def main(args: Array[String]): Unit = {
    // 1. Build the SparkSession instance and configure it
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")

    import org.apache.spark.sql.functions._
    import spark.implicits._

    // 2. Read data from a TCP socket
    val inputTable: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node1")
      .option("port", 9999)
      .load()

    // 3. Process and analyze the data
    val resultTable: DataFrame = inputTable
      .as[String]
      .filter(StringUtils.isNotBlank(_))
      // Sample record: {"eventTime": "2016-01-10 10:01:50","eventType": "browse","userID":"1"}
      .select(
        get_json_object($"value", "$.eventTime").as("event_time"),
        get_json_object($"value", "$.eventType").as("event_type"),
        get_json_object($"value", "$.userID").as("user_id")
      )
      // Deduplicate by user_id and event_type
      .dropDuplicates("user_id", "event_type")
      .groupBy($"user_id", $"event_type")
      .count()

    // 4. Configure the sink and start the streaming query
    val query: StreamingQuery = resultTable.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .option("numRows", "10")
      .option("truncate", "false")
      .start()
    query.awaitTermination()
    query.stop()
  }
}
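The demo above deduplicates without a watermark, so Spark keeps every (user_id, event_type) pair it has ever seen as state. To bound that state, the same pipeline can add a watermark; below is a hypothetical variant (not from the original article) that casts `event_time` to a timestamp and includes it in `dropDuplicates`, as required for watermark-based state cleanup:

```scala
// Sketch: bounding deduplication state with a watermark.
// Column names match the demo's select; `inputTable` is the same socket source.
val dedupedWithWatermark = inputTable
  .as[String]
  .filter(StringUtils.isNotBlank(_))
  .select(
    // The watermark column must be a timestamp, so cast the JSON string
    get_json_object($"value", "$.eventTime").cast("timestamp").as("event_time"),
    get_json_object($"value", "$.eventType").as("event_type"),
    get_json_object($"value", "$.userID").as("user_id")
  )
  // Keep dedup state only for events within 1 hour of the max event_time seen
  .withWatermark("event_time", "1 hour")
  // The watermark column must be part of the dedup key for state to be dropped
  .dropDuplicates("user_id", "event_type", "event_time")
  .groupBy($"user_id", $"event_type")
  .count()
```

Note that duplicates of the same pair arriving at different `event_time` values now count separately, which matches the windowed-UV use case from the introduction rather than the all-time dedup of the demo.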
Running the application prints one row per distinct (user_id, event_type) pair, each with a count of 1, since dropDuplicates keeps only the first occurrence of each pair. (The original output screenshot is not available.)