日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

Flink 消息聚合处理方案

發(fā)布時(shí)間:2024/8/23 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flink 消息聚合处理方案 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

微博機(jī)器學(xué)習(xí)平臺(tái)使用 Flink 實(shí)時(shí)處理用戶行為日志和生成標(biāo)簽,并且在生成標(biāo)簽后寫(xiě)入存儲(chǔ)系統(tǒng)。為了降低存儲(chǔ)系統(tǒng)的 IO 負(fù)載,有批量寫(xiě)入的需求,同時(shí)對(duì)數(shù)據(jù)延遲也需要進(jìn)行一定的控制,因此需要一種有效的消息聚合處理方案。

在本篇文章中我們將詳細(xì)介紹 Flink 中對(duì)消息進(jìn)行聚合處理的方案,描述不同方案中可能遇到的問(wèn)題和解決方法,并進(jìn)行對(duì)比。

基于 flatMap 的解決方案

這是我們能夠想到最直觀的解決方案,即在自定義的 flatMap 方法中對(duì)消息進(jìn)行聚合,偽代碼如下:

對(duì)應(yīng)的作業(yè)拓?fù)浜瓦\(yùn)行狀態(tài)如下:


該方案的優(yōu)點(diǎn)如下:

  • 邏輯簡(jiǎn)單直觀,各并發(fā)間負(fù)載均勻。
  • flatMap 可以和上游算子 chain 到一起,減少網(wǎng)絡(luò)傳輸開(kāi)銷(xiāo)。
  • 使用 operator state 完成 checkpoint,支持正常和改并發(fā)恢復(fù)。
  • 與此同時(shí),由于使用 operator state,因此所有數(shù)據(jù)都保存在 JVM 堆上,當(dāng)數(shù)據(jù)量較大時(shí)有 GC/OOM 風(fēng)險(xiǎn)。

    使用 Count Window 的解決方案

    對(duì)于大規(guī)模 state 數(shù)據(jù),Flink 推薦使用 RocksDB backend,并且只支持在 KeyedStream 上使用。與此同時(shí),KeyedStream 支持通過(guò) Count Window 來(lái)實(shí)現(xiàn)消息聚合,因此 Count Window 成為第二個(gè)可選方案。

    由于需要使用 KeyedStream,我們面臨的第一個(gè)問(wèn)題就是如何生成 key。一個(gè)比較自然的想法是直接使用隨機(jī)數(shù),偽代碼示例如下:

    對(duì)應(yīng)的作業(yè)拓?fù)淙缦?#xff1a;

    然而實(shí)際上線測(cè)試時(shí)出現(xiàn)了數(shù)據(jù)傾斜,不同并發(fā)間會(huì)出現(xiàn)負(fù)載不均,部分 task 接收不到數(shù)據(jù)從而 TPS 為 0:

    在我們的場(chǎng)景下,除了有批量寫(xiě)入降低 IO 的需求,對(duì)數(shù)據(jù)延遲也需要控制,當(dāng) key set 太大時(shí),每個(gè) key 累積指定數(shù)據(jù)條數(shù)的時(shí)間將增加,會(huì)導(dǎo)致數(shù)據(jù)寫(xiě)入的延遲增大,因此我們需要控制 key set 的大小。經(jīng)過(guò)分析,當(dāng) key set 較小時(shí),Flink 默認(rèn)的數(shù)據(jù)分發(fā)策略在并發(fā)間分布不均,從而導(dǎo)致了上述數(shù)據(jù)傾斜的問(wèn)題。下面我們從源碼級(jí)別對(duì)此進(jìn)行說(shuō)明。

    首先,Flink 為了保證從 state 中恢復(fù)數(shù)據(jù)時(shí)產(chǎn)生最小的 IO,引入了 key group 的概念。Key group 數(shù)目等于最大并發(fā)數(shù)(max parallelism),取值范圍是 128-32768。當(dāng)做數(shù)據(jù)分發(fā)的時(shí)候,key 會(huì)按照規(guī)則被分發(fā)到 key group 里面,相關(guān)代碼如下所示:

    keyGroup->KeyGroupRangeAssignment.assignToKeyGroup(key,maxParallelism);

    然后,key group 會(huì)按照規(guī)則被分發(fā)到每個(gè) task 上,代碼示例如下:

    Task->String.valueOf(KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup));

    通過(guò) debug 可以發(fā)現(xiàn),當(dāng) key 的數(shù)量較小時(shí),該分發(fā)策略會(huì)導(dǎo)致 key 在 task 之間分配不均勻,測(cè)試代碼如下:

    輸出結(jié)果如下:

    {0=4, 1=4, 2=1}
    {0=651, 1=686, 2=710}

    可以看到,當(dāng)只有 10 個(gè) key 時(shí),并發(fā)間分布很不均勻;但當(dāng) key 的數(shù)量增加到 2048 時(shí),就相對(duì)均勻了。

    在了解了 key 的分發(fā)策略之后,我們可以相應(yīng)的調(diào)整 key 的生成規(guī)則,來(lái)達(dá)到指定并發(fā)度和 key set 大小前提下的數(shù)據(jù)均勻,如下述代碼所示:


    我們利用 maxParallelism 和 parallelism 生成 key,并將其存儲(chǔ)到一個(gè)大小為 parallelism 的 map 里,以 taskid 作為 map key ,每個(gè) task 對(duì)應(yīng)的 key list 作為 value,來(lái)保證每個(gè) taskid 對(duì)應(yīng)的 list 都存儲(chǔ)了相同數(shù)量的 key。

    最后,再將 map 打平,存儲(chǔ)到一個(gè)數(shù)組里。在使用的時(shí)候,我們可以從該數(shù)組里隨機(jī)取數(shù)來(lái)作為key,就能達(dá)到平均分配的目的了。

    該方案的執(zhí)行效果如下:

    可以看到數(shù)據(jù)傾斜的問(wèn)題得以解決,每個(gè)任務(wù)的負(fù)載都比較均勻。但需要注意的是由于引入了 key by,因此會(huì)有數(shù)據(jù) shuffle,對(duì)比 flatmap 方案會(huì)有額外的網(wǎng)絡(luò)開(kāi)銷(xiāo)。另外由于生成 key 的規(guī)則和實(shí)際并發(fā)度有關(guān),因此該方案不支持改并發(fā)恢復(fù),或者說(shuō)如果修改并發(fā),那么在 restore 的時(shí)候會(huì)發(fā)生數(shù)據(jù)錯(cuò)亂的問(wèn)題,這一點(diǎn)需要尤為注意。

    方案對(duì)比和總結(jié)

    最后我們將兩種解決方案的優(yōu)缺點(diǎn)對(duì)比總結(jié)如下:

    在數(shù)據(jù)量不大且內(nèi)存充足的情況下,建議使用 flatmap 方案;在數(shù)據(jù)量較大且可以保證不修改并發(fā)的情況下,建議使用 count window 方案并使用 RocksDB 進(jìn)行 state數(shù)據(jù)存儲(chǔ);在數(shù)據(jù)量較大且需要修改并發(fā)的情況下,當(dāng)前給出的兩種方案都無(wú)法解決,需要尋求新的解決方案。

    作者介紹:

    曹富強(qiáng)、張穎,微博機(jī)器學(xué)習(xí)研發(fā)中心-系統(tǒng)工程師。現(xiàn)負(fù)責(zé)微博機(jī)器學(xué)習(xí)平臺(tái)數(shù)據(jù)計(jì)算模塊,主要涉及實(shí)時(shí)計(jì)算 Flink、Storm、Spark Streaming,離線計(jì)算 Hive、Spark 等。目前專注于 Flink 在微博機(jī)器學(xué)習(xí)場(chǎng)景的應(yīng)用。

    原文鏈接
    本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。

    總結(jié)

    以上是生活随笔為你收集整理的Flink 消息聚合处理方案的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。