【Flink银行反欺诈系统设计方案】1.短时间内多次大额交易场景的flink与cep的实现

【flink应用系列】1.Flink银行反欺诈系统设计方案

  • 1. 经典案例:短时间内多次大额交易
    • 1.1 场景描述
    • 1.2 风险判定逻辑
  • 2. 使用Flink实现
    • 2.1 实现思路
    • 2.2 代码实现
    • 2.3 使用Flink流处理
  • 3. 使用Flink CEP实现
    • 3.1 实现思路
    • 3.2 代码实现
  • 4. 总结

1. 经典案例:短时间内多次大额交易

1.1 场景描述

规则1:单笔交易金额超过10,000元。

规则2:同一用户在10分钟内进行了3次或更多次交易。

风险行为:同时满足规则1和规则2的交易行为。

1.2 风险判定逻辑

检测每笔交易是否满足“单笔交易金额超过10,000元”。

对同一用户,统计10分钟内的交易次数。

如果交易次数达到3次或更多,则判定为风险行为。

2. 使用Flink实现

2.1 实现思路

使用Flink的KeyedStream按用户分组。

使用ProcessFunction实现自定义窗口逻辑,统计10分钟内的交易次数。

结合规则1和规则2,判断是否为风险行为。

2.2 代码实现

// 定义交易数据POJO
public class Transaction {private String transactionId;private String userId;private Double amount;private Long timestamp;// getters and setters
}// 定义风控结果POJO
public class RiskResult {private String userId;private String transactionId;private String riskLevel;private String actionTaken;private Long createTime;// getters and setters
}// 实现风控逻辑
public class FraudDetectionProcessFunction extends KeyedProcessFunction<String, Transaction, RiskResult> {private transient ValueState<Integer> transactionCountState;private transient ValueState<Long> timerState;@Overridepublic void open(Configuration parameters) {// 初始化状态ValueStateDescriptor<Integer> countDescriptor = new ValueStateDescriptor<>("transactionCount", Types.INT);transactionCountState = getRuntimeContext().getState(countDescriptor);ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>("timerState", Types.LONG);timerState = getRuntimeContext().getState(timerDescriptor);}@Overridepublic void processElement(Transaction transaction,Context ctx,Collector<RiskResult> out) throws Exception {// 规则1:单笔交易金额超过10,000元if (transaction.getAmount() > 10000) {// 更新交易次数Integer count = transactionCountState.value();if (count == null) {count = 0;}count += 1;transactionCountState.update(count);// 如果是第一次满足规则1,设置10分钟的定时器if (count == 1) {long timer = ctx.timestamp() + 10 * 60 * 1000; // 10分钟ctx.timerService().registerEventTimeTimer(timer);timerState.update(timer);}// 规则2:10分钟内交易次数达到3次if (count >= 3) {RiskResult result = new RiskResult();result.setUserId(transaction.getUserId());result.setTransactionId(transaction.getTransactionId());result.setRiskLevel("HIGH");result.setActionTaken("ALERT");result.setCreateTime(System.currentTimeMillis());out.collect(result);}}}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<RiskResult> out) throws Exception {// 定时器触发时,重置状态transactionCountState.clear();timerState.clear();}
}

2.3 使用Flink流处理

java

DataStream<Transaction> transactionStream = env.addSource(transactionSource);DataStream<RiskResult> riskResultStream = transactionStream.keyBy(Transaction::getUserId).process(new FraudDetectionProcessFunction());riskResultStream.addSink(new AlertSink());

3. 使用Flink CEP实现

Flink CEP(Complex Event Processing)是Flink提供的复杂事件处理库,适合处理基于时间序列的模式匹配。以下是使用Flink CEP实现上述风控规则的示例。

3.1 实现思路

定义模式:检测10分钟内3次或更多次大额交易。

使用Flink CEP的模式匹配功能,匹配符合条件的事件序列。

3.2 代码实现

java

// 定义交易数据POJO
public class Transaction {private String transactionId;private String userId;private Double amount;private Long timestamp;// getters and setters
}// 定义风控结果POJO
public class RiskResult {private String userId;private List<String> transactionIds;private String riskLevel;private String actionTaken;private Long createTime;// getters and setters
}// 实现风控逻辑
public class FraudDetectionCEP {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 交易数据流DataStream<Transaction> transactionStream = env.addSource(transactionSource).assignTimestampsAndWatermarks(WatermarkStrategy.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event, timestamp) -> event.getTimestamp()));// 按用户分组KeyedStream<Transaction, String> keyedStream = transactionStream.keyBy(Transaction::getUserId);// 定义CEP模式:10分钟内3次或更多次大额交易Pattern<Transaction, ?> pattern = Pattern.<Transaction>begin("first").where(new SimpleCondition<Transaction>() {@Overridepublic boolean filter(Transaction transaction) {return transaction.getAmount() > 10000;}}).next("second").where(new SimpleCondition<Transaction>() {@Overridepublic boolean filter(Transaction transaction) {return transaction.getAmount() > 10000;}}).next("third").where(new SimpleCondition<Transaction>() {@Overridepublic boolean filter(Transaction transaction) {return transaction.getAmount() > 10000;}}).within(Time.minutes(10));// 应用模式PatternStream<Transaction> patternStream = CEP.pattern(keyedStream, pattern);// 生成风控结果DataStream<RiskResult> riskResultStream = patternStream.process(new PatternProcessFunction<Transaction, RiskResult>() {@Overridepublic void processMatch(Map<String, List<Transaction>> match,Context ctx,Collector<RiskResult> out) throws Exception {RiskResult result = new RiskResult();result.setUserId(match.get("first").get(0).getUserId());result.setTransactionIds(match.values().stream().flatMap(List::stream).map(Transaction::getTransactionId).collect(Collectors.toList()));result.setRiskLevel("HIGH");result.setActionTaken("ALERT");result.setCreateTime(System.currentTimeMillis());out.collect(result);}});// 输出结果riskResultStream.addSink(new AlertSink());env.execute("Fraud Detection with Flink CEP");}
}

