flink广播算子Broadcast

文章目录

  • 一、Broadcast
  • 二、代码示例
  • 三.或者第二种(只读取一个csv文件到广播内存中)


提示:以下是本篇文章正文内容,下面案例可供参考

一、Broadcast

为了关联一个非广播流(keyed 或者 non-keyed)与一个广播流(BroadcastStream),我们可以调用非广播流的方法 connect(),并将 BroadcastStream 当做参数传入。 这个方法的返回参数是 BroadcastConnectedStream,具有类型方法 process(),传入一个特殊的 CoProcessFunction 来书写我们的模式识别逻辑。 具体传入 process() 的是哪个类型取决于非广播流的类型:

  • 如果流是一个 keyed 流,那就是 KeyedBroadcastProcessFunction 类型;
  • 如果流是一个 non-keyed 流,那就是 BroadcastProcessFunction 类型。

1).例如非keyby的要实现两个方法

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {//主流 public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;//广播操作public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}

2).keyby的

public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {//主流public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;//广播public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;//只有keyby的可以onTimer。此方法可以不重写public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}

在处理广播流元素这端,是具有读写权限的,而对于处理非广播流元素这端是只读的。 这样做的原因是,Flink 中是不存在跨 task 通讯的。所以为了保证 broadcast state 在所有的并发实例中是一致的,我们在处理广播流元素的时候给予写权限,在所有的 task 中均可以看到这些元素,并且要求对这些元素处理是一致的, 那么最终所有 task 得到的 broadcast state 是一致的。
广播算子是不使用 RocksDB state backend: broadcast state 在运行时保存在内存中,需要保证内存充足。这一特性同样适用于所有其他 Operator State。

二、代码示例

此处将本地csv文件加载到内存广播中
CSV文件的内容是:
1.user_details.csv

1,Alice,30
2,Bob,25

2.user_details03.csv

3,Charlie,35
5,name,5

下面是代码(下面是将两个本地CSV文件放到广播内存中案例)

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;import java.util.HashMap;
import java.util.Map;
public <

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/75835.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis 和 MySQL双写一致性的更新策略有哪些?常见面试题深度解答。

目录 一. 业务数据查询&#xff0c;更新顺序简要分析 二. 更新数据库、查询数据库、更新缓存、查询缓存耗时对比 2.1 更新数据库&#xff08;最慢&#xff09; 2.2 查询数据库&#xff08;较慢&#xff09; 2.3 更新缓存&#xff08;次快&#xff09; 2.4 查询缓存&#…

SRT协议

SRT&#xff08;Secure Reliable Transport&#xff09;是一种开源的视频传输协议&#xff0c;专为高丢包、高延迟网络环境设计&#xff0c;结合了UDP的低延迟和TCP的可靠性&#xff0c;广泛应用于直播、远程制作、视频会议等场景。 定位&#xff1a;SRT协议的官方C/C实现库&am…

“征服HTML引号恶魔:“完全解析手册”!!!(quot;表示双引号)

&#x1f6a8;&#x1f4e2; "征服HTML引号恶魔&#xff1a;“完全解析手册” &#x1f4e2;&#x1f6a8; &#x1f3af; 博客引言&#xff1a;当引号变成"恶魔" &#x1f631; 是否遇到过这种情况&#xff1a; 写HTML时满心欢喜输入<div title"他…

npm install 卡在创建项目:sill idealTree buildDeps

参考&#xff1a; https://blog.csdn.net/PengXing_Huang/article/details/136460133 或者再执行 npm install -g cnpm --registryhttps://registry.npm.taobao.org 或者换梯子

c++中cpp文件从编译到执行的过程

C 文件从编写到执行的过程可以分为几个主要阶段&#xff1a;编写代码、预处理、编译、汇编、链接和运行。以下是每个阶段的详细说明&#xff1a; 1. 编写代码 这是整个过程的起点。程序员使用文本编辑器&#xff08;如 VSCode、Sublime Text 或其他 IDE&#xff09;编写 C 源…

PROE 与 STL 格式转换:开启 3D 打印及多元应用的大门

在 3D 设计与制造的复杂生态中&#xff0c;将 PROE 格式转换为 STL 格式绝非无端之举&#xff0c;而是有着深厚且多元的现实需求作为支撑。 一、文件格式介绍​ &#xff08;一&#xff09;PROE 格式​ PROE 作为一款参数化设计软件&#xff0c;采用基于特征的参数化建模技术…

开发中后端返回下划线数据,要不要统一转驼峰?

先说结论。看情况&#xff01;&#xff01;&#xff01;&#xff01; 前端 主要用 JS/TS 建议后端返回 camelCase&#xff0c;减少前端转换成本。后端 主要是 Python/Go 建议保持 snake_case&#xff0c;前端做转换。但是团队统一风格最重要&#xff01;如果统一返回驼峰就驼峰…

docker pull时报错:https://registry-1.docker.io/v2/

原文&#xff1a;https://www.cnblogs.com/sdgtxuyong/p/18647915 https://www.cnblogs.com/OneSeting/p/18532166 docker 换源&#xff0c;解决连接不上的问题。 编辑以下文件&#xff0c;不存在则创建&#xff1a; vim /etc/docker/daemon.json {"registry-mirrors&qu…

Pytorch学习笔记(十二)Learning PyTorch - NLP from Scratch

这篇博客瞄准的是 pytorch 官方教程中 Learning PyTorch 章节的 NLP from Scratch 部分。 官网链接&#xff1a;https://pytorch.org/tutorials/intermediate/nlp_from_scratch_index.html 完整网盘链接: https://pan.baidu.com/s/1L9PVZ-KRDGVER-AJnXOvlQ?pwdaa2m 提取码: …

基础算法02——冒泡排序(Bubble Sort)

冒泡排序&#xff08;Bubble Sort&#xff09; 冒泡排序&#xff1a;是一种简单的排序算法&#xff0c;其基本思想是通过重复遍历要排序的列表&#xff0c;比较相邻的元素&#xff0c;并在必要时&#xff08;即前面的数比后面的数大的时候&#xff09;交换它们的位置&#xff…

RestTemplate远程调用接口方式

1.Post(body空参) 也就是说需要给一个空的json 代码: String getDeviceUrl this.MOVABLE_URL "detected-data/getMachineLists"; // 远程调用 RestTemplate restTemplate new RestTemplate(); restTemplate.getMessageConverters().set(1,new StringHttpMessageC…

ar头显和眼镜图像特效处理

使用一个线程从摄像头或者其他设备循环读取图像数据写入链表&#xff0c;另一个线程从链表循环读取数据并做相应的特效处理&#xff0c;由于写入的速度比读取的快&#xff0c;最终必然会因为写入过快导致线程读写一帧而引发冲突和数据帧正常数据帧被覆盖。最好使用共享内存&…

mysql--socket报错

错误原因分析 MySQL 服务未运行&#xff08;最常见原因&#xff09; 错误中的 (2) 表示 “No such file or directory”&#xff0c;即 /tmp/mysql.sock 不存在这通常意味着 MySQL 服务器根本没有启动 socket 文件路径不匹配 客户端尝试连接 /tmp/mysql.sock但 MySQL 服务器可…

labview加载matlab数据时报错提示:对象引用句柄无效。

1. labview报错提示 labview加载mat数据时报错提示&#xff1a;对象引用句柄无效。返回该引用句柄的节点可能遇到错误&#xff0c;并没有返回有效的引用句柄。该引用句柄所指的存储可能在执行调用之前已关闭。报错提示如下&#xff1a; 这是由于labview缺少matlab MathWorks导…

面试计算机操作系统解析(一中)

判断 1. 一般来说&#xff0c;先进先出页面置换算法比最近最少使用页面置换算法有较少的缺页率。&#xff08;✘&#xff09; 正确答案&#xff1a;错误解释&#xff1a;FIFO&#xff08;先进先出&#xff09;页面置换算法可能导致“Belady异常”&#xff0c;即页面数增加反而…

如何防御TCP洪泛攻击

TCP洪泛攻击&#xff08;TCP Flood Attack&#xff09;是一种常见的分布式拒绝服务&#xff08;DDoS&#xff09;攻击手段&#xff0c;以下是其原理、攻击方式和危害的详细介绍&#xff1a; 定义与原理 TCP洪泛攻击利用了TCP协议的三次握手过程。在正常的TCP连接建立过程中&a…

20250330 Pyflink with Paimon

1. 数据湖 2. 本地安装Pyflink和Paimon 必须安装Python 3.11 Pip install python -m pip install apache-flink1.20.1 需要手动加入这两个jar 测试代码&#xff1a; import argparse import logging import sys import timefrom pyflink.common import Row from pyflink.tab…

-PHP 应用SQL 盲注布尔回显延时判断报错处理增删改查方式

#PHP-MYSQL-SQL 操作 - 增删改查 1 、功能&#xff1a;数据查询(对数据感兴趣&#xff09; 查询&#xff1a; SELECT * FROM news where id$id 2 、功能&#xff1a;新增用户&#xff0c;添加新闻等&#xff08;对操作的结果感兴趣&#xff09; 增加&#xff1a; INSERT INT…

【学习记录】大模型微调之使用 LLaMA-Factory 微调 Qwen系列大模型,可以用自己的数据训练

一、LoRA微调的基本原理 1、基本概念 LoRA&#xff08;Low-Rank Adaptation&#xff09;是一种用于大模型微调的技术&#xff0c;通过引入低秩矩阵来减少微调时的参数量。在预训练的模型中&#xff0c;LoRA通过添加两个小矩阵B和A来近似原始的大矩阵ΔW&#xff0c;从而减少需…

Vue 使用 xlsx 插件导出 excel 文件

安装与引入 安装 npm install xlsx npm install file-saver # 或者 yarn add xlsx yarn add file-saver 引入 import * as XLSX from xlsx; import FileSaver from file-saver 基本功能 读取 Excel 文件 // 读取文件内容 const workbook XLSX.readFile(path/to/file.xl…