Flink ProcessFunction: Introduction and Usage

Published: 2024/8/23

Contents

What We Build

Code

Testing

Problem


Official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/operators/process_function.html

The ProcessFunction is a low-level stream processing operation, giving access to the basic building blocks of all (acyclic) streaming applications:

  • events (stream elements)
  • state (fault-tolerant, consistent, only on keyed stream)
  • timers (event time and processing time, only on keyed stream)

The ProcessFunction can be thought of as a FlatMapFunction with access to keyed state and timers. It handles events by being invoked for each event received in the input stream(s).

For fault-tolerant state, the ProcessFunction gives access to Flink’s keyed state, accessible via the RuntimeContext, similar to the way other stateful functions can access keyed state.

The timers allow applications to react to changes in processing time and in event time. Every call to the function processElement(...) gets a Context object which gives access to the element’s event time timestamp, and to the TimerService. The TimerService can be used to register callbacks for future event-/processing-time instants. With event-time timers, the onTimer(...) method is called when the current watermark is advanced up to or beyond the timestamp of the timer, while with processing-time timers, onTimer(...) is called when wall clock time reaches the specified time. During that call, all states are again scoped to the key with which the timer was created, allowing timers to manipulate keyed state.
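The event-time firing rule quoted above can be illustrated with a small toy model in plain Scala. This is only a sketch of the rule, not Flink's TimerService: the class ToyEventTimeTimers and its methods are hypothetical names made up for this illustration. It assumes, as the Flink documentation describes, that timers are deduplicated per key and timestamp, and it fires a timer once the watermark reaches or passes the timer's timestamp.

```scala
// Toy model of the event-time firing rule; NOT Flink's TimerService.
final class ToyEventTimeTimers[K] {
  // A Set deduplicates identical (key, timestamp) registrations.
  private var pending = Set.empty[(K, Long)]
  private var watermark = Long.MinValue

  def register(key: K, timestamp: Long): Unit =
    pending += ((key, timestamp))

  // Advances the watermark and returns the timers that fire, oldest first.
  def advanceWatermark(newWatermark: Long): Seq[(K, Long)] = {
    watermark = math.max(watermark, newWatermark)
    val (due, later) = pending.partition { case (_, ts) => ts <= watermark }
    pending = later
    due.toSeq.sortBy(_._2)
  }
}

object ToyEventTimeTimersDemo {
  def main(args: Array[String]): Unit = {
    val timers = new ToyEventTimeTimers[String]
    timers.register("a", 100L)
    timers.register("a", 100L) // duplicate registration, kept once
    timers.register("b", 150L)
    println(timers.advanceWatermark(99L))  // nothing is due yet
    println(timers.advanceWatermark(100L)) // the timer for "a" fires
    println(timers.advanceWatermark(200L)) // the timer for "b" fires
  }
}
```

Each returned pair carries the key the timer was registered under, which mirrors how onTimer(...) runs with state scoped to that key.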

In short, ProcessFunction is a low-level stream operator that exposes events (stream elements), state, and timers (event time and processing time). State and timers are only available on a keyed stream, so this article starts with KeyedProcessFunction.

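What We Build

We read data from port 9999 via socketTextStream and total the sales amount for each product type over a fixed interval. If the total for an interval is 0, a timer notifies the boss, who can check whether the staff selling that product type are slacking off. (This is only a demonstration; adapt it to your own use case, for example counting UV.)

Code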

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object ProcessFuncationScala {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[String] = env.socketTextStream("localhost", 9999)

    // Each input line has the form "type,amount", e.g. "帽子,12".
    val typeAndData: DataStream[(String, String)] = stream
      .map { x =>
        val fields = x.split(",")
        (fields(0), fields(1))
      }
      .setParallelism(4)

    // Position-based keyBy yields a key of type Tuple (Flink 1.10 API).
    typeAndData.keyBy(0).process(new MyprocessFunction()).print("result")
    env.execute()
  }

  /**
   * Groups records by key, accumulates a total per key, and checks the total
   * periodically; a total of 0 triggers an alert.
   */
  class MyprocessFunction extends KeyedProcessFunction[Tuple, (String, String), String] {

    // Interval between checks, in milliseconds.
    val delayTime: Long = 1000 * 10

    // Per-key state: (type, running total for the current interval).
    lazy val state: ValueState[(String, Long)] = getRuntimeContext.getState[(String, Long)](
      new ValueStateDescriptor[(String, Long)]("cjcount", classOf[Tuple2[String, Long]]))

    override def onTimer(timestamp: Long,
                         ctx: KeyedProcessFunction[Tuple, (String, String), String]#OnTimerContext,
                         out: Collector[String]): Unit = {
      printf("Timer fired, time: %d, state: %s, key: %s\n", timestamp, state.value(), ctx.getCurrentKey)
      if (state.value()._2 == 0) {
        // Nothing was sold in this interval: raise the alert.
        printf("Type: %s, total is 0, alert\n", state.value()._1)
      }
      // Reset the total once the interval has been reported.
      state.update((state.value()._1, 0L))
      // Register the next timer.
      val currentTime: Long = ctx.timerService().currentProcessingTime()
      ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime)
    }

    override def processElement(value: (String, String),
                                ctx: KeyedProcessFunction[Tuple, (String, String), String]#Context,
                                out: Collector[String]): Unit = {
      printf("State: %s, state is null: %s\n", state.value(), state.value() == null)
      if (state.value() == null) {
        // First record for this key: register a timer that fires after delayTime.
        val currentTime: Long = ctx.timerService().currentProcessingTime()
        ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime)
        printf("Timer registered for: %d\n", currentTime + delayTime)
        state.update((value._1, value._2.toLong))
      } else {
        // Add the amount to the running total.
        val key: String = state.value()._1
        val count: Long = state.value()._2 + value._2.toLong
        state.update((key, count))
      }
      println(getRuntimeContext.getTaskNameWithSubtasks + "->" + value)
      printf("State: %s\n", state.value())
      // Emit the processed record.
      out.collect("Processed record->" + value)
    }
  }
}

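The code uses ValueState to keep the state: every incoming record adds its amount to the running total. The first record for a key registers a timer, which then fires every ten seconds. The timer does the alerting: if the total for the last ten seconds is 0, an alert is raised.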
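The accumulate, check, and reset cycle described above can be followed without a Flink cluster. The following is a minimal sketch in plain Scala that mirrors the per-key logic only; the object SalesWindowModel and its method names are hypothetical and exist only for this illustration, and the real job keeps this state in ValueState rather than in local values.

```scala
// Toy model of the per-key logic; plain Scala, no Flink dependency.
object SalesWindowModel {
  // Mirrors the (type, total) tuple that the job keeps in ValueState.
  final case class State(kind: String, total: Long)

  // processElement: the first record creates the state, later ones add to it.
  def onElement(state: Option[State], kind: String, amount: Long): State =
    state match {
      case None    => State(kind, amount)
      case Some(s) => s.copy(total = s.total + amount)
    }

  // onTimer: report whether to alert, then reset the total for the next interval.
  def onTimer(state: State): (State, Boolean) =
    (state.copy(total = 0L), state.total == 0L)

  def main(args: Array[String]): Unit = {
    // Two sales of 12 arrive within one interval.
    val afterTwoSales = Seq(12L, 12L)
      .foldLeft(Option.empty[State])((s, amount) => Some(onElement(s, "帽子", amount)))
      .get
    val (reset, firstAlert) = onTimer(afterTwoSales)
    val (_, secondAlert) = onTimer(reset) // no sales in the next interval
    println(s"total=${afterTwoSales.total}, alert=$firstAlert") // total=24, alert=false
    println(s"next interval alert=$secondAlert")                // next interval alert=true
  }
}
```

Keeping the reset inside the timer step is what makes each total cover exactly one interval.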

Testing

Send data to the port.

Enter four records within ten seconds (帽子 = hat, 鞋 = shoes):

帽子,12
帽子,12
鞋,10
鞋,10

The printed output shows that the totals were computed:

Timer fired, time: 1586005420511, state: (鞋,20), key: (鞋)
Timer fired, time: 1586005421080, state: (帽子,24), key: (帽子)

If no data is entered within ten seconds, the job reports a total of 0 and raises the alert:

Timer fired, time: 1586005406244, state: (帽子,0), key: (帽子)
Type: 帽子, total is 0, alert
Timer fired, time: 1586005406244, state: (鞋,0), key: (鞋)
Type: 鞋, total is 0, alert
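The test input above is well-formed, but the job's map step indexes into the split result and later converts the amount to a number, so a line without a comma or with a non-numeric amount would throw and fail the task. A defensive parser could look roughly like the sketch below. SaleParser is a hypothetical helper, not part of the original job, and it assumes amounts are whole numbers.

```scala
import scala.util.Try

// Hypothetical helper, not part of the original job.
object SaleParser {
  // Returns Some((type, amount)) for a well-formed "type,amount" line, else None.
  def parse(line: String): Option[(String, Long)] =
    line.split(",", -1) match {
      case Array(kind, amount) if kind.trim.nonEmpty =>
        Try(amount.trim.toLong).toOption.map(a => (kind.trim, a))
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    // One valid line followed by three malformed ones.
    Seq("帽子,12", "鞋,abc", "no-comma", "").foreach { line =>
      println(s"'$line' -> ${parse(line)}")
    }
  }
}
```

In the job, something like stream.flatMap(line => SaleParser.parse(line).toList) could stand in for the map step, assuming the downstream function is adjusted to take (String, Long) instead of (String, String).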

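Problem

At this point the periodic statistics work. But consider what happens if hats (帽子) are processed by task 1 and shoes (鞋) by task 2, and shoes receive 100 million records a day while hats receive one: the job suffers from severe data skew.

Let's look at the problem in practice.

We skip the computed results and look only at how the records are distributed.

The job has three task stages. The socket source is a non-parallel source, so we set the parallelism to 4.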

Input: 1 record of 帽子,10 and 50 records of 鞋,10.

In the Map stage the records are evenly distributed, because keyBy has not been applied yet.

Now look at the task after keyBy.

All 50 records land on the subtask with ID 3, which is severe data skew.

This can be addressed with a two-stage keyBy.

For more on data skew, see: https://datamining.blog.csdn.net/article/details/105322423
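One common reading of "two-stage keyBy" is key salting: append a salt to the key, pre-aggregate per salted key, then strip the salt and aggregate again per original key. The sketch below simulates that idea in plain Scala on the same input shape as the skew test. It is an assumption about the technique, not necessarily the approach of the linked article, and the names TwoStageKeyByModel, saltBuckets, stageOne, and stageTwo are hypothetical.

```scala
// Plain-Scala simulation of key salting; no Flink dependency.
object TwoStageKeyByModel {
  // Assumed to match the parallelism of 4 used in the test above.
  val saltBuckets = 4

  // Stage 1: key by "type#salt" and pre-aggregate; the salt rotates per record.
  def stageOne(records: Seq[(String, Long)]): Map[String, Long] =
    records.zipWithIndex
      .map { case ((kind, amount), i) => (s"$kind#${i % saltBuckets}", amount) }
      .groupBy(_._1)
      .map { case (saltedKey, rs) => saltedKey -> rs.map(_._2).sum }

  // Stage 2: strip the salt and combine the partial totals per original key.
  def stageTwo(partials: Map[String, Long]): Map[String, Long] =
    partials.toSeq
      .map { case (saltedKey, total) =>
        (saltedKey.substring(0, saltedKey.lastIndexOf('#')), total)
      }
      .groupBy(_._1)
      .map { case (kind, ps) => kind -> ps.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    // Same shape as the skew test: 1 record of 帽子,10 and 50 records of 鞋,10.
    val records = ("帽子", 10L) +: Seq.fill(50)(("鞋", 10L))
    val partials = stageOne(records)
    partials.toSeq.sortBy(_._1).foreach(println) // partial totals per salted key
    println(stageTwo(partials))                  // final totals per type
  }
}
```

In this simulation the 50 鞋 records are split over four salted keys of 12 or 13 records each, and the second stage only has to combine four partial totals. In a real job the salted keys are still assigned to subtasks by hashing, so an even spread is likely but not guaranteed.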
