Apache Flink from Scratch (Part 16): Flink DataStream Transformations


Operators transform one or more DataStreams into a new DataStream.

Filter Function

Scala

object DataStreamTransformationApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    filterFunction(env)
    env.execute("DataStreamTransformationApp")
  }

  def filterFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.addSource(new CustomNonParallelSourceFunction)
    data.map(x => {
      println("received:" + x)
      x
    }).filter(_ % 2 == 0).print().setParallelism(1)
  }
}

Any of the data sources built in the earlier posts will work here.

The map here does nothing substantive; it only prints and forwards each element. The filter keeps the numbers that are divisible by 2. The output is:

received:1
received:2
2
received:3
received:4
4
received:5
received:6
6
received:7
received:8
8

This shows that map sees every element, while filter drops the odd ones.

Java

public static void filterFunction(StreamExecutionEnvironment env) {
    DataStreamSource<Long> data = env.addSource(new JavaCustomParallelSourceFunction());
    data.setParallelism(1)
        .map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("received:" + value);
                return value;
            }
        })
        .filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) throws Exception {
                return value % 2 == 0;
            }
        })
        .print()
        .setParallelism(1);
}

data.setParallelism(1) has to be called before map; otherwise each element is printed multiple times. That is because the source here is JavaCustomParallelSourceFunction(). When JavaCustomNonParallelSourceFunction is used instead, the source parallelism is already 1 and the call can be omitted.

Union Function

Scala

def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // filterFunction(env)
  unionFunction(env)
  env.execute("DataStreamTransformationApp")
}

def unionFunction(env: StreamExecutionEnvironment): Unit = {
  val data01 = env.addSource(new CustomNonParallelSourceFunction)
  val data02 = env.addSource(new CustomNonParallelSourceFunction)
  data01.union(data02).print().setParallelism(1)
}

union merges the two streams so they can be processed as one. The output of the code above is:

1
1
2
2
3
3
4
4
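
Because union returns an ordinary DataStream, any further operator can follow it. Below is a minimal sketch combining the union above with the earlier filter; the helper name unionThenFilter is only illustrative and not part of the original code:

def unionThenFilter(env: StreamExecutionEnvironment): Unit = {
  val data01 = env.addSource(new CustomNonParallelSourceFunction)
  val data02 = env.addSource(new CustomNonParallelSourceFunction)

  // union yields a new DataStream, so filter/print can be chained directly on it
  data01.union(data02)
    .filter(_ % 2 == 0)
    .print()
    .setParallelism(1)
}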

Java

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    // filterFunction(environment);
    unionFunction(environment);
    environment.execute("JavaDataStreamTransformationApp");
}

public static void unionFunction(StreamExecutionEnvironment env) {
    DataStreamSource<Long> data1 = env.addSource(new JavaCustomNonParallelSourceFunction());
    DataStreamSource<Long> data2 = env.addSource(new JavaCustomNonParallelSourceFunction());
    data1.union(data2).print().setParallelism(1);
}

Split / Select Function

Scala

split splits one stream into several named streams, and select picks which of those streams to continue processing.

def splitSelectFunction(env: StreamExecutionEnvironment): Unit = {
  val data = env.addSource(new CustomNonParallelSourceFunction)
  val split = data.split(new OutputSelector[Long] {
    override def select(value: Long): lang.Iterable[String] = {
      val list = new util.ArrayList[String]()
      if (value % 2 == 0) {
        list.add("even")
      } else {
        list.add("odd")
      }
      list
    }
  })
  split.select("odd", "even").print().setParallelism(1)
}

The data can then be processed according to the names you select.
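
For example, to handle just one branch, pass a single name to select. A minimal sketch building on the split value from the code above (the "even value: " prefix is only for illustration):

// process only the elements the OutputSelector tagged as "even"
split.select("even")
  .map(x => "even value: " + x)
  .print()
  .setParallelism(1)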

Java

public static void splitSelectFunction(StreamExecutionEnvironment env) {
    DataStreamSource<Long> data = env.addSource(new JavaCustomNonParallelSourceFunction());
    SplitStream<Long> split = data.split(new OutputSelector<Long>() {
        @Override
        public Iterable<String> select(Long value) {
            List<String> output = new ArrayList<>();
            if (value % 2 == 0) {
                output.add("even");
            } else {
                output.add("odd");
            }
            return output;
        }
    });
    split.select("odd").print().setParallelism(1);
}
