日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

Spark ListenerBus 和 MetricsSystem 体系分析

發(fā)布時(shí)間:2023/11/27 生活经验 47 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark ListenerBus 和 MetricsSystem 体系分析 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

轉(zhuǎn)載自:https://yq.aliyun.com/articles/60196

摘要:?Spark 事件體系的中樞是ListenerBus,由該類接受Event并且分發(fā)給各個(gè)Listener。MetricsSystem 則是一個(gè)為了衡量系統(tǒng)的各種指標(biāo)的度量系統(tǒng)。Listener可以是MetricsSystem的信息來源之一。他們之間總體是一個(gè)互相補(bǔ)充的關(guān)系。

前言

監(jiān)控是一個(gè)大系統(tǒng)完成后最重要的一部分。Spark整個(gè)系統(tǒng)運(yùn)行情況是由ListenerBus以及MetricsSystem 來完成的。這篇文章重點(diǎn)分析他們之間的工作機(jī)制以及如何通過這兩個(gè)系統(tǒng)完成更多的指標(biāo)收集。

ListenerBus 是如何工作的

Spark的事件體系是如何工作的呢?我們先簡要描述下,讓大家有個(gè)大概的了解。 首先,大部分類都會(huì)引入一個(gè)對(duì)象叫l(wèi)istenerBus,這個(gè)類具體是什么得看實(shí)現(xiàn),但是都一定繼承自org.apache.spark.util.ListenerBus. 假設(shè)我們要提交一個(gè)任務(wù)集。這個(gè)動(dòng)作可能會(huì)很多人關(guān)心,我就是使用listenerBus把Event發(fā)出去,類似下面的第二行代碼。
 def submitJobSet(jobSet: JobSet) {listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))logInfo("Added jobs for time " + jobSet.time)}
listenerBus里已經(jīng)注冊(cè)了很多監(jiān)聽者,我們叫l(wèi)istener,通常listenerBus 會(huì)啟動(dòng)一個(gè)線程異步的調(diào)用這些listener去消費(fèi)這個(gè)Event。而所謂的消費(fèi),其實(shí)就是觸發(fā)事先設(shè)計(jì)好的回調(diào)函數(shù)來執(zhí)行譬如信息存儲(chǔ)等動(dòng)作。? 這就是整個(gè)listenerBus的工作方式。這里我們看到,其實(shí)類似于埋點(diǎn),這是有侵入性的,每個(gè)你需要關(guān)注的地方,如果想讓人知曉,就都需要發(fā)出一個(gè)特定的Event。

ListenerBus 分析

特定實(shí)現(xiàn) <   AsynchronousListenerBus  < ListenerBus
特定實(shí)現(xiàn) <   SparkListenerBus  < ListenerBus

這里的特定實(shí)現(xiàn)有:

 *  StreamingListenerBus extends  AsynchronousListenerBus *  LiveListenerBus extends  AsynchronousListenerBus  with SparkListenerBus*  ReplayListenerBus extends SparkListenerBus
AsynchronousListenerBus 內(nèi)部維護(hù)了一個(gè)queue,事件都會(huì)先放到這個(gè)queue,然后通過一個(gè)線程來讓Listener處理Event。 SparkListenerBus 也是一個(gè)trait,但是里面有個(gè)具體的實(shí)現(xiàn),預(yù)先定義了onPostEvent 方法對(duì)一些特定的事件做了處理。 其他更下面的類則根據(jù)需要混入或者繼承SparkListenerBus ,AsynchronousListenerBus來完成他們需要的功能。 不同的ListenerBus 需要不同的Event 集 和Listener,比如你看StreamingListenerBus的簽名,就知道所有的Event都必須是StreamingListenerEvent,所有的Listener都必須是StreamingListener。
  StreamingListenerBus  extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]

Listener(監(jiān)聽器)

通常而言,Listener 是有狀態(tài)的,一般接受到一個(gè)Event后,可能就會(huì)更新內(nèi)部的某個(gè)數(shù)據(jù)結(jié)構(gòu)。以 org.apache.spark.streaming.ui.StreamingJobProgressListener為例,他是一個(gè)StreamingListener,內(nèi)部就含有一些存儲(chǔ)結(jié)構(gòu),譬如:
 private val waitingBatchUIData = new HashMap[Time, BatchUIData]private val runningBatchUIData = new HashMap[Time, BatchUIData]

看申明都是普通的 HashMap ,所以操作是需要做synchronized操作。如下

override def onReceiverError(receiverError: StreamingListenerReceiverError) {synchronized {receiverInfos(receiverError.receiverInfo.streamId) = receiverError.receiverInfo}}

MetricsSystem介紹

