SparkSQL中UDAF案例分析

SparkSQL中UDAF案例分析

1、统计单词的个数

package com.bynear.spark_sql;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;


public class Spark_UDAF extends UserDefinedAggregateFunction {/**
     * inputSchema指的是输入的数据类型
     *
     * @return
     */
    @Override
    public StructType inputSchema() {ArrayList<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("str", DataTypes.StringType, true));
        return DataTypes.createStructType(fields);
    }/**
     * bufferSchema指的是  中间进行聚合时  所处理的数据类型
     *
     * @return
     */
    @Override
    public StructType bufferSchema() {ArrayList<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("count", DataTypes.IntegerType, true));
        return DataTypes.createStructType(fields);
    }/**
     * dataType指的是函数返回值的类型
     *
     * @return
     */
    @Override
    public DataType dataType() {return DataTypes.IntegerType;
    }/**
     * 一致性检验,如果为true,那么输入不变的情况下计算的结果也是不变的。
     *
     * @return
     */
    @Override
    public boolean deterministic() {return true;
    }/**
     * 设置聚合中间buffer的初始值,但需要保证这个语义:两个初始buffer调用下面实现的merge方法后也应该为初始buffer
     * 即如果你初始值是1,然后你merge是执行一个相加的动作,两个初始buffer合并之后等于2     * 不会等于初始buffer了。这样的初始值就是有问题的,所以初始值也叫"zero value"
     * 为每个分组的数据执行初始化操作
     *
     * @param buffer
     */
    @Override
    public void initialize(MutableAggregationBuffer buffer) {buffer.update(0, 0);
    }/**
     * 用输入数据input更新buffer,类似于combineByKey
     * 指的是,每个分组,有新的值进来的时候,如何进行分组对应的聚合值的计算
     *
     * @param buffer
     * @param input
     */
    @Override
    public void update(MutableAggregationBuffer buffer, Row input) {buffer.update(0, Integer.valueOf(buffer.getAs(0).toString()) + 1);
    }/**
     * 合并两个buffer,buffer2合并到buffer1.在合并两个分区聚合结果的时候会被用到,类似于reduceByKey
     * 这里要注意该方法没有返回值,在实现的时候是把buffer2合并到buffer1中去,你需要实现这个合并细节
     * 由于spark是分布式的,所以每一分组的数据,可能会在不同的节点上进行局部聚合,就是update
     * 但是 最后一个分组,在各个节点上的聚合值,要进行merge 也就是合并
     *
     * @param buffer1
     * @param buffer2
     */
    @Override
    public void merge(MutableAggregationBuffer buffer1, Row buffer2) {buffer1.update(0, Integer.valueOf(buffer1.getAs(0).toString()) + Integer.valueOf(buffer2.getAs(0).toString()));
    }/**
     * 只的是 一个分组的聚合值,如何通过中间的缓存聚合值,最后返回一个最终的聚合值
     *
     * @param buffer
     * @return
     */
    @Override
    public Object evaluate(Row buffer) {return buffer.getInt(0);
    }
}

package com.bynear.spark_sql;

import com.clearspring.analytics.util.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.Arrays;
import java.util.List;

public class UDAF {public static void main(String[] args) {SparkConf conf = new SparkConf().setAppName("UDAF").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        List<String> nameList = Arrays.asList("xiaoming", "xiaoming", "刘德华","古天乐","feifei", "feifei", "feifei", "katong");
        //转换为javaRDD
        JavaRDD<String> nameRDD = sc.parallelize(nameList, 3);
        //转换为JavaRDD<Row>
        JavaRDD<Row> nameRowRDD = nameRDD.map(new Function<String, Row>() {public Row call(String name) throws Exception {return RowFactory.create(name);
            }});
        List<StructField> fields = Lists.newArrayList();
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        StructType structType = DataTypes.createStructType(fields);
        DataFrame namesDF = sqlContext.createDataFrame(nameRowRDD, structType);
        namesDF.registerTempTable("names");
        sqlContext.udf().register("countString", new Spark_UDAF());
        sqlContext.sql("select name,countString(name) as count  from names group by name").show();
        List<Row> rows = sqlContext.sql("select name,countString(name) as count  from names group by name").javaRDD().collect();
        for (Row row : rows) {System.out.println(row);
        }}
}
运行结果:

