本套数据数据:
通过网盘分享的文件:
链接: https://pan.baidu.com/s/1ZQXoWQhCOAiGVcFOZozV1g?pwd=json 提取码: json
第二部分:大数据程序编写部分(30分)
任务一、数据抽取转化部分(5分)
1、使用sqoop将MySQL的sakila数据库中的city表中city列以C开头的数据导入到hdfs中,可以使用--where 和 --query两种方式。请根据已有的--where方式导入语句:(2分)
sqoop import --connect jdbc:mysql://localhost:3306/sakila \
--username root --P root \
--table city \
--where "city like 'C%'" \
--target-dir /user/sqoop \
-m 1
请补充--query方式的语句:
答:
sqoop import --connect jdbc:mysql://localhost:3306/sakila \
--username root -P root \
--query "select * from city where city like 'C%' and \$CONDITIONS" \
--target-dir /user/sqoop \
--delete-target-dir \
-m 1
说明:
- "\"
代码中的"\"表示续行,一条命令很长,分多行书写 - target-dir:
指定导入时hdfs的路径,必须指定,但无需事先创建,运行时自动创建,如果存在,则可使用参数--delete-target-dir将其删除。
2、请将以下MySQL的demo数据库中的language表以分隔符;导入Hive表h_language中的sqoop语句的缺失部分补充完整。(3分)
sqoop import \
--connect jdbc:mysql://localhost:3306/demo \
--username root \
--password root\
--table ______(1)_______ \
--fields-terminated-by______(2)_____ \
-m 1 \
--hive-import \
--hive-database default \
--hive-table ______(3)________\
--delete-target-dir \
--create-hive-table
答:
(1)language
(2) ;
(3)h_language
任务二、Mapreduce程序操作题目(8分)
题目及分析
数据:userPhone.txt,location.txt,calls.txt
数据calls.txt 通话记录
样例:18620192711,15733218050,1506628174,1506628265,650000,810000
字段分别为:
呼叫者手机号,接受者手机号,开始时间戳,结束时间戳,呼叫者地址省份编码,接受者地址省份编码
数据location.txt 地区编码对应表
样例:1,110000,北京市
字段分别为:
地址id,省份编码,省份名称
数据userPhone.txt 是手机号与姓名对应表
样例:26,13799999999,菜中路
字段分别为:
电话ID,电话号码,姓名
注意: 文本的编码为 UTF-8
请根据要求把 通话记录表 转换为 新的格式数据。
要求把 通话记录表 呼叫者手机号,接受者手机号 替换为 姓名,
开始时间与结束时间 转换成时间格式为 yyyy-MM-dd HH:mm:ss,例如2017-03-29 10:58:12;
计算通话时间,并以秒做单位 计算为 通话时间=结束时间-开始时间
将呼叫者地址省份编码,接受者地址省份编码 替换成省份名称
- 将电话号码替换成人名 (2分 )
- 将拨打、接听电话的时间戳转换成日期 (2分 )
- 求出电话的通话时间,以秒做单位 (2分 )
- 将省份编码替换成省份名称 (2分 )
- 最后数据的样例:
邓二,张倩,2017-03-29 10:58:12,2017-03-29 10:58:42,30秒,黑龙江省,上海市
解题:
1、在windows上配置本地hadoop运行环境
通过网盘分享的文件:apache-hadoop-3.1.3-winutils-master.zip
链接: https://pan.baidu.com/s/1Q9_JgWQ4fnIJfdnayt3SdQ 提取码: json
配置环境变量:
(1)新建环境变量:
变量名:HADOOP_HOME
变量值:解压后的主目录
(2)hadoop的bin目录path环境变量:%HADOOP_HOME%\bin
测试:
'WIN+R'->'cmd'->'winutils'
出现使用说明,证明hadoop环境没问题。如果出现错误,缺少dll文件,请将bin目录中'hadoop.dll'复制到C:\Windows\System32
中。
2、新建maven项目
(1)新建项目
在IDEA中,选择File->New->Project->Maven->...->Finish
。
(2)配置依赖
pom.xml
<dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version></dependency></dependencies>
3、新建Mapper类
com.hadoop.mapreduce.calls.CallsMapper.java
点击查看代码
package com.hadoop.mapreduce.calls;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;public class CallsMapper extends Mapper<LongWritable, Text,Text,Text> {Map<String,String> mapPhone =new HashMap<String,String>();//手机号和姓名的映射Map<String,String> mapLocation =new HashMap<String,String>();//位置编码和位置名称的映射//setup()方法中映射后在map()方法中使用@Overrideprotected void setup(Context context) throws IOException, InterruptedException {String line;FileSystem fs=null;//hadoop文件系统try {fs = FileSystem.get(new URI("hdfs://bigdata2021master:9000"), new Configuration());} catch (URISyntaxException e) {e.printStackTrace();}//hdfs上的数据文件所在的路径,事先准备建好Path path1 = new Path("/input/calls/userPhone.txt");FSDataInputStream fsInputStream = fs.open(path1);BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(fsInputStream));while ((line=bufferedReader.readLine())!=null){//每次读一行,如:"7,18000696806,赵贺彪"String[] words = line.split(",");mapPhone.put(words[1],words[2]);//手机号和姓名映射}Path path2 = new Path("/input/calls/location.txt");fsInputStream = fs.open(path2);bufferedReader = new BufferedReader(new InputStreamReader(fsInputStream));while ((line=bufferedReader.readLine())!=null){//"1,110000,北京市"String[] words = line.split(",");mapLocation.put(words[1],words[2]);//位置编码和位置名称映射}}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line=value.toString();//value为map第1阶段读入的一行值,从calls.txt中读String[] words = line.split(",");//由手机号得到姓名String name1= mapPhone.get(words[0]);String name2= mapPhone.get(words[1]);//时间戳转换日期时间,数据文件中时间戳单位应该是秒,需*1000变成毫秒Date time1=new Date(Long.parseLong(words[2])*1000L);Date time2=new Date(Long.parseLong(words[3])*1000L);//变成规范的日期格式SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String t1=simpleDateFormat.format(time1);String t2=simpleDateFormat.format(time2);//时间差(秒),2个时间戳相减String t3=Long.parseLong(words[3])-Long.parseLong(words[2])+"";//由位置编码得位置名称String location1=mapLocation.get(words[4]);String location2=mapLocation.get(words[5]);//姓名拼接作为输出的keyString outKey=name1+","+name2+",";//时间、位置拼接作为输出的valueString outValue="";outValue+=t1+","+t2+","+t3+","+location1+","+location2;System.out.println(outKey+outValue);context.write(new Text(outKey),new Text(outValue));//输出,本题没有reduce阶段。}
}
4、新建驱动类
com.hadoop.mapreduce.calls.CallsDriver.java
点击查看代码
package com.hadoop.mapreduce.calls;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class CallsDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//配置对象confConfiguration conf = new Configuration();//作业对象jobJob job=Job.getInstance(conf);//设置jar包类,即驱动类(main所在的类)job.setJarByClass(CallsDriver.class);//设置map类job.setMapperClass(CallsMapper.class);//设置map输出键的数据类型job.setMapOutputKeyClass(Text.class);//设置map输出值的数据类型job.setMapOutputValueClass(Text.class);//设置最终输出键的数据类型job.setOutputKeyClass(Text.class);//设置最终输出值的数据类型job.setOutputValueClass(Text.class);//设置数据文件的路径FileInputFormat.setInputPaths(job,new Path("hdfs://bigdata2021master:9000/input/calls/calls.txt"));//设置输出目录的路径,如果存在则要删除FileOutputFormat.setOutputPath(job,new Path("hdfs://bigdata2021master:9000/output/calls/1"));//是否执行完毕boolean result = job.waitForCompletion(true);System.exit(result?0:1);//0表示正常退出,其他表示异常退出}
}
5、打包运行
(1)maven打包
IDEA打开右侧Maven窗口,选择"项目名"->"lifeCycle"->双击"package",完成后,在项目中"target"目录中就会出现jar包。如:"target/hadoop-1.0-SNAPSHOT.jar"。
(2)在hdfs创建目录和上传数据文件
数据文件要先传到linux
hdfs dfs -mkdir -p /input/calls
hdfs dfs -mkdir -p /output/calls
hdfs dfs -put userPhone.txt /input/calls/
hdfs dfs -put location.txt /input/calls/
hdfs dfs -put calls.txt /input/calls/
(3)运行jar包
hadoop jar hadoop-1.0-SNAPSHOT.jar com.hadoop.mapreduce.calls.CallsDriver
(4)查看结果
hdfs dfs -ls -R /output
hdfs dfs -cat /output/calls/1/part-r-00000
如果要再次运行,请在hdfs上删除"1"这个目录
(5)截图