日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

left join 后数据变多_Flink 双流 Join 的3种操作示例

發布時間:2023/12/19 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 left join 后数据变多_Flink 双流 Join 的3种操作示例 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
在數據庫中的靜態表上做 OLAP 分析時,兩表 join 是非常常見的操作。同理,在流式處理作業中,有時也需要在兩條流上做 join 以獲得更豐富的信息。Flink DataStream API 為用戶提供了3個算子來實現雙流 join,分別是:
  • join()
  • coGroup()
  • intervalJoin()
本文舉例說明它們的使用方法,順便聊聊比較特殊的 interval join 的原理。

準備數據

從 Kafka 分別接入點擊流和訂單流,并轉化為 POJO。DataStream<String> clickSourceStream = env .addSource(new FlinkKafkaConsumer011<>( "ods_analytics_access_log", new SimpleStringSchema(), kafkaProps ).setStartFromLatest());DataStream<String> orderSourceStream = env .addSource(new FlinkKafkaConsumer011<>( "ods_ms_order_done", new SimpleStringSchema(), kafkaProps ).setStartFromLatest());DataStream clickRecordStream = clickSourceStream .map(message -> JSON.parseObject(message, AnalyticsAccessLogRecord.class));DataStream orderRecordStream = orderSourceStream .map(message -> JSON.parseObject(message, OrderDoneLogRecord.class));

join()

join() 算子提供的語義為"Window join",即按照指定字段和(滾動/滑動/會話)窗口進行?inner join,支持處理時間和事件時間兩種時間特征。以下示例以10秒滾動窗口,將兩個流通過商品 ID 關聯,取得訂單流中的售價相關字段。clickRecordStream .join(orderRecordStream) .where(record -> record.getMerchandiseId()) .equalTo(record -> record.getMerchandiseId()) .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) .apply(new JoinFunction() { @Override public String join(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord) throws Exception { return StringUtils.join(Arrays.asList( accessRecord.getMerchandiseId(), orderRecord.getPrice(), orderRecord.getCouponMoney(), orderRecord.getRebateAmount() ), '\t'); } }) .print().setParallelism(1);簡單易用。

coGroup()

