UDF小白入门
- 创建一个函数(普通的Python函数)将成绩转换到考察等级
在PySpark中,使用UDF涉及有三个步骤:
前置:先创建一个spark dataframe
from pyspark.sql import SparkSession
from pyspark.sql.functions import *spark = SparkSession.builder \.master("spark://localhost:7077") \.appName("pyspark sql demo") \.getOrCreate()# 创建学生成绩DataFrame
studentDF = spark.createDataFrame([("张三", 85),("李四", 90),("王老五", 55)],["name","score"]
)studentDF.printSchema()
studentDF.show()
(1) 第一步是用Python语法创建一个函数并进行测试。
创建一个函数(普通的Python函数)将成绩转换到考察等级
def convertGrade(score):if score > 100:return "作弊"elif score >= 90:return "优秀"elif score >= 80:return "良好"elif score >= 70:return "中等"else:return "不及格"
(2) 第二步是通过将函数名传递给PySpark SQL的udf()函数来注册它。
#注册为一个UDF(在DataFrame API中使用时的注册方法)
convertGradeUDF = udf(convertGrade,StringType())# 或者通过装饰器注册
@udf(StringType())
def convertGrade(score):if score > 100:return "作弊"elif score >= 90:return "优秀"elif score >= 80:return "良好"elif score >= 70:return "中等"else:return "不及格"
(3) 第三步是在DataFrame代码或发出SQL查询时使用UDF。在SQL查询中使用UDF时,注册过程略有不同。
# 使用该UDF将成绩转换为字母等级
studentDF \.withColumn("grade",convertGradeUDF(col("score"))) \.show()
'''