PyDeequ库在AWS EMR启动集群中数据质量检查功能的配置方法和实现代码

PyDeequ是一个基于Apache Spark的Python API,专门用于定义和执行“数据单元测试”,从而在大规模数据集中测量数据质量。
PyDeequ框架在PySpark代码中提供了全面的数据质量检查功能,能够帮助用户&有效地监控和提升大规模数据集的数据质量。它在PySpark代码中的数据质量检查功能主要包括以下几个方面:

核心组件

  1. 指标计算(Metrics Computation):利用分析器(Analyzers)对数据集的每一列进行分析,生成数据概要。

  2. 约束建议:自动提出基于不同分析组的验证约束,以确保数据的一致性。

  3. 约束验证:依据设定的标准对数据集进行实时或批量验证。

  4. 度量存储库:实现对验证历史的跟踪与存储,便于持续监控数据质量。

功能特性

  1. 数据剖析:PyDeequ可以对数据集的每一列进行深入的剖析,包括数据的完整性、空值情况、唯一性统计等关键指标。

  2. 约束定义与验证:用户可以定义各种数据质量约束,如数据的类型、范围、唯一性、非空性等,并使用PyDeequ对这些约束进行验证。验证结果会明确指出哪些数据不符合预设的约束条件。

  3. 灵活性与可扩展性:PyDeequ支持用户根据业务需求自定义约束条件和分析规则,灵活应对各种数据质量挑战。同时,它也易于集成到现有的PySpark工作流中。

  4. 报告与监控:PyDeequ可以生成详细的数据质量报告,帮助用户了解数据集的整体质量情况。此外,它还支持对验证历史的跟踪与存储,便于用户持续监控数据质量的变化趋势。

应用场景

  1. 数据湖管理:在AWS Glue、Athena等服务的支持下,PyDeequ可以帮助用户监控数据湖中的数据质量。

  2. 数据仓库:在数据仓库中,PyDeequ可以用于定期检测数据质量,防止数据质量问题影响业务决策。

  3. 实时数据处理:在实时数据处理系统中,PyDeequ可以用于实时监控数据流的质量。

一、AWS EMR 集群配置 PyDeequ 的具体步骤

1. 创建 Bootstrap Script (引导脚本)

PyDeequ 依赖 Java 库和 Python 包,需在 EMR 集群初始化时自动安装。

#!/bin/bash
# bootstrap.sh# 安装 Python 依赖
sudo pip3 install pydeequ# 下载 Deequ JAR 包到 Spark 类路径
aws s3 cp s3://deequ/jars/deequ-2.0.3-spark-3.1.jar /usr/lib/spark/jars/
2. 启动 EMR 集群时指定 Bootstrap 动作

通过 AWS CLI 或控制台启动集群时添加以下参数:

aws emr create-cluster \
--name "PyDeequ_Cluster" \
--release-label emr-6.9.0 \
--applications Name=Spark Name=Hadoop \
--instance-type m5.xlarge \
--instance-count 3 \
--bootstrap-actions Path="s3://your-bucket/bootstrap.sh" \
--use-default-roles
3. 关键验证点
  • 确保 JAR 文件路径正确:/usr/lib/spark/jars/deequ-*.jar
  • Python 环境需为 3.x,可通过 EMR 配置 emr-release-label >= 6.0

二、PyDeequ 数据质量检查核心代码示例

