当 PyIceberg 和 DuckDB 遇见 AWS S3 Tables:打造 Serverless 数据湖“开源梦幻组合”

引言

在一些大数据分析场景比如电商大数据营销中,我们需要快速分析存储海量用户行为数据(如浏览、加购、下单),以进行用户行为分析,优化营销策略。传统方法依赖 Spark/Presto 集群或 Redshift 查询 S3 上的 Parquet/ORC 文件,这对于需要快速迭代、按需执行的分析来说,成本高、运维复杂且响应不够敏捷。

本文将介绍一种现代化的 Serverless 解决方案:利用 S3 Tables(内置优化的 Apache Iceberg 支持)作为存储基础,并结合 PyIceberg 的便捷性与 DuckDB 的高性能嵌入式分析能力,直接在 AWS Lambda 等环境中实现对 S3 数据的低成本、高效率即时查询,彻底摆脱集群运维的负担,加速您的用户行为分析。关键工具及技术点:

  1. S3 Tables:在 S3 上为表格数据(采用内建优化的 Apache Iceberg 格式)设计的、具备自动性能优化的智能对象存储。
  2. Lambda:提供按需运行代码的无服务器计算能力
  3. PyIceberg:Iceberg 官方开源项目,提供简洁的 Python API 来操作 Iceberg 表
  4. DuckDB:高性能嵌入式分析引擎,支持 Iceberg rest catalog 接口

核心实践

使用 PyIceberg 创建和插入 S3 Tables

首先,安装 python 依赖

pip install pyiceberg[s3fs, pyarrow]

创建表和插入表的核心代码如下,通过 pyiceberg 对接 S3 Tables 的 rest catalog api 来实现 catalog 的获取,从而实现表的创建、列出,以及数据的插入等操作。

from pyiceberg.catalog import load_catalog
import pyarrow as pa
rest_catalog = load_catalog("catalog_name",**{"type": "rest","warehouse": "arn:aws:s3tables:us-west-2:${awsAccountId}:bucket/testtable","uri": "https://s3tables.us-west-2.amazonaws.com/iceberg","rest.sigv4-enabled": "true","rest.signing-name": "s3tables","rest.signing-region": "us-west-2","py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"}
)
# 新建namespace
rest_catalog.create_namespace("namespace_example")
# 新建表
rest_catalog.create_table("namespace_example.test_table",schema=pa.schema([("id", pa.int32()),("data", pa.string()),])
)
# 打印表列表
tables_list = rest_catalog.list_tables("namespace_example")
print(tables_list)
# 获取表对象
table = rest_catalog.load_table("namespace_example.test_table")
df = pa.Table.from_pylist([{"id": 303, "data": 'test insert icb'}], schema=table.schema().as_arrow()
)
#插入表
table.append(df)
# 读取表并打印
for row in table.scan().to_arrow().to_pylist():
print(row)

可以先通过本地配置 AWS CLI 权限然后运行代码进行测试,然后通过 docker 的方式部署 Lambda。

参考 Dockerfile:

FROM public.ecr.aws/lambda/python:3.12
COPY requirements.txt ${LAMBDA_TASK_ROOT}
RUN pip install -r requirements.txt
COPY lambda_function.py ${LAMBDA_TASK_ROOT}
CMD [ "lambda_function.handler" ]

使用 DuckDB 在 S3 Tables 进行复杂数据分析查询

这里使用 1.2.1 版本的 DuckDB,通过 pip install duckdb==1.2.1 来安装,DuckDB 最新的夜间版本插件支持了 Apache Iceberg REST 目录,而 S3 Tables 也有 REST 目录接口。可以通过在 Lambda 上部署 DuckDB 来读取查询分析 S3 Tables 里面的数据。也可以把 DuckDB 嵌入到您的应用程序中直接查询 S3 Tables。

DuckDB 的 Lambda 实现代码如下,结合了 boto3 的 S3 Tables 客户端,通过 api 把 S3 Tables 里面的桶加载到 DuckDB 的 catalog 中,后续就可以直接通过 sql 来进行查询了。Lambda 的入口函数接收 sql,然后返回 sql 的执行结果,示例 sql: Select * from bucketname.namespace.tablename 就可以直接查询出对应桶里面的表的数据了,需要注意的是在 DuckDB 里面一般通过 DETACH 和 ATTACH 来获取最新的 catalog 表元数据。

