日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark _30_SparkStreaming算子操作Driver HA

發(fā)布時(shí)間:2024/2/28 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark _30_SparkStreaming算子操作Driver HA 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

SparkStreaming算子操作??


foreachRDD

  • output operation算子,必須對(duì)抽取出來的RDD執(zhí)行action類算子,代碼才能執(zhí)行。

?

import org.apache.spark.rdd.RDD import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Durations, StreamingContext} import org.apache.spark.{SparkConf, SparkContext}/*** SparkStreaming 注意:* 1.需要設(shè)置local[2],因?yàn)橐粋€(gè)線程是讀取數(shù)據(jù),一個(gè)線程是處理數(shù)據(jù)* 2.創(chuàng)建StreamingContext兩種方式,如果采用的是StreamingContext(conf,Durations.seconds(5))這種方式,不能在new SparkContext* 3.Durations 批次間隔時(shí)間的設(shè)置需要根據(jù)集群的資源情況以及監(jiān)控每一個(gè)job的執(zhí)行時(shí)間來調(diào)節(jié)出最佳時(shí)間。* 4.SparkStreaming所有業(yè)務(wù)處理完成之后需要有一個(gè)output operato操作* 5.StreamingContext.start()straming框架啟動(dòng)之后是不能在次添加業(yè)務(wù)邏輯* 6.StreamingContext.stop()無參的stop方法會(huì)將sparkContext一同關(guān)閉,stop(false) ,默認(rèn)為true,會(huì)一同關(guān)閉* 7.StreamingContext.stop()停止之后是不能在調(diào)用start*/ object WordCountFromSocket {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("WordCountOnLine").setMaster("local[2]")val ssc = new StreamingContext(conf,Durations.seconds(5))ssc.sparkContext.setLogLevel("ERROR")//使用new StreamingContext(conf,Durations.seconds(5)) 這種方式默認(rèn)會(huì)創(chuàng)建SparkContext // val sc = new SparkContext(conf)//從ssc中獲取SparkContext() // val context: SparkContext = ssc.sparkContextval lines: ReceiverInputDStream[String] = ssc.socketTextStream("c7node5",9999)val words: DStream[String] = lines.flatMap(line=>{line.split(" ")})val pairWords: DStream[(String, Int)] = words.map(word=>{(word,1)})val result: DStream[(String, Int)] = pairWords.reduceByKey((v1, v2)=>{v1+v2})// result.print()/*** foreachRDD 注意事項(xiàng):* 1.foreachRDD中可以拿到DStream中的RDD,對(duì)RDD進(jìn)行操作,但是一點(diǎn)要使用RDD的action算子觸發(fā)執(zhí)行,不然DStream的邏輯也不會(huì)執(zhí)行* 2.froeachRDD算子內(nèi),拿到的RDD算子操作外,這段代碼是在Driver端執(zhí)行的,可以利用這點(diǎn)做到動(dòng)態(tài)的改變廣播變量**/result.foreachRDD(wordCountRDD=>{println("******* produce in Driver *******")val sortRDD: RDD[(String, Int)] = wordCountRDD.sortByKey(false)val result: RDD[(String, Int)] = sortRDD.filter(tp => {println("******* produce in Executor *******")true})result.foreach(println)})ssc.start()ssc.awaitTermination()ssc.stop(false)} }

transform

  • transformation類算子
  • 可以通過transform算子,對(duì)Dstream做RDD到RDD的任意操作。

?

import org.apache.spark.SparkConf import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Durations, StreamingContext}object TransformBlackList {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("transform")conf.setMaster("local[2]")val ssc = new StreamingContext(conf,Durations.seconds(5)) // ssc.sparkContext.setLogLevel("Error")/*** 廣播黑名單*/val blackList: Broadcast[List[String]] = ssc.sparkContext.broadcast(List[String]("zhangsan","lisi"))/*** 從實(shí)時(shí)數(shù)據(jù)【"hello zhangsan","hello lisi"】中發(fā)現(xiàn) 數(shù)據(jù)的第二位是黑名單人員,過濾掉*/val lines: ReceiverInputDStream[String] = ssc.socketTextStream("node5",9999)val pairLines: DStream[(String, String)] = lines.map(line=>{(line.split(" ")(1),line)})/*** transform 算子可以拿到DStream中的RDD,對(duì)RDD使用RDD的算子操作,但是最后要返回RDD,返回的RDD又被封裝到一個(gè)DStream* transform中拿到的RDD的算子外,代碼是在Driver端執(zhí)行的。可以做到動(dòng)態(tài)的改變廣播變量*/val resultDStream: DStream[String] = pairLines.transform((pairRDD:RDD[(String,String)]) => {println("++++++ Driver Code +++++++")val filterRDD: RDD[(String, String)] = pairRDD.filter(tp => {val nameList: List[String] = blackList.value!nameList.contains(tp._1)})val returnRDD: RDD[String] = filterRDD.map(tp => tp._2)returnRDD})resultDStream.print()ssc.start()ssc.awaitTermination()ssc.stop()} }

updateStateByKey

  • transformation算子
  • updateStateByKey作用:
  • 為SparkStreaming中每一個(gè)Key維護(hù)一份state狀態(tài),state類型可以是任意類型的,可以是一個(gè)自定義的對(duì)象,更新函數(shù)也可以是自定義的。
  • 通過更新函數(shù)對(duì)該key的狀態(tài)不斷更新,對(duì)于每個(gè)新的batch而言,SparkStreaming會(huì)在使用updateStateByKey的時(shí)候?yàn)橐呀?jīng)存在的key進(jìn)行state的狀態(tài)更新。
    • 使用到updateStateByKey要開啟checkpoint機(jī)制和功能。
    • 多久會(huì)將內(nèi)存中的數(shù)據(jù)寫入到磁盤一份?

    ? ? ? ? ? ? 如果batchInterval設(shè)置的時(shí)間小于10秒,那么10秒寫入磁盤一份。如果batchInterval設(shè)置的時(shí)間大于10秒,那么就會(huì)batchInterval時(shí)間間隔寫入磁盤一份。

    import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Durations, StreamingContext}/*** UpdateStateByKey 根據(jù)key更新狀態(tài)* 1、為Spark Streaming中每一個(gè)Key維護(hù)一份state狀態(tài),state類型可以是任意類型的, 可以是一個(gè)自定義的對(duì)象,那么更新函數(shù)也可以是自定義的。* 2、通過更新函數(shù)對(duì)該key的狀態(tài)不斷更新,對(duì)于每個(gè)新的batch而言,Spark Streaming會(huì)在使用updateStateByKey的時(shí)候?yàn)橐呀?jīng)存在的key進(jìn)行state的狀態(tài)更新*/ object UpdateStateByKey {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setMaster("local[2]")conf.setAppName("UpdateStateByKey")val ssc = new StreamingContext(conf,Durations.seconds(5))//設(shè)置日志級(jí)別ssc.sparkContext.setLogLevel("ERROR")val lines: ReceiverInputDStream[String] = ssc.socketTextStream("node5",9999)val words: DStream[String] = lines.flatMap(line=>{line.split(" ")})val pairWords: DStream[(String, Int)] = words.map(word => {(word, 1)})/*** 根據(jù)key更新狀態(tài),需要設(shè)置 checkpoint來保存狀態(tài)* 默認(rèn)key的狀態(tài)在內(nèi)存中 有一份,在checkpoint目錄中有一份。** 多久會(huì)將內(nèi)存中的數(shù)據(jù)(每一個(gè)key所對(duì)應(yīng)的狀態(tài))寫入到磁盤上一份呢?* 如果你的batchInterval小于10s 那么10s會(huì)將內(nèi)存中的數(shù)據(jù)寫入到磁盤一份* 如果bacthInterval 大于10s,那么就以bacthInterval為準(zhǔn)** 這樣做是為了防止頻繁的寫HDFS*/ // ssc.checkpoint("./data/streamingCheckpoint")ssc.sparkContext.setCheckpointDir("./data/streamingCheckpoint")/*** currentValues :當(dāng)前批次某個(gè) key 對(duì)應(yīng)所有的value 組成的一個(gè)集合* preValue : 以往批次當(dāng)前key 對(duì)應(yīng)的總狀態(tài)值*/val result: DStream[(String, Int)] = pairWords.updateStateByKey((currentValues: Seq[Int], preValue: Option[Int]) => {var totalValues = 0if (!preValue.isEmpty) {totalValues += preValue.get}for(value <- currentValues){totalValues += value}Option(totalValues)})result.print()ssc.start()ssc.awaitTermination()ssc.stop()} }

    ?

    窗口操作

    • 窗口操作理解圖:

    ?

    ?

    假設(shè)每隔5s 1個(gè)batch,上圖中窗口長(zhǎng)度為15s,窗口滑動(dòng)間隔10s。

    • 窗口長(zhǎng)度和滑動(dòng)間隔必須是batchInterval的整數(shù)倍。如果不是整數(shù)倍會(huì)檢測(cè)報(bào)錯(cuò)。
    • 優(yōu)化后的window窗口操作示意圖:

    ?

    ?

    • 優(yōu)化后的window操作要保存狀態(tài)所以要設(shè)置checkpoint路徑,沒有優(yōu)化的window操作可以不設(shè)置checkpoint路徑。
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Durations, StreamingContext} import org.apache.spark.SparkConf/*** SparkStreaming 窗口操作* reduceByKeyAndWindow* 每隔窗口滑動(dòng)間隔時(shí)間 計(jì)算 窗口長(zhǎng)度內(nèi)的數(shù)據(jù),按照指定的方式處理*/ object WindowOperator {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("windowOperator")conf.setMaster("local[2]")val ssc = new StreamingContext(conf,Durations.seconds(5))ssc.sparkContext.setLogLevel("Error")val lines: ReceiverInputDStream[String] = ssc.socketTextStream("node5",9999)val words: DStream[String] = lines.flatMap(line=>{line.split(" ")})val pairWords: DStream[(String, Int)] = words.map(word=>{(word,1)})// val ds : DStream[(String,Int)] = pairWords.window(Durations.seconds(15),Durations.seconds(5))/*** 窗口操作普通的機(jī)制** 滑動(dòng)間隔和窗口長(zhǎng)度必須是 batchInterval 整數(shù)倍*/val windowResult: DStream[(String, Int)] =pairWords.reduceByKeyAndWindow((v1:Int, v2:Int)=>{v1+v2},Durations.seconds(15),Durations.seconds(5))/*** 窗口操作優(yōu)化的機(jī)制*/ // ssc.checkpoint("./data/streamingCheckpoint") // val windowResult: DStream[(String, Int)] = pairWords.reduceByKeyAndWindow( // (v1:Int, v2:Int)=>{v1+v2}, // (v1:Int, v2:Int)=>{v1-v2}, // Durations.seconds(15), // Durations.seconds(5))windowResult.print()ssc.start()ssc.awaitTermination()ssc.stop()} }

    ?

    Driver HA(Standalone或者M(jìn)esos)

    因?yàn)镾parkStreaming是7*24小時(shí)運(yùn)行,Driver只是一個(gè)簡(jiǎn)單的進(jìn)程,有可能掛掉,所以實(shí)現(xiàn)Driver的HA就有必要(如果使用的Client模式就無法實(shí)現(xiàn)Driver HA ,這里針對(duì)的是cluster模式)。Yarn平臺(tái)cluster模式提交任務(wù),AM(AplicationMaster)相當(dāng)于Driver,如果掛掉會(huì)自動(dòng)啟動(dòng)AM。這里所說的DriverHA針對(duì)的是Spark standalone和Mesos資源調(diào)度的情況下。實(shí)現(xiàn)Driver的高可用有兩個(gè)步驟:

    第一:提交任務(wù)層面,在提交任務(wù)的時(shí)候加上選項(xiàng) --supervise,當(dāng)Driver掛掉的時(shí)候會(huì)自動(dòng)重啟Driver。

    第二:代碼層面,使用JavaStreamingContext.getOrCreate(checkpoint路徑,JavaStreamingContextFactory)

    • Driver中元數(shù)據(jù)包括:
  • 創(chuàng)建應(yīng)用程序的配置信息。
  • DStream的操作邏輯。
  • job中沒有完成的批次數(shù)據(jù),也就是job的執(zhí)行進(jìn)度。
  • ?

    import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.{Durations, StreamingContext}/*** Driver HA :* 1.在提交application的時(shí)候 添加 --supervise 選項(xiàng) 如果Driver掛掉 會(huì)自動(dòng)啟動(dòng)一個(gè)Driver* 2.代碼層面恢復(fù)Driver(StreamingContext)**/ object SparkStreamingDriverHA {//設(shè)置checkpoint目錄val ckDir = "./data/streamingCheckpoint"def main(args: Array[String]): Unit = {/*** StreamingContext.getOrCreate(ckDir,CreateStreamingContext)* 這個(gè)方法首先會(huì)從ckDir目錄中獲取StreamingContext【 因?yàn)镾treamingContext是序列化存儲(chǔ)在Checkpoint目錄中,恢復(fù)時(shí)會(huì)嘗試反序列化這些objects。* 如果用修改過的class可能會(huì)導(dǎo)致錯(cuò)誤,此時(shí)需要更換checkpoint目錄或者刪除checkpoint目錄中的數(shù)據(jù),程序才能起來。】** 若能獲取回來StreamingContext,就不會(huì)執(zhí)行CreateStreamingContext這個(gè)方法創(chuàng)建,否則就會(huì)創(chuàng)建**/val ssc: StreamingContext = StreamingContext.getOrCreate(ckDir,CreateStreamingContext)ssc.start()ssc.awaitTermination()ssc.stop()}def CreateStreamingContext() = {println("=======Create new StreamingContext =======")val conf = new SparkConf()conf.setMaster("local")conf.setAppName("DriverHA")val ssc: StreamingContext = new StreamingContext(conf,Durations.seconds(5))ssc.sparkContext.setLogLevel("Error")/*** 默認(rèn)checkpoint 存儲(chǔ):* 1.配置信息* 2.DStream操作邏輯* 3.job的執(zhí)行進(jìn)度* * 4.offset*/ssc.checkpoint(ckDir)val lines: DStream[String] = ssc.textFileStream("./data/streamingCopyFile")val words: DStream[String] = lines.flatMap(line=>{line.trim.split(" ")})val pairWords: DStream[(String, Int)] = words.map(word=>{(word,1)})val result: DStream[(String, Int)] = pairWords.reduceByKey((v1:Int, v2:Int)=>{v1+v2})// result.print()/*** 更改邏輯*/result.foreachRDD(pairRDD=>{pairRDD.filter(one=>{println("*********** filter *********")true})pairRDD.foreach(println)})ssc} }

    小練習(xí):

    import java.io.{File, FileInputStream, FileOutputStream} import java.util.UUID/*** 將項(xiàng)目中的 ./data/copyFileWord 文件 每隔5s 復(fù)制到 ./data/streamingCopyFile 路徑下*/object CopyFileToDirectory {def main(args: Array[String]): Unit = {while(true){Thread.sleep(5000);val uuid = UUID.randomUUID().toString();println(uuid);copyFile(new File("./data/copyFileWord"),new File(".\\data\\streamingCopyFile\\"+uuid+"----words.txt"));}}/*** 復(fù)制文件到文件夾目錄下*/def copyFile(fromFile: File, toFile: File): Unit ={val ins = new FileInputStream(fromFile);val out = new FileOutputStream(toFile);val buffer = new Array[Byte](1024*1024)var size = 0while (size != -1) {out.write(buffer, 0, buffer.length);size = ins.read(buffer)}ins.close();out.close();}}

    ?

    import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.{Durations, StreamingContext} /*** sparkStreamig監(jiān)控某個(gè)文件夾時(shí),不需要設(shè)置local[2],沒有采用 receiver 接收器的模式讀取數(shù)據(jù)* 以下是監(jiān)控 ./data/streamingCopyFile 目錄下的文件** SpakrStreaming 監(jiān)控某個(gè)目錄下的文件,這個(gè)文件必須是原子性的在目錄中產(chǎn)生,已經(jīng)存在的文件后面追加數(shù)據(jù)不能被監(jiān)控到,被刪除的文件也不能被監(jiān)控到*/ object SparkSteamingMonitorDirectory {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setMaster("local")conf.setAppName("monitorFile")val ssc = new StreamingContext(conf,Durations.seconds(10))ssc.sparkContext.setLogLevel("Error")val lines: DStream[String] = ssc.textFileStream("./data/streamingCopyFile")val words: DStream[String] = lines.flatMap(line=>{line.trim.split(" ")})val pairWords: DStream[(String, Int)] = words.map(word=>{(word.trim,1)})val result: DStream[(String, Int)] = pairWords.reduceByKey((v1:Int, v2:Int)=>{v1+v2})result.print()ssc.start()ssc.awaitTermination()ssc.stop()} }

    ?

    總結(jié)

    以上是生活随笔為你收集整理的Spark _30_SparkStreaming算子操作Driver HA的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。