Flink CEP是什么?

Apache Flink 的 CEP(Complex Event Processing,复杂事件处理) 是 Flink 提供的一个库,用于在无界数据流中检测符合特定模式的事件组合。


🎯 一、什么是 CEP?

✅ 定义:

CEP 是一种从连续的数据流中识别出符合预设模式(Pattern)的事件组合的技术。

它可以用来实现:

  • 用户行为分析(如“登录 → 加入购物车 → 放弃支付”)
  • 异常检测(如“连续失败请求超过3次”)
  • 风控规则匹配(如“短时间内多次转账”)

🧠 二、CEP 的核心概念

概念描述
Pattern定义你想要匹配的事件序列规则
PatternStream表示匹配到的事件流
Event Stream原始输入的数据流
Time Limit设置模式匹配的时间窗口(例如:10秒内完成一系列操作)
Quantifier控制事件出现的次数(如 oneOrMore, times(n), within() 等)

🔍 三、Flink CEP 的工作流程图解

原始事件流↓
[ Pattern API ] → 定义模式(如 A → B → C)↓
PatternStream → 匹配成功的事件组合↓
处理逻辑(如报警、记录日志等)

📦 四、Flink CEP 核心组件

1. Pattern<Event, ?>

定义事件匹配规则,例如:

Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {@Overridebegin方法详解public boolean filter(Event event) {return event.getType().equals("登录");}}).next("middle").where(new SimpleCondition<Event>() {public boolean filter(Event event) {return event.getType().equals("加入购物车");}}).within(Time.seconds(10)); // 在10秒内完成整个流程

2. PatternStream<Event>

将原始流与 Pattern 关联,得到匹配结果:

PatternStream<Event> patternStream = CEP.pattern(eventStream, pattern);

3. select / process 操作

对匹配成功的事件进行处理:

patternStream.select(new PatternSelectFunction<Event, String>() {@Overridepublic String select(Map<String, List<Event>> patternMap) throws Exception {Event start = patternMap.get("start").get(0);Event middle = patternMap.get("middle").get(0);return "用户行为路径匹配: " + start + " -> " + middle;}
}).print();

🧪 五、Java 示例代码演示

示例目标:

检测“连续三次登录失败”的用户行为

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;public class FlinkCEPExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 模拟输入事件流DataStream<Event> eventStream = env.fromElements(new Event("userA", "登录失败", 1000L),new Event("userB", "登录成功", 1500L),new Event("userA", "登录失败", 2000L),new Event("userA", "登录失败", 3000L),new Event("userA", "登录成功", 4000L));// 定义 CEP 模式:连续3次登录失败(时间窗口为10秒)Pattern<Event, ?> pattern = Pattern.<Event>begin("first").where(new SimpleCondition<Event>() {@Overridepublic boolean filter(Event event) {return event.getType().equals("登录失败");}}).times(3).within(Time.seconds(10));// 将模式应用到事件流上PatternStream<Event> patternStream = CEP.pattern(eventStream, pattern);// 输出匹配到的事件patternStream.select(new PatternSelectFunction<Event, String>() {@Overridepublic String select(Map<String, List<Event>> patternMap) throws Exception {List<Event> events = patternMap.get("first");return "发现异常行为!用户 [" + events.get(0).userId + "] 连续3次登录失败";}}).print();env.execute("Flink CEP Example");}// 事件类public static class Event {public String userId;public String type;public long timestamp;public Event(String userId, String type, long timestamp) {this.userId = userId;this.type = type;this.timestamp = timestamp;}public String getType() {return type;}public String getUserId() {return userId;}@Overridepublic String toString() {return "{" + "\"userId\":\"" + userId + "\", \"type\":\"" + type + "\", \"timestamp\":" + timestamp + "}";}}
}

📈 六、运行结果示例

发现异常行为!用户 [userA] 连续3次登录失败

表示 userA 在 10 秒内连续出现了 3 次 “登录失败” 的行为,触发了 CEP 规则。


⚙️ 七、常用 Pattern 条件和匹配方式

方法描述
.begin("name")开始一个新的模式
.where(condition)添加一个条件
.times(n)匹配 n 次
.oneOrMore()匹配至少一次
.greedy()贪婪匹配(尽可能多匹配)
.followedBy("name")非严格近邻(允许中间有其他事件)
.notFollowedBy("name")排除某个事件
.within(Time.time)设置模式匹配的最大时间窗口

🧩 八、CEP 的应用场景

