

Spark SQL: Loading Data from MySQL and Writing Data Back to MySQL (Spark Shell and Spark SQL Program)

Published: 2024/9/27

1. JDBC

Spark SQL can create a DataFrame by reading from a relational database over JDBC. After a series of computations on the DataFrame, the data can be written back to the relational database.

1.1. Loading data from MySQL (Spark Shell)

1. Start the Spark Shell. The MySQL connector jar must be specified on the command line:

[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# bin/spark-shell --master spark://hadoop1:7077,hadoop2:7077 \
    --jars /home/tuzq/software/spark-2.1.1-bin-hadoop2.7/jars/mysql-connector-java-5.1.38.jar \
    --driver-class-path /home/tuzq/software/spark-2.1.1-bin-hadoop2.7/jars/mysql-connector-java-5.1.38.jar

2. Load data from MySQL.
Create the person table in a new bigdata database:

CREATE DATABASE bigdata CHARACTER SET utf8;
USE bigdata;
CREATE TABLE person (
    id INT(10) AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(100),
    age INT(3)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

Insert some initial rows, then load the table from the Spark Shell:

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
scala> val jdbcDF = sqlContext.read.format("jdbc").options(
         Map("url" -> "jdbc:mysql://hadoop10:3306/bigdata",
             "driver" -> "com.mysql.jdbc.Driver",
             "dbtable" -> "person",
             "user" -> "root",
             "password" -> "123456")).load()

3. Run a query:

scala> jdbcDF.show
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|zhangsan| 19|
|  2|    lisi| 20|
|  3|  wangwu| 28|
|  4| zhaoliu| 26|
|  5|  tianqi| 55|
+---+--------+---+
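For larger tables, the same JDBC source can read in parallel. The sketch below assumes the same person table and credentials; partitionColumn, lowerBound, upperBound, and numPartitions are standard options of Spark's JDBC data source, and the bounds only control how the id range is split across partitions, not which rows are returned:

```scala
// Parallel JDBC read: Spark issues one query per partition,
// splitting the id range across numPartitions partitions.
val partitionedDF = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:mysql://hadoop10:3306/bigdata",
      "driver" -> "com.mysql.jdbc.Driver",
      "dbtable" -> "person",
      "user" -> "root",
      "password" -> "123456",
      "partitionColumn" -> "id",   // must be a numeric column
      "lowerBound" -> "1",
      "upperBound" -> "5",
      "numPartitions" -> "2")).load()
partitionedDF.show()
```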

1.2. Writing data to MySQL (packaged jar)

1.2.1 Write the Spark SQL program

package cn.toto.spark

import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by toto on 2017/7/11.
 */
object JdbcRDDDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("JdbcRDDDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val connection = () => {
      Class.forName("com.mysql.jdbc.Driver").newInstance()
      DriverManager.getConnection("jdbc:mysql://hadoop10:3306/bigdata", "root", "123456")
    }
    // No data is read at this point (the same person table is used)
    val jdbcRDD = new JdbcRDD(
      sc,
      connection,
      "SELECT * FROM person WHERE id >= ? AND id <= ?",
      // Fetch rows 1 through 4 from the table, split into two partitions
      1, 4, 2,
      r => {
        val id = r.getInt(1)
        val code = r.getString(2)
        (id, code)
      })
    // collect() is the action that actually fetches the data
    val jrdd = jdbcRDD.collect()
    println(jrdd.toBuffer)
    sc.stop()
  }
}
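JdbcRDD is the lower-level RDD API. For comparison, the same partitioned range read can be written with the DataFrameReader.jdbc overload that takes a partition column and bounds. This is a sketch, assuming the same MySQL instance, credentials, and a SQLContext as in section 1.1:

```scala
import java.util.Properties

val prop = new Properties()
prop.put("user", "root")
prop.put("password", "123456")

// jdbc(url, table, columnName, lowerBound, upperBound, numPartitions, props):
// splits the id range [1, 4] into 2 parallel queries, like the JdbcRDD above
val personDF = sqlContext.read.jdbc(
  "jdbc:mysql://hadoop10:3306/bigdata", "person", "id", 1, 4, 2, prop)
personDF.show()
```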

Note that this run still uses the person table; its contents are shown in section 1.1 above.

Running the program in IDEA produces the following result:

1.2.2 Package the program with Maven

1.2.3 Submit the jar to the Spark cluster

Copy bigdata-1.0-SNAPSHOT.jar to /home/tuzq/software/sparkdata:

Note: before running, copy mysql-connector-java-5.1.38.jar into /home/tuzq/software/spark-2.1.1-bin-hadoop2.7/jars/.

bin/spark-submit --class cn.toto.spark.JdbcRDDDemo --master spark://hadoop1:7077 \
    --jars /home/tuzq/software/spark-2.1.1-bin-hadoop2.7/jars/mysql-connector-java-5.1.38.jar \
    --driver-class-path /home/tuzq/software/spark-2.1.1-bin-hadoop2.7/jars/mysql-connector-java-5.1.38.jar \
    /home/tuzq/software/sparkdata/bigdata-1.0-SNAPSHOT.jar

Result:

2. Storing data in the database with Spark SQL

2.2.1 The code:

package cn.toto.spark

import java.util.Properties

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by toto on 2017/7/11.
 */
object JdbcRDD {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MySQL-Demo").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Create an RDD by parallelizing a local collection
    val personRDD = sc.parallelize(Array("14 tom 5", "15 jerry 3", "16 kitty 6")).map(_.split(" "))
    // Specify the schema of each field directly with StructType
    val schema = StructType(List(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)
    ))
    // Map the RDD to an RDD of Rows
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    // Apply the schema to the row RDD
    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    // Store the database connection properties
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "123456")
    // Append the data to the database table
    personDataFrame.write.mode("append").jdbc("jdbc:mysql://hadoop10:3306/bigdata", "bigdata.person", prop)
    // Stop the SparkContext
    sc.stop()
  }
}
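After the job runs, the appended rows can be verified by reading the table back through the same connection properties. A sketch, assuming the prop object and SQLContext from the program above:

```scala
// Read the table back over JDBC and inspect the rows that were just appended
val checkDF = sqlContext.read.jdbc(
  "jdbc:mysql://hadoop10:3306/bigdata", "bigdata.person", prop)
checkDF.filter("id >= 14").show()  // should list tom, jerry, and kitty
```

The "append" save mode adds rows without touching existing data; "overwrite" would drop and recreate the table first.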

Result:

2.2.2 Package the program with Maven

2.2.3 Submit the jar to the Spark cluster

bin/spark-submit --class cn.toto.spark.JdbcRDD --master spark://hadoop1:7077 \
    --jars /home/tuzq/software/spark-2.1.1-bin-hadoop2.7/jars/mysql-connector-java-5.1.38.jar \
    --driver-class-path /home/tuzq/software/spark-2.1.1-bin-hadoop2.7/jars/mysql-connector-java-5.1.38.jar \
    /home/tuzq/software/sparkdata/bigdata-1.0-SNAPSHOT.jar
