日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 运维知识 > 数据库 >内容正文

数据库

使用Blink SQL+UDAF实现差值聚合计算

發(fā)布時(shí)間:2024/8/23 数据库 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用Blink SQL+UDAF实现差值聚合计算 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

本案例根據(jù)某電網(wǎng)公司的真實(shí)業(yè)務(wù)需求,通過(guò)Blink SQL+UDAF實(shí)現(xiàn)實(shí)時(shí)流上的差值聚合計(jì)算,通過(guò)本案例,讓讀者熟悉UDAF編寫,并理解UDAF中的方法調(diào)用關(guān)系和順序。
感謝@軍長(zhǎng)在實(shí)現(xiàn)過(guò)程中的指導(dǎo)。筆者水平有限,若有紕漏,請(qǐng)批評(píng)指出。

一、客戶需求

電網(wǎng)公司每天采集各個(gè)用戶的電表數(shù)據(jù)(格式如下表),其中data_date為電表數(shù)據(jù)上報(bào)時(shí)間,cons_id為電表id,r1為電表度數(shù),其他字段與計(jì)算邏輯無(wú)關(guān),可忽略。為了后續(xù)演示方便,僅輸入cons_id=100000002的數(shù)據(jù)。

no(string)data_date(string)cons_id(string)org_no(string)r1(double)
101201907161000000023540113.76
101201907171000000023540114.12
101201907181000000023540116.59
101201907191000000023540118.89

表1:輸入數(shù)據(jù)
電網(wǎng)公司希望通過(guò)實(shí)時(shí)計(jì)算(Blink)對(duì)電表數(shù)據(jù)處理后,每天得到每個(gè)電表最近兩天(當(dāng)天和前一天)的差值數(shù)據(jù),結(jié)果類似如下表:

cons_id(string)data_date(string)subDegreeR1(double)
100000002201907170.36
100000002201907182.47
100000002201907192.3

表2:期望的輸出數(shù)據(jù)

二、需求分析

根據(jù)客戶的需求,比較容易得到兩種解決方案:1、通過(guò)over窗口(2 rows over window)開(kāi)窗進(jìn)行差值聚合;2、通過(guò)hop窗口(sliding=1天,size=2天)進(jìn)行差值聚合。
over窗口和hop窗口均是Blink支持的標(biāo)準(zhǔn)窗口,使用起來(lái)非常簡(jiǎn)單。本需求的最大難點(diǎn)在于差值聚合,Blink支持SUM、MAX、MIN、AVG等內(nèi)置的聚合函數(shù),但沒(méi)有滿足業(yè)務(wù)需求的差值聚合函數(shù),因此需要通過(guò)自定義聚合函數(shù)(UDAF)來(lái)實(shí)現(xiàn)。

三、UDAF開(kāi)發(fā)

實(shí)時(shí)計(jì)算自定義函數(shù)開(kāi)發(fā)搭建環(huán)境請(qǐng)參考UDX概述,在此不再贅述。本案例使用Blink2.2.7版本,下面簡(jiǎn)要描述關(guān)鍵代碼的編寫。
完整代碼(為了方便上傳,使用了txt格式):SubtractionUdaf.txt
1、在com.alibaba.blink.sql.udx.SubtractionUdaf包中創(chuàng)建一個(gè)繼承AggregateFunction類的SubtractionUdaf類。

public class SubtractionUdaf extends AggregateFunction<Double, SubtractionUdaf.Accum>

其中Double是UDAF輸出的類型,在本案例中為相鄰兩天的電表差值度數(shù)。SubtractionUdaf.Accum是內(nèi)部自定義的accumulator數(shù)據(jù)結(jié)構(gòu)。
2、定義accumulator數(shù)據(jù)結(jié)構(gòu),用戶保存UDAF的狀態(tài)。

