1. 依赖准备:flink-parquet 与 parquet-avro
1.1 Java:读取 Parquet 的基础依赖
要在 Flink 里读取 Parquet,你需要加flink-parquet:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-parquet</artifactId><version>2.2.0</version></dependency>1.2 Java:读成 Avro Record 还需要 parquet-avro
如果你希望读取后输出 Avro Record(Generic/Specific/Reflect),再加parquet-avro,并且一般会排掉一些不需要的传递依赖:
<dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-avro</artifactId><version>1.12.2</version><optional>true</optional><exclusions><exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId></exclusion><exclusion><groupId>it.unimi.dsi</groupId><artifactId>fastutil</artifactId></exclusion></exclusions></dependency>1.3 PyFlink:需要额外 JAR
在 PyFlink 中使用 Parquet,需要引入对应的PyFlink JAR(通过 Python dependency management 管理)。核心点是:Python 作业本身不自动携带这些 Java format 依赖,你得把 JAR 带上。
2. FileSource:同一套 Source 同时支持批与流(目录监控)
FileSource是 Flink 新一代 Source(可用于 batch 和 streaming),配合 Parquet format 可以覆盖两类数据形态:
- Bounded(有界):启动时列举目录中所有文件并读完
- Unbounded(无界):持续监控目录,新文件出现就继续读(“文件流”)
默认是 bounded。如果要 unbounded,需要额外开启目录监控:
FileSource.forBulkFileFormat(bulkFormat,path).monitorContinuously(Duration.ofMillis(5L)).build();你可以理解为:bounded 是“跑一次把目录扫完”,unbounded 是“目录里持续落文件,我持续消费”。
3. 两种读取方式:BulkFormat(向量化) vs StreamFormat(记录流)
Flink 读 Parquet 有两条路:
3.1 Vectorized reader:BulkFormat(批量/向量化解码)
特点:列式批量解码,一批一批输出,通常性能更好,适合 RowData、列投影等。
FileSource.forBulkFileFormat(BulkFormat,Path...)3.2 Avro Parquet reader:StreamFormat(Record stream)
特点:以 Record 流形式输出,常用于 Avro Record 场景。
FileSource.forRecordStreamFormat(StreamFormat,Path...)是否监控目录变成 unbounded,同样通过.monitorContinuously(...)实现。
4. 读成 Flink RowData:列投影 + 批量大小 + 时区/大小写控制
如果你想把 Parquet 直接读成 Flink 内部的RowData(对 Table/内部算子更友好),可以用ParquetColumnarRowInputFormat。
这个例子做了几件很“生产化”的事:
- 只投影读取字段:
f7,f4,f99(减少 IO 与解码成本) - 批量解码:每批 500 条
- 时间戳按 UTC 解释(布尔参数)
- Parquet 字段名大小写敏感(布尔参数)
- 不启用 watermark(因为文件记录没事件时间)
finalLogicalType[]fieldTypes=newLogicalType[]{newDoubleType(),newIntType(),newVarCharType()};finalRowTyperowType=RowType.of(fieldTypes,newString[]{"f7","f4","f99"});finalParquetColumnarRowInputFormat<FileSourceSplit>format=newParquetColumnarRowInputFormat<>(newConfiguration(),rowType,InternalTypeInfo.of(rowType),500,false,// timestamp as UTC (示例中为 false,请按你语义调整)true// case-sensitive field names);finalFileSource<RowData>source=FileSource.forBulkFileFormat(format,/* Flink Path */).build();finalDataStream<RowData>stream=env.fromSource(source,WatermarkStrategy.noWatermarks(),"file-source");实战建议:
- 文件字段很多时,优先用“投影”只读需要的列
- 批量大小(这里 500)要结合内存与吞吐调优:太小解码开销大,太大可能内存峰值高
- 时间戳 UTC 参数要统一,否则跨时区数据容易“看起来少 8 小时/多 8 小时”
5. 读成 Avro Records:Generic / Specific / Reflect 三种姿势
Flink 读取 Parquet 时,支持输出 3 类 Avro Record(PyFlink 目前只支持 Generic Record):
- GenericRecord:运行时提供 Avro Schema(JSON),最灵活
- SpecificRecord:用 Avro 工具生成 Java 类,类型最强
- ReflectRecord:直接用 Java POJO 反射生成 Schema,门槛低但有前置要求
5.1 GenericRecord:给 Schema JSON,直接读
先解析 schema(示例是拼 JSON 字符串,也可以从文件/流读取),再用AvroParquetReaders.forGenericRecord(schema):
finalSchemaschema=newSchema.Parser().parse("{\"type\":\"record\","+"\"name\":\"User\","+"\"fields\":["+" {\"name\":\"name\",\"type\":\"string\"},"+" {\"name\":\"favoriteNumber\",\"type\":[\"int\",\"null\"]},"+" {\"name\":\"favoriteColor\",\"type\":[\"string\",\"null\"]}"+"]}");finalFileSource<GenericRecord>source=FileSource.forRecordStreamFormat(AvroParquetReaders.forGenericRecord(schema),/* Flink Path */).build();env.enableCheckpointing(10L);finalDataStream<GenericRecord>stream=env.fromSource(source,WatermarkStrategy.noWatermarks(),"file-source");适合:
- schema 经常变
- 你想在作业里动态处理字段(比如按字段名取值)
5.2 SpecificRecord:代码生成后直接读(推荐长期稳定 schema)
把.avsc放在项目中,用 Avro Maven 插件或 avro-tools 生成Address这类 class。之后你无需在代码里手写 schema:
finalFileSource<GenericRecord>source=FileSource.forRecordStreamFormat(AvroParquetReaders.forSpecificRecord(Address.class),/* Flink Path */).build();适合:
- schema 稳定
- 强类型开发体验更好(IDE、编译期校验、重构安全)
5.3 ReflectRecord:用 POJO 反射读(最省事,但有坑)
你只提供一个 Java POJO(如Datum),Avro 通过反射生成 schema 和协议:
finalFileSource<GenericRecord>source=FileSource.forRecordStreamFormat(AvroParquetReaders.forReflectRecord(Datum.class),/* Flink Path */).build();适合:
- 快速验证/原型
- 不想引入 Avro 代码生成
关键坑:Parquet 文件必须带“反射需要的 meta”
要支持 reflect 读取,写入 Parquet 的 Avro schema 必须包含 namespace,并且文件里要有类似:
extra: parquet.avro.schema = ...writer.model.name = avrofile schema: <namespace>.<name>
也就是说,你写 Parquet 的时候用的 schema 要像这样包含 namespace(指向你的 Java 包路径):
finalStringschema="{\"type\":\"record\","+"\"name\":\"User\","+"\"namespace\":\"org.apache.flink.formats.parquet.avro\","+"\"fields\":[ ... ]}";当你的User类在org.apache.flink.formats.parquet.avro包下时,Flink 才能在 reflect 模式下正确匹配并反射。
6. 什么时候选 RowData,什么时候选 Avro
一个快速决策表:
你主要做 Flink 内部计算、投影字段、追求吞吐:选RowData + BulkFormat(向量化)
你要和外部系统以 schema 驱动交换、或你已经用 Avro 生态:选Avro
- 追求灵活:GenericRecord
- 追求强类型:SpecificRecord
- 快速省事:ReflectRecord(但确保文件 meta 完整且 namespace 匹配)
PyFlink 场景下,通常优先GenericRecord(因为限制更少)。
7. 一个“最常用”的落地模式:目录监控 + checkpoint
很多文件流场景是:上游不断往目录落 Parquet,新文件就是增量。你可以:
monitorContinuously(...)变成 unbounded- 开启 checkpoint,让作业重启后不会重复读/不会丢(具体一致性语义取决于 source、文件可见性和 checkpoint 策略)
示意:
finalFileSource<RowData>source=FileSource.forBulkFileFormat(format,path).monitorContinuously(Duration.ofSeconds(5)).build();env.enableCheckpointing(10_000);env.fromSource(source,WatermarkStrategy.noWatermarks(),"parquet-files");