日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark HistoryServer日志解析清理异常

發(fā)布時間:2025/1/21 编程问答 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark HistoryServer日志解析清理异常 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Spark HistoryServer日志解析&清理異常

一、背景介紹

用戶在使用 Spark 提交任務(wù)時,經(jīng)常會出現(xiàn)任務(wù)完成后在 HistoryServer(Spark 1.6 和 Spark 2.1 HistoryServer 合并,統(tǒng)一由 Spark 2.1 HistoryServer 管控,因此本文的代碼分析都是基于 Spark 2.1 版本的代碼展開的)中找不到 appid 信息,尤其是對于失敗的任務(wù),用戶無法查看日志分析任務(wù)失敗的原因。為此,特地對 Spark 2.1 HistoryServer 進行了研究,發(fā)現(xiàn)根本問題出在內(nèi)部的兩個核心數(shù)據(jù)結(jié)構(gòu)的使用存在異常導(dǎo)致的。

二、eventLog 日志文件及相關(guān)參數(shù)

2.1 eventLog 日志文件介紹

eventLog 是 Spark 任務(wù)在運行過程中,調(diào)用 EventLoggingListener#logEvent() 方法來輸出 eventLog 內(nèi)容,Spark 中定義各種類型的事件,一旦某個事件被觸發(fā),就會構(gòu)造一個類型的 Event,然后獲取相應(yīng)的運行信息并設(shè)置進去,最終將該 event 對象序列化成 json 字符串,追加到 eventLog 日志文件中。

Spark 中 eventLog 默認(rèn)是不開啟的,由參數(shù) ‘spark.history.fs.cleaner.enabled’ 來控制,開啟這個配置后,任務(wù)運行的信息就會寫到 eventLog 日志文件中,日志文件具體保存在參數(shù) ‘spark.eventLog.dir’ 配置的目錄下。

2.2 相關(guān)配置參數(shù)

一般這些配置放在 /etc/spark2/conf/spark-defaults.conf 中。

注:但在實際自定義修改 Spark HistoryServer 配置時,spark-defaults.conf 中并沒有寫入(具體原因待看)。但可以通過查看 HistoryServer 進程使用的 spark-history-server.conf 配置查看,在 Spark HistoryServer 所在機器上,通過 ‘ps -ef |grep HistoryServer’ 查看具體配置 ‘–properties-file /run/cloudera-scm-agent/process/136253-spark2_on_yarn-SPARK2_YARN_HISTORY_SERVER/spark2-conf/spark-history-server.conf’,這里會使用自定義更新的 HistoryServer 參數(shù)。

參數(shù)默認(rèn)含義
spark.history.retainedApplications50在內(nèi)存中保存 Application 歷史記錄的個數(shù),如果超過這個值,舊的應(yīng)用程序信息將被刪除,當(dāng)再次訪問已被刪除的應(yīng)用信息時需要重新構(gòu)建頁面。
spark.history.fs.update.interval10s指定刷新日志的時間,更短的時間可以更快檢測到新的任務(wù)以及任務(wù)執(zhí)行情況,但過快會加重服務(wù)器負(fù)載。
spark.history.ui.maxApplicationInt.MaxValue顯示在總歷史頁面中的程序的數(shù)量。如果總歷史頁面未顯示,程序 UI 仍可通過訪問其 URL 來顯示。
spark.history.ui.port18089(Spark2.1)指定history-server的網(wǎng)頁UI端口號
spark.history.fs.cleaner.enabledfalse指定history-server的日志是否定時清除,true為定時清除,false為不清除。這個值一定設(shè)置成true啊,不然日志文件會越來越大。
spark.history.fs.cleaner.interval1d定history-server的日志檢查間隔,默認(rèn)每一天會檢查一下日志文件
spark.history.fs.cleaner.maxAge7d指定history-server日志生命周期,當(dāng)檢查到某個日志文件的生命周期為7d時,則會刪除該日志文件
spark.eventLog.compressfalse設(shè)置history-server產(chǎn)生的日志文件是否使用壓縮,true為使用,false為不使用。這個參數(shù)務(wù)可以成壓縮哦,不然日志文件歲時間積累會過大
spark.history.retainedApplications50在內(nèi)存中保存Application歷史記錄的個數(shù),如果超過這個值,舊的應(yīng)用程序信息將被刪除,當(dāng)再次訪問已被刪除的應(yīng)用信息時需要重新構(gòu)建頁面。
spark.history.fs.numReplayThreadsceil(cpu核數(shù)/4)解析 eventLog 的線程數(shù)量

