日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Flink keyby 数据倾斜问题处理

發布時間:2024/8/23 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flink keyby 数据倾斜问题处理 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

上一篇我們使用keyby后發現數據嚴重傾斜

https://datamining.blog.csdn.net/article/details/105316728

大概看下問題所在,大量數據在一個subtask中運行

這里我們使用兩階段keyby?解決該問題

之前的問題如下圖所示

我們期望的是

但我們的需要根據key進行聚合統計,那么把相同的key放在不同的subtask如何統計?

我們看下圖(只畫了主要部分)

1.首先將key打散,我們加入將key轉化為 key-隨機數 ,保證數據散列

2.對打散后的數據進行聚合統計,這時我們會得到數據比如 : (key1-12,1),(key1-13,19),(key1-1,20),(key2-123,11),(key2-123,10)

3.將散列key還原成我們之前傳入的key,這時我們的到數據是聚合統計后的結果,不是最初的原數據

4.二次keyby進行結果統計,輸出到addSink

直接看實現代碼

import org.apache.flink.api.common.functions.AggregateFunction import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.api.scala.typeutils.Types import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.functions.windowing.WindowFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collectorobject ProcessFunctionScalaV2 {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenv.enableCheckpointing(2000)val stream: DataStream[String] = env.socketTextStream("localhost", 9999)val typeAndData: DataStream[(String, Long)] = stream.map(x => (x.split(",")(0), x.split(",")(1).toLong))val dataStream: DataStream[(String, Long)] = typeAndData.map(x => (x._1 + "-" + scala.util.Random.nextInt(100), x._2))val keyByAgg: DataStream[DataJast] = dataStream.keyBy(_._1).timeWindow(Time.seconds(10)).aggregate(new CountAggregate())keyByAgg.print("第一次keyby輸出")val result: DataStream[DataJast] = keyByAgg.map(data => {val newKey: String = data.key.substring(0, data.key.indexOf("-"))println(newKey)DataJast(newKey, data.count)}).keyBy(_.key).process(new MyProcessFunction())result.print("第二次keyby輸出")env.execute()}case class DataJast(key :String,count:Long)//計算keyby后,每個Window中的數據總和class CountAggregate extends AggregateFunction[(String, Long),DataJast, DataJast] {override def createAccumulator(): DataJast = {println("初始化")DataJast(null,0)}override def add(value: (String, Long), accumulator: DataJast): DataJast = {if(accumulator.key==null){printf("第一次加載,key:%s,value:%d\n",value._1,value._2)DataJast(value._1,value._2)}else{printf("數據累加,key:%s,value:%d\n",value._1,accumulator.count+value._2)DataJast(value._1,accumulator.count + value._2)}}override def getResult(accumulator: DataJast): DataJast = {println("返回結果:"+accumulator)accumulator}override def merge(a: DataJast, b: DataJast): DataJast = {DataJast(a.key,a.count+b.count)}}/*** 實現:* 根據key分類,統計每個key進來的數據量,定期統計數量*/class MyProcessFunction extends KeyedProcessFunction[String,DataJast,DataJast]{val delayTime : Long = 1000L * 30lazy val valueState:ValueState[Long] = getRuntimeContext.getState[Long](new ValueStateDescriptor[Long]("ccount",classOf[Long]))override def processElement(value: DataJast, ctx: KeyedProcessFunction[String, DataJast, DataJast]#Context, out: Collector[DataJast]): Unit = {if(valueState.value()==0){valueState.update(value.count)printf("運行task:%s,第一次初始化數量:%s\n",getRuntimeContext.getIndexOfThisSubtask,value.count)val currentTime: Long = ctx.timerService().currentProcessingTime()//注冊定時器ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime)}else{valueState.update(valueState.value()+value.count)printf("運行task:%s,更新統計結果:%s\n" ,getRuntimeContext.getIndexOfThisSubtask,valueState.value())}}override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, DataJast, DataJast]#OnTimerContext, out: Collector[DataJast]): Unit = {//定時器執行,可加入業務操作printf("運行task:%s,觸發定時器,30秒內數據一共,key:%s,value:%s\n",getRuntimeContext.getIndexOfThisSubtask,ctx.getCurrentKey,valueState.value())//定時統計完成,初始化統計數據valueState.update(0)//注冊定時器val currentTime: Long = ctx.timerService().currentProcessingTime()ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime)}}}

對key進行散列?

val dataStream: DataStream[(String, Long)] = typeAndData.map(x => (x._1 + "-" + scala.util.Random.nextInt(100), x._2))

?設置窗口滾動時間,每隔十秒統計一次每隔key下的數據總量

val keyByAgg: DataStream[DataJast] = dataStream.keyBy(_._1).timeWindow(Time.seconds(10)).aggregate(new AverageAggregate())keyByAgg.print("第一次keyby輸出")

還原key,并進行二次keyby,對數據總量進行累加

val result: DataStream[DataJast] = keyByAgg.map(data => {val newKey: String = data.key.substring(0, data.key.indexOf("-"))println(newKey)DataJast(newKey, data.count)}).keyBy(_.key).process(new MyProcessFunction())

?

我們看下優化后的狀態

先看下第一map,直接從端口拿數據,這不涉及keyby,所以這個沒影響

再看下第一次keyby后的結果,因為我們散列后,flink根據哈希進行分配,所以數據不是百分之百平均,但是很明顯基本上已經均衡了,不會出現這里1一條,那里1條這種狀況

再看下第二次keyby,這里會發現我們ID的2的subtask有820條數據,其他的沒有數據;這里是正常現象,因為我們是對第一次聚合后的數據進行keyby統計,所以這里的數據大小會非常小,比如我們原始數據一條數據有1M大小,1000條數據就1個G,業務往往還有其他操作,我們再第一次keyby?散列時處理其他邏輯(比如ETL等等操作),最終將統計結果輸出給第二次keyby,很可能1個G的數據,最終只有1kb,這比我們將1個G的數據放在一個subtask中處理好很多。

上面我們自定義了MyProcessFunction方法,設置每30秒執行一次,實際業務場景,我們可能會設置一小時執行一次。

至此我們既保證了數據定時統計,也保證了數據不傾斜問題。

創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

總結

以上是生活随笔為你收集整理的Flink keyby 数据倾斜问题处理的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。