日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Apache Spark源码走读之4 -- DStream实时流数据处理

發布時間:2023/12/10 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Apache Spark源码走读之4 -- DStream实时流数据处理 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

歡迎轉載,轉載請注明出處,徽滬一郎。

Spark Streaming能夠對流數據進行近乎實時的速度進行數據處理。采用了不同于一般的流式數據處理模型,該模型使得Spark Streaming有非常高的處理速度,與storm相比擁有更高的吞能力。

本篇簡要分析Spark Streaming的處理模型,Spark Streaming系統的初始化過程,以及當接收到外部數據時后續的處理步驟。

系統概述

流數據的特點

與一般的文件(即內容已經固定)型數據源相比,所謂的流數據擁有如下的特點

  • 數據一直處在變化中
  • 數據無法回退
  • 數據一直源源不斷的涌進
  • DStream

    如果要用一句話來概括Spark Streaming的處理思路的話,那就是"將連續的數據持久化,離散化,然后進行批量處理"。

    讓我們來仔細分析一下這么作的原因。

    • 數據持久化?將從網絡上接收到的數據先暫時存儲下來,為事件處理出錯時的事件重演提供可能,
    • 離散化?數據源源不斷的涌進,永遠沒有一個盡頭,就像周星馳的喜劇中所說“崇拜之情如黃河之水綿綿不絕,一發而不可收拾”。既然不能窮盡,那么就將其按時間分片。比如采用一分鐘為時間間隔,那么在連續的一分鐘內收集到的數據集中存儲在一起。
    • 批量處理?將持久化下來的數據分批進行處理,處理機制套用之前的RDD模式

    DStream可以說是對RDD的又一層封裝。如果打開DStream.scala和RDD.scala,可以發現幾乎RDD上的所有operation在DStream中都有相應的定義。

    作用于DStream上的operation分成兩類

  • Transformation
  • Output 表示將輸出結果,目前支持的有print, saveAsObjectFiles, saveAsTextFiles, saveAsHadoopFiles
  • DStreamGraph

    有輸入就要有輸出,如果沒有輸出,則前面所做的所有動作全部沒有意義,那么如何將這些輸入和輸出綁定起來呢?這個問題的解決就依賴于DStreamGraph,DStreamGraph記錄輸入的Stream和輸出的Stream。

    private val inputStreams = new ArrayBuffer[InputDStream[_]]()private val outputStreams = new ArrayBuffer[DStream[_]]() var rememberDuration: Duration = null var checkpointInProgress = false

    outputStreams中的元素是在有Output類型的Operation作用于DStream上時自動添加到DStreamGraph中的。

    outputStream區別于inputStream一個重要的地方就是會重載generateJob.

    初始化流程

    StreamingContext

    StreamingContext是Spark Streaming初始化的入口點,主要的功能是根據入參來生成JobScheduler

    設定InputStream

    如果流數據源來自于socket,則使用socketStream。如果數據源來自于不斷變化著的文件,則可使用fileStream

    提交運行

    StreamingContext.start()

    ?

    數據處理

    以socketStream為例,數據來自于socket。

    SocketInputDstream啟動一個線程,該線程使用receive函數來接收數據

    def receive() { var socket: Socket = null try { logInfo("Connecting to " + host + ":" + port) socket = new Socket(host, port) logInfo("Connected to " + host + ":" + port) val iterator = bytesToObjects(socket.getInputStream()) while(!isStopped && iterator.hasNext) { store(iterator.next) } logInfo("Stopped receiving") restart("Retrying connecting to " + host + ":" + port) } catch { case e: java.net.ConnectException => restart("Error connecting to " + host + ":" + port, e) case t: Throwable => restart("Error receiving data", t) } finally { if (socket != null) { socket.close() logInfo("Closed socket to " + host + ":" + port) } } } }

    接收到的數據會被先存儲起來,存儲最終會調用到BlockManager.scala中的函數,那么BlockManager是如何被傳遞到StreamingContext的呢?利用SparkEnv傳入的,注意StreamingContext構造函數的入參。

    處理定時器

    數據的存儲有是被socket觸發的。那么已經存儲的數據被真正的處理又是被什么觸發的呢?

    記得在初始化StreamingContext的時候,我們指定了一個時間參數,那么用這個參數會構造相應的重復定時器,一旦定時器超時,調用generateJobs函數。

    private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime => eventActor ! GenerateJobs(new Time(longTime)), "JobGenerator")

    事件處理函數

    /** Processes all events */ private def processEvent(event: JobGeneratorEvent) { logDebug("Got event " + event) event match { case GenerateJobs(time) => generateJobs(time) case ClearMetadata(time) => clearMetadata(time) case DoCheckpoint(time) => doCheckpoint(time) case ClearCheckpointData(time) => clearCheckpointData(time) } }

    generteJobs

    private def generateJobs(time: Time) { SparkEnv.set(ssc.env) Try(graph.generateJobs(time)) match { case Success(jobs) => val receivedBlockInfo = graph.getReceiverInputStreams.map { stream => val streamId = stream.id val receivedBlockInfo = stream.getReceivedBlockInfo(time) (streamId, receivedBlockInfo) }.toMap jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo)) case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e) } eventActor ! DoCheckpoint(time) }

    ?generateJobs->generateJob一路下去會調用到Job.run,在job.run中調用sc.runJob,在具體調用路徑就不一一列出。

    private class JobHandler(job: Job) extends Runnable { def run() { eventActor ! JobStarted(job) job.run() eventActor ! JobCompleted(job) } }

    DStream.generateJob函數中定義了jobFunc,也就是在job.run()中使用到的jobFunc

    private[streaming] def generateJob(time: Time): Option[Job] = {getOrCompute(time) match {case Some(rdd) => {val jobFunc = () => { val emptyFunc = { (iterator: Iterator[T]) => {} } context.sparkContext.runJob(rdd, emptyFunc) } Some(new Job(time, jobFunc)) } case None => None } }

    在這個流程中,DStreamGraph起到非常關鍵的作用,非常類似于TridentStorm中的graph.

    在generateJob過程中,DStream會通過調用compute函數生成相應的RDD,SparkContext則是將基于RDD的抽象轉換成為多個stage,而執行。

    StreamingContext中一個重要的轉換就是DStream到RDD的轉換,而SparkContext中一個重要的轉換是RDD到Stage及Task的轉換。在這兩個不同的抽象類中,要注意其中getOrCompute和compute函數的實現。

    小結

    本篇內容有點倉促,內容不夠豐富翔實,爭取回頭有空的時候再好好豐富一下具體的調用路徑。

    對于容錯處理機制,本文沒有涉及,待研究明白之后另起一篇進行闡述。

    轉載于:https://www.cnblogs.com/downtjs/p/3815291.html

    總結

    以上是生活随笔為你收集整理的Apache Spark源码走读之4 -- DStream实时流数据处理的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。