Yarn/MRv2中MapReduce的启动过程之Client端
Hadoop版本0.23.1
Shell端
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-0.23.1.jar wordcount input output
Client端
1、 bin/hadoop文件
(该文件主要用于解析hadoop的命令参数,并传给相应的Java类进行处理,其中与运行WordCount相关代码如下)
#将第一个参数即字符串jar传给参数COMMAND
COMMAND=$1
#判断参数COMMAND的值,如果是jar,则将参数CLASS设为org.apache.hadoop.util.RunJar
elif [ "$COMMAND" = "jar" ] ; then
CLASS=org.apache.hadoop.util.RunJar
#执行java命令,相当于$JAVA_HOME/bin/java org.apache.hadoop.util.RunJar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-0.23.1.jar wordcount input output
exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"
2、 RunJar.java
(该java文件用于加载参数传递过来的jar包并执行,相关代码如下)
int firstArg = 0;
//初始化加载jar包的参数,注意在这里fileName值为args[0],++操作先赋值后递增
String fileName = args[firstArg++];
File file = new File(fileName);
String mainClassName = null;
JarFile jarFile;
try {
jarFile = new JarFile(fileName);
} catch(IOException io) {
throw new IOException("Error opening job jar: " + fileName)
.initCause(io);
}
/*获取jar包的mainClassName,用WinRAR打开hadoop-mapreduce-examples-0.23.1.jar,在META-INF目录下的MANIFEST.MF文件中可以看到Main-Class: org.apache.hadoop.examples.ExampleDriver,这是在打包时生成的。定义这个class在pom.xml中,代码如下
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>org.apache.hadoop.examples.ExampleDriver</mainClass>
</manifest>
</archive>
</configuration>
</plugin>*/
Manifest manifest = jarFile.getManifest();
if (manifest != null) {
mainClassName = manifest.getMainAttributes().getValue("Main-Class");
}
jarFile.close();
if (mainClassName == null) {
if (args.length < 2) {
System.err.println(usage);
System.exit(-1);
}
mainClassName = args[firstArg++];
}
mainClassName = mainClassName.replaceAll("/", ".");
File tmpDir = new File(new Configuration().get("hadoop.tmp.dir"));
ensureDirectory(tmpDir);
//创建jar包运行的临时目录
final File workDir;
try {
workDir = File.createTempFile("hadoop-unjar", "", tmpDir);
} catch (IOException ioe) {
// If user has insufficient perms to write to tmpDir, default
// "Permission denied" message doesn't specify a filename.
System.err.println("Error creating temp dir in hadoop.tmp.dir "
+ tmpDir + " due to " + ioe.getMessage());
System.exit(-1);
return;
}
if (!workDir.delete()) {
System.err.println("Delete failed for " + workDir);
System.exit(-1);
}
ensureDirectory(workDir);
//添加运行结束后执行hook,用于删除临时文件
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
FileUtil.fullyDelete(workDir);
}
});
unJar(file, workDir);
//初始化CLASSPATH
ArrayList<URL> classPath = new ArrayList<URL>();
classPath.add(new File(workDir+"/").toURI().toURL());
classPath.add(file.toURI().toURL());
classPath.add(new File(workDir, "classes/").toURI().toURL());
File[] libs = new File(workDir, "lib").listFiles();
if (libs != null) {
for (int i = 0; i < libs.length; i++) {
classPath.add(libs[i].toURI().toURL());
}
}
ClassLoader loader =
new URLClassLoader(classPath.toArray(new URL[0]));
//利用反射加载jar包中的mainclass
Thread.currentThread().setContextClassLoader(loader);
Class<?> mainClass = Class.forName(mainClassName, true, loader);
Method main = mainClass.getMethod("main", new Class[] {
Array.newInstance(String.class, 0).getClass()
});
String[] newArgs = Arrays.asList(args)
.subList(firstArg, args.length).toArray(new String[0]);
try {
main.invoke(null, new Object[] { newArgs });
} catch (InvocationTargetException e) {
throw e.getTargetException();
}
}
3、 ExampleDriver.java
(在执行wordcount时,命令中并没有执行wordcount的类,只有一个字符串“wordcount”,ExampleDriver就是将这个字符串解析成对应的类,并通过ProgramDriver调用,相关代码如下)
//初始化ProgramDriver,并添加wordcount和其对应的类
ProgramDriver pgd = new ProgramDriver();
try {
pgd.addClass("wordcount", WordCount.class,
"A map/reduce program that counts the words in the input files.");
…
//执行传递进来的参数,即wordcount
exitCode = pgd.driver(argv);
}
catch(Throwable e){
e.printStackTrace();
}
4、 ProgramDriver.java
(wordcount被传递给driver,在这里将真正执行WordCount.class)
public int driver(String[] args)
throws Throwable {
…
//通过参数wordcount获取封装了WordCount.class的ProgramDescription
ProgramDescription pgm = programs.get(args[0]);
if (pgm == null) {
System.out.println("Unknown program '" + args[0] + "' chosen.");
printUsage(programs);
return -1;
}
//通过反射调用WordCount.class的main方法
// Remove the leading argument and call main
String[] new_args = new String[args.length - 1];
for(int i=1; i < args.length; ++i) {
new_args[i-1] = args[i];
}
pgm.invoke(new_args);
return 0;
}
5、 WordCount.java
(WordCount没什么好说的,初始化job的一些参数,提交job)
public static void main(String[] args) throws Exception {
…
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
//在这里通过waitForCompletion(true)提交Job
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
6、 之后,WordCount将在Job中通过JobSubmitter提交到实现了ClientProtocol协议的类去真正提交Job。
分享到:
相关推荐
YARN(MRv2)搭建
yarn-v0.23.2.tar.gz 在安装ambari,源码编译的时候下载的文件有问题 手动下载 地址 https://github.com/yarnpkg/yarn/releases/download/v0.23.2/yarn-v0.23.2.tar.gz
■ 计算框架在Hadoop 中的作用 ■ YARN 的设计目的和基本架构 ■ MapReduce 概念 ■ Apache Spark 概念 ■ YARN 如何分配集群资源 ■ YARN 如何处理故障 ■ 如何查看和管理YARN 应用程序 ■ 如何访问YARN ...
脚本功能:启动集群 前提:配置好执行脚本的主机到其他主机的ssh登录 脚本使用:vim编辑脚本,按照自己的配置修改主机号,我的是hadoop1、2是NN;hadoop2、3是Spark Master;hadoop3还是RM;hadoop4、5、6是DN、NM、...
23、hadoop集群中yarn运行mapreduce的内存、CPU分配调度计算与优化 网址:https://blog.csdn.net/chenwewi520feng/article/details/130457270 本文介绍在hadoop集群中,不适用默认的参数情况下,yarn的cpu和内容配置...
赠送jar包:hadoop-yarn-client-2.6.5.jar; 赠送原API文档:hadoop-yarn-client-2.6.5-javadoc.jar; 赠送源代码:hadoop-yarn-client-2.6.5-sources.jar; 赠送Maven依赖信息文件:hadoop-yarn-client-2.6.5.pom;...
YARN中运行Mapreduce程序 Yarn作业运行流程 1 提交WordCount程序 2 目 录 一、Yarn作业运行流程 首先我们来讨论一下一个作业是如何在Yarn中运行的。 1、MapReduce2中的作业提交是使用与MapReduce1相同的用户API。 2...
YARN配置、启动与验证 YARN配置、启动与验证 序号 任务名称 任务一 YARN组件参数配置 任务二 MapReduce组件参数配置 任务三 配置SSH无密钥登录(slave1为主节点) 任务四 分发YARN与MapReduce配置文件 任务五 启动...
flink-hadoop-compatibility_2.12-1.7.1.jar javax.ws.rs-api-2.0.1.jar jersey-common-2.27.jar ...Exception in thread "main" java.lang.NoClassDefFoundError: com/sun/jersey/api/client/config/ClientConfig
赠送jar包:hadoop-mapreduce-client-common-2.6.5.jar; 赠送原API文档:hadoop-mapreduce-client-common-2.6.5-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-common-2.6.5-sources.jar; 赠送Maven依赖信息...
赠送jar包:hadoop-yarn-client-2.5.1.jar; 赠送原API文档:hadoop-yarn-client-2.5.1-javadoc.jar; 赠送源代码:hadoop-yarn-client-2.5.1-sources.jar; 赠送Maven依赖信息文件:hadoop-yarn-client-2.5.1.pom;...
用戶編寫的MapReduce程序通過Client提交到JobTracker端;同時,用戶可通過Client提供的一些接口查看作業運行狀態。在Hadoop內部用“作業” (Job)表示MapReduce程序。每一個Job都會在用戶端通過Client類將應用程序...
大数据技术之Hadoop(MapReduce&Yarn).docx详细文档
赠送jar包:hadoop-yarn-client-2.7.3.jar; 赠送原API文档:hadoop-yarn-client-2.7.3-javadoc.jar; 赠送源代码:hadoop-yarn-client-2.7.3-sources.jar; 赠送Maven依赖信息文件:hadoop-yarn-client-2.7.3.pom;...
hadoop 搭建过程
bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar *** 输入文件目录 输出文件目录 *** 本地运行案例 bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar ...
赠送jar包:hadoop-yarn-client-2.6.5.jar; 赠送原API文档:hadoop-yarn-client-2.6.5-javadoc.jar; 赠送源代码:hadoop-yarn-client-2.6.5-sources.jar; 赠送Maven依赖信息文件:hadoop-yarn-client-2.6.5.pom;...
hadoop-annotations-3.1.1.jar hadoop-common-3.1.1.jar hadoop-mapreduce-client-core-3.1.1.jar hadoop-yarn-api-3.1.1.jar hadoop-auth-3.1.1.jar hadoop-hdfs-3.1.1.jar hadoop-mapreduce-client-hs-3.1.1.jar ...
当flink on yarn模式运行时,发生如下异常信息,需要将压缩包中的4个依赖jar包放入flink安装路径下的lib目录下。 Exception in thread "main" java.lang.NoClassDefFoundError: ...
Apache Hadoop™ YARN Moving beyond MapReduce and Batch Processing with Apache Hadoop