MapReduce分区

目录

  • 1. MapReduce分区
    • 1.1 哈希分区
    • 1.2 自定义分区
  • 2. 成绩分组
    • 2.1 Map
    • 2.2 Partition
    • 2.3 Reduce
  • 3. 代码和结果
    • 3.1 pom.xml中依赖配置
    • 3.2 工具类util
    • 3.3 GroupScores
    • 3.4 结果
  • 参考

  本文引用的Apache Hadoop源代码基于Apache许可证 2.0,详情请参阅 Apache许可证2.0。

1. MapReduce分区

  在默认情况下,MapReduce认为Reduce函数处理的是数据汇总操作,因此其针对的必定是一个Map函数清洗处理后的相对规模较小的数据集,且需要对整个集群中所有Map的中间输出结果进行统一处理,因此只会启动一个Reduce计算节点来处理。
  这与某些特殊的应用需求并不相匹配。在某些特定的时刻,开发人员希望启动更多的Reduce并发节点来优化最终结果统计的性能,减小数据处理的延迟,这通过简单的设置代码即可完成;而在更定制化的环境中,开发人员希望符合特定规则的Map中间输出结果交由特定的Reduce节点处理,这就需要使用MapReduce分区,开发人员还可以提供自定义的分区规则。
  如果有很多个Reduce任务,每个Map任务就会针对输出进行分区,即为每个Reduce任务建立一个分区。每个分区中有很多键,但每个键对应的键值对记录都在同一分区中。如果不给定自定义的分区规则,则Hadoop使用默认的哈希函数来分区,效率较高。

1.1 哈希分区

  下面是Apache Hadoop中默认哈希分区的源代码。在这个分区规则中,选择Reduce节点的计算方法是(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks

/*** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.hadoop.mapreduce.lib.partition;import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.Partitioner;/** Partition keys by their {@link Object#hashCode()}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K, V> extends Partitioner<K, V> {/** Use {@link Object#hashCode()} to partition. */public int getPartition(K key, V value,int numReduceTasks) {return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;}}

1.2 自定义分区

  如果希望实现自定义分区,需要继承Hadoop提供的分区类org.apache.hadoop.mapreduce.Partitioner,下面是该类的声明。继承该分区类的自定义分区类需要实现public abstract int getPartition(KEY key, VALUE value, int numPartitions),该函数的作用是设置Map中间处理结果的分区规则,其中numPartitions是总分区的个数。此外,在自定义分区类时,通过函数返回了多少个分区,那么在MapReduce任务调度代码中需要设置Job.setNumReduceTasks(自定义分区个数)

/*** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.hadoop.mapreduce;import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;/** * Partitions the key space.* * <p><code>Partitioner</code> controls the partitioning of the keys of the * intermediate map-outputs. The key (or a subset of the key) is used to derive* the partition, typically by a hash function. The total number of partitions* is the same as the number of reduce tasks for the job. Hence this controls* which of the <code>m</code> reduce tasks the intermediate key (and hence the * record) is sent for reduction.</p>** <p>Note: A <code>Partitioner</code> is created only when there are multiple* reducers.</p>** <p>Note: If you require your Partitioner class to obtain the Job's* configuration object, implement the {@link Configurable} interface.</p>* * @see Reducer*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class Partitioner<KEY, VALUE> {/** * Get the partition number for a given key (hence record) given the total * number of partitions i.e. number of reduce-tasks for the job.*   * <p>Typically a hash function on a all or a subset of the key.</p>** @param key the key to be partioned.* @param value the entry value.* @param numPartitions the total number of partitions.* @return the partition number for the <code>key</code>.*/public abstract int getPartition(KEY key, VALUE value, int numPartitions);}

2. 成绩分组

  成绩文本如下,第一列为人名,第二列为成绩。目标是将成绩分为5段,分别为 [ 0 , 20 ) [0, 20) [0,20) [ 20 , 40 ) [20, 40) [20,40) [ 40 , 60 ) [40, 60) [40,60) [ 60 , 80 ) [60, 80) [60,80) [ 80 , 100 ] [80, 100] [80,100]

