ODPS MR开发 WordCount


参考:
ODPS初始篇--客户端配置和使用:http://blog.itpub.net/26613085/viewspace-1327313/
odps dship客户端使用:http://blog.itpub.net/26613085/viewspace-1328434/
有了上面两篇文章,就可以使用ODPS的客户端;使用ODPS DSHIP往ODPS上上传数据。

1、 在Eclipse中创建一个JAVA工程ODPS_WORD_COUNT
2、 下载ODPS JAVA SDK包,http://www.aliyun.com/product/odps/,“开发者资源”,“JAVA SDK包”,解压缩。把这些jar包搜罗到一个文件夹下吧,等下添加的时候比较省事。

3、 在工程中添加ODPS SDK的JAR包,需要将这些JAR文件添加到Eclipse Project的Build Path中:选中ODPS_WORD_COUNT,右键Bulid Path,Configure Build Path,Java Build Path,Libraries,Add External JARs,选中JAR,然后OK就可以。把上面JAVA SDK包中的Jar文件都添加进来。 (如果想要在本地测试MR程序,需要把ODPS CLT里面的jar包都添加到项目中)
4、 在ODPS中准备一张表,yangsw_test.word_count.
odps:sql:yangsw_test> create table word_count(content string);
InstanceId: 2014111201412840gqtigdx5
OK
odps:sql:yangsw_test> desc word_count;
+------------------------------------------------------------------------------------+
| Table: word_count                                                                  |
| Owner: ALIYUN$******@***.com     | Project: yangsw_test                       |
| TableComment:                                                                      |
+------------------------------------------------------------------------------------+
| CreatedTime:              2014-11-12 09:41:28                                      |
| LastMetaModifiedTime:     2014-11-12 09:41:28                                      |
| LastDataModifiedTime:     1970-01-01 08:00:00                                      |
+------------------------------------------------------------------------------------+
| Type : Table                 | Size: 0 Bytes                                       |
+------------------------------------------------------------------------------------+
| Native Columns:                                                                    |
+------------------------------------------------------------------------------------+
| Field           | Type       | Comment                                             |
+------------------------------------------------------------------------------------+
| content         | STRING     |                                                     |
+------------------------------------------------------------------------------------+

5、 准备数据文件word_count.txt
hello word
hello odps
let us begin cloud compute now
6、 dship导入到word_count表
dship upload -fd "~" -rd "\r\n" C:\Users\yangswa\Desktop\word_count.txt word_count
Upload session: 201411120951352581870a0025cfcc
2014-11-12 09:51:06     scanning file: 'word_count.txt'
2014-11-12 09:51:06     uploading file: 'word_count.txt'
2014-11-12 09:51:07     'word_count.txt' uploaded
OK


7、 查看下这张表的数据
odps:sql:yangsw_test> select * from word_count;
InstanceId: 2014111214471814gabmgdx5
SQL: .
+---------+
| content |
+---------+
| hello word |
| hello odps |
| let us begin cloud compute now |
+---------+
8、 再创建一个结果表
odps:sql:yangsw_test> create table word_count_result(word string,count bigint);
InstanceId: 20141112074523943g9l5pdx5
OK
odps:sql:yangsw_test> desc word_count_result;
+------------------------------------------------------------------------------------+
| Table: word_count_result                                                           |
| Owner: ALIYUN$**********@126.com     | Project: yangsw_test                       |
| TableComment:                                                                      |
+------------------------------------------------------------------------------------+
| CreatedTime:              2014-11-12 15:45:24                                      |
| LastMetaModifiedTime:     2014-11-12 15:45:24                                      |
| LastDataModifiedTime:     1970-01-01 08:00:00                                      |
+------------------------------------------------------------------------------------+
| Type : Table                 | Size: 0 Bytes                                       |
+------------------------------------------------------------------------------------+
| Native Columns:                                                                    |
+------------------------------------------------------------------------------------+
| Field           | Type       | Comment                                             |
+------------------------------------------------------------------------------------+
| word            | STRING     |                                                     |
| count           | BIGINT     |                                                     |
+------------------------------------------------------------------------------------+
9、 实现程序
先建立一个包吧,com.thomas.odps
先借用官方的一个WordCount.java代码,下一篇文章将这个类改写成mapper、reducer和main类分开的写法。这次先把流程都走通。
package com.thomas.odps;

