大数据学习(46) - Flink按键分区处理函数

&&大数据学习&&

🔥系列专栏: 👑哲学语录: 承认自己的无知,乃是开启智慧的大门
💖如果觉得博主的文章还不错的话,请点赞👍+收藏⭐️+留言📝支持一下博主哦🤞


之前提到,只有在KeyedStream中才支持使用TimerService设置定时器的操作。所以一般情况下,我们都是先做了keyBy分区之后,再去定义处理操作;代码中更加常见的处理函数是KeyedProcessFunction。

1 定时器(Timer)和定时服务(TimerService

在.onTimer()方法中可以实现定时处理的逻辑,而它能触发的前提,就是之前曾经注册过定时器、并且现在已经到了触发时间。注册定时器的功能,是通过上下文中提供的“定时服务”来实现的。

定时服务与当前运行的环境有关。前面已经介绍过,ProcessFunction的上下文(Context)中提供了.timerService()方法,可以直接返回一个TimerService对象。TimerService是Flink关于时间和定时器的基础服务接口,包含以下六个方法:

// 获取当前的处理时间long currentProcessingTime();// 获取当前的水位线(事件时间)long currentWatermark();// 注册处理时间定时器,当处理时间超过time时触发void registerProcessingTimeTimer(long time);// 注册事件时间定时器,当水位线超过time时触发void registerEventTimeTimer(long time);// 删除触发时间为time的处理时间定时器void deleteProcessingTimeTimer(long time);// 删除触发时间为time的处理时间定时器void deleteEventTimeTimer(long time);

六个方法可以分成两大类:基于处理时间和基于事件时间。而对应的操作主要有三个:获取当前时间,注册定时器,以及删除定时器。需要注意,尽管处理函数中都可以直接访问TimerService,不过只有基于KeyedStream的处理函数,才能去调用注册和删除定时器的方法;未作按键分区的DataStream不支持定时器操作,只能获取当前时间。

TimerService会以键(key)和时间戳为标准,对定时器进行去重;也就是说对于每个key和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次

2 KeyedProcessFunction案例

基于keyBy之后的KeyedStream,直接调用.process()方法,这时需要传入的参数就是KeyedProcessFunction的实现类。

stream.keyBy( t -> t.f0 ).process(new MyKeyedProcessFunction())

类似地,KeyedProcessFunction也是继承自AbstractRichFunction的一个抽象类,与ProcessFunction的定义几乎完全一样,区别只是在于类型参数多了一个K,这是当前按键分区的key的类型。同样地,我们必须实现一个.processElement()抽象方法,用来处理流中的每一个数据;另外还有一个非抽象方法.onTimer(),用来定义定时器触发时的回调操作。

代码如下:

public class KeyedProcessTimerDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) -> element.getTs() * 1000L));KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// TODO Process:keyedSingleOutputStreamOperator<String> process = sensorKS.process(new KeyedProcessFunction<String, WaterSensor, String>() {/*** 来一条数据调用一次* @param value* @param ctx* @param out* @throws Exception*/@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {//获取当前数据的keyString currentKey = ctx.getCurrentKey();// TODO 1.定时器注册TimerService timerService = ctx.timerService();// 1、事件时间的案例Long currentEventTime = ctx.timestamp(); // 数据中提取出来的事件时间timerService.registerEventTimeTimer(5000L);System.out.println("当前key=" + currentKey + ",当前时间=" + currentEventTime + ",注册了一个5s的定时器");// 2、处理时间的案例//                        long currentTs = timerService.currentProcessingTime();//                        timerService.registerProcessingTimeTimer(currentTs + 5000L);//                        System.out.println("当前key=" + currentKey + ",当前时间=" + currentTs + ",注册了一个5s后的定时器");// 3、获取 process的 当前watermark//                        long currentWatermark = timerService.currentWatermark();//                        System.out.println("当前数据=" + value + ",当前watermark=" + currentWatermark);// 注册定时器: 处理时间、事件时间//                        timerService.registerProcessingTimeTimer();//                        timerService.registerEventTimeTimer();// 删除定时器: 处理时间、事件时间//                        timerService.deleteEventTimeTimer();//                        timerService.deleteProcessingTimeTimer();// 获取当前时间进展: 处理时间-当前系统时间,  事件时间-当前watermark//                        long currentTs = timerService.currentProcessingTime();//                        long wm = timerService.currentWatermark();}/*** TODO 2.时间进展到定时器注册的时间,调用该方法* @param timestamp 当前时间进展,就是定时器被触发时的时间* @param ctx       上下文* @param out       采集器* @throws Exception*/@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {super.onTimer(timestamp, ctx, out);String currentKey = ctx.getCurrentKey();System.out.println("key=" + currentKey + "现在时间是" + timestamp + "定时器触发");}});process.print();env.execute();}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/70303.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

