自己做网站的服务器深圳哪家做网站比较好
news/
2025/10/6 5:27:13/
文章来源:
自己做网站的服务器,深圳哪家做网站比较好,东阳海天建设集团网站,开发一个平台需要多少钱目录前言#xff1a;1、MapReduce原理2、mapreduce实践#xff08;WordCount实例#xff09; 目录
今天先总体说下MapReduce的相关知识#xff0c;后续将会详细说明对应的shuffle、mr与yarn的联系、以及mr的join操作的等知识。以下内容全是个人学习后的见解#xff0c;如…目录前言1、MapReduce原理2、mapreduce实践WordCount实例 目录
今天先总体说下MapReduce的相关知识后续将会详细说明对应的shuffle、mr与yarn的联系、以及mr的join操作的等知识。以下内容全是个人学习后的见解如有遗漏或不足请大家多多指教。
前言
为什么要MAPREDUCE 1海量数据在单机上处理因为硬件资源限制无法胜任 2而一旦将单机版程序扩展到集群来分布式运行将极大增加程序的复杂度和开发难度 3引入mapreduce框架后开发人员可以将绝大部分工作集中在业务逻辑的开发上而将分布式计算中的复杂性交由框架来处理。
设想一个海量数据场景下的wordcount需求 单机版内存受限磁盘受限运算能力受限分布式 1、文件分布式存储HDFS 2、运算逻辑需要至少分成2个阶段一个阶段独立并发一个阶段汇聚 3、运算程序如何分发 4、程序如何分配运算任务切片 5、两阶段的程序如何启动如何协调 6、整个程序运行过程中的监控容错重试
可见在程序由单机版扩成分布式时会引入大量的复杂工作。为了提高开发效率可以将分布式程序中的公共功能封装成框架让开发人员可以将精力集中于业务逻辑。
而mapreduce就是这样一个分布式程序的通用框架其应对以上问题的整体结构如下 1、MRAppMaster(mapreduce application master) 2、MapTask 3、ReduceTask
1、MapReduce原理
Mapreduce是一个分布式运算程序的编程框架是用户开发“基于hadoop的数据分析应用”的核心框架 Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序并发运行在一个hadoop集群上
Mapreduce框架结构及核心运行机制 1.1、结构 一个完整的mapreduce程序在分布式运行时有三类实例进程 1、MRAppMaster负责整个程序的过程调度及状态协调 2、mapTask负责map阶段的整个数据处理流程 3、ReduceTask负责reduce阶段的整个数据处理流程 1.2、mapreduce框架的设计思想 这里面有两个任务的分配过程1、总的任务切割分配给各个mapTask不同的mapTask再将得到的hashmap按照首字母划分分配给各个reduceTask。
1.3、mapreduce程序运行的整体流程wordcount运行过程的解析 流程解析 job.split负责任务的切分形成一个任务切片规划文件。 wc.jar要运行的jar包包含mapper、reducer、Driver等java类。 job.xml:job的其他配置信息:如指定map是哪个类reduce是那个类以及输入数据的路径在哪输出数据的路径在哪等配置信息。 前提客户端提交任务给yarn后(提交前会进行任务的规划)yarn利用ResouceManager去找到mrAppmaster. 1、 一个mr程序启动的时候最先启动的是MRAppMasterMRAppMaster启动后根据本次job的描述信息计算出需要的maptask实例数量然后向集群申请机器启动相应数量的maptask进程
2、 maptask进程启动之后根据给定的数据切片范围进行数据处理主体流程为 a) 利用客户指定的inputformat来获取RecordReader读取数据形成输入KV对框架干的事 b) 将输入KV对传递给客户定义的map()方法做逻辑运算并将map()方法输出的KV对收集到缓存 c) 将缓存中的KV对按照K分区排序后不断溢写到磁盘文件
3、 MRAppMaster监控到所有maptask进程任务完成之后会根据客户指定的参数启动相应数量的reducetask进程并告知reducetask进程要处理的数据范围数据分区
4、 Reducetask进程启动之后根据MRAppMaster告知的待处理数据所在位置从若干台maptask运行所在机器上获取到若干个maptask输出结果文件并在本地进行重新归并排序然后按照相同key的KV为一个组调用客户定义的reduce()方法进行逻辑运算并收集运算输出的结果KV然后调用客户指定的outputformat将结果数据输出到外部存储(对应的就是context.write方法
2、mapreduce实践WordCount实例
编程规范
1用户编写的程序分成三个部分MapperReducerDriver(提交运行mr程序的客户端)
2Mapper的输入数据是KV对的形式KV的类型可自定义
3Mapper的输出数据是KV对的形式KV的类型可自定义
4Mapper中的业务逻辑写在map()方法中
5map()方法maptask进程对每一个K,V调用一次
6Reducer的输入数据类型对应Mapper的输出数据类型也是KV
7Reducer的业务逻辑写在reduce()方法中
8Reducetask进程对每一组相同k的k,v组调用一次reduce()方法
9用户自定义的Mapper和Reducer都要继承各自的父类
10整个程序需要一个Drvier来进行提交提交的是一个描述了各种必要信息的job对象
WordCount程序 mapper类
package bigdata.mr.wcdemo;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
//map方法的生命周期 框架每传一行数据就被调用一次* KEYIN: 默认情况下是mr框架所读到的一行文本的起始偏移量Long,* 但是在hadoop中有自己的更精简的序列化接口所以不直接用Long而用LongWritable* * VALUEIN:默认情况下是mr框架所读到的一行文本的内容String同上用Text* * KEYOUT是用户自定义逻辑处理完成之后输出数据中的key在此处是单词String同上用Text* VALUEOUT是用户自定义逻辑处理完成之后输出数据中的value在此处是单词次数Integer同上用IntWritable*/
public class WordcountMapper extends MapperLongWritable, Text, Text, IntWritable{/*** map阶段的业务逻辑就写在自定义的map()方法中* maptask会对每一行输入数据调用一次我们自定义的map()方法*/Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //将maptask传给我们的文本内容先转换成StringString line value.toString();//根据空格将这一行切分成单词String[] words line.split( ); //将单词输出为单词1for(String word:words){//将单词作为key将次数1作为value以便于后续的数据分发可以根据单词分发以便于相同单词会到相同的reduce taskcontext.write(new Text(word), new IntWritable(1));}}
}
reducer类
package mr_test;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
//生命周期框架每传递进来一个k相同的value 组reduce方法就被调用一次* KEYIN, VALUEIN 对应 mapper输出的KEYOUT,VALUEOUT类型对应* KEYOUT, VALUEOUT 是自定义reduce逻辑处理结果的输出数据类型* KEYOUT是单词* VLAUEOUT是总次数*/
public class WordcountReducer extends ReducerText, IntWritable, Text, IntWritable { /*** angelababy,1angelababy,1angelababy,1angelababy,1angelababy,1* hello,1hello,1hello,1hello,1hello,1hello,1* banana,1banana,1banana,1banana,1banana,1banana,1* 入参key是一组相同单词kv对的key*/Overrideprotected void reduce(Text key, IterableIntWritable values,Context context) throws IOException, InterruptedException { int count0;for(IntWritable value:values){countvalue.get(); }context.write(key, new IntWritable(count));}
}
Driver类 用来描述job并提交job
package mr_test;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/*** 相当于一个yarn集群的客户端* 需要在此封装我们的mr程序的相关运行参数指定jar包* 最后提交给yarn*/
public class WordcountDriver {public static void main(String[] args) throws IOException, Exception, InterruptedException {Configuration cf new Configuration();
// 把这个程序打包成一个Job来运行Job job Job.getInstance(); //指定本程序的jar包所在的本地路径job.setJarByClass(WordcountDriver.class); //指定本业务job要使用的mapper/Reducer业务类job.setMapperClass(WorldcountMapper.class);job.setReducerClass(WordcountReducer.class); //指定mapper输出数据的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class); //指定最终输出的数据的kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class); //指定job的输入原始文件所在目录FileInputFormat.setInputPaths(job, new Path(args[0])); //指定job的输出结果所在目录FileOutputFormat.setOutputPath(job, new Path(args[1])); //将job中配置的相关参数以及job所用的java类所在的jar包提交给yarn去运行boolean res job.waitForCompletion(true);System.exit(res?0:1); }
}
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/929007.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!