
Overview of PySpark Functions
Created: Sep 14, 2020 10:28 AM Tags: Big Data, PySpark, Python, Spark
pyspark.sql.functions
from pyspark.sql import functions as F

Function names and usage notes follow, grouped by category.
Basic math functions
abs
sin, cos, tan, asin, acos, atan, sinh, cosh, tanh
ceil, round, floor
exp, log, log2, pow, sqrt, cbrt
factorial
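For example (illustrative, made-up sample data):
df = spark.createDataFrame([(-1.5,), (4.0,)], ['v'])
df.select(F.abs(df.v).alias('abs_v'),
          F.ceil(df.v).alias('ceil_v'),
          F.pow(df.v, 2).alias('v_squared')).show()
"""
+-----+------+---------+
|abs_v|ceil_v|v_squared|
+-----+------+---------+
|  1.5|    -1|     2.25|
|  4.0|     4|     16.0|
+-----+------+---------+
"""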
Type-specific functions
Dates
current_date, current_timestamp, add_months, unix_timestamp
df = spark.createDataFrame([('2015-04-08',)], ['dt'])
df.select(F.add_months(df.dt, 1).alias('next_month')).show()
"""
+----------+
|next_month|
+----------+
|2015-05-08|
+----------+
"""add_months、date_add、date_format、date_sub、date_trunc、date_diffdayofmonth、dayofweek、dayofyear、weekofyearhour、last_day、minute、month、months_between、next_day、year
Strings
ascii, substring, substring_index, base64, unbase64, decode, encode, expr, conv, format_string, length, lower, upper, reverse, size
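For example (illustrative, made-up sample data):
df = spark.createDataFrame([('Spark SQL',)], ['s'])
df.select(F.substring(df.s, 1, 5).alias('sub'),
          F.upper(df.s).alias('upper'),
          F.length(df.s).alias('len')).show()
"""
+-----+---------+---+
|  sub|    upper|len|
+-----+---------+---+
|Spark|SPARK SQL|  9|
+-----+---------+---+
"""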
Binary
bin, bitwiseNOT, hash, md5, sha1, sha2, hex, unhex
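For example (illustrative, made-up sample data):
df = spark.createDataFrame([('abc', 5)], ['s', 'n'])
df.select(F.hex(df.s).alias('hex'),
          F.bin(df.n).alias('bin'),
          F.md5(df.s).alias('md5')).show(truncate=False)
"""
+------+---+--------------------------------+
|hex   |bin|md5                             |
+------+---+--------------------------------+
|616263|101|900150983cd24fb0d6963f7d28e17f72|
+------+---+--------------------------------+
"""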
Angles
toDegrees, toRadians, radians
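For example (illustrative, made-up sample data):
df = spark.createDataFrame([(180.0,)], ['deg'])
df.select(F.radians(df.deg).alias('rad')).show()
# rad is pi, i.e. approximately 3.141592653589793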
Numbers
format_number
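For example (illustrative, made-up sample data):
df = spark.createDataFrame([(1234567.891,)], ['x'])
df.select(F.format_number(df.x, 2).alias('formatted')).show()
"""
+------------+
|   formatted|
+------------+
|1,234,567.89|
+------------+
"""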
Null / NaN checks
isnan, isnull
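For example (illustrative, made-up sample data):
df = spark.createDataFrame([(1.0, None), (float('nan'), 2.0)], ('a', 'b'))
df.select(F.isnan(df.a).alias('a_is_nan'), F.isnull(df.b).alias('b_is_null')).show()
"""
+--------+---------+
|a_is_nan|b_is_null|
+--------+---------+
|   false|     true|
|    true|    false|
+--------+---------+
"""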
Statistics / aggregation
avg, corr, count, countDistinct, cume_dist, greatest, kurtosis, variance, max, min, mean, rand, randn, rank, skewness, sum, sumDistinct
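For example, a few aggregations with groupBy (illustrative, made-up sample data):
df = spark.createDataFrame([('a', 1), ('a', 2), ('b', 3)], ['k', 'v'])
df.groupBy('k').agg(F.count('v').alias('cnt'),
                    F.avg('v').alias('avg'),
                    F.max('v').alias('max')).orderBy('k').show()
"""
+---+---+---+---+
|  k|cnt|avg|max|
+---+---+---+---+
|  a|  2|1.5|  2|
|  b|  1|3.0|  3|
+---+---+---+---+
"""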
Array functions
flatten, slice, element_at, array_contains, array_distinct, array_except, array_intersect, array_join, array_max, array_min, array_position, array_remove, array_repeat, array_sort, array_union, arrays_overlap, arrays_zip
# Check whether an array column contains an element
df = spark.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
df.withColumn("array_contains", F.array_contains(df.data, "a")).show()"""
+---------+--------------+
| data|array_contains|
+---------+--------------+
|[a, b, c]| true|
| []| false|
+---------+--------------+
"""数组函数说明
df = spark.createDataFrame([([1, 2, 3], [2, 3, 4])], ['vals1', 'vals2'])
df.show()
df_new = df.select(F.arrays_zip(df.vals1, df.vals2).alias('zipped'))
df_new.show()
row = df_new.collect()[0]
row['zipped'][0]['vals1'] == row['zipped'][0][0] == 1
"""
+---------+---------+
| vals1| vals2|
+---------+---------+
|[1, 2, 3]|[2, 3, 4]|
+---------+---------+

+--------------------+
| zipped|
+--------------------+
|[[1, 2], [2, 3], ...|
+--------------------+
True
"""列处理
coalesce
df = spark.createDataFrame([(None, None), (1, None), (None, 2)], ("a", "b"))
df.show()
df.withColumn('coalesce', F.coalesce(df.a, df.b)).show()
"""
+----+----+
| a| b|
+----+----+
|null|null|
| 1|null|
|null| 2|
+----+----+

+----+----+--------+
| a| b|coalesce|
+----+----+--------+
|null|null| null|
| 1|null| 1|
|null| 2| 2|
+----+----+--------+
"""array
# Combine multiple columns into a single array column
df = spark.createDataFrame([('2015-04-08', 1, )], ['dt', 'int'])
df.select(F.array([df.dt, df.int]).alias("arr")).show()
"""
+---------------+
| arr|
+---------------+
|[2015-04-08, 1]|
+---------------+
"""concat、concat_ws
df = spark.createDataFrame([('abcd','123')], ['s', 'd'])
df.withColumn('concat', F.concat_ws('-', df.s, df.d)).show()
"""
+----+---+--------+
| s| d| concat|
+----+---+--------+
|abcd|123|abcd-123|
+----+---+--------+
"""col、column、lit
df = spark.createDataFrame([(11, 12), (21, 22), (31, 32)], ("a", "b"))
df.withColumn('a+100', F.col('a') + F.lit(100)).show()
"""
+---+---+-----+
| a| b|a+100|
+---+---+-----+
| 11| 12| 111|
| 21| 22| 121|
| 31| 32| 131|
+---+---+-----+
"""explode、explode_outer、posexplode、posexplode_outer
# Explode an array/map column into rows of a new dataframe
from pyspark.sql.types import ArrayType, IntegerType, MapType, StringType
df = spark.createDataFrame([(1, 2, 3)], ArrayType(IntegerType()))
df.select(F.explode(df.value).alias("int")).show()"""
+---+
|int|
+---+
| 1|
| 2|
| 3|
+---+
"""df = spark.createDataFrame([({'a': 1, 'b': 2})], MapType(StringType(), IntegerType()))
df.select(F.explode(df.value).alias('key', 'value')).show()
"""
+---+-----+
|key|value|
+---+-----+
| a| 1|
| b| 2|
+---+-----+
"""from_csv、from_json、get_json_object
# Extract fields from a JSON string and expand them into columns
import json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType
data = {'a': 1, 'b': [1, 2, 3]}
data_s = json.dumps(data)
schema = StructType([StructField('a', IntegerType(), True),
                     StructField('b', ArrayType(IntegerType()), True)])
df = spark.createDataFrame([data_s], schema=StringType())
df.show()"""
+--------------------+
| value|
+--------------------+
|{"a": 1, "b": [1,...|
+--------------------+
"""df_from_json = df.withColumn('json', F.from_json(df.value, schema=schema))df_from_json.select(df_from_json.value, df_from_json.json.a.alias('value.a'), df_from_json.json.b.alias('value.b')
).show()"""
+--------------------+-------+---------+
| value|value.a| value.b|
+--------------------+-------+---------+
|{"a": 1, "b": [1,...| 1|[1, 2, 3]|
+--------------------+-------+---------+
"""
data = [("1", '''{"f1": "value1", "f2": "value2"}'''), ("2", '''{"f1": "value12"}''')]
df = spark.createDataFrame(data, ("key", "jstring"))
df.select(df.key,
          F.get_json_object(df.jstring, '$.f1').alias("c0"),
          F.get_json_object(df.jstring, '$.f2').alias("c1")).show()
"""
+---+-------+------+
|key| c0| c1|
+---+-------+------+
| 1| value1|value2|
| 2|value12| null|
+---+-------+------+
"""create_map、map_from_arrays、map_from_entries、map_concat、map_keys、map_values、map_entries
df = spark.createDataFrame([([2, 5], ['a', 'b'])], ['k', 'v'])
df.select(F.map_from_arrays(df.k, df.v).alias("map")).show()
"""
+----------------+
| map|
+----------------+
|[2 -> a, 5 -> b]|
+----------------+
"""regexp_extract、regexp_replace
# Regex extraction and replacement
df = spark.createDataFrame([('100-200',)], ['str'])
df.select('str',
          F.regexp_extract('str', r'(\d+)-(\d+)', 1).alias('first'),
          F.regexp_replace('str', r'(\d+)-(\d+)', "$2-$1").alias('swap')
).show()
"""
+-------+-----+-------+
| str|first| swap|
+-------+-----+-------+
|100-200| 100|200-100|
+-------+-----+-------+
"""udf
pyspark.sql.types

from pyspark.sql.types import ArrayType, IntegerType, MapType, StringType, StructType, StructField
Base type
DataType
Primitive types
NullType, StringType, BinaryType, BooleanType, DateType, TimestampType, DecimalType, DoubleType, FloatType, ByteType, IntegerType, LongType, ShortType
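For example (illustrative), a primitive DataType can be passed directly as the schema; the single column then defaults to the name value:
df = spark.createDataFrame(['a', 'b'], schema=StringType())
df.show()
# +-----+
# |value|
# +-----+
# |    a|
# |    b|
# +-----+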
Composite types
ArrayType
df = spark.createDataFrame([[1, 2, 3]], schema=ArrayType(IntegerType()))
df.show()
# +---------+
# | value|
# +---------+
# |[1, 2, 3]|
# +---------+
df.collect()[0].value[0]
# 1
# The default column name is value

MapType
df = spark.createDataFrame([{'a': 1, 'b': 2}], schema=MapType(StringType(), IntegerType()))
df.show()
# +----------------+
# | value|
# +----------------+
# |[a -> 1, b -> 2]|
# +----------------+
df.collect()[0]['value']['a']
# 1

StructField
# Used together with StructType
StructField('column_name', DataType, is_nullable)

StructType
df = spark.createDataFrame([(1,)], schema=StructType([StructField('col', IntegerType())]))
df.show()
"""
+---+
|col|
+---+
| 1|
+---+
"""# 复杂一些的情况
df = spark.createDataFrame(
    [({'a': [2, 3, 4]},)],
    schema=StructType([StructField('col', MapType(StringType(), ArrayType(IntegerType())))])
)
df.show()
"""
+----------------+
| col|
+----------------+
|[a -> [2, 3, 4]]|
+----------------+
"""df.collect()[0]['col']['a'][0]
# 2