import os
import duckdb
import boto3
os.environ['HOME'] = '/tmp'
con = duckdb.connect(database=':memory:', config={'memory_limit': '9GB','worker_threads': 5,'temp_directory':'/tmp/file/overmem'})
# 验证设置
con.execute("""
FORCE INSTALL aws FROM core_nightly;
FORCE INSTALL httpfs FROM core_nightly;
FORCE INSTALL iceberg FROM core_nightly;
CREATE SECRET (TYPE s3,PROVIDER credential_chain
);
""")
s3tables = boto3.client('s3tables')
table_buckets = s3tables.list_table_buckets(maxBuckets=1000)['tableBuckets']
def handler(event, context):for table_bucket in table_buckets:name = table_bucket['name']arn = table_bucket['arn']try:con.execute(f"DETACH {name};")except:passcon.execute(f"""ATTACH '{arn}' AS {name} (TYPE iceberg,ENDPOINT_TYPE s3_tables);""")sql = event.get("sql")try:result = con.execute(sql).fetchall()return {"statusCode": 200,"result": result}except Exception as e:return {"statusCode": 500,"error": str(e)}

Dockerfile 可以参考插入部分的 Dockerfile,通过镜像部署到 Lambda,并设置好对应的 IAM 角色权限以及 Lambda 的超时以及内存设置。这里代码通过 duckdb.connect(database=’:memory:’, config={‘memory_limit’: ‘9GB’,’worker_threads’: 5,’temp_directory’:’/tmp/file/overmem’})来设置最大内存的使用和工作的线程数,这个可以根据实际的需要来调整

数据分析实践

测试数据集:电商用户行为数据,总量 13 亿数据,字段如下:

user_id               STRING       ‘用户ID(非真实ID),经抽样&字段脱敏处理后得到’

item_id               STRING       ‘商品ID(非真实ID),经抽样&字段脱敏处理后得到’

item_category    STRING       ‘商品类别ID(非真实ID),经抽样&字段脱敏处理后得到’

behavior_type    STRING       ‘用户对商品的行为类型,包括浏览、收藏、加购物车、购买,pv,fav,cart,buy)’

behavior_time    STRING       ‘行为时间,精确到小时级别’

测试 sql:用户行为数据漏斗分析

WITH user_behavior_counts AS (SELECTuser_id,SUM(CASE WHEN behavior_type = 'pv' THEN 1 ELSE 0 END) AS view_count,SUM(CASE WHEN behavior_type = 'fav' THEN 1 ELSE 0 END) AS favorite_count,SUM(CASE WHEN behavior_type = 'cart' THEN 1 ELSE 0 END) AS cart_count,SUM(CASE WHEN behavior_type = 'buy' THEN 1 ELSE 0 END) AS purchase_countFROM testtable.testdb.commerce_shoppingGROUP BY user_id
),
funnel_stages AS (SELECTCOUNT(DISTINCT user_id) AS total_users,COUNT(DISTINCT CASE WHEN view_count > 0 THEN user_id END) AS users_with_views,COUNT(DISTINCT CASE WHEN favorite_count > 0 THEN user_id END) AS users_with_favorites,COUNT(DISTINCT CASE WHEN cart_count > 0 THEN user_id END) AS users_with_cart_adds,COUNT(DISTINCT CASE WHEN purchase_count > 0 THEN user_id END) AS users_with_purchasesFROM user_behavior_counts
)
SELECTtotal_users,users_with_views,users_with_favorites,users_with_cart_adds,users_with_purchases,ROUND(100.0 * users_with_views / total_users, 2) AS view_rate,ROUND(100.0 * users_with_favorites / users_with_views, 2) AS view_to_favorite_rate,ROUND(100.0 * users_with_cart_adds / users_with_favorites, 2) AS favorite_to_cart_rate,ROUND(100.0 * users_with_purchases / users_with_cart_adds, 2) AS cart_to_purchase_rate,ROUND(100.0 * users_with_purchases / total_users, 2) AS overall_conversion_rate
FROM funnel_stages;

Lambda 测试结果:消耗内存 1934M

用时:37s

关键优势

将 PyIceberg 和 DuckDB 运行在 AWS Lambda 上来访问 S3 上的 Iceberg 表,这种 Serverless 数据湖模式主要的优势如下:

  • 低门槛:主要依赖 python 和 sql,这两种是数据开发领域最常见的技能,大大降低了学习和使用的门槛,同时基础设施 0 依赖且易于部署,不需要投入基础设施运维。
  • 高性价比:Lambda 按实际计算时间付费且自动伸缩,而 S3 的存储成本也较为低廉。加上 DuckDB 高性能的特性,这意味着更短的 Lambda 执行时间,进一步降低成本。
  • 开源与灵活性:核心组件 Apache Iceberg、DuckDB 和 PyIceberg 均为广泛应用的开源项目。受益于活跃的开源社区支持,可以获得持续的功能更新、问题修复和丰富的学习资源。

