hive udaf 输入输出处理参考手册 - 指南

news/2025/10/20 19:29:52/文章来源:https://www.cnblogs.com/yxysuanfa/p/19153491

基本了解hive udaf

本次分享主要是介绍一下我在udaf开发中遇到的难点。可以让大家后面需要开发udaf时做个参考。

我对这篇文档的定位呢,就是如果你需要开发一个udaf函数,然后百度了解一下,查个代码模版。然后参考我的这个文档。能帮助解决类型输入输出的处理问题。因为我在开发的时候,网上没搜到类型输入输出的处理的比较详细的内容。一点点摸索,请教测试才完成的。希望可以帮大家跳过这个过程。

当然,做分享的时候还是简单说下udaf的基本的内容。

hive 自定义udaf函数对于hive 引擎,spark引擎是通用的。

最简单的方式开发hive 自定义udaf函数 可以 extends UDAF ,覆写其中的方法,但是这样写出来的udaf,由于类型转换处理的不好,性能差,查询速度很慢,基本被淘汰了。数据量一大就不可用了。

所以现在自定义udaf基本都是继承AbstractGenericUDAFResolver类或者实现GenericUDAFResolver2接口

本次案例继承AbstractGenericUDAFResolver

然后定义一个内部类继承GenericUDAFEvaluator 并覆写相应方法。

需要我们关注并覆写的方法:

init 初始化函数

public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {mode = m;return null;
}

iterate map 迭代函数

public abstract void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;

terminatePartial 中间输出函数

public abstract Object terminatePartial(AggregationBuffer agg) throws HiveException;

merge 聚合函数

public abstract void merge(AggregationBuffer agg, Object partial) throws HiveException;

terminate 最终输出函数

public abstract Object terminate(AggregationBuffer agg) throws HiveException;

这五个方法要结合下面的阶段来看怎么用

udaf的阶段与方法

下面以average udaf为例来介绍运行过程。

首先 看下average 背后的map reduce 过程

假设t表有a字段int类型,里面有1-10 十个数字。

执行select average(a) from t;

average是有map和reduce的任务。

map过程接收数字,然后保存两个变量,一个是数字的个数,一个是数字的总和。

reduce过程接收各个map结果的两个变量,然后数字的个数相加得到总个数。数字的总和相加得到总和。最后相除输出。

上述过程在udaf中的运算过程如下:

首先是part1 也就是map阶段

先进入init函数。这个函数不做业务逻辑处理,是用于获取输入的数据类型的。数据类型后面详细讲。

然后进入iterate函数,依次读入数据。修改变量值。

然后进入terminatePartial shuffer输出到本地文件

然后是part2 也就是combiner阶段

读本地的shuffer文件,先进init函数。

然后进入merge函数,数字个数相加,总和相加。

然后进入terminatePartial shuffer输出到本地文件

然后进入final阶段 也就是reduce

读取map的结果先进入init函数。

然后进入merge函数,数字个数相加,总和相加。

然后进入terminate函数将最终的平均数输出。

重点难点

上面主线介绍完了。下面介绍关键点。

上面主线中有两个问题需要解决一下。

第一点是在单个阶段中,需要调用多个方法。需要传递变量值。

变量值如何保存呢?有两种方法。第一种就是定义全局变量。不管几个方法,我在全局变量中自然都能用。解决了传递问题。

第二种方式是udaf提供了一个中间类。也是udaf推荐的方式。我不确定不用可不可以,理论可以。

下面是iterate的方法。第一个参数就是这个中间类。这个参数是必须有的。第二个参数是实际读的数据。

public abstract void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;

这是第一个问题。这个问题比较简单,大不了就用这个中间类就行了,然后在这个中间类中定义一个long型属性和一个double属性用于存放个数和总和变量。就不用全局变量了。

第二个问题就是udaf开发的难点:

类型转换

map阶段读数据是从hadoop上读,读取的是hive类型,并且读到的可能是int,可能是long,假如存放的是long型的数据,由于执行引擎的不同,读进内存时的类型也有所不同。可能是java的long,可能是hive的writablelong,也可能是lazylong。这个地方如果自己强转处理不完全就很容易报错。

combiner阶段和reduce阶段的读数据就容易多了。因为这个数据是我们写进去的。我们写进去是writablelong,那读就是writablelong。具体操作稍后介绍。

这里明白每个阶段读数据都需要类型转换就可以了。

map阶段,combiner阶段和reduce阶段的输出也有两个问题需要注意。

第一,输出的时候一定不可以输出java类型,一定要输出hive的类型,也就是实现了WritableComparable接口的类型。在计算是我们是用的java类型,输出前要先转换,再return。

