flink读mysql速度怎么样_[DB] Flink 读 MySQL
思路
在 Flink 中創(chuàng)建一張表有兩種方法:
從一個(gè)文件中導(dǎo)入表結(jié)構(gòu)(Structure)(常用于批計(jì)算)(靜態(tài))
從 DataStream 或者 DataSet 轉(zhuǎn)換成 Table (動(dòng)態(tài))
package com.kaikeba.mysql.demo
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.types.Row
object Flink2Mysql {
def main(args: Array[String]): Unit = {
//設(shè)定執(zhí)行環(huán)境
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = BatchTableEnvironment.create(env)
//通過(guò)創(chuàng)建JDBCInputFormat讀取JDBC數(shù)據(jù)源
val jdbcDataSet: DataSet[Row] =
env.createInput(JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("com.mysql.cj.jdbc.Driver")
.setDBUrl("jdbc:mysql://127.0.0.1:3306/flink-mysql?serverTimezone=GMT%2B8&characterEncoding=UTF-8&useSSL=false")
.setUsername("root")
.setPassword("Chen1227+")
.setQuery("select * from filter")
.setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
.finish()
)
//將DataSet注冊(cè)為表
tEnv.registerDataSet("tb", jdbcDataSet)
//執(zhí)行查詢操作
val table = tEnv.sqlQuery("select * from tb")
//把table轉(zhuǎn)為DataSet
tEnv.toDataSet[Row](table).print()
}
}
?
參考
Flink 讀寫(xiě) Mysql
Flink流處理訪問(wèn)MySQL
Flink實(shí)例
總結(jié)
以上是生活随笔為你收集整理的flink读mysql速度怎么样_[DB] Flink 读 MySQL的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 银行卡信息生成
- 下一篇: mysql not exists无效_分