日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

sparkcore写mysql_spark读写mysql

發布時間:2024/10/8 数据库 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 sparkcore写mysql_spark读写mysql 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

首先還是pom文件:

<!-- Reconstructed: the original page lost all XML tags during extraction;
     element names inferred from the surviving text values (Maven conventions). -->
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <scala.version>2.11.12</scala.version>
    <spark.version>2.4.5</spark.version>
    <hadoop.version>2.7.7</hadoop.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.45</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>

代碼:讀mysql

import java.sql.DriverManager

import org.apache.spark.rdd.JdbcRDD

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Reads rows from a MySQL table into an RDD via Spark's JdbcRDD.
 *
 * JdbcRDD partitions the query by substituting the lower bound (150) and
 * upper bound (151) into the two `?` placeholders of the SQL, split across
 * `numPartitions` (1 here). The query therefore MUST contain exactly two
 * placeholders.
 *
 * BUG FIX: the original query read
 *   "select * from orders where realTotalMoney>? and realTotalMoney"
 * — the trailing `<?` had been lost (HTML-escaping ate the `<`), leaving a
 * syntactically invalid query with only one placeholder. Restored below.
 */
object MysqlRDD {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("readMysql").setMaster("local[*]")
    val sparkContext = new SparkContext(sparkConf)

    val jdbcRdd: JdbcRDD[String] = new JdbcRDD(
      sparkContext,
      // Connection factory: runs on the executors, once per partition.
      () => {
        Class.forName("com.mysql.jdbc.Driver")
        DriverManager.getConnection("jdbc:mysql://hadoop01:3306/transaction", "root", "root")
      },
      // Exactly two `?` placeholders, bound to lowerBound/upperBound below.
      "select * from orders where realTotalMoney>? and realTotalMoney<?",
      150,  // lower bound
      151,  // upper bound
      1,    // number of partitions
      // Row mapper: join the first five columns with commas.
      (r) => (1 to 5).map(i => r.getString(i)).mkString(",")
    )

    jdbcRdd.foreach(println)
    print(jdbcRdd.count())

    sparkContext.stop()
  }

}

寫入mysql,這里有效率問題需要注意:

低效版本:

import java.sql.DriverManager

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Writes each RDD tuple (id, name, age) into MySQL table `student`.
 *
 * Deliberately the LOW-EFFICIENCY variant for comparison: `foreach` runs the
 * body once per record, so a fresh JDBC connection is opened for every single
 * row. See the foreachPartition variant for the efficient approach.
 *
 * BUG FIX: the original never closed the Connection, leaking one connection
 * per record; the PreparedStatement was also not closed on failure. Both are
 * now released in finally blocks.
 */
object RddToMysql {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("rddToMysql").setMaster("local[*]")
    val sparkContext: SparkContext = SparkContext.getOrCreate(sparkConf)

    val rdd: RDD[(Int, String, Int)] =
      sparkContext.parallelize(List((1, "yls", 31), (2, "byl", 27), (3, "yms", 29)), 1)

    // Runs on the executors: one connection per record (inefficient on purpose).
    rdd.foreach { case (id: Int, name: String, age: Int) =>
      Class.forName("com.mysql.jdbc.Driver")
      val connection = DriverManager.getConnection("jdbc:mysql://hadoop01:3306/test", "root", "root")
      try {
        val preparedStatement = connection.prepareStatement("insert into student(id,name,age) values(?,?,?)")
        try {
          preparedStatement.setInt(1, id)
          preparedStatement.setString(2, name)
          preparedStatement.setInt(3, age)
          preparedStatement.executeUpdate()
        } finally {
          preparedStatement.close()
        }
      } finally {
        connection.close() // was missing in the original — connection leak
      }
    }

    sparkContext.stop()
  }

}

效率提升版本:

import java.sql.DriverManager

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Efficient variant: writes RDD tuples (id, name, age) into MySQL table
 * `student` using `foreachPartition`, so only ONE connection is opened per
 * partition instead of one per record.
 *
 * Fixes over the original:
 *  - the Connection was never closed (leaked per partition) — now closed in finally;
 *  - a new PreparedStatement was created for every row — now one reused
 *    statement with JDBC batching (addBatch/executeBatch);
 *  - `case it: Iterator[(Int, String, Int)]` was an unchecked match on a
 *    type-erased generic — replaced with a plain lambda parameter.
 */
object RddToMysql {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("rddToMysql").setMaster("local[*]")
    val sparkContext: SparkContext = SparkContext.getOrCreate(sparkConf)

    val rdd: RDD[(Int, String, Int)] =
      sparkContext.parallelize(List((1, "yls", 31), (2, "byl", 27), (3, "yms", 29)), 1)

    // Runs on the executors: one connection and one statement per partition.
    rdd.foreachPartition { it =>
      Class.forName("com.mysql.jdbc.Driver")
      val connection = DriverManager.getConnection("jdbc:mysql://hadoop01:3306/test", "root", "root")
      try {
        val preparedStatement = connection.prepareStatement("insert into student(id,name,age) values(?,?,?)")
        try {
          it.foreach { case (id, name, age) =>
            preparedStatement.setInt(1, id)
            preparedStatement.setString(2, name)
            preparedStatement.setInt(3, age)
            preparedStatement.addBatch()
          }
          // Single round-trip per partition instead of one executeUpdate per row.
          preparedStatement.executeBatch()
        } finally {
          preparedStatement.close()
        }
      } finally {
        connection.close() // was missing in the original — connection leak
      }
    }

    sparkContext.stop()
  }

}

總結

以上是生活随笔為你收集整理的sparkcore写mysql_spark读写mysql的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。