只有 inner join 肯定還不夠,如何實現 left/right outer join 呢?答案就是利用 coGroup() 算子。它的調用方式類似于 join() 算子,也需要開窗,但是 CoGroupFunction 比 JoinFunction 更加靈活,可以按照用戶指定的邏輯匹配左流和/或右流的數據并輸出。以下的例子就實現了點擊流 left join 訂單流的功能,是很樸素的 nested loop join 思想(二重循環)。clickRecordStream .coGroup(orderRecordStream) .where(record -> record.getMerchandiseId()) .equalTo(record -> record.getMerchandiseId()) .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) .apply(new CoGroupFunctionString, Long>>() { @Override public void coGroup(Iterable accessRecords, Iterable orderRecords, CollectorString, Long>> collector) throws Exception { for (AnalyticsAccessLogRecord accessRecord : accessRecords) { boolean isMatched = false; for (OrderDoneLogRecord orderRecord : orderRecords) { // 右流中有對應的記錄 collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), orderRecord.getPrice())); isMatched = true; } if (!isMatched) { // 右流中沒有對應的記錄 collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), null)); } } } }) .print().setParallelism(1);

intervalJoin()

join() 和 coGroup() 都是基于窗口做關聯的。但是在某些情況下,兩條流的數據步調未必一致。例如,訂單流的數據有可能在點擊流的購買動作發生之后很久才被寫入,如果用窗口來圈定,很容易 join 不上。所以 Flink 又提供了"Interval join"的語義,按照指定字段以及右流相對左流偏移的時間區間進行關聯,即:right.timestamp ∈ [left.timestamp + lowerBound; left.timestamp + upperBound]interval join 也是 inner join,雖然不需要開窗,但是需要用戶指定偏移區間的上下界,并且只支持事件時間。示例代碼如下。注意在運行之前,需要分別在兩個流上應用 assignTimestampsAndWatermarks() 方法獲取事件時間戳和水印。clickRecordStream .keyBy(record -> record.getMerchandiseId()) .intervalJoin(orderRecordStream.keyBy(record -> record.getMerchandiseId())) .between(Time.seconds(-30), Time.seconds(30)) .process(new ProcessJoinFunctionString>() { @Override public void processElement(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord, Context context, Collector<String> collector) throws Exception { collector.collect(StringUtils.join(Arrays.asList( accessRecord.getMerchandiseId(), orderRecord.getPrice(), orderRecord.getCouponMoney(), orderRecord.getRebateAmount() ), '\t')); } }) .print().setParallelism(1);由上可見,interval join 與 window join?不同,是兩個 KeyedStream 之上的操作,并且需要調用 between() 方法指定偏移區間的上下界。如果想令上下界是開區間,可以調用 upperBoundExclusive()/lowerBoundExclusive() 方法。
interval join 的實現原理
以下是 KeyedStream.process(ProcessJoinFunction) 方法調用的重載方法的邏輯。public <OUT> SingleOutputStreamOperator<OUT> process( ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction, TypeInformation<OUT> outputType) { Preconditions.checkNotNull(processJoinFunction); Preconditions.checkNotNull(outputType); final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction); final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator = new IntervalJoinOperator<>( lowerBound, upperBound, lowerBoundInclusive, upperBoundInclusive, left.getType().createSerializer(left.getExecutionConfig()), right.getType().createSerializer(right.getExecutionConfig()), cleanedUdf ); return left .connect(right) .keyBy(keySelector1, keySelector2) .transform("Interval Join", outputType, operator);}可見是先對兩條流執行 connect() 和 keyBy() 操作,然后利用 IntervalJoinOperator 算子進行轉換。在 IntervalJoinOperator 中,會利用兩個 MapState 分別緩存左流和右流的數據。private transient MapState>> leftBuffer;private transient MapState>> rightBuffer;@Overridepublic void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>( LEFT_BUFFER, LongSerializer.INSTANCE, new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer)) )); this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>( RIGHT_BUFFER, LongSerializer.INSTANCE, new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer)) ));}其中 Long 表示事件時間戳,List> 表示該時刻到來的數據記錄。當左流和右流有數據到達時,會分別調用 processElement1() 和 processElement2() 方法,它們都調用了 processElement() 方法,代碼如下。@Overridepublic void processElement1(StreamRecord record) throws Exception { processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);}@Overridepublic void processElement2(StreamRecord record) throws Exception { processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);}@SuppressWarnings("unchecked")private void processElement( final StreamRecord record, final MapState>> ourBuffer, final MapState>> otherBuffer, final long relativeLowerBound, final long relativeUpperBound, final boolean isLeft) throws Exception { final THIS ourValue = record.getValue(); final long ourTimestamp = record.getTimestamp(); if (ourTimestamp == Long.MIN_VALUE) { throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " + "interval stream joins need to have timestamps meaningful timestamps."); } if (isLate(ourTimestamp)) { return; } addToBuffer(ourBuffer, ourValue, ourTimestamp); for (Map.Entry>> bucket: otherBuffer.entries()) { final long timestamp = bucket.getKey(); if (timestamp < ourTimestamp + relativeLowerBound || timestamp > ourTimestamp + relativeUpperBound) { continue; } for (BufferEntry entry: bucket.getValue()) { if (isLeft) { collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp); } else { collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp); } } } long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp; if (isLeft) { internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime); } else { internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime); }}這段代碼的思路是:
  • 取得當前流 StreamRecord 的時間戳,調用 isLate() 方法判斷它是否是遲到數據(即時間戳小于當前水印值),如是則丟棄。
  • 調用 addToBuffer() 方法,將時間戳和數據一起插入當前流對應的 MapState。
  • 遍歷另外一個流的 MapState,如果數據滿足前述的時間區間條件,則調用 collect() 方法將該條數據投遞給用戶定義的 ProcessJoinFunction 進行處理。collect() 方法的代碼如下,注意結果對應的時間戳是左右流時間戳里較大的那個。
  • private?void?collect(T1?left,?T2?right,?long?leftTimestamp,?long?rightTimestamp)?throws?Exception?{ final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp); collector.setAbsoluteTimestamp(resultTimestamp); context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp); userFunction.processElement(left, right, context, collector);}
  • 調用 TimerService.registerEventTimeTimer() 注冊時間戳為 timestamp + relativeUpperBound?的定時器,該定時器負責在水印超過區間的上界時執行狀態的清理邏輯,防止數據堆積。注意左右流的定時器所屬的 namespace 是不同的,具體邏輯則位于 onEventTime() 方法中。
  • @Overridepublic void onEventTime(InternalTimer timer) throws Exception { long timerTimestamp = timer.getTimestamp(); String namespace = timer.getNamespace(); logger.trace("onEventTime @ {}", timerTimestamp); switch (namespace) { case CLEANUP_NAMESPACE_LEFT: { long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound; logger.trace("Removing from left buffer @ {}", timestamp); leftBuffer.remove(timestamp); break; } case CLEANUP_NAMESPACE_RIGHT: { long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp; logger.trace("Removing from right buffer @ {}", timestamp); rightBuffer.remove(timestamp); break; } default: throw new RuntimeException("Invalid namespace " + namespace); }}本文轉載自簡書,作者:LittleMagic
    原文鏈接:https://www.jianshu.com/p/45ec888332df
    ? Flink?Forward?Asia 2020??大會議程發布Flink Forward Asia 2020 在線峰會重磅開啟!12月13-15日,全球?38+?一線廠商,70+?優質議題,與您探討新型數字化技術下的未來趨勢!大會議程已正式上線,點擊文末「閱讀原文」即可免費預約~(點擊可了解更多大會詳情)戳我預約! 創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

    總結

    以上是生活随笔為你收集整理的left join 后数据变多_Flink 双流 Join 的3种操作示例的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 亚洲不卡视频在线 | 锦绣未央在线观看 | 华丽的外出在线 | jiuse九色| 欧美色涩 | 香蕉一区二区三区四区 | 国产黄色片在线播放 | 九九爱爱视频 | 日一区二区三区 | 闺蜜张开腿让我爽了一夜 | 久久99精品国产麻豆婷婷 | 中文字幕在线免费观看 | 一起操网站 | 淫片aaa| 国产日韩欧美视频在线 | 九色国产在线 | 国产免费视屏 | 欧美日韩国产中文字幕 | 中文字幕人妻一区二区三区 | 日本一区二区三区四区在线观看 | 亚洲欧美日韩在线 | 香蕉午夜视频 | 欧美久久久久久久久中文字幕 | 91原创国产 | 免费看一级黄色片 | 精品国产成人av | 欧美日韩高清一区二区 国产亚洲免费看 | 免费精品无码AV片在线观看黄 | 免费a v在线 | 91热在线 | 久久久.com | 黄色片免费在线播放 | 蜜臀av88 | 午夜精品区| 999成人网| 中文字幕二区三区 | 免费看女生隐私 | 日本三级欧美三级 | 久久久国产精 | 一区二区三区视频在线 | 一级黄色网 | 久久久久久久久久久久久国产 | 色播在线视频 | 久久日av| 欧美在线视频a | 亚洲奶汁xxxx哺乳期 | 国产91av在线 | 第一毛片| 一个人免费在线观看视频 | 青娱乐最新官网 | 精品女同一区二区 | 中文字幕在线观看播放 | 国产亚洲精品美女久久久 | 公与妇乱理三级xxx www色 | 久草视频中文在线 | 二十四小时在线更新观看 | 2019中文在线观看 | 69人人| 国产精品久久久久久久久免费桃花 | 婷婷综合在线视频 | 精品人妻一区二区三区日产乱码 | 噜噜视频 | 超碰97成人 | 无码一区二区三区在线 | 一极黄色大片 | 黑白配高清国语在线观看 | 在线观看中文字幕一区 | 成年女人免费视频 | 久久久成| 国产精品分类 | 国产精品乱子伦 | 日韩丝袜一区 | 91久久久久久久久久久 | 中文av一区二区 | 国产精品久久久久久久久毛片 | 一级中文片 | 欧美大色一区 | 中文字幕第12页 | 欧美两根一起进3p做受视频 | 综合久久久久久 | 美女的奶胸大爽爽大片 | 官场艳妇疯狂性关系 | 老女人丨91丨九色 | 色多多在线观看 | 亚洲一区日本 | 欧洲熟妇的性久久久久久 | 欧美一区二区三区免费视频 | 中文字av | 精品久久久久久久久久久久久久久久久久 | 日本一区二区人妻 | 国产成人无码AA精品区 | 韩日欧美| 日本一区二区不卡视频 | 日韩精品1区2区 | 97看片网| 欧美77777 | 中文字幕一区三区 | 国产精品久久久久久免费观看 | 亚洲va天堂va欧美ⅴa在线 |