【2】Flink DataStream Transformation Operators
【README】
This article documents Flink's transformation operations on data streams, including basic transformations (map, flatMap, filter), rolling aggregations, reduce, stream splitting, connected streams, union, user-defined functions (rich functions), and data repartitioning.
This article uses Flink 1.14.4; the Maven dependencies are as follows:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.14.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.14.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.14.4</version>
</dependency>

Parts of this article reference the official Flink documentation:
Overview | Apache Flink: https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/datastream/operators/overview/
(That page describes how operators transform one or more DataStreams into a new DataStream and how applications combine multiple transformations into complex dataflow topologies; it covers the basic DataStream API transformations, the partitioning schemes available after a transformation, and operator chaining.)
【1】Basic Transformation Operators
These include map (transform), flatMap (flatten/split), and filter.
1) The code is as follows:
/**
 * @Description Basic transformations on a Flink data stream
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022-04-15
 */
public class TransformTest1_Base {
    public static void main(String[] args) throws Exception {
        // create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // read data from a text file
        DataStream<String> baseStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");
        // 1-map: transform each line to its length
        // DataStream<Integer> mapStream = baseStream.map(x -> x.length());
        DataStream<Integer> mapStream = baseStream.map(String::length);
        // 2-flatMap: split each line into fields on commas
        DataStream<String> flatMapStream = baseStream.flatMap((String raw, Collector<String> collector) -> {
            for (String rd : raw.split(",")) {
                collector.collect(rd);
            }
        }).returns(Types.STRING);
        // 3-filter: keep records starting with sensor_1
        DataStream<String> filterStream = baseStream.filter(x -> x.startsWith("sensor_1"));
        // print
        mapStream.print("mapStream");
        flatMapStream.print("flatMapStream");
        filterStream.print("filterStream");
        // execute
        env.execute("BaseTransformStreamJob");
    }
}

The sensor text file is as follows:
sensor_1,12341561,36.1
sensor_2,12341562,33.5
sensor_3,12341563,39.9
sensor_1,12341573,43.1

Printed output:
mapStream> 22
flatMapStream> sensor_1
flatMapStream> 12341561
flatMapStream> 36.1
filterStream> sensor_1,12341561,36.1
mapStream> 22
flatMapStream> sensor_2
flatMapStream> 12341562
flatMapStream> 33.5
mapStream> 22
flatMapStream> sensor_3
flatMapStream> 12341563
flatMapStream> 39.9
mapStream> 22
flatMapStream> sensor_1
flatMapStream> 12341573
flatMapStream> 43.1
filterStream> sensor_1,12341573,43.1
【2】Rolling Aggregation Operators
The keyBy operator groups a data stream by key; a stream must be grouped before it can be aggregated, much like SQL's GROUP BY.
What keyBy does:
- it logically splits one stream into disjoint partitions (still a single stream), where all records with the same key (same hash) land in the same partition; internally, keyBy() is implemented via hash partitioning on the key.
keyBy produces a KeyedStream;
rolling aggregation operators can then operate on the KeyedStream. They are listed below (a minimal sum sketch follows the list):
- sum
- min
- max
- minBy
- maxBy
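As a minimal sketch of keyBy followed by sum — the word-count style data, class name, and job name here are illustrative assumptions, not part of this article's SensorReading example:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyBySumSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // hypothetical (word, count) records
        DataStream<Tuple2<String, Integer>> words = env.fromElements(
                Tuple2.of("a", 1), Tuple2.of("b", 1), Tuple2.of("a", 1));
        // group by the word (tuple field 0), then keep a rolling sum of field 1
        DataStream<Tuple2<String, Integer>> counts = words
                .keyBy(t -> t.f0)
                .sum(1);
        counts.print(); // rolling results per key: (a,1) (b,1) (a,2)
        env.execute("KeyBySumSketchJob");
    }
}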
【2.1】Code Example
/**
 * @Description Rolling aggregation operators
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022-04-15
 */
public class TransformTest2_RollingAgg {
    public static void main(String[] args) throws Exception {
        // create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // read data from a text file
        DataStream<String> inputStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");
        // map to the SensorReading POJO
        DataStream<SensorReading> sensorStream = inputStream.map(x -> {
            String[] arr = x.split(",");
            return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));
        });
        // keyBy groups the stream before the rolling aggregation (single-field key)
        KeyedStream<SensorReading, String> keyedStream = sensorStream.keyBy(SensorReading::getId);
        // keyBy with a tuple key (multi-field grouping)
        // KeyedStream<SensorReading, Tuple1<String>> keyedStream = sensorStream.keyBy(new KeySelector<SensorReading, Tuple1<String>>() {
        //     @Override
        //     public Tuple1<String> getKey(SensorReading sensorReading) throws Exception {
        //         return Tuple1.of(sensorReading.getId());
        //     }
        // });
        // max aggregation
        DataStream<SensorReading> maxTemperatureStream = keyedStream.max("temperature");
        // maxBy aggregation
        DataStream<SensorReading> maxByTemperatureStream = keyedStream.maxBy("temperature");
        // print
        maxTemperatureStream.print("max");
        maxByTemperatureStream.print("maxBy");
        // execute
        env.execute("maxTemperatureStreamJob");
    }
}

The sensor text file:
sensor_1,11,36.1
sensor_2,21,33.1
sensor_1,12,36.2
sensor_1,13,36.3
sensor_2,22,33.2

max aggregation output:
max> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
max> SensorRd{id='sensor_2', timestamp=21, temperature=33.1}
max> SensorRd{id='sensor_1', timestamp=11, temperature=36.2}
max> SensorRd{id='sensor_1', timestamp=11, temperature=36.3}
max> SensorRd{id='sensor_2', timestamp=21, temperature=33.2}
maxBy aggregation output:
maxBy> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
maxBy> SensorRd{id='sensor_2', timestamp=21, temperature=33.1}
maxBy> SensorRd{id='sensor_1', timestamp=12, temperature=36.2}
maxBy> SensorRd{id='sensor_1', timestamp=13, temperature=36.3}
maxBy> SensorRd{id='sensor_2', timestamp=22, temperature=33.2}
To summarize, the difference between max and maxBy:
- max: takes the aggregated field (the maximum temperature) but keeps the other fields from the first record of the key;
- maxBy: takes the aggregated field (the maximum temperature) together with the other fields of the record that holds that maximum.
The same applies to min and minBy, which this article does not demonstrate.
Note: aggregation requires grouping first, and you can group by a single field or by multiple fields.
The commented-out part of the code above shows grouping with a tuple key; a composite group key is represented as a Tuple:
Tuple1 for one field, Tuple2 for two fields, and so on (see the sketch below).
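As a hedged sketch of grouping by two fields — it reuses sensorStream from the example above, grouping by (id, timestamp) purely to illustrate the mechanics, and assumes imports of org.apache.flink.api.java.tuple.Tuple2 and org.apache.flink.api.java.functions.KeySelector:

// composite key: group by (id, timestamp) via a Tuple2 key (illustrative choice of fields)
KeyedStream<SensorReading, Tuple2<String, Long>> keyed2 = sensorStream
        .keyBy(new KeySelector<SensorReading, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> getKey(SensorReading rd) throws Exception {
                return Tuple2.of(rd.getId(), rd.getTimestamp());
            }
        });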
【2.2】Reduce Aggregation
Definition:
Performs a "rolling" reduce on the records of the same key: the current element is combined with the last reduced value, and the new value is emitted.
Scenario: group by sensor id into a KeyedStream, then query the maximum temperature together with the latest timestamp, i.e., several aggregations at once.
Code:
/**
 * @Description The reduce aggregation operator
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022-04-15
 */
public class TransformTest3_Reduce {
    public static void main(String[] args) throws Exception {
        // create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // read data from a text file
        DataStream<String> inputStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");
        // map to the SensorReading POJO
        DataStream<SensorReading> sensorStream = inputStream.map(x -> {
            String[] arr = x.split(",");
            return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));
        });
        // keyBy groups the stream (single-field key)
        KeyedStream<SensorReading, String> keyedStream = sensorStream.keyBy(SensorReading::getId);
        // reduce: query the maximum temperature together with the latest timestamp
        DataStream<SensorReading> reduceStream = keyedStream.reduce((a, b) -> new SensorReading(
                a.getId(), Math.max(a.getTimestamp(), b.getTimestamp()), Math.max(a.getTemperature(), b.getTemperature())));
        // print
        reduceStream.print("reduceStream");
        // execute
        env.execute("reduceStreamJob");
    }
}

The sensor text file:
sensor_1,11,36.1
sensor_2,21,33.1
sensor_1,32,36.2
sensor_1,23,30.3
sensor_2,22,31.2

Printed output:
reduceStream> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
reduceStream> SensorRd{id='sensor_2', timestamp=21, temperature=33.1}
reduceStream> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}
reduceStream> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}
reduceStream> SensorRd{id='sensor_2', timestamp=22, temperature=33.1}
【3】Splitting a Stream (cutting one stream into multiple streams)
Flink 1.14.4 removed the split operator; see https://issues.apache.org/jira/browse/FLINK-19083
Side outputs are used instead; see:
Side Outputs | Apache Flink
【3.1】Splitting a Stream (Flink removed the split method; side outputs are needed to split a stream)
1) Code: temperatures above 30 degrees count as high, otherwise low;
implemented by writing a ProcessFunction.
public class TransformTest4_SplitStream {
    public static void main(String[] args) throws Exception {
        // create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // read data from a text file
        DataStream<String> inputStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");
        // map to the SensorReading POJO
        DataStream<SensorReading> sensorStream = inputStream.map(x -> {
            String[] arr = x.split(",");
            return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));
        });
        // 1, split into a high-temperature and a low-temperature stream, depending on whether the temperature exceeds 30 degrees
        OutputTag<SensorReading> highTag = new OutputTag<SensorReading>("high") {};
        OutputTag<SensorReading> lowTag = new OutputTag<SensorReading>("low") {};
        SingleOutputStreamOperator<SensorReading> splitStream = sensorStream.process(new ProcessFunction<SensorReading, SensorReading>() {
            @Override
            public void processElement(SensorReading record, Context context, Collector<SensorReading> collector) throws Exception {
                // emit the record to a side output
                context.output(record.getTemperature() > 30 ? highTag : lowTag, record);
                // emit the record to the regular output
                collector.collect(record);
            }
        });
        // 2, select the side-output streams and print them
        splitStream.getSideOutput(highTag).print("high");
        splitStream.getSideOutput(lowTag).print("low");
        // execute
        env.execute("splitStreamJob");
    }
}

The sensor text file:
sensor_1,11,36.1
sensor_2,21,23.1
sensor_1,32,36.2
sensor_1,23,30.3
sensor_2,22,11.2

Printed output:
high> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
low> SensorRd{id='sensor_2', timestamp=21, temperature=23.1}
high> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}
high> SensorRd{id='sensor_1', timestamp=23, temperature=30.3}
low> SensorRd{id='sensor_2', timestamp=22, temperature=11.2}
The splitting code above references: Process function: a versatile tool in Flink datastream API | Develop Paper
【4】connect: Connected Streams
1) Definition: connecting multiple streams into one yields a connected stream; the sub-streams of a connected stream may have different element types.
2) Steps:
- connect the 2 streams to form a ConnectedStreams;
- map the connected stream to obtain a DataStream.
Code:
/**
 * @Description connect: connected streams
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022-04-16
 */
public class TransformTest5_ConnectStream {
    public static void main(String[] args) throws Exception {
        // create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // read data from a text file
        DataStream<String> inputStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");
        // map to the SensorReading POJO
        DataStream<SensorReading> sensorStream = inputStream.map(x -> {
            String[] arr = x.split(",");
            return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));
        });
        // 1, split into a high-temperature and a low-temperature stream, depending on whether the temperature exceeds 30 degrees
        OutputTag<SensorReading> highTag = new OutputTag<SensorReading>("high") {};
        OutputTag<SensorReading> lowTag = new OutputTag<SensorReading>("low") {};
        SingleOutputStreamOperator<SensorReading> splitStream = sensorStream.process(new ProcessFunction<SensorReading, SensorReading>() {
            @Override
            public void processElement(SensorReading record, Context context, Collector<SensorReading> collector) throws Exception {
                // emit the record to a side output
                context.output(record.getTemperature() > 30 ? highTag : lowTag, record);
                // emit the record to the regular output
                collector.collect(record);
            }
        });
        // obtain the high- and low-temperature streams
        DataStream<SensorReading> highStream = splitStream.getSideOutput(highTag);
        DataStream<SensorReading> lowStream = splitStream.getSideOutput(lowTag);
        // 2, connect the 2 streams into 1 (elements of sub-stream 1 become 3-tuples, of sub-stream 2 become 2-tuples)
        ConnectedStreams<SensorReading, SensorReading> connectedStreams = highStream.connect(lowStream);
        DataStream<Object> resultStream = connectedStreams.map(new CoMapFunction<SensorReading, SensorReading, Object>() {
            @Override
            public Object map1(SensorReading rd) throws Exception {
                return new Tuple3<>(rd.getId(), rd.getTemperature(), "high"); // map1 applies to the 1st stream, highStream
            }
            @Override
            public Object map2(SensorReading rd) throws Exception {
                return new Tuple2<>(rd.getId(), rd.getTemperature()); // map2 applies to the 2nd stream, lowStream
            }
        });
        // 3, print the result
        resultStream.print("connectedStream");
        // execute
        env.execute("connectedStreamJob");
    }
}

The sensor text file:
sensor_1,11,36.1
sensor_2,21,23.1
sensor_1,32,36.2
sensor_1,23,30.3
sensor_2,22,11.2

Printed output:
connectedStream> (sensor_1,36.1,high)
connectedStream> (sensor_2,23.1)
connectedStream> (sensor_1,36.2,high)
connectedStream> (sensor_2,11.2)
connectedStream> (sensor_1,30.3,high)
【5】Merging Streams: union
connect, as shown above, only connects two streams at a time; merging more streams with connect requires chaining multiple connect calls, which is awkward.
To merge multiple streams, use union; the precondition is that all streams share the same element type.
1) Code (continuing from the split example above, which defines highStream and lowStream):
// 2, merge 3 streams into 1
DataStream<SensorReading> inputStream2 = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor2.txt").map(x -> {
    String[] arr = x.split(",");
    return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));
});
DataStream<SensorReading> unionStream = highStream.union(lowStream, inputStream2);
// 3, print the result
unionStream.print("unionStream");
// execute
env.execute("unionStreamJob");

Printed output:
unionStream> SensorRd{id='sensor2_1', timestamp=11, temperature=36.1}
unionStream> SensorRd{id='sensor2_2', timestamp=21, temperature=23.1}
unionStream> SensorRd{id='sensor2_1', timestamp=32, temperature=36.2}
unionStream> SensorRd{id='sensor2_1', timestamp=23, temperature=30.3}
unionStream> SensorRd{id='sensor2_2', timestamp=22, temperature=11.2}
unionStream> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
unionStream> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}
unionStream> SensorRd{id='sensor_1', timestamp=23, temperature=30.3}
unionStream> SensorRd{id='sensor_2', timestamp=21, temperature=23.1}
unionStream> SensorRd{id='sensor_2', timestamp=22, temperature=11.2}
【6】User-Defined Functions (UDF)
Flink exposes interfaces for all UDFs, such as MapFunction, FilterFunction, and ProcessFunction; they can be understood as the functional interfaces introduced in Java 8 (a minimal sketch follows the documentation link below).
See the official UDF documentation:
User-defined Functions | Apache Flink: https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/functions/udfs/
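As a minimal sketch of implementing one of these interfaces — the class name, sample data, and job name below are my own illustrations, not from this article:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UdfFilterSketch {
    // a UDF is simply an implementation of one of Flink's function interfaces
    static class StartsWithFilter implements FilterFunction<String> {
        @Override
        public boolean filter(String value) throws Exception {
            return value.startsWith("sensor_1");
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> lines = env.fromElements("sensor_1,11,36.1", "sensor_2,21,23.1");
        // since FilterFunction is a functional interface, a lambda works too:
        // lines.filter(x -> x.startsWith("sensor_1"));
        lines.filter(new StartsWithFilter()).print("udfFilter");
        env.execute("UdfFilterSketchJob");
    }
}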
【6.1】Rich Functions
1) A rich function can access runtime context information, whereas a plain function cannot.
Code:
/**
 * @Description Rich functions
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022-04-16
 */
public class TransformTest7_RichFunction {
    public static void main(String[] args) throws Exception {
        // create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        // read data from a text file
        DataStream<String> inputStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");
        // map to the SensorReading POJO
        DataStream<SensorReading> sensorStream = inputStream.map(x -> {
            String[] arr = x.split(",");
            return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));
        });
        // apply the custom rich function
        DataStream<Tuple3<String, Integer, Integer>> richMapStream = sensorStream.map(new MyRichMapFunction());
        // 3, print the result
        richMapStream.print("richMapStream");
        // execute
        env.execute("richMapStreamJob");
    }

    // the rich function class
    static class MyRichMapFunction extends RichMapFunction<SensorReading, Tuple3<String, Integer, Integer>> {
        @Override
        public Tuple3<String, Integer, Integer> map(SensorReading record) throws Exception {
            // a rich function can access the runtime context via getRuntimeContext(); a plain map function cannot
            return new Tuple3<String, Integer, Integer>(record.getId(), record.getId().length(), getRuntimeContext().getIndexOfThisSubtask());
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            // initialization work, typically defining state or opening a database connection
            System.out.println("open db conn");
        }

        @Override
        public void close() throws Exception {
            // close connections and clear state
            System.out.println("close db conn");
        }
    }
}

The sensor text file:
sensor_1,11,36.1
sensor_2,21,23.1
sensor_1,32,36.2
sensor_1,23,30.3
sensor_2,22,11.2

Printed output:
open db conn
open db conn
richMapStream:1> (sensor_1,8,0)
richMapStream:2> (sensor_1,8,1)
richMapStream:1> (sensor_2,8,0)
richMapStream:2> (sensor_2,8,1)
richMapStream:1> (sensor_1,8,0)
close db conn
close db conn
As the output shows, each subtask (thread) executes the open and close methods; the third field of the Tuple3 is the subtask index obtained from the runtime context (only a rich function can access the context).
【7】Data Repartitioning in Flink
1) A partition in Flink refers to a slot in a TaskManager, i.e., a thread.
The repartitioning operations are:
- shuffle: random repartitioning;
- keyBy: partition by key;
- global: send all data to the 1st partition.
2) Code:
/**
 * @Description Repartitioning
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022-04-16
 */
public class TransformTest8_Partition2 {
    public static void main(String[] args) throws Exception {
        // create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        // read data from a text file
        DataStream<String> inputStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");
        // map to the SensorReading POJO
        DataStream<SensorReading> sensorStream = inputStream.map(x -> {
            String[] arr = x.split(",");
            return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));
        });
        // 1-shuffle: random repartitioning
        DataStream<SensorReading> shuffleStream = sensorStream.shuffle();
        shuffleStream.print("shuffleStream");
        // 2-keyBy: partition by key
        DataStream<SensorReading> keybyStream = sensorStream.keyBy(SensorReading::getId);
        // keybyStream.print("keybyStream");
        // 3-global: send all data to the 1st partition
        DataStream<SensorReading> globalStream = sensorStream.global();
        // globalStream.print("globalStream");
        // execute
        env.execute("partitionJob");
    }
}

The sensor text file:
sensor_1,11,36.1
sensor_2,21,23.1
sensor_1,32,36.2
sensor_1,23,30.3
sensor_2,22,11.2

Original partitioning (before repartitioning):
rawStream:1> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
rawStream:2> SensorRd{id='sensor_1', timestamp=23, temperature=30.3}
rawStream:1> SensorRd{id='sensor_2', timestamp=21, temperature=23.1}
rawStream:2> SensorRd{id='sensor_2', timestamp=22, temperature=11.2}
rawStream:1> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}

shuffle (random repartitioning) result:
shuffleStream:2> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
shuffleStream:1> SensorRd{id='sensor_2', timestamp=22, temperature=11.2}
shuffleStream:2> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}
shuffleStream:1> SensorRd{id='sensor_2', timestamp=21, temperature=23.1}
shuffleStream:2> SensorRd{id='sensor_1', timestamp=23, temperature=30.3}

keyBy (partition by key) result:
keybyStream:1> SensorRd{id='sensor_2', timestamp=21, temperature=23.1}
keybyStream:2> SensorRd{id='sensor_1', timestamp=23, temperature=30.3}
keybyStream:1> SensorRd{id='sensor_2', timestamp=22, temperature=11.2}
keybyStream:2> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
keybyStream:2> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}

global (send all data to the 1st partition) result:
globalStream:1> SensorRd{id='sensor_1', timestamp=23, temperature=30.3}
globalStream:1> SensorRd{id='sensor_2', timestamp=22, temperature=11.2}
globalStream:1> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
globalStream:1> SensorRd{id='sensor_2', timestamp=21, temperature=23.1}
globalStream:1> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}