1 23
2 78
3 45
4 12
5 67
6 34
7 89
8 56
9 9
10 78
11 21
12 54
13 83
14 10
15 65
16 39
17 92
18 47
19 28
20 72

2.1 Map

  假设这个MapReduce作业使用了1个Map,Map的作用是从文本获取<人名,成绩>键值对,同时保证成绩在有效范围内。
在这里插入图片描述

2.2 Partition

  根据成绩进行分区,其中1的范围是 [ 20 , 40 ) [20, 40) [20,40),3的范围是 [ 60 , 80 ) [60, 80) [60,80),2的范围是 [ 40 , 60 ) [40, 60) [40,60),4的范围是 [ 80 , 100 ] [80, 100] [80,100],0的范围是 [ 0 , 20 ) [0, 20) [0,20)
在这里插入图片描述

2.3 Reduce

  reduce不进行任何操作,直接将分区结果排序后写入5个文件中。
在这里插入图片描述

3. 代码和结果

3.1 pom.xml中依赖配置

  <dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.3.6</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>3.3.6</version><type>pom</type></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-jobclient</artifactId><version>3.3.6</version></dependency></dependencies>

3.2 工具类util

import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;public class util {public static FileSystem getFileSystem(String uri, Configuration conf) throws Exception {URI add = new URI(uri);return FileSystem.get(add, conf);}public static void removeALL(String uri, Configuration conf, String path) throws Exception {FileSystem fs = getFileSystem(uri, conf);if (fs.exists(new Path(path))) {boolean isDeleted = fs.delete(new Path(path), true);System.out.println("Delete Output Folder? " + isDeleted);}}public static void  showResult(String uri, Configuration conf, String path) throws Exception {FileSystem fs = getFileSystem(uri, conf);String regex = "part-r-";Pattern pattern = Pattern.compile(regex);if (fs.exists(new Path(path))) {FileStatus[] files = fs.listStatus(new Path(path));for (FileStatus file : files) {Matcher matcher = pattern.matcher(file.getPath().toString());if (matcher.find()) {System.out.println(file.getPath() + ":");FSDataInputStream openStream = fs.open(file.getPath());IOUtils.copyBytes(openStream, System.out, 1024);openStream.close();}}}}
}

3.3 GroupScores

import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class App {public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String[] splitStr = value.toString().split(" ");Text keyOut = new Text(splitStr[0]);int grade = Integer.parseInt(splitStr[1]);if (grade >= 0 && grade <= 100) {IntWritable valueOut = new IntWritable(grade);context.write(keyOut, valueOut);}}}public static class MyPartitioner extends Partitioner<Text, IntWritable> {@Overridepublic int getPartition(Text key, IntWritable value, int numPartitions) {if (value.get() >= 80) {return 0;} else if (value.get() >= 60) {return 1;} else if (value.get() >= 40) {return 2;} else if (value.get() >= 20) {return 3;} else {return 4;}}}public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {for (IntWritable value : values) {context.write(key, value);}}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] myArgs = {"file:///home/developer/CodeArtsProjects/mapreduce-partitioner/values.txt","hdfs://localhost:9000/user/developer/GroupScores/output"};util.removeALL("hdfs://localhost:9000", conf, myArgs[myArgs.length - 1]);Job job = Job.getInstance(conf, "GroupScores");job.setMapperClass(MyMapper.class);job.setPartitionerClass(MyPartitioner.class);job.setReducerClass(MyReducer.class);job.setNumReduceTasks(5);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);for (int i = 0; i < myArgs.length - 1; i++) {FileInputFormat.setInputPaths(job, new Path(myArgs[i]));}FileOutputFormat.setOutputPath(job, new Path(myArgs[myArgs.length - 1]));int res = job.waitForCompletion(true) ? 0 : 1;if (res == 0) {System.out.println("GroupScores结果为:");util.showResult("hdfs://localhost:9000", conf, myArgs[myArgs.length - 1]);}System.exit(res);}
}

3.4 结果

在这里插入图片描述