典型应用场景

  • 低成本海量分析负载对于需要控制成本,但仍需进行有效数据分析的场景,如中小型企业或特定项目预算有限的情况。
  • 非频繁或突发性查询如定期的报表生成、临时的业务数据洞察、偶尔进行的数据探索等,这些场景下按需付费的 Lambda + DuckDB 极具优势。
  • 事件驱动的数据处理由 S3 事件触发 Lambda (PyIceberg) 进行数据验证、转换和加载到 Iceberg 表,后续可由另一个 Lambda (DuckDB) 进行即时查询或聚合。
  • 交互式查询接口后端通过 API Gateway 暴露一个 Lambda (DuckDB) 端点,为内部用户或应用提供一个低成本的 SQL 查询接口,用于查询特定范围的数据。
  • 快速原型验证在开发或研究阶段,快速搭建一个功能完备的数据湖查询环境,用于验证想法或进行小规模实验。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/81833.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

流复备机断档处理

文章目录 环境症状问题原因解决方案 环境 系统平台:UOS(海光),UOS (飞腾),UOS(鲲鹏),UOS(龙芯),UOS (申威),银河麒麟svs(X86_64&…

【蓝桥杯真题精讲】第 16 届 Python A 组(省赛)

文章目录 T1 偏蓝 (5/5)T2 IPv6 (0/5)T3 2025 图形 (10/10)T4 最大数字 (10/10)T5 倒水 (15/15)T6 拼好数 (0/15)T7 登山 (20/20)T8 原料采购 (20/20) 更好的阅读体验 高速访问:https://wiki.dwj601.cn/ds-and-algo/lan-qiao-cup/16th-python-a/永久链接&#xff1…

SpringBoot+Dubbo+Zookeeper实现分布式系统步骤

SpringBootDubboZookeeper实现分布式系统 一、分布式系统通俗解释二、环境准备(详细版)1. 软件版本2. 安装Zookeeper(单机模式) 三、完整项目结构(带详细注释)四、手把手代码实现步骤1:创建父工…

Spring的业务层,持久层,控制层的关系

在 Spring 框架中,控制层(Controller)、业务层(Service) 和 持久层(Repository/Mapper) 是分层架构的核心组成部分,职责分离明确,通过依赖注入(DI&#xff09…

css实现不确定内容的高度过渡

实现效果&#xff1a;鼠标悬浮按钮&#xff0c;高度过渡出现如图所示文本框 代码&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-widt…

计算机视觉与深度学习 | matlab实现ARIMA-WOA-CNN-LSTM时间序列预测(完整源码和数据)

以下是一个基于MATLAB的ARIMA-WOA-CNN-LSTM时间序列预测框架。由于完整代码较长,此处提供核心模块和实现思路,完整源码和数据可通过文末方式获取。 1. 数据准备(示例数据) 使用MATLAB内置的航空乘客数据集: % 加载数据 data = readtable(airline-passengers.csv); data …

在 Excel 中使用东方仙盟软件————仙盟创梦IDE

安装插件 用仙盟创梦编写插件代码 源码 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using ExcelDna.Integration;namespace 东方仙盟.仙盟创梦IDE_招标系统 {public static class 仙盟创梦_招标专…

Sql刷题日志(day9)

一、笔试 1、limit offset&#xff1a;分页查询 SELECT column1, column2, ... FROM table_name LIMIT number_of_rows OFFSET start_row; --跳过前 start_row 行&#xff0c;返回接下来的 number_of_rows 行。 2、lag、lead&#xff1a;查询前后行数据 --lag函数用于访问当…

C++面试3——const关键字的核心概念、典型场景和易错陷阱

const关键字的核心概念、典型场景和易错陷阱 一、const本质&#xff1a;类型系统的守护者 1. 与#define的本质差异 维度#defineconst编译阶段预处理替换编译器类型检查作用域无作用域&#xff08;全局污染&#xff09;遵循块作用域调试可见性符号消失保留符号信息类型安全无类…

16-看门狗和RTC

一、独立看门狗 1、独立看门狗概述 在由单片机构成的微型计算机系统中&#xff0c;由于单片机的工作常常会受到来自外界电磁场的干扰&#xff0c;造成程序的跑飞&#xff08;不按照正常程序进行运行&#xff0c;如程序重启&#xff0c;但是如果我们填加看门狗的技术&#xff0…

w~自动驾驶~合集3

我自己的原文哦~ https://blog.51cto.com/whaosoft/13269720 #FastOcc 推理更快、部署友好Occ算法来啦&#xff01; 在自动驾驶系统当中&#xff0c;感知任务是整个自驾系统中至关重要的组成部分。感知任务的主要目标是使自动驾驶车辆能够理解和感知周围的环境元素&…

怎么打包发布到npm?——从零到一的详细指南

怎么打包发布到npm&#xff1f;——从零到一的详细指南 目录 怎么打包发布到npm&#xff1f;——从零到一的详细指南一、准备工作1. 注册 npm 账号2. 安装 Node.js 和 npm 二、初始化项目三、编写你的代码四、配置 package.json五、打包你的项目六、登录 npm七、发布到 npm八、…

【C++ - 仿mudou库one thread one loop式高并发服务器实现】

文章目录 项目介绍项目模块和服务器主要设计模式项目主要流程前置知识1.bind函数2.定时器任务TimerTask和时间轮思想TimerWheel3.正则表达式4.通用型容器Any类 服务器设计模式1&#xff09;单Reactor单线程模式2&#xff09;单Reactor多线程模式3&#xff09;多Reactor多线程模…

RISC-V 开发板 MUSE Pi Pro USB 测试(3.0 U盘,2.0 UVC摄像头)

视频讲解&#xff1a; RISC-V 开发板 MUSE Pi Pro USB 测试&#xff08;3.0 U盘&#xff0c;2.0 UVC摄像头&#xff09; 总共开发板有4个USB的A口&#xff0c;1个USB的TypeC口&#xff0c;我们插上两个USB3.0的U盘和一个USB2.0的UVC摄像头来进行测试 lsusb -tv 可以看到有3个US…

docker学习与使用(概念、镜像、容器、数据卷、dockerfile等)

文章目录 前言引入docker 简介docker的应用场景docker的虚拟化技术VS虚拟机docker的优点docker架构Docker仓库Docker镜像linux操作系统的大致组成部分 Docker容器 docker安装与启动校验版本移除旧的版本安装依赖工具设置软件源安装docker验证 配置镜像加速器docker服务相关命令…

记录一次服务器卡顿

一、服务器卡顿现象 服务用了一段时间后&#xff0c;突然很卡&#xff0c;发现在服务器上新建excel也很卡&#xff0c;发现服务器中病毒了&#xff0c;然后重新安装了操作系统。重新安装服务环境时&#xff0c;发现同时安装pdf、tomcat时都很慢&#xff0c;只能一个安装好了&am…

基于 Reactor 的 Java 高性能异步编程:响应式流与背压详解

本文将围绕 Reactor 框架&#xff0c;深入剖析响应式流的核心机制&#xff0c;重点讲解背压&#xff08;Backpressure&#xff09;的实现原理与实际应用。通过理论结合实践&#xff0c;希望帮助你真正掌握 Java 世界的响应式异步编程。 一、响应式编程与 Reactor 简介 1.1 什么…

知识蒸馏实战:用PyTorch和预训练模型提升小模型性能

在深度学习的浪潮中&#xff0c;我们常常追求更大、更深、更复杂的模型以达到最先进的性能。然而&#xff0c;这些“庞然大物”般的模型往往伴随着高昂的计算成本和缓慢的推理速度&#xff0c;使得它们难以部署在资源受限的环境中&#xff0c;如移动设备或边缘计算平台。知识蒸…

python:mysql全局大览(保姆级教程)

本文目录&#xff1a; 一、关于数据库**二、sql语言分类**三、数据库增删改查操作**四、库中表增删改查操作**五、表中记录插入**六、表约束**七、单表查询**八、多表查询**&#xff08;一&#xff09;外键约束**&#xff08;二&#xff09;连结查询**1.交叉连接&#xff08;笛…

Android framework 问题记录

一、休眠唤醒&#xff0c;很快熄屏 1.1 问题描述 机器休眠唤醒后&#xff0c;没有按照约定的熄屏timeout 进行熄屏&#xff0c;很快就熄屏&#xff08;约2s~3s左右&#xff09; 1.2 原因分析&#xff1a; 抓取相关log&#xff0c;打印休眠背光 相关调用栈 //具体打印调用栈…