日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

Apache Flink CEP 实战

發(fā)布時(shí)間:2024/8/23 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Apache Flink CEP 实战 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

本文根據(jù)Apache Flink 實(shí)戰(zhàn)&進(jìn)階篇系列直播課程整理而成,由哈啰出行大數(shù)據(jù)實(shí)時(shí)平臺(tái)資深開發(fā)劉博分享。通過一些簡(jiǎn)單的實(shí)際例子,從概念原理,到如何使用,再到功能的擴(kuò)展,希望能夠給打算使用或者已經(jīng)使用的同學(xué)一些幫助。

主要的內(nèi)容分為如下三個(gè)部分:

  • Flink CEP概念以及使用場(chǎng)景。
  • 如何使用Flink CEP。
  • 如何擴(kuò)展Flink CEP。
  • Flink CEP 概念以及使用場(chǎng)景

    什么是 CEP

    CEP的意思是復(fù)雜事件處理,例如:起床-->洗漱-->吃飯-->上班等一系列串聯(lián)起來的事件流形成的模式稱為CEP。如果發(fā)現(xiàn)某一次起床后沒有刷牙洗臉亦或是吃飯就直接上班,就可以把這種非正常的事件流匹配出來進(jìn)行分析,看看今天是不是起晚了。

    下圖中列出了幾個(gè)例子:

    • 第一個(gè)是異常行為檢測(cè)的例子:假設(shè)車輛維修的場(chǎng)景中,當(dāng)一輛車出現(xiàn)故障時(shí),這輛車會(huì)被送往維修點(diǎn)維修,然后被重新投放到市場(chǎng)運(yùn)行。如果這輛車被投放到市場(chǎng)之后還未被使用就又被報(bào)障了,那么就有可能之前的維修是無效的。
    • 第二個(gè)是策略營(yíng)銷的例子:假設(shè)打車的場(chǎng)景中,用戶在APP上規(guī)劃了一個(gè)行程訂單,如果這個(gè)行程在下單之后超過一定的時(shí)間還沒有被司機(jī)接單的話,那么就需要將這個(gè)訂單輸出到下游做相關(guān)的策略調(diào)整。?
    • 第三個(gè)是運(yùn)維監(jiān)控的例子:通常運(yùn)維會(huì)監(jiān)控服務(wù)器的CPU、網(wǎng)絡(luò)IO等指標(biāo)超過閾值時(shí)產(chǎn)生相應(yīng)的告警。但是在實(shí)際使用中,后臺(tái)服務(wù)的重啟、網(wǎng)絡(luò)抖動(dòng)等情況都會(huì)造成瞬間的流量毛刺,對(duì)非關(guān)鍵鏈路可以忽略這些毛刺而只對(duì)頻繁發(fā)生的異常進(jìn)行告警以減少誤報(bào)。

    Flink CEP 應(yīng)用場(chǎng)景

    • 風(fēng)險(xiǎn)控制:對(duì)用戶異常行為模式進(jìn)行實(shí)時(shí)檢測(cè),當(dāng)一個(gè)用戶發(fā)生了不該發(fā)生的行為,判定這個(gè)用戶是不是有違規(guī)操作的嫌疑。
    • 策略營(yíng)銷:用預(yù)先定義好的規(guī)則對(duì)用戶的行為軌跡進(jìn)行實(shí)時(shí)跟蹤,對(duì)行為軌跡匹配預(yù)定義規(guī)則的用戶實(shí)時(shí)發(fā)送相應(yīng)策略的推廣。
    • 運(yùn)維監(jiān)控:靈活配置多指標(biāo)、多依賴來實(shí)現(xiàn)更復(fù)雜的監(jiān)控模式。

    Flink CEP原理

    Flink CEP內(nèi)部是用NFA(非確定有限自動(dòng)機(jī))來實(shí)現(xiàn)的,由點(diǎn)和邊組成的一個(gè)狀態(tài)圖,以一個(gè)初始狀態(tài)作為起點(diǎn),經(jīng)過一系列的中間狀態(tài),達(dá)到終態(tài)。點(diǎn)分為起始狀態(tài)中間狀態(tài)最終狀態(tài)三種,邊分為takeignoreproceed三種。

    • take:必須存在一個(gè)條件判斷,當(dāng)?shù)絹淼南M足take邊條件判斷時(shí),把這個(gè)消息放入結(jié)果集,將狀態(tài)轉(zhuǎn)移到下一狀態(tài)。
    • ignore:當(dāng)消息到來時(shí),可以忽略這個(gè)消息,將狀態(tài)自旋在當(dāng)前不變,是一個(gè)自己到自己的狀態(tài)轉(zhuǎn)移。?
    • proceed:又叫做狀態(tài)的空轉(zhuǎn)移,當(dāng)前狀態(tài)可以不依賴于消息到來而直接轉(zhuǎn)移到下一狀態(tài)。舉個(gè)例子,當(dāng)用戶購(gòu)買商品時(shí),如果購(gòu)買前有一個(gè)咨詢客服的行為,需要把咨詢客服行為和購(gòu)買行為兩個(gè)消息一起放到結(jié)果集中向下游輸出;如果購(gòu)買前沒有咨詢客服的行為,只需把購(gòu)買行為放到結(jié)果集中向下游輸出就可以了。?也就是說,如果有咨詢客服的行為,就存在咨詢客服狀態(tài)的上的消息保存,如果沒有咨詢客服的行為,就不存在咨詢客服狀態(tài)的上的消息保存,咨詢客服狀態(tài)是由一條proceed邊和下游的購(gòu)買狀態(tài)相連。

    下面以一個(gè)打車的例子來展示狀態(tài)是如何流轉(zhuǎn)的,規(guī)則見下圖所示。

    以乘客制定行程作為開始,匹配乘客的下單事件,如果這個(gè)訂單超時(shí)還沒有被司機(jī)接單的話,就把行程事件和下單事件作為結(jié)果集往下游輸出。

    假如消息到來順序?yàn)?#xff1a;行程-->其他-->下單-->其他。

    狀態(tài)流轉(zhuǎn)如下:

  • 開始時(shí)狀態(tài)處于行程狀態(tài),即等待用戶制定行程。
  • 當(dāng)收到行程事件時(shí),匹配行程狀態(tài)的條件,把行程事件放到結(jié)果集中,通過take邊將狀態(tài)往下轉(zhuǎn)移到下單狀態(tài)
  • 由于下單狀態(tài)上有一條ignore邊,所以可以忽略收到的其他事件,直到收到下單事件時(shí)將其匹配,放入結(jié)果集中,并且將當(dāng)前狀態(tài)往下轉(zhuǎn)移到超時(shí)未接單狀態(tài)。這時(shí)候結(jié)果集當(dāng)中有兩個(gè)事件:制定行程事件和下單事件。?
  • 超時(shí)未接單狀態(tài)時(shí),如果來了一些其他事件,同樣可以被ignore邊忽略,直到超時(shí)事件的觸發(fā),將狀態(tài)往下轉(zhuǎn)移到最終狀態(tài),這時(shí)候整個(gè)模式匹配成功,最終將結(jié)果集中的制定行程事件和下單事件輸出到下游。
  • 上面是一個(gè)匹配成功的例子,如果是不成功的例子會(huì)怎么樣?

    假如當(dāng)狀態(tài)處于超時(shí)未接單狀態(tài)時(shí),收到了一個(gè)接單事件,那么就不符合超時(shí)未被接單的觸發(fā)條件,此時(shí)整個(gè)模式匹配失敗,之前放入結(jié)果集中的行程事件和下單事件會(huì)被清理。

    Flink CEP程序開發(fā)

    本節(jié)將詳細(xì)介紹Flink CEP的程序結(jié)構(gòu)以及API。

    Flink CEP 程序結(jié)構(gòu)

    主要分為兩部分:定義事件模式和匹配結(jié)果處理。

    官方示例如下:

    DataStream<Event> input = ... Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {@Overridepublic boolean filter(Event event) {return event.getId() == 42;}}).next("middle").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {@Overridepublic boolean filter(SubEvent subEvent) {return subEvent.getVolume() >= 10.0;}}).followedBy("end").where(new SimpleCondition<Event>() {@Overridepublic boolean filter(Event event) {return event.getName().equals("end");}});PatternStream<Event> patternStream = CEP.pattern(input, pattern);DataStream<Alert> result = patternStream.select(new PatternProcessFunction<Event, Alert>() {@Overridepublic void select(Map<String, List<Event>> pattern,Context ctx,Collector<Alert> out) throws Exception {out.collect(createAlertFrom(pattern));}});

    程序結(jié)構(gòu)分為三部分:首先需要定義一個(gè)模式(Pattern),即第2行代碼所示,接著把定義好的模式綁定在DataStream上(第25行),最后就可以在具有CEP功能的DataStream上將匹配的結(jié)果進(jìn)行處理(第27行)。?

    下面對(duì)關(guān)鍵部分做詳細(xì)講解:

    定義模式:上面示例中,分為了三步,首先匹配一個(gè)ID為42的事件,接著匹配一個(gè)體積大于等于10的事件,最后等待收到一個(gè)name等于end的事件。?
    匹配結(jié)果輸出:此部分,需要重點(diǎn)注意select函數(shù)(第30行,注:本文基于Flink 1.7版本)里邊的Map類型的pattern參數(shù),Key是一個(gè)pattern的name,它的取值是模式定義中的Begin節(jié)點(diǎn)start,或者是接下來next里面的middle,或者是第三個(gè)步驟的end。后面的map中的value是每一步發(fā)生的匹配事件。因在每一步中是可以使用循環(huán)屬性的,可以匹配發(fā)生多次,所以map中的value是匹配發(fā)生多次的所有事件的一個(gè)集合。

    Flink CEP構(gòu)成

    上圖中,藍(lán)色方框代表的是一個(gè)個(gè)單獨(dú)的模式;淺黃色的橢圓代表的是這個(gè)模式上可以添加的屬性,包括模式可以發(fā)生的循環(huán)次數(shù),或者這個(gè)模式是貪婪的還是可選的;橘色的橢圓代表的是模式間的關(guān)系,定義了多個(gè)模式之間是怎么樣串聯(lián)起來的。通過定義模式,添加相應(yīng)的屬性,將多個(gè)模式串聯(lián)起來三步,就可以構(gòu)成了一個(gè)完整的Flink CEP程序。

    定義模式

    下面是示例代碼:

    pattern.next("start").where(new SimpleCondition<Event>() {@Overridepublic boolean filter(Event event) {return event.getId() == 42;}} )

    定義模式主要有如下5個(gè)部分組成:

    pattern:前一個(gè)模式
    next/followedBy/...:開始一個(gè)新的模式
    start:模式名稱
    where:模式的內(nèi)容
    filter:核心處理邏輯

    模式的屬性

    接下來介紹一下怎樣設(shè)置模式的屬性。模式的屬性主要分為循環(huán)屬性可選屬性

    循環(huán)屬性可以定義模式匹配發(fā)生固定次數(shù)(times),匹配發(fā)生一次以上(oneOrMore),匹配發(fā)生多次以上。(timesOrMore)。

    可選屬性可以設(shè)置模式是貪婪的(greedy),即匹配最長(zhǎng)的串,或設(shè)置為可選的(optional),有則匹配,無則忽略。

    模式的有效期

    由于模式的匹配事件存放在狀態(tài)中進(jìn)行管理,所以需要設(shè)置一個(gè)全局的有效期(within)。 若不指定有效期,匹配事件會(huì)一直保存在狀態(tài)中不會(huì)被清除。至于有效期能開多大,要依據(jù)具體使用場(chǎng)景和數(shù)據(jù)量來衡量,關(guān)鍵要看匹配的事件有多少,隨著匹配的事件增多,新到達(dá)的消息遍歷之前的匹配事件會(huì)增加CPU、內(nèi)存的消耗,并且隨著狀態(tài)變大,數(shù)據(jù)傾斜也會(huì)越來越嚴(yán)重。

    模式間的聯(lián)系

    主要分為三種:嚴(yán)格連續(xù)性(next/notNext),寬松連續(xù)性(followedBy/notFollowedBy),和非確定寬松連續(xù)性(followedByAny)。

    三種模式匹配的差別見下表所示:

    模式&數(shù)據(jù)流嚴(yán)格連續(xù)性寬松連續(xù)性非確定寬松連續(xù)性
    Pattern(A B) Streaming('a','c','b1','b2')不匹配匹配 輸出:a,b1匹配 輸出:a,b1 a,b2

    總結(jié)如下:

    • 嚴(yán)格連續(xù)性:需要消息的順序到達(dá)與模式完全一致。
    • 寬松連續(xù)性:允許忽略不匹配的事件。
    • 非確定寬松連性:不僅可以忽略不匹配的事件,也可以忽略已經(jīng)匹配的事件。

    多模式組合

    除了前面提到的模式定義和模式間的聯(lián)系,還可以把相連的多個(gè)模式組合在一起看成一個(gè)模式組,類似于視圖,可以在這個(gè)模式視圖上進(jìn)行相關(guān)操作。

    上圖這個(gè)例子里面,首先匹配了一個(gè)登錄事件,然后接下來匹配瀏覽,下單,購(gòu)買這三個(gè)事件反復(fù)發(fā)生三次的用戶。?

    如果沒有模式組的話,代碼里面瀏覽,下單,購(gòu)買要寫三次。有了模式組,只需把瀏覽,下單,購(gòu)買這三個(gè)事件當(dāng)做一個(gè)模式組,把相應(yīng)的屬性加上times(3)就可以了。

    處理結(jié)果

    處理匹配的結(jié)果主要有四個(gè)接口: PatternFlatSelectFunction,PatternSelectFunction,PatternFlatTimeoutFunction和PatternTimeoutFunction。

    從名字上可以看出,輸出可以分為兩類:select和flatSelect指定輸出一條還是多條,timeoutFunction和不帶timeout的Function指定可不可以對(duì)超時(shí)事件進(jìn)行旁路輸出。?

    下圖是輸出的綜合示例代碼:

    狀態(tài)存儲(chǔ)優(yōu)化

    當(dāng)一個(gè)事件到來時(shí),如果這個(gè)事件同時(shí)符合多個(gè)輸出的結(jié)果集,那么這個(gè)事件是如何保存的?

    Flink CEP通過Dewey計(jì)數(shù)法在多個(gè)結(jié)果集中共享同一個(gè)事件副本,以實(shí)現(xiàn)對(duì)事件副本進(jìn)行資源共享。

    Flink CEP的擴(kuò)展

    本章主要介紹一些Flink CEP的擴(kuò)展,講述如何做到超時(shí)機(jī)制的精確管理,以及規(guī)則的動(dòng)態(tài)加載與更新。

    超時(shí)觸發(fā)機(jī)制擴(kuò)展

    原生Flink CEP中超時(shí)觸發(fā)的功能可以通過within+outputtag結(jié)合來實(shí)現(xiàn),但是在復(fù)雜的場(chǎng)景下處理存在問題,如下圖所示,在下單事件后還有一個(gè)預(yù)付款事件,想要得到下單并且預(yù)付款后超時(shí)未被接單的訂單,該如何表示呢??

    參照下單后超時(shí)未被接單的做法,把下單并且預(yù)付款后超時(shí)未被接單規(guī)則表示為下單.followedBy(預(yù)付款).followedBy(接單).within(time),那么這樣實(shí)現(xiàn)會(huì)存在問題嗎?

    這種做法的計(jì)算結(jié)果是會(huì)存在臟數(shù)據(jù)的,因?yàn)檫@個(gè)規(guī)則不僅匹配到了下單并且預(yù)付款后超時(shí)未被接單的訂單(想要的結(jié)果),同樣還匹配到了只有下單行為后超時(shí)未被接單的訂單(臟數(shù)據(jù),沒有預(yù)付款)。原因是因?yàn)槌瑫r(shí)within是控制在整個(gè)規(guī)則上,而不是某一個(gè)狀態(tài)節(jié)點(diǎn)上,所以不論當(dāng)前的狀態(tài)是處在哪個(gè)狀態(tài)節(jié)點(diǎn),超時(shí)后都會(huì)被旁路輸出。

    那么就需要考慮能否通過時(shí)間來直接對(duì)狀態(tài)轉(zhuǎn)移做到精確的控制,而不是通過規(guī)則超時(shí)這種曲線救國(guó)的方式。 于是乎,在通過消息觸發(fā)狀態(tài)的轉(zhuǎn)移之外,需要增加通過時(shí)間觸發(fā)狀態(tài)的轉(zhuǎn)移的支持。要實(shí)現(xiàn)此功能,需要在原來的狀態(tài)以及狀態(tài)轉(zhuǎn)移中,增加時(shí)間屬性的概念。如下圖所示,通過wait算子來得到waiting狀態(tài),然后在waiting狀態(tài)上設(shè)置一個(gè)十秒的時(shí)間屬性以定義一個(gè)十秒的時(shí)間窗口。

    wait算子對(duì)應(yīng)NFA中的ignore狀態(tài),將在沒有到達(dá)時(shí)間窗口結(jié)束時(shí)間時(shí)自旋,在ComputationState中記錄wait的開始時(shí)間,在NFA的doProcess中,將到來的數(shù)據(jù)與waiting狀態(tài)處理,如果到了waiting的結(jié)束時(shí)間,則進(jìn)行狀態(tài)轉(zhuǎn)移。

    上圖中紅色方框中為waiting狀態(tài)設(shè)置了兩條ignore邊:

    1.waitingStatus.addIgnore(lastSink,waitingCondition),waitingCondition中的邏輯是獲取當(dāng)前的時(shí)間(支持事件時(shí)間),判斷有沒有超過設(shè)置的waiting閾值,如果超過就把狀態(tài)向后轉(zhuǎn)移。
    2.waitingStatus.addIgnore(waitingCondition),waitingCondition中如果未達(dá)到設(shè)置的waiting閾值,就會(huì)自旋在當(dāng)前的waiting狀態(tài)不變。

    規(guī)則動(dòng)態(tài)注入

    線上運(yùn)行的CEP中肯定經(jīng)常遇到規(guī)則變更的情況,如果每次變更時(shí)都將任務(wù)重啟、重新發(fā)布是非常不優(yōu)雅的。尤其在營(yíng)銷或者風(fēng)控這種對(duì)實(shí)時(shí)性要求比較高的場(chǎng)景,如果規(guī)則窗口過長(zhǎng)(一兩個(gè)星期),狀態(tài)過大,就會(huì)導(dǎo)致重啟時(shí)間延長(zhǎng),期間就會(huì)造成一些想要處理的異常行為不能及時(shí)發(fā)現(xiàn)。

    那么要怎么樣做到規(guī)則的動(dòng)態(tài)更新和加載呢?

    梳理一下整體架構(gòu),Flink CEP是運(yùn)行在Flink Job里的,而規(guī)則庫(kù)是放在外部存儲(chǔ)中的。首先,需要在運(yùn)行的Job中能及時(shí)發(fā)現(xiàn)外部存儲(chǔ)中規(guī)則的變化,即需要在Job中提供訪問外部庫(kù)的能力。 其次,需要將規(guī)則庫(kù)中變更的規(guī)則動(dòng)態(tài)加載到CEP中,即把外部規(guī)則的描述解析成Flink CEP所能識(shí)別的pattern結(jié)構(gòu)體。最后,把生成的pattern轉(zhuǎn)化成NFA,替換歷史NFA,這樣對(duì)新到來的消息,就會(huì)使用新的規(guī)則進(jìn)行匹配。

    下圖就是一個(gè)支持將外部規(guī)則動(dòng)態(tài)注入、更新的接口。

    這個(gè)接口里面主要實(shí)現(xiàn)了四個(gè)方法:

    • initialize:初始化方法,進(jìn)行外部庫(kù)連接的初始化。
    • inject:和外部數(shù)據(jù)庫(kù)交互的主要方法,監(jiān)聽外部庫(kù)變化,獲取最新的規(guī)則并通過Groovy動(dòng)態(tài)加載,返回pattern。
    • getPeriod:設(shè)置輪巡周期,在一些比較簡(jiǎn)單的實(shí)時(shí)性要求不高的場(chǎng)景,可以采用輪巡的方式,定期對(duì)外部數(shù)據(jù)庫(kù)進(jìn)行檢測(cè)。
    • getNfaKeySelector:和動(dòng)態(tài)更新無關(guān),用來支持一個(gè)流對(duì)應(yīng)多個(gè)規(guī)則組。

    歷史匹配結(jié)果清理

    新規(guī)則動(dòng)態(tài)加載到Flink CEP的Job中,替換掉原來的NFA之后,還需要對(duì)歷史匹配的結(jié)果集進(jìn)行清理。在AbstractKeyedCEPPatternOperator中實(shí)現(xiàn)刷新NFA,注意,歷史狀態(tài)是否需要清理和業(yè)務(wù)相關(guān):

  • 修改的邏輯對(duì)規(guī)則中事件的匹配沒有影響,保留歷史結(jié)果集中的狀態(tài)。
  • 修改的邏輯影響到了之前匹配的部分,需要將之前匹配的結(jié)果集中的狀態(tài)數(shù)據(jù)清除,防止錯(cuò)誤的輸出。
  • 總結(jié)

    使用Flink CEP,熟知其原理是很重要的,特別是NFA的狀態(tài)轉(zhuǎn)移流程,然后再去看源碼中的狀態(tài)圖的構(gòu)建就會(huì)很清晰了。

    雙12來襲!500元淘寶紅包、iPhone11等你拿。
    https://www.aliyun.com/1212/2019/home?utm_content=g_1000092611

    原文鏈接
    本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。

    總結(jié)

    以上是生活随笔為你收集整理的Apache Flink CEP 实战的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。