从零构建Dagster分区管道:时间+类别分区实战案例

分区是Dagster中的核心抽象概念,它允许我们管理大型数据集、处理增量更新并提高管道性能。本文将详细介绍如何创建和实现基于时间和类别的分区资产。

在这里插入图片描述

什么是分区?

分区是将数据集划分为更小、更易管理的部分的技术。在Dagster中,分区可以基于时间、类别或其他自定义逻辑创建,从而优化数据处理流程。

创建时间分区资产

基于时间的月度分区

首先,我们将创建一个按月份分区的资产,用于计算每个销售代表的月度绩效:

monthly_partition = dg.MonthlyPartitionsDefinition(start_date="2023-01-01")@dg.asset(partitions_def=monthly_partition,compute_kind="duckdb",group_name="analysis",deps=[joined_data]
)
def monthly_sales_performance(context: dg.AssetExecutionContext, duckdb: DuckDBResource):partition_date_str = context.partition_keymonth_to_fetch = partition_date_str[:-3]  # 格式化为YYYY-MMwith duckdb.get_connection() as conn:# 创建表(如果不存在)conn.execute("""CREATE TABLE IF NOT EXISTS monthly_sales_performance (partition_date varchar,rep_name varchar,product varchar,total_dollar_amount double);""")# 删除该月已有数据conn.execute(f"""DELETE FROM monthly_sales_performance WHERE partition_date = '{month_to_fetch}';""")# 插入新数据conn.execute(f"""INSERT INTO monthly_sales_performanceSELECT '{month_to_fetch}' AS partition_date, rep_name, product_name AS product, SUM(dollar_amount) AS total_dollar_amountFROM joined_dataWHERE strftime(date, '%Y-%m') = '{month_to_fetch}'GROUP BY '{month_to_fetch}', rep_name, product_name;""")# 预览数据preview_query = f"SELECT * FROM monthly_sales_performance WHERE partition_date = '{month_to_fetch}';"preview_df = conn.execute(preview_query).fetchdf()row_count = conn.execute(f"""SELECT COUNT(*) FROM monthly_sales_performance WHERE partition_date = '{month_to_fetch}';""").fetchone()[0] if conn.execute(f"""SELECT COUNT(*) FROM monthly_sales_performance WHERE partition_date = '{month_to_fetch}';""").fetchone() else 0return dg.MaterializeResult(metadata={"row_count": dg.MetadataValue.int(row_count),"preview": dg.MetadataValue.md(preview_df.to_markdown(index=False))})

创建类别分区资产

基于产品类别的分区

接下来,我们创建一个基于预定义产品类别的静态分区资产:

product_category_partition = dg.StaticPartitionsDefinition(["Electronics", "Books", "Home and Garden", "Clothing"
])@dg.asset(deps=[joined_data],partitions_def=product_category_partition,group_name="analysis",compute_kind="duckdb"
)
def product_performance(context: dg.AssetExecutionContext, duckdb: DuckDBResource):product_category_str = context.partition_keywith duckdb.get_connection() as conn:# 创建表(如果不存在)conn.execute("""CREATE TABLE IF NOT EXISTS product_performance (product_category varchar,product_name varchar,total_dollar_amount double,total_units_sold double);""")# 删除该类别已有数据conn.execute(f"""DELETE FROM product_performance WHERE product_category = '{product_category_str}';""")# 插入新数据conn.execute(f"""INSERT INTO product_performanceSELECT '{product_category_str}' AS product_category, product_name, SUM(dollar_amount) AS total_dollar_amount, SUM(quantity) AS total_units_soldFROM joined_dataWHERE category = '{product_category_str}'GROUP BY '{product_category_str}', product_name;""")# 预览数据preview_query = f"SELECT * FROM product_performance WHERE product_category = '{product_category_str}';"preview_df = conn.execute(preview_query).fetchdf()row_count = conn.execute(f"""SELECT COUNT(*) FROM product_performance WHERE product_category = '{product_category_str}';""").fetchone()[0] if conn.execute(f"""SELECT COUNT(*) FROM product_performance WHERE product_category = '{product_category_str}';""").fetchone() else 0return dg.MaterializeResult(metadata={"row_count": dg.MetadataValue.int(row_count),"preview": dg.MetadataValue.md(preview_df.to_markdown(index=False))})

