使用 Pandera 的 PySpark 应用程序的数据验证

推荐:使用 NSDT场景编辑器 快速搭建3D应用场景

本文简要介绍了 Pandera 的主要功能,然后继续解释 Pandera 数据验证如何与自最新版本 (Pandera 0.16.0) 以来使用本机 PySpark SQL 的数据处理工作流集成。

Pandera 旨在与其他流行的 Python 库配合使用,如 pandas、pyspark.pandas、Dask 等。这样可以轻松地将数据验证合并到现有数据处理工作流中。直到最近,Pandera 还缺乏对 PySpark SQL 的原生支持,但为了弥合这一差距,QuantumBlack 的一个团队,麦肯锡的 AI 由 Ismail Negm-PARI、Neeraj Malhotra、Jaskaran Singh Sidana、Kasper Janehag、Oleksandr Lazarchuk 以及 Pandera 创始人 Niels Bantilan 组成。,开发了原生的 PySpark SQL 支持并将其贡献给了 Pandera。本文的文字也是团队准备的,下面用他们的话写。

Pandera的主要特点

如果您不熟悉使用Pandera来验证数据,我们建议您查看Khuyen Tran的“使用Pandera验证您的pandas DataFrame”,其中描述了基础知识。总之,我们简要解释了简单直观的 API、内置验证功能和自定义的主要功能和优势。

简单直观的接口

Pandera 的突出特点之一是其简单直观的 API。您可以使用易于阅读和理解的声明性语法来定义数据架构。这使得编写既高效又有效的数据验证代码变得容易。

下面是 Pandera 中的架构定义示例:

class InputSchema(pa.DataFrameModel):year: Series[int] = pa.Field()month: Series[int] = pa.Field()day: Series[int] = pa.Field()

内置验证函数

Pandera 提供了一组内置函数(通常称为检查)来执行数据验证。当我们调用 Pandera 模式时,它将执行模式和数据验证。数据验证将在后台调用函数。validate()check

下面是如何使用 Pandera 在数据帧对象上运行数据的简单示例。check

class InputSchema(pa.DataFrameModel):year: Series[int] = pa.Field(gt=2000, coerce=True)month: Series[int] = pa.Field(ge=1, le=12, coerce=True)day: Series[int] = pa.Field(ge=0, le=365, coerce=True)InputSchema.validate(df)

如上所示,对于字段,我们定义了一个检查,强制此字段中的所有值必须大于 2000,否则 Pandera 将引发验证失败。yeargt=2000

以下是 Pandera 默认提供的所有内置检查的列表:

eq: checks if value is equal to a given literal
ne: checks if value is not equal to a given literal
gt: checks if value is greater than a given literal
ge: checks if value is greater than & equal to a given literal
lt: checks if value is less than a given literal
le: checks if value is less than & equal to a given literal
in_range: checks if value is given range
isin: checks if value is given list of literals
notin: checks if value is not in given list of literals
str_contains: checks if value contains string literal
str_endswith: checks if value ends with string literal
str_length: checks if value length matches
str_matches: checks if value matches string literal
str_startswith: checks if value starts with a string literal

自定义验证函数

除了内置的验证检查之外,Pandera 还允许您定义自己的自定义验证函数。这使您能够根据用例灵活地定义自己的验证规则。

例如,您可以定义一个用于数据验证的 lambda 函数,如下所示:

schema = pa.DataFrameSchema({"column2": pa.Column(str, [pa.Check(lambda s: s.str.startswith("value")),pa.Check(lambda s: s.str.split("_", expand=True).shape[1] == 2)]),
})

向 Pandera 添加对 PySpark SQL DataFrame 的支持

在添加对 PySpark SQL 的支持的过程中,我们坚持了两个基本原则:

  • 界面和用户体验的一致性
  • 针对 PySpark 的性能优化。

首先,让我们深入研究一致性的主题,因为从用户的角度来看,无论选择的框架如何,他们都有一组一致的 API 和一个接口,这一点很重要。由于Pandera提供了多种框架可供选择,因此在PySpark SQL API中拥有一致的用户体验更为重要。

考虑到这一点,我们可以使用 PySpark SQL 定义 Pandera 模式,如下所示:

