代码实现——MapReduce实现Hadoop序列化

简单介绍

1、什么是序列化

  • 序列化:把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。
  • 反序列化:将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。

2、 为什么要序列化

对象的序列化(Serialization)用于将对象编码成一个字节流,以及从字节流中重新构建对象。"将一个对象编码成一个字节流"称为序列化该对象(SeTializing);相反的处理过程称为反序列化(Deserializing)。 序列化有三种主要的用途:

  1. 作为一种持久化格式:一个对象被序列化以后,它的编码可以被存储到磁盘上,供以后反序列化用。

  2. 作为一种通信数据格式:序列化结果可以从一个正在运行的虚拟机,通过网络被传递到另一个虚拟机上。

  3. 作为一种拷贝、克隆(clone)机制:将对象序列化到内存的缓存区中。然后通过反序列化,可以得到一个对已存对象进行深拷贝的新对象。

在分布式数据处理中,主要使用上面提到的前两种功能:数据持久化和通信数据格式

需求

统计每一个手机号耗费的总上行流量、下行流量、总流量(txt文档在/Users/lizhengi/test/input/目录下)

1       13736230513     192.196.2.1     www.shouhu.com  2481    24681   200
2       13846544121     192.196.2.2                     264     0       200
3       13956435636     192.196.2.3                     132     1512    200
4       13966251146     192.168.2.1                     240     0       404
5       18271575951     192.168.2.2     www.shouhu.com  1527    2106    200
6       18240717138     192.168.2.3     www.hao123.com  4116    1432    200
7       13590439668     192.168.2.4                     1116    954     200
8       15910133277     192.168.2.5     www.hao123.com  3156    2936    200
9       13729199489     192.168.2.6                     240     0       200
10      13630577991     192.168.2.7     www.shouhu.com  6960    690     200
11      15043685818     192.168.2.8     www.baidu.com   3659    3538    200
12      15959002129     192.168.2.9     www.hao123.com  1938    180     500
13      13560439638     192.168.2.10                    918     4938    200
14      13470253144     192.168.2.11                    180     180     200
15      13682846555     192.168.2.12    www.qq.com      1938    2910    200
16      13992314666     192.168.2.13    www.gaga.com    3008    3720    200
17      13509468723     192.168.2.14    www.qinghua.com 7335    110349  404
18      18390173782     192.168.2.15    www.sogou.com   9531    2412    200
19      13975057813     192.168.2.16    www.baidu.com   11058   48243   200
20      13768778790     192.168.2.17                    120     120     200
21      13568436656     192.168.2.18    www.alibaba.com 2481    24681   200
22      13568436656     192.168.2.19                    1116    954     200

实现过程

1、新建Maven工程,pom.xml依赖如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.lizhengi</groupId><artifactId>Hadoop-API</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>RELEASE</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.8.2</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.2.1</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.2.1</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>3.2.1</version></dependency></dependencies></project>

2、src/main/resources目录下,新建一个文件,命名为“log4j.properties”,添加内容如下

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

3、编写Bean类-FlowBean

