Flink系列之:State Time-To-Live (TTL)

Flink系列之:State Time-To-Live TTL

  • 一、TTL
  • 二、TTL实现代码
  • 三、过期状态的清理

一、TTL

  • Flink的TTL(Time-To-Live)是一种数据过期策略,用于指定数据在流处理中的存活时间。TTL可以应用于Flink中的状态或事件时间窗口,以控制数据的保留时间。
  • 当应用程序使用状态进行计算时,状态可能会消耗存储资源。TTL可以用来设置状态的最大生存时间,超过该时间的状态将被自动清理,以释放存储资源。这可以帮助应对状态数据的增长和资源限制问题。
  • 对于事件时间窗口,TTL可以用来指定窗口的持续时间。当到达窗口结束时间后,该窗口的结果将被输出,并且窗口中的所有数据将被清理。这可以确保计算结果及时输出,并释放计算资源。
  • 通过设置适当的TTL值,可以控制数据的保留时间,避免资源浪费和计算延迟。TTL的使用可以根据具体应用场景和需求进行配置,以实现数据管理的灵活性和效率。
  • 可以将生存时间 (TTL) 分配给任何类型的键控状态。如果配置了 TTL 并且状态值已过期,则将尽最大努力清除存储的值,这将在下面更详细地讨论。
  • 所有状态集合类型都支持每条目 TTL。这意味着列表元素和映射条目独立过期。
  • 为了使用状态 TTL,必须首先构建一个 StateTtlConfig 配置对象。然后可以通过传递配置在任何状态描述符中启用 TTL 功能:

二、TTL实现代码

java代码:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

这段代码使用Apache Flink提供的StateTtlConfig来设置状态的TTL(Time-To-Live)配置。

  • 首先,导入必要的包org.apache.flink.api.common.state.StateTtlConfig和org.apache.flink.api.common.state.ValueStateDescriptor。
  • 然后,创建StateTtlConfig对象ttlConfig,并使用StateTtlConfig.newBuilder(Time.seconds(1))来指定TTL的时间长度为1秒。这意味着状态数据的最大生存时间为1秒。
  • 接下来,调用ttlConfig的setUpdateType方法,将UpdateType设置为StateTtlConfig.UpdateType.OnCreateAndWrite。这表示在创建和写入状态时更新TTL。
  • 然后,调用ttlConfig的setStateVisibility方法,将StateVisibility设置为StateTtlConfig.StateVisibility.NeverReturnExpired。这表示状态在过期后永远不会返回,也就是被清理后不会再被读取。
  • 最后,使用ValueStateDescriptor创建一个名为"text state"的状态描述符stateDescriptor,并调用stateDescriptor的enableTimeToLive方法,将ttlConfig传递给它。这将启用状态的TTL配置。
  • 通过配置TTL,可以控制状态的生存时间,以及何时更新和清理状态。这有助于管理状态数据的存储和性能。

Scala代码:

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Timeval ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).buildval stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)

Python代码:

from pyflink.common.time import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfigttl_config = StateTtlConfig \.new_builder(Time.seconds(1)) \.set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \.set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) \.build()state_descriptor = ValueStateDescriptor("text state", Types.STRING())
state_descriptor.enable_time_to_live(ttl_config)

该配置有几个选项需要考虑:

  • newBuilder方法的第一个参数是必需的,它是生存时间值。
  • 更新类型配置何时刷新状态 TTL(默认为 OnCreateAndWrite):
    • StateTtlConfig.UpdateType.OnCreateAndWrite - 仅在创建和写入访问时
    • StateTtlConfig.UpdateType.OnReadAndWrite - 也用于读取访问
    • (注:如果同时将状态可见性设置为StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp,状态读缓存将被禁用,这会导致PyFlink中的一些性能损失)

状态可见性配置如果尚未清除过期值,是否在读取访问时返回过期值(默认为 NeverReturnExpired):

  • StateTtlConfig.StateVisibility.NeverReturnExpired - 永远不会返回过期值

