调度流程图_Flink 实现Locality 模式调度
背景
在計算與存儲一體化的情況,spark任務在調度task時會優先將其調度在數據所在的節點上或者相同的rack上,這樣可以減少數據在不同節點或者不同rack上移動所帶來的性能消耗;目前在Flink on yarn模式下,TaskExecutor的資源位置完全由yarn自主控制的,那么就可能會造成任務所在的節點與kafka數據所在的節點不在同一個機房,從而產生跨機房的流量消耗,在這樣的一個環境背景下,需要將任務調度在數據所在機房,以減少流量消耗。(注:基于Flink-1.10.1)
Flink on Yarn調度流程
在Flink-1.9版本以前使用的調度模式是LAZY_FROM_SOURCES即以source-vertex為起始節點開始調度,當有數據輸出到下游節點時開始調度下游的vertex,以這種方式部署所有的vertex;在1.9及1.9版本以后使用EAGER調度模式即會立刻調度所有的vertex。下面看一下具體的調度流程圖:任務調度與部署是在JobMaster中通過DefaultScheduler完成,其會首先為所有的ExecutionVertex向SlotPoo(1)l申請資源然后部署,SlotPool會向ResourceManager中SlotManager(2)申請資源,如果沒有可用的資源,那么就會向Yarn申請一個Container(3),待yarn分配了資源之后,回調給YarnResourceManager,進而啟動TaskExecutor(4),TaskExecutor啟動之后就會向YarnResourceManager匯報其資源情況(5),在YarnResourceManager進行資源匹配之后就會向TaskExecutor申請資源(6),然后TaskExecutor會將自身的資源分配給SlotPool(7), 最后告知給DefaultScheduler(8)將任務部署到對應的TaskExecutor上。至此完成一次完整的任務調度過程。
在SlotPool向SlotManager申請資源前,會生成一個AllocationId的唯一標識(資源ID),并且在申請的時候會將這個標識一起攜帶過去,當TaskExecutor向YarnResourceManager匯報自身資源情況時,在YarnResourceManager中會做一個資源請求(攜帶AllocationId)與實際資源匹配的過程,主要是通過資源大小(cpu、內存)匹配,匹配成功之后YarnResourceManager會向TaskExecutor發送一個申請slot請求(攜帶AllocationId),待請求成功之后TaskExecutor會將資源分配給對應的AllocationId的請求(7),完成資源匹配過程。
Locality 調度實現分析
通常Flink與kafka是部署在不同的集群上,這里所說的Locality僅僅是實現rack級別的調度,即將任務調度在kafka對應分區數據所在的rack上,為了實現此功能,分為以下幾個步驟:
1)數據分配:Flink每一個Source-Task拉取partition是按照一定規則進行分配的,為了實現相同rack的partition在同一個task,因此需要改變其分配策略;為了保證每一個rack的數據都被消費到,需要對source并行度進行擴張,以前可能一個task消費所有rack的數據,現在需要每一個rack上的數據都有對應的task去拉取數據
實現:在flink-conf.yaml 中配置yarn集群機器分布情況,包括ip以及對應的rack信息,那么任務啟動會獲取這些信息;在StreamGraphGenerator中的transformSource方法提前生成每個source-task消費的對應topic與partition信息,以及其需要調度到的rack信息。這里主要說明一下目前的分配策略:
例如:有a,b,c 三個rack, topic1對應partition:[0,1,2,3,4,5], 可通過KafkaConsumer的partitionsFor方法獲取對應的partition信息,parition的分布情況是:a ->[0,1],b->[2,3],c->[4,5]如果設置的并行度為:1 ,則分配規則是:task0(a)->[0,1],task1(b)->[2,3],task2(c)->[4,5]如果設置的并行度為:4 ,則分配規則是:task0(a)->[0],task1(b)->[2],task2(c)->[4],task3(a)->[1],task4(b)->[3],task5(c)->[5]注:task0 表示下標為0的task擴充規則是:userSourceParallelism%numRack==0?userSourceParallelism:(1+userSourceParallelism/numRack)*numRack, 即生成的并行度是rack個數的整數倍。
生成的配置放在ExecutionConfig中的GlobalParameters中,實際效果圖:
代表著下標為0的task消費partition-2,同時部署在rack-a中的機器上,下標為1的task消費partition-1,同時部署在rack-b的機器上,下標為2的task消費partition-0,同時部署在rack-c中的機器上。
2)資源申請:默認情況下在Flink向Yarn申請資源是不攜帶任何NodeManager信息的,通常需要向yarn申請資源的流程是當遇到新的Source-Task時才會去走這個流程(根據slot-shared機制),因此只需要在Source對應的ExecutionVetex上打上對應的rack標簽即可,將這個rack一直傳遞到YarnResourceManager端,然后獲取該rack對應的機器,從這些機器上申請資源。
實現:在申請資源前會給ExecutionVertex配置相關的資源信息,在ExecutionVertexSchedulingRequirementsMapper.getPhysicalSlotResourceProfile中完成,因此在這里對ExecutionVertex的資源信息打上rack信息
boolean hasNoConnectedInputs=executionVertex.getJobVertex().getJobVertex().hasNoConnectedInputs(); if(hasNoConnectedInputs){ try{ int index=executionVertex.getParallelSubtaskIndex(); ExecutionConfig executionConfig=executionVertex.getJobVertex().getJobVertex().getJobGraph().getExecutionConfig(); Map map=executionConfig.getGlobalJobParameters().toMap(); String index2Zone=map.get("index2Zone"); String zone=""; ObjectMapper objectMapper=new ObjectMapper(); //index 表示該ExecutionVertext的下標Index zone=objectMapper.readTree(index2Zone).findValue(String.valueOf(index)).asText(); //賦予區域信息 ResourceProfile resourceProfile1=resourceProfile.copy2ZoneUnknown(resourceProfile,zone); LOG.debug("vertexName:{},ResourceProfile:{}",executionVertex.getJobVertex().getName(),resourceProfile1); return resourceProfile1; }catch (Throwable e){ LOG.error("parse resourceProfile error:{}",e); } }在這里重新定義了ResourceProfile,賦予了其rack信息,ResourceProfile會一直傳遞到YarnResourceManager資源申請端:
public CollectionstartNewWorker(ResourceProfile resourceProfile) { if (!resourceProfilesPerWorker.iterator().next().isMatching(resourceProfile)) { return Collections.emptyList(); } //zone 表示 rack信息 String zone=resourceProfile.getZone(); if(zone!=null){ requestYarnContainer(zone); }else{ requestYarnContainer(); } return resourceProfilesPerWorker; }重新定義了requestYarnContainer流程,使請求包含rack信息:AMRMClient.ContainerRequest getContainerRequest(String zone) { String[] ipList= ResourceManager.ZONE_IPS.get(zone).split(",");//獲取該rack下的所有iplist LOG.debug("request slot from [{}] for zone [{}]",ipList,zone); AMRMClient.ContainerRequest request= new AMRMClient.ContainerRequest( getContainerResource(), ipList, null, RM_REQUEST_PRIORITY,false);//false:RelaxLocality表示不允許資源降級申請,一定要使其分布在指定的機器上 containerRequestList.add(request); return request; }由于yarn返回的是一個滿足請求的一個資源集合,因此需要在滿足的集合中做資源過濾,將多余資源返回給yarn,因此在回調方法onContainersAllocated中:
public void onContainersAllocated(List containers) { runAsync(() -> { log.info("Received {} containers with {} pending container requests.", containers.size(), numPendingContainerRequests); //final Collection pendingReques ts = getPendingRequests(); //請求到的host List requestedHost=new ArrayList<>(); containers.stream().map(container -> container.getNodeId().getHost()).forEach(requestedHost::add); //獲取滿足匹配的請求 final Collection pendingRequests=containerRequestList.stream().map(containerRequest -> Tuple2.of(containerRequest.getNodes(),containerRequest)) .filter(tuple2-> requestedHost.stream().filter(host->tuple2.f0.contains(host)) .count()>0 ) .map(map->map.f1).collect(Collectors.toList()); int matchRequest=pendingRequests.size(); log.info("recevied container size : {}, matching request:{}",containers.size(),matchRequest); final Iterator pendingRequestsIterator = pendingRequests.iterator(); // number of allocated containers can be larger than the number of pending container requests //final int numAcceptedContainers = Math.min(containers.size(), numPendingContainerRequests); final int numAcceptedContainers = Math.min(matchRequest, numPendingContainerRequests); final List requiredContainers = containers.subList(0, numAcceptedContainers); final List excessContainers = containers.subList(numAcceptedContainers, containers.size()); for (int i = 0; i < requiredContainers.size(); i++) { //removeContainerRequest(pendingRequestsIterator.next()); AMRMClient.ContainerRequest needRemoveRequest=pendingRequestsIterator.next(); containerRequestList.remove(needRemoveRequest); removeContainerRequest(needRemoveRequest); } //返回多余的資源 excessContainers.forEach(this::returnExcessContainer); requiredContainers.forEach(this::startTaskExecutorInContainer); // if we are waiting for no further containers, we can go to the // regular heartbeat interval if (numPendingContainerRequests <= 0) { resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis); } }); }3) 資源匹配:默認情況下,在YarnResourceManager中做分配到的資源與申請的資源匹配時是按照大小進行的,因此需要改為按照rack進行匹配
實現:匹配的流程在SlotManager.findExactlyMatchingPendingTaskManagerSlot中:
private PendingTaskManagerSlot findExactlyMatchingPendingTaskManagerSlot(ResourceProfile resourceProfile,String zone) { for (PendingTaskManagerSlot pendingTaskManagerSlot : pendingSlots.values()) { LOG.info("zone:{},request_zone:{}",zone,pendingTaskManagerSlot.getResourceProfile().getZone()); /** * 區域匹配 */ if(zone.equals(pendingTaskManagerSlot.getResourceProfile().getZone())){ LOG.debug("get resource zone:{},resourceProfile:{}",zone,pendingTaskManagerSlot.getResourceProfile()); return pendingTaskManagerSlot; }完成了這個資源匹配過程,并且在后續的流程中由AllocationId完成資源與具體的ExecutionVertex請求匹配,就可以將ExecutionVertex部署到匹配的機器上。
4) 指定source的消費數據:在數據分配中已經將每個task消費的數據指定好了,因此在source端只需要獲取對應的分區信息即可,同時需要放棄默認的分配策略
實現:FlinkKafkaConsumerBase.open 中:
final List allPartitions = new ArrayList<>(); //從配置里面獲取 Map globalMaps=getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap(); String index2TopicPartitionsStr=globalMaps.get("index2TopicPartitions"); ObjectMapper objectMapper=new ObjectMapper(); JsonNode rootNode=objectMapper.readTree(index2TopicPartitionsStr); JsonNode topicPartitionNode=rootNode.findValue(String.valueOf(getRuntimeContext().getIndexOfThisSubtask())); topicPartitionNode.fieldNames().forEachRemaining(topic->{ JsonNode partitionsNode=topicPartitionNode.findValue(topic); partitionsNode.iterator().forEachRemaining(jsonNode -> { allPartitions.add(new KafkaTopicPartition(topic,jsonNode.asInt())); }); }); allPartitions.stream().forEach(x->{ LOG.debug("consumer topic:{}, partition:{}",x.getTopic(),x.getPartition()); });allPartitions 就代表了該task需要消費的數據。
至此整個流程完成。
總結
在實現該方案前,也做過在任務調度后直接在FlinkKafkaConsumerBase中自定義partition的分配,即根據機器的所在rack去獲取對應的rack上的數據,但是經常會出現有數據的rack上沒有對應的rack任務,只能做降級處理,將這些rack上的分區數據分配給其他rack上的任務,仍然會有部分的數據跨機房拉取,流量成本消耗縮減效果并不好,因此才做了這個Locality的方案,由于涉及的內容比較多,本文只提供了一個實現的思路與關鍵的部分代碼。目前的實現方案仍然存在以下幾個限制:
? 1.一個任務只能消費一個kafka集群的數據,由于slot-share機制,不同的JobVertext可以分配到同一個Slot上,如果有多個kafka集群的話,source就會對應多個JobVertex,那么在后續的JobVertext在申請資源的時候就會尋找前面已經申請到資源的JobVertext,很有可能會匹配到其他的rack的資源,目前并未對這塊進行改造。
?? 2.一個TaskExecutor只分配一個Slot,如果有多個slot的話,第一次申請后,后續SlotPool向YarnResourceManager申請資源時,直接發現有可用的Slot就會直接分配,很有可能會匹配到其他的rack的資源,目前并未對這塊進行改造。
? ?3.如果topic的partition在rack分配不均勻,可能會造成流量傾斜,因此需要在topic創建中做好partition的分布。
?? 4.由于source-vertext的擴充,會導致需要的資源變多,因此需要在cpu/內存與流量成本消耗之間權衡。
目前在使用上主要是針對大的topic采取該方案,流量成本也有很顯著的縮減效果,后續會對以上問題進行優化。總結
以上是生活随笔為你收集整理的调度流程图_Flink 实现Locality 模式调度的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 画一个圆角多边形_CAD零基础教程,矩形
- 下一篇: 何晓群pdf 应用回归分析第五版_暨南社