import java.io.IOException;
import java.util.Iterator;
import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;


public class WordCount {

 public static class TokenizerMapper extends MapperBase {
  Record word;
  Record one;

  @Override
  public void setup(TaskContext context) throws IOException {
   word = context.createMapOutputKeyRecord();
   one = context.createMapOutputValueRecord();
   one.setBigint(0, 1L);
  }

  @Override
  public void map(long recordNum, Record record, TaskContext context)
    throws IOException {
   for (int i = 0; i < record.getColumnCount(); i++) {
    String[] words = record.get(i).toString().split("\\s+");
    for (String w : words) {
     word.setString(0, w);
     context.write(word, one);
    }
   }
  }

  public static class SumCombiner extends ReducerBase {
   private Record count;

   @Override
   public void setup(TaskContext context) throws IOException {
    count = context.createMapOutputValueRecord();
   }

   @Override
   public void reduce(Record key, Iterator<Record> values,
     TaskContext context) throws IOException {
    long c = 0;
    while (values.hasNext()) {
     Record val = values.next();
     c += (Long) val.get(0);
    }
    count.set(0, c);
    context.write(key, count);
   }
  }

  public static class SumReducer extends ReducerBase {
   private Record result;

   @Override
   public void setup(TaskContext context) throws IOException {
    result = context.createOutputRecord();
   }

   @Override
   public void reduce(Record key, Iterator<Record> values,
     TaskContext context) throws IOException {
    long count = 0;
    while (values.hasNext()) {
     Record val = values.next();
     count += (Long) val.get(0);
    }
    result.set(0, key.get(0));
    result.set(1, count);
    context.write(result);
   }
  }

  public static void main(String[] args) throws OdpsException {
   if (args.length != 2) {
    System.err.println("Usage: wordcount <in_table> <out_table>");
    System.exit(2);
   }
   JobConf job = new JobConf();
   job.setMapperClass(TokenizerMapper.class);
   job.setCombinerClass(SumCombiner.class);
   job.setReducerClass(SumReducer.class);
   job.setMapOutputKeySchema(new Column[] { new Column("word",
     OdpsType.STRING) });
   job.setMapOutputValueSchema(new Column[] { new Column("count",
     OdpsType.BIGINT) });
   InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(), job);
   OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);
   RunningJob rj = JobClient.runJob(job);
   rj.waitForCompletion();
  }
 }
}
10、 本地测试
WordCount.java右键,“Run As”,“Run Configurations …”:

Main”,”Project”:
ODPS_WORD_COUNT
Main”,”Main class”:
com.thomas.odps.WordCount$TokenizerMapper
(这个本来应该是com.thomas.odps.WordCount但是系统自动带出来上面的,改掉在本地执行时还报错因此就设置成上面的吧)
Arguments”,”Program arguments”:
word_count word_count_result
Arguments”,”VM arguments”:
-Dodps.runner.mode=local
-Dodps.project.name=yangsw_test
-Dodps.end.point=http://service.odps.aliyun.com/api
-Dodps.access.id=**********
-Dodps.access.key=**********
配置好后点击“Run”,启动本地测试