(注:状态读/写缓存将被禁用,这会导致 PyFlink 中的一些性能损失)

  • StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 如果仍然可用则返回

在 NeverReturnExpired 的情况下,过期状态的行为就好像它不再存在一样,即使它仍然需要被删除。该选项对于数据必须在 TTL 之后严格无法进行读取访问的用例很有用,例如处理隐私敏感数据的应用程序。

另一个选项 ReturnExpiredIfNotCleanedUp 允许在清理之前返回过期状态。

笔记:

  • 状态后端存储上次修改的时间戳以及用户值,这意味着启用此功能会增加状态存储的消耗。堆状态后端存储一个附加的 Java 对象,其中包含对用户状态对象的引用和内存中的原始 long 值。 RocksDB 状态后端为每个存储值、列表条目或映射条目添加 8 个字节。
  • 当前仅支持涉及处理时间的 TTL。
  • 尝试使用启用 TTL 的描述符恢复之前未配置 TTL 的状态,反之亦然,将导致兼容性失败和 StateMigrationException。
  • TTL 配置不是检查点或保存点的一部分,而是 Flink 在当前运行的作业中处理它的一种方式。
  • 不建议通过将 ttl 从短值调整为长值来恢复检查点状态,这可能会导致潜在的数据错误。
  • 目前,仅当用户值序列化程序可以处理空值时,具有 TTL 的映射状态才支持空用户值。如果序列化器不支持 null 值,则可以使用 NullableSerializer 对其进行包装,但需要在序列化形式中增加一个额外字节。
  • 启用 TTL 的配置后,StateDescriptor 中的 defaultValue 实际上已被弃用,将不再生效。这样做的目的是使语义更加清晰,并让用户在状态内容为空或过期时手动管理默认值。

三、过期状态的清理

默认情况下,过期值会在读取时显式删除,例如 ValueState#value,并在配置的状态后端支持的情况下定期在后台进行垃圾收集。可以在 StateTtlConfig 中禁用后台清理:

Java代码:

import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).disableCleanupInBackground().build();

Scala代码:

import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).disableCleanupInBackground.build

Python代码:

from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfigttl_config = StateTtlConfig \.new_builder(Time.seconds(1)) \.cleanup_in_rocksdb_compact_filter(1000, Time.hours(1)) \.build()

为了对后台的一些特殊清理进行更细粒度的控制,您可以如下所述单独配置它。目前,堆状态后端依赖于增量清理,RocksDB 后端使用压缩过滤器进行后台清理。

完整快照中的清理

此外,您可以在拍摄完整状态快照时激活清理,这将减少其大小。在当前实现下,本地状态不会被清除,但在从以前的快照恢复时,它不会包括删除的过期状态。可以在StateTtlConfig中配置:

Java代码:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupFullSnapshot().build();

Scala代码:

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Timeval ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupFullSnapshot.build

Python代码:

from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfigttl_config = StateTtlConfig \.new_builder(Time.seconds(1)) \.cleanup_full_snapshot() \.build()

此选项不适用于 RocksDB 状态后端中的增量检查点。

对于现有作业,可以在 StateTtlConfig 中随时激活或停用此清理策略,例如从保存点重新启动后。

增量清理

另一种选择是逐步触发某些状态条目的清理。触发器可以是来自每个状态访问或/和每个记录处理的回调。如果此清理策略对于某些状态是活动的,则存储后端会在其所有条目上为此状态保留一个惰性全局迭代器。每次触发增量清理时,迭代器都会前进。检查遍历的状态条目并清除过期的状态条目。

该功能可以在 StateTtlConfig 中配置:

Java代码:

import org.apache.flink.api.common.state.StateTtlConfig;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupIncrementally(10, true).build();

Scala代码:

import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupIncrementally(10, true).build

