x-rip

YARN/MRv2 Source Code Analysis, ResourceManager Side (Part 1)


2. The ResourceManager Side

On the client side, YARNRunner.submitJob() submits the application to the ResourceManager.

The protocol between the client and the ResourceManager is ClientRMProtocol, and its implementation class is ClientRMService.

1) ClientRMService.java

Every operation the client performs against the ResourceManager is ultimately carried out by a method of ClientRMService. Take submitApplication() as an example.

 

		  public SubmitApplicationResponse submitApplication(
			  SubmitApplicationRequest request) throws YarnRemoteException {
			// Get the application's information: submission context, id, and user
			ApplicationSubmissionContext submissionContext = request
				.getApplicationSubmissionContext();
			ApplicationId applicationId = submissionContext.getApplicationId();
			String user = submissionContext.getUser();
			try {
			  user = UserGroupInformation.getCurrentUser().getShortUserName();
			  if (rmContext.getRMApps().get(applicationId) != null) {
				throw new IOException("Application with id " + applicationId
					+ " is already present! Cannot add a duplicate!");
			  }
			  submissionContext.setUser(user);
			// Wrap the context in an RMAppManagerSubmitEvent and pass it to RMAppManager's handle() method
			  rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext, System
				  .currentTimeMillis()));
			  LOG.info("Application with id " + applicationId.getId() + 
				  " submitted by user " + user + " with " + submissionContext);
			  RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
				  "ClientRMService", applicationId);
			} catch (IOException ie) {
			  ...
			}
			SubmitApplicationResponse response = recordFactory
				.newRecordInstance(SubmitApplicationResponse.class);
			return response;
		 }
 

 

2) RMAppManager.java

RMAppManager implements the EventHandler interface, which means it is a class that handles a particular kind of event.

 

		  public void handle(RMAppManagerEvent event) {
			ApplicationId applicationId = event.getApplicationId();
			LOG.debug("RMAppManager processing event for " 
				+ applicationId + " of type " + event.getType());
			// As event.getType() shows, this class handles application submission and completion events
			switch(event.getType()) {
			  case APP_COMPLETED: 
			  {
				finishApplication(applicationId);
				ApplicationSummary.logAppSummary(
					rmContext.getRMApps().get(applicationId));
				checkAppNumCompletedLimit(); 
			  } 
			  break;
			  case APP_SUBMIT:
			  {
				ApplicationSubmissionContext submissionContext = 
					((RMAppManagerSubmitEvent)event).getSubmissionContext();
				long submitTime = ((RMAppManagerSubmitEvent)event).getSubmitTime();
				// Call submitApplication() to submit the job to the ResourceManager
				submitApplication(submissionContext, submitTime);
			  }
			  break;
			  default:
				LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
			  }
		  }
		  protected synchronized void submitApplication(
			  ApplicationSubmissionContext submissionContext, long submitTime) {
			ApplicationId applicationId = submissionContext.getApplicationId();
			RMApp application = null;
			try {
			// Read the application's parameters from the passed-in context and fill in defaults for any that are unset
			  String clientTokenStr = null;
			  ...
			  ...
			// Store the application's information so it can be recovered if the application fails or dies
			  ApplicationStore appStore = rmContext.getApplicationsStore()
				  .createApplicationStore(submissionContext.getApplicationId(),
				  submissionContext);
			// Create the RMAppImpl object the ResourceManager uses to wrap the application
			  application = new RMAppImpl(applicationId, rmContext,
				  this.conf, submissionContext.getApplicationName(), user,
				  submissionContext.getQueue(), submissionContext, clientTokenStr,
				  appStore, this.scheduler,
				  this.masterService, submitTime);
			// Reject a duplicate submission of the same application
			  if (rmContext.getRMApps().putIfAbsent(applicationId, application) != 
				  null) {
				String message = "Application with id " + applicationId
					+ " is already present! Cannot add a duplicate!";
				LOG.info(message);
				throw RPCUtil.getRemoteException(message);
			  } 

			// Notify the ACLsManager
			  this.applicationACLsManager.addApplication(applicationId,
				  submissionContext.getAMContainerSpec().getApplicationACLs());

			// Security tokens
			  if (UserGroupInformation.isSecurityEnabled()) {
				this.rmContext.getDelegationTokenRenewer().addApplication(
					applicationId,parseCredentials(submissionContext),
					submissionContext.getCancelTokensWhenComplete()
					);
			  }      
			// Send an RMAppEventType.START event to the AsyncDispatcher; ApplicationEventDispatcher receives it from the AsyncDispatcher and hands it to RMAppImpl
			  this.rmContext.getDispatcher().getEventHandler().handle(
				  new RMAppEvent(applicationId, RMAppEventType.START));
			} catch (IOException ie) {
				LOG.info("RMAppManager submit application exception", ie);
				if (application != null) {
				// Send an RMAppRejectedEvent
				  this.rmContext.getDispatcher().getEventHandler().handle(
					  new RMAppRejectedEvent(applicationId, ie.getMessage()));
				}
			}
		  }
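A note on the duplicate check: ClientRMService tests `get(applicationId) != null`, and RMAppManager.submitApplication() tests again with `putIfAbsent()`. The first is a check-then-act that two concurrent submissions of the same id could both pass, while `putIfAbsent()` is atomic, so exactly one caller can register a given id. The minimal sketch below shows that handler shape: a switch over the event type plus a `putIfAbsent()` guard. All names in it (`AppManagerSketch`, `AppManagerEvent`, `AppManagerEventType`) are hypothetical stand-ins rather than Hadoop classes; the "RMApp" is a plain String and the follow-up START event simply goes to a `Consumer`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

// Hypothetical mirror of the two RMAppManager event types handled in the text.
enum AppManagerEventType { APP_SUBMIT, APP_COMPLETED }

// Hypothetical event: just an application id and an event type.
final class AppManagerEvent {
  final String appId;
  final AppManagerEventType type;

  AppManagerEvent(String appId, AppManagerEventType type) {
    this.appId = appId;
    this.type = type;
  }
}

// Simplified stand-in for RMAppManager; not the Hadoop class.
final class AppManagerSketch {
  // Plays the role of rmContext.getRMApps(): application id -> application object.
  private final ConcurrentMap<String, String> apps = new ConcurrentHashMap<>();
  // Stands in for the dispatcher that receives the follow-up START event.
  private final Consumer<String> startEvents;
  // Bookkeeping so callers can see what happened.
  final List<String> rejected = Collections.synchronizedList(new ArrayList<>());
  final List<String> completed = Collections.synchronizedList(new ArrayList<>());

  AppManagerSketch(Consumer<String> startEvents) {
    this.startEvents = startEvents;
  }

  void handle(AppManagerEvent event) {
    switch (event.type) {
      case APP_SUBMIT:
        submitApplication(event.appId);
        break;
      case APP_COMPLETED:
        completed.add(event.appId);
        break;
      default:
        throw new IllegalArgumentException("Invalid event type " + event.type);
    }
  }

  private synchronized void submitApplication(String appId) {
    String application = "RMApp[" + appId + "]";
    // putIfAbsent is atomic: for a given id, exactly one caller gets null back.
    if (apps.putIfAbsent(appId, application) != null) {
      rejected.add(appId);
      return;
    }
    // Only a newly registered application gets a START event.
    startEvents.accept(appId);
  }

  boolean contains(String appId) {
    return apps.containsKey(appId);
  }
}
```

Submitting `app_1` twice in this sketch produces one START notification and one entry in `rejected`.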
 

 

3) EventHandler

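From here on, AsyncDispatcher takes over all subsequent event dispatching: it hands every event to the EventDispatcher registered for that event's type, and the EventDispatcher finds the object responsible for the event and passes the event to it. For example, an RMAppEventType.START event is dispatched to ApplicationEventDispatcher, which looks up the application's RMApp implementation, RMAppImpl, in the RMContext and lets it handle the event.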

 

		  public static final class ApplicationEventDispatcher implements
			  EventHandler<RMAppEvent> {
			private final RMContext rmContext;
			public ApplicationEventDispatcher(RMContext rmContext) {
			  this.rmContext = rmContext;
			}
			@Override
			public void handle(RMAppEvent event) {
			  ApplicationId appID = event.getApplicationId();
			// Look up the RMApp object that handles this application's events
			  RMApp rmApp = this.rmContext.getRMApps().get(appID);
			  if (rmApp != null) {
				try {
				// Hand the event to that object
				  rmApp.handle(event);
				} catch (Throwable t) {
				  LOG.error("Error in handling event type " + event.getType()
					  + " for application " + appID, t);
				}
			  }
			}
		  }
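To make the hand-off concrete, here is a stripped-down router in the spirit of AsyncDispatcher: producers only enqueue, and a single worker thread takes events off the queue and looks up the handler by the enum class of the event type (`getDeclaringClass()`), so RMAppEventType-like events reach one handler and RMAppAttemptEventType-like events another. It is a sketch under assumptions: `DispatcherSketch`, `SketchEvent`, and the two demo enums are hypothetical, all handlers must be registered before `start()`, and `stop()` uses a poison-pill event to drain everything queued before it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical event types standing in for RMAppEventType / RMAppAttemptEventType.
enum DemoAppEventType { START }
enum DemoAttemptEventType { START }

// Hypothetical event: an enum type plus the id of the object it is about.
final class SketchEvent {
  final Enum<?> type;
  final String targetId;

  SketchEvent(Enum<?> type, String targetId) {
    this.type = type;
    this.targetId = targetId;
  }
}

// Simplified AsyncDispatcher-style router: one queue, one worker thread.
final class DispatcherSketch {
  private static final SketchEvent STOP = new SketchEvent(null, null); // poison pill
  private final BlockingQueue<SketchEvent> queue = new LinkedBlockingQueue<>();
  // Handlers keyed by the enum class of the event type; register all before start().
  private final Map<Class<?>, Consumer<SketchEvent>> handlers = new HashMap<>();
  private final Thread worker = new Thread(this::loop, "dispatcher");

  void register(Class<? extends Enum<?>> typeClass, Consumer<SketchEvent> handler) {
    handlers.put(typeClass, handler);
  }

  void start() {
    worker.start();
  }

  // Producers never run handlers themselves; they only enqueue.
  void post(SketchEvent event) {
    queue.add(event);
  }

  // Drains every event queued before this call, then stops the worker.
  void stop() {
    queue.add(STOP);
    try {
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  private void loop() {
    while (true) {
      SketchEvent event;
      try {
        event = queue.take();
      } catch (InterruptedException e) {
        return;
      }
      if (event == STOP) {
        return;
      }
      // getDeclaringClass() maps every constant to its enum class, even constants with bodies.
      Consumer<SketchEvent> handler = handlers.get(event.type.getDeclaringClass());
      if (handler == null) {
        System.err.println("No handler registered for " + event.type.getDeclaringClass());
      } else {
        handler.accept(event);
      }
    }
  }
}
```

Because a single thread consumes the queue, events are handled one at a time in the order they were posted.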
 

 

 RMAppImpl.java

 

			  public void handle(RMAppEvent event) {
				// Take the write lock before updating the state machine
				this.writeLock.lock();
				try {
				  ApplicationId appID = event.getApplicationId();
				  LOG.debug("Processing event for " + appID + " of type "
					  + event.getType());
				  final RMAppState oldState = getState();
				  try {
					// Let the state machine process the event
					this.stateMachine.doTransition(event.getType(), event);
				  } catch (InvalidStateTransitonException e) {
					...
				  }
				} finally {
				// Release the lock
				  this.writeLock.unlock();
				}
			  }
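The locking in handle() follows the usual read/write split: a transition takes the write lock, and queries such as getState() take the read lock. `ReentrantReadWriteLock` lets the thread that holds the write lock also acquire the read lock, which is why handle() can call getState() to remember `oldState` without deadlocking. A minimal sketch of that pattern, assuming a hypothetical `LockedApp` that knows only the NEW -> SUBMITTED transition (the stderr message stands in for the InvalidStateTransitonException handling elided above):

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical states and event types, reduced to the first RMApp transition in the text.
enum LockedAppState { NEW, SUBMITTED }
enum LockedAppEventType { START }

final class LockedApp {
  private final ReadWriteLock lock = new ReentrantReadWriteLock();
  private LockedAppState state = LockedAppState.NEW; // guarded by lock

  // Readers share the read lock.
  LockedAppState getState() {
    lock.readLock().lock();
    try {
      return state;
    } finally {
      lock.readLock().unlock();
    }
  }

  // Writers take the write lock so a transition is never observed half-done.
  // Returns the state before the event, like oldState in RMAppImpl.handle().
  LockedAppState handle(LockedAppEventType type) {
    lock.writeLock().lock();
    try {
      LockedAppState oldState = getState(); // allowed: the write-lock holder may also take the read lock
      if (oldState == LockedAppState.NEW && type == LockedAppEventType.START) {
        state = LockedAppState.SUBMITTED;
      } else {
        // Invalid transition: report it and keep the current state.
        System.err.println("Can't handle " + type + " at " + oldState);
      }
      return oldState;
    } finally {
      lock.writeLock().unlock();
    }
  }
}
```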
 

 

The state machine works as follows, taking the one in RMAppImpl as an example:

 

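		/* The generic type parameters are, from left to right: the class the transitions
		   operate on, the state class, the event-type class, and the event class.
		   The constructor argument is the state machine's initial state. */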
		  private static final StateMachineFactory<RMAppImpl,
								   RMAppState,
								   RMAppEventType,
								   RMAppEvent> stateMachineFactory
					   = new StateMachineFactory<RMAppImpl,
								   RMAppState,
								   RMAppEventType,
								   RMAppEvent>(RMAppState.NEW)
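						/* Register a transition between two states, plus the object that wraps the
						   action to run during it. The entry below means: an RMAppEventType.START
						   event moves the state from RMAppState.NEW to RMAppState.SUBMITTED, and
						   the action to run is wrapped in StartAppAttemptTransition */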
						.addTransition(RMAppState.NEW, RMAppState.SUBMITTED,
							RMAppEventType.START, new StartAppAttemptTransition())
						...
						...
						// Build the state machine
						.installTopology();
		// A class that wraps the action for a transition implements the SingleArcTransition interface; take StartAppAttemptTransition as an example.
		// Each state machine has a base class implementing SingleArcTransition; here it is RMAppTransition.
		// StartAppAttemptTransition extends RMAppTransition and implements transition(), which holds the logic for the state change.
		  private static final class StartAppAttemptTransition extends RMAppTransition {
			@Override
			public void transition(RMAppImpl app, RMAppEvent event) {
			  app.createNewAttempt();
			}
		  }
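The mechanism can be pictured with a much-reduced analog of StateMachineFactory: `addTransition()` records (pre-state, event type) -> (post-state, hook) in nested `EnumMap`s, and `doTransition()` looks up the arc, runs the hook (the role SingleArcTransition.transition() plays), then moves to the post-state. The names and signatures below are hypothetical, not the Hadoop API. Unlike Hadoop, where the static factory is shared and each RMAppImpl gets its own machine through `make(this)`, this version keeps the table and the current state in one mutable object and takes the operand as an argument; an unknown (state, event) pair throws IllegalStateException in place of InvalidStateTransitonException.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical mirrors of a few RMAppState / RMAppEventType values.
enum SketchAppState { NEW, SUBMITTED, ACCEPTED }
enum SketchAppEventType { START, APP_ACCEPTED }

// Much-reduced, mutable analog of StateMachineFactory + StateMachine; not the Hadoop API.
final class MiniStateMachine<OPERAND, STATE extends Enum<STATE>,
    EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {

  // One arc of the graph: post-state plus the hook (the SingleArcTransition role).
  private static final class Arc<O, S, E> {
    final S postState;
    final BiConsumer<O, E> hook; // may be null: state change only

    Arc(S postState, BiConsumer<O, E> hook) {
      this.postState = postState;
      this.hook = hook;
    }
  }

  private final Class<EVENTTYPE> eventTypeClass;
  private final Map<STATE, Map<EVENTTYPE, Arc<OPERAND, STATE, EVENT>>> table;
  private STATE current;

  MiniStateMachine(Class<STATE> stateClass, Class<EVENTTYPE> eventTypeClass, STATE initial) {
    this.eventTypeClass = eventTypeClass;
    this.table = new EnumMap<STATE, Map<EVENTTYPE, Arc<OPERAND, STATE, EVENT>>>(stateClass);
    this.current = initial;
  }

  // Like addTransition(pre, post, eventType, hook); returns this for chaining.
  MiniStateMachine<OPERAND, STATE, EVENTTYPE, EVENT> addTransition(
      STATE pre, STATE post, EVENTTYPE eventType, BiConsumer<OPERAND, EVENT> hook) {
    Map<EVENTTYPE, Arc<OPERAND, STATE, EVENT>> arcs = table.get(pre);
    if (arcs == null) {
      arcs = new EnumMap<EVENTTYPE, Arc<OPERAND, STATE, EVENT>>(eventTypeClass);
      table.put(pre, arcs);
    }
    arcs.put(eventType, new Arc<OPERAND, STATE, EVENT>(post, hook));
    return this;
  }

  STATE getCurrentState() {
    return current;
  }

  // Like doTransition(eventType, event): run the hook, then move to the post-state.
  STATE doTransition(OPERAND operand, EVENTTYPE eventType, EVENT event) {
    Map<EVENTTYPE, Arc<OPERAND, STATE, EVENT>> arcs = table.get(current);
    Arc<OPERAND, STATE, EVENT> arc = (arcs == null) ? null : arcs.get(eventType);
    if (arc == null) {
      // Stand-in for InvalidStateTransitonException; the state is left unchanged.
      throw new IllegalStateException("Invalid event: " + eventType + " at " + current);
    }
    if (arc.hook != null) {
      arc.hook.accept(operand, event);
    }
    current = arc.postState;
    return current;
  }
}
```

Keeping the table keyed by enums makes the lookup cheap and makes an unregistered (state, event) pair fail loudly instead of silently doing nothing.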
 

 

This completes the tour of the classes involved when events are dispatched and handled through a state machine. In summary:

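(1) AsyncDispatcher dispatches each event it receives to the EventDispatcher registered for that event's type

(2) The EventDispatcher finds the object A that handles this kind of event and passes the event to A

(3) A calls the state machine's doTransition method, which determines the state change for the event type and the class that wraps the action to run

(4) That class, which implements the SingleArcTransition interface, completes the state change in its transition method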

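Because the state-machine handling code is largely the same in every case, the remaining steps are organized by event: each one lists the EventDispatcher, the handler class, the state transition, and the actions performed.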

4) RMAppEventType.START

EventDispatcher: ApplicationEventDispatcher

Handler class: RMAppImpl

State transition: RMAppState.NEW -> RMAppState.SUBMITTED

Actions: create an RMAppAttemptImpl object and set its initial state to RMAppAttemptState.NEW

Triggers the RMAppAttemptEventType.START event

5) RMAppAttemptEventType.START

EventDispatcher: ApplicationAttemptEventDispatcher

Handler class: RMAppAttemptImpl

State transition: RMAppAttemptState.NEW -> RMAppAttemptState.SUBMITTED

Actions: register this AppAttempt with ApplicationMasterService

Triggers an AppAddedSchedulerEvent

6) AppAddedSchedulerEvent

EventDispatcher: SchedulerEventDispatcher

Handler class: FifoScheduler/CapacityScheduler

Actions: create a SchedulerApp object

Triggers the RMAppAttemptEventType.APP_ACCEPTED event

7) RMAppAttemptEventType.APP_ACCEPTED

EventDispatcher: ApplicationAttemptEventDispatcher

Handler class: RMAppAttemptImpl

State transition: RMAppAttemptState.SUBMITTED -> RMAppAttemptState.SCHEDULED

Actions: call ResourceScheduler's allocate() to request from the ResourceManager the container needed to run the ApplicationMaster

Triggers the RMAppEventType.APP_ACCEPTED event

8) RMAppEventType.APP_ACCEPTED

EventDispatcher: ApplicationEventDispatcher

Handler class: RMAppImpl

State transition: RMAppState.SUBMITTED -> RMAppState.ACCEPTED

9) A NodeManager sends a heartbeat to the ResourceManager

10) On receiving the heartbeat, the ResourceManager's ResourceTrackerService fires an RMNodeStatusEvent that wraps RMNodeEventType.STATUS_UPDATE

11) RMNodeEventType.STATUS_UPDATE

EventDispatcher: NodeEventDispatcher

Handler class: RMNodeImpl

State transition: RMNodeState.RUNNING -> RMNodeState.RUNNING

Actions: update the node's health status

Triggers a NodeUpdateSchedulerEvent

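12) NodeUpdateSchedulerEvent

EventDispatcher: SchedulerEventDispatcher

Handler class: FifoScheduler/CapacityScheduler

Actions: create a SchedulerApp object and call assignContainers() to assign a container to the application; at this point the container has not actually been allocated yet

Triggers the RMContainerEventType.START event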
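A heavily simplified picture of the node-update handling in step 12): walk the applications in submission order and hand out containers while the node still has room. This is an assumption-laden sketch, not FifoScheduler's code: it tracks memory only, keeps one container size per application, ignores priorities and data locality, and lets a later application use room that earlier ones cannot fill. `FifoAssignSketch` and `request()` are hypothetical; `assignContainers()` merely borrows its name from the text.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified FIFO assignment on a node update: memory only, one container size per
// application, no priorities or locality. Hypothetical; not FifoScheduler's code.
final class FifoAssignSketch {
  // Insertion order of a LinkedHashMap doubles as submission order.
  private final Map<String, Integer> containerMb = new LinkedHashMap<>();
  private final Map<String, Integer> outstanding = new LinkedHashMap<>();

  // Record that appId wants `count` more containers of `memoryMb` each.
  void request(String appId, int memoryMb, int count) {
    containerMb.put(appId, memoryMb);
    outstanding.merge(appId, count, Integer::sum);
  }

  // One node update: returns an appId for every container assigned on this node.
  List<String> assignContainers(int nodeFreeMb) {
    List<String> assigned = new ArrayList<>();
    int free = nodeFreeMb;
    for (Map.Entry<String, Integer> entry : outstanding.entrySet()) {
      int size = containerMb.get(entry.getKey());
      // Earlier applications are served first; later ones only get what is left.
      while (entry.getValue() > 0 && size <= free) {
        free -= size;
        entry.setValue(entry.getValue() - 1);
        assigned.add(entry.getKey());
      }
    }
    return assigned;
  }
}
```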

13) RMContainerEventType.START

EventDispatcher: NodeEventDispatcher

Handler class: RMContainerImpl

State transition: RMContainerState.NEW -> RMContainerState.ALLOCATED

Actions: trigger an RMAppAttemptContainerAllocatedEvent (RMAppAttemptEventType.CONTAINER_ALLOCATED)

14) RMAppAttemptEventType.CONTAINER_ALLOCATED

EventDispatcher: ApplicationAttemptEventDispatcher

Handler class: RMAppAttemptImpl

State transition: RMAppAttemptState.SCHEDULED -> RMAppAttemptState.ALLOCATED

Actions: call the Scheduler's allocate() to obtain a container

Triggers the AMLauncherEventType.LAUNCH event

15) AMLauncherEventType.LAUNCH

Handler class: ApplicationMasterLauncher

State transition: none in ApplicationMasterLauncher itself; the attempt stays in RMAppAttemptState.ALLOCATED, which it entered in step 14)

Actions: create an AMLauncher object and add it to the masterEvents queue;

LauncherThread keeps taking entries from masterEvents, processes them, and calls AMLauncher.launch();

AMLauncher.launch() calls ContainerManager.startContainer() to create the container;

it also triggers the RMAppAttemptEventType.LAUNCHED event.
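This step decouples event handling from the slow container-launch RPC: handling LAUNCH only queues work, and LauncherThread does the launching. A minimal sketch of that hand-off, assuming each queued item is a plain `Runnable` standing in for an AMLauncher; `LauncherSketch` and its methods are hypothetical, and this sketch simply runs each task directly on its one thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified analog of the masterEvents queue plus LauncherThread.
// Each queued Runnable stands in for an AMLauncher; names are hypothetical.
final class LauncherSketch {
  private static final Runnable STOP = () -> { }; // poison pill
  private final BlockingQueue<Runnable> masterEvents = new LinkedBlockingQueue<>();
  private final Thread launcherThread = new Thread(this::drain, "LauncherThread");

  void start() {
    launcherThread.start();
  }

  // Handling a LAUNCH event only enqueues; the caller never waits for the launch RPC.
  void handleLaunch(Runnable amLauncher) {
    masterEvents.add(amLauncher);
  }

  // Runs everything queued before this call, then stops the thread.
  void stop() {
    masterEvents.add(STOP);
    try {
      launcherThread.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  private void drain() {
    while (true) {
      Runnable task;
      try {
        task = masterEvents.take();
      } catch (InterruptedException e) {
        return;
      }
      if (task == STOP) {
        return;
      }
      try {
        task.run(); // stands in for AMLauncher.launch(), i.e. the startContainer() call
      } catch (RuntimeException e) {
        // Keep draining: one failed launch must not block the attempts queued behind it.
        System.err.println("Launch failed: " + e);
      }
    }
  }
}
```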

16) RMAppAttemptEventType.LAUNCHED

EventDispatcher: ApplicationAttemptEventDispatcher

Handler class: RMAppAttemptImpl

State transition: RMAppAttemptState.ALLOCATED -> RMAppAttemptState.LAUNCHED

Actions: register with AMLivelinessMonitor, which continuously checks that the application's ApplicationMaster is still alive
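AMLivelinessMonitor's role can be pictured as a table of last-heartbeat times: registration records the attempt, each heartbeat refreshes it, and a periodic check reports attempts whose ApplicationMaster has been silent longer than an expiry interval. The sketch below follows that idea with an injectable clock so it can be tested. The class is hypothetical and its method names only loosely follow YARN's AbstractLivelinessMonitor; the real monitor runs its check in the background and raises an event for an expired attempt, which is not modeled here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

// Simplified liveness monitor in the spirit of AMLivelinessMonitor; hypothetical class.
final class LivelinessSketch {
  private final Map<String, Long> lastPing = new LinkedHashMap<>(); // attempt -> last heartbeat time
  private final long expireMillis;
  private final LongSupplier clock; // injected so tests can control time

  LivelinessSketch(long expireMillis, LongSupplier clock) {
    this.expireMillis = expireMillis;
    this.clock = clock;
  }

  // Start monitoring an attempt (done on LAUNCHED in the walkthrough).
  synchronized void register(String attemptId) {
    lastPing.put(attemptId, clock.getAsLong());
  }

  // A heartbeat from the attempt's ApplicationMaster refreshes its timestamp.
  synchronized void receivedPing(String attemptId) {
    lastPing.computeIfPresent(attemptId, (id, old) -> clock.getAsLong());
  }

  synchronized void unregister(String attemptId) {
    lastPing.remove(attemptId);
  }

  // One check pass: remove and return attempts silent for longer than expireMillis.
  synchronized List<String> checkExpired() {
    long now = clock.getAsLong();
    List<String> expired = new ArrayList<>();
    Iterator<Map.Entry<String, Long>> it = lastPing.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, Long> entry = it.next();
      if (now - entry.getValue() > expireMillis) {
        expired.add(entry.getKey());
        it.remove();
      }
    }
    return expired;
  }
}
```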

17) The ApplicationMaster, now running in a container on the NodeManager that sent the heartbeat in step 9), calls AMRMProtocol.registerApplicationMaster() to register with ApplicationMasterService

18) ApplicationMasterService.registerApplicationMaster()

Gets the ApplicationAttempt's information from the request

Triggers the RMAppAttemptEventType.REGISTERED event

Returns a response to the caller

19) RMAppAttemptEventType.REGISTERED

EventDispatcher: ApplicationAttemptEventDispatcher

Handler class: RMAppAttemptImpl

State transition: RMAppAttemptState.LAUNCHED -> RMAppAttemptState.RUNNING

Actions: record the information the application registered with, such as the host and port it runs on and its tracking URL

Triggers the RMAppEventType.ATTEMPT_REGISTERED event

20) RMAppEventType.ATTEMPT_REGISTERED

EventDispatcher: ApplicationEventDispatcher

Handler class: RMAppImpl

State transition: RMAppState.ACCEPTED -> RMAppState.RUNNING
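As a cross-check of the walkthrough, the sketch below encodes only the RMApp and RMAppAttempt transitions listed from step 4) onward and replays the events in the order they occur, from the application's START through ATTEMPT_REGISTERED; both objects should end in RUNNING. The enums and `WalkthroughReplay` are hypothetical, event types are plain strings, and every failure, kill, and retry path of the real state machines is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Only the states that appear in the walkthrough; hypothetical mirrors of
// RMAppState and RMAppAttemptState.
enum AppState { NEW, SUBMITTED, ACCEPTED, RUNNING }
enum AttemptState { NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, RUNNING }

// Replays the happy path described above; failure, kill, and retry arcs are omitted.
final class WalkthroughReplay {
  AppState app = AppState.NEW;
  AttemptState attempt = AttemptState.NEW;
  final List<String> trace = new ArrayList<>();

  static AppState nextApp(AppState s, String eventType) {
    if (s == AppState.NEW && eventType.equals("START")) return AppState.SUBMITTED;
    if (s == AppState.SUBMITTED && eventType.equals("APP_ACCEPTED")) return AppState.ACCEPTED;
    if (s == AppState.ACCEPTED && eventType.equals("ATTEMPT_REGISTERED")) return AppState.RUNNING;
    throw new IllegalStateException("RMApp cannot handle " + eventType + " in " + s);
  }

  static AttemptState nextAttempt(AttemptState s, String eventType) {
    if (s == AttemptState.NEW && eventType.equals("START")) return AttemptState.SUBMITTED;
    if (s == AttemptState.SUBMITTED && eventType.equals("APP_ACCEPTED")) return AttemptState.SCHEDULED;
    if (s == AttemptState.SCHEDULED && eventType.equals("CONTAINER_ALLOCATED")) return AttemptState.ALLOCATED;
    if (s == AttemptState.ALLOCATED && eventType.equals("LAUNCHED")) return AttemptState.LAUNCHED;
    if (s == AttemptState.LAUNCHED && eventType.equals("REGISTERED")) return AttemptState.RUNNING;
    throw new IllegalStateException("RMAppAttempt cannot handle " + eventType + " in " + s);
  }

  // target is "app" (an RMAppEventType) or "attempt" (an RMAppAttemptEventType).
  void fire(String target, String eventType) {
    if (target.equals("app")) {
      AppState next = nextApp(app, eventType);
      trace.add("RMApp: " + app + " -> " + next + " on " + eventType);
      app = next;
    } else if (target.equals("attempt")) {
      AttemptState next = nextAttempt(attempt, eventType);
      trace.add("RMAppAttempt: " + attempt + " -> " + next + " on " + eventType);
      attempt = next;
    } else {
      throw new IllegalArgumentException("Unknown target " + target);
    }
  }
}
```

The replay makes the ping-pong visible: RMApp and RMAppAttempt advance alternately, each transition triggering the event that moves the other one.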

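At this point the ApplicationMaster (MRAppMaster) has been created. From here on the application's execution is taken over by the ApplicationMaster (MRAppMaster), which requests from the ResourceManager the resources its subtasks need, monitors the subtasks as they run, and reports the application's status back to the ResourceManager.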


 
