2021年大数据Spark(十七):Spark Core的RDD持久化
目錄
RDD 持久化
引入
API
緩存/持久化函數(shù)
緩存/持久化級別
釋放緩存/持久化
代碼演示
總結(jié):何時(shí)使用緩存/持久化
RDD 持久化
引入
在實(shí)際開發(fā)中某些RDD的計(jì)算或轉(zhuǎn)換可能會(huì)比較耗費(fèi)時(shí)間,如果這些RDD后續(xù)還會(huì)頻繁的被使用到,那么可以將這些RDD進(jìn)行持久化/緩存,這樣下次再使用到的時(shí)候就不用再重新計(jì)算了,提高了程序運(yùn)行的效率。
?
?
API
緩存/持久化函數(shù)
可以將RDD數(shù)據(jù)直接緩存到內(nèi)存中,函數(shù)聲明如下:
?
但是實(shí)際項(xiàng)目中,不會(huì)直接使用上述的緩存函數(shù),RDD數(shù)據(jù)量往往很多,內(nèi)存放不下的。在實(shí)際的項(xiàng)目中緩存RDD數(shù)據(jù)時(shí),往往使用如下函數(shù),依據(jù)具體的業(yè)務(wù)和數(shù)據(jù)量,指定緩存的級別
?
緩存/持久化級別
在Spark框架中對數(shù)據(jù)緩存可以指定不同的級別,對于開發(fā)來說至關(guān)重要,如下所示:
?
| 持久化級別 | 說明 |
| MEMORY_ONLY(默認(rèn)) | 將RDD以非序列化的Java對象存儲(chǔ)在JVM中。 如果沒有足夠的內(nèi)存存儲(chǔ)RDD,則某些分區(qū)將不會(huì)被緩存,每次需要時(shí)都會(huì)重新計(jì)算。 這是默認(rèn)級別。 |
| MEMORY_AND_DISK (開發(fā)中可以使用這個(gè)) | 將RDD以非序列化的Java對象存儲(chǔ)在JVM中。如果數(shù)據(jù)在內(nèi)存中放不下,則溢寫到磁盤上.需要時(shí)則會(huì)從磁盤上讀取 |
| MEMORY_ONLY_SER (Java and Scala) | 將RDD以序列化的Java對象(每個(gè)分區(qū)一個(gè)字節(jié)數(shù)組)的方式存儲(chǔ).這通常比非序列化對象(deserialized objects)更具空間效率,特別是在使用快速序列化的情況下,但是這種方式讀取數(shù)據(jù)會(huì)消耗更多的CPU。 |
| MEMORY_AND_DISK_SER (Java and Scala) | 與MEMORY_ONLY_SER類似,但如果數(shù)據(jù)在內(nèi)存中放不下,則溢寫到磁盤上,而不是每次需要重新計(jì)算它們。 |
| DISK_ONLY | 將RDD分區(qū)存儲(chǔ)在磁盤上。 |
| MEMORY_ONLY_2, MEMORY_AND_DISK_2等 | 與上面的儲(chǔ)存級別相同,只不過將持久化數(shù)據(jù)存為兩份,備份每個(gè)分區(qū)存儲(chǔ)在兩個(gè)集群節(jié)點(diǎn)上。 |
| OFF_HEAP(實(shí)驗(yàn)中) | 與MEMORY_ONLY_SER類似,但將數(shù)據(jù)存儲(chǔ)在堆外內(nèi)存中。 (即不是直接存儲(chǔ)在JVM內(nèi)存中) 如:Tachyon-分布式內(nèi)存存儲(chǔ)系統(tǒng)、Alluxio - Open Source Memory Speed Virtual Distributed Storage |
實(shí)際項(xiàng)目中緩存數(shù)據(jù)時(shí),往往選擇MEMORY_AND_DISK
緩存函數(shù)與Transformation函數(shù)一樣,都是Lazy操作,需要Action函數(shù)觸發(fā),通常使用count函數(shù)觸發(fā)。
?
釋放緩存/持久化
當(dāng)緩存的RDD數(shù)據(jù),不再被使用時(shí),考慮釋資源,使用如下函數(shù):
此函數(shù)屬于eager,立即執(zhí)行。
?
代碼演示
package cn.itcast.coreimport org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}/*** RDD中緩存函數(shù),將數(shù)據(jù)緩存到內(nèi)存或磁盤、釋放緩存*/
object SparkCacheTest {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// 讀取文本文件數(shù)據(jù)val inputRDD: RDD[String] = sc.textFile("data/input/words.txt", minPartitions = 2)// 緩存數(shù)據(jù)inputRDD.persist(StorageLevel.MEMORY_AND_DISK)// 使用Action函數(shù)觸發(fā)緩存println(s"Count = ${inputRDD.count()}")println(s"Count = ${inputRDD.count()}")// 釋放緩存inputRDD.unpersist()// 應(yīng)用程序運(yùn)行結(jié)束,關(guān)閉資源sc.stop()}
}
或使用spark-shell演示
// 啟動(dòng)集群和spark-shell/export/servers/spark/sbin/start-all.sh// 將一個(gè)RDD持久化,后續(xù)操作該RDD就可以直接從緩存中拿val rdd1 = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")val rdd2 = rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_)//WordCountrdd2.cache //緩存/持久化rdd2.sortBy(_._2,false).collect//觸發(fā)action,會(huì)去讀取HDFS的文件,rdd2會(huì)真正執(zhí)行緩存/持久化rdd2.sortBy(_._2,false).collect//觸發(fā)action,會(huì)去讀緩存中的數(shù)據(jù),執(zhí)行速度會(huì)比之前快,因?yàn)閞dd2已經(jīng)持久化到內(nèi)存中了
?
總結(jié):何時(shí)使用緩存/持久化
在實(shí)際項(xiàng)目開發(fā)中,什么時(shí)候緩存RDD數(shù)據(jù),最好呢???
?第一點(diǎn):當(dāng)某個(gè)RDD被使用多次的時(shí)候,建議緩存此RDD數(shù)據(jù)
比如,從HDFS上讀取網(wǎng)站行為日志數(shù)據(jù),進(jìn)行多維度的分析,最好緩存數(shù)據(jù)
第二點(diǎn):當(dāng)某個(gè)RDD來之不易,并且使用不止一次,建議緩存此RDD數(shù)據(jù)
比如,從HBase表中讀取歷史訂單數(shù)據(jù),與從MySQL表中商品和用戶維度信息數(shù)據(jù),進(jìn)行關(guān)聯(lián)Join等聚合操作,獲取RDD:etlRDD,后續(xù)的報(bào)表分析使用此RDD,此時(shí)建議緩存RDD數(shù)據(jù)
案例: etlRDD.persist(StoageLeval.MEMORY_AND_DISK_2)
總結(jié)
以上是生活随笔為你收集整理的2021年大数据Spark(十七):Spark Core的RDD持久化的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Spark(十六):Sp
- 下一篇: 2021年大数据Spark(十八):Sp