Flink JSON 序列化/反序列化 Schema KafkaSource/KafkaSink + 自定义 ObjectMapper + PyFlink Row

1. JsonDeserializationSchema:KafkaSource 中反序列化 POJO

JsonDeserializationSchema实现了 Flink 的DeserializationSchema,因此只要某个 connector 支持DeserializationSchema,你就能直接使用它。

典型用法:KafkaSource 只消费 value,反序列化成 POJO:

JsonDeserializationSchema<SomePojo>jsonFormat=newJsonDeserializationSchema<>(SomePojo.class);KafkaSource<SomePojo>source=KafkaSource.<SomePojo>builder().setValueOnlyDeserializer(jsonFormat)// ....build();

适用场景:

  • Kafka 的 value 是 JSON
  • 你希望在 DataStream 里直接拿到业务对象SomePojo

工程建议:

  • POJO 字段尽量使用包装类型(Integer/Long)应对字段缺失或 null
  • 为了兼容字段变动,可以配合 ObjectMapper 设置忽略未知字段(见第 3 节)

2. JsonSerializationSchema:KafkaSink 中序列化 POJO

写回 Kafka 时,JsonSerializationSchema实现了SerializationSchema,可用于任何支持SerializationSchema的 connector。

典型用法:KafkaSink 写 value,序列化 POJO 为 JSON:

JsonSerializationSchema<SomePojo>jsonFormat=newJsonSerializationSchema<>();KafkaSink<SomePojo>sink=KafkaSink.<SomePojo>builder().setRecordSerializer(newKafkaRecordSerializationSchemaBuilder<SomePojo>().setValueSerializationSchema(jsonFormat)// ....build()).build();

适用场景:

  • 你希望下游系统继续消费 JSON
  • 你不想自己手写 Jackson 序列化逻辑

3. 自定义 ObjectMapper:控制 Jackson 行为(非常常用)

Flink 允许你通过构造函数传入SerializableSupplier<ObjectMapper>来定制 mapper,相当于提供一个“ObjectMapper 工厂”。

你可以用它做很多工程级增强,比如:

  • 忽略未知字段(兼容上游 schema 变更)
  • 注册模块(Java 时间类型、参数名模块等)
  • 开启/关闭某些序列化特性(字段排序、空值处理等)

示例:自定义序列化 mapper,让 map key 有序,并注册模块:

JsonSerializationSchema<SomeClass>jsonFormat=newJsonSerializationSchema<>(()->newObjectMapper().enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS).registerModule(newParameterNamesModule()));

你也可以把“兼容字段变更”的设置加进去(强烈建议生产开启类似配置):

  • FAIL_ON_UNKNOWN_PROPERTIES关闭
  • JavaTimeModule 等

(这里不展开写完整 mapper 配置,你只要知道:用 supplier 你就能完全掌控 Jackson。)

4. PyFlink:Row 类型用 JsonRowSerializationSchema / JsonRowDeserializationSchema

在 PyFlink 中,Flink 内置了 Row 的 JSON Schema:

  • JsonRowDeserializationSchema
  • JsonRowSerializationSchema

这对 Python 流处理特别友好,因为 Python 侧更常操作 Row 而不是 POJO 类。

KafkaSource:JSON -> Row

row_type_info=Types.ROW_NAMED(['name','age'],[Types.STRING(),Types.INT()])json_format=JsonRowDeserializationSchema.builder()\.type_info(row_type_info)\.build()source=KafkaSource.builder()\.set_value_only_deserializer(json_format)\.build()

KafkaSink:Row -> JSON

row_type_info=Types.ROW_NAMED(['name','age'],[Types.STRING(),Types.INT()])json_format=JsonRowSerializationSchema.builder()\.with_type_info(row_type_info)\.build()sink=KafkaSink.builder()\.set_record_serializer(KafkaRecordSerializationSchema.builder().set_topic('test').set_value_serialization_schema(json_format).build())\.build()

适用场景:

  • Python 处理流数据,行结构清晰
  • Kafka 中 value 为 JSON

5. 选型建议:POJO vs ObjectNode vs Row

  • Java POJO:类型安全、IDE 友好、适合稳定 schema 的业务流
  • ObjectNode:更灵活,适合 schema 频繁变化、半结构化数据
  • PyFlink Row:Python 生态更顺手,适合表/行式处理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/1163512.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【项目管理】项目管理流程文件(PPT)