10分钟上手DeepSeek开发:SpringBoot + Vue2快速构建AI对话系统

作者&#xff1a;后端小肥肠 目录 1. 前言 为什么选择DeepSeek&#xff1f; 本文技术栈 2. 环境准备 2.1. 后端项目初始化 2.2. 前端项目初始化 3. 后端服务开发 3.1. 配置文件 3.2. 核心服务实现 4. 前端服务开发 4.1. 聊天组件ChatWindow.vue开发 5. 效果展示及源…

Transformer多头注意力并行计算原理与工业级实现:从数学推导到PyTorch工程优化

一、核心数学原理剖析 1.1 多头注意力矩阵分解 Q XW^Q ∈ R^{nd_k} K XW^K ∈ R^{nd_k} V XW^V ∈ R^{nd_v} 多头分解公式&#xff1a; head_i Attention(QW_i^Q, KW_i^K, VW_i^V) 其中 W_i^Q ∈ R^{d_kd_k/h}, W_i^K ∈ R^{d_kd_k/h}, W_i^V ∈ R^{d_vd_v/h} (h为头数…

通过监督微调提升多语言大语言模型性能

引言 澳鹏助力一家全球科技公司提升其大语言模型&#xff08;LLM&#xff09;的性能。通过提供结构化的人工反馈形式的大语言模型训练数据&#xff0c;让该模型在30多种语言、70多种方言中的表现得到优化。众包人员们进行多轮对话&#xff0c;并依据回复的相关性、连贯性、准确…

大数据开发治理平台~DataWorks(核心功能汇总)

目录 数据集成 功能概述 使用限制 功能相关补充说明 数据开发 功能概述 数据建模 功能概述 核心技术与架构 数据分析 功能概述 数据治理 数据地图 功能概述 数据质量 功能概述 数据治理资产 功能概述 使用限制 数据服务 功能概述 数据集成 DataWorks的数据…

用Nginx打造防盗链护盾

用Nginx打造防盗链护盾 一、你的网站正在"为他人做嫁衣"&#xff1f; 想象一下这个场景&#xff1a; 你精心拍摄的摄影作品、录制的课程视频、设计的原创素材&#xff0c;被其他网站直接盗用链接。 更气人的是——当用户在他们网站查看这些资源时&#xff0c;消耗的…

STM32 看门狗

目录 背景 独立看门狗&#xff08;IWDG&#xff09; 寄存器访问保护 窗口看门狗&#xff08;WWDG&#xff09; 程序 独立看门狗 设置独立看门狗程序 第一步、使能对独立看门狗寄存器的写操作 第二步、设置预分频和重装载值 第三步、喂狗 第四步、使能独立看门狗 喂狗…

Kubernetes的Ingress 资源是什么?

在Kubernetes中&#xff0c;Ingress资源是一种用于管理集群外部对内部服务访问的API对象&#xff0c;主要用于将不同的外部请求路由到集群内的不同服务&#xff0c;以下是关于它的详细介绍&#xff1a; 定义与作用 Ingress资源定义了从集群外部到内部服务的HTTP和HTTPS路由规…

