Flink Keyed State的优化与实践

本期作者

1.背景

Flink SQL在业务使用中有较多的双流join场景,当左右流的流量都较大,Join的等待时间即使为1小时,Flink Keyed State(Flink State分Operator State和Keyed State,后文所有State均代表后者)的存储大小也很容易达到TB级(内部默认使用的是RocksDBStateBackend)。

在State我们内部[1]之前就做了RT和长度的metric,当State的存储达到TB级别后,会发现State的scan/next/readNull请求RT会变得较高,另外双流Join不仅流量大,Join query的字段也较多,导致State的Value长度也较大,从而使得任务在流量高峰期CPU存在明显的周期性毛刺,根因是RocksDB的compaction引发。我们下面的内容主要是从业务场景跟进到RocksDB的读写行为,来优化RT耗时高的问题,并使用优化方案缓解compaction的压力。

2.Flink Keyed State诊断

我们统计了内部的TB级别大State任务,其State均由双流Join Operator产生。Flink的双流Join有两类,一类是无时间区间限制的Regular Join,其左右流的State Key在Inner和Outer Join下均会存放单条RowData数据,在监控中会发现Key length普遍较大,这类作业在内部使用数量以及State大小均相对较小。另一类是有时间区间限制的Interval Join,以及内部基于Interval Join实现的延迟弹出数据的Latency Join,其左右流的State在大State任务中Key很小,而Value较大,如下图所示是大State任务LatencyJoin的写KV长度监控。

图片

2.1 双流Join特性

第二类双流Join在内部使用数量和State存储大小均很大,内部的商业和AI这两个业务线中使用得最多,我们着重看一下这类Join在Flink的实现。

这类双流Join Operator,在Flink中会使用两个State,分别缓存左流和右流的数据,其结构为MapState<long,list>,Key存放的是毫秒时间戳,Value存放的是当前毫秒时刻相同On表达式列下的所有RowData记录,一个RowData就是当前流在Join后需要被Project的所有字段合集。当左右流的一条记录被Project的字段越多,单个RowData存储的字段也就越多,一个RowData的长度由所有Project字段的长度之和决定,大State任务中一般左右流的Project数量和长度均较大,所以也就导致了一个时间戳下即使只有一条RowData,写入RocksDB的List序列化结果也会较长。

图片

RocksDB的写流程示意图[2]如下,一对KV从client写入RocksDB中,会先写入WAL中(Flink中已关闭WAL),然后写入Memory table中,当Memory Table写满(阈值write_buffer_size=64MB)或主动触发flush(Flink checkpoint触发RocksDB的snapshot)后,会将数据刷入磁盘写入L0层的SST中,当L0层的文件数达到阈值(level0_file_num_compaction_trigger=4)会触发compaction操作,将L0层的所有数据和L1的数据合并并写入到L1中;当L1的存储大小达到阈值(max_bytes_for_level_base=256MB)后会触发compaction,将L1的文件和L2的若干个文件合并并写入L2;在L2及其以后,LN+1的存储大小为LN的10(max_bytes_for_level_multiplier)倍时均会触发compaction向LN+1合并数据。

图片

将任务添加state.backend.rocksdb.log.level=DEBUG_LEVEL配置后会发现,TB级别的双流Join大State任务,RocksDB的SST层级会变成[2,4,41,98,0,0,0],代表着L0至L7中SST文件个数,L3的文件数会十分多,这也是由于双流Join的两个特性决定的,一个是流量特别大,需要记录的KV数量庞大,另一个是前面提到的Value较长,导致整体的存储消耗也会较大,而RocksDB的层级分布,导致较多的数据存储到了L3,层级变多会导致compaction整体耗时变得较高。

2.2 RocksDB读RT高

RocksDB的读流程示意图[2]如下,Client读取数据会先从Memory Table中查询,如果没有找到则在磁盘的L0层读取数据,由于在L0层的数据不是全局有序的,所以会依次读取L0层的所有文件。RocksDB的L1至LN每层数据在当前Level内是全局排序的,所有L1至LN中的一个K只会在一个SST文件中,如果L0层未查询到数据,则会依次在L1至LN中每层只查询一个SST文件,直到查询到数据结束。 

图片

