References:
 ODPS getting started -- client configuration and usage: http://blog.itpub.net/26613085/viewspace-1327313/
 Using the odps dship client: http://blog.itpub.net/26613085/viewspace-1328434/
 With the two articles above in place, you can use the ODPS client and upload data to ODPS with ODPS DSHIP.
1. Create a Java project named ODPS_WORD_COUNT in Eclipse.
 2. Download the ODPS Java SDK: go to http://www.aliyun.com/product/odps/, open "Developer Resources", download the "Java SDK" package, and unpack it. Gather the jar files into one folder so they are easier to add in the next step.
 3. Add the ODPS SDK jars to the project. They need to go on the Eclipse project's Build Path: select ODPS_WORD_COUNT, right-click, Build Path, Configure Build Path, Java Build Path, Libraries, Add External JARs, select the jars, then OK. Add all of the jar files from the Java SDK package. (If you also want to test MR programs locally, add the jars that ship with the ODPS CLT to the project as well.)
 4. Prepare a table in ODPS: yangsw_test.word_count.
 odps:sql:yangsw_test> create table word_count(content string);
 InstanceId: 2014111201412840gqtigdx5
 OK
 odps:sql:yangsw_test> desc word_count;
 +------------------------------------------------------------------------------------+
 | Table: word_count                                                                  |
 | Owner: ALIYUN$******@***.com     | Project: yangsw_test                       |
 | TableComment:                                                                      |
 +------------------------------------------------------------------------------------+
 | CreatedTime:              2014-11-12 09:41:28                                      |
 | LastMetaModifiedTime:     2014-11-12 09:41:28                                      |
 | LastDataModifiedTime:     1970-01-01 08:00:00                                      |
 +------------------------------------------------------------------------------------+
 | Type : Table                 | Size: 0 Bytes                                       |
 +------------------------------------------------------------------------------------+
 | Native Columns:                                                                    |
 +------------------------------------------------------------------------------------+
 | Field           | Type       | Comment                                             |
 +------------------------------------------------------------------------------------+
 | content         | STRING     |                                                     |
 +------------------------------------------------------------------------------------+
5. Prepare a data file, word_count.txt, containing the three lines below (a short Java sketch for generating the file follows the sample lines):
 hello word
 hello odps
 let us begin cloud compute now
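 If you would rather generate the file from code than type it by hand, here is a minimal JDK-only sketch. The class name and the path are only examples; the important detail is that each record is terminated with \r\n, which is what the -rd "\r\n" option in the next step expects.

 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;

 public class MakeWordCountFile {
     public static void main(String[] args) throws IOException {
         // Terminate every record with \r\n to match dship's -rd "\r\n".
         String content = "hello word\r\n"
                 + "hello odps\r\n"
                 + "let us begin cloud compute now\r\n";
         // Example path; point it wherever you keep the file.
         Files.write(Paths.get("C:/Users/yangswa/Desktop/word_count.txt"),
                 content.getBytes(StandardCharsets.UTF_8));
     }
 }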
 6. Import the file into the word_count table with dship:
 dship upload -fd "~" -rd "\r\n" C:\Users\yangswa\Desktop\word_count.txt word_count
 Upload session: 201411120951352581870a0025cfcc
 2014-11-12 09:51:06     scanning file: 'word_count.txt'
 2014-11-12 09:51:06     uploading file: 'word_count.txt'
 2014-11-12 09:51:07     'word_count.txt' uploaded
 OK
 7. Check the data in the table:
 odps:sql:yangsw_test> select * from word_count;
 InstanceId: 2014111214471814gabmgdx5
 SQL: .
 +---------+
 | content |
 +---------+
 | hello word |
 | hello odps |
 | let us begin cloud compute now |
 +---------+
 8. Create a result table:
 odps:sql:yangsw_test> create table word_count_result(word string,count bigint);
 InstanceId: 20141112074523943g9l5pdx5
 OK
 odps:sql:yangsw_test> desc word_count_result;
 +------------------------------------------------------------------------------------+
 | Table: word_count_result                                                           |
 | Owner: ALIYUN$**********@126.com     | Project: yangsw_test                       |
 | TableComment:                                                                      |
 +------------------------------------------------------------------------------------+
 | CreatedTime:              2014-11-12 15:45:24                                      |
 | LastMetaModifiedTime:     2014-11-12 15:45:24                                      |
 | LastDataModifiedTime:     1970-01-01 08:00:00                                      |
 +------------------------------------------------------------------------------------+
 | Type : Table                 | Size: 0 Bytes                                       |
 +------------------------------------------------------------------------------------+
 | Native Columns:                                                                    |
 +------------------------------------------------------------------------------------+
 | Field           | Type       | Comment                                             |
 +------------------------------------------------------------------------------------+
 | word            | STRING     |                                                     |
 | count           | BIGINT     |                                                     |
 +------------------------------------------------------------------------------------+
 9. Write the program.
 First create a package, com.thomas.odps.
 For now, borrow the official WordCount.java example as-is; a follow-up article will rewrite it with the mapper, reducer and driver (main) classes split into separate files. The goal this time is just to get the whole workflow running end to end. (A plain-Java sketch of the same counting logic follows the listing.)
package com.thomas.odps;

import java.io.IOException;
import java.util.Iterator;

import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;

public class WordCount {

  public static class TokenizerMapper extends MapperBase {
    Record word;
    Record one;

    @Override
    public void setup(TaskContext context) throws IOException {
      // Key/value records for the intermediate (map) output.
      word = context.createMapOutputKeyRecord();
      one = context.createMapOutputValueRecord();
      one.setBigint(0, 1L);
    }

    @Override
    public void map(long recordNum, Record record, TaskContext context)
        throws IOException {
      // Split every column of the input record on whitespace and
      // emit (word, 1) for each token.
      for (int i = 0; i < record.getColumnCount(); i++) {
        String[] words = record.get(i).toString().split("\\s+");
        for (String w : words) {
          word.setString(0, w);
          context.write(word, one);
        }
      }
    }

    // Note: TokenizerMapper is only closed at the very end of the file,
    // so SumCombiner, SumReducer and main() below are all nested inside
    // TokenizerMapper. The class that actually contains main() is
    // therefore com.thomas.odps.WordCount$TokenizerMapper, which matters
    // for the run configuration in step 10 and the error in step 12.

    public static class SumCombiner extends ReducerBase {
      private Record count;

      @Override
      public void setup(TaskContext context) throws IOException {
        count = context.createMapOutputValueRecord();
      }

      @Override
      public void reduce(Record key, Iterator<Record> values,
          TaskContext context) throws IOException {
        // Pre-aggregate the counts on the map side.
        long c = 0;
        while (values.hasNext()) {
          Record val = values.next();
          c += (Long) val.get(0);
        }
        count.set(0, c);
        context.write(key, count);
      }
    }

    public static class SumReducer extends ReducerBase {
      private Record result;

      @Override
      public void setup(TaskContext context) throws IOException {
        result = context.createOutputRecord();
      }

      @Override
      public void reduce(Record key, Iterator<Record> values,
          TaskContext context) throws IOException {
        // Sum the counts for one word and write (word, count)
        // to the output table.
        long count = 0;
        while (values.hasNext()) {
          Record val = values.next();
          count += (Long) val.get(0);
        }
        result.set(0, key.get(0));
        result.set(1, count);
        context.write(result);
      }
    }

    public static void main(String[] args) throws OdpsException {
      if (args.length != 2) {
        System.err.println("Usage: wordcount <in_table> <out_table>");
        System.exit(2);
      }
      JobConf job = new JobConf();
      job.setMapperClass(TokenizerMapper.class);
      job.setCombinerClass(SumCombiner.class);
      job.setReducerClass(SumReducer.class);
      // Schema of the intermediate key/value records.
      job.setMapOutputKeySchema(new Column[] { new Column("word",
          OdpsType.STRING) });
      job.setMapOutputValueSchema(new Column[] { new Column("count",
          OdpsType.BIGINT) });
      InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(), job);
      OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);
      RunningJob rj = JobClient.runJob(job);
      rj.waitForCompletion();
    }
  }
}
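 To see concretely what the mapper, combiner and reducer compute for the sample input, here is a plain-Java sketch (illustrative only, no ODPS dependency) that applies the same whitespace split and counting to the three input lines; the pairs it prints are exactly what the MR job should end up writing into word_count_result:

 import java.util.Map;
 import java.util.TreeMap;

 public class LocalWordCountSketch {
     public static void main(String[] args) {
         String[] lines = { "hello word", "hello odps",
                 "let us begin cloud compute now" };
         // Same tokenization as TokenizerMapper.map: split on whitespace,
         // then count occurrences per word (TreeMap keeps the words sorted).
         Map<String, Long> counts = new TreeMap<String, Long>();
         for (String line : lines) {
             for (String w : line.split("\\s+")) {
                 Long c = counts.get(w);
                 counts.put(w, c == null ? 1L : c + 1L);
             }
         }
         for (Map.Entry<String, Long> e : counts.entrySet()) {
             System.out.println(e.getKey() + "," + e.getValue());
         }
     }
 }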
 10. Local test
 Right-click WordCount.java, "Run As", "Run Configurations ...":
 "Main", "Project":
 ODPS_WORD_COUNT
 "Main", "Main class":
 com.thomas.odps.WordCount$TokenizerMapper
 (This would normally be com.thomas.odps.WordCount, but Eclipse auto-fills the value above. With the class nesting in the listing, main() is compiled into WordCount$TokenizerMapper, so switching the entry to WordCount also fails locally; keep the auto-filled value.)
 "Arguments", "Program arguments":
 word_count word_count_result
 "Arguments", "VM arguments" (a small property-check sketch follows the list of flags):
 -Dodps.runner.mode=local
 -Dodps.project.name=yangsw_test
 -Dodps.end.point=http://service.odps.aliyun.com/api
 -Dodps.access.id=**********
 -Dodps.access.key=**********
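 The -D entries are ordinary JVM system properties that the local runner picks up at startup. If you want to confirm they actually reach your process, a throwaway check like the one below can be run with the same VM arguments (purely illustrative, not part of the SDK; it deliberately skips odps.access.key since that is a credential):

 public class CheckOdpsLocalProps {
     public static void main(String[] args) {
         // Print the properties set by the -D VM arguments above.
         String[] keys = { "odps.runner.mode", "odps.project.name",
                 "odps.end.point", "odps.access.id" };
         for (String key : keys) {
             System.out.println(key + " = " + System.getProperty(key));
         }
     }
 }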
 After the configuration is done, click "Run" to start the local test.
Console output:
 Running open mr on old console.
 2014-11-13 16:15:35 com.aliyun.odps.mapred.LocalJobRunner submit
 信息: run mapreduce job in local mode
 2014-11-13 16:15:35 com.aliyun.odps.mapred.LocalJobRunner submit
 信息: job id: mr_20141113161535_909_4984
 2014-11-13 16:15:35 com.aliyun.odps.mapred.LocalJobRunner processInput
 信息: Processing input: word_count
 2014-11-13 16:15:36 com.aliyun.odps.mapred.local.utils.SchemaUtils generateSchemaFile
 信息: generate schema file: D:\MyEclipseWorkStation\ODPS_WORD_COUNT\warehouse\yangsw_test\word_count\__schema__
 2014-11-13 16:15:36 com.aliyun.odps.mapred.local.utils.LocalRunUtils downloadTableSchemeAndData
 信息: Create table scheme of word_count, Path: D:\MyEclipseWorkStation\ODPS_WORD_COUNT\warehouse\yangsw_test\word_count
 2014-11-13 16:15:36 com.aliyun.odps.mapred.local.utils.LocalRunUtils downloadTable
 信息: Start to download table: yangsw_test.word_count partSpec: null to D:\MyEclipseWorkStation\ODPS_WORD_COUNT\warehouse\yangsw_test\word_count
 2014-11-13 16:15:37 com.aliyun.odps.mapred.local.utils.SchemaUtils generateSchemaFile
 信息: generate schema file: D:\MyEclipseWorkStation\ODPS_WORD_COUNT\temp\mr_20141113161535_909_4984\input\yangsw_test\word_count\__schema__
 2014-11-13 16:15:37 com.aliyun.odps.mapred.local.utils.SchemaUtils generateSchemaFile
 信息: generate schema file: D:\MyEclipseWorkStation\ODPS_WORD_COUNT\temp\mr_20141113161535_909_4984\output\__default__\__schema__
 2014-11-13 16:15:37 com.aliyun.odps.mapred.LocalJobRunner runJob
 信息: Start to run mappers, num: 1
 2014-11-13 16:15:37 com.aliyun.odps.mapred.local.MapDriver <init>
 信息: Map M_000000: input: temp\mr_20141113161535_909_4984\input\yangsw_test\word_count\data:0+56
 2014-11-13 16:15:37 com.aliyun.odps.mapred.local.MapDriver <init>
 信息: create record reader: finished
 2014-11-13 16:15:37 com.aliyun.odps.mapred.local.MapDriver run
 信息: Start to run Mapper: M_000000
 2014-11-13 16:15:37 com.aliyun.odps.mapred.local.MapDriver$ProxiedMapContextImpl close
 信息: Start to run Combiner
 2014-11-13 16:15:37 com.aliyun.odps.mapred.local.MapDriver$ProxiedMapContextImpl close
 信息: Fininshed run Combiner
 2014-11-13 16:15:37 com.aliyun.odps.mapred.local.MapDriver run
 信息: Fininshed run Reducer: M_000000
 2014-11-13 16:15:37 com.aliyun.odps.mapred.LocalJobRunner runJob
 信息: Start to run reduces, num: 1
 2014-11-13 16:15:37 com.aliyun.odps.mapred.local.ReduceDriver run
 信息: Start to run Reducer: R_000000
 2014-11-13 16:15:37 com.aliyun.odps.mapred.local.ReduceDriver run
 信息: Fininshed run Reducer: R_000000
 2014-11-13 16:15:37 com.aliyun.odps.mapred.LocalJobRunner moveOutputs
 信息: Copy output to warehouse: label=__default__ -> D:\MyEclipseWorkStation\ODPS_WORD_COUNT\warehouse\yangsw_test\word_count_result
 Summary:
 counters: 8
  map-reduce framework
   combine_input_groups=9
   combine_output_records=9
   map_input_bytes=56
   map_input_records=3
   map_output_records=10
   reduce_output_[word_count_result]_bytes=74
   reduce_output_[word_count_result]_records=9
OK
 InstanceId: mr_20141113161535_909_4984
After the run completes, the following directory structure appears under the project directory:
 ./temp
 ./warehouse
     /yangsw_test
         /word_count
             /__schema__
             /data
         /word_count_result
             /__schema__
             /R_000000
 The __schema__ file under word_count records the project and schema of the source word_count table:
 project=yangsw_test
 table=word_count
 columns=content:STRING
 The data file under word_count holds the word_count table's rows:
 hello word
 hello odps
 let us begin cloud compute now
 The __schema__ file under word_count_result records the project and schema of the target table:
 project=yangsw_test
 table=word_count_result
 columns=word:STRING,count:BIGINT
 The R_000000 file under word_count_result is the output; this run used a single reducer, so there is only one file:
 begin,1
 cloud,1
 compute,1
 hello,2
 let,1
 now,1
 odps,1
 us,1
 word,1
 That is the full result of running wordcount locally.
 11. Publish the jar package
 Export the project as a jar file (for example with Eclipse's Export, JAR file wizard), then register it as a resource in the ODPS console:
 create resource jar C:\Users\yangswa\Desktop\word_count.jar word_count.jar
 12. Run it on the server
 odps:yangsw_test> jar -resources word_count.jar -classpath C:\Users\yangswa\Desktop\word_count.jar com.thomas.odps.WordCount
 word_count word_count_result
 Exception in thread "main" java.lang.RuntimeException: java.lang.NoSuchMethodException: com.thomas.odps.WordCount.main([Ljava
 .lang.String;)
         at com.aliyun.odps.mapred.cli.Cli.main(Cli.java:47)
 Caused by: java.lang.NoSuchMethodException: com.thomas.odps.WordCount.main([Ljava.lang.String;)
         at java.lang.Class.getMethod(Unknown Source)
         at com.aliyun.odps.mapred.cli.Cli.main(Cli.java:35)
 Surprisingly this fails: the main method cannot be found. I filed a ticket with after-sales support in the ODPS user center, and the reply was that the ODPS MR feature has not been officially opened yet and will only become generally available at the end of December, so this will have to wait. (A closer look at the stack trace is given below.)
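 The NoSuchMethodException is also consistent with the class layout in the listing above: Cli.main uses reflection (Class.getMethod at Cli.java:35) to look up a public static main(String[]) on the class named on the command line, and main() here was compiled into WordCount$TokenizerMapper rather than WordCount. The JDK-only snippet below (illustrative; run it with the project on the classpath) reproduces exactly that lookup:

 public class CheckMainMethod {
     public static void main(String[] args) throws Exception {
         // Same reflective lookup the ODPS Cli performs on the submitted class.
         Class<?> outer = Class.forName("com.thomas.odps.WordCount");
         try {
             outer.getMethod("main", String[].class);
             System.out.println("WordCount declares main(String[])");
         } catch (NoSuchMethodException e) {
             // Taken with the nesting above: main() lives in
             // WordCount$TokenizerMapper, so WordCount itself has no main.
             System.out.println("No main(String[]) on WordCount: " + e);
         }
     }
 }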
 
Another thing to note: -classpath must point to the location of the local jar file, not merely to the resource name.