




Flink SQL Source Code Walkthrough (1): How the Window Operator Is Created

Published: 2024/3/24, by 豆豆

The overall structure of this article follows https://blog.csdn.net/LS_ice/article/details/90711744

Flink version: 1.9

Intro

Windowing is a core mechanism for unbounded streams: a stream is split into windows of finite size, and aggregation happens within each window. The continuously arriving data is divided into bounded intervals according to different criteria, which lets users implement complex statistical analysis on top of the windowing feature.
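To make the idea of bounded intervals concrete, here is a minimal sketch in plain Java. It is not Flink's actual WindowAssigner API; the class and method names are hypothetical and only illustrate the arithmetic that maps an element's timestamp to a tumbling window.

```java
// Hypothetical sketch (not Flink's API): assigning a timestamp to a
// tumbling window of fixed size, i.e. the interval [start, start + size).
public class TumbleSketch {
    // start of the tumbling window that contains the timestamp
    static long windowStart(long timestamp, long sizeMs) {
        return timestamp - (timestamp % sizeMs);
    }

    public static void main(String[] args) {
        long size = 60_000L; // 1-minute windows
        // an element at t = 125s falls into the window [120s, 180s)
        System.out.println(windowStart(125_000L, size));
    }
}
```

A sliding window differs only in that one timestamp can belong to several such intervals (size / slide of them), and a session window has no fixed size at all.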

Architecture of WindowOperator creation

Window syntax is mainly used in the GROUP BY clause. When Calcite creates the WindowOperator, it also realizes the aggregation strategy through two sub-flows: matching the aggregation rule (StreamExecGroupWindowAggregateRule) and generating the physical aggregation node StreamExecGroupWindowAggregate.


Analysis of the flow in the diagram above:
Application-layer SQL:
1.1 Window classification and configuration: sliding, tumbling, and session windows
1.2 Window time-attribute configuration: event time via a named field by default, or processing time via PROCTIME()
Calcite parsing engine:
2.1 Calcite SQL parsing: logical plan, optimization, physical plan, and operator binding (#translateToPlanInternal); in this article that specifically means StreamExecGroupWindowAggregateRule and the StreamExecGroupWindowAggregate physical node
WindowOperator creation:
3.1 StreamExecGroupWindowAggregate#createWindowOperator creates the operator
3.2 WindowAssigner creation: based on the incoming data and the window type, it assigns each element to one or more windows
3.3 processElement() does the real per-record work: aggregation, window creation, cache updates, and emitting results
3.4 Trigger decides, based on data or time, when a window fires

StreamExecGroupWindowAggregateRule

/home/graviti/flink/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecGroupWindowAggregateRule.scala

The rule matches windows ahead of time. The generated WindowEmitStrategy carries: a flag for whether the table uses event time, whether the window is a session window, the early-fire and late-fire configuration, and the allowed lateness in milliseconds (window end time plus this value gives the state cleanup time). The rule also reads the time attribute configured on the window in the logical aggregation plan and records that field's index; both window triggering and state cleanup use this time.
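The cleanup-time rule just described can be sketched in a few lines. This is a hedged illustration, not Flink's actual implementation; the class and method names are hypothetical.

```java
// Hypothetical sketch: window state can be dropped once time passes
// window end + allowed lateness. The overflow guard treats Long.MAX_VALUE
// as "never clean up".
public class CleanupTimeSketch {
    static long cleanupTime(long windowEndMs, long allowedLatenessMs) {
        long cleanup = windowEndMs + allowedLatenessMs;
        return cleanup >= windowEndMs ? cleanup : Long.MAX_VALUE; // overflow guard
    }

    public static void main(String[] args) {
        // window ends at 120s, lateness 5s -> state may be cleaned at 125s
        System.out.println(cleanupTime(120_000L, 5_000L));
    }
}
```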

2.1 matches checks whether this is a GROUP BY statement (grouping sets are rejected):

override def matches(call: RelOptRuleCall): Boolean = {
  val agg: FlinkLogicalWindowAggregate = call.rel(0)
  // check if we have grouping sets
  val groupSets = agg.getGroupType != Group.SIMPLE
  if (groupSets || agg.indicator) {
    throw new TableException("GROUPING SETS are currently not supported.")
  }
  true
}

2.2 Generate the WindowEmitStrategy, which holds the window's configuration properties

/home/graviti/flink/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala

object WindowEmitStrategy {
  /**
   * Determines: whether the table uses event time,
   * whether the window is a session window,
   * the early-fire and late-fire configuration,
   * the allowed lateness in milliseconds (window end + this value = cleanup time).
   * @param tableConfig
   * @param window
   * @return
   */
  def apply(tableConfig: TableConfig, window: LogicalWindow): WindowEmitStrategy = {
    val isEventTime = isRowtimeAttribute(window.timeAttribute)
    val isSessionWindow = window.isInstanceOf[SessionGroupWindow]
    val allowLateness = if (isSessionWindow) {
      // ignore allow lateness in session window because retraction is not supported
      0L
    } else if (tableConfig.getMinIdleStateRetentionTime < 0) {
      // min idle state retention time is not set, use 0L as default which means not allow lateness
      0L
    } else {
      // use min idle state retention time as allow lateness
      tableConfig.getMinIdleStateRetentionTime
    }
    val enableEarlyFireDelay = tableConfig.getConfiguration.getBoolean(
      TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED)
    val earlyFireDelay = getMillisecondFromConfigDuration(
      tableConfig, TABLE_EXEC_EMIT_EARLY_FIRE_DELAY)
    val enableLateFireDelay = tableConfig.getConfiguration.getBoolean(
      TABLE_EXEC_EMIT_LATE_FIRE_ENABLED)
    val lateFireDelay = getMillisecondFromConfigDuration(
      tableConfig, TABLE_EXEC_EMIT_LATE_FIRE_DELAY)
    new WindowEmitStrategy(
      isEventTime,
      isSessionWindow,
      earlyFireDelay,
      enableEarlyFireDelay,
      lateFireDelay,
      enableLateFireDelay,
      allowLateness)
  }
}
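The allowLateness branch in WindowEmitStrategy.apply can be distilled into a small sketch. This is plain Java with hypothetical names, only restating the decision table: session windows and an unset retention time both yield zero lateness.

```java
// Hypothetical distillation of the allowLateness decision above.
public class AllowLatenessSketch {
    static long allowLateness(boolean isSessionWindow, long minIdleStateRetentionMs) {
        if (isSessionWindow) {
            return 0L; // retraction is not supported for session windows
        }
        if (minIdleStateRetentionMs < 0) {
            return 0L; // retention time not configured -> no lateness allowed
        }
        return minIdleStateRetentionMs; // use min idle state retention as lateness
    }

    public static void main(String[] args) {
        System.out.println(allowLateness(false, 3_600_000L)); // 1h retention
    }
}
```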

Below is the full implementation of the convert function.

It breaks down into three parts:

  • Generate the WindowEmitStrategy
  • Record the index of the time attribute; window triggering and cleanup both use this time
  • Call StreamExecGroupWindowAggregate to create the operator
override def convert(rel: RelNode): RelNode = {
  val agg = rel.asInstanceOf[FlinkLogicalWindowAggregate]
  val input = agg.getInput
  val inputRowType = input.getRowType
  val cluster = rel.getCluster
  val requiredDistribution = if (agg.getGroupCount != 0) {
    FlinkRelDistribution.hash(agg.getGroupSet.asList)
  } else {
    FlinkRelDistribution.SINGLETON
  }
  val requiredTraitSet = input.getTraitSet
    .replace(FlinkConventions.STREAM_PHYSICAL)
    .replace(requiredDistribution)
  val providedTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
  val newInput: RelNode = RelOptRule.convert(input, requiredTraitSet)
  val config = cluster.getPlanner.getContext.asInstanceOf[FlinkContext].getTableConfig
  val emitStrategy = WindowEmitStrategy(config, agg.getWindow)
  val timeField = agg.getWindow.timeAttribute
  // index of the time attribute
  val inputTimestampIndex = if (isRowtimeAttribute(timeField)) {
    timeFieldIndex(inputRowType, relBuilderFactory.create(cluster, null), timeField)
  } else {
    -1
  }
  // create the operator
  new StreamExecGroupWindowAggregate(
    cluster,
    providedTraitSet,
    newInput,
    rel.getRowType,
    inputRowType,
    agg.getGroupSet.toArray,
    agg.getAggCallList,
    agg.getWindow,
    agg.getNamedProperties,
    inputTimestampIndex,
    emitStrategy)
}

Next, let's look at StreamExecGroupWindowAggregate.

/home/graviti/flink/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupWindowAggregateBase.scala

It extends the abstract class StreamExecGroupWindowAggregateBase, so let's go straight to that code.

It does three main things:

  • Generate the input transformation
  • Build the aggHandler, which generates code implementing accumulate, retract, merge, update, and other methods
  • Create the operator, setting the window type, retract flag, trigger, aggregate function handle, and so on
override protected def translateToPlanInternal(
    planner: StreamPlanner): Transformation[BaseRow] = {
  val config = planner.getTableConfig
  // recursively call translateToPlan on the child node to build the input transformation
  val inputTransform = getInputNodes.get(0).translateToPlan(planner)
    .asInstanceOf[Transformation[BaseRow]]
  val inputRowTypeInfo = inputTransform.getOutputType.asInstanceOf[BaseRowTypeInfo]
  val outRowType = BaseRowTypeInfo.of(FlinkTypeFactory.toLogicalRowType(outputRowType))

  val inputIsAccRetract = StreamExecRetractionRules.isAccRetract(input)
  if (inputIsAccRetract) {
    throw new TableException(
      s"Group Window $aggType: Retraction on windowed GroupBy $aggType is not supported yet. \n" +
        "please re-check sql grammar. \n" +
        s"Note: Windowed GroupBy $aggType should not follow a" +
        "non-windowed GroupBy aggregation.")
  }

  val isCountWindow = window match {
    case TumblingGroupWindow(_, _, size) if hasRowIntervalType(size) => true
    case SlidingGroupWindow(_, _, size, _) if hasRowIntervalType(size) => true
    case _ => false
  }

  if (isCountWindow && grouping.length > 0 && config.getMinIdleStateRetentionTime < 0) {
    LOG.warn(
      "No state retention interval configured for a query which accumulates state. " +
        "Please provide a query configuration with valid retention interval to prevent " +
        "excessive state size. You may specify a retention time of 0 to not clean up the state.")
  }

  val timeIdx = if (isRowtimeAttribute(window.timeAttribute)) {
    if (inputTimeFieldIndex < 0) {
      throw new TableException(
        s"Group window $aggType must defined on a time attribute, " +
          "but the time attribute can't be found.\n" +
          "This should never happen. Please file an issue.")
    }
    inputTimeFieldIndex
  } else {
    -1
  }

  val needRetraction = StreamExecRetractionRules.isAccRetract(getInput)
  val aggInfoList = transformToStreamAggregateInfoList(
    aggCalls,
    inputRowType,
    Array.fill(aggCalls.size)(needRetraction),
    needInputCount = needRetraction,
    isStateBackendDataViews = true)

  val aggCodeGenerator = createAggsHandler(
    aggInfoList,
    config,
    planner.getRelBuilder,
    inputRowTypeInfo.getLogicalTypes,
    needRetraction)

  val aggResultTypes = aggInfoList.getActualValueTypes.map(fromDataTypeToLogicalType)
  val windowPropertyTypes = namedProperties.map(_.property.resultType).toArray
  val generator = new EqualiserCodeGenerator(aggResultTypes ++ windowPropertyTypes)
  val equaliser = generator.generateRecordEqualiser("WindowValueEqualiser")

  val aggValueTypes = aggInfoList.getActualValueTypes.map(fromDataTypeToLogicalType)
  val accTypes = aggInfoList.getAccTypes.map(fromDataTypeToLogicalType)
  val operator = createWindowOperator(
    config,
    aggCodeGenerator,
    equaliser,
    accTypes,
    windowPropertyTypes,
    aggValueTypes,
    inputRowTypeInfo.getLogicalTypes,
    timeIdx)

  val transformation = new OneInputTransformation(
    inputTransform,
    getRelDetailedDescription,
    operator,
    outRowType,
    inputTransform.getParallelism)

  if (inputsContainSingleton()) {
    transformation.setParallelism(1)
    transformation.setMaxParallelism(1)
  }

  val selector = KeySelectorUtil.getBaseRowSelector(grouping, inputRowTypeInfo)
  // set KeyType and Selector for state
  transformation.setStateKeySelector(selector)
  transformation.setStateKeyType(selector.getProducedType)
  transformation
}

/**
 * The generated class implements the accumulate, retract, merge and update methods.
 * The handler is later passed to the WindowOperator so that, while processing data,
 * it can aggregate, retract, and emit the latest results downstream.
 */
private def createAggsHandler(
    aggInfoList: AggregateInfoList,
    config: TableConfig,
    relBuilder: RelBuilder,
    fieldTypeInfos: Seq[LogicalType],
    needRetraction: Boolean): GeneratedClass[_] = {

  val needMerge = window match {
    case SlidingGroupWindow(_, _, size, _) if hasTimeIntervalType(size) => true
    case SessionGroupWindow(_, _, _) => true
    case _ => false
  }
  val windowClass = window match {
    case TumblingGroupWindow(_, _, size) if hasRowIntervalType(size) =>
      classOf[CountWindow]
    case SlidingGroupWindow(_, _, size, _) if hasRowIntervalType(size) =>
      classOf[CountWindow]
    case _ => classOf[TimeWindow]
  }

  val generator = new AggsHandlerCodeGenerator(
    CodeGeneratorContext(config),
    relBuilder,
    fieldTypeInfos,
    copyInputField = false)

  generator.needAccumulate()
  if (needMerge) {
    generator.needMerge(mergedAccOffset = 0, mergedAccOnHeap = false)
  }
  if (needRetraction) {
    generator.needRetract()
  }

  val isTableAggregate =
    AggregateUtil.isTableAggregate(aggInfoList.getActualAggregateCalls.toList)
  if (isTableAggregate) {
    generator.generateNamespaceTableAggsHandler(
      "GroupingWindowTableAggsHandler",
      aggInfoList,
      namedProperties.map(_.property),
      windowClass)
  } else {
    generator.generateNamespaceAggsHandler(
      "GroupingWindowAggsHandler",
      aggInfoList,
      namedProperties.map(_.property),
      windowClass)
  }
}

/**
 * The last window-related step of StreamExecGroupWindowAggregate is calling
 * #createWindowOperator. It first creates a WindowOperatorBuilder, sets the
 * window type, the retract flag, the trigger (window firing condition), the
 * aggregate function handle and so on, and finally builds the WindowOperator.
 */
private def createWindowOperator(
    config: TableConfig,
    aggsHandler: GeneratedClass[_],
    recordEqualiser: GeneratedRecordEqualiser,
    accTypes: Array[LogicalType],
    windowPropertyTypes: Array[LogicalType],
    aggValueTypes: Array[LogicalType],
    inputFields: Seq[LogicalType],
    timeIdx: Int): WindowOperator[_, _] = {

  // the operator is created through the builder
  val builder = WindowOperatorBuilder
    .builder()
    .withInputFields(inputFields.toArray)

  val newBuilder = window match {
    case TumblingGroupWindow(_, timeField, size)
        if isProctimeAttribute(timeField) && hasTimeIntervalType(size) =>
      builder.tumble(toDuration(size)).withProcessingTime()
    case TumblingGroupWindow(_, timeField, size)
        if isRowtimeAttribute(timeField) && hasTimeIntervalType(size) =>
      builder.tumble(toDuration(size)).withEventTime(timeIdx)
    case TumblingGroupWindow(_, timeField, size)
        if isProctimeAttribute(timeField) && hasRowIntervalType(size) =>
      builder.countWindow(toLong(size))
    case TumblingGroupWindow(_, _, _) =>
      // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
      // before applying the windowing logic. Otherwise, this would be the same as a
      // ProcessingTimeTumblingGroupWindow
      throw new UnsupportedOperationException(
        "Event-time grouping windows on row intervals are currently not supported.")
    case SlidingGroupWindow(_, timeField, size, slide)
        if isProctimeAttribute(timeField) && hasTimeIntervalType(size) =>
      builder.sliding(toDuration(size), toDuration(slide)).withProcessingTime()
    case SlidingGroupWindow(_, timeField, size, slide)
        if isRowtimeAttribute(timeField) && hasTimeIntervalType(size) =>
      builder.sliding(toDuration(size), toDuration(slide)).withEventTime(timeIdx)
    case SlidingGroupWindow(_, timeField, size, slide)
        if isProctimeAttribute(timeField) && hasRowIntervalType(size) =>
      builder.countWindow(toLong(size), toLong(slide))
    case SlidingGroupWindow(_, _, _, _) =>
      // TODO: EventTimeSlidingGroupWindow should sort the stream on event time
      // before applying the windowing logic. Otherwise, this would be the same as a
      // ProcessingTimeSlidingGroupWindow
      throw new UnsupportedOperationException(
        "Event-time grouping windows on row intervals are currently not supported.")
    case SessionGroupWindow(_, timeField, gap)
        if isProctimeAttribute(timeField) =>
      builder.session(toDuration(gap)).withProcessingTime()
    case SessionGroupWindow(_, timeField, gap)
        if isRowtimeAttribute(timeField) =>
      builder.session(toDuration(gap)).withEventTime(timeIdx)
  }

  if (emitStrategy.produceUpdates) {
    // mark this operator will send retraction and set new trigger
    newBuilder
      .withSendRetraction()
      .triggering(emitStrategy.getTrigger)
      .withAllowedLateness(Duration.ofMillis(emitStrategy.getAllowLateness))
  }

  aggsHandler match {
    case agg: GeneratedNamespaceAggsHandleFunction[_] =>
      newBuilder
        .aggregate(agg, recordEqualiser, accTypes, aggValueTypes, windowPropertyTypes)
        .build()
    case tableAgg: GeneratedNamespaceTableAggsHandleFunction[_] =>
      newBuilder
        .aggregate(tableAgg, accTypes, aggValueTypes, windowPropertyTypes)
        .build()
  }
}
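The pattern match in createWindowOperator can be summarized with a small decision sketch. This is hypothetical plain Java, not the Scala match itself; it only shows which builder call a tumbling window spec maps to depending on its time attribute and interval kind, including the unsupported event-time-on-row-interval case.

```java
// Hypothetical sketch of the tumbling-window branch of the match above.
public class WindowDispatchSketch {
    enum TimeAttr { PROCTIME, ROWTIME }
    enum Interval { TIME, ROW_COUNT }

    static String tumbleBuilderCall(TimeAttr attr, Interval interval) {
        if (interval == Interval.TIME) {
            // time-interval windows exist for both time attributes
            return attr == TimeAttr.PROCTIME
                ? "tumble(size).withProcessingTime()"
                : "tumble(size).withEventTime(timeIdx)";
        }
        if (attr == TimeAttr.PROCTIME) {
            return "countWindow(size)"; // row-interval -> count window
        }
        // mirrors the UnsupportedOperationException in the real code
        throw new UnsupportedOperationException(
            "Event-time grouping windows on row intervals are currently not supported.");
    }

    public static void main(String[] args) {
        System.out.println(tumbleBuilderCall(TimeAttr.ROWTIME, Interval.TIME));
    }
}
```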

Optimization opportunity:
Note the comment in the code:
EventTimeTumblingGroupWindow should sort the stream on event time
This is a place where optimization work could start.

The previous section finished configuring the WindowOperator's parameters and creating the instance. Next we analyze how the WindowOperator actually processes data, starting from the WindowOperator#processElement method:

WindowOperator

When an element arrives in the stream, it is first assigned a key; this is done by the KeySelector (a UDF).

然后WindowAssinger會給這個元素分配到0到多個窗口。

After these steps, the element has effectively been placed into a pane.

A pane is the set of elements that share the same key and the same window. This is an optimization Flink applies to windows; see the pane introduction for details.
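The pane identity can be sketched as a composite of (key, window). This is a hypothetical plain-Java illustration, not Flink's internal representation: two elements end up in the same pane exactly when both components match.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of pane identity: same key AND same window -> same pane.
public class PaneSketch {
    static final class Pane {
        final String key;
        final long windowStart;
        Pane(String key, long windowStart) {
            this.key = key;
            this.windowStart = windowStart;
        }
        @Override public boolean equals(Object o) {
            if (!(o instanceof Pane)) return false;
            Pane p = (Pane) o;
            return windowStart == p.windowStart && key.equals(p.key);
        }
        @Override public int hashCode() { return Objects.hash(key, windowStart); }
    }

    public static void main(String[] args) {
        Map<Pane, Integer> counts = new HashMap<>();
        // two elements with the same key in the same window share one pane
        counts.merge(new Pane("user-1", 0L), 1, Integer::sum);
        counts.merge(new Pane("user-1", 0L), 1, Integer::sum);
        // a different key in the same window is a different pane
        counts.merge(new Pane("user-2", 0L), 1, Integer::sum);
        System.out.println(counts.size());
    }
}
```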

然后每一個窗格都有一個屬于其的Trigger實例,Trigger決定什么時候窗格內的元素會被觸發計算。計算的業務邏輯是由用戶自己定義的。

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    // assign windows to the element
    final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

    // if element is handled by none of assigned elementWindows
    boolean isSkippedElement = true;

    final K key = this.<K>getKeyedStateBackend().getCurrentKey();

    if (windowAssigner instanceof MergingWindowAssigner) {
        MergingWindowSet<W> mergingWindows = getMergingWindowSet();

        for (W window : elementWindows) {
            // iterate over the affected windows and aggregate: read the previous
            // aggregate from windowState, aggregate via the generated handler,
            // and write the new state back to windowState

            // adding the new window might result in a merge, in that case the actualWindow
            // is the merged window and we work with that. If we don't merge then
            // actualWindow == window
            W actualWindow = mergingWindows.addWindow(window,
                new MergingWindowSet.MergeFunction<W>() {
                    @Override
                    public void merge(
                            W mergeResult,
                            Collection<W> mergedWindows,
                            W stateWindowResult,
                            Collection<W> mergedStateWindows) throws Exception {
                        // if the window's max timestamp plus the allowed lateness is
                        // still below the watermark, the data would have to be dropped
                        if ((windowAssigner.isEventTime()
                                && mergeResult.maxTimestamp() + allowedLateness
                                    <= internalTimerService.currentWatermark())) {
                            throw new UnsupportedOperationException("The end timestamp of an " +
                                "event-time window cannot become earlier than the current watermark " +
                                "by merging. Current watermark: " + internalTimerService.currentWatermark() +
                                " window: " + mergeResult);
                        } else if (!windowAssigner.isEventTime()) {
                            long currentProcessingTime = internalTimerService.currentProcessingTime();
                            if (mergeResult.maxTimestamp() <= currentProcessingTime) {
                                throw new UnsupportedOperationException("The end timestamp of a " +
                                    "processing-time window cannot become earlier than the current processing time " +
                                    "by merging. Current processing time: " + currentProcessingTime +
                                    " window: " + mergeResult);
                            }
                        }

                        triggerContext.key = key;
                        triggerContext.window = mergeResult;
                        triggerContext.onMerge(mergedWindows);

                        for (W m : mergedWindows) {
                            triggerContext.window = m;
                            triggerContext.clear();
                            deleteCleanupTimer(m);
                        }

                        // update state:
                        // merge the merged state windows into the newly resulting state window
                        windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
                    }
                });

            // drop if the window is already late
            if (isWindowLate(actualWindow)) {
                mergingWindows.retireWindow(actualWindow);
                continue;
            }
            isSkippedElement = false;

            W stateWindow = mergingWindows.getStateWindow(actualWindow);
            if (stateWindow == null) {
                throw new IllegalStateException(
                    "Window " + window + " is not in in-flight window set.");
            }

            windowState.setCurrentNamespace(stateWindow);
            // add the element to the (possibly merged) window's state
            windowState.add(element.getValue());

            // the TriggerContext (a proxy over the per-window-type Trigger)
            // combines early fire, late fire, the watermark and the window end
            // time to decide whether to emit the window
            triggerContext.key = key;
            triggerContext.window = actualWindow;
            // decide whether to fire
            TriggerResult triggerResult = triggerContext.onElement(element);
            if (triggerResult.isFire()) {
                ACC contents = windowState.get();
                if (contents == null) {
                    continue;
                }
                // evaluate the window
                emitWindowContents(actualWindow, contents);
            }

            if (triggerResult.isPurge()) {
                windowState.clear();
            }
            registerCleanupTimer(actualWindow);
        }

        // need to make sure to update the merging state in state
        mergingWindows.persist();
    } else {
        for (W window : elementWindows) {
            // drop if the window is already late
            if (isWindowLate(window)) {
                continue;
            }
            isSkippedElement = false;

            windowState.setCurrentNamespace(window);
            windowState.add(element.getValue());

            triggerContext.key = key;
            triggerContext.window = window;
            TriggerResult triggerResult = triggerContext.onElement(element);
            if (triggerResult.isFire()) {
                ACC contents = windowState.get();
                if (contents == null) {
                    continue;
                }
                emitWindowContents(window, contents);
            }

            if (triggerResult.isPurge()) {
                windowState.clear();
            }
            registerCleanupTimer(window);
        }
    }

    // side output input event if
    // element not handled by any window
    // late arriving tag has been set
    // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
    if (isSkippedElement && isElementLate(element)) {
        if (lateDataOutputTag != null) {
            sideOutput(element);
        } else {
            this.numLateRecordsDropped.inc();
        }
    }
}
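The late-element check at the very end of processElement can be sketched as a single predicate. This is a hedged plain-Java illustration with hypothetical names: an element is late when its timestamp plus the allowed lateness no longer reaches the current watermark, in which case it is either side-output or counted as dropped.

```java
// Hypothetical sketch of the isElementLate condition described in the
// comment above the side-output branch.
public class LateElementSketch {
    static boolean isElementLate(long elementTimestamp,
                                 long allowedLatenessMs,
                                 long currentWatermark) {
        return elementTimestamp + allowedLatenessMs <= currentWatermark;
    }

    public static void main(String[] args) {
        // watermark at 100s, lateness 5s: an element stamped 94s is late
        System.out.println(isElementLate(94_000L, 5_000L, 100_000L));
    }
}
```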

Summary

This article traced how Calcite matches a windowed aggregation (StreamExecGroupWindowAggregateRule, including the WindowEmitStrategy and time-attribute index), builds the physical StreamExecGroupWindowAggregate node, and finally creates the WindowOperator, whose processElement method assigns windows, merges session windows, aggregates state, and fires windows via triggers.