Flink 读 Parquet RowData / Avro(Generic/Specific/Reflect)+ FileSource 批流一体 + PyFlink 依赖

1. 依赖准备:flink-parquet 与 parquet-avro

1.1 Java:读取 Parquet 的基础依赖

要在 Flink 里读取 Parquet,你需要加flink-parquet

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-parquet</artifactId><version>2.2.0</version></dependency>

1.2 Java:读成 Avro Record 还需要 parquet-avro

如果你希望读取后输出 Avro Record(Generic/Specific/Reflect),再加parquet-avro,并且一般会排掉一些不需要的传递依赖:

<dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-avro</artifactId><version>1.12.2</version><optional>true</optional><exclusions><exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId></exclusion><exclusion><groupId>it.unimi.dsi</groupId><artifactId>fastutil</artifactId></exclusion></exclusions></dependency>

1.3 PyFlink:需要额外 JAR

在 PyFlink 中使用 Parquet,需要引入对应的PyFlink JAR(通过 Python dependency management 管理)。核心点是:Python 作业本身不自动携带这些 Java format 依赖,你得把 JAR 带上。

2. FileSource:同一套 Source 同时支持批与流(目录监控)

FileSource是 Flink 新一代 Source(可用于 batch 和 streaming),配合 Parquet format 可以覆盖两类数据形态:

  • Bounded(有界):启动时列举目录中所有文件并读完
  • Unbounded(无界):持续监控目录,新文件出现就继续读(“文件流”)

默认是 bounded。如果要 unbounded,需要额外开启目录监控:

FileSource.forBulkFileFormat(bulkFormat,path).monitorContinuously(Duration.ofMillis(5L)).build();

你可以理解为:bounded 是“跑一次把目录扫完”,unbounded 是“目录里持续落文件,我持续消费”。

3. 两种读取方式:BulkFormat(向量化) vs StreamFormat(记录流)

Flink 读 Parquet 有两条路:

3.1 Vectorized reader:BulkFormat(批量/向量化解码)

特点:列式批量解码,一批一批输出,通常性能更好,适合 RowData、列投影等。

FileSource.forBulkFileFormat(BulkFormat,Path...)

3.2 Avro Parquet reader:StreamFormat(Record stream)

特点:以 Record 流形式输出,常用于 Avro Record 场景。

FileSource.forRecordStreamFormat(StreamFormat,Path...)

是否监控目录变成 unbounded,同样通过.monitorContinuously(...)实现。

4. 读成 Flink RowData:列投影 + 批量大小 + 时区/大小写控制

如果你想把 Parquet 直接读成 Flink 内部的RowData(对 Table/内部算子更友好),可以用ParquetColumnarRowInputFormat

这个例子做了几件很“生产化”的事:

  • 只投影读取字段:f7,f4,f99(减少 IO 与解码成本)
  • 批量解码:每批 500 条
  • 时间戳按 UTC 解释(布尔参数)
  • Parquet 字段名大小写敏感(布尔参数)
  • 不启用 watermark(因为文件记录没事件时间)
