日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL

發布時間:2025/5/22 数据库 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

<!-- more -->

前言

之前其實在 《從0到1學習Flink》—— 如何自定義 Data Sink ? 文章中其實已經寫了點將數據寫入到 MySQL,但是一些配置化的東西當時是寫死的,不能夠通用,最近知識星球里有朋友叫我: 寫個從 kafka 中讀取數據,經過 Flink 做個預聚合,然后創建數據庫連接池將數據批量寫入到 mysql 的例子。

于是才有了這篇文章,更多提問和想要我寫的文章可以在知識星球里像我提問,我會根據提問及時回答和盡可能作出文章的修改。

準備

你需要將這兩個依賴添加到 pom.xml 中

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.34</version> </dependency>

讀取 kafka 數據

這里我依舊用的以前的 student 類,自己本地起了 kafka 然后造一些測試數據,這里我們測試發送一條數據則 sleep 10s,意味著往 kafka 中一分鐘發 6 條數據。

package com.zhisheng.connectors.mysql.utils;import com.zhisheng.common.utils.GsonUtil; import com.zhisheng.connectors.mysql.model.Student; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** Desc: 往kafka中寫數據,可以使用這個main函數進行測試* Created by zhisheng on 2019-02-17* Blog: http://www.54tianzhisheng.cn/tags/Flink/*/ public class KafkaUtil {public static final String broker_list = "localhost:9092";public static final String topic = "student"; //kafka topic 需要和 flink 程序用同一個 topicpublic static void writeToKafka() throws InterruptedException {Properties props = new Properties();props.put("bootstrap.servers", broker_list);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer producer = new KafkaProducer<String, String>(props);for (int i = 1; i <= 100; i++) {Student student = new Student(i, "zhisheng" + i, "password" + i, 18 + i);ProducerRecord record = new ProducerRecord<String, String>(topic, null, null, GsonUtil.toJson(student));producer.send(record);System.out.println("發送數據: " + GsonUtil.toJson(student));Thread.sleep(10 * 1000); //發送一條數據 sleep 10s,相當于 1 分鐘 6 條}producer.flush();}public static void main(String[] args) throws InterruptedException {writeToKafka();} }

從 kafka 中讀取數據,然后序列化成 student 對象。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("zookeeper.connect", "localhost:2181"); props.put("group.id", "metric-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "latest");SingleOutputStreamOperator<Student> student = env.addSource(new FlinkKafkaConsumer011<>("student", //這個 kafka topic 需要和上面的工具類的 topic 一致new SimpleStringSchema(),props)).setParallelism(1).map(string -> GsonUtil.fromJson(string, Student.class)); //,解析字符串成 student 對象

因為 RichSinkFunction 中如果 sink 一條數據到 mysql 中就會調用 invoke 方法一次,所以如果要實現批量寫的話,我們最好在 sink 之前就把數據聚合一下。那這里我們開個一分鐘的窗口去聚合 Student 數據。

student.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction<Student, List<Student>, TimeWindow>() {@Overridepublic void apply(TimeWindow window, Iterable<Student> values, Collector<List<Student>> out) throws Exception {ArrayList<Student> students = Lists.newArrayList(values);if (students.size() > 0) {System.out.println("1 分鐘內收集到 student 的數據條數是:" + students.size());out.collect(students);}} });

寫入數據庫

這里使用 DBCP 連接池連接數據庫 mysql,pom.xml 中添加依賴:

<dependency><groupId>org.apache.commons</groupId><artifactId>commons-dbcp2</artifactId><version>2.1.1</version> </dependency>

如果你想使用其他的數據庫連接池請加入對應的依賴。

這里將數據寫入到 MySQL 中,依舊是和之前文章一樣繼承 RichSinkFunction 類,重寫里面的方法:

