Apache Flink CEP 实战
本文根據(jù)Apache Flink 實(shí)戰(zhàn)&進(jìn)階篇系列直播課程整理而成,由哈啰出行大數(shù)據(jù)實(shí)時(shí)平臺(tái)資深開發(fā)劉博分享。通過一些簡(jiǎn)單的實(shí)際例子,從概念原理,到如何使用,再到功能的擴(kuò)展,希望能夠給打算使用或者已經(jīng)使用的同學(xué)一些幫助。
主要的內(nèi)容分為如下三個(gè)部分:
Flink CEP 概念以及使用場(chǎng)景
什么是 CEP
CEP的意思是復(fù)雜事件處理,例如:起床-->洗漱-->吃飯-->上班等一系列串聯(lián)起來的事件流形成的模式稱為CEP。如果發(fā)現(xiàn)某一次起床后沒有刷牙洗臉亦或是吃飯就直接上班,就可以把這種非正常的事件流匹配出來進(jìn)行分析,看看今天是不是起晚了。
下圖中列出了幾個(gè)例子:
- 第一個(gè)是異常行為檢測(cè)的例子:假設(shè)車輛維修的場(chǎng)景中,當(dāng)一輛車出現(xiàn)故障時(shí),這輛車會(huì)被送往維修點(diǎn)維修,然后被重新投放到市場(chǎng)運(yùn)行。如果這輛車被投放到市場(chǎng)之后還未被使用就又被報(bào)障了,那么就有可能之前的維修是無效的。
- 第二個(gè)是策略營(yíng)銷的例子:假設(shè)打車的場(chǎng)景中,用戶在APP上規(guī)劃了一個(gè)行程訂單,如果這個(gè)行程在下單之后超過一定的時(shí)間還沒有被司機(jī)接單的話,那么就需要將這個(gè)訂單輸出到下游做相關(guān)的策略調(diào)整。?
- 第三個(gè)是運(yùn)維監(jiān)控的例子:通常運(yùn)維會(huì)監(jiān)控服務(wù)器的CPU、網(wǎng)絡(luò)IO等指標(biāo)超過閾值時(shí)產(chǎn)生相應(yīng)的告警。但是在實(shí)際使用中,后臺(tái)服務(wù)的重啟、網(wǎng)絡(luò)抖動(dòng)等情況都會(huì)造成瞬間的流量毛刺,對(duì)非關(guān)鍵鏈路可以忽略這些毛刺而只對(duì)頻繁發(fā)生的異常進(jìn)行告警以減少誤報(bào)。
Flink CEP 應(yīng)用場(chǎng)景
- 風(fēng)險(xiǎn)控制:對(duì)用戶異常行為模式進(jìn)行實(shí)時(shí)檢測(cè),當(dāng)一個(gè)用戶發(fā)生了不該發(fā)生的行為,判定這個(gè)用戶是不是有違規(guī)操作的嫌疑。
- 策略營(yíng)銷:用預(yù)先定義好的規(guī)則對(duì)用戶的行為軌跡進(jìn)行實(shí)時(shí)跟蹤,對(duì)行為軌跡匹配預(yù)定義規(guī)則的用戶實(shí)時(shí)發(fā)送相應(yīng)策略的推廣。
- 運(yùn)維監(jiān)控:靈活配置多指標(biāo)、多依賴來實(shí)現(xiàn)更復(fù)雜的監(jiān)控模式。
Flink CEP原理
Flink CEP內(nèi)部是用NFA(非確定有限自動(dòng)機(jī))來實(shí)現(xiàn)的,由點(diǎn)和邊組成的一個(gè)狀態(tài)圖,以一個(gè)初始狀態(tài)作為起點(diǎn),經(jīng)過一系列的中間狀態(tài),達(dá)到終態(tài)。點(diǎn)分為起始狀態(tài)、中間狀態(tài)、最終狀態(tài)三種,邊分為take、ignore、proceed三種。
- take:必須存在一個(gè)條件判斷,當(dāng)?shù)絹淼南M足take邊條件判斷時(shí),把這個(gè)消息放入結(jié)果集,將狀態(tài)轉(zhuǎn)移到下一狀態(tài)。
- ignore:當(dāng)消息到來時(shí),可以忽略這個(gè)消息,將狀態(tài)自旋在當(dāng)前不變,是一個(gè)自己到自己的狀態(tài)轉(zhuǎn)移。?
- proceed:又叫做狀態(tài)的空轉(zhuǎn)移,當(dāng)前狀態(tài)可以不依賴于消息到來而直接轉(zhuǎn)移到下一狀態(tài)。舉個(gè)例子,當(dāng)用戶購(gòu)買商品時(shí),如果購(gòu)買前有一個(gè)咨詢客服的行為,需要把咨詢客服行為和購(gòu)買行為兩個(gè)消息一起放到結(jié)果集中向下游輸出;如果購(gòu)買前沒有咨詢客服的行為,只需把購(gòu)買行為放到結(jié)果集中向下游輸出就可以了。?也就是說,如果有咨詢客服的行為,就存在咨詢客服狀態(tài)的上的消息保存,如果沒有咨詢客服的行為,就不存在咨詢客服狀態(tài)的上的消息保存,咨詢客服狀態(tài)是由一條proceed邊和下游的購(gòu)買狀態(tài)相連。
下面以一個(gè)打車的例子來展示狀態(tài)是如何流轉(zhuǎn)的,規(guī)則見下圖所示。
以乘客制定行程作為開始,匹配乘客的下單事件,如果這個(gè)訂單超時(shí)還沒有被司機(jī)接單的話,就把行程事件和下單事件作為結(jié)果集往下游輸出。
假如消息到來順序?yàn)?#xff1a;行程-->其他-->下單-->其他。
狀態(tài)流轉(zhuǎn)如下:
上面是一個(gè)匹配成功的例子,如果是不成功的例子會(huì)怎么樣?
假如當(dāng)狀態(tài)處于超時(shí)未接單狀態(tài)時(shí),收到了一個(gè)接單事件,那么就不符合超時(shí)未被接單的觸發(fā)條件,此時(shí)整個(gè)模式匹配失敗,之前放入結(jié)果集中的行程事件和下單事件會(huì)被清理。
Flink CEP程序開發(fā)
本節(jié)將詳細(xì)介紹Flink CEP的程序結(jié)構(gòu)以及API。
Flink CEP 程序結(jié)構(gòu)
主要分為兩部分:定義事件模式和匹配結(jié)果處理。
官方示例如下:
DataStream<Event> input = ... Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {@Overridepublic boolean filter(Event event) {return event.getId() == 42;}}).next("middle").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {@Overridepublic boolean filter(SubEvent subEvent) {return subEvent.getVolume() >= 10.0;}}).followedBy("end").where(new SimpleCondition<Event>() {@Overridepublic boolean filter(Event event) {return event.getName().equals("end");}});PatternStream<Event> patternStream = CEP.pattern(input, pattern);DataStream<Alert> result = patternStream.select(new PatternProcessFunction<Event, Alert>() {@Overridepublic void select(Map<String, List<Event>> pattern,Context ctx,Collector<Alert> out) throws Exception {out.collect(createAlertFrom(pattern));}});程序結(jié)構(gòu)分為三部分:首先需要定義一個(gè)模式(Pattern),即第2行代碼所示,接著把定義好的模式綁定在DataStream上(第25行),最后就可以在具有CEP功能的DataStream上將匹配的結(jié)果進(jìn)行處理(第27行)。?
下面對(duì)關(guān)鍵部分做詳細(xì)講解:
定義模式:上面示例中,分為了三步,首先匹配一個(gè)ID為42的事件,接著匹配一個(gè)體積大于等于10的事件,最后等待收到一個(gè)name等于end的事件。?
匹配結(jié)果輸出:此部分,需要重點(diǎn)注意select函數(shù)(第30行,注:本文基于Flink 1.7版本)里邊的Map類型的pattern參數(shù),Key是一個(gè)pattern的name,它的取值是模式定義中的Begin節(jié)點(diǎn)start,或者是接下來next里面的middle,或者是第三個(gè)步驟的end。后面的map中的value是每一步發(fā)生的匹配事件。因在每一步中是可以使用循環(huán)屬性的,可以匹配發(fā)生多次,所以map中的value是匹配發(fā)生多次的所有事件的一個(gè)集合。
Flink CEP構(gòu)成
上圖中,藍(lán)色方框代表的是一個(gè)個(gè)單獨(dú)的模式;淺黃色的橢圓代表的是這個(gè)模式上可以添加的屬性,包括模式可以發(fā)生的循環(huán)次數(shù),或者這個(gè)模式是貪婪的還是可選的;橘色的橢圓代表的是模式間的關(guān)系,定義了多個(gè)模式之間是怎么樣串聯(lián)起來的。通過定義模式,添加相應(yīng)的屬性,將多個(gè)模式串聯(lián)起來三步,就可以構(gòu)成了一個(gè)完整的Flink CEP程序。
定義模式
下面是示例代碼:
pattern.next("start").where(new SimpleCondition<Event>() {@Overridepublic boolean filter(Event event) {return event.getId() == 42;}} )定義模式主要有如下5個(gè)部分組成:
pattern:前一個(gè)模式
next/followedBy/...:開始一個(gè)新的模式
start:模式名稱
where:模式的內(nèi)容
filter:核心處理邏輯
模式的屬性
接下來介紹一下怎樣設(shè)置模式的屬性。模式的屬性主要分為循環(huán)屬性和可選屬性。
循環(huán)屬性可以定義模式匹配發(fā)生固定次數(shù)(times),匹配發(fā)生一次以上(oneOrMore),匹配發(fā)生多次以上。(timesOrMore)。
可選屬性可以設(shè)置模式是貪婪的(greedy),即匹配最長(zhǎng)的串,或設(shè)置為可選的(optional),有則匹配,無則忽略。
模式的有效期
由于模式的匹配事件存放在狀態(tài)中進(jìn)行管理,所以需要設(shè)置一個(gè)全局的有效期(within)。 若不指定有效期,匹配事件會(huì)一直保存在狀態(tài)中不會(huì)被清除。至于有效期能開多大,要依據(jù)具體使用場(chǎng)景和數(shù)據(jù)量來衡量,關(guān)鍵要看匹配的事件有多少,隨著匹配的事件增多,新到達(dá)的消息遍歷之前的匹配事件會(huì)增加CPU、內(nèi)存的消耗,并且隨著狀態(tài)變大,數(shù)據(jù)傾斜也會(huì)越來越嚴(yán)重。
模式間的聯(lián)系
主要分為三種:嚴(yán)格連續(xù)性(next/notNext),寬松連續(xù)性(followedBy/notFollowedBy),和非確定寬松連續(xù)性(followedByAny)。
三種模式匹配的差別見下表所示:
| Pattern(A B) Streaming('a','c','b1','b2') | 不匹配 | 匹配 輸出:a,b1 | 匹配 輸出:a,b1 a,b2 |
總結(jié)如下:
- 嚴(yán)格連續(xù)性:需要消息的順序到達(dá)與模式完全一致。
- 寬松連續(xù)性:允許忽略不匹配的事件。
- 非確定寬松連性:不僅可以忽略不匹配的事件,也可以忽略已經(jīng)匹配的事件。
多模式組合
除了前面提到的模式定義和模式間的聯(lián)系,還可以把相連的多個(gè)模式組合在一起看成一個(gè)模式組,類似于視圖,可以在這個(gè)模式視圖上進(jìn)行相關(guān)操作。
上圖這個(gè)例子里面,首先匹配了一個(gè)登錄事件,然后接下來匹配瀏覽,下單,購(gòu)買這三個(gè)事件反復(fù)發(fā)生三次的用戶。?
如果沒有模式組的話,代碼里面瀏覽,下單,購(gòu)買要寫三次。有了模式組,只需把瀏覽,下單,購(gòu)買這三個(gè)事件當(dāng)做一個(gè)模式組,把相應(yīng)的屬性加上times(3)就可以了。
處理結(jié)果
處理匹配的結(jié)果主要有四個(gè)接口: PatternFlatSelectFunction,PatternSelectFunction,PatternFlatTimeoutFunction和PatternTimeoutFunction。
從名字上可以看出,輸出可以分為兩類:select和flatSelect指定輸出一條還是多條,timeoutFunction和不帶timeout的Function指定可不可以對(duì)超時(shí)事件進(jìn)行旁路輸出。?
下圖是輸出的綜合示例代碼:
狀態(tài)存儲(chǔ)優(yōu)化
當(dāng)一個(gè)事件到來時(shí),如果這個(gè)事件同時(shí)符合多個(gè)輸出的結(jié)果集,那么這個(gè)事件是如何保存的?
Flink CEP通過Dewey計(jì)數(shù)法在多個(gè)結(jié)果集中共享同一個(gè)事件副本,以實(shí)現(xiàn)對(duì)事件副本進(jìn)行資源共享。
Flink CEP的擴(kuò)展
本章主要介紹一些Flink CEP的擴(kuò)展,講述如何做到超時(shí)機(jī)制的精確管理,以及規(guī)則的動(dòng)態(tài)加載與更新。
超時(shí)觸發(fā)機(jī)制擴(kuò)展
原生Flink CEP中超時(shí)觸發(fā)的功能可以通過within+outputtag結(jié)合來實(shí)現(xiàn),但是在復(fù)雜的場(chǎng)景下處理存在問題,如下圖所示,在下單事件后還有一個(gè)預(yù)付款事件,想要得到下單并且預(yù)付款后超時(shí)未被接單的訂單,該如何表示呢??
參照下單后超時(shí)未被接單的做法,把下單并且預(yù)付款后超時(shí)未被接單規(guī)則表示為下單.followedBy(預(yù)付款).followedBy(接單).within(time),那么這樣實(shí)現(xiàn)會(huì)存在問題嗎?
這種做法的計(jì)算結(jié)果是會(huì)存在臟數(shù)據(jù)的,因?yàn)檫@個(gè)規(guī)則不僅匹配到了下單并且預(yù)付款后超時(shí)未被接單的訂單(想要的結(jié)果),同樣還匹配到了只有下單行為后超時(shí)未被接單的訂單(臟數(shù)據(jù),沒有預(yù)付款)。原因是因?yàn)槌瑫r(shí)within是控制在整個(gè)規(guī)則上,而不是某一個(gè)狀態(tài)節(jié)點(diǎn)上,所以不論當(dāng)前的狀態(tài)是處在哪個(gè)狀態(tài)節(jié)點(diǎn),超時(shí)后都會(huì)被旁路輸出。
那么就需要考慮能否通過時(shí)間來直接對(duì)狀態(tài)轉(zhuǎn)移做到精確的控制,而不是通過規(guī)則超時(shí)這種曲線救國(guó)的方式。 于是乎,在通過消息觸發(fā)狀態(tài)的轉(zhuǎn)移之外,需要增加通過時(shí)間觸發(fā)狀態(tài)的轉(zhuǎn)移的支持。要實(shí)現(xiàn)此功能,需要在原來的狀態(tài)以及狀態(tài)轉(zhuǎn)移中,增加時(shí)間屬性的概念。如下圖所示,通過wait算子來得到waiting狀態(tài),然后在waiting狀態(tài)上設(shè)置一個(gè)十秒的時(shí)間屬性以定義一個(gè)十秒的時(shí)間窗口。
wait算子對(duì)應(yīng)NFA中的ignore狀態(tài),將在沒有到達(dá)時(shí)間窗口結(jié)束時(shí)間時(shí)自旋,在ComputationState中記錄wait的開始時(shí)間,在NFA的doProcess中,將到來的數(shù)據(jù)與waiting狀態(tài)處理,如果到了waiting的結(jié)束時(shí)間,則進(jìn)行狀態(tài)轉(zhuǎn)移。
上圖中紅色方框中為waiting狀態(tài)設(shè)置了兩條ignore邊:
1.waitingStatus.addIgnore(lastSink,waitingCondition),waitingCondition中的邏輯是獲取當(dāng)前的時(shí)間(支持事件時(shí)間),判斷有沒有超過設(shè)置的waiting閾值,如果超過就把狀態(tài)向后轉(zhuǎn)移。
2.waitingStatus.addIgnore(waitingCondition),waitingCondition中如果未達(dá)到設(shè)置的waiting閾值,就會(huì)自旋在當(dāng)前的waiting狀態(tài)不變。
規(guī)則動(dòng)態(tài)注入
線上運(yùn)行的CEP中肯定經(jīng)常遇到規(guī)則變更的情況,如果每次變更時(shí)都將任務(wù)重啟、重新發(fā)布是非常不優(yōu)雅的。尤其在營(yíng)銷或者風(fēng)控這種對(duì)實(shí)時(shí)性要求比較高的場(chǎng)景,如果規(guī)則窗口過長(zhǎng)(一兩個(gè)星期),狀態(tài)過大,就會(huì)導(dǎo)致重啟時(shí)間延長(zhǎng),期間就會(huì)造成一些想要處理的異常行為不能及時(shí)發(fā)現(xiàn)。
那么要怎么樣做到規(guī)則的動(dòng)態(tài)更新和加載呢?
梳理一下整體架構(gòu),Flink CEP是運(yùn)行在Flink Job里的,而規(guī)則庫(kù)是放在外部存儲(chǔ)中的。首先,需要在運(yùn)行的Job中能及時(shí)發(fā)現(xiàn)外部存儲(chǔ)中規(guī)則的變化,即需要在Job中提供訪問外部庫(kù)的能力。 其次,需要將規(guī)則庫(kù)中變更的規(guī)則動(dòng)態(tài)加載到CEP中,即把外部規(guī)則的描述解析成Flink CEP所能識(shí)別的pattern結(jié)構(gòu)體。最后,把生成的pattern轉(zhuǎn)化成NFA,替換歷史NFA,這樣對(duì)新到來的消息,就會(huì)使用新的規(guī)則進(jìn)行匹配。
下圖就是一個(gè)支持將外部規(guī)則動(dòng)態(tài)注入、更新的接口。
這個(gè)接口里面主要實(shí)現(xiàn)了四個(gè)方法:
- initialize:初始化方法,進(jìn)行外部庫(kù)連接的初始化。
- inject:和外部數(shù)據(jù)庫(kù)交互的主要方法,監(jiān)聽外部庫(kù)變化,獲取最新的規(guī)則并通過Groovy動(dòng)態(tài)加載,返回pattern。
- getPeriod:設(shè)置輪巡周期,在一些比較簡(jiǎn)單的實(shí)時(shí)性要求不高的場(chǎng)景,可以采用輪巡的方式,定期對(duì)外部數(shù)據(jù)庫(kù)進(jìn)行檢測(cè)。
- getNfaKeySelector:和動(dòng)態(tài)更新無關(guān),用來支持一個(gè)流對(duì)應(yīng)多個(gè)規(guī)則組。
歷史匹配結(jié)果清理
新規(guī)則動(dòng)態(tài)加載到Flink CEP的Job中,替換掉原來的NFA之后,還需要對(duì)歷史匹配的結(jié)果集進(jìn)行清理。在AbstractKeyedCEPPatternOperator中實(shí)現(xiàn)刷新NFA,注意,歷史狀態(tài)是否需要清理和業(yè)務(wù)相關(guān):
總結(jié)
使用Flink CEP,熟知其原理是很重要的,特別是NFA的狀態(tài)轉(zhuǎn)移流程,然后再去看源碼中的狀態(tài)圖的構(gòu)建就會(huì)很清晰了。
雙12來襲!500元淘寶紅包、iPhone11等你拿。
https://www.aliyun.com/1212/2019/home?utm_content=g_1000092611
原文鏈接
本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的Apache Flink CEP 实战的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: MaxCompute技术人背后的故事:从
- 下一篇: 威胁快报|新兴挖矿团伙借助shodan作