日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark Streaming原理简析

發(fā)布時間:2023/12/18 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark Streaming原理简析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

執(zhí)行流程

數據的接收

StreamingContext實例化的時候,需要傳入一個SparkContext,然后指定要連接的spark matser url,即連接一個spark engine,用于獲得executor

實例化之后,首先,要指定一個接收數據的方式,如

val lines = ssc.socketTextStream("localhost", 9999)

    • 1

      這樣從socket接收文本數據。這個步驟返回的是一個ReceiverInputDStream的實現,內含Receiver,可接收數據并轉化為RDD放內存里。

      ReceiverInputDStream有一個需要子類實現的方法

      def getReceiver(): Receiver[T]

    • 1

      子類實現這個方法,worker節(jié)點調用后能得到Receiver,使得數據接收的工作能分布到worker上。

      如果是local跑,由于Receiver接收數據在本地,所以在啟動streaming application的時候,要注意分配的core數目要大于Receiver數目,才能騰出cpu做計算任務的調度。

      Receiver需要子類實現

      def onStart()def onStop()

    • 1
    • 2

      來定義一個數據接收器的初始化、接收到數據后如何存、如何在結束的時候釋放資源。

      Receiver提供了一系列store()接口,如store(ByteBuffer)store(Iterator)等等。這些store接口是實現好了的,會由worker節(jié)點上初始化的ReceiverSupervisor來完成這些存儲功能。ReceiverSupervisor還會對Receiver做監(jiān)控,如監(jiān)控是否啟動了、是否停止了、是否要重啟、匯報error等等。

      ReceiverSupervisor的存儲接口的實現,借助的是BlockManager,數據會以RDD的形式被存放,根據StorageLevel選擇不同存放策略。默認是序列化后存內存,放不下的話寫磁盤(executor)。被計算出來的RDD中間結果,默認存放策略是序列化后只存內存。

      ReceiverSupervisor在做putBlock操作的時候,會首先借助BlockManager存好數據,然后往ReceiverTracker發(fā)送一個AddBlock的消息。ReceiverTracker內部的ReceivedBlockTracker用于維護一個receiver接收到的所有block信息,即BlockInfo,所以AddBlock會把信息存放在ReceivedBlockTracker里。未來需要計算的時候,ReceiverTracker根據streamId,從ReceivedBlockTracker取出對應的block列表。

      RateLimiter幫助控制Receiver速度,spark.streaming.receiver.maxRate參數。

      數據源方面,普通的數據源為file, socket, akka, RDDs。高級數據源為Twitter, Kafka, Flume等。開發(fā)者也可以自己定制數據源。

      任務調度

      JobSchedulercontext里初始化。當context start的時候,觸發(fā)schedulerstart

      schedulerstart觸發(fā)了ReceiverTrackerJobGeneratorstart。這兩個類是任務調度的重點。前者在worker上啟動Receiver接收數據,并且暴露接口能夠根據streamId獲得對應的一批Block地址。后者基于數據和時間來生成任務描述。

      JobScheduler內含一個線程池,用于調度任務執(zhí)行。spark.streaming.concurrentJobs可以控制job并發(fā)度,默認是1,即它只能一個一個提job

      job來自JobGenerator生成的JobSetJobGenerator根據時間,生成job并且執(zhí)行cp

      JobGenerator的生成job邏輯:

      - 調用ReceiverTrackerallocateBlocksToBatch方法,為本批數據分配好block,即準備好數據

      - 間接調用DStreamgenerateJob(time)方法,制造可執(zhí)行的RDD

      DStream切分RDD和生成可執(zhí)行的RDD,即getOrCompute(time)

      - 如果這個時間點的RDD已經生成好了,那么從內存hashmap里拿出來,否則下一步

      - 如果時間是批次間隔的整數倍,則下一步,否則這個時間點不切

      - 調用DStream的子類的compute方法,得到RDD。可能是一個RDD,也可以是個RDD列表

      - 對每個RDD,調用persist方法,制定默認的存儲策略。如果時間點合適,同時調用RDDcheckpoint方法,制定好cp策略

      - 得到這些RDD后,調用SparkContext.runJob(rdd, emptyFunction)。把這整個變成一個function,生成Job類。未來會在executor上觸發(fā)其runJob

      JobGenerator成功生成job后,調用JobScheduler.submitJobSet(JobSet)JobScheduler會使用線程池提交JobSet中的所有job。該方法調用結束后,JobGenerator發(fā)送一個DoCheckpoint的消息,注意這里的cpdriver端元數據的cp,而不是RDD本身的cp。如果time合適,會觸發(fā)cp操作,內部的CheckpointWriter類會完成write(streamingContext, time)

      JobScheduler提交job的線程里,觸發(fā)了jobrun()方法,同時,job跑完后,JobScheduler處理JobCompleted(job)。如果job跑成功了,調用JobSethandleJobCompletion(Job),做些計時和數數工作,如果整個JobSet完成了,調用JobGeneratoronBatchCompletion(time)方法,JobGenerator接著會做clearMetadata的工作,然后JobScheduler打印輸出;如果job跑失敗了,JobScheduler匯報error,最后會在context里拋異常。

      更多說明

      特殊操作

    • transform:可以與外部RDD交互,比如做維表的join
    • updateStateByKey:生成StateDStream,比如做增量計算。WordCount例子
    • 每一批都需要與增量RDD進行一次cogroup之后,然后執(zhí)行update function。兩個RDDcogroup過程有些開銷:RDD[K, V]RDD[K, U]合成RDD[K, List[V], List[U]]List[U]一般size1,理解為oldvalue,即RDD[K, batchValueList, Option[oldValue]]。然后update function處理完,變成RDD[K, newValue]
    • 批與批之間嚴格有序,即增量合并操作,是有序的,批之間沒發(fā)并發(fā)
    • 增量RDD的分區(qū)數可以開大,即這步增量的計算可以調大并發(fā)
    • windowbatch sizewindow length, sliding interval三個參數組成的滑窗操作。把多個批次的RDD合并成一個UnionRDD進行計算。
    • foreachRDD: 這個操作是一個輸出操作,比較特殊。


      /**

      * Apply a function to each RDD in this DStream. This is an output operator, so

      * 'this' DStream will be registered as an output stream and therefore materialized.

      */


      def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) { new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()

      }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

      DStream.foreachRDD()操作使開發(fā)者可以直接控制RDD的計算邏輯,而不是通過DStream映射過去。所以借助這個方法,可以實現MLlib, Spark SQLStreaming的集合,如:結合Spark SQLDataFrameWordcount

      Cache

      如果是window操作,默認接收的數據都persist在內存里。

      如果是flume, kafka源頭,默認接收的數據replicate成兩份存起來。

      Checkpoint

      state有關的流計算,計算出來的結果RDD,會被cpHDFS上,原文如下:

      Data checkpointing - Saving of the generated RDDs to reliable storage. This is necessary in some stateful transformations that combine data across multiple batches. In such transformations, the generated RDDs depends on RDDs of previous batches, which causes the length of the dependency chain to keep increasing with time. To avoid such unbounded increase in recovery time (proportional to dependency chain), intermediate RDDs of stateful transformations are periodically checkpointed to reliable storage (e.g. HDFS) to cut off the dependency chains.

      cp的時間間隔也可以設定,可以多批做一次cp

      cp的操作是同步的。

      簡單的不帶state操作的流任務,可以不開啟cp

      driver端的metadata也有cp策略。driver cp的時候是將整個StreamingContext對象寫到了可靠存儲里

轉載于:https://www.cnblogs.com/breg/p/4794780.html

總結

以上是生活随笔為你收集整理的Spark Streaming原理简析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。