mapreduce 聚合_MapReduce:处理数据密集型文本处理–局部聚合第二部分

mapreduce 聚合

这篇文章继续进行有关使用MapReduce进行数据密集型处理的书中实现算法的系列文章。 第一部分可以在这里找到。 在上一篇文章中,我们讨论了使用本地聚合技术来减少通过网络进行混洗和传输的数据量的方法。 减少传输的数据量是提高MapReduce作业效率的主要方法之一。 单词计数MapReduce作业用于演示本地聚合。 由于结果只需要总数,因此我们可以为合并器重新使用相同的化简器,因为更改加数的顺序或分组不会影响总和。

但是,如果您想要平均水平呢? 然后,由于计算平均值的平均值不等于原始数字集的平均值,因此相同的方法将行不通。 尽管有了一点见识,我们仍然可以使用本地聚合。 对于这些示例,我们将使用Hadoop最终指南书中使用的NCDC天气数据集的示例。 我们将计算1901年每个月的平均温度。可以在MapReduce的数据密集型处理的第3.1.3章中找到组合器和映射器内组合选项的平均值算法。

一种尺寸并不适合所有人

上次我们介绍了两种用于在MapReduce作业中减少数据的方法:Hadoop组合器和映射器内组合方法。 Hadoop框架将组合器视为一种优化,并且无法保证调用组合器的次数(如果有的话)。 结果,映射器必须以减速器期望的形式发出数据,因此,如果不涉及组合器,则最终结果不会更改。 为了调整计算平均值,我们需要返回到映射器并更改其输出。

映射器更改

在单词计数示例中,未优化的映射器仅发出单词和1的计数。合并器和映射器内组合映射器通过将每个单词作为哈希映射中的键(总计数为n)来优化此输出。值。 每次看到一个单词,计数都会增加1。使用此设置时,如果未调用组合器,则缩减器将接收到该单词作为键,并将一长串的1?s加在一起,从而得到相同的输出(当然,使用映射器内组合映射器可以避免此问题,因为可以保证合并结果是映射器代码的一部分)。 为了计算平均值,我们将使基本映射器发出一个字符串键(将天气观测的年和月连接在一起)和一个自定义可写对象,称为TemperatureAveragingPair。 TemperatureAveragingPair对象将包含两个数字(IntWritables),获取的温度和一个计数。 我们将从Hadoop:权威指南中获取MaximumTemperatureMapper,并以此为灵感来创建AverageTemperatureMapper:

public class AverageTemperatureMapper extends Mapper<LongWritable, Text, Text, TemperatureAveragingPair> {//sample line of weather data//0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF10899199999999999private Text outText = new Text();private TemperatureAveragingPair pair = new TemperatureAveragingPair();private static final int MISSING = 9999;@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String yearMonth = line.substring(15, 21);int tempStartPosition = 87;if (line.charAt(tempStartPosition) == '+') {tempStartPosition += 1;}int temp = Integer.parseInt(line.substring(tempStartPosition, 92));if (temp != MISSING) {outText.set(yearMonth);pair.set(temp, 1);context.write(outText, pair);}}
}

通过使映射器输出键和TemperatureAveragingPair对象,无论调用组合器如何,我们的MapReduce程序都可以保证具有正确的结果。

合路器

我们需要减少发送的数据量,因此我们将对温度求和,并对计数求和并分别存储。 这样,我们将减少发送的数据,但保留计算正确平均值所需的格式。 如果/在调用组合器时,它将采用所有传入的TemperatureAveragingPair对象,并为同一键发出单个TemperatureAveragingPair对象,其中包含温度和计数值的总和。 这是合并器的代码:

