x-rip

Steps for Launching an ApplicationMaster from the Client

 

 

1. Request an Application

1) Connect to the ResourceManager

		// Resolve the ResourceManager address from the configuration, falling back to the default
		YarnConfiguration yarnConf = new YarnConfiguration(conf);
		InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
			YarnConfiguration.RM_ADDRESS,
			YarnConfiguration.DEFAULT_RM_ADDRESS));
		LOG.info("Connecting to ResourceManager at " + rmAddress);
		// Obtain an RPC proxy for the client-to-ResourceManager protocol
		ClientRMProtocol applicationsManager = ((ClientRMProtocol) rpc.getProxy(
			ClientRMProtocol.class, rmAddress, conf));
 

2) Request a new Application from the ResourceManager

		GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);
		GetNewApplicationResponse response = applicationsManager.getNewApplication(request);
		LOG.info("Got new application id=" + response.getApplicationId());
 

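2. Initialize the ApplicationMaster's context, which consists of the following fields:

ApplicationId

ApplicationName

Queue: the queue the Application will be submitted to

Priority: the priority of the Application

User: the user who runs the Application

AMContainerSpec: the description of the Container that runs the ApplicationMaster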

1) Set the Application properties

		ApplicationId appId = response.getApplicationId();
		ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
		appContext.setApplicationId(appId);
		appContext.setApplicationName(appName);
 

2) Set up the context of the Container that runs the ApplicationMaster

		ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
		
		// Upload the jar that contains the ApplicationMaster
		FileSystem fs = FileSystem.get(conf);
		Path src = new Path(appMasterJar);
		String pathSuffix = appName + "/" + appId.getId() + "/AppMaster.jar";
		Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
		fs.copyFromLocalFile(false, true, src, dst);
		FileStatus destStatus = fs.getFileStatus(dst);
		
		// Upload the ApplicationMaster's log4j configuration file
		Path log4jSrc = new Path(log4jPropFile);
		Path log4jDst = new Path(fs.getHomeDirectory(), "log4j.props");
		fs.copyFromLocalFile(false, true, log4jSrc, log4jDst);
		FileStatus log4jFileStatus = fs.getFileStatus(log4jDst);
		
		// Set up the LocalResources
		LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
		amJarRsrc.setType(LocalResourceType.FILE);
		amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);	   
		amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst)); 
		amJarRsrc.setTimestamp(destStatus.getModificationTime());
		amJarRsrc.setSize(destStatus.getLen());
		Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
		localResources.put("AppMaster.jar",  amJarRsrc);
		
		LocalResource log4jRsrc = Records.newRecord(LocalResource.class);
		log4jRsrc.setType(LocalResourceType.FILE);
		log4jRsrc.setVisibility(LocalResourceVisibility.APPLICATION);	   
		log4jRsrc.setResource(ConverterUtils.getYarnUrlFromURI(log4jDst.toUri()));
		log4jRsrc.setTimestamp(log4jFileStatus.getModificationTime());
		log4jRsrc.setSize(log4jFileStatus.getLen());
		localResources.put("log4j.properties", log4jRsrc);
		
		amContainer.setLocalResources(localResources);
		
		// Set the environment variables for running the ApplicationMaster
		StringBuilder classPathEnv = new StringBuilder("${CLASSPATH}:./*");  
		for (String c : conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH).split(",")) {  
			classPathEnv.append(':');  
			classPathEnv.append(c.trim());  
		}  
		classPathEnv.append(":./log4j.properties");
		Map<String, String> env = new HashMap<String, String>();
		env.put("CLASSPATH", classPathEnv.toString());
		amContainer.setEnvironment(env);
		
		// Set the command that launches the ApplicationMaster
		Vector<CharSequence> vargs = new Vector<CharSequence>(30);
		vargs.add("${JAVA_HOME}" + "/bin/java");
		vargs.add("-Xmx" + amMemory + "m");
		vargs.add(appMasterMainClass);
		// Add any application-specific arguments for the ApplicationMaster here with vargs.add(...)
		vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
		vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
		StringBuilder command = new StringBuilder();
		for (CharSequence str : vargs) {
		  command.append(str).append(" ");
		} 
		List<String> commands = new ArrayList<String>();
		commands.add(command.toString());		
		amContainer.setCommands(commands);
		
		// Set the memory the ApplicationMaster needs to run
		Resource capability = Records.newRecord(Resource.class);
		capability.setMemory(amMemory);
		amContainer.setResource(capability);
		
		appContext.setAMContainerSpec(amContainer);
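
To make the two strings built in this step concrete, here is a minimal stand-alone sketch that assembles the CLASSPATH value and the launch command without any Hadoop classes. The class name `AmLaunchStringsSketch`, the main class `com.example.MyAppMaster`, the `--demo` argument, and the sample classpath entries are all hypothetical. The `<LOG_DIR>` placeholder is assumed to match the value of `ApplicationConstants.LOG_DIR_EXPANSION_VAR`. Unlike the loop above, it joins the arguments with `String.join`, so the command has no trailing space.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-alone sketch: no Hadoop classes are used, so the constant below is a stand-in.
final class AmLaunchStringsSketch {

    // Assumed stand-in for ApplicationConstants.LOG_DIR_EXPANSION_VAR; the
    // NodeManager is expected to replace it with the container's log directory.
    static final String LOG_DIR = "<LOG_DIR>";

    // Builds the CLASSPATH value: start from "${CLASSPATH}:./*", append each
    // trimmed comma-separated entry, then the localized log4j file.
    static String buildClasspath(String commaSeparatedEntries) {
        StringBuilder cp = new StringBuilder("${CLASSPATH}:./*");
        for (String entry : commaSeparatedEntries.split(",")) {
            cp.append(':').append(entry.trim());
        }
        cp.append(":./log4j.properties");
        return cp.toString();
    }

    // Joins the launch arguments into one shell command line, which would be
    // passed to the container launch context as a single-element list.
    static String buildCommand(int amMemoryMb, String mainClass, List<String> extraArgs) {
        List<String> vargs = new ArrayList<>();
        vargs.add("${JAVA_HOME}/bin/java");
        vargs.add("-Xmx" + amMemoryMb + "m");
        vargs.add(mainClass);
        vargs.addAll(extraArgs);
        // Redirect stdout and stderr into the container's log directory
        vargs.add("1>" + LOG_DIR + "/AppMaster.stdout");
        vargs.add("2>" + LOG_DIR + "/AppMaster.stderr");
        return String.join(" ", vargs);
    }

    public static void main(String[] args) {
        System.out.println(buildClasspath("$HADOOP_CONF_DIR, $HADOOP_COMMON_HOME/share/hadoop/common/*"));
        System.out.println(buildCommand(512, "com.example.MyAppMaster", Arrays.asList("--demo")));
    }
}
```

Seeing the final command string is a quick way to check the quoting and redirection before the request is submitted.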
 

3) Set the priority, queue, user, and so on

		Priority pri = Records.newRecord(Priority.class);
		pri.setPriority(amPriority);
		appContext.setPriority(pri);
		appContext.setQueue(amQueue);
		appContext.setUser(amUser);
 

3. Create the request to run the application and submit it

		SubmitApplicationRequest appRequest = Records.newRecord(SubmitApplicationRequest.class);
		appRequest.setApplicationSubmissionContext(appContext);
		applicationsManager.submitApplication(appRequest);
 

4. Poll the ResourceManager for the Application's status

1) Build a request for the Application's status and submit it

		GetApplicationReportRequest reportRequest = Records.newRecord(GetApplicationReportRequest.class);
		reportRequest.setApplicationId(appId);
		GetApplicationReportResponse reportResponse = applicationsManager.getApplicationReport(reportRequest);
		ApplicationReport report = reportResponse.getApplicationReport();
 

2) Read the Application's information from the report

		LOG.info("Got application report from ASM for"
			+ ", appId=" + appId.getId()
			+ ", clientToken=" + report.getClientToken()
			+ ", appDiagnostics=" + report.getDiagnostics()
			+ ", appMasterHost=" + report.getHost()
			+ ", appQueue=" + report.getQueue()
			+ ", appMasterRpcPort=" + report.getRpcPort()
			+ ", appStartTime=" + report.getStartTime()
			+ ", yarnAppState=" + report.getYarnApplicationState().toString()
			+ ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
			+ ", appTrackingUrl=" + report.getTrackingUrl()
			+ ", appUser=" + report.getUser());
		YarnApplicationState state = report.getYarnApplicationState();
		FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
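
The snippets in step 4 fetch a single report, while polling means repeating that call until the application reaches a terminal state. Below is a minimal stand-alone sketch of such a loop. The `State` enum is a hypothetical stand-in for a subset of `YarnApplicationState`, and the `Supplier` stands in for "send a GetApplicationReportRequest and read the state from the report". The class name, the polling interval, and the attempt limit are also assumptions.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

final class AppStatePollerSketch {

    // Hypothetical stand-in for a subset of YarnApplicationState values.
    enum State { SUBMITTED, RUNNING, FINISHED, FAILED, KILLED }

    // An application in one of these states will not change state again.
    static boolean isTerminal(State s) {
        return s == State.FINISHED || s == State.FAILED || s == State.KILLED;
    }

    // Calls fetchState until a terminal state is seen or maxAttempts calls have
    // been made, sleeping intervalMs between calls. Returns the last state seen.
    static State pollUntilDone(Supplier<State> fetchState, long intervalMs, int maxAttempts) {
        State state = fetchState.get();
        for (int attempt = 1; attempt < maxAttempts && !isTerminal(state); attempt++) {
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop polling early
                Thread.currentThread().interrupt();
                return state;
            }
            state = fetchState.get();
        }
        return state;
    }

    public static void main(String[] args) {
        // A fake sequence of states replaces real calls to the ResourceManager
        Iterator<State> fake =
            Arrays.asList(State.SUBMITTED, State.RUNNING, State.FINISHED).iterator();
        State last = pollUntilDone(fake::next, 10L, 10);
        System.out.println("last state: " + last);
    }
}
```

In a real client the supplier would wrap the `getApplicationReport` call shown above, and the caller would then inspect the final status once the loop returns.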
 