日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

2021年大数据Flink(十一):流批一体API Source

發布時間:2023/11/28 生活经验 45 豆豆
生活随笔 收集整理的這篇文章主要介紹了 2021年大数据Flink(十一):流批一体API Source 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

Source

預定義Source

基于集合的Source

基于文件的Source

???????基于Socket的Source

自定義Source

隨機生成數據

???????MySQL


Source

預定義Source

基于集合的Source

  • API

一般用于學習測試時編造數據時使用

1.env.fromElements(可變參數);

2.env.fromColletion(各種集合);

3.env.generateSequence(開始,結束);

4.env.fromSequence(開始,結束);

  • 代碼演示:
package cn.it.source;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Arrays;/*** Author lanson* Desc* 把本地的普通的Java集合/Scala集合變為分布式的Flink的DataStream集合!* 一般用于學習測試時編造數據時使用* 1.env.fromElements(可變參數);* 2.env.fromColletion(各種集合);* 3.env.generateSequence(開始,結束);* 4.env.fromSequence(開始,結束);*/
public class SourceDemo01 {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2.source// * 1.env.fromElements(可變參數);DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");// * 2.env.fromColletion(各種集合);DataStream<String> ds2 = env.fromCollection(Arrays.asList("hadoop", "spark", "flink"));// * 3.env.generateSequence(開始,結束);DataStream<Long> ds3 = env.generateSequence(1, 10);//* 4.env.fromSequence(開始,結束);DataStream<Long> ds4 = env.fromSequence(1, 10);//3.Transformation//4.sinkds1.print();ds2.print();ds3.print();ds4.print();//5.executeenv.execute();}
}

???????基于文件的Source

  • API

一般用于學習測試

env.readTextFile(本地/HDFS文件/文件夾);//壓縮文件也可以

  • 代碼演示:
package cn.it.source;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** Author lanson* Desc* 1.env.readTextFile(本地/HDFS文件/文件夾);//壓縮文件也可以*/
public class SourceDemo02 {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2.source// * 1.env.readTextFile(本地文件/HDFS文件);//壓縮文件也可以DataStream<String> ds1 = env.readTextFile("data/input/words.txt");DataStream<String> ds2 = env.readTextFile("data/input/dir");DataStream<String> ds3 = env.readTextFile("hdfs://node1:8020//wordcount/input/words.txt");DataStream<String> ds4 = env.readTextFile("data/input/wordcount.txt.gz");//3.Transformation//4.sinkds1.print();ds2.print();ds3.print();ds4.print();//5.executeenv.execute();}
}

???????基于Socket的Source

一般用于學習測試

  • 需求

1.在node1上使用nc -lk 9999 向指定端口發送數據

nc是netcat的簡稱,原本是用來設置路由器,我們可以利用它向某個端口發送數據

如果沒有該命令可以下安裝

yum install -y nc

2.使用Flink編寫流處理應用程序實時統計單詞數量

  • 代碼實現:
package cn.it.source;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** Author lanson* Desc* SocketSource*/
public class SourceDemo03 {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2.sourceDataStream<String> linesDS = env.socketTextStream("node1", 9999);//3.處理數據-transformation//3.1每一行數據按照空格切分成一個個的單詞組成一個集合DataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {//value就是一行行的數據String[] words = value.split(" ");for (String word : words) {out.collect(word);//將切割處理的一個個的單詞收集起來并返回}}});//3.2對集合中的每個單詞記為1DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {//value就是進來一個個的單詞return Tuple2.of(value, 1);}});//3.3對數據按照單詞(key)進行分組//KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOnesDS.keyBy(0);KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);//3.4對各個組內的數據按照數量(value)進行聚合就是求sumDataStream<Tuple2<String, Integer>> result = groupedDS.sum(1);//4.輸出結果-sinkresult.print();//5.觸發執行-executeenv.execute();}
}

自定義Source

隨機生成數據

  • API

一般用于學習測試,模擬生成一些數據

Flink還提供了數據源接口,我們實現該接口就可以實現自定義數據源,不同的接口有不同的功能,分類如下:

SourceFunction:非并行數據源(并行度只能=1)

RichSourceFunction:多功能非并行數據源(并行度只能=1)

ParallelSourceFunction:并行數據源(并行度能夠>=1)

RichParallelSourceFunction:多功能并行數據源(并行度能夠>=1)--后續學習的Kafka數據源使用的就是該接口

  • 需求

每隔1秒隨機生成一條訂單信息(訂單ID、用戶ID、訂單金額、時間戳)

要求:

- 隨機生成訂單ID(UUID)

- 隨機生成用戶ID(0-2)

- 隨機生成訂單金額(0-100)

