日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 综合教程 >内容正文

综合教程

Flink中的CEP复杂事件处理 (源码分析)

發布時間:2023/12/13 综合教程 38 生活家
生活随笔 收集整理的這篇文章主要介紹了 Flink中的CEP复杂事件处理 (源码分析) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

其實CEP復雜事件處理,簡單來說你可以用通過類似正則表達式的方式去表示你的邏輯,表現能力非常的強,用過的人都知道

開篇先偷一張圖,整體了解FlinkCEP中的 一種重要的圖 NFA

FlinkCEP在運行時會將用戶的邏輯轉化成這樣的一個NFA Graph (nfa對象)

graph 中包含狀態(Flink中State對象),以及連接狀態的邊(Flink中StateTransition對象)

當從一個State跳變到另一個State時需要通過一條邊StateTransition,這條邊中包含一個Condition對象包含了用戶的邏輯就是我們用戶代碼中.where()中返回Boolean的方法

也就是說Condition對象中包含是否可以完成狀態跳變的條件,A狀態要跳變到B狀態就必須滿足連接AB的邊中的條件(邊StateTransition對象屬于B state)

其中邊StateTransition分為三種

  take: 狀態滿足跳變條件后直接跳變到B狀態

  ignore: 狀態滿足跳變條件以后又回到原來狀態,狀態保持不變

  process: 這條邊可以忽略也可以不忽略

后面源碼分析的時候可以看到他們之間的區別

接著從源碼來看一下如何用這個NFA圖實現Flink中的CEP復雜事件處理的

因為CEP在Flink中被設計成算子的一種而不是單獨的計算引擎,所以直接找到CepOperator.java中

來看一下它的初始化Open()

這里看到有一個NFAFactory的工廠創建了一個NFA,這里的這個工廠是在Driver端通過用戶編寫的代碼返回的Patten對象轉換得到的,也就是用戶env.exection()的時候解析的,工廠對象還包含了用戶所有的State集合

繼續,在createNFA()方法中

將工廠中的所有頂點也就是狀態States放到了NFA對象的一個Map中

Key為這個States的Name(其實就是用戶代碼中的.next("Name"))

接著看CepOperator.java中接收到數據processElement()方法做了什么

這里是處理時間的,這里其實就是直接執行了,這里就不看了,直接看事件時間是如何處理的

先是取出數據的事件時間,判斷是不是小于當前水印了,小于這條數據就證明遲到太久了,如果有側輸出丟給側輸出處理,沒有就直接丟棄了,和WindowOperater一樣

然后看saveRegisterWatermarkTimer()方法

將 (當前水印+1) 注冊成了一個定時器timer用于觸發計算,和window原理一樣(不知道的可以看看前面的文章)

這里主要是因為窗口是一批一批觸發而CEP需要逐個觸發,所以用(當前水印+1)當做定時器,也就是說只要水印往前推進了就觸發推進這段時間的所有計算

然后bufferEvent()將這條數據加入到了一個Queue中

現在來看觸發計算的具體邏輯

來到onEventTime()方法中

先是拿到一個用時間排序的優先隊列PriorityQueue里面就是排序的事件時間

getNFAState()這里比較重要,這里通過nfa得到了一個nfaState具體來看一下

這里這個NFAstate會初始化,NFAstate里面包含了一個ComputationState的queue,主要目的是用于每條數據來的時候都會去遍歷這個queue,看這條數據是否能匹配上里面的state如果匹配上了就更新下一個準備匹配的狀態

這里就知道他為什么NFAstate初始化的時候會把用戶所有的State中可以作為開始start的狀態放queue了吧

因為一開始沒數據,當來數據的時候我要判斷這條數據是不是屬于我CEP的Begin頭,這個state也就是我們用戶的begin()方法,所以才把所有的可以作為開始的狀態都放到這個PartialMatches這個queue中去,這個PartialMatches后面計算的時候會用到,注意

NFAState的初始化就講完了

繼續,回到處理邏輯

然后根據事件時間作為key拉取前面將數據放入的那個queue中數據,返回的是一個List包含這個事件時間的所有數據

然后排序,這里是二次排序,第一次排序是用的事件時間,二次排序排的是同一時間的數據按什么順序處理

然后這里ProcessEvent()方法就是具體執行的邏輯了,這里同時會把剛剛初始化好的NFAState傳遞進去

一開始會獲取一個共享的緩沖區主要是為了減小CEP重復數據存儲的內存占用,這里不講了因為CEP論文里面有,比較復雜

這里process()方法就是具體邏輯了,返回了一個map這個map包含了process()方法這條數據匹配成功結束的數據也就是結果,而processMatchedSequences(patterns, timestamp)就是執行用戶的.select()邏輯了

既然這里就得到了CEP匹配的結果,來看下具體計算邏輯nfa.process()

這里又初始化兩個優先隊列

分別用于

  newPartialMatches 裝nfa匹配到一半沒有結束數據,也就是半匹配,

  potentialMatches 裝成功匹配完成的數據,用于返回,調用用戶的方法去處理結果

接著

這里就直接去初始化好的NFAState中拿剛剛的那個PartialMatches,并且遍歷它,通過傳入這個computeNextStates()方法,用于判斷這條數據是否可以滿足這個ComputationState完成匹配

注意! 一開始時初始化里面只有所有可作為CEP匹配頭的ComputationState,可想而知當后面匹配上了以后肯定會更新這個用于看數據是否匹配的queue

這里就可以知道了整個CEP的處理方式了:  

   一開始會把所有可以作為CEP匹配頭的狀態State先放入queue,每來一條數據就會遍歷queue中所有state,看這條數據是否能能匹配上,能匹配上就在queue中加入下一個用于匹配的狀態, 用于看下一條數據能否繼續匹配上

   比如一個正則"abc"用于CEP匹配 當來了一條a數據,就匹配上CEP頭了,會把b state加入queue中,接著來了一條b數據,又繼續匹配上了,又把c state加入queue 直到來了一條c數據整個就匹配完成,返回結果

   總結 : 處理過程就是兩步

        1.來一條數據,遍歷queue中所有state,看哪些state能匹配上就匹配

        2.根據1的結果更新queue,用于下一條數據的匹配 

    

而判斷是否能匹配上就是這個computerNextStates()方法中

先把這個狀態state壓棧

從棧中取state遍歷它所有的邊StateTransitions

調用用戶的方法看是否能滿足邊條件,也就是說是否能跳變到這個狀態

當滿足時,會根據邊

  ignore: 啥都不做

  take: 加入結果集中

  process: 又把這個狀態的下一個狀態state壓棧了,繼續循環處理

結果返回這條數據匹配上的狀態們,于是

遍歷所有匹配上的狀態得結果集,會把匹配上的狀態的下一個(target)用于匹配的狀態加進queue去

如果是結束,默認NFAstate中是有一個自帶"&end"的結束state

遍歷所有完成的狀態,當匹配上最后一個狀態時就是上面說的“&end”就證明完成了,丟到完成queue中

當匹配失敗了就清空狀態

當匹配上了但還沒有結束就丟到半匹配queue

接著

會先執行跳過策略把結果篩選一遍

然后

就是用我們前面說的那個半匹配queue了,用它又繼續更新了NFAState中的PartialMatches了

下一條數據來了以后就會用遍歷這個新queue集合來判斷是否可以繼續匹配了

然后返回這次匹配成功的數據,調用用戶select方法處理結果了

總結

以上是生活随笔為你收集整理的Flink中的CEP复杂事件处理 (源码分析)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。