Apache SeaTunnel 如何将 CDC 数据流转换为 Append-Only 模式?

news/2025/11/19 15:18:51/文章来源:https://www.cnblogs.com/seatunnel/p/19242642

4aea1ddbb5a97ca5e7ab7a20b5bea8f1

RowKindExtractor 是 Apache SeaTunnel 的一个转换插件,它能将 CDC 数据流转为 Append-Only 模式,并提取原始 RowKind 信息为新字段。本文将介绍 RowKindExtractor 的核心功能,其在 CDC 数据同步场景下的使用方法,以及配置选项、注意事项及多种应用示例。

RowKindExtractor

RowKindExtractor 转换插件用于将 CDC(Change Data Capture)数据流转换为 Append-Only(仅追加)模式,同时将原始的 RowKind 信息提取为一个新的字段。

核心功能:

  • 将所有数据行的 RowKind 统一改为 +I(INSERT),实现 Append-Only 模式
  • 将原始的 RowKind 信息(INSERT、UPDATE_BEFORE、UPDATE_AFTER、DELETE)保存到新增的字段中
  • 支持短格式和完整格式两种输出方式

为什么需要这个插件?

在 CDC 数据同步场景中,数据行带有 RowKind 标记(+I、-U、+U、-D),表示不同的变更类型。但某些下游系统(如数据湖、分析系统)只支持 Append-Only 模式,不支持 UPDATE 和 DELETE 操作。此时需要:

  1. 将所有数据转换为 INSERT 类型(Append-Only)
  2. 将原始的变更类型保存为普通字段,供后续分析使用

转换示例:

输入(CDC 数据):
  RowKind: -D (DELETE)
  数据: id=1, name="test1", age=20输出(Append-Only 数据):
  RowKind: +I (INSERT)
  数据: id=1, name="test1", age=20, row_kind="DELETE"

典型应用场景

  • 将 CDC 数据写入只支持 Append 的数据湖
  • 需要在数据仓库中保留完整的变更历史记录
  • 需要对不同类型的变更进行统计分析

配置选项

custom_field_name [string]

指定新增字段的名称,该字段用于存储原始的 RowKind 信息。

默认值:row_kind

注意事项:

  • 字段名不能与原有字段重名,否则会报错
  • 建议使用有意义的名称,如 operation_type、change_type、cdc_op 等

示例:

custom_field_name = "operation_type"  # 使用自定义字段名

transform_type [enum]

指定 RowKind 字段值的输出格式。

可选值:

默认值:SHORT

各值含义:

选择建议:

  • SHORT 格式:节省存储空间,适合对存储敏感的场景
  • FULL 格式:可读性更好,适合需要人工查看或分析的场景

示例:

transform_type = FULL  # 使用完整格式

完整示例

  • 示例 1:使用默认配置(SHORT 格式)

使用默认配置,将 CDC 数据转换为 Append-Only 模式,RowKind 以短格式保存。

env {
  parallelism = 1
  job.mode = "STREAMING"
}source {
  MySQL-CDC {
    plugin_output = "cdc_source"
    server-id = 5652
    username = "root"
    password = "your_password"
    table-names = ["mydb.users"]
    url = "jdbc:mysql://localhost:3306/mydb"
  }
}transform {
  RowKindExtractor {
    plugin_input = "cdc_source"
    plugin_output = "append_only_data"
    # 使用默认配置:
    # custom_field_name = "row_kind"
    # transform_type = SHORT
  }
}sink {
  Console {
    plugin_input = "append_only_data"
  }
}

数据转换过程:

输入数据(CDC 格式):
  1. RowKind=+I, id=1, name="张三", age=25
  2. RowKind=-U, id=1, name="张三", age=25
  3. RowKind=+U, id=1, name="张三", age=26
  4. RowKind=-D, id=1, name="张三", age=26输出数据(Append-Only 格式):
  1. RowKind=+I, id=1, name="张三", age=25, row_kind="+I"
  2. RowKind=+I, id=1, name="张三", age=25, row_kind="-U"
  3. RowKind=+I, id=1, name="张三", age=26, row_kind="+U"
  4. RowKind=+I, id=1, name="张三", age=26, row_kind="-D"
  • 示例 2:使用 FULL 格式和自定义字段名

使用完整格式输出 RowKind,并自定义字段名称。