MetricsSystem 比較好理解,一般是為了衡量系統(tǒng)的各種指標(biāo)的度量系統(tǒng)。算是一個(gè)key-value形態(tài)的東西。舉個(gè)比較簡單的例子,我怎么把當(dāng)前JVM相關(guān)信息展示出去呢?做法自然很多,通過MetricsSystem就可以做的更標(biāo)準(zhǔn)化些,具體方式如下:
  1. Source 。數(shù)據(jù)來源。比如對(duì)應(yīng)的有org.apache.spark.metrics.source.JvmSource
  2. Sink。 ?數(shù)據(jù)發(fā)送到哪去。有被動(dòng)和主動(dòng)。一般主動(dòng)的是通過定時(shí)器來完成輸出,譬如CSVSink,被動(dòng)的如MetricsServlet等需要被用戶主動(dòng)調(diào)用。
  3. 橋接Source 和Sink的則是MetricRegistry了。
Spark 并沒有實(shí)現(xiàn)底層Metrics的功能,而是使用了一個(gè)第三方庫:http://metrics.codahale.com?。感興趣大家可以看看,有個(gè)更完整的認(rèn)識(shí)。

如何配置MetricsSystem

MetricsSystem的配置有兩種,第一種是 metrics.properties 配置文件的形態(tài)。第二種是通過spark conf完成,參數(shù)以spark.metrics.conf.開頭 。 我這里簡單介紹下第二種方式。 比如我想查看JVM的信息,包括GC和Memory的使用情況,則我通過類似?
 conf.set("spark.metrics.conf.driver.source.jvm.class","org.apache.spark.metrics.source.JvmSource")
默認(rèn)情況下,MetricsSystem 配置了一個(gè)全局的Sink,MetricsServlet。所以你添加的任何Source 都可以通過一個(gè)path /metrics/json獲取到。如果你的程序設(shè)置做了上面的設(shè)置,把你的spark-ui的路徑換成/metrics/json,就能看到j(luò)vm源的一些信息了。 通常,如果你要實(shí)現(xiàn)一個(gè)自定義的Source,可以遵循如下步驟(這里以JvmSource為例)。 -- 創(chuàng)建一個(gè)Source
private[spark] class JvmSource extends Source {override val sourceName = "jvm"override val metricRegistry = new MetricRegistry()metricRegistry.registerAll(new GarbageCollectorMetricSet)metricRegistry.registerAll(new MemoryUsageGaugeSet)
}

其中 sourceName 是為了給配置用的,比如上面我們?cè)O(shè)置

spark.metrics.conf.driver.source.jvm.class
里面的jvm 就是JvmSource里設(shè)置的sourceName 每個(gè)Source 一般會(huì)自己構(gòu)建一個(gè)MetricRegistry。上面的例子,具體的數(shù)據(jù)收集工作是由GarbageCollectorMetricSet,MemoryUsageGaugeSet完成的。 具體就是寫一個(gè)類繼承com.codahale.metrics.MetricSet,然后實(shí)現(xiàn)Map<String, Metric> getMetrics() 方法就好。 接著通過metricRegistry.registerAll將寫好的MetricSet注冊(cè)上就行。 -- 添加配置
conf.set("spark.metrics.conf.driver.source.jvm.class","org.apache.spark.metrics.source.JvmSource")
-- 調(diào)用結(jié)果 將Spark UI 的地址換成/metrics/json,就能看到輸出結(jié)果了。當(dāng)然,這里是因?yàn)槟J(rèn)系統(tǒng)默認(rèn)提供了一個(gè)Sink實(shí)現(xiàn):org.apache.spark.metrics.sink.MetricsServlet,你可以自己實(shí)現(xiàn)一個(gè)。

如何定制更多的監(jiān)控指標(biāo)

通過之前我寫的Spark UI (基于Yarn) 分析與定制,你應(yīng)該學(xué)會(huì)了如何添加新的頁面到Spark UI上。 而這通過這一片文章,你應(yīng)該了解了數(shù)據(jù)來源有兩個(gè):
  • 各個(gè)Listener
  • MetricsSystem
你可以組合現(xiàn)有的Listener以及Metrics Source 顯示任何你想要的內(nèi)容。 如果現(xiàn)有的無法滿足你,通常你的新的需求應(yīng)該可以通過下面兩種方式來滿足:
  1. 你需要監(jiān)控新的事件,那么你需要添加新的ListenerBus,Listener,Event,然后到你需要的地方去埋點(diǎn)(post事件)。這肯定需要修改spark-core里的代碼了。
  2. 你需要呈現(xiàn)現(xiàn)有的listener或者已知對(duì)象的變量,則使用MetricsSystem,定義一個(gè)新的Source 即可。
這樣,把這些對(duì)象傳遞到你的Page中,就可以進(jìn)行展示。

轉(zhuǎn)載于:https://www.cnblogs.com/itboys/p/9153091.html

總結(jié)

以上是生活随笔為你收集整理的Spark ListenerBus 和 MetricsSystem 体系分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。