from pyspark.sql import DataFrame, SparkSession
import pyspark.sql.types as T
import pandera.pyspark as paspark = SparkSession.builder.getOrCreate()class PanderaSchema(DataFrameModel):"""Test schema"""id: T.IntegerType() = Field(gt=5)product_name: T.StringType() = Field(str_startswith="B")price: T.DecimalType(20, 5) = Field()description: T.ArrayType(T.StringType()) = Field()meta: T.MapType(T.StringType(), T.StringType()) = Field()data_fail = [(5, "Bread", 44.4, ["description of product"], {"product_category": "dairy"}),(15, "Butter", 99.0, ["more details here"], {"product_category": "bakery"}),]spark_schema = T.StructType([T.StructField("id", T.IntegerType(), False),T.StructField("product", T.StringType(), False),T.StructField("price", T.DecimalType(20, 5), False),T.StructField("description", T.ArrayType(T.StringType(), False), False),T.StructField("meta", T.MapType(T.StringType(), T.StringType(), False), False),],)
df_fail = spark_df(spark, data_fail, spark_schema)

在上面的代码中,定义了传入 pyspark 数据帧的架构。它有 5 个字段,对 and 字段进行数据检查和强制执行。 PanderaSchemadtypesidproduct_name

class PanderaSchema(DataFrameModel):"""Test schema"""id: T.IntegerType() = Field(gt=5)product_name: T.StringType() = Field(str_startswith="B")price: T.DecimalType(20, 5) = Field()description: T.ArrayType(T.StringType()) = Field()meta: T.MapType(T.StringType(), T.StringType()) = Field()

接下来,我们构建了一个虚拟数据,并强制实施了 中定义的本机 PySpark SQL 架构spark_schema

spark_schema = T.StructType([T.StructField("id", T.IntegerType(), False),T.StructField("product", T.StringType(), False),T.StructField("price", T.DecimalType(20, 5), False),T.StructField("description", T.ArrayType(T.StringType(), False), False),T.StructField("meta", T.MapType(T.StringType(), T.StringType(), False), False),],)df_fail = spark_df(spark, data_fail, spark_schema)

这样做是为了模拟架构和数据验证失败。

以下是数据帧的内容:df_fail

