CSV Format Flink / PyFlink 读写 CSV 的正确姿势(含 Schema 高级配置)

1、依赖引入

Java/Scala 工程需要加 Flink CSV 依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>2.2.0</version></dependency>

PyFlink 用户一般可以直接在作业里使用(前提是集群环境里对应的 jar 能被加载;如果你是在远程集群跑,仍然需要按你前面“依赖管理”章节的方式把 jar 加入pipeline.jarsenv.add_jars())。

2、Java:快速读取 POJO(自动推导 Schema)

最省事的方式:让 Jackson 根据 POJO 字段推导 CSV schema:

CsvReaderFormat<SomePojo>csvFormat=CsvReaderFormat.forPojo(SomePojo.class);FileSource<SomePojo>source=FileSource.forRecordStreamFormat(csvFormat,Path.fromLocalFile(...)).build();

注意:CSV 列顺序必须和 POJO 字段顺序一致。必要时加:

@JsonPropertyOrder({"field1","field2",...})

否则列对不上会出现解析错位(最常见的“字段都不为空但值都错”的隐性 bug)。

3、Java:高级配置(自定义分隔符、禁用引号等)

需要精细控制时,用forSchema(...)自己生成CsvSchema,例如把分隔符改成|,并禁用 quote:

Function<CsvMapper,CsvSchema>schemaGenerator=mapper->mapper.schemaFor(CityPojo.class).withoutQuoteChar().withColumnSeparator('|');CsvReaderFormat<CityPojo>csvFormat=CsvReaderFormat.forSchema(()->newCsvMapper(),schemaGenerator,TypeInformation.of(CityPojo.class));FileSource<CityPojo>source=FileSource.forRecordStreamFormat(csvFormat,Path.fromLocalFile(...)).build();

对应 CSV:

Berlin|52.5167|13.3833|Germany|DE|Berlin|primary|3644826 San Francisco|37.7562|-122.443|United States|US|California||3592294 Beijing|39.905|116.3914|China|CN|Beijing|primary|19433000

更复杂的类型也能做(比如数组列),通过CsvSchema.ColumnType.ARRAY并指定数组元素分隔符:

CsvReaderFormat<ComplexPojo>csvFormat=CsvReaderFormat.forSchema(CsvSchema.builder().addColumn(newCsvSchema.Column(0,"id",CsvSchema.ColumnType.NUMBER)).addColumn(newCsvSchema.Column(4,"array",CsvSchema.ColumnType.ARRAY).withArrayElementSeparator("#")).build(),TypeInformation.of(ComplexPojo.class));

4、PyFlink:手动定义 CSV Schema(输出为 Row)

PyFlink 里通常自己建 schema,每一列映射为 Row 字段:

frompyflink.common.watermark_strategyimportWatermarkStrategyfrompyflink.tableimportDataTypesfrompyflink.datastreamimportStreamExecutionEnvironmentfrompyflink.datastream.connectors.file_systemimportFileSourcefrompyflink.formats.csvimportCsvReaderFormat,CsvSchema# 具体 import 以你环境包结构为准env=StreamExecutionEnvironment.get_execution_environment()schema=CsvSchema.builder()\.add_number_column('id',number_type=DataTypes.BIGINT())\.add_array_column('array',separator='#',element_type=DataTypes.INT())\.set_column_separator(',')\.build()source=FileSource.for_record_stream_format(CsvReaderFormat.for_schema(schema),CSV_FILE_PATH).build()ds=env.from_source(source,WatermarkStrategy.no_watermarks(),'csv-source')# ds 的 record 类型是 Row(具名字段 + 复合类型)# Types.ROW_NAMED(['id', 'array'], [Types.LONG(), Types.LIST(Types.INT())])

对应 CSV:

0,1#2#3 1, 2,1

补一个实战提醒:如果某列可能为空(比如上面的array),你后续算子处理时要把None/空数组的分支写好,否则很容易在 map/flat_map 里触发类型错误。

5、PyFlink:写 CSV(Bulk Format)

写 CSV 通常用CsvBulkWriters生成 BulkWriterFactory,再配合FileSink.for_bulk_format(...)

frompyflink.tableimportDataTypesfrompyflink.datastream.connectors.file_systemimportFileSinkfrompyflink.formats.csvimportCsvBulkWriters,CsvSchema# 具体 import 以你环境包结构为准schema=CsvSchema.builder()\.add_number_column('id',number_type=DataTypes.BIGINT())\.add_array_column('array',separator='#',element_type=DataTypes.INT())\.set_column_separator(',')\.build()sink=FileSink.for_bulk_format(OUTPUT_DIR,CsvBulkWriters.for_schema(schema)).build()ds.sink_to(sink)

