Spark,数据提取和保存

以下是使用 Spark 进行数据提取(读取)和保存(写入)的常见场景及代码示例(基于 Scala/Java/Python,不含图片操作):
 
一、数据提取(读取)
 
1. 读取文件数据(文本/CSV/JSON/Parquet 等)
 
Scala
 
scala   
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Data Read")
  .getOrCreate()

// 读取 CSV(含表头)
val csvDf = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true") // 自动推断数据类型
  .load("path/to/csv/file.csv")

// 读取 JSON
val jsonDf = spark.read.json("path/to/json/file.json")

// 读取 Parquet(Spark 原生格式,高效)
val parquetDf = spark.read.parquet("path/to/parquet/dir")
 
 
Python
 
python   
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Data Read").getOrCreate()

# 读取 CSV
csv_df = spark.read.csv("path/to/csv/file.csv", header=True, inferSchema=True)

# 读取 JSON
json_df = spark.read.json("path/to/json/file.json")

# 读取 Parquet
parquet_df = spark.read.parquet("path/to/parquet/dir")
 
 
2. 读取数据库数据(如 MySQL/Hive)
 
Scala(以 MySQL 为例)
 
scala   
val jdbcDf = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://host:port/db?useSSL=false")
  .option("dbtable", "table_name")
  .option("user", "username")
  .option("password", "password")
  .load()
 
 
Python(以 Hive 为例,需启用 Hive 支持)
 
python   
# 读取 Hive 表(需在 SparkSession 中启用 Hive)
hive_df = spark.sql("SELECT * FROM hive_table")
 
 
二、数据保存(写入)
 
1. 保存为文件(CSV/JSON/Parquet 等)
 
Scala
 
scala   
// 保存为 CSV(覆盖模式,含表头)
csvDf.write.format("csv")
  .option("header", "true")
  .mode("overwrite") // 模式:overwrite/append/ignore/errorIfExists
  .save("output/csv_result")

// 保存为 Parquet(分区存储,提升查询性能)
parquetDf.write.partitionBy("category") // 按字段分区
  .mode("append")
  .parquet("output/parquet_result")
 
 
Python
 
python   
# 保存为 JSON
json_df.write.json("output/json_result", mode="overwrite")

# 保存为 Parquet(指定压缩格式)
parquet_df.write.parquet("output/parquet_result", compression="snappy")
 
 
2. 保存到数据库(如 MySQL/Hive)
 
Scala(以 MySQL 为例)
 
scala   
jdbcDf.write.format("jdbc")
  .option("url", "jdbc:mysql://host:port/db?useSSL=false")
  .option("dbtable", "target_table")
  .option("user", "username")
  .option("password", "password")
  .mode("append") // 追加模式
  .save()
 
 
Python(以 Hive 为例)
 
python   
# 保存为 Hive 表(需启用 Hive 支持)
hive_df.write.saveAsTable("hive_target_table", mode="overwrite")
 
 
三、关键参数说明
 
1. 读取模式(文件)
 
-  inferSchema : 是否自动推断数据类型(适用于 CSV/JSON,需读取少量数据,影响性能)。
 
-  header : CSV 是否包含表头( true/false )。
 
2. 写入模式( mode )
 
-  overwrite : 覆盖已有数据。
 
-  append : 追加到现有数据。
 
-  ignore : 忽略写入(不报错)。
 
-  errorIfExists : 存在则报错(默认)。
 
3. 数据库连接
 
- 需添加对应数据库驱动(如 MySQL 的  mysql-connector-java )。
 
- 对于大规模数据,建议使用分区并行写入(如  option("numPartitions", "4") )。
 
四、典型场景示例
 
场景:从 MySQL 读取数据,清洗后保存为 Parquet
 
scala   
// 读取 MySQL 数据
val mysqlDf = spark.read.jdbc(
  url = "jdbc:mysql://host:port/source_db",
  dbtable = "source_table",
  properties = Map("user" -> "u", "password" -> "p")
)

// 数据清洗(示例:过滤空值)
val cleanedDf = mysqlDf.na.drop("any")

