flink实时流遇到的问题排查——部分数据未落库redis问题
flink實時流遇到的問題排查
- 1、技術和環境
- 2、問題表述
- 3、簡化的代碼
- 4、問題排查思路
- 5、結論
- 6、后續補充
1、技術和環境
技術:kafka、zookeeper、DataStream、redis
環境表述:kafka生產者KafkaProducerTest類mock 3條日志后,FlinkDataRealTimeFlowDeal類有建消費者,消費日志數據進行實時流DataStream處理,進行日志清洗、數據落庫redis。
2、問題表述
理論上:
KafkaProducerTest生產者每次執行x條日志,消費者實際讀取x條日志,實際落庫x條處理結果。
實際:
(有問題)KafkaProducerTest生產者第1次執行3條日志,消費者實際讀取3條日志,實際落庫2條處理結果。
(正常)KafkaProducerTest生產者第2次執行之前的3條日志,消費者實際讀取3條日志,實際落庫3條處理結果。
(有問題)KafkaProducerTest生產者第3次執行之前的3條日志,消費者實際讀取3條日志,實際落庫2條處理結果。
(正常)KafkaProducerTest生產者第4次執行之前的3條日志,消費者實際讀取3條日志,實際落庫3條處理結果。
…
問題總結:奇數次執行時數據漏掉了1條沒有落庫,偶數次全部落庫成功。
注意:每次執行的是相同的日志數據(測試用)。
3、簡化的代碼
DataStream.flatMap(進行日志清洗-Collector收集).keyBy(0).countWindow(2).reduce(進行聚合).process(進行redis落庫)4、問題排查思路
首先查看日志,發現:
進行【日志清洗-Collector收集】的方法:內部的錯誤日志信息打印處理了3次,符合要求,說明3條日志均進行了日志清洗。
然后,結合看方法:flatMap(String value, Collector<Tuple2<String, List>> out),flatMap扁平化得到的Collector是一個Tuple2<String, List類型的收集。
接著,keyby(0)后(以Tuple2的第一個參數對數據進行平鋪)得到:
String key1,List< UserEventAction>111
String key2,List< UserEventAction>11
String key2,List< UserEventAction>22
String key3,List< UserEventAction>1
String key3,List< UserEventAction>2
String key3,List< UserEventAction>3
第一條日志(未處理):(key1平鋪結果:處理0條,未處理1條)
第二條日志(處理):(key2平鋪結果:處理2條,未處理0條)
第三條日志(部分處理):(key3平鋪結果:處理2條,未處理1條)
備注:本業務場景,一條日志對應一個key。
進行【聚合】的方法reduce:內部共處理了4條Tuple2<String, List< UserEventAction>里的平鋪元素(日志清洗的對象結果)。缺少了2條未處理。
觀察到:對應日志條數:未處理1條。處理了2條日志(1條處理,1條部分處理)。
聚合前一步是countWindow。
推斷是countWindow(2)出現問題。
上述結果推測:
key1暫不處理:具備1條數據,在countWindow(2)時數據每滿2個觸發一次,會處理。暫時不處理List< UserEventAction>111。等下一次key1有新數據時候,滿2處理。
key2處理:具備2條數據,在countWindow(2)時數據每滿2個觸發一次,會處理2條:List< UserEventAction>11,List< UserEventAction>22。
key3部分處理:具備3條數據,在countWindow(2)時數據每滿2個觸發一次,會處理2條:List< UserEventAction>1,List< UserEventAction>2。暫時不處理List< UserEventAction>3。等下一次key3有新數據時候,滿2條處理。
備注:也可以從落庫redis的數據反序列化后得到印證。
5、結論
把countWindow改成1就可以都落庫了。
DataStream.flatMap(進行日志清洗-Collector收集).keyBy(0).countWindow(1).reduce(進行聚合).process(進行redis落庫)問題解決了,問題在countWindow(2),也就是當根據keyBy(0)分組之后,數據的數量每次達到2時進行輸出。
日志清洗后的結果是一條日志對應一個key,一個key對應多個List< UserEventAction>或者單個List< UserEventAction>。
設置2的時候,對應的key的List< UserEventAction>如果只有一條就不會落庫,kafka生產者執行兩次時候就會累積到兩條相同的key的數據,每滿2條處理后續操作,所以之前有奇數次執行和偶數次執行區別。
6、后續補充
主要是和flatMap平鋪后收集到的key的種類數量有關系。
總結
以上是生活随笔為你收集整理的flink实时流遇到的问题排查——部分数据未落库redis问题的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: discuz的ajax,discuz分页
- 下一篇: IDEA查看源码时总是出现.class而