4. 总结

Flink实现:通过KeyedProcessFunction和状态管理实现多规则匹配。

Flink CEP实现:通过定义复杂事件模式,简化多规则匹配的逻辑。

适用场景:

Flink适合需要自定义逻辑的场景。

Flink CEP适合基于时间序列的模式匹配场景。

通过以上实现,可以高效检测银行交易中的风险行为,并根据需要扩展更多规则

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/72605.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言——链表

大神文献&#xff1a;https://blog.csdn.net/weixin_73588765/article/details/128356985 目录 一、链表概念 1. 什么是链表&#xff1f; 1.1 链表的构成 2. 链表和数组的区别 数组的特点&#xff1a; 链表的特点&#xff1a; 二者对比&#xff1a; 二…

Spring框架自带的定时任务:Spring Task详解

文章目录 一、基本使用1、配置&#xff1a;EnableScheduling2、触发器&#xff1a;Scheduled 二、拓展1、修改默认的线程池2、springboot配置 三、源码分析参考资料 一、基本使用 1、配置&#xff1a;EnableScheduling import org.springframework.context.annotation.Config…

数据库事务、乐观锁及悲观锁

参考&#xff1a;node支付宝支付及同步、异步通知、主动查询支付宝订单状态 以下容结合上述链接查看 1. 什么是数据库事务&#xff1f; 1.1. 连续执行数据库操作 在支付成功后&#xff0c;我们在自定义的paidSuccess里&#xff0c;依次更新了订单状态和用户信息。也就说这里…

Android 创建一个全局通用的ViewModel

&#xff08;推荐&#xff09;使用ViewModelStore 代码示例&#xff1a; class MyApplication : Application(), ViewModelStoreOwner {private val mViewModelStore ViewModelStore()override fun onCreate() {super.onCreate()}override val viewModelStore: ViewModelSto…

SCI期刊推荐 | 免版面费 | 计算机领域:信息系统、软件工程、自动化和控制

在学术研究领域&#xff0c;选择合适的SCI期刊对科研成果的传播与认可至关重要。了解SCI期刊的研究领域和方向是基础&#xff0c;确保投稿内容与期刊主题相符。同时&#xff0c;要关注期刊的影响因子和评估标准&#xff0c;选择具有较高影响力和学术认可度的期刊。阅读期刊的投…

解锁Android RemoteViews:跨进程UI更新的奥秘

一、RemoteViews 简介 在 Android 开发的广阔领域中&#xff0c;RemoteViews 是一个独特且重要的概念&#xff0c;它为开发者提供了一种在其他进程中显示视图结构的有效方式。从本质上讲&#xff0c;RemoteViews 并非传统意义上在当前应用进程内直接渲染和操作的 View&#xf…

常见webshell工具的流量特征

1、蚁剑 1.1、蚁剑webshell静态特征 蚁剑中php使用assert、eval执行&#xff1b;asp只有eval执行&#xff1b;在jsp使用的是Java类加载&#xff08;ClassLoader&#xff09;&#xff0c;同时会带有base64编码解码等字符特征。 1.2、蚁剑webshell动态特征 查看流量分析会发现…

爬虫系列之【数据解析之bs4】《四》

目录 前言 一、用法详解 1.1 获取标签内容 1.2 获取标签属性 1.3 获取标签包裹的文本内容 1.4 获取标签列表 1.5 css 选择器&#xff1a;select 二、实战案例 完整代码 前言 HTML数据解析 1、正则 2、xpath&#xff08;居多&#xff09; 3、css 选择器&#xff08;bs…

Java-实现PDF合同模板填写内容并导出PDF文件

