在 Flink + Kafka 实时数仓中,如何确保端到端的 Exactly-Once

在 Flink + Kafka 构建实时数仓时,确保端到端的 Exactly-Once(精确一次) 需要从 数据消费(Source)、处理(Processing)、写入(Sink) 三个阶段协同设计,结合 Flink 的 检查点机制(Checkpoint) 和 Kafka 的 事务支持。以下是具体实现方法及示例配置:


1. 核心机制

(1) Flink Checkpoint
  • 作用:定期将算子的状态(State)和 Kafka 消费偏移量(Offset)持久化到可靠存储(如 HDFS、S3)。

  • 配置
     

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60000); // 60秒触发一次Checkpoint
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // Checkpoint间最小间隔

(2) Kafka 事务
  • 两阶段提交(2PC):Flink 的 Kafka Producer 在 Checkpoint 完成时提交事务,确保数据仅写入一次。

  • 关键参数

    • transactional.id:唯一事务标识,需确保每个 Producer 实例的 ID 唯一。

    • transaction.timeout.ms:需大于 Flink Checkpoint 间隔(避免事务超时)。


2. 端到端 Exactly-Once 实现步骤

(1) Source 端:Kafka Consumer 偏移量管理
  • Flink 的 Kafka Consumer 会在 Checkpoint 时将 消费偏移量 存入状态后端,恢复时从该偏移量重新消费。

  • 配置

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka:9092");
    props.setProperty("group.id", "flink-group");
    props.setProperty("isolation.level", "read_committed"); // 只读取已提交的事务数据
    ​
    FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props
    );

(2) 处理阶段:状态一致性
  • Flink 的算子状态(如 KeyedStateOperatorState)通过 Checkpoint 持久化,确保故障恢复后状态一致。

(3) Sink 端:Kafka Producer 事务写入
  • 事务性 Producer:在 Checkpoint 完成时提交事务,确保数据仅写入一次。

  • 配置

    Properties sinkProps = new Properties();
    sinkProps.setProperty("bootstrap.servers", "kafka:9092");
    sinkProps.setProperty("transaction.timeout.ms", "600000"); // 大于 Checkpoint 间隔
    ​
    FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<>("output-topic",new SimpleStringSchema(),sinkProps,FlinkKafkaProducer.Semantic.EXACTLY_ONCE // 启用Exactly-Once模式
    );
    ​
    stream.addSink(sink);


3. 端到端流程详解

  1. Checkpoint 触发

    • JobManager 向所有 TaskManager 发送 Checkpoint 信号。

    • Kafka Consumer 提交当前消费偏移量到状态后端。

    • Flink 算子状态持久化。

    • Kafka Producer 预提交事务(写入数据但未提交)。

  2. Checkpoint 完成

    • 所有算子确认状态保存成功后,JobManager 标记 Checkpoint 完成。

    • Kafka Producer 提交事务(数据对下游可见)。

  3. 故障恢复

    • Flink 回滚到最近一次成功的 Checkpoint。

    • Kafka Consumer 从 Checkpoint 中的偏移量重新消费。

    • Kafka Producer 回滚未提交的事务(避免数据重复)。


4. 关键注意事项

  • 事务超时时间:确保 transaction.timeout.ms > checkpoint间隔 + max checkpoint duration

  • 唯一 Transactional ID:每个 Kafka Producer 实例需分配唯一 ID(可通过算子ID + 子任务ID生成)。

  • 幂等性 Sink:若 Sink 为非 Kafka 系统(如数据库),需支持幂等写入或事务(如 MySQL 的 INSERT ... ON DUPLICATE KEY UPDATE)。


5. 示例场景:实时交易风控

  • 需求:从 Kafka 读取交易流水,实时计算用户交易频次(1分钟内超过10次触发风控),结果写回 Kafka。

  • 实现

    DataStream<Transaction> transactions = env.addSource(kafkaSource).map(parseTransaction); // 解析交易数据
    ​
    DataStream<Alert> alerts = transactions.keyBy(Transaction::getUserId).window(TumblingEventTimeWindows.of(Time.minutes(1))).process(new FraudDetectionProcessFunction()); // 检测高频交易
    ​
    alerts.addSink(kafkaSink); // 事务性写入告警结果

  • Exactly-Once 保障

    • 消费偏移量由 Checkpoint 管理。

    • 窗口计数状态由 Flink 持久化。

    • 告警结果通过 Kafka 事务写入。


