Spark Streaming 实战案例(一)
本節(jié)主要內(nèi)容
本節(jié)部分內(nèi)容來(lái)自官方文檔:http://spark.apache.org/docs/latest/streaming-programming-guide.html#mllib-operations
1. Spark流式計(jì)算簡(jiǎn)介
Hadoop的MapReduce及Spark SQL等只能進(jìn)行離線計(jì)算,無(wú)法滿足實(shí)時(shí)性要求較高的業(yè)務(wù)需求,例如實(shí)時(shí)推薦、實(shí)時(shí)網(wǎng)站性能分析等,流式計(jì)算可以解決這些問(wèn)題。目前有三種比較常用的流式計(jì)算框架,它們分別是Storm,Spark Streaming和Samza,各個(gè)框架的比較及使用情況,可以參見(jiàn):http://www.csdn.net/article/2015-03-09/2824135。本節(jié)對(duì)Spark Streaming進(jìn)行重點(diǎn)介紹,Spark Streaming作為Spark的五大核心組件之一,其原生地支持多種數(shù)據(jù)源的接入,而且可以與Spark MLLib、Graphx結(jié)合起來(lái)使用,輕松完成分布式環(huán)境下在線機(jī)器學(xué)習(xí)算法的設(shè)計(jì)。Spark支持的輸入數(shù)據(jù)源及輸出文件如下圖所示:
在后面的案例實(shí)戰(zhàn)當(dāng)中,會(huì)涉及到這部分內(nèi)容。中間的”Spark Streaming“會(huì)對(duì)輸入的數(shù)據(jù)源進(jìn)行處理,然后將結(jié)果輸出,其內(nèi)部工作原理如下圖所示:
Spark Streaming接受實(shí)時(shí)傳入的數(shù)據(jù)流,然后將數(shù)據(jù)按批次(batch)進(jìn)行劃分,然后再將這部分?jǐn)?shù)據(jù)交由Spark引擎進(jìn)行處理,處理完成后將結(jié)果輸出到外部文件。
先看下面一段基于Spark Streaming的word count代碼,它可以很好地幫助初步理解流式計(jì)算
import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext}object StreamingWordCount {def main(args: Array[String]) {if (args.length < 1) {System.err.println("Usage: StreamingWordCount <directory>")System.exit(1)}//創(chuàng)建SparkConf對(duì)象val sparkConf = new SparkConf().setAppName("HdfsWordCount").setMaster("local[2]")// Create the context//創(chuàng)建StreamingContext對(duì)象,與集群進(jìn)行交互val ssc = new StreamingContext(sparkConf, Seconds(20))// Create the FileInputDStream on the directory and use the// stream to count words in new files created//如果目錄中有新創(chuàng)建的文件,則讀取val lines = ssc.textFileStream(args(0))//分割為單詞val words = lines.flatMap(_.split(" "))//統(tǒng)計(jì)單詞出現(xiàn)次數(shù)val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)//打印結(jié)果wordCounts.print()//啟動(dòng)Spark Streamingssc.start()//一直運(yùn)行,除非人為干預(yù)再停止ssc.awaitTermination()} }運(yùn)行上面的程序后,再通過(guò)命令行界面,將文件拷貝到相應(yīng)的文件目錄,具體如下:
程序在運(yùn)行時(shí),根據(jù)文件創(chuàng)建時(shí)間對(duì)文件進(jìn)行處理,在上一次運(yùn)行時(shí)間后創(chuàng)建的文件都會(huì)被處理,輸出結(jié)果如下:
2. Spark Streaming相關(guān)核心類(lèi)
1. DStream(discretized stream)
Spark Streaming提供了對(duì)數(shù)據(jù)流的抽象,它就是DStream,它可以通過(guò)前述的 Kafka, Flume等數(shù)據(jù)源創(chuàng)建,DStream本質(zhì)上是由一系列的RDD構(gòu)成。各個(gè)RDD中的數(shù)據(jù)為對(duì)應(yīng)時(shí)間間隔( interval)中流入的數(shù)據(jù),如下圖所示:
對(duì)DStream的所有操作最終都要轉(zhuǎn)換為對(duì)RDD的操作,例如前面的StreamingWordCount程序,flatMap操作將作用于DStream中的所有RDD,如下圖所示:
2.StreamingContext
在Spark Streaming當(dāng)中,StreamingContext是整個(gè)程序的入口,其創(chuàng)建方式有多種,最常用的是通過(guò)SparkConf來(lái)創(chuàng)建:
創(chuàng)建StreamingContext對(duì)象時(shí)會(huì)根據(jù)SparkConf創(chuàng)建SparkContext
/*** Create a StreamingContext by providing the configuration necessary for a new SparkContext.* @param conf a org.apache.spark.SparkConf object specifying Spark parameters* @param batchDuration the time interval at which streaming data will be divided into batches*/def this(conf: SparkConf, batchDuration: Duration) = {this(StreamingContext.createNewSparkContext(conf), null, batchDuration)}也就是說(shuō)StreamingContext是對(duì)SparkContext的封裝,StreamingContext還有其它幾個(gè)構(gòu)造方法,感興趣的可以了解,后期在源碼解析時(shí)會(huì)對(duì)它進(jìn)行詳細(xì)的講解,創(chuàng)建StreamingContext時(shí)會(huì)指定batchDuration,它用于設(shè)定批處理時(shí)間間隔,需要根據(jù)應(yīng)用程序和集群資源情況去設(shè)定。當(dāng)創(chuàng)建完成StreamingContext之后,再按下列步驟進(jìn)行:
關(guān)于StreamingContext有幾個(gè)值得注意的地方:
1.StreamingContext啟動(dòng)后,增加新的操作將不起作用。也就是說(shuō)在StreamingContext啟動(dòng)之前,要定義好所有的計(jì)算邏輯
2.StreamingContext停止后,不能重新啟動(dòng)。也就是說(shuō)要重新計(jì)算的話,需要重新運(yùn)行整個(gè)程序。
3.在單個(gè)JVM中,一段時(shí)間內(nèi)不能出現(xiàn)兩個(gè)active狀態(tài)的StreamingContext
4.調(diào)用StreamingContext的stop方法時(shí),SparkContext也將被stop掉,如果希望StreamingContext關(guān)閉時(shí),保留SparkContext,則需要在stop方法中傳入?yún)?shù)stopSparkContext=false
/**
* Stop the execution of the streams immediately (does not wait for all received data
* to be processed). By default, if stopSparkContext is not specified, the underlying
* SparkContext will also be stopped. This implicit behavior can be configured using the
* SparkConf configuration spark.streaming.stopSparkContextByDefault.
*
* @param stopSparkContext If true, stops the associated SparkContext. The underlying SparkContext
* will be stopped regardless of whether this StreamingContext has been
* started.
*/
def stop(
stopSparkContext: Boolean = conf.getBoolean(“spark.streaming.stopSparkContextByDefault”, true)
): Unit = synchronized {
stop(stopSparkContext, false)
}
5.SparkContext對(duì)象可以被多個(gè)StreamingContexts重復(fù)使用,但需要前一個(gè)StreamingContexts停止后再創(chuàng)建下一個(gè)StreamingContext對(duì)象。
3. InputDStreams及Receivers
InputDStream指的是從數(shù)據(jù)流的源頭接受的輸入數(shù)據(jù)流,在前面的StreamingWordCount程序當(dāng)中,val lines = ssc.textFileStream(args(0)) 就是一種InputDStream。除文件流外,每個(gè)input DStream都關(guān)聯(lián)一個(gè)Receiver對(duì)象,該Receiver對(duì)象接收數(shù)據(jù)源傳來(lái)的數(shù)據(jù)并將其保存在內(nèi)存中以便后期Spark處理。
Spark Streaimg提供兩種原生支持的流數(shù)據(jù)源:
Basic sources(基礎(chǔ)流數(shù)據(jù)源)。直接通過(guò)StreamingContext API創(chuàng)建,例如文件系統(tǒng)(本地文件系統(tǒng)及分布式文件系統(tǒng))、Socket連接及Akka的Actor。
文件流(File Streams)的創(chuàng)建方式:
a. streamingContext.fileStreamKeyClass, ValueClass, InputFormatClass
b. streamingContext.textFileStream(dataDirectory)
實(shí)時(shí)上textFileStream方法最終調(diào)用的也是fileStream方法
def textFileStream(directory: String): DStream[String] = withNamedScope(“text file stream”) {
fileStreamLongWritable, Text, TextInputFormat.map(_._2.toString)
}
基于Akka Actor流數(shù)據(jù)的創(chuàng)建方式:
streamingContext.actorStream(actorProps, actor-name)
基于Socket流數(shù)據(jù)的創(chuàng)建方式:
ssc.socketTextStream(hostname: String,port: Int,storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2)
基于RDD隊(duì)列的流數(shù)據(jù)創(chuàng)建方式:
streamingContext.queueStream(queueOfRDDs)
Advanced sources(高級(jí)流數(shù)據(jù)源)。如Kafka, Flume, Kinesis, Twitter等,需要借助外部工具類(lèi),在運(yùn)行時(shí)需要外部依賴(lài)(下一節(jié)內(nèi)容中介紹)
Spark Streaming還支持用戶
3. Custom Sources(自定義流數(shù)據(jù)源),它需要用戶定義receiver,該部分內(nèi)容也放在下一節(jié)介紹
最后有兩個(gè)需要注意的地方:
3. 入門(mén)案例
為方便后期查看運(yùn)行結(jié)果,修改日志級(jí)別為L(zhǎng)evel.WARN
import org.apache.spark.Loggingimport org.apache.log4j.{Level, Logger}/** Utility functions for Spark Streaming examples. */ object StreamingExamples extends Logging {/** Set reasonable logging levels for streaming if the user has not configured log4j. */def setStreamingLogLevels() {val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElementsif (!log4jInitialized) {// We first log something to initialize Spark's default logging, then we override the// logging level.logInfo("Setting log level to [WARN] for streaming example." +" To override add a custom log4j.properties to the classpath.")Logger.getRootLogger.setLevel(Level.WARN)}} }基于Socket流數(shù)據(jù)
配置運(yùn)行時(shí)參數(shù)
使用
//啟動(dòng)netcat server root@sparkmaster:~/streaming# nc -lk 9999運(yùn)行NetworkWordCount 程序,然后在netcat server運(yùn)行的控制臺(tái)輸入任意字符串
root@sparkmaster:~/streaming# nc -lk 9999 Hello WORLD HELLO WORLD WORLD TEWST NIMA基于RDD隊(duì)列的流數(shù)據(jù)
總結(jié)
以上是生活随笔為你收集整理的Spark Streaming 实战案例(一)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: ArcGis中空间连接join
- 下一篇: 万用socket神器Linux Netc