Apache Flink from Scratch (Part 16): Flink DataStream Transformations
Operators transform one or more DataStreams into a new DataStream.
Filter Function
Scala
```scala
object DataStreamTransformationApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    filterFunction(env)
    env.execute("DataStreamTransformationApp")
  }

  def filterFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.addSource(new CustomNonParallelSourceFunction)
    data.map(x => {
      println("received:" + x)
      x
    }).filter(_ % 2 == 0)
      .print()
      .setParallelism(1)
  }
}
```

Any of the source functions from the previous posts can be used here.
The `map` here does no real transformation, it only logs each element; the `filter` keeps the numbers divisible by 2. The output looks like this:
```
received:1
received:2
2
received:3
received:4
4
received:5
received:6
6
received:7
received:8
8
```

This shows that `map` receives every element, while `filter` drops the odd ones.
Java
```java
public static void filterFunction(StreamExecutionEnvironment env) {
    DataStreamSource<Long> data = env.addSource(new JavaCustomParallelSourceFunction());
    data.setParallelism(1)
        .map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("received:" + value);
                return value;
            }
        })
        .filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) throws Exception {
                return value % 2 == 0;
            }
        })
        .print()
        .setParallelism(1);
}
```

`data.setParallelism(1)` must be called before `map`, otherwise each element is emitted multiple times, because `JavaCustomParallelSourceFunction` is a parallel source. With `JavaCustomNonParallelSourceFunction` the parallelism defaults to 1 and the call can be omitted.
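Setting Flink aside, the map-then-filter semantics above can be sketched with plain `java.util.stream` on a finite range. This is only an analogy (no Flink runtime involved); the range 1..8 stands in for the custom source:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class FilterSketch {
    public static void main(String[] args) {
        List<Long> evens = LongStream.rangeClosed(1, 8).boxed()
                // map sees every element; here it only logs, like the Flink example
                .map(x -> { System.out.println("received:" + x); return x; })
                // filter keeps only elements for which the predicate holds
                .filter(x -> x % 2 == 0)
                .collect(Collectors.toList());
        System.out.println(evens);  // [2, 4, 6, 8]
    }
}
```

The interleaving matches the Flink output: every element is logged by the map step, but only the even ones survive the filter.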
Union Function
Scala
```scala
def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // filterFunction(env)
  unionFunction(env)
  env.execute("DataStreamTransformationApp")
}

def unionFunction(env: StreamExecutionEnvironment): Unit = {
  val data01 = env.addSource(new CustomNonParallelSourceFunction)
  val data02 = env.addSource(new CustomNonParallelSourceFunction)
  data01.union(data02).print().setParallelism(1)
}
```

`union` merges the two streams so they can be processed together. The printed output is:
```
1
1
2
2
3
3
4
4
```

Java
```java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    // filterFunction(environment);
    unionFunction(environment);
    environment.execute("JavaDataStreamTransformationApp");
}

public static void unionFunction(StreamExecutionEnvironment env) {
    DataStreamSource<Long> data1 = env.addSource(new JavaCustomNonParallelSourceFunction());
    DataStreamSource<Long> data2 = env.addSource(new JavaCustomNonParallelSourceFunction());
    data1.union(data2).print().setParallelism(1);
}
```

Split / Select Function
Scala
`split` splits one stream into several named streams, and `select` picks which of those streams to process.
```scala
def splitSelectFunction(env: StreamExecutionEnvironment): Unit = {
  val data = env.addSource(new CustomNonParallelSourceFunction)
  val split = data.split(new OutputSelector[Long] {
    override def select(value: Long): lang.Iterable[String] = {
      val list = new util.ArrayList[String]()
      if (value % 2 == 0) {
        list.add("even")
      } else {
        list.add("odd")
      }
      list
    }
  })
  split.select("odd", "even").print().setParallelism(1)
}
```

The streams to process can then be chosen by name.
Java
```java
public static void splitSelectFunction(StreamExecutionEnvironment env) {
    DataStreamSource<Long> data = env.addSource(new JavaCustomNonParallelSourceFunction());
    SplitStream<Long> split = data.split(new OutputSelector<Long>() {
        @Override
        public Iterable<String> select(Long value) {
            List<String> output = new ArrayList<>();
            if (value % 2 == 0) {
                output.add("even");
            } else {
                output.add("odd");
            }
            return output;
        }
    });
    split.select("odd").print().setParallelism(1);
}
```

Here `select("odd")` prints only the odd numbers.
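The split/select pattern amounts to tagging each element with a name and then picking streams by name. On ordinary collections this is just a `groupingBy`; the following is an analogy, not Flink API, with the range 1..8 standing in for the custom source:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class SplitSelectSketch {
    public static void main(String[] args) {
        // the OutputSelector's job: assign a name ("even" or "odd") to each element;
        // grouping by that name is the collections analogue of SplitStream
        Map<String, List<Long>> streams = LongStream.rangeClosed(1, 8).boxed()
                .collect(Collectors.groupingBy(x -> x % 2 == 0 ? "even" : "odd"));

        // select("odd") picks just the stream tagged "odd"
        System.out.println(streams.get("odd"));   // [1, 3, 5, 7]
        // select("odd", "even") would yield both streams, i.e. every element
    }
}
```

Note that later Flink releases deprecated `split`/`select`; the recommended replacement is side outputs (`OutputTag` with a `ProcessFunction`).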