在前文中我们提到过,双流Join的Key是时间戳,Value存放的是当前时间戳的数据集合List,左右流数据进入到Join Operator会在对应State中做一个Get请求,再将当前RowDate放入List中,随着时间的推移,会存在大量的Get请求击穿RocksDB,我们标记其为ReadNull,与此同时上文中提到双流Join大State的RocksDB L3层的文件数是比较多的,所有的ReadNull请求均会从L0访问至L3结束,整体读耗时是比较高的。

另外这类Join的State是MapState<long,list>结构,会有两个地方调用Map的iterator。一个是对流数据Join当前State的RowData集合,另一个是由Timer State定时触发清理当前State的时间窗口外的数据,否则State会越来越大。而State中Map的iterator调用的是RocksDB的seek和next来查询数据,seek和next操作会将所有满足currentKey的数据遍历出来,同样会读到RocksDB的最底层Level的SST文件。

我们通过监控发现大State的seek、next、readnull等读RT耗时99线比较高,达到了两位数的毫秒级,这对实时流计算来说是不期望看到的结果。

图片

3.State优化

无论是读RT耗时变高,还是compaction导致CPU毛刺,都是因为State中的Value过大,导致SST的层级变多,扫描过多SST文件,读性能也就会有所下降。一条State记录,从L0到LN的过程,不仅会在跨层中被读写,而且在同层中,因其他数据compaction到当层,也会被多次读写,在高level的compaction中会发现磁盘的读写IO之和会达到400MB/s,读写中占比较大的数据是State的Value,如果Value在compaction中不被搬迁移动,那么可以大大降低IO和CPU毛刺,在做调研中发现了RocksDB社区的BlobDB[3]方案。

RocksDB的BlobDB方案来自Wisckey[4]的KV分离,当一对KV被flush落盘时,如果Value长度大于阈值,会将Value写入Blob文件中,SST文件中仅记录Value在Blob的index,在大Value场景下,使用KV分离后SST文件的总大小和层级将会大大的降低。

图片

Flink社区中RocksDB最高版本[5]在去年调研时仅为RocksDB对应的6.20版本,而此版本还无法通过Java调用开启BlobDB,在与内部分布式存储团队沟通后,决定使用他们已线上运行的7.8.3版本来构建Flink的RocksDB,从而来使用BlobDB特性,于是我们将Flink原有的CompactionFilter等实现合并到了7.8.3的release中。

经过适配并在大State中开启KV分离后,观察RocksDB日志发现SST的文件大小急剧下降,State Key也全聚集在了L0和L1这两层中。由于SST层级的降低,ReadNull请求访问到L1层就结束了,而seek和next请求在有数据的情况下,只需要额外访问一次对应的blob文件即可,若没有数据,对blob的IO读操作也可省略。最后的效果是ReadNull耗时全降到了百微妙左右,scan和next的RT 99线也降到了1毫秒左右。

图片

另外我们通过用户Case的双跑任务对比发现,开启KV分离的任务CPU毛刺有所弱化,CPU整体使用降低了50%。任务依旧存在少量的CPU毛刺高峰现象,通过RocksDB日志分析毛刺依旧来源于compaction,虽然SST的层级降低了,但是blob文件的GC还是存在,GC会将blob文件数据做搬迁工作,整合新文件和删除老blob文件,其工作会比较消耗磁盘IO和CPU资源,而blob的GC在现实中是无法关闭的,关闭blob GC会导致存储层文件只增不减。即使compaction依旧存在,我们也是享受到了升级RocksDB版本带来的优化的,高毛刺的频率相比老版本来说少了很多了。

图片

4.总结与展望

除了对大State做了KV分离外,小State的有一些任务会存在20min一次周期性毛刺现象,我们引用了分布式存储团队的InnerCompaction[6]补丁,KV长度均很小,L1层存储大小接近max_bytes_for_level_base,四次Checkpoint生成的4个L0层SST向L1层compaction会导致CPU毛刺现象产生,而InnerCompaction的优化原理是L0层的小文件在同层预先做compaction,避免频繁的与L1的数据做合并,防止IO放大。

去年下半年在内部给用户推进Flink State的KV分离功能后,线上所有大State任务的CPU使用率都减少了20-50%。未来可能有如下几个方面会继续推进优化。

a) 将内部高版本Flink中的RocksDB完成升级。