+--------+-----+
|    name|count|
+--------+-----+
|  feifei|    3|
|xiaoming|    2|
|     刘德华|    1|
|  katong|    1|
|     古天乐|    1|
+--------+-----+

2、统计某品牌价格的平均值

package com.bynear.spark_sql;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;

public class MyUDAF extends UserDefinedAggregateFunction {private StructType inputSchema;
    private StructType bufferSchema;

    public MyUDAF() {ArrayList<StructField> inputFields = new ArrayList<StructField>();
        inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.DoubleType, true));
        inputSchema = DataTypes.createStructType(inputFields);

        ArrayList<StructField> bufferFields = new ArrayList<StructField>();
        bufferFields.add(DataTypes.createStructField("sum", DataTypes.DoubleType, true));
        bufferFields.add(DataTypes.createStructField("count", DataTypes.DoubleType, true));
        bufferSchema = DataTypes.createStructType(bufferFields);
    }@Override
    public StructType inputSchema() {return inputSchema;
    }@Override
    public StructType bufferSchema() {return bufferSchema;
    }@Override
    public DataType dataType() {return DataTypes.DoubleType;
    }@Override
    public boolean deterministic() {return true;
    }@Override
    public void initialize(MutableAggregationBuffer buffer) {
//        缓存区两个分组  分组编号为0 求和sum   初始化值为0
//                     分组编号为1 count   初始化值为0
        buffer.update(0, 0.0);
        buffer.update(1, 0.0);
    }@Override
    public void update(MutableAggregationBuffer buffer, Row input) {//如果input的索引值为0的值不为0
        if (!input.isNullAt(0)) {
//            两个分组分别进行更新数据!分组编号0  求和sum  缓存区的值 +  输入放入值
            double updatesum = buffer.getDouble(0) + input.getDouble(0);
//                                 分组编号1  count  缓存区的个数 + 1
            double updatecount = buffer.getDouble(1) + 1;
            buffer.update(0, updatesum);
            buffer.update(1, updatecount);
        }}@Override
    public void merge(MutableAggregationBuffer buffer1, Row buffer2) {double metgesum = buffer1.getDouble(0) + buffer2.getDouble(0);
        double mergecount = buffer1.getDouble(1) + buffer2.getDouble(1);
        buffer1.update(0, metgesum);
        buffer1.update(1, mergecount);
    }@Override
    public Object evaluate(Row buffer) {return buffer.getDouble(0) / buffer.getDouble(1);
    }
}

package com.bynear.spark_sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.math.BigDecimal;
import java.util.ArrayList;

public class MyUDAF_SQL {public static void main(String[] args) {SparkConf conf = new SparkConf().setAppName("myUDAF").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(jsc);
        JavaRDD<String> lines = jsc.textFile("C://Users//Administrator//Desktop//fastJSon//sales.txt");
        JavaRDD<Row> map = lines.map(new Function<String, Row>() {@Override
            public Row call(String line) throws Exception {String[] Linesplit = line.split(",");
                return RowFactory.create(String.valueOf(Linesplit[0]), Double.valueOf(Linesplit[1]));
            }});
        ArrayList<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("salary", DataTypes.DoubleType, true));
        StructType structType = DataTypes.createStructType(fields);
        DataFrame df = sqlContext.createDataFrame(map, structType);
        sqlContext.udf().register("myAverage", new MyUDAF());
        df.registerTempTable("zjs_table");

        df.show();

        sqlContext.udf().register("twoDecimal", new UDF1<Double, Double>() {@Override
            public Double call(Double in) throws Exception {BigDecimal b = new BigDecimal(in);
                double res = b.setScale(2, BigDecimal.ROUND_HALF_DOWN).doubleValue();
                return res;
            }}, DataTypes.DoubleType);

        DataFrame resultDF = sqlContext.sql("select name,twoDecimal(myAverage(salary)) as 平均值 from zjs_table group by name ");
        resultDF.show();

    }
}

文本:

三星,1542
三星,1548
三星,8456
三星,8866
中兴,1856
中兴,1752
苹果,1500
苹果,2500
苹果,3500
苹果,4500
苹果,5500

