日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

Spark Streaming 实战案例(一)

發(fā)布時(shí)間:2024/1/23 编程问答 47 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark Streaming 实战案例(一) 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

本節(jié)主要內(nèi)容

本節(jié)部分內(nèi)容來(lái)自官方文檔:http://spark.apache.org/docs/latest/streaming-programming-guide.html#mllib-operations

  • Spark流式計(jì)算簡(jiǎn)介
  • Spark Streaming相關(guān)核心類(lèi)
  • 入門(mén)案例
  • 1. Spark流式計(jì)算簡(jiǎn)介

    Hadoop的MapReduce及Spark SQL等只能進(jìn)行離線計(jì)算,無(wú)法滿足實(shí)時(shí)性要求較高的業(yè)務(wù)需求,例如實(shí)時(shí)推薦、實(shí)時(shí)網(wǎng)站性能分析等,流式計(jì)算可以解決這些問(wèn)題。目前有三種比較常用的流式計(jì)算框架,它們分別是Storm,Spark Streaming和Samza,各個(gè)框架的比較及使用情況,可以參見(jiàn):http://www.csdn.net/article/2015-03-09/2824135。本節(jié)對(duì)Spark Streaming進(jìn)行重點(diǎn)介紹,Spark Streaming作為Spark的五大核心組件之一,其原生地支持多種數(shù)據(jù)源的接入,而且可以與Spark MLLib、Graphx結(jié)合起來(lái)使用,輕松完成分布式環(huán)境下在線機(jī)器學(xué)習(xí)算法的設(shè)計(jì)。Spark支持的輸入數(shù)據(jù)源及輸出文件如下圖所示:

    在后面的案例實(shí)戰(zhàn)當(dāng)中,會(huì)涉及到這部分內(nèi)容。中間的”Spark Streaming“會(huì)對(duì)輸入的數(shù)據(jù)源進(jìn)行處理,然后將結(jié)果輸出,其內(nèi)部工作原理如下圖所示:


    Spark Streaming接受實(shí)時(shí)傳入的數(shù)據(jù)流,然后將數(shù)據(jù)按批次(batch)進(jìn)行劃分,然后再將這部分?jǐn)?shù)據(jù)交由Spark引擎進(jìn)行處理,處理完成后將結(jié)果輸出到外部文件。

    先看下面一段基于Spark Streaming的word count代碼,它可以很好地幫助初步理解流式計(jì)算

    import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext}object StreamingWordCount {def main(args: Array[String]) {if (args.length < 1) {System.err.println("Usage: StreamingWordCount <directory>")System.exit(1)}//創(chuàng)建SparkConf對(duì)象val sparkConf = new SparkConf().setAppName("HdfsWordCount").setMaster("local[2]")// Create the context//創(chuàng)建StreamingContext對(duì)象,與集群進(jìn)行交互val ssc = new StreamingContext(sparkConf, Seconds(20))// Create the FileInputDStream on the directory and use the// stream to count words in new files created//如果目錄中有新創(chuàng)建的文件,則讀取val lines = ssc.textFileStream(args(0))//分割為單詞val words = lines.flatMap(_.split(" "))//統(tǒng)計(jì)單詞出現(xiàn)次數(shù)val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)//打印結(jié)果wordCounts.print()//啟動(dòng)Spark Streamingssc.start()//一直運(yùn)行,除非人為干預(yù)再停止ssc.awaitTermination()} }

    運(yùn)行上面的程序后,再通過(guò)命令行界面,將文件拷貝到相應(yīng)的文件目錄,具體如下:

    程序在運(yùn)行時(shí),根據(jù)文件創(chuàng)建時(shí)間對(duì)文件進(jìn)行處理,在上一次運(yùn)行時(shí)間后創(chuàng)建的文件都會(huì)被處理,輸出結(jié)果如下:

    2. Spark Streaming相關(guān)核心類(lèi)

    1. DStream(discretized stream)

    Spark Streaming提供了對(duì)數(shù)據(jù)流的抽象,它就是DStream,它可以通過(guò)前述的 Kafka, Flume等數(shù)據(jù)源創(chuàng)建,DStream本質(zhì)上是由一系列的RDD構(gòu)成。各個(gè)RDD中的數(shù)據(jù)為對(duì)應(yīng)時(shí)間間隔( interval)中流入的數(shù)據(jù),如下圖所示:

    對(duì)DStream的所有操作最終都要轉(zhuǎn)換為對(duì)RDD的操作,例如前面的StreamingWordCount程序,flatMap操作將作用于DStream中的所有RDD,如下圖所示:

    2.StreamingContext
    在Spark Streaming當(dāng)中,StreamingContext是整個(gè)程序的入口,其創(chuàng)建方式有多種,最常用的是通過(guò)SparkConf來(lái)創(chuàng)建:

    import org.apache.spark._ import org.apache.spark.streaming._val conf = new SparkConf().setAppName(appName).setMaster(master) val ssc = new StreamingContext(conf, Seconds(1))

    創(chuàng)建StreamingContext對(duì)象時(shí)會(huì)根據(jù)SparkConf創(chuàng)建SparkContext

    /*** Create a StreamingContext by providing the configuration necessary for a new SparkContext.* @param conf a org.apache.spark.SparkConf object specifying Spark parameters* @param batchDuration the time interval at which streaming data will be divided into batches*/def this(conf: SparkConf, batchDuration: Duration) = {this(StreamingContext.createNewSparkContext(conf), null, batchDuration)}

    也就是說(shuō)StreamingContext是對(duì)SparkContext的封裝,StreamingContext還有其它幾個(gè)構(gòu)造方法,感興趣的可以了解,后期在源碼解析時(shí)會(huì)對(duì)它進(jìn)行詳細(xì)的講解,創(chuàng)建StreamingContext時(shí)會(huì)指定batchDuration,它用于設(shè)定批處理時(shí)間間隔,需要根據(jù)應(yīng)用程序和集群資源情況去設(shè)定。當(dāng)創(chuàng)建完成StreamingContext之后,再按下列步驟進(jìn)行:

  • 通過(guò)輸入源創(chuàng)建InputDStreaim
  • 對(duì)DStreaming進(jìn)行transformation和output操作,這樣操作構(gòu)成了后期流式計(jì)算的邏輯
  • 通過(guò)StreamingContext.start()方法啟動(dòng)接收和處理數(shù)據(jù)的流程
  • 使用streamingContext.awaitTermination()方法等待程序處理結(jié)束(手動(dòng)停止或出錯(cuò)停止)
  • 也可以調(diào)用streamingContext.stop()方法結(jié)束程序的運(yùn)行
  • 關(guān)于StreamingContext有幾個(gè)值得注意的地方:

    1.StreamingContext啟動(dòng)后,增加新的操作將不起作用。也就是說(shuō)在StreamingContext啟動(dòng)之前,要定義好所有的計(jì)算邏輯
    2.StreamingContext停止后,不能重新啟動(dòng)。也就是說(shuō)要重新計(jì)算的話,需要重新運(yùn)行整個(gè)程序。
    3.在單個(gè)JVM中,一段時(shí)間內(nèi)不能出現(xiàn)兩個(gè)active狀態(tài)的StreamingContext
    4.調(diào)用StreamingContext的stop方法時(shí),SparkContext也將被stop掉,如果希望StreamingContext關(guān)閉時(shí),保留SparkContext,則需要在stop方法中傳入?yún)?shù)stopSparkContext=false
    /**
    * Stop the execution of the streams immediately (does not wait for all received data
    * to be processed). By default, if stopSparkContext is not specified, the underlying
    * SparkContext will also be stopped. This implicit behavior can be configured using the
    * SparkConf configuration spark.streaming.stopSparkContextByDefault.
    *
    * @param stopSparkContext If true, stops the associated SparkContext. The underlying SparkContext
    * will be stopped regardless of whether this StreamingContext has been
    * started.
    */
    def stop(
    stopSparkContext: Boolean = conf.getBoolean(“spark.streaming.stopSparkContextByDefault”, true)
    ): Unit = synchronized {
    stop(stopSparkContext, false)
    }
    5.SparkContext對(duì)象可以被多個(gè)StreamingContexts重復(fù)使用,但需要前一個(gè)StreamingContexts停止后再創(chuàng)建下一個(gè)StreamingContext對(duì)象。

    3. InputDStreams及Receivers
    InputDStream指的是從數(shù)據(jù)流的源頭接受的輸入數(shù)據(jù)流,在前面的StreamingWordCount程序當(dāng)中,val lines = ssc.textFileStream(args(0)) 就是一種InputDStream。除文件流外,每個(gè)input DStream都關(guān)聯(lián)一個(gè)Receiver對(duì)象,該Receiver對(duì)象接收數(shù)據(jù)源傳來(lái)的數(shù)據(jù)并將其保存在內(nèi)存中以便后期Spark處理。

    Spark Streaimg提供兩種原生支持的流數(shù)據(jù)源:

  • Basic sources(基礎(chǔ)流數(shù)據(jù)源)。直接通過(guò)StreamingContext API創(chuàng)建,例如文件系統(tǒng)(本地文件系統(tǒng)及分布式文件系統(tǒng))、Socket連接及Akka的Actor。
    文件流(File Streams)的創(chuàng)建方式:
    a. streamingContext.fileStreamKeyClass, ValueClass, InputFormatClass
    b. streamingContext.textFileStream(dataDirectory)
    實(shí)時(shí)上textFileStream方法最終調(diào)用的也是fileStream方法
    def textFileStream(directory: String): DStream[String] = withNamedScope(“text file stream”) {
    fileStreamLongWritable, Text, TextInputFormat.map(_._2.toString)
    }

    基于Akka Actor流數(shù)據(jù)的創(chuàng)建方式:
    streamingContext.actorStream(actorProps, actor-name)

    基于Socket流數(shù)據(jù)的創(chuàng)建方式:
    ssc.socketTextStream(hostname: String,port: Int,storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2)

    基于RDD隊(duì)列的流數(shù)據(jù)創(chuàng)建方式:
    streamingContext.queueStream(queueOfRDDs)

  • Advanced sources(高級(jí)流數(shù)據(jù)源)。如Kafka, Flume, Kinesis, Twitter等,需要借助外部工具類(lèi),在運(yùn)行時(shí)需要外部依賴(lài)(下一節(jié)內(nèi)容中介紹)

  • Spark Streaming還支持用戶
    3. Custom Sources(自定義流數(shù)據(jù)源),它需要用戶定義receiver,該部分內(nèi)容也放在下一節(jié)介紹

    最后有兩個(gè)需要注意的地方:

  • 在本地運(yùn)行Spark Streaming時(shí),master URL不能使用“l(fā)ocal” 或 “l(fā)ocal[1]”,因?yàn)楫?dāng)input DStream與receiver(如sockets, Kafka, Flume等)關(guān)聯(lián)時(shí),receiver自身就需要一個(gè)線程來(lái)運(yùn)行,此時(shí)便沒(méi)有線程去處理接收到的數(shù)據(jù)。因此,在本地運(yùn)行SparkStreaming程序時(shí),要使用“l(fā)ocal[n]”作為master URL,n要大于receiver的數(shù)量。
  • 在集群上運(yùn)行Spark Streaming時(shí),分配給Spark Streaming程序的CPU核數(shù)也必須大于receiver的數(shù)量,否則系統(tǒng)將只接受數(shù)據(jù),無(wú)法處理數(shù)據(jù)。
  • 3. 入門(mén)案例

    為方便后期查看運(yùn)行結(jié)果,修改日志級(jí)別為L(zhǎng)evel.WARN

    import org.apache.spark.Loggingimport org.apache.log4j.{Level, Logger}/** Utility functions for Spark Streaming examples. */ object StreamingExamples extends Logging {/** Set reasonable logging levels for streaming if the user has not configured log4j. */def setStreamingLogLevels() {val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElementsif (!log4jInitialized) {// We first log something to initialize Spark's default logging, then we override the// logging level.logInfo("Setting log level to [WARN] for streaming example." +" To override add a custom log4j.properties to the classpath.")Logger.getRootLogger.setLevel(Level.WARN)}} }
  • NetworkWordCount
    基于Socket流數(shù)據(jù)
  • object NetworkWordCount {def main(args: Array[String]) {if (args.length < 2) {System.err.println("Usage: NetworkWordCount <hostname> <port>")System.exit(1)}//修改日志層次為L(zhǎng)evel.WARNStreamingExamples.setStreamingLogLevels()// Create the context with a 1 second batch sizeval sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[4]")val ssc = new StreamingContext(sparkConf, Seconds(1))// Create a socket stream on target ip:port and count the// words in input stream of \n delimited text (eg. generated by 'nc')// Note that no duplication in storage level only for running locally.// Replication necessary in distributed scenario for fault tolerance.//創(chuàng)建SocketInputDStream,接收來(lái)自ip:port發(fā)送來(lái)的流數(shù)據(jù)val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)val words = lines.flatMap(_.split(" "))val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)wordCounts.print()ssc.start()ssc.awaitTermination()} }

    配置運(yùn)行時(shí)參數(shù)

    使用

    //啟動(dòng)netcat server root@sparkmaster:~/streaming# nc -lk 9999

    運(yùn)行NetworkWordCount 程序,然后在netcat server運(yùn)行的控制臺(tái)輸入任意字符串

    root@sparkmaster:~/streaming# nc -lk 9999 Hello WORLD HELLO WORLD WORLD TEWST NIMA

  • QueueStream
    基于RDD隊(duì)列的流數(shù)據(jù)
  • import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Seconds, StreamingContext}import scala.collection.mutableobject QueueStream {def main(args: Array[String]) {StreamingExamples.setStreamingLogLevels()val sparkConf = new SparkConf().setAppName("QueueStream").setMaster("local[4]")// Create the contextval ssc = new StreamingContext(sparkConf, Seconds(1))// Create the queue through which RDDs can be pushed to// a QueueInputDStream//創(chuàng)建RDD隊(duì)列val rddQueue = new mutable.SynchronizedQueue[RDD[Int]]()// Create the QueueInputDStream and use it do some processing// 創(chuàng)建QueueInputDStream val inputStream = ssc.queueStream(rddQueue)//處理隊(duì)列中的RDD數(shù)據(jù)val mappedStream = inputStream.map(x => (x % 10, 1))val reducedStream = mappedStream.reduceByKey(_ + _)//打印結(jié)果reducedStream.print()//啟動(dòng)計(jì)算ssc.start()// Create and push some RDDs intofor (i <- 1 to 30) {rddQueue += ssc.sparkContext.makeRDD(1 to 3000, 10)Thread.sleep(1000)//通過(guò)程序停止StreamingContext的運(yùn)行ssc.stop()} }

    總結(jié)

    以上是生活随笔為你收集整理的Spark Streaming 实战案例(一)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 免费毛片网站 | 国产69精品久久久久999小说 | 精品成人免费一区二区在线播放 | www.777奇米影视 | 国产久久精品 | 欧美77777| 精品国产aⅴ一区二区三区四川人 | 97碰碰视频 | av中文字幕一区二区 | 黄色片视频免费在线观看 | 少妇人妻好深好紧精品无码 | 欧美三级成人 | 国产精品自拍合集 | 玉足女爽爽91 | 黄色大片视频网站 | 欧美12--15处交性娇小 | 亚洲一级特黄毛片 | 欧美成人a∨高清免费观看 国产精品999视频 | 日本精品人妻无码免费大全 | 在线天堂中文 | 韩国精品一区二区三区 | 欧美福利电影 | 国内毛片毛片 | 国产精品入口免费 | 日本午夜免费福利视频 | www男人的天堂 | 99综合在线| 午夜影视在线观看 | 不卡的av在线免费观看 | 欧美日韩另类视频 | 成人软件在线观看 | 国产精品欧美久久久久天天影视 | 五月婷婷狠狠干 | 黑人毛片网站 | 麻豆高清| 91手机在线 | 久久久久久视 | 在线观看免费的av | 国产一区影院 | 欧洲最强rapper网站直播 | 亚洲综合五区 | 久久爰| 五月婷网站 | 国产做受网站 | 黄色福利社| 亚洲视频在线网 | 中文字幕亚洲一区 | 久久99婷婷 | 激情欧美日韩 | 国产精品久久久久久久久久久久久久久久久 | 国产在线视频网址 | 波波野结衣 | 免费在线成人 | 麻豆精品免费 | 中文字幕国产在线 | 日日骚网 | 欧美18aaaⅹxx | 日韩亚洲一区二区三区 | 日本黄xxxxxxxxx100 | 日本aa在线观看 | 99热在线免费观看 | 中文字幕亚洲在线观看 | 黄色av免费在线播放 | 67194国产| 久久久99精品国产一区二区三区 | 一二三区精品视频 | 成年人在线免费观看网站 | 超碰青草 | 欧美超碰在线观看 | 91精品国产一区二区 | 色播av | 国产黄色一级片视频 | 精品久久福利 | 撕开少妇裙子猛然进入 | 青青草原综合网 | 欧美 在线| 黄色av观看 | 亚洲伊人天堂 | 欧美黄色录像视频 | av美女在线观看 | 精品久久久久一区二区国产 | 日日cao | 亚洲成人生活片 | 一级高清视频 | 日韩av网址大全 | 97se在线 | 日本美女一级视频 | 丰满女人又爽又紧又丰满 | 日韩色一区| 国产精品美女久久久久久久久 | 鲍鱼av在线 | 色爽 | 成人免费观看在线视频 | 亚洲久久成人 | 99久久久久 | 337p亚洲精品色噜噜噜 | 天天射,天天干 | ass极品国模人体欣赏 | av2014天堂 |