6、运行模式:Batch / Streaming 都可用

CsvReaderFormat类似TextLineInputFormat,既可用于批也可用于流(持续监控目录等),具体取决于你用的 Source/RuntimeMode 以及文件系统是否支持持续发现。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/1159437.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

直流母线电压采集与缓冲调理电路

一、电路类型与原理 这是一个直流母线电压采集与缓冲调理电路,主要功能是将高压直流母线(如 48V)分压后,通过运放构成的电压跟随器进行缓冲,再送入 ADC 进行精确采样。 分压网络:R38(240kΩ)和 R41(20kΩ)组成电阻分压,将 48V 输入按比例降到 3V 左右,满足后续 AD…

Aliro统一生态、UWB精准无感,2026智能门锁格局将迎巨变

曾几何时&#xff0c;智能门锁的进化似乎陷入了瓶颈。指纹识别受环境与皮肤状态影响&#xff0c;人脸识别在光线不佳时可能失灵&#xff0c;而蓝牙或NFC解锁仍需用户掏出手机完成一个“近场接触”的动作。行业亟待一种既能彻底解放双手、又能确保极高安全性的新方案。这一僵局&…

2026 年计算机圈赚钱技能:必学技术盘点,高薪赛道认准这些!

别再迷茫了&#xff0c;这些技能才是就业和财富的硬通货 经常有同学问&#xff1a;“现在学计算机&#xff0c;哪些技能最值得投入时间&#xff1f;学这些东西真的能赚钱吗&#xff1f;” 答案是肯定的&#xff0c;但选对方向很重要。计算机领域正在快速分化&#xff0c;有些…

国外论文参考文献怎么找:实用方法与资源推荐

刚开始做科研的时候&#xff0c;我一直以为&#xff1a; 文献检索就是在知网、Google Scholar 里反复换关键词。 直到后来才意识到&#xff0c;真正消耗精力的不是“搜不到”&#xff0c;而是—— 你根本不知道最近这个领域发生了什么。 生成式 AI 出现之后&#xff0c;学术检…

最近在搞永磁同步电机离线参数辨识的项目,发现不少新手在玩SVPWM时总会遇到死区补偿和高频注入这两个大坑。今天就拿Simulink模型说事,咱们边看代码边唠嗑

SVPWM死区补偿&#xff08;基于电流极性)高频注入法辨识PMSM的dq轴电感&#xff08;离线辨识&#xff09;—simulink先说说SVPWM的实现。在Simulink里用PWM Generator模块生成六路PWM信号时&#xff0c;记得把载波频率设成和实际硬件一致。比如我用的是20kHz&#xff0c;这时候…

深度学习毕设选题推荐:基于python_CNN机器学习卷积神经网络训练识别橘子是否新鲜基于python_CNN深度学习卷积神经网络训练识别橘子是否新鲜

博主介绍&#xff1a;✌️码农一枚 &#xff0c;专注于大学生项目实战开发、讲解和毕业&#x1f6a2;文撰写修改等。全栈领域优质创作者&#xff0c;博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战 ✌️技术范围&#xff1a;&am…

基于51单片机的车辆出入光电传感管理系统设计

第1章 系统总体方案设计 1.1 整体方案设计 1.1.1控制系统的选择 方案一&#xff1a;采用DSP作为系统控制器。DSP&#xff08;Digital Signal Processor&#xff09;它是利用数字信号来处理信息的元器件&#xff0c;它对元件值的容限不敏感而且受外部因素影响较小&#xff0c;容…

Spark Streaming:Spark的实时流计算API

你好&#xff0c;我是程序员贵哥。 今天我要与你分享的内容是“Spark Streaming”。 通过上一讲的内容&#xff0c;我们深入了解了Spark SQL API。通过它&#xff0c;我们可以像查询关系型数据库一样查询Spark的数据&#xff0c;并且对原生数据做相应的转换和动作。 但是&#…

20 个超实用 CTF 练习平台,让你从菜鸟进阶大神!零基础入门到精通,看这篇就够!

在网络安全领域&#xff0c;CTF&#xff08;Capture The Flag&#xff09;和渗透测试是每个技术爱好者梦寐以求的技能。但很多人会问&#xff1a;“我该怎么入门&#xff1f;去哪里练习&#xff1f;” 别急&#xff0c;今天我们就为大家整理了20个超实用的CTF和渗透测试练习平台…

亲测好用专科生必看TOP9AI论文平台测评

