【大数据学习 | Spark-SQL】定义UDF和DUAF,UDTF函数

1. UDF函数(用户自定义函数)

一般指的是用户自己定义的单行函数。一进一出,函数接受的是一行中的一个或者多个字段值,返回一个值。比如MySQL中的,日期相关的dateDiff函数,字符串相关的substring函数。

先准备数据:

1.1 导入必要的包

首先,确保导入必要的Spark包:

import org.apache.spark.sql.SparkSession

1.2 创建SparkSession

创建一个SparkSession对象,这是与Spark交互的入口。

1.3 定义UDF并注册到SparkSQL

定义一个Scala函数,并将其注册为UDF。示例

1.4 使用UDF在SQL查询中:

调用udf的register方法,第一个参数是udf函数的函数名,第二个参数是要注册为UDF的函数。

session.udf.register("all_income",(sal:Int,bonus:Int)=>{sal*12 + bonus})

1.5 代码:

尽量使用SparkSQL的sql形式的写法,api写法太麻烦了。

object TestUDF{def main(args: Array[String]): Unit = {val session = SparkSession.builder().master("local[*]").appName("testUDF").getOrCreate()import session.implicits._val df = session.sparkContext.textFile("D:\\software\\Spark\\SparkProgram1\\atguigu-classes\\data\\a.txt").map(t => {val strs = t.split(" ")(strs(0), strs(1), strs(2).toInt, strs(3).toInt)}).toDF("id", "name", "salary", "bonus")session.udf.register("all_income",(sal:Int,bonus:Int)=>{sal*12 + bonus})import org.apache.spark.sql.functions
//    df.withColumn("all",functions.callUDF("all_income",$"salary",$"bonus"))
//      .select("id","name","all")
//      .show()df.createTempView("salary")session.sql("""|select id,name,all_income(salary,bonus) all from salary|""".stripMargin).show()}
}

输出:

2. UDAF(用户自定义的聚合函数)

指的是用户自定义的聚合函数,多进一出,比如MySQL中的,count函数,avg函数。

以学生信息为主进行统计,所有人员的年龄的总和

或者每个性别的年龄的平均值

计算所有人的年龄之和:

package com.atguigu.bigdata.testimport org.apache.spark.sql.{Encoder, Encoders, SparkSession, functions}
import org.apache.spark.sql.expressions.Aggregator/*** ClassName : TestUDAF* Package : com.atguigu.bigdata.test* Description** @Author HeXua* @Create 2024/11/29 19:09*         Version 1.0*/
object TestUDAF {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("test udaf").master("local[*]").getOrCreate()import session.implicits._val df = session.sparkContext.textFile("D:\\software\\Spark\\SparkProgram1\\atguigu-classes\\data\\a.txt").map(t => {val strs = t.split(" ")(strs(0), strs(1), strs(2).toInt, strs(3))}).toDF("id", "name", "age", "gender")import org.apache.spark.sql.functions._// 注册udaf函数session.udf.register("mysum",udaf(new MySum))df.createTempView("student")session.sql("""|select mysum(age) from student|""".stripMargin).show()}
}
// udaf的类继承Aggregator抽象类
class MySum extends Aggregator[Int,Int,Int]{//初始化def zero: Int = 0//聚合逻辑def reduce(b: Int, a: Int): Int = a+b//整体聚合def merge(b1: Int, b2: Int): Int = b1+b2//最终返回值def finish(reduction: Int): Int = reduction//累加值的类型def bufferEncoder: Encoder[Int] = Encoders.scalaInt//输出结果的类型def outputEncoder: Encoder[Int] = Encoders.scalaInt
}

定义用户自定义聚合函数时,继承Aggregator类需要指定三个泛型参数。这三个泛型参数分别代表不同的概念。

泛型参数解释:

1. 输入类型(IN)

这是聚合函数的输入类型,即每次调用reduce方法时传入的单个元素的类型。例如你要计算一组整数的平均值,输入类型就是int。

2. 缓冲区类型(BUFFER)

这是聚合函数的中间状态类型,也称为缓冲区类型。

例如你要计算一组整数的平均值,缓冲区可能包含两个字段:总和和计数,因为iBUF可能是一个元组。

3. 输出类型(OUT)

这是聚合函数的最终输出类型,即finish方法返回的类型。例如你要计算平均值,最终输出类型是Double。

方法解释:

zero:初始化缓冲区的值,对于平均值计算,初始化和计数都是0。

reduce:更新缓冲区,每次传入一个新的输入值时,更新总和和计数。

finish:计算最终结果,根据缓冲区中的总和和计数,计算平均值。

bufferEncoder:定义缓冲区类型的编码器,用于序列化和反序列化缓冲区。

outputEncoder:定义最终输出类型的编码器,用于序列化和反序列化输出结果。

计算每个性别的年龄的平均值:

case class AggragateVo(var cnt:Int,var sum:Int)
object MyAvg extends Aggregator[Int,AggragateVo,Double]{override def zero: AggragateVo = AggragateVo(0,0)override def reduce(b: AggragateVo, a: Int): AggragateVo = {b.cnt += 1b.sum += ab}override def merge(b1: AggragateVo, b2: AggragateVo): AggragateVo = {b1.cnt += b2.cntb1.sum += b2.sumb1}override def finish(reduction: AggragateVo): Double = {reduction.sum.toDouble /reduction.cnt}override def bufferEncoder: Encoder[AggragateVo] = Encoders.productoverride def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

3. UDTF(用户自定义炸裂函数)

拆分函数,进入的是一行内容出现的结果是多行内容。

spark中并不直接支持UDTF函数。但可以使用hive中的炸裂函数达到效果。

import org.apache.spark.sql.SparkSessionobject TestUDTF {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("test udtf").master("local[*]").getOrCreate()import session.implicits._val df = session.sparkContext.textFile("file:///headless/workspace/spark/data/m.txt").map(t => {val strs = t.split(",")(strs(0), strs(1), strs(2))}).toDF("id", "name", "actors")//explode map arraydf.createTempView("movies")session.sql("""|select id,name,actor  from movies lateral view explode(split(actors,'\\|')) t as actor|""".stripMargin).createTempView("movies1")session.sql("""|select count(1),actor from movies1 group by actor|""".stripMargin).show()}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/61541.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

详解Qt pdf 之QPdfSelection 选择文本类

文章目录 QPdfSelection 类详解前言 详细说明公共函数说明1. 构造函数2. text3. boundingRect4. isEmpty5. startPage6. endPage 使用场景示例代码代码说明总结 QPdfSelection 类详解 前言 QPdfSelection 是 Qt PDF 模块中的一个类,用于表示在 PDF 文档中被选中的…

记录部署dvwa靶场踩的几个坑

DVWA reCAPTCHA key: Missing 解决方法:网上随便copy一个,粘贴到config.inc.php配置文件里,具体我也是参考这篇文章的:DVWA下载、安装You dont have permission to access this resource.Server unable to read htaccess file, de…

linux——进程间通信及管道的应用场景

linux进程的控制-CSDN博客 liunx——进程间通信(管道通信)-CSDN博客 文章目录 文章目录 前言 二、管道的应用 1.创建子进程 1、描述: 2.创建进程及管理 3、子进程接受任务 4、控制子进程 总结 前言 上篇博客我们学习了进程间通信&…

开发工具WebStorm,VSCode,HbuilderX

WebStorm、VSCode 和 HBuilderX 是三款非常受欢迎的开发工具,各自有不同的特点和适用场景。下面我会详细比较它们的优势和劣势,帮助你选择最合适的开发环境。 1. WebStorm WebStorm 是由 JetBrains 开发的一款强大的 JavaScript IDE,专门针对…

FPGA实现串口升级及MultiBoot(十)串口升级SPI FLASH实现

本文目录索引 工程架构example9工程设计Vivado设计Vitis设计example9工程验证1、读取FLASH ID2、擦除整个FLASH3、Blank-Check4、烧写Golden区位流5、读取FLASH内容6、烧写MultiBoot区位流(升级位流)7、MultiBoot区位流(升级位流)启动example10工程设计Vivado设计Vitis设计exam…

AIGC引领金融大模型革命:未来已来

文章目录 金融大模型的应用场景1. **金融风险管理**2. **量化交易**3. **个性化投资建议**4. **金融欺诈检测和预防**5. **智能客户服务** 金融大模型开发面临的挑战应对策略《金融大模型开发基础与实践》亮点内容简介作者简介获取方式 在AIGC(Artificial Intellige…

设计模式学习[10]---迪米特法则+外观模式

文章目录 前言1. 迪米特法则2. 外观模式2.1 原理阐述2.2 举例说明 总结 前言 之前有写到过 依赖倒置原则,这篇博客中涉及到的迪米特法则和外观模式更像是这个依赖倒置原则的一个拓展。 设计模式的原则嘛,总归还是高内聚低耦合,下面就来阐述…

量化交易系统开发-实时行情自动化交易-8.4.MT4/MT5平台

19年创业做过一年的量化交易但没有成功,作为交易系统的开发人员积累了一些经验,最近想重新研究交易系统,一边整理一边写出来一些思考供大家参考,也希望跟做量化的朋友有更多的交流和合作。 接下来会对于MT4/MT5平台介绍。 MetaT…

BWO-CNN-BiGRU-Attention白鲸优化算法优化卷积神经网络结合双向门控循环单元时间序列预测,含优化前后对比

BWO-CNN-BiGRU-Attention白鲸优化算法优化卷积神经网络结合双向门控循环单元时间序列预测,含优化前后对比 目录 BWO-CNN-BiGRU-Attention白鲸优化算法优化卷积神经网络结合双向门控循环单元时间序列预测,含优化前后对比预测效果基本介绍模型描述程序设计…

【Linux】死锁、读写锁、自旋锁

文章目录 1. 死锁1.1 概念1.2 死锁形成的四个必要条件1.3 避免死锁 2. 读者写者问题与读写锁2.1 读者写者问题2.2 读写锁的使用2.3 读写策略 3. 自旋锁3.1 概念3.2 原理3.3 自旋锁的使用3.4 优点与缺点 1. 死锁 1.1 概念 死锁是指在⼀组进程中的各个进程均占有不会释放的资源…

Vue3之弹窗

文章目录 第一步、引入JS第二步、弹框 在前端开发语言Vue3&#xff0c;在管理端如何进行弹窗&#xff1f;下面根据API实现效果。 Element API文档&#xff1a; Element-plus文档 搭建环境可参考博客【 初探Vue3环境搭建与nvm使用】 第一步、引入JS <script lang"ts&…

el-tree的使用及控制全选、反选、获取选中

el-tree的使用及控制全选、反选、获取选中 组件使用获取选中的id全选实现反选实现全部代码 组件使用 引入组件&#xff0c;可以参考官网组件引入参考官网示例写好基础数据结构&#xff0c;不知道怎么转换树形机构的看文章&#xff1a;一维数组转树形 <template><el-…

MySQL - 性能优化

使用 Explain 进行分析 Explain 用来分析 SELECT 查询语句&#xff0c;开发人员可以通过分析 Explain 结果来优化查询语句。 比较重要的字段有: select_type : 查询类型&#xff0c;有简单查询、联合查询、子查询等 key : 使用的索引 rows : 扫描的行数 type &#xff1a;…

2、Three.js初步认识场景Scene、相机Camera、渲染器Renderer三要素

三要素之间关系&#xff1a; 有了虚拟场景Scene&#xff0c;相机录像Camera&#xff0c;在相机小屏幕上看到的Renderer Scene当前空间 Mesh人在场景 Camera相机录像 Renderer显示器上 首先先描述下Scene&#xff1a; 这个场景为三要素之一&#xff0c;一切需要展示的东西都需…

【docker】容器卷综合讲解,以及go实现的企业案例

容器卷&#xff08;Volumes&#xff09;基础讲解&#xff1a; 容器概念 容器卷&#xff08;Volumes&#xff09;是 Docker 提供的一种持久化存储机制&#xff0c;允许容器持久化数据&#xff0c;即使容器被删除或重新创建&#xff0c;数据仍然可以保留。 卷是独立于容器的&…

cin/cout的性能优化和缓冲区同步问题

目录 背景导入 问题 1.1ios::sync_with_stdio(false) 1.2为什么要解除C/C IO流同步? 1.3使用场景 2.1cin和cout的绑定关系 2.2为什么要解除绑定关系? 2.3注意事项 背景导入 大家可以先看一下这段背景知识;后面我会谈谈自己的理解; 1.在C中&#xff0c;标准输⼊输出流…

node.js基础学习-url模块-url地址处理(二)

前言 前面我们创建了一个HTTP服务器&#xff0c;如果只是简单的http://localhost:3000/about这种链接我们是可以处理的&#xff0c;但是实际运用中一般链接都会带参数&#xff0c;这样的话如果我们只是简单的判断链接来分配数据&#xff0c;就会报404找不到链接。为了解决这个问…

基于springboot中小型制造企业质量管理系统源码和论文

信息数据从传统到当代&#xff0c;是一直在变革当中&#xff0c;突如其来的互联网让传统的信息管理看到了革命性的曙光&#xff0c;因为传统信息管理从时效性&#xff0c;还是安全性&#xff0c;还是可操作性等各个方面来讲&#xff0c;遇到了互联网时代才发现能补上自古以来的…

服务器密码错误被锁定怎么解决?

当服务器密码错误多次导致账号被锁定时&#xff0c;解决方法需要根据服务器的操作系统&#xff08;如 Linux 或 Windows &#xff09;和具体服务器环境来处理。以下是常见的解决办法&#xff1a; 一、Linux 服务器被锁定的解决方法 1. 使用其他用户账号登录 如果有其他未被…

Java基础——(四)继承

1. 类、超类和子类 在Java中&#xff0c;通过关键字extends表示继承。extends表明正在构造的新类派生与一个已存在的类&#xff0c;已存在的类称为超类&#xff08;superclass&#xff09;、基类&#xff08;base class&#xff09;或父类&#xff08;parent class&#xff09…