env {
  parallelism = 1
  job.mode = "STREAMING"
}source {
  MySQL-CDC {
    plugin_output = "cdc_source"
    server-id = 5652
    username = "root"
    password = "your_password"
    table-names = ["mydb.orders"]
    url = "jdbc:mysql://localhost:3306/mydb"
  }
}transform {
  RowKindExtractor {
    plugin_input = "cdc_source"
    plugin_output = "append_only_data"
    custom_field_name = "operation_type"  # 自定义字段名
    transform_type = FULL                 # 使用完整格式
  }
}sink {
  Iceberg {
    plugin_input = "append_only_data"
    catalog_name = "iceberg_catalog"
    database = "mydb"
    table = "orders_history"
    # Iceberg 表会包含 operation_type 字段,记录每条数据的变更类型
  }
}
数据转换过程:输入数据(CDC 格式):
  1. RowKind=+I, order_id=1001, amount=100.00
  2. RowKind=-U, order_id=1001, amount=100.00
  3. RowKind=+U, order_id=1001, amount=150.00
  4. RowKind=-D, order_id=1001, amount=150.00输出数据(Append-Only 格式,FULL 格式):
  1. RowKind=+I, order_id=1001, amount=100.00, operation_type="INSERT"
  2. RowKind=+I, order_id=1001, amount=100.00, operation_type="UPDATE_BEFORE"
  3. RowKind=+I, order_id=1001, amount=150.00, operation_type="UPDATE_AFTER"
  4. RowKind=+I, order_id=1001, amount=150.00, operation_type="DELETE"
  • 示例 3:完整的测试示例(使用 FakeSource)

使用 FakeSource 生成测试数据,演示各种 RowKind 的转换效果。

env {
  parallelism = 1
  job.mode = "BATCH"
}source {
  FakeSource {
    plugin_output = "fake_cdc_data"
    schema = {
      fields {
        pk_id = bigint
        name = string
        score = int
      }
      primaryKey {
        name = "pk_id"
        columnNames = [pk_id]
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "A", 100]
      },
      {
        kind = INSERT
        fields = [2, "B", 100]
      },
      {
        kind = UPDATE_BEFORE
        fields = [1, "A", 100]
      },
      {
        kind = UPDATE_AFTER
        fields = [1, "A_updated", 95]
      },
      {
        kind = UPDATE_BEFORE
        fields = [2, "B", 100]
      },
      {
        kind = UPDATE_AFTER
        fields = [2, "B_updated", 98]
      },
      {
        kind = DELETE
        fields = [1, "A_updated", 95]
      }
    ]
  }
}transform {
  RowKindExtractor {
    plugin_input = "fake_cdc_data"
    plugin_output = "transformed_data"
    custom_field_name = "change_type"
    transform_type = FULL
  }
}sink {
  Console {
    plugin_input = "transformed_data"
  }
}

预期输出:

