日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark _30_SparkStreaming算子操作Driver HA

發布時間:2024/2/28 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark _30_SparkStreaming算子操作Driver HA 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

SparkStreaming算子操作??


foreachRDD

  • output operation算子,必須對抽取出來的RDD執行action類算子,代碼才能執行。

?

import org.apache.spark.rdd.RDD import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Durations, StreamingContext} import org.apache.spark.{SparkConf, SparkContext}/*** SparkStreaming 注意:* 1.需要設置local[2],因為一個線程是讀取數據,一個線程是處理數據* 2.創建StreamingContext兩種方式,如果采用的是StreamingContext(conf,Durations.seconds(5))這種方式,不能在new SparkContext* 3.Durations 批次間隔時間的設置需要根據集群的資源情況以及監控每一個job的執行時間來調節出最佳時間。* 4.SparkStreaming所有業務處理完成之后需要有一個output operato操作* 5.StreamingContext.start()straming框架啟動之后是不能在次添加業務邏輯* 6.StreamingContext.stop()無參的stop方法會將sparkContext一同關閉,stop(false) ,默認為true,會一同關閉* 7.StreamingContext.stop()停止之后是不能在調用start*/ object WordCountFromSocket {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("WordCountOnLine").setMaster("local[2]")val ssc = new StreamingContext(conf,Durations.seconds(5))ssc.sparkContext.setLogLevel("ERROR")//使用new StreamingContext(conf,Durations.seconds(5)) 這種方式默認會創建SparkContext // val sc = new SparkContext(conf)//從ssc中獲取SparkContext() // val context: SparkContext = ssc.sparkContextval lines: ReceiverInputDStream[String] = ssc.socketTextStream("c7node5",9999)val words: DStream[String] = lines.flatMap(line=>{line.split(" ")})val pairWords: DStream[(String, Int)] = words.map(word=>{(word,1)})val result: DStream[(String, Int)] = pairWords.reduceByKey((v1, v2)=>{v1+v2})// result.print()/*** foreachRDD 注意事項:* 1.foreachRDD中可以拿到DStream中的RDD,對RDD進行操作,但是一點要使用RDD的action算子觸發執行,不然DStream的邏輯也不會執行* 2.froeachRDD算子內,拿到的RDD算子操作外,這段代碼是在Driver端執行的,可以利用這點做到動態的改變廣播變量**/result.foreachRDD(wordCountRDD=>{println("******* produce in Driver *******")val sortRDD: RDD[(String, Int)] = wordCountRDD.sortByKey(false)val result: RDD[(String, Int)] = sortRDD.filter(tp => {println("******* produce in Executor *******")true})result.foreach(println)})ssc.start()ssc.awaitTermination()ssc.stop(false)} }

transform

  • transformation類算子
  • 可以通過transform算子,對Dstream做RDD到RDD的任意操作。

?

import org.apache.spark.SparkConf import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Durations, StreamingContext}object TransformBlackList {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("transform")conf.setMaster("local[2]")val ssc = new StreamingContext(conf,Durations.seconds(5)) // ssc.sparkContext.setLogLevel("Error")/*** 廣播黑名單*/val blackList: Broadcast[List[String]] = ssc.sparkContext.broadcast(List[String]("zhangsan","lisi"))/*** 從實時數據【"hello zhangsan","hello lisi"】中發現 數據的第二位是黑名單人員,過濾掉*/val lines: ReceiverInputDStream[String] = ssc.socketTextStream("node5",9999)val pairLines: DStream[(String, String)] = lines.map(line=>{(line.split(" ")(1),line)})/*** transform 算子可以拿到DStream中的RDD,對RDD使用RDD的算子操作,但是最后要返回RDD,返回的RDD又被封裝到一個DStream* transform中拿到的RDD的算子外,代碼是在Driver端執行的。可以做到動態的改變廣播變量*/val resultDStream: DStream[String] = pairLines.transform((pairRDD:RDD[(String,String)]) => {println("++++++ Driver Code +++++++")val filterRDD: RDD[(String, String)] = pairRDD.filter(tp => {val nameList: List[String] = blackList.value!nameList.contains(tp._1)})val returnRDD: RDD[String] = filterRDD.map(tp => tp._2)returnRDD})resultDStream.print()ssc.start()ssc.awaitTermination()ssc.stop()} }

updateStateByKey

  • transformation算子
  • updateStateByKey作用:
  • 為SparkStreaming中每一個Key維護一份state狀態,state類型可以是任意類型的,可以是一個自定義的對象,更新函數也可以是自定義的。
  • 通過更新函數對該key的狀態不斷更新,對于每個新的batch而言,SparkStreaming會在使用updateStateByKey的時候為已經存在的key進行state的狀態更新。
    • 使用到updateStateByKey要開啟checkpoint機制和功能。
    • 多久會將內存中的數據寫入到磁盤一份?

    ? ? ? ? ? ? 如果batchInterval設置的時間小于10秒,那么10秒寫入磁盤一份。如果batchInterval設置的時間大于10秒,那么就會batchInterval時間間隔寫入磁盤一份。

    import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Durations, StreamingContext}/*** UpdateStateByKey 根據key更新狀態* 1、為Spark Streaming中每一個Key維護一份state狀態,state類型可以是任意類型的, 可以是一個自定義的對象,那么更新函數也可以是自定義的。* 2、通過更新函數對該key的狀態不斷更新,對于每個新的batch而言,Spark Streaming會在使用updateStateByKey的時候為已經存在的key進行state的狀態更新*/ object UpdateStateByKey {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setMaster("local[2]")conf.setAppName("UpdateStateByKey")val ssc = new StreamingContext(conf,Durations.seconds(5))//設置日志級別ssc.sparkContext.setLogLevel("ERROR")val lines: ReceiverInputDStream[String] = ssc.socketTextStream("node5",9999)val words: DStream[String] = lines.flatMap(line=>{line.split(" ")})val pairWords: DStream[(String, Int)] = words.map(word => {(word, 1)})/*** 根據key更新狀態,需要設置 checkpoint來保存狀態* 默認key的狀態在內存中 有一份,在checkpoint目錄中有一份。** 多久會將內存中的數據(每一個key所對應的狀態)寫入到磁盤上一份呢?* 如果你的batchInterval小于10s 那么10s會將內存中的數據寫入到磁盤一份* 如果bacthInterval 大于10s,那么就以bacthInterval為準** 這樣做是為了防止頻繁的寫HDFS*/ // ssc.checkpoint("./data/streamingCheckpoint")ssc.sparkContext.setCheckpointDir("./data/streamingCheckpoint")/*** currentValues :當前批次某個 key 對應所有的value 組成的一個集合* preValue : 以往批次當前key 對應的總狀態值*/val result: DStream[(String, Int)] = pairWords.updateStateByKey((currentValues: Seq[Int], preValue: Option[Int]) => {var totalValues = 0if (!preValue.isEmpty) {totalValues += preValue.get}for(value <- currentValues){totalValues += value}Option(totalValues)})result.print()ssc.start()ssc.awaitTermination()ssc.stop()} }

    ?

    窗口操作

    • 窗口操作理解圖:

    ?

    ?

    假設每隔5s 1個batch,上圖中窗口長度為15s,窗口滑動間隔10s。

    • 窗口長度和滑動間隔必須是batchInterval的整數倍。如果不是整數倍會檢測報錯。
    • 優化后的window窗口操作示意圖:

    ?

    ?

    • 優化后的window操作要保存狀態所以要設置checkpoint路徑,沒有優化的window操作可以不設置checkpoint路徑。
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Durations, StreamingContext} import org.apache.spark.SparkConf/*** SparkStreaming 窗口操作* reduceByKeyAndWindow* 每隔窗口滑動間隔時間 計算 窗口長度內的數據,按照指定的方式處理*/ object WindowOperator {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("windowOperator")conf.setMaster("local[2]")val ssc = new StreamingContext(conf,Durations.seconds(5))ssc.sparkContext.setLogLevel("Error")val lines: ReceiverInputDStream[String] = ssc.socketTextStream("node5",9999)val words: DStream[String] = lines.flatMap(line=>{line.split(" ")})val pairWords: DStream[(String, Int)] = words.map(word=>{(word,1)})// val ds : DStream[(String,Int)] = pairWords.window(Durations.seconds(15),Durations.seconds(5))/*** 窗口操作普通的機制** 滑動間隔和窗口長度必須是 batchInterval 整數倍*/val windowResult: DStream[(String, Int)] =pairWords.reduceByKeyAndWindow((v1:Int, v2:Int)=>{v1+v2},Durations.seconds(15),Durations.seconds(5))/*** 窗口操作優化的機制*/ // ssc.checkpoint("./data/streamingCheckpoint") // val windowResult: DStream[(String, Int)] = pairWords.reduceByKeyAndWindow( // (v1:Int, v2:Int)=>{v1+v2}, // (v1:Int, v2:Int)=>{v1-v2}, // Durations.seconds(15), // Durations.seconds(5))windowResult.print()ssc.start()ssc.awaitTermination()ssc.stop()} }

    ?

    Driver HA(Standalone或者Mesos)

    因為SparkStreaming是7*24小時運行,Driver只是一個簡單的進程,有可能掛掉,所以實現Driver的HA就有必要(如果使用的Client模式就無法實現Driver HA ,這里針對的是cluster模式)。Yarn平臺cluster模式提交任務,AM(AplicationMaster)相當于Driver,如果掛掉會自動啟動AM。這里所說的DriverHA針對的是Spark standalone和Mesos資源調度的情況下。實現Driver的高可用有兩個步驟:

    第一:提交任務層面,在提交任務的時候加上選項 --supervise,當Driver掛掉的時候會自動重啟Driver。

    第二:代碼層面,使用JavaStreamingContext.getOrCreate(checkpoint路徑,JavaStreamingContextFactory)

    • Driver中元數據包括:
  • 創建應用程序的配置信息。
  • DStream的操作邏輯。
  • job中沒有完成的批次數據,也就是job的執行進度。
  • ?

    import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.{Durations, StreamingContext}/*** Driver HA :* 1.在提交application的時候 添加 --supervise 選項 如果Driver掛掉 會自動啟動一個Driver* 2.代碼層面恢復Driver(StreamingContext)**/ object SparkStreamingDriverHA {//設置checkpoint目錄val ckDir = "./data/streamingCheckpoint"def main(args: Array[String]): Unit = {/*** StreamingContext.getOrCreate(ckDir,CreateStreamingContext)* 這個方法首先會從ckDir目錄中獲取StreamingContext【 因為StreamingContext是序列化存儲在Checkpoint目錄中,恢復時會嘗試反序列化這些objects。* 如果用修改過的class可能會導致錯誤,此時需要更換checkpoint目錄或者刪除checkpoint目錄中的數據,程序才能起來?!?* 若能獲取回來StreamingContext,就不會執行CreateStreamingContext這個方法創建,否則就會創建**/val ssc: StreamingContext = StreamingContext.getOrCreate(ckDir,CreateStreamingContext)ssc.start()ssc.awaitTermination()ssc.stop()}def CreateStreamingContext() = {println("=======Create new StreamingContext =======")val conf = new SparkConf()conf.setMaster("local")conf.setAppName("DriverHA")val ssc: StreamingContext = new StreamingContext(conf,Durations.seconds(5))ssc.sparkContext.setLogLevel("Error")/*** 默認checkpoint 存儲:* 1.配置信息* 2.DStream操作邏輯* 3.job的執行進度* * 4.offset*/ssc.checkpoint(ckDir)val lines: DStream[String] = ssc.textFileStream("./data/streamingCopyFile")val words: DStream[String] = lines.flatMap(line=>{line.trim.split(" ")})val pairWords: DStream[(String, Int)] = words.map(word=>{(word,1)})val result: DStream[(String, Int)] = pairWords.reduceByKey((v1:Int, v2:Int)=>{v1+v2})// result.print()/*** 更改邏輯*/result.foreachRDD(pairRDD=>{pairRDD.filter(one=>{println("*********** filter *********")true})pairRDD.foreach(println)})ssc} }

    小練習:

    import java.io.{File, FileInputStream, FileOutputStream} import java.util.UUID/*** 將項目中的 ./data/copyFileWord 文件 每隔5s 復制到 ./data/streamingCopyFile 路徑下*/object CopyFileToDirectory {def main(args: Array[String]): Unit = {while(true){Thread.sleep(5000);val uuid = UUID.randomUUID().toString();println(uuid);copyFile(new File("./data/copyFileWord"),new File(".\\data\\streamingCopyFile\\"+uuid+"----words.txt"));}}/*** 復制文件到文件夾目錄下*/def copyFile(fromFile: File, toFile: File): Unit ={val ins = new FileInputStream(fromFile);val out = new FileOutputStream(toFile);val buffer = new Array[Byte](1024*1024)var size = 0while (size != -1) {out.write(buffer, 0, buffer.length);size = ins.read(buffer)}ins.close();out.close();}}

    ?

    import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.{Durations, StreamingContext} /*** sparkStreamig監控某個文件夾時,不需要設置local[2],沒有采用 receiver 接收器的模式讀取數據* 以下是監控 ./data/streamingCopyFile 目錄下的文件** SpakrStreaming 監控某個目錄下的文件,這個文件必須是原子性的在目錄中產生,已經存在的文件后面追加數據不能被監控到,被刪除的文件也不能被監控到*/ object SparkSteamingMonitorDirectory {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setMaster("local")conf.setAppName("monitorFile")val ssc = new StreamingContext(conf,Durations.seconds(10))ssc.sparkContext.setLogLevel("Error")val lines: DStream[String] = ssc.textFileStream("./data/streamingCopyFile")val words: DStream[String] = lines.flatMap(line=>{line.trim.split(" ")})val pairWords: DStream[(String, Int)] = words.map(word=>{(word.trim,1)})val result: DStream[(String, Int)] = pairWords.reduceByKey((v1:Int, v2:Int)=>{v1+v2})result.print()ssc.start()ssc.awaitTermination()ssc.stop()} }

    ?

    總結

    以上是生活随笔為你收集整理的Spark _30_SparkStreaming算子操作Driver HA的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。