日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

flink实时流遇到的问题排查——部分数据未落库redis问题

發布時間:2025/3/11 编程问答 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 flink实时流遇到的问题排查——部分数据未落库redis问题 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

flink實時流遇到的問題排查

    • 1、技術和環境
    • 2、問題表述
    • 3、簡化的代碼
    • 4、問題排查思路
    • 5、結論
    • 6、后續補充

1、技術和環境

技術:kafka、zookeeper、DataStream、redis
環境表述:kafka生產者KafkaProducerTest類mock 3條日志后,FlinkDataRealTimeFlowDeal類有建消費者,消費日志數據進行實時流DataStream處理,進行日志清洗、數據落庫redis。

2、問題表述

理論上:
KafkaProducerTest生產者每次執行x條日志,消費者實際讀取x條日志,實際落庫x條處理結果。

實際:
(有問題)KafkaProducerTest生產者第1次執行3條日志,消費者實際讀取3條日志,實際落庫2條處理結果。
(正常)KafkaProducerTest生產者第2次執行之前的3條日志,消費者實際讀取3條日志,實際落庫3條處理結果。
(有問題)KafkaProducerTest生產者第3次執行之前的3條日志,消費者實際讀取3條日志,實際落庫2條處理結果。
(正常)KafkaProducerTest生產者第4次執行之前的3條日志,消費者實際讀取3條日志,實際落庫3條處理結果。

問題總結:奇數次執行時數據漏掉了1條沒有落庫,偶數次全部落庫成功。
注意:每次執行的是相同的日志數據(測試用)。

3、簡化的代碼

DataStream.flatMap(進行日志清洗-Collector收集).keyBy(0).countWindow(2).reduce(進行聚合).process(進行redis落庫)

4、問題排查思路

首先查看日志,發現:
進行【日志清洗-Collector收集】的方法:內部的錯誤日志信息打印處理了3次,符合要求,說明3條日志均進行了日志清洗。

然后,結合看方法:flatMap(String value, Collector<Tuple2<String, List>> out),flatMap扁平化得到的Collector是一個Tuple2<String, List類型的收集。

接著,keyby(0)后(以Tuple2的第一個參數對數據進行平鋪)得到:
String key1,List< UserEventAction>111

String key2,List< UserEventAction>11
String key2,List< UserEventAction>22

String key3,List< UserEventAction>1
String key3,List< UserEventAction>2
String key3,List< UserEventAction>3

第一條日志(未處理):(key1平鋪結果:處理0條,未處理1條)
第二條日志(處理):(key2平鋪結果:處理2條,未處理0條)
第三條日志(部分處理):(key3平鋪結果:處理2條,未處理1條)
備注:本業務場景,一條日志對應一個key。

進行【聚合】的方法reduce:內部共處理了4條Tuple2<String, List< UserEventAction>里的平鋪元素(日志清洗的對象結果)。缺少了2條未處理。

觀察到:對應日志條數:未處理1條。處理了2條日志(1條處理,1條部分處理)。

聚合前一步是countWindow。
推斷是countWindow(2)出現問題。

上述結果推測:
key1暫不處理:具備1條數據,在countWindow(2)時數據每滿2個觸發一次,會處理。暫時不處理List< UserEventAction>111。等下一次key1有新數據時候,滿2處理。

key2處理:具備2條數據,在countWindow(2)時數據每滿2個觸發一次,會處理2條:List< UserEventAction>11,List< UserEventAction>22。

key3部分處理:具備3條數據,在countWindow(2)時數據每滿2個觸發一次,會處理2條:List< UserEventAction>1,List< UserEventAction>2。暫時不處理List< UserEventAction>3。等下一次key3有新數據時候,滿2條處理。

備注:也可以從落庫redis的數據反序列化后得到印證。

5、結論

把countWindow改成1就可以都落庫了。

DataStream.flatMap(進行日志清洗-Collector收集).keyBy(0).countWindow(1).reduce(進行聚合).process(進行redis落庫)

問題解決了,問題在countWindow(2),也就是當根據keyBy(0)分組之后,數據的數量每次達到2時進行輸出。

日志清洗后的結果是一條日志對應一個key,一個key對應多個List< UserEventAction>或者單個List< UserEventAction>。

設置2的時候,對應的key的List< UserEventAction>如果只有一條就不會落庫,kafka生產者執行兩次時候就會累積到兩條相同的key的數據,每滿2條處理后續操作,所以之前有奇數次執行和偶數次執行區別。

6、后續補充

主要是和flatMap平鋪后收集到的key的種類數量有關系。

總結

以上是生活随笔為你收集整理的flink实时流遇到的问题排查——部分数据未落库redis问题的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。