b) Flink做Checkpoint执行到RocksDB层的snapshot是比较轻的,和compaction不强关联的,后续可以考虑降低compaction的速率来达到进一步弱化CPU毛刺现象。

c) 目前内部的KV分离还需要使用一个参数控制开启,后期期望能默认全局开启,KV分离对Value长度大小的阈值也期望能自适应,无需用户感知。

参考:

[1]https://mp.weixin.qq.com/s/E23JO7YvzJrocbOIGO5X-Q

[2]https://blog.csdn.net/microGP/article/details/120416193

[3]https://github.com/facebook/rocksdb/wiki/BlobDB

[4]https://www.usenix.org/system/files/conference/fast16/fast16-papers-lu.pdf

[5]https://github.com/ververica/frocksdb

[6] https://github.com/bilibili/rocksdb

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/802242.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

opencv识别图形轮廓并计算面积以及在输出图片上显示中文字体

前言 opencv识别带有轮廓的图形,主要是常见数学图形如方形、圆形以及不规则图形,并计算其面积(像素统计)。 配置: 平台:windows 工具:visual studio code 语言:python 库:opencv、Pillow 一、图形识别与计算 opencv处理图形轮廓的步骤如下: 1、将图片转为灰度图:…

数据库(mysql)-基本查询语句(DQL)

查询语句 这边查询是给予一定表格,这边先做个解释 教师表包括(name(姓名),gender(性别),salary(工资),title(职位),subject_id(课程的编号),comm(奖金)) 学生表包括(姓名(name),gender(性别),job(职位),生日(birth)) 模版 SELECT 字段名 FROM 查询表 WHERE 查询语句 或与非…

k8s_入门_命令详解

命令详解 kubectl是官方的CLI命令行工具&#xff0c;用于与 apiserver进行通信&#xff0c;将用户在命令行输入的命令&#xff0c;组织并转化为 apiserver能识别的信息&#xff0c;进而实现管理k8s各种资源的一种有效途径 1. 帮助 2. 查看版本信息 3. 查看资源对象等 查看No…

Linux应用开发(3):Linux时间操作(time、mktime、localtime等)

1. 简述 在Linux系统中&#xff0c;时间操作函数是编程中经常使用的一部分&#xff0c;它们允许程序获取和设置系统时间&#xff0c;以及对时间进行各种处理。以下是一些常用的时间操作函数的详细介绍。 2. 时间操作 &#xff08;1&#xff09;time(): 获取1970年1月1日以来的…

Docker使用报错信息dial unix /var/run/docker.sock: connect: permission denied

报错信息dial unix /var/run/docker.sock: connect: permission denied masterlocalhost:~$ docker ps Got permission denied while trying to connect to the Docker daemon socket at unix:///var/run/docker.sock: Get http://%2Fvar%2Frun%2Fdocker.sock/v1.24/container…

爬虫入门教程(一)

爬虫入门教程 1.什么是爬虫 爬虫是一种自动获取网站数据的程序或脚本。它可以自动模拟人类访问网站,获取网页源代码,解析并提取出所需的数据。 爬虫的工作原理类似于搜索引擎的索引程序&#xff0c;它们会按照预定的规则和算法在互联网上不断地爬取网页&#xff0c;收集信息…

二维相位解包裹理论算法和软件【全文翻译-掩膜切割算法(4.4)】

4.4 掩膜切割算法 在上一节中,我们了解到质量引导路径跟踪算法可以解决一些相位解包问题,而在这些问题上,戈尔茨坦算法会因为分支切割的错位而失败。这是因为质量引导方法采用了更多的信息(质量图)来引导解包路径。在本节中,我们将这一想法与戈尔茨坦算法相结合,产生了…

k8s知识

k8s是用于容器编排和管理的&#xff0c;docker或者ctr是k8s的运行时&#xff0c;k8s通过容器运行时来启动容器&#xff0c;容器启动需要镜像&#xff0c;镜像可以用docker构建&#xff0c;dockerfile就是用于自定义如何构建镜像&#xff0c;所以上面那套流水线就是先用dockerfi…

iperf3使用记录

安装 大部分系统标配工具&#xff1a; yum install -y iperf3参数 常用的参数如下&#xff0c;其他参数请-h Server or Client:-p, --port # server port to listen on/connect to-f, --format [kmgtKMGT] format to report: Kbits, Mbits, Gbits, Tbits-i,…

