Flink 有状态流处理State、Keyed State、Checkpoint、对齐/不对齐与生产实践 - 实践

news/2025/10/14 19:07:21/文章来源:https://www.cnblogs.com/wzzkaifa/p/19141811

1. 为什么要在流里“记住”?——State 的意义

在实际业务里,你很难“看一条算一条”。典型需求包括:

  • 模式识别:需要记住到目前为止的事件序列
  • 窗口聚合:分钟/小时/天的未完成聚合
  • 在线学习:模型参数的最新版本
  • 历史管理:高效访问过去的事件

这都需要 状态(State)。Flink 在运行时感知状态,借助 Checkpoints/Savepoints 保障容错,并支持弹性伸缩时的状态再分布。

2. Keyed State 与分区:让状态“本地化”

Keyed State 可以理解为嵌入式 Key/Value 存储:
只有在 Keyed 流(即经过 keyBy/分区)上才能访问状态,并且仅限当前事件的 Key。这保证了:
在这里插入图片描述

进一步地,Keyed State 被组织为 Key Groups(数量 = 最大并行度)。执行时,每个并行子任务处理一个或多个 Key Group,实现最小原子粒度的再分配

3. 状态持久化与恢复:Checkpoint 是如何工作的?

Flink 的 Exactly-Once 容错由 流回放 + Checkpointing 实现:

  • Checkpoint 记录:每个输入流的位置 + 每个算子的状态快照
  • 故障恢复:选择最近完成的检查点 k,恢复算子状态,源从 Sₖ 位置继续(Kafka 即 offset Sₖ)
  • Checkpoint 间隔 = 容错开销 ↔ 恢复时间 的权衡
  • 小状态场景下,快照足够轻量、可高频拍摄
  • 状态应存放于可靠分布式存储(生产别放 JobManager 内存)

启用 Checkpoint(默认关闭) 的基本配置示例(DataStream API,Java):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每 10 秒做一次检查点
env.enableCheckpointing(10_000);
// Exactly-Once 语义(默认),常与 Kafka 事务 Sink 配合
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 允许的最大并发检查点数(通常 1~2)
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 状态后端与存储
// 1) RocksDB(大状态推荐)
// env.setStateBackend(new EmbeddedRocksDBStateBackend());
// 2) HashMap(小状态内存)
// env.setStateBackend(new HashMapStateBackend());
// Checkpoint 存储位置(必须是可靠持久化存储,如 HDFS/S3)
env.getCheckpointConfig().setCheckpointStorage("hdfs://nameservice1/flink/ckpt");

4. 屏障与对齐:一致性快照的关键细节

屏障(Barrier) 注入于 Source,携带快照 ID,与记录严格按序前进,不会超车。
中间算子所有输入收到同一快照的屏障后,才向所有输出发出该屏障;当 Sink 从所有输入收到屏障并确认后,快照完成。

多输入时为何要“对齐”?

如果一个输入先到了 n 号屏障,而其他输入还没到,此时继续处理该输入会把 属于快照 n 的记录与 属于 n+1 的记录混在一起。
对齐做的就是:暂停先到达屏障的那路输入,等待其余输入也到达 n,再统一快照并继续处理。

5. 不对齐检查点(Unaligned):为“慢路径”兜底

从 Flink 1.11 起,检查点可 不对齐

开启 Unaligned Checkpoints(示例)

env.getCheckpointConfig().enableUnalignedCheckpoints(true);
// 也可配置对齐超时,超时后自动退化为不对齐
// env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofSeconds(30));

6. 算子状态快照:何时拍、拍到哪?

算子在从所有输入收到屏障、且向下游发出屏障之前拍摄状态快照。
为了避免阻塞,状态会异步写入配置的状态后端(如 HDFS/S3/RocksDB 增量)。
快照包含:

  • 每个并行 Source 的流位置(offset/position)
  • 每个算子状态对象的指针/句柄

7. Savepoints:升级迁移的“保险带”

  • 手动触发的检查点,写入状态后端
  • 不会因新检查点完成而自动过期
  • 常用于程序/集群升级拓扑变更Job 重部署等场景
  • 正确使用前务必理解 Checkpoints vs Savepoints 的差异(保存点的可移植性、格式与版本兼容性)

常用 CLI(示意)

# 触发保存点
flink savepoint <jobId> hdfs://.../savepoints/# 从保存点恢复flink run -s hdfs://.../savepoints/savepoint-xxx/_metadata job.jar

8. Exactly-Once vs At-Least-Once:延迟与语义的取舍

  • 对齐可能增加毫秒级延迟,但可给出 Exactly-Once

  • 追求极致低延迟(极少数毫秒级)时,可跳过对齐

    • 算子在部分输入已到达 n 的屏障后仍继续处理所有输入
    • 结果:在恢复时会出现重复记录(At-Least-Once)
  • 仅在完全并行(map/flatMap/filter… 无 join/shuffle)的拓扑中,即使 At-Least-Once 模式也能实际达到 Exactly-Once(因为无对齐点)