+I, pk_id=1, name="A", score=100, change_type="INSERT"
+I, pk_id=2, name="B", score=100, change_type="INSERT"
+I, pk_id=1, name="A", score=100, change_type="UPDATE_BEFORE"
+I, pk_id=1, name="A_updated", score=95, change_type="UPDATE_AFTER"
+I, pk_id=2, name="B", score=100, change_type="UPDATE_BEFORE"
+I, pk_id=2, name="B_updated", score=98, change_type="UPDATE_AFTER"
+I, pk_id=1, name="A_updated", score=95, change_type="DELETE"
```    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/970071.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2025北京十佳留学中介

2025北京十佳留学中介一、北京留学中介怎么选?这五个问题帮你理清思路作为一位有着八年经验的国际教育规划师,我每天都会遇到大量北京学生和家长咨询同样的问题:北京本地的留学中介到底哪家更靠谱?申请美国研究生需…

锂电池充电管理IC 快充输入 2串3串4串锂电池快充芯片

带诱骗的两节串联锂电升降压充电芯片XSP30,以其最大2A充电电流的特性,为现代电子设备的高效充电提供了强大的支持。这款充电芯片以其卓越的性能和可靠性,成为了市场上的热门选择。 首先,让我们来了解一下XSP30的基…

makefile简单初探索_2 结合bsp

makefile简单初探索_2 结合bsp Keruone 系统 ubuntu20.04 参考正点原子 只是学习中自己的小记某种意义上来说也是上一篇的后续目录makefile简单初探索_2 结合bspnull1. 工具链统一配置:打造通用 “烹饪工具”2. 路径集…

500美元权限控制漏洞:低权限用户可在开发者设置中执行受限操作

本文详细披露了一个在ExamNote平台发现的权限控制漏洞,该漏洞允许低权限用户(如支持者)在开发者设置中执行受限操作,特别是未经授权修改通知设置,揭示了应用程序逻辑操纵的安全风险。500美元权限控制漏洞:低权限…

2025年破碎格栅机生产厂家权威推荐榜单:回转式机械格栅/拦污格栅/机械格栅源头厂家精选

破碎格栅机作为污水处理、市政管网及工业固废处理的核心预处理设备,其技术性能直接影响后续工艺的稳定性和运行成本。随着环保政策趋严,2025年破碎格栅机市场需求预计增长 12%,高效、耐用的设备成为行业焦点。本文基…

[GenAI] 重新认识Agent

什么是 Agent基础篇Agent概念 不是指的 AI 智能体,而是指代理服务器。代理服务器充当用户和模型交流的中间人。 Agent:狭义:代理服务器 广义:AI智能体(AI Agent)什么是AI Agent AI Agent,中文称之为“AI智能体”…

2025 年 11 月吹塑厂家权威推荐榜:吹塑,中空吹塑,吹塑制品/玩具,吹塑瓶/容器瓶/泡泡水瓶/机油瓶,洗发水/沐浴露/医药瓶/化妆瓶厂家精选

2025 年 11 月吹塑厂家权威推荐榜:吹塑,中空吹塑,吹塑制品/玩具,吹塑瓶/容器瓶/泡泡水瓶/机油瓶,洗发水/沐浴露/医药瓶/化妆瓶厂家精选 行业背景与发展趋势 吹塑成型技术作为塑料加工行业的重要组成部分,近年来在包装…

[note] 网络流入门

网络流是最大流,最小割,费用流,上下界网络流等在网络图上的问题的总称。 注意:文章的目的是让零基础的人初步了解网络流,因此存在一些被省略的证明以及不太严谨的描述,想进阶的可以看 OI-Wiki 或者看些其他巨佬的…

gemini3.0 以及 antigravity

AntigravityCoding流程的思路很好, 专门做了一个review模式, 在完成plan之后, 可以comment , 继续修改plan, 在完成walkthrouth之后, 同样也会review. 非常有价值. browser use 做的非常出色, 能够直接和浏览器交互, d…

SBD3DF40V1H-ASEMI可直接替代安世PMEG4005EJ

SBD3DF40V1H-ASEMI可直接替代安世PMEG4005EJ编辑:ll SBD3DF40V1H-ASEMI可直接替代安世PMEG4005EJ 型号:SBD3DF40V1H 品牌:ASEMI 封装:SOD-323 正向电流:1A 反向电压:40V 正向压降:0.44V~0.47V 引线数量:2 芯片…

2025 年 11 月注塑厂家推荐排行榜,塑胶注塑,塑料注塑,注塑制品,精密注塑件公司推荐

2025年11月注塑厂家推荐排行榜:塑胶注塑、塑料注塑、注塑制品与精密注塑件公司权威指南 行业背景与发展趋势 注塑成型作为现代制造业的核心工艺之一,在汽车、电子、医疗、家电等领域发挥着不可替代的作用。随着工业4…

到家按摩app有哪些?五款热门服务平台推荐

随着现代生活压力的增大,越来越多的人开始关注健康养生,到家按摩app凭借便捷、专业的特点逐渐成为日常放松的重要选择。这类平台通过整合优质技师资源,为用户提供上门推拿、足疗、SPA等服务,有效解决了传统线下按摩…

2025 年 11 月塑胶容器厂家推荐排行榜,塑料容器,透明塑胶容器,吹塑容器,医药容器瓶源头厂家专业甄选

2025 年 11 月塑胶容器厂家推荐排行榜,塑料容器,透明塑胶容器,吹塑容器,医药容器瓶源头厂家专业甄选 在当今快速发展的工业环境中,塑胶容器作为包装行业的重要组成部分,其市场需求持续增长。塑料容器、透明塑胶容…

Maven插件核心原理讲解

目录1 Maven插件1.1 简介1.2 maven 插件使用举例1.2.1 flatten-maven-plugin1.2.1.1 插件配置1.2.1.2 作用1.2.2 exec-maven-plugin1.2.2.1 插件配置1.2.2.2 作用1.3 maven 插件原理1.3.1 MOJO(Maven Plain Old Java …

六、Agent设计模式与工程化 ——《动手学Agent应用开发》学习心得

六、Agent设计模式与工程化 ——《动手学Agent应用开发》学习心得 ================================================================================== 最近参加了Datawhale开源组织举办的组队学习。本篇的学习内…

2025年电力标牌支架供货商权威推荐榜单:安全警示牌/杆号牌/交通标志牌源头厂家精选

在电力系统中,一套稳固耐用的标牌支架,是保障电力设施安全运行与规范管理的第一道防线。 电力标牌支架作为电力设施安全标识的重要组成部分,其质量直接关系到电力系统的安全运行和维护效率。本文将基于厂家的技术实…

贪心方法与技巧总结

贪心方法与技巧总结贪心技巧十分重要,在各类比赛中十分常见, tag:greedy堆贪心 例题: P1631 序列合并 - 洛谷 D - Cake 123 按位贪心 例题: P2431 正妹吃月饼 - 洛谷 反悔贪心 例题: P14361 社团招新 / club - 洛…

在x86_64的ubuntu上运行arm架构的docker 用于开发嵌入式应用

一、镜像拉取 1、参考这个博客:https://luoxue.site/article/ubuntu-apt#066a4e87051f4cbd9a100c994d9c1c26 找到对应的镜像也无法拉取,可能是因为没有正确的使用代理服务器。所以我找到了一个国内的docker镜像源:…

LLM应用剖析: AI对冲基金

本文主要介绍AI对冲基金的项目架构,以及核心代码1. 背景上周发表了一篇LLM应用--微舆的应用剖析,收到一位读者的留言,想要了解基于LangGraph构建的多Agent应用,因此想起来了几个月前深入研究的LLM应用AI对冲基金(a…

2025年冷库货架厂家综合实力排行榜TOP10权威发布

摘要 随着冷链物流行业的快速发展,2025年冷库货架市场需求持续增长,行业规模预计突破200亿元。冷库货架作为冷链仓储的核心设备,其质量直接关系到仓储安全与效率。本文基于市场调研、技术参数对比和用户口碑,为您呈…