flink对状态ttl进行单元测试

背景

在处理键值分区状态时,使用ttl设置过期时间是我们经常使用的,但是任何代码的修改都需要首先进行单元测试,本文就使用单元测试来验证一下状态ttl的设置是否正确

测试状态ttl超时的单元测试

首先看一下处理函数:

// 处理函数
public class MyStateProcessFunction extends KeyedProcessFunction<String, String, String> {// 键值分区状态ValueState<String> previousInput;@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<String>("previousInput", Types.STRING);// 状态ttl超时时间设置StateTtlConfig ttlConfig =StateTtlConfig.newBuilder(Time.minutes(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)// check 10 keys for every state access.cleanupIncrementally(10, false).build();stateDescriptor.enableTimeToLive(ttlConfig);previousInput = getRuntimeContext().getState(stateDescriptor);}@Overridepublic void processElement(String in, Context context, Collector<String> collector) throws Exception {context.timerService().registerProcessingTimeTimer(100);String out = (Objects.nonNull(previousInput.value()) ? previousInput.value() : "") + in;collector.collect(out);if (!in.contains("NotUpdate")) {// 为了模仿有访问状态,但是不更新状态,正常情况下业务逻辑是访问其他key组的其它state,而一直没有访问的key的状态会在超时时间到之后被清理掉previousInput.update(in);}}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {if (Objects.nonNull(previousInput.value())) {out.collect(String.format("timer trigger %s", previousInput.value()));} else {out.collect(String.format("timer trigger state clear", previousInput.value()));}}}

单元测试代码:

/*** 测试状态处理函数,包含状态的ttl配置,以及ontimer方法**/
@Test
public void testKeyedStateProcessFunction() throws Exception {MyStateProcessFunction myStateProcessFunction = new MyStateProcessFunction();OneInputStreamOperatorTestHarness<String, String> testHarness =ProcessFunctionTestHarnesses.forKeyedProcessFunction(myStateProcessFunction, x -> "1", Types.STRING);testHarness.open();testHarness.processElement("hello", 10);// 注册了一个定时器,定时器100后过期Assert.assertEquals(1, testHarness.numProcessingTimeTimers());// 测试输出Assert.assertEquals(Lists.newArrayList("hello"), testHarness.extractOutputValues());ValueState<String> previousInput = myStateProcessFunction.getRuntimeContext().getState(new ValueStateDescriptor<>("previousInput", Types.STRING));// 查看下状态应该已经被设置Assert.assertEquals("hello", previousInput.value());testHarness.processElement("world", 10);// 再次测试输出Assert.assertEquals(Lists.newArrayList("hello", "helloworld"), testHarness.extractOutputValues());// 再次查看下状态应该已经被设置Assert.assertEquals("world", previousInput.value());// 设置时间为1分钟,让状态超时testHarness.setStateTtlProcessingTime(Time.minutes(1).toMilliseconds());// 触发下状态访问,这样flink就会清理,正常生产中不需要这一步,访问状态本来就一直在进行中,只是可能是其他key分组的状态testHarness.processElement("NotUpdate1", System.currentTimeMillis());// 查看下状态应该已经被清理Assert.assertNull(previousInput.value());// 设置让定时器过期,顺带确认下状态已经被清理testHarness.setProcessingTime(100);// 测试输出(包含两个输入+一个定时器的输出)Assert.assertEquals(Lists.newArrayList("hello", "helloworld", "NotUpdate1", "timer trigger state clear"),testHarness.extractOutputValues());testHarness.close();
}

测试代码中已经包含了详细的注解,我们实现自己的ttl单元测试时可以参考下

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/140928.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Hosts File Editor 实用工具

我一般手工编辑hosts文件&#xff0c;我想给hosts文件加一个开关&#xff0c;本想自己实现&#xff0c;但是忽然发现微软已经提供了官方的解决方案&#xff0c;感觉有能人。 对文件的行的修改被抽象成了一个开关。腻害&#xff01;&#xff01;&#xff01;

SpringBoot3-快速入门

1.前置知识 Java17Spring、SpringMVC、MyBatisMaven、IDEA\ 2. 环境要求 环境&工具 版本&#xff08;or later&#xff09; SpringBoot 3.0.5 IDEA 2021.2.1 Java 17 Maven 3.5 Tomcat 10.0 Servlet 5.0 GraalVM Community 22.3 Native Build Tools 0.9…

◢Django 自写分页与使用

目录 1、设置分页样式,并展示到浏览器 2、模拟页码 3、生成分页 4、数据显示 5、上一页下一页 6、数据库的数据分页 7、封装分页 8、使用封装好的分页 建立好app后&#xff0c;设置路径path(in2/,views.in2)&#xff0c;视图def in2(request): &#xff0c;HTML: in2.html…

RK3568笔记五:基于Yolov5的训练及部署

若该文为原创文章&#xff0c;转载请注明原文出处。 一. 部署概述 环境&#xff1a;Ubuntu20.04、python3.8 芯片&#xff1a;RK3568 芯片系统&#xff1a;buildroot 开发板&#xff1a;ATK-DLRK3568 开发主要参考文档&#xff1a;《Rockchip_Quick_Start_RKNN_Toolkit2_C…

开源知识库软件xwiki在Windows下的安装

文章目录 开源知识库软件-xwiki在windows上的部署0、参考文档1、前置环境准备1.1、Windows版本及系统配置1.2、JDK11安装1.3、Tomcat9安装1.4、MySQL5.7数据库的安装 2、xwiki安装3、配置3.1、修改配置支持对文档内容进行搜索 4、问题解决4.1、附件无法上传问题4.1、附件无法下…

FreeRTOS知识梳理

一、RTOS:Real time operating system,中文意思为 实时操作系统&#xff0c;它是一类操作系统&#xff0c;比如uc/OS、FreeRTOS、RTX、RT-Thread 这些都是实时操作系统。 二、移植FreeRTOS到STM32F103C8T6上 interface选择CMSIS_V1,RCC选择Crystal Ceramic Resonator 。 …

5. 深度学习——正则化

机器学习面试题汇总与解析——正则化 本章讲解知识点 什么是正则化为什么要使用正则化?详细讲解正则化本专栏适合于Python已经入门的学生或人士,有一定的编程基础。本专栏适合于算法工程师、机器学习、图像处理求职的学生或人士。本专栏针对面试题答案进行了优化,尽量做到好…

C#中匿名类的声明及使用

C#中声明方法或类时&#xff0c;可以不指定的类或方法的名字&#xff0c;也就是匿名类和匿名的方法。以下主要介绍C#中匿名类的声明及使用。 1、匿名类的使用 匿名类是C# 3.0/.NET 3.x中新增特性&#xff0c;在匿名类的语法中并没有为其命名&#xff0c;而是直接使用new { }方…

无感刷新token

无感刷新 无感刷新Token技术是一种用于实现持久登录体验的关键技术&#xff0c;它通过在用户登录后自动刷新Token&#xff0c;以延长用户的登录状态&#xff0c;避免频繁要求用户重新登录。 实现 使用access_token&#xff08;短效token&#xff09;和refresh_token&#xf…

CSS精灵图:提高网站性能的秘密武器

在网站开发中&#xff0c;页面加载速度是一个非常重要的指标。而CSS精灵图正是一种可以帮助提高网站性能的技术。它可以将多个小图标合并成一个大图&#xff0c;从而减少HTTP请求次数&#xff0c;提高页面加载速度。本篇博文将为你详细介绍CSS精灵图的概念、优势以及实现方式。…

海康威视(iVMS)综合安防系统任意文件上传漏洞复现 [附POC]

文章目录 海康威视&#xff08;iVMS&#xff09;综合安防系统任意文件上传漏洞复现 [附POC]0x01 前言0x02 漏洞描述0x03 影响版本0x04 漏洞环境0x05 漏洞复现1.访问漏洞环境2.构造POC3.复现 0x06 修复建议 海康威视&#xff08;iVMS&#xff09;综合安防系统任意文件上传漏洞复…

[PyTorch][chapter 62][强化学习-基本概念]

前言&#xff1a; 目录&#xff1a; 强化学习概念 马尔科夫决策 Bellman 方程 格子世界例子 一 强化学习 强化学习 必须在尝试之后&#xff0c;才能发现哪些行为会导致奖励的最大化。 当前的行为可能不仅仅会影响即时奖赏&#xff0c;还有影响下一步奖赏和所有奖赏 强…

使用Inis搭配内网穿透实现Ubuntu上快速搭建博客网站远程访问

文章目录 前言1. Inis博客网站搭建1.1. Inis博客网站下载和安装1.2 Inis博客网站测试1.3 cpolar的安装和注册 2. 本地网页发布2.1 Cpolar临时数据隧道2.2 Cpolar稳定隧道&#xff08;云端设置&#xff09;2.3.Cpolar稳定隧道&#xff08;本地设置&#xff09; 3. 公网访问测试总…

Spring bean标签

目录 Spring bean标签1.了解Spring Xml配置文件2.bean标签的Attrbute3.bean的子标签扩展FactoryBean Spring bean标签 在创建IOC容器的时候&#xff0c;是如何把配置文件解析成我们的BeanDefinition。本文针对其<bean/>标签中的属性及其子标签进行说明。 1.了解Spring Xm…

ElasticSearch搜索详细讲解与操作

全文检索基础 全文检索流程 流程&#xff1a; 创建索引 返回结果 查询索引 原始文档 创建索引 索引库 查询索引 创建索引&#xff1a; 获取文档 构建文档对象 分析文档分词 创建索引 查询索引&#xff1a; 用户查询结构 创建查询 执行查询 渲染结果 相关概念 索引库 索引库就…

在 uniapp 中 一键转换单位 (px 转 rpx)

在 uniapp 中 一键转换单位 px 转 rpx Uni-app 官方转换位置利用【px2rpx】插件Ctrl S一键全部转换下载插件修改插件 Uni-app 官方转换位置 首先在App.vue中输入这个&#xff1a; uni.getSystemInfo({success(res) {console.log("屏幕宽度", res.screenWidth) //屏…

酷柚易汛ERP - 商品库存余额表操作指南

1、应用场景 商品库存余额表用于查询商品在各仓库的实际结存量、单位成本以及成本等明细。 2、主要操作 打开【仓库】-【商品库存余额表】&#xff0c;可筛选仓库、商品、商品类别&#xff0c;导出/打印等操作见【销货单】不再赘述。 3、分享操作 库存余额分享&#xff0c;…

CCLink转Modbus TCP网关_MODBUS网口设置

兴达易控CCLink转Modbus TCP网关是一种用于连接CCLink网络和Modbus TCP网络的设备。它提供了简单易用的MODBUS网口设置&#xff0c;可以帮助用户轻松地配置和管理网络连接 1 、网关做为MODBUS主站 &#xff08;1&#xff09;将电脑用网线连接至网关的P3网口上。 &#xff08;…

计算机网络(一)

一、什么是计算机网络、计算机协议&#xff1f; 计算机网络就是由计算机作为收发端&#xff0c;不同计算机相互连接的网络&#xff0c;包括互联网&#xff08;Internet&#xff09;&#xff0c;公司或者家用网络&#xff08;intranet&#xff09;等等&#xff1b;其中Internet…

【C语言 | 指针】C指针详解(经典,非常详细)

&#x1f601;博客主页&#x1f601;&#xff1a;&#x1f680;https://blog.csdn.net/wkd_007&#x1f680; &#x1f911;博客内容&#x1f911;&#xff1a;&#x1f36d;嵌入式开发、Linux、C语言、C、数据结构、音视频&#x1f36d; &#x1f923;本文内容&#x1f923;&a…