Python:

from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfigttl_config = StateTtlConfig \.new_builder(Time.seconds(1)) \.cleanup_incrementally(10, True) \.build()

该策略有两个参数。第一个是每次清理触发的检查状态条目数。它总是在每次状态访问时触发。第二个参数定义是否在每次记录处理时额外触发清理。堆后端的默认后台清理会检查 5 个条目,而不会针对每个记录处理进行清理。

笔记:

  • 如果没有对状态进行访问或没有处理任何记录,则过期状态将持续存在。
  • 增量清理所花费的时间会增加记录处理延迟。
  • 目前增量清理仅针对堆状态后端实现。对 RocksDB 设置它不会有任何效果。
  • 如果堆状态后端与同步快照一起使用,则全局迭代器在迭代时会保留所有键的副本,因为其特定实现不支持并发修改。启用此功能将增加内存消耗。异步快照则不存在这个问题。
  • 对于现有作业,可以在 StateTtlConfig 中随时激活或停用此清理策略,例如从保存点重新启动后。

RocksDB 压缩期间的清理

如果使用 RocksDB 状态后端,将调用 Flink 特定的压缩过滤器进行后台清理。 RocksDB 定期运行异步压缩来合并状态更新并减少存储。 Flink 压缩过滤器使用 TTL 检查状态条目的过期时间戳并排除过期值。

该功能可以在 StateTtlConfig 中配置:

Java代码:

import org.apache.flink.api.common.state.StateTtlConfig;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupInRocksdbCompactFilter(1000, Time.hours(1)).build();

Scala代码:

import org.apache.flink.api.common.state.StateTtlConfigval ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupInRocksdbCompactFilter(1000, Time.hours(1)).build

Python代码:

from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfigttl_config = StateTtlConfig \.new_builder(Time.seconds(1)) \.cleanup_in_rocksdb_compact_filter(1000, Time.hours(1)) \.build()

RocksDB 压缩过滤器每次处理一定数量的状态条目后都会从 Flink 查询当前时间戳,用于检查过期情况。您可以更改它并将自定义值传递给 StateTtlConfig.newBuilder(…).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) 方法。更频繁地更新时间戳可以提高清理速度,但会降低压缩性能,因为它使用来自本机代码的 JNI 调用。 RocksDB 后端的默认后台清理会在每次处理 1000 个条目时查询当前时间戳。

定期压缩可以加快过期状态条目的清理速度,特别是对于很少访问的状态条目。早于该值的文件将被拾取进行压缩,并重新写入到与之前相同的级别。它确保文件定期通过压缩过滤器。您可以更改它并将自定义值传递给 StateTtlConfig.newBuilder(…).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Time periodicalCompactionTime) 方法。定期压缩秒数的默认值为 30 天。您可以将其设置为 0 以关闭定期压缩,或设置一个较小的值以加速过期状态条目清理,但它会触发更多压缩。

您可以通过激活 FlinkCompactionFilter 的调试级别来从 RocksDB 过滤器的本机代码激活调试日志:

log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG

笔记:

  • 在压缩过程中调用 TTL 过滤器会减慢速度。 TTL 过滤器必须解析上次访问的时间戳,并检查正在压缩的每个键的每个存储状态条目的过期时间。如果是集合状态类型(列表或映射),还会针对每个存储的元素调用检查。
  • 如果此功能与包含非固定字节长度元素的列表状态一起使用,则本机 TTL 过滤器必须在每个状态条目(其中至少第一个元素已过期)额外通过 JNI 调用该元素的 Flink java 类型序列化器确定下一个未过期元素的偏移量。
  • 对于现有作业,可以在 StateTtlConfig 中随时激活或停用此清理策略,例如从保存点重新启动后。
  • 周期性压缩仅在启用 TTL 时才起作用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/228342.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

FME之FeatureReader转换器按表格内容读取矢量数据

