PySPARK带多组参数和标签的SparkSQL批量数据导出到S3的程序

设计一个基于多个带标签SparkSQL模板作为配置文件和多组参数的PySPARK代码程序,实现根据不同的输入参数自动批量地将数据导出为Parquet、CSV和Excel文件到S3上,标签和多个参数(以“_”分割)为组成导出数据文件名,文件已经存在则覆盖原始文件。
代码如下:

import json
from pyspark.sql import SparkSessiondef load_config(config_path):with open(config_path, 'r') as f:return json.load(f)def main(config_path, base_s3_path):# 初始化SparkSession,配置S3和Excel支持spark = SparkSession.builder \.appName("DataExportJob") \.config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1") \.getOrCreate()# 配置S3访问(根据实际环境配置)spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.access.key", "YOUR_ACCESS_KEY")spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "YOUR_SECRET_KEY")spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com")config = load_config(config_path)for template in config['templates']:label = template['label']sql_template = template['sql_template']parameters_list = template['parameters']for params in parameters_list:# 验证参数数量是否匹配placeholders = sql_template.count('{')if len(params) != placeholders:raise ValueError(f"参数数量不匹配,模板需要{placeholders}个参数,但当前参数为{len(params)}个")# 替换SQL中的占位符formatted_sql = sql_template.format(*params)df = spark.sql(formatted_sql)# 生成文件名参数部分param_str = "_".join(params)base_filename = f"{label}_{param_str}"# 定义输出路径output_paths = {'parquet': f"{base_s3_path}/parquet/{base_filename}",'csv': f"{base_s3_path}/csv/{base_filename}",'excel': f"{base_s3_path}/excel/{base_filename}.xlsx"}# 写入Parquetdf.write.mode('overwrite').parquet(output_paths['parquet'])# 写入CSV(自动生成header)df.write.mode('overwrite') \.option("header", "true") \.csv(output_paths['csv'])# 写入Excel(使用spark-excel包)df.write.format("com.crealytics.spark.excel") \.option("header", "true") \.option("inferSchema", "true") \.mode("overwrite") \.save(output_paths['excel'])spark.stop()if __name__ == "__main__":import argparseparser = argparse.ArgumentParser()parser.add_argument('--config', type=str, required=True, help='Path to config JSON file')parser.add_argument('--s3-path', type=str, required=True, help='Base S3 path (e.g., s3a://your-bucket/data)')args = parser.parse_args()main(args.config, args.s3_path)

配置文件示例(config.json)

{"templates": [{"label": "sales_report","sql_template": "SELECT * FROM sales WHERE date = '{0}' AND region = '{1}'","parameters": [["202301", "north"],["202302", "south"]]},{"label": "user_activity","sql_template": "SELECT user_id, COUNT(*) AS cnt FROM activity WHERE day = '{0}' GROUP BY user_id","parameters": [["2023-01-01"],["2023-01-02"]]}]
}

使用说明

  1. 依赖管理

    • 确保Spark集群已安装Hadoop AWS和Spark Excel依赖:
      spark-submit --packages com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1 your_script.py
      
  2. S3配置

    • 替换代码中的YOUR_ACCESS_KEYYOUR_SECRET_KEY为实际AWS凭证
    • 根据S3兼容存储调整endpoint(如使用MinIO需特殊配置)
  3. 执行命令

    spark-submit --packages com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1 \
    data_export.py --config config.json --s3-path s3a://your-bucket/exports
    

输出结构

s3a://your-bucket/exports
├── parquet
│   ├── sales_report_202301_north
│   ├── sales_report_202302_south
│   └── user_activity_2023-01-01
├── csv
│   ├── sales_report_202301_north
│   ├── sales_report_202302_south
│   └── user_activity_2023-01-01
└── excel├── sales_report_202301_north.xlsx├── sales_report_202302_south.xlsx└── user_activity_2023-01-01.xlsx

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/69204.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何安装LangChain软件包

前言 LangChain是一个强大的框架,用于构建使用大型语言模型(LLMs)的应用程序。它提供了一系列软件包和工具,帮助开发人员将LLMs集成到他们的工作流程中。然而,由于其模块化设计,LangChain生态系统可能会让…

每日Attention学习19——Convolutional Multi-Focal Attention

每日Attention学习19——Convolutional Multi-Focal Attention 模块出处 [ICLR 25 Submission] [link] UltraLightUNet: Rethinking U-shaped Network with Multi-kernel Lightweight Convolutions for Medical Image Segmentation 模块名称 Convolutional Multi-Focal Atte…

2. K8S集群架构及主机准备

本次集群部署主机分布K8S集群主机配置主机静态IP设置主机名解析ipvs管理工具安装及模块加载主机系统升级主机间免密登录配置主机基础配置完后最好做个快照备份 2台负载均衡器 Haproxy高可用keepalived3台k8s master节点5台工作节点(至少2及以上)本次集群部署主机分布 K8S集群主…

游戏引擎学习第89天

回顾 由于一直没有渲染器,终于决定开始动手做一个渲染器,虽然开始时并不确定该如何进行,但一旦开始做,发现这其实是正确的决定。因此,接下来可能会花一到两周的时间来编写渲染器,甚至可能更长时间&#xf…

nuxt3中使用useFetch请求刷新不返回数据或返回html结构问题解决-完整nuxt3useFetchtch请求封装

前言 如果使用nuxt3写项目,可以查看nuxt3实战:完整的 nuxt3 vue3 项目创建与useFetch请求封装,此篇内容有详细步骤 但在此篇内容中useFetch请求在页面有多个请求的情况下,或者放在客户端渲染情境下是失败的,所以在此篇…

