日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

利用blink CEP实现流计算中的超时统计问题

發布時間:2024/8/23 编程问答 43 豆豆
生活随笔 收集整理的這篇文章主要介紹了 利用blink CEP实现流计算中的超时统计问题 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

案例與解決方案匯總頁:
阿里云實時計算產品案例&解決方案匯總

一. 背景介紹

如<利用blink+MQ實現流計算中的延時統計問題>一文中所描述的場景,我們將其簡化為以下案例:
實時流的數據源結構如下:

物流訂單號支付時間倉接單時間倉出庫時間
LP12018-08-01 08:00??
LP12018-08-01 08:002018-08-01 09:00?
LP22018-08-01 09:10??
LP22018-08-01 09:102018-08-01 09:50?
LP22018-08-01 09:102018-08-01 09:50?2018-08-01 12:00

我們期望通過以上數據源,按照支付日期統計,每個倉庫的倉接單量、倉出庫量、倉接單超2H未出庫單量、倉接單超6H未出庫單量。可以看出,其中LP1倉接單時間是2018-08-01 09:00,但一直到2018-08-01 12:00點之前,一直都沒有出庫,LP1滿足倉接單超2H未出庫的行為。

該場景的難點就在于:訂單未出庫。而對于TT中的源消息流,訂單未出庫,TT就不會下發新的消息,不下發新的消息,blink就無法被觸發計算。而針對上述的場景,對于LP1,我們需要在倉接單時間是2018-08-01 09:00+2H,也就是2018-08-01 11:00的之后,就要知道LP1已經倉接單但超2H未出庫了。

二. 解決方案

本文主要是利用blink CEP來實現上述場景,具體實現步驟如下所述。
第一步:在source DDL中定義event_timestamp,并定義sink,如下:

----定義source create table sourcett_dwd_ri (lg_order_code varchar comment '物流訂單號',ded_pay_time varchar comment '支付時間',store_code varchar comment '倉庫編碼',store_name varchar comment '倉庫名稱',wms_create_time varchar comment '倉接單時間',wms_consign_create_time varchar comment '倉出庫時間',evtstamp as case when coalesce(wms_create_time, '') <> ''then to_timestamp(wms_create_time, 'yyyy-MM-dd HH:mm:ss')else to_timestamp('1970-01-01 00:00:00', 'yyyy-MM-dd HH:mm:ss')end --構造event_timestamp,如果源表本身帶有消息的occur_time,可直接選擇occur_time作為event_timestamp,WATERMARK FOR evtstamp AS withOffset(evtstamp, 10000) --設置延遲10秒處理 ) with (type='tt',topic='dwd_ri',accessKey='xxxxxx',accessId='xxxxxx',lengthCheck='PAD',nullValues='\\N|' );----定義sink create table sink_hybrid_blink_cep (ded_pay_date varchar comment '支付日期',store_code varchar comment '倉庫編碼',store_name varchar comment '倉庫名稱',wms_create_ord_cnt bigint comment '倉接單量',wms_confirm_ord_cnt bigint comment '倉出庫量',wmsin_nowmsout_2h_ord_cnt bigint comment '倉接單超2小時未出庫單量',wmsin_nowmsout_6h_ord_cnt bigint comment '倉接單超6小時未出庫單量' ,sub_partition bigint comment '二級分區(支付日期)',PRIMARY KEY (ded_pay_date, store_code, sub_partition) ) with (type='PetaData',url = 'xxxxxx',tableName='blink_cep',userName='xxxxxx',password='xxxxxx',bufferSize='30000',batchSize='3000',batchWriteTimeoutMs='15000' );

第二步:根據blink CEP的標準語義進行改寫,如下:

create view blink_cep_v1 as select '倉接單-倉出庫超時' as timeout_type,lg_order_code,wms_create_time as start_time,wms_consign_create_time as end_time from source_dwd_csn_whc_lgt_fl_ord_ri MATCH_RECOGNIZE (PARTITION BY lg_order_codeORDER BY evtstampMEASURESe1.wms_create_time as wms_create_time,e2.wms_consign_create_time as wms_consign_create_timeONE ROW PER MATCH WITH TIMEOUT ROWS --重要,必須設置延遲也下發AFTER MATCH SKIP TO NEXT ROWPATTERN (e1 -> e2) WITHIN INTERVAL '6' HOUREMIT TIMEOUT (INTERVAL '2' HOUR, INTERVAL '6' HOUR)DEFINEe1 as e1.wms_create_time is not null and e1.wms_consign_create_time is null,e2 as e2.wms_create_time is not null and e2.wms_consign_create_time is not null ) where wms_create_time is not null --重要,可以大大減少進入CEP的消息量 and wms_consign_create_time is null --重要,可以大大減少進入CEP的消息量 ;

