日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

MapReduce Job集群提交过程源码跟踪及分析

發布時間:2024/1/17 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 MapReduce Job集群提交过程源码跟踪及分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

繼上篇文章對MapReduce Job本地提交過程進行分析之后?
在本篇文章中,同樣將通過debug的方式,對Job作業的集群提交過程進行分析?
Job作業集群的提交有別于本地的提交方式,本地的提交采用了LocalJobRunner,而集群提交則采用了YARNRunner?
決定使用LocalJobRunner還是YARNRunner取決于配置文件

所寫的MapReduce程序、debug的操作步驟以及本文中涉及到的內容均以整理好打包上傳,下載地址。

[mapred-site.xml] mapreduce.framework.name = local\yarn

Job作業集群提交過程簡略表示

先將整理得出的提交過程簡略標示圖,以方便閱讀后文所寫的關鍵代碼解析

job.submit() --> JobSubmitter.submitJobInternal() --> YARNRunner.submitJob(..) --> ResourceMgrDelegate.submitApplication(appContext) --> YarnClient.submitApplication(appContext) --> YarnClientImpl.submitApplication(...)SubmitApplicationRequest request = ... //創建請求報文 --> ApplicationClientProtocolPBClientImpl.submitApplication(request)SubmitApplicationRequestProto requestProto = request... //把request對象轉換成提交應用的請求協議 --> ProtobufRpcEngine$Invoker.invoke();RequestHeaderProto header = constructRpcRequestHeader(method); //消息頭Message body = (Message) args[1]; //消息體RpcResponseWrapper wrapper = (header,body); //消息頭+消息體,形成一個包裝器 --> ipc.Client.cal(...)client.call(RPC.RpcKind.RPC_RPOTOCOL_BUFFER,new RpcRequestWrapper(rpcRequestHeader,theRequest),remoteId,fallbackToSimpleAuth); --> ipc.Client.call(...)Call call = createCall(rpcKind,reqWrapper); --> Connection.sendRpcRequest(call);

源碼解析

org.apache.hadoop.mapreduce.Job類

