16、Flink 的广播状态 (Broadcast State) 示例

1、Broadcast State 案例

规则流:1,a,b [规则名1 规则为 a 或 b]
图形流:green,a [绿色 a]

问题:如果规则流先于数据流则匹配不上=>此时缓冲数据流中的数据【如果规则流为null】

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;import java.util.*;public class _06_BroadcastState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 接收广播的规则数据SingleOutputStreamOperator<_06_Rule> ruleStream = env.socketTextStream("localhost", 8888).map(new MapFunction<String, _06_Rule>() {@Overridepublic _06_Rule map(String value) throws Exception {String[] fields = value.split(",");return new _06_Rule(fields[0], new Tuple2<>(fields[1], fields[2]));}});// 一个 map descriptor,它描述了用于存储规则名称与规则本身的 map 存储结构MapStateDescriptor<String, _06_Rule> ruleStateDescriptor = new MapStateDescriptor<>("RulesBroadcastState",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint<_06_Rule>() {}));// 广播流,广播规则并且创建 broadcast stateBroadcastStream<_06_Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);// 接收图形数据SingleOutputStreamOperator<_06_Shape> shapeStream = env.socketTextStream("localhost", 9999).map(new MapFunction<String, _06_Shape>() {@Overridepublic _06_Shape map(String value) throws Exception {String[] fields = value.split(",");return new _06_Shape(fields[0], fields[1]);}});shapeStream.keyBy(_06_Shape::getColour).connect(ruleBroadcastStream).process(new KeyedBroadcastProcessFunction<String, _06_Shape, _06_Rule, String>() {
//                    private transient ValueState<List<_06_Shape>> dataBuffer;private transient ListState<_06_Shape> dataBuffer;@Overridepublic void open(Configuration parameters) throws Exception {
//                        ValueStateDescriptor<List<_06_Shape>> dataListStateDescriptor = new ValueStateDescriptor<>("dataBuffer", TypeInformation.of(new TypeHint<List<_06_Shape>>() {
//                        }));
//
//                        dataBuffer = getRuntimeContext().getState(dataListStateDescriptor);ListStateDescriptor<_06_Shape> listStateDescriptor = new ListStateDescriptor<>("dataBuffer", _06_Shape.class);dataBuffer = getRuntimeContext().getListState(listStateDescriptor);}@Overridepublic void processElement(_06_Shape value, KeyedBroadcastProcessFunction<String, _06_Shape, _06_Rule, String>.ReadOnlyContext ctx, Collector<String> out) throws Exception {// 获取广播的规则数据System.out.println("输入的数据颜色为=>" + value.getColour() + ",类型为=>" + value.getType());ReadOnlyBroadcastState<String, _06_Rule> broadcastState = ctx.getBroadcastState(ruleStateDescriptor);Iterator<Map.Entry<String, _06_Rule>> iterator = broadcastState.immutableEntries().iterator();if (iterator.hasNext()) {// 使用 ValueState// 先从缓存中读取数据进行匹配
//                            List<_06_Shape> shapeList = dataBuffer.value();// 多并行度时,防止某个并行度无数据导致报错
//                            if (shapeList != null) {
//                                if (!shapeList.isEmpty()) {
//                                    for (_06_Shape shape : shapeList) {
//                                        System.out.println("被缓冲的数据开始进行处理=>" + shape);
//                                        // 从事件数据中继续匹配
//                                        while (iterator.hasNext()) {
//                                            Map.Entry<String, _06_Rule> rule = iterator.next();
//                                            if (Objects.equals(rule.getValue().getRule().f0, shape.getType()) || Objects.equals(rule.getValue().getRule().f1, shape.getType())) {
//                                                out.collect("匹配上的数据为=>" + value + "匹配上的规则名称为=>" + rule.getValue().getRuleName());
//                                            }
//                                        }
//                                    }
//
//                                    shapeList.clear();
//                                }
//                            }// 使用 ListStateIterator<_06_Shape> dataIterator = dataBuffer.get().iterator();while (dataIterator.hasNext()){_06_Shape shape = dataIterator.next();System.out.println("被缓冲的数据开始进行处理=>" + shape);while (iterator.hasNext()) {Map.Entry<String, _06_Rule> rule = iterator.next();if (Objects.equals(rule.getValue().getRule().f0, value.getType()) || Objects.equals(rule.getValue().getRule().f1, value.getType())) {out.collect("匹配上的数据为=>" + value + "匹配上的规则名称为=>" + rule.getValue().getRuleName());}}dataIterator.remove();}// 从事件数据中继续匹配while (iterator.hasNext()) {Map.Entry<String, _06_Rule> rule = iterator.next();if (Objects.equals(rule.getValue().getRule().f0, value.getType()) || Objects.equals(rule.getValue().getRule().f1, value.getType())) {out.collect("匹配上的数据为=>" + value + "匹配上的规则名称为=>" + rule.getValue().getRuleName());}}} else {System.out.println("此时规则流中无规则,先缓冲数据流");// 使用 listStatedataBuffer.add(value);// 使用 valueState
//                            List<_06_Shape> shapeList = dataBuffer.value();
//                            if (shapeList == null) {
//                                shapeList = new ArrayList<>();
//                            }
//                            shapeList.add(value);
//                            dataBuffer.update(shapeList);}}@Overridepublic void processBroadcastElement(_06_Rule value, KeyedBroadcastProcessFunction<String, _06_Shape, _06_Rule, String>.Context ctx, Collector<String> out) throws Exception {// 获取广播流输入的数据,存入广播状态System.out.println("输入的规则名称为=>" + value.getRuleName() + ",规则为=>" + value.getRule());BroadcastState<String, _06_Rule> broadcastState = ctx.getBroadcastState(ruleStateDescriptor);broadcastState.put(value.getRuleName(), value);}}).print();env.execute();}
}