控制台输出:
Running open mr on old console.
2014-11-13 16:15:35 com.aliyun.odps.mapred.LocalJobRunner submit
信息: run mapreduce job in local mode
2014-11-13 16:15:35 com.aliyun.odps.mapred.LocalJobRunner submit
信息: job id: mr_20141113161535_909_4984
2014-11-13 16:15:35 com.aliyun.odps.mapred.LocalJobRunner processInput
信息: Processing input: word_count
2014-11-13 16:15:36 com.aliyun.odps.mapred.local.utils.SchemaUtils generateSchemaFile
信息: generate schema file: D:\MyEclipseWorkStation\ODPS_WORD_COUNT\warehouse\yangsw_test\word_count\__schema__
2014-11-13 16:15:36 com.aliyun.odps.mapred.local.utils.LocalRunUtils downloadTableSchemeAndData
信息: Create table scheme of word_count, Path: D:\MyEclipseWorkStation\ODPS_WORD_COUNT\warehouse\yangsw_test\word_count
2014-11-13 16:15:36 com.aliyun.odps.mapred.local.utils.LocalRunUtils downloadTable
信息: Start to download table: yangsw_test.word_count partSpec: null to D:\MyEclipseWorkStation\ODPS_WORD_COUNT\warehouse\yangsw_test\word_count
2014-11-13 16:15:37 com.aliyun.odps.mapred.local.utils.SchemaUtils generateSchemaFile
信息: generate schema file: D:\MyEclipseWorkStation\ODPS_WORD_COUNT\temp\mr_20141113161535_909_4984\input\yangsw_test\word_count\__schema__
2014-11-13 16:15:37 com.aliyun.odps.mapred.local.utils.SchemaUtils generateSchemaFile
信息: generate schema file: D:\MyEclipseWorkStation\ODPS_WORD_COUNT\temp\mr_20141113161535_909_4984\output\__default__\__schema__
2014-11-13 16:15:37 com.aliyun.odps.mapred.LocalJobRunner runJob
信息: Start to run mappers, num: 1
2014-11-13 16:15:37 com.aliyun.odps.mapred.local.MapDriver <init>
信息: Map M_000000: input: temp\mr_20141113161535_909_4984\input\yangsw_test\word_count\data:0+56
2014-11-13 16:15:37 com.aliyun.odps.mapred.local.MapDriver <init>
信息: create record reader: finished
2014-11-13 16:15:37 com.aliyun.odps.mapred.local.MapDriver run
信息: Start to run Mapper: M_000000
2014-11-13 16:15:37 com.aliyun.odps.mapred.local.MapDriver$ProxiedMapContextImpl close
信息: Start to run Combiner
2014-11-13 16:15:37 com.aliyun.odps.mapred.local.MapDriver$ProxiedMapContextImpl close
信息: Fininshed run Combiner
2014-11-13 16:15:37 com.aliyun.odps.mapred.local.MapDriver run
信息: Fininshed run Reducer: M_000000
2014-11-13 16:15:37 com.aliyun.odps.mapred.LocalJobRunner runJob
信息: Start to run reduces, num: 1
2014-11-13 16:15:37 com.aliyun.odps.mapred.local.ReduceDriver run
信息: Start to run Reducer: R_000000
2014-11-13 16:15:37 com.aliyun.odps.mapred.local.ReduceDriver run
信息: Fininshed run Reducer: R_000000
2014-11-13 16:15:37 com.aliyun.odps.mapred.LocalJobRunner moveOutputs
信息: Copy output to warehouse: label=__default__ -> D:\MyEclipseWorkStation\ODPS_WORD_COUNT\warehouse\yangsw_test\word_count_result
Summary:
counters: 8
 map-reduce framework
  combine_input_groups=9
  combine_output_records=9
  map_input_bytes=56
  map_input_records=3
  map_output_records=10
  reduce_output_[word_count_result]_bytes=74
  reduce_output_[word_count_result]_records=9

OK
InstanceId: mr_20141113161535_909_4984

执行完成后,会在该工程的目录下面多了如下目录结构:
./temp
./ warehouse
/ yangsw_test
    /word_count
        / __schema__
        / data
    /word_count_result
        / __schema__
        / R_000000