public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException,ClassNotFoundException {if (state == JobState.DEFINE) {// 進行提交submit();}...public void submit() throws IOException, InterruptedException, ClassNotFoundException {ensureState(JobState.DEFINE);setUseNewAPI();// 進行連接connect();// 獲取JobSubmitter類型的submitterfinal JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {// 調用的是JobSubmitter類型的submitter 的 submitJobInternal()方法// 實質就是調用作業提交器的內部提交方法return submitter.submitJobInternal(Job.this, cluster);}});state = JobState.RUNNING;LOG.info("The url to track the job: " + getTrackingURL());}

org.apache.hadoop.mapreduce.JobSubmitter類

// 開始了內部提交過程JobStatus submitJobInternal(Job job, Cluster cluster) throws ClassNotFoundException, InterruptedException, IOException {//validate the jobs output specs // 先檢查空間checkSpecs(job);Configuration conf = job.getConfiguration();addMRFrameworkToDistributedCache(conf);Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);//configure the command line options correctly on the submitting dfsInetAddress ip = InetAddress.getLocalHost();if (ip != null) {submitHostAddress = ip.getHostAddress();submitHostName = ip.getHostName();conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);}JobID jobId = submitClient.getNewJobID();job.setJobID(jobId);Path submitJobDir = new Path(jobStagingArea, jobId.toString());JobStatus status = null;try {conf.set(MRJobConfig.USER_NAME,UserGroupInformation.getCurrentUser().getShortUserName());conf.set("hadoop.http.filter.initializers", "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());LOG.debug("Configuring job " + jobId + " with " + submitJobDir + " as the submit dir");// get delegation token for the dirTokenCache.obtainTokensForNamenodes(job.getCredentials(),new Path[] { submitJobDir }, conf);populateTokenCache(conf, job.getCredentials());// generate a secret to authenticate shuffle transfersif (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {KeyGenerator keyGen;try {keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);keyGen.init(SHUFFLE_KEY_LENGTH);} catch (NoSuchAlgorithmException e) {throw new IOException("Error generating shuffle secret key", e);}SecretKey shuffleKey = keyGen.generateKey();TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),job.getCredentials());}if (CryptoUtils.isEncryptedSpillEnabled(conf)) {conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);LOG.warn("Max job attempts set to 1 since encrypted intermediate" +"data spill is enabled");}copyAndConfigureFiles(job, submitJobDir);Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);// Create the splits for the jobLOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));int maps = writeSplits(job, submitJobDir);conf.setInt(MRJobConfig.NUM_MAPS, maps);LOG.info("number of splits:" + maps);// write "queue admins of the queue to which job is being submitted"// to job file.String queue = conf.get(MRJobConfig.QUEUE_NAME,JobConf.DEFAULT_QUEUE_NAME);AccessControlList acl = submitClient.getQueueAdmins(queue);conf.set(toFullPropertyName(queue,QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());// removing jobtoken referrals before copying the jobconf to HDFS// as the tasks don't need this setting, actually they may break// because of it if present as the referral will point to a// different job.TokenCache.cleanUpTokenReferral(conf);if (conf.getBoolean(MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {// Add HDFS tracking idsArrayList<String> trackingIds = new ArrayList<String>();for (Token<? extends TokenIdentifier> t :job.getCredentials().getAllTokens()) {trackingIds.add(t.decodeIdentifier().getTrackingId());}conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,trackingIds.toArray(new String[trackingIds.size()]));}// Set reservation info if it existsReservationId reservationId = job.getReservationId();if (reservationId != null) {conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());}// Write job file to submit dir// 寫入配置文件到服務器writeConf(conf, submitJobFile);//// Now, actually submit the job (using the submit name)//printTokens(jobId, job.getCredentials());// YARNRunner的提交作業方法status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());if (status != null) {return status;} else {throw new IOException("Could not launch job");}} finally {if (status == null) {LOG.info("Cleaning up the staging area " + submitJobDir);if (jtFs != null && submitJobDir != null)jtFs.delete(submitJobDir, true);}}}

org.apache.hadoop.mapred.YARNRunner類

@Overridepublic JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)throws IOException, InterruptedException {addHistoryToken(ts);// Construct necessary information to start the MR AM// 得到上下文對象ApplicationSubmissionContext appContext =createApplicationSubmissionContext(conf, jobSubmitDir, ts);// Submit to ResourceManagertry {// 調用資源管理器代理對象的提交應用程序方法// resMgrDelegate的值為:ResourceMgrDelegate(資源管理器代理對象)// Service org.apache.hadoop.mapred.ResourceMgrDelegate in state org.apache.hadoop.mapred.ResourceMgrDelegate: STARTEDApplicationId applicationId =resMgrDelegate.submitApplication(appContext);ApplicationReport appMaster = resMgrDelegate.getApplicationReport(applicationId);String diagnostics =(appMaster == null ?"application report is null" : appMaster.getDiagnostics());if (appMaster == null|| appMaster.getYarnApplicationState() == YarnApplicationState.FAILED|| appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {throw new IOException("Failed to run job : " +diagnostics);}return clientCache.getClient(jobId).getJobStatus(jobId);} catch (YarnException e) {throw new IOException(e);}}

org.apache.hadoop.mapred.ResourceMgrDelegate類

@Overridepublic ApplicationIdsubmitApplication(ApplicationSubmissionContext appContext)throws YarnException, IOException {// client的值為:Service org.apache.hadoop.yarn.client.api.impl.YarnClientImpl in state org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: STARTED// 調用的是YarnClientImpl的提交應用的方法return client.submitApplication(appContext);}

org.apache.hadoop.yarn.client.api.impl.YarnClientImpl類

@Overridepublic ApplicationIdsubmitApplication(ApplicationSubmissionContext appContext)throws YarnException, IOException {ApplicationId applicationId = appContext.getApplicationId();if (applicationId == null) {throw new ApplicationIdNotProvidedException("ApplicationId is not provided in ApplicationSubmissionContext");}// 構造一個提交應用請求的對象SubmitApplicationRequest request =Records.newRecord(SubmitApplicationRequest.class);request.setApplicationSubmissionContext(appContext);...// rmClient的值為:ApplicationClientProtocolPBClientImpl// Application客戶端協議客戶端實現類,也是一個提交應用的方法,同時傳入request請求對象// ApplicationClientProtocolPBClientImpl.submitApplication()rmClient.submitApplication(request);......}

org.apache.hadoop.io.retry.RetryInvocationHandler類

invoke(...){...// 反射// method:public abstract org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse org.apache.hadoop.yarn.api.ApplicationClientProtocol.submitApplication(org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest) throws org.apache.hadoop.yarn.exceptions.YarnException,java.io.IOException// 實質上method為:submitApplication()方法 // 實質上args為:request信息 Object ret = invokeMethod(method, args);...}...// java的反射方式protected Object invokeMethod(Method method, Object[] args) throws Throwable {try {if (!method.isAccessible()) {// 設置可訪問性method.setAccessible(true);}// 開始訪問return method.invoke(currentProxy.proxy, args);} catch (InvocationTargetException e) {throw e.getCause();}}

java.lang.reflect.Method類

@CallerSensitivepublic Object invoke(Object obj, Object... args)throws IllegalAccessException, IllegalArgumentException,InvocationTargetException{if (!override) {if (!Reflection.quickCheckMemberAccess(clazz, modifiers)) {Class<?> caller = Reflection.getCallerClass();checkAccess(caller, clazz, obj, modifiers);}}// 訪問器MethodAccessor ma = methodAccessor; // read volatileif (ma == null) {ma = acquireMethodAccessor();}// 進入該方法,開始進行反射獲取對象return ma.invoke(obj, args);}

org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl類

// 通過反射的方式,rmClient.submitApplication(request) rmClient的值實際上為ApplicationClientProtocolPBClientImpl// 再度調用了ApplicationClientProtocolPBClientImpl.submitApplication()方法@Overridepublic SubmitApplicationResponse submitApplication(SubmitApplicationRequest request) throws YarnException,IOException {// 構造了一個提交應用請求協議SubmitApplicationRequestProto requestProto =((SubmitApplicationRequestPBImpl) request).getProto();try {// 調用了代理的提交應用方法,在這里直接進入org.apache.hadoop.ipc.ProtobufRpcEngine$Invokerreturn new SubmitApplicationResponsePBImpl(proxy.submitApplication(null,requestProto));} catch (ServiceException e) {RPCUtil.unwrapAndThrowException(e);return null;}}

org.apache.hadoop.ipc.ProtobufRpcEngine類

Invoker{...public Object invoke(Object proxy, Method method, Object[] args)throws ServiceException {// 制作請求的頭協議// 查看rpcRequestHeader的值:// methodName: "submitApplication"declaringClassProtocolName: "org.apache.hadoop.yarn.api.ApplicationClientProtocolPB"clientProtocolVersion: 1// bitField0_ 7 // clientProtocolVersion_ 1 // declaringClassProtocolName_ "org.apache.hadoop.yarn.api.ApplicationClientProtocolPB" (id=4069) // memoizedHashCode 0 // memoizedIsInitialized 1 // memoizedSerializedSize -1 // memoizedSize -1 // methodName_ "submitApplication" (id=4073) // unknownFields UnknownFieldSet (id=4075) RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);...// 消息,實際上為第一個參數 theRequest的具體值在上文的操作步驟里進行查看// theRequest = YarnProtos$ApplicationSubmissionContextProto 上文是請求頭協議$下文是提交應用請求協議// theRequest的值的格式為JSON格式,具體內容由于篇幅過長,將在后文中給出 Message theRequest = (Message) args[1];...// Wrapper包裝類final RpcResponseWrapper val;try {// 在構造RpcRequestWrapper類的同時調用了org.apache.hadoop.ipc.Client.call()方法// 涉及到了hadoop的底層// 該方法的作用:傳入前面封裝的RpcRequestWrapper類的對象,帶上RPC的類型,套接字的遠程地址(8032端口,即資源管理器)val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,// 把頭和消息加在一起構成了Wrapper包裝類new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,fallbackToSimpleAuth);} catch{...}...}...}

org.apache.hadoop.ipc.Client類

public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,ConnectionId remoteId, AtomicBoolean fallbackToSimpleAuth)throws IOException {return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT,fallbackToSimpleAuth);}...public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,ConnectionId remoteId, int serviceClass,AtomicBoolean fallbackToSimpleAuth) throws IOException {// 創建一個Call對象,將rpc的操作類型和request傳入// 實質上就是傳入了一個RpcRequestWrapper類final Call call = createCall(rpcKind, rpcRequest);// 得到一個連接// connection的值為:Thread[IPC Client (975033189) connection to s100/192.168.26.100:8032 from zhaotao,5,main]Connection connection = getConnection(remoteId, call, serviceClass,fallbackToSimpleAuth);...// 傳入call對象,通過連接去發送RPC請求connection.sendRpcRequest(call);...sendRpcRequest(..){...// 先將call對象寫入到當前的數據輸出緩沖區final DataOutputBuffer d = new DataOutputBuffer();// 制作了RPC請求的頭協議RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,clientId);// 將協議頭寫入到緩沖區header.writeDelimitedTo(d);// 將call對象的RPC請求也寫入了緩沖區call.rpcRequest.write(d);// 將數據都寫入了DataOutputBuffer里去了,DataOutputBuffer繼承了DataOutputStream......// 使用了同步機制 synchronized (sendRpcRequestLock) {// senderFuture即為在線程池中,將來某個階段要運行的程序Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {@Overridepublic void run() {try {// Connection.this.out為連接中的流對象(連接中的數據流)synchronized (Connection.this.out) {if (shouldCloseConnection.get()) {return;}if (LOG.isDebugEnabled())LOG.debug(getName() + " sending #" + call.id);// d為本地存放的緩沖區數據 得到d中的所有數據byte[] data = d.getData();// 得到數據長度int totalLength = d.getLength();// 把長度寫入輸出流out.writeInt(totalLength); // Total Length// 把數據寫入到輸出流中去out.write(data, 0, totalLength);// RpcRequestHeader + RpcRequest// 清理輸出流out.flush();// 至此,客戶端往服務端的消息發送完畢// out:DataOutputStream --> BufferedOutputStream --> SocketOutputStream --> SocketOutputStream$Writer --> SocketChannel}} ...}

Job作業集群提交流程分析圖

注:圖片不清晰,右鍵圖片在新標簽頁中顯示圖片即可,可以放大觀看。對于圖片格式調整,不太在行。。抱歉

附錄

org.apache.hadoop.ipc.ProtobufRpcEngine類中Message theRequest = (Message) args[1]語句里的 theRequest 值

application_submission_context {application_id {id: 1cluster_timestamp: 1494680303350}application_name: "Max temperature"queue: "default"am_container_spec {localResources {key: "jobSubmitDir/job.splitmetainfo"value {resource {scheme: "hdfs"host: "s100"port: 8020file: "/tmp/hadoop-yarn/staging/zhaotao/.staging/job_1494680303350_0001/job.splitmetainfo"}size: 52timestamp: 1494681000611type: FILEvisibility: APPLICATION}}localResources {key: "job.jar"value {resource {scheme: "hdfs"host: "s100"port: 8020file: "/tmp/hadoop-yarn/staging/zhaotao/.staging/job_1494680303350_0001/job.jar"}size: 6739timestamp: 1494680999584type: PATTERNvisibility: APPLICATIONpattern: "(?:classes/|lib/).*"}}localResources {key: "jobSubmitDir/job.split"value {resource {scheme: "hdfs"host: "s100"port: 8020file: "/tmp/hadoop-yarn/staging/zhaotao/.staging/job_1494680303350_0001/job.split"}size: 195timestamp: 1494681000515type: FILEvisibility: APPLICATION}}localResources {key: "job.xml"value {resource {scheme: "hdfs"host: "s100"port: 8020file: "/tmp/hadoop-yarn/staging/zhaotao/.staging/job_1494680303350_0001/job.xml"}size: 98114timestamp: 1494681001303type: FILEvisibility: APPLICATION}}tokens: "HDTS\000\000\001\025MapReduceShuffleToken\b\336U\020\246\317\t\253\315"environment {key: "HADOOP_CLASSPATH"value: "$PWD:job.jar/job.jar:job.jar/classes/:job.jar/lib/*:$PWD/*:/soft/hadoop/contrib/capacity-scheduler/*.jar"}environment {key: "SHELL"value: "/bin/bash"}environment {key: "CLASSPATH"value: "$PWD:$HADOOP_CONF_DIR:$HADOOP_COMMON_HOME/share/hadoop/common/*:$HADOOP_COMMON_HOME/share/hadoop/common/lib/*:$HADOOP_HDFS_HOME/share/hadoop/hdfs/*:$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*:$HADOOP_YARN_HOME/share/hadoop/yarn/*:$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*:job.jar/job.jar:job.jar/classes/:job.jar/lib/*:$PWD/*"}environment {key: "LD_LIBRARY_PATH"value: "$PWD:{{HADOOP_COMMON_HOME}}/lib/native"}command: "$JAVA_HOME/bin/java -Djava.io.tmpdir=$PWD/tmp -Dlog4j.configuration=container-log4j.properties -Dyarn.app.container.log.dir=<LOG_DIR> -Dyarn.app.container.log.filesize=0 -Dhadoop.root.logger=INFO,CLA -Dhadoop.root.logfile=syslog -Xmx1024m org.apache.hadoop.mapreduce.v2.app.MRAppMaster 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr "application_ACLs {accessType: APPACCESS_VIEW_APPacl: " "}application_ACLs {accessType: APPACCESS_MODIFY_APPacl: " "}}cancel_tokens_when_complete: truemaxAppAttempts: 2resource {memory: 1536virtual_cores: 1}applicationType: "MAPREDUCE" }

總結

以上是生活随笔為你收集整理的MapReduce Job集群提交过程源码跟踪及分析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。