1、项目启动2、制定项目计划3、项目执行4、项目监控5、项目收尾软件全套资料部分文档清单&#xff1a; 工作安排任务书&#xff0c;可行性分析报告&#xff0c;立项申请审批表&#xff0c;产品需求规格说明书&#xff0c;需求调研计划&#xff0c;用户需求调查单&#xff0c;用…

火焰识别,火焰检测,火灾检测,基于yolov5的火焰检测,可以检测视频和图片,视频实时检测,将训练好的模型部署到英伟达边缘计算 基于 YOLOv5 的高精度、高帧率火焰检测系统

火焰识别&#xff0c;火焰检测&#xff0c;火灾检测&#xff0c;基于yolov5的火焰检测&#xff0c;可以检测视频和图片&#xff0c;也可视频实时检测&#xff0c;检测准确率高&#xff0c;帧率很高&#xff0c;有标注好的训练数据集可以自己重新训练。 可以将训练好的模型部署…

学长亲荐2026 MBA必用TOP10 AI论文工具测评

学长亲荐2026 MBA必用TOP10 AI论文工具测评 2026年MBA学术写作工具测评&#xff1a;为何需要这份榜单&#xff1f; 随着人工智能技术在学术领域的广泛应用&#xff0c;MBA学生和研究者在论文写作、数据分析、文献综述等环节中&#xff0c;越来越依赖AI工具提升效率与质量。然而…

期刊论文投稿快人一步!虎贲等考 AI 解锁学术发表 “加速器”

在学术发表竞争日趋激烈的当下&#xff0c;一篇优质期刊论文的诞生&#xff0c;往往要历经选题打磨、文献梳理、数据论证、格式校准等重重关卡。很多科研人明明手握扎实的研究成果&#xff0c;却因论文写作不规范、逻辑不严谨、格式不达标&#xff0c;屡屡在投稿环节碰壁。虎贲…

还在为降重降 AIGC 抓狂?虎贲等考 AI:学术改写天花板,两步搞定合规论文

查重率飙红、AIGC 检测亮灯&#xff0c;堪称学术写作的 “双重暴击”&#xff01;不少同学吐槽&#xff1a;“改了十遍重复率还是超标”“AI 写的内容一眼被识破”“降重后语句不通&#xff0c;逻辑全乱”。别慌&#xff01;虎贲等考 AI 智能写作平台&#xff08;官网&#xff…

PetaLinux工程目录设备树文件结构与作用

设备树文件列表 wpfminglie:~/petalinux/ant$ find . \( -path ./build -o -path ./tmp -o -path ./out -o -path ./components/yocto \) -prune -o -type f \( -name "*.dts" -o -name "*.dtsi" \) -print ./components/plnx_workspace/device-tree/devic…

机器人诊断系统十年演进

下面给你一条专门针对机器人系统的 「机器人诊断系统十年演进路线&#xff08;2025–2035&#xff09;」。 这里的“诊断系统”不是简单的“查日志、看告警”&#xff0c;而是机器人如何理解自身失效、判断风险、选择修复策略&#xff0c;并在长期运行中避免重复犯错。一、核心…

智能巡检车、无人机道路检测、AI 路况分析平台 智慧交通 驾驶视角道路病害缺陷检测数据集 建立基于深度学习框架YOLOV8道路病害缺陷检测系统 裂纹 网快 坑洼

道路缺陷检测数据集使用labelimg标注&#xff0c;标签的格式是txt格式&#xff0c;适用于yolo目标检测系列所有版本训练数据集。 标注了&#xff08;裂纹&#xff08;Crack&#xff09;、 检查井&#xff08;Manhole&#xff09;、 网&#xff08;Net&#xff09;、 裂纹块&…

ECC错误

保护机制误差校正码Error Correcting CodeECC可以防止存储在内存中的数据出现错误&#xff0c;提高系统的功能安全性&#xff0c;避免因读取错误的数据而导致错误。ECC的主要影响是在易失性存储器&#xff08;RAM&#xff09;中&#xff0c;其中技术的小型化导致更高的位翻转风…

机器人感知技术十年演进