第二,输出的时候不只是在terminatePartial或者terminate函数输出了就完事了。不可以。我们还需要特意的告知udaf,我们在每个阶段输出的是什么类型。而告知的方式也不是直接把类型传给udaf,而是要使用输出类型的对应的Inspector。我叫它类型描述器。

所以我们的数据对于java来说如果是long类型,那么我们除了要用long,也要用到LongWritable和PrimitiveObjectInspector。这就是udaf麻烦的地方。

接下来的篇幅将着重细致介绍类型的具体处理。

输入输出类型处理的具体操作

map输入的处理

average举例。map读取的数据是int或者long或者byte这种数字类型。

从hive读取。由于引擎的不同,所以实际读到内存中的数据类型不确定性很大。所以这个地方非常不建议自己强转处理。

我们在读取之前不清楚它的类型,但是在实际读取时,udaf是能够知道读取的具体类型的。它会上报给init。我们可以在init的方法中拿到。

public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {inputOI0 = (PrimitiveObjectInspector) parameters[0];
}

init方法的第一个参数是阶段的意思。

m==Mode.PARTIAL1||Mode.COMPLETE 表示当前阶段是map

m==Mode.PARTIAL2 表示当前阶段是combiner

m==FINAL 表示当前阶段是reduce

现在处理的map阶段。所以我们取m==Mode.PARTIAL1

然后ObjectInspector[] parameters就是udaf给我们传进来的类型信息,这个类型信息是以类型描述器传进来的。

下面介绍一下类型描述器

主要使用到的是Inspector的各种子类,第一大子类就是ObjectInspector,这个子类也是接口,我们用不到。我们用到的是ObjectInspector 的各种子类。

ObjectInspector有五个子类

PRIMITIVE,

LIST,

MAP,

STRUCT,

UNION;

以我的经验,没用到第五个。

PrimitiveObjectInspector描述器,可以用于描述各种基本类型。

我们average传入的数字,所以,在PARTIAL1阶段,传进来的ObjectInspector一定是PrimitiveObjectInspector。所以我们可以强转成PrimitiveObjectInspector。作为全局变量保留。

然后下一步,在iterate中,拿到这个数字的类型描述器。现在我们需要通过这个描述器将数字变成java的类型。可以使用工具类PrimitiveObjectInspectorUtils。

long num = PrimitiveObjectInspectorUtils.getLong(parameters[0], inputOI0);

map输出的处理

map阶段的输出其实就是两个变量,一个是数字个数,一个是数字的总和。

首先在init中,一定要定义输出的类型描述器。

我们在init中return的东西,是类型描述器。我们的变量是long类型,那么我们的输出就是long的对应的类型描述器,也就是writableLongObjectInspector。

两个变量都是long类型。但是需要注意的是,我们输出的只能是一个个体,不能是两个。所以这两个变量不能作为两个long存在,可以作为list,可以作为map。也可以作为struct。struct就是结构体的意思,是类型描述器的一种。

如果是以list输出

那么

return ObjectInspectorUtils.getStandardObjectInspector(ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableLongObjectInspector));

如果以map输出

那么

return ObjectInspectorUtils.getStandardObjectInspector(ObjectInspectorFactory.getStandardMapObjectInspector(PrimitiveObjectInspectorFactory.writableLongObjectInspector,PrimitiveObjectInspectorFactory.writableLongObjectInspector));

如果以struct输出

//先建一个描述器的list,里面放两个long描述器
ArrayList foi = new ArrayList();
foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
//再建一个string list,里面放两个名称,一一对应上面描述器的名称
ArrayList fname = new ArrayList();
fname.add("num");
fname.add("sum");
//然后利用工具类返回StructObjectInspector
return ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);

使用这个工具类我们创建struct比较容易。但是其实生成的东西比较复杂。

struct是以块的方式存放数据。上面的数据被分成两个数据块,第一个是num块,第二个是sum块。num和sum是块的名字,我们可以通过名字取出各个块。

每个数据块又包含两部分数据,一个是数据的具体值,一个是这个数据的类型描述器。

init函数只是告知map阶段输出的类型,

map阶段的实际输出在terminatePartial中进行。

此方法返回object 。

return的数据就是输出的数据。

如果上面init写的类型是list,那么这里就要return list,注意,list里面的格式不是long,而是LongWritable

ArrayList write_list=new ArrayList() ;
write_list.add(new LongWritable(num));
write_list.add(new LongWritable(sum));

如果上面init写的类型是map,return 也是map

HashMap write_map=new HashMap<>() ;
HashMap.put(new LongWritable(num),new LongWritable(sum));