问题&#xff1a;平时会遇到只用某个大数据里某小部分数据参与下一步数据处理&#xff0c;此时我们会用到FeatureReader转换器&#xff0c;一般是通过空间关系&#xff08;相交、包含&#xff09;来读取相应涉及的图斑矢量&#xff0c;但就有一个问题&#xff0c;加入你的启动器…

太强了!利用 Python 连接 ES 查询索引某个字段命中数的脚本!

作者&#xff1a;JackTian 来源&#xff1a;公众号「杰哥的IT之旅」 ID&#xff1a;Jake_Internet 链接&#xff1a;太强了&#xff01;利用 Python 连接 ES 查询索引某个字段命中数的脚本&#xff01; 当我们在工作中&#xff0c;如果频繁查询 Elasticsearch 某个索引中的某个…

关于laravel的逻辑删除deleted_at与mysql唯一索引unique

使用mysql组合key去设置唯一索引unique时&#xff0c;可以避免因各种原因导致的重复脏数据问题&#xff0c;但由于我们绝大多数表都不建议采取物理删除的方式去对待可爱的数据们&#xff0c;因此我们常常使用逻辑删除&#xff08;软删除&#xff09;的方式去对错误数据或无效数…

计算机视觉(P2)-计算机视觉任务和应用

一、说明 在本文中&#xff0c;我们将探讨主要的计算机视觉任务以及每个任务最流行的应用程序。 二、图像内容分类 2.1. 图像分类 图像分类是计算机视觉领域的主要任务之一[1]。在该任务中&#xff0c;经过训练的模型根据预定义的类集为图像分配特定的类。下图是著名的CIFAR…

格式化Echarts的X轴显示,设置显示间隔

业务需求&#xff1a;x轴间隔4个显示&#xff0c;并且末尾显示23时 x轴为写死的0时-23时&#xff0c;使用Array.from data: Array.from({ length: 24 }).map((_, i) > ${i}时) 需要在axisLabel 里使用 interval: 0, // 强制显示所有刻度标签&#xff0c;然后通过 formatter …

分面中添加不同表格

简介 关于分面的推文&#xff0c;小编根据实际科研需求&#xff0c;已经分享了很多技巧。例如&#xff1a; 分面一页多图 基于分面的面积图绘制 分面中的细节调整汇总 分面中添加不同的直线 基于分面的折线图绘制 最近遇到了另一个需求&#xff1a;在分面中添加不同的表…

k8s 安装firewalld导致的网络疑难问题处理

