Spark Streaming 实战案例(四) Spark Streaming 缓存、Checkpoint机制
主要內容
1. Spark Stream 緩存
通過前面一系列的課程介紹,我們知道DStream是由一系列的RDD構成的,它同一般的RDD一樣,也可以將流式數據持久化到內容當中,采用的同樣是persisit方法,調用該方法后DStream將持久化所有的RDD數據。這對于一些需要重復計算多次或數據需要反復被使用的DStream特別有效。像reduceByWindow、reduceByKeyAndWindow等基于窗口操作的方法,它們默認都是有persisit操作的。reduceByKeyAndWindow方法源碼具體如下:
def reduceByKeyAndWindow(reduceFunc: (V, V) => V,invReduceFunc: (V, V) => V,windowDuration: Duration,slideDuration: Duration,partitioner: Partitioner,filterFunc: ((K, V)) => Boolean): DStream[(K, V)] = ssc.withScope {val cleanedReduceFunc = ssc.sc.clean(reduceFunc)val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else Nonenew ReducedWindowedDStream[K, V](self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc,windowDuration, slideDuration, partitioner)}從上面的方法來看,它最返回的是一個ReducedWindowedDStream對象,跳到該類的源碼中可以看到在其主構造函數中包含下面兩段代碼:
private[streaming] class ReducedWindowedDStream[K: ClassTag, V: ClassTag](parent: DStream[(K, V)],reduceFunc: (V, V) => V,invReduceFunc: (V, V) => V,filterFunc: Option[((K, V)) => Boolean],_windowDuration: Duration,_slideDuration: Duration,partitioner: Partitioner) extends DStream[(K, V)](parent.ssc) {//省略其它非關鍵代碼//默認被緩存到內存當中// Persist RDDs to memory by default as these RDDs are going to be reused.super.persist(StorageLevel.MEMORY_ONLY_SER)reducedStream.persist(StorageLevel.MEMORY_ONLY_SER) }通過上面的代碼我們可以看到,通過窗口操作產生的DStream不需要開發人員手動去調用persist方法,Spark會自動幫我們將數據緩存當內存當中。同一般的RDD類似,DStream支持的persisit級別為:
2. Checkpoint機制
通過前期對Spark Streaming的理解,我們知道,Spark Streaming應用程序如果不手動停止,則將一直運行下去,在實際中應用程序一般是24小時*7天不間斷運行的,因此Streaming必須對諸如系統錯誤、JVM出錯等與程序邏輯無關的錯誤(failures )具體很強的彈性,具備一定的非應用程序出錯的容錯性。Spark Streaming的Checkpoint機制便是為此設計的,它將足夠多的信息checkpoint到某些具備容錯性的存儲系統如HDFS上,以便出錯時能夠迅速恢復。有兩種數據可以chekpoint:
(1)Metadata checkpointing
將流式計算的信息保存到具備容錯性的存儲上如HDFS,Metadata Checkpointing適用于當streaming應用程序Driver所在的節點出錯時能夠恢復,元數據包括:
Configuration(配置信息) - 創建streaming應用程序的配置信息
DStream operations - 在streaming應用程序中定義的DStreaming操作
Incomplete batches - 在列隊中沒有處理完的作業
(2)Data checkpointing
將生成的RDD保存到外部可靠的存儲當中,對于一些數據跨度為多個bactch的有狀態tranformation操作來說,checkpoint非常有必要,因為在這些transformation操作生成的RDD對前一RDD有依賴,隨著時間的增加,依賴鏈可能會非常長,checkpoint機制能夠切斷依賴鏈,將中間的RDD周期性地checkpoint到可靠存儲當中,從而在出錯時可以直接從checkpoint點恢復。
具體來說,metadata checkpointing主要還是從drvier失敗中恢復,而Data Checkpoing用于對有狀態的transformation操作進行checkpointing
Checkpointing具體的使用方式時通過下列方法:
//checkpointDirectory為checkpoint文件保存目錄 streamingContext.checkpoint(checkpointDirectory)3. 案例
程序來源:https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
進行了適量修改
輸入參數配置如下:
運行狀態圖如下:
首次運行時:
//創建新的StreamingContext Creating new context 15/11/30 07:20:32 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set. 15/11/30 07:20:33 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes Counts at time 1448896840000 ms [] Appending to /root/out2 15/11/30 07:20:47 WARN BlockManager: Block input-0-1448896847000 replicated to only 0 peer(s) instead of 1 peers Counts at time 1448896850000 ms [(Spark,1), (Context,1)]手動將程序停止,然后重新運行
//這時從checkpoint目錄中讀取元數據信息,進行StreamingContext的恢復 Counts at time 1448897070000 ms [] Appending to /root/out2 Counts at time 1448897080000 ms [] Appending to /root/out2 Counts at time 1448897090000 ms [] Appending to /root/out2 15/11/30 07:24:58 WARN BlockManager: Block input-0-1448897098600 replicated to only 0 peer(s) instead of 1 peers [Stage 8:> (0 + 0) / 4]Counts at time 1448897100000 ms [(Spark,1), (Context,1)] Appending to /root/out2總結
以上是生活随笔為你收集整理的Spark Streaming 实战案例(四) Spark Streaming 缓存、Checkpoint机制的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: geometry-api-java 学习
- 下一篇: Spark Streaming 实战案例