public static class Accum {private long currentTime;//最新度數(shù)的上報(bào)時(shí)間private double oldDegree;//前一次度數(shù)private double newDegree;//當(dāng)前最新度數(shù)private long num; //accumulator中已經(jīng)計(jì)算的record數(shù)量,主要用于mergeprivate List<Tuple2<Double, Long>> listInput;//緩存所有的輸入,主要用于retract}

3、實(shí)現(xiàn)createAccumulator方法,初始化UDAF的accumulator

//初始化udaf的accumulatorpublic SubtractionUdaf.Accum createAccumulator() {SubtractionUdaf.Accum acc = new SubtractionUdaf.Accum();acc.currentTime = 0;acc.oldDegree = 0.0;acc.newDegree = 0.0;acc.num = 0;acc.listInput = new ArrayList<Tuple2<Double, Long>>();return acc;}

4、實(shí)現(xiàn)getValue方法,用于通過(guò)存放狀態(tài)的accumulator計(jì)算UDAF的結(jié)果,本案例需求是計(jì)算新舊數(shù)據(jù)兩者的差值。

public Double getValue(SubtractionUdaf.Accum accumulator) {return accumulator.newDegree - accumulator.oldDegree;}

5、實(shí)現(xiàn)accumulate方法,用于根據(jù)輸入數(shù)據(jù)更新UDAF存放狀態(tài)的accumulator。考慮到數(shù)據(jù)可能亂序以及可能的retract,數(shù)據(jù)數(shù)據(jù)包括了對(duì)應(yīng)的度數(shù)iValue,還包括上報(bào)度數(shù)的時(shí)間(構(gòu)造的事件時(shí)間ts)。

public void accumulate(SubtractionUdaf.Accum accumulator, double iValue, long ts) {System.out.println("method : accumulate" );accumulator.listInput.add(Tuple2.of(Double.valueOf(iValue),Long.valueOf(ts)));Collections.sort(accumulator.listInput,this.comparator);//按照時(shí)間排序accumulator.num ++;if(accumulator.listInput.size() == 1){accumulator.newDegree = iValue;accumulator.oldDegree = 0.0;accumulator.currentTime = ts;}else {//處理可能存在的數(shù)據(jù)亂序問(wèn)題accumulator.newDegree = accumulator.listInput.get(0).f0;accumulator.currentTime = accumulator.listInput.get(0).f1;accumulator.oldDegree = accumulator.listInput.get(1).f0;}}

其中accumulator為UDAF的狀態(tài),iValue和ts為實(shí)際的輸入數(shù)據(jù)。
注意需要處理可能存在的輸入數(shù)據(jù)亂序問(wèn)題。
6、實(shí)現(xiàn)retract方法,用于在某些優(yōu)化場(chǎng)景下(如使用over窗口)對(duì)retract的數(shù)據(jù)進(jìn)行處理。

public void retract(SubtractionUdaf.Accum accumulator, double iValue, long ts) throws Exception{if(accumulator.listInput.contains(Tuple2.of(iValue, ts))){if(accumulator.listInput.indexOf(Tuple2.of(iValue, ts)) == 0){//retract的是最新值accumulator.listInput.remove(0);accumulator.num--;if(accumulator.listInput.isEmpty()){accumulator.currentTime = 0;accumulator.oldDegree = 0.0;accumulator.newDegree = 0.0;}else if(accumulator.listInput.size() == 1) {accumulator.currentTime = accumulator.listInput.get(0).f1;accumulator.newDegree = accumulator.listInput.get(0).f0;accumulator.oldDegree = 0.0;}else{accumulator.currentTime = accumulator.listInput.get(0).f1;accumulator.newDegree = accumulator.listInput.get(0).f0;accumulator.oldDegree = accumulator.listInput.get(1).f0;}} else if(accumulator.listInput.indexOf(Tuple2.of(iValue, ts)) == 1){//retract的是次新值accumulator.listInput.remove(1);accumulator.num--;if(accumulator.listInput.size() == 1){accumulator.oldDegree = 0.0;}else {accumulator.oldDegree = accumulator.listInput.get(1).f0;}}else {//retract的是其他值accumulator.listInput.remove(Tuple2.of(iValue, ts));accumulator.num--;}}else {throw new Exception("Cannot retract a unexist record : iValue = "+ iValue + "timestamp = "+ ts);}}

需要考慮retract的是最新的數(shù)據(jù)還是次新的數(shù)據(jù),需要不同的邏輯處理。
7、實(shí)現(xiàn)merge方法,用于某些優(yōu)化場(chǎng)景(如使用hop窗口)。

public void merge(SubtractionUdaf.Accum accumulator, Iterable<SubtractionUdaf.Accum> its) {int i = 0;System.out.println("method : merge" );System.out.println("accumulator : "+ accumulator.newDegree);System.out.println("accumulator : "+ accumulator.currentTime);for (SubtractionUdaf.Accum entry : its) {if(accumulator.currentTime < entry.currentTime){if(entry.num > 1){accumulator.currentTime = entry.currentTime;accumulator.oldDegree = entry.oldDegree;accumulator.newDegree = entry.newDegree;accumulator.num += entry.num;accumulator.listInput.addAll(entry.listInput);}else if(entry.num == 1){accumulator.currentTime = entry.currentTime;accumulator.oldDegree = accumulator.newDegree;accumulator.newDegree = entry.newDegree;accumulator.num ++;accumulator.listInput.addAll(entry.listInput);}}else{if(accumulator.num > 1){accumulator.num += entry.num;accumulator.listInput.addAll(entry.listInput);}else if(accumulator.num == 1){accumulator.oldDegree = entry.newDegree;accumulator.num += entry.num;accumulator.listInput.addAll(entry.listInput);}else if(accumulator.num == 0){accumulator.currentTime = entry.currentTime;accumulator.oldDegree = entry.oldDegree;accumulator.newDegree = entry.newDegree;accumulator.num = entry.num;accumulator.listInput.addAll(entry.listInput);}}Collections.sort(accumulator.listInput,this.comparator);System.out.println("merge : "+i);System.out.println("newDegree : "+entry.newDegree);System.out.println("oldDegree = "+entry.oldDegree);System.out.println("currentTime : "+entry.currentTime);}}

需要考慮merge的是否是比當(dāng)前新的數(shù)據(jù),需要不同的處理邏輯。
8、其他方面,考慮到需要對(duì)輸入度數(shù)按照事件時(shí)間排序,在open方法中實(shí)例化了自定義的Comparator類,對(duì)accumulator數(shù)據(jù)結(jié)構(gòu)中的inputList按事件時(shí)間的降序排序。

public void open(FunctionContext context) throws Exception {//定義record的先后順序,用于listInput的排序,時(shí)間越新的record在list中越前面this.comparator = new Comparator<Tuple2<Double, Long>>() {public int compare( Tuple2<Double, Long> o1, Tuple2<Double, Long> o2) {if (Long.valueOf(o1.f1) < Long.valueOf(o2.f1)) {return 1;} else if (Long.valueOf(o1.f1) > Long.valueOf(o2.f1)) {return -1;}else {return 0;}}};}

請(qǐng)參考[使用IntelliJ IDEA開(kāi)發(fā)自定義函數(shù)]()完成UDAF編譯、打包,并參考UDX概述完成資源的上傳和引用。

四、SQL開(kāi)發(fā)及測(cè)試結(jié)果

(一)over窗口

SQL代碼如下,語(yǔ)法檢查、上線、啟動(dòng)作業(yè)(選擇當(dāng)前啟動(dòng)位點(diǎn))。并將表1數(shù)據(jù)上傳至datahub。

CREATE FUNCTION OverWindowSubtractionUdaf as 'com.alibaba.blink.sql.udx.SubtractionUdaf';CREATE TABLE input_dh_e_mp_read_curve (`no` VARCHAR,data_date VARCHAR,cons_id VARCHAR,org_no VARCHAR,r1 DOUBLE,ts as TO_TIMESTAMP(concat(data_date,'000000'),'yyyyMMddHHmmss'),WATERMARK wk FOR ts as withOffset(ts, 2000) ) WITH (type = 'datahub',endPoint = 'http://dh-cn-shanghai.aliyun-inc.com',roleArn='acs:ram::XXX:role/aliyunstreamdefaultrole',project = 'jszc_datahub',topic = 'input_dh_e_mp_read_curve' ); CREATE TABLE data_out(cons_id varchar,data_date varchar,subDegreeR1 DOUBLE )with(type = 'print' );INSERT into data_out SELECTcons_id,last_value(data_date) OVER (PARTITION BY cons_id ORDER BY ts ROWS BETWEEN 1 preceding AND CURRENT ROW) as data_date,OverWindowSubtractionUdaf(r1,unix_timestamp(ts)) OVER (PARTITION BY cons_id ORDER BY ts ROWS BETWEEN 1 preceding AND CURRENT ROW) as data_date FROM input_dh_e_mp_read_curve

由于使用了print connector,從對(duì)應(yīng)的sink的taskmanager.out日志中可以查看到輸出如下(已忽略其他debug日志):

task-1> (+)100000002,20190716,13.76 task-1> (+)100000002,20190717,0.35999999999999943 task-1> (+)100000002,20190718,2.4700000000000006

對(duì)比期望輸出(表2),20190717和20190718兩個(gè)窗口的數(shù)據(jù)均正確,表明業(yè)務(wù)邏輯正確,但此輸出與期望輸出有少許差異:
(1)20190716輸出為13.76,這是因?yàn)榈谝粋€(gè)over窗口只有一條數(shù)據(jù)導(dǎo)致的,這種數(shù)據(jù)可以在業(yè)務(wù)層過(guò)濾掉;
(2)20190719的數(shù)據(jù)沒(méi)有輸出,這是因?yàn)槲覀冊(cè)O(shè)置了watermark,測(cè)試環(huán)境下20190719之后沒(méi)有數(shù)據(jù)進(jìn)來(lái)觸發(fā)20190719對(duì)應(yīng)的窗口的結(jié)束。

(二)hop窗口

SQL代碼如下:語(yǔ)法檢查、上線、啟動(dòng)作業(yè)(選擇當(dāng)前啟動(dòng)位點(diǎn))。并將表1數(shù)據(jù)上傳至datahub。

CREATE FUNCTION HopWindowSubtractionUdaf as 'com.alibaba.blink.sql.udx.SubtractionUdaf';CREATE TABLE input_dh_e_mp_read_curve (`no` VARCHAR,data_date VARCHAR,cons_id VARCHAR,org_no VARCHAR,r1 DOUBLE,ts as TO_TIMESTAMP(concat(data_date,'000000'),'yyyyMMddHHmmss'),WATERMARK wk FOR ts as withOffset(ts, 2000) ) WITH (type = 'datahub',endPoint = 'http://dh-cn-shanghai.aliyun-inc.com',roleArn='acs:ram::XXX:role/aliyunstreamdefaultrole',project = 'jszc_datahub',topic = 'input_dh_e_mp_read_curve' ); CREATE TABLE data_out(cons_id varchar,data_date varchar,subDegreeR1 DOUBLE )with(type = 'print' ); INSERT into data_out SELECTcons_id,DATE_FORMAT(HOP_end(ts, INTERVAL '1' day,INTERVAL '2' day), 'yyyyMMdd'),HopWindowSubtractionUdaf(r1,unix_timestamp(ts)) FROM input_dh_e_mp_read_curve group by hop(ts, INTERVAL '1' day,INTERVAL '2' day),cons_id;

由于使用了print connector,從對(duì)應(yīng)的sink的taskmanager.out日志中可以查看到輸出如下(已忽略其他debug日志):

task-1> (+)100000002,20190716,13.76 task-1> (+)100000002,20190717,0.35999999999999943 task-1> (+)100000002,20190718,2.4700000000000006

對(duì)比期望輸出(表2),20190717和20190718兩個(gè)窗口的數(shù)據(jù)均正確,表明業(yè)務(wù)邏輯正確,但此輸出與期望輸出有少許差異:
(1)20190716輸出為13.76,這是因?yàn)榈谝粋€(gè)hop窗口只有一條數(shù)據(jù)導(dǎo)致的,這種數(shù)據(jù)可以在業(yè)務(wù)層過(guò)濾掉;
(2)20190719的數(shù)據(jù)沒(méi)有輸出,這是因?yàn)槲覀冊(cè)O(shè)置了watermark,測(cè)試環(huán)境下20190719之后沒(méi)有數(shù)據(jù)進(jìn)來(lái)觸發(fā)20190719對(duì)應(yīng)的窗口的結(jié)束。

五、幾點(diǎn)思考

1、關(guān)于UDAF內(nèi)部方法的調(diào)用關(guān)系和順序

UDAF中主要有createAccumulator、getValue、accumulate、retract和merge方法,其調(diào)用關(guān)系和順序并不是完全確定,而是與Blink底層優(yōu)化、Blink版本、開(kāi)窗類型(如hop還是over窗口)等相關(guān)。
比較確定的是一次正常(沒(méi)有failover)的作業(yè),createAccumulator方法只在作業(yè)啟動(dòng)時(shí)調(diào)用一次,accumulate方法在每條數(shù)據(jù)輸入時(shí)調(diào)用一次,在觸發(fā)數(shù)據(jù)輸出時(shí)會(huì)調(diào)用一次getValue(并不代表只調(diào)用一次)。
而retract方法和merge方法則跟具體的優(yōu)化方式或開(kāi)窗類型有關(guān),本案例中over窗口調(diào)用retract方法而不調(diào)用merge方法,hop窗口調(diào)用merge方法而不調(diào)用retract方法。
大家可以增加日志,觀察這幾個(gè)方法的調(diào)用順序,還是蠻有意思的。

2、如何知道需要實(shí)現(xiàn)UDAF中的哪些方法

UDAF中必須實(shí)現(xiàn)createAccumulator、getValue、accumulate方法,可選擇實(shí)現(xiàn)retract和merge方法。
一般情況下,可先實(shí)現(xiàn)createAccumulator、getValue、accumulate三個(gè)方法,然后編寫SQL后進(jìn)行語(yǔ)法檢查,SQL編譯器會(huì)提示是否需要retract或merge方法。
比如,如果沒(méi)有實(shí)現(xiàn)retract方法,在使用over窗口時(shí),語(yǔ)法檢查會(huì)報(bào)類似如下錯(cuò)誤:

org.apache.flink.table.api.ValidationException: Function class 'com.alibaba.blink.sql.udx.SubtractionUdaf' does not implement at least one method named 'retract' which is public, not abstract and (in case of table functions) not static.

比如,如果沒(méi)有實(shí)現(xiàn)merge方法,在使用over窗口時(shí),語(yǔ)法檢查會(huì)報(bào)類似如下錯(cuò)誤:

org.apache.flink.table.api.ValidationException: Function class 'com.alibaba.blink.sql.udx.SubtractionUdaf' does not implement at least one method named 'merge' which is public, not abstract and (in case of table functions) not static.

3、本案例存在優(yōu)化空間的地方

(1)本案例沒(méi)有考慮數(shù)據(jù)缺失的問(wèn)題,比如因?yàn)槟撤N原因(網(wǎng)絡(luò)問(wèn)題、數(shù)據(jù)采集問(wèn)題等)缺少20190717的數(shù)據(jù)。這種情況下會(huì)是什么樣的結(jié)果?大家可以自行測(cè)試下;
(2)本案例使用了一個(gè)List,然后通過(guò)Collections.sort方法進(jìn)行排序,這不是很優(yōu)的方法,如果用優(yōu)先級(jí)隊(duì)列(priority?queue)性能應(yīng)該會(huì)更好;

原文鏈接
本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。

總結(jié)

以上是生活随笔為你收集整理的使用Blink SQL+UDAF实现差值聚合计算的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。