Spark SQL UDF2的使用
继续之前的UDF1进行说明:
UDF1 博客地址：点击打开链接（原文此处为超链接）
与 UDF1 的区别在于：UDF2 接收两个参数（UDF1 只接收一个参数）。
需求：读取文本中每行的两个数字，计算每行两个数字之和。
文本（JavaUDF2.txt）：
1,1
2,2
3,3
4,4
5,5
6,6
7,7
8,8
9,9
10,10
2,2
3,3
4,4
5,5
6,6
7,7
8,8
9,9
10,10
代码:
package com.bynear.spark_sql; import java.util.ArrayList; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.api.java.UDF2; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; /** * 张建设 * 2018/4/27 * 15:41 */ public class JavaUDF2 {public static void main(String[] args) {SparkConf conf = new SparkConf().setAppName("JavaUDF2").setMaster("local"); JavaSparkContext jsc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(jsc.sc()); JavaRDD<String> numRDD = jsc.textFile("//Desktop//Spark//JavaUDF2.txt"); JavaRDD<Row> numRowRDD = numRDD.map(new Function<String, Row>() {@Overridepublic Row call(String line) throws Exception {String[] split = line.split(","); return RowFactory.create(Integer.valueOf(split[0]), Integer.valueOf(split[1])); }}); ArrayList<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("num1", DataTypes.IntegerType, true)); fields.add(DataTypes.createStructField("num2", DataTypes.IntegerType, true)); StructType structType = DataTypes.createStructType(fields); DataFrame numDF = sqlContext.createDataFrame(numRowRDD, structType); numDF.registerTempTable("table_num"); sqlContext.udf().register("addSum", new UDF2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer num1, Integer num2) throws Exception {return num1 + num2; }}, DataTypes.IntegerType); sqlContext.sql("select num1,num2 ,addSum(num1,num2) as Sum from table_num").show(); System.out.println("第一次成功了"); Row[] rows = sqlContext.sql("select num1,num2 ,addSum(num1,num2) as Sum from table_num").collect(); for (Row row : rows) 
{System.out.println(row); }System.out.println("第二次成功了"); jsc.stop(); } }
结果:
+----+----+---+
|num1|num2|Sum|
+----+----+---+
| 1| 1| 2|
| 2| 2| 4|
| 3| 3| 6|
| 4| 4| 8|
| 5| 5| 10|
| 6| 6| 12|
| 7| 7| 14|
| 8| 8| 16|
| 9| 9| 18|
| 10| 10| 20|
+----+----+---+
|num1|num2|Sum|
+----+----+---+
| 1| 1| 2|
| 2| 2| 4|
| 3| 3| 6|
| 4| 4| 8|
| 5| 5| 10|
| 6| 6| 12|
| 7| 7| 14|
| 8| 8| 16|
| 9| 9| 18|
| 10| 10| 20|
+----+----+---+
第一次成功了
[1,1,2]
[2,2,4]
[3,3,6]
[4,4,8]
[5,5,10]
[6,6,12]
[7,7,14]
[8,8,16]
[9,9,18]
[10,10,20]
第二次成功了
[2,2,4]
[3,3,6]
[4,4,8]
[5,5,10]
[6,6,12]
[7,7,14]
[8,8,16]
[9,9,18]
[10,10,20]
第二次成功了