参考

吴章勇 杨强著 大数据Hadoop3.X分布式处理实战

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/67929.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

kamailio-ACC_JSON模块详解【后端语言go】

要确认 ACC_JSON 模块是否已经成功将计费信息推送到消息队列&#xff08;MQueue&#xff09;&#xff0c;以及如何从队列中取值&#xff0c;可以按照以下步骤进行操作&#xff1a; 1. 确认 ACC_JSON 已推送到队列 1.1 配置 ACC_JSON 确保 ACC_JSON 模块已正确配置并启用。以下…

网件r7000刷回原厂固件合集测评

《网件R7000路由器刷回原厂固件详解》 网件R7000是一款备受赞誉的高性能无线路由器&#xff0c;其强大的性能和可定制性吸引了许多高级用户。然而&#xff0c;有时候用户可能会尝试第三方固件以提升功能或优化网络性能&#xff0c;但这也可能导致一些问题&#xff0c;如系统不…

【C++STL标准模板库】二、STL三大组件

文章目录 1、容器2、算法3、迭代器 二、STL三大组件 1、容器 容器&#xff0c;置物之所也。 研究数据的特定排列方式&#xff0c;以利于搜索或排序或其他特殊目的&#xff0c;这一门学科我们称为数据结构。大学信息类相关专业里面&#xff0c;与编程最有直接关系的学科&…

基于 Java 开发的 MongoDB 企业级应用全解析

基于Java的MongoDB企业级应用开发实战 目录 背景与历史MongoDB的核心功能与特性企业级业务场景分析MongoDB的优缺点剖析开发环境搭建 5.1 JDK安装与配置5.2 MongoDB安装与集群配置5.3 开发工具选型 Java与MongoDB集成实战 6.1 项目依赖与驱动选择6.2 连接池与客户端配置6.3…

需求分析应该从哪些方面来着手做?

需求分析一般可从以下几个方面着手&#xff1a; 业务需求方面 - 与相关方沟通&#xff1a;与业务部门、客户等进行深入交流&#xff0c;通过访谈、问卷调查、会议讨论等方式&#xff0c;明确他们对项目的期望、目标和整体业务需求&#xff0c;了解项目要解决的业务问题及达成的…

算法题(57):找出字符串中第一个匹配项的下标

审题: 需要我们根据原串与模式串相比较并找到完全匹配时子串的第一个元素索引&#xff0c;若没有则返回-1 思路&#xff1a; 方法一&#xff1a;BF暴力算法 思路很简单&#xff0c;我们用p1表示原串的索引&#xff0c;p2表示模式串索引。遍历原串&#xff0c;每次遍历都匹配一次…

求组合数(递推法、乘法逆元、卢卡斯定理、分解质因数)

文章目录 递推法 10^4代码 乘法逆元 10^6代码 卢卡斯定理 1 0 18 m o d 1 0 6 10^{18}mod 10^6 1018mod106代码 分解质因数 常规的解法就不多加赘述了&#xff0c;如&#xff08;分子/分母&#xff0c;边乘边除&#xff09;&#xff0c;本文讲述以下方法&#xff1a; 递推法 了…

WPF进阶 | WPF 动画特效揭秘:实现炫酷的界面交互效果

WPF进阶 | WPF 动画特效揭秘&#xff1a;实现炫酷的界面交互效果 前言一、WPF 动画基础概念1.1 什么是 WPF 动画1.2 动画的基本类型1.3 动画的核心元素 二、线性动画详解2.1 DoubleAnimation 的使用2.2 ColorAnimation 实现颜色渐变 三、关键帧动画深入3.1 DoubleAnimationUsin…

【Numpy核心编程攻略:Python数据处理、分析详解与科学计算】2.27 NumPy+Pandas:高性能数据处理的黄金组合

2.27 NumPyPandas&#xff1a;高性能数据处理的黄金组合 目录 #mermaid-svg-x3ndEE4hrhO6WR6H {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-x3ndEE4hrhO6WR6H .error-icon{fill:#552222;}#mermaid-svg-x3ndEE4hr…