可用于公司用户合同导出pdf文件 效果图 一、导入所需要jar包 <!--生成PDF--><dependency><groupId>com.itextpdf</groupId><artifactId>itextpdf</artifactId><version>5.5.11</version></dependency><dependency&…

【人工智能】GPT-4 vs DeepSeek-R1:谁主导了2025年的AI技术竞争?

前言 2025年&#xff0c;人工智能技术将迎来更加激烈的竞争。随着OpenAI的GPT-4和中国初创公司DeepSeek的DeepSeek-R1在全球范围内崭露头角&#xff0c;AI技术的竞争格局开始发生变化。这篇文章将详细对比这两款AI模型&#xff0c;从技术背景、应用领域、性能、成本效益等多个方…

前端开发10大框架深度解析

摘要 在现代前端开发中&#xff0c;框架的选择对项目的成功至关重要。本文旨在为开发者提供一份全面的前端框架指南&#xff0c;涵盖 React、Vue.js、Angular、Svelte、Ember.js、Preact、Backbone.js、Next.js、Nuxt.js 和 Gatsby。我们将从 简介、优缺点、适用场景 以及 实际…

【MySQL】索引(页目录、B+树)

文章目录 1. 引入索引2. MySQL与磁盘交互的基本单位3. 索引的理解3.1 页目录3.2 B树 4. 聚簇索引、非聚簇索引5. 索引的操作5.1 索引的创建5.1.1 创建主键索引5.1.2 创建唯一索引5.1.3 普通索引的创建5.1.4 全文索引的创建 5.2 索引的查询5.3 删除索引 1. 引入索引 索引&#…

python-串口助手(OV7670图传)

代码 主python文件 import serial import serial.tools.list_ports import time import tkinter as tk from tkinter import ttk import numpy as np from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg from matplotlib.figure import Figure import threadi…

筑牢网络安全防线:守护您的数据安全

在数字化时代&#xff0c;数据安全已成为企业和个人不容忽视的重要议题。近日印尼国家数据中心遭黑客袭击的事件&#xff0c;不仅扰乱了机场的移民检查&#xff0c;还影响了众多机构的服务运行。黑客利用恶意软件对数据中心进行攻击&#xff0c;索要巨额赎金&#xff0c;给印尼…

Vue 3 整合 WangEditor 富文本编辑器:从基础到高级实践

本文将详细介绍如何在 Vue 3 项目中集成 WangEditor 富文本编辑器&#xff0c;实现图文混排、自定义扩展等高阶功能。 一、为什么选择 WangEditor&#xff1f; 作为国内流行的开源富文本编辑器&#xff0c;WangEditor 具有以下优势&#xff1a; 轻量高效&#xff1a;压缩后仅…

FastGPT 引申:信息抽取到知识图谱的衔接流程

文章目录 信息抽取到知识图谱的衔接流程步骤1&#xff1a;原始信息抽取结果步骤2&#xff1a;数据标准化处理&#xff08;Python示例&#xff09;步骤3&#xff1a;Cypher代码动态生成&#xff08;Python驱动&#xff09; 关键衔接逻辑说明1. 唯一标识符生成规则2. 数据映射策略…

Webshell 入侵与防御全攻略

Webshell&#xff0c;是指攻击者上传到网站的远程控制后门&#xff0c;允许黑客像管理员一样远程控制网站&#xff0c;执行恶意命令&#xff0c;甚至完全接管网站。本文将带你深入了解 Webshell 的入侵方式以及相应的防御措施&#xff0c;帮助你加固自己的网站防线。 什么是 W…

NL2SQL-基于Dify+阿里通义千问大模型,实现自然语音自动生产SQL语句

本文基于Dify阿里通义千问大模型&#xff0c;实现自然语音自动生产SQL语句功能&#xff0c;话不多说直接上效果图 我们可以试着问他几个问题 查询每个部门的员工数量SELECT d.dept_name, COUNT(e.emp_no) AS employee_count FROM employees e JOIN dept_emp de ON e.emp_no d…

双链路提升网络传输的可靠性扩展可用带宽

为了提升网络传输的可靠性或增加网络可用带宽&#xff0c; 通常使用双链路冗余备份或者双链路聚合的方式。 本文介绍几种双链路网络通信的案例。 5GWiFi冗余传输 双Socket绑定不同网络接口&#xff1a;通过Android的ConnectivityManager绑定5G蜂窝网络和WiFi的Socket连接&…

Ubuntu22.04安装Ollama部署DeepSeek-R1:32B模型

一、环境准备 1.硬件要求 GPU: 至少 NVIDIA A30/A100 (显存 ≥ 24GB)内存: ≥ 64GB RAM存储: ≥ 100GB 可用空间 (模型文件约 60GB)2.软件依赖 # 验证NVIDIA驱动 nvidia-smi二、Ollama安装 方法 1:install.sh安装 运行一下安装命令: curl -fsSL https://ollama.com/inst…