运行结果:

+----+-------+
|name| salary|
+----+-------+
|  三星|12345.0|
|  三星| 4521.0|
|  三星| 7895.0|
|  华为| 5421.0|
|  华为| 4521.0|
|  华为| 5648.0|
|  苹果|12548.0|
|  苹果| 7856.0|
|  苹果|45217.0|
|  苹果|89654.0|
+----+-------+

+----+--------+
|name|     平均值|
+----+--------+
|  三星| 8253.67|
|  华为| 5196.67|
|  苹果|38818.75|
+----+--------+

注意点:文本的编码格式,以及Java代码中DataTypes.DoubleType。。。。








本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/326061.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第四章选择结构(二)

一、Switch的结构语法&#xff1a; switch&#xff08;变量&#xff09;{ case 1: //代码 break; case 2: //代码 break; default : //代码 break; } 2.switch&#xff1a;相当于一个开关&#xff0c;后面的括号里面可以是int short byte char 枚举类型 String类型的值 3.case后…

河北省计算机对口高考大纲,河北省普通高等学校对口招生英语考试大纲

河北省普通高等学校对口招生英语考试大纲一、考试范围和考试形式河北省中等职业学校对口升学英语考试以教育部颁布的《中等职业学校英语教学大纲》为依据&#xff0c;以中等职业教育课程改革国家规划教材《英语》为主要参考教材&#xff0c;同时结合我省中等职业学校教学情况进…

2017蓝桥杯省赛---java---B---10(k倍区间)

题目描述 标题&#xff1a; k倍区间给定一个长度为N的数列&#xff0c;A1, A2, … AN&#xff0c;如果其中一段连续的子序列Ai, Ai1, … Aj(i < j)之和是K的倍数&#xff0c;我们就称这个区间[i, j]是K倍区间。你能求出数列中总共有多少个K倍区间吗&#xff1f;输入 第一行…

Spring Boot 最佳实践

转载自 Spring Boot 最佳实践 Spring Boot是用于开发微服务的最流行的Java框架。在本文中&#xff0c;我将与您分享自2016年以来我在专业开发中使用Spring Boot所采用的最佳实践。本文基于我的个人经验和认可的Spring Boot方面的专家。 在本文中&#xff0c;我将重点介绍Spr…

第三章选择结构(一)

一、boolean 类型&#xff1a; 值只有两个&#xff1a;true(真)和false(假) 二、常用的关系符&#xff1a; > < > < ! 作用&#xff1a;用来比较运算结果&#xff0c;值是boolean类型。. 三、if选择结构的语法&#xff1a; if(boolean){ 代码块 } 含义&#xff1a…

eq,neq,gt,lt等表达式缩写

eq,neq,gt,lt等表达式缩写 eq 等于neq 不等于gt 大于egt 大于等于lt 小于elt 小于等于like LIKEbetween BETWEEN

asp.net core源码飘香:Configuration组件

简介&#xff1a; 这是一个基础组件&#xff0c;是一个统一的配置模型&#xff0c;配置可以来源于配置文件&#xff08;json文件&#xff0c;xml文件&#xff0c;ini文件&#xff09;&#xff0c;内存对象&#xff0c;命令行参数&#xff0c;系统的环境变量又或者是你自己扩展的…

计算机图形橡皮筋实验报告,弹性或橡皮筋技术

橡皮筋是一种在计算机屏幕上绘制线, 折线, 矩形, 圆形和椭圆形等几何图元的流行技术。它已成为图形用户界面(GUI)不可或缺的一部分, 并成为事实上的标准, 并且几乎被所有基于Windows的应用程序普遍接受。用户通过定位其两个端点以通常的方式指定该线。当我们从第一个端点移动到…

2017蓝桥杯省赛---java---C---9(青蛙跳杯子)

题目描述 题目描述 X星球的流行宠物是青蛙&#xff0c;一般有两种颜色&#xff1a;白色和黑色。X星球的居民喜欢把它们放在一排茶杯里&#xff0c;这样可以观察它们跳来跳去。如下图&#xff0c;有一排杯子&#xff0c;左边的一个是空着的&#xff0c;右边的杯子&#xff0c;每…