swagger使用指引

1.swagger介绍 在前后端分离开发中通常由后端程序员设计接口&#xff0c;完成后需要编写接口文档&#xff0c;最后将文档交给前端工程师&#xff0c;前端工程师参考文档进行开发。 可以通过一些工具快速生成接口文档 &#xff0c;本项目通过Swagger生成接口在线文档 。 什么…

DeepSeek API文档解读(对话模块)

对话&#xff08;Chat&#xff09; 对话补全 报文message对象数组 System message name 一个在线聊天系统&#xff0c;其中涉及多个用户和一个系统管理员。在这个系统中&#xff0c;每个用户都可以发送消息&#xff0c;并且系统管理员可以监控和回复这些消息。为了区分不同…

【Numpy核心编程攻略:Python数据处理、分析详解与科学计算】2.19 线性代数核武器:BLAS/LAPACK深度集成

2.19 线性代数核武器&#xff1a;BLAS/LAPACK深度集成 目录 #mermaid-svg-yVixkwXWUEZuu02L {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-yVixkwXWUEZuu02L .error-icon{fill:#552222;}#mermaid-svg-yVixkwXWUEZ…

Linux——文件与磁盘

1. 磁盘结构 磁盘在我们的计算机中有着重要的地位&#xff0c;当文件没有被打开时其数据就存储在磁盘上&#xff0c;要了解磁盘的工作原理先要了解磁盘的结构。 1.1 磁盘的物理结构 以传统的存储设备机械硬盘为例&#xff0c;它通过磁性盘片和磁头来读写数据。磁盘内部有多个旋…

【Envi遥感图像处理】010:归一化植被指数NDVI计算方法

文章目录 一、NDVI简介二、NDVI计算方法1. NDVI工具2. 波段运算三、注意事项1. 计算结果为一片黑2. 计算结果超出范围一、NDVI简介 归一化植被指数,是反映农作物长势和营养信息的重要参数之一,应用于遥感影像。NDVI是通过植被在近红外波段(NIR)和红光波段(R)的反射率差异…

UE虚幻引擎No Google Play Store Key:No OBB found报错如何处理

UE虚幻引擎No Google Play Store Key&#xff1a;No OBB found报错如何处理&#xff1f; 问题描述&#xff1a; UE成功打包APK并安装过后&#xff0c;启动应用时提示&#xff1a; No Google Play Store KeyNo OBB found and no store key to try to download. Please setone …

C++并发编程指南04

文章目录 共享数据的问题3.1.1 条件竞争双链表的例子条件竞争示例恶性条件竞争的特点 3.1.2 避免恶性条件竞争1. 使用互斥量保护共享数据结构2. 无锁编程3. 软件事务内存&#xff08;STM&#xff09; 总结互斥量与共享数据保护3.2.1 互斥量使用互斥量保护共享数据示例代码&…

【Redis】主从模式,哨兵,集群

主从复制 单点问题&#xff1a; 在分布式系统中&#xff0c;如果某个服务器程序&#xff0c;只有一个节点&#xff08;也就是一个物理服务器&#xff09;来部署这个服务器程序的话&#xff0c;那么可能会出现以下问题&#xff1a; 1.可用性问题&#xff1a;如果这个机器挂了…

Vue.js 如何选择合适的组件库

Vue.js 如何选择合适的组件库 大家在开发 Vue.js 项目的时候&#xff0c;都会面临一个问题&#xff1a;我该选择哪个组件库&#xff1f; 市面上有很多优秀的 Vue 组件库&#xff0c;比如 Element Plus、Vuetify、Quasar 等&#xff0c;它们各有特点。选择合适的组件库&#xf…

寒假(一)

请使用消息队列实现2个终端之间互相聊天 终端一 #include <stdio.h> #include <string.h> #include <unistd.h> #include <stdlib.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <pthread.h&g…

java项目验证码登录

1.依赖 导入hutool工具包用于创建验证码 <dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.5.2</version></dependency> 2.测试 生成一个验证码图片&#xff08;生成的图片浏览器可…