Apache SparkStreaming 简介和编程模型
1. 簡介
圖5.22 SparkStreaming[16]
????Spark Streaming是Spark API核心擴(kuò)展,提供對(duì)實(shí)時(shí)數(shù)據(jù)流進(jìn)行流式處理,具備可擴(kuò)展、高吞吐和容錯(cuò)等特性。Spark Streaming支持從多種數(shù)據(jù)源中提取數(shù)據(jù),例如Twitter、Kafka、Flume、ZeroMQ和TCP套接字,并提供了一些高級(jí)的API來表示復(fù)雜處理算法,如map、reduce、join、windows等,最后可以將得到的結(jié)果存儲(chǔ)到分布式文件系統(tǒng)(如HDFS)、數(shù)據(jù)庫或者其他輸出,Spark的機(jī)器學(xué)習(xí)和圖計(jì)算的算法也可以應(yīng)用于Spark Streaming的數(shù)據(jù)流中。Spark Streaming的本質(zhì)實(shí)際上是一個(gè)微批處理系統(tǒng),正因如此,Spark Streaming具有一些現(xiàn)有的流處理模型所沒有的特性。它可以對(duì)故障節(jié)點(diǎn)和慢節(jié)點(diǎn)實(shí)現(xiàn)秒級(jí)的恢復(fù),且具有高吞吐量。但其實(shí)時(shí)計(jì)算延遲是在秒級(jí)的,而現(xiàn)有的流處理系統(tǒng)(如Storm)一般是在毫秒級(jí),所以Spark Streaming不適用于一些實(shí)時(shí)性要求很高的場(chǎng)景,如實(shí)時(shí)金融系統(tǒng)等。
????許多數(shù)據(jù)需要實(shí)時(shí)進(jìn)行處理,也就是說數(shù)據(jù)產(chǎn)生時(shí)的價(jià)值最大。例如,一個(gè)社交網(wǎng)絡(luò)想在分鐘級(jí)別內(nèi)確定某個(gè)交流話題的趨勢(shì),搜索網(wǎng)站想根據(jù)用戶的訪問訓(xùn)練模型,服務(wù)商想在秒級(jí)內(nèi)通過挖掘日志找到錯(cuò)誤信息。設(shè)計(jì)適用于這些場(chǎng)景的模型極具挑戰(zhàn)性,因?yàn)閷?duì)于一些應(yīng)用場(chǎng)景(如機(jī)器學(xué)習(xí)、實(shí)時(shí)日志分析),集群規(guī)模會(huì)達(dá)到百級(jí)以上,在這樣的規(guī)模下會(huì)存在兩個(gè)主要問題:節(jié)點(diǎn)故障(faults)和慢節(jié)點(diǎn)(slow nodes)問題。這兩個(gè)問題在大規(guī)模集群下都是經(jīng)常存在的,所以快速恢復(fù)在流系統(tǒng)應(yīng)用中是十分重要的,否則流式應(yīng)用可能無法及時(shí)做出關(guān)鍵的決定。但現(xiàn)有的一些流處理系統(tǒng)在這兩個(gè)問題的處理上都十分有限,大多數(shù)流處理系統(tǒng)(如Storm、TimeStream、MapReduce Online等)都是基于純實(shí)時(shí)的計(jì)算模型(a-record-at-a-time,來一條數(shù)據(jù)就處理一條數(shù)據(jù)),雖然這個(gè)模型能夠有較小的計(jì)算時(shí)延,但是很難解決節(jié)點(diǎn)故障和慢節(jié)點(diǎn)的問題。一些傳統(tǒng)的流式處理方法在小規(guī)模集群下運(yùn)行較好,但在大規(guī)模情況下卻面臨著實(shí)質(zhì)性的問題。
????Spark Streaming提供了一種抽象的連續(xù)數(shù)據(jù)流,即Discretized Stream(離散流),一個(gè)離散流本質(zhì)上就是一個(gè)序列化的RDD(Resilient Distributed Datasets,彈性分布式數(shù)據(jù)集)。離散流模型利用其并行恢復(fù)(ParallelRecovery)解決了節(jié)點(diǎn)故障和減輕了慢節(jié)點(diǎn)所帶來的問題,還保證了一致性語義。
2. 系統(tǒng)架構(gòu)
????離散流是Spark Streaming提供的基礎(chǔ)抽象,它代表持續(xù)性的數(shù)據(jù)流,這些數(shù)據(jù)流既可以從外部源(如Kafka、Flume等)獲取,也可以通過離散流的算子操作來獲得。實(shí)質(zhì)上,離散流由一組時(shí)間上連續(xù)的RDD組成,每個(gè)RDD都包含著一定時(shí)間片的數(shù)據(jù),如圖5.23所示:
圖5.23Discretized Stream[17]
圖5.24 SparkStreaming 整體架構(gòu)[18]
?
????如圖5.24所示,這是Spark Streaming系統(tǒng)的整體架構(gòu),它將實(shí)時(shí)的流數(shù)據(jù)分解成一系列很小的批處理作業(yè)。批處理引擎使用的是Spark Core,也就是把輸入數(shù)據(jù)按照一定的時(shí)間片(如1s)分成一段一段數(shù)據(jù),每一段數(shù)據(jù)都會(huì)轉(zhuǎn)換成Spark的RDD輸入到Spark Core中,然后再將離散流的操作轉(zhuǎn)換為RDD的算子操作,RDD算子操作產(chǎn)生的中間結(jié)果會(huì)保存在內(nèi)存中,最后整個(gè)流式計(jì)算可以將中間結(jié)果輸出到外部。
3. 一致性語義和容錯(cuò)
????對(duì)于流式計(jì)算,容錯(cuò)性的重要性在第一小節(jié)已經(jīng)詳細(xì)說明過了。首先,我們需要回憶Spark中RDD的容錯(cuò)機(jī)制。RDD是一個(gè)彈性不可變的分布式數(shù)據(jù)集,Spark記錄著確定性的RDD轉(zhuǎn)換的操作繼承關(guān)系(lineage),所以只要輸入的數(shù)據(jù)是可容錯(cuò)的,任何一個(gè)RDD的分區(qū)出錯(cuò)時(shí),都可以根據(jù)lineage對(duì)原始輸入數(shù)據(jù)進(jìn)行轉(zhuǎn)換操作,從而重新計(jì)算。圖5.25是Spark Streaming的一個(gè)RDD繼承關(guān)系圖:
圖5.25 統(tǒng)計(jì)網(wǎng)頁瀏覽量的lineragegraph[18]
????圖中每個(gè)橢圓代表的是一個(gè)RDD,橢圓中的每一個(gè)圓形是一個(gè)RDD的分區(qū),圖中的每一列的所有RDD代表的是一個(gè)離散流(圖中一共有3個(gè)離散流),間隔[0,1)和[1,2)代表的是不同時(shí)間分片,圖中每一行的最后一個(gè)RDD代表的是中間結(jié)果RDD。
????并行恢復(fù)(Parallel Recovery):系統(tǒng)會(huì)周期性的checkpoint RDD的數(shù)據(jù),異步的備份到其他節(jié)點(diǎn)(默認(rèn)復(fù)制數(shù)是2),因?yàn)镽DD是不可變的,所以checkpoint不會(huì)鎖住當(dāng)前時(shí)間片的執(zhí)行。一個(gè)Spark Streaming的流式應(yīng)用,系統(tǒng)會(huì)每分鐘對(duì)中間結(jié)果RDD進(jìn)行checkpoint。當(dāng)一個(gè)節(jié)點(diǎn)發(fā)生故障了,系統(tǒng)監(jiān)測(cè)出丟失的RDD,系統(tǒng)會(huì)選擇上一個(gè)checkpoint的數(shù)據(jù)來進(jìn)行重新計(jì)算。離散流可以利用充分利用分區(qū)的并行性來達(dá)到更快的恢復(fù)速度:1)與批處理系統(tǒng)很相似的是,每個(gè)節(jié)點(diǎn)上運(yùn)行多個(gè)task,每一個(gè)時(shí)間片的轉(zhuǎn)換操作會(huì)在每個(gè)節(jié)點(diǎn)創(chuàng)建多個(gè)RDD分區(qū)(例如在100個(gè)節(jié)點(diǎn)的集群上有1000個(gè)RDD分區(qū))。這樣當(dāng)一個(gè)節(jié)點(diǎn)發(fā)生故障時(shí),可以讓RDD的不同分區(qū)并行恢復(fù)。2)繼承關(guān)系圖(lineage graph)可以使不同時(shí)間片的數(shù)據(jù)并行恢復(fù)。如果一個(gè)機(jī)器節(jié)點(diǎn)發(fā)生故障,系統(tǒng)在每一個(gè)時(shí)間片可能丟失一些map操作的輸出,從圖5.26的瀏覽量統(tǒng)計(jì)應(yīng)用的lineage graph可以看出,不同時(shí)間片的map可以并行地恢復(fù)計(jì)算,所以并行恢復(fù)的速度是比上游緩存策略更快的。
????慢節(jié)點(diǎn)問題:在現(xiàn)有的純實(shí)時(shí)流處理系統(tǒng)中,基本都沒有解決慢節(jié)點(diǎn)的問題。離散流則與批處理系統(tǒng)類似,通過運(yùn)行慢任務(wù)的副本來減輕慢節(jié)點(diǎn)帶來的影響。Spark Streaming最開始采用一種簡單的閾值來判斷一個(gè)任務(wù)是否是慢任務(wù):當(dāng)一個(gè)任務(wù)是這個(gè)階段(stage)的中間任務(wù)運(yùn)行時(shí)間的1.4倍,則判斷這是個(gè)慢任務(wù)。
????一致性語義:離散流還有一個(gè)好處就是提供了強(qiáng)一致性。例如,考慮一個(gè)系統(tǒng)統(tǒng)計(jì)男女網(wǎng)頁瀏覽量的比例,一個(gè)節(jié)點(diǎn)統(tǒng)計(jì)男性網(wǎng)頁瀏覽量,另一個(gè)節(jié)點(diǎn)統(tǒng)計(jì)女性網(wǎng)頁瀏覽量。如果一個(gè)節(jié)點(diǎn)落后于另一個(gè)節(jié)點(diǎn),那么最終的結(jié)果也將有誤。一些系統(tǒng)(如Borealis)利用同步節(jié)點(diǎn)來避免這個(gè)問題,而Storm就直接忽略了這個(gè)問題。而且Storm只能保證一個(gè)記錄最少被處理一次,可能存在錯(cuò)誤記錄被多次處理,這就會(huì)使可變更的狀態(tài)因更新兩次而導(dǎo)致結(jié)果不正確,雖然Storm提供了Trident可以確保每條記錄有且僅被處理一次,但是非常慢且需要用戶去實(shí)現(xiàn)。使用離散流可以保證一致性是很明顯的,因?yàn)闀r(shí)間被劃分成時(shí)間片,每一個(gè)時(shí)間片的輸出RDD都與這個(gè)時(shí)間片的輸入和前面時(shí)間片有關(guān)(參考圖5.26),而RDD是不可變的,因此最終的結(jié)果是不會(huì)改變的。
4. Apache SparkStreaming編程模型
4.1 數(shù)據(jù)模型
?
????在第2節(jié)我們知道,Spark Streaming就是把數(shù)據(jù)流劃分為微批交給Spark Core處理的。Spark Core的處理的數(shù)據(jù)被抽象成了一個(gè)RDD,而Spark Streaming的處理數(shù)據(jù)被抽象成了一系列的DStream。實(shí)質(zhì)上,離散流由一組時(shí)間上連續(xù)的RDD組成,每個(gè)RDD都包含著一定時(shí)間片的數(shù)據(jù),如圖5.23所示。
?
4.2 計(jì)算模型
?
????Spark Streaming的編程模型可以看成是一個(gè)批處理Spark Core的編程模型,除了API是調(diào)用Spark Streaming的API,很多概念都是一樣的。在Spark Core編寫程序時(shí),只需要指定初始RDD的生成,然后對(duì)初始RDD進(jìn)行一系列轉(zhuǎn)換的操作,不斷生成新的RDD,最后生成最終的結(jié)果RDD。
????Spark Streaming也是類似的計(jì)算模型,DStream本質(zhì)是一組時(shí)間上連續(xù)的RDD組成的,RDD是依靠著分區(qū)(Partition)來保證并行性的。在編寫Spark Streaming程序的時(shí)候,我們需要指定初始DStream的輸入源,生成初始的DStream,然后定義一些轉(zhuǎn)換操作,這些DStream的操作最終都會(huì)轉(zhuǎn)換成RDD的操作,然后在每一個(gè)時(shí)間片內(nèi),可以獲得最終的結(jié)果DStream對(duì)應(yīng)的RDD(也可以將結(jié)果選擇輸出到外部文件中),可以參考后面單詞計(jì)數(shù)的實(shí)例分析。
PS:關(guān)于Spark Core中RDD的編程模型不屬于本章所要講的重點(diǎn),在這里就不做贅述。
?
4.3 基本操作
?
????從Spark Streaming的系統(tǒng)架構(gòu)可知,Spark Streaming中對(duì)DStream的各種操作,最終會(huì)在Spark Core中轉(zhuǎn)換成RDD的操作,因此對(duì)DStream的操作是與Spark Core對(duì)RDD的操作是十分類似的。Spark Streaming在其數(shù)據(jù)模型DStream的模型下,為DStream提供了一系列的操作方法,這些操作大概可以分為3類:普通的轉(zhuǎn)換操作、窗口轉(zhuǎn)換操作和輸出操作。常用的普通轉(zhuǎn)換操作有flatMap、map、filter、reduceByKey、countByKey等操作,并且Spark Streaming支持將DStream的數(shù)據(jù)輸出到外部系統(tǒng),如數(shù)據(jù)庫或文件系統(tǒng)。具體Spark Streaming支持的所有操作,可以到官網(wǎng)查看。
?
4.4 編程模型實(shí)例分析
??????
?????? 下面用最基本的wordcount例子來解釋其編程模型,其DStream的轉(zhuǎn)換如下所示:
???? 圖XXX:單詞計(jì)數(shù)的DStream轉(zhuǎn)換圖
????如上圖所示,一共定義了四個(gè)離散流,wordCounts的離散流是我們最終要的結(jié)果。LinesDStream可以從文件系統(tǒng)、數(shù)據(jù)庫、kafka等獲取,然后對(duì)其進(jìn)行flatMap操作,將每一行的文本分割成單詞,形成新的離散流words DStream,隨即進(jìn)行mapToPair操作,將其映射成<word,1>的模式,最后用reduceByKey操作對(duì)每個(gè)單詞進(jìn)行計(jì)數(shù),得到最終的結(jié)果離散流wordCountsDStream。
Java核心代碼如下:
?
//創(chuàng)建SparkConf對(duì)象 //與Spark Core的有一點(diǎn)不同,設(shè)置Master屬性的時(shí)候,使用local模式時(shí), // local后面必須跟一個(gè)方括號(hào),里面填寫一個(gè)數(shù)字,數(shù)字代表了用幾個(gè)線程執(zhí)行Spark Streaming程序。 SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WordCountLocal");//創(chuàng)建SparkStreamingContext對(duì)象,還需指定每隔多長時(shí)間的數(shù)據(jù)劃分為一個(gè)batch,這里是1s JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(1));//首先,創(chuàng)建一個(gè)DStream,代表了從一個(gè)數(shù)據(jù)源(這里是socket)來的持續(xù)不斷的實(shí)時(shí)數(shù)據(jù)流 JavaReceiverInputDStream<String> lines = jsc.socketTextStream("localhost", 9999);//將一行行的文本用flatMap切分成多個(gè)單詞,words DStream的RDD元素類型為一個(gè)個(gè)單詞 JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterator<String> call(String line) throws Exception {return Arrays.asList(line.split(" ")).iterator();} });/ //接著開始進(jìn)行mapToPair操作,將單詞映射成<word,1>的pair格式,得到離散流pairs JavaPairDStream<String,Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(String word) throws Exception {return new Tuple2<String, Integer>(word,1);} });//對(duì)離散流pairs進(jìn)行reduceByKey操作,進(jìn)行單詞計(jì)數(shù),得到wordCounts離散流 JavaPairDStream<String,Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1 + v2;} }); //最后,每次計(jì)算完,就打印這一秒鐘的單詞計(jì)數(shù)情況 wordCounts.print(); //必須調(diào)用JavaStreamingContext的start()方法,整個(gè)Java Streaming Application才會(huì)啟動(dòng)執(zhí)行 //否則,不會(huì)執(zhí)行 jsc.start(); try {jsc.awaitTermination();//等待應(yīng)用程序的終止,可以使用CTRL+C手動(dòng)停止//也可以通過調(diào)用JavaStreamingContext的stop()方法來終止程序 } catch (InterruptedException e) {e.printStackTrace(); } jsc.close();
總結(jié)
以上是生活随笔為你收集整理的Apache SparkStreaming 简介和编程模型的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Apache Flink 简介和编程模型
- 下一篇: 大数据技术:Zookeeper分布式协调