Spark学习之Spark Streaming
一、簡(jiǎn)介
許多應(yīng)用需要即時(shí)處理收到的數(shù)據(jù),例如用來(lái)實(shí)時(shí)追蹤頁(yè)面訪問(wèn)統(tǒng)計(jì)的應(yīng)用、訓(xùn)練機(jī)器學(xué)習(xí)模型的應(yīng)用,還有自動(dòng)檢測(cè)異常的應(yīng)用。Spark Streaming 是 Spark 為這些應(yīng)用而設(shè)計(jì)的模型。它允許用戶使用一套和批處理非常接近的 API 來(lái)編寫(xiě)流式計(jì)算應(yīng)用,這樣就可以大量重用批處理應(yīng)用的技術(shù)甚至代碼。
和 Spark 基于 RDD 的概念很相似,Spark Streaming 使用離散化流(discretized stream)作為抽象表示,叫作 DStream。DStream 是隨時(shí)間推移而收到的數(shù)據(jù)的序列。在內(nèi)部,每個(gè)時(shí)間區(qū)間收到的數(shù)據(jù)都作為 RDD 存在,而 DStream 是由這些 RDD 所組成的序列(因此得名“離散化”)。DStream 可以從各種輸入源創(chuàng)建,比如 Flume、Kafka 或者 HDFS。創(chuàng)建出來(lái)的 DStream 支持兩種操作,一種是轉(zhuǎn)化操作(transformation),會(huì)生成一個(gè)新的DStream,另一種是輸出操作(output operation),可以把數(shù)據(jù)寫(xiě)入外部系統(tǒng)中。DStream提供了許多與 RDD 所支持的操作相類似的操作支持,還增加了與時(shí)間相關(guān)的新操作,比如滑動(dòng)窗口。
和批處理程序不同,Spark Streaming 應(yīng)用需要進(jìn)行額外配置來(lái)保證 24/7 不間斷工作。Spark Streaming 的檢查點(diǎn)(checkpointing)機(jī)制,也就是把數(shù)據(jù)存儲(chǔ)到可靠文件系統(tǒng)(比如 HDFS)上的機(jī)制,這也是 Spark Streaming 用來(lái)實(shí)現(xiàn)不間斷工作的主要方式。
二、一個(gè)簡(jiǎn)單的例子
我們會(huì)從一臺(tái)服務(wù)器的 9999 端口上實(shí)時(shí)輸入數(shù)據(jù),并在控制臺(tái)打印出來(lái)。
首先,你得有一個(gè)nc軟件,因?yàn)槲沂窃趙indow下運(yùn)行程序的,但是在Linux系統(tǒng)里面就不需要,Linux里面有內(nèi)置的nc命令。
nc軟件的用法:
開(kāi)一個(gè)命令行窗口(這里要切換到nc軟件的路徑下): 服務(wù)端:nc –lp 9999 //客戶端:nc localhost 9999nc軟件啟動(dòng)成功的界面:
然后就是一個(gè)簡(jiǎn)單的Spark Streaming的代碼:
import org.apache.spark.SparkContext import org.apache.spark.SparkConf import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.Duration import org.apache.spark.streaming.Secondsobject Test {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("test").setMaster("local[4]") // 從SparkConf創(chuàng)建StreamingContex并指定4秒鐘的批處理大小// 用來(lái)指定多長(zhǎng)時(shí)間處理一次新數(shù)據(jù)的批次間隔(batch interval)作為輸入val ssc = new StreamingContext(conf,Seconds(4))// 連接到本地機(jī)器9999端口val lines = ssc.socketTextStream("localhost", 9999)lines.print()// 啟動(dòng)流式計(jì)算環(huán)境StreamingContext并等待它"完成"ssc.start()// 等待作業(yè)完成ssc.awaitTermination()} }連接成功的界面:
然后我在剛才的界面輸入"Hello world",然后就會(huì)在控制臺(tái)界面打印出來(lái)。
三、架構(gòu)與抽象
Spark Streaming 使用“微批次”的架構(gòu),把流式計(jì)算當(dāng)作一系列連續(xù)的小規(guī)模批處理來(lái)對(duì)待。Spark Streaming 從各種輸入源中讀取數(shù)據(jù),并把數(shù)據(jù)分組為小的批次。新的批次按均勻的時(shí)間間隔創(chuàng)建出來(lái)。在每個(gè)時(shí)間區(qū)間開(kāi)始的時(shí)候,一個(gè)新的批次就創(chuàng)建出來(lái),在該區(qū)間內(nèi)收到的數(shù)據(jù)都會(huì)被添加到這個(gè)批次中。在時(shí)間區(qū)間結(jié)束時(shí),批次停止增長(zhǎng)。時(shí)間區(qū)間的大小是由批次間隔這個(gè)參數(shù)決定的。批次間隔一般設(shè)在 500 毫秒到幾秒之間,由應(yīng)用開(kāi)發(fā)者配置。每個(gè)輸入批次都形成一個(gè) RDD,以 Spark 作業(yè)的方式處理并生成其他的 RDD。處理的結(jié)果可以以批處理的方式傳給外部系統(tǒng)。
?
?
四、檢查點(diǎn)機(jī)制
Spark Streaming 對(duì) DStream 提供的容錯(cuò)性與 Spark 為 RDD 所提供的容錯(cuò)性一致:只要輸入數(shù)據(jù)還在,它就可以使用 RDD 譜系重算出任意狀態(tài)(比如重新執(zhí)行處理輸入數(shù)據(jù)的操作)。默認(rèn)情況下,收到的數(shù)據(jù)分別存在于兩個(gè)節(jié)點(diǎn)上,這樣 Spark 可以容忍一個(gè)工作節(jié)點(diǎn)的故障。不過(guò),如果只用譜系圖來(lái)恢復(fù)的話,重算有可能會(huì)花很長(zhǎng)時(shí)間,因?yàn)樾枰幚韽某绦騿?dòng)以來(lái)的所有數(shù)據(jù)。因此,Spark Streaming 也提供了檢查點(diǎn)機(jī)制,可以把狀態(tài)階段性地存儲(chǔ)到可靠文件系統(tǒng)中(例如 HDFS 或者 S3)。一般來(lái)說(shuō),你需要每處理 5-10 個(gè)批次的數(shù)據(jù)就保存一次。在恢復(fù)數(shù)據(jù)時(shí),Spark Streaming 只需要回溯到上一個(gè)檢查點(diǎn)即可。
如果流計(jì)算應(yīng)用中的驅(qū)動(dòng)器程序崩潰了,還可以重啟驅(qū)動(dòng)器程序并讓驅(qū)動(dòng)器程序從檢查點(diǎn)恢復(fù),這樣 Spark Streaming 就可以讀取之前運(yùn)行的程序處理數(shù)據(jù)的進(jìn)度,并從那里繼續(xù)。
ssc.checkpoint("hdfs://...")五、轉(zhuǎn)化操作
DStream 的轉(zhuǎn)化操作可以分為無(wú)狀態(tài)(stateless)和有狀態(tài)(stateful)兩種。
? 在無(wú)狀態(tài)轉(zhuǎn)化操作中,每個(gè)批次的處理不依賴于之前批次的數(shù)據(jù)。常見(jiàn)的RDD轉(zhuǎn)化操作,例如 map() 、 filter() 、 reduceByKey() 等,都是無(wú)狀態(tài)轉(zhuǎn)化操作,無(wú)狀態(tài)轉(zhuǎn)化操作是分別應(yīng)用到每個(gè) RDD 上的。
? 相對(duì)地,有狀態(tài)轉(zhuǎn)化操作需要使用之前批次的數(shù)據(jù)或者是中間結(jié)果來(lái)計(jì)算當(dāng)前批次的數(shù)據(jù)。有狀態(tài)轉(zhuǎn)化操作包括基于滑動(dòng)窗口的轉(zhuǎn)化操作和追蹤狀態(tài)變化的轉(zhuǎn)化操作。
DStream 的有狀態(tài)轉(zhuǎn)化操作是跨時(shí)間區(qū)間跟蹤數(shù)據(jù)的操作;也就是說(shuō),一些先前批次的數(shù)據(jù)也被用來(lái)在新的批次中計(jì)算結(jié)果。主要的兩種類型是滑動(dòng)窗口和 updateStateByKey() ,前者以一個(gè)時(shí)間階段為滑動(dòng)窗口進(jìn)行操作,后者則用來(lái)跟蹤每個(gè)鍵的狀態(tài)變化(例如構(gòu)建一個(gè)代表用戶會(huì)話的對(duì)象)。有狀態(tài)轉(zhuǎn)化操作需要在你的 StreamingContext 中打開(kāi)檢查點(diǎn)機(jī)制來(lái)確保容錯(cuò)性。
所有基于窗口的操作都需要兩個(gè)參數(shù),分別為窗口時(shí)長(zhǎng)以及滑動(dòng)步長(zhǎng),兩者都必須是StreamContext 的批次間隔的整數(shù)倍。窗口時(shí)長(zhǎng)控制每次計(jì)算最近的多少個(gè)批次的數(shù)據(jù),其實(shí)就是最近的 windowDuration/batchInterval 個(gè)批次。如果有一個(gè)以 10 秒為批次間隔的源DStream,要?jiǎng)?chuàng)建一個(gè)最近 30 秒的時(shí)間窗口(即最近 3 個(gè)批次),就應(yīng)當(dāng)把 windowDuration設(shè)為 30 秒。而滑動(dòng)步長(zhǎng)的默認(rèn)值與批次間隔相等,用來(lái)控制對(duì)新的 DStream 進(jìn)行計(jì)算的間隔。如果源 DStream 批次間隔為 10 秒,并且我們只希望每?jī)蓚€(gè)批次計(jì)算一次窗口結(jié)果,就應(yīng)該把滑動(dòng)步長(zhǎng)設(shè)置為 20 秒。
對(duì) DStream 可以用的最簡(jiǎn)單窗口操作是 window() ,它返回一個(gè)新的 DStream 來(lái)表示所請(qǐng)求的窗口操作的結(jié)果數(shù)據(jù)。換句話說(shuō), window() 生成的 DStream 中的每個(gè) RDD 會(huì)包含多個(gè)批次中的數(shù)據(jù),可以對(duì)這些數(shù)據(jù)進(jìn)行 count() 、 transform() 等操作。
lines.window(windowDuration, slideDuration) lines.reduceByWindow(reduceFunc, windowDuration, slideDuration)
有時(shí),我們需要在 DStream 中跨批次維護(hù)狀態(tài)(例如跟蹤用戶訪問(wèn)網(wǎng)站的會(huì)話)。針對(duì)這種情況, updateStateByKey() 為我們提供了對(duì)一個(gè)狀態(tài)變量的訪問(wèn),用于鍵值對(duì)形式的DStream。給定一個(gè)由(鍵,事件)對(duì)構(gòu)成的 DStream,并傳遞一個(gè)指定如何根據(jù)新的事件更新每個(gè)鍵對(duì)應(yīng)狀態(tài)的函數(shù),它可以構(gòu)建出一個(gè)新的 DStream,其內(nèi)部數(shù)據(jù)為(鍵,狀態(tài))對(duì)。例如,在網(wǎng)絡(luò)服務(wù)器日志中,事件可能是對(duì)網(wǎng)站的訪問(wèn),此時(shí)鍵是用戶的 ID。使用updateStateByKey() 可以跟蹤每個(gè)用戶最近訪問(wèn)的 10 個(gè)頁(yè)面。這個(gè)列表就是“狀態(tài)”對(duì)象,我們會(huì)在每個(gè)事件到來(lái)時(shí)更新這個(gè)狀態(tài)。
六、輸出輸入操作
輸出操作指定了對(duì)流數(shù)據(jù)經(jīng)轉(zhuǎn)化操作得到的數(shù)據(jù)所要執(zhí)行的操作(例如把結(jié)果推入外部數(shù)據(jù)庫(kù)或輸出到屏幕上)。與 RDD 中的惰性求值類似,如果一個(gè) DStream 及其派生出的 DStream都沒(méi)有被執(zhí)行輸出操作,那么這些 DStream 就都不會(huì)被求值。如果StreamingContext 中沒(méi)有設(shè)定輸出操作,整個(gè) context 就都不會(huì)啟動(dòng)。
在 Scala 中將 DStream 保存為文本文件
ipAddressRequestCount.saveAsTextFiles("outputDir", "txt")因?yàn)?Spark 支持從任意 Hadoop 兼容的文件系統(tǒng)中讀取數(shù)據(jù),所以 Spark Streaming 也就支持從任意 Hadoop 兼容的文件系統(tǒng)目錄中的文件創(chuàng)建數(shù)據(jù)流。
val line = ssc.textFileStream("directory")這篇博文主要來(lái)自《Spark快速大數(shù)據(jù)分析》這本書(shū)里面的第十章,內(nèi)容有刪減,還有本書(shū)的一些代碼的實(shí)驗(yàn)結(jié)果。
?
轉(zhuǎn)載于:https://www.cnblogs.com/xiaoyh/p/10791245.html
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來(lái)咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)總結(jié)
以上是生活随笔為你收集整理的Spark学习之Spark Streaming的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: cnpm install -g liv
- 下一篇: java实现多线程的4种方式