亲测好用专科生必看TOP9AI论文平台测评 专科生专属AI论文平台测评&#xff1a;选对工具事半功倍 随着AI技术在学术领域的广泛应用&#xff0c;越来越多的专科生开始借助智能写作工具提升论文效率。然而&#xff0c;面对市场上五花八门的AI论文平台&#xff0c;如何选择真正适合…

基于STM32F407设计的汽车仪表系统

摘 要 当前&#xff0c;随着科技的发展&#xff0c;汽车功能的日益增多&#xff0c;汽车仪表系统也变得愈发多元化。传统的机械式指针仪表因为其繁琐的布线方式和点对点的通信方式&#xff0c;已不能满足当前行业的需求。因此&#xff0c;研究一款功能多样&#xff0c;结构简单…

COMSOL玩转锂枝晶:四种生长模式实操指南

comsol锂枝晶模型 四合一 1雪花枝晶 2单点形核 3多点形核 4形状形核 包含相场、浓度场和电场三种物理场&#xff08;雪花枝晶除外&#xff09;&#xff0c;其中单枝晶定向生长另外包含对应的参考文献。锂枝晶模拟可不止调参画图这么简单&#xff0c;今天咱们用COMSOL实现四种典…

收藏这份AI客服构建指南:有赞从0到1的实践经验与思考

有赞分享了AI客服系统从0到1的完整实践历程。项目始于黑客马拉松&#xff0c;初期选用Dify平台快速验证&#xff0c;后采用混合架构应对性能挑战。文章详细阐述了模型选择、Workflow设计、上下文管理、知识工程等关键技术环节&#xff0c;并分享了评测优化和协作管理的经验。核…

网络安全小白自学指南:不用拜师学艺,求人不如靠自己

前言&#xff1a; 趁着今天下班&#xff0c;我花了几个小时整理了下&#xff0c;非常不易&#xff0c;希望大家可以点赞收藏支持一波&#xff0c;谢谢。 我的经历&#xff1a; 我19年毕业&#xff0c;大学专业是物联网工程&#xff0c;我相信很多人在象牙塔里都很迷茫&…

建议收藏:大模型时代程序员新机遇:6大高薪岗位技能要求全解析

文章详细解析了程序员转行大模型的6大方向&#xff1a;NLP工程师、计算机视觉工程师、大模型算法工程师、部署工程师和产品经理。每个方向都阐述了其市场前景和具体技能要求&#xff0c;包括编程能力、数学基础、专业知识和实践经验。文章强调&#xff0c;程序员应根据自身兴趣…

全网最全8个AI论文写作软件,助你轻松搞定本科毕业论文!

全网最全8个AI论文写作软件&#xff0c;助你轻松搞定本科毕业论文&#xff01; AI 工具&#xff0c;让论文写作不再焦虑 在当前的学术环境中&#xff0c;越来越多的本科生开始借助 AI 工具来辅助论文写作。这些工具不仅能够帮助学生快速生成内容&#xff0c;还能有效降低 AIGC&…

【零基础必学】LangChain+PDF RAG系统实战教程:手把手教你从零搭建可收藏的智能知识库

本文详细介绍如何使用LangChain框架结合Streamlit前端&#xff0c;从零构建基于PDF文档的RAG知识库系统。系统支持多PDF上传&#xff0c;自动完成文本提取、分块、向量化&#xff0c;构建FAISS检索数据库&#xff0c;用户可基于上传文档内容提问&#xff0c;系统调用DeepSeek模…

使用安全版数据库开启ssl加密后jdbc写法

文章目录文档用途详细信息文档用途 本文用于指导使用安全版数据库开启ssl加密后jdbc连接串写法。 默认写法会提示拒绝ssl连接&#xff0c;虽然可以通过关闭ssl等方法解决&#xff0c;但是在等保测评和数据库安全上会有隐患。 详细信息 jdbc写法如下&#xff1a; jdbc.urlj…

Claude Skills深度解析:大模型智能体架构与Gemini 3对比分析

Claude Skills是智能体可动态加载的"能力模块"&#xff0c;包含逻辑与执行脚本&#xff1b;Agent SDK是智能体的"操作系统"&#xff0c;提供运行环境。与Gemini 3相比&#xff0c;两者功能相似但实现路径不同&#xff0c;Anthropic采用模块化"渐进式披…

【收藏】2026年AI大模型最全学习资源包,助力Java开发者转型AI高薪岗

本文提供2026年AI大模型全面学习资源包&#xff0c;涵盖系统学习路线图、GeekAGI知识库、1200AI工具与框架、主流应用教程、开源项目案例、300道大厂面试真题及行业研究报告。资源由资深AI专家精心整理&#xff0c;适配初学者入门及进阶开发者提升&#xff0c;扫码即可免费获取…