public class AverageTemperatureCombiner extends Reducer<Text,TemperatureAveragingPair,Text,TemperatureAveragingPair> {private TemperatureAveragingPair pair = new TemperatureAveragingPair();@Overrideprotected void reduce(Text key, Iterable<TemperatureAveragingPair> values, Context context) throws IOException, InterruptedException {int temp = 0;int count = 0;for (TemperatureAveragingPair value : values) {temp += value.getTemp().get();count += value.getCount().get();}pair.set(temp,count);context.write(key,pair);}
}

但是我们非常有兴趣确保我们减少了发送到reducer的数据量,因此我们将看看下一步如何实现。

在Mapper合并平均值中

与单词计数示例相似,为了计算平均值,映射器内组合映射器将使用哈希图,将连接的年+月作为键,将TemperatureAveragingPair作为值。 每次获得相同的年+月组合时,我们都会将对对象从地图中取出,添加温度并将计数增加一个。 调用cleanup方法后,我们将发出所有对及其各自的键:

public class AverageTemperatureCombiningMapper extends Mapper<LongWritable, Text, Text, TemperatureAveragingPair> {//sample line of weather data//0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF10899199999999999private static final int MISSING = 9999;private Map<String,TemperatureAveragingPair> pairMap = new HashMap<String,TemperatureAveragingPair>();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String yearMonth = line.substring(15, 21);int tempStartPosition = 87;if (line.charAt(tempStartPosition) == '+') {tempStartPosition += 1;}int temp = Integer.parseInt(line.substring(tempStartPosition, 92));if (temp != MISSING) {TemperatureAveragingPair pair = pairMap.get(yearMonth);if(pair == null){pair = new TemperatureAveragingPair();pairMap.put(yearMonth,pair);}int temps = pair.getTemp().get() + temp;int count = pair.getCount().get() + 1;pair.set(temps,count);}}@Overrideprotected void cleanup(Context context) throws IOException, InterruptedException {Set<String> keys = pairMap.keySet();Text keyText = new Text();for (String key : keys) {keyText.set(key);context.write(keyText,pairMap.get(key));}}
}

通过遵循在映射调用之间跟踪数据的相同模式,我们可以通过实现映射器内合并策略来实现可靠的数据缩减。 同样的注意事项适用于在对映射器的所有调用中保持状态,但是考虑使用这种方法可以提高处理效率,这值得考虑。

减速器

在这一点上,编写我们的reducer很容易,为每个键获取一个成对列表,将所有温度和计数求和,然后将温度总和除以计数总和。

public class AverageTemperatureReducer extends Reducer<Text, TemperatureAveragingPair, Text, IntWritable> {private IntWritable average = new IntWritable();@Overrideprotected void reduce(Text key, Iterable<TemperatureAveragingPair> values, Context context) throws IOException, InterruptedException {int temp = 0;int count = 0;for (TemperatureAveragingPair pair : values) {temp += pair.getTemp().get();count += pair.getCount().get();}average.set(temp / count);context.write(key, average);}
}


结果

使用合并器和映射器内合并映射器选项可以预测结果,从而显着减少数据输出。
未优化的映射器选项:

12/10/10 23:05:28 INFO mapred.JobClient:     Reduce input groups=12
12/10/10 23:05:28 INFO mapred.JobClient:     Combine output records=0
12/10/10 23:05:28 INFO mapred.JobClient:     Map input records=6565
12/10/10 23:05:28 INFO mapred.JobClient:     Reduce shuffle bytes=111594
12/10/10 23:05:28 INFO mapred.JobClient:     Reduce output records=12
12/10/10 23:05:28 INFO mapred.JobClient:     Spilled Records=13128
12/10/10 23:05:28 INFO mapred.JobClient:     Map output bytes=98460
12/10/10 23:05:28 INFO mapred.JobClient:     Total committed heap usage (bytes)=269619200
12/10/10 23:05:28 INFO mapred.JobClient:     Combine input records=0
12/10/10 23:05:28 INFO mapred.JobClient:     Map output records=6564
12/10/10 23:05:28 INFO mapred.JobClient:     SPLIT_RAW_BYTES=108
12/10/10 23:05:28 INFO mapred.JobClient:     Reduce input records=6564

