在PySpark中,选择和访问数据是处理Spark DataFrame的基本操作。以下是一些常用的方法来选择和访问DataFrame中的数据。
- 选择列(Selecting Columns):
select: 用于选择DataFrame中的特定列。selectExpr: 用于通过SQL表达式选择列。
df.select('name', 'age') # 选择'name'和'age'列 df.select(df.name, df.age + 10) # 选择'name'列和'age'列加10 df.selectExpr('name', 'age + 10 as age_plus_10') # 使用SQL表达式选择列 - 筛选行(Filtering Rows):
filter: 用于根据指定条件筛选DataFrame中的行。
df.filter(df.age > 30) # 筛选年龄大于30的行 df.filter((df.age > 30) & (df.gender == 'male')) # 筛选年龄大于30且性别为男的行 - 排序数据(Sorting Data):
orderBy: 用于根据指定列排序DataFrame。sort: 与orderBy类似,用于排序DataFrame。
df.orderBy('age', ascending=False) # 按年龄降序排序 df.sort(df.age.desc()) # 按年龄降序排序 - 抽样数据(Sampling Data):
sample: 用于对DataFrame进行随机抽样。
df.sample(0.5, seed=42) # 抽取50%的数据,随机种子为42 - distinct 数据(Distinct Data):
distinct: 用于去除DataFrame中的重复行。
df.distinct() # 去除重复行 - 随机分割数据(Randomly Splitting Data):
randomSplit: 用于将DataFrame随机分割成多个DataFrame。
df.randomSplit([0.7, 0.3], seed=42) # 将数据随机分割为70%和30% - 列操作(Column Operations):
withColumn: 用于添加或替换DataFrame中的列。withColumnRenamed: 用于重命名DataFrame中的列。
df.withColumn('age_plus_10', df.age + 10) # 添加新列'age_plus_10' df.withColumnRenamed('old_name', 'new_name') # 重命名列 - 聚合数据(Aggregating Data):
groupBy: 用于对DataFrame进行分组。agg: 用于对分组后的DataFrame进行聚合操作。
df.groupBy('gender').agg({'age': 'mean'}) # 按性别分组并计算平均年龄 - 窗口函数(Window Functions):
window: 用于创建一个窗口 specification,用于窗口函数的计算。over: 用于指定窗口函数的应用范围。
from pyspark.sql.window import Window windowSpec = Window.partitionBy('gender').orderBy('age') df.withColumn('row_number', row_number().over(windowSpec)) # 计算行号 - 集合操作(Set Operations):
union: 合并两个DataFrame,去除重复行。unionAll: 合并两个DataFrame,不去除重复行。intersect: 获取两个DataFrame的交集。except: 获取两个DataFrame的差集。
df1.union(df2) # 合并df1和df2,去除重复行 df1.unionAll(df2) # 合并df1和df2,不去除重复行 - 访问数据(Accessing Data):
collect: 将DataFrame的数据作为一个Python列表返回。take: 返回DataFrame中的前几行。show: 显示DataFrame的内容。
df.collect() # 返回DataFrame的所有数据 df.take(5) # 返回DataFrame的前5行 df.show() # 显示DataFrame的内容
这些是PySpark中选择和访问数据的一些基本操作。你可以根据需要组合使用这些操作来处理和分析数据。