Spark SQL: Loading Data from MySQL and Writing Data Back to MySQL (Spark Shell and a Spark SQL Program)
1. JDBC
Spark SQL can create a DataFrame by reading data from a relational database over JDBC. After a series of computations on the DataFrame, the result can be written back to the relational database.
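That read-compute-write round trip can be sketched in a few lines with the DataFrame JDBC source (a minimal sketch: the hostname, database, and credentials follow the examples later in this article, and the `person_adults` target table is hypothetical):

```scala
import org.apache.spark.sql.SparkSession

object JdbcRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("jdbc-round-trip")
      .master("local[2]")
      .getOrCreate()

    // Read the person table into a DataFrame over JDBC.
    val df = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://hadoop10:3306/bigdata")
      .option("dbtable", "person")
      .option("user", "root")
      .option("password", "123456")
      .load()

    // Some computation on the DataFrame.
    val adults = df.filter("age >= 20")

    // Write the result back to a (hypothetical) table in the same database.
    adults.write.mode("append").format("jdbc")
      .option("url", "jdbc:mysql://hadoop10:3306/bigdata")
      .option("dbtable", "person_adults")
      .option("user", "root")
      .option("password", "123456")
      .save()

    spark.stop()
  }
}
```

Running this requires the MySQL connector jar on the classpath, exactly as the `--jars`/`--driver-class-path` flags below arrange for the shell and for `spark-submit`.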
1.1. Loading Data from MySQL (Spark Shell)
1. Start the Spark Shell; the MySQL connector jar must be specified:
```shell
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# bin/spark-shell --master spark://hadoop1:7077,hadoop2:7077 \
  --jars /home/tuzq/software/spark-2.1.1-bin-hadoop2.7/jars/mysql-connector-java-5.1.38.jar \
  --driver-class-path /home/tuzq/software/spark-2.1.1-bin-hadoop2.7/jars/mysql-connector-java-5.1.38.jar
```

2. Load data from MySQL
Enter the bigdata database and create the person table:
and initialize it with data:
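The actual DDL and INSERT statements appeared only as screenshots in the original; a plausible reconstruction, matching the query output shown in the next step, would be:

```sql
CREATE DATABASE IF NOT EXISTS bigdata;
USE bigdata;

CREATE TABLE person (
  id   INT PRIMARY KEY,
  name VARCHAR(32),
  age  INT
);

INSERT INTO person VALUES
  (1, 'zhangsan', 19),
  (2, 'lisi',     20),
  (3, 'wangwu',   28),
  (4, 'zhaoliu',  26),
  (5, 'tianqi',   55);
```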
3.執(zhí)行查詢
```scala
scala> jdbcDF.show
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|zhangsan| 19|
|  2|    lisi| 20|
|  3|  wangwu| 28|
|  4| zhaoliu| 26|
|  5|  tianqi| 55|
+---+--------+---+
```

1.2. Reading Data from MySQL with JdbcRDD (packaged-jar approach)
1.2.1 Write the Spark program
```scala
package cn.toto.spark

import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by toto on 2017/7/11.
  */
object JdbcRDDDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("JdbcRDDDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // A function that opens a new JDBC connection; invoked on the executors.
    val connection = () => {
      Class.forName("com.mysql.jdbc.Driver").newInstance()
      DriverManager.getConnection("jdbc:mysql://hadoop10:3306/bigdata", "root", "123456")
    }
    // Nothing is read at this point (the same person table is used).
    val jdbcRDD = new JdbcRDD(
      sc,
      connection,
      "SELECT * FROM person WHERE id >= ? AND id <= ?",
      // Fetch rows with ids 1 through 4, split across 2 partitions.
      1, 4, 2,
      r => {
        val id = r.getInt(1)
        val code = r.getString(2)
        (id, code)
      }
    )
    // collect() is an action; this is where the data is actually fetched.
    val jrdd = jdbcRDD.collect()
    println(jrdd.toBuffer)
    sc.stop()
  }
}
```

Note that this run also uses the person table; its data is as follows:
When the program is run in IDEA, the result is as follows:
1.2.2 Package the program with Maven
1.2.3. Submit the jar to the Spark cluster
Copy bigdata-1.0-SNAPSHOT.jar to /home/tuzq/software/sparkdata, as shown below:
注意在運行執(zhí)行,要將mysql-connector-java-5.1.38.jar 放到:/home/tuzq/software/spark-2.1.1-bin-hadoop2.7/jars/下
```shell
bin/spark-submit --class cn.toto.spark.JdbcRDDDemo --master spark://hadoop1:7077 \
  --jars /home/tuzq/software/spark-2.1.1-bin-hadoop2.7/jars/mysql-connector-java-5.1.38.jar \
  --driver-class-path /home/tuzq/software/spark-2.1.1-bin-hadoop2.7/jars/mysql-connector-java-5.1.38.jar \
  /home/tuzq/software/sparkdata/bigdata-1.0-SNAPSHOT.jar
```

Running result:
2. Saving Data to the Database with Spark SQL
2.2.1. The code is as follows:
```scala
package cn.toto.spark

import java.util.Properties

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by toto on 2017/7/11.
  */
object JdbcRDD {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MySQL-Demo").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Create an RDD by parallelizing a local collection.
    val personRDD = sc.parallelize(Array("14 tom 5", "15 jerry 3", "16 kitty 6")).map(_.split(" "))
    // Specify the schema of each field directly with a StructType.
    val schema = StructType(List(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)
    ))
    // Map the RDD to an RDD of Rows.
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    // Apply the schema to the row RDD.
    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    // Store the database connection properties.
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "123456")
    // Append the data to the database table.
    personDataFrame.write.mode("append").jdbc("jdbc:mysql://hadoop10:3306/bigdata", "bigdata.person", prop)
    // Stop the SparkContext.
    sc.stop()
  }
}
```

Running result:
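To confirm the append outside of Spark, one could query the table directly with the mysql client (a hypothetical check, not part of the original article; the appended rows have ids 14-16):

```shell
mysql -h hadoop10 -uroot -p123456 bigdata -e "SELECT * FROM person WHERE id >= 14;"
```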
2.2.2. Package the program with Maven
2.2.3. Submit the jar to the Spark cluster
```shell
bin/spark-submit --class cn.toto.spark.JdbcRDD --master spark://hadoop1:7077 \
  --jars /home/tuzq/software/spark-2.1.1-bin-hadoop2.7/jars/mysql-connector-java-5.1.38.jar \
  --driver-class-path /home/tuzq/software/spark-2.1.1-bin-hadoop2.7/jars/mysql-connector-java-5.1.38.jar \
  /home/tuzq/software/sparkdata/bigdata-1.0-SNAPSHOT.jar
```