Flink 消息聚合处理方案
微博機(jī)器學(xué)習(xí)平臺(tái)使用 Flink 實(shí)時(shí)處理用戶行為日志和生成標(biāo)簽,并且在生成標(biāo)簽后寫(xiě)入存儲(chǔ)系統(tǒng)。為了降低存儲(chǔ)系統(tǒng)的 IO 負(fù)載,有批量寫(xiě)入的需求,同時(shí)對(duì)數(shù)據(jù)延遲也需要進(jìn)行一定的控制,因此需要一種有效的消息聚合處理方案。
在本篇文章中我們將詳細(xì)介紹 Flink 中對(duì)消息進(jìn)行聚合處理的方案,描述不同方案中可能遇到的問(wèn)題和解決方法,并進(jìn)行對(duì)比。
基于 flatMap 的解決方案
這是我們能夠想到最直觀的解決方案,即在自定義的 flatMap 方法中對(duì)消息進(jìn)行聚合,偽代碼如下:
對(duì)應(yīng)的作業(yè)拓?fù)浜瓦\(yùn)行狀態(tài)如下:
該方案的優(yōu)點(diǎn)如下:
與此同時(shí),由于使用 operator state,因此所有數(shù)據(jù)都保存在 JVM 堆上,當(dāng)數(shù)據(jù)量較大時(shí)有 GC/OOM 風(fēng)險(xiǎn)。
使用 Count Window 的解決方案
對(duì)于大規(guī)模 state 數(shù)據(jù),Flink 推薦使用 RocksDB backend,并且只支持在 KeyedStream 上使用。與此同時(shí),KeyedStream 支持通過(guò) Count Window 來(lái)實(shí)現(xiàn)消息聚合,因此 Count Window 成為第二個(gè)可選方案。
由于需要使用 KeyedStream,我們面臨的第一個(gè)問(wèn)題就是如何生成 key。一個(gè)比較自然的想法是直接使用隨機(jī)數(shù),偽代碼示例如下:
對(duì)應(yīng)的作業(yè)拓?fù)淙缦?#xff1a;
然而實(shí)際上線測(cè)試時(shí)出現(xiàn)了數(shù)據(jù)傾斜,不同并發(fā)間會(huì)出現(xiàn)負(fù)載不均,部分 task 接收不到數(shù)據(jù)從而 TPS 為 0:
在我們的場(chǎng)景下,除了有批量寫(xiě)入降低 IO 的需求,對(duì)數(shù)據(jù)延遲也需要控制,當(dāng) key set 太大時(shí),每個(gè) key 累積指定數(shù)據(jù)條數(shù)的時(shí)間將增加,會(huì)導(dǎo)致數(shù)據(jù)寫(xiě)入的延遲增大,因此我們需要控制 key set 的大小。經(jīng)過(guò)分析,當(dāng) key set 較小時(shí),Flink 默認(rèn)的數(shù)據(jù)分發(fā)策略在并發(fā)間分布不均,從而導(dǎo)致了上述數(shù)據(jù)傾斜的問(wèn)題。下面我們從源碼級(jí)別對(duì)此進(jìn)行說(shuō)明。
首先,Flink 為了保證從 state 中恢復(fù)數(shù)據(jù)時(shí)產(chǎn)生最小的 IO,引入了 key group 的概念。Key group 數(shù)目等于最大并發(fā)數(shù)(max parallelism),取值范圍是 128-32768。當(dāng)做數(shù)據(jù)分發(fā)的時(shí)候,key 會(huì)按照規(guī)則被分發(fā)到 key group 里面,相關(guān)代碼如下所示:
keyGroup->KeyGroupRangeAssignment.assignToKeyGroup(key,maxParallelism);然后,key group 會(huì)按照規(guī)則被分發(fā)到每個(gè) task 上,代碼示例如下:
Task->String.valueOf(KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup));通過(guò) debug 可以發(fā)現(xiàn),當(dāng) key 的數(shù)量較小時(shí),該分發(fā)策略會(huì)導(dǎo)致 key 在 task 之間分配不均勻,測(cè)試代碼如下:
輸出結(jié)果如下:
{0=4, 1=4, 2=1}
{0=651, 1=686, 2=710}
可以看到,當(dāng)只有 10 個(gè) key 時(shí),并發(fā)間分布很不均勻;但當(dāng) key 的數(shù)量增加到 2048 時(shí),就相對(duì)均勻了。
在了解了 key 的分發(fā)策略之后,我們可以相應(yīng)的調(diào)整 key 的生成規(guī)則,來(lái)達(dá)到指定并發(fā)度和 key set 大小前提下的數(shù)據(jù)均勻,如下述代碼所示:
我們利用 maxParallelism 和 parallelism 生成 key,并將其存儲(chǔ)到一個(gè)大小為 parallelism 的 map 里,以 taskid 作為 map key ,每個(gè) task 對(duì)應(yīng)的 key list 作為 value,來(lái)保證每個(gè) taskid 對(duì)應(yīng)的 list 都存儲(chǔ)了相同數(shù)量的 key。
最后,再將 map 打平,存儲(chǔ)到一個(gè)數(shù)組里。在使用的時(shí)候,我們可以從該數(shù)組里隨機(jī)取數(shù)來(lái)作為key,就能達(dá)到平均分配的目的了。
該方案的執(zhí)行效果如下:
可以看到數(shù)據(jù)傾斜的問(wèn)題得以解決,每個(gè)任務(wù)的負(fù)載都比較均勻。但需要注意的是由于引入了 key by,因此會(huì)有數(shù)據(jù) shuffle,對(duì)比 flatmap 方案會(huì)有額外的網(wǎng)絡(luò)開(kāi)銷(xiāo)。另外由于生成 key 的規(guī)則和實(shí)際并發(fā)度有關(guān),因此該方案不支持改并發(fā)恢復(fù),或者說(shuō)如果修改并發(fā),那么在 restore 的時(shí)候會(huì)發(fā)生數(shù)據(jù)錯(cuò)亂的問(wèn)題,這一點(diǎn)需要尤為注意。
方案對(duì)比和總結(jié)
最后我們將兩種解決方案的優(yōu)缺點(diǎn)對(duì)比總結(jié)如下:
在數(shù)據(jù)量不大且內(nèi)存充足的情況下,建議使用 flatmap 方案;在數(shù)據(jù)量較大且可以保證不修改并發(fā)的情況下,建議使用 count window 方案并使用 RocksDB 進(jìn)行 state數(shù)據(jù)存儲(chǔ);在數(shù)據(jù)量較大且需要修改并發(fā)的情況下,當(dāng)前給出的兩種方案都無(wú)法解決,需要尋求新的解決方案。
作者介紹:
曹富強(qiáng)、張穎,微博機(jī)器學(xué)習(xí)研發(fā)中心-系統(tǒng)工程師。現(xiàn)負(fù)責(zé)微博機(jī)器學(xué)習(xí)平臺(tái)數(shù)據(jù)計(jì)算模塊,主要涉及實(shí)時(shí)計(jì)算 Flink、Storm、Spark Streaming,離線計(jì)算 Hive、Spark 等。目前專注于 Flink 在微博機(jī)器學(xué)習(xí)場(chǎng)景的應(yīng)用。
原文鏈接
本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的Flink 消息聚合处理方案的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 基于阿里云 MaxCompute 构建企
- 下一篇: 从零开始入门 K8s | K8s 安全之