场景 ubuntu 操作系统,部署了k8s集群,n 台 机器,某些机器之间 telnet ip 10250不通。 ufw 是关闭的,然后抓包会看到如下错误 04:43:09.154362 IP 192.168.1.3.56608 > 192.168.1.183.8000: Flags [S], seq 3664350430, win 64240, options [mss 1460,sackOK,TS val 281…

计算机网络(四)

九、网络安全 &#xff08;一&#xff09;什么是网络安全&#xff1f; A、网络安全状况 分布式反射攻击逐渐成为拒绝攻击的重要形式 涉及重要行业和政府部门的高危漏洞事件增多。 基础应用和通用软硬件漏洞风险凸显&#xff08;“心脏出血”&#xff0c;“破壳”等&#x…

Content-Type是什么

目录 Content-Type是什么 获取方式 设置方式 常见类型 application/x-www-form-urlencoded multipart/form-data application/json text/xml text/html text/plain Content-Type是什么 Content-Type出现在请求标头和响应标头中&#xff0c;意思是内容类型&#xff0…

LOF基金跟股票一样吗?

LOF基金&#xff0c;全称为"上市型开放式基金"&#xff0c;是一种可以在上海证券交易所认购、申购、赎回及交易的开放式证券投资基金。投资者可以通过上海证券交易所场内证券经营机构或场外基金销售机构进行认购、申购和赎回基金份额。 LOF基金的特点是既可以像股票…

论文阅读——GroupViT

GroupViT: Semantic Segmentation Emerges from Text Supervision 一、思想 把Transformer层分为多个组阶段grouping stages&#xff0c;每个stage通过自注意力机制学习一组tokens&#xff0c;然后使用学习到的组tokens通过分组模块Grouping Block融合相似的图片tokens。通过这…

2.5【渲染】Blitting

一,Blit的概念 Blit是一种计算机图形学中常用的数据操作,基础原理是使多个位图通过布尔函数(boolean function)组合成一个新位图。在U3D中,常说的Blit其实是渲染后期的一个概念,它将摄像机渲染好的一个图的所有像素点,通过各种形式的运算,然后重新绘制到屏幕。这种达到…

【docker 】基于Dockerfile创建镜像

Dockerfile文档 Dockerfile文档地址 Dockerfile 是一个用来构建镜像的文本文件&#xff0c;文本内容包含了一条条构建镜像所需的指令和说明。 DockerFile 可以说是一种可以被 Docker 程序解释的脚本&#xff0c;DockerFile 是由一条条的命令组成的&#xff0c;每条命令对应 …

LCR 181. 字符串中的单词反转

解题思路&#xff1a; class Solution {public String reverseMessage(String message) {message message.trim(); // 删除首尾空格int j message.length() - 1, i j;StringBuilder res new StringBuilder();while (i > 0) {while (i >…

极智一周 | 两系列汇总、MI300X、H100、特供芯片、GPT-4、火灾检测、酷睿Ultra And so on

欢迎关注我的公众号 [极智视界]&#xff0c;获取我的更多技术分享 大家好&#xff0c;我是极智视界&#xff0c;带来本周的 [极智一周]&#xff0c;关键词&#xff1a;两系列汇总、MI300X、H100、特供芯片、GPT-4、火灾检测、酷睿Ultra And so on。 邀您加入我的知识星球「极智…

数据分析为何要学统计学(2)——如何估计总体概率分布

明确总体的概率分布类型及参数是进行数据分析的基础&#xff0c;这项工作称为分布推断与参数估计。在总体分布及其参数不明确的情况下&#xff0c;我们可以利用手头掌握的样本来完成这项工作。具体过程由以下步骤组成。 第一步&#xff0c;样本统计特性直观估计 我们采用Seab…

在ViewPager下面加圆点指示(使用selector方式)

前面讲了如何使用ViewPager来做多个可滑动的页面。今天在页面的下面加上一排小圆点&#xff0c;用于指示当前在第几页。效果如下&#xff08;请忽略颜色和图案&#xff09;&#xff1a; 一、产生一个小圆点的视图 1、在drawable下产生一个选中和不选中颜色不同的小圆点形状&am…

1- Electron 创建项目、初始化项目

Electron官网 Build cross-platform desktop apps with JavaScript, HTML, and CSS | Electron Electron 初始化 初始化项目 - 构造package.json npm init -y 安装Electron模块包 npm i electron -D // 注意&#xff01;如果报错查看node包是否太高 配置启动脚本 {&quo…

SQL、Jdbc、JdbcTemplate、Mybatics

数据库&#xff1a;查询&#xff08;show、select&#xff09;、创建&#xff08;create)、使用(use)、删除(drop)数据库 表&#xff1a;创建&#xff08;【字段】约束、数据类型&#xff09;、查询、修改&#xff08;alter *add&#xff09;、删除 DML&#xff1a;增加(inse…

MySql的增、删、改、查(MySql数据库学习——五)

增&#xff08;数据添加/插入数据&#xff09; 使用 INSERT INTO SQL 语句来插入数据。我们可以通过 mysql> 命令提示窗口中向数据表中插入数据&#xff0c;或者 通过PHP 脚本来插入数据。 sql语句&#xff1a; INSERT INTO table_name ( field1, field2,...fieldN ) …