package com.zhisheng.connectors.mysql.sinks;import com.zhisheng.connectors.mysql.model.Student; import org.apache.commons.dbcp2.BasicDataSource; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import javax.sql.DataSource; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.util.List;/*** Desc: 數據批量 sink 數據到 mysql* Created by zhisheng_tian on 2019-02-17* Blog: http://www.54tianzhisheng.cn/tags/Flink/*/ public class SinkToMySQL extends RichSinkFunction<List<Student>> {PreparedStatement ps;BasicDataSource dataSource;private Connection connection;/*** open() 方法中建立連接,這樣不用每次 invoke 的時候都要建立連接和釋放連接** @param parameters* @throws Exception*/@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);dataSource = new BasicDataSource();connection = getConnection(dataSource);String sql = "insert into Student(id, name, password, age) values(?, ?, ?, ?);";ps = this.connection.prepareStatement(sql);}@Overridepublic void close() throws Exception {super.close();//關閉連接和釋放資源if (connection != null) {connection.close();}if (ps != null) {ps.close();}}/*** 每條數據的插入都要調用一次 invoke() 方法** @param value* @param context* @throws Exception*/@Overridepublic void invoke(List<Student> value, Context context) throws Exception {//遍歷數據集合for (Student student : value) {ps.setInt(1, student.getId());ps.setString(2, student.getName());ps.setString(3, student.getPassword());ps.setInt(4, student.getAge());ps.addBatch();}int[] count = ps.executeBatch();//批量后執行System.out.println("成功了插入了" + count.length + "行數據");}private static Connection getConnection(BasicDataSource dataSource) {dataSource.setDriverClassName("com.mysql.jdbc.Driver");//注意,替換成自己本地的 mysql 數據庫地址和用戶名、密碼dataSource.setUrl("jdbc:mysql://localhost:3306/test");dataSource.setUsername("root");dataSource.setPassword("root123456");//設置連接池的一些參數dataSource.setInitialSize(10);dataSource.setMaxTotal(50);dataSource.setMinIdle(2);Connection con = null;try {con = dataSource.getConnection();System.out.println("創建連接池:" + con);} catch (Exception e) {System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());}return con;} }

核心類 Main

核心程序如下:

public class Main {public static void main(String[] args) throws Exception{final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("zookeeper.connect", "localhost:2181");props.put("group.id", "metric-group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "latest");SingleOutputStreamOperator<Student> student = env.addSource(new FlinkKafkaConsumer011<>("student", //這個 kafka topic 需要和上面的工具類的 topic 一致new SimpleStringSchema(),props)).setParallelism(1).map(string -> GsonUtil.fromJson(string, Student.class)); //student.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction<Student, List<Student>, TimeWindow>() {@Overridepublic void apply(TimeWindow window, Iterable<Student> values, Collector<List<Student>> out) throws Exception {ArrayList<Student> students = Lists.newArrayList(values);if (students.size() > 0) {System.out.println("1 分鐘內收集到 student 的數據條數是:" + students.size());out.collect(students);}}}).addSink(new SinkToMySQL());env.execute("flink learning connectors kafka");} }

運行項目

運行 Main 類后再運行 KafkaUtils.java 類!

下圖是往 Kafka 中發送的數據:

下圖是運行 Main 類的日志,會創建 4 個連接池是因為默認的 4 個并行度,你如果在 addSink 這個算子設置并行度為 1 的話就會創建一個連接池:

下圖是批量插入數據庫的結果:

總結

本文從知識星球一位朋友的疑問來寫的,應該都滿足了他的條件(批量/數據庫連接池/寫入mysql),的確網上很多的例子都是簡單的 demo 形式,都是單條數據就創建數據庫連接插入 MySQL,如果要寫的數據量很大的話,會對 MySQL 的寫有很大的壓力。這也是我之前在 《從0到1學習Flink》—— Flink 寫入數據到 ElasticSearch 中,數據寫 ES 強調過的,如果要提高性能必定要批量的寫。就拿我們現在這篇文章來說,如果數據量大的話,聚合一分鐘數據達萬條,那么這樣批量寫會比來一條寫一條性能提高不知道有多少。

本文原創地址是: http://www.54tianzhisheng.cn/2019/01/15/Flink-MySQL-sink/ , 未經允許禁止轉載。

關注我

微信公眾號:zhisheng

另外我自己整理了些 Flink 的學習資料,目前已經全部放到微信公眾號了。你可以加我的微信:zhisheng_tian,然后回復關鍵字:Flink 即可無條件獲取到。

更多私密資料請加入知識星球!

Github 代碼倉庫

https://github.com/zhisheng17/flink-learning/

以后這個項目的所有代碼都將放在這個倉庫里,包含了自己學習 flink 的一些 demo 和博客。

本文的項目代碼在 https://github.com/zhisheng17/flink-learning/tree/master/flink-learning-connectors/flink-learning-connectors-mysql

相關文章

1、《從0到1學習Flink》—— Apache Flink 介紹

2、《從0到1學習Flink》—— Mac 上搭建 Flink 1.6.0 環境并構建運行簡單程序入門

3、《從0到1學習Flink》—— Flink 配置文件詳解

4、《從0到1學習Flink》—— Data Source 介紹

5、《從0到1學習Flink》—— 如何自定義 Data Source ?

6、《從0到1學習Flink》—— Data Sink 介紹

7、《從0到1學習Flink》—— 如何自定義 Data Sink ?

8、《從0到1學習Flink》—— Flink Data transformation(轉換)

9、《從0到1學習Flink》—— 介紹Flink中的Stream Windows

10、《從0到1學習Flink》—— Flink 中的幾種 Time 詳解

11、《從0到1學習Flink》—— Flink 寫入數據到 ElasticSearch

12、《從0到1學習Flink》—— Flink 項目如何運行?

13、《從0到1學習Flink》—— Flink 寫入數據到 Kafka

14、《從0到1學習Flink》—— Flink JobManager 高可用性配置

15、《從0到1學習Flink》—— Flink parallelism 和 Slot 介紹

16、《從0到1學習Flink》—— Flink 讀取 Kafka 數據批量寫入到 MySQL

總結

以上是生活随笔為你收集整理的《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 99视频免费观看 | 国产精品21p | 久久国产精品亚洲 | 婷婷五月色综合 | 黑鬼大战白妞高潮喷白浆 | 天堂色区 | 日韩城人网站 | 免费的黄网站 | 最近中文字幕在线观看 | 黄色片日韩 | 国产乱码一区二区三区在线观看 | 久久精品视频在线播放 | 欧美不卡视频在线观看 | 在线观看污| 最新天堂中文在线 | 超污网站在线观看 | 日本视频色 | 在线看黄色网 | 天堂…中文在线最新版在线 | 国产亚洲精品久久久久久打不开 | 精品热 | 亚洲乱码视频在线观看 | 欧美成人毛片 | 欧美一卡二卡三卡四卡 | 国产日韩一区二区在线观看 | 欧美一区二区在线视频观看 | 亚洲自拍偷拍网站 | 国产在线xxxx | 国产伊人网 | 光棍影院一区二区 | 色综合av在线 | 成人av网站免费 | 美女黄页在线观看 | 超薄肉色丝袜一二三 | 国产精品1024 | 日本无遮羞调教打屁股网站 | 懂色aⅴ国产一区二区三区 亚洲欧美国产另类 | 在线视频播放大全 | 影音先锋久久久 | 中文字幕亚洲国产 | 亚洲欧美日韩在线一区 | 狠狠人妻久久久久久综合 | 爱豆国产剧免费观看大全剧集 | 一路向西在线看 | 加勒比hezyo黑人专区 | 91精品国产闺蜜国产在线闺蜜 | 亚洲va久久久噜噜噜久久天堂 | 少妇免费视频 | 中文字幕在线网站 | 在线亚洲欧美 | 丰满人妻一区二区三区53视频 | 蜜桃av鲁一鲁一鲁一鲁俄罗斯的 | 精品乱码一区二区三四区视频 | 午夜精品福利一区二区 | 伊人99在线 | 日韩欧美一区二区在线观看 | 乱短篇艳辣500篇h文最新章节 | 99精品国产99久久久久久97 | a毛片毛片av永久免费 | 香蕉影院在线 | 精品裸体舞一区二区三区 | 永久免费汤不热视频 | 深夜精品| 久久久欧美精品 | 欧美视频第一页 | 日韩美女在线视频 | 国产福利一区二区三区在线观看 | 最好看的中文字幕国语电影mv | 冲田杏梨一区二区三区 | 欧洲视频一区二区 | 91免费在线视频 | 性日本xxx| 国产人免费人成免费视频 | 国产视频精品视频 | 日韩激情片 | 反差在线观看免费版全集完整版 | 日本免费黄色大片 | 亚洲一卡一卡 | 天天干天天天 | 中文字幕日韩人妻在线视频 | 天天看av | 日本激情网站 | 中文字幕女同 | 男同志毛片特黄毛片 | 久久久久人妻一区精品 | 日本色中色 | 99视频一区| 操操操综合网 | 亚洲精视频 | 日韩欧美久久精品 | 亚洲乱码国产乱码精品精剪 | 免费av在线网| 久久久久伊人 | 91艹| 青春草国产视频 | 国产九区 | 女人看黄色网 | 国产美女自拍 | 精品乱子一区二区三区 |