场景描述
风控系统检测欺诈行为、异常交易
用户行为分析识别漏斗转化率、用户流失路径
IoT 设备监控检测设备故障前的行为序列
运维监控检测服务调用链中的异常顺序
安全审计检测非法操作组合(如“登录失败→尝试访问敏感资源”)

✅ 九、CEP 使用建议

建议说明
时间窗口设置合理太大会影响性能,太小可能漏掉有效模式
合理使用 greedy 模式避免重复匹配或遗漏
与 Watermark 结合使用确保事件时间语义正确
限制状态大小防止状态无限增长(可使用 withStateCleaning(true)
使用侧输出处理未匹配事件可选,用于调试或补救机制

📌 十、总结

特性描述
名称Flink CEP
功能流式数据中识别事件模式
输入无界流
输出匹配到的事件组合
适用场景用户行为分析、风控、安全审计等
依赖库flink-cepflink-cep-java

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/81844.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ARM (Attention Refinement Module)

ARM模块【来源于BiSeNet】&#xff1a;细化特征图的注意力&#xff0c;增强重要特征并抑制不重要的特征。 Attention Refinement Module (ARM) 详解 ARM (Attention Refinement Module) 是 BiSeNet 中用于增强特征表示的关键模块&#xff0c;它通过注意力机制来细化特征图&…

AR0144CSSC20SUKA0-CRBR——1/4英寸 1.0 MP 高性能CMOS图像传感器解析

产品概述&#xff1a; AR0144CSSC20SUKA0-CRBR 是一款1/4 英寸&#xff0c;1.0 Mp CMOS 数字图像传感器&#xff0c;带有 1280H x 800V 有效像素阵列 全局快门CMOS数字图像传感器&#xff0c;它结合了新型的创新全局快门像素设计&#xff0c;适用于准确快速的移动场景捕捉。该…

深入理解递归算法:Go语言实现指南

深入理解递归算法&#xff1a;Go语言实现指南 引言 递归是编程中一种优雅而强大的算法思想&#xff0c;通过函数自我调用的方式解决复杂问题。本文将使用Go语言演示递归的核心原理&#xff0c;并通过典型示例帮助开发者掌握这一重要技术。 一、递归基础概念 1.1 递归定义 递归…

vue2实现【瀑布流布局】

瀑布流 1. 解释2. 形成结构和样式3. 自定义指令 1. 解释 瀑布流特征&#xff1a; 等宽不等高&#xff1a;元素宽度固定&#xff0c;高度根据内容自适应。错落排列&#xff1a;元素像瀑布一样从上到下依次填充&#xff0c;自动寻找最短列插入 体现&#xff1a;图中第一排1&…

CSS display有几种属性值

在 CSS 中&#xff0c;display 属性是控制元素布局和渲染方式的核心属性之一。它有多种属性值&#xff0c;每个值都决定了元素在文档流中的表现形式。以下是 display 的主要属性值分类及说明&#xff1a; 1. 块级和行内布局 块级元素 (block) 特性&#xff1a;独占一行&…

基于Java实现可靠传输

实现可靠传输 1. 结合代码和 LOG 文件分析针对每个项目举例说明解决效果。 RDT1.0 对应 Log 日志&#xff1a;Log 1.0.txt&#xff0c;接收文件 recvData 1.0.txt RDT1.0 版本是在可靠信道上进行可靠的数据传输&#xff0c;因此没有过多的内容需要说明&#xff0c;发送方 L…

机器学习10-随机森林

随机森林学习笔记 一、随机森林简介 随机森林&#xff08;Random Forest&#xff09;是一种集成学习算法&#xff0c;基于决策树构建模型。它通过组合多个决策树的结果来提高模型的准确性和稳定性。随机森林的核心思想是利用“集成”的方式&#xff0c;将多个弱学习器组合成一…

LeetCode 438. 找到字符串中所有字母异位词 | 滑动窗口与字符计数数组解法

文章目录 问题描述核心思路&#xff1a;滑动窗口 字符计数数组1. 字符计数数组2. 滑动窗口 算法步骤完整代码实现复杂度分析关键点总结类似问题 问题描述 给定两个字符串 s 和 p&#xff0c;要求找到 s 中所有是 p 的**字母异位词&#xff08;Anagram&#xff09;**的子串的起…

idea中,git的cherry-pick怎么用

背景: A同学在A分支进行开发, B同学在B分支进行开发,B同学开发过程中发现,A同学在A分支上面的某次提交,例如某次提交了一个工具类,B同学也用的到这个工具类,但是B又不想mergeA分支的代码,此时就可以用到git的chery pick能力.

深入解析:如何基于开源OpENer开发EtherNet/IP从站服务

一、EtherNet/IP协议概述 EtherNet/IP(Industrial Protocol)是一种基于以太网的工业自动化通信协议,它将CIP(Common Industrial Protocol)封装在标准以太网帧中,通过TCP/IP和UDP/IP实现工业设备间的通信。作为ODVA(Open DeviceNet Vendors Association)组织的核心协议…

当 PyIceberg 和 DuckDB 遇见 AWS S3 Tables:打造 Serverless 数据湖“开源梦幻组合”

引言 在一些大数据分析场景比如电商大数据营销中&#xff0c;我们需要快速分析存储海量用户行为数据&#xff08;如浏览、加购、下单&#xff09;&#xff0c;以进行用户行为分析&#xff0c;优化营销策略。传统方法依赖 Spark/Presto 集群或 Redshift 查询 S3 上的 Parquet/O…

流复备机断档处理

文章目录 环境症状问题原因解决方案 环境 系统平台&#xff1a;UOS&#xff08;海光&#xff09;,UOS &#xff08;飞腾&#xff09;,UOS&#xff08;鲲鹏&#xff09;,UOS&#xff08;龙芯&#xff09;,UOS &#xff08;申威&#xff09;,银河麒麟svs&#xff08;X86_64&…

【蓝桥杯真题精讲】第 16 届 Python A 组(省赛)

文章目录 T1 偏蓝 (5/5)T2 IPv6 (0/5)T3 2025 图形 (10/10)T4 最大数字 (10/10)T5 倒水 (15/15)T6 拼好数 (0/15)T7 登山 (20/20)T8 原料采购 (20/20) 更好的阅读体验 高速访问&#xff1a;https://wiki.dwj601.cn/ds-and-algo/lan-qiao-cup/16th-python-a/永久链接&#xff1…

SpringBoot+Dubbo+Zookeeper实现分布式系统步骤

SpringBootDubboZookeeper实现分布式系统 一、分布式系统通俗解释二、环境准备&#xff08;详细版&#xff09;1. 软件版本2. 安装Zookeeper&#xff08;单机模式&#xff09; 三、完整项目结构&#xff08;带详细注释&#xff09;四、手把手代码实现步骤1&#xff1a;创建父工…

Spring的业务层,持久层,控制层的关系

在 Spring 框架中&#xff0c;控制层&#xff08;Controller&#xff09;、业务层&#xff08;Service&#xff09; 和 持久层&#xff08;Repository/Mapper&#xff09; 是分层架构的核心组成部分&#xff0c;职责分离明确&#xff0c;通过依赖注入&#xff08;DI&#xff09…

css实现不确定内容的高度过渡

实现效果&#xff1a;鼠标悬浮按钮&#xff0c;高度过渡出现如图所示文本框 代码&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-widt…

计算机视觉与深度学习 | matlab实现ARIMA-WOA-CNN-LSTM时间序列预测(完整源码和数据)

以下是一个基于MATLAB的ARIMA-WOA-CNN-LSTM时间序列预测框架。由于完整代码较长,此处提供核心模块和实现思路,完整源码和数据可通过文末方式获取。 1. 数据准备(示例数据) 使用MATLAB内置的航空乘客数据集: % 加载数据 data = readtable(airline-passengers.csv); data …

在 Excel 中使用东方仙盟软件————仙盟创梦IDE

安装插件 用仙盟创梦编写插件代码 源码 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using ExcelDna.Integration;namespace 东方仙盟.仙盟创梦IDE_招标系统 {public static class 仙盟创梦_招标专…

Sql刷题日志(day9)

一、笔试 1、limit offset&#xff1a;分页查询 SELECT column1, column2, ... FROM table_name LIMIT number_of_rows OFFSET start_row; --跳过前 start_row 行&#xff0c;返回接下来的 number_of_rows 行。 2、lag、lead&#xff1a;查询前后行数据 --lag函数用于访问当…

C++面试3——const关键字的核心概念、典型场景和易错陷阱

const关键字的核心概念、典型场景和易错陷阱 一、const本质&#xff1a;类型系统的守护者 1. 与#define的本质差异 维度#defineconst编译阶段预处理替换编译器类型检查作用域无作用域&#xff08;全局污染&#xff09;遵循块作用域调试可见性符号消失保留符号信息类型安全无类…