日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

谈一谈RDD 持久化的三个算子:cache、persist、checkpoint

發布時間:2024/2/28 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 谈一谈RDD 持久化的三个算子:cache、persist、checkpoint 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

這段偽代碼的瑕疵:

lines = sc.textFile(“hdfs://...”) errors = lines.filter(_.startsWith(“ERROR”)) mysql_errors = errors.filter(_.contain(“MySQL”)).count http_errors = errors.filter(_.contain(“Http”)).count

errors 是一個 RDD,mysql_errors 這個 RDD 執行時,會先讀文件,然后獲取數據,通過計算 errors,把數據傳給 mysql_errors,再進行計算,因為 RDD 中是不存儲數據的,所以 http_errors 計算的時候會重新讀數據,計算 errors 后把數據傳給 http_errors 進行計算,重復使用 errors 這個 RDD 很有必須,這就需要把 errors 這個 RDD 持久化,以便其他 RDD使用。

RDD 持久化有三個算子:cache、persist、checkpoint

  • cache:把 RDD 持久化到內存

使用方法:

var rdd = sc.textFile("test") rdd = rdd.cache() val count = rdd.count() //或者其他操作
  • persist:可以選擇多種持久化方式

使用方法:

var rdd = sc.textFile("test") rdd = rdd.persist(StorageLevel.MEMORY_ONLY) val count = rdd.count() //或者其他操作


Persist StorageLevel 說明:

class StorageLevel private( private var _useDisk: Boolean, private var _useMemory: Boolean, private var _useOffHeap: Boolean, private var _deserialized: Boolean, private var _replication: Int = 1)


初始化 StorageLevel 可以傳入 5 個參數,分別對應是否存入磁盤、是否存入內存、是否使用堆外內存、是否不進行序列化,副本數(默認為 1)

使用不同參數的組合構造的實例被預先定義為一些值,比如 MEMORY_ONLY 代表著不存入磁盤,存入內存,不使用堆外內存,不進行序列化,副本數為 1,使用 persisit()方法時把這些持久化的級別作為參數傳入即可,cache()與 persist( StorageLevel. MEMORY_ONLY)是等價的。

cache 和 persist 的注意事項


1. ?cache 和 persist 是懶執行算子,需要有一個 action 類的算子觸發執行
2. ?cache 和 persist 算子的返回執行必須賦值給一個變量,在接下來的 job 中直接使用這
個變量,那么就是使用了持久化的數據了,如果 application 中只有一個 job,沒有必要
使用 RDD 持久化
3. ?cache 和 persist 算子后不能立即緊跟 action 類算子,比如 count 算子,但是在下一行
可以有 action 類算子
error : cache().count()
right : rdd = rdd.cache() rdd.count()
4. ?cache() = persist(StorageLevel.MEMORY_ONLY)

  • checkpoint : 可以把 RDD 持久化到 HDFS,同時切斷 RDD 之間的依賴

使用方法:

sc.setCheckpointDir("hdfs://...") var rdd = sc.textFile("test") rdd.checkpoint() val count = rdd.count() //或者其他操作

對于切斷 RDD 之間的依賴的說明:
當業務邏輯很復雜時,RDD 之間頻繁轉換,RDD 的血統很長,如果中間某個 RDD 的數據丟失,還需要重新從頭計算,如果對中間某個 RDD 調用了 checkpoint()方法,把這個RDD 上傳到 HDFS,同時讓后面的 RDD 不再依賴于這個 RDD,而是依賴于 HDFS 上的數據,那么下次計算會方便很多。


checkpoint()執行原理:
1. ?當 RDD 的 job 執行完畢后,會從 finalRDD 從后往前回溯
2. ?當回溯到調用了 checkpoint()方法的 RDD 后,會給這個 RDD 做一個標記
3. ?Spark 框架自動啟動一個新的 job,計算這個 RDD 的數據,然后把數據持久化到 HDFS上
4. ?優化:對某個 RDD 執行 checkpoint()之前,對該 RDD 執行 cache(),這樣的話,新啟動的 job 只需要把內存中的數據上傳到 HDFS 中即可,不需要重新計算

總結

以上是生活随笔為你收集整理的谈一谈RDD 持久化的三个算子:cache、persist、checkpoint的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。