第一章:那只准时敲门的“幽灵”——Checkpoint与其背后的IO风暴
我们拿到的是一个极其诡异的现场:每30分钟一次,持续5分钟的反压。
这不像是因为数据倾斜导致的“长尾”,也不像代码逻辑死循环导致的“猝死”。它太规律了,规律得像是一个精心设计的定时炸弹。在排除了业务侧的“整点秒杀”或“批量导数”后,我们的目光必须从业务逻辑层,下沉到Flink的Runtime以及更底层的OS层。
很多人看到反压(Backpressure),第一反应是看inPoolUsage,看Stack Trace。错了。对于这种周期性故障,你首先要看的是时间轴的重合度。
1.1 嫌疑人X:Checkpoint的“对齐”诅咒
你可能会说:“我的Checkpoint设的是3分钟一次,跟30分钟有什么关系?”
别急。让我们把Flink Web UI的Checkpoint History拉出来,别只看最近的几次,我要你拉过去24小时的。你需要关注的不是“Completed”,而是那些Duration突然变长,或者Alignment Time突然飙升的点。
假设你发现了Checkpoint的时间虽然是3分钟一次,但每隔10次左右(约30分钟),就会出现一次耗时极长的Checkpoint(比如从10秒飙升到4分钟)。
这时候,Barrier对齐(Barrier Alignment)就是最大的嫌疑人。
在EXACTLY_ONCE语义下,Flink算子需要等待所有上游通道的Barrier到齐。如果每30分钟,HDFS(或S3/OSS)作为State Backend的存储端,出现了一次写入延迟抖动,或者你的网络带宽在那一刻被某个“邻居”挤占了,Checkpoint的同步阶段(Sync Phase)就会被无限拉长。
排查实操:
别盯着end_to_end_duration发呆,那个指标太宏观,没用。去Metrics里找这两个关键指标的P99和Max值:
checkpoint_alignment_time:如果这个值周期性飙升,说明数据流中有“慢车”。Barrier被卡在了某个Channel里。checkpoint_start_delay:这个更隐蔽。如果这个值高,说明系统在Checkpoint还没开始就卡住了!通常是因为Checkpoint Lock拿不到,或者Task Thread在处理一个巨大的Record,根本没空响应Barrier。
老鸟笔记:很多时候,周期性的start_delay飙升,是因为你的Source端在每30分钟进行一次Partition Discovery(分区发现)或者元数据刷新,这个操作是同步阻塞的,直接导致Barrier发不出来。
如果你发现Checkpoint的周期和反压周期完美重合(或者反压发生在Checkpoint期间),那么恭喜你,范围缩小了。但这只是表象。
为什么Checkpoint会变慢?
1.2 RocksDB的“Compaction风暴”
这才是重头戏。绝大多数生产环境的大状态作业都跑在RocksDB上。RocksDB是基于LSM Tree的,它的写入是极快的(Append Only),但读取和清理是昂贵的。
你有没有配置过这个参数?state.backend.rocksdb.compaction.style
如果是默认的LEVEL模式,RocksDB会把数据分成L0, L1, ... Ln层。当L0层满了(通常很快),它会触发Compaction(合并压缩),把数据Merge到L1。这个过程还好。但随着时间推移,数据会像滚雪球一样推向高层级。
高能预警:当数据从L3合并到L4,或者L4到L5时,涉及的磁盘IO量是指数级爆炸的。
如果你的业务场景中,State的TTL(过期时间)设置得不凑巧,或者你的写入量正好在30分钟左右填满了一个较大的Level阈值,就会触发一次Major Compaction。
这就是那“5分钟”的真相。
在Compaction发生时,尽管RocksDB宣称是后台线程在跑,但它会疯狂抢占磁盘IO和CPU。如果你的TaskManager所在的机器磁盘IOPS(比如云盘)被打满,Checkpoint的fsync操作就会被阻塞。一旦Checkpoint卡住,反压立刻顺着算子链往上传导。
验证手段(不看即使你把眼珠子瞪出来也找不到原因):
开启RocksDB Native Metrics:在
flink-conf.yaml里加上:state.backend.rocksdb.metrics.actual-delayed-write-rate: truestate.backend.rocksdb.metrics.background-errors: truestate.backend.rocksdb.metrics.num-running-compactions: true观察Grafana监控:盯着
rocksdb_num_running_compactions和rocksdb_background_errors。 如果这根曲线呈现出锯齿状,并且波峰正好对应你反压的30分钟周期,案子就破了。IO Wait指标:切到机器监控(Node Exporter),看
iowaitcpu usage。如果那5分钟里,iowait从1%飙到了30%以上,这就是典型的磁盘瓶颈。
解决策略:这里的坑很深。千万别直接把state.backend.rocksdb.thread.num调大,那只会让IO死得更快。
试着换成Universal Compaction风格(
state.backend.rocksdb.compaction.style: UNIVERSAL),它对写更友好,虽然读性能稍差,但能避免周期性的IO尖峰。增大Write Buffer:让更多数据在内存里合并,减少落盘频次。
第二章:JVM的“定期大扫除”——GC STW的隐秘角落
如果RocksDB和Checkpoint看起来都岁月静好,IO也没有波澜,那我们就得把听诊器贴到JVM的心脏上。
30分钟一次,持续5分钟。这个时间跨度对于一次普通的Full GC来说太长了(除非你的堆有几百G且配置极烂),但对于内存泄漏导致的频繁GC爆发期,或者是G1 GC的Mixed GC周期,是有可能的。
2.1 堆外内存的“慢性中毒”
Flink不仅用堆内(Heap),还大量使用堆外(Off-Heap/Direct)内存,特别是Netty的网络传输Buffer。
想象这样一个场景:你的代码里用了一个第三方Client(比如去查Redis或HBase),你以为你把它关了,但其实每次连接都泄露了一个DirectByteBuffer。 前25分钟,内存池慢慢涨,JVM觉得“我还行,我能扛”。 到了第29分钟,Direct Memory逼近-XX:MaxDirectMemorySize。 JVM慌了。它开始拼命调用System.gc()来回收堆外内存(注意:堆外内存的回收往往依赖堆内对象的finalize或Cleaner机制,这需要触发Full GC)。
于是,你看到了连续5分钟的“GC地狱模式”。CPU利用率飙升,但全在做GC,Task线程根本拿不到时间片处理数据,导致反压。
怎么抓现行?
别只看Heap Usage。在Grafana上把以下几个指标叠在一起看:
GarbageCollection Count (Old Gen / Full GC)
GarbageCollection Time
BufferPool Direct Memory Used(这是关键!)
如果每30分钟,Direct Memory Usage达到顶峰,紧接着伴随一波Full GC,然后Memory Usage断崖式下跌,这就是典型的资源未释放问题。
2.2 G1 GC的“Humongous Object”陷阱
如果你用的是G1 GC(现在的标配),要特别小心Humongous Allocation(巨型对象分配)。
在Flink里,如果你有个算子在做窗口聚合,比如WindowFunction,你用了一个List<String>来攒数据。 正常流量下没事。但每隔30分钟,可能有一个上游的业务窗口关闭,吐出一个巨大的List。 只要这个对象超过了Region Size的50%,G1就会把它当做巨型对象,直接分配在老年代(Old Gen),而且往往需要连续的Region。
如果内存碎片化严重,G1为了找连续空间,会触发Serial Old GC(单线程Full GC,慢到令人发指)。
排查指令:去TaskManager的日志里grep这个:grep -i "Humongous" taskmanager.log或者开启GC日志详情:-Xlog:gc*=info,gc+humongous=debug:file=gc.log:time,uptime,level,tags
如果你在反压期间看到了大量的G1 Humongous Allocation日志,或者To-space exhausted,那你就要去查查你的代码了。是不是把什么巨大的HashMap或者List塞进了State里,或者在处理函数里搞了个巨大的临时变量?
代码审查点:检查是否有ListState被全量读取到内存(Wait for it...iterable.iterator()),然后转成了Java List。如果State里有几十万条数据,这个动作每30分钟触发一次,绝对能把JVM搞崩。
2.3 那个被遗忘的“Local Cache”
还有一个非常容易被忽视的杀手:应用层本地缓存(Guava Cache / Caffeine)。
开发者为了加速Lookup Join,通常会在RichFunction里搞个Cache。CacheBuilder.newBuilder().expireAfterWrite(30, TimeUnit.MINUTES)...
看到这行代码了吗?30分钟过期。
如果你的Cache很大(比如存了GB级别的维表数据),且设置了集中过期。每过30分钟,Cache里的几百万个对象同时失效。 虽然失效本身不耗时,但随后的GC周期需要扫描并标记这几百万个死对象。更糟糕的是,缓存失效意味着缓存击穿。 接下来的几分钟里,所有的请求都会穿透到外部存储(MySQL/HBase)。 外部存储扛不住突发的并发,响应变慢。 Flink算子等待IO,处理变慢。反压形成。
验证方法:
Review代码,搜索
expire关键字。看外部存储(如HBase)的监控,在反压期间,QPS是不是突然暴涨?Latnecy是不是飙升?
解决之道:给过期时间加个Jitter(抖动)。 不要写死30分钟,写成30分钟 + Random(0-300秒)。让缓存失效的时间分散开,别搞“大阅兵”。
第三章:不仅是Flink的事儿——外部生态的“蝴蝶效应”
我们不能只盯着Flink的一亩三分地。Flink是流式计算的管道,管道堵了,很有可能是出口被封住了,或者是由于某种神秘的“引力波”干扰。
3.1 HDFS/S3 的“Block回收周期”
这是我亲身经历过的一个坑。 曾经有个集群,也是每隔一段时间反压。查了半天Flink没问题。 最后发现是HDFS的NameNode在做CheckPoint(Image合并)或者DataNode在做Block Report。
如果你的Flink作业Sink到HDFS(比如StreamingFileSink),且文件滚动策略设置得比较碎(小文件多)。 每隔一段时间(通常是小时级别,但也可能配置成30分钟),HDFS DataNode会向NameNode汇报全量Block信息。 如果集群文件数过亿,这个Report过程会消耗大量网络带宽和NameNode锁资源。 这时候,Flink请求写Block会超时。 StreamingFileSink卡在flush()或close()方法上。 反压开始。
怎么看?不要只看Flink日志。去翻翻HDFS DataNode的日志,看看有没有Block report相关的耗时记录。或者问问运维大哥,那个时间点NameNode的RPC Queue Time是不是高了。
3.2 Kafka Broker的“Rebalance”与“Log Compaction”
如果你的Source是Kafka。 30分钟一次,有没有可能是Consumer Group发生了Rebalance? 虽然Rebalance通常很快,但如果你的Topic Partition巨多(几千个),Consumer初始化慢,Rebalance可能持续几分钟。 在这几分钟里,Source停止消费,下游没数据——这叫“断流”,不叫反压? 不。 如果部分Consumer上线了,部分还没上,或者Rebalance导致Partition分配倾斜,某些Task负载激增,就会导致局部的反压。
更隐蔽的是Kafka服务端的Log Compaction或Segment Rolling。 如果在30分钟时,Kafka Broker所在的磁盘正在疯狂做Log Cleanup,IO被打满。Flink写Kafka(Sink端)的ACK延时增加。 Sink慢,全链路慢。
实锤证据:在Flink Metrics里看KafkaProducer的request-latency-avg。 如果这根线和反压周期吻合,别查Flink了,去盘Kafka Broker。
3.3 容器环境的“嘈杂邻居” (Noisy Neighbor)
既然你说已经排除了业务高峰,那我们就假设流量是平稳的。 但是,物理机是平稳的吗?
现在的Flink大多跑在K8s或Yarn容器上。你的TaskManager可能和别人的ElasticSearch节点,或者一个定时的ETL批处理任务跑在同一台物理机上。 每30分钟,那个ETL任务启动,狂吃网络带宽(Network Throttling)或Page Cache。 OS层面的资源争抢是残酷的。如果不做绑核(CPU Pinning)或严格的Cgroup隔离,你的CPU时间片会被偷走。
Top命令大法:这需要一点运气和脚本。写个脚本,在反压期间,自动SSH到反压严重的TaskManager所在节点,执行top和iotop。 看看除了java(你的TaskManager)之外,还有没有哪个看起来面目可憎的进程在榜首赖着不走。 如果是Yarn环境,看看NodeManager的日志,是不是那个点有新的Container启动了?
第四章:时间引发的血案——Timer Storm(定时器风暴)
Flink是基于事件时间(Event Time)的,这大家都知道。但很多人忽略了,Flink内部对时间的驱动是依赖Timer(定时器)的。
如果你的业务逻辑里用到了窗口(Window),或者在ProcessFunction里注册了定时器(ctx.timerService().registerEventTimeTimer(...)),那么请警惕“整点效应”。
4.1 窗口触发的“拥堵路口”
假设你有一个TumblingWindow(滚动窗口),窗口大小正好是30分钟。window(TumblingEventTimeWindows.of(Time.minutes(30)))
虽然Flink的数据是流式处理的,每来一条处理一条。但是,窗口的计算和输出是批量的。 每过30分钟,Watermark推过了窗口结束时间。这时候,所有Key的窗口同时触发计算(Trigger Fire)。
如果你的Key基数(Cardinality)很大,比如有100万个Key。 在那一毫秒,Flink的TimerService会尝试触发100万个onTimer回调。 这100万个回调虽然是在各个Slot里并行执行的,但对于单个Slot来说,它是串行的。
现象还原:平时CPU利用率20%。 一到30分钟整点(或者Watermark推进到的那个逻辑时间点),CPU瞬间打满(User CPU高,System CPU低)。 反压从Sink端瞬间传导到Source。 持续几分钟处理完这批聚合后,一切恢复平静。
但这太简单了,你应该早就排除了对吧?那我们看个更复杂的:Watermark的周期性滞后。
4.2 Watermark生成器的“摸鱼”时刻
如果你的Source是Kafka,且分区很多(比如100个Partition),而你的并行度只有10。意味着每个Source Task要读取10个Partition。
Flink的Watermark是取所有Partition中最小的那个。 如果有一个Partition因为某种原因(比如上游写入该Partition的Producer挂了,或者网络抖动)断流了30分钟。 Flink的全局Watermark就会停在原地不动。
憋大招模式开启:
Watermark卡住30分钟。
但这30分钟里,其他正常的99个Partition数据还在源源不断地进来。
因为Watermark没动,所有依赖Watermark触发的窗口、定时器全部憋在内存里,无法触发,也无法清理。State越来越大。
突然!那个断流的Partition恢复了,或者因为
idleTimeout机制触发,Watermark瞬间向前跳跃了30分钟。灾难发生:积压了30分钟的所有定时器,在这一瞬间被同时唤醒。
这就好比大坝泄洪。本来应该是涓涓细流的触发,变成了一次毁灭性的洪峰。这5分钟的反压,就是Flink在疯狂补作业。
排查核武器:
去Flink Web UI,找到Source算子,点开Watermarks指标。 不要看平均值。看Min Watermark。 观察它是不是呈“台阶式”跳跃? 正常应该是平滑上升的直线。如果它是一条水平线,然后突然垂直上升,那就是这个问题。
代码里怎么修?一定要配置withIdleness!
StreamExecutionEnvironment env = ... env.getConfig().setAutoWatermarkInterval(200); // 在WatermarkStrategy里加上这个! WatermarkStrategy .<MyType>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withIdleness(Duration.ofMinutes(1)); // 关键!这个配置的意思是:如果某个分区1分钟没数据,就把它标记为Idle,不再用它来拖累全局Watermark。
第五章:僵尸连接与网络防火墙的“30分钟魔咒”
如果你的Flink作业需要写数据库(MySQL, PostgreSQL, Oracle)或者调用外部HTTP接口。这一章请反复阅读三遍。
很多公司的网络架构里,在应用层和数据库层之间,横亘着一道防火墙(Firewall)或者负载均衡(SLB/LVS)。 这些网络设备有一个默认的安全策略:TCP Idle Timeout(空闲连接超时)。
这个值,在很多设备上,默认就是30分钟。
5.1 沉默的杀手
场景还原: 你的Sink是一个JDBC Sink(或者自定义的RichSinkFunction),里面维护了一个连接池(Connection Pool,比如HikariCP或Druid)。 作业运行过程中,可能某些Sink Subtask分到的数据比较稀疏,或者因为之前的某种过滤逻辑,导致某个数据库连接在30分钟内没有数据传输。
这时候,防火墙静悄悄地把这个TCP连接在路由表中抹去了。 注意:它不会发送RST包给客户端,也不会发给服务端。它只是默默地“忘掉”了这个连接。
30分钟零1秒,新的数据来了。 Sink线程拿起这个连接,试图发送一条Insert SQL。 因为TCP连接在本地看来还是ESTABLISHED状态,数据发包成功写入Socket Buffer。 但是,数据包到了防火墙那里,防火墙说:“你谁啊?我不认识你。” 直接丢弃(Drop),不给任何回应。
Sink线程陷入了SocketOutputStream.write()或者读取响应的read()中。 它在等。 等什么?等操作系统的TCP重传机制,或者JDBC驱动层的Socket Timeout。 这一等,可能就是15分钟(Linux默认TCP重传甚至更久)。
这就是为什么反压会持续几分钟的原因:线程被IO Block住了。直到超时抛出异常,连接池剔除坏连接,重新建立新连接,系统才恢复正常。
怎么验证?这很难通过Metrics看到,因为线程是卡在Native方法的。 你需要用netstat -ano | grep ESTABLISHED或者ss命令,配合抓包。 但最简单的验证方法是:改配置。
保命手段:
开启TCP KeepAlive:在操作系统层面和JDBC URL参数里都开启。但TCP KeepAlive默认间隔通常是2小时,远水解不了近渴。
连接池心跳(Validation Query): 这是最管用的。在你的连接池配置里,必须设置:
testWhileIdle = truetimeBetweenEvictionRunsMillis = 60000(1分钟检查一次)minEvictableIdleTimeMillis = 300000(5分钟不用就回收)最关键的:
validationQuery = "SELECT 1"(或者JDBC 4的isValid())
强迫连接池每隔几分钟就发个心跳包,哪怕没数据,也要通过网络层告诉防火墙:“大哥,我还在,别杀我。”
真实案例:我曾经排查过一个写入ElasticSearch的作业,也是每30分钟卡死。最后发现是云厂商的NAT网关对Idle连接的超时限制正好是1800秒。我们在RestClient里加了KeepAlive回调,问题瞬间消失。
第六章:剖析黑盒——当Stack Trace遇到Arthas
如果以上都是推测,那现在我要教你如何抓现行。 当反压发生的那5分钟里,TaskManager内部到底发生了什么?别猜,我们要看证据。
传统的jstack虽然有用,但对于短暂的5分钟,往往你登上去敲完命令,现场已经没了。而且jstack只能看到快照,看不到CPU时间片到底花哪儿了。
这里推荐用阿里开源的Arthas,或者Async-Profiler。但为了生产环境安全(不需要安装Agent),我们用最简单的Flame Graph(火焰图)思维。
6.1 抓住那只“吞噬CPU的怪兽”
假设反压又开始了。第一步:找到最忙的那个TaskManager的PID。top -H -p <PID>找到占用CPU最高的那个线程ID(假设是hex 0x1a2b)。
第二步:采样。 如果你有权限,用async-profiler是最好的:./profiler.sh -d 30 -f flamegraph.html <PID>采集30秒的CPU Profile。
第三步:分析火焰图。 打开生成的HTML。 找那座“平顶山”(Plateau)。 火焰图的X轴是占用CPU的时间比例。如果你看到一段很宽的条,上面写着:java.util.HashMap.resize()或者com.yourcompany.utils.ComplexRegex.match()或者org.rocksdb.RocksDB.get()
如果是HashMap resize:说明你在处理函数里疯狂创建大Map或者频繁扩容,每30分钟一波数据洪峰导致频繁扩容。如果是Regex:说明某条特殊的数据触发了你的正则回溯死循环(ReDoS)。如果是RocksDB get:说明前面提到的IO问题还是没解决,或者你的Key设计导致了BloomFilter失效,每次都要读盘。
6.2 奇怪的锁竞争
有时候,CPU不高,但就是处理慢。这时候要看锁。 在Arthas里输入:thread -b(找出当前阻塞其他线程的线程)
你可能会发现这样一个恐怖场景: 所有的SourceReader线程都在WAITING (parking)。 而被谁Block住了呢? 可能是一个静态单例(Static Singleton)的工具类,里面有个synchronized方法。 或者是因为Log4j 1.x 在高并发写日志时的锁竞争(哪怕是AsyncAppender,如果Queue满了也会阻塞)。
一定要查日志框架:很多故障是因为每30分钟,业务触发某种边缘case,打印大量Error日志。log.error("...", e)同步写文件,磁盘IO变慢,Log4j锁住所有业务线程。验证方法:直接把日志级别调成OFF,看看反压还在不在。如果在,那就是日志系统背锅。
第七章:隐形倾斜——不是所有的热点都写在脸上
提到数据倾斜(Data Skew),是个写Flink的都能背两句:加盐(Salt)、两阶段聚合。 但那种倾斜通常是持续性的。某个Key一直热,某个Subtask一直忙。
你遇到的可是“30分钟一次”的周期性反压。这意味着,这个热点Key是间歇性爆发的。
7.1 “脉冲式”热点(Pulse Skew)
场景还原: 上游业务系统有个定时任务(Cron Job),每30分钟把一批特定的状态数据推送到Kafka。 这批数据有一个共同特征,比如它们的User ID都是0或者SYSTEM_ADMIN,或者某个字段是空的。 平时,正常用户的流量像细水长流,经过keyBy(userId)后均匀分布在各个Subtask上。 但每到30分钟整,那几十万条userId=0的数据瞬间涌入。
根据Hash算法,hash(0)的结果是固定的。 这意味着,这几十万条数据会全部挤进同一个Subtask里。 其他的Subtask在围观,唯独这个Subtask被活活撑死。
怎么确诊?
别看总体的Backpressure指标。你要点开Flink Web UI的该算子详情,查看Subtasks列表。 一定要在反压发生的那5分钟里看! 如果发现:
Subtask 3:InPoolUsage 100%,Backpressure High。
Subtask 0, 1, 2, 4...:InPoolUsage 0%,Backpressure OK。
这就是典型的单点脉冲倾斜。
解决这种“定时炸弹”的绝技:
普通的加盐(给Key加随机后缀)虽然能解决,但在需要精准去重或排序的场景下会很麻烦。 我推荐一种更轻量的“动态采样+旁路”策略:
Metric埋点:在KeyBy后的处理函数里,加个简单的Counter,统计每个Key的QPS。
日志打印:
if (counter > threshold) log.warn("Hot Key Detected: {}", key);发现凶手后:如果确认是
userId=0这种无效数据或者是系统数据,直接在Source端或者KeyBy之前把它Filter掉,或者Rebalance到单独的侧输出流(SideOutput)去慢慢处理,别让一颗老鼠屎坏了一锅汤。
7.2 那个被忽视的null值
千万别信业务方说“这个字段必填”。 在ETL链路里,Null值往往是倾斜之源。 如果每30分钟有一批脏数据进来,Join Key全是Null。 Flink默认会把Null当做一个普通的Key来Hash。 结果就是:所有Null值数据全去了同一个Subtask。
防御性编程:
.filter(data -> data.getKey() != null) // 简单粗暴 // 或者 .keyBy(data -> { if (data.getKey() == null) { return UUID.randomUUID().toString(); // 把Null打散! } return data.getKey(); })这行代码能救你一命。
第八章:序列化的代价——POJO vs Kryo 的暗战
如果你的CPU在反压期间飙高,但Logic代码很简单,也没发生GC。那你要看看你的对象是怎么在这个分布式系统里传输的。
Flink的数据在算子之间传输(Network Shuffle)时,必须序列化。 Flink有一套非常高效的POJO Serializer。但前提是你的Java Bean必须符合POJO规范(有空构造函数、Getter/Setter等)。 一旦你的对象里包含了一些奇怪的类型(比如未经注册的第三方库对象,或者Object类型),Flink就会退化使用Kryo序列化。
Kryo虽然全能,但比POJO Serializer慢10倍不止,而且极其消耗CPU。
这跟30分钟有什么关系?
假设每30分钟涌入的那批数据,是一个复杂的、嵌套很深的JSON转换过来的大对象。或者这批数据里包含了某种多态类型。 平时的小对象,POJO搞得定。 30分钟时的那批大对象,触发了Kryo的深层递归序列化。 CPU瞬间被序列化操作吃光,来不及处理业务逻辑。反压形成。
核查手段:
在你的main函数最前面,加上这行代码:
env.getConfig().disableGenericTypes();这就好比给系统装了个“报警器”。如果Flink发现无法使用高效序列化器,必须回退到Kryo时,它会直接抛出异常,让程序报错。
别怕报错。报错是好事。 看到报错信息,你就能知道是哪个Class没写好。 然后,要么把它改成标准的POJO,要么手动注册TypeInfo。 把这个隐患消灭在萌芽状态,别让它在生产环境每30分钟折磨你一次。
第九章:微批化的艺术——LocalKeyBy 与 MiniBatch
流计算是逐条处理的,但这在某些高吞吐场景下是极大的浪费。 每来一条数据,都要序列化、网络传输、反序列化、拿State锁、读State、写State、释放锁。
如果每30分钟有一波高峰,QPS翻了10倍。这种逐条处理的开销(Overhead)就成了瓶颈。
9.1 手写 LocalKeyBy(本地预聚合)
在数据进行keyBy(网络Shuffle)之前,先在本地的Map里攒一攒。 比如,在Source或者是Map算子后面,接一个BundleOperator。
逻辑如下:
定义一个本地Map
Map<Key, Accumulator>。数据来了,先不发往下游,而是更新本地Map。
关键点:当Map大小达到阈值(比如1000条),或者时间过去了一小会儿(比如100ms)。
将Map里的聚合结果,统一发往下游。
效果:原本30分钟那一波峰值,可能有100万条记录,需要Shuffle 100万次。 经过预聚合,只要Key的重复率高,可能只需要Shuffle 1万次。 网络压力瞬间降低99%。
代码骨架(干货):
public class LocalAggregator extends RichFlatMapFunction<Event, Event> { private transient Map<String, Long> buffer; private final int flushSize = 1000; @Override public void flatMap(Event value, Collector<Event> out) { buffer.merge(value.getKey(), 1L, Long::sum); if (buffer.size() >= flushSize) { flush(out); } } // 别忘了在 close() 里 flush 剩下的数据! @Override public void close() { flush(out); // 伪代码,实际要把Collector存起来 } }当然,Flink SQL里只需要开启table.exec.mini-batch.enabled,参数配好,它底层就是这么干的。但在DataStream API里,很多时候得自己撸。
第十章:最后的杀手锏——自定义分区器(Custom Partitioner)
如果你的下游Sink也是一个分布式系统(比如写入HDFS的不同Bucket,或者写入Kafka的不同Partition)。 而30分钟那一波数据,业务逻辑上决定了它们都要去往同一个下游分区。
比如,按照时间切分Bucket写入HDFS。30分钟整,所有数据都要写入2023-10-27-10-30这个文件夹。 不管你在Flink里怎么并行,到了Sink端,本质上是对同一个文件系统的目录在进行高并发写入(或者Rename操作)。 文件系统的NameNode锁竞争,或者单目录下的文件数限制,会反向卡住Flink的Sink。
破局思路:
不要直接用KeyBy。使用partitionCustom。
dataStream.partitionCustom(new Partitioner<String>() { @Override public int partition(String key, int numPartitions) { // 这里的逻辑你自己定! // 比如检测到热点Key,就轮询发送给不同的Subtask // 或者结合负载情况动态分发 return (key.hashCode() & Integer.MAX_VALUE) % numPartitions; } }, "keyField");通过完全控制数据流向,你可以绕过那些拥堵的“收费站”。甚至可以实现一个“感知负载的分区器”:如果发现下游Subtask 3的处理延迟高了(可以通过共享变量或外部存储反馈),就把流量切给Subtask 4。这属于高阶玩法,慎用,但救命时很管用。