日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

2021年大数据Flink(十三):流批一体API Sink

發布時間:2023/11/28 生活经验 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 2021年大数据Flink(十三):流批一体API Sink 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

Sink

預定義Sink

基于控制臺和文件的Sink

自定義Sink

MySQL


Sink

預定義Sink

基于控制臺和文件的Sink

  • API

1.ds.print 直接輸出到控制臺

2.ds.printToErr() 直接輸出到控制臺,用紅色

3.ds.writeAsText("本地/HDFS的path",WriteMode.OVERWRITE).setParallelism(1)

  • 注意:

在輸出到path的時候,可以在前面設置并行度,如果

并行度>1,則path為目錄

并行度=1,則path為文件名

  • 代碼演示:
package cn.it.sink;import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** Author lanson* Desc* 1.ds.print 直接輸出到控制臺* 2.ds.printToErr() 直接輸出到控制臺,用紅色* 3.ds.collect 將分布式數據收集為本地集合* 4.ds.setParallelism(1).writeAsText("本地/HDFS的path",WriteMode.OVERWRITE)*/
public class SinkDemo01?{public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.source//DataStream<String> ds = env.fromElements("hadoop", "flink");DataStream<String> ds = env.readTextFile("data/input/words.txt");//3.transformation//4.sinkds.print();ds.printToErr();ds.writeAsText("data/output/test", FileSystem.WriteMode.OVERWRITE).setParallelism(2);//注意://Parallelism=1為文件//Parallelism>1為文件夾//5.executeenv.execute();}
}

自定義Sink

MySQL

  • 需求:

將Flink集合中的數據通過自定義Sink保存到MySQL

  • 代碼實現:
package cn.it.sink;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;/*** Author lanson* Desc* 使用自定義sink將數據保存到MySQL*/
public class SinkDemo02_MySQL {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.SourceDataStream<Student> studentDS = env.fromElements(new Student(null, "tonyma", 18));//3.Transformation//4.SinkstudentDS.addSink(new MySQLSink());//5.executeenv.execute();}@Data@NoArgsConstructor@AllArgsConstructorpublic static class Student {private Integer id;private String name;private Integer age;}public static class MySQLSink extends RichSinkFunction<Student> {private Connection conn = null;private PreparedStatement ps = null;@Overridepublic void open(Configuration parameters) throws Exception {//加載驅動,開啟連接//Class.forName("com.mysql.jdbc.Driver");conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");String sql = "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?)";ps = conn.prepareStatement(sql);}@Overridepublic void invoke(Student value, Context context) throws Exception {//給ps中的?設置具體值ps.setString(1,value.getName());ps.setInt(2,value.getAge());//執行sqlps.executeUpdate();}@Overridepublic void close() throws Exception {if (conn != null) conn.close();if (ps != null) ps.close();}}
}

總結

以上是生活随笔為你收集整理的2021年大数据Flink(十三):流批一体API Sink的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。