其中word_count下面的__schema__内容包含word_count源表的项目和表结构:
project=yangsw_test
table=word_count
columns=content:STRING
word_count下面的data是word_count表的数据:
hello word
hello odps
let us begin cloud compute now
word_count_result下面的__schema__是目标表的项目和表结构:
project=yangsw_test
table=word_count_result
columns=word:STRING,count:BIGINT
word_count_result下面的R_000000是输出结果,本次只有一个reduce所以只有一个文件:
begin,1
cloud,1
compute,1
hello,2
let,1
now,1
odps,1
us,1
word,1
上面就是wordcount本地运行的结果了
11、 发布jar包
create resource jar C:\Users\yangswa\Desktop\word_count.jar word_count.jar
12、 在服务端运行
odps:yangsw_test> jar -resources word_count.jar -classpath C:\Users\yangswa\Desktop\word_count.jar com.thomas.odps.WordCount
word_count word_count_result
Exception in thread "main" java.lang.RuntimeException: java.lang.NoSuchMethodException: com.thomas.odps.WordCount.main([Ljava
.lang.String;)
        at com.aliyun.odps.mapred.cli.Cli.main(Cli.java:47)
Caused by: java.lang.NoSuchMethodException: com.thomas.odps.WordCount.main([Ljava.lang.String;)
        at java.lang.Class.getMethod(Unknown Source)
        at com.aliyun.odps.mapred.cli.Cli.main(Cli.java:35)
这个居然报错:main方法不存在,在odps用户中心的售后服务中,提交了个工单,得到的反馈是ODPS MR功能还没有正式开放,要等到12月底才能正式使用。好吧,就先等等吧.

另外一个需要注意的地方时,-classpath要制定本地jar文件的位置,而不是只是指向资源。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/352614.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

跨域设置

# 参考&#xff1a; https://blog.csdn.net/linzi1994/article/details/82724429 https://www.cnblogs.com/ShaunChen/p/5998800.html# 配置详解&#xff1a; # 跨域设置 CORS_ORIGIN_ALLOW_ALL True # 允许所有域名 CORS_ALLOW_CREDENTIALS True # 允许获取cookie CORS_UR…

kdb q介绍

Q起源Kx systems公司的创始人之一Arthur Whitney在2003年研发了列式数据库KDB和它的操作语言q。q也可以写成Q。设计之初&#xff0c;q语言要做到简洁&#xff0c;高效和富表达性。 q的起源受到多种语言的启示。包括APL、LISP和函数式编程。 APL是一个向量语言&#xff0c;所以…

mysql多大_洞悉MySQL底层架构:游走在缓冲与磁盘之间

提起MySQL&#xff0c;其实网上已经有一大把教程了&#xff0c;为什么我还要写这篇文章呢&#xff0c;大概是因为网上很多网站都是比较零散&#xff0c;而且描述不够直观&#xff0c;不能系统对MySQL相关知识有一个系统的学习&#xff0c;导致不能形成知识体系。为此我撰写了这…

odps新手上路之安装Eclipse开发环境

只看楼主更多操作楼主 发表于: 2014-11-07—本帖被 亮伟 执行取消精华操作(2014-11-27)—准备工作&#xff1a;登录阿里云官网&#xff0c;开通odps服务。创建一个odps的project。安装odps的客户端&#xff0c;具体的方法见 传送门下载Eclipse并解压缩。建议到官网上下载。作…

mfc对话框在不同计算机上显示不全

出现&#xff2d;&#xff26;&#xff23;对话框在不同计算机上显示不全的或者显示大小不一样的活可以调整电脑的分辨率也可以在创建对话框时根据分辨率来调整控件。

c++二进制转十进制_二进制,八进制,十进制,十六进制转换详解~

点 击 上 方 蓝 字 关 注 我 们 哦 ^-^本文思维导图&#xff1a;1.数制&#xff1a;用一组固定的数字和一套统一的规则来表示数目的方法称为数制。进位计数制的要素&#xff1a;①、数码&#xff1a;用来表示进制数的元素。二进制&#xff1a;0,1。八进制&#xff1a;0,1,2,3,4…

复习第三章多态

第三章 多态 本章需要掌握的东西 掌握多态的优势和应用场合答&#xff1a;多态的优势可以减少代码量&#xff0c;更加清晰明了。&#xff0c;应用场合为&#xff0c;一个事物需要做多种事情的时候需要用到多态。 2.掌握父类和子类之间的类型转换 答&#xff1a;父类引用转向子…

java堆 数据结构 堆_Java中的紧凑堆外结构/组合

java堆 数据结构 堆在上一篇文章中&#xff0c;我详细介绍了代码对主内存的访问方式的含义。 从那时起&#xff0c;我就在Java中可以做什么以实现更可预测的内存布局提出了很多疑问。 有些模式可以使用数组支持的结构来应用&#xff0c;我将在另一篇文章中讨论。 这篇文章将探讨…

Unix环境高级编程学习笔记(七) 多线程

线程概述 线程&#xff08;thread&#xff09;技术早在60年代就被提出&#xff0c;但真正应用多线程到操作系统中去&#xff0c;是在80年代中期&#xff0c;solaris是这方面的佼佼者。传统的Unix也支持线程的概念&#xff0c;但是在一个进程&#xff08;process&#xff09;中只…

testflight开发者已将您从测试计划中移除_使用 TestFlight 测?试 App

目前市面上对于iOS开发的签名样式大致分为三种&#xff1a; a、企业签 依赖于苹果企业级开发者账号b、超级签 依赖于苹果个人开发者账号c、tf签 TestFlight Beta 版测试让您可以分发您 App 的 Beta 版构建版本给测试员并收集反馈。您可以在您的 App Store Connect 帐户中一次为…

MFC操作ini文件方法

转自&#xff1a;http://blog.csdn.net/rayborn1105/article/details/8192142 一个不错的接口&#xff1a;http://blog.csdn.net/qq575787460/article/details/8185339 在我们的程序设计中经常需要对一些参数进行配置&#xff0c;配置好后还要在下一次启动仍然有效&#xff0c;…

Java 8流中的数据库CRUD操作

在开始使用新工具时要克服的最大障碍是让您着手处理小事情。 到目前为止&#xff0c;您可能对新的Java 8 Stream API的工作方式充满信心&#xff0c;但是您可能尚未将其用于数据库查询。 为了帮助您开始使用Stream API创建&#xff0c;修改和读取SQL数据库&#xff0c;我整理了…

网络时间同步

linux yum install ntp ntpdate -y 时间同步命令&#xff1a;ntpdate time.windows.com 开机启动: chkconfig ntpd on 查看开机启动&#xff1a;chkconfig --list ntpd ntpd 0:off 1:off 2:on 3:on 4:on 5:on 6:off 转载于:https://www.cnblogs.com/…

java 权限url权限_SpringBootSecurity学习(11)网页版登录之URL动态权限

动态权限前面讨论用户登录认证的时候&#xff0c;根据用户名查询用户会将用户拥有的角色一起查询出来&#xff0c;自动实现判断当前登录用户拥有哪些角色。可以说用户与角色之间的动态配置和判断security做的非常不错。不过在配置方法级别的权限的时候&#xff0c;使用注解虽然…

线程使用

嵌入式中线程应用还是看需求&#xff0c;一般不常用&#xff08;在不会使用的情况下&#xff09;一、编译有线程的应用程序需要编译时指定编译lib库 &#xff08; -l pthread&#xff09; 如&#xff1a;gcc main.c -o main -l pthread 才能编译通过。二、线程使用。1、线程运行…

C++ MFC string转Cstring为什么会乱码

前段时间学习mfc编了一个小程序&#xff0c;其中涉及到CString 与string的转换的时候感觉特别蛋疼&#xff0c;因此再此总结一下经验。希望能够对大家能有所帮助 通常有两种字符集模式 unicode字符集 和 ascii字符集&#xff0c;其中unicode有多种编码方式。utf8&#xff0c; …

metaq原理简介

1. 前言 本文档旨在描述RocketMQ的多个关键特性的实现原理&#xff0c;并对消息中间件遇到的各种问题进行总结&#xff0c;阐述RocketMQ如何解决这些问题。文中主要引用了JMS规范与CORBA Notification规范&#xff0c;规范为我们设计系统指明了方向&#xff0c;但是仍有不少问题…

3d展示网页开发_超实用:一篇文章带你了解市面上主流通用的3D模型格式

说到格式&#xff0c;相信大家都不陌生。随着互联网的普及&#xff0c;我们几乎每天都会和不同的格式打交道&#xff0c;文本的TXT、图片的JPG、视频的MP4&#xff0c;就连压缩包也有不同的格式。通俗来说&#xff0c;你可以把“格式”理解成基于同一规范的技术表征&#xff0c…

java三件套_Java开发人员应该知道的三件事

java三件套对于那些长期关注JavaOne 2012会议的读者来说&#xff0c;这是一篇有趣的文章。 我最近对Java冠军Heinz Kabutz的采访引起了我的注意&#xff1b; 包括他的Java内存难题程序&#xff0c;从Java内存管理的角度来看&#xff0c;这很有启发性。 采访中有一个特别的部分吸…

Python 的变量作用域和 LEGB 原则

在 Python 程序中创建、改变或查找变量名时&#xff0c;都是在一个保存变量名的地方进行中&#xff0c;那个地方我们称之为命名空间。作用域这个术语也称之为命名空间。 具体地说&#xff0c;在代码中变量名被赋值&#xff08;Python 中变量声明即赋值&#xff0c;global 声明的…