将分区资产添加到Definitions

完成资产定义后,需要将它们添加到Dagster的Definitions对象中:

defs = dg.Definitions(assets=[products, sales_reps, sales_data, joined_data, monthly_sales_performance, product_performance,], asset_checks=[missing_dimension_check],resources={"duckdb": DuckDBResource(database="data/mydb.duckdb")}
)

物化分区资产

在Dagster UI中操作这些分区资产的步骤:

  1. 导航到"Assets"页面
  2. 点击"Reload definitions"重新加载定义
  3. 选择"monthly_sales_performance"资产,然后点击"Materialize selected"
    • 确保选择所有分区
    • 启动回填(backfill)作业
  4. 选择"product_performance"资产,然后点击"Materialize selected"
    • 确保选择所有分区
    • 启动回填作业

下一步计划

现在我们已经建立了ETL管道的主要资产,下一步可以考虑:

  1. 添加自动化调度
  2. 实现数据质量监控
  3. 添加异常处理机制
  4. 优化查询性能
  5. 扩展更多维度的分析

通过合理使用分区技术,我们可以显著提高Dagster管道的性能和可维护性,特别是在处理大规模数据集时。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/78731.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Cursor:AI时代的智能编辑器

在开发者社区掀起热潮的Cursor,正以破竹之势重塑编程工具格局。这款基于VS Code的AI优先编辑器,不仅延续了经典IDE的稳定基因,更通过深度集成的智能能力,将开发效率推向全新维度。2023年Anysphere公司获得的6000万美元A轮融资&…

SpringMVC再复习1

一、三层架构 表现层(WEB 层) 定义 :是应用程序与客户端进行交互的最外层,主要负责接收用户的请求,并将处理结果显示给用户。 作用 :在 Spring MVC 中,表现层通常采用 MVC 设计模式来构建。 技…

Centos 7系统 宝塔部署Tomcat项目(保姆级教程)

再看文章之前默认已经安装好系统,可能是云系统,或者是虚拟机。 宝塔安装 这个比较简单,参考这个老哥的即可: https://blog.csdn.net/weixin_42753193/article/details/125959289 环境配置 进入宝塔面板之后会出现环境安装&…

Nginx核心功能

目录 一:基于授权的访问控制 1:基于授权的访问控制简介 2:基于授权的访问控制步骤 (1)使用htpasswd 生成用户认证文件 (2)修改密码文件权限为400,将所有者改为nginx,…

AnimateCC基础教学:漫天繁星-由DeepSeek辅助完成

1.界面及元件抓图: 2.核心代码: // 初始化设置 var stars []; var stars2 []; var numStars 100; var stageWidth stage.canvas.width; var stageHeight stage.canvas.height; console.log(stageWidth, stageHeight);// 创建星星函数 function createStar() {var star n…

通过DeepSeek大语言模型控制panda机械臂,听懂人话,拟人性回答。智能机械臂助手又进一步啦

文章目录 前言环境配置运行测试报错 前言 通过使用智能化的工作流控制系统来精确操控机械臂,不仅能够基于预设算法可靠地规划每个动作步骤的执行顺序和力度,确保作业流程的标准化和可重复性,还能通过模块化的程序设计思路灵活地在原有工作流中…

分享一款免费的 AI 工作流平台

分享一款 AI 工作流/任务流平台,通过直观的流程图设计,轻松实现复杂业务流程的自动化与可视化,无缝集成 AI 大模型、AI 生图、数据库、条件分支、并行节点、自定义任务节点等等。 效果图: 官网体验地址:https://www.…

前端开发本地配置 HTTPS 全面详细教程