2、Pojo 类

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;import java.io.Serializable;@NoArgsConstructor
@AllArgsConstructor
@Data
public class _06_Rule implements Serializable {private String ruleName;private Tuple2<String,String> rule;
}
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;@AllArgsConstructor
@NoArgsConstructor
@Data
public class _06_Shape implements Serializable {private String colour;private String type;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/6960.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

redis-内存回收机制

在 Redis 的源码中&#xff0c;redisDb 结构体用于表示一个 Redis 数据库实例。结构体大致如下 typedef struct redisDb {dict *dict; // 存储键值对的字典dict *expires; // 存储键的过期时间的字典dict *blocking_keys; // 阻塞键的字典…

子比主题小黑屋列表

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、pandas是什么?二、使用步骤1.引入库前言 提示:这里可以添加本文要记录的大概内容: 例如:随着人工智能的不断发展,机器学习这门技术也越来越重要,很多人都开启了学习机器学习,本文…

五一假期后,必读的10篇大模型论文

1.同时预测多个 token&#xff1a;更好更快的大型语言模型 目前&#xff0c;GPT 和 Llama 等大型语言模型&#xff08;LLMs&#xff09;都是通过下一个 token 预测损失来训练的。 在这项工作中&#xff0c;来自 Meta FAIR 的研究团队认为&#xff0c;训练语言模型同时预测多个…

用 Go map 要注意这个细节,避免依赖他!

有的小伙伴没留意过 Go map 输出、遍历顺序&#xff0c;以为它是稳定的有序的&#xff0c;会在业务程序中直接依赖这个结果集顺序&#xff0c;结果栽了个大跟头&#xff0c;吃了线上 BUG。 有的小伙伴知道是无序的&#xff0c;但却不知道为什么,有的却理解错误&#xff1f; 今…

PADS 规则设置-导线不跟随器件-导线允许回路

1、PADS Layout中设置拖动器件时导线不跟着移动 2、PADS Router中设置走线允许回路

【隧道篇 / WAN优化】(7.4) ❀ 01. 启动WAN优化 ❀ FortiGate 防火墙

【简介】几乎所有的人都知道&#xff0c;防火墙自带的硬盘是用来保存日志&#xff0c;以方便在出现问题时能找到原因。但是很少的人知道&#xff0c;防火墙自带的硬盘其实还有另一个功能&#xff0c;那就是用于WAN优化。 防火墙自带的硬盘 在FortiGate防火墙A、B、C、D系列&…

【备战软考(嵌入式系统设计师)】04-嵌入式软件架构

嵌入式操作系统 嵌入式系统有以下特点&#xff1a; 要求编码体积小&#xff0c;能够在有限的存储空间内运行。 面向应用&#xff0c;可以进行裁剪和移植。 用于特定领域&#xff0c;可以支持多任务。 可靠性高&#xff0c;及时响应&#xff0c;无需人工干预独立运行。 实…

软件全套资料整理包获取-软件各阶段支撑文档

软件全套精华资料包清单部分文件列表&#xff1a; 工作安排任务书&#xff0c;可行性分析报告&#xff0c;立项申请审批表&#xff0c;产品需求规格说明书&#xff0c;需求调研计划&#xff0c;用户需求调查单&#xff0c;用户需求说明书&#xff0c;概要设计说明书&#xff0c…

动手写一个简单的Android 表格控件支持固定列

Android 动手写一个简洁版表格控件 简介 源码已放到 Github Gitee 作为在测绘地理信息行业中穿梭的打工人&#xff0c;遇到各种数据采集需求&#xff0c;既然有数据采集需求&#xff0c;那当然少不了数据展示功能&#xff0c;最常见的如表格方式展示。 当然&#xff0c;类似…

大模型时序预测初步调研20240506

AI预测相关目录 AI预测流程&#xff0c;包括ETL、算法策略、算法模型、模型评估、可视化等相关内容 最好有基础的python算法预测经验 EEMD策略及踩坑VMD-CNN-LSTM时序预测对双向LSTM等模型添加自注意力机制K折叠交叉验证optuna超参数优化框架多任务学习-模型融合策略Transform…

MySQL —— 表的基本操作

一、创建 1.语法 create table 表名称( 自定义变量1, 自定义变量2, 自定义变量3&#xff08;最后一个变量末尾不需要加任何标点符号&#xff09; )charset字符集 collate校验集 engine存储引擎; ps&#xff1a;若是不具体给字符集、校验集、储存引擎&#xff0c;则采用配置文件…

Prop 校验

Vue 组件可以更细致地声明对传入的 props 的校验要求。比如我们上面已经看到过的类型声明&#xff0c;如果传入的值不满足类型要求&#xff0c;Vue 会在浏览器控制台中抛出警告来提醒使用者。这在开发给其他开发者使用的组件时非常有用。 要声明对 props 的校验&#xff0c;你…

C#面:当线程进入一个对象的一个synchronized方法后,其它线程是否可进入此对象的其它方法

不能&#xff0c;一个对象的一个synchronized方法只能由一个线程访问 当一个线程进入一个对象的synchronized方法后&#xff0c;其他线程是无法进入该对象的其他synchronized方法的。这是因为synchronized关键字可以确保同一时间只有一个线程可以进入被标记为synchronized的方…

Python氮氧甲烷乙烷乙烯丙烯气体和固体热力学模型计算

&#x1f3af;要点 &#x1f3af;固体和粒子&#xff1a;计算二态系统、简谐振子和爱因斯坦固体的内能和比热&#xff0c;比较爱因斯坦固体和德拜固体。模拟多个粒子的一维和二维随机游走&#xff0c;在数值上确认方差的线性趋势&#xff0c;模拟多个粒子的梯度下降&#xff0…

[Unity]备份许可文件

原因&#xff1a;因Unity需要重新恢复出厂设置&#xff0c;所以需要先将许可文件本分到本地&#xff0c;以便重新初始化后输入许可。 1&#xff09;登录管理网络web界面&#xff0c;在服务页面打开SSH 2&#xff09;ssh到管理网络&#xff0c;使用service用户登录 3&#xf…

『跨端框架』Flutter环境搭建

『跨端框架』Flutter环境搭建 资源网站简介跨平台高性能发展历程跨平台框架的比较成功案例 环境搭建&#xff08;windows&#xff09;基础环境搭建Windows下的安卓环境搭建Mac下的安卓环境配置资源镜像JDKAndroid StudioFlutter SDK问题一问题二问题三修改项目中的Flutter版本 …

厂家自定义 Android Ant编译流程源码分析

0、Ant安装 Windows下安装Ant&#xff1a; ant 官网可下载 http://ant.apache.org ant 环境配置&#xff1a; 解压ant的包到本地目录。 在环境变量中设置ANT_HOME&#xff0c;值为你的安装目录。 把ANT_HOME/bin加到你系统环境的path。 Ubuntu下安装Ant&#xff1a; sudo apt…

visio studio 中.NET Core(.net8.0)框架和.net framewok 框架有什么区别?

更新vs到2022版本后&#xff0c;新建项目时就多出不少选项&#xff0c;这里来个大家分享下.NET Core&#xff08;.net8.0&#xff09;框架和.net framewok的区别 如下图&#xff0c;不带后缀的就是使用.NET Core框架&#xff0c;后续选项是.net8.0。 .net framewok框架选项&am…

从0到1:商场导览小程序开发笔记一

背景 购物中心与商场小程序&#xff1a;旨在提供便捷的购物、导航、活动报名、服务查询等功能&#xff0c;让用户更好地体验购物和享受服务。通过提供便捷的购物、信息查询和互动预约等功能&#xff0c;提升了商场的服务水平和用户体验&#xff0c;帮助商场与消费者建立更紧密…

YOLOv5入门(四)训练自己的目标检测模型

前言 通过前面几篇文章&#xff0c;已经完成数据集制作和环境配置&#xff08;服务器&#xff09;&#xff0c;接下来将继续实践如何开始训练自己数据集~ 往期回顾 YOLOv5入门&#xff08;一&#xff09;利用Labelimg标注自己数据集 YOLOv5入门&#xff08;二&#xff09;处…