vue3-03初学vue3中的配置项setup(Composition API (组合API组件中所用到的:数据、方法等,均要配置在setup中)

1.关于setup Vue3.0中一个新的配置项&#xff0c;值为一个函数.setup是所有Composition API (组合API)“表演的舞台”m组件中所用到的:数据、方法等等&#xff0c;均要配置在setup中。 2..setup函数使用 setup函数的两种返回值 1.若返回一个对象&#xff0c;则对象中的属性、…

【go语言规范】 使用函数式选项 Functional Options 模式处理可选配置

如何处理可选配置&#xff1f; Config Struct 方式 (config-struct/main.go) 这是最简单的方式&#xff0c;使用一个配置结构体&#xff1a; 定义了一个简单的 Config 结构体&#xff0c;包含 Port 字段创建服务器时直接传入配置对象优点&#xff1a;简单直接缺点&#xff1a…

leetcode 2585. 获得分数的方法数

题目如下 数据范围 莫要被困难的外衣骗了&#xff0c;本题就是有数量限制的完全背包问题。显然我们可以令 f(x,y)为当有x种题目时分数为y时的方法数 令某种题目的数量为k 那么方法数应该是 f(x,y) f(x - 1,y - k * (分值))其中(0 < k < 题目数量)通过代码 class So…

深入理解JavaScript中的异步编程与Promise

一、引言 在JavaScript的世界中&#xff0c;异步编程是一个核心概念&#xff0c;尤其是在处理网络请求、文件操作或任何可能阻塞主线程的任务时。本文将深入探讨JavaScript中的异步编程模型&#xff0c;特别是Promise对象的使用。 二、异步编程基础 2.1 什么是异步编程&…

VS Code 如何搭建C/C++开发环境

目录 1.VS Code是什么 2. VS Code的下载和安装 2.1 下载和安装 2.2.1 下载 2.2.2 安装 2.2 环境的介绍 2.3 安装中文插件 3. VS Code配置C/C开发环境 3.1 下载和配置MinGW-w64编译器套件 3.1.1 下载 3.1.2 配置 3.2 安装C/C插件 3.3 重启VSCode 4. 在VSCode上编写…

如何查询网站是否被百度蜘蛛收录?

一、使用site命令查询 这是最直接的方法。在百度搜索框中输入“site:你的网站域名”&#xff0c;例如“site:example.com”&#xff08;请将“example.com”替换为你实际的网站域名&#xff09;。如果搜索结果显示了你的网站页面&#xff0c;并且显示了收录的页面数量&#xf…

数仓搭建:DWS层(服务数据层)

DWS层示例: 搭建日主题宽表 需求 维度 步骤 在hive中建数据库dws >>建表 CREATE DATABASE if NOT EXISTS DWS; 建表sql CREATE TABLE yp_dws.dws_sale_daycount( --维度 city_id string COMMENT 城市id, city_name string COMMENT 城市name, trade_area_id string COMME…

伪类选择器

作用&#xff1a;选中特殊状态的元素 一、动态伪类 1. :link 超链接 未被访问 的状态。 2. :visited 超链接 访问过 的状态。 3. :hover 鼠标 悬停 在元素上的状态。 4. :active 元素 激活 的状态。 什么是激活&#xff1f; —— 按下鼠标不松开。 注意点&#xf…

Kubernetes:EKS 中 Istio Ingress Gateway 负载均衡器配置及常见问题解析

引言 在云原生时代&#xff0c;Kubernetes 已经成为容器编排的事实标准。AWS EKS (Elastic Kubernetes Service) 作为一项完全托管的 Kubernetes 服务&#xff0c;简化了在 AWS 上运行 Kubernetes 的复杂性。Istio 作为服务网格领域的佼佼者&#xff0c;为微服务提供了流量管理…

Docker安装Kafka(不依赖ZooKeeper)

创建docker-compose.yaml version: "3.9" #版本号 services:kafka:image: apache/kafka:3.9.0container_name: kafkahostname: kafkaports:- 9092:9092 # 容器内部之间使用的监听端口- 9094:9094 # 容器外部访问监听端口environment:KAFKA_NODE_ID: 1KAFKA_PROCES…

挪车小程序挪车二维码php+uniapp

一款基于FastAdminThinkPHP开发的匿名通知车主挪车微信小程序&#xff0c;采用匿名通话的方式&#xff0c;用户只能在有效期内拨打车主电话&#xff0c;过期失效&#xff0c;从而保护车主和用户隐私。提供微信小程序端和服务端源码&#xff0c;支持私有化部署。 更新日志 V1.0…

unity 设置可配置文件asset

使用可序列化类保存配置&#xff0c;并且将可序列化类保存成Unity的自定义文件&#xff08;.asset&#xff09;,然后配置自定义文件&#xff08;.asset&#xff09;。 [Serializable][CreateAssetMenu(menuName "ScriptableOject/BuildConfig")]public class BuildC…

一周学会Flask3 Python Web开发-http响应状态码

锋哥原创的Flask3 Python Web开发 Flask3视频教程&#xff1a; 2025版 Flask3 Python web开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili 在Flask程序中&#xff0c;客户端发出的请求触发相应的视图函数&#xff0c;获取返回值会作为响应的主体&#xff0c;最后生成…