Flink: computing the average over a configurable time window and raising an alert when a threshold is exceeded

1. Requirements

Aggregate the temperature and humidity data collected from IoT devices and emit an alert message whenever the average over a 5-minute window exceeds a threshold (30 °C). The window size and the threshold must be adjustable at any time from an admin console and take effect in near real time: the current window finishes with the old settings, and the next window uses the latest configuration.

The device data is read from Kafka, the configuration is read from MySQL, and an admin console is used to adjust the window size and the threshold.

2. Approach

Use Flink's two-stream join via the broadcast state pattern: the configuration data is broadcast to every task as a broadcast stream, while the device data flows through as a regular stream; each sensor reading is joined with the most recent configuration before it is windowed. A self-contained toy version of this wiring is sketched below, and section 3 shows the full job.
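The sketch below is a minimal, runnable illustration of the broadcast-state pattern only. The class name, the in-memory sources, and the sample values are invented for the illustration; the real job in section 3 reads from Kafka and MySQL and adds event time, keying, and windowing on top of this wiring.

package cu.iot.flink;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastPatternSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // In the real job these come from Kafka and MySQL; here they are in-memory for illustration.
        DataStream<Double> readings = env.fromElements(28.5, 31.2, 29.9, 33.4);
        DataStream<Double> thresholds = env.fromElements(30.0);

        // The broadcast state holds the latest threshold under a fixed key.
        MapStateDescriptor<String, Double> descriptor =
                new MapStateDescriptor<>("threshold", Types.STRING, Types.DOUBLE);
        BroadcastStream<Double> broadcastThresholds = thresholds.broadcast(descriptor);

        readings.connect(broadcastThresholds)
                .process(new BroadcastProcessFunction<Double, Double, String>() {
                    @Override
                    public void processElement(Double value, ReadOnlyContext ctx, Collector<String> out)
                            throws Exception {
                        // Readings that arrive before the first broadcast element are skipped,
                        // just like the full job skips readings until a config has been broadcast.
                        Double threshold = ctx.getBroadcastState(descriptor).get("threshold");
                        if (threshold != null && value > threshold) {
                            out.collect("value " + value + " exceeds threshold " + threshold);
                        }
                    }

                    @Override
                    public void processBroadcastElement(Double value, Context ctx, Collector<String> out)
                            throws Exception {
                        // Every parallel instance receives the broadcast element and overwrites its copy.
                        ctx.getBroadcastState(descriptor).put("threshold", value);
                    }
                })
                .print();

        env.execute("broadcast pattern sketch");
    }
}

The point of the pattern is that processBroadcastElement runs on every parallel instance and overwrites the stored value, so processElement always compares against the most recently broadcast configuration.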

3. Implementation

The implementation consists of the main job class BroadcastDemo, a simple MySQL polling source (MySQLSourceFunction), and three POJOs (AlertConfig, SensorData, EnrichedSensorData).

BroadcastDemo.java (the main job): reads sensor JSON from Kafka, broadcasts the alert configuration, windows per sensor, and writes alerts back to Kafka.

package cu.iot.flink;

import com.alibaba.fastjson2.JSON;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.formats.json.JsonSerializationSchema;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import window.alert.AlertConfig;
import window.alert.EnrichedSensorData;
import window.alert.SensorData;

import java.time.Duration;
import java.util.Properties;

public class BroadcastDemo {

    private static String KAFKA_SERVERS = "192.168.213.1:9092,192.168.213.2:9092,192.168.213.3:9092";
    private static String KAFKA_GROUP_ID = "public-system-group-dev";
    private static String KAFKA_CONSUMER_TOPIC = "public-system-collect-data-dev";
    private static String KAFKA_PRODUCER_TOPIC = "public-system-collect-data-dev-output";
    private static String KAFKA_PRODUCER_SLIDE_TOPIC = "public-system-collect-data-dev-slide-output";
    private static String MYSQL_URL = "jdbc:mysql://10.20.72.1:8190/alerting?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8";
    private static String MYSQL_USERNAME = "root";
    private static String MYSQL_PASSWORD = "xxxxx";

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.enableCheckpointing(1000);
        env.setParallelism(1);

        // Kafka source for the raw sensor readings
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", KAFKA_SERVERS);
        kafkaProps.setProperty("group.id", KAFKA_GROUP_ID);
        FlinkKafkaConsumer<String> kafkaConsumer =
                new FlinkKafkaConsumer<>(KAFKA_CONSUMER_TOPIC, new SimpleStringSchema(), kafkaProps);
        kafkaConsumer.setStartFromLatest();

        // Side output for late data
        OutputTag<EnrichedSensorData> lateData =
                new OutputTag<>("lateData", Types.GENERIC(EnrichedSensorData.class));

        DataStream<SensorData> sensorStream = env.addSource(kafkaConsumer)
                .map((MapFunction<String, SensorData>) value -> JSON.parseObject(value, SensorData.class));

        // MySQL source for the alert configuration
        Properties dbProps = new Properties();
        dbProps.setProperty("url", MYSQL_URL);
        dbProps.setProperty("username", MYSQL_USERNAME);
        dbProps.setProperty("password", MYSQL_PASSWORD);
        DataStreamSource<AlertConfig> streamSource = env.addSource(new MySQLSourceFunction(dbProps));
        SingleOutputStreamOperator<AlertConfig> streamOperator = streamSource.assignTimestampsAndWatermarks(
                WatermarkStrategy.<AlertConfig>forMonotonousTimestamps()
                        .withTimestampAssigner((o, timestamp) -> o.getTimestamp()));

        MapStateDescriptor<String, AlertConfig> broadcastStateDescriptor = new MapStateDescriptor<>(
                "alertConfig",
                TypeInformation.of(new TypeHint<String>() {}),
                TypeInformation.of(new TypeHint<AlertConfig>() {}));

        // Assign event time and watermarks, join with the broadcast config, then window per sensor
        SingleOutputStreamOperator<String> alertStream = sensorStream
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<SensorData>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                                .withTimestampAssigner((o, timestamp) -> o.getTimestamp()))
                .connect(streamOperator.broadcast(broadcastStateDescriptor))
                .process(new BroadcastProcessFunction<SensorData, AlertConfig, EnrichedSensorData>() {

                    @Override
                    public void processElement(SensorData value, ReadOnlyContext ctx,
                                               Collector<EnrichedSensorData> out) throws Exception {
                        ReadOnlyBroadcastState<String, AlertConfig> broadcastState =
                                ctx.getBroadcastState(broadcastStateDescriptor);
                        AlertConfig alertConfig = broadcastState.get("alertConfig");
                        if (alertConfig != null) {
                            EnrichedSensorData enrichedSensorData = new EnrichedSensorData(value, alertConfig);
                            // System.out.println("out.collect = " + enrichedSensorData);
                            out.collect(enrichedSensorData);
                        }
                    }

                    @Override
                    public void processBroadcastElement(AlertConfig value,
                            BroadcastProcessFunction<SensorData, AlertConfig, EnrichedSensorData>.Context ctx,
                            Collector<EnrichedSensorData> collector) throws Exception {
                        BroadcastState<String, AlertConfig> broadcastState =
                                ctx.getBroadcastState(broadcastStateDescriptor);
                        // System.out.println("broadcastState.put = " + value);
                        broadcastState.put("alertConfig", value);
                    }
                })
                .keyBy(EnrichedSensorData::getSensorId)
                // The demo windows on a fixed 10 seconds; per the requirement, this is where
                // AlertConfig.getWindowSizeMillis() would be applied in production.
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .allowedLateness(Time.seconds(2))
                .sideOutputLateData(lateData)
//                .aggregate(
//                        new AggregateFunction<EnrichedSensorData, Tuple2<Double, Integer>, Double>() {
//                    @Override
//                    public Tuple2<Double, Integer> createAccumulator() {
//                        return new Tuple2<>(0.0D, 0);
//                    }
//
//                    @Override
//                    public Tuple2<Double, Integer> add(EnrichedSensorData val, Tuple2<Double, Integer> accumulator) {
//                        return new Tuple2<>(accumulator.f0 + val.getTemperature() , accumulator.f1 + 1);
//                    }
//
//                    @Override
//                    public Double getResult(Tuple2<Double, Integer> accumulator) {
//                        Double rs = accumulator.f0 / accumulator.f1;
//                        System.out.println("getResult...accumulator.f0 ="+accumulator.f0+", accumulator.f1 = "+accumulator.f1+","+rs);
//                        return rs;
//                    }
//
//                    @Override
//                    public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
//                        System.out.println("merge...  a.f0 + b.f0="+(a.f0 + b.f0)+", a.f1 + b.f1="+(a.f1 + b.f1));
//                        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
//                    }
//                }, new ProcessWindowFunction<Double, String, String, TimeWindow>() {
//                    @Override
//                    public void process(String s, ProcessWindowFunction<Double, String, String, TimeWindow>.Context ctx, Iterable<Double> list, Collector<String> out) {
//                        TimeWindow window = ctx.window();
//                        int count = 0;
//                        double sum = 0.0;
//                        for (Double v : list) {
//                            sum += v;
//                            count++;
//                        }
//                        out.collect("<Alert> ID:"+s+",window:["+window.getStart()+","+window.getEnd()+") avg="+(count > 0 ? sum / count : 0));
//                    }
//                }
//                )
                .apply(new WindowFunction<EnrichedSensorData, String, String, TimeWindow>() {
                    @Override
                    public void apply(String sensorId, TimeWindow window,
                                      Iterable<EnrichedSensorData> input, Collector<String> out) {
                        double sumTemp = 0;
                        int count = 0;
                        AlertConfig alertConfig = null;
                        for (EnrichedSensorData data : input) {
                            // System.out.println("Processing event: " + data);
                            sumTemp += data.getTemperature();
                            count++;
                            alertConfig = data.getAlertConfig();
                        }
                        double avgTemp = count > 0 ? sumTemp / count : 0;
                        if (alertConfig != null && avgTemp > alertConfig.getThreshold()) {
                            out.collect("<ALERT> window: [" + window.getStart() + "," + window.getEnd()
                                    + ") sensorId:" + sensorId + ", avg:" + avgTemp
                                    + ", threshold:" + alertConfig.getThreshold());
                        }
                    }
                });

        alertStream.print("normal output>>>");

        SingleOutputStreamOperator<String> out = alertStream;
        SideOutputDataStream<EnrichedSensorData> output = out.getSideOutput(lateData);
        output.printToErr("late-data side output>>>");

        // Kafka sinks: alert strings to one topic, late records (as JSON) to another
        FlinkKafkaProducer<String> kafkaProducer =
                new FlinkKafkaProducer<>(KAFKA_SERVERS, KAFKA_PRODUCER_TOPIC, new SimpleStringSchema());
        FlinkKafkaProducer<EnrichedSensorData> kafkaProducer2 =
                new FlinkKafkaProducer<>(KAFKA_SERVERS, KAFKA_PRODUCER_SLIDE_TOPIC, new JsonSerializationSchema<>());
        alertStream.addSink(kafkaProducer);
        output.addSink(kafkaProducer2);

        env.execute("Sensor Alerts");
    }
}
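To exercise the job, a reading has to arrive on the consumer topic as a JSON document whose field names match SensorData, because the job deserializes with fastjson2. A stand-alone test producer could look like the sketch below; it assumes the kafka-clients dependency is available and reuses the broker list and topic name hard-coded in the job above (the class itself is invented for illustration).

package cu.iot.flink;

import com.alibaba.fastjson2.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import window.alert.SensorData;

import java.util.Properties;

public class SensorDataProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.213.1:9092,192.168.213.2:9092,192.168.213.3:9092");
        props.setProperty("key.serializer", StringSerializer.class.getName());
        props.setProperty("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Field names must match the SensorData POJO so JSON.parseObject can map them back.
            SensorData reading = new SensorData("sensor-001", 32.5, 40.0, System.currentTimeMillis());
            producer.send(new ProducerRecord<>("public-system-collect-data-dev",
                    reading.getSensorId(), JSON.toJSONString(reading)));
            producer.flush();
        }
    }
}

The same payload can be sent with the Kafka console producer; the only requirement is that the JSON keys are sensorId, temperature, humidity, and timestamp.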
MySQLSourceFunction.java (configuration source): polls the alert rule from MySQL every 10 seconds and emits it into the broadcast stream.

package cu.iot.flink;

import cn.hutool.core.util.RandomUtil;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import window.alert.AlertConfig;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Properties;

public class MySQLSourceFunction implements SourceFunction<AlertConfig> {

    private Properties p;
    private volatile boolean running = true;

    public MySQLSourceFunction(Properties p) {
        this.p = p;
    }

    @Override
    public void run(SourceContext<AlertConfig> ctx) throws Exception {
        // Re-read the rule every 10 seconds so changes made in the admin console reach the job
        while (running) {
            AlertConfig config = fetchAlertConfig();
            ctx.collect(config);
            Thread.sleep(10000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private AlertConfig fetchAlertConfig() {
        AlertConfig config = new AlertConfig();
        try (Connection conn = DriverManager.getConnection(
                     p.getProperty("url"), p.getProperty("username"), p.getProperty("password"));
             PreparedStatement stmt = conn.prepareStatement(
                     "SELECT time_window,upper_limit FROM t_alert_rule WHERE id = 3 AND rule_type = 'timeWindow'");
             ResultSet rs = stmt.executeQuery()) {
            if (rs.next()) {
                config.setWindowSizeMillis(rs.getLong("time_window"));
                config.setThreshold(rs.getDouble("upper_limit"));
                config.setTimestamp(System.currentTimeMillis() - RandomUtil.randomInt(1000, 10000));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return config;
    }
}
AlertConfig.java: the broadcast configuration POJO (window size, threshold, and the timestamp used for watermarking).

package window.alert;

public class AlertConfig {

    private Long windowSizeMillis;
    private Double threshold;
    private Long timestamp;

    public AlertConfig() {
    }

    public AlertConfig(long windowSize, double threshold) {
        this.windowSizeMillis = windowSize;
        this.threshold = threshold;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    public Long getWindowSizeMillis() {
        return windowSizeMillis;
    }

    public void setWindowSizeMillis(Long windowSizeMillis) {
        this.windowSizeMillis = windowSizeMillis;
    }

    public Double getThreshold() {
        return threshold;
    }

    public void setThreshold(Double threshold) {
        this.threshold = threshold;
    }

    @Override
    public String toString() {
        return "AlertConfig{" +
                "windowSizeMillis=" + windowSizeMillis +
                ", threshold=" + threshold +
                ", timestamp=" + timestamp +
                '}';
    }
}
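For reference, the requirement from section 1 (5-minute window, 30 °C threshold) maps onto this POJO as shown below. The snippet is purely illustrative: in the running job MySQLSourceFunction builds the object from the t_alert_rule row instead of constructing it by hand.

package window.alert;

public class ConfigExample {

    public static void main(String[] args) {
        // 5 minutes expressed in milliseconds, threshold of 30 degrees Celsius.
        AlertConfig productionConfig = new AlertConfig(5 * 60 * 1000L, 30.0);
        productionConfig.setTimestamp(System.currentTimeMillis());
        System.out.println(productionConfig);
    }
}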
SensorData.java: one temperature/humidity reading as produced by the devices and deserialized from the Kafka JSON.

package window.alert;

public class SensorData {

    private String sensorId;
    private double temperature;
    private double humidity;
    private long timestamp;

    public SensorData() {
    }

    public SensorData(String sensorId, double temperature, double humidity, long timestamp) {
        this.sensorId = sensorId;
        this.temperature = temperature;
        this.humidity = humidity;
        this.timestamp = timestamp;
    }

    public String getSensorId() {
        return sensorId;
    }

    public void setSensorId(String sensorId) {
        this.sensorId = sensorId;
    }

    public double getTemperature() {
        return temperature;
    }

    public void setTemperature(double temperature) {
        this.temperature = temperature;
    }

    public double getHumidity() {
        return humidity;
    }

    public void setHumidity(double humidity) {
        this.humidity = humidity;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "SensorData{" +
                "sensorId='" + sensorId + '\'' +
                ", temperature=" + temperature +
                ", humidity=" + humidity +
                ", timestamp=" + timestamp +
                '}';
    }
}
EnrichedSensorData.java: a sensor reading paired with the alert configuration that was current when it passed through the broadcast join.

package window.alert;

public class EnrichedSensorData {

    private SensorData sensorData;
    private AlertConfig alertConfig;

    public EnrichedSensorData(SensorData sensorData, AlertConfig alertConfig) {
        this.sensorData = sensorData;
        this.alertConfig = alertConfig;
    }

    public SensorData getSensorData() {
        return sensorData;
    }

    public void setSensorData(SensorData sensorData) {
        this.sensorData = sensorData;
    }

    public String getSensorId() {
        return sensorData.getSensorId();
    }

    public double getTemperature() {
        return sensorData.getTemperature();
    }

    public AlertConfig getAlertConfig() {
        return alertConfig;
    }

    @Override
    public String toString() {
        return "EnrichedSensorData{" +
                "sensorData=" + sensorData +
                ", alertConfig=" + alertConfig +
                '}';
    }
}
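Finally, the windowing and threshold logic can be smoke-tested locally without Kafka or MySQL by pushing a few in-memory records through the same operators. This is a sketch under the same Flink dependencies as the job above; the class name, the sensor id, and the sample values are invented for the test.

package cu.iot.flink;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import window.alert.AlertConfig;
import window.alert.EnrichedSensorData;
import window.alert.SensorData;

import java.time.Duration;

public class LocalWindowCheck {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Fixed config for the test: 10-second window (as in the demo job), threshold 30.0.
        AlertConfig config = new AlertConfig(10_000L, 30.0);

        // Three readings for the same sensor, all inside the same 10-second window.
        DataStream<EnrichedSensorData> enriched = env.fromElements(
                new EnrichedSensorData(new SensorData("sensor-001", 29.0, 40.0, 1_000L), config),
                new EnrichedSensorData(new SensorData("sensor-001", 31.0, 40.0, 2_000L), config),
                new EnrichedSensorData(new SensorData("sensor-001", 33.0, 40.0, 3_000L), config));

        enriched
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<EnrichedSensorData>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                                .withTimestampAssigner((e, ts) -> e.getSensorData().getTimestamp()))
                .keyBy(EnrichedSensorData::getSensorId)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .apply(new WindowFunction<EnrichedSensorData, String, String, TimeWindow>() {
                    @Override
                    public void apply(String sensorId, TimeWindow window,
                                      Iterable<EnrichedSensorData> input, Collector<String> out) {
                        double sum = 0;
                        int count = 0;
                        Double threshold = null;
                        for (EnrichedSensorData d : input) {
                            sum += d.getTemperature();
                            count++;
                            threshold = d.getAlertConfig().getThreshold();
                        }
                        double avg = count > 0 ? sum / count : 0;
                        if (threshold != null && avg > threshold) {
                            out.collect("<ALERT> " + sensorId + " avg=" + avg
                                    + " window=[" + window.getStart() + "," + window.getEnd() + ")");
                        }
                    }
                })
                .print();

        env.execute("local window check");
    }
}

With the three readings landing in one 10-second window, the average is (29 + 31 + 33) / 3 = 31, which is above the 30-degree threshold, so one alert line is printed when the bounded input ends and the final watermark fires the window.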
