Apache SparkStreaming 简介和编程模型
1. 簡介
圖5.22 SparkStreaming[16]
????Spark Streaming是Spark API核心擴展,提供對實時數據流進行流式處理,具備可擴展、高吞吐和容錯等特性。Spark Streaming支持從多種數據源中提取數據,例如Twitter、Kafka、Flume、ZeroMQ和TCP套接字,并提供了一些高級的API來表示復雜處理算法,如map、reduce、join、windows等,最后可以將得到的結果存儲到分布式文件系統(如HDFS)、數據庫或者其他輸出,Spark的機器學習和圖計算的算法也可以應用于Spark Streaming的數據流中。Spark Streaming的本質實際上是一個微批處理系統,正因如此,Spark Streaming具有一些現有的流處理模型所沒有的特性。它可以對故障節點和慢節點實現秒級的恢復,且具有高吞吐量。但其實時計算延遲是在秒級的,而現有的流處理系統(如Storm)一般是在毫秒級,所以Spark Streaming不適用于一些實時性要求很高的場景,如實時金融系統等。
????許多數據需要實時進行處理,也就是說數據產生時的價值最大。例如,一個社交網絡想在分鐘級別內確定某個交流話題的趨勢,搜索網站想根據用戶的訪問訓練模型,服務商想在秒級內通過挖掘日志找到錯誤信息。設計適用于這些場景的模型極具挑戰性,因為對于一些應用場景(如機器學習、實時日志分析),集群規模會達到百級以上,在這樣的規模下會存在兩個主要問題:節點故障(faults)和慢節點(slow nodes)問題。這兩個問題在大規模集群下都是經常存在的,所以快速恢復在流系統應用中是十分重要的,否則流式應用可能無法及時做出關鍵的決定。但現有的一些流處理系統在這兩個問題的處理上都十分有限,大多數流處理系統(如Storm、TimeStream、MapReduce Online等)都是基于純實時的計算模型(a-record-at-a-time,來一條數據就處理一條數據),雖然這個模型能夠有較小的計算時延,但是很難解決節點故障和慢節點的問題。一些傳統的流式處理方法在小規模集群下運行較好,但在大規模情況下卻面臨著實質性的問題。
????Spark Streaming提供了一種抽象的連續數據流,即Discretized Stream(離散流),一個離散流本質上就是一個序列化的RDD(Resilient Distributed Datasets,彈性分布式數據集)。離散流模型利用其并行恢復(ParallelRecovery)解決了節點故障和減輕了慢節點所帶來的問題,還保證了一致性語義。
2. 系統架構
????離散流是Spark Streaming提供的基礎抽象,它代表持續性的數據流,這些數據流既可以從外部源(如Kafka、Flume等)獲取,也可以通過離散流的算子操作來獲得。實質上,離散流由一組時間上連續的RDD組成,每個RDD都包含著一定時間片的數據,如圖5.23所示:
圖5.23Discretized Stream[17]
圖5.24 SparkStreaming 整體架構[18]
?
????如圖5.24所示,這是Spark Streaming系統的整體架構,它將實時的流數據分解成一系列很小的批處理作業。批處理引擎使用的是Spark Core,也就是把輸入數據按照一定的時間片(如1s)分成一段一段數據,每一段數據都會轉換成Spark的RDD輸入到Spark Core中,然后再將離散流的操作轉換為RDD的算子操作,RDD算子操作產生的中間結果會保存在內存中,最后整個流式計算可以將中間結果輸出到外部。
3. 一致性語義和容錯
????對于流式計算,容錯性的重要性在第一小節已經詳細說明過了。首先,我們需要回憶Spark中RDD的容錯機制。RDD是一個彈性不可變的分布式數據集,Spark記錄著確定性的RDD轉換的操作繼承關系(lineage),所以只要輸入的數據是可容錯的,任何一個RDD的分區出錯時,都可以根據lineage對原始輸入數據進行轉換操作,從而重新計算。圖5.25是Spark Streaming的一個RDD繼承關系圖:
圖5.25 統計網頁瀏覽量的lineragegraph[18]
????圖中每個橢圓代表的是一個RDD,橢圓中的每一個圓形是一個RDD的分區,圖中的每一列的所有RDD代表的是一個離散流(圖中一共有3個離散流),間隔[0,1)和[1,2)代表的是不同時間分片,圖中每一行的最后一個RDD代表的是中間結果RDD。
????并行恢復(Parallel Recovery):系統會周期性的checkpoint RDD的數據,異步的備份到其他節點(默認復制數是2),因為RDD是不可變的,所以checkpoint不會鎖住當前時間片的執行。一個Spark Streaming的流式應用,系統會每分鐘對中間結果RDD進行checkpoint。當一個節點發生故障了,系統監測出丟失的RDD,系統會選擇上一個checkpoint的數據來進行重新計算。離散流可以利用充分利用分區的并行性來達到更快的恢復速度:1)與批處理系統很相似的是,每個節點上運行多個task,每一個時間片的轉換操作會在每個節點創建多個RDD分區(例如在100個節點的集群上有1000個RDD分區)。這樣當一個節點發生故障時,可以讓RDD的不同分區并行恢復。2)繼承關系圖(lineage graph)可以使不同時間片的數據并行恢復。如果一個機器節點發生故障,系統在每一個時間片可能丟失一些map操作的輸出,從圖5.26的瀏覽量統計應用的lineage graph可以看出,不同時間片的map可以并行地恢復計算,所以并行恢復的速度是比上游緩存策略更快的。
????慢節點問題:在現有的純實時流處理系統中,基本都沒有解決慢節點的問題。離散流則與批處理系統類似,通過運行慢任務的副本來減輕慢節點帶來的影響。Spark Streaming最開始采用一種簡單的閾值來判斷一個任務是否是慢任務:當一個任務是這個階段(stage)的中間任務運行時間的1.4倍,則判斷這是個慢任務。
????一致性語義:離散流還有一個好處就是提供了強一致性。例如,考慮一個系統統計男女網頁瀏覽量的比例,一個節點統計男性網頁瀏覽量,另一個節點統計女性網頁瀏覽量。如果一個節點落后于另一個節點,那么最終的結果也將有誤。一些系統(如Borealis)利用同步節點來避免這個問題,而Storm就直接忽略了這個問題。而且Storm只能保證一個記錄最少被處理一次,可能存在錯誤記錄被多次處理,這就會使可變更的狀態因更新兩次而導致結果不正確,雖然Storm提供了Trident可以確保每條記錄有且僅被處理一次,但是非常慢且需要用戶去實現。使用離散流可以保證一致性是很明顯的,因為時間被劃分成時間片,每一個時間片的輸出RDD都與這個時間片的輸入和前面時間片有關(參考圖5.26),而RDD是不可變的,因此最終的結果是不會改變的。
4. Apache SparkStreaming編程模型
4.1 數據模型
?
????在第2節我們知道,Spark Streaming就是把數據流劃分為微批交給Spark Core處理的。Spark Core的處理的數據被抽象成了一個RDD,而Spark Streaming的處理數據被抽象成了一系列的DStream。實質上,離散流由一組時間上連續的RDD組成,每個RDD都包含著一定時間片的數據,如圖5.23所示。
?
4.2 計算模型
?
????Spark Streaming的編程模型可以看成是一個批處理Spark Core的編程模型,除了API是調用Spark Streaming的API,很多概念都是一樣的。在Spark Core編寫程序時,只需要指定初始RDD的生成,然后對初始RDD進行一系列轉換的操作,不斷生成新的RDD,最后生成最終的結果RDD。
????Spark Streaming也是類似的計算模型,DStream本質是一組時間上連續的RDD組成的,RDD是依靠著分區(Partition)來保證并行性的。在編寫Spark Streaming程序的時候,我們需要指定初始DStream的輸入源,生成初始的DStream,然后定義一些轉換操作,這些DStream的操作最終都會轉換成RDD的操作,然后在每一個時間片內,可以獲得最終的結果DStream對應的RDD(也可以將結果選擇輸出到外部文件中),可以參考后面單詞計數的實例分析。
PS:關于Spark Core中RDD的編程模型不屬于本章所要講的重點,在這里就不做贅述。
?
4.3 基本操作
?
????從Spark Streaming的系統架構可知,Spark Streaming中對DStream的各種操作,最終會在Spark Core中轉換成RDD的操作,因此對DStream的操作是與Spark Core對RDD的操作是十分類似的。Spark Streaming在其數據模型DStream的模型下,為DStream提供了一系列的操作方法,這些操作大概可以分為3類:普通的轉換操作、窗口轉換操作和輸出操作。常用的普通轉換操作有flatMap、map、filter、reduceByKey、countByKey等操作,并且Spark Streaming支持將DStream的數據輸出到外部系統,如數據庫或文件系統。具體Spark Streaming支持的所有操作,可以到官網查看。
?
4.4 編程模型實例分析
??????
?????? 下面用最基本的wordcount例子來解釋其編程模型,其DStream的轉換如下所示:
???? 圖XXX:單詞計數的DStream轉換圖
????如上圖所示,一共定義了四個離散流,wordCounts的離散流是我們最終要的結果。LinesDStream可以從文件系統、數據庫、kafka等獲取,然后對其進行flatMap操作,將每一行的文本分割成單詞,形成新的離散流words DStream,隨即進行mapToPair操作,將其映射成<word,1>的模式,最后用reduceByKey操作對每個單詞進行計數,得到最終的結果離散流wordCountsDStream。
Java核心代碼如下:
?
//創建SparkConf對象 //與Spark Core的有一點不同,設置Master屬性的時候,使用local模式時, // local后面必須跟一個方括號,里面填寫一個數字,數字代表了用幾個線程執行Spark Streaming程序。 SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WordCountLocal");//創建SparkStreamingContext對象,還需指定每隔多長時間的數據劃分為一個batch,這里是1s JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(1));//首先,創建一個DStream,代表了從一個數據源(這里是socket)來的持續不斷的實時數據流 JavaReceiverInputDStream<String> lines = jsc.socketTextStream("localhost", 9999);//將一行行的文本用flatMap切分成多個單詞,words DStream的RDD元素類型為一個個單詞 JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterator<String> call(String line) throws Exception {return Arrays.asList(line.split(" ")).iterator();} });/ //接著開始進行mapToPair操作,將單詞映射成<word,1>的pair格式,得到離散流pairs JavaPairDStream<String,Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(String word) throws Exception {return new Tuple2<String, Integer>(word,1);} });//對離散流pairs進行reduceByKey操作,進行單詞計數,得到wordCounts離散流 JavaPairDStream<String,Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1 + v2;} }); //最后,每次計算完,就打印這一秒鐘的單詞計數情況 wordCounts.print(); //必須調用JavaStreamingContext的start()方法,整個Java Streaming Application才會啟動執行 //否則,不會執行 jsc.start(); try {jsc.awaitTermination();//等待應用程序的終止,可以使用CTRL+C手動停止//也可以通過調用JavaStreamingContext的stop()方法來終止程序 } catch (InterruptedException e) {e.printStackTrace(); } jsc.close();
總結
以上是生活随笔為你收集整理的Apache SparkStreaming 简介和编程模型的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Apache Flink 简介和编程模型
- 下一篇: 大数据技术:Zookeeper分布式协调