spark学习:ContextCleaner清理器
Spark運行的時候,會產生一堆臨時文件,臨時數據,比如持久化的RDD數據在磁盤上,沒有持久化的在內存中,比如shuffle的臨時數據等,如果每次運行完,或者沒有運行完殺掉了,不清理,會產生大量的無用數據,最終造成大數據集群崩潰而死。
初始化
ContextCleaner的初始化是在SparkContext中初始化的,這個功能默認是必須開?
啟的。
? _cleaner =
? ? ? if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
? ? ? ? Some(new ContextCleaner(this))
? ? ? } else {
? ? ? ? None
? ? ? }
? ? _cleaner.foreach(_.start())
1
2
3
4
5
6
7
初始化 的時候主要newle一個清理線程
// 清理線程===》很重要
? private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
1
2
這個清理線程,主要清理了RDD,shuffle,Broadcast,累加器,檢查點等數據
?/** Keep cleaning RDD, shuffle, and broadcast state.
? ? * 保持一個干凈的RDD,shuffle和broadcast狀態
? ? *
? ? * ContextCleaner的工作原理和listenerBus一樣,也采用監聽器模式,由線程來處理,此線程實際還是那個只是
? ? * 調用keepCleanning方法。
? ? * */
? private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
? ? // 默認一直為真true
? ? while (!stopped) {
? ? ? try {
? ? ? ? val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
? ? ? ? ? .map(_.asInstanceOf[CleanupTaskWeakReference])
? ? ? ? // Synchronize here to avoid being interrupted on stop()
? ? ? ? synchronized {
? ? ? ? ? reference.foreach { ref =>
? ? ? ? ? ? logDebug("Got cleaning task " + ref.task)
? ? ? ? ? ? referenceBuffer.remove(ref)
? ? ? ? ? ? // 清除Shuffle和Broadcast相關的數據會分別調用doCleanupShuffle和doCleanupBroadcast函數。根據需要清除數據的類型分別調用
? ? ? ? ? ? ref.task match {
? ? ? ? ? ? ? case CleanRDD(rddId) =>
? ? ? ? ? ? ? ? doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
? ? ? ? ? ? ? case CleanShuffle(shuffleId) =>
? ? ? ? ? ? ? ? doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
? ? ? ? ? ? ? case CleanBroadcast(broadcastId) =>
? ? ? ? ? ? ? ? doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
? ? ? ? ? ? ? case CleanAccum(accId) =>
? ? ? ? ? ? ? ? doCleanupAccum(accId, blocking = blockOnCleanupTasks)
? ? ? ? ? ? ? case CleanCheckpoint(rddId) =>
? ? ? ? ? ? ? ? doCleanCheckpoint(rddId)
? ? ? ? ? ? }
? ? ? ? ? }
? ? ? ? }
? ? ? } catch {
? ? ? ? case ie: InterruptedException if stopped => // ignore
? ? ? ? case e: Exception => logError("Error in cleaning thread", e)
? ? ? }
? ? }
? }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
RDD的清理
?/** Perform RDD cleanup.
? ? * 在ContextCleaner 中會調用RDD.unpersist()來清除已經持久化的RDD數據
? ? * */
? def doCleanupRDD(rddId: Int, blocking: Boolean): Unit = {
? ? try {
? ? ? logDebug("Cleaning RDD " + rddId)
? ? ? // 被SparkContext的unpersistRDD方法
? ? ? sc.unpersistRDD(rddId, blocking)
? ? ? listeners.asScala.foreach(_.rddCleaned(rddId))
? ? ? logInfo("Cleaned RDD " + rddId)
? ? } catch {
? ? ? case e: Exception => logError("Error cleaning RDD " + rddId, e)
? ? }
? }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
shuffle的清理
/** Perform shuffle cleanup.
? ? *
? ? * 清理Shuffle
? ? * */
? def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = {
? ? try {
? ? ? logDebug("Cleaning shuffle " + shuffleId)
? ? ? // 把mapOutputTrackerMaster跟蹤的shuffle數據不注冊(具體做了什么,還沒處理)
? ? ? mapOutputTrackerMaster.unregisterShuffle(shuffleId)
? ? ? // 刪除shuffle的塊數據
? ? ? blockManagerMaster.removeShuffle(shuffleId, blocking)
? ? ? listeners.asScala.foreach(_.shuffleCleaned(shuffleId))
? ? ? logInfo("Cleaned shuffle " + shuffleId)
? ? } catch {
? ? ? case e: Exception => logError("Error cleaning shuffle " + shuffleId, e)
? ? }
? }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
廣播的清理
/** Perform broadcast cleanup.
? ? * 清除廣播
? ? * */
? def doCleanupBroadcast(broadcastId: Long, blocking: Boolean): Unit = {
? ? try {
? ? ? logDebug(s"Cleaning broadcast $broadcastId")
? ? ? // 廣播管理器 清除廣播
? ? ? broadcastManager.unbroadcast(broadcastId, true, blocking)
? ? ? listeners.asScala.foreach(_.broadcastCleaned(broadcastId))
? ? ? logDebug(s"Cleaned broadcast $broadcastId")
? ? } catch {
? ? ? case e: Exception => logError("Error cleaning broadcast " + broadcastId, e)
? ? }
? }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
累加器的清理
/** Perform accumulator cleanup.
? ? * 清除累加器
? ? * */
? def doCleanupAccum(accId: Long, blocking: Boolean): Unit = {
? ? try {
? ? ? logDebug("Cleaning accumulator " + accId)
? ? ? AccumulatorContext.remove(accId)
? ? ? listeners.asScala.foreach(_.accumCleaned(accId))
? ? ? logInfo("Cleaned accumulator " + accId)
? ? } catch {
? ? ? case e: Exception => logError("Error cleaning accumulator " + accId, e)
? ? }
? }
1
2
3
4
5
6
7
8
9
10
11
12
13
檢查點的清理
/**
? ?* Clean up checkpoint files written to a reliable storage.
? ?* Locally checkpointed files are cleaned up separately through RDD cleanups.
? ? *
? ? * 清理記錄到可靠存儲的檢查點文件。
? ? * 局部檢查點文件通過RDD清理被單獨清理。
? ?*/
? def doCleanCheckpoint(rddId: Int): Unit = {
? ? try {
? ? ? logDebug("Cleaning rdd checkpoint data " + rddId)
? ? ? // 這里直接調用文件系統刪除 ?是本地 就本地刪除,是hdfs就hdfs刪除
? ? ? ReliableRDDCheckpointData.cleanCheckpoint(sc, rddId)
? ? ? listeners.asScala.foreach(_.checkpointCleaned(rddId))
? ? ? logInfo("Cleaned rdd checkpoint data " + rddId)
? ? }
? ? catch {
? ? ? case e: Exception => logError("Error cleaning rdd checkpoint data " + rddId, e)
? ? }
? }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
啟動方法
在sparkContext中調用啟動方法
? ? _cleaner.foreach(_.start())
1
這里是啟動方法
/** Start the cleaner.
? ? * 開始清理
? ? * */
? def start(): Unit = {
? ? // 設置清理線程為守護進程
? ? cleaningThread.setDaemon(true)
? ? // 設置守護進程的名稱
? ? cleaningThread.setName("Spark Context Cleaner")
? ? // 啟動守護進程
? ? cleaningThread.start()
? ? // scheduleAtFixedRate 在給定的初始延遲之后,并隨后在給定的時間內,創建并執行一個已啟用的周期操作
? ? // periodicGCInterval=30分鐘 也就是沒=每過30分鐘運行一次清理線程清理垃圾
? ? periodicGCService.scheduleAtFixedRate(new Runnable {
? ? ? // 執行系統的垃圾清理
? ? ? override def run(): Unit = System.gc()
? ? }, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
? }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
這里啟動線程 // 啟動守護進程 cleaningThread.start(),這里自我感覺一下,因為下面調用System.gc()是清理垃圾,所以這個cleaningThread線程應該是收集那些需要清理的數據,保存它的引用(引用就是一個地址,一個指針,指向要刪除的數據),最后調用System.gc()方法,才真正清理。
結束
最后是關閉這個應用的時候,調用Stop()方法
/**
? ?* Stop the cleaning thread and wait until the thread has finished running its current task.
? ? * 停止清理線程并等待線程完成其當前任務。
? ?*/
? def stop(): Unit = {
? ? stopped = true
? ? // Interrupt the cleaning thread, but wait until the current task has finished before
? ? // doing so. This guards against the race condition where a cleaning thread may
? ? // potentially clean similarly named variables created by a different SparkContext,
? ? // resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132).
? ? // 中斷清理線程,但等待當前任務完成后再執行。
? ? // This guards against the race condition where a cleaning thread may
? ? // potentially clean similarly named variables created by a different SparkContext,
? ? // ,導致其他令人費解的塊未發現異常(spark-6132)。
? ? synchronized {
? ? ? // 打斷線程
? ? ? cleaningThread.interrupt()
? ? }
? ? // 設置0 等待這個線程死掉
? ? cleaningThread.join()
? ? // 關閉垃圾清理
? ? periodicGCService.shutdown()
? }
?
總結
以上是生活随笔為你收集整理的spark学习:ContextCleaner清理器的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: jstat的小伙伴:找出system.g
- 下一篇: 关于Netty的ByteBuff内存泄漏