日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

2021年大数据Spark(五十三):Structured Streaming Deduplication

發布時間:2023/11/28 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 2021年大数据Spark(五十三):Structured Streaming Deduplication 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

Streaming Deduplication

介紹

需求

???????代碼演示


Streaming Deduplication

介紹

在實時流式應用中,最典型的應用場景:網站UV統計。

1:實時統計網站UV,比如每日網站UV;

2:統計最近一段時間(比如一個小時)網站UV,可以設置水位Watermark;

?

Structured Streaming可以使用deduplication對有無Watermark的流式數據進行去重操作:

1.無 Watermark:對重復記錄到達的時間沒有限制。查詢會保留所有的過去記錄作為狀態用于去重;

2.有 Watermark:對重復記錄到達的時間有限制。查詢會根據水印刪除舊的狀態數據;

?

官方提供示例代碼如下:

?

?

???????需求

對網站用戶日志數據,按照userId和eventType去重統計

數據如下:


{"eventTime": "2016-01-10 10:01:50","eventType": "browse","userID":"1"}{"eventTime": "2016-01-10 10:01:50","eventType": "click","userID":"1"}{"eventTime": "2016-01-10 10:01:55","eventType": "browse","userID":"1"}{"eventTime": "2016-01-10 10:01:55","eventType": "click","userID":"1"}{"eventTime": "2016-01-10 10:01:50","eventType": "browse","userID":"1"}{"eventTime": "2016-01-10 10:01:50","eventType": "click","userID":"1"}{"eventTime": "2016-01-10 10:02:00","eventType": "click","userID":"1"}{"eventTime": "2016-01-10 10:01:50","eventType": "browse","userID":"1"}{"eventTime": "2016-01-10 10:01:50","eventType": "click","userID":"1"}{"eventTime": "2016-01-10 10:01:51","eventType": "click","userID":"1"}{"eventTime": "2016-01-10 10:01:50","eventType": "browse","userID":"1"}{"eventTime": "2016-01-10 10:01:50","eventType": "click","userID":"3"}{"eventTime": "2016-01-10 10:01:51","eventType": "click","userID":"2"}

?

???????代碼演示

package cn.itcast.structedstreamingimport org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, SparkSession}object StructuredDeduplication {def main(args: Array[String]): Unit = {// 1. 構建SparkSession會話實例對象,設置屬性信息val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]").config("spark.sql.shuffle.partitions", "3").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import org.apache.spark.sql.functions._import spark.implicits._// 1. 從TCP Socket 讀取數據val inputTable: DataFrame = spark.readStream.format("socket").option("host", "node1").option("port", 9999).load()// 2. 數據處理分析val resultTable: DataFrame = inputTable.as[String].filter(StringUtils.isNotBlank(_))// 樣本數據:{“eventTime”: “2016-01-10 10:01:50”,“eventType”: “browse”,“userID”:“1”}.select(get_json_object($"value", "$.eventTime").as("event_time"),get_json_object($"value", "$.eventType").as("event_type"),get_json_object($"value", "$.userID").as("user_id"))// 按照UserId和EventType去重.dropDuplicates("user_id", "event_type").groupBy($"user_id", $"event_type").count()// 3. 設置Streaming應用輸出及啟動val query: StreamingQuery = resultTable.writeStream.outputMode(OutputMode.Complete()).format("console").option("numRows", "10").option("truncate", "false").start()query.awaitTermination()query.stop()}
}

?

運行應用結果如下:

?

總結

以上是生活随笔為你收集整理的2021年大数据Spark(五十三):Structured Streaming Deduplication的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。