下面给你一条专门聚焦机器人感知&#xff08;Perception&#xff09;的 「机器人感知技术十年演进路线&#xff08;2025–2035&#xff09;」。 我会刻意避开“更高精度、更大模型”的表层叙事&#xff0c;直指感知在真实世界长期运行中真正要解决的问题。一、核心判断&#xf…

使用C#控制台批量删除 Unity目录里的 .meta文件

因为Unity会生成.meta文件,有的时候比如我 SteamingAssets里面有很多视频文件 是.mp4格式的,某些原因我需要将里面的所有视频文件改为.webm格式,那么会残留很多 .meta文件我们可以创建一个控制台,批量删除class Program {static void Main(string[] args){if (args.Length 0 |…

机器人日志十年演进

下面给你一条专门针对机器人系统的 「机器人日志十年演进路线&#xff08;2025–2035&#xff09;」。 这里的“日志”不是简单的 printf&#xff0c;而是机器人如何记住自己做过什么、为什么这么做、以及如何避免重蹈覆辙。一、核心判断&#xff08;一句话&#xff09;未来十年…

全方位CRM源码系统功能详解,完全开源,支持个性化定制

温馨提示&#xff1a;文末有资源获取方式 随着市场竞争加剧&#xff0c;企业销售团队亟需一套高效工具来管理客户关系和优化销售流程。一款专为销售团队设计的CRM客户关系管理系统源码应运而生&#xff0c;它集成了多种实用功能&#xff0c;帮助企业实现客户数据整合、商机追踪…

机器人诊断十年演进

下面给你一条专门针对机器人系统的 「机器人诊断十年演进路线&#xff08;2025–2035&#xff09;」。 这里的“诊断”不是 IT 意义上的排错&#xff0c;而是机器人在真实世界中如何理解自身失效、判断风险、选择修复策略&#xff0c;并避免重复犯错。一、核心判断&#xff08;…

亲测好用10个AI论文网站,专科生毕业论文轻松搞定!

亲测好用10个AI论文网站&#xff0c;专科生毕业论文轻松搞定&#xff01; AI 工具如何助力论文写作&#xff1f; 在当今的学术环境中&#xff0c;AI 工具正逐渐成为学生和研究者的重要助手。对于专科生而言&#xff0c;撰写一篇符合要求的毕业论文往往是一项挑战。而 AI 降重工…

支持多终端的CRM系统源码 带完整的搭建部署教程以及源代码包

温馨提示&#xff1a;文末有资源获取方式 企业销售团队需要能够随时随地管理客户关系的解决方案。一款创新的CRM客户关系管理系统源码正式发布&#xff0c;特别集成Uniapp支持&#xff0c;可编译出微信小程序和H5&#xff0c;为企业提供无缝的多终端体验。该系统源码完全开源&a…

移动机器人十年演进

下面给你一条专门聚焦“移动机器人&#xff08;AMR / AGV / 室内外移动平台&#xff09;”的 「移动机器人十年演进路线&#xff08;2025–2035&#xff09;」。 我会刻意避开“更像人”“通用智能”的叙事&#xff0c;聚焦真实工程能力如何一步步跨越规模化门槛。一、核心判断…

自动驾驶十年演进

下面给你一条从工程现实、系统能力与规模化落地视角出发的 「自动驾驶十年演进路线&#xff08;2025–2035&#xff09;」。 我会刻意避开“L5 神话”&#xff0c;聚焦哪些能力真的会发生跃迁、为什么、以及工程上意味着什么。一、核心判断&#xff08;一句话&#xff09;未来十…

具身智能十年演进

下面给你一条从工程现实、系统能力与规模化落地视角出发的 「具身智能十年演进路线&#xff08;2025–2035&#xff09;」。 我会刻意避开“通用智能”“像人一样聪明”的叙事&#xff0c;聚焦哪些能力真的会发生跃迁、为什么、以及工程上意味着什么。一、核心判断&#xff08;…

学长亲荐2026研究生AI论文网站TOP9:开题报告文献综述神器

学长亲荐2026研究生AI论文网站TOP9&#xff1a;开题报告文献综述神器 2026年研究生AI论文网站测评&#xff1a;选对工具&#xff0c;事半功倍 在研究生阶段&#xff0c;撰写开题报告和文献综述是科研工作的基础环节&#xff0c;也是许多学生面临的一大挑战。随着AI技术的不断进…