三、eventLog 日志解析及日志清理原理

3.1 兩個定時任務(wù)

FsHistoryProvider 類在初始化時,會調(diào)用 startPolling() 方法,來啟動兩個定時任務(wù),即日志文件解析任務(wù)和日志文件清理任務(wù),兩個任務(wù)均是由獨立線程執(zhí)行。當(dāng)然,日志文件清理任務(wù)是否開啟是由參數(shù) spark.history.fs.cleaner.enabled 控制(默認(rèn)為 false,線上環(huán)境為 true,即開啟了日志文件清理任務(wù))。

//位置:core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scalaprivate[history] def initialize(): Thread = {if (!isFsInSafeMode()) {// 兩個定時任務(wù)啟動入口startPolling()null} else {startSafeModeCheckThread(None)}}private def startPolling(): Unit = {// Validate the log directory.val path = new Path(logDir)// Disable the background thread during tests.if (!conf.contains("spark.testing")) {// A task that periodically checks for event log updates on disk.logDebug(s"Scheduling update thread every $UPDATE_INTERVAL_S seconds")// 日志文件解析線程pool.scheduleWithFixedDelay(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS)if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {// A task that periodically cleans event logs on disk.// 日志文件清理線程pool.scheduleWithFixedDelay(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)}} else {logDebug("Background update thread disabled for testing")}}

3.2 eventLog 日志文件解析原理

3.2.1 關(guān)鍵數(shù)據(jù)結(jié)構(gòu)

在介紹日志解析前,先來看看兩個關(guān)鍵的數(shù)據(jù)結(jié)構(gòu)。fileToAppInfo 和 applications。

fileToAppInfo 結(jié)構(gòu)用于保存日志目錄 /user/spark/spark2ApplicationHistory/ 下每一條 eventLog 日志文件。每次 HDFS 目錄下新生成的文件都會更新到該數(shù)據(jù)結(jié)構(gòu)。

val fileToAppInfo = new mutable.HashMap[Path, FsApplicationAttemptInfo]()

applications 結(jié)構(gòu)用于保存每個 App 對應(yīng)的所有 AppAttempt 運行或完成的日志信息。

@volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo] = new mutable.LinkedHashMap()

舉個例子:HDFS 日志目錄下有同一個 App 的兩個 eventLog 文件。

/user/spark/spark2ApplicationHistory/application_1599034722009_10003548_1 /user/spark/spark2ApplicationHistory/application_1599034722009_10003548_2

此時,fileToAppInfo 保存的數(shù)據(jù)格式為:(兩條記錄)

<'/user/spark/spark2ApplicationHistory/application_1599034722009_10003548_1', AttemptInfo> <'/user/spark/spark2ApplicationHistory/application_1599034722009_10003548_2', AttemptInfo>

而 applications 保存的數(shù)據(jù)格式為:(一條記錄)

<'application_1599034722009_10003548', HistoryInfo<Attemp1, Attempt2>>

3.2.2 日志文件解析流程