青客宝团队redis内部分享ppt

Redis&#xff1a;最好的缓存数据库 说Redis是缓存服务&#xff0c;估计有些人会不开心&#xff0c;因为Redis也可以把数据库持久化&#xff0c;但是在大多数情况Redis的竞争力是提供缓存服务。说到缓存服务必然会想到Memcached&#xff0c;因为几年前Memcached是最流行的缓存服…

你知道面试官是如何刷人的吗

转载自 你知道面试官是如何刷人的吗 对于一个公司来说&#xff0c;执行招聘面试事宜是一个耗时耗钱的项目&#xff0c;从顾问公司和人才中介挑选出合适的简历之后&#xff0c;还要花更多的时间找出合适的候选人。有的时候这些机构会向你保证这些人都是 Java 天才、SQL 专家、…

jQuery动画与事件概念以及语法

一、鼠标单击事件&#xff1a; 语法&#xff1a;KaTeX parse error: Expected EOF, got # at position 3: ("#̲div1").click(fu…("#div1").mouseover(function(){//代码}); 三、鼠标移出事件&#xff1a; 语法&#xff1a;KaTeX parse error: Expected E…

spark submit参数及调优

spark submit参数及调优 原文地址 spark submit参数介绍 你可以通过spark-submit --help或者spark-shell --help来查看这些参数。 使用格式: ./bin/spark-submit \ --class <main-class> \ --master <master-url> \ --deploy-mode <deploy-mode> \ …

2020蓝桥杯省赛---java---A---4(七段码)

题目描述 思路分析 代码实现 package lanqiao;public class Main {public static int N10;public static int e[][]new int[N][N];//存储二极管相邻的信息public static int f[]new int[N];//并查集public static int ans0;public static boolean st[]new boolean[N];//true表…

小米无线路由器服务器用户名和密码忘了,小米路由器管理密码忘记了怎么办?...

问&#xff1a;小米路由器管理密码忘记了怎么办&#xff1f;我想修改小米路由器上的配置&#xff0c;在打开miwifi.com的时候&#xff0c;提示需要输入管理密码。但是&#xff0c;我不知道管理密码是多少&#xff0c;忘记了管理密码应该怎么办&#xff1f;答&#xff1a;首先说…

asp.net core源码飘香:Options组件

简介&#xff1a; Options组件是一个小组件&#xff0c;但用的地方很多。它本质是将一个POCO类注册到容器中&#xff08;主要在Startup中作为其他组件的配置功能提供&#xff09;&#xff0c;后续使用的时候就可以通过比如构造函数注入等获取到POCO对象。如果只是为了注入一个P…

DevOps面试问题

转载自 DevOps面试问题 DevOps是一组过程、方法与系统的统称&#xff0c;用于促进开发&#xff08;应用程序/软件工程&#xff09;、技术运营和质量保障&#xff08;QA&#xff09;部门之间的沟通、协作与整合。下面为大家分享DevOps系列的面试问题 持续整合问题 问题一&a…

第五章循环结构(一)

一、while循环&#xff1a; 1.语法&#xff1a;while(循环条件){ //循环操作 } 2.循环条件是一个bolean类型的结果。 3.特点&#xff1a;先判断&#xff0c;后执行&#xff01; 二、程序调试&#xff1a; 1.步骤&#xff1a; 设置断点 单步运行 观察变量 三、 1.i2 等价于ii2; …

bui ajax,BUI 数据交互

BUI里面有3种数据交互.数据请求bui.ajax(option)数据请求 bui.ajax API 数据请求的跨域处理,请查看调试章节.参数: option 是一个对象option.urlType: stringDetail: url地址option.dataType: objectDetail: 请求的参数,默认:{}option.methodType: stringDetail: 默认: GET示例…

SparkSQL性能优化

SparkSQL性能优化 1、设置shuffle过程中的并行度,可以通过spark.sql.shuffle.partitions设置shuffle并行度。&#xff08;在SQLContext.setConf&#xff08;&#xff09;中设置&#xff09;。 2、Hive数据仓库创建的时候&#xff0c;合理设置数据类型&#xff0c;比如设置成I…