组合器选项:

12/10/10 23:07:19 INFO mapred.JobClient:     Reduce input groups=12
12/10/10 23:07:19 INFO mapred.JobClient:     Combine output records=12
12/10/10 23:07:19 INFO mapred.JobClient:     Map input records=6565
12/10/10 23:07:19 INFO mapred.JobClient:     Reduce shuffle bytes=210
12/10/10 23:07:19 INFO mapred.JobClient:     Reduce output records=12
12/10/10 23:07:19 INFO mapred.JobClient:     Spilled Records=24
12/10/10 23:07:19 INFO mapred.JobClient:     Map output bytes=98460
12/10/10 23:07:19 INFO mapred.JobClient:     Total committed heap usage (bytes)=269619200
12/10/10 23:07:19 INFO mapred.JobClient:     Combine input records=6564
12/10/10 23:07:19 INFO mapred.JobClient:     Map output records=6564
12/10/10 23:07:19 INFO mapred.JobClient:     SPLIT_RAW_BYTES=108
12/10/10 23:07:19 INFO mapred.JobClient:     Reduce input records=12

映射器内合并选项:

12/10/10 23:09:09 INFO mapred.JobClient:     Reduce input groups=12
12/10/10 23:09:09 INFO mapred.JobClient:     Combine output records=0
12/10/10 23:09:09 INFO mapred.JobClient:     Map input records=6565
12/10/10 23:09:09 INFO mapred.JobClient:     Reduce shuffle bytes=210
12/10/10 23:09:09 INFO mapred.JobClient:     Reduce output records=12
12/10/10 23:09:09 INFO mapred.JobClient:     Spilled Records=24
12/10/10 23:09:09 INFO mapred.JobClient:     Map output bytes=180
12/10/10 23:09:09 INFO mapred.JobClient:     Total committed heap usage (bytes)=269619200
12/10/10 23:09:09 INFO mapred.JobClient:     Combine input records=0
12/10/10 23:09:09 INFO mapred.JobClient:     Map output records=12
12/10/10 23:09:09 INFO mapred.JobClient:     SPLIT_RAW_BYTES=108
12/10/10 23:09:09 INFO mapred.JobClient:     Reduce input records=12

计算结果:
(注意:示例文件中的温度以摄氏度* 10为单位)

未优化 合路器 映射器内合并器映射器
190101 -25
190102 -91
190103 -49 190104 22 190105 76 190106 146 190107 192 190108 170 190109 114 190110 86 190111 -16 190112 -77
190101 -25
190102 -91
190103 -49 190104 22 190105 76 190106 146 190107 192 190108 170 190109 114 190110 86 190111 -16 190112 -77
190101 -25
190102 -91
190103 -49 190104 22 190105 76 190106 146 190107 192 190108 170 190109 114 190110 86 190111 -16 190112 -77


结论

我们已经讨论了本地聚合,无论是简单的情况(可以将reducer用作组合器),还是更复杂的情况(对于如何构造数据,同时仍能从本地聚合数据以提高处理效率)中获得一些见解。

进一步阅读

  • Jimmy Lin和Chris Dyer 使用MapReduce进行的数据密集型处理
  • Hadoop: Tom White 的权威指南
  • 来自博客的源代码
  • Hadoop API
  • MRUnit用于单元测试Apache Hadoop映射减少工作
  • Gutenberg项目提供了大量纯文本格式的书籍,非常适合在本地测试Hadoop作业。

参考: 使用MapReduce进行数据密集型文本处理-本地聚合第二部分,来自我们的JCG合作伙伴 Bill Bejeck,来自“ 随机编码思考”博客。


翻译自: https://www.javacodegeeks.com/2012/10/mapreduce-working-through-data-2.html

mapreduce 聚合

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/352381.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

最常出现的字符串 Most Common Word

