DataFrame相关的API

目录

DataFrame的操作方案

 SQL相关的API

创建一个视图/表

 DSL相关的API

DSL的传递方式

 SQL的函数库

Spark SQL的综合应用

直接基于DataFrame来处理

SQL方式

 DSL方式

 基于RDD转换DataFrame的方式


DataFrame的操作方案

        操作DataFrame一般有两种操作方案:一种为DSL方式,一种为SQL方式

SQL方式:通过编写SQL语句完成统计分析操作

DSL操作:特定领域语言,使用DataFrame特有的API完成计算,也就是代码形式

从使用角度来说:SQL更加方便一些,当适应了DSL写法后,会发现DSL比SQL更好用

从Soark角度来说:推荐使用DSL方案,更有利于Spark底层优化处理

 SQL相关的API

创建一个视图/表

df.createTempview('视图名称'):创建一个临时的视图(表名)

df.createorReplaceTempview('视图名称'):创建一个临时的视图(表名),如果视图存在,直接替换

临时视图:仅能在当前这个spark session的会话中使用

df.createGlobalTempview('视图名称'):创建一个全局视图,运行在一个Spark应用中,多个spark会话中读可以使用,使用的时候必须通过global_temp.视图名称方式才可以加载到,较少使用

执行SQL语句:

        spark.sql('书写SQL')

 DSL相关的API

show():用于展示DF中数据,默认仅展示前20行

             参数1:设置默认展示多少行,默认为20

               参数2:是否为阶段列,默认仅展示前20个字符数据,如果过长,不展示(一般不设置)

printSchema():用于打印当前这个DF的表结构信息

select():类似于SQL中的select,SQL中的select后面可以些什么,这里也一样

filter()和where():用于对数据进行过滤操作,一般在SparkSQL中只要使用where

groupBy():用于执行分组操作

orderBy():用于执行排序操作

DSL的传递方式

DSL主要支持一下几种传递的方式: str | column对象 | 列表

        str格式: '字段'

        column对象:

                              DataFrame含有的字段  df['字段']

                               执行过程产生:F.col('字段')

        列表:

                ['字段1','字段2'...]

                [df['字段1'],df['字段2']]

 SQL的函数库

为了能够支持在编写Spark SQL的DSL时候,在DSL中使用SQL函数,专门提供一个SQL的函数库。直接加载使用即可

导入这个函数库:import pyspark.sql.functions as F

通过F调用对应的函数即可

SparkSQL中所支持的函数,都可以通过以下地址查询到:
https://spark.apache.org/docs/3.1.2/api/sql/index.html

Spark SQL的综合应用

world count 案例

已知HDFS又一个words.txt的文件,words.txt文件的内容如下:

hadoop hive hadoop sqoop hive
sqoop hadoop zookeeper hive hue
hue sqoop hue zookeeper hive
spark oozie spark hadoop oozie
hive oozie spark hadoop

直接基于DataFrame来处理

SQL方式

SQL方式一:子查询

SQL方式二:侧视图

炸裂函数配合侧视图使用如下:

        格式:select 原表别名.字段名,侧视图名.字段名 from 原表 原表别名 lateral view explode(要炸开的字段) 侧视图名 as 字段名

# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F# 绑定指定的python解释器
"""
1.2 直接基于DataFrame来处理需求分析:1- 将每行内容切分得到单个的单词2- 组织DataFrame的数据结构2.1- 有两列。一列是单词,一列是次数
2.2- 只有一列。单词
"""os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':print('直接基于DataFrame来处理')spark = SparkSession \.builder \.appName('dataFrame_world_count_demo') \.master('local[*]') \.getOrCreate()# 数据输入# text方式读取hdfs上的文件init_df = spark.read.text(paths='hdfs://node1:8020/source/word.txt')# # 查看数据# init_df.show()# # 打印dataframe表结构信息# init_df.printSchema()# 创建临时视图init_df.createTempView('words')# 数据处理"""sparksql方式处理数据-子查询1.先切分每一行的数据2.使用炸裂函数获得一个word单词列3.使用子查询方式聚合统计每个单词出现的次数"""spark.sql("""select word,count(*) as cnt from (select explode(split(value,' ')) as word from words)group by word order by cnt desc""").show()"""sparksql方式处理数据-侧视图1.先切分每一行的数据2.使用炸裂函数获得一个word单词列3.使用侧视图方式聚合统计每个单词出现的次数炸裂函数配合侧视图使用如下:格式:select 原表别名.字段名,侧视图名.字段名 from 原表 原表别名 lateral view explode(要炸开的字段)侧视图名 as 字段名"""spark.sql("""select word,count(*) as cntfrom words w lateral view explode(split(value,' ')) t as wordgroup by word order by cnt desc""").show()

 DSL方式

DSL方式总结:
            withColumnRenamed(参数1,参数2):给字段重命名操作。参数1是旧字段名,参数2是新字段名
            agg():推荐使用,更加通用。执行聚合操作。如果有多个聚合,聚合之间使用逗号分隔即可
            withColumn(参数1,参数2):用来产生新列。参数1是新列的名称;参数2是新列数据的来源

 

# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F# 绑定指定的python解释器
"""
1.2 直接基于DataFrame来处理需求分析:1- 将每行内容切分得到单个的单词2- 组织DataFrame的数据结构2.1- 有两列。一列是单词,一列是次数
"""os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':print('直接基于DataFrame来处理')spark = SparkSession \.builder \.appName('dataFrame_world_count_demo') \.master('local[*]') \.getOrCreate()# 数据输入# text方式读取hdfs上的文件init_df = spark.read.text(paths='hdfs://node1:8020/source/word.txt')# # 查看数据# init_df.show()# # 打印dataframe表结构信息# init_df.printSchema()# 创建临时视图init_df.createTempView('words')# 数据处理"""DSL方式处理数据-方式一1.先切分每一行的数据2.使用炸裂函数获得一个word单词列3.调用API聚合统计单词个数再排序"""init_df.select(F.explode(F.split('value', ' ')).alias('word')).groupBy('word').count().orderBy('count', ascending=False).show()"""DSL方式处理数据-方式二1.先切分每一行的数据2.使用炸裂函数获得一个word单词列3.调用API聚合统计单词个数再排序4.agg():推荐使用,更加通用。执行聚合操作。如果有多个聚合,聚合之间使用逗号分隔即可"""init_df.select(F.explode(F.split('value', ' ')).alias('word')).groupBy('word').agg(F.count('word').alias('cnt'),F.max('word').alias('max_word'),F.min('word').alias('min_word'),).orderBy('cnt', ascending=False).show()"""DSL方式处理数据-方式三withColumnRenamed(参数1,参数2):给字段重命名操作。参数1是旧字段名,参数2是新字段名withColumn(参数1,参数2):用来产生新列。参数1是新列的名称;参数2是新列数据的来源"""init_df.withColumn('word',F.explode(F.split('value', ' '))).groupBy('word').agg(F.count('word').alias('cnt'),F.max('word').alias('max_word'),F.min('word').alias('min_word')).orderBy('cnt', ascending=False).show()# 数据输出# 是否资源spark.stop()

 基于RDD转换DataFrame的方式

# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F# 绑定指定的python解释器
"""
基于RDD转换DataFrame的方式需求分析:1- 将每行内容切分得到单个的单词2- 组织DataFrame的数据结构2.1- 有两列。一列是单词,一列是次数
"""os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':print('直接基于DataFrame来处理')# 创建SparkSession对象spark = SparkSession \.builder \.appName('dataFrame_world_count_demo') \.master('local[*]') \.getOrCreate()# 创建sparkContext顶级对象sc = spark.sparkContext# 数据输入# text方式读取hdfs上的文件init_rdd = sc.textFile('hdfs://node1:8020/source/word.txt')# RDD数据结构转化为二维数据map_rdd = init_rdd.flatMap(lambda line: line.split()).map(lambda word: (word,))# 查看数据# print(map_rdd.collect())# 通过Rdd构建DataFrameschema = StructType([StructField("value", StringType(), True)])init_df = spark.createDataFrame(data=map_rdd, schema=schema)# 打印dataframe表结构信息# init_df.show()# init_df.printSchema()# 创建临时视图init_df.createTempView('words')# 数据处理"""sparksql方式处理数据"""spark.sql("""select value as word,count(*) as cntfrom words group by value order by cnt desc""").show()print('=' * 50)"""DSL方式处理数据"""init_df.withColumn('word',init_df.value).groupBy('word').agg(F.count('word').alias('cnt'),F.max('word').alias('max_word'),F.min('word').alias('min_word'),).orderBy('cnt', ascending=False).show()# 释放资源spark.stop()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/608861.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

关于24年信息系统项目管理师论文如何提升?

