Big Data: Flink Timers
Collected and organized by 生活随笔 (Life Essays); shared here as a reference.
I. Using ProcessFunction
1. Without keyBy
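The demo below splits each line into words and drops the word "error" before emitting. As a hypothetical standalone helper (not part of the original demo), that core logic looks like this:

```java
import java.util.ArrayList;
import java.util.List;

public class SplitAndFilter {
    // Split a line on spaces and drop the word "error",
    // mirroring the processElement body in the demo below.
    public static List<String> splitAndFilter(String line) {
        List<String> words = new ArrayList<>();
        for (String word : line.split(" ")) {
            if (!word.equals("error")) {
                words.add(word);
            }
        }
        return words;
    }
}
```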
```java
package cn._51doit.flink.day07;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class NonKeyedProcessFunction {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        // Call process on a non-keyed DataStream
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne =
                lines.process(new ProcessFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void processElement(String line, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    if (!word.equals("error")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                }
            }
        });

        wordAndOne.print();
        env.execute();
    }
}
```

2. With keyBy
```java
package cn._51doit.flink.day07;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class KeyedProcessFunctionDemo {

    public static void main(String[] args) throws Exception {
        // Create the Flink streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000);
        // Set the restart strategy
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));

        // Source
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        // Transformations
        SingleOutputStreamOperator<Tuple3<String, String, Double>> tpDataStream =
                lines.map(new MapFunction<String, Tuple3<String, String, Double>>() {
            @Override
            public Tuple3<String, String, Double> map(String value) throws Exception {
                String[] fields = value.split(",");
                return Tuple3.of(fields[0], fields[1], Double.parseDouble(fields[2]));
            }
        });

        KeyedStream<Tuple3<String, String, Double>, String> keyedStream = tpDataStream.keyBy(t -> t.f0);

        // Calling process on a KeyedStream gives access to keyed state
        SingleOutputStreamOperator<Tuple3<String, String, Double>> result =
                keyedStream.process(new KeyedProcessFunction<String, Tuple3<String, String, Double>, Tuple3<String, String, Double>>() {

            private transient MapState<String, Double> mapState;

            @Override
            public void open(Configuration parameters) throws Exception {
                // Define a state descriptor
                MapStateDescriptor<String, Double> stateDescriptor =
                        new MapStateDescriptor<>("kv-state", String.class, Double.class);
                // Initialize the state, or restore it from history
                mapState = getRuntimeContext().getMapState(stateDescriptor);
            }

            @Override
            public void processElement(Tuple3<String, String, Double> value, Context ctx,
                                       Collector<Tuple3<String, String, Double>> out) throws Exception {
                String city = value.f1;
                Double money = value.f2;
                Double historyMoney = mapState.get(city);
                if (historyMoney == null) {
                    historyMoney = 0.0;
                }
                Double totalMoney = historyMoney + money; // accumulate
                // Write the new total back into state
                mapState.put(city, totalMoney);
                // Emit the result
                value.f2 = totalMoney;
                out.collect(value);
            }
        });

        result.print();
        // Start the job
        env.execute("StreamingWordCount");
    }
}
```

II. Timers
1. Basic usage
```java
package cn._51doit.flink.day07;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ProcessingTimeTimerDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Sample input:
        // spark,1
        // spark,2
        // hadoop,1
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount =
                lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                String[] fields = line.split(",");
                return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
            }
        });

        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndCount.keyBy(t -> t.f0);

        keyed.process(new KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public void processElement(Tuple2<String, Integer> value, Context ctx,
                                       Collector<Tuple2<String, Integer>> out) throws Exception {
                // Get the current processing time
                long currentProcessingTime = ctx.timerService().currentProcessingTime();
                //System.out.println("current time: " + currentProcessingTime + ", timer fires at: " + (currentProcessingTime + 30000));
                // Register a timer at the current processing time + 30 seconds
                ctx.timerService().registerProcessingTimeTimer(currentProcessingTime + 30000);
            }

            // When the registered time arrives, onTimer is invoked
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx,
                                Collector<Tuple2<String, Integer>> out) throws Exception {
                System.out.println("Timer fired: " + timestamp);
            }
        }).print();

        env.execute();
    }
}
```

2. Buffer the data first, then emit once the condition is met
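The demo below registers its timer at the start of the next full minute rather than at a fixed offset. As a hypothetical standalone method (not part of the original demo), the alignment arithmetic is:

```java
public class TimerAlignment {
    // Align a processing-time timestamp (in milliseconds) to the start of
    // the NEXT full minute, mirroring the fireTime computation below.
    public static long nextMinute(long currentMs) {
        return currentMs - currentMs % 60000 + 60000;
    }
}
```

Because every element of a key within the same minute computes the same `fireTime`, they all register the same timer, which fires only once.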
```java
package cn._51doit.flink.day07;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ProcessingTimeTimerDemo02 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Sample input:
        // spark,1
        // spark,2
        // hadoop,1
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount =
                lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                String[] fields = line.split(",");
                return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
            }
        });

        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndCount.keyBy(t -> t.f0);

        keyed.process(new KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>() {

            private transient ValueState<Integer> counter;

            @Override
            public void open(Configuration parameters) throws Exception {
                ValueStateDescriptor<Integer> stateDescriptor =
                        new ValueStateDescriptor<>("wc-state", Integer.class);
                counter = getRuntimeContext().getState(stateDescriptor);
            }

            @Override
            public void processElement(Tuple2<String, Integer> value, Context ctx,
                                       Collector<Tuple2<String, Integer>> out) throws Exception {
                // Get the current processing time
                long currentProcessingTime = ctx.timerService().currentProcessingTime();
                // Align the timer to the start of the next full minute
                long fireTime = currentProcessingTime - currentProcessingTime % 60000 + 60000;
                // Registering a timer with the same timestamp again overwrites the earlier
                // registration, so a given timestamp fires only once
                ctx.timerService().registerProcessingTimeTimer(fireTime);

                Integer currentCount = value.f1;
                Integer historyCount = counter.value();
                if (historyCount == null) {
                    historyCount = 0;
                }
                Integer totalCount = historyCount + currentCount;
                // Update the state
                counter.update(totalCount);
            }

            // When the registered time arrives, onTimer is invoked
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx,
                                Collector<Tuple2<String, Integer>> out) throws Exception {
                // The timer fired: emit the current result
                Integer value = counter.value();
                String currentKey = ctx.getCurrentKey();
                // To mimic a tumbling window (count only the current window instead of
                // accumulating across windows), clear the state here:
                //counter.update(0);
                out.collect(Tuple2.of(currentKey, value));
            }
        }).print();

        env.execute();
    }
}
```

III. Side Output
1. Tag different kinds of records with different tags
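The demo below routes each record to an odd, even, or not-a-number tag. The routing decision itself, pulled out as a hypothetical standalone helper (not part of the original demo), looks like this:

```java
public class RecordClassifier {
    // Classify a record the same way the ProcessFunction below does:
    // parseable even number -> "even", parseable odd number -> "odd",
    // anything unparseable -> "nan".
    public static String classify(String value) {
        try {
            int i = Integer.parseInt(value);
            return (i % 2 == 0) ? "even" : "odd";
        } catch (NumberFormatException e) {
            return "nan";
        }
    }
}
```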
```java
package cn._51doit.flink.day07;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        // Odd numbers
        OutputTag<String> oddOutputTag = new OutputTag<String>("odd") {};
        // Even numbers
        OutputTag<String> evenOutputTag = new OutputTag<String>("even") {};
        // Not a number
        OutputTag<String> nanOutputTag = new OutputTag<String>("nan") {};

        SingleOutputStreamOperator<String> mainStream = lines.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
                try {
                    int i = Integer.parseInt(value);
                    if (i % 2 == 0) {
                        ctx.output(evenOutputTag, value); // even
                    } else {
                        ctx.output(oddOutputTag, value);  // odd
                    }
                } catch (NumberFormatException e) {
                    ctx.output(nanOutputTag, value);
                }
                // Emit every record into the main stream
                out.collect(value);
            }
        });

        DataStream<String> evenStream = mainStream.getSideOutput(evenOutputTag);
        DataStream<String> oddStream = mainStream.getSideOutput(oddOutputTag);

        oddStream.print("odd: ");
        evenStream.print("even: ");
        mainStream.print("main: ");

        env.execute();
    }
}
```

2. Use side output to capture a window's late data
```java
package cn._51doit.flink.day07;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

// Use side output to capture a window's late data
public class WindowLateDateDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Sample input:
        // 1000,spark,3
        // 1200,spark,5
        // 2000,hadoop,2
        // The DataStream returned by socketTextStream has a parallelism of 1
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<String> dataWithWaterMark = lines.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner((line, timestamp) -> Long.parseLong(line.split(",")[0])));

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount =
                dataWithWaterMark.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                return Tuple2.of(fields[1], Integer.parseInt(fields[2]));
            }
        });

        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndCount.keyBy(t -> t.f0);

        OutputTag<Tuple2<String, Integer>> lateDataTag = new OutputTag<Tuple2<String, Integer>>("late-data") {};

        // Non-keyed window: no keyBy; call windowAll with a WindowAssigner
        // Keyed window: keyBy first, then call window with a WindowAssigner
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
                keyed.window(TumblingEventTimeWindows.of(Time.seconds(5)))
                     .sideOutputLateData(lateDataTag); // tag the late records

        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = windowed.sum(1);
        summed.print();

        // Retrieve the late records from the main stream
        DataStream<Tuple2<String, Integer>> lateDataStream = summed.getSideOutput(lateDataTag);
        lateDataStream.print("late-data: ");

        env.execute();
    }
}
```

IV. Using WindowFunction
1. Incremental aggregation within the window, combined with historical data
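The demo below hands the window a `ReduceFunction` for incremental pre-aggregation. Its semantics, sketched as a hypothetical plain-Java left fold (not part of the original demo): each incoming element is combined pairwise with the running result.

```java
public class ReduceSketch {
    // ReduceFunction semantics without Flink: the window's element counts
    // are folded pairwise, left to right, into a single value.
    public static int reduceCounts(int[] counts) {
        int acc = counts[0];
        for (int i = 1; i < counts.length; i++) {
            acc = acc + counts[i]; // reduce(value1, value2)
        }
        return acc;
    }
}
```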
```java
package cn._51doit.flink.day07;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

// Aggregate incrementally within the window, and also accumulate with historical data
public class WindowReduceFunctionDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Sample input:
        // 1000,spark,3
        // 1200,spark,5
        // 2000,hadoop,2
        // The DataStream returned by socketTextStream has a parallelism of 1
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<String> dataWithWaterMark = lines.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner((line, timestamp) -> Long.parseLong(line.split(",")[0])));

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount =
                dataWithWaterMark.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                return Tuple2.of(fields[1], Integer.parseInt(fields[2]));
            }
        });

        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndCount.keyBy(t -> t.f0);

        // Non-keyed window: no keyBy; call windowAll with a WindowAssigner
        // Keyed window: keyBy first, then call window with a WindowAssigner
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
                keyed.window(TumblingEventTimeWindows.of(Time.seconds(5)));

        // Calling sum or reduce directly only aggregates the data inside the window,
        // without accumulating against history.
        // Goal: aggregate incrementally inside the window AND accumulate with history.
        SingleOutputStreamOperator<Tuple2<String, Integer>> result =
                windowed.reduce(new MyReduceFunc(), new MyWindowFunc());

        result.print();
        env.execute();
    }

    public static class MyReduceFunc implements ReduceFunction<Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
                                              Tuple2<String, Integer> value2) throws Exception {
            value1.f1 = value1.f1 + value2.f1;
            return value1;
        }
    }

    public static class MyWindowFunc extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> {

        private transient ValueState<Integer> sumState;

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<Integer> stateDescriptor =
                    new ValueStateDescriptor<>("wc-state", Integer.class);
            sumState = getRuntimeContext().getState(stateDescriptor);
        }

        @Override
        public void process(String key, Context context, Iterable<Tuple2<String, Integer>> elements,
                            Collector<Tuple2<String, Integer>> out) throws Exception {
            Integer historyCount = sumState.value();
            if (historyCount == null) {
                historyCount = 0;
            }
            // Get the pre-aggregated result the window produced
            Tuple2<String, Integer> tp = elements.iterator().next();
            Integer windowCount = tp.f1;
            Integer totalCount = historyCount + windowCount;
            // Update the state
            sumState.update(totalCount);
            tp.f1 = totalCount;
            // Emit the result
            out.collect(tp);
        }
    }
}
```

2. aggregate combined with a WindowFunction
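The demo below uses Flink's `AggregateFunction` interface, whose contract is: `createAccumulator` produces the initial value, `add` folds each element into the accumulator, and `getResult` extracts the output. A hypothetical plain-Java walk-through of that contract (not part of the original demo):

```java
public class AggContractSketch {
    // The three stages of the AggregateFunction contract, in plain Java.
    public static int createAccumulator() {
        return 0; // initial value
    }

    public static int add(int elementCount, int accumulator) {
        return elementCount + accumulator; // fold one element into the accumulator
    }

    public static int getResult(int accumulator) {
        return accumulator; // extract the final result
    }

    // Drive the contract over a batch of window elements.
    public static int aggregate(int[] counts) {
        int acc = createAccumulator();
        for (int c : counts) {
            acc = add(c, acc);
        }
        return getResult(acc);
    }
}
```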
```java
package cn._51doit.flink.day07;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

// Accumulate the current window's data, and also accumulate with historical data
public class WindowAggregateFunctionDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.enableCheckpointing(10000);

        // Sample input:
        // 1000,spark,3
        // 1200,spark,5
        // 2000,hadoop,2
        // The DataStream returned by socketTextStream has a parallelism of 1
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<String> dataWithWaterMark = lines.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner((line, timestamp) -> Long.parseLong(line.split(",")[0])));

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount =
                dataWithWaterMark.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                return Tuple2.of(fields[1], Integer.parseInt(fields[2]));
            }
        });

        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndCount.keyBy(t -> t.f0);

        // Non-keyed window: no keyBy; call windowAll with a WindowAssigner
        // Keyed window: keyBy first, then call window with a WindowAssigner
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
                keyed.window(TumblingEventTimeWindows.of(Time.seconds(5)));

        // Calling sum or reduce directly only aggregates the data inside the window,
        // without accumulating against history.
        // Goal: aggregate incrementally inside the window AND accumulate with history.
        SingleOutputStreamOperator<Tuple2<String, Integer>> result =
                windowed.aggregate(new MyAggFunc(), new MyWindowFunc());

        result.print();
        env.execute();
    }

    private static class MyAggFunc implements AggregateFunction<Tuple2<String, Integer>, Integer, Integer> {

        // Create the initial value
        @Override
        public Integer createAccumulator() {
            return 0;
        }

        // Combine one incoming element with the initial value or the running accumulator
        @Override
        public Integer add(Tuple2<String, Integer> value, Integer accumulator) {
            return value.f1 + accumulator;
        }

        // Return the final result
        @Override
        public Integer getResult(Integer accumulator) {
            return accumulator;
        }

        // Only called for session windows, but return a sensible merge anyway
        @Override
        public Integer merge(Integer a, Integer b) {
            return a + b;
        }
    }

    private static class MyWindowFunc extends ProcessWindowFunction<Integer, Tuple2<String, Integer>, String, TimeWindow> {

        private transient ValueState<Integer> sumState;

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<Integer> stateDescriptor =
                    new ValueStateDescriptor<>("wc-state", Integer.class);
            sumState = getRuntimeContext().getState(stateDescriptor);
        }

        @Override
        public void process(String key, Context context, Iterable<Integer> elements,
                            Collector<Tuple2<String, Integer>> out) throws Exception {
            Integer historyCount = sumState.value();
            if (historyCount == null) {
                historyCount = 0;
            }
            // Get the pre-aggregated result the window produced
            Integer windowCount = elements.iterator().next();
            Integer totalCount = historyCount + windowCount;
            // Update the state
            sumState.update(totalCount);
            // Emit the result
            out.collect(Tuple2.of(key, totalCount));
        }
    }
}
```

3. Using ProcessWindowFunction
```java
package cn._51doit.flink.day07;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

// Process the current window's data with full access to its elements
public class WindowProcessFunctionDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.enableCheckpointing(10000);

        // Sample input:
        // 1000,spark,3
        // 1200,spark,5
        // 2000,hadoop,2
        // The DataStream returned by socketTextStream has a parallelism of 1
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<String> dataWithWaterMark = lines.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner((line, timestamp) -> Long.parseLong(line.split(",")[0])));

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount =
                dataWithWaterMark.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                return Tuple2.of(fields[1], Integer.parseInt(fields[2]));
            }
        });

        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndCount.keyBy(t -> t.f0);

        // Non-keyed window: no keyBy; call windowAll with a WindowAssigner
        // Keyed window: keyBy first, then call window with a WindowAssigner
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
                keyed.window(TumblingEventTimeWindows.of(Time.seconds(5)));

        SingleOutputStreamOperator<Tuple2<String, Integer>> result =
                windowed.process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
            // process is only called when the window fires; it receives ALL of the
            // window's elements, which are buffered in window state until then
            @Override
            public void process(String s, Context context, Iterable<Tuple2<String, Integer>> elements,
                                Collector<Tuple2<String, Integer>> out) throws Exception {
                for (Tuple2<String, Integer> element : elements) {
                    out.collect(element);
                }
            }
        });

        result.print();
        env.execute();
    }
}
```

Summary
That is the full write-up on Flink timers collected by 生活随笔; hopefully it helps you solve the problems you ran into.