package com.lizhengi.flow;import org.apache.hadoop.io.Writable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;/*** @author lizhengi* @create 2020-07-20*/
// 1 实现writable接口
public class FlowBean implements Writable {private long upFlow;    //上行流量private long downFlow;  //下行流量private long sumFlow;   //总流量//2  反序列化时,需要反射调用空参构造函数,所以必须有public FlowBean() {}@Overridepublic String toString() {return upFlow + "\t" + downFlow + "\t" + sumFlow;}public void set(long upFlow, long downFlow) {this.upFlow = upFlow;this.downFlow = downFlow;this.sumFlow = upFlow + downFlow;}public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getDownFlow() {return downFlow;}public void setDownFlow(long downFlow) {this.downFlow = downFlow;}public long getSumFlow() {return sumFlow;}public void setSumFlow(long sumFlow) {this.sumFlow = sumFlow;}//3  写序列化方法public void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(downFlow);out.writeLong(sumFlow);}//4 反序列化方法//5 反序列化方法读顺序必须和写序列化方法的写顺序必须一致public void readFields(DataInput in) throws IOException {upFlow = in.readLong();downFlow = in.readLong();sumFlow = in.readLong();}}

4、编写Mapper类-FlowMapper

package com.lizhengi.flow;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** @author lizhengi* @create 2020-07-20*/
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {private Text phone = new Text();private FlowBean flow = new FlowBean();@Overrideprotected void map(LongWritable key, Text value, Context context)	throws IOException, InterruptedException {String[] fields = value.toString().split("\t");phone.set(fields[1]);long upFlow = Long.parseLong(fields[fields.length - 3]);long downFlow = Long.parseLong(fields[fields.length - 2]);flow.set(upFlow,downFlow);context.write(phone, flow);}
}

5、编写Reducer类-FlowReducer

package com.lizhengi.flow;import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;/*** @author lizhengi* @create 2020-07-20*/
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {private FlowBean sunFlow = new FlowBean();@Overrideprotected void reduce(Text key, Iterable<FlowBean> values, Context context)throws IOException, InterruptedException {long sum_upFlow = 0;long sum_downFlow = 0;// 1 遍历所用bean,将其中的上行流量,下行流量分别累加for (FlowBean value : values) {sum_upFlow += value.getUpFlow();sum_downFlow += value.getDownFlow();}// 2 封装对象sunFlow.set(sum_upFlow, sum_downFlow);// 3 写出context.write(key, sunFlow);}
}

6、编写Drvier类-FlowDriver

package com.lizhengi.flow;/*** @author lizhengi* @create 2020-07-20*/import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FlowDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 1 获取job实例Job job = Job.getInstance(new Configuration());// 2.设置类路径job.setJarByClass(FlowDriver.class);// 3 指定本业务job要使用的mapper/Reducer业务类job.setMapperClass(FlowMapper.class);job.setReducerClass(FlowReducer.class);// 4 指定mapper输出数据的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);// 5 指定最终输出的数据的kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);// 6 指定job的输入原始文件所在目录FileInputFormat.setInputPaths(job, "/Users/marron27/test/input");FileOutputFormat.setOutputPath(job, new Path("/Users/marron27/test/output"));//FileInputFormat.setInputPaths(job, new Path(args[0]));//FileOutputFormat.setOutputPath(job, new Path(args[1]));// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}

结果展示

Carlota:output marron27$ pwd
/Users/marron27/test/output
Carlota:output marron27$ cat part-r-00000 
13470253144	180	180	360
13509468723	7335	110349	117684
13560439638	918	4938	5856
13568436656	3597	25635	29232
13590439668	1116	954	2070
13630577991	6960	690	7650
13682846555	1938	2910	4848
13729199489	240	0	240
13736230513	2481	24681	27162
13768778790	120	120	240
13846544121	264	0	264
13956435636	132	1512	1644
13966251146	240	0	240
13975057813	11058	48243	59301
13992314666	3008	3720	6728
15043685818	3659	3538	7197
15910133277	3156	2936	6092
15959002129	1938	180	2118
18240717138	4116	1432	5548
18271575951	1527	2106	3633
18390173782	9531	2412	11943

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/535809.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

日常问题——hadoop启动后发现namenode没有启动,但是排除了格式化过度的问题

hadoop启动后发现namenode没有启动&#xff0c;网上说的格式化过度的问题我是没有的&#xff0c;因为我只格式化过一次。之后查看日志 vim /opt/hadoop/logs/namenode对应的log文件 发现 2020-03-03 23:16:21,868 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Na…

Zookeeper3.6.1常用的Shell命令

1、客户端连接 zkCli.sh zkCli.sh -server host:port2、显示节点信息 -s状态 -w监听器 -R递归 ls [-s] [-w] [-R] path3、创建节点 -s加序列号 -e临时节点 create [-s] [-e] path [data]4、获取节点值 -s状态 -w监听器 get [-s] [-w] path5、设置节点值 -s状态 set [-s] […

CentOS7下MySQL5.7的安装

1、下载MySQL 安装包&#xff1a; wget https://dev.mysql.com/get/mysql57-community-release-el7-11.noarch.rpmyum -y localinstall mysql57-community-release-el7-11.noarch.rpm 2、在线安装MySQL yum -y install mysql-community-server 3、启动mysql 服务 systemct…

CentOS7下Hive的安装配置

0、安装前提 安装配置jdk与hadoop安装配置好mysql 1、下载上传 下载hive&#xff0c;地址:http://mirror.bit.edu.cn/apache/hive/上传到指定位置scp apache-hive-3.1.2-bin.tar.gz rootCarlota1:/usr/local/apps 2、解压安装 tar -zxvf apache-hive-3.1.2-bin.tar.gzmv a…

Hive常用的操作命令

Hive常用的交互命令 hive 进入数据库hive -e 不进入hive的交互窗口执行sql语句hive -f 执行sql脚本hive -help 查看帮助 Hive常用数据库的操作命令 show databases;查看hive中的所有数据库use default;用default数据库create database myhive ;创建数据库create database if…

Hive常见的属性配置

配置文件 默认配置文件&#xff1a;hive-default.xml 用户自定义配置文件&#xff1a;hive-site.xml 用户自定义配置会覆盖默认配置。另外&#xff0c;Hive也会读入Hadoop的配置&#xff0c;因为Hive是作为Hadoop的客户端启动的&#xff0c;Hive的配置会覆盖Hadoop的配置。配…

什么是集群(cluster)

1、集群 1.1 什么是集群 简单的说&#xff0c;集群(cluster)就是一组计算机&#xff0c;它们作为一个整体向用户提供一组网络资源。这些单个的计算机系统就是集群的节点(node)。一个理想的集群是&#xff0c;用户从来不会意识到集群系统底层的节点&#xff0c;在他/她们看来&am…

Kafka:集群部署

0、环境准备 安装jdk&#xff0c;配置环境提前安装zookeeper 1、解压安装 将tar压缩包上传tar -zxvf kafka_2.12-2.5.0.tgz 2、配置变量环境 vi /etc/profile #kafka export KAFKA_HOME/usr/local/apps/kafka_2.12-2.5.0 export PATH$PATH:$KAFKA_HOME/binsource /etc/pr…

集群(cluster)amp;高可用性(HA)概念

1.1 什么是集群 简单的说&#xff0c;集群&#xff08;cluster&#xff09;就是一组计算机&#xff0c;它们作为一个整体向用户提供一组网络资源。这些单个的计算机系统就是集群的节点&#xff08;node&#xff09;。一个理想的集群是&#xff0c;用户从来不会意识到集群系…

Kafka:常用命令

启动Kafka&#xff1a;kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties创建一个叫test的话题&#xff0c;有两个分区&#xff0c;每个分区3个副本&#xff1a;kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 3 …

MySQL Cluster 群集安装环境介绍

MySQL Cluster 群集安装环境介绍 MySQL 群集支持的操作系统:* Linux (Red Hat, Novell/SUSE) * Sun Solaris * IBM AIX * HP-UX * Mac OS X MySQL 软件:* MySQL Max 版本 (並不是指 MaxDB)* MySQL NDB Cluster 系统最低需求&#xff1a; OS&#xff1a; Linux ( Turbolinux…

八股文打卡day9——计算机网络(9)

面试题&#xff1a;HTTP1.0和HTTP1.1的区别&#xff1f; 我的回答&#xff1a; 1.长连接&#xff1a;HTTP1.1引入了长连接的机制&#xff0c;connection&#xff1a;keep-alive。一个TCP连接可以进行多次请求和响应。而HTTP1.0每次请求响应一次都得建立连接、断开连接。 引入…

使用ogg实现oracle到kafka的增量数据实时同步

Oracle Golden Gate软件是一种基于日志的结构化数据复制备份软件&#xff0c;它通过解析源数据库在线日志或归档日志获得数据的增量变化&#xff0c;再将这些变化应用到目标数据库&#xff0c;从而实现源数据库与目标数据库同步。 0、本篇中源端和目标端的一些配置信息&#xf…

转载:35岁前成功的12条黄金法则

习惯的力量是惊人的。习惯能载着你走向成功&#xff0c;也能驮着你滑向失败。如何选择&#xff0c;完全取决于你自己。 1.习惯的力量&#xff1a;35岁以前养成好习惯 你想成功吗&#xff1f;那就及早培养有利于成功的好习惯。 习惯的力量是惊人的&#xff0c;35岁…

JDK源码解析之 Java.lang.Object

Object类是Java中其他所有类的祖先&#xff0c;没有Object类Java面向对象无从谈起。作为其他所有类的基类&#xff0c;Object具有哪些属性和行为&#xff0c;是Java语言设计背后的思维体现。 Object类位于java.lang包中&#xff0c;java.lang包包含着Java最基础和核心的类&…

将z-blog改成英文blog所遇到的问题

1.将z-blog中文章日期中的“年,月,日”改成英文 相关模板:b_article-multi.htmlb_article-single.html默认用的时间标签是<#article/posttime/longdate#> 即 "2007年1月13日" 这样的形式你可以换成 <#article/posttime/shortdate#>即 "2…

JDK源码解析之 Java.lang.String

String 类代表字符串。Java 程序中的所有字符串字面值&#xff08;如 “abc” &#xff09;都作为此类的实例实现。 字符串是常量&#xff1b;它们的值在创建之后不能更改。字符串缓冲区支持可变的字符串。因为 String 对象是不可变的&#xff0c;所以可以共享。 一、类定义 p…

看到一个blog的标语,有意思!

"上世纪80年代勇气&#xff0c;90年代靠关系&#xff0c;现在必须靠知识能力&#xff01;挣钱靠1、兴趣广泛&#xff1b; 2、感觉敏锐&#xff1b; 3、集中力强&#xff1b; 4、个性不脆弱&#xff08;坚韧性&#xff09;&#xff1b; 5、能在瞬间了解因果关系&#xff1b…

JDK源码解析之 Java.lang.AbstractStringBuilder

这个抽象类是StringBuilder和StringBuffer的直接父类&#xff0c;而且定义了很多方法&#xff0c;因此在学习这两个类之间建议先学习 AbstractStringBuilder抽象类 该类在源码中注释是以JDK1.5开始作为前两个类的父类存在的&#xff0c;可是直到JDK1.8的API中&#xff0c;关于S…

RHEL下安装配置基于2台服务器的MYSQL集群

一、介绍这篇文档旨在介绍如何在RHEL下安装配置基于2台服务器的MySQL集群。并且实现任意一台服务器出现问题或宕机时MySQL依然能够继续运行。 注意&#xff01;虽然这是基于2台服务器的MySQL集群&#xff0c;但也必须有额外的第三台服务器作为管理节点&#xff0c;但这台服务器…