信息系统项目管理师论文满分是75分,45分及以上为及格,论文评分可分为优良、及格与不及格3个档次。 评分的分数可分为: (1)60分至75分优良(相当于百分制80分至100分)。 (2&#xf…

<设计模式> 七大原则

设计模式七大原则 开放封闭原则:对扩展开放,对修改关闭。这意味着在设计模式中,我们应该尽可能地将代码的扩展和修改分开处理,对于可以通过扩展来实现的功能,我们应该选择扩展代码,而对于必须修改现有代码…

数模学习day09-cftool使用

老版本的MATLAB可以在命令行使用cftool打开,2017a的版本可以直接找到。 x和y在你的工作区中需要已经存在,然后打开该工具箱就可以看见。 选择X和Y xy选择好之后就自动画好了拟合曲线。 Results分析 画好之后结果就呈现在这里了 这里的p1就是拟合系数&…

用React给XXL-JOB开发一个新皮肤(一):环境搭建和项目初始化

目录 一. 简述二. Fork 项目三. 搭建开发环境四. 初始化皮肤项目五. 添加相关依赖六. 预览 一. 简述 大名鼎鼎的 xxl-job 任务调度中心我们应该都使用过,项目地址:xxl-job。它是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单…

【代码随想录】刷题笔记Day48

前言 早上练车去了(好久没有8点前醒了),练科目二两小时下来脚根可真酸啊,希望下周一把过。练完顺带去Apple西湖免费换新了耳机,羊毛爽! 121. 买卖股票的最佳时机 - 力扣(LeetCode)…

JS逆向之无限debugger对抗

文章目录 JS中实现debugger的方法无限Debugger示例Demo1Demo2Demo3Demo4总结 无限Debugger实战 JS中实现debugger的方法 首先,我们要知道,在浏览器实现debugger的方法有哪些 debugger关键词 ,相当于C内联汇编的int3,在代码中嵌入…

网工内推 | 运维工程师,国企、上市公司,RHCE认证优先

01 广东机场白云信息科技股份有限公司 招聘岗位:基础架构运维工程师(中级) 职责描述: 1、参与公司业务系统的监控、巡检、维护、故障定位、原因分析; 2、负责业务系统的上线、升级割接工作; 3、负责服务器…

Unity 踩坑记录 AnyState 切换动画执行两次

AnySate 切换动画 Can Transition To Self 将这个勾选去掉!!!

C++ STL常用函数总结

一些总结&#xff0c;有错误欢迎指正&#xff01;&#xff01;&#xff01; 1 vector #include <cstdio> #include <iostream> #include <algorithm> #include <cstring> #include <vector>using namespace std;int main () {//vectorvector&l…

Python学习之路-Hello Python

Python学习之路-Hello Python Python解释器 简介 前面说到Python是解释型语言&#xff0c;Python解释器的作用就是用于"翻译"Python程序。Python规定了一个Python语法规则&#xff0c;根据该规则可编写Python解释器。 常见的Python解释器 CPython&#xff1a;官方…

文件或目录损坏的磁盘修复方法

文件或目录损坏是一种常见的计算机问题&#xff0c;可能由多种原因导致&#xff0c;如磁盘故障、病毒或恶意软件攻击、文件系统错误等。这些损坏可能导致数据丢失或无法访问文件&#xff0c;因此及时修复至关重要。本文将深入探讨文件或目录损坏的原因&#xff0c;并提供相应的…

试用统信服务器操作系统UOS 20

作者&#xff1a;田逸&#xff08;formyz&#xff09; 试用统信Linux操作系统UOS&#xff0c;想了解一下用已有的Linux经验能否轻松驾驭它。以便在某些场景下&#xff0c;可以多一种选择。本次试验在Proxmox VE 8&#xff08;以下简称PVE 8&#xff09;平台下进行&#xff0c;采…

唠一唠Java线程池

第1章&#xff1a;引言 大家好&#xff0c;我是小黑&#xff0c;咱们今天来聊聊Java线程池&#xff0c;如果没有线程池&#xff0c;每个线程都需要手动创建和销毁线程&#xff0c;那将是多么低效和耗资源啊&#xff01; 线程池的核心作用就是复用已创建的线程&#xff0c;减少…

一个初级测试工程师的经历--我在阿里的两年

工作两年了&#xff0c;我一直希望让自己每年对测试的理解更深入一层。 谈轮了自己对各种测试的理解&#xff0c;这一年来&#xff0c;虽然对那些理概念的有所加强&#xff0c;自我感觉没有什么质的变化。前些天听我们公司的一位测试经理讲《敏捷测试》豁然开朗。他在学造飞机&…

BGP公认必遵属性——Next-hop(一)

BGP公认必遵属性共有三个&#xff0c;分别是&#xff1a;Next-hop、Origin、As-path&#xff0c;本期介绍Next-hop 点赞关注&#xff0c;持续更新&#xff01;&#xff01;&#xff01; Next-hop 华为BGP路由下一跳特点&#xff1a; 默认情况下传给EBGP邻居的BGP路由的下一跳…

【AnyText】文字处理AI,让平面设计如虎添翼 —— 登录和使用:详细指南!

AnyTex 关于Anytext登录魔搭社区魔搭社区链接魔搭社区登录登录完毕 登录AnyText使用Anytext生成带文字的图片手绘拖框随机 编辑图片中的文字 总结 关于Anytext 2024年1月5日&#xff0c;阿里达摩院宣布推出名为“AnyText”的算法&#xff0c;旨在解决AI绘图在处理文字创作时面临…

Python战机

基础版 import pygame import random# 设置游戏屏幕大小 screen_width 480 screen_height 600# 定义颜色 WHITE (255, 255, 255) RED (255, 0, 0) GREEN (0, 255, 0) BLUE (0, 0, 255)# 初始化pygame pygame.init()# 创建游戏窗口 screen pygame.display.set_mode((scre…

java解析json复杂数据的两种思路

文章目录 一、原始需求二、简单分析三、具体实现一1. api接口2. 接口返回3. json 数据解析1.&#xff09;引入Jackson库2.&#xff09;定义实体3.&#xff09;解析json字符串4.&#xff09;运行结果 4. 过程分析 四、具体实现二1. 核心代码2.运行结果 五、方案比较六、源码传送…

render 函数中使用组件等

render: (h) > {// 方案1&#xff1a;return h(el-switch, {props: {value: true},on: {change: (e) > {console.log(e, 改变)}}})// 方案二&#xff1a;const onChange (e) > {console.log(e, 改变)}return (<el-switch value{true} onChange{onChange}></…

c++学习:STL库(框架)+字符串模板类string+vector容器+list链表

目录 stl库 常用组件包括 字符串库 字符串模板类string 头文件 最常用的字符串模板类 字符串类型 模板原型 模板的成员数据类型 模板成员函数 有些函数会有重载&#xff0c;可以去下面网址查看std::basic_string - cppreference.comhttps://zh.cppreference.com/w/cp…