【Android】java中如何判断设备是否在root状态

前言 客户需求&#xff0c;需要判断设备是否执在root状态。可以理解为是否执行了adb root 设置root状态&#xff0c;已经adb unroot设置unroot状态。 代码分析 分析adb deamon发现&#xff1a;在执行adb root 、adb unroot指令时&#xff0c;系统会更新service.adb.root 变量…

Linux|从 STDIN 读取 Awk 输入

简介 在之前关于 Awk 工具的系列文章中&#xff0c;主要探讨了如何从文件中读取数据。但如果你希望从标准输入&#xff08;STDIN&#xff09;中读取数据&#xff0c;又该如何操作呢&#xff1f; 在本文中&#xff0c;将介绍几个示例&#xff0c;展示如何使用 Awk 来过滤其他命令…

即插即用篇 | YOLOv8引入Haar小波下采样 | 一种简单而有效的语义分割下采样模块

本改进已集成到 YOLOv8-Magic 框架。 下采样操作如最大池化或步幅卷积在卷积神经网络(CNNs)中被广泛应用,用于聚合局部特征、扩大感受野并减少计算负担。然而,对于语义分割任务,对局部邻域的特征进行池化可能导致重要的空间信息丢失,这有助于逐像素预测。为了解决这个问题…

mysql 查询变量@i:=@i+1

学习完mysql的查询&#xff1a;基本查询&#xff0c;连接查询和子查询和mysql 正则表达式查询&#xff0c;接下来先学习下变量查询。 mysql中没有oracle序列号那一列。mysql可以使用查询变量的方式去处理。我们先了解下查询变量&#xff0c;后面应用起来就更清晰。 1&#xff0…

vue在鼠标光标指定位置插入文字

1、html <el-button type"primary" click"addScriptParameters(item.name)">{{ item.name }}</el-button> 2、js const addScriptParameters (name) > { if (name) { const text parameter.value.$refs.textarea if (document.selection…

“弱智吧”才是人类面对AI的最后一道堡垒

在 AI 的研究领域中&#xff0c;语言模型的训练数据选择一直是一个关键问题。传统的智慧告诉我们&#xff0c;高质量的数据集应该是由专家精心挑选和校对的文本组成&#xff0c;以确保模型学习到的语言是规范、准确、有文化内涵的。 然而&#xff0c;最近的一项研究颠覆了这一观…

【Java】Java中类的初始化顺序(静态方法,静态块,非静态块,最后有流程图)

&#x1f4dd;个人主页&#xff1a;哈__ 期待您的关注 在日常使用Java的时候&#xff0c;我们都接触过new这个关键字&#xff0c;那你是否知道在我们的对象真正创建出来之前都做了哪些事情呢&#xff1f; 实际上要去判断一个类的初始化的顺序&#xff0c;需要分一下情况&…

String 和 StringBuffer、StringBuilder 的区别是什么

不可变性&#xff1a; String类是不可变的&#xff0c;即一旦创建了String对象&#xff0c;就不能修改它的值。每次对String对象的操作都会创建一个新的String对象&#xff0c;导致内存开销较大。 StringBuffer和StringBuilder类是可变的&#xff0c;它们允许修改已有的字符串…

Bean的默认名称

1.使用spring的注解 Component、Repository、Service、Controller 等注解去把一个类配置为bean时&#xff0c;如果不指定bean的名称&#xff0c;那么bean的名称的默认规则是&#xff1a; ①类名的首字母小写&#xff0c;例如&#xff1a;类名称 UserDao &#xff0c;那么默认的…

hadoop中hdfs的fsimage文件与edits文件

hadoop中hdfs的fsimage文件与edits文件的作用 首先&#xff0c;我们抛出fsimage和edits文件的功能描述。 Fsimage文件: HDFS文件系统元数据的一个永久性的检查点&#xff0c;其中包含HDFS文件系统的 所有目录和文件inode的序列化信息。 Edits文件:存放HDFS文件系统的所有更…

C++笔记:STL容器库的使用

前置&#xff1a; 对于stl容器库&#xff0c;我只做了一些常用的笔记&#xff0c;关于更详细的使用可以参考:https://cppreference.com/https://cppreference.com/ 一.string--字符串 对于C中string字符串会比C语言的字符数组使用起来会顺手许多。 命名空间&#xff1a;std 关于…