如果上面init写的类型是struct,return也是struct ,这里可以返回一个LongWritable数组,如果struct里面类型不一致的话,就返回object数组

Object[] longresult = new Object[2];
longresult[0] = new LongWritable(num);
longresult[1] = new LongWritable(sum);
return longresult;

combiner输入的处理

combiner的输入是map的输出。

我们已经知道明确的类型了,所以可以在init中处理,也可以直接在merge中强转。

我选择在merge中强转。merge函数的第二个参数,就是传进来的数据。

public abstract void merge(AggregationBuffer agg, Object partial) throws HiveException;

如果上面传的是list,先创建一个list描述器,然后使用描述器将list获取出来。

StandardListObjectInspector listinspect=(StandardListObjectInspector)ObjectInspectorUtils.getStandardObjectInspector(ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableLongObjectInspector));
ArrayList list =(ArrayList) listinspect.getList(partial);

如果上面传的是map,先创建一个list描述器,然后使用描述器将map获取出来。

StandardMapObjectInspector mapinspect= (StandardMapObjectInspector)ObjectInspectorUtils.getStandardObjectInspector(ObjectInspectorFactory.getStandardMapObjectInspector(PrimitiveObjectInspectorFactory.writableLongObjectInspector,PrimitiveObjectInspectorFactory.writableLongObjectInspector));
HashMap map= (HashMap)mapinspect.getMap(partial);

如果上面传的是struct,先创建一个struct描述器,然后使用描述器将struct获取出来。

这个struct描述器自己创建比较麻烦,所以我在init函数中获取

首先定义一个全局变量StructObjectInspector soi;

然后在init中

if (mode == Mode.PARTIAL2 || mode == Mode.FINAL) {soi = (StructObjectInspector) parameters[0];
}

然后在merge中

//先用名称拿出strct中的想要处理的数据块num_field
StructField num_field = soi.getStructFieldRef("num");
//然后用数据块num_field拿到具体数据值,和这个数据的描述器
Object num_obj = soi.getStructFieldData(partial, num_field);
PrimitiveObjectInspector num_Inspector = (PrimitiveObjectInspector)num_field.getFieldObjectInspector();
//最后用这个数据的描述器和这个具体数据值,转换成相应的java数据类型。
long num = PrimitiveObjectInspectorUtils.getLong(num_obj, num_Inspector);

reduce输入的处理

reduce的输入处理和combiner完全一致

combiner输出的处理

combiner的输出与map的输出完全一致。

reduce输出的处理

在init定义输出的类型描述器,writableDouble

if (mode == Mode.FINAL) {return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
}

然后实际输出在terminate函数中,输出DoubleWritable。

return new DoubleWritable(sum/num);

部分java,hive,描述器类型对应关系参考

java类型

hive输入

hive输出

描述器

int

PrimitiveObjectInspectorUtils.getInt()

IntWritable

PrimitiveObjectInspector

long

PrimitiveObjectInspectorUtils.getLong()

LongWritable

PrimitiveObjectInspector

double

PrimitiveObjectInspectorUtils.getDouble()

DoubleWritable

PrimitiveObjectInspector

string

PrimitiveObjectInspectorUtils.getString()

Text

PrimitiveObjectInspector

list<Long>

StandardListObjectInspector fieldObjectInspector;

fieldObjectInspector.getList();

ObjectInspectorFactory

.getStandardListObjectInspector

StandardListObjectInspector

map<String,Integer>

StandardMapObjectInspector fieldObjectInspector;

fieldObjectInspector.getMap();

ObjectInspectorFactory

.getStandardMapObjectInspector

StandardMapObjectInspector

int,list<Long> (组合)

soi.getStructFieldRef();

soi.getStructFieldData();

ObjectInspectorFactory.getStandardStructObjectInspector

StructObjectInspector

其他问题:

如果class AggregateAgg implements GenericUDAFEvaluator.AggregationBuffer {}

中的属性不只有基本数据类型。比如包含字符串,list,map等。

那么spark或者hive在执行的时候,如果启用map端聚合。在分配内存的时候不知道分配多少。会报错。

解决方式是在使用的时候配置参数set hive.map.aggr = false;

或者在这个类上加注解@GenericUDAFEvaluator.AggregationType(estimable = true)

效果都是不启用map端聚合。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/941526.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

250921

1. 20号胶 12000附近部署多单 目标看新高

位运算(早晚得学会)