eventLog 日志文件一次完整解析的流程大概分為以下幾個步驟:

  • 掃描 /user/spark/spark2ApplicationHistory/ 目錄下日志文件是否有更新。(更新有兩個情況:一種是已有的日志文件大小增加,一種是生成了新的日志文件)
  • 若有更新,則從線程池中啟動一個線程對日志進行初步解析。(解析環(huán)節(jié)是關(guān)鍵,UI 界面無法查看是因為解析出現(xiàn)異常)
  • 將解析后的日志同時更新到 fileToAppInfo 和 applications 結(jié)構(gòu)中,保證數(shù)據(jù)維持最新狀態(tài)。
  • 等待解析線程執(zhí)行完成,更新 HDFS 目錄的掃描時間。(線程池啟動的多個線程會阻塞執(zhí)行,直到所有解析線程完成才更新掃描時間)
  • 源碼分析如下:

    這段代碼主要是前兩個步驟的介紹,定期掃描日志目錄(定期時間由參數(shù) spark.history.fs.update.interval 控制,線上環(huán)境為 30s),將文件大小有增加和新生成的文件保存在 logInfos 對象中。然后將新文件放到

    replayExecutor 線程池中執(zhí)行,該線程池大小默認(rèn)為 機器cpu核數(shù)/4,由參數(shù) spark.history.fs.numReplayThreads 控制,線上環(huán)境為 50。

    //位置:core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scalaprivate[history] def checkForLogs(): Unit = {try {val newLastScanTime = getNewLastScanTime()logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Seq[FileStatus]())// logInfos 保存所有新的 eventLog 文件(包括大小增加的和新生成的文件)// filter:過濾出新的日志文件// flatMap:過濾空的entry對象// sortWith:根據(jù)日志文件更新時間降序排序val logInfos: Seq[FileStatus] = statusList.filter { entry =>try {val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)!entry.isDirectory() &&!entry.getPath().getName().startsWith(".") &&prevFileSize < entry.getLen()} catch {case e: AccessControlException =>logDebug(s"No permission to read $entry, ignoring.")false}}.flatMap { entry => Some(entry) }.sortWith { case (entry1, entry2) =>entry1.getModificationTime() >= entry2.getModificationTime()}if (logInfos.nonEmpty) {logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")}var tasks = mutable.ListBuffer[Future[_]]()try {for (file <- logInfos) {// 對掃描出來的文件進行解析tasks += replayExecutor.submit(new Runnable {override def run(): Unit = mergeApplicationListing(file)})}} catch {case e: Exception =>logError(s"Exception while submitting event log for replay", e)}... //省略}

    第三步流程主要在 mergeApplicationListing() 方法中處理。先來看看 fileToAppInfo 結(jié)構(gòu)如何更新,這里的關(guān)鍵是 replay() 方法,這里會對 eventLog 進行初步解析,然后將解析后的內(nèi)容更新到 fileToAppInfo 中。

    //位置:core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scalaprivate def mergeApplicationListing(fileStatus: FileStatus): Unit = {// 函數(shù)監(jiān)聽兩個事件:作業(yè)開始和作業(yè)結(jié)束val newAttempts = try {val eventsFilter: ReplayEventsFilter = { eventString =>eventString.startsWith(APPL_START_EVENT_PREFIX) ||eventString.startsWith(APPL_END_EVENT_PREFIX)}val logPath = fileStatus.getPath()val appCompleted = isApplicationCompleted(fileStatus)// UI 查看的關(guān)鍵,對 eventLog 日志文件進行解析回放val appListener = replay(fileStatus, appCompleted, new ReplayListenerBus(), eventsFilter)// 根據(jù)解析的結(jié)果構(gòu)建 FsApplicationAttemptInfo 對象if (appListener.appId.isDefined) {val attemptInfo = new FsApplicationAttemptInfo(logPath.getName(),appListener.appName.getOrElse(NOT_STARTED),appListener.appId.getOrElse(logPath.getName()),appListener.appAttemptId,appListener.startTime.getOrElse(-1L),appListener.endTime.getOrElse(-1L),fileStatus.getModificationTime(),appListener.sparkUser.getOrElse(NOT_STARTED),appCompleted,fileStatus.getLen())// 更新 fileToAppInfo 結(jié)構(gòu)fileToAppInfo(logPath) = attemptInfologDebug(s"Application log ${attemptInfo.logPath} loaded successfully: $attemptInfo")Some(attemptInfo)} else {logWarning(s"Failed to load application log ${fileStatus.getPath}. " +"The application may have not started.")None}}... // 省略 }

    那 applications 結(jié)構(gòu)又是如何更新的呢?主要是先找出新的 App 對象,將舊的 App 列表和新的 App 列表進行合并,生成新的對象,并更新到 applications 中。

    //位置:core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scalaprivate def mergeApplicationListing(fileStatus: FileStatus): Unit = {... // 省略val newAppMap = new mutable.HashMap[String, FsApplicationHistoryInfo]()// 多線程同時更新 applications 對象,這里用 synchronized 實現(xiàn)同步訪問該對象applications.synchronized {// newAttempts 對象是剛才解析 eventLog 構(gòu)造的 FsApplicationAttemptInfo 對象列表// 這一步的目的就是要過濾出剛才新生成的App對象,并更新已存在但大小有增加的App對象newAttempts.foreach { attempt =>val appInfo = newAppMap.get(attempt.appId).orElse(applications.get(attempt.appId)).map { app =>val attempts =app.attempts.filter(_.attemptId != attempt.attemptId) ++ List(attempt)new FsApplicationHistoryInfo(attempt.appId, attempt.name,attempts.sortWith(compareAttemptInfo))}.getOrElse(new FsApplicationHistoryInfo(attempt.appId, attempt.name, List(attempt)))newAppMap(attempt.appId) = appInfo}val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo)val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {if (!mergedApps.contains(info.id)) {mergedApps += (info.id -> info)}}// mergedApps 對象用于保存已有App對象和新生成的App對象進行合并后結(jié)果,產(chǎn)生最新的 applications 對象val newIterator = newApps.iterator.bufferedval oldIterator = applications.values.iterator.bufferedwhile (newIterator.hasNext && oldIterator.hasNext) {if (newAppMap.contains(oldIterator.head.id)) {oldIterator.next()} else if (compareAppInfo(newIterator.head, oldIterator.head)) {addIfAbsent(newIterator.next())} else {addIfAbsent(oldIterator.next())}}newIterator.foreach(addIfAbsent)oldIterator.foreach(addIfAbsent)applications = mergedApps} }

    3.3 eventLog 日志清理原理

    了解了前面 fileToAppInfo 和 applications 數(shù)據(jù)結(jié)構(gòu),日志清理的原理相對而言就簡單很多,主要是對 applications 對象進行處理。

    日志清理大致流程如下:

  • 獲取 eventLog 日志保留的生命周期事件,由參數(shù) spark.history.fs.cleaner.maxAge 控制,默認(rèn) 7d,線上 5d。
  • 掃描 applications 對象,將待清理的日志對象保存在 attemptsToClean 對象,保留的對象保存在 appsToRetain。(一個文件是否可以刪除由函數(shù) shouldClean() 控制)
  • 更新 applications 對象。
  • 調(diào)用 HDFS api 執(zhí)行真正的刪除操作。
  • 源碼分析:

    //位置:core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scalaprivate[history] def cleanLogs(): Unit = {try {// 1、獲取 eventLog 保存的生命周期時間val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000val now = clock.getTimeMillis()val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()// 判斷函數(shù):超過生命周期并完成(后綴不是 .inprogress 結(jié)束)的任務(wù)可以正常清理def shouldClean(attempt: FsApplicationAttemptInfo): Boolean = {now - attempt.lastUpdated > maxAge && attempt.completed}// 2、掃描 applications 對象,將超過生命周期待清理的 eventLog 保存在 attemptsToClean 對象中,未超過的保存在 appsToRetain 對象中applications.values.foreach { app =>val (toClean, toRetain) = app.attempts.partition(shouldClean)attemptsToClean ++= toCleanif (toClean.isEmpty) {appsToRetain += (app.id -> app)} else if (toRetain.nonEmpty) {appsToRetain += (app.id ->new FsApplicationHistoryInfo(app.id, app.name, toRetain.toList))}}// 3、更新 applications 對象applications = appsToRetainval leftToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]// 4、調(diào)用 HDFS api 執(zhí)行真正的清理操作attemptsToClean.foreach { attempt =>try {fs.delete(new Path(logDir, attempt.logPath), true)} catch {case e: AccessControlException =>logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")case t: IOException =>logError(s"IOException in cleaning ${attempt.logPath}", t)leftToClean += attempt}}// 沒有正常清理的對象重新更新到 attemptsToClean 中attemptsToClean = leftToClean} catch {case t: Exception => logError("Exception in cleaning logs", t)}}

    四、原因分析&解決方案

    上面日志解析和日志清理的邏輯都依賴 fileToAppInfo 和 applications 對象,Spark HistoryServer UI 界面展示的內(nèi)容也是依賴這兩個對象,所以,UI 無法加載任務(wù)信息也是由于這里的數(shù)據(jù)結(jié)構(gòu)出現(xiàn)了多線程訪問的線程安全問題。

    4.1 HashMap 線程同步問題&解決方案

    4.1.1 原因分析

    fileToAppInfo 對象是 FsHistoryProvider 類的一個對象,數(shù)據(jù)結(jié)構(gòu)采用 HashMap,是線程不安全的對象,但在多線程調(diào)用 mergeApplicationListing() 方法操作 fileToAppInfo 對象并不是同步訪問,導(dǎo)致每次載入所有 eventLog 日志文件,會出現(xiàn)不能保證所有文件都能被正常加載。那為什么會出現(xiàn)這種情況呢?其實就是多線程訪問同一個對象時經(jīng)常出現(xiàn)的一個問題。

    下圖是多線程訪問同一對象帶來的線程安全問題的一個簡單例子:

    • 當(dāng)線程 1 執(zhí)行 x++ 后將結(jié)果更新到內(nèi)存中,內(nèi)存中此時 x=1,沒有問題。
    • 但由于線程 1 在讀內(nèi)存數(shù)據(jù)時線程 2 同時也讀取內(nèi)存中 x 的值,當(dāng)線程 2 執(zhí)行 x++ 后,將結(jié)果更新到內(nèi)存中,此時內(nèi)存中 x 的值還是 1。
    • 而預(yù)期的結(jié)果是 x = 2,這種情況便是多線程訪問同一對象的線程安全問題。

    多線程訪問同一對象帶來的線程安全問題

    4.1.2 解決方案

    HashMap 對象帶來的線程安全問題,解決方法比較簡單,用 ConcurrentHashMap 替代即可。參考 patch:SPARK-21223。

    var fileToAppInfo = new ConcurrentHashMap[Path, FsApplicationAttemptInfo]()

    4.2 Synchronized 鎖同步問題

    4.2.1 原因分析

    在 Spark HistoryServer 中,applications 更新的玩法是這樣的:

    //位置:core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]= new mutable.LinkedHashMap()applications.synchronized {... // 省略val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()... // 省略更新 mergedApps 的值applications = mergedApps }

    咋一看,這樣使用 synchronized 鎖住 applications 對象似乎沒什么問題。但其實是有問題的,我們先來看一個例子。

    class Synchronized {private List aList = new ArrayList();public void anyObject1() {// 和 HistoryServer 玩法一致,鎖住 aList 對象,代碼塊中用 aList2 更新 aList 對象值synchronized (aList) {List aList2 = new ArrayList();for (int i = 0; i < 10; i++) {System.out.println("anyObject" + "-" + Thread.currentThread());aList2.add(1);}aList = aList2;System.out.println("aList =" + aList.size());}} }public class SynchronizedDemo01 {public static void main(String[] args) {SynchronizedDemo01 syn = new SynchronizedDemo01();syn.anyObjTest();}public void anyObjTest() {final Synchronized syn = new Synchronized();// 啟動5個線程去操作aList對象,每次打印10條記錄for (int i = 0; i < 5; i++) {new Thread() {@Overridepublic void run() {syn.anyObject1();}}.start();}} }運行結(jié)果:(隨機多運行幾次) anyObject-Thread[Thread-3,5,main] anyObject-Thread[Thread-2,5,main] anyObject-Thread[Thread-3,5,main] anyObject-Thread[Thread-2,5,main] anyObject-Thread[Thread-3,5,main] aList =10 anyObject-Thread[Thread-3,5,main] anyObject-Thread[Thread-3,5,main] anyObject-Thread[Thread-3,5,main] anyObject-Thread[Thread-3,5,main] anyObject-Thread[Thread-3,5,main] aList =10 anyObject-Thread[Thread-4,5,main] anyObject-Thread[Thread-4,5,main] anyObject-Thread[Thread-4,5,main]

    通過這個例子,可以看出 Thread-3 在 Thread-2 線程中打印了信息,也就是說通過這種方式鎖住 synchronized(aList 對象)(非 this 對象)是有問題的,線程并沒有真正的鎖住 aList 對象。那為什么會出現(xiàn)這種情況呢?我們接著看。

    https://blog.csdn.net/weixin_42762133/article/details/103241439 這篇文章給出了 Synchronized 鎖幾種使用場景。

    修飾目標(biāo)鎖
    方法實例方法當(dāng)前對象實例(即方法調(diào)用者)
    靜態(tài)方法類對象
    代碼塊this當(dāng)前對象實例(即方法調(diào)用者)
    class 對象類對象
    任意 Object 對象當(dāng)前對象實例(即方法調(diào)用者)

    這里重點介紹下 synchronized 修飾目標(biāo)為 this 和任意 Object 對象這兩種情況。要理解他們之間的區(qū)別,就需要搞清楚 synchronized 到底鎖住的是什么?在 https://juejin.im/post/6844903872431915022 這篇文章中,介紹了 synchronized 鎖住的內(nèi)容有兩種,一種是類,另一種是對象實例。這里的關(guān)鍵就在于第二種情況,當(dāng)使用 synchronized 鎖住的是對象實例時,HistoryServer 和上面 aList 的例子那就有問題了,怎么說呢?我們來看看下面這張圖。

    Synchronized 鎖住的對象示意圖

    通過這張圖就一目了然,synchronized(aList) 代碼塊鎖住的是 aList 對象指向的堆中的對象實例,當(dāng)在代碼塊中通過 aList = aList2 賦值后,aList 便指向的新的對象實例,導(dǎo)致原來的對象實例變成了無主狀態(tài),synchronized(aList) 代碼塊的鎖其實也就失去了意義。所以才會出現(xiàn)線程安全的問題。

    上面那段測試代碼如果采用 synchronized(this) 則不會出現(xiàn)多線程錯亂打印的情況,為什么呢?通過上表中我們知道 synchronized(this) 的鎖是當(dāng)前對象實例,即方法的調(diào)用者,在測試代碼中也就是 "SynchronizedDemo01 syn = new SynchronizedDemo01(); " 這里創(chuàng)建 syn 對象實例,在內(nèi)存中的表現(xiàn)為:

    Synchronized 對象堆內(nèi)表現(xiàn)示意圖

    使用 synchronized(this) 之所以不會出問題,是由于不管 aList 指向哪個對象實例,this 對象(即 syn 對象)指向的對象實例始終沒有變,所以多線程訪問 aList 不會出現(xiàn)線程安全問題。

    至此,HistoryServer 中的那段代碼塊是有問題的,并不能實現(xiàn) applications 對象的多線程安全訪問。

    4.2.2 解決方案

    分析清楚了具體原因后,解決方法就比較容易了,將那段代碼的 synchronized 鎖住的對象從 applications 對象改成 this 對象即可。

    //位置:core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]= new mutable.LinkedHashMap()this.synchronized {... // 省略val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()... // 省略更新 mergedApps 的值applications = mergedApps }

    4.3.3 一點小擴展

    上面解決了 synchronized 鎖住 applications 非 this 對象的問題,那 Spark 中為什么不直接用 this 對象呢?這里還是有一點小竅門的。那就是 synchronzied(this) 比 Synchronized(非this) 的效率要低一些,為什么這么說呢?來看兩個例子。

    例子1:兩個線程使用同一個對象分別訪問 synchronized 方法和 synchronized(str) 代碼塊。

    結(jié)論:兩個線程是異步執(zhí)行的,Thread1 鎖住的 ‘str’ Object 對象實例,而 Thread2 鎖住的是 service 對象實例,互不影響。

    public class SynchronizedDemo02 {static Service service = new Service();public static void main(String[] args) {new Thread () {@Overridepublic void run() {service.method1();}}.start();new Thread () {@Overridepublic void run() {service.method2();}}.start();} }class Service {String str = "test";public void method1() {synchronized (str) {System.out.println("method1 begin");try {Thread.sleep(1000);}catch (Exception e) {e.printStackTrace();}System.out.println("method1 end");}}public synchronized void method2() {System.out.println("method2 begin");try {Thread.sleep(1000);}catch (Exception e) {e.printStackTrace();}System.out.println("method2 end");} }結(jié)果輸出: method1 begin method2 begin method1 end method2 end

    例子2:兩個線程使用同一個對象分別訪問 synchronized 方法和 synchronized(this) 代碼塊。

    結(jié)論:兩個線程同步執(zhí)行,鎖住的是同一個 this 對象(即 service 對象),必須一個線程執(zhí)行完才能執(zhí)行另一個線程。

    public class SynchronizedDemo02 {static Service service = new Service();public static void main(String[] args) {new Thread () {@Overridepublic void run() {service.method1();}}.start();new Thread () {@Overridepublic void run() {service.method2();}}.start();} }class Service {String str = "test";public void method1() {synchronized (this) {System.out.println("method1 begin");try {Thread.sleep(1000);}catch (Exception e) {e.printStackTrace();}System.out.println("method1 end");}}public synchronized void method2() {System.out.println("method2 begin");try {Thread.sleep(1000);}catch (Exception e) {e.printStackTrace();}System.out.println("method2 end");} }結(jié)果輸出: method1 begin method1 end method2 begin method2 end

    所以,采用 synchronized(非 this 對象) 會減少當(dāng)前對象鎖與其他 synchorinzed(this) 代碼塊或 synchronized 方法之間的鎖競爭,與其他 synchronized 代碼異步執(zhí)行,互不影響,會提高代碼的執(zhí)行效率。

    【參考資料】

    • https://blog.csdn.net/u013332124/article/details/88350345 (HistoryServer 原理介紹)
    • https://issues.apache.org/jira/browse/SPARK-21223 (fileToAppInfo HashMap 線程安全解決 patch)
    • https://blog.csdn.net/winterking3/article/details/83858782 (Synchronized 非 this 對象同步代碼塊)
    • https://juejin.im/post/6844903872431915022 (Synchronized 到底鎖的什么?)

    總結(jié)

    以上是生活随笔為你收集整理的Spark HistoryServer日志解析清理异常的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。