日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

sparkstreaming监听hdfs目录如何终止_Spark笔试题:Spark Streaming 反压机制

發布時間:2025/3/21 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 sparkstreaming监听hdfs目录如何终止_Spark笔试题:Spark Streaming 反压机制 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Spark Streaming 反壓機制是1.5版本推出的特性,用來解決處理速度比攝入速度慢的情況,簡單來講就是做流量控制。當批處理時間(Batch Processing Time)大于批次間隔(Batch Interval,即BatchDuration)時,說明處理數據的速度小于數據攝入的速度,持續時間過?或源頭數據暴增,容易造成 數據在內存中堆積,最終導致Executor OOM。反壓就是來解決這個問題的。
spark streaming的消費數據源方式有兩種:
1. 若是基于Receiver的數據源,可以通過設置spark.streaming.receiver.maxRate來控制最大輸入 速率;
2. 若是基于Direct的數據源(如Kafka Direct Stream),則可以通過設置spark.streaming.kafka.maxRatePerPartition來控制最大輸入速率。


當然,在事先經過壓測,且流量高峰不會超過預期的情況下,設置這些參數一般沒什么問題。但最大值不代表是最優值,最好還能根據每個批次處理情況來動態預估下個批次最優速率。


在Spark 1.5.0以上,就可通過背壓機制來實現。開啟反壓機制,即設置spark.streaming.backpressure.enabled為true,Spark Streaming會自動根據處理能力來調整輸入速率,從而在流量高峰時仍能保證最大的吞吐和性能


Spark Streaming的反壓機制中,有以下幾個重要的組件:

第一:RateController
RateController 組件是 JobScheduler 的監聽器,主要監聽集群所有作業的提交、運行、完成情況,并從 BatchInfo 實例中獲取以下信息,交給速率估算器(RateEstimator)做速率的估算。

  • 當前批次任務處理完成的時間戳 (processingEndTime)
  • 該批次從第一個 job 到最后一個 job 的實際處理時? (processingDelay)
  • 該批次的調度時延,即從被提交到 JobScheduler 到第一個 job 開始處理的時?
    (schedulingDelay)
  • 該批次輸入數據的總條數(numRecords)

第二:RateEstimator
Spark 2.x 只支持基于 PID 的速率估算器,這里只討論這種實現?;?PID 的速率估算器簡單地說就是它把收集到的數據(當前批次速率)和一個設定值(上一批次速率)進行比較,然后用它們 之間的差計算新的輸入值,估算出一個合適的用于下一批次的流量閾值。這里估算出來的值就是流 量的閾值,用于更新每秒能夠處理的最大記錄數

第三:RateLimiter

RateController和RateEstimator組件都是在Driver端用于更新最大速度的,而RateLimiter是用于接收到Driver的更新通知之后更新Executor的最大處理速率的組件。RateLimiter是一個抽象類,它并不是Spark本身實現 的,而是借助了第三方Google的GuavaRateLimiter來產生的。它實質上是一個限流器,也可以叫 做令牌,如果Executor中task每秒計算的速度大于該值則阻塞,如果小于該值則通過,將流數據加 入緩存中進行計算。

*反壓機制真正起作用時需要至少處理一個批:由于反壓機制需要根據當前批的速率,預估新批的速率,所以反壓機制真正起作用前,應至少保證處理一個批。*如何保證反壓機制真正起作用前應用不會崩潰:要保證反壓機制真正起作用前應用不會崩潰,需要控制每個批次最大攝入速率。若為Direct Stream,如Kafka Direct Stream,則可以通過spark.streaming.kafka.maxRatePerPartition參數來控制。此參數代表了每秒每個分區最大攝入的數據條數。假設BatchDuration為10秒,spark.streaming.kafka.maxRatePerPartition為12條,kafka topic分區數為3個,則一個批(Batch)最大讀取的數據條數為360條(3*12*10=360)。同時,需要注意,該參數也代表了整個應用生命周期中的最大速率,即使是背壓調整的最大值也不會超過該參數。

反壓相關的參數

總結

以上是生活随笔為你收集整理的sparkstreaming监听hdfs目录如何终止_Spark笔试题:Spark Streaming 反压机制的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。