finalLogicalType[]fieldTypes=newLogicalType[]{newDoubleType(),newIntType(),newVarCharType()};finalRowTyperowType=RowType.of(fieldTypes,newString[]{"f7","f4","f99"});finalParquetColumnarRowInputFormat<FileSourceSplit>format=newParquetColumnarRowInputFormat<>(newConfiguration(),rowType,InternalTypeInfo.of(rowType),500,false,// timestamp as UTC (示例中为 false,请按你语义调整)true// case-sensitive field names);finalFileSource<RowData>source=FileSource.forBulkFileFormat(format,/* Flink Path */).build();finalDataStream<RowData>stream=env.fromSource(source,WatermarkStrategy.noWatermarks(),"file-source");

实战建议:

  • 文件字段很多时,优先用“投影”只读需要的列
  • 批量大小(这里 500)要结合内存与吞吐调优:太小解码开销大,太大可能内存峰值高
  • 时间戳 UTC 参数要统一,否则跨时区数据容易“看起来少 8 小时/多 8 小时”

5. 读成 Avro Records:Generic / Specific / Reflect 三种姿势

Flink 读取 Parquet 时,支持输出 3 类 Avro Record(PyFlink 目前只支持 Generic Record):

  • GenericRecord:运行时提供 Avro Schema(JSON),最灵活
  • SpecificRecord:用 Avro 工具生成 Java 类,类型最强
  • ReflectRecord:直接用 Java POJO 反射生成 Schema,门槛低但有前置要求

5.1 GenericRecord:给 Schema JSON,直接读

先解析 schema(示例是拼 JSON 字符串,也可以从文件/流读取),再用AvroParquetReaders.forGenericRecord(schema)

finalSchemaschema=newSchema.Parser().parse("{\"type\":\"record\","+"\"name\":\"User\","+"\"fields\":["+" {\"name\":\"name\",\"type\":\"string\"},"+" {\"name\":\"favoriteNumber\",\"type\":[\"int\",\"null\"]},"+" {\"name\":\"favoriteColor\",\"type\":[\"string\",\"null\"]}"+"]}");finalFileSource<GenericRecord>source=FileSource.forRecordStreamFormat(AvroParquetReaders.forGenericRecord(schema),/* Flink Path */).build();env.enableCheckpointing(10L);finalDataStream<GenericRecord>stream=env.fromSource(source,WatermarkStrategy.noWatermarks(),"file-source");

适合:

  • schema 经常变
  • 你想在作业里动态处理字段(比如按字段名取值)

5.2 SpecificRecord:代码生成后直接读(推荐长期稳定 schema)

.avsc放在项目中,用 Avro Maven 插件或 avro-tools 生成Address这类 class。之后你无需在代码里手写 schema:

finalFileSource<GenericRecord>source=FileSource.forRecordStreamFormat(AvroParquetReaders.forSpecificRecord(Address.class),/* Flink Path */).build();

适合:

  • schema 稳定
  • 强类型开发体验更好(IDE、编译期校验、重构安全)

5.3 ReflectRecord:用 POJO 反射读(最省事,但有坑)

你只提供一个 Java POJO(如Datum),Avro 通过反射生成 schema 和协议:

finalFileSource<GenericRecord>source=FileSource.forRecordStreamFormat(AvroParquetReaders.forReflectRecord(Datum.class),/* Flink Path */).build();

适合:

  • 快速验证/原型
  • 不想引入 Avro 代码生成

关键坑:Parquet 文件必须带“反射需要的 meta”
要支持 reflect 读取,写入 Parquet 的 Avro schema 必须包含 namespace,并且文件里要有类似:

  • extra: parquet.avro.schema = ...
  • writer.model.name = avro
  • file schema: <namespace>.<name>

也就是说,你写 Parquet 的时候用的 schema 要像这样包含 namespace(指向你的 Java 包路径):

finalStringschema="{\"type\":\"record\","+"\"name\":\"User\","+"\"namespace\":\"org.apache.flink.formats.parquet.avro\","+"\"fields\":[ ... ]}";

当你的User类在org.apache.flink.formats.parquet.avro包下时,Flink 才能在 reflect 模式下正确匹配并反射。

6. 什么时候选 RowData,什么时候选 Avro

一个快速决策表:

  • 你主要做 Flink 内部计算、投影字段、追求吞吐:选RowData + BulkFormat(向量化)

  • 你要和外部系统以 schema 驱动交换、或你已经用 Avro 生态:选Avro

    • 追求灵活:GenericRecord
    • 追求强类型:SpecificRecord
    • 快速省事:ReflectRecord(但确保文件 meta 完整且 namespace 匹配)

PyFlink 场景下,通常优先GenericRecord(因为限制更少)。

7. 一个“最常用”的落地模式:目录监控 + checkpoint

很多文件流场景是:上游不断往目录落 Parquet,新文件就是增量。你可以:

  • monitorContinuously(...)变成 unbounded
  • 开启 checkpoint,让作业重启后不会重复读/不会丢(具体一致性语义取决于 source、文件可见性和 checkpoint 策略)

示意:

finalFileSource<RowData>source=FileSource.forBulkFileFormat(format,path).monitorContinuously(Duration.ofSeconds(5)).build();env.enableCheckpointing(10_000);env.fromSource(source,WatermarkStrategy.noWatermarks(),"parquet-files");

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/1163516.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度测评10个AI论文平台,自考学生轻松搞定毕业论文!

深度测评10个AI论文平台&#xff0c;自考学生轻松搞定毕业论文&#xff01; 自考路上的得力助手&#xff1a;AI论文工具如何改变你的写作方式 在自考学习过程中&#xff0c;撰写毕业论文往往是最让人头疼的一环。面对复杂的选题、繁重的文献整理以及反复修改的要求&#xff0c;…

深度复盘----计算机专业读了四年,我才明白这 5 个残酷真相(拒绝精神内耗)

摘要&#xff1a;很多计算机专业的同学从大一开始就陷入了“内卷”与“迷茫”的叠加态。学了 C 语言不懂指针&#xff0c;学了 Java 不懂高并发&#xff0c;天天去图书馆却写不出一个像样的项目。本文是我对计算机学习之路的深度复盘&#xff0c;分享 5 个我希望大一就能知道的…

高效学习----告别“视频收藏夹吃灰”!计算机专业如何建立“输出倒逼输入”的学习闭环?

摘要&#xff1a;你是否也收藏了几个 G 的视频教程却从未打开&#xff1f;是否在此刻觉得自己“学废了”&#xff0c;过两天又全忘了&#xff1f;本文不谈鸡汤&#xff0c;只谈方法论。教你如何利用“费曼技巧”和“开源思维”&#xff0c;构建一套杀手级的高效技术学习体系。&…

Flink JSON 序列化/反序列化 Schema KafkaSource/KafkaSink + 自定义 ObjectMapper + PyFlink Row

1. JsonDeserializationSchema&#xff1a;KafkaSource 中反序列化 POJO JsonDeserializationSchema 实现了 Flink 的 DeserializationSchema&#xff0c;因此只要某个 connector 支持 DeserializationSchema&#xff0c;你就能直接使用它。 典型用法&#xff1a;KafkaSource…

【项目管理】项目管理流程文件(PPT)

1、项目启动2、制定项目计划3、项目执行4、项目监控5、项目收尾软件全套资料部分文档清单&#xff1a; 工作安排任务书&#xff0c;可行性分析报告&#xff0c;立项申请审批表&#xff0c;产品需求规格说明书&#xff0c;需求调研计划&#xff0c;用户需求调查单&#xff0c;用…

火焰识别,火焰检测,火灾检测,基于yolov5的火焰检测,可以检测视频和图片,视频实时检测,将训练好的模型部署到英伟达边缘计算 基于 YOLOv5 的高精度、高帧率火焰检测系统

火焰识别&#xff0c;火焰检测&#xff0c;火灾检测&#xff0c;基于yolov5的火焰检测&#xff0c;可以检测视频和图片&#xff0c;也可视频实时检测&#xff0c;检测准确率高&#xff0c;帧率很高&#xff0c;有标注好的训练数据集可以自己重新训练。 可以将训练好的模型部署…

学长亲荐2026 MBA必用TOP10 AI论文工具测评

学长亲荐2026 MBA必用TOP10 AI论文工具测评 2026年MBA学术写作工具测评&#xff1a;为何需要这份榜单&#xff1f; 随着人工智能技术在学术领域的广泛应用&#xff0c;MBA学生和研究者在论文写作、数据分析、文献综述等环节中&#xff0c;越来越依赖AI工具提升效率与质量。然而…

期刊论文投稿快人一步!虎贲等考 AI 解锁学术发表 “加速器”

在学术发表竞争日趋激烈的当下&#xff0c;一篇优质期刊论文的诞生&#xff0c;往往要历经选题打磨、文献梳理、数据论证、格式校准等重重关卡。很多科研人明明手握扎实的研究成果&#xff0c;却因论文写作不规范、逻辑不严谨、格式不达标&#xff0c;屡屡在投稿环节碰壁。虎贲…

还在为降重降 AIGC 抓狂?虎贲等考 AI:学术改写天花板,两步搞定合规论文

查重率飙红、AIGC 检测亮灯&#xff0c;堪称学术写作的 “双重暴击”&#xff01;不少同学吐槽&#xff1a;“改了十遍重复率还是超标”“AI 写的内容一眼被识破”“降重后语句不通&#xff0c;逻辑全乱”。别慌&#xff01;虎贲等考 AI 智能写作平台&#xff08;官网&#xff…

PetaLinux工程目录设备树文件结构与作用

设备树文件列表 wpfminglie:~/petalinux/ant$ find . \( -path ./build -o -path ./tmp -o -path ./out -o -path ./components/yocto \) -prune -o -type f \( -name "*.dts" -o -name "*.dtsi" \) -print ./components/plnx_workspace/device-tree/devic…

机器人诊断系统十年演进

下面给你一条专门针对机器人系统的 「机器人诊断系统十年演进路线&#xff08;2025–2035&#xff09;」。 这里的“诊断系统”不是简单的“查日志、看告警”&#xff0c;而是机器人如何理解自身失效、判断风险、选择修复策略&#xff0c;并在长期运行中避免重复犯错。一、核心…

智能巡检车、无人机道路检测、AI 路况分析平台 智慧交通 驾驶视角道路病害缺陷检测数据集 建立基于深度学习框架YOLOV8道路病害缺陷检测系统 裂纹 网快 坑洼

道路缺陷检测数据集使用labelimg标注&#xff0c;标签的格式是txt格式&#xff0c;适用于yolo目标检测系列所有版本训练数据集。 标注了&#xff08;裂纹&#xff08;Crack&#xff09;、 检查井&#xff08;Manhole&#xff09;、 网&#xff08;Net&#xff09;、 裂纹块&…

ECC错误

保护机制误差校正码Error Correcting CodeECC可以防止存储在内存中的数据出现错误&#xff0c;提高系统的功能安全性&#xff0c;避免因读取错误的数据而导致错误。ECC的主要影响是在易失性存储器&#xff08;RAM&#xff09;中&#xff0c;其中技术的小型化导致更高的位翻转风…

机器人感知技术十年演进

下面给你一条专门聚焦机器人感知&#xff08;Perception&#xff09;的 「机器人感知技术十年演进路线&#xff08;2025–2035&#xff09;」。 我会刻意避开“更高精度、更大模型”的表层叙事&#xff0c;直指感知在真实世界长期运行中真正要解决的问题。一、核心判断&#xf…

使用C#控制台批量删除 Unity目录里的 .meta文件

因为Unity会生成.meta文件,有的时候比如我 SteamingAssets里面有很多视频文件 是.mp4格式的,某些原因我需要将里面的所有视频文件改为.webm格式,那么会残留很多 .meta文件我们可以创建一个控制台,批量删除class Program {static void Main(string[] args){if (args.Length 0 |…

机器人日志十年演进

下面给你一条专门针对机器人系统的 「机器人日志十年演进路线&#xff08;2025–2035&#xff09;」。 这里的“日志”不是简单的 printf&#xff0c;而是机器人如何记住自己做过什么、为什么这么做、以及如何避免重蹈覆辙。一、核心判断&#xff08;一句话&#xff09;未来十年…

全方位CRM源码系统功能详解,完全开源,支持个性化定制

温馨提示&#xff1a;文末有资源获取方式 随着市场竞争加剧&#xff0c;企业销售团队亟需一套高效工具来管理客户关系和优化销售流程。一款专为销售团队设计的CRM客户关系管理系统源码应运而生&#xff0c;它集成了多种实用功能&#xff0c;帮助企业实现客户数据整合、商机追踪…

机器人诊断十年演进

下面给你一条专门针对机器人系统的 「机器人诊断十年演进路线&#xff08;2025–2035&#xff09;」。 这里的“诊断”不是 IT 意义上的排错&#xff0c;而是机器人在真实世界中如何理解自身失效、判断风险、选择修复策略&#xff0c;并避免重复犯错。一、核心判断&#xff08;…

亲测好用10个AI论文网站,专科生毕业论文轻松搞定!

亲测好用10个AI论文网站&#xff0c;专科生毕业论文轻松搞定&#xff01; AI 工具如何助力论文写作&#xff1f; 在当今的学术环境中&#xff0c;AI 工具正逐渐成为学生和研究者的重要助手。对于专科生而言&#xff0c;撰写一篇符合要求的毕业论文往往是一项挑战。而 AI 降重工…

支持多终端的CRM系统源码 带完整的搭建部署教程以及源代码包

温馨提示&#xff1a;文末有资源获取方式 企业销售团队需要能够随时随地管理客户关系的解决方案。一款创新的CRM客户关系管理系统源码正式发布&#xff0c;特别集成Uniapp支持&#xff0c;可编译出微信小程序和H5&#xff0c;为企业提供无缝的多终端体验。该系统源码完全开源&a…