hadoop临时文件 jar包_hadoop之Mapper/reducer源码分析之二

若当前JobClient (0.22 hadoop) 运行在YARN.则job提交任务运行在YARNRunner

Hadoop Yarn 框架原理及运作机制

26ba9d71df83d83ed2c6e0e560d63518.png

主要步骤

  • 作业提交
  • 作业初始化
  • 资源申请与任务分配
  • 任务执行

具体步骤

在运行作业之前,Resource Manager和Node Manager都已经启动,所以在上图中,Resource Manager进程和Node Manager进程不需要启动

  • 1. 客户端进程通过runJob(实际中一般使用waitForCompletion提交作业)在客户端提交Map Reduce作业(在Yarn中,作业一般称为Application应用程序)
  • 2. 客户端向Resource Manager申请应用程序ID(application id),作为本次作业的唯一标识
  • 3. 客户端程序将作业相关的文件(通常是指作业本身的jar包以及这个jar包依赖的第三方的jar),保存到HDFS上。也就是说Yarn based MR通过HDFS共享程序的jar包,供Task进程读取
  • 4. 客户端通过runJob向ResourceManager提交应用程序
  • 5.a/5.b. Resource Manager收到来自客户端的提交作业请求后,将请求转发给作业调度组件(Scheduler),Scheduler分配一个Container,然后Resource Manager在这个Container中启动Application Master进程,并交由Node Manager对Application Master进程进行管理
  • 6. Application Master初始化作业(应用程序),初始化动作包括创建监听对象以监听作业的执行情况,包括监听任务汇报的任务执行进度以及是否完成(不同的计算框架为集成到YARN资源调度框架中,都要提供不同的ApplicationMaster,比如Spark、Storm框架为了运行在Yarn之上,它们都提供了ApplicationMaster)
  • 7. Application Master根据作业代码中指定的数据地址(数据源一般来自HDFS)进行数据分片,以确定Mapper任务数,具体每个Mapper任务发往哪个计算节点,Hadoop会考虑数据本地性,本地数据本地性、本机架数据本地性以及最后跨机架数据本地性)。同时还会计算Reduce任务数,Reduce任务数是在程序代码中指定的,通过job.setNumReduceTask显式指定的
  • 8.如下几点是Application Master向Resource Manager申请资源的细节
  • 8.1 Application Master根据数据分片确定的Mapper任务数以及Reducer任务数向Resource Manager申请计算资源(计算资源主要指的是内存和CPU,在Hadoop Yarn中,使用Container这个概念来描述计算单位,即计算资源是以Container为单位的,一个Container包含一定数量的内存和CPU内核数)。
  • 8.2 Application Master是通过向Resource Manager发送Heart Beat心跳包进行资源申请的,申请时,请求中还会携带任务的数据本地性等信息,使得Resource Manager在分配资源时,不同的Task能够分配到的计算资源尽可能满足数据本地性
  • 8.3 Application Master向Resource Manager资源申请时,还会携带内存数量信息,默认情况下,Map任务和Reduce任务都会分陪1G内存,这个值是可以通过参数mapreduce.map.memory.mb and mapreduce.reduce.memory.mb进行修改。

5. YARNRunner

