日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

2021年大数据Spark(五十三):Structured Streaming Deduplication

發布時間:2023/11/28 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 2021年大数据Spark(五十三):Structured Streaming Deduplication 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

Streaming Deduplication

介紹

需求

???????代碼演示


Streaming Deduplication

介紹

在實時流式應用中,最典型的應用場景:網站UV統計。

1:實時統計網站UV,比如每日網站UV;

2:統計最近一段時間(比如一個小時)網站UV,可以設置水位Watermark;

?

Structured Streaming可以使用deduplication對有無Watermark的流式數據進行去重操作:

1.無 Watermark:對重復記錄到達的時間沒有限制。查詢會保留所有的過去記錄作為狀態用于去重;

2.有 Watermark:對重復記錄到達的時間有限制。查詢會根據水印刪除舊的狀態數據;

?

官方提供示例代碼如下:

?

?

???????需求

對網站用戶日志數據,按照userId和eventType去重統計

數據如下:


{"eventTime": "2016-01-10 10:01:50","eventType": "browse","userID":"1"}{"eventTime": "2016-01-10 10:01:50","eventType": "click","userID":"1"}{"eventTime": "2016-01-10 10:01:55","eventType": "browse","userID":"1"}{"eventTime": "2016-01-10 10:01:55","eventType": "click","userID":"1"}{"eventTime": "2016-01-10 10:01:50","eventType": "browse","userID":"1"}{"eventTime": "2016-01-10 10:01:50","eventType": "click","userID":"1"}{"eventTime": "2016-01-10 10:02:00","eventType": "click","userID":"1"}{"eventTime": "2016-01-10 10:01:50","eventType": "browse","userID":"1"}{"eventTime": "2016-01-10 10:01:50","eventType": "click","userID":"1"}{"eventTime": "2016-01-10 10:01:51","eventType": "click","userID":"1"}{"eventTime": "2016-01-10 10:01:50","eventType": "browse","userID":"1"}{"eventTime": "2016-01-10 10:01:50","eventType": "click","userID":"3"}{"eventTime": "2016-01-10 10:01:51","eventType": "click","userID":"2"}

?

???????代碼演示

package cn.itcast.structedstreamingimport org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, SparkSession}object StructuredDeduplication {def main(args: Array[String]): Unit = {// 1. 構建SparkSession會話實例對象,設置屬性信息val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]").config("spark.sql.shuffle.partitions", "3").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import org.apache.spark.sql.functions._import spark.implicits._// 1. 從TCP Socket 讀取數據val inputTable: DataFrame = spark.readStream.format("socket").option("host", "node1").option("port", 9999).load()// 2. 數據處理分析val resultTable: DataFrame = inputTable.as[String].filter(StringUtils.isNotBlank(_))// 樣本數據:{“eventTime”: “2016-01-10 10:01:50”,“eventType”: “browse”,“userID”:“1”}.select(get_json_object($"value", "$.eventTime").as("event_time"),get_json_object($"value", "$.eventType").as("event_type"),get_json_object($"value", "$.userID").as("user_id"))// 按照UserId和EventType去重.dropDuplicates("user_id", "event_type").groupBy($"user_id", $"event_type").count()// 3. 設置Streaming應用輸出及啟動val query: StreamingQuery = resultTable.writeStream.outputMode(OutputMode.Complete()).format("console").option("numRows", "10").option("truncate", "false").start()query.awaitTermination()query.stop()}
}

?

運行應用結果如下:

?

總結

以上是生活随笔為你收集整理的2021年大数据Spark(五十三):Structured Streaming Deduplication的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。