基本了解hive udaf
本次分享主要是介绍一下我在udaf开发中遇到的难点。可以让大家后面需要开发udaf时做个参考。
我对这篇文档的定位呢,就是如果你需要开发一个udaf函数,然后百度了解一下,查个代码模版。然后参考我的这个文档。能帮助解决类型输入输出的处理问题。因为我在开发的时候,网上没搜到类型输入输出的处理的比较详细的内容。一点点摸索,请教测试才完成的。希望可以帮大家跳过这个过程。
当然,做分享的时候还是简单说下udaf的基本的内容。
hive 自定义udaf函数对于hive 引擎,spark引擎是通用的。
最简单的方式开发hive 自定义udaf函数 可以 extends UDAF ,覆写其中的方法,但是这样写出来的udaf,由于类型转换处理的不好,性能差,查询速度很慢,基本被淘汰了。数据量一大就不可用了。
所以现在自定义udaf基本都是继承AbstractGenericUDAFResolver类或者实现GenericUDAFResolver2接口
本次案例继承AbstractGenericUDAFResolver
然后定义一个内部类继承GenericUDAFEvaluator 并覆写相应方法。
需要我们关注并覆写的方法:
init 初始化函数
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {mode = m;return null;
}
iterate map 迭代函数
public abstract void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;
terminatePartial 中间输出函数
public abstract Object terminatePartial(AggregationBuffer agg) throws HiveException;
merge 聚合函数
public abstract void merge(AggregationBuffer agg, Object partial) throws HiveException;
terminate 最终输出函数
public abstract Object terminate(AggregationBuffer agg) throws HiveException;
这五个方法要结合下面的阶段来看怎么用
udaf的阶段与方法
下面以average udaf为例来介绍运行过程。
首先 看下average 背后的map reduce 过程
假设t表有a字段int类型,里面有1-10 十个数字。
执行select average(a) from t;
average是有map和reduce的任务。
map过程接收数字,然后保存两个变量,一个是数字的个数,一个是数字的总和。
reduce过程接收各个map结果的两个变量,然后数字的个数相加得到总个数。数字的总和相加得到总和。最后相除输出。
上述过程在udaf中的运算过程如下:
首先是part1 也就是map阶段
先进入init函数。这个函数不做业务逻辑处理,是用于获取输入的数据类型的。数据类型后面详细讲。
然后进入iterate函数,依次读入数据。修改变量值。
然后进入terminatePartial shuffer输出到本地文件
然后是part2 也就是combiner阶段
读本地的shuffer文件,先进init函数。
然后进入merge函数,数字个数相加,总和相加。
然后进入terminatePartial shuffer输出到本地文件
然后进入final阶段 也就是reduce
读取map的结果先进入init函数。
然后进入merge函数,数字个数相加,总和相加。
然后进入terminate函数将最终的平均数输出。
重点难点
上面主线介绍完了。下面介绍关键点。
上面主线中有两个问题需要解决一下。
第一点是在单个阶段中,需要调用多个方法。需要传递变量值。
变量值如何保存呢?有两种方法。第一种就是定义全局变量。不管几个方法,我在全局变量中自然都能用。解决了传递问题。
第二种方式是udaf提供了一个中间类。也是udaf推荐的方式。我不确定不用可不可以,理论可以。
下面是iterate的方法。第一个参数就是这个中间类。这个参数是必须有的。第二个参数是实际读的数据。
public abstract void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;
这是第一个问题。这个问题比较简单,大不了就用这个中间类就行了,然后在这个中间类中定义一个long型属性和一个double属性用于存放个数和总和变量。就不用全局变量了。
第二个问题就是udaf开发的难点:
类型转换
map阶段读数据是从hadoop上读,读取的是hive类型,并且读到的可能是int,可能是long,假如存放的是long型的数据,由于执行引擎的不同,读进内存时的类型也有所不同。可能是java的long,可能是hive的writablelong,也可能是lazylong。这个地方如果自己强转处理不完全就很容易报错。
combiner阶段和reduce阶段的读数据就容易多了。因为这个数据是我们写进去的。我们写进去是writablelong,那读就是writablelong。具体操作稍后介绍。
这里明白每个阶段读数据都需要类型转换就可以了。
map阶段,combiner阶段和reduce阶段的输出也有两个问题需要注意。
第一,输出的时候一定不可以输出java类型,一定要输出hive的类型,也就是实现了WritableComparable接口的类型。在计算是我们是用的java类型,输出前要先转换,再return。
第二,输出的时候不只是在terminatePartial或者terminate函数输出了就完事了。不可以。我们还需要特意的告知udaf,我们在每个阶段输出的是什么类型。而告知的方式也不是直接把类型传给udaf,而是要使用输出类型的对应的Inspector。我叫它类型描述器。
所以我们的数据对于java来说如果是long类型,那么我们除了要用long,也要用到LongWritable和PrimitiveObjectInspector。这就是udaf麻烦的地方。
接下来的篇幅将着重细致介绍类型的具体处理。
输入输出类型处理的具体操作
map输入的处理
average举例。map读取的数据是int或者long或者byte这种数字类型。
从hive读取。由于引擎的不同,所以实际读到内存中的数据类型不确定性很大。所以这个地方非常不建议自己强转处理。
我们在读取之前不清楚它的类型,但是在实际读取时,udaf是能够知道读取的具体类型的。它会上报给init。我们可以在init的方法中拿到。
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {inputOI0 = (PrimitiveObjectInspector) parameters[0];
}
init方法的第一个参数是阶段的意思。
m==Mode.PARTIAL1||Mode.COMPLETE 表示当前阶段是map
m==Mode.PARTIAL2 表示当前阶段是combiner
m==FINAL 表示当前阶段是reduce
现在处理的map阶段。所以我们取m==Mode.PARTIAL1
然后ObjectInspector[] parameters就是udaf给我们传进来的类型信息,这个类型信息是以类型描述器传进来的。
下面介绍一下类型描述器
主要使用到的是Inspector的各种子类,第一大子类就是ObjectInspector,这个子类也是接口,我们用不到。我们用到的是ObjectInspector 的各种子类。
ObjectInspector有五个子类
PRIMITIVE,
LIST,
MAP,
STRUCT,
UNION;
以我的经验,没用到第五个。
PrimitiveObjectInspector描述器,可以用于描述各种基本类型。
我们average传入的数字,所以,在PARTIAL1阶段,传进来的ObjectInspector一定是PrimitiveObjectInspector。所以我们可以强转成PrimitiveObjectInspector。作为全局变量保留。
然后下一步,在iterate中,拿到这个数字的类型描述器。现在我们需要通过这个描述器将数字变成java的类型。可以使用工具类PrimitiveObjectInspectorUtils。
long num = PrimitiveObjectInspectorUtils.getLong(parameters[0], inputOI0);
map输出的处理
map阶段的输出其实就是两个变量,一个是数字个数,一个是数字的总和。
首先在init中,一定要定义输出的类型描述器。
我们在init中return的东西,是类型描述器。我们的变量是long类型,那么我们的输出就是long的对应的类型描述器,也就是writableLongObjectInspector。
两个变量都是long类型。但是需要注意的是,我们输出的只能是一个个体,不能是两个。所以这两个变量不能作为两个long存在,可以作为list,可以作为map。也可以作为struct。struct就是结构体的意思,是类型描述器的一种。
如果是以list输出
那么
return ObjectInspectorUtils.getStandardObjectInspector(ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableLongObjectInspector));
如果以map输出
那么
return ObjectInspectorUtils.getStandardObjectInspector(ObjectInspectorFactory.getStandardMapObjectInspector(PrimitiveObjectInspectorFactory.writableLongObjectInspector,PrimitiveObjectInspectorFactory.writableLongObjectInspector));
如果以struct输出
//先建一个描述器的list,里面放两个long描述器
ArrayList foi = new ArrayList();
foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
//再建一个string list,里面放两个名称,一一对应上面描述器的名称
ArrayList fname = new ArrayList();
fname.add("num");
fname.add("sum");
//然后利用工具类返回StructObjectInspector
return ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
使用这个工具类我们创建struct比较容易。但是其实生成的东西比较复杂。
struct是以块的方式存放数据。上面的数据被分成两个数据块,第一个是num块,第二个是sum块。num和sum是块的名字,我们可以通过名字取出各个块。
每个数据块又包含两部分数据,一个是数据的具体值,一个是这个数据的类型描述器。
init函数只是告知map阶段输出的类型,
map阶段的实际输出在terminatePartial中进行。
此方法返回object 。
return的数据就是输出的数据。
如果上面init写的类型是list,那么这里就要return list,注意,list里面的格式不是long,而是LongWritable
ArrayList write_list=new ArrayList() ;
write_list.add(new LongWritable(num));
write_list.add(new LongWritable(sum));
如果上面init写的类型是map,return 也是map
HashMap write_map=new HashMap<>() ;
HashMap.put(new LongWritable(num),new LongWritable(sum));
如果上面init写的类型是struct,return也是struct ,这里可以返回一个LongWritable数组,如果struct里面类型不一致的话,就返回object数组
Object[] longresult = new Object[2];
longresult[0] = new LongWritable(num);
longresult[1] = new LongWritable(sum);
return longresult;
combiner输入的处理
combiner的输入是map的输出。
我们已经知道明确的类型了,所以可以在init中处理,也可以直接在merge中强转。
我选择在merge中强转。merge函数的第二个参数,就是传进来的数据。
public abstract void merge(AggregationBuffer agg, Object partial) throws HiveException;
如果上面传的是list,先创建一个list描述器,然后使用描述器将list获取出来。
StandardListObjectInspector listinspect=(StandardListObjectInspector)ObjectInspectorUtils.getStandardObjectInspector(ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableLongObjectInspector));
ArrayList list =(ArrayList) listinspect.getList(partial);
如果上面传的是map,先创建一个list描述器,然后使用描述器将map获取出来。
StandardMapObjectInspector mapinspect= (StandardMapObjectInspector)ObjectInspectorUtils.getStandardObjectInspector(ObjectInspectorFactory.getStandardMapObjectInspector(PrimitiveObjectInspectorFactory.writableLongObjectInspector,PrimitiveObjectInspectorFactory.writableLongObjectInspector));
HashMap map= (HashMap)mapinspect.getMap(partial);
如果上面传的是struct,先创建一个struct描述器,然后使用描述器将struct获取出来。
这个struct描述器自己创建比较麻烦,所以我在init函数中获取
首先定义一个全局变量StructObjectInspector soi;
然后在init中
if (mode == Mode.PARTIAL2 || mode == Mode.FINAL) {soi = (StructObjectInspector) parameters[0];
}
然后在merge中
//先用名称拿出strct中的想要处理的数据块num_field
StructField num_field = soi.getStructFieldRef("num");
//然后用数据块num_field拿到具体数据值,和这个数据的描述器
Object num_obj = soi.getStructFieldData(partial, num_field);
PrimitiveObjectInspector num_Inspector = (PrimitiveObjectInspector)num_field.getFieldObjectInspector();
//最后用这个数据的描述器和这个具体数据值,转换成相应的java数据类型。
long num = PrimitiveObjectInspectorUtils.getLong(num_obj, num_Inspector);
reduce输入的处理
reduce的输入处理和combiner完全一致
combiner输出的处理
combiner的输出与map的输出完全一致。
reduce输出的处理
在init定义输出的类型描述器,writableDouble
if (mode == Mode.FINAL) {return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
}
然后实际输出在terminate函数中,输出DoubleWritable。
return new DoubleWritable(sum/num);
部分java,hive,描述器类型对应关系参考
java类型 | hive输入 | hive输出 | 描述器 |
int | PrimitiveObjectInspectorUtils.getInt() | IntWritable | PrimitiveObjectInspector |
long | PrimitiveObjectInspectorUtils.getLong() | LongWritable | PrimitiveObjectInspector |
double | PrimitiveObjectInspectorUtils.getDouble() | DoubleWritable | PrimitiveObjectInspector |
string | PrimitiveObjectInspectorUtils.getString() | Text | PrimitiveObjectInspector |
list<Long> | StandardListObjectInspector fieldObjectInspector; fieldObjectInspector.getList(); | ObjectInspectorFactory .getStandardListObjectInspector | StandardListObjectInspector |
map<String,Integer> | StandardMapObjectInspector fieldObjectInspector; fieldObjectInspector.getMap(); | ObjectInspectorFactory .getStandardMapObjectInspector | StandardMapObjectInspector |
int,list<Long> (组合) | soi.getStructFieldRef(); soi.getStructFieldData(); | ObjectInspectorFactory.getStandardStructObjectInspector | StructObjectInspector |
其他问题:
如果class AggregateAgg implements GenericUDAFEvaluator.AggregationBuffer {}
中的属性不只有基本数据类型。比如包含字符串,list,map等。
那么spark或者hive在执行的时候,如果启用map端聚合。在分配内存的时候不知道分配多少。会报错。
解决方式是在使用的时候配置参数set hive.map.aggr = false;
或者在这个类上加注解@GenericUDAFEvaluator.AggregationType(estimable = true)
效果都是不启用map端聚合。