- 時間戳為當前系統時間

  • 代碼實現
package cn.it.source;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;import java.util.Random;
import java.util.UUID;/*** Author lanson* Desc*需求* 每隔1秒隨機生成一條訂單信息(訂單ID、用戶ID、訂單金額、時間戳)* 要求:* - 隨機生成訂單ID(UUID)* - 隨機生成用戶ID(0-2)* - 隨機生成訂單金額(0-100)* - 時間戳為當前系統時間** API* 一般用于學習測試,模擬生成一些數據* Flink還提供了數據源接口,我們實現該接口就可以實現自定義數據源,不同的接口有不同的功能,分類如下:* SourceFunction:非并行數據源(并行度只能=1)* RichSourceFunction:多功能非并行數據源(并行度只能=1)* ParallelSourceFunction:并行數據源(并行度能夠>=1)* RichParallelSourceFunction:多功能并行數據源(并行度能夠>=1)--后續學習的Kafka數據源使用的就是該接口*/
public class SourceDemo04_Customer {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2.SourceDataStream<Order> orderDS = env.addSource(new MyOrderSource()).setParallelism(2);//3.Transformation//4.SinkorderDS.print();//5.executeenv.execute();}@Data@NoArgsConstructor@AllArgsConstructorpublic static class Order {private String id;private Integer userId;private Integer money;private Long createTime;}public static class MyOrderSource extends RichParallelSourceFunction<Order> {private Boolean flag = true;@Overridepublic void run(SourceContext<Order> ctx) throws Exception {Random random = new Random();while (flag){Thread.sleep(1000);String id = UUID.randomUUID().toString();int userId = random.nextInt(3);int money = random.nextInt(101);long createTime = System.currentTimeMillis();ctx.collect(new Order(id,userId,money,createTime));}}//取消任務/執行cancle命令的時候執行@Overridepublic void cancel() {flag = false;}}
}

???????MySQL

  • 需求:

實際開發中,經常會實時接收一些數據,要和MySQL中存儲的一些規則進行匹配,那么這時候就可以使用Flink自定義數據源從MySQL中讀取數據

那么現在先完成一個簡單的需求:

從MySQL中實時加載數據

要求MySQL中的數據有變化,也能被實時加載出來

  • 準備數據
CREATE TABLE `t_student` (`id` int(11) NOT NULL AUTO_INCREMENT,`name` varchar(255) DEFAULT NULL,`age` int(11) DEFAULT NULL,PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;INSERT INTO `t_student` VALUES ('1', 'jack', '18');INSERT INTO `t_student` VALUES ('2', 'tom', '19');INSERT INTO `t_student` VALUES ('3', 'rose', '20');INSERT INTO `t_student` VALUES ('4', 'tom', '19');INSERT INTO `t_student` VALUES ('5', 'jack', '18');INSERT INTO `t_student` VALUES ('6', 'rose', '20');

  • 代碼實現:
package cn.it.source;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.TimeUnit;/*** Author lansnon* Desc* 需求:* 實際開發中,經常會實時接收一些數據,要和MySQL中存儲的一些規則進行匹配,那么這時候就可以使用Flink自定義數據源從MySQL中讀取數據* 那么現在先完成一個簡單的需求:* 從MySQL中實時加載數據* 要求MySQL中的數據有變化,也能被實時加載出來*/
public class SourceDemo05_Customer_MySQL {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.SourceDataStream<Student> studentDS = env.addSource(new MySQLSource()).setParallelism(1);//3.Transformation//4.SinkstudentDS.print();//5.executeenv.execute();}@Data@NoArgsConstructor@AllArgsConstructorpublic static class Student {private Integer id;private String name;private Integer age;}public static class MySQLSource extends RichParallelSourceFunction<Student> {private Connection conn = null;private PreparedStatement ps = null;@Overridepublic void open(Configuration parameters) throws Exception {//加載驅動,開啟連接//Class.forName("com.mysql.jdbc.Driver");conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");String sql = "select id,name,age from t_student";ps = conn.prepareStatement(sql);}private boolean flag = true;@Overridepublic void run(SourceContext<Student> ctx) throws Exception {while (flag) {ResultSet rs = ps.executeQuery();while (rs.next()) {int id = rs.getInt("id");String name = rs.getString("name");int age = rs.getInt("age");ctx.collect(new Student(id, name, age));}TimeUnit.SECONDS.sleep(5);}}@Overridepublic void cancel() {flag = false;}@Overridepublic void close() throws Exception {if (conn != null) conn.close();if (ps != null) ps.close();}}
}

總結

以上是生活随笔為你收集整理的2021年大数据Flink(十一):流批一体API Source的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。