6. 常见问题与调优

  • 问题1:事务超时导致数据丢失 解决:增大 transaction.timeout.ms(默认15分钟)并监控 Checkpoint 耗时。

  • 问题2:Checkpoint 失败 解决:优化反压(如增加并行度)、调大 checkpoint timeout

  • 问题3:Kafka Producer 缓冲区满 解决:增大 buffer.memorybatch.size


总结

通过 Flink Checkpoint + Kafka 事务 的协同机制,可以实现从 Kafka 消费到 Kafka 写入的端到端 Exactly-Once。核心在于:

  1. Flink 统一管理消费偏移量和状态快照;

  2. Kafka Producer 通过事务提交保证数据原子性写入;

  3. 合理配置超时参数与资源,避免因超时或反压导致的一致性中断。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/79265.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

当可视化遇上 CesiumJS:突破传统,打造前沿生产配套方案

CesiumJS 技术基础介绍 CesiumJS 是一款基于 JavaScript 的开源库&#xff0c;专门用于创建动态、交互式的地理空间可视化。它利用 WebGL 技术&#xff0c;能够在网页浏览器中流畅地渲染高分辨率的三维地球和地图场景。CesiumJS 支持多种地理空间数据格式&#xff0c;包括但不…

RabbitMQ深入学习

继续上一节的学习&#xff0c;上一节学习了RabbitMQ的基本内容&#xff0c;本节学习RabbitMQ的高级特性。 RocketMQ的高级特性学习见这篇博客 目录 1.消息可靠性1.1生产者消息确认1.2消息持久化1.3消费者消息确认1.4消费失败重试机制1.5消息可靠性保证总结 2.什么是死信交换机…

Linux系统:虚拟文件系统与文件缓冲区(语言级内核级)

本节重点 初步理解一切皆文件理解文件缓冲区的分类用户级文件缓冲区与内核级文件缓冲区用户级文件缓冲区的刷新机制两级缓冲区的分层协作 一、虚拟文件系统 1.1 理解“一切皆文件” 我们都知道操作系统访问不同的外部设备&#xff08;显示器、磁盘、键盘、鼠标、网卡&#…

在c++中老是碰到string,这是什么意思?

定义一个string类型变量的引用&#xff0c;相当于给现有变量起个别名&#xff0c;与指针还是不一样的。比如string a;string& ba;这两句&#xff0c;b与a实际上是一回事&#xff0c;表示的是同一块内存。 std是系统的一个命名空间(有关命名空间可以参阅namespace_百度百科)…

Day21 奇异值分解(SVD)全面解析

一、奇异值分解概述 奇异值分解是线性代数中一个重要的矩阵分解方法&#xff0c;对于任何矩阵&#xff0c;无论是结构化数据转化成的“样本 * 特征”矩阵&#xff0c;还是天然以矩阵形式存在的图像数据&#xff0c;都能进行等价的奇异值分解&#xff08;SVD&#xff09;。 二…

akshare爬虫限制,pywencai频繁升级个人做量化,稳定数据源和券商的选择

做量化&#xff0c;数据和交易接口是策略和自动化交易的基石&#xff0c;而稳定的数据和快人一步的交易接口是个人做量化的催化剂。 之前写过一篇文章&#xff1a;个人做量化常用的数据&#xff0c;多以爬虫为主&#xff0c;最近akshare爬虫限制&#xff0c;pywencai频繁升级。…

数字签名与证书

1. 数字签名与证书 摘要算法用来实现完整性&#xff0c;能够为数据生成独一无二的“指纹”&#xff0c;常用的算法是 SHA-2&#xff1b;数字签名是私钥对摘要的加密&#xff0c;可以由公钥解密后验证&#xff0c;实现身份认证和不可否认&#xff1b;公钥的分发需要使用数字证书…

Ubuntu22.04安装显卡驱动/卸载显卡驱动

报错 今日输入nvidia-smi报错,在安装了535和550,包括560都没办法解决,但是又怕乱搞导致环境损坏,打算把显卡卸载然后重新安装系统默认推荐版本的显卡驱动 qinqin:~$ nvidia-smi Failed to initialize NVML: Driver/library version mismatch NVML library version: 560.35卸载…

Web 架构之负载均衡全解析

文章目录 一、引言二、思维导图三、负载均衡的定义与作用定义作用1. 提高可用性2. 增强性能3. 实现扩展性 四、负载均衡类型硬件负载均衡代表设备优缺点 软件负载均衡应用层负载均衡代表软件优缺点 网络层负载均衡代表软件优缺点 五、负载均衡算法轮询算法&#xff08;Round Ro…

