flink中的java匿名函数修改为实名函数
生活随笔
收集整理的這篇文章主要介紹了
flink中的java匿名函数修改为实名函数
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
[1]中有這么個代碼
DataStream<Tuple2<String, Long>> keyedStream = env.addSource(consumer).flatMap(new MessageSplitter()).keyBy(0).timeWindow(Time.seconds(2)).apply(new WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<Tuple2<String, Long>> out) throws Exception {long sum = 0L;int count = 0;for (Tuple2<String, Long> record: input) {sum += record.f1;count++;}Tuple2<String, Long> result = input.iterator().next();result.f1 = sum / count;out.collect(result);}});改為:
DataStream<Tuple2<String, Long>>keyedStream = env.addSource(consumer).flatMap(new MessageSplitter()).keyBy(0).timeWindow(Time.seconds(2)).apply(new WindowFunction2());?
WindowFunction2.java如下:
import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector;public class WindowFunction2 implements WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>{ @Override public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<Tuple2<String, Long>> out) throws Exception {long sum = 0L;int count = 0;System.out.printf("count=%d",count);System.out.println("----------------------------1----------------------------------------------------");for (Tuple2<String, Long> record: input) {sum += record.f1;count++;}Tuple2<String, Long> result = input.iterator().next();result.f1 = sum / count;out.collect(result);} }這樣頂層文件KafkaMessageStreaming.java中使用WindowFunction2就會顯得簡潔很多
?
Reference:
[1]Flink 讀取Kafka數據示例
總結
以上是生活随笔為你收集整理的flink中的java匿名函数修改为实名函数的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: flink读不到kafka数据问题
- 下一篇: SecureCRT报错ImportErr