
Steps for an ApplicationMaster to launch a Container

 

 

 

1. Requesting Containers

1) Connect to the ResourceManager

		Configuration conf = new Configuration();
		YarnRPC rpc = YarnRPC.create(conf);
		YarnConfiguration yarnConf = new YarnConfiguration(conf);
		// Look up the ResourceManager scheduler address, falling back to the default
		InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
			YarnConfiguration.RM_SCHEDULER_ADDRESS,
			YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
		AMRMProtocol resourceManager = ((AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf));	
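
The address lookup above reads a configuration key and falls back to a default when the key is unset. A minimal sketch of that pattern using only the JDK follows. The key name and the default value "0.0.0.0:8030" are assumptions chosen to resemble the YARN scheduler address setting, and `SchedulerAddress` is a hypothetical helper, not part of Hadoop.

```java
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

class SchedulerAddress {
    // Assumed key and default; check YarnConfiguration for the real values.
    static final String KEY = "yarn.resourcemanager.scheduler.address";
    static final String DEFAULT = "0.0.0.0:8030";

    // Reads "host:port" from the map (or the default) and splits it.
    static InetSocketAddress resolve(Map<String, String> conf) {
        String value = conf.getOrDefault(KEY, DEFAULT);
        int colon = value.lastIndexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("expected host:port, got " + value);
        }
        String host = value.substring(0, colon);
        int port = Integer.parseInt(value.substring(colon + 1));
        // createUnresolved avoids a DNS lookup; the RPC layer can resolve later.
        return InetSocketAddress.createUnresolved(host, port);
    }

    public static void main(String[] args) {
        InetSocketAddress addr = resolve(new HashMap<String, String>());
        System.out.println(addr.getHostString() + " " + addr.getPort());
    }
}
```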
    

 

2) Register with the ResourceManager as the ApplicationMaster

		RegisterApplicationMasterRequest appMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class);
		// Describe this Application attempt: where the AM runs and its tracking URL
		appMasterRequest.setApplicationAttemptId(appAttemptID);
		appMasterRequest.setHost(appMasterHostname);
		appMasterRequest.setRpcPort(appMasterRpcPort);
		appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
		RegisterApplicationMasterResponse response = resourceManager.registerApplicationMaster(appMasterRequest);
 

 

3) Request Containers from the ResourceManager

		// Build the Container request (host to run on, priority, memory)
		ResourceRequest request = Records.newRecord(ResourceRequest.class);
		request.setHostName("*");
		request.setNumContainers(numContainers);
		Priority pri = Records.newRecord(Priority.class);
		pri.setPriority(requestPriority);
		request.setPriority(pri);
		Resource capability = Records.newRecord(Resource.class);
		capability.setMemory(containerMemory);
		request.setCapability(capability);
		List<ResourceRequest> resourceReq = new ArrayList<ResourceRequest>();
		resourceReq.add(request);
		// Report to the ResourceManager: the request sequence number, the Container requests,
		// the list of Containers to release, and progress (completed Containers / total Containers)
		AllocateRequest req = Records.newRecord(AllocateRequest.class);
		CopyOnWriteArrayList<ContainerId> releasedContainers = new CopyOnWriteArrayList<ContainerId>();
		AtomicInteger rmRequestID = new AtomicInteger();
		req.setResponseId(rmRequestID.incrementAndGet());
		req.setApplicationAttemptId(appAttemptID);
		req.addAllAsks(resourceReq);
		req.addAllReleases(releasedContainers);
		req.setProgress((float)numCompletedContainers.get()/numTotalContainers);
		AllocateResponse resp = resourceManager.allocate(req);
		AMResponse amResp = resp.getAMResponse();
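
Each allocate call above carries an incrementing ID and a progress ratio. The sketch below isolates that bookkeeping in plain Java. `AllocateBookkeeping` is a hypothetical helper, and the guard against a zero total is an addition that the original snippet does not have.

```java
import java.util.concurrent.atomic.AtomicInteger;

class AllocateBookkeeping {
    private final AtomicInteger requestId = new AtomicInteger();
    private final AtomicInteger completed = new AtomicInteger();
    private final int total;

    AllocateBookkeeping(int total) {
        this.total = total;
    }

    // Next sequence number to send with an allocate call.
    int nextRequestId() {
        return requestId.incrementAndGet();
    }

    // Call once for every Container that has finished.
    void containerCompleted() {
        completed.incrementAndGet();
    }

    // Completed / total as a float. The cast matters: without it the
    // integer division would yield 0 until every Container is done.
    float progress() {
        if (total <= 0) {
            return 0f;
        }
        return (float) completed.get() / total;
    }

    public static void main(String[] args) {
        AllocateBookkeeping b = new AllocateBookkeeping(4);
        b.containerCompleted();
        System.out.println(b.nextRequestId() + " " + b.progress());
    }
}
```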
   

 

2. Assigning work to the allocated Containers

1) Get the Containers allocated above

 

		List<Container> allocatedContainers = amResp.getAllocatedContainers();
		for (Container container : allocatedContainers) {
			// Steps 2) to 4) below run here, once per allocated Container
		}
 

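2) Initialize the launch context for the Container. It carries:

ContainerId

User: the user the Container runs as, i.e. the user running the current Application

Resource: the resources the ResourceManager allocated to this Container

ContainerToken: the security tokens used in secure mode

LocalResources: resources needed by the program that runs in the Container, such as the jar that holds the program

ServiceData:

Environment: environment variables for the program that runs in the Container, as key/value pairs

Commands: the command that starts the program in the Container; for a Java program, for example, $JAVA_HOME/bin/java org.yourclass

ApplicationACLs: the access control list of the Application that owns the Container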

 

		ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
		ctx.setContainerId(container.getId());
		ctx.setResource(container.getResource());
		ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
		
		Map<String, String> childEnv = new HashMap<String, String>();
		ctx.setEnvironment(childEnv);
		
		// Set up the LocalResource
		// Upload the jar to HDFS first
		FileSystem fs = FileSystem.get(conf);
		Path src = new Path(localJarPath); // localJarPath: local path of the program the Container runs
		String pathSuffix = appName + "/" + appId.getId() + "/ChildProgram.jar";
		Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
		fs.copyFromLocalFile(false, true, src, dst);
		FileStatus destStatus = fs.getFileStatus(dst);
		
		Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
		LocalResource childRsrc = Records.newRecord(LocalResource.class);
		childRsrc.setType(LocalResourceType.FILE);
		childRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
		childRsrc.setTimestamp(destStatus.getModificationTime());
		childRsrc.setSize(destStatus.getLen());
		childRsrc.setResource(ConverterUtils.getYarnUrlFromURI(dst.toUri()));
		// The key is the name the jar is linked under in the Container's working directory
		localResources.put("Child.jar", childRsrc);
		ctx.setLocalResources(localResources);
		
		// Build the launch command
		Vector<CharSequence> vargs = new Vector<CharSequence>(5); 
		vargs.add(JavaCommand);
		vargs.add(JavaArgs);
		vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
		vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
		StringBuilder command = new StringBuilder();
		for (CharSequence str : vargs) {
			command.append(str).append(" ");
		}
		List<String> commands = new ArrayList<String>();
		commands.add(command.toString());
		ctx.setCommands(commands);
		
		StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class);
		startReq.setContainerLaunchContext(ctx);
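
The command assembly above boils down to joining the executable, its arguments, and two redirections into one shell line. A minimal sketch follows. `LaunchCommand` is a hypothetical helper, and the literal "<LOG_DIR>" stands in for ApplicationConstants.LOG_DIR_EXPANSION_VAR, whose exact value is an assumption here. Unlike the loop above, String.join leaves no trailing space.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class LaunchCommand {
    // Assumed placeholder that the NodeManager expands to the log directory.
    static final String LOG_DIR = "<LOG_DIR>";

    // Joins the executable, its arguments and the stdout/stderr redirections.
    static String build(String javaCommand, List<String> args) {
        List<String> parts = new ArrayList<String>();
        parts.add(javaCommand);
        parts.addAll(args);
        parts.add("1>" + LOG_DIR + "/stdout");
        parts.add("2>" + LOG_DIR + "/stderr");
        return String.join(" ", parts);
    }

    public static void main(String[] args) {
        System.out.println(build("$JAVA_HOME/bin/java", Arrays.asList("org.yourclass")));
    }
}
```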
 

3) Connect to the ContainerManager that owns this Container

 

		String cmIpPortStr = container.getNodeId().getHost() + ":"
			+ container.getNodeId().getPort();
		InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
		ContainerManager cm = ((ContainerManager) rpc.getProxy(ContainerManager.class, cmAddress, conf));
 

4) Start the Container through the ContainerManager

 

		cm.startContainer(startReq);
 

3. Polling for Container status

1) Get the Container's status from the ContainerManager

 

		GetContainerStatusRequest statusReq = Records.newRecord(GetContainerStatusRequest.class);
		statusReq.setContainerId(container.getId());
		GetContainerStatusResponse statusResp = cm.getContainerStatus(statusReq);
		ContainerStatus containerStatus = statusResp.getStatus();
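
The snippet above fetches the status once, while polling means repeating that call until the Container finishes. A minimal sketch of such a loop follows, with the status call abstracted behind a Supplier. `ContainerPoller`, its local `State` enum, the poll limit, and the sleep interval are all illustrative assumptions rather than YARN types or recommended values.

```java
import java.util.function.Supplier;

class ContainerPoller {
    // Local stand-in for the Container's state as reported by the node.
    enum State { NEW, RUNNING, COMPLETE }

    // Polls until COMPLETE is seen. Returns the number of polls used,
    // or -1 if maxPolls was exhausted or the thread was interrupted.
    static int waitForCompletion(Supplier<State> statusSource, int maxPolls, long intervalMillis) {
        for (int poll = 1; poll <= maxPolls; poll++) {
            if (statusSource.get() == State.COMPLETE) {
                return poll;
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag set
                return -1;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        // Fake status source: reports COMPLETE on the third call.
        int polls = waitForCompletion(
            () -> ++calls[0] < 3 ? State.RUNNING : State.COMPLETE, 10, 10L);
        System.out.println(polls);
    }
}
```

In a real ApplicationMaster the Supplier would wrap the getContainerStatus call shown above.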
 

4. Updating the Application status

1) Once all Containers have succeeded or failed, tell the ResourceManager whether the Application succeeded or failed

 

		FinishApplicationMasterRequest finishReq = Records.newRecord(FinishApplicationMasterRequest.class);
		finishReq.setAppAttemptId(appAttemptID);
		finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
		// finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
		finishReq.setDiagnostics(diagnostics);
		resourceManager.finishApplicationMaster(finishReq);
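
Choosing between SUCCEEDED and FAILED needs some policy over the Container results. The sketch below shows one possible policy, in which any failed Container fails the whole Application. `FinalStatusDecision` and its local `Status` enum are hypothetical, and the policy is an assumption rather than something YARN prescribes.

```java
class FinalStatusDecision {
    // Local stand-in for the two final statuses used above.
    enum Status { SUCCEEDED, FAILED }

    // Assumed policy: succeed only when every Container finished without failure.
    static Status decide(int total, int completed, int failed) {
        if (completed < total) {
            throw new IllegalStateException("containers still running");
        }
        return failed == 0 ? Status.SUCCEEDED : Status.FAILED;
    }

    // Short summary suitable for a diagnostics string.
    static String diagnostics(int total, int completed, int failed) {
        return "total=" + total + ", completed=" + completed + ", failed=" + failed;
    }

    public static void main(String[] args) {
        System.out.println(decide(3, 3, 1) + " " + diagnostics(3, 3, 1));
    }
}
```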