作者 | 崔俊乐
引言:对企业而言,数据采集的核心挑战从来不仅仅是“同步”,而是如何在大规模、多元异构的复杂环境下,保障数据的准确性、完整性和时效性。本文将深入探讨中控技术基于 Apache SeaTunnel 构建企业级数据采集框架的实践,重点分享集群高可用配置、性能调优、容错机制及数据质量监控等方面的具体思考与方案。
1、困境:烟囱林立的采集架构与高昂的运维代价
作为深度赋能流程工业的工业AI平台型公司,中控技术的全球业务不断发展,目前已拥有近40多家全球子公司,服务超35000家全球客户。业务的不断扩张对数据工作提出了更高要求:数据不仅要“算得快”,更要“落得准”。为此,我们搭建了流批分离的大数据平台以应对复杂场景。然而,平台本身的复杂度却反向加剧了数据采集、开发和运维的难度,特别是在数据采集这一源头环节,我们面临着严峻挑战:
(1)架构复杂,烟囱林立:我们过去长期依赖多种工具拼凑的方案(如使用Sqoop进行批式数据同步至HDFS,借助Maxwell/StreamSets处理数据库增量日志并写入Kafka/Kudu)。这种“打补丁”式的架构使得技术栈碎片化,维护成本高昂。
(2)运维黑洞,疲于奔命:多套路线意味着双倍的运维监控压力。缺乏统一的监控告警机制,任何一方出现异常(如:同步延迟、资源耗尽),都需要投入大量人力进行排查和“救火”,稳定性难以保障。
(3)能力割裂,难以扩展:当面临新的数据源(国产数据库和SAP HANA等数据库),我们需要在不同的工具中寻找适配方案或自行开发插件,无法快速响应业务需求。
上图清晰地展示了过去分散的采集生态。 我们意识到,这种“各自为战”的模式已成为数据中最脆弱的一环,不仅无法匹配公司未来的发展速度,数据质量与时效性存在潜在威胁。打造一个统一、稳定、高效的数据采集框架,已变得至关重要且迫在眉睫。
2、破局:统一采集框架的思考与技术选型
经过深度的分析和思考,我们明确了新技术的五大核心选型标准:
(1) 全面的连接能力:能够完全覆盖公司当前与未来的所有数据源类型(从MySQL、Oracle、HANA到Kafka、StarRocks等),并同时支持离线与实时两种采集模式,从根本上解决技术栈统一的问题。
(2) 集群稳定性与高可用:框架本身必须是高可用的分布式集群,具备强大的容错能力。即使单个节点故障,整个服务也不应中断,且能够自动恢复,保障数据采集管道的持续运行。
(3) 可靠的数据一致性保障:在任务执行层面,必须提供精确一次(Exactly-Once)或至少一次(At-Least-Once)处理语义,确保在任务因异常中断后能够自动从断点恢复,杜绝数据重复或丢失,这是数据质量的基石。
(4) 强劲的吞吐性能: 必须能够轻松应对我们日均TB级的数据增量挑战,其架构应支持水平扩展,可通过增加节点来线性提升同步性能,满足业务高速发展带来的数据增长需求。
(5) 可观测的运维体验:必须提供完善的监控告警机制,能够对数据同步过程中的异常、延迟、吞吐量等关键指标进行实时追踪,并及时通知运维人员,变被动“救火”为主动“预警”。
基于这五大标准,我们对业界主流方案进行了深入的调研与对比测试。最终, Apache SeaTunnel 在所有维度上都表现出色,成为我们破局的最优解。
我们的核心诉求 | Apache SeaTunnel 的解决方案 |
---|---|
全面的连接能力 | 拥有极其丰富的 Connector 生态,官方支持上百种源库/目标库的读写,完全覆盖了我们所有数据类型,一套框架即可统一离线和实时采集。 |
集群稳定性与高可用 | SeaTunnel Engine 分离模式架构,即使单个 Master 或者 Worker 节点异常,也不会影响采集任务的持续性。 |
可靠的数据一致性保障 | 提供了强大的容错机制,支持精确一次(Exactly-Once)语义,并能通过 Checkpoint 机制实现任务异常后的自动断点续传,确保数据不丢不重。 |
强劲的吞吐性能 | 具备出色的分布式数据处理能力,通过简单配置即可调整并行度,轻松实现水平扩展。 |
可观测的运维体验 | 提供了丰富的监控指标,并可无缝集成 Prometheus、Grafana 和 AlertManager 主流监控告警体系,让我们对数据采集过程了如指掌。 |
3、实践:具体实施方案与细节
我们的Apache SeaTunnel实践之路,也是项目的成长之路。早期,我们基于 Apache SeaTunnel v2.3.5 进行构建,当时为了满足一些特定的需求(如处理不同数据库表名或字段名的大小写敏感问题),我们进行了一些二次开发工作。
然而,随着SeaTunnel社区的飞速发展,新版本的功能和转换器日益完善。当我们将集群顺利升级至 Apache SeaTunnel v2.3.11时惊喜地发现,过去那些需要定制化开发的需求,在新版本中均已得到原生支持。
目前,我们所有的数据同步任务均基于官方版本实现,实现了零改造,这极大地降低了我们的长期维护成本,并能让我们无缝享受社区带来的最新功能和性能提升。
以下是我们基于v2.3.11版本,经过生产环境TB级数据量验证的核心实施方案,为我们集群自搭建以来0故障的卓越表现,奠定了坚实基础。
(1)集群规划
为保障集群的高可用性,建议优先选择分离模式集群部署,以下是我们使用的资源。
节点 | CPU | 内存 | 磁盘 | JVM Heap |
---|---|---|---|---|
Master-01 | 8C | 32G | 200G | 30G |
Master-02 | 8C | 32G | 200G | 30G |
Worker-01 | 16C | 64G | 500G | 62G |
Worker-02 | 16C | 64G | 500G | 62G |
Worker-03 | 16C | 64G | 500G | 62G |
(2)集群关键配置文件
- seatunnel.yaml
该配置文件主要用于定义作业的执行行为、容错机制和运维监控设置。它通过启用类加载缓存和动态资源分配来优化性能,并通过配置基于S3的检查点(Checkpoint)来保障作业的容错与数据一致性。此外,还可以开启指标收集、日志管理以及设置,从而为作业的稳定运行、监控和日常管理提供全面支持。
seatunnel:
engine:
# 类加载器缓存模式:开启后可显著提升作业频繁启停时的性能,减少类加载开销。生产环境建议开启。
classloader-cache-mode: true# 历史作业数据过期时间(单位:分钟): 3天。超过此时间的已完成作业历史信息将被自动清理。
history-job-expire-minutes: 4320# 数据备份数量
backup-count: 1# 队列类型:阻塞队列
queue-type: blockingqueue# 执行信息打印间隔(秒):每隔60秒在日志中打印一次作业执行信息。
print-execution-info-interval: 60# 作业指标信息打印间隔(秒):每隔60秒在日志中打印一次详细的指标信息。
print-job-metrics-info-interval: 60slot-service:
# 动态Slot管理:开启后,引擎会根据节点资源情况动态分配计算槽位(Slot),提高资源利用率。
dynamic-slot: true# 检查点(Checkpoint)配置。
checkpoint:
interval: 60000 # 两次Checkpoint之间的时间间隔,单位毫秒(ms)。此处为1分钟。
timeout: 600000 # 执行Checkpoint的超时时间,单位毫秒(ms)。此处为10分钟。
storage:
type: hdfs # 此处声明存储类型为HDFS,实际存储在下方的S3。
max-retained: 3 # 最多保留的Checkpoint历史数量。旧的Checkpoint会被自动删除以节省空间。
plugin-config:
storage.type: s3 # 实际配置存储类型为S3(或MinIO等兼容S3协议的对象存储)
fs.s3a.access.key: xxxxxxx # S3兼容存储的访问密钥(Access Key)
fs.s3a.secret.key: xxxxxxx # S3兼容存储的私有密钥(Secret Key)
fs.s3a.endpoint: http://xxxxxxxx:8060 # S3兼容存储的服务端点(Endpoint)地址
s3.bucket: s3a://seatunel-pro-bucket # 用于存储Checkpoint数据的桶(Bucket)名称
fs.s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider # 认证凭证提供者
# 可观测性配置
telemetry:
metric:
enabled: true # 开启指标(Metrics)收集
logs:
# 启用日志定时删除:开启日志文件的自动清理功能,防止日志占满磁盘。
scheduled-deletion-enable: true# Web UI 和 REST API 配置
http:
enable-http: true # 启用Web UI和HTTP REST API服务
port: 8080 # Web服务绑定的端口号
enable-dynamic-port: false # 禁用动态端口。如果8080被占用,是否启用其他端口。
# 以下为Web UI基础认证配置
enable-basic-auth: true # 启用基础身份认证
basic-auth-username: admin # 登录用户名
basic-auth-password: xxxxxxx # 登录密码
- jvm_master_options
该JVM参数配置文件主要用于保障SeaTunnel引擎在大规模数据处理时的稳定性和性能。通过设定堆内存与元空间容量来提供基础内存保障,并专门针对G1垃圾回收器进行了系列优化,以有效管理内存垃圾、控制回收停顿时间并提升运行效率。
# JVM 堆内存
-Xms30g
-Xmx30g# 内存溢出诊断:当发生OOM时自动生成Heap Dump文件,保存至指定路径便于后续分析。
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-server# 元空间:限制最大容量为5GB,防止元数据无限膨胀占用过多本地内存。
-XX:MaxMetaspaceSize=5g# G1垃圾回收器相关配置
-XX:+UseG1GC # 启用G1垃圾回收器
-XX:+PrintGCDetails # 在日志中打印详细的GC信息
-Xloggc:/path/to/gc.log # 将GC日志输出到指定文件
-XX:+PrintGCDateStamps # 在GC日志中打印时间戳
-XX:MaxGCPauseMillis=5000 # 目标最大GC暂停时间为5000毫秒(5秒)
-XX:InitiatingHeapOccupancyPercent=50 # 当堆内存使用率达到50%时启动并发GC周期
-XX:+UseStringDeduplication # 启用字符串去重,节省内存空间
-XX:GCTimeRatio=4 # 设置GC时间与应用时间的目标比例
-XX:G1ReservePercent=15 # 保留15%的堆内存
-XX:ConcGCThreads=6 # 设置并发GC阶段使用的线程数为6
-XX:G1HeapRegionSize=32m # 设置G1分区大小为32MB
- hazelcast-master.yaml(iMap存储在自建对象存储)
该配置文件定义了SeaTunnel引擎集群的底层分布式架构与协同机制。它主要用于建立和管理集群节点间的网络通信。配置还包含了高精度的故障检测心跳机制,以确保能快速发现并处理节点失效问题,保障集群的高可用性。同时,启用了基于S3兼容存储的分布式数据持久化功能,将关键状态信息可靠地保存到对象存储中。
hazelcast:
cluster-name: seatunnel # 集群名称,所有节点需保持一致
network:
rest-api:
enabled: true # 启用REST API
endpoint-groups:
CLUSTER_WRITE:
enabled: true
DATA:
enabled: true
join:
tcp-ip:
enabled: true # 使用TCP/IP发现机制
member-list: # 集群节点列表
- 10.xx.xx.xxx:5801
- 10.xx.xx.xxx:5801
- 10.xx.xx.xxx:5802
- 10.xx.xx.xxx:5802
- 10.xx.xx.xxx:5802
port:
auto-increment: false # 禁用端口自动递增
port: 5801 # 固定使用5801端口
properties:
hazelcast.invocation.max.retry.count: 20 # 调用最大重试次数
hazelcast.tcp.join.port.try.count: 30 # TCP连接端口尝试次数
hazelcast.logging.type: log4j2 # 使用log4j2日志框架
hazelcast.operation.generic.thread.count: 50 # 通用操作线程数
hazelcast.heartbeat.failuredetector.type: phi-accrual # 使用Phi-accrual故障检测器
hazelcast.heartbeat.interval.seconds: 2 # 心跳间隔(秒)
hazelcast.max.no.heartbeat.seconds: 180 # 无心跳超时时间(秒)
hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10 # 故障检测阈值
hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200 # 检测样本大小
hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 100 # 最小标准差(毫秒)
hazelcast.operation.call.timeout.millis: 150000 # 操作调用超时时间(毫秒)
map:
engine*:
map-store:
enabled: true # 启用Map存储持久化
initial-mode: EAGER # 启动时立即加载所有数据
factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory # 持久化工厂类
properties:
type: hdfs # 存储类型
namespace: /seatunnel/imap # 命名空间路径
clusterName: seatunnel-cluster # 集群名称
storage.type: s3 # 实际使用S3兼容存储
fs.s3a.access.key: xxxxxxxxxxxxxxxx # S3访问密钥
fs.s3a.secret.key: xxxxxxxxxxxxxxxx # S3私有密钥
fs.s3a.endpoint: http://xxxxxxx:8060 # S3端点地址
s3.bucket: s3a://seatunel-pro-bucket # S3存储桶名称
fs.s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider # 认证提供者
(3)采集任务示例
① MySQL-CDC到StarRocks
采集MySQL-CDC数据需要确保源数据库需要开启了Binlog,格式为ROW,需要用户拥有相关权限,并且把对应的Mysql Jar包放入 ${SEATUNNEL_HOME}/lib目录。详情可以参考官网:https://seatunnel.apache.org/zh-CN/docs/2.3.11/connector-v2/source/MySQL-CDC。
下方是我们采集MySQL-CDC的配置样例。
env {
parallelism = 1 # 并行度设置为1,流式采集只能是1
job.mode = "STREAMING" # 流式作业模式
job.name = cdh2sr # 作业名称标识
job.retry.times = 3 # 作业失败重试次数
job.retry.interval.seconds=180 # 重试间隔时间(秒)
}source {
MySQL-CDC {
base-url = "jdbc:mysql://xxxxxxx:3306/databasename" # MySQL连接地址
username = "xxxxxxr" # 数据库用户名
password = "xxxxxx" # 数据库密码
table-names = ["databasename.table1","databasename_pro.table2"] # 需要同步的表列表,格式:数据库.表名
startup.mode = "latest" # 从最新位点开始同步
exactly_once = true # 启用精确一次语义
debezium {
include.schema.changes = "false" # 不包含Schema变更
snapshot.mode = when_needed # 按需进行快照
}
}
}transform {
TableRename {
plugin_input = "cdc" # 输入插件标识
plugin_output = "rs" # 输出插件标识
convert_case = "LOWER" # 表名转换为小写
prefix = "ods_cdh_databasename_" # 添加表名前缀
}
}sink {
StarRocks {
plugin_input = "rs" # 输入插件标识(与transform输出一致)
nodeUrls = ["xxxxxxx:8030","xxxxxxx:8030","xxxxxxx:8030"] # StarRocks FE节点地址
base-url = "jdbc:mysql://xxxxxxx:3307" # StarRocks MySQL协议地址
username = "xxxx" # StarRocks用户名
password ="xxxxxxx" # StarRocks密码
database = "ods" # 目标数据库
enable_upsert_delete = true # 启用更新删除功能
max_retries = 3 # 写入失败重试次数
http_socket_timeout_ms = 360000 # HTTP超时时间(毫秒)
retry_backoff_multiplier_ms = 2000 # 重试退避乘数
max_retry_backoff_ms = 20000 # 最大重试退避时间
batch_max_rows = 2048 # 单批次最大行数
batch_max_bytes = 50000000 # 单批次最大字节数
}
}
② Oracle-CDC到StarRocks
采集Oracle-CDC数据需要确保源数据库需要开启了Logminer,用户拥有相关权限,并且把对应的OJDBC.Jar和Orai18n.jar包放入 ${SEATUNNEL_HOME}/lib
目录。详情可以参考官网:https://seatunnel.apache.org/zh-CN/docs/2.3.11/connector-v2/source/Oracle-CDC。
值得一提的是,我们采集Oracle-CDC过程中遇到的延迟问题,可以优先让DBA查询Logminer日志切换的次数是否很频繁,官方建议控制在每小时在十次左右,太频繁的切换会导致发生长时间延迟情况的可能。如果次数过大,可以加大单个日志文件大小。其次考虑把QPS值极高的表拆分到新的SeaTunel任务中。
-- 查询日志切换次数
SELECT GROUP#, THREAD#, BYTES/1024/1024 || 'MB' "SIZE", ARCHIVED, STATUS FROM V$LOG;
SELECT
TO_CHAR(first_time, 'YYYY-MM-DD HH24') AS hour,
COUNT(*) AS switch_count
FROM
v$log_history
WHERE
first_time >= TRUNC(SYSDATE) - 1 -- 过去一天的数据
GROUP BY
TO_CHAR(first_time, 'YYYY-MM-DD HH24')
ORDER BY
hour;-- 查询日志文件大小
SELECT F.MEMBER, L.GROUP#, L.THREAD#, L.SEQUENCE#, L.BYTES/1024/1024 AS SIZE_MB, L.ARCHIVED, L.STATUS, L.FIRST_CHANGE#, L.NEXT_CHANGE#
FROM V$LOG L, V$LOGFILE F
WHERE F.GROUP# = L.GROUP#
ORDER BY L.GROUP#;
下方是我们采集Oracle-CDC的配置样例。
env {
parallelism = 1 # 并行度为1,流式采集只能是1
job.mode = "STREAMING" # 流式作业模式
job.name = bpm2sr # 作业名称标识
job.retry.times = 3 # 作业失败重试次数
job.retry.interval.seconds=180 # 重试间隔时间(秒)
}source {
Oracle-CDC {
plugin_output = "cdc" # 输出插件标识
base-url = "jdbc:oracle:thin:@xxxxxx:1521:DB" # Oracle连接地址
username = "xxxxxx" # 数据库用户名
password = "xxxxxx" # 数据库密码
table-names = ["DB.SC.TABLE1","DB.SC.TABLE2"] # 需要同步的表,格式:数据库.模式.表名
startup.mode = "latest" # 从最新位点开始同步
database-names = ["DB"] # 数据库名
schema-names = ["SC"] # 模式名
skip_analyze = true # 跳过表分析
use_select_count = true # 使用统计
exactly_once = true # 启用精确一次语义
connection.pool.size = 20 # 连接池大小
debezium {
log.mining.strategy = "online_catalog" # 日志挖掘策略
log.mining.continuous.mine = true # 持续挖掘日志
lob.enabled = false # 禁用LOB支持
internal.log.mining.dml.parser ="legacy" # 使用传统DML解析器
}
}
}transform {
TableRename {
plugin_input = "cdc" # 输入插件标识
plugin_output = "rs" # 输出插件标识
convert_case = "LOWER" # 表名转换为小写
prefix = "ods_crm_db_" # 添加表名前缀
}
}sink {
StarRocks {
plugin_input = "rs" # 输入插件标识
nodeUrls = ["xxxxxxx:8030","xxxxxxx:8030","xxxxxxx:8030"] # StarRocks FE节点
base-url = "jdbc:mysql://xxxxxxx:3307" # JDBC连接地址
username = "xxxx" # 用户名
password ="xxxxxxx" # 密码
database = "ods" # 目标数据库
enable_upsert_delete = true # 启用更新删除
max_retries = 3 # 最大重试次数
http_socket_timeout_ms = 360000 # HTTP超时时间
retry_backoff_multiplier_ms = 2000 # 重试退避乘数
max_retry_backoff_ms = 20000 # 最大重试退避时间
batch_max_rows = 2048 # 批次最大行数
batch_max_bytes = 50000000 # 批次最大字节数
}
}
(4)可观测的监控
得益于SeaTunnel新版本提供的强大监控指标(Metrics)和我们构建的完善监控体系,我们能够从集群全局和任务粒度两个层面,对数据采集平台的状态了如指掌。我们的监控体系主要包含以下两个维度:
① 集群监控
- 节点状态:实时监控集群节点个数与存活状态,确保Worker节点无异常下线,保障集群处理能力。
- 集群吞吐:监控集群整体的SourceReceivedQPS和SinkWriteQPS,掌控全局数据流入与流出速率,评估集群负载。
- 资源状态:监控集群节点的CPU、内存,为资源扩容或优化提供依据。
- 网络健康度:通过监控内部心跳与通信延迟,确保集群网络状况良好。
② 任务监控
- 任务运行状况:实时检查所有任务的运行状态(Running/Failed/Finished),是监控的最基本要求。
- 数据同步量:监控每个任务的SourceReceivedCount和SinkWriteCount,实时掌握每条数据流水线的吞吐量。
- 延迟时间:这是CDC任务最关键的指标之一,当采集端发生持续发生延迟时发送告警。
4、成效:可衡量收益
经过一段时间的稳定运行,基于Apache SeaTunnel构建的新一代数据采集框架为我们带来了显著且可量化的收益,主要体现在以下几个方面:
(1)稳定性:从“疲于奔命”到“高枕无忧”
- 任务故障率降低超99%: 旧方案下,每月需处理1-3次同步异常。新集群上线至今,核心数据同步任务保持0故障运行,未发生因框架本身导致的数据服务中断。
- 数据一致性达到100%:依托Apache SeaTunnel的Exactly-Once语义和强大的Checkpoint机制,实现了端到端的精确一次处理,彻底解决了之前可能存在的微量数据重复或丢失问题,数据质量得到根本保障。
- 可用性大幅提升: 集群的高可用设计确保了服务99.99% 的可用性,任何单点故障均可在分钟级内自动恢复,对业务透明无感。
(2)效率:开发运维效能的倍增
- 开发效率提升50%: 从过去编写维护多套脚本,转变为统一的配置化开发。新数据源的接入从原来的1-2人天缩短至1分钟内即可完成,效率提升显著。
- 运维成本降低70%: 现在仅需通过Grafana监控大屏即可掌控全局状态,日均主动运维投入小于0.5人时。
- 数据时效性优化: 数据端到端延迟从分钟级优化至秒级,为实时数据分析和决策提供了坚实基础。
(3)架构:资源优化与统一框架
- 技术栈统一: 成功将Sqoop、StreamSets等多种技术栈统一收口至Apache SeaTunnel,极大降低了技术复杂度和长期维护成本。
5、展望:未来规划
- (1)全面云原生化: 我们将积极探索Apache SeaTunnel在Kubernetes上的原生部署与调度能力,利用其弹性伸缩的特性,实现计算资源的按需分配,进一步优化成本与效率,更好地拥抱混合云与多云战略。
- (2)智能化运维: 基于已收集的丰富Metrics数据,构建AIOps能力,实现任务性能的智能预测、故障的根因分析自动定位与智能调参。
6、致谢
在此,我们由衷地感谢 Apache SeaTunnel 开源社区。同时,也要感谢公司内部项目团队的每一位成员,你们的辛勤付出与勇于探索,是此次架构升级得以成功实施的关键。最后,我们衷心祝愿Apache SeaTunnel项目未来越来越好,生态愈发繁荣!