// 保存为 Parquet(按日期分区)
cleanedDf.write.partitionBy("date")
  .parquet("output/cleaned_data")
 
 
通过以上方法,可灵活使用 Spark 完成数据提取和保存任务,支持多种数据源和格式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/906070.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何用mockito+junit测试代码

Mockito 是一个流行的 Java 模拟测试框架,用于创建和管理测试中的模拟对象(mock objects)。它可以帮助开发者编写干净、可维护的单元测试,特别是在需要隔离被测组件与其他依赖项时。 目录 核心概念 1. 模拟对象(Mock Objects) 2. 打桩(Stubbing) 3. 验…

最新缺陷检测模型:EPSC-YOLO(YOLOV9改进)

目录 引言:工业缺陷检测的挑战与突破 一、EPSC-YOLO整体架构解析 二、核心模块技术解析 1. EMA多尺度注意力模块:让模型"看得更全面" 2. PyConv金字塔卷积:多尺度特征提取利器 3. CISBA模块:通道-空间注意力再进化 4. Soft-NMS:更智能的重叠框处理 三、实…

【Linux网络与网络编程】12.NAT技术内网穿透代理服务

1. NAT技术 之前我们说到过 IPv4 协议中IP 地址数量不充足的问题可以使用 NAT 技术来解决。还提到过本地主机向公网中的一个服务器发起了一个网络请求,服务器是怎么将应答返回到该本地主机呢?(如何进行内网转发?) 这就…

uniapp的适配方式