配置语义示例

// Exactly-Once(默认)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// At-Least-Once(更低延迟,但可能重复)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

9. 状态后端选择:数据结构与快照实现

状态后端决定 Key/Value 索引的数据结构快照实现

  • HashMapStateBackend:内存 HashMap,小状态、超低延迟
  • RocksDB(EmbeddedRocksDBStateBackend):外部化 KV,大状态首选,支持增量快照

经验法则

  • 小状态、链路低延迟要求高 → HashMap
  • 大状态、海量 Key、需容错稳定 → RocksDB 增量快照
  • 无需改业务代码即可更换状态后端

10. 批模式下的状态与容错:不做 Checkpoint,直接“全量回放”

批作业是“有界流”的特例(BATCH ExecutionMode):

11. 代码与配置:从零到一的最小可用骨架

11.1 处理乱序 + 事件时间窗口(DataStream + Watermark)

WatermarkStrategy<Event> wm = WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofMinutes(2)).withTimestampAssigner((e, ts) -> e.getEventTimeMillis());DataStream<Event> events = env.fromSource(kafkaSource, wm, "events");

11.2 手写 KeyedProcessFunction(对齐逻辑自定义)

events
.keyBy(Event::getUserId)
.process(new KeyedProcessFunction<String, Event, Output>() {private transient ValueState<Long> cnt;private transient ValueState<Long> winEnd;@Override public void open(Configuration p) {cnt = getRuntimeContext().getState(new ValueStateDescriptor<>("cnt", Long.class));winEnd = getRuntimeContext().getState(new ValueStateDescriptor<>("end", Long.class));}@Override public void processElement(Event e, Context ctx, Collector<Output> out) throws Exception {cnt.update((cnt.value()==null?0:cnt.value()) + 1);long end = align10m(e.getEventTimeMillis());if (winEnd.value()==null || winEnd.value()!=end) {if (winEnd.value()!=null) ctx.timerService().deleteEventTimeTimer(winEnd.value()+1);ctx.timerService().registerEventTimeTimer(end+1);winEnd.update(end);}}@Override public void onTimer(long ts, OnTimerContext c, Collector<Output> out) throws Exception {if (winEnd.value()!=null && ts==winEnd.value()+1) {out.collect(new Output(c.getCurrentKey(), winEnd.value(), cnt.value()));cnt.clear(); winEnd.clear();}}private long align10m(long t){ long s=10*60*1000L; return t - (t%s) + s - 1; }});

11.3 Table/SQL 更快上手(带 Watermark 的动态表)

CREATE TABLE events (
user_id STRING,
event_time TIMESTAMP_LTZ(3),
WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE
) WITH (...);
CREATE TABLE sink (...) WITH (...);
INSERT INTO sink
SELECT user_id, WINDOW_END AS win_end, COUNT(*) AS cnt
FROM TABLE(
TUMBLE(TABLE events, DESCRIPTOR(event_time), INTERVAL '10' MINUTES)
)
GROUP BY user_id, WINDOW_START, WINDOW_END;

12. 生产最佳实践清单(可当上线前自检表)

  1. 语义选择:默认 Exactly-Once;极端低延迟再考虑 At-Least-Once / Unaligned

  2. Watermark:乱序容忍度保守设置,评估延迟与正确率

  3. 状态治理

    • 估算 Key/状态规模与 TTL(避免“永不清理”)
    • RocksDB + 增量快照应对大状态;关注热点与倾斜
  4. 检查点可靠性

    • Checkpoint 落在可靠存储(HDFS/S3),限制并发与超时
    • 定期 Savepoint 作为可回退版本
  5. Connectors 语义:Kafka Source 可回退;Sink 用两阶段提交/事务保证端到端一致性

  6. 观测与告警:Backpressure、Busy Time、Checkpoint Duration/Alignment、Watermark Lag、State Size

  7. 演进与扩缩容:用 Savepoint 做算子并行度/拓扑变更;最大并行度与 Key Group 提前规划

  8. 批/流一体:批作业不做 Checkpoint,容错依赖全量回放,成本主要在恢复阶段

13. 结语

理解 状态时间,是把 Flink 从“能跑”推进到“跑得稳、跑得准”的关键。
Keyed State 把业务上下文本地化;用 Checkpoints/Savepoints 守住一致性与可进化;根据链路特性在 对齐/不对齐 之间做正确取舍;结合 状态后端 管好规模与性能。
当这些拼图都对齐,你的实时系统就具备了在生产环境长期演进的基础能力。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/937017.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

对static新的认识

学习java时,我首先声明了一个统一响应结果的实体类: public Result success(Object object){Result result = new Result();result.code=1;result.msg="success";result.data=object;return result;}然后调…

C++STL之stack,queue与容器适配器 - 教程

C++STL之stack,queue与容器适配器 - 教程pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "…

2025年氧化镁厂家最新推荐排行榜,电工级/高温/低温/中温/防火电缆/矿物绝缘/熔盐加热器/电热管用/单头管用/合成云母用氧化镁公司推荐!

