Flink SQL Source Code Walkthrough (1): How the Window Operator Is Created
The overall structure of this article follows https://blog.csdn.net/LS_ice/article/details/90711744
Flink version: 1.9
Intro
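Windows are the core mechanism for processing unbounded streams: a stream is split into finite-sized windows, and aggregation is performed within each window. By cutting a continuous flow of data into bounded intervals according to different conditions, windows let users implement many complex statistical analyses.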
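To make "splitting the stream into bounded intervals" concrete, here is a minimal Java sketch of how the tumbling window containing a given timestamp is located. The arithmetic is the same expression Flink's `TimeWindow.getWindowStartWithOffset` uses; the class `TumblingWindowSketch` and its `main` are illustrative only, not part of Flink.

```java
// Illustrative sketch, not Flink code: locate the tumbling window that contains
// a timestamp. The arithmetic matches TimeWindow.getWindowStartWithOffset.
final class TumblingWindowSketch {

    // Start of the [start, start + size) window containing `timestamp`.
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    public static void main(String[] args) {
        long size = 5_000L; // 5-second tumbling windows, no offset
        for (long ts : new long[] {1_000L, 4_999L, 5_000L, 12_345L}) {
            long start = windowStart(ts, 0L, size);
            System.out.println(ts + " -> [" + start + ", " + (start + size) + ")");
        }
    }
}
```

The last line printed is `12345 -> [10000, 15000)`: every timestamp lands in exactly one 5-second interval, which is what makes tumbling windows non-overlapping.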
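Architecture of WindowOperator creation
Window syntax is mainly used in GROUP BY clauses. The planner (built on Calcite) creates the WindowOperator as part of implementing the aggregation strategy, which involves two sub-flows: matching the aggregation rule (StreamExecGroupWindowAggregateRule) and generating the physical aggregation node StreamExecGroupWindowAggregate.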
Walkthrough of the flow in the figure above:
Application-layer SQL:
1.1 Window categories and configuration: sliding, tumbling, and session windows
1.2 Window time type: EventTime on a named time field by default; PROCTIME() switches it to ProcessingTime
Calcite parsing engine:
2.1 Calcite SQL parsing, covering the logical, optimized, and physical plans and operator binding (#translateToPlanInternal); in this article, specifically the StreamExecGroupWindowAggregateRule rule and the StreamExecGroupWindowAggregate physical plan
WindowOperator creation:
3.1 StreamExecGroupWindowAggregate#createWindowOperator creates the operator
3.2 Creating the WindowAssigner, which assigns each input element to one or more windows according to the window type
3.3 processElement() does the actual data processing: aggregation, window creation, state updates, emitting results, and so on
3.4 The Trigger decides, based on data or time, when a window fires
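Step 3.2 says a WindowAssigner may produce several windows for one element. The sketch below assumes event-time sliding windows and follows the loop in Flink's `SlidingEventTimeWindows#assignWindows`; it represents windows as `long[]{start, end}` instead of `TimeWindow`, and the class name `SlidingAssignSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch modeled on SlidingEventTimeWindows#assignWindows.
// Windows are long[]{start, end} (end exclusive) instead of Flink's TimeWindow.
final class SlidingAssignSketch {

    static long windowStart(long timestamp, long offset, long slide) {
        return timestamp - (timestamp - offset + slide) % slide;
    }

    // Every window of length `size`, advancing by `slide`, that contains `timestamp`.
    static List<long[]> assignWindows(long timestamp, long size, long slide, long offset) {
        List<long[]> windows = new ArrayList<>((int) (size / slide));
        long lastStart = windowStart(timestamp, offset, slide);
        // walk backwards from the latest window start while windows still cover `timestamp`
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // 10 s windows sliding every 5 s
        for (long[] w : assignWindows(7_000L, 10_000L, 5_000L, 0L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

With a 10 s size and 5 s slide, timestamp 7000 belongs to both [5000, 15000) and [0, 10000), so a single input row updates the state of two windows. When slide equals size the loop yields exactly one window, i.e. the tumbling case.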
StreamExecGroupWindowAggregateRule
/home/graviti/flink/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecGroupWindowAggregateRule.scala
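This rule matches window aggregations up front. The WindowEmitStrategy it produces records whether the window uses event time, whether it is a session window, the early-fire and late-fire settings, and the allowed lateness in milliseconds (the window end time plus this value gives the state cleanup time). The rule also looks up the time field configured for the window in the aggregate's logical plan and records its index; both window triggering and cleanup depend on this time.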
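The link between allowed lateness and cleanup time can be sketched as follows. The `allowLateness` method reproduces the selection logic of `WindowEmitStrategy.apply` shown further down; `cleanupTime` is an assumption modeled on how the DataStream `WindowOperator` computes it for event-time windows, starting from `maxTimestamp()` (window end minus 1) and guarding against overflow. `LatenessSketch` and both method names are illustrative.

```java
// Illustrative sketch: the allowLateness rule from WindowEmitStrategy.apply plus
// an assumed cleanup-time calculation for event-time windows.
final class LatenessSketch {

    // Session windows never allow lateness; otherwise use minIdleStateRetentionTime,
    // where a negative (unset) value means "no lateness".
    static long allowLateness(boolean isSessionWindow, long minIdleStateRetentionTime) {
        if (isSessionWindow || minIdleStateRetentionTime < 0) {
            return 0L;
        }
        return minIdleStateRetentionTime;
    }

    // State of window [start, windowEnd) may be dropped once the watermark reaches
    // maxTimestamp + lateness, where maxTimestamp = windowEnd - 1.
    // If the sum overflows, the window is never cleaned up.
    static long cleanupTime(long windowEnd, long lateness) {
        long maxTimestamp = windowEnd - 1;
        long cleanup = maxTimestamp + lateness;
        return cleanup >= maxTimestamp ? cleanup : Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        long lateness = allowLateness(false, 60_000L); // 1 minute of retention
        System.out.println(cleanupTime(10_000L, lateness));
    }
}
```

For a window ending at 10000 with one minute of retention this prints 69999: state for [0, 10000) survives until the watermark reaches that value, and late rows arriving before then still update the window.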
2.1 matches: accept only plain GROUP BY window aggregations (grouping sets are rejected)
```scala
override def matches(call: RelOptRuleCall): Boolean = {
  val agg: FlinkLogicalWindowAggregate = call.rel(0)

  // check if we have grouping sets
  val groupSets = agg.getGroupType != Group.SIMPLE
  if (groupSets || agg.indicator) {
    throw new TableException("GROUPING SETS are currently not supported.")
  }

  true
}
```
2.2 Generate the WindowEmitStrategy, which holds the window's configuration properties
/home/graviti/flink/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala
```scala
object WindowEmitStrategy {

  /**
    * Builds the emit strategy from the table config and the logical window:
    * - whether the window is event-time
    * - whether it is a session window
    * - early-fire and late-fire settings
    * - allowed lateness in milliseconds (window end + this value = state cleanup time)
    * - whether lateness is allowed at all
    *
    * @param tableConfig
    * @param window
    * @return
    */
  def apply(tableConfig: TableConfig, window: LogicalWindow): WindowEmitStrategy = {
    val isEventTime = isRowtimeAttribute(window.timeAttribute)
    val isSessionWindow = window.isInstanceOf[SessionGroupWindow]

    val allowLateness = if (isSessionWindow) {
      // ignore allow lateness in session window because retraction is not supported
      0L
    } else if (tableConfig.getMinIdleStateRetentionTime < 0) {
      // min idle state retention time is not set, use 0L as default which means not allow lateness
      0L
    } else {
      // use min idle state retention time as allow lateness
      tableConfig.getMinIdleStateRetentionTime
    }
    val enableEarlyFireDelay = tableConfig.getConfiguration.getBoolean(
      TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED)
    val earlyFireDelay = getMillisecondFromConfigDuration(
      tableConfig, TABLE_EXEC_EMIT_EARLY_FIRE_DELAY)
    val enableLateFireDelay = tableConfig.getConfiguration.getBoolean(
      TABLE_EXEC_EMIT_LATE_FIRE_ENABLED)
    val lateFireDelay = getMillisecondFromConfigDuration(
      tableConfig, TABLE_EXEC_EMIT_LATE_FIRE_DELAY)
    new WindowEmitStrategy(
      isEventTime,
      isSessionWindow,
      earlyFireDelay,
      enableEarlyFireDelay,
      lateFireDelay,
      enableLateFireDelay,
      allowLateness)
  }
}
```
Next is the convert function.
It breaks down roughly into three parts:
- Generate the WindowEmitStrategy
- Compute the index of the time field; both window triggering and cleanup use this time
- Call StreamExecGroupWindowAggregate to create the operator
Next, let's look at StreamExecGroupWindowAggregate.
/home/graviti/flink/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupWindowAggregateBase.scala
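It extends the abstract class StreamExecGroupWindowAggregateBase, so we go straight to that class's code.
It does three things:
- Builds the input information
- Generates the aggHandler through code generation, implementing methods such as accumulate, retract, merge, and update
- Creates the operator, setting the window type, the retract flag, the trigger, the aggregate function handle, and so on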
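To show what the generated aggHandler is responsible for, here is a hand-written stand-in for SUM(x), COUNT(x). The interface `WindowAggHandler` and the classes `SumCountAcc`, `SumCountHandler`, and `AggHandlerDemo` are hypothetical: they mirror the roles of accumulate, retract, and merge described above, but they are not the signature of the handler interface Flink actually generates.

```java
// Hypothetical interface: mirrors the roles of the generated handler's methods,
// but it is NOT the actual Flink handler signature.
interface WindowAggHandler<ACC, OUT> {
    ACC createAccumulator();
    void accumulate(ACC acc, long value); // a new input row arrives
    void retract(ACC acc, long value);    // a previously counted row is withdrawn
    void merge(ACC target, ACC other);    // e.g. two session windows are merged
    OUT getValue(ACC acc);                // result emitted when the window fires
}

// Accumulator for SUM(x) and COUNT(x) over a long column.
final class SumCountAcc {
    long sum;
    long count;
}

// Hand-written stand-in for what code generation would produce for SUM(x), COUNT(x).
final class SumCountHandler implements WindowAggHandler<SumCountAcc, long[]> {
    @Override
    public SumCountAcc createAccumulator() {
        return new SumCountAcc();
    }

    @Override
    public void accumulate(SumCountAcc acc, long value) {
        acc.sum += value;
        acc.count++;
    }

    @Override
    public void retract(SumCountAcc acc, long value) {
        acc.sum -= value;
        acc.count--;
    }

    @Override
    public void merge(SumCountAcc target, SumCountAcc other) {
        target.sum += other.sum;
        target.count += other.count;
    }

    @Override
    public long[] getValue(SumCountAcc acc) {
        return new long[] {acc.sum, acc.count}; // {SUM, COUNT}
    }
}

final class AggHandlerDemo {
    public static void main(String[] args) {
        SumCountHandler h = new SumCountHandler();
        SumCountAcc a = h.createAccumulator();
        h.accumulate(a, 3);
        h.accumulate(a, 4);
        h.retract(a, 4);           // withdraw the 4 again
        SumCountAcc b = h.createAccumulator();
        h.accumulate(b, 10);
        h.merge(a, b);             // fold window b's accumulator into a
        long[] out = h.getValue(a);
        System.out.println("SUM=" + out[0] + ", COUNT=" + out[1]);
    }
}
```

Running the demo prints `SUM=13, COUNT=2`. Keeping the accumulator separate from the handler is what lets the operator store one accumulator per window in state while sharing a single handler instance.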
Optimization opportunity:
Note this comment in the code:
EventTimeTumblingGroupWindow should sort the stream on event time
This is a place where optimization work could begin.
The previous subsection set the WindowOperator parameters and created the instance. Next, we analyze how WindowOperator actually processes data, starting from WindowOperator#processElement:
WindowOperator
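When an element of the stream arrives, it is assigned a key by the KeySelector (user-defined).
The WindowAssigner then assigns the element to zero or more windows.
After these steps, the element has effectively been placed into a pane.
A pane is the set of elements that share the same key and the same window. This is an optimization Flink applies to windows; see an introduction to panes for the details.
Each pane has its own Trigger instance, which decides when the pane's elements are fired for computation. The computation logic itself is defined by the user.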
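The pane idea can be illustrated with a toy model. This is an assumption-laden sketch, not Flink's implementation: `PaneSketch` keeps one running SUM per (key, tumbling window) pair and plays the role of an event-time trigger by firing and purging a pane once the watermark reaches the window's max timestamp (end minus 1).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model (assumption, not Flink's implementation): each (key, window) pair,
// i.e. each pane, owns its own accumulator, and an event-time trigger fires a
// pane once the watermark reaches the window's max timestamp (end - 1).
final class PaneSketch {
    private final long size; // tumbling window size in ms
    // key -> (window start -> running SUM); TreeMaps keep the output order stable
    private final Map<String, TreeMap<Long, Long>> panes = new TreeMap<>();
    final List<String> emitted = new ArrayList<>();

    PaneSketch(long size) {
        this.size = size;
    }

    // key selection + window assignment + adding the value to the pane's state
    void processElement(String key, long timestamp, long value) {
        long start = timestamp - (timestamp + size) % size; // offset 0
        panes.computeIfAbsent(key, k -> new TreeMap<>()).merge(start, value, Long::sum);
    }

    // event-time trigger: FIRE and PURGE every pane whose window is complete
    void processWatermark(long watermark) {
        for (Map.Entry<String, TreeMap<Long, Long>> byKey : panes.entrySet()) {
            Iterator<Map.Entry<Long, Long>> it = byKey.getValue().entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Long> pane = it.next();
                long start = pane.getKey();
                if (watermark >= start + size - 1) {
                    emitted.add(byKey.getKey() + "[" + start + "," + (start + size) + ")=" + pane.getValue());
                    it.remove();
                }
            }
        }
    }

    public static void main(String[] args) {
        PaneSketch op = new PaneSketch(5_000L);
        op.processElement("a", 1_000L, 1);
        op.processElement("b", 2_000L, 10);
        op.processElement("a", 3_000L, 2);
        op.processElement("a", 6_000L, 5);
        op.processWatermark(4_999L); // closes the [0,5000) panes of both keys
        op.processWatermark(9_999L); // closes a's [5000,10000) pane
        System.out.println(op.emitted);
    }
}
```

The demo prints `[a[0,5000)=3, b[0,5000)=10, a[5000,10000)=5]`: keys "a" and "b" share the window [0, 5000) but are aggregated and fired as separate panes.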
```java
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    // assign the element to its windows
    final Collection<W> elementWindows = windowAssigner.assignWindows(
        element.getValue(), element.getTimestamp(), windowAssignerContext);

    // if element is handled by none of assigned elementWindows
    boolean isSkippedElement = true;

    final K key = this.<K>getKeyedStateBackend().getCurrentKey();

    if (windowAssigner instanceof MergingWindowAssigner) {
        MergingWindowSet<W> mergingWindows = getMergingWindowSet();

        for (W window : elementWindows) {
            // Iterate over the affected windows and aggregate: read the pre-aggregation
            // value from windowState, aggregate via the handle, and write the updated
            // state back into windowState.

            // adding the new window might result in a merge, in that case the actualWindow
            // is the merged window and we work with that. If we don't merge then
            // actualWindow == window
            W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
                @Override
                public void merge(W mergeResult,
                        Collection<W> mergedWindows, W stateWindowResult,
                        Collection<W> mergedStateWindows) throws Exception {

                    // A merge must never produce a window that is already late: if the merged
                    // window's max timestamp plus the allowed lateness is at or behind the
                    // current watermark (or, for processing time, behind the current time), fail.
                    if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
                        throw new UnsupportedOperationException("The end timestamp of an " +
                            "event-time window cannot become earlier than the current watermark " +
                            "by merging. Current watermark: " + internalTimerService.currentWatermark() +
                            " window: " + mergeResult);
                    } else if (!windowAssigner.isEventTime()) {
                        long currentProcessingTime = internalTimerService.currentProcessingTime();
                        if (mergeResult.maxTimestamp() <= currentProcessingTime) {
                            throw new UnsupportedOperationException("The end timestamp of a " +
                                "processing-time window cannot become earlier than the current processing time " +
                                "by merging. Current processing time: " + currentProcessingTime +
                                " window: " + mergeResult);
                        }
                    }

                    triggerContext.key = key;
                    triggerContext.window = mergeResult;

                    triggerContext.onMerge(mergedWindows);

                    for (W m : mergedWindows) {
                        triggerContext.window = m;
                        triggerContext.clear();
                        deleteCleanupTimer(m);
                    }

                    // update state:
                    // merge the merged state windows into the newly resulting state window
                    windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
                }
            });

            // drop if the window is already late
            if (isWindowLate(actualWindow)) {
                mergingWindows.retireWindow(actualWindow);
                continue;
            }
            isSkippedElement = false;

            W stateWindow = mergingWindows.getStateWindow(actualWindow);
            if (stateWindow == null) {
                throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
            }

            windowState.setCurrentNamespace(stateWindow);
            // add the element to the state of the (possibly merged) window
            windowState.add(element.getValue());

            // TriggerContext is effectively a proxy for the trigger of the concrete window type;
            // it weighs early fire, late fire, the watermark and the window end time to decide
            // whether the window emits
            triggerContext.key = key;
            triggerContext.window = actualWindow;

            // decide whether to fire
            TriggerResult triggerResult = triggerContext.onElement(element);

            if (triggerResult.isFire()) {
                ACC contents = windowState.get();
                if (contents == null) {
                    continue;
                }
                // fire: emit the window's result
                emitWindowContents(actualWindow, contents);
            }

            if (triggerResult.isPurge()) {
                windowState.clear();
            }
            registerCleanupTimer(actualWindow);
        }

        // need to make sure to update the merging state in state
        mergingWindows.persist();
    } else {
        for (W window : elementWindows) {

            // drop if the window is already late
            if (isWindowLate(window)) {
                continue;
            }
            isSkippedElement = false;

            windowState.setCurrentNamespace(window);
            windowState.add(element.getValue());

            triggerContext.key = key;
            triggerContext.window = window;

            TriggerResult triggerResult = triggerContext.onElement(element);

            if (triggerResult.isFire()) {
                ACC contents = windowState.get();
                if (contents == null) {
                    continue;
                }
                emitWindowContents(window, contents);
            }

            if (triggerResult.isPurge()) {
                windowState.clear();
            }
            registerCleanupTimer(window);
        }
    }

    // side output input event if
    // element not handled by any window
    // late arriving tag has been set
    // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
    if (isSkippedElement && isElementLate(element)) {
        if (lateDataOutputTag != null) {
            sideOutput(element);
        } else {
            this.numLateRecordsDropped.inc();
        }
    }
}
```
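The MergingWindowAssigner branch above is easier to follow with a concrete session-window case. The following is a simplified sketch under stated assumptions, not MergingWindowSet itself: each element first gets its own window [ts, ts + gap), and every in-flight window that intersects it (using the same closed test as `TimeWindow#intersects`, so adjacent windows also merge) is folded into one `actualWindow`. The real MergingWindowSet additionally maps merged windows to a state window and invokes the MergeFunction so that trigger state and state namespaces are merged too. `SessionMergeSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch (assumption, not MergingWindowSet itself): each element gets its
// own window [ts, ts + gap); in-flight windows that intersect it are folded into one
// "actualWindow", which the MergingWindowAssigner branch then aggregates and triggers.
final class SessionMergeSketch {
    private final long gap;
    // in-flight windows as {start, end}, end exclusive; kept pairwise non-intersecting
    final List<long[]> inFlight = new ArrayList<>();

    SessionMergeSketch(long gap) {
        this.gap = gap;
    }

    long[] addElement(long timestamp) {
        long start = timestamp;
        long end = timestamp + gap;
        List<long[]> untouched = new ArrayList<>();
        for (long[] w : inFlight) {
            // same test as TimeWindow#intersects: overlapping or adjacent windows merge
            if (w[0] <= end && w[1] >= start) {
                start = Math.min(start, w[0]);
                end = Math.max(end, w[1]);
            } else {
                untouched.add(w);
            }
        }
        long[] actualWindow = {start, end};
        untouched.add(actualWindow);
        inFlight.clear();
        inFlight.addAll(untouched);
        return actualWindow;
    }

    public static void main(String[] args) {
        SessionMergeSketch sessions = new SessionMergeSketch(1_000L);
        sessions.addElement(1_000L);                 // [1000, 2000)
        sessions.addElement(2_500L);                 // [2500, 3500), a separate session
        long[] merged = sessions.addElement(1_800L); // [1800, 2800) bridges both
        System.out.println("[" + merged[0] + ", " + merged[1] + "), in-flight=" + sessions.inFlight.size());
    }
}
```

The demo prints `[1000, 3500), in-flight=1`: one late-arriving element joined two sessions, which is exactly the situation where processElement must merge the old windows' state and clear their triggers and cleanup timers.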