第三步:根據blink的執行機制,我們通過源實時流sourcett_dwd_ri與超時消息流blink_cep_v1關聯,來觸發blink對超時消息進行聚合操作,如下:

create view blink_cep_v2 as select a.lg_order_code as lg_order_code,last_value(a.store_code ) as store_code,last_value(a.store_name ) as store_name,last_value(a.ded_pay_time ) as ded_pay_time,last_value(a.wms_create_time ) as wms_create_time,last_value(a.real_wms_confirm_time ) as real_wms_confirm_time,last_value(case when coalesce(a.wms_create_time, '') <> ''and coalesce(a.real_wms_confirm_time, '') = '' and now() - unix_timestamp(a.wms_create_time,'yyyy-MM-dd HH:mm:ss') >= 7200then 'Y' else 'N' end) as flag_01,last_value(case when coalesce(a.wms_create_time, '') <> ''and coalesce(a.real_wms_confirm_time, '') = '' and now() - unix_timestamp(a.wms_create_time,'yyyy-MM-dd HH:mm:ss') >= 21600then 'Y' else 'N' end) as flag_02 from(select lg_order_code as lg_order_code,last_value(store_code ) as store_code,last_value(store_name ) as store_name,last_value(ded_pay_time ) as ded_pay_time,last_value(wms_create_time ) as wms_create_time,last_value(wms_consign_create_time) as real_wms_confirm_timefrom sourcett_dwd_rigroup by lg_order_code) a left outer join(select lg_order_code,count(*) as cntfrom blink_cep_v1group by lg_order_code) b on a.lg_order_code = b.lg_order_code group by a.lg_order_code ;insert into sink_hybrid_blink_cep select regexp_replace(substring(a.ded_pay_time, 1, 10), '-', '') as ded_pay_date,a.store_code,max(a.store_name) as store_name,count(case when coalesce(a.wms_create_time, '') <> '' then a.lg_order_code end) as wmsin_ord_cnt,count(case when coalesce(a.real_wms_confirm_time, '') <> '' then a.lg_order_code end) as wmsout_ord_cnt,count(case when a.flag_01 = 'Y' then a.lg_order_code end) as wmsin_nowmsout_2h_ord_cnt,count(case when a.flag_02 = 'Y' then a.lg_order_code end) as wmsin_nowmsout_6h_ord_cnt,cast(regexp_replace(SUBSTRING(ded_pay_time, 1, 10), '-', '') as bigint) as sub_partition from blink_cep_v2 as t1 where coalesce(lg_cancel_time, '') = '' and coalesce(ded_pay_time, '') <> '' group by regexp_replace(substring(ded_pay_time, 1, 10), '-', ''),a.store_code ;

三. 問題拓展

  • blink CEP的參數比較多,要完全看懂,著實需要一些時間,但CEP的強大是毋庸置疑的。CEP不僅可以解決物流場景中的超時統計問題,風控中的很多場景也是信手拈來。這里有一個風控中的場景,通過上述物流案例的用法,我們是否能推敲出這個場景的用法呢?
    風控案例測試數據如下:
  • 刷卡時間銀行卡ID刷卡地點
    2018-04-13 12:00:001WW
    2018-04-13 12:05:001WW1
    2018-04-13 12:10:001WW2
    2018-04-13 12:20:001WW

    我們認為,當一張銀行卡在10min之內,在不同的地點被刷卡大于等于兩次,我們就期望對消費者出發預警機制。

  • blink CEP是萬能的么?答案是否定的,當消息亂序程度比較高的時候,實時性和準確性就成了一對矛盾的存在。要想實時性比較高,必然要求設置的offset越小越好,但offset設置比較小,就直接可能導致很多eventtime<watermark-offset的消息,直接被丟棄,準確性很難保證。比如,在CP回傳物流詳情的時候,經常回傳的時間跟實操的時間差異很大(實操時間是10點,但回傳時間是15點),如果以實操時間作為eventtime,可能就會導致這種差異很大的消息被直接丟掉,無法進入CEP,進而無法觸發CEP后續的計算,在使用CEP的過程中,應該注意這一點。
  • 四. 作者簡介

    花名:緣橋,來自菜鳥-CTO-數據部-倉配數據研發,主要負責菜鳥倉配業務的離線和實時數據倉庫建設以及創新數據技術和工具的探索和應用。

    ?

    原文鏈接
    本文為云棲社區原創內容,未經允許不得轉載。

    總結

    以上是生活随笔為你收集整理的利用blink CEP实现流计算中的超时统计问题的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。