随着工业技术的不断进步,氧化镁在各个领域的应用越来越广泛。从电工级氧化镁到高温、低温、中温氧化镁,再到防火电缆氧化镁、矿物绝缘氧化镁、熔盐加热器氧化镁、电热管用氧化镁、单头管用氧化镁以及合成云母用氧化镁…

智能体分析

AI智能体技术正经历快速发展,从早期的简单对话机器人演进为能够自主规划、执行复杂任务的多模态系统。本次将对市面上功能较为完善的智能体平台进行系统分析,从通用型、垂直领域、开源/免费工具三个维度,对比它们的…

Excel - lookup()

Excel - lookup() Excel lookup(B3, sheet2!A3:A33, sheet2!E3:E33), 向下填充时如何只增加第1个参数值?例如增加B3为B4, B5...,保持sheet2!A3:A33, sheet2!E3:E33不变。 ChatGPT said:你这个问题是 Excel 相对/绝对…

2025 年玄武岩厂家推荐榜:玄武岩/0-3mm/3-5mm/5-10mm/10-15mm/10-20mm/石子厂,聚焦基建升级与高端化需求,山东展飞建筑材料有限公司成优选

随着国内基建工程持续推进(高铁、高速、市政项目等)、绿色建材标准趋严及高端新材料需求逐步释放,玄武岩建筑材料凭借高强度、耐磨损等特性,已从传统基建专属逐步拓展至道路铺建、景观绿化、污水治理等多个领域,2…

2025 佛山铝合金/系统/断桥铝/耐用/推拉/封阳台/别墅/静音门窗厂家品牌实力推荐:聚焦技术与服务的五大优选标杆

在家居装修与建筑工程中,门窗的性能与品质直接影响居住舒适度与建筑安全性。随着行业向 "科技美学" 升级,兼具技术创新、品质保障与服务能力的品牌愈发受到关注。以下结合市场调研与实际应用场景,推荐五个…

Ubuntu22.04 server网络配置

配置ubuntu ip地址新安装的 Ubuntu 服务器首需要配置 ip 才能进行网络连接。 1 配置 IP 1.1 查看网卡信息 sudo apt install lshw -y sudo lshw -C network例如我这里使用有线网网卡名是 enp3s01.2 配置 ip 编辑 netpl…

完整教程:深度学习优化器全面指南:核心参数选择与实战策略

完整教程:深度学习优化器全面指南:核心参数选择与实战策略pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Conso…

C#——方法的定义、调用与调试 - 详解

C#——方法的定义、调用与调试 - 详解pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Mona…

说说新版畅联云的一些重要约定

先了解几个基本概念:品目类似文件夹,而且从顶级来看,畅联将AIoT分成了3大分类,一个是视频设备,一个是IoT设备,一个是算法。品目是无限级的,可以一层层建一下。产品产品属于某一个具体品目,譬如某个4G安全帽产品…

App.vue(完整可运行示例)

🔧 TXT 文件 URL 批量替换工具将文件中的 https://alicdn-imags.zsgw.vip/default/20250826/imags 替换为 https://site.shrtxs.cn/agent <div><inputtype="file"ref="fileInput"@chan…

Windows MySQL 报错

1. 首次更改密码报错解决 ERROR 1820 (HY000) ERROR 1820 (HY000): You must reset your password using ALTER USER statement before executing this statement. 问题解决修改密码 ALTER USER username@host IDENTIF…

Redis:高性能内存数据库的六大核心优势 - 教程

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

lvgl刷新回调事件实现说明

static void my_lvgl_flush_cb(lv_disp_drv_t* disp_drv, const lv_area_t* area, lv_color_t* color_p) {lock(1); #if 1uint32_t offset = 0;int32_t x, y;for (y = area->y1; y <= area->y2; y++) {for (x…

Avalonia Behaviors 在 StackPanel 空白处无效问题解析与解决方案

问题描述 在 Avalonia UI 开发中,很多开发者会遇到这样的问题:在 StackPanel 上添加了 Behaviors 和事件触发器,但是只有在 StackPanel 内部的文本、按钮等可视化元素上点击才有效,而在 StackPanel 的空白区域点击…

完整教程:Django 入门:快速构建 Python Web 应用的强大框架

完整教程:Django 入门:快速构建 Python Web 应用的强大框架pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Cons…

Hyperliquid 的稳定币USDH发行机制与发行商竞选指南

引言:稳定币是什么,为什么 Hyperliquid 需要它? 想象一下,在加密货币世界里,价格像过山车一样波动,你需要一个“稳定锚”来固定价值——这就是稳定币(Stablecoin)。稳定币是一种设计成与美元或其他资产挂钩的加…

windows上建简单的ssh版git仓库

说来话长,公司用的svn,内网,我想弄个简单的git仓库客户端自己用,问了好久的AI,真垃圾的这个AI。 过程如下: 1、Windows自带的OpenSSH 自己找找网上怎么开启,没有的话离线下载: 离线下载 2、安装git,这里就不说…