日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

spark 持久化 mysql_Spark读取数据库(Mysql)的四种方式讲解

發布時間:2025/3/21 数据库 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark 持久化 mysql_Spark读取数据库(Mysql)的四种方式讲解 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目前

一、不指定查詢條件

這個方式鏈接MySql的函數原型是: def jdbc(url: String, table: String, properties: Properties): DataFrame

我們只需要提供Driver的url,需要查詢的表名,以及連接表相關屬性properties。下面是具體例子: val url = "jdbc:mysql://www.iteblog.com:3306/iteblog?user=iteblog&password=iteblog"

val prop = new Properties()

val df = sqlContext.read.jdbc(url, "iteblog", prop )

println(df.count())

println(df.rdd.partitions.size)

我們運行上面的程序,可以看到df.rdd.partitions.size輸出結果是1,這個結果的含義是iteblog表的所有數據都是由RDD的一個分區處理的,所以說,如果你這個表很大,很可能會出現OOM WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 14, spark047219):

java.lang.OutOfMemoryError: GC overhead limit exceeded at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3380)

這種方式在數據量大的時候不建議使用。

如果想及時了解iteblog_hadoop

二、指定數據庫字段的范圍

這種方式就是通過指定數據庫中某個字段的范圍,但是遺憾的是,這個字段必須是數字,來看看這個函數的函數原型: def jdbc(

url: String,

table: String,

columnName: String,

lowerBound: Long,

upperBound: Long,

numPartitions: Int,

connectionProperties: Properties): DataFrame

前兩個字段的含義和方法一類似。columnName就是需要分區的字段,這個字段在數據庫中的類型必須是數字;lowerBound就是分區的下界;upperBound就是分區的上界;numPartitions是分區的個數。同樣,我們也來看看如何使用: val lowerBound = 1

val upperBound = 100000

val numPartitions = 5

val url = "jdbc:mysql://www.iteblog.com:3306/iteblog?user=iteblog&password=iteblog"

val prop = new Properties()

val df = sqlContext.read.jdbc(url, "iteblog", "id", lowerBound, upperBound, numPartitions, prop)

這個方法可以將iteblog表的數據分布到RDD的幾個分區中,分區的數量由numPartitions參數決定,在理想情況下,每個分區處理相同數量的數據,我們在使用的時候不建議將這個值設置的比較大,因為這可能導致數據庫掛掉!但是根據前面介紹,這個函數的缺點就是只能使用整形數據字段作為分區關鍵字。

這個函數在極端情況下,也就是設置將numPartitions設置為1,其含義和第一種方式一致。

三、根據任意字段進行分區

基于前面兩種方法的限制,Spark還提供了根據任意字段進行分區的方法,函數原型如下: def jdbc(

url: String,

table: String,

predicates: Array[String],

connectionProperties: Properties): DataFrame

這個函數相比第一種方式多了predicates參數,我們可以通過這個參數設置分區的依據,來看看例子: val predicates = Array[String]("reportDate <= '2014-12-31'",

"reportDate > '2014-12-31' and reportDate <= '2015-12-31'")

val url = "jdbc:mysql://www.iteblog.com:3306/iteblog?user=iteblog&password=iteblog"

val prop = new Properties()

val df = sqlContext.read.jdbc(url, "iteblog", predicates, prop)

最后rdd的分區數量就等于predicates.length。

四、通過load獲取

Spark還提供通過load的方式來讀取數據。 sqlContext.read.format("jdbc").options(

Map("url" -> "jdbc:mysql://www.iteblog.com:3306/iteblog?user=iteblog&password=iteblog",

"dbtable" -> "iteblog")).load()

options函數支持url、driver、dbtable、partitionColumn、lowerBound、upperBound以及numPartitions選項,細心的同學肯定發現這個和方法二的參數一致。是的,其內部實現原理部分和方法二大體一致。同時load方法還支持json、orc等數據源的讀取。

總結

以上是生活随笔為你收集整理的spark 持久化 mysql_Spark读取数据库(Mysql)的四种方式讲解的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。