@Override public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, InterruptedException {  addHistoryToken(ts);  // Construct necessary information to start the MR AM ApplicationSubmissionContext appContext = createApplicationSubmissionContext(conf, jobSubmitDir, ts); // Submit to ResourceManager try { ApplicationId applicationId = resMgrDelegate.submitApplication(appContext); ApplicationReport appMaster = resMgrDelegate .getApplicationReport(applicationId); String diagnostics = (appMaster == null ? "application report is null" : appMaster.getDiagnostics()); if (appMaster == null || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) { throw new IOException("Failed to run job : " + diagnostics); } return clientCache.getClient(jobId).getJobStatus(jobId); } catch (YarnException e) { throw new IOException(e); } }

调用YarnClient的submitApplication()方法,其实现如下: 

6. YarnClientImpl

@Override public ApplicationId submitApplication(ApplicationSubmissionContext appContext) throws YarnException, IOException { ApplicationId applicationId = appContext.getApplicationId(); if (applicationId == null) { throw new ApplicationIdNotProvidedException( "ApplicationId is not provided in ApplicationSubmissionContext"); } SubmitApplicationRequest request = Records.newRecord(SubmitApplicationRequest.class); request.setApplicationSubmissionContext(appContext); // Automatically add the timeline DT into the CLC // Only when the security and the timeline service are both enabled if (isSecurityEnabled() && timelineServiceEnabled) { addTimelineDelegationToken(appContext.getAMContainerSpec()); } //TODO: YARN-1763:Handle RM failovers during the submitApplication call. rmClient.submitApplication(request); int pollCount = 0; long startTime = System.currentTimeMillis(); EnumSet waitingStates =  EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING, YarnApplicationState.SUBMITTED); EnumSet failToSubmitStates =  EnumSet.of(YarnApplicationState.FAILED, YarnApplicationState.KILLED);  while (true) { try { ApplicationReport appReport = getApplicationReport(applicationId); YarnApplicationState state = appReport.getYarnApplicationState(); if (!waitingStates.contains(state)) { if(failToSubmitStates.contains(state)) { throw new YarnException("Failed to submit " + applicationId +  " to YARN : " + appReport.getDiagnostics()); } LOG.info("Submitted application " + applicationId); break; } long elapsedMillis = System.currentTimeMillis() - startTime; if (enforceAsyncAPITimeout() && elapsedMillis >= asyncApiPollTimeoutMillis) { throw new YarnException("Timed out while waiting for application " + applicationId + " to be submitted successfully"); } // Notify the client through the log every 10 poll, in case the client // is blocked here too long. if (++pollCount % 10 == 0) { LOG.info("Application submission is not finished, " + "submitted application " + applicationId + " is still in " + state); } try { Thread.sleep(submitPollIntervalMillis); } catch (InterruptedException ie) { LOG.error("Interrupted while waiting for application " + applicationId + " to be successfully submitted."); } } catch (ApplicationNotFoundException ex) { // FailOver or RM restart happens before RMStateStore saves // ApplicationState LOG.info("Re-submit application " + applicationId + "with the " + "same ApplicationSubmissionContext"); rmClient.submitApplication(request); } } return applicationId; }

7. ClientRMService

ClientRMService是resource manager的客户端接口。这个模块处理从客户端到resource mananger的rpc接口。

@Override public SubmitApplicationResponse submitApplication( SubmitApplicationRequest request) throws YarnException { ApplicationSubmissionContext submissionContext = request .getApplicationSubmissionContext(); ApplicationId applicationId = submissionContext.getApplicationId(); // ApplicationSubmissionContext needs to be validated for safety - only // those fields that are independent of the RM's configuration will be // checked here, those that are dependent on RM configuration are validated // in RMAppManager. String user = null; try { // Safety user = UserGroupInformation.getCurrentUser().getShortUserName(); } catch (IOException ie) { LOG.warn("Unable to get the current user.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/538839.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python 编码文件json.loads json.dumps

python 编码文件json.loads json.dumps import yaml d {name: 张三, age: 1} print d jd json.dumps(d, ensure_asciiFalse, encodingutf-8)) ud json.loads(jd, encodingutf-8) print ud ud yaml.safe_load(jd, encodingutf-8) print udposted on 2018-04-23 15:18 秦瑞It…

getActionBar()报空指针异常

调用 getActionBar()的Activity类 public class WlanListActivity extends AppCompatActivity 在使用getActionBar("标题内容")的时候报空指针。 原因是要用AppCompatActivity类里的getSupportActionBar()

红黑树与平衡二叉树_百图详解红黑树,想不理解都难

之前在公司组内分享了红黑树的工作原理,今天把它整理下发出来,希望能对大家有所帮助,对自己也算是一个知识点的总结。这篇文章算是我写博客写公众号以来画图最多的一篇文章了,没有之一,我希望尽可能多地用图片来形象地…

android:showAsAction 无效

我想要的效果 但actionbar上的搜索菜单不显示 在androidstudio里,android:showAsAction"always"标红 根据提示,需要加入 xmlns:app"http://schemas.android.com/apk/res-auto" 加入后依然无效 正确的加入方式是:

Exchange_Server_2013在Windows_2008_R2部署

Exchange Server 2013可以部署在Windows Server 2012的平台,也可以部署在Windows Server 2008 R2的平台。如果部署在Windows Server 2008 R2平台要求操作系统版本为Windows Server 2008 R2 SP1的版本。如下拓扑图:在本架构中有两台服务器,都安…

建立副本名称冲突_包的建立(一)

这次的内容,涉及到 R 语言包的建立。事实上,CRAN 提供的官方参考指南,并不适合快速阅读,且内容繁杂。比较适合作为后期提高的 教材。而 http://r-pkgs.had.co.nz/ 上 的教程则更适合作为 R 包编写的帮助指南。这里,仅仅…

Android 多选列表

原文&#xff1a;http://blog.csdn.net/wljun739/article/details/37655209 点击阅读原文 ----------------------------------------------------------- 1、activity_main.xml[java] view plaincopy<LinearLayout xmlns:android"http://schemas.android.com/apk/res/…

python自带的编辑器怎么换行_Python3基础 print 自带换行功能

镇场诗&#xff1a; ———大梦谁觉&#xff0c;水月中建博客。百千磨难&#xff0c;才知世事无常。 ———今持佛语&#xff0c;技术无量愿学。愿尽所学&#xff0c;铸一良心博客。 —————————————————————————————————————————— 1 …

leetcode 回文数

2019独角兽企业重金招聘Python工程师标准>>> 判断一个整数是否是回文数。回文数是指正序&#xff08;从左向右&#xff09;和倒序&#xff08;从右向左&#xff09;读都是一样的整数。 示例 1: 输入: 121 输出: true 示例 2: 输入: -121 输出: false 解释: 从左向右…

Hadoop控制输出文件命名

原文地址&#xff1a;http://blog.csdn.net/zuochanxiaoheshang/article/details/8769198 点击阅读原文 --------------------------------------------------- Hadoop 控制输出文件命名 在一般情况下&#xff0c;Hadoop 每一个 Reducer 产生一个输出文件&#xff0c;文件以 …

前端路由的两种实现原理

2019独角兽企业重金招聘Python工程师标准>>> History API 这里不细说每一个 API 的用法&#xff0c;大家可以看 MDN 的文档&#xff1a;https://developer.mozilla.org... 重点说其中的两个新增的API history.pushState 和 history.replaceState 这两个 API 都接收三…

3.JDK和JRE和JVM的区别

JDK --Java Development Kit --java 开发工具包 JRE --Java Runtime Environment --java运行时环境 JVM --Java Virtual Machine --java虚拟机 ------------- 更多的Java&#xff0c;Angular&#xff0c;Android&#xff0c;大数据&#xff0c;J2EE&#xff0c;Python…

缓存cache

由于Django是动态网站&#xff0c;所有每次请求均会去数据进行相应的操作&#xff0c;当程序访问量大时&#xff0c;耗时必然会更加明显&#xff0c;最简单解决方式是使用&#xff1a;缓存&#xff0c;缓存将一个某个views的返回值保存至内存或者memcache中&#xff0c;5分钟内…

4.JVM简述

JVM是一种规范。 就是一个虚拟的用于执行bytecodes字节码的计算机 可以用软件来实现&#xff0c;如IBM,SUN,BEA等按照这个规范实现&#xff0c;可以实现比SUN公司更好的JVM&#xff0c;我们自己也可以实现一个。 可以使用硬件来实现&#xff0c;如sun与intel公司研发java的芯…

5.JDK环境配置

下载 进入Oracle官网下载&#xff0c;点击进入 安装 一路下一步。记住安装到哪里了。 配置环境变量 JAVA_HOME 刚才的java安装目录 PATH %JAVA_HOME%\bin PATH里配置多个用英文的分号; 分隔。 *classpath&#xff0c;jdk5.0以上可以不用配置了 测试 windows下&#xf…

6.第一个程序Hello World

新建文件夹 在C盘新建个文件夹 mycode。注意不要用中文。 新建java文件 1、显示隐藏文件名。 2、右键新建文本文件 3、重命名为 Welcome.java。&#xff08;首字母必须大写。如果不显示隐藏文件名&#xff0c;会是Welcome.java.txt不是java文件&#xff09; 4、编写代码 p…

pythonstdin_python 笔试输入:sys.stdin.readline和input

①&#xff1a;输入一行数据并输 出两种方法 # 输入一行数据并输出 import sys # 方法一&#xff1a; str1 input() print(input 输入:,str1,len,len(str1)) print(循环遍历输入得到输入的每个字符的ascii码如下&#xff1a;) for i in str1: print(ord(i)) # 方法二&#xff…

8.对Hello World程序的深入

Welcome.java public class Welcome{public static void main(String[] args){System.out.println("Hello World,I am Java!");}}1、Java对大小写敏感。如果出现了大小写拼写错误&#xff0c;程序无法运行。 关键字class表明Java程序中的全部内容都包含在类中&…

python整数类型没有取值范围限制_详解Python中6种数据类型

Python中数据类型主要有六种&#xff1a;数字类型&#xff0c;字符串类型&#xff0c;元组类型&#xff0c;列表类型&#xff0c;文件类型和字典类型&#xff0c;我们今天先介绍前四种类型。 假如在Python程序中&#xff0c;出现了“010”&#xff0c;那么这个“010”到底是什么…

python爬虫的用途_python爬虫用途

广告关闭 腾讯云11.11云上盛惠 &#xff0c;精选热门产品助力上云&#xff0c;云服务器首年88元起&#xff0c;买的越多返的越多&#xff0c;最高返5000元&#xff01;专业点来说就是应用多台机器同时实现爬虫任务&#xff0c;这多台机器上的爬虫&#xff0c;就是称作分布式爬虫…