

Spark Streaming in Practice (Part 4): Spark Streaming Caching and the Checkpoint Mechanism


Main topics

  • Spark Streaming caching
  • Checkpoint
  • Example

1. Spark Streaming Caching

As the earlier parts of this series explained, a DStream is composed of a sequence of RDDs. Like an ordinary RDD, its streaming data can be persisted in memory with the same persist method; calling it makes the DStream persist all of the RDDs it generates. This is especially useful for DStreams whose data must be computed several times or reused repeatedly. Window-based operations such as reduceByWindow and reduceByKeyAndWindow persist their data by default. The source of reduceByKeyAndWindow is shown below:

    def reduceByKeyAndWindow(
        reduceFunc: (V, V) => V,
        invReduceFunc: (V, V) => V,
        windowDuration: Duration,
        slideDuration: Duration,
        partitioner: Partitioner,
        filterFunc: ((K, V)) => Boolean
      ): DStream[(K, V)] = ssc.withScope {
      val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
      val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
      val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None
      new ReducedWindowedDStream[K, V](
        self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc,
        windowDuration, slideDuration, partitioner
      )
    }

The method ultimately returns a ReducedWindowedDStream object. Jumping into that class's source, its primary constructor contains the following two persist calls:

    private[streaming] class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
        parent: DStream[(K, V)],
        reduceFunc: (V, V) => V,
        invReduceFunc: (V, V) => V,
        filterFunc: Option[((K, V)) => Boolean],
        _windowDuration: Duration,
        _slideDuration: Duration,
        partitioner: Partitioner
      ) extends DStream[(K, V)](parent.ssc) {

      // ... other, non-essential code omitted ...

      // Cached in memory by default.
      // Persist RDDs to memory by default as these RDDs are going to be reused.
      super.persist(StorageLevel.MEMORY_ONLY_SER)
      reducedStream.persist(StorageLevel.MEMORY_ONLY_SER)
    }

As this code shows, a DStream produced by a window operation does not require the developer to call persist by hand; Spark caches its data in memory automatically. Like an ordinary RDD, a DStream supports the standard StorageLevel persistence levels (MEMORY_ONLY, MEMORY_ONLY_SER, MEMORY_AND_DISK, and so on).
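For a DStream that is not produced by a window operation, persistence has to be requested explicitly. The following is a minimal sketch of doing so; the socket source, host, port, and the storage level chosen here are illustrative assumptions, not part of the original article:

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object PersistSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("PersistSketch").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(1))

        // Hypothetical socket source; any DStream would work here.
        val lines = ssc.socketTextStream("localhost", 9999)
        val words = lines.flatMap(_.split(" "))

        // Explicitly persist the DStream: every RDD it generates is cached
        // with the chosen storage level.
        words.persist(StorageLevel.MEMORY_ONLY_SER)

        // The cached DStream can now feed several outputs without recomputing the split.
        words.map((_, 1)).reduceByKey(_ + _).print()
        words.count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

Because words backs two output operations in every batch, persisting it avoids reading and splitting the socket data twice.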

2. The Checkpoint Mechanism

From our earlier work with Spark Streaming we know that, unless stopped manually, a Spark Streaming application runs indefinitely; in practice such applications usually run 24/7. Streaming therefore has to be highly resilient to failures unrelated to the application logic, such as system errors or JVM crashes. Spark Streaming's checkpoint mechanism was designed for exactly this: it checkpoints enough information to a fault-tolerant storage system such as HDFS that the application can recover quickly after a failure. Two kinds of data can be checkpointed:

(1) Metadata checkpointing
Saves the information that defines the streaming computation to fault-tolerant storage such as HDFS. Metadata checkpointing is what allows recovery when the node running the streaming application's driver fails. The metadata includes:
Configuration - the configuration used to create the streaming application
DStream operations - the DStream operations defined in the streaming application
Incomplete batches - batches whose jobs are queued but not yet completed

(2) Data checkpointing
Saves generated RDDs to reliable external storage. This is essential for stateful transformations that combine data across multiple batches: the RDDs they produce depend on RDDs from previous batches, so the dependency chain keeps growing over time. Checkpointing cuts that chain by periodically saving intermediate RDDs to reliable storage, so that after a failure the computation can resume directly from the checkpointed data.
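To illustrate the kind of transformation that needs data checkpointing, here is a minimal sketch of a stateful word count built on updateStateByKey. The socket source, update function, and checkpoint path are assumptions made for this example; they do not come from the original article:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object StatefulSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("StatefulSketch").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(1))

        // Stateful transformations require a checkpoint directory, because the
        // state RDD's lineage would otherwise grow without bound.
        ssc.checkpoint("/tmp/stateful-checkpoint") // hypothetical path

        val lines = ssc.socketTextStream("localhost", 9999)

        // Running count per word across all batches seen so far.
        val updateFunc = (newValues: Seq[Int], runningCount: Option[Int]) =>
          Some(newValues.sum + runningCount.getOrElse(0))

        val totals = lines
          .flatMap(_.split(" "))
          .map((_, 1))
          .updateStateByKey[Int](updateFunc)

        totals.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

Without the ssc.checkpoint call, this program fails at start-up, because each batch's state depends on every previous batch and Spark insists on being able to truncate that lineage.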

In short, metadata checkpointing is mainly used to recover from driver failures, while data checkpointing is used to checkpoint the RDDs produced by stateful transformations.

Checkpointing is enabled with the following call:

    // checkpointDirectory is the directory in which checkpoint files are stored
    streamingContext.checkpoint(checkpointDirectory)
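For driver recovery, setting the checkpoint directory is normally combined with StreamingContext.getOrCreate, as the full example in the next section shows. A condensed sketch of that pattern follows; the directory, host, and port are placeholders of my own:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object GetOrCreateSketch {
      // All DStream setup must happen inside this factory function, so that a fresh
      // run and a run recovered from checkpoint data build the same computation.
      def createContext(): StreamingContext = {
        val conf = new SparkConf().setAppName("GetOrCreateSketch").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(1))
        ssc.checkpoint("/tmp/checkpoint") // hypothetical directory
        ssc.socketTextStream("localhost", 9999)
          .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
        ssc
      }

      def main(args: Array[String]): Unit = {
        // Re-create the context from checkpoint data if it exists; otherwise call the factory.
        val ssc = StreamingContext.getOrCreate("/tmp/checkpoint", () => createContext())
        ssc.start()
        ssc.awaitTermination()
      }
    }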

3. Example

The program comes from https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala, with minor modifications.

    import java.io.File
    import java.nio.charset.Charset

    import com.google.common.io.Files

    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Time, Seconds, StreamingContext}

    /**
     * Counts words in text encoded with UTF8 received from the network every second.
     *
     * Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
     * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
     * data. <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
     * <output-file> file to which the word counts will be appended
     *
     * <checkpoint-directory> and <output-file> must be absolute paths
     *
     * To run this on your local machine, you need to first run a Netcat server
     *
     * `$ nc -lk 9999`
     *
     * and run the example as
     *
     * `$ ./bin/run-example org.apache.spark.examples.streaming.RecoverableNetworkWordCount \
     *    localhost 9999 ~/checkpoint/ ~/out`
     *
     * If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will
     * create a new StreamingContext (will print "Creating new context" to the console).
     * Otherwise, if checkpoint data exists in ~/checkpoint/, then it will create StreamingContext
     * from the checkpoint data.
     *
     * Refer to the online documentation for more details.
     */
    object RecoverableNetworkWordCount {

      def createContext(ip: String, port: Int, outputPath: String,
          checkpointDirectory: String): StreamingContext = {
        // Executed only on the first run; if the application restarts from checkpoint data,
        // this function is not called.
        println("Creating new context")
        val outputFile = new File(outputPath)
        if (outputFile.exists()) outputFile.delete()
        val sparkConf = new SparkConf()
          .setAppName("RecoverableNetworkWordCount")
          .setMaster("local[4]")
        // Create the context with a 1 second batch size
        val ssc = new StreamingContext(sparkConf, Seconds(1))
        ssc.checkpoint(checkpointDirectory)

        // Use a socket as the data source
        val lines = ssc.socketTextStream(ip, port)
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
        wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
          val counts = "Counts at time " + time + " " + rdd.collect().mkString("[", ", ", "]")
          println(counts)
          println("Appending to " + outputFile.getAbsolutePath)
          Files.append(counts + "\n", outputFile, Charset.defaultCharset())
        })
        ssc
      }

      // Parses a String into an Int
      private object IntParam {
        def unapply(str: String): Option[Int] = {
          try {
            Some(str.toInt)
          } catch {
            case e: NumberFormatException => None
          }
        }
      }

      def main(args: Array[String]) {
        if (args.length != 4) {
          System.err.println("Your arguments were " + args.mkString("[", ", ", "]"))
          System.err.println(
            """
              |Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory>
              |     <output-file>. <hostname> and <port> describe the TCP server that Spark
              |     Streaming would connect to receive data. <checkpoint-directory> directory to
              |     HDFS-compatible file system which checkpoint data <output-file> file to which
              |     the word counts will be appended
              |
              |In local mode, <master> should be 'local[n]' with n > 1
              |Both <checkpoint-directory> and <output-file> must be absolute paths
            """.stripMargin)
          System.exit(1)
        }
        val Array(ip, IntParam(port), checkpointDirectory, outputPath) = args

        // getOrCreate either re-creates the StreamingContext from checkpoint data
        // or creates a new one via createContext
        val ssc = StreamingContext.getOrCreate(checkpointDirectory,
          () => createContext(ip, port, outputPath, checkpointDirectory))
        ssc.start()
        ssc.awaitTermination()
      }
    }

The input arguments are configured as described in the usage notes above (the original post shows a screenshot of the run configuration here).

The program's runtime output is as follows.

On the first run:

    // A new StreamingContext is created
    Creating new context
    15/11/30 07:20:32 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
    15/11/30 07:20:33 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
    Counts at time 1448896840000 ms []
    Appending to /root/out2
    15/11/30 07:20:47 WARN BlockManager: Block input-0-1448896847000 replicated to only 0 peer(s) instead of 1 peers
    Counts at time 1448896850000 ms [(Spark,1), (Context,1)]

Stop the program manually and then run it again:

    // This time the metadata is read from the checkpoint directory and the StreamingContext is recovered from it
    Counts at time 1448897070000 ms []
    Appending to /root/out2
    Counts at time 1448897080000 ms []
    Appending to /root/out2
    Counts at time 1448897090000 ms []
    Appending to /root/out2
    15/11/30 07:24:58 WARN BlockManager: Block input-0-1448897098600 replicated to only 0 peer(s) instead of 1 peers
    [Stage 8:> (0 + 0) / 4]Counts at time 1448897100000 ms [(Spark,1), (Context,1)]
    Appending to /root/out2