df_fail.show()+---+-------+--------+--------------------+--------------------+| id|product|   price|         description|                meta|+---+-------+--------+--------------------+--------------------+|  5|  Bread|44.40000|[description of p...|{product_category...|| 15| Butter|99.00000| [more details here]|{product_category...|+---+-------+--------+--------------------+--------------------+

接下来,我们可以调用 Pandera 的验证函数来执行模式和数据级验证,如下所示:

df_out = PanderaSchema.validate(check_obj=df)

我们将很快探讨的内容。df_out

PySpark 的性能优化

我们的贡献是专门为使用 PySpark 数据帧时的最佳性能而设计的,这在处理大型数据集时至关重要,以便处理 PySpark 分布式计算环境的独特挑战。

Pandera 使用 PySpark 的分布式计算架构来高效处理大型数据集,同时保持数据的一致性和准确性。我们针对 PySpark 性能重写了 Pandera 的自定义验证函数,以便更快、更高效地验证大型数据集,同时降低数据错误和大容量不一致的风险。

全面的错误报告

我们对Pandera进行了另一项添加,以便能够以Python字典对象的形式生成详细的错误报告。这些报告可通过从验证函数返回的数据帧进行访问。它们根据用户的配置提供所有架构和数据级别验证的全面摘要。

事实证明,此功能对于开发人员快速识别和解决任何与数据相关的问题很有价值。通过使用生成的错误报告,团队可以编译其应用程序中架构和数据问题的完整列表。这使他们能够高效、精确地确定问题的优先级和解决方案。

需要注意的是,此功能目前仅适用于 PySpark SQL,为用户提供了在 Pandera 中使用错误报告时增强的体验。

在上面的代码示例中,请记住我们在 Spark 数据帧上调用过:validate()

df_out = PanderaSchema.validate(check_obj=df)

它返回了一个数据帧对象。使用访问器,我们可以从中提取错误报告,如下所示:

print(df_out.pandera.errors)
{"SCHEMA":{"COLUMN_NOT_IN_DATAFRAME":[{"schema":"PanderaSchema","column":"PanderaSchema","check":"column_in_dataframe","error":"column 'product_name' not in dataframe Row(id=5, product='Bread', price=None, description=['description of product'], meta={'product_category': 'dairy'})"}],"WRONG_DATATYPE":[{"schema":"PanderaSchema","column":"description","check":"dtype('ArrayType(StringType(), True)')","error":"expected column 'description' to have type ArrayType(StringType(), True), got ArrayType(StringType(), False)"},{"schema":"PanderaSchema","column":"meta","check":"dtype('MapType(StringType(), StringType(), True)')","error":"expected column 'meta' to have type MapType(StringType(), StringType(), True), got MapType(StringType(), StringType(), False)"}]},"DATA":{"DATAFRAME_CHECK":[{"schema":"PanderaSchema","column":"id","check":"greater_than(5)","error":"column 'id' with type IntegerType() failed validation greater_than(5)"}]}
}

如上所示,错误报告在 python 字典对象中的 2 个级别上聚合,以便下游应用程序轻松使用,例如使用 Grafana 等工具随时间推移的时间序列可视化错误:

  1. 验证类型 = 或SCHEMADATA
  2. 错误类别 = 或 等。DATAFRAME_CHECKWRONG_DATATYPE

这种重构错误报告的新格式是在 0.16.0 中引入的,作为我们贡献的一部分。

开/关开关

对于依赖 PySpark 的应用程序,具有开/关开关是一项重要功能,可以在灵活性和风险管理方面产生重大影响。具体而言,开/关开关允许团队在生产中禁用数据验证,而无需更改代码。

这对于性能至关重要的大数据管道尤其重要。在许多情况下,数据验证可能会占用大量处理时间,这可能会影响管道的整体性能。使用开/关开关,团队可以在必要时快速轻松地禁用数据验证,而无需经历耗时的修改代码过程。

我们的团队在 Pandera 中引入了开/关开关,因此用户只需更改配置设置即可轻松关闭生产中的数据验证。这提供了在必要时确定性能优先级所需的灵活性,而不会牺牲开发中的数据质量或准确性。

要启用验证,请在环境变量中设置以下内容:

export PANDERA_VALIDATION_ENABLED=False

Pandera将选取此选项以禁用应用程序中的所有验证。默认情况下,验证处于启用状态。

目前,此功能仅适用于 0.16.0 版本的 PySpark SQL,因为它是我们的贡献引入的新概念。

对Pandera执行的精细控制

除了开/关开关功能外,我们还引入了对 Pandera 验证流程执行的更精细的控制。这是通过引入可配置的设置来实现的,这些设置允许用户在三个不同的级别控制执行:

  1. SCHEMA_ONLY:此设置仅执行架构验证。它检查数据是否符合架构定义,但不执行任何其他数据级验证。
  2. DATA_ONLY:此设置仅执行数据级验证。它根据定义的约束和规则检查数据,但不验证架构。
  3. SCHEMA_AND_DATA:此设置同时执行架构和数据级验证。它根据架构定义以及定义的约束和规则检查数据。

通过提供这种精细控制,用户可以选择最适合其特定用例的验证级别。例如,如果主要关注点是确保数据符合定义的架构,则可以使用该设置来减少总体处理时间。或者,如果已知数据符合架构,并且重点是确保数据质量,则可以使用该设置来确定数据级验证的优先级。SCHEMA_ONLYDATA_ONLY

对 Pandera 执行的增强控制使用户能够在精度和效率之间取得微调的平衡,从而实现更有针对性和优化的验证体验。

export PANDERA_VALIDATION_DEPTH=SCHEMA_ONLY

默认情况下,将启用验证,并设置深度,可以根据用例将其更改为或根据需要更改。SCHEMA_AND_DATASCHEMA_ONLYDATA_ONLY

目前,此功能仅适用于 0.16.0 版本的 PySpark SQL,因为它是我们的贡献引入的新概念。

列和数据帧级别的元数据

我们的团队为 Pandera 添加了一项新功能,允许用户在和级别存储额外的元数据。此功能旨在允许用户在其架构定义中嵌入上下文信息,以供其他应用程序利用。FieldSchema / Model

例如,通过存储有关特定列的详细信息(如数据类型、格式或单位),开发人员可以确保下游应用程序能够正确解释和使用数据。同样,通过存储有关特定用例需要架构的哪些列的信息,开发人员可以优化数据处理管道、降低存储成本并提高查询性能。

在架构级别,用户可以存储信息以帮助对整个应用程序的不同架构进行分类。此元数据可以包括架构用途、数据源或数据的日期范围等详细信息。这对于管理复杂的数据处理工作流特别有用,其中多个架构用于不同的目的,需要有效地跟踪和管理。

class PanderaSchema(DataFrameModel):"""Pandera Schema Class"""id: T.IntegerType() = Field(gt=5,metadata={"usecase": ["RetailPricing", "ConsumerBehavior"],"category": "product_pricing"},)product_name: T.StringType() = Field(str_startswith="B")price: T.DecimalType(20, 5) = Field()class Config:"""Config of pandera class"""name = "product_info"strict = Truecoerce = Truemetadata = {"category": "product-details"}

在上面的示例中,我们引入了有关架构对象本身的其他信息。这在 2 个级别是允许的:字段和架构。

To extract the metadata on schema level (including all fields in it), we provide helper functions as:

PanderaSchema.get_metadata()
The output will be dictionary object as follows:
{"product_info": {"columns": {"id": {"usecase": ["RetailPricing", "ConsumerBehavior"],"category": "product_pricing"},"product_name": None,"price": None,},"dataframe": {"category": "product-details"},}
}

目前,此功能是 0.16.0 中的一个新概念,已针对 PySpark SQL 和 Pandas 添加。

总结

我们引入了几个新功能和概念,包括允许团队在不更改代码的情况下禁用生产中的验证的开/关开关、对 Pandera 验证流程的精细控制,以及在列和数据帧级别存储其他元数据的能力。

原文链接:使用 Pandera 的 PySpark 应用程序的数据验证 (mvrlink.com)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/76000.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux 压缩webfile文件夹 webfile.tar.gz和webfile.tar的区别

linux 压缩webfile文件夹 在Linux中,你可以使用tar命令来压缩文件夹。以下是将文件夹压缩为名为"webfile.tar"的示例命令: cd到webfile所在的文件夹,然后执行 tar -cvf webfile.tar webfile/上述命令中,-c选项表示创建…

DNS(域名解析系统)

含义 当我们在上网要访问莫个服务器的时候,就需要知道服务器的IP地址,但IP地址是一串数字,虽然这串数字用点分十进制已经清晰不少了,但还是不利于人们记忆和传播,于是人们使用单词来代替IP地址(例如baidu&a…

Jira 笔记

目录 1. Jira 笔记1.1. 项目管理工具 JIRA 实践指导1.2. JIRA 1. Jira 笔记 1.1. 项目管理工具 JIRA 实践指导 https://zhuanlan.zhihu.com/p/619453520?utm_id0 1、JIRA 序言篇1.1 为什么要使用项目管理工具1.2 项目管理工具分析与比较二 JIRA 配置篇2.1 JIRA 配置之问题类…

算法基础-数学知识-容斥原理、博弈论

容斥原理、博弈论 容斥原理890. 能被整除的数(二进制状态压缩版本,复杂度多一个Om)890. 能被整除的数(dfs版本) 博弈论无限制nim游戏AcWing 891. Nim游戏AcWing 892. 台阶-Nim游戏(待补) 集合版…

Linux中防火墙的简单使用方法

目录 前言 ​编辑 一、概念 1、防火墙的分类: 2、防火墙性能 3、硬件防火墙的品牌、软件防火墙的品牌 4、硬件防火墙与软件防火墙比较 二、linux中的防火墙 1、iptables 2.netfilter/iptables功能 3、四表 iptables中表的优先级 4、五链 三、iptables…

数字化转型背景下企业知识管理能力提升路径

近年来,科技不断进步,颠覆性技术(例如 5G、云计算、物联网、大数据分析和人工智能等)正在重新定义企业如何管理项目和运营效率。知识管理体系亦需要随着科技的进步而改变,以适应新的数字时代环境,并且高效知…

说说MySQL回表查询与覆盖索引

分析&回答 什么是回表查询? 通俗的讲就是,如果索引的列在 select 所需获得的列中(因为在 mysql 中索引是根据索引列的值进行排序的,所以索引节点中存在该列中的部分值)或者根据一次索引查询就能获得记录就不需要…

从零开始搭建vite4.0-vue3.0项目

目录 前言 项目地址 项目初始化 git初始化 别名配置 解决vscode报错 vue-router安装 pinia安装 环境配置 axios安装 element-plus按需引入 eslint与prettier安装 scss安装 stylelint配置 代码提交规范配置 husky与lint-stage配置 前言 pnpm和npm的命令行完全一…

FastChat

Fast Chat是一个用于训练/部署和评估基于大型语言模型的聊天机器人的开发平台。其核心功能包括: 最先进模型的权重/训练代码和评估代码(例如Vicuna/FastChat-T5)基于分布式多模型的服务系统,具有Web界面和与OpenAI兼容的RESTful API。 安装 pip instal…

在Cisco设备上配置接口速度和双工

默认情况下,思科交换机将自动协商速度和双工设置。将设备(交换机、路由器或工作站)连接到 Cisco 交换机上的端口时,将发生协商过程,设备将就传输参数达成一致,当今的大多数网络适配器都支持此功能。 在本文…

八路DI八路DO,开关量远程IO模块,Modbus TCP数据采集模块 YL90-RJ45

特点: ● 八路开关量输入,八路开关量输出 ● DI状态变化自动发送状态数据,可以捕获脉冲 ● 采用Socket自由协议编程简单、轻松应用 ● 开关量毫秒级响应速度适应多种场合 ● 内置网页功能,可以通过网页查询与控制 ● 同时也…

星际争霸之小霸王之小蜜蜂(十二)--猫有九条命

系列文章目录 星际争霸之小霸王之小蜜蜂(十一)--杀杀杀 星际争霸之小霸王之小蜜蜂(十)--鼠道 星际争霸之小霸王之小蜜蜂(九)--狂鼠之灾 星际争霸之小霸王之小蜜蜂(八)--蓝皮鼠和大…

练习接口测试详细步骤

最近一段时间学了Python语言,重新学了 Java,js,html语言,CSS,linux,一堆测试工具;唉~ 在接触接口测试过程中补了很多课, 终于有点领悟接口测试的根本; 偶是…

Beats:安装及配置 Metricbeat (二)- 8.x

这篇文章是继文章 “Beats:安装及配置 Metricbeat (一)- 8.x” 的续篇。你可以先阅读之前的那篇文章再继续阅读这篇文章。我们在这篇文章中继续之前的探讨。 使用 fingerprint 来代替证书 在实际的使用中,我们需要从 Elasticsear…

SB树,看这一篇就够了

算法拾遗三十九SB树及跳表 SB树SB树四种违规类型总结 SB树Code 跳表 SB树 SB树是基于搜索二叉树来的,也有左旋和右旋的操作,只是不同于AVL树,它也有它自己的一套平衡性方法。 任何以叔叔节点为头的子树的节点个数不小于自己任何一个侄子树的…

如何自己开发一个前端监控SDK

最近在负责团队前端监控系统搭建的任务。因为我们公司有统一的日志存储平台、日志清洗平台和基于 Grafana 搭建的可视化看板,就剩日志的采集和上报需要自己实现了,所以决定封装一个前端监控 SDK 来完成日志的采集和上报。 架构设计 因为想着以后有机会…

『力扣每日一题08』验证回文串

一、题目 如果在将所有大写字符转换为小写字符、并移除所有非字母数字字符之后,短语正着读和反着读都一样。则可以认为该短语是一个 回文串 。 字母和数字都属于字母数字字符。 给你一个字符串 s,如果它是 回文串 ,返回 true ;否…

【软考】系统集成项目管理工程师(三)信息系统集成专业技术知识③

一、云计算 1、定义 通过互联网来提供大型计算能力和动态易扩展的虚拟化资源;云是网络、互联网的一种比喻说法。是一种大集中的服务模式。 2、特点 (1)超大规模(2)虚拟化(3)高可扩展性&…

UG\NX二次开发 计算一个向量的反向向量UF_VEC3_negate

文章作者:里海 来源网站:王牌飞行员_里海_里海NX二次开发3000例,里海BlockUI专栏,C\C++-CSDN博客 简介: UG\NX二次开发 计算一个向量的反向向量UF_VEC3_negate 效果: 代码: #include "me.hpp"void ufusr(char* param, int* retcode, int paramLen) {UF…

什么是Docker和Docker-Compose?

Docker的构成 Docker仓库:https://hub.docker.com Docker自身组件 Docker Client:Docker的客户端 Docker Server:Docker daemon的主要组成部分,接受用户通过Docker Client发出的请求,并按照相应的路由规则实现路由分发…