2021 Big Data Flink (11): Stream-Batch Unified API, Source
Contents
Source
Predefined Sources
Collection-based Source
File-based Source
Socket-based Source
Custom Sources
Randomly Generated Data
MySQL
Source
Predefined Sources
Collection-based Source
- API
Typically used to fabricate test data while learning:
1. env.fromElements(varargs);
2. env.fromCollection(a Collection);
3. env.generateSequence(from, to); // deprecated in newer Flink versions in favor of fromSequence
4. env.fromSequence(from, to);
- Code example:
package cn.it.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * Author lanson
 * Desc
 * Turn an ordinary local Java/Scala collection into a distributed Flink DataStream!
 * Typically used to fabricate test data while learning:
 * 1. env.fromElements(varargs);
 * 2. env.fromCollection(a Collection);
 * 3. env.generateSequence(from, to);
 * 4. env.fromSequence(from, to);
 */
public class SourceDemo01 {
    public static void main(String[] args) throws Exception {
        //1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //2. source
        // 1. env.fromElements(varargs);
        DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");
        // 2. env.fromCollection(a Collection);
        DataStream<String> ds2 = env.fromCollection(Arrays.asList("hadoop", "spark", "flink"));
        // 3. env.generateSequence(from, to);
        DataStream<Long> ds3 = env.generateSequence(1, 10);
        // 4. env.fromSequence(from, to);
        DataStream<Long> ds4 = env.fromSequence(1, 10);
        //3. Transformation
        //4. sink
        ds1.print();
        ds2.print();
        ds3.print();
        ds4.print();
        //5. execute
        env.execute();
    }
}
File-based Source
- API
Typically used while learning/testing:
env.readTextFile(local/HDFS file or directory); // compressed files also work
- Code example:
package cn.it.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Author lanson
 * Desc
 * env.readTextFile(local/HDFS file or directory); // compressed files also work
 */
public class SourceDemo02 {
    public static void main(String[] args) throws Exception {
        //1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //2. source
        // env.readTextFile(local/HDFS file); // compressed files also work
        DataStream<String> ds1 = env.readTextFile("data/input/words.txt");
        DataStream<String> ds2 = env.readTextFile("data/input/dir");
        DataStream<String> ds3 = env.readTextFile("hdfs://node1:8020/wordcount/input/words.txt");
        DataStream<String> ds4 = env.readTextFile("data/input/wordcount.txt.gz");
        //3. Transformation
        //4. sink
        ds1.print();
        ds2.print();
        ds3.print();
        ds4.print();
        //5. execute
        env.execute();
    }
}
Socket-based Source
Typically used while learning/testing.
- Requirements
1. On node1, run nc -lk 9999 to send data to the given port.
nc is short for netcat; originally a tool for network setup, it can be used to send data to a port.
If the command is missing, install it with:
yum install -y nc
2. Write a Flink streaming application that counts words in real time.
- Code implementation:
package cn.it.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Author lanson
 * Desc
 * SocketSource
 */
public class SourceDemo03 {
    public static void main(String[] args) throws Exception {
        //1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //2. source
        DataStream<String> linesDS = env.socketTextStream("node1", 9999);
        //3. transformation
        //3.1 split each line on spaces into individual words
        DataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                //value is one line of input
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(word); //collect and emit each word
                }
            }
        });
        //3.2 map each word to (word, 1)
        DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                //value is a single word
                return Tuple2.of(value, 1);
            }
        });
        //3.3 group the data by word (the key)
        //KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOnesDS.keyBy(0);
        KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);
        //3.4 sum the counts (the value) within each group
        DataStream<Tuple2<String, Integer>> result = groupedDS.sum(1);
        //4. sink
        result.print();
        //5. execute
        env.execute();
    }
}
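The operators above only split, pair, and sum. As a plain-Java sketch (no Flink involved), the same per-line counting logic looks like this; it may help when reasoning about what flatMap, map, keyBy, and sum each contribute:

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the flatMap -> map -> keyBy -> sum pipeline above:
// split each line on spaces, pair each word with 1, and keep a running
// total per word.
public class WordCountSketch {
    public static Map<String, Integer> count(String[] lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {      // flatMap: line -> words
                counts.merge(word, 1, Integer::sum);   // map + keyBy + sum: (word, 1) merged in
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // e.g. {spark=3, hadoop=1, flink=1} (map order unspecified)
        System.out.println(count(new String[]{"hadoop spark", "spark flink spark"}));
    }
}
```

Unlike the Flink version, this sketch is bounded and single-threaded; Flink's keyBy additionally partitions words across parallel subtasks.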
Custom Sources
Randomly Generated Data
- API
Typically used while learning/testing to mock up data.
Flink also provides source interfaces; implement one to build a custom source. The interfaces differ in capability as follows:
SourceFunction: non-parallel source (parallelism must be 1)
RichSourceFunction: non-parallel source with rich lifecycle methods (parallelism must be 1)
ParallelSourceFunction: parallel source (parallelism may be >= 1)
RichParallelSourceFunction: parallel source with rich lifecycle methods (parallelism may be >= 1); the Kafka source covered later uses this interface
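All four interfaces share the same run()/cancel() contract: run() loops while a flag stays true, and cancel() flips the flag so the loop exits cleanly. A minimal plain-Java sketch of that contract (hypothetical class, not Flink API):

```java
// Plain-Java sketch of the run()/cancel() contract shared by the source
// interfaces above. The flag is volatile because cancel() is called from
// a different thread than run().
public class CancellableLoop {
    private volatile boolean flag = true;
    private int emitted = 0;

    // Loops while the flag is true; the increment stands in for ctx.collect(...)
    public void run(int maxRecords) {
        while (flag && emitted < maxRecords) {
            emitted++;
        }
    }

    public void cancel() {
        flag = false;
    }

    public int getEmitted() {
        return emitted;
    }

    public static void main(String[] args) {
        CancellableLoop loop = new CancellableLoop();
        loop.run(3);
        System.out.println("emitted " + loop.getEmitted() + " records"); // prints: emitted 3 records
    }
}
```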
- Requirements
Generate one order record per second (order ID, user ID, amount, timestamp).
Specifically:
- random order ID (UUID)
- random user ID (0-2)
- random amount (0-100)
- timestamp = current system time
- Code implementation:
package cn.it.source;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Random;
import java.util.UUID;

/**
 * Author lanson
 * Desc
 * Requirement: generate one order record per second (order ID, user ID, amount, timestamp)
 * - random order ID (UUID)
 * - random user ID (0-2)
 * - random amount (0-100)
 * - timestamp = current system time
 */
public class SourceDemo04_Customer {
    public static void main(String[] args) throws Exception {
        //1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //2. Source
        DataStream<Order> orderDS = env.addSource(new MyOrderSource()).setParallelism(2);
        //3. Transformation
        //4. Sink
        orderDS.print();
        //5. execute
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        private String id;
        private Integer userId;
        private Integer money;
        private Long createTime;
    }

    public static class MyOrderSource extends RichParallelSourceFunction<Order> {
        //volatile: cancel() flips this flag from another thread
        private volatile boolean flag = true;

        @Override
        public void run(SourceContext<Order> ctx) throws Exception {
            Random random = new Random();
            while (flag) {
                Thread.sleep(1000);
                String id = UUID.randomUUID().toString();
                int userId = random.nextInt(3);
                int money = random.nextInt(101);
                long createTime = System.currentTimeMillis();
                ctx.collect(new Order(id, userId, money, createTime));
            }
        }

        //executed when the task is cancelled
        @Override
        public void cancel() {
            flag = false;
        }
    }
}
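A quick plain-Java sanity check of the random bounds used above (hypothetical helper class, not part of the Flink job): Random.nextInt(bound) excludes its bound, so nextInt(3) yields user IDs 0 through 2 and nextInt(101) yields amounts 0 through 100, matching the requirements.

```java
import java.util.Random;
import java.util.UUID;

// Plain-Java sketch of the random order fields generated by MyOrderSource.
public class OrderFieldSketch {
    public static int randomUserId(Random random) {
        return random.nextInt(3);   // 0, 1, or 2
    }

    public static int randomMoney(Random random) {
        return random.nextInt(101); // 0 through 100 inclusive
    }

    public static String randomOrderId() {
        return UUID.randomUUID().toString(); // 36-character UUID string
    }

    public static void main(String[] args) {
        Random random = new Random();
        System.out.println(randomOrderId()
                + " user=" + randomUserId(random)
                + " money=" + randomMoney(random));
    }
}
```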
MySQL
- Requirement:
In real projects you often receive live data that must be matched against rules stored in MySQL; a custom Flink source can read those rules from MySQL.
For now, a simpler requirement:
Load data from MySQL continuously,
so that changes to the MySQL data are picked up as well.
- Prepare the data
CREATE TABLE `t_student` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;

INSERT INTO `t_student` VALUES ('1', 'jack', '18');
INSERT INTO `t_student` VALUES ('2', 'tom', '19');
INSERT INTO `t_student` VALUES ('3', 'rose', '20');
INSERT INTO `t_student` VALUES ('4', 'tom', '19');
INSERT INTO `t_student` VALUES ('5', 'jack', '18');
INSERT INTO `t_student` VALUES ('6', 'rose', '20');
- Code implementation:
package cn.it.source;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.TimeUnit;

/**
 * Author lanson
 * Desc
 * Requirement: load data from MySQL continuously, so that changes to the
 * MySQL data are picked up as well.
 */
public class SourceDemo05_Customer_MySQL {
    public static void main(String[] args) throws Exception {
        //1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2. Source
        DataStream<Student> studentDS = env.addSource(new MySQLSource()).setParallelism(1);
        //3. Transformation
        //4. Sink
        studentDS.print();
        //5. execute
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }

    public static class MySQLSource extends RichParallelSourceFunction<Student> {
        private Connection conn = null;
        private PreparedStatement ps = null;
        //volatile: cancel() flips this flag from another thread
        private volatile boolean flag = true;

        @Override
        public void open(Configuration parameters) throws Exception {
            //open the connection and prepare the statement
            //Class.forName("com.mysql.jdbc.Driver"); //not needed with JDBC 4+ drivers
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");
            String sql = "select id,name,age from t_student";
            ps = conn.prepareStatement(sql);
        }

        @Override
        public void run(SourceContext<Student> ctx) throws Exception {
            while (flag) {
                //re-run the query every 5 seconds so table changes are picked up
                ResultSet rs = ps.executeQuery();
                while (rs.next()) {
                    int id = rs.getInt("id");
                    String name = rs.getString("name");
                    int age = rs.getInt("age");
                    ctx.collect(new Student(id, name, age));
                }
                rs.close();
                TimeUnit.SECONDS.sleep(5);
            }
        }

        //executed when the task is cancelled
        @Override
        public void cancel() {
            flag = false;
        }

        @Override
        public void close() throws Exception {
            //close the statement before the connection
            if (ps != null) ps.close();
            if (conn != null) conn.close();
        }
    }
}
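The "changes are picked up" behavior comes entirely from re-running the same query on a fixed interval. Stripped of JDBC, that polling pattern can be sketched in plain Java (hypothetical helper, not Flink or JDBC API); the supplier stands in for ps.executeQuery():

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java sketch of the polling pattern used by MySQLSource above:
// re-run the same query on a fixed interval while a flag stays true, so
// rows inserted or updated between polls show up on the next poll.
public class PollingSketch {
    private volatile boolean flag = true;

    // query stands in for ps.executeQuery(); each returned element
    // stands in for one ctx.collect(...) call.
    public List<String> poll(Supplier<List<String>> query, int rounds) {
        List<String> collected = new ArrayList<>();
        int done = 0;
        while (flag && done < rounds) {
            collected.addAll(query.get());
            done++;
            try {
                Thread.sleep(10); // the real source sleeps 5 seconds between polls
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return collected;
    }

    public void cancel() {
        flag = false;
    }
}
```

Note that each poll re-emits every row, so downstream operators see duplicates; the real source has the same property, which is acceptable for rule lookups but not for exactly-once pipelines.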