2018-10-26 00:32:05 问题描述&#xff1a; 问题求解&#xff1a; 方法一、Trie 最长出现的字符串&#xff0c;最容易想到的解法就是Trie树了&#xff0c;于是首先使用Trie树进行了实现&#xff0c;代码量有点大&#xff0c;当然了是可以A掉的&#xff0c;只是对于这种Easy的题…

docker启动odoo提示module没有安装_Ubuntu20.04通过docker安装微信

到目前为止&#xff0c;在ubuntu20.04上使用wechat最简单的方式不是wine&#xff0c;而是用docker。今天就传授大家一个一定可以使用的docker安装的wine版本。首先&#xff0c;安装一下docker&#xff1a;sudo apt install docker.io sudo systemctl enable --now dockersudo s…

mysql如何在一个表中插入数据的同时,更新另一个表的数据?

三种方案,你看看哪个比较适合你1,适用于学生: 写两个方法,一个新增一个更新,在新增完了以后马上去查询一下,按主键倒叙排列,取到最新插入的id,前提主键是自增的且不是uuid,然后把查到的主键返回出去作为形参让更新方法接收到,然后更新即可.2,适用于ssh框架: 写两个事务,事务的传…

Python Web初学解惑之 WSGI、flup、fastcgi、web.py的关系

首先声明这篇文章 是我从 豆瓣 上面看到的。 原文地址 http://www.douban.com/note/13508388/?start0&postok#last 看我之后 豁然开朗&#xff0c;对Web的理解有加深了一层&#xff0c;在此再感谢一下文章的作者。写这篇文章 &#xff1a;一 写下 自己的理解&#xff1…

继承Javadoc方法注释

尽管用于javadoc工具的JDK工具和实用程序页面通过实现和继承方法来描述Javadoc方法注释重用的规则&#xff0c;但是当实际上不需要使用{inheritDoc}时&#xff0c;很容易不必要地显式描述注释继承&#xff0c;因为会使用相同的注释隐式继承。 Java 8 javadoc工具页面在“ 方法公…

C++之手写strlen函数

代码&#xff1a; int strlen(const char *str){ assert(str!NULL); intlen0; while((*str)!\0) len;return len; } 这个函数实现起来较为简单&#xff0c;注意字符指针的有效性检查。 可参考&#xff1a;strlen、strcpy、strcat等字符串处理函数的实现 转载于:https://www.cnb…

mysql or优化_MySQL 语句优化

官方文档放这里&#xff0c;有什么代码先到官方文档查询&#xff1a;MySQL 8.0 Reference Manual :: 8 Optimization​dev.mysql.com优化涉及多个级别的配置&#xff0c;调整和测量性能。 根据工作角色&#xff08;开发人员&#xff0c;DBA或两者的组合&#xff09;&#xff0c…

浅谈 MySQL 的存储引擎(表类型)

什么是MySql数据库 通常意义上&#xff0c;数据库也就是数据的集合&#xff0c;具体到计算机上数据库可以是存储器上一些文件的集合或者一些内存数据的集合。 我们通常说的MySql数据库&#xff0c;sql server数据库等等其实是数据库管理系统&#xff0c;它们可以存储数据&#…

Ubuntu如何搭建Django与Flup和Nginx环境?

Ubuntu系统越来越多的用户开始使用&#xff0c;本文介绍的是搭建DjangoFlupNginx环境的过程。 首先我们必须明白这这3者在该环境下发挥的作用。 1.nginx&#xff1a;("enginex")是一个高性能的HTTP和反向代理服务器&#xff0c;作用和apache的类似。它可以处理一些静…

洛谷 1137 旅行计划

【题解】 拓扑排序DP即可。 1 #include<cstdio>2 #include<cstring>3 #include<algorithm>4 #include<vector>5 #define LL long long6 #define rg register7 #define N 2000108 using namespace std;9 int n,m,front,rear,q[N],f[N],in[N]; 10 bool v…