linux下的Redis的编译安装与配置

配合做开发经常会用到redis&#xff0c;整理下编译安装配置过程&#xff0c;仅供参考&#xff01; --------------------------------------Redis的安装与配置-------------------------------------- 下载 wget https://download.redis.io/releases/redis-6.2.6.tar.gz tar…

A2A大模型协议及Java示例

A2A大模型协议概述 1. 协议作用 A2A协议旨在解决以下问题&#xff1a; 数据交换&#xff1a;不同应用程序之间的数据格式可能不一致&#xff0c;A2A协议通过定义统一的接口和数据格式解决这一问题。模型调用&#xff1a;提供标准化的接口&#xff0c;使得外部应用可以轻松调…

关键点检测--使用YOLOv8对Leeds Sports Pose(LSP)关键点检测

目录 1. Leeds Sports Pose数据集下载2. 数据集处理2.1 获取标签2.2 将图像文件和标签文件处理成YOLO能使用的格式 3. 用YOLOv8进行训练3.1 训练3.2 预测 1. Leeds Sports Pose数据集下载 从kaggle官网下载这个数据集&#xff0c;地址为link&#xff0c;下载好的数据集文件如下…

20250508在WIN10下使用移远的4G模块EC200A-CN直接上网

1、在WIN10/11下安装驱动程序&#xff1a;Quectel_Windows_USB_DriverA_Customer_V1.1.13.zip 2、使用移远的专用串口工具&#xff1a;QCOM_V1.8.2.7z QCOM_V1.8.2_win64.exe 3、配置串口UART42/COM42【移远会自动生成连续三个串口&#xff0c;最小的那一个】 AT命令&#xf…

第J7周:ResNeXt解析

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 目标 具体实现 &#xff08;一&#xff09;环境 语言环境&#xff1a;Python 3.10 编 译 器: PyCharm 框 架: Tensorflow &#xff08;二&#xff09;具体…

C++之类和对象:初始化列表,static成员,友元,const成员 ……

目录 const成员函数&#xff1a; 前置和后置重载&#xff1a; 取地址及const取地址操作符重载&#xff1a; 初始化列表&#xff1a; explicit关键字&#xff1a; static成员&#xff1a; 友元&#xff1a; 友元函数&#xff1a; 友元类&#xff1a; 内部类&#xff1a…

uni-app 中的条件编译与跨端兼容

uni-app 为了实现一套代码编译到多个平台&#xff08;包括小程序&#xff0c;App&#xff0c;H5 等&#xff09;&#xff0c;引入了条件编译机制。 通过条件编译&#xff0c;我们可以针对不同的平台编写特定的代码&#xff0c;从而实现跨端兼容。 一、条件编译的作用 平台差异…

Linux平台下SSH 协议克隆Github远程仓库并配置密钥

目录 注意&#xff1a;先提前配置好SSH密钥&#xff0c;然后再git clone 1. 检查现有 SSH 密钥 2. 生成新的 SSH 密钥 3. 将 SSH 密钥添加到 ssh-agent 4. 将公钥添加到 GitHub 5. 测试 SSH 连接 6. 配置 Git 使用 SSH 注意&#xff1a;先提前配置好SSH密钥&#xff0c;然…

[C++] 大数减/除法

目录 高精度博客 - 前两讲高精度减法高精度除法高精度系列函数完整版 高精度博客 - 前两讲 讲次名称链接高精加法[C] 高精度加法(作用 模板 例题)高精乘法[C] 高精度乘法 高精度减法 void subBIG(int x[], int y[], int z[]){z[0] max(x[0], y[0]);for(int i 1; i < …

视频添加字幕脚本分享

脚本简介 这是一个给视频添加字幕的脚本&#xff0c;可以方便的在指定的位置给视频添加不同大小、字体、颜色的文本字幕&#xff0c;添加方式可以直接修改脚本中的文本信息&#xff0c;或者可以提前编辑好.srt字幕文件。脚本执行环境&#xff1a;windowsmingwffmpeg。本方法仅…

ubuntu nobel + qt5.15.2 设置qss语法识别正确

问题展示 解决步骤 首选项里面的高亮怎么编辑选择都没用。如果已经有generic-highlighter和css.xml&#xff0c;直接修改css.xml文件最直接&#xff01; 在generic-highlighter目录下找到css.xml文件&#xff0c;位置是&#xff1a;/opt/Qt/Tools/QtCreator/share/qtcreator/…