Spark-1.6.0之Application运行信息记录器JobProgressListener
JobProgressListener類是Spark的ListenerBus中一個很重要的監聽器,可以用于記錄Spark任務的Job和Stage等信息,比如在Spark UI頁面上Job和Stage運行狀況以及運行進度的顯示等數據,就是從JobProgressListener中獲得的。另外,SparkStatusTracker也會從JobProgressListener中獲取Spark運行信息。外部應用也可以通過Spark提供的status相關API比如AllJobResource, AllStagesResource, OneJobResource, OneStageResource獲取到Spark程序的運行信息。
JobProgressListener類的繼承關系,以及該類中重要的屬性和方法,見下圖
在Spark-1.6.0中,JobProgressListener對象生成后,會被add到一個LiveListenerBus類型的ListenerBus中。LiveListenerBus類的基礎關系,以及該類中重要方法和屬性見下圖
文章接下來分析在一個Spark Application中JobProgressListener的生命周期,以及其數據接收和傳遞的過程。
一、JobProgressListener生成和數據寫入
1、JobProgressListener生成
在源代碼中,JobProgressListener在SparkContext對象創建時就生成了,
private[spark] val listenerBus = new LiveListenerBus //listenerBus private var _jobProgressListener: JobProgressListener = _ //定義 ... _jobProgressListener = new JobProgressListener(_conf) //生成 private[spark] def jobProgressListener: JobProgressListener = _jobProgressListener //使用 listenerBus.addListener(jobProgressListener) //使用 從上面的代碼中看到,jobProgressListener在生成后,spark將其存入了LiveListenerBus對象中,其他任何接收到listenerBus的地方都能從中獲取到這個jobProgressListener對象。另外在創建SparkUI對象時,使用到了_jobProgressListener對象,使得Spark UI頁面能夠從該對象中獲取Spark應用程序的運行時數據。或者也可以像SparkStatusTracker對象那樣,直接從SparkContext對象中獲取jobProgressListener。
最后,在SparkContext中調用setupAndStartListenerBus()方法,啟動和初始化listenerBus。我們可以看到,在該方法中最后調用了listenerBus.start(this)方法真正啟動listenerBus。
2、JobProgressListener接收事件
(1)事件進入LiveListenerBus
LiveListenerBus繼承自AsynchronousListenerBus,可以看到這里是多線程的方式。里面維持了一個大小為10000的eventQueue,LinkedBlockingDeque類型。這個可以和DAGScheduler中提到的EventLoop類中的eventQueue對比分析。
eventQueue接收事件調用的是post方法,這里調用的是LinkedBlockingDeque.offer方法,而EventLoop中調用的是LinkedBlockingDeque.put,可以比較這兩者的區別。
所以說,各類事件都是調用AsynchronousListenerBus.post方法傳入eventQueue中的。比如,在DAGScheduler類中,可以看到總共有14個調用的地方,下面列舉出其中12個不同的。
| executorHeartbeatReceived | SparkListenerExecutorMetricsUpdate | executor向master發送心跳表示BlockManager仍然存活 |
| handleBeginEvent | SparkListenerTaskStart | task開始執行事件 |
| cleanUpAfterSchedulerStop | SparkListenerJobEnd | Job結束事件 |
| handleGetTaskResult | SparkListenerTaskGettingResult | task獲取結果事件 |
| handleJobSubmitted | SparkListenerJobStart | Job開始事件 |
| handleMapStageSubmitted | SparkListenerJobStart | Job開始事件 |
| submitMissingTasks | SparkListenerStageSubmitted | Stage提交事件 |
| handleTaskCompletion | SparkListenerTaskEnd | Task結束事件 |
| handleTaskCompletion | SparkListenerJobEnd | Job結束事件 |
| markStageAsFinished | SparkListenerStageCompleted | Stage結束事件 |
| failJobAndIndependentStages | SparkListenerJobEnd | Job結束事件 |
| markMapStageJobAsFinished | SparkListenerJobEnd | Job結束事件 |
分析到這里,各種SparkListenerEvent事件傳遞到了eventQueue中,那么如何進一步傳遞到JobProgessListener中呢?接下來JobProgressListener作為消費者,從eventQueue中消費這些SparkListenerEvent。
(2)事件進入到JobProgressListener
從SparkContext中啟動LiveListenerBus線程開始,LiveListenerBus繼承自AsynchronousListenerBus的run方法便一直在多線程運行。在run中有一段主要邏輯,
val event = eventQueue.poll if (event == null) {// Get out of the while loop and shutdown the daemon threadif (!stopped.get) {throw new IllegalStateException("Polling `null` from eventQueue means" +" the listener bus has been stopped. So `stopped` must be true")}return } postToAll(event) 從eventQueue取出事件后,調用LiveListenerBus的postToAll方法,將事件分發到各Listener中。
具體看一下LiveListenerBus的postToAll方法,這個方法從ListenerBus繼承。
2、JobProgressListener對各種事件的響應
那么接下來,從JobProgressListener對各種事件的響應方法出發,對其狀態變更邏輯作一個簡要梳理,很多方法從其命名上就能看出其主要功能,有需要的可以進入具體方法中做進一步的研究。JobProgressListener能做出響應的所有SparkListenerEvent事件,基本上都列在前面的表格中了。各類事件基本上都是從DAGScheduler中傳入的,可以參考Spark Scheduler模塊源碼分析之DAGScheduler
(1)Job級別信息
這里主要涉及到Job開始和結束的兩個方法
- onJobStart(SparkListenerJobStart)
在Job開始時,獲取job的一些基本信息,比如參數spark.jobGroup.id 確定的JobGroup。然后生成一個JobUIData對象,用于在Spark UI頁面上顯示Job的ID,提交時間,運行狀態,這個Job包含的Stage個數,完成、跳過、失敗的Stage個數。以及總的Task個數,以及完成、失敗、跳過、正在運行的Task個數。該Job中包含的所有Stage都存入pendingStages中。 - onJobEnd(SparkListenerJobEnd)
在Job完成時,根據該Job的最終狀態是成功還是失敗,分別把該job的相關信息存入completedJob對象和failedJobs對象中,同時把成功或者失敗的job數加一。然后循環處理該Job的每一個Stage,將該Stage對應的當前Job移除,如果移除后發現該Stage再沒有其他Job使用了,就把該Stage從activeStage列表中移除。接下來,如果這個Stage的提交時間為空,則表示該Stage被跳過執行,更新一下skipped的Stage個數,以及skipped的Task個數。(成功和失敗的Stage的邏輯在下面一小節中)
(2)Stage級別信息
有關Stage的狀態變更處理邏輯,這里也有Stage的submit和complete方法
- onStageSubmitted(SparkListenerStageSubmitted)
在Stage提交后,將該Stage存入activeStages中,并且從pendingStages中移除該Stage。首先獲得當前的調度池名稱,如果是FIFO模式,則是default(實際上不起任何作用),然后根據該調度池,將這個Stage放入調度池中。然后把所屬job的numActiveStages加一, onStageCompleted(SparkListenerStageCompleted)
在Stage完成后,從調度池中將該Stage移除,同時也從activeStages中移除。根據該Stage是成功還是失敗,繼續更新completedStages或failedStages,并更新這類Stage的統計數。然后把對應Job中activeStages值減一,如果這個Stage是成功的(判斷依據是failureReason為空),則把對應job的成功Stage數加一,否則把對應Job的失敗Stage數加一。
(3)Task級別信息
有關Task的方法有task開始,結束兩個方法onTaskStart(SparkListenerTaskStart)
當一個Task開始運行時,會把對應Stage中active狀態的Task計數加一,并且把這個Task相關的信息記入對應Stage中,同時更新該Task所屬Job中Active狀態Task的個數。- onTaskEnd(SparkListenerTaskEnd)
當一個Task運行完成時,獲取該Task對應Stage的executorSummary信息,這個executorSummary中記錄了每個Executor對應的ExecutorSummary信息,其中包括task開始時間,失敗task個數,成功task個數,輸入輸出字節數,shuffle read/write字節數等。然后根據這個Task所屬的executorId,找到當前Task的運行統計信息execSummary。如果這個Task運行成功,就將成功task個數加一,否則就將失敗task個數加一。然后根據Task運行狀態,更新對應Stage中失敗或成功Task個數。進一步,更新對應Job中失敗或成功的Task個數。
二、SparkUI頁面從JobProgressListener讀取數據
JobProgressListener主要用在向Spark UI頁面傳遞數據上。
轉載于:https://www.cnblogs.com/wuyida/p/6300238.html
與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的Spark-1.6.0之Application运行信息记录器JobProgressListener的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JMS学习(五)--ActiveMQ中的
- 下一篇: ebs 初始化登陆