NOT IN、JOIN、IS NULL、NOT EXISTS效率对比

语句一&#xff1a;select count(*) from A where A.a not in (select a from B) 语句二&#xff1a;select count(*) from A left join B on A.a B.a where B.a is null 语句三&#xff1a;select count(*) from A where not exists (select a from B where A.a B.a) 知道以…

python ssl_Python3 ssl模块不可用的问题

编译安装完Python3之后&#xff0c;使用pip来安装python库&#xff0c;发现了如下报错&#xff1a; $ pip install numpy pipis configured with locations that require TLS/SSL, however the ssl module in Python isnot available. Collecting numpy Retrying (Retry(total4…

shell脚本 如何切换当前目录

问题&#xff1a; 是这么个情况&#xff1a;当前目录是/root/replace/ 我想在脚本a.sh中执行该目录下的一个子目录/root/replace/scripts/下的可执行文件run.out和b.sh脚本。但是这个可执行文件run.out的文件读写要求在/scripts下&#xff0c;而b.sh脚本则是以/scripts为当前目…

使用DynamoDBMapper扫描DynamoDB项目

之前&#xff0c;我们介绍了如何使用DynamoDBMapper或底层Java api查询DynamoDB数据库。 除了发出查询之外&#xff0c;DynamoDB还提供扫描功能。 扫描的目的是获取您在DynamoDB表上可能拥有的所有项目。 因此&#xff0c;扫描不需要任何基于我们的分区键或您的全局/本地二级…

vs python生成exe文件_使用VScode编写python程序并打包成.exe文件-文件夹变成exe

1. 下载vscode并安装 2. 配置Python环境 点击左下角的吃了图标&#xff0c;在弹出的菜单中选择extensions&#xff0c;在左上方搜索框内输入“Python”&#xff0c;可以看到好多Python插件&#xff0c;选择安装喜欢的Python插件&#xff08;配图两张&#xff09;。安装Python插…

如何清除主板CMOS

最近很多网友反映&#xff0c;对于清除主板CMOS&#xff0c;不是很了解&#xff0c;操作上也不明白&#xff0c;因此网站重新制作主板CMOS清除的过程&#xff0c;并以图文的形式制作&#xff0c;以便更加直观清楚。 CMOS(本意是指互补金属氧化物半导体存储嚣&#xff0c;是一…

ubuntu如何安装samba

1.samba安装sudo apt-get install samba2.修改smb.confsudo gedit /etc/samba/smb.conf 文件最后增加如下代码&#xff1a;[share] path /home/liunx/share available yes browseable yes public yes writable yes 4.进入home/liunx目录创建share文件夹mkdir /share …

文本处理工具AWK详解

awk简介 awk: 中文意思是报告生成器 能够根据我们输入的信息&#xff0c;将信息格式化以后显示&#xff0c;将定义好的信息以比较美观&#xff08;直观&#xff09;的方式显示出来出现比较早&#xff0c;继而出现了new awk&#xff08;nawk&#xff09;在windows上实现&#x…

安装12G内存读出内存条为3.45G的处理方法

在台式电脑上安装2个内存条&#xff0c;之前是4G的后来加安装了8G内存&#xff0c;本应该有12G但是在电脑上却显示只有3.45G内存为可用&#xff0c; 以下是处理方式&#xff1a; 1、确定两条内存都是可以用 2、在cmd 中输入msconfig 点击enter就可以入系统配置 然后点击引导…

顺序表的介绍与简单运用

1&#xff1a;解释与结构 顺序表是用一段物理地址连续的存储单元依次存储数据元素的线性结构&#xff0c;一般情况下采用数组存 储。在数组上完成数据的增删查改。 顺序表一般可分为一下几类 1.1 静态顺序表 概念&#xff1a;使用定长数组存储元素。注意&#xff1a;这种是…