spark sql uv_使用Spark Streaming SQL进行PV/UV统计
作者:關(guān)文選,花名云魄,阿里云E-MapReduce 高級(jí)開(kāi)發(fā)工程師,專(zhuān)注于流式計(jì)算,Spark Contributor
1.背景介紹
PV/UV統(tǒng)計(jì)是流式分析一個(gè)常見(jiàn)的場(chǎng)景。通過(guò)PV可以對(duì)訪(fǎng)問(wèn)的網(wǎng)站做流量或熱點(diǎn)分析,例如廣告主可以通過(guò)PV值預(yù)估投放廣告網(wǎng)頁(yè)所帶來(lái)的流量以及廣告收入。另外一些場(chǎng)景需要對(duì)訪(fǎng)問(wèn)的用戶(hù)作分析,比如分析用戶(hù)的網(wǎng)頁(yè)點(diǎn)擊行為,此時(shí)就需要對(duì)UV做統(tǒng)計(jì)。
使用Spark Streaming SQL,并結(jié)合Redis可以很方便進(jìn)行PV/UV的統(tǒng)計(jì)。本文將介紹通過(guò)Streaming SQL消費(fèi)Loghub中存儲(chǔ)的用戶(hù)訪(fǎng)問(wèn)信息,對(duì)過(guò)去1分鐘內(nèi)的數(shù)據(jù)進(jìn)行PV/UV統(tǒng)計(jì),將結(jié)果存入Redis中。
2.準(zhǔn)備工作
創(chuàng)建E-MapReduce 3.23.0以上版本的Hadoop集群。
下載并編譯E-MapReduce-SDK包
編譯完后, assembly/target目錄下會(huì)生成emr-datasources_shaded_${version}.jar,其中${version}為sdk的版本。
數(shù)據(jù)源
本文采用Loghub作為數(shù)據(jù)源,有關(guān)日志采集、日志解析請(qǐng)參考日志服務(wù)。
3.統(tǒng)計(jì)PV/UV
一般場(chǎng)景下需要將統(tǒng)計(jì)出的PV/UV以及相應(yīng)的統(tǒng)計(jì)時(shí)間存入Redis。其他一些業(yè)務(wù)場(chǎng)景中,也會(huì)只保存最新結(jié)果,用新的結(jié)果不斷覆蓋更新舊的數(shù)據(jù)。以下首先介紹第一種情況的操作流程。
3.1啟動(dòng)客戶(hù)端
命令行啟動(dòng)streaming-sql客戶(hù)端
streaming-sql --master yarn-client --num-executors 2 --executor-memory 2g --executor-cores 2 --jars emr-datasources_shaded_2.11-${version}.jar --driver-class-path emr-datasources_shaded_2.11-${version}.jar也可以創(chuàng)建SQL語(yǔ)句文件,通過(guò)streaming-sql -f的方式運(yùn)行。
3.1定義數(shù)據(jù)表
數(shù)據(jù)源表定義如下
CREATE TABLE loghub_source(user_ip STRING, __time__ TIMESTAMP)USING loghubOPTIONS(sls.project=${sls.project},sls.store=${sls.store},access.key.id=${access.key.id},access.key.secret=${access.key.secret},endpoint=${endpoint});其中,數(shù)據(jù)源表包含user_ip和__time__兩個(gè)字段,分別代表用戶(hù)的IP地址和loghub上的時(shí)間列。OPTIONS中配置項(xiàng)的值根據(jù)實(shí)際配置。
結(jié)果表定義如下
其中,user_ip對(duì)應(yīng)數(shù)據(jù)中的用戶(hù)IP字段,配置項(xiàng)${redis_host}的值根據(jù)實(shí)際配置。
3.2創(chuàng)建流作業(yè)
CREATE SCAN loghub_scanON loghub_sourceUSING STREAMOPTIONS(watermark.column='__time__',watermark.delayThreshold='10 second');CREATE STREAM jobOPTIONS(checkpointLocation=${checkpoint_location})INSERT INTO redis_sinkSELECT COUNT(user_ip) AS pv, approx_count_distinct( user_ip) AS uv, window.end AS intervalFROM loghub_scanGROUP BY TUMBLING(__time__, interval 1 minute), window;4.3查看統(tǒng)計(jì)結(jié)果
最終的統(tǒng)計(jì)結(jié)果如下圖所示
可以看到,每隔一分鐘都會(huì)生成一條數(shù)據(jù),key的形式為表名:interval,value為pv和uv的值。
3.4實(shí)現(xiàn)覆蓋更新
將結(jié)果表的配置項(xiàng)key.column修改為一個(gè)固定的值,例如定義如下
CREATE TABLE redis_sinkUSING redisOPTIONS(table='statistic_info',host=${redis_host},key.column='statistic_type');創(chuàng)建流作業(yè)的SQL改為
CREATE STREAM jobOPTIONS(checkpointLocation='/tmp/spark-test/checkpoint')INSERT INTO redis_sinkSELECT "PV_UV" as statistic_type,COUNT(user_ip) AS pv, approx_count_distinct( user_ip) AS uv, window.end AS intervalFROM loghub_scanGROUP BY TUMBLING(__time__, interval 1 minute), window;最終的統(tǒng)計(jì)結(jié)果如下圖所示
可以看到,Redis中值保留了一個(gè)值,這個(gè)值每分鐘都被更新,value包含pv、uv和interval的值。
4.總結(jié)
本文簡(jiǎn)要介紹了使用Streaming SQL結(jié)合Redis實(shí)現(xiàn)流式處理中統(tǒng)計(jì)PV/UV的需求。后續(xù)文章,我將介紹Spark Streaming SQL的更多內(nèi)容。
猜你喜歡1、重磅|Spark Delta Lake 現(xiàn)在由Linux基金會(huì)托管,將成為數(shù)據(jù)湖的開(kāi)放標(biāo)準(zhǔn)
2、云棲大會(huì) | Apache Spark 3.0 和 Koalas 最新進(jìn)展
3、原創(chuàng)干貨 | 史上最全的大數(shù)據(jù)學(xué)習(xí)資源(Awesome Big Data)
4、Spark Delta Lake 0.4.0 發(fā)布,支持 Python API 和部分 SQL
過(guò)往記憶大數(shù)據(jù)微信群,請(qǐng)?zhí)砑游⑿?#xff1a;fangzhen0219,備注【進(jìn)群】
總結(jié)
以上是生活随笔為你收集整理的spark sql uv_使用Spark Streaming SQL进行PV/UV统计的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 用SolidWorks螺旋线、线性阵列建
- 下一篇: 打开数据库_数据库客户端navicat遇