目录
DataFrame的操作方案
SQL相关的API
创建一个视图/表
DSL相关的API
DSL的传递方式
SQL的函数库
Spark SQL的综合应用
直接基于DataFrame来处理
SQL方式
DSL方式
基于RDD转换DataFrame的方式
DataFrame的操作方案
操作DataFrame一般有两种操作方案:一种为DSL方式,一种为SQL方式
SQL方式:通过编写SQL语句完成统计分析操作
DSL操作:特定领域语言,使用DataFrame特有的API完成计算,也就是代码形式
从使用角度来说:SQL更加方便一些,当适应了DSL写法后,会发现DSL比SQL更好用
从Soark角度来说:推荐使用DSL方案,更有利于Spark底层优化处理
SQL相关的API
创建一个视图/表
df.createTempview('视图名称'):创建一个临时的视图(表名)
df.createorReplaceTempview('视图名称'):创建一个临时的视图(表名),如果视图存在,直接替换
临时视图:仅能在当前这个spark session的会话中使用
df.createGlobalTempview('视图名称'):创建一个全局视图,运行在一个Spark应用中,多个spark会话中读可以使用,使用的时候必须通过global_temp.视图名称方式才可以加载到,较少使用
执行SQL语句:
spark.sql('书写SQL')
DSL相关的API
show():用于展示DF中数据,默认仅展示前20行
参数1:设置默认展示多少行,默认为20
参数2:是否为阶段列,默认仅展示前20个字符数据,如果过长,不展示(一般不设置)
printSchema():用于打印当前这个DF的表结构信息
select():类似于SQL中的select,SQL中的select后面可以些什么,这里也一样
filter()和where():用于对数据进行过滤操作,一般在SparkSQL中只要使用where
groupBy():用于执行分组操作
orderBy():用于执行排序操作
DSL的传递方式
DSL主要支持一下几种传递的方式: str | column对象 | 列表
str格式: '字段'
column对象:
DataFrame含有的字段 df['字段']
执行过程产生:F.col('字段')
列表:
['字段1','字段2'...]
[df['字段1'],df['字段2']]
SQL的函数库
为了能够支持在编写Spark SQL的DSL时候,在DSL中使用SQL函数,专门提供一个SQL的函数库。直接加载使用即可
导入这个函数库:import pyspark.sql.functions as F
通过F调用对应的函数即可
SparkSQL中所支持的函数,都可以通过以下地址查询到:
https://spark.apache.org/docs/3.1.2/api/sql/index.html
Spark SQL的综合应用
world count 案例
已知HDFS又一个words.txt的文件,words.txt文件的内容如下:
hadoop hive hadoop sqoop hive
sqoop hadoop zookeeper hive hue
hue sqoop hue zookeeper hive
spark oozie spark hadoop oozie
hive oozie spark hadoop
直接基于DataFrame来处理
SQL方式
SQL方式一:子查询
SQL方式二:侧视图
炸裂函数配合侧视图使用如下:
格式:select 原表别名.字段名,侧视图名.字段名 from 原表 原表别名 lateral view explode(要炸开的字段) 侧视图名 as 字段名
# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F# 绑定指定的python解释器
"""
1.2 直接基于DataFrame来处理需求分析:1- 将每行内容切分得到单个的单词2- 组织DataFrame的数据结构2.1- 有两列。一列是单词,一列是次数
2.2- 只有一列。单词
"""os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':print('直接基于DataFrame来处理')spark = SparkSession \.builder \.appName('dataFrame_world_count_demo') \.master('local[*]') \.getOrCreate()# 数据输入# text方式读取hdfs上的文件init_df = spark.read.text(paths='hdfs://node1:8020/source/word.txt')# # 查看数据# init_df.show()# # 打印dataframe表结构信息# init_df.printSchema()# 创建临时视图init_df.createTempView('words')# 数据处理"""sparksql方式处理数据-子查询1.先切分每一行的数据2.使用炸裂函数获得一个word单词列3.使用子查询方式聚合统计每个单词出现的次数"""spark.sql("""select word,count(*) as cnt from (select explode(split(value,' ')) as word from words)group by word order by cnt desc""").show()"""sparksql方式处理数据-侧视图1.先切分每一行的数据2.使用炸裂函数获得一个word单词列3.使用侧视图方式聚合统计每个单词出现的次数炸裂函数配合侧视图使用如下:格式:select 原表别名.字段名,侧视图名.字段名 from 原表 原表别名 lateral view explode(要炸开的字段)侧视图名 as 字段名"""spark.sql("""select word,count(*) as cntfrom words w lateral view explode(split(value,' ')) t as wordgroup by word order by cnt desc""").show()
DSL方式
DSL方式总结:
withColumnRenamed(参数1,参数2):给字段重命名操作。参数1是旧字段名,参数2是新字段名
agg():推荐使用,更加通用。执行聚合操作。如果有多个聚合,聚合之间使用逗号分隔即可
withColumn(参数1,参数2):用来产生新列。参数1是新列的名称;参数2是新列数据的来源
# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F# 绑定指定的python解释器
"""
1.2 直接基于DataFrame来处理需求分析:1- 将每行内容切分得到单个的单词2- 组织DataFrame的数据结构2.1- 有两列。一列是单词,一列是次数
"""os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':print('直接基于DataFrame来处理')spark = SparkSession \.builder \.appName('dataFrame_world_count_demo') \.master('local[*]') \.getOrCreate()# 数据输入# text方式读取hdfs上的文件init_df = spark.read.text(paths='hdfs://node1:8020/source/word.txt')# # 查看数据# init_df.show()# # 打印dataframe表结构信息# init_df.printSchema()# 创建临时视图init_df.createTempView('words')# 数据处理"""DSL方式处理数据-方式一1.先切分每一行的数据2.使用炸裂函数获得一个word单词列3.调用API聚合统计单词个数再排序"""init_df.select(F.explode(F.split('value', ' ')).alias('word')).groupBy('word').count().orderBy('count', ascending=False).show()"""DSL方式处理数据-方式二1.先切分每一行的数据2.使用炸裂函数获得一个word单词列3.调用API聚合统计单词个数再排序4.agg():推荐使用,更加通用。执行聚合操作。如果有多个聚合,聚合之间使用逗号分隔即可"""init_df.select(F.explode(F.split('value', ' ')).alias('word')).groupBy('word').agg(F.count('word').alias('cnt'),F.max('word').alias('max_word'),F.min('word').alias('min_word'),).orderBy('cnt', ascending=False).show()"""DSL方式处理数据-方式三withColumnRenamed(参数1,参数2):给字段重命名操作。参数1是旧字段名,参数2是新字段名withColumn(参数1,参数2):用来产生新列。参数1是新列的名称;参数2是新列数据的来源"""init_df.withColumn('word',F.explode(F.split('value', ' '))).groupBy('word').agg(F.count('word').alias('cnt'),F.max('word').alias('max_word'),F.min('word').alias('min_word')).orderBy('cnt', ascending=False).show()# 数据输出# 是否资源spark.stop()
基于RDD转换DataFrame的方式
# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F# 绑定指定的python解释器
"""
基于RDD转换DataFrame的方式需求分析:1- 将每行内容切分得到单个的单词2- 组织DataFrame的数据结构2.1- 有两列。一列是单词,一列是次数
"""os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':print('直接基于DataFrame来处理')# 创建SparkSession对象spark = SparkSession \.builder \.appName('dataFrame_world_count_demo') \.master('local[*]') \.getOrCreate()# 创建sparkContext顶级对象sc = spark.sparkContext# 数据输入# text方式读取hdfs上的文件init_rdd = sc.textFile('hdfs://node1:8020/source/word.txt')# RDD数据结构转化为二维数据map_rdd = init_rdd.flatMap(lambda line: line.split()).map(lambda word: (word,))# 查看数据# print(map_rdd.collect())# 通过Rdd构建DataFrameschema = StructType([StructField("value", StringType(), True)])init_df = spark.createDataFrame(data=map_rdd, schema=schema)# 打印dataframe表结构信息# init_df.show()# init_df.printSchema()# 创建临时视图init_df.createTempView('words')# 数据处理"""sparksql方式处理数据"""spark.sql("""select value as word,count(*) as cntfrom words group by value order by cnt desc""").show()print('=' * 50)"""DSL方式处理数据"""init_df.withColumn('word',init_df.value).groupBy('word').agg(F.count('word').alias('cnt'),F.max('word').alias('max_word'),F.min('word').alias('min_word'),).orderBy('cnt', ascending=False).show()# 释放资源spark.stop()