文章目录 前言✅ 一、核心适配方式对比📏 二、rpx 单位:uni-app 的核心适配机制🧱 三、默认设计稿适配(750宽)🔁 四、字体 & 屏幕密度适配🛠 五、特殊平台适配(底部安全区、刘海…

JAVA EE(进阶)_进阶的开端

别放弃浸透泪水的昨天,晨光已为明天掀开新篇 ——陳長生. ❀主页:陳長生.-CSDN博客❀ 📕上一篇:JAVA EE_HTTP-CSDN博客 1.什么是Java EE Java EE(Java Pla…

SQL脚本规范

主要作用:数据库的备份和迁移 SQL脚本规范 每一个sql语句必须与;结束 脚本结构: { 删库,建库 删表,建表 插入初始数据 } 建库语法: CREATE DATABASE 数据库名CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; CHARA…

std::ratio<1,1000> 是什么意思?

author: hjjdebug date: 2025年 05月 14日 星期三 09:45:24 CST description: std::ratio<1,1000> 是什么意思&#xff1f; 文章目录 1. 它是一种数值吗&#xff1f;2. 它是一种类型吗&#xff1f;3. std:ratio 是什么呢&#xff1f;4. 分析一个展开后的模板函数5.小结: …

测试--测试分类 (白盒 黑盒 单元 集成)

一、按照测试目标分类&#xff08;测试目的是什么&#xff09; 主类别细分说明1. 界面测试UI内容完整性、一致性、准确性、友好性&#xff0c;布局排版合理性&#xff0c;控件可用性等2. 功能测试检查软件功能是否符合需求说明书&#xff0c;常用黑盒方法&#xff1a;边界值、…

整理了 2009 - 2025 年的【199 管综真题 + 解析】PDF,全套共 34 份文件

每年真题原卷 ✅ 每年详细解析 ✅ &#x1f4c2;【管综真题 2009-2025】 &#x1f4c2;【管综解析 2009-2025】 目录树&#xff1a; ├── 2009-2025管综真题 PDF │ ├── 2009年199管综真题.pdf │ ├── 2010年199管综真题.pdf │ ├── 2011年199管综真题.pd…

用golang实现二叉搜索树(BST)

目录 一、概念、性质二、二叉搜索树的实现1. 结构2. 查找3. 插入4. 删除5. 中序遍历 中序前驱/后继结点 一、概念、性质 二叉搜索树&#xff08;Binary Search Tree&#xff09;&#xff0c;简写BST&#xff0c;又称为二叉查找树 它满足&#xff1a; 空树是一颗二叉搜索树对…

自动化:批量文件重命名

自动化&#xff1a;批量文件重命名 1、前言 2、效果图 3、源码 一、前言 今天来分享一款好玩的自动化脚&#xff1a;批量文件重命名 有时候呢&#xff0c;你的文件被下载下来文件名都是乱七八糟毫无规律&#xff0c;但是当时你下载的时候没办法重名或者你又不想另存为重新重…

VueUse/Core:提升Vue开发效率的实用工具库

文章目录 引言什么是VueUse/Core&#xff1f;为什么选择VueUse/Core&#xff1f;核心功能详解1. 状态管理2. 元素操作3. 实用工具函数4. 浏览器API封装5. 传感器相关 实战示例&#xff1a;构建一个拖拽上传组件性能优化技巧与原生实现对比常见问题解答总结 引言 在现代前端开发…

stm32 ADC单通道转换

stm32c8t6仅有12位分辨率 1、单次转换 非扫描 1、初始化 void Ad_Init() {RCC_APB2PeriphClockCmd(RCC_APB2Periph_GPIOA, ENABLE);RCC_APB2PeriphClockCmd(RCC_APB2Periph_ADC1, ENABLE);//配置ADCCLK时钟分频,ADC的输入时钟不得超过14MHzRCC_ADCCLKConfig(RCC_PCLK2_Div6);G…

2KW压缩机驱动参考设计【SCH篇】

实物展示&#xff1a; ACDC: VAC和VAC-为交流电压检测&#xff1a; 1.C33 C34作为Y电容走线宽度要求&#xff1a; Y电容一般用于L/N到地之间&#xff08;L-PE 或 N-PE&#xff09;&#xff0c;主要作用是抑制共模干扰。其走线的电流非常小&#xff0c;推荐使用 ≥ 1mm 宽的走…

python05——循环结构

1、while循环 n0 #初始条件 while n<5: #判断print(hello python) #要重复执行的代码print(n) #注意同级代码缩进相同n1 #计数器结果&#xff1a; hello python 0 hello python 1 hello python 2 hello python 3 hello python 4 hello python 5 #求阶乘和 sum0 n1 whil…

LINUX编译、运行、测试lowcoder_CN

参考 二者没有太大差异。 LINUX编译、运行、测试lowcoder-CSDN博客 下载 git clone https://github.com/mousheng/lowcoder_CN 或 git clone https://gitcode.com/gh_mirrors/lo/lowcoder_CNcd lowcoder_CN三个模块 node-service api-service client 每个模块都有自己的…

Python 基础之函数命名

几个问题 使用描述性蛇形命名法&#xff08;snake_case&#xff09;Python函数名应使用什么大小写格式&#xff1f;为什么函数名要具有描述性&#xff1f;方法的命名规范是什么&#xff1f;函数、变量和类的命名有何区别&#xff1f; Python函数的命名有一些不可违背的硬性规…

redis 命令大全整理

http://doc.redisfans.com/ 原网址 Redis 命令分类 Key(键) Key(键)命令 exists/del/keys/type/scanobject/move/dump/migratettl/pttl/persist/expireat/pexpireat/expire/pexpirerename/renamenxsort/randomkey/restoreexists 语法:exists key [key ...] 检查一个或多…

React中useDeferredValue与useTransition终极对比。

文章目录 前言一、核心差异对比二、代码示例对比1. useDeferredValue&#xff1a;延迟搜索结果更新2. useTransition&#xff1a;延迟路由切换 三、应用场景总结四、注意事项五、原理剖析1. 核心机制对比2. 关键差异3. 代码实现原理 总结 前言 在React的并发模式下&#xff0c…

高并发内存池|定长内存池的设计

二、定长内存池的设计 设计一个定长的内存池&#xff0c;这个内存池的定长在于&#xff0c;当剩余空间使用完毕后&#xff0c;总是开辟相同长度的新空间来使用。我们会使用到一个指针来切割划分大空间为小空间。大空间是内存池向系统申请的内存大小&#xff0c;而小空间是程序…