日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Flink-CEP快速入门

發布時間:2023/12/10 编程问答 52 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flink-CEP快速入门 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Flink-CEP快速入門
更新時間:2022-09-12 10:58:28發布時間:2小時前朗讀
文章目錄
0. 簡介 & 使用步驟
簡介
使用步驟

  • 模式API(Pattern API:匹配規則)
    單個模式
    量詞
    條件
    限定子類型
    簡單條件(SimpleCondition)
    迭代條件(IterativeCondition)
    組合條件
    終止條件
    模式操作列舉
    組合模式
    連續性
    循環模式中的近鄰條件
    模式組
    匹配后跳過策略

  • 檢測模式(檢測滿足規則的復雜事件)
    將模式應用到流上
    處理匹配事件
    匹配事件的選擇提取(select)
    PatternSelectFunction
    PatternFlatSelectFunction
    匹配事件的通用處理(process)
    處理超時事件
    Maven

  • 簡介 & 使用步驟 簡介
    所謂 CEP,其實就是“復雜事件處理(Complex Event Processing)”的縮寫;而 Flink CEP,就是 Flink 實現的一個用于復雜事件處理的庫(library)
    把事件流中的一個個簡單事件,通過一定的規則匹配組合起來,這就是“復雜事件”;然后基于這些滿足規則的一組組復雜事件進行轉換處理,得到想要的結果進行輸出
    使用步驟
    復雜事件處理(CEP)的流程可以分成三個步驟:
    定義一個匹配規則
    將匹配規則應用到事件流上,檢測滿足規則的復雜事件
    對檢測到的復雜事件進行處理,得到結果進行輸出
    // 實體類
    public class LoginEvent {
    public String userId;
    public String ipAddress;
    public String eventType;
    public Long timestamp;

    public LoginEvent(String userId, String ipAddress, String eventType, Long timestamp) {
    this.userId = userId;
    this.ipAddress = ipAddress;
    this.eventType = eventType;
    this.timestamp = timestamp;
    }
    }

  • // CEP Demo
    public class Demo003 {
    public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 獲取登錄事件流,并提取時間戳、生成水位線SingleOutputStreamOperator<loginevent> sourceData = env.fromElements(new LoginEvent("user_1", "192.168.0.1", "fail", 2000L),new LoginEvent("user_1", "192.168.0.2", "fail", 3000L),new LoginEvent("user_2", "192.168.1.29", "fail", 4000L),new LoginEvent("user_1", "171.56.23.10", "fail", 5000L),new LoginEvent("user_2", "192.168.1.29", "success", 6000L),new LoginEvent("user_2", "192.168.1.29", "fail", 7000L),new LoginEvent("user_2", "192.168.1.29", "fail", 8000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<loginevent>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<loginevent>() {<!-- -->@Overridepublic long extractTimestamp(LoginEvent loginEvent, long l) {<!-- -->return loginEvent.timestamp;}}));// 1. 定義一個匹配規則:定義 Pattern,連續的三個登錄失敗事件Pattern<loginevent, loginevent=""> pattern = Pattern.<loginevent>begin("first") // 以第一個登錄失敗事件開始.where(new IterativeCondition<loginevent>() {<!-- -->@Overridepublic boolean filter(LoginEvent loginEvent, Context<loginevent> context) throws Exception {<!-- -->return "fail".equals(loginEvent.eventType);}}).next("second") // 接著是第二個登錄失敗事件.where(new IterativeCondition<loginevent>() {<!-- -->@Overridepublic boolean filter(LoginEvent loginEvent, Context<loginevent> context) throws Exception {<!-- -->return "fail".equals(loginEvent.eventType);}}).next("third") // 接著是第三個登錄失敗事件.where(new IterativeCondition<loginevent>() {<!-- -->@Overridepublic boolean filter(LoginEvent loginEvent, Context<loginevent> context) throws Exception {<!-- -->return "fail".equals(loginEvent.eventType);}});// 2. 將 Pattern 應用到流上,檢測匹配的復雜事件,得到一個 PatternStreamPatternStream<loginevent> cepPattern = CEP.pattern(sourceData.keyBy(loginEvent -> loginEvent.userId), pattern);// 3. 對檢測到的復雜事件進行處理:將匹配到的復雜事件選擇出來,然后包裝成字符串SingleOutputStreamOperator<string> select = cepPattern.select(new PatternSelectFunction<loginevent, string="">() {<!-- -->@Overridepublic String select(Map<string, list<loginevent="">> map) throws Exception {<!-- -->LoginEvent first = map.get("first").get(0);LoginEvent second = map.get("second").get(0);LoginEvent third = map.get("third").get(0);return first.userId + " 連續三次登錄失敗!登錄時間:" + first.timestamp + ", " + second.timestamp + ", " + third.timestamp;}});select.print();env.execute(); }

    }
    </string,></loginevent,></loginevent,>

  • 模式API(Pattern API:匹配規則) 單個模式
    一個模式可以是一個單例或者循環模式。單例模式只接受一個事件,循環模式可以接受多個事件。 在模式匹配表達式中,模式"a b+ c? d"(或者"a",后面跟著一個或者多個"b",再往后可選擇的跟著一個"c",最后跟著一個"d"), a,c?,和 d都是單例模式,b+是一個循環模式
    量詞
    單個模式后面可以跟一個“量詞”,用來指定循環的次數,單個模式可以包括“單例(singleton)模式”和“循環(looping)模式”,默認是“單例(singleton)模式”,當定義了量詞之后,就變成了“循環模式”,可以匹配接收多個事件
    循環模式的方法:
  • .oneOrMore()
    匹配事件出現一次或多次,假設 a 是一個個體模式,a.oneOrMore()表示可以匹配 1 個或多個 a 的事件組合。我們有時會用 a+來簡單表示
    .times(times)
    匹配事件發生特定次數(times),例如 a.times(3)表示 aaa
    .times(fromTimes,toTimes)
    指定匹配事件出現的次數范圍,最小次數為fromTimes,最大次數為toTimes。例如a.times(2, 4)可以匹配 aa,aaa 和 aaaa
    .greedy()
    只能用在循環模式后,使當前循環模式變得“貪心”(greedy),也就是總是盡可能多地去匹配。例如 a.times(2, 4).greedy(),如果出現了連續 4 個 a,那么會直接把 aaaa 檢測出來進行處理,其他任意 2 個 a 是不算匹配事件的
    .optional()
    使當前模式成為可選的,也就是說可以滿足這個匹配條件,也可以不滿足
    // 期望出現4次
    start.times(4);

    // 期望出現0或者4次
    start.times(4).optional();

    // 期望出現2、3或者4次
    start.times(2, 4);

    // 期望出現2、3或者4次,并且盡可能的重復次數多
    start.times(2, 4).greedy();

    // 期望出現0、2、3或者4次
    start.times(2, 4).optional();

    // 期望出現0、2、3或者4次,并且盡可能的重復次數多
    start.times(2, 4).optional().greedy();

    // 期望出現1到多次
    start.oneOrMore();

    // 期望出現1到多次,并且盡可能的重復次數多
    start.oneOrMore().greedy();

    // 期望出現0到多次
    start.oneOrMore().optional();

    // 期望出現0到多次,并且盡可能的重復次數多
    start.oneOrMore().optional().greedy();

    // 期望出現2到多次
    start.timesOrMore(2);

    // 期望出現2到多次,并且盡可能的重復次數多
    start.timesOrMore(2).greedy();

    // 期望出現0、2或多次
    start.timesOrMore(2).optional();

    // 期望出現0、2或多次,并且盡可能的重復次數多
    start.timesOrMore(2).optional().greedy();
    條件 限定子類型
    調用.subtype()方法可以為當前模式增加子類型限制條件

    // 這里 SubEvent 是流中數據類型 Event 的子類型。只有事件是 SubEvent 類型時,才可以滿足當前模式 pattern 的匹配條件
    pattern.subtype(SubEvent.class);
    簡單條件(SimpleCondition)
    簡單條件是最簡單的匹配規則,只根據當前事件的特征來決定是否接受它。這在本質上其實就是一個 filter 操作

    start.where(new SimpleCondition() {
    @Override
    public boolean filter(MyEvent myEvent) throws Exception {
    return … // 一些判斷條件
    }
    })

    迭代條件(IterativeCondition)
    在實際應用中,我們可能需要將當前事件跟之前的事件做對比,才能判斷出要不要接受當前事件。這種需要依靠之前事件來做判斷的條件,就叫作“迭代條件”(Iterative Condition)

    Pattern.begin(“first”)
    .where(new IterativeCondition() {
    @Override
    public boolean filter(MyEvent myEvent, Context context) throws Exception {
    if (!“event1001”.equals(myEvent.getEvent())) {
    return false;
    }

    // 根據上下文獲取 之前的事件,獲取之前滿足條件的Iterable<myevent> myEventIterable = context.getEventsForPattern("first");// TODO 處理 之前的事件return ... // 一些判斷條件} });


    組合條件
    可以多個條件一起使用,當有多個判斷邏輯的時候我們可能會用if-else的方式,但組合條件可以在 where()方法后繼續接or()方法來組合使用

    Pattern<myevent, myevent=“”> pattern = Pattern.begin(“first”)
    .where(new IterativeCondition() {
    @Override
    public boolean filter(MyEvent myEvent, Context context) throws Exception {
    return … // 一些判斷條件
    }
    }).or(new IterativeCondition() {
    @Override
    public boolean filter(MyEvent myEvent, Context context) throws Exception {
    return … // 一些判斷條件
    }
    });
    </myevent,>
    終止條件
    終止條件的定義是通過調用模式對象的.until()方法來實現的

    ??終止條件只與oneOrMore()或者oneOrMore().optional()結合使用

    Pattern<myevent, myevent=“”> pattern = Pattern.begin(“first”)
    .where(new IterativeCondition() {
    @Override
    public boolean filter(MyEvent myEvent, Context context) throws Exception {
    return … // 一些判斷條件
    }
    }).oneOrMore()
    .until(new IterativeCondition() {
    @Override
    public boolean filter(MyEvent myEvent, Context context) throws Exception {
    return … // 一些判斷條件
    }
    });
    </myevent,>
    模式操作列舉
    模式操作 描述
    where(condition) 為當前模式定義一個條件。為了匹配這個模式,一個事件必須滿足某些條件。 多個連續的where()語句取與組成判斷條件:java pattern.where(new IterativeCondition() { @Override public boolean filter(Event value, Context ctx) throws Exception { return … // 一些判斷條件 } });
    or(condition) 增加一個新的判斷,和當前的判斷取或。一個事件只要滿足至少一個判斷條件就匹配到模式:java pattern.where(new IterativeCondition() { @Override public boolean filter(Event value, Context ctx) throws Exception { return … // 一些判斷條件 } }).or(new IterativeCondition() { @Override public boolean filter(Event value, Context ctx) throws Exception { return … // 替代條件 } });
    until(condition) 為循環模式指定一個停止條件。意思是滿足了給定的條件的事件出現后,就不會再有事件被接受進入模式了。只適用于和oneOrMore()同時使用。NOTE: 在基于事件的條件中,它可用于清理對應模式的狀態。java pattern.oneOrMore().until(new IterativeCondition() { @Override public boolean filter(Event value, Context ctx) throws Exception { return … // 替代條件 } });
    subtype(subClass) 為當前模式定義一個子類型條件。一個事件只有是這個子類型的時候才能匹配到模式:java pattern.subtype(SubEvent.class);
    oneOrMore() 指定模式期望匹配到的事件至少出現一次。.默認(在子事件間)使用松散的內部連續性。 NOTE: 推薦使用until()或者within()來清理狀態。java pattern.oneOrMore();
    timesOrMore(#times) 指定模式期望匹配到的事件至少出現**#times次。.默認(在子事件間)使用松散的內部連續性。 java pattern.timesOrMore(2);
    times(#ofTimes) 指定模式期望匹配到的事件正好出現的次數。默認(在子事件間)使用松散的內部連續性。 java pattern.times(2);
    times(#fromTimes, #toTimes) 指定模式期望匹配到的事件出現次數在
    #fromTimes和#toTimes**之間。默認(在子事件間)使用松散的內部連續性。 java pattern.times(2, 4);
    optional() 指定這個模式是可選的,也就是說,它可能根本不出現。這對所有之前提到的量詞都適用。java pattern.oneOrMore().optional();
    greedy() 指定這個模式是貪心的,也就是說,它會重復盡可能多的次數。這只對量詞適用,現在還不支持模式組。java pattern.oneOrMore().greedy();
    組合模式 連續性
    將多個個體模式組合起來的完整模式,就叫作“組合模式”

    FlinkCEP支持事件之間如下形式的連續策略:

    嚴格連續: next()期望所有匹配的事件嚴格的一個接一個出現,中間沒有任何不匹配的事件。
    松散連續: followedBy()忽略匹配的事件之間的不匹配的事件。
    不確定的松散連續: followedByAny()更進一步的松散連續,允許忽略掉一些匹配事件的附加匹配。
    notNext():如果不想后面直接連著一個特定事件
    notFollowedBy(),如果不想一個特定事件發生在兩個事件之間的任何地方
    // 嚴格連續
    Pattern<event, ?=“”> strict = start.next(“middle”).where(…);

    // 松散連續
    Pattern<event, ?=“”> relaxed = start.followedBy(“middle”).where(…);

    // 不確定的松散連續
    Pattern<event, ?=“”> nonDetermin = start.followedByAny(“middle”).where(…);

    // 嚴格連續的NOT模式
    Pattern<event, ?=“”> strictNot = start.notNext(“not”).where(…);

    // 松散連續的NOT模式
    Pattern<event, ?=“”> relaxedNot = start.notFollowedBy(“not”).where(…);
    </event,></event,></event,></event,></event,>
    within()方法:指定一個模式應該在一定時間內發生

    // 在十秒鐘內,從 event1001 開始到 event1004 結束才算
    Pattern<myevent, myevent=“”> pattern = Pattern.begin(“first”)
    .where(new IterativeCondition() {
    @Override
    public boolean filter(MyEvent myEvent, Context context) throws Exception {
    return “event1001”.equals(myEvent.getEvent());
    }
    })
    .followedBy(“second”)
    .where(new IterativeCondition() {
    @Override
    public boolean filter(MyEvent myEvent, Context context) throws Exception {
    return “event1004”.equals(myEvent.getEvent());
    }
    })
    .within(Time.seconds(10L));
    </myevent,>
    循環模式中的近鄰條件
    oneOrMore()、times()等循環模式的默認是松散連續,也就是followedBy()模式

    .consecutive():在oneOrMore()、times()等循環模式后面跟上consecutive()表示嚴格連續(next())

    // 1. 定義 Pattern,登錄失敗事件,循環檢測 3 次
    Pattern<loginevent, loginevent=“”> pattern = Pattern
    .begin(“fails”)
    .where(new SimpleCondition() {
    @Override
    public boolean filter(LoginEvent loginEvent) throws Exception {
    return loginEvent.eventType.equals(“fail”);
    }
    }).times(3).consecutive();
    </loginevent,>
    .allowCombinations():在oneOrMore()、times()等循環模式后面跟上allowCombinations()表示不確定的松散連續(followedByAny())

    模式組
    也可以定義一個模式序列作為begin,followedBy,followedByAny和next的條件。這個模式序列在邏輯上會被當作匹配的條件, 并且返回一個GroupPattern,可以在GroupPattern上使用oneOrMore(),times(#ofTimes), times(#fromTimes, #toTimes),optional(),consecutive(),allowCombinations()。

    Pattern<event, ?=“”> start = Pattern.begin(
    Pattern.begin(“start”).where(…).followedBy(“start_middle”).where(…)
    );

    // 嚴格連續
    Pattern<event, ?=“”> strict = start.next(
    Pattern.begin(“next_start”).where(…).followedBy(“next_middle”).where(…)
    ).times(3);

    // 松散連續
    Pattern<event, ?=“”> relaxed = start.followedBy(
    Pattern.begin(“followedby_start”).where(…).followedBy(“followedby_middle”).where(…)
    ).oneOrMore();

    // 不確定松散連續
    Pattern<event, ?=“”> nonDetermin = start.followedByAny(
    Pattern.begin(“followedbyany_start”).where(…).followedBy(“followedbyany_middle”).where(…)
    ).optional();
    </event,></event,></event,></event,>
    模式操作 描述
    begin(#name) 定義一個開始的模式:java Pattern start = Pattern.begin(“start”);
    begin(#pattern_sequence) 定義一個開始的模式:java Pattern start = Pattern.begin( Pattern.begin(“start”).where(…).followedBy(“middle”).where(…) );
    next(#name) 增加一個新的模式。匹配的事件必須是直接跟在前面匹配到的事件后面(嚴格連續):java Pattern next = start.next(“middle”);
    next(#pattern_sequence) 增加一個新的模式。匹配的事件序列必須是直接跟在前面匹配到的事件后面(嚴格連續):java Pattern next = start.next( Pattern.begin(“start”).where(…).followedBy(“middle”).where(…) );
    followedBy(#name) 增加一個新的模式。可以有其他事件出現在匹配的事件和之前匹配到的事件中間(松散連續):java Pattern followedBy = start.followedBy(“middle”);
    followedBy(#pattern_sequence) 增加一個新的模式。可以有其他事件出現在匹配的事件序列和之前匹配到的事件中間(松散連續):java Pattern followedBy = start.followedBy( Pattern.begin(“start”).where(…).followedBy(“middle”).where(…) );
    followedByAny(#name) 增加一個新的模式。可以有其他事件出現在匹配的事件和之前匹配到的事件中間, 每個可選的匹配事件都會作為可選的匹配結果輸出(不確定的松散連續):java Pattern followedByAny = start.followedByAny(“middle”);
    followedByAny(#pattern_sequence) 增加一個新的模式。可以有其他事件出現在匹配的事件序列和之前匹配到的事件中間, 每個可選的匹配事件序列都會作為可選的匹配結果輸出(不確定的松散連續):java Pattern followedByAny = start.followedByAny( Pattern.begin(“start”).where(…).followedBy(“middle”).where(…) );
    notNext() 增加一個新的否定模式。匹配的(否定)事件必須直接跟在前面匹配到的事件之后(嚴格連續)來丟棄這些部分匹配:java Pattern notNext = start.notNext(“not”);
    notFollowedBy() 增加一個新的否定模式。即使有其他事件在匹配的(否定)事件和之前匹配的事件之間發生, 部分匹配的事件序列也會被丟棄(松散連續):java Pattern notFollowedBy = start.notFollowedBy(“not”);
    within(time) 定義匹配模式的事件序列出現的最大時間間隔。如果未完成的事件序列超過了這個事件,就會被丟棄:java pattern.within(Time.seconds(10));
    匹配后跳過策略
    對于一個給定的模式,同一個事件可能會分配到多個成功的匹配上。為了控制一個事件會分配到多少個匹配上,你需要指定跳過策略AfterMatchSkipStrategy。 有五種跳過策略,如下:

    NO_SKIP: 不跳過
    SKIP_TO_NEXT: 跳至下一個
    SKIP_PAST_LAST_EVENT: 跳過所有子匹配
    SKIP_TO_FIRST: 跳至第一個
    SKIP_TO_LAST: 跳至最后一個
    例如,給定一個模式b+ c和一個數據流b1 b2 b3 c,不同跳過策略之間的不同如下:

    跳過策略 結果 描述
    NO_SKIP b1 b2 b3 c
    b2 b3 c
    b3 c 找到匹配b1 b2 b3 c之后,不會丟棄任何結果。
    SKIP_TO_NEXT b1 b2 b3 c
    b2 b3 c
    b3 c 找到匹配b1 b2 b3 c之后,不會丟棄任何結果,因為沒有以b1開始的其他匹配。
    SKIP_PAST_LAST_EVENT b1 b2 b3 c 找到匹配b1 b2 b3 c之后,會丟棄其他所有的部分匹配。
    SKIP_TO_FIRST[b] b1 b2 b3 c
    b2 b3 c
    b3 c 找到匹配b1 b2 b3 c之后,會嘗試丟棄所有在b1之前開始的部分匹配,但沒有這樣的匹配,所以沒有任何匹配被丟棄。
    SKIP_TO_LAST[b] b1 b2 b3 c
    b3 c 找到匹配b1 b2 b3 c之后,會嘗試丟棄所有在b3之前開始的部分匹配,有一個這樣的b2 b3 c被丟棄。
    方法 描述
    AfterMatchSkipStrategy.noSkip() 創建NO_SKIP策略
    AfterMatchSkipStrategy.skipToNext() 創建SKIP_TO_NEXT策略
    AfterMatchSkipStrategy.skipPastLastEvent() 創建SKIP_PAST_LAST_EVENT策略
    AfterMatchSkipStrategy.skipToFirst(patternName) 創建引用模式名稱為patternName的SKIP_TO_FIRST策略
    AfterMatchSkipStrategy.skipToLast(patternName) 創建引用模式名稱為patternName的SKIP_TO_LAST策略
    skipToNext

    // 配置跳過策略:skipToNext模式
    AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipToNext();
    // 將跳過策略加入到模式中
    Pattern<myevent, myevent=“”> pattern = Pattern.begin(“first”, skipStrategy)
    .where(new IterativeCondition() {
    @Override
    public boolean filter(MyEvent myEvent, Context context) throws Exception {
    return … // 一些判斷條件
    }
    });
    </myevent,>
    skipToFirst(patternName)

    // 配置跳過策略:skipToFirst模式,參數傳模式名稱
    AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipToFirst(“first”);
    Pattern<myevent, myevent=“”> pattern = Pattern.begin(“first”, skipStrategy)
    .where(new IterativeCondition() {
    @Override
    public boolean filter(MyEvent myEvent, Context context) throws Exception {
    return “event1001”.equals(myEvent.getEvent());
    }
    }).oneOrMore()
    .followedBy(“second”)
    .where(new IterativeCondition() {
    @Override
    public boolean filter(MyEvent myEvent, Context context) throws Exception {
    return “event1003”.equals(myEvent.getEvent());
    }
    })
    .followedBy(“thrid”)
    .where(new IterativeCondition() {
    @Override
    public boolean filter(MyEvent myEvent, Context context) throws Exception {
    return “event1004”.equals(myEvent.getEvent());
    }
    });
    </myevent,>
    2. 檢測模式(檢測滿足規則的復雜事件) 將模式應用到流上
    調用 CEP 類的靜態方法.pattern(),將數據流(DataStream)和模式(Pattern)作為兩個參數傳入
    DataStream,也可以通過 keyBy 進行按鍵分區得到 KeyedStream,接下來對復雜事件的檢測就會針對不同的 key 單獨進行了
    DataStream inputStream = …
    Pattern<event, ?=“”> pattern = …
    PatternStream patternStream = CEP.pattern(inputStream, pattern);
    </event,>
    處理匹配事件 匹配事件的選擇提取(select) PatternSelectFunction
    處理匹配事件最簡單的方式,就是從 PatternStream 中直接把匹配的復雜事件提取出來,包裝成想要的信息輸出,這個操作就是“選擇”(select)

    Pattern.begin(“first”).where(…);

    // 處理匹配事件
    cepPattern.select(new PatternSelectFunction<myevent, string=“”>() {
    @Override
    public String select(Map<string, list<myevent=“”>> map) throws Exception {
    // first 是 Pattern 的 name 字符串
    List first = map.get(“first”);
    return … // 處理匹配事件邏輯
    }
    });
    </string,></myevent,>
    PatternFlatSelectFunction
    .flatSelect(),傳入的參數是一個PatternFlatSelectFunction。這是 PatternSelectFunction 的“扁平化”版本;內部需要實現一個 flatSelect()方法,

    它與之前 select()的不同就在于沒有返回值,而是多了一個收集器(Collector)參數 collector,通過調用 collector.collet()方法就可以實現多次發送輸出數據了

    cepPattern.flatSelect(new PatternFlatSelectFunction<myevent, string=“”>() {
    @Override
    public void flatSelect(Map<string, list<myevent=“”>> map, Collector collector) throws Exception {
    // 處理匹配事件邏輯
    }
    });
    </string,></myevent,>
    匹配事件的通用處理(process)
    自 1.8 版本之后,Flink CEP 引入了對于匹配事件的通用檢測處理方式,那就是直接調用PatternStream 的.process()方法,傳入一個 PatternProcessFunction。這看起來就像是我們熟悉的處理函數(process function),它也可以訪問一個上下文(Context),進行更多的操作。

    PatternProcessFunction 功能更加豐富、調用更加靈活,可以完全覆蓋其他接口,也就成為了目前官方推薦的處理方式。事實上,PatternSelectFunction 和 PatternFlatSelectFunction在 CEP 內部執行時也會被轉換成 PatternProcessFunction

    Context context:上下文

    collector.collect():調用此方法實現發送輸出數據

    cepPattern.process(new PatternProcessFunction<myevent, string=“”>() {
    @Override
    public void processMatch(Map<string, list<myevent=“”>> map, Context context, Collector collector) throws Exception {
    // 處理匹配事件邏輯
    }
    });
    </string,></myevent,>
    處理超時事件
    在 Flink CEP 中 , 提 供 了 一 個 專 門 捕 捉 超 時 的 部 分 匹 配 事 件 的 接 口 , 叫 作TimedOutPartialMatchHandler。這個接口需要實現一個 processTimedOutMatch()方法,可以將超時的、已檢測到的部分匹配事件放在一個 Map 中,作為方法的第一個參數;方法的第二個參數則是 PatternProcessFunction 的上下文 Context。所以這個接口必須與 PatternProcessFunction結合使用,對處理結果的輸出則需要利用側輸出流來進行

    PatternStream cepPattern = CEP.pattern(myEventData.keyBy(myEvent -> myEvent.getUserId()), pattern);
    // 測流
    OutputTag outputTag = new OutputTag(“time_out”){};
    // 超時數據處理
    SingleOutputStreamOperator processData = cepPattern.process(new MyPatternProcessFunction());

    // 數據處理,處理匹配成功數據,處理超時數據
    public static class MyPatternProcessFunction extends PatternProcessFunction<myevent, string=“”>
    implements TimedOutPartialMatchHandler {

    @Override public void processMatch(Map<string, list<myevent="">> map, Context context, Collector<string> collector) throws Exception {<!-- -->// 匹配成功邏輯處理 }@Override public void processTimedOutMatch(Map<string, list<myevent="">> map, Context context) throws Exception {<!-- -->// 超時邏輯處理,將數據寫入到測輸出流中OutputTag<string> outputTag = new OutputTag<string>("time_out"){<!-- -->};String str = ... // 邏輯處理context.output(outputTag, str); }

    }
    </string,></string,></myevent,>
    Maven

    <flink.version>1.13.0</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>1.7.30</slf4j.version>

    org.apache.flink flink-cep_${scala.binary.version} ${flink.version} org.apache.flink flink-java ${flink.version} org.apache.flink flink-streaming-java_${scala.binary.version} ${flink.version} org.apache.flink flink-clients_${scala.binary.version} ${flink.version} org.apache.flink flink-connector-kafka_${scala.binary.version} ${flink.version}

    文章轉自:Flink-CEP快速入門_Java-答學網

    作者:答學網,轉載請注明原文鏈接:http://www.dxzl8.com/

    總結

    以上是生活随笔為你收集整理的Flink-CEP快速入门的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。