簡介:在實際業(yè)務使用中,需要經(jīng)常實時做一些數(shù)據(jù)分析,包括實時PV和UV展示,實時銷售數(shù)據(jù),實時店鋪UV以及實時推薦系統(tǒng)等,基于此類需求,Confluent+實時計算Flink版是一個高效的方案。
業(yè)務背景
在實際業(yè)務使用中,需要經(jīng)常實時做一些數(shù)據(jù)分析,包括實時PV和UV展示,實時銷售數(shù)據(jù),實時店鋪UV以及實時推薦系統(tǒng)等,基于此類需求,Confluent+實時計算Flink版是一個高效的方案。
Confluent是基于Apache Kafka提供的企業(yè)級全托管流數(shù)據(jù)服務,由 Apache Kafka 的原始創(chuàng)建者構(gòu)建,通過企業(yè)級功能擴展了 Kafka 的優(yōu)勢,同時消除了 Kafka管理或監(jiān)控的負擔。
實時計算Flink版是阿里云基于 Apache Flink 構(gòu)建的企業(yè)級實時大數(shù)據(jù)計算商業(yè)產(chǎn)品。實時計算 Flink 由 Apache Flink 創(chuàng)始團隊官方出品,擁有全球統(tǒng)一商業(yè)化品牌,提供全系列產(chǎn)品矩陣,完全兼容開源 Flink API,并充分基于強大的阿里云平臺提供云原生的 Flink 商業(yè)增值能力。
一、準備工作-創(chuàng)建Confluent集群和實時計算Flink版集群
登錄Confluent管理控制臺,創(chuàng)建Confluent集群,創(chuàng)建步驟參考 Confluent集群開通
登錄實時計算Flink版管理控制臺,創(chuàng)建vvp集群。請注意,創(chuàng)建vvp集群選擇的vpc跟confluent集群的region和vpc使用同一個,這樣可以在vvp內(nèi)部訪問confluent的內(nèi)部域名。
二、最佳實踐-實時統(tǒng)計玩家充值金額-Confluent+實時計算Flink+Hologres
2.1 新建Confluent消息隊列
在confluent集群列表頁,登錄control center
在左側(cè)選中Topics,點擊Add a topic按鈕,創(chuàng)建一個名為confluent-vvp-test的topic,將partition設置為3
2.2 配置結(jié)果表 Hologres
進入Hologres控制臺,點擊Hologres實例,在DB管理中新增數(shù)據(jù)庫`mydb`
登錄Hologres數(shù)據(jù)庫,新建SQL
Hologres中創(chuàng)建結(jié)果表 SQL語句
--用戶累計消費結(jié)果表CREATE TABLE consume (appkey VARCHAR,serverid VARCHAR,servertime VARCHAR,roleid VARCHAR,amount FLOAT,dt VARCHAR,primary key(appkey,dt));
2.3 創(chuàng)建實時計算vvp作業(yè)
首先登錄vvp控制臺,選擇集群所在region,點擊控制臺,進入開發(fā)界面
點擊作業(yè)開發(fā)Tab,點擊新建文件,文件名稱:confluent-vvp-hologres,文件類型選擇:流作業(yè)/SQL
在輸入框?qū)懭胍韵麓a:
create TEMPORARY table kafka_game_consume_source( appkey STRING,servertime STRING,consumenum DOUBLE,roleid STRING,serverid STRING
) with ('connector' = 'kafka','topic' = 'game_consume_log','properties.bootstrap.servers' = 'kafka.confluent.svc.cluster.local.xxx:9071[xxx可以找開發(fā)同學查看]','properties.group.id' = 'gamegroup','format' = 'json','properties.ssl.truststore.location' = '/flink/usrlib/truststore.jks','properties.ssl.truststore.password' = '[your truststore password]','properties.security.protocol'='SASL_SSL','properties.sasl.mechanism'='PLAIN','properties.sasl.jaas.config'='org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx[集群的用戶]" password="xxx[相應的密碼]";'
);
-- 創(chuàng)建累計消費hologres sink表
CREATE TEMPORARY TABLE consume(appkey STRING,serverid STRING,servertime STRING,roleid STRING,amount DOUBLE,dt STRING,PRIMARY KEY (appkey,dt) NOT ENFORCED)WITH ('connector' = 'hologres','dbname' = 'mydb','endpoint' = 'hgprecn-cn-tl32gkaet006-cn-beijing-vpc.hologres.aliyuncs.com:80','password' = '[your appkey secret]','tablename' = 'consume','username' = '[your app key]','mutateType' = 'insertorreplace');
--{"appkey":"appkey1","servertime":"2020-09-30 14:10:36","consumenum":33.8,"roleid":"roleid1","serverid":"1"}
--{"appkey":"appkey2","servertime":"2020-09-30 14:11:36","consumenum":30.8,"roleid":"roleid2","serverid":"2"}
--{"appkey":"appkey1","servertime":"2020-09-30 14:13:36","consumenum":31.8,"roleid":"roleid1","serverid":"1"}
--{"appkey":"appkey2","servertime":"2020-09-30 14:20:36","consumenum":33.8,"roleid":"roleid2","serverid":"2"}
--{"appkey":"appkey1","servertime":"2020-09-30 14:30:36","consumenum":73.8,"roleid":"roleid1","serverid":"1"}-- 計算每個用戶累積消費金額insert into consumeSELECTappkey,LAST_VALUE(serverid) as serverid,LAST_VALUE(servertime) as servertime,LAST_VALUE(roleid) as roleid,sum(consumenum) as amount,substring(servertime,1,10) as dtFROM kafka_game_consume_sourceGROUP BY appkey,substring(servertime,1,10)having sum(consumenum) > 0;
在高級配置里,增加依賴文件truststore.jks(訪問內(nèi)部域名得添加這個文件,訪問公網(wǎng)域名可以不用),訪問依賴文件的固定路徑前綴都是/flink/usrlib/(這里就是/flink/usrlib/truststore.jks)
點擊上線按鈕,完成上線
在運維作用列表里找到剛上線的作用,點擊啟動按鈕,等待狀態(tài)更新為running,運行成功。
在control center的【Topics->Messages】頁面,逐條發(fā)送測試消息,格式為:
{"appkey":"appkey1","servertime":"2020-09-30 14:10:36","consumenum":33.8,"roleid":"roleid1","serverid":"1"}
{"appkey":"appkey2","servertime":"2020-09-30 14:11:36","consumenum":30.8,"roleid":"roleid2","serverid":"2"}
{"appkey":"appkey1","servertime":"2020-09-30 14:13:36","consumenum":31.8,"roleid":"roleid1","serverid":"1"}
{"appkey":"appkey2","servertime":"2020-09-30 14:20:36","consumenum":33.8,"roleid":"roleid2","serverid":"2"}
{"appkey":"appkey1","servertime":"2020-09-30 14:30:36","consumenum":73.8,"roleid":"roleid1","serverid":"1"}
2.4 查看用戶充值金額實時統(tǒng)計效果
三、最佳實踐-電商實時PV和UV統(tǒng)計-Confluent+實時計算Flink+RDS
3.1 新建Confluent消息隊列
在confluent集群列表頁,登錄control center
在左側(cè)選中Topics,點擊Add a topic按鈕,創(chuàng)建一個名為pv-uv的topic,將partition設置為3
3.2 創(chuàng)建云數(shù)據(jù)庫RDS結(jié)果表
登錄 RDS 管理控制臺頁面,購買RDS。確保RDS與Flink全托管集群在相同region,相同VPC下
添加虛擬交換機網(wǎng)段(vswitch IP段)進入RDS白名單,詳情參考:設置白名單文檔
3.【vswitch IP段】可在 flink的工作空間詳情中查詢
在【賬號管理】頁面創(chuàng)建賬號【高權限賬號】
數(shù)據(jù)庫實例下【數(shù)據(jù)庫管理】新建數(shù)據(jù)庫【conflufent_vvp】
使用系統(tǒng)自帶的DMS服務登陸RDS,登錄名和密碼輸入上面創(chuàng)建的高權限賬戶
雙擊【confluent_vvp】數(shù)據(jù)庫,打開SQLConsole,將以下建表語句復制粘貼到 SQLConsole中,創(chuàng)建結(jié)果表
CREATE TABLE result_cps_total_summary_pvuv_min(summary_date date NOT NULL COMMENT '統(tǒng)計日期',summary_min varchar(255) COMMENT '統(tǒng)計分鐘',pv bigint COMMENT 'pv',uv bigint COMMENT 'uv',currenttime timestamp COMMENT '當前時間',primary key(summary_date,summary_min)
)
3.3 創(chuàng)建實時計算VVP作業(yè)
1.【[VVP控制臺】新建文件
在SQL區(qū)域輸入以下代碼:
--數(shù)據(jù)的訂單源表
CREATE TABLE source_ods_fact_log_track_action (account_id VARCHAR,--用戶IDclient_ip VARCHAR,--客戶端IPclient_info VARCHAR,--設備機型信息platform VARCHAR,--系統(tǒng)版本信息imei VARCHAR,--設備唯一標識`version` VARCHAR,--版本號`action` VARCHAR,--頁面跳轉(zhuǎn)描述gpm VARCHAR,--埋點鏈路c_time VARCHAR,--請求時間target_type VARCHAR,--目標類型target_id VARCHAR,--目標IDudata VARCHAR,--擴展信息,JSON格式session_id VARCHAR,--會話IDproduct_id_chain VARCHAR,--商品ID串cart_product_id_chain VARCHAR,--加購商品IDtag VARCHAR,--特殊標記`position` VARCHAR,--位置信息network VARCHAR,--網(wǎng)絡使用情況p_dt VARCHAR,--時間分區(qū)天p_platform VARCHAR --系統(tǒng)版本信息
) WITH ('connector' = 'kafka','topic' = 'game_consume_log','properties.bootstrap.servers' = 'kafka.confluent.svc.cluster.local.c79f69095bc5d4d98b01136fe43e31b93:9071','properties.group.id' = 'gamegroup','format' = 'json','properties.ssl.truststore.location' = '/flink/usrlib/truststore.jks','properties.ssl.truststore.password' = '【your password】','properties.security.protocol'='SASL_SSL','properties.sasl.mechanism'='PLAIN','properties.sasl.jaas.config'='org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="【your user name】" password="【your password】";'
);
--{"account_id":"id1","client_ip":"172.11.1.1","client_info":"mi10","p_dt":"2021-12-01","c_time":"2021-12-01 19:10:00"}
CREATE TABLE result_cps_total_summary_pvuv_min (summary_date date,--統(tǒng)計日期summary_min varchar,--統(tǒng)計分鐘pv bigint,--點擊量uv bigint,--一天內(nèi)同個訪客多次訪問僅計算一個UVcurrenttime timestamp,--當前時間primary key (summary_date, summary_min)
) WITH (type = 'rds',url = 'url = 'jdbc:mysql://rm-【your rds clusterId】.mysql.rds.aliyuncs.com:3306/confluent_vvp',',tableName = 'result_cps_total_summary_pvuv_min',userName = 'flink_confluent_vip',password = '【your rds password】'
);
CREATE VIEW result_cps_total_summary_pvuv_min_01 AS
selectcast (p_dt as date) as summary_date --時間分區(qū), count (client_ip) as pv --客戶端的IP, count (distinct client_ip) as uv --客戶端去重, cast (max (c_time) as TIMESTAMP) as c_time --請求的時間
fromsource_ods_fact_log_track_action
groupby p_dt;
INSERTinto result_cps_total_summary_pvuv_min
selecta.summary_date,--時間分區(qū)cast (DATE_FORMAT (c_time, 'HH:mm') as varchar) as summary_min,--取出小時分鐘級別的時間a.pv,a.uv,CURRENT_TIMESTAMP as currenttime --當前時間
fromresult_cps_total_summary_pvuv_min_01 AS a;
點擊【上線】之后,在作業(yè)運維頁面點擊啟動按鈕,直到狀態(tài)更新為RUNNING狀態(tài)。
在control center的【Topics->Messages】頁面,逐條發(fā)送測試消息,格式為:
{"account_id":"id1","client_ip":"72.11.1.111","client_info":"mi10","p_dt":"2021-12-01","c_time":"2021-12-01 19:11:00"}
{"account_id":"id2","client_ip":"72.11.1.112","client_info":"mi10","p_dt":"2021-12-01","c_time":"2021-12-01 19:12:00"}
{"account_id":"id3","client_ip":"72.11.1.113","client_info":"mi10","p_dt":"2021-12-01","c_time":"2021-12-01 19:13:00"}
3.4 查看PV和UV效果
? ? 可以看出rds數(shù)據(jù)表的pv和uv會隨著發(fā)送的消息數(shù)據(jù),動態(tài)的變化,同時還可以通過【數(shù)據(jù)可視化】來查看相應的圖表信息。
pv圖表展示:
uv圖表展示:
原文鏈接
本文為阿里云原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的基于Confluent+Flink的实时数据分析最佳实践的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。