在二进制中,数字的奇偶性可以通过最低位(最右边的一位)来判断:如果一个数是奇数,它的二进制最低位一定是 1(例如:3 是 11,5 是 101)。 如果一个数是偶数,它的二进制最低位一定是 0(例如:2 是 10,4 是 100…

Jvm参数分类

目录Jvm参数分类总结快速概览1. 标准参数 (Standard Options)2. 扩展参数 (X Options)3. 高级运行时参数 (XX Options)布尔类型键值类型4. 系统属性 (System Properties)5. 主类参数 (Main Class Arguments) Jvm参数分…

10/20

今天满课,下午弄了一下Java数据库的连接,没弄明白,明天打算在弄,好困

2025年市面上工程石材产品排名前十:选购指南与品牌深度解析

摘要 随着建筑行业标准化与品质化需求提升,工程石材市场迎来新一轮发展机遇。据中国石材协会统计,2024年工程石材市场规模突破8000亿元,同比增长12.3%。本文基于产品性能、市场口碑、服务能力等维度,为您呈现权威的…

2025年市面上工程石材产品排名前十:权威榜单与选择指南

摘要 工程石材行业在2025年持续蓬勃发展,随着建筑和装饰市场的需求增长,优质石材供应商成为关键。本文基于用户搜索意图,提供市面上工程石材产品的排名前十榜单,涵盖品牌实力、口碑评分和核心优势,旨在帮助用户快…

深入解析:【C++】继承

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

利用错误配置的postMessage()函数实现DOM型XSS攻击

本文详细分析了三种通过错误配置postMessage函数实现DOM型XSS攻击的场景,包括无源验证检查、使用indexOf函数进行源验证以及白名单源存在XSS漏洞的情况,并提供了相应的漏洞代码和利用方法。利用错误配置的postMessag…

听说今年很多应届硕士很难找到工作...

听说今年很多应届硕士很难找到工作...听说今年很多应届硕士很难找到工作,投递简历的人和岗位机会比例通常是300:1的概率。本文适合没开始工作的应届生、Gap大于1年的人、没去过大厂工作的人...分享一个提高约面率的小…

开源 C++ QT QML 创建(四)复杂控件--Listview

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

洛谷 P7380 [COCI 2018/2019 #6] Konj 题解

P7380 [COCI 2018/2019 #6] Konj 题解 题目传送门。 题意 给定 \(n\) 条线段两端端点坐标,保证一定与 \(x\) 轴或 \(y\) 轴平行。求最小的矩阵能表示出从点 \(T\) 出发到达的所有点。 如果两条线段相交或间接可以走到…

意大利居留 办理 看小红书上的材料就行,部分材料可以到按手印再补交

到了邮局之后 取个寄信号 我的是P开头 然后给他信封和材料 最后签3个名字 然后刷卡100零几欧 输入pin码 ok

机器学习领导者分享AI技术与行业洞见

本文介绍了某中心机器学习业务负责人在人工智能领域的专业见解,涵盖分布式模型训练、自然语言处理、计算机视觉等技术应用,以及AI行业发展趋势和人才培养策略,为技术从业者提供有价值的参考。Allie K. Miller希望帮…

el-upload上传配合$confirm使用的问题

现象:el-upload如果再on-remove,先调用$confirm 点了取消,也删了文件。 解决方案: 1、使用:before-remove钩子<el-uploadref="upload"class="m-l-10":limit="1"action="&qu…

博客的意義

我大概就是想寫成遊寄?個人總結?還是其他隨意的? 我補檔前面的個人總結。

我写过的动态规划问题的状态表示与转移汇总

众所周知,在写 dp 问题时只要想出来状态表示和转移方程就能把一道题写的差不多了,所以在这里整理一下我写过的 dp 问题的解法,方便我后面举一反三 背包动态规划 线性动态规划 Luogu P1359 租用游艇 橙 题目链接\(n\…

基于大语言模型的具身智能语义地图与导航研究 - MKT

基于大语言模型的具身智能语义地图与导航研究https://www.gml.ac.cn/kydt/694.html https://www.gml.ac.cn/kydt/694.html

10.20 CSP-S模拟35 改题记录

水水HZOJ 写在前面 改不出来T3T4,遂先把这个写了,然后晚上搞搞恐怖的容斥qwq。这场模拟赛不多做评价,我不知道是我菜还是题史,虽然大概率是两者都有。《美人》 한 번 보고 두 번 보고 一眼 再一眼 자꾸만 보고 싶네…

例子:vue3+vite+router创建导航菜单

第一部分 1、初始化项目npm init vite@latestnpm run dev :运行项目 q+Enter:退出运行 2、安装路由依赖npm install vue-router@4 # Vue3 对应 vue-router 4.x 版本 第二部分: 创建页面组件 在 src/views 目录下创…