
Spark Series (8): How the Worker Works

Published: 2025/5/22, Programming Q&A, by 豆豆

[Figure: diagram of how the Worker works]

Source code analysis

Package: org.apache.spark.deploy.worker

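Entry point for launching a driver: the case LaunchDriver branch of the Worker's message handler (receiveWithLogging in the Spark 1.x source quoted here)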

```scala
case LaunchDriver(driverId, driverDesc) => {
  logInfo(s"Asked to launch driver $driverId")
  // Create a DriverRunner object to launch the driver
  val driver = new DriverRunner(
    conf,
    driverId,
    workDir,
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    self,
    akkaUrl)
  // Cache the driver locally
  drivers(driverId) = driver
  driver.start()

  // Add the driver's cores to the Worker's used cores
  coresUsed += driverDesc.cores
  // Add the driver's memory to the Worker's used memory
  memoryUsed += driverDesc.mem
}
```
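To make the bookkeeping concrete, here is a minimal, Spark-free sketch of what this branch does to the Worker's counters. DriverDescLite and WorkerLedger are hypothetical names, and driverFinished (releasing cores and memory when a driver exits) is an assumption about the counterpart logic, which this article does not show.

```scala
import scala.collection.mutable

// Hypothetical stand-ins, not Spark classes. mem is assumed to be in MB.
final case class DriverDescLite(cores: Int, mem: Int)

final class WorkerLedger(val totalCores: Int, val totalMemory: Int) {
  var coresUsed = 0
  var memoryUsed = 0
  // Local cache of launched drivers, keyed by driver ID (like drivers(driverId) = driver)
  val drivers = mutable.HashMap.empty[String, DriverDescLite]

  def coresFree: Int = totalCores - coresUsed
  def memoryFree: Int = totalMemory - memoryUsed

  // Mirrors the LaunchDriver branch: cache the driver, then charge its cores and memory
  def launchDriver(driverId: String, desc: DriverDescLite): Unit = {
    drivers(driverId) = desc
    coresUsed += desc.cores
    memoryUsed += desc.mem
  }

  // Assumed counterpart (not in the excerpt): when a driver exits, drop it from
  // the cache and release what it was charged; unknown IDs are ignored
  def driverFinished(driverId: String): Unit =
    drivers.remove(driverId).foreach { desc =>
      coresUsed -= desc.cores
      memoryUsed -= desc.mem
    }
}
```

Tying the release to removal from the cache means a duplicate exit notification cannot free the same resources twice.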


DriverRunner

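Manages the execution of a single driver, including automatically restarting it on failure. This mechanism applies only to the standalone cluster deploy mode.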
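In start(), restarting hinges on driverDesc.supervise, which is handed to launchDriver; launchDriver itself is not shown in this article. A rough sketch of such a supervise loop follows, assuming a doubling wait between attempts and a maxAttempts safety cap. runWithRetry, runOnce, sleepSeconds and maxAttempts are all hypothetical names.

```scala
// Hypothetical sketch of supervise mode: relaunch a driver that exits with a
// non-zero code. runOnce stands in for starting the process and waiting for its
// exit code; sleepSeconds stands in for a real sleep so the logic is easy to test.
object SuperviseSketch {
  def runWithRetry(
      supervise: Boolean,
      maxAttempts: Int, // illustrative safety cap, an assumption of this sketch
      runOnce: () => Int,
      sleepSeconds: Int => Unit): Int = {
    var waitSeconds = 1
    var exitCode = runOnce()
    var attempts = 1
    // Keep relaunching only while supervised and the last run failed
    while (supervise && exitCode != 0 && attempts < maxAttempts) {
      sleepSeconds(waitSeconds)
      waitSeconds *= 2 // assumed backoff: 1 s, 2 s, 4 s, ...
      exitCode = runOnce()
      attempts += 1
    }
    exitCode // the last exit code; 0 means the driver finished cleanly
  }
}
```

In real use, sleepSeconds would be something like s => Thread.sleep(s * 1000L); injecting it keeps the retry policy testable without waiting.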

Implementation of the start method in DriverRunner:

```scala
def start() = {
  // Start a new thread for this driver
  new Thread("DriverRunner for " + driverId) {
    override def run() {
      try {
        // Create the driver's working directory
        val driverDir = createWorkingDirectory()
        // Download the application's user jar
        val localJarFilename = downloadUserJar(driverDir)

        def substituteVariables(argument: String): String = argument match {
          case "{{WORKER_URL}}" => workerUrl
          case "{{USER_JAR}}" => localJarFilename
          case other => other
        }

        // TODO: If we add ability to submit multiple jars they should also be added here
        // Build a ProcessBuilder from the driver launch command and the requested memory
        val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.mem,
          sparkHome.getAbsolutePath, substituteVariables)
        // Launch the driver process
        launchDriver(builder, driverDir, driverDesc.supervise)
      }
      catch {
        case e: Exception => finalException = Some(e)
      }

      // Work out the driver's final state once it has exited
      val state =
        if (killed) {
          DriverState.KILLED
        } else if (finalException.isDefined) {
          DriverState.ERROR
        } else {
          finalExitCode match {
            case Some(0) => DriverState.FINISHED
            case _ => DriverState.FAILED
          }
        }

      finalState = Some(state)
      // Send a DriverStateChanged message to the Worker that owns this driver
      worker ! DriverStateChanged(driverId, state, finalException)
    }
  }.start()
}
```
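Two parts of start() are pure logic and easy to isolate: the placeholder substitution applied to the launch command, and the mapping from the run's outcome to a final state. The sketch below reproduces both without Spark. DriverStateLite is a hypothetical stand-in for DriverState, and this substituteVariables takes the worker URL and jar path as explicit parameters instead of closing over them.

```scala
// Hypothetical stand-in for Spark's DriverState enumeration
object DriverStateLite extends Enumeration {
  val FINISHED, FAILED, ERROR, KILLED = Value
}

object DriverRunnerSketch {
  // Replaces the two placeholders the excerpt handles; anything else passes through
  def substituteVariables(workerUrl: String, localJar: String)(argument: String): String =
    argument match {
      case "{{WORKER_URL}}" => workerUrl
      case "{{USER_JAR}}"   => localJar
      case other            => other
    }

  // Same precedence as the excerpt: killed, then exception, then exit code
  def finalState(
      killed: Boolean,
      finalException: Option[Exception],
      finalExitCode: Option[Int]): DriverStateLite.Value =
    if (killed) DriverStateLite.KILLED
    else if (finalException.isDefined) DriverStateLite.ERROR
    else finalExitCode match {
      case Some(0) => DriverStateLite.FINISHED
      case _       => DriverStateLite.FAILED // non-zero or missing exit code
    }
}
```

Note that a driver with no recorded exit code and no exception still ends up FAILED, exactly as in the original match.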


LaunchExecutor

Handles the LaunchExecutor message, i.e. launches an executor on this Worker:

```scala
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
  } else {
    try {
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

      // Create the executor's local working directory
      val executorDir = new File(workDir, appId + "/" + execId)
      if (!executorDir.mkdirs()) {
        throw new IOException("Failed to create directory " + executorDir)
      }

      // Create local dirs for the executor. These are passed to the executor via the
      // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the
      // application finishes.
      val appLocalDirs = appDirectories.get(appId).getOrElse {
        Utils.getOrCreateLocalRootDirs(conf).map { dir =>
          Utils.createDirectory(dir).getAbsolutePath()
        }.toSeq
      }
      appDirectories(appId) = appLocalDirs
      // Create the ExecutorRunner object
      val manager = new ExecutorRunner(
        appId,
        execId,
        appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
        cores_,
        memory_,
        self,
        workerId,
        host,
        webUi.boundPort,
        publicAddress,
        sparkHome,
        executorDir,
        akkaUrl,
        conf,
        appLocalDirs, ExecutorState.LOADING)
      // Cache the executor locally
      executors(appId + "/" + execId) = manager
      manager.start()
      // Add the executor's cores to the Worker's used cores
      coresUsed += cores_
      // Add the executor's memory to the Worker's used memory
      memoryUsed += memory_
      // Notify the master with an ExecutorStateChanged message
      master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
    }
    // On failure: kill and remove the executor, then send ExecutorStateChanged(FAILED) to the master
    catch {
      case e: Exception => {
        logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
        if (executors.contains(appId + "/" + execId)) {
          executors(appId + "/" + execId).kill()
          executors -= appId + "/" + execId
        }
        master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
          Some(e.toString), None)
      }
    }
  }
```
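The LaunchExecutor branch follows a check, set up, register, report pattern, with a rollback on failure. Below is a simplified, Spark-free model of that flow. It assumes a startExecutor callback in place of ExecutorRunner.start() and a Report record in place of messages to the master; every name is hypothetical, and the per-application SPARK_LOCAL_DIRS handling is left out.

```scala
import java.io.{File, IOException}
import scala.collection.mutable

// Hypothetical record of what would be sent to the master as ExecutorStateChanged
final case class Report(appId: String, execId: Int, state: String, message: Option[String])

final class ExecutorLauncherSketch(
    activeMasterUrl: String,
    workDir: File,
    startExecutor: String => Unit) { // stands in for ExecutorRunner.start(); may throw
  val executors = mutable.HashMap.empty[String, String] // "appId/execId" -> working dir
  val reports = mutable.ArrayBuffer.empty[Report]
  var coresUsed = 0
  var memoryUsed = 0

  def launch(masterUrl: String, appId: String, execId: Int, cores: Int, memory: Int): Unit =
    if (masterUrl != activeMasterUrl) {
      // Ignore requests from a master that is not the active one (logWarning in Spark)
      println(s"Invalid Master ($masterUrl) attempted to launch executor.")
    } else {
      val key = appId + "/" + execId
      try {
        // Per-executor working directory: <workDir>/<appId>/<execId>
        val executorDir = new File(workDir, key)
        if (!executorDir.mkdirs()) throw new IOException("Failed to create directory " + executorDir)
        executors(key) = executorDir.getAbsolutePath // register before starting
        startExecutor(key)
        coresUsed += cores
        memoryUsed += memory
        reports += Report(appId, execId, "LOADING", None)
      } catch {
        case e: Exception =>
          // Roll back: forget the executor and report FAILED
          executors -= key
          reports += Report(appId, execId, "FAILED", Some(e.toString))
      }
    }
}
```

Because startExecutor is injected, a test can make it throw to exercise the FAILED path without launching any process.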


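Summary

1. After starting, Workers, Drivers, and Applications all register with the Master, which caches them in its in-memory data model.
2. Once registration is complete, the Master sends LaunchExecutor and LaunchDriver messages to the Workers.
3. On receiving these messages, the Worker launches the executor and driver processes. Their state changes are reported as ExecutorStateChanged and DriverStateChanged messages: the Worker sends ExecutorStateChanged to the Master right after starting an executor, and DriverRunner sends DriverStateChanged to its Worker when the driver exits.
4. These messages reach the Master, which handles them according to the reported state. Most importantly, it calls the schedule method to reallocate resources.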
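The four steps can be traced with a small single-threaded simulation. The message names follow the article, but MasterSim, WorkerSim, the string states, and the rule that schedule() runs only for terminal states are simplifying assumptions, not Spark's actual Master and Worker code.

```scala
import scala.collection.mutable

// Simplified messages; the real ones carry more fields
sealed trait Msg
final case class LaunchDriver(driverId: String) extends Msg
final case class LaunchExecutor(appId: String, execId: Int) extends Msg
final case class DriverStateChanged(driverId: String, state: String) extends Msg
final case class ExecutorStateChanged(appId: String, execId: Int, state: String) extends Msg

final class MasterSim {
  val log = mutable.ArrayBuffer.empty[String]
  var scheduleCalls = 0
  // Assumed set of states after which resources are freed
  private val terminal = Set("FINISHED", "FAILED", "KILLED", "ERROR", "LOST", "EXITED")

  def receive(msg: Msg): Unit = msg match {
    case DriverStateChanged(id, state) =>
      log += s"driver $id -> $state"
      if (terminal(state)) schedule()
    case ExecutorStateChanged(app, exec, state) =>
      log += s"executor $app/$exec -> $state"
      if (terminal(state)) schedule()
    case other =>
      log += s"unexpected $other"
  }

  // Stand-in for the Master's schedule(): only counts invocations
  def schedule(): Unit = scheduleCalls += 1
}

final class WorkerSim(master: MasterSim) {
  val runningDrivers = mutable.Set.empty[String]

  def receive(msg: Msg): Unit = msg match {
    case LaunchDriver(id) =>
      runningDrivers += id // the driver reports back only when it exits
    case LaunchExecutor(app, exec) =>
      master.receive(ExecutorStateChanged(app, exec, "LOADING")) // reported right away
    case DriverStateChanged(id, state) =>
      runningDrivers -= id
      master.receive(DriverStateChanged(id, state)) // forward the runner's report
    case ExecutorStateChanged(app, exec, state) =>
      master.receive(ExecutorStateChanged(app, exec, state))
  }
}
```

Feeding WorkerSim a LaunchExecutor, then a LaunchDriver, then a DriverStateChanged(..., "FINISHED") reproduces steps 2 to 4: the LOADING report is logged without rescheduling, and the driver's exit triggers one schedule() call.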

Reposted from: https://www.cnblogs.com/jianyuan/p/Spark%E7%B3%BB%E5%88%97%E4%B9%8BWorker%E5%B7%A5%E4%BD%9C%E5%8E%9F%E7%90%86.html
