案例與解決方案匯總頁: 阿里云實時計算產品案例&解決方案匯總
一. 背景介紹
菜鳥的物流數據本身就有鏈路復雜、實操節點多、匯總維度多、考核邏輯復雜的特點,對于實時數據的計算存在很大挑戰。經過倉配ETL團隊的努力,目前倉配實時數據已覆蓋了絕大多數場景,但是有這樣一類特殊指標:“晚點超時指標”(例如:出庫超6小時未攬收的訂單量),仍存在實時匯總計算困難。原因在于:流計算是基于消息觸發計算的,若沒有消息到達到則無法計算,這類指標恰好是要求在指定的超時時間計算出有多少未達到的消息。然而,這類指標對于指導實操有著重要意義,可以告知運營小二當前多少訂單積壓在哪些作業節點,應該督促哪些實操人員加快作業,這對于物流的時效KPI達成至關重要。
之前的方案是:由產品前端根據用戶的請求查詢OLAP數據庫,由OLAP從明細表出結果。大促期間,用戶請求量大,加之數據量大,故對OLAP的明細查詢造成了比較大的壓力。
二. 解決方案
2.1 ? 問題定義
“超時晚點指標” 是指,一筆訂單的兩個相鄰的實操節點node_n-1 、node_n 的完成時間 time_n-1、time_n, 當滿足 : time_n is null ?&& current_time - time_n-1 > kpi_length 時,time_flag_n 為 true , 該筆訂單計入 超時晚點指標的計數。 如下圖,有一筆訂單其 node_1 為出庫節點,時間為time_1 = '2018-06-18 00:00:00' ,運營對出庫與攬收之間考核的時長 kpi_length = 6h, 那么當前自然時間 current_time > '2018-06-18 06:00:00' 時,且node_2攬收節點的time_2 為null,則該筆訂單的 timeout_flag_2 = true , “出庫超6小時未攬收訂單量” 加1。由于要求time_2 為null,即要求沒有攬收消息下發的情況下讓流計算做匯總值更新,這違背了流計算基于消息觸發的基本原理,故流計算無法直接算出這種“超時晚點指標”。
決問題的基本思路是:在考核時刻(即 kpi_time = time_n-1+kpi_length )“制造”出一條消息下發給流計算,觸發匯總計算。繼續上面的例子:在考核時刻“2018-06-18 06:00:00”利用MetaQ定時消息功能“制造”出一條消息下發給流計算匯總任務,觸發對該筆訂單的 time_out_flag_2 的判斷,增加匯總計數。同時,還利用 Blink 的Retraction 機制,當time_2 由null變成有值的時候,Blink 可以對 time_out_flag_2 更新,重新計數。
2.2 方案架構
如上圖所示: Step1: ?Blink job1 接收來自上游系統的訂單數據,做清洗加工,生成訂單明細表:dwd_ord_ri,利用TT下發給Blink job2 和 Blink job3。 Step2:Blink job2 收到 dwd_ord_ri后,對每筆訂單算出考核時刻 kpi_time = time_n-1+kpi_length,作為MetaQ消息的“TIMER_DELIVER_MS” 屬性,寫入MetaQ。MetaQ的定時消息功能,可以根據用戶寫入的TIMER_DELIVER_MS 在指定時刻下發給消費者,即上圖中的Blink job3。 Step3:Blink job3 接收 TT、MetaQ 兩個消息源,先做Join,再對time_flag判斷,最后做Aggregate計算。同一筆訂單,dwd_ord_ri、timing_msg任意一個消息到來,都會觸發join,time_flag判斷,aggregate重新計算一遍,Blink的Retraction可對結果進行實時更新。
2.3 實現細節
本方案根據物流場景中多種實操節點、多種考核時長的特點,從Blink SQL代碼 和 自定義Sink兩方面做了特殊設計,從而實現了靈活配置、高效開發。 (1) Blink job2 --- 生成定時消息 關鍵Blink SQL 代碼如下。約定每條record的第一個字段為投遞時間列表,即MetaQ向消費者下發消息的時刻List,也就是上面所說的多個考核時刻。第二個字段為保序字段,比如在物流場景中經常以訂單code、運單號作為保序主鍵。該代碼實現了對每個出庫的物流訂單,根據其出庫時間,向后延遲6小時(21600000毫秒)、12小時(43200000毫秒)、24小時(86400000毫秒)由MetaQ向消費者下發三個定時消息。
create table metaq_timing_msg
(
deliver_time_list varchar comment '投遞時間列表', -- 約定第一個字段為投遞時間list
lg_code varchar comment '物流訂單code', -- 約定第二字段為保序主鍵
node_name varchar comment '節點名稱',
node_time varchar comment '節點時間',
)
WITH
(
type = 'custom',
class = 'com.alibaba.xxx.xxx.udf.MetaQTimingMsgSink',
tag = 'store',
topic = 'blink_metaq_delay_msg_test',
producergroup = 'blinktest',
retrytimes = '5',
sleeptime = '1000'
);
insert into metaq_timing_msg
select
concat_ws(',',
cast( (UNIX_TIMESTAMP(store_out_time)*1000 + 21600000) as varchar), --6小時
cast( (UNIX_TIMESTAMP(store_out_time)*1000 + 43200000) as varchar), --12小時
cast( (UNIX_TIMESTAMP(store_out_time)*1000 + 86400000) as varchar) --24小時
) as deliver_time_list,
lg_code,
'wms' as node_name,
store_out_time as node_time
from
(
select
lg_code,
FIRST_VALUE(store_out_time) as store_out_time
from srctable
group by lg_code
)b
where store_out_time is not null ;
(2) Blink 自定義Sink --- MetaQTimingMsg Sink Blink的當前版本還不支持 MetaQ的定時消息功能的Sink,故利用 Blink的自定義Sink功能,并結合菜鳥物流數據的特點開發了MetaQTimingMsg Sink。關鍵代碼如下(實現 writeAddRecord 方法)。
@Override
public void writeAddRecord(Row row) throws IOException {
Object deliverTime = row.getField(0);
String[] deliverTimeList = deliverTime.toString().split(",");
for(String dTime:deliverTimeList){String orderCode = row.getField(1).toString();String key = orderCode + "_" + dTime;Message message = newMessage(row, dTime, key);boolean result = sendMessage(message,orderCode);if(!result){LOG.error(orderCode + " : " + dTime + " send failed");}}
}private Message newMessage(Row row,String deliverMillisec,String key){//Support Varbinary Type Insert Into MetaQMessage message = new Message();message.setKeys(key);message.putUserProperty("TIMER_DELIVER_MS",deliverMillisec);int arity = row.getArity();Object[] values = new Object[arity];for(int i=0;i<arity;i++){values[i]=row.getField(i);}String lineStr=org.apache.commons.lang3.StringUtils.join(values, FIELD_DELIMITER);try {byte[] bytes = lineStr.getBytes(ENCODING);message.setBody(bytes);message.setWaitStoreMsgOK(true);} catch (UnsupportedEncodingException e) {LOG.error("create new message error",e);}return message;
}private boolean sendMessage(Message message,String orderCode){long retryTime = 0;boolean isSendSuccess = true;if(message != null){message.setTopic(topicName);message.setTags(tagName);}SendResult result = producer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {.... // 針對物流訂單code的hash算法return list.get(index.intValue());} },orderCode);if(!result.getSendStatus().equals(SendStatus.SEND_OK)){LOG.error("" + orderCode +" write to metaq result is " + result.getSendStatus().toString());isSendSuccess = false;}return isSendSuccess;
}
}
(3)Blink job3 --- 匯總計算 關鍵Blink SQL 代碼如下,統計了每個倉庫的“出庫超6小時未攬收物理訂單”、“出庫超12小時未攬收物理訂單”、“出庫超24小時未攬收物理訂單”的匯總值。代碼中使用了“stringLast()”函數處理來自dwd_ord_ri的每條消息,以取得每個物流訂單的最新出庫攬收情況,利用Blink Retraction機制,更新匯總值。
create view dws_store_view as
select t1.store_code,max(t1.store_name) as store_name,count(case when length(trim(t1.store_out_time)) = 19and t1.tms_collect_time is null and NOW()-UNIX_TIMESTAMP(t1.store_out_time,'yyyy-MM-dd HH:mm:ss') >= 21600then t2.lg_code end ) as tms_not_collect_6h_ord_cnt, ---出庫超6小時未攬收物流訂單量count(case when length(trim(t1.store_out_time)) = 19and t1.tms_collect_time is null and NOW()-UNIX_TIMESTAMP(t1.store_out_time,'yyyy-MM-dd HH:mm:ss') >= 43200then t2.lg_code end ) as tms_not_collect_12h_ord_cnt,---出庫超6小時未攬收物流訂單量count(case when length(trim(t1.store_out_time)) = 19and t1.tms_collect_time is null and NOW()-UNIX_TIMESTAMP(t1.store_out_time,'yyyy-MM-dd HH:mm:ss') >= 86400then t2.lg_code end ) as tms_not_collect_24h_ord_cnt ---出庫超6小時未攬收物流訂單量
from
(select lg_code,coalesce(store_code,'-1') as store_code,store_name,store_out_time,tms_collect_timefrom (select lg_code,max(store_code) as store_code,max(store_name) as store_name,stringLast(store_out_time) as store_out_time,stringLast(tms_collect_time)as tms_collect_time, from dwd_ord_rigroup by lg_code) a
) t1
left outer join
(select lg_code,from timing_msg where node_name = 'wms'group by lg_code
) t2
on t1.lg_code = t2.lg_code
group byt1.store_code;
三. ?方案優勢
3.1 配置靈活
我們從“Blink SQL 代碼” 和“自定義MetaQ” 兩個方面設計,用戶可以根據具體的業務場景,在Blink SQL的一個view里就能實現多種節點多種考核時間的定時消息生成,而不是針對每一個實操節點的每一種定時指標都要寫一個view,這樣大大節省了代碼量,提升了開發效率。例如對于倉庫節點的出庫超6小時未攬收、超12小時未攬收、超24小時未攬收,這三個指標利用上述方案,僅需在Blink job2的中metaq_timing_msg的第一個字段deliver_time_list中拼接三個kpi_length,即6小時、12小時、24小時為一個字符串即可,由MetaQTimingMsg Sink自動拆分成三條消息下發給MetaQ。對于不同的節點的考核,僅需在node_name,node_time填寫不同的節點名稱和節點實操時間即可。
3.2 主鍵保序
如2.3節所述,自定義的Sink中 實現了MetaQ的 MessageQueueSelector 接口的 select() 方法,同時在Blink SQL 生成的MetaQ消息默認第二個字段為保序主鍵字段。從而,可以根據用戶自定義的主鍵,保證同一主鍵的所有消息放在同一個通道內處理,從而保證按主鍵保序,這對于流計算非常關鍵,能夠實現數據的實時準確性。
3.3 性能優良
讓專業的團隊做專業的事。個人認為,這種大規模的消息存儲、消息下發的任務本就應該交給“消息中間件”來處理,這樣既可以做到計算與消息存儲分離,也可以方便消息的管理,比如針對不同的實操節點,我們還可以定義不同的MetaQ的tag。 另外,正如2.2節所述,我們對定時消息量做了優化。考慮到一筆訂單的屬性字段或其他節點更新會下發多條消息,我們利用了Blink的FIRST_VALUE函數,在Blink job2中同一筆訂單的的一種考核指標只下發一條定時消息,大大減少了消息量,減輕了Blink的寫壓力,和MetaQ的存儲。
四. 自我介紹
馬汶園 ? ?阿里巴巴 -菜鳥網絡—數據部 ? ? ?數據工程師 菜鳥倉配實時研發核心成員,主導多次倉配大促實時數據研發,對利用Blink的原理與特性解決物流場景問題有深入思考與理解。
?
原文鏈接 本文為云棲社區原創內容,未經允許不得轉載。
創作挑戰賽 新人創作獎勵來咯,堅持創作打卡瓜分現金大獎
總結
以上是生活随笔 為你收集整理的利用blink+MQ实现流计算中的超时统计问题 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。