链式结构二叉树(递归暴力美学)

文章目录 1. 链式结构二叉树1.1 二叉树创建 2. 前中后序遍历2.1 遍历规则2.2 代码实现图文理解 3. 结点个数以及高度等二叉树结点个数正确做法: 4. 层序遍历5. 判断是否完全二叉树 1. 链式结构二叉树 完成了顺序结构二叉树的代码实现,可以知道其底层结构…

JS:将JS对象格式化为php语法形式(完美支持无unicode编码匹配的正则)

/*** 格式化Object数据为php语法形式* param {*} obj 任意数据* param {String} spaceLen 缩略符长度:必须在2~65536之间,否则默认为2* return {String} 格式化后的PHP语法字符串*/ function formatToPhp(obj, spaceLen) {formatToPhp function (obj, s…

Kubernetes 中 BGP 与二层网络的较量:究竟孰轻孰重?

如果你曾搭建过Kubernetes集群,就会知道网络配置是一个很容易让人深陷其中的领域。在负载均衡器、服务通告和IP管理之间,你要同时应对许多变动的因素。对于许多配置而言,使用二层(L2)网络就完全能满足需求。但边界网关协议(BGP)—— 支撑互联网运行的技术 —— 也逐渐出…

Linux提权--John碰撞密码提权

​John the Ripper​(简称 John)是一个常用的密码破解工具,可以通过暴力破解、字典攻击、规则攻击等方式,尝试猜解用户密码。密码的弱度是提权攻击中的一个重要因素,如果某个用户的密码非常简单或是默认密码&#xff0…

大数据学习之Spark分布式计算框架RDD、内核进阶

一.RDD 28.RDD_为什么需要RDD 29.RDD_定义 30.RDD_五大特性总述 31.RDD_五大特性1 32.RDD_五大特性2 33.RDD_五大特性3 34.RDD_五大特性4 35.RDD_五大特性5 36.RDD_五大特性总结 37.RDD_创建概述 38.RDD_并行化创建 演示代码: // 获取当前 RDD 的分区数 Since ( …

[创业之路-286]:《产品开发管理-方法.流程.工具 》-1- IPD两个跨职能团队的组织

IPD(集成产品开发)中的两个重要跨职能组织是IPMT(集成产品管理团队)和PDT(产品开发团队)。 在IPD(集成产品开发)体系中,IRB(投资评审委员会)、IPM…

maven详细讲解

学习目标 那什么是mavenmaven概念以及核心思想maven构建的生命周期、阶段以及目标maven仓库有哪些?maven依赖 那什么是maven?maven概念以及核心思想,maven构建的生命周期、阶段以及目标? 那什么是maven Maven是一个项目管理和构建…

DeepSeek 提示词之角色扮演的使用技巧

老六哥的小提示:我们可能不会被AI轻易淘汰,但是会被“会使用AI的人”淘汰。 在DeepSeek的官方提示库中,有“角色扮演(自定义人设)”的提示词案例。截图如下: 在“角色扮演”的提示词案例中,其实…

第二个Qt开发实例:在Qt中利用GPIO子系统和sysfs伪文件系统实现按钮(Push Button)点击控制GPIO口(效果为LED2灯的灭和亮)

引言 本文承接博文 https://blog.csdn.net/wenhao_ir/article/details/145420998 里的代码,在那里面代码的基础上添加上利用sysfs伪文件系统实现按钮(Push Button)点击控制GPIO口的代码,进而实现LED2灯的灭和亮。 最终的效果是点击下面的LED按钮实现LED…

登山第十七梯:矩形拟合——无惧噪声

文章目录 一 摘要 二 资源 三 内容 (文章末尾提供源代码) 一 摘要 目前,获取点集的矩形拟合结果的主要方法是计算其最小外包直立矩形或者旋转矩形。这些方法简单、易用,在数据质量良好的情况下能够较好的贴合矩形形状。然而,在数据缺失时,最小外包围盒方法将会…

57. Uboot图形化界面配置

一、Uboot图形化配置方法 1、通过终端配置。 2、进入到uboot的源码根目录下。 3、首先默认配置 make mx6ull_alientek_emmc_defconfig //默认配置 4、输入make menuconfig。打开图形化配置界面。 5、注意,新电脑需要安装ncurses库。sudo apt-get install libncurs…

Java面试场景题分享

假设你在做电商秒杀活动,秒杀开始时,成千上万的用户同时请求抢购商品。你会如何设计系统来处理这些请求,确保库存不超卖 你如何保证库存的准确性? 这个问题引导你思考如何在高并发下确保库存更新的原子性,最直接的方式…

kalman滤波器C++设计仿真实例第三篇

1. 仿真场景 水面上有条船在做匀速直线航行,航行过程中由于风和浪的影响,会有些随机的干扰,也就是会有些随机的加速度作用在船身上,这个随机加速度的均方差大约是0.1,也就是说方差是0.01。船上搭载GPS设备,…

(2025|ICLR,音频 LLM,蒸馏/ALLD,跨模态学习,语音质量评估,MOS)音频 LLM 可作为描述性语音质量评估器

Audio Large Language Models Can Be Descriptive Speech Quality Evaluators 目录 1. 概述 2. 研究背景与动机 3. 方法 3.1 语音质量评估数据集 3.2 ALLD 对齐策略 4. 实验结果分析 4.1 MOS 评分预测(数值评估) 4.2 迁移能力(在不同…

stm32生成hex文件详解

1.产生的map文件干啥的? 2.组成情况??? 废话少说,直接上代码具体内容况: Component: ARM Compiler 5.06 update 7 (build 960) Tool: armlink [4d3601]Section Cross Referencesstartup_stm32f103xe.o(S…