日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

flink 3-转换

發(fā)布時間:2024/10/8 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 flink 3-转换 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
轉(zhuǎn)換 transformation
map

一對一,輸入為一,輸出為一

import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class MapTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> dataStreamSource = env.readTextFile("xxx\hello.txt");SingleOutputStreamOperator<String> map = dataStreamSource.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {return value.split("\t")[0];}});map.print();env.execute("map test");} }

flatMap

一對多,一行進,多行出

import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;public class FlatMapTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> dataStreamSource = env.readTextFile("xxx\hello.txt");SingleOutputStreamOperator<String> flatMap = dataStreamSource.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {for (String word : value.split(" ")) {out.collect(word);}}});flatMap.print();env.execute("flatMap test");} }

filter

設置過濾條件

import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;public class FilterTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> dataStreamSource = env.readTextFile("xxx\hello.txt");SingleOutputStreamOperator<String[]> filter = dataStreamSource.map(text -> text.split(" ")).filter(text -> text[0].equals("001"));SingleOutputStreamOperator<String> map = filter.map(text -> text[1]);map.addSink(new PrintSinkFunction<>());env.execute("filter test");} }

keyBy

聚合,dataStream中用keyBy, dataSet中用groupBy

import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; import org.apache.flink.util.Collector;public class KeyByTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<String> dataStreamSource = env.readTextFile("xxx\hello.txt");SingleOutputStreamOperator<Tuple2<String, Integer>> sum = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] strings = value.split(" ");for (String field : strings) {out.collect(new Tuple2<>(field, 1));}}}).keyBy(0).sum(1);sum.addSink(new PrintSinkFunction<>());env.execute("keyBy test");} }

reduce
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;public class ReduceTeset {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> dataStreamSource = env.readTextFile("D:\\ifeng\\flinkTest\\src\\main\\java\\com\\ifeng\\zgx\\exercise\\data\\hello.txt");SingleOutputStreamOperator<Tuple2<String, Integer>> reduce = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] strings = value.split(" ");for (String str : strings) {out.collect(new Tuple2<>(str, 1));}}}).keyBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {return new Tuple2<>(value2.f0, value1.f1 + value2.f1);}});reduce.print();env.execute("reduce test");} }

hello.txt

ref

flink transformations
flink Basic API

總結

以上是生活随笔為你收集整理的flink 3-转换的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。