1. 初始化 SparkSession
from pyspark.sql import SparkSessionspark = SparkSession.builder \.appName("PyDeequ-Data-Quality") \.config("spark.jars.packages", "com.amazon.deequ:deequ:2.0.3") \.config("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED") \.getOrCreate()
2. 指标计算(Metrics Computation)
from pydeequ.analyzers import *df = spark.read.parquet("s3://your-data-bucket/transactions")analysisResult = AnalysisRunner(spark) \.onData(df) \.addAnalyzer(Size()) \.addAnalyzer(Completeness("customer_id")) \.addAnalyzer(ApproxCountDistinct("order_id")) \.addAnalyzer(Mean("total_amount")) \.run()analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()
3. 约束建议(Constraint Suggestion)
from pydeequ.suggestions import *suggestionResult = ConstraintSuggestionRunner(spark) \.onData(df) \.addConstraintRule(DEFAULT()) \.run()print("Suggested Constraints:")
for constraint in suggestionResult['constraint_suggestions']:print(f"- {constraint['description']}")
4. 约束验证(Constraint Verification)
from pydeequ.checks import *
from pydeequ.verification import *check = Check(spark, CheckLevel.Error, "DataQualityCheck")result = VerificationSuite(spark) \.onData(df) \.addCheck(check.hasSize(lambda x: x >= 1000) \.isComplete("customer_id") \.isUnique("order_id") \.isNonNegative("total_amount") \.hasPattern("email", r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$") \).run()result_df = VerificationResult.checkResultsAsDataFrame(spark, result)
result_df.show(truncate=False)
5. 指标存储(Metric Repository)
from pydeequ.repository import *
from pydeequ.metrics import *metrics_repository = FileSystemMetricsRepository(spark, path="s3://quality-metrics-bucket/")
result_key = ResultKey(spark, datetime.strptime("2024-01-01", "%Y-%m-%d"))AnalysisRunner(spark) \.onData(df) \.useRepository(metrics_repository) \.saveOrAppendResult(result_key) \.addAnalyzer(Completeness("customer_id")) \.run()

三、关键配置说明

组件配置要点
JAR 依赖Deequ JAR 必须位于 Spark 的 jars 目录,版本需与 Spark 兼容
Python 版本EMR 6.x 默认使用 Python 3.7+,需通过 pip3 安装
权限配置EMR 角色需有权限访问 S3 存储桶(读取数据/写入指标)
优化参数调整 Spark 内存分配(spark.executor.memory)以处理大规模数据

四、高级应用场景扩展

1. 实时数据质量监控(Kafka 集成)
stream_df = spark.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "kafka-host:9092") \.option("subscribe", "transactions-topic") \.load()def quality_check_microbatch(df, epoch_id):VerificationSuite(spark).onData(df).addCheck(...).run()stream_df.writeStream \.foreachBatch(quality_check_microbatch) \.start()
2. 自定义分析规则
from pydeequ.analyzers import Analyzerclass CustomRangeAnalyzer(Analyzer):def __init__(self, column, min_val, max_val):super().__init__()self.column = columnself.min = min_valself.max = max_valdef to_metric(self, state):# 实现自定义指标计算逻辑passanalysisResult = AnalysisRunner(spark) \.addAnalyzer(CustomRangeAnalyzer("temperature", 0, 100)) \.run()

以上配置和代码实现了 PyDeequ 在 AWS EMR 的完整数据质量流水线。实际部署时需根据数据规模调整 Spark 资源配置(spark-submit 参数),并建议将质量报告存储至 DynamoDB 或 Amazon CloudWatch 实现可视化监控。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/69741.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

React第二十八章(css modules)

css modules 什么是 css modules 因为 React 没有Vue的Scoped,但是React又是SPA(单页面应用),所以需要一种方式来解决css的样式冲突问题,也就是把每个组件的样式做成单独的作用域,实现样式隔离,而css modules就是一种…

开源智慧园区管理系统如何重塑企业管理模式与运营效率

内容概要 在如今快速发展的商业环境中,企业面临着日益复杂的管理挑战。开源智慧园区管理系统应运而生,旨在通过技术创新来应对这些挑战。它不仅是一个简单的软件工具,而是一个全面整合大数据、物联网和智能化功能的综合平台,为企…

Pandas进行MongoDB数据库CRUD

在数据处理的领域,MongoDB作为一款NoSQL数据库,以其灵活的文档存储结构和高扩展性广泛应用于大规模数据处理场景。Pandas作为Python的核心数据处理库,能够高效处理结构化数据。在MongoDB中,数据以JSON格式存储,这与Pandas的DataFrame结构可以很方便地互相转换。通过这篇教…

vue3中el-input无法获得焦点的问题

文章目录 现象两次nextTick()加setTimeout()解决结论 现象 el-input被外层div包裹了&#xff0c;设置autofocus不起作用&#xff1a; <el-dialog v-model"visible" :title"title" :append-to-bodytrue width"50%"><el-form v-model&q…

stm32教程:EXTI外部中断应用

早上好啊大佬们&#xff0c;上一期我们讲了EXTI外部中断的原理以及基础代码的书写&#xff0c;这一期就来尝试一下用它来写一些有实际效能的工程吧。 这一期里&#xff0c;我用两个案例代码来让大家感受一下外部中断的作用和使用价值。 旋转编码器计数 整体思路讲解 这里&…

DeepSeek大模型技术深度解析:揭开Transformer架构的神秘面纱

摘要 DeepSeek大模型由北京深度求索人工智能基础技术研究有限公司开发&#xff0c;基于Transformer架构&#xff0c;具备卓越的自然语言理解和生成能力。该模型能够高效处理智能对话、文本生成和语义理解等复杂任务&#xff0c;标志着人工智能在自然语言处理领域的重大进展。 关…

Ubuntu 系统,如何使用双Titan V跑AI

要在Ubuntu系统中使用双NVIDIA Titan V GPU来运行人工智能任务&#xff0c;你需要确保几个关键组件正确安装和配置。以下是基本步骤&#xff1a; 安装Ubuntu操作系统&#xff1a; 下载最新版本的Ubuntu服务器或桌面版ISO文件。使用工具如Rufus&#xff08;Windows&#xff09;或…

ROS2---基础操作

工作空间(workspace) workspace是一个存放项目开发相关文件的文件夹。例如我们要开发一个机器人&#xff0c;我们可以创建一个工作空间&#xff0c;然后存放这个机器人不同功能的包&#xff08;感知&#xff08;雷达&#xff0c;相机等&#xff09;&#xff0c;运动&#xff0…

护眼好帮手:Windows显示器调节工具

在长时间使用电脑的过程中&#xff0c;显示器的亮度和色温对眼睛的舒适度有着重要影响。传统的显示器调节方式不仅操作繁琐&#xff0c;而且在低亮度下容易导致色彩失真。因此&#xff0c;今天我想为大家介绍一款适用于Windows系统的护眼工具&#xff0c;它可以帮助你轻松调节显…

Mysql进阶学习

目录 一.Mysql服务器内部架构(了解) 二.Mysql引擎 2.1 innodb引擎 2.2 myisam引擎 三.索引 3.1索引分类 3.2mysql索引数据结构 3.3聚簇索引和非聚簇索引 3.4回表查询 3.5索引下推 四.事务 数据库事务特征 事务隔离性&#xff0c;隔离级别 事务实现原理 五.锁 ①…

解锁维特比算法:探寻复杂系统的最优解密码

引言 在复杂的技术世界中&#xff0c;维特比算法以其独特的魅力和广泛的应用&#xff0c;成为通信、自然语言处理、生物信息学等领域的关键技术。今天&#xff0c;让我们一同深入探索维特比算法的奥秘。 一、维特比算法的诞生背景 维特比算法由安德鲁・维特比在 1967 年提出…

使用 postman 测试思源笔记接口

思源笔记 API 权鉴 官方文档-中文&#xff1a;https://github.com/siyuan-note/siyuan/blob/master/API_zh_CN.md 权鉴相关介绍截图&#xff1a; 对应的xxx&#xff0c;在软件中查看 如上图&#xff1a;在每次发送 API 请求时&#xff0c;需要在 Header 中添加 以下键值对&a…

doris:导入时实现数据转换

Doris 在数据导入时提供了强大的数据转换能力&#xff0c;可以简化部分数据处理流程&#xff0c;减少对额外 ETL 工具的依赖。主要支持以下四种转换方式&#xff1a; 列映射&#xff1a;将源数据列映射到目标表的不同列。 列变换&#xff1a;使用函数和表达式对源数据进行实时…

Bash 基础与进阶实践指南

目录 Bash 简介与基础基本命令与文件操作权限管理与用户管理重定向与管道变量与环境变量通配符与正则表达式Shell 脚本结构与控制流常用内建命令与技巧文本处理常用命令作业控制与进程管理别名与函数实用技巧与注意事项更多 Bash 进阶话题参考资源 1. Bash 简介与基础 1.1 什…

sizeof和strlen的对比与一些杂记

1.sizeof和strlen的对比 1.1sizeof &#xff08;1&#xff09;sizeof是一种操作符 &#xff08;2&#xff09;sizeof计算的是类型或变量所占空间的大小&#xff0c;单位是字节 注意事项&#xff1a; &#xff08;1&#xff09;sizeof 返回的值类型是 size_t&#xff0c;这是一…

实测数据处理(Wk算法处理)——SAR成像算法系列(十二)

系列文章目录 《SAR学习笔记-SAR成像算法系列&#xff08;一&#xff09;》 《wk算法-SAR成像算法系列&#xff08;五&#xff09;》 文章目录 前言 一、算法流程 1.1、回波信号生成 2.2 Stolt插值 2.3 距离脉冲压缩 2.4 方位脉冲压缩 2.5 SAR成像 二、仿真实验 2.1、仿真参数…

FFmpeg rtmp推流直播

文章目录 rtmp协议RTMP协议组成RTMP的握手过程RTMP流的创建RTMP消息格式Chunking(Message 分块) rtmp服务器搭建Nginx服务器配置Nginx服务器 librtmp库编译推流 rtmp协议 RTMP&#xff08;Real Time Messaging Protocol&#xff09;是由Adobe公司基于Flash Player播放器对应的…

鸿蒙开发在onPageShow中数据加载不完整的问题分析与解决

API Version 12 1、onPageShow()作什么的 首先说明下几个前端接口的区别&#xff1a; ArkUI-X的aboutToAppear()接口是一个生命周期接口&#xff0c;用于在页面即将显示之前调用。 在ArkUI-X中&#xff0c;aboutToAppear()接口是一个重要的生命周期接口&#xff0c;它会在页…

docker搭建redis集群(三主三从)

本篇文章不包含理论解释&#xff0c;直接开始集群&#xff08;三主三从&#xff09;搭建 环境 centos7 docker 26.1.4 redis latest &#xff08;7.4.2&#xff09; 服务器搭建以及环境配置 请查看本系列前几篇博客 默认已搭建好三个虚拟机并安装配置好docker 相关博客&#xf…

MinDoc 安装与部署

下载可执行文件 mindoc mindoc_linux_amd64.zip 上传并解压压缩包 cd /opt mkdir mindoc cd mindocunzip mindoc_linux_amd64.zip 创建数据库 CREATE DATABASE mindoc_db DEFAULT CHARSET utf8mb4 COLLATE utf8mb4_general_ci; 配置数据库 将解压目录下 conf/app.conf.exam…