Flink on YARN: partial source code analysis (FLIP-6 new mode)

Published: 2025/7/14

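In https://www.cnblogs.com/dongxiao-yang/p/9403427.html we analyzed the code Flink uses to submit a single job to a YARN cluster. Since version 1.5, Flink has reworked the deployment path of the whole framework into a new process (see https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077). This article follows the Flink 1.6.1 source code through the complete flow of the new mode on YARN.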

I. Initialization

The local client-side initialization is largely the same as described in https://www.cnblogs.com/dongxiao-yang/p/9403427.html. Because of the new mode, it differs in the following places:

1 final ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine); now returns a concrete object of class YarnClusterDescriptor.

2 CliFrontend.runProgram enters the if (isNewMode && clusterId == null && runOptions.getDetachedMode()) {.. block, and the call path is

YarnClusterDescriptor.deployJobCluster -> AbstractYarnClusterDescriptor.deployInternal -> startAppMaster

At this point the AM's startup class has become YarnJobClusterEntrypoint.
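The branch condition from step 2 can be looked at in isolation. The following is only a toy model of that condition: DeployPathSketch, DeployPath and choose are hypothetical names invented for illustration, and the real CliFrontend.runProgram does considerably more work in both branches.

```java
// Toy model of the branch condition quoted from CliFrontend.runProgram.
// DeployPathSketch, DeployPath and choose(...) are hypothetical names.
final class DeployPathSketch {

    enum DeployPath { JOB_CLUSTER, OTHER }

    // Mirrors the quoted condition:
    // isNewMode && clusterId == null && runOptions.getDetachedMode()
    static DeployPath choose(boolean isNewMode, String clusterId, boolean detached) {
        if (isNewMode && clusterId == null && detached) {
            // In Flink this is the path that ends in deployJobCluster.
            return DeployPath.JOB_CLUSTER;
        }
        // Every other combination takes a different path (not modeled here).
        return DeployPath.OTHER;
    }

    public static void main(String[] args) {
        System.out.println(choose(true, null, true));
        System.out.println(choose(true, "some-existing-cluster", true));
        System.out.println(choose(true, null, false));
    }
}
```

The point of the sketch is that all three conditions must hold at once: new mode is enabled, no existing cluster id was supplied, and the job is submitted in detached mode.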

II. YarnJobClusterEntrypoint

The main function of YarnJobClusterEntrypoint is the entry point of the whole AM process. At the end of the method it calls startCluster on its grandparent class ClusterEntrypoint, which begins the startup of all cluster components.

The call chain is startCluster -> runCluster -> startClusterComponents

protected void startClusterComponents(
        Configuration configuration,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        BlobServer blobServer,
        HeartbeatServices heartbeatServices,
        MetricRegistry metricRegistry) throws Exception {
    synchronized (lock) {
        dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();

        resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();

        LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
            rpcService,
            DispatcherGateway.class,
            DispatcherId::fromUuid,
            10,
            Time.milliseconds(50L));

        LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
            rpcService,
            ResourceManagerGateway.class,
            ResourceManagerId::fromUuid,
            10,
            Time.milliseconds(50L));

        // TODO: Remove once we have ported the MetricFetcher to the RpcEndpoint
        final ActorSystem actorSystem = ((AkkaRpcService) rpcService).getActorSystem();
        final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));

        webMonitorEndpoint = createRestEndpoint(
            configuration,
            dispatcherGatewayRetriever,
            resourceManagerGatewayRetriever,
            transientBlobCache,
            rpcService.getExecutor(),
            new AkkaQueryServiceRetriever(actorSystem, timeout),
            highAvailabilityServices.getWebMonitorLeaderElectionService());

        LOG.debug("Starting Dispatcher REST endpoint.");
        webMonitorEndpoint.start();

        resourceManager = createResourceManager(
            configuration,
            ResourceID.generate(),
            rpcService,
            highAvailabilityServices,
            heartbeatServices,
            metricRegistry,
            this,
            clusterInformation,
            webMonitorEndpoint.getRestBaseUrl());

        jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, rpcService.getAddress());

        final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);

        dispatcher = createDispatcher(
            configuration,
            rpcService,
            highAvailabilityServices,
            resourceManager.getSelfGateway(ResourceManagerGateway.class),
            blobServer,
            heartbeatServices,
            jobManagerMetricGroup,
            metricRegistry.getMetricQueryServicePath(),
            archivedExecutionGraphStore,
            this,
            webMonitorEndpoint.getRestBaseUrl(),
            historyServerArchivist);

        LOG.debug("Starting ResourceManager.");
        resourceManager.start();
        resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);

        LOG.debug("Starting Dispatcher.");
        dispatcher.start();
        dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
    }
}

startClusterComponents shows that the AM hosts two important new components: the ResourceManager and the Dispatcher.

Under the FLIP-6 redesign, the new ResourceManager role is defined as follows:

The main tasks of the ResourceManager are

  • Acquire new TaskManager (or slots) by starting containers, or allocating them to a job

  • Giving failure notifications to JobManagers and TaskManagers

  • Caching TaskManagers (containers) to be reused, releasing TaskManagers (containers) that are unused for a certain period.

Roughly speaking, the ResourceManager is responsible for negotiating resource requests with the YARN cluster and for allocating those resources to a specific JobManager.
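The third responsibility in the list (caching idle TaskManagers and releasing them after a period of disuse) can be pictured with a small timeout map. This is a sketch under stated assumptions, not Flink's implementation: IdleTaskManagerCacheSketch and all of its methods are hypothetical names, and timestamps are passed in explicitly so the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Toy model of "cache idle TaskManagers, release them after a timeout".
// All names are hypothetical; this is not Flink's actual bookkeeping.
final class IdleTaskManagerCacheSketch {

    private final long idleTimeoutMillis;
    // TaskManager id -> timestamp at which it became idle
    private final Map<String, Long> idleSince = new HashMap<>();

    IdleTaskManagerCacheSketch(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    // Record that a TaskManager has no work; keeps the earliest idle timestamp.
    void markIdle(String taskManagerId, long nowMillis) {
        idleSince.putIfAbsent(taskManagerId, nowMillis);
    }

    // A reused TaskManager is no longer a candidate for release.
    void markInUse(String taskManagerId) {
        idleSince.remove(taskManagerId);
    }

    // Remove and return (sorted) every TaskManager idle for at least the timeout.
    List<String> releaseExpired(long nowMillis) {
        List<String> released = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = idleSince.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= idleTimeoutMillis) {
                released.add(entry.getKey());
                it.remove();
            }
        }
        Collections.sort(released);
        return released;
    }

    public static void main(String[] args) {
        IdleTaskManagerCacheSketch cache = new IdleTaskManagerCacheSketch(30_000L);
        cache.markIdle("container_01", 0L);
        cache.markIdle("container_02", 10_000L);
        cache.markInUse("container_02"); // reused before the timeout
        System.out.println(cache.releaseExpired(30_000L));
    }
}
```

Keeping idle containers around for a while trades some held capacity for faster job start-up, since a cached container avoids a fresh round trip to the cluster manager.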

In YARN mode, the ResourceManager implementation class is YarnResourceManager. Its initialize method instantiates two clients, resourceManagerClient and nodeManagerClient. They wrap YARN's AMRMClientAsync and NMClient, and are used to communicate with YARN's ResourceManager and NodeManager respectively.

@Override
protected void initialize() throws ResourceManagerException {
    try {
        resourceManagerClient = createAndStartResourceManagerClient(
            yarnConfig,
            yarnHeartbeatIntervalMillis,
            webInterfaceUrl);
    } catch (Exception e) {
        throw new ResourceManagerException("Could not start resource manager client.", e);
    }

    nodeManagerClient = createAndStartNodeManagerClient(yarnConfig);
}

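The Dispatcher is defined as follows. It takes over the work of submitting jobs to the cluster, which used to be handled by the JobManager, and in the future a single dispatcher is expected to be able to submit jobs to multiple clusters.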

The new design includes the concept of a Dispatcher. The dispatcher accepts job submissions from clients and starts the jobs on their behalf on a cluster manager.

The dispatcher is introduced because:

  • Some cluster managers need a central job spawning and monitoring instance

  • It subsumes the role of the standalone JobManager, waiting for jobs to be submitted

In the setup discussed here, the concrete Dispatcher implementation is MiniDispatcher. After dispatcher.start(); is called, the call chain goes through

leaderElectionService.start(this)-> ZooKeeperLeaderElectionService.start-> ZooKeeperLeaderElectionService.isLeader-> Dispatcher.grantLeadership-> tryAcceptLeadershipAndRunJobs-> runJob-> createJobManagerRunner

and arrives at the Dispatcher's createJobManagerRunner method.

private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
    final RpcService rpcService = getRpcService();

    final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = CompletableFuture.supplyAsync(
        CheckedSupplier.unchecked(() ->
            jobManagerRunnerFactory.createJobManagerRunner(
                ResourceID.generate(),
                jobGraph,
                configuration,
                rpcService,
                highAvailabilityServices,
                heartbeatServices,
                blobServer,
                jobManagerSharedServices,
                new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                fatalErrorHandler)),
        rpcService.getExecutor());

    return jobManagerRunnerFuture.thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner));
}
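The shape of createJobManagerRunner (construct the runner asynchronously on an executor, then start it in a follow-up stage) can be reproduced with the JDK alone. The sketch below is only an illustration of that CompletableFuture pattern: Runner and createRunner are hypothetical stand-ins, and none of Flink's factories or checked-exception wrappers are involved.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrates "create asynchronously, then start" using only java.util.concurrent.
final class AsyncCreateThenStartSketch {

    // Hypothetical stand-in for JobManagerRunner.
    static final class Runner {
        final String jobName;
        boolean started;

        Runner(String jobName) {
            this.jobName = jobName;
        }
    }

    static CompletableFuture<Runner> createRunner(String jobName, ExecutorService executor) {
        // Stage 1: build the runner on the given executor.
        CompletableFuture<Runner> created =
            CompletableFuture.supplyAsync(() -> new Runner(jobName), executor);

        // Stage 2: start the runner once construction has completed.
        return created.thenApply(runner -> {
            runner.started = true;
            return runner;
        });
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Runner runner = createRunner("demo-job", executor).join();
            System.out.println(runner.jobName + " started=" + runner.started);
        } finally {
            executor.shutdown();
        }
    }
}
```

Splitting the work into two stages means a failure during construction completes the future exceptionally and the start stage is never run.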

  

createJobManagerRunner can be split into two parts. The first part goes through DefaultJobManagerRunnerFactory.createJobManagerRunner -> new JobManagerRunner -> new JobMaster and initializes the JobMaster object.

The second part goes through

startJobManagerRunner-> JobManagerRunner.start-> ZooKeeperLeaderElectionService.start-> ZooKeeperLeaderElectionService.isLeader->

JobManagerRunner.grantLeadership-> verifyJobSchedulingStatusAndStartJobManager->

jobMaster.start-> startJobExecution

private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
    validateRunsInMainThread();

    checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");

    if (Objects.equals(getFencingToken(), newJobMasterId)) {
        log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);

        return Acknowledge.get();
    }

    setNewFencingToken(newJobMasterId);

    startJobMasterServices();

    log.info("Starting execution of job {} ({})", jobGraph.getName(), jobGraph.getJobID());

    resetAndScheduleExecutionGraph();

    return Acknowledge.get();
}

private void startJobMasterServices() throws Exception {
    // start the slot pool make sure the slot pool now accepts messages for this leader
    slotPool.start(getFencingToken(), getAddress());

    //TODO: Remove once the ZooKeeperLeaderRetrieval returns the stored address upon start
    // try to reconnect to previously known leader
    reconnectToResourceManager(new FlinkException("Starting JobMaster component."));

    // job is ready to go, try to establish connection with resource manager
    //   - activate leader retrieval for the resource manager
    //   - on notification of the leader, the connection will be established and
    //     the slot pool will start requesting slots
    resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
}
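The Objects.equals(getFencingToken(), newJobMasterId) guard in startJobExecution makes the start call idempotent for a given leader session. A minimal sketch of that guard follows, using a plain UUID in place of JobMasterId; FencingTokenSketch and its members are hypothetical names, and scheduling is reduced to a counter.

```java
import java.util.Objects;
import java.util.UUID;

// Toy model of the fencing-token guard: a repeated start with the same
// token is acknowledged but does not trigger scheduling a second time.
final class FencingTokenSketch {

    private UUID fencingToken;      // null until the first start
    private int executionsStarted;  // how many times scheduling was triggered

    // Returns true if this call actually started the execution.
    boolean startJobExecution(UUID newToken) {
        Objects.requireNonNull(newToken, "The new token must not be null.");

        if (Objects.equals(fencingToken, newToken)) {
            // Already started for this leader session: nothing to do.
            return false;
        }

        fencingToken = newToken;
        executionsStarted++; // stands in for starting services and scheduling
        return true;
    }

    int executionsStarted() {
        return executionsStarted;
    }

    public static void main(String[] args) {
        FencingTokenSketch master = new FencingTokenSketch();
        UUID session = UUID.randomUUID();
        System.out.println(master.startJobExecution(session));
        System.out.println(master.startJobExecution(session));
        System.out.println(master.executionsStarted());
    }
}
```

A new leader session arrives with a different token, so it passes the guard and starts the execution again under the new token.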

The JobMaster first calls startJobMasterServices, which connects to the Flink ResourceManager, starts the JobManager services, and handles registration. It then calls resetAndScheduleExecutionGraph to make the initial resource request for the job. resetAndScheduleExecutionGraph first calls createAndRestoreExecutionGraph to build the job's ExecutionGraph, and then goes through

scheduleExecutionGraph-> ExecutionGraph.scheduleForExecution-> scheduleEager-> ExecutionJobVertex.allocateResourcesForAll-> Execution.allocateAndAssignSlotForExecution-> ProviderAndOwner.allocateSlot-> SlotPool.allocateSlot-> allocateMultiTaskSlot

to request slot resources for the job's tasks. The request then passes through

SlotPool.requestSlotFromResourceManager-> ResourceManager.requestSlot-> SlotManager.registerSlotRequest-> internalRequestSlot->
ResourceActionsImpl.allocateResource-> YarnResourceManager.startNewWorker

to ask for a new TaskManager to be started.

@Override
public void startNewWorker(ResourceProfile resourceProfile) {
    log.info("startNewWorker");
    // Priority for worker containers - priorities are intra-application
    //TODO: set priority according to the resource allocated
    Priority priority = Priority.newInstance(generatePriority(resourceProfile));
    int mem = resourceProfile.getMemoryInMB() < 0 ? defaultTaskManagerMemoryMB : resourceProfile.getMemoryInMB();
    int vcore = resourceProfile.getCpuCores() < 1 ? defaultCpus : (int) resourceProfile.getCpuCores();
    Resource capability = Resource.newInstance(mem, vcore);
    requestYarnContainer(capability, priority);
}

private void requestYarnContainer(Resource resource, Priority priority) {
    resourceManagerClient.addContainerRequest(new AMRMClient.ContainerRequest(resource, null, null, priority));

    // make sure we transmit the request fast and receive fast news of granted allocations
    resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);

    numPendingContainerRequests++;

    log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.",
        resource,
        numPendingContainerRequests);
}
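The two ternaries in startNewWorker decide the container size: a negative memory value or a CPU value below 1 falls back to the configured default, and a fractional CPU value is truncated by the int cast. The sketch below isolates just that arithmetic; ContainerSizingSketch and its method names are hypothetical, and the default values used in main are arbitrary examples rather than Flink defaults.

```java
// Isolates the fallback arithmetic used when sizing a YARN container.
// Class and method names are hypothetical.
final class ContainerSizingSketch {

    // A negative request means "unspecified": use the default memory.
    static int memoryMb(int requestedMb, int defaultMb) {
        return requestedMb < 0 ? defaultMb : requestedMb;
    }

    // Anything below one core uses the default; otherwise the fractional
    // part is dropped by the cast, as in the original expression.
    static int vcores(double requestedCores, int defaultCpus) {
        return requestedCores < 1 ? defaultCpus : (int) requestedCores;
    }

    public static void main(String[] args) {
        int defaultMb = 1024;  // example value only
        int defaultCpus = 1;   // example value only
        System.out.println(memoryMb(-1, defaultMb) + " MB, " + vcores(0.5, defaultCpus) + " vcore(s)");
        System.out.println(memoryMb(4096, defaultMb) + " MB, " + vcores(2.7, defaultCpus) + " vcore(s)");
    }
}
```

Note that the cast truncates rather than rounds, so a profile asking for 2.7 cores results in a request for 2 vcores.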

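startNewWorker and requestYarnContainer are where the Flink ResourceManager talks to YARN through the YARN client to request TaskManager containers. Once YARN grants containers, the onContainersAllocated callback launches a TaskExecutor in each of them: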

@Override
public void onContainersAllocated(List<Container> containers) {
    log.info("onContainersAllocated");
    runAsync(() -> {
        for (Container container : containers) {
            log.info(
                "Received new container: {} - Remaining pending container requests: {}",
                container.getId(),
                numPendingContainerRequests);

            if (numPendingContainerRequests > 0) {
                numPendingContainerRequests--;

                final String containerIdStr = container.getId().toString();
                final ResourceID resourceId = new ResourceID(containerIdStr);

                workerNodeMap.put(resourceId, new YarnWorkerNode(container));

                try {
                    // Context information used to start a TaskExecutor Java process
                    ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
                        container.getResource(),
                        containerIdStr,
                        container.getNodeId().getHost());

                    nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
                } catch (Throwable t) {
                    log.error("Could not start TaskManager in container {}.", container.getId(), t);

                    // release the failed container
                    workerNodeMap.remove(resourceId);
                    resourceManagerClient.releaseAssignedContainer(container.getId());
                    // and ask for a new one
                    requestYarnContainer(container.getResource(), container.getPriority());
                }
            } else {
                // return the excessive containers
                log.info("Returning excess container {}.", container.getId());
                resourceManagerClient.releaseAssignedContainer(container.getId());
            }
        }

        // if we are waiting for no further containers, we can go to the
        // regular heartbeat interval
        if (numPendingContainerRequests <= 0) {
            resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
        }
    });
}
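The bookkeeping around numPendingContainerRequests is the core of this callback: each request increments the counter, each granted container decrements it, and containers that arrive when nothing is pending are handed back. The following sketch models only that counter logic with container ids as plain strings; PendingContainerSketch and its members are hypothetical, and the launch, failure-retry and heartbeat handling of the real method are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of the pending-request counter used when containers are granted.
// Names are hypothetical; launching and retry logic are not modeled.
final class PendingContainerSketch {

    private int numPendingContainerRequests;
    private final List<String> started = new ArrayList<>();
    private final List<String> returned = new ArrayList<>();

    // Corresponds to asking the cluster manager for one more container.
    void requestContainer() {
        numPendingContainerRequests++;
    }

    // Corresponds to the allocation callback: use what was asked for,
    // give back anything beyond that.
    void onContainersAllocated(List<String> containerIds) {
        for (String id : containerIds) {
            if (numPendingContainerRequests > 0) {
                numPendingContainerRequests--;
                started.add(id);
            } else {
                returned.add(id);
            }
        }
    }

    int pending() { return numPendingContainerRequests; }
    List<String> started() { return started; }
    List<String> returned() { return returned; }

    public static void main(String[] args) {
        PendingContainerSketch rm = new PendingContainerSketch();
        rm.requestContainer();
        rm.requestContainer();
        rm.onContainersAllocated(Arrays.asList("c1", "c2", "c3"));
        System.out.println("started=" + rm.started() + " returned=" + rm.returned() + " pending=" + rm.pending());
    }
}
```

Returning surplus containers matters because the cluster manager may grant more than was outstanding, and holding them would waste cluster capacity.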

  

When it builds the launch context for the TaskManager container, the AM sets the main class to org.apache.flink.yarn.YarnTaskExecutorRunner.

Reposted from: https://www.cnblogs.com/dongxiao-yang/p/9884516.html