分为两步:生成证书、本地服务配置使用证书一、HTTPS 的基本概念 HTTPS 是一种安全的 HTTP 协议,它通过 SSL/TLS 对数据进行加密,确保数据在传输过程中不被窃取或篡改。在前端开发中,某些功能(如 Geolocation API、Web…

day10 python机器学习全流程实践

在机器学习的实践中,数据预处理与模型构建是极为关键的环节。本文将回顾数据预处理的全流程,并基于处理后的数据完成简单的机器学习建模与评估,暂不涉及复杂的调参过程。 一、预处理流程回顾 机器学习的成功,很大程度上依赖于高…

4月28号

初认web前端: web标准: HTML:

【Linux系统】systemV共享内存

system V共享内存 在Linux系统中,共享内存是一种高效的进程间通信(IPC)机制,它允许两个或者多个进程共享同一块物理内存区域,这些进程可以将这块区域映射到自己的虚拟地址空间中。 共享内存区是最快的IPC形式。一旦这…

(七)RestAPI 毛子(Http 缓存/乐观锁/Polly/Rate limiting/异步大文件上传)

文章目录 项目地址一、Http Cache1.1 服务注册1.2 Validation with ETag1. 添加ETagMiddleware中间件2. 创建内存ETag存储器3. 服务注册4. 测试二、使用ETag实现乐观锁2.1 添加乐观锁方法2.2 修改Controller2.3 测试乐观锁三、Rate Limiting3.1 添加速率控制服务1. 在Program里…

2025.4.26_STM32_SPI

1.SPI简介 2.硬件电路 所有SPI设备的SCK(时钟)、MOSI(主机输出从机输入)、MISO(主机输入从机输出)分别连在一起。SCK线只能被主机控制,和I2C相同。 主机另外引出多条SS控制线,分别接到各从机的SS引脚 (SS不用的时候为高电平,当主机需要选中某…

JAVA:单例模式

单例模式是设计模式之一 设计模式,就像古代打仗,我们都听过孙子兵法,把计谋概括下来弄成一种模式,形成一种套路。 软件开发中也有很多场景,多数类似的问题场景,解决方案就形成固定的模式,单例…

脑机接口:重塑人类未来的神经增强革命

引言 人类对大脑的探索从未停止,而脑机接口(Brain-Computer Interface, BCI)的崛起,正在将科幻电影中的“意念操控”变为现实。 这项技术通过解码脑电信号,实现人脑与外部设备的直接交互,不仅为医疗康复带来…

从SOA到微服务:架构演进之路与实践示例

一、架构演进背景 在软件开发领域,架构风格随着业务需求和技术发展不断演进。从早期的单体架构,到面向服务架构(SOA),再到如今的微服务架构,每一次变革都是为了解决当时面临的核心问题。 二、SOA架构解析 2.1 SOA核心概念 SOA&…

可灵AI 2.0上线:重新定义AI创作?好莱坞级特效触手可及

2025年4月15日,快手正式发布可灵AI 2.0,这款被誉为“让好莱坞特效师颤抖”的AI工具,以物理引擎级动态生成和电影语言自由操控两大核心技术,彻底颠覆了内容创作的想象边界。上线24小时内,全球用户已用它生成超过100万条…

Mysql存储引擎、锁机制

Mysql存储引擎 InnoDB​(MySQL 5.5 及以后版本中的默认存储引擎) ​​事务支持​​:支持 ​​ACID 事务​​,适合需要高可靠性的场景(如支付、订单)。 ​​锁机制​​:默认使用 ​​行级锁​​…

飞蛾扑火算法优化+Transformer四模型回归打包(内含MFO-Transformer-LSTM及单独模型)

飞蛾扑火算法优化Transformer四模型回归打包(内含MFO-Transformer-LSTM及单独模型) 目录 飞蛾扑火算法优化Transformer四模型回归打包(内含MFO-Transformer-LSTM及单独模型)预测效果基本介绍程序设计参考资料 预测效果 基本介绍 …

音视频开发---视频编码基础

一、视频编码的必要性 1. 存储与传输成本高 未经编码压缩的原始视频的数据量极大,例如:一般电影的亮度信号采样频率为13.5MHz;色度信号的频带通常为亮度信号的一半或更少,为6.75MHz或3.375MHz。以4:2:2的采样频率为例,Y信号采用13.5MHz,色度信号U和V采用6.75MHz采样,…