Flink(八):DataStream API (五) Join

1. Window Join

Window join 作用在两个流中有相同 key 且处于相同窗口的元素上。这些窗口可以通过 window assigner 定义,并且两个流中的元素都会被用于计算窗口的结果。两个流中的元素在组合之后,会被传递给用户定义的 JoinFunction 或 FlatJoinFunction,用户可以用它们输出符合 join 要求的结果。常见的用例可以总结为以下代码

stream.join(otherStream).where(<KeySelector>).equalTo(<KeySelector>).window(<WindowAssigner>).apply(<JoinFunction>);

语义上有一些值得注意的地方:

  • 从两个流中创建成对的元素与 inner-join 类似,即一个流中的元素在与另一个流中对应的元素完成 join 之前不会被输出。
  • 完成 join 的元素会将他们的 timestamp 设为对应窗口中允许的最大 timestamp。比如一个边界为 [5, 10) 窗口中的元素在 join 之后的 timestamp 为 9。

1.1 滚动 Window Join

使用滚动 window join 时,所有 key 相同且共享一个滚动窗口的元素会被组合成对,并传递给 JoinFunction 或 FlatJoinFunction。因为这个行为与 inner join 类似,所以一个流中的元素如果没有与另一个流中的元素组合起来,它就不会被输出!

如图所示,定义了一个大小为 2 毫秒的滚动窗口,即形成了边界为 [0,1], [2,3], ... 的窗口。图中展示了如何将每个窗口中的元素组合成对,组合的结果将被传递给 JoinFunction。注意,滚动窗口 [6,7] 将不会输出任何数据,因为绿色流当中没有数据可以与橙色流的 ⑥ 和 ⑦ 配对。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(TumblingEventTimeWindows.of(Time.milliseconds(2))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});

1.2 滑动 Window Join

当使用滑动 window join 时,所有 key 相同且处于同一个滑动窗口的元素将被组合成对,并传递给 JoinFunction 或 FlatJoinFunction。当前滑动窗口内,如果一个流中的元素没有与另一个流中的元素组合起来,它就不会被输出!注意,在某个滑动窗口中被 join 的元素不一定会在其他滑动窗口中被 join。

本例中定义了长度为两毫秒,滑动距离为一毫秒的滑动窗口,生成的窗口实例区间为 [-1, 0],[0,1],[1,2],[2,3], …。 X 轴下方是每个滑动窗口中被 join 后传递给 JoinFunction 的元素。图中可以看到橙色 ② 与绿色 ③ 在窗口 [2,3] 中 join,但没有与窗口 [1,2] 中任何元素 join。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */)).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});

1.3 会话 Window Join

使用会话 window join 时,所有 key 相同且组合后符合会话要求的元素将被组合成对,并传递给 JoinFunction 或 FlatJoinFunction。这个操作同样是 inner join,所以如果一个会话窗口中只含有某一个流的元素,这个窗口将不会产生输出!

这里我们定义了一个间隔为至少一毫秒的会话窗口。图中总共有三个会话,前两者中两个流都有元素,它们被 join 并传递给 JoinFunction。而第三个会话中,绿流没有任何元素,所以 ⑧ 和 ⑨ 没有被 join!

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(EventTimeSessionWindows.withGap(Time.milliseconds(1))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});

2. Interval Join

Interval join 组合元素的条件为:两个流(我们暂时称为 A 和 B)中 key 相同且 B 中元素的 timestamp 处于 A 中元素 timestamp 的一定范围内。这个条件可以更加正式地表示为 b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound] 或 a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

这里的 a 和 b 为 A 和 B 中共享相同 key 的元素。上界和下界可正可负,只要下界永远小于等于上界即可。 Interval join 目前仅执行 inner join。当一对元素被传递给 ProcessJoinFunction,他们的 timestamp 会从两个元素的 timestamp 中取最大值 (timestamp 可以通过 ProcessJoinFunction.Context 访问)。Interval join 目前仅支持 event time。

上例中,我们 join 了橙色和绿色两个流,join 的条件是:以 -2 毫秒为下界、+1 毫秒为上界。 默认情况下,上下界也被包括在区间内,但 .lowerBoundExclusive() 和 .upperBoundExclusive() 可以将它们排除在外。

图中三角形所表示的条件也可以写成更加正式的表达式:orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.keyBy(<KeySelector>).intervalJoin(greenStream.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String>(){@Overridepublic void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {out.collect(left + "," + right);}});

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/68337.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Py之cv2:cv2(OpenCV,opencv-python)库的简介、安装、使用方法(常见函数、图像基本运算等)

1. OpenCV简介 1.1 OpenCV定义与功能 OpenCV&#xff08;Open Source Computer Vision Library&#xff09;是一个开源的计算机视觉和机器学习软件库。它为计算机视觉应用程序提供了一个通用的基础设施&#xff0c;并加速了在商业产品中使用机器感知。作为BSD许可的产品&…

JVM 触发类加载的条件有哪些?

目录 一、类加载生命周期 二、主动引用 2.1、创建类的实例 2.2、访问类的静态字段或静态方法 2.3、反射 2.4、初始化类的子类时&#xff0c;先初始化父类 2.5、虚拟机启动时&#xff0c;初始化 main 方法所在的类 2.6、动态语言支持 三、被动引用 3.1、通过子类引用父…

ElasticSearch-Nested 类型与 Object 类型的区别

在 Elasticsearch 中&#xff0c;nested 类型和 object 类型都用于处理嵌套的 JSON 数据&#xff0c;但它们在存储和查询方面有着显著的区别。本文将详细解释这两种类型的区别&#xff0c;并提供具体的示例。 一、基本概念 1. object 类型 定义&#xff1a;object 类型是 Elas…

推荐sdkman管理sdk和jdk

使用SDKMAN安装JDK通常是免费的。 SDKMAN是一个开源的命令行工具&#xff0c;用于管理和切换多个版本的软件开发工具包&#xff08;SDKs&#xff09;&#xff0c;包括JDK。它支持多种JVM相关工具&#xff0c;如Java、Scala、Groovy、Maven、Gradle等。 安装SDKMAN 首先&…

Flink CDC 在阿里云实时计算Flink版的云上实践

摘要&#xff1a;本文整理自阿里云高级开发工程师&#xff0c;Apache Flink Committer 阮航老师在 Flink Forward Asia 2024 生产实践&#xff08;三&#xff09;专场中的分享&#xff0c;主要分为以下四个方面&#xff1a; Flink CDC & 实时计算 Flink CDC YAML 核心功能…

如何使用wireshark 解密TLS-SSL报文

目录 前言 原理 操作 前言 现在网站都是https 或者 很多站点都支持 http2。这些站点为了保证数据的安全都通过TLS/SSL 加密过&#xff0c;用wireshark 并不能很好的去解析报文&#xff0c;我们就需要用wireshark去解密这些报文。我主要讲解下mac 在 chrome 怎么配置的&…

【大模型系列篇】数字人音唇同步模型——腾讯开源MuseTalk

之前有一期我们体验了阿里开源的半身数字人项目EchoMimicV2&#xff0c;感兴趣的小伙伴可跳转至《AI半身数字人开箱体验——开源项目EchoMimicV2》&#xff0c;今天带大家来体验腾讯开源的数字人音唇同步模型MuseTalk。 MuseTalk 是一个实时高品质音频驱动的唇形同步模型&#…

C++基础入门(二)

目录 前言 一、重载 1.函数重载 2.运算符重载 二、构造函数 1.什么是构造函数 2.带参数的构造函数 3.使用初始化列表 4.this关键字 5.new关键字 三、析构函数 1.什么是析构函数 四、静态成员变量 1.静态成员的定义 2.静态成员变量的作用 五、继承 1.继承基本概…

Spring boot框架下的RocketMQ消息中间件

1. RocketMQ 基础概念 1.1 核心概念 以下是 RocketMQ 核心概念在 Spring Boot 的 Java 后端代码中的实际使用方式&#xff1a; Producer&#xff08;生产者&#xff09; 定义&#xff1a;Producer 是负责发送消息到 RocketMQ 的组件。它可以将消息发送到指定的 Topic。 实…

基础vue3前端登陆注册界面以及主页面设计

1.下载依赖 "element-plus/icons": "^0.0.11", "element-plus/icons-vue": "^2.3.1", "fortawesome/fontawesome-svg-core": "^6.7.2", "fortawesome/free-solid-svg-icons": "^6.7.2", &quo…

二分查找题目:在线选举

文章目录 题目标题和出处难度题目描述要求示例数据范围 解法思路和算法代码复杂度分析 题目 标题和出处 标题&#xff1a;在线选举 出处&#xff1a;911. 在线选举 难度 7 级 题目描述 要求 给定两个整数数组 persons \texttt{persons} persons 和 times \texttt{tim…

Mybatis-Plus:乐观锁与悲观锁

文章目录 一、场景二、乐观锁与悲观锁三、模拟修改冲突3.1 数据库中增加商品表3.2 添加数据3.3 添加实体3.4 添加mapper3.5 测试 四、乐观锁实现流程4.1 Mybatis-Plus实现乐观锁 一、场景 一件商品&#xff0c;成本价是80元&#xff0c;售价是100元。老板先是通知小李&#xf…

卷积神经网络——食物分类

整体框架&#xff1a; 导入库 导入了各种必需的Python库&#xff0c;用于数据处理、图像读取、模型构建和训练。 设置随机种子 seed_everything: 用于设置所有随机数生成器的种子&#xff0c;确保每次运行时的结果都是相同的。 图像预处理&#xff08;transform&#xff09; 对…

Jmeter配置服务代理器 Proxy(二)

1.创建脚本记录器 2.配置&#xff1a;Jmeter代理、端口、记录目标等 3.配置谷歌浏览器代理 浏览器配置代理的详细教程可参考&#xff1a;使用whistle代理-CSDN博客 4.启动Jmeter记录器 点击ok后弹出这个界面&#xff0c;生成了证书&#xff1a; 5.给浏览器安装Jmeter代理的证书…

灰色预测and BP神经网络 (详细上手使用)

灰色预测模型 基础知识&#xff1a; 白色系统&#xff1a;系统的信息是完全明确的。 灰色系统&#xff1a;系统的部分信息已知&#xff0c;部分信息未知。 黑色系统&#xff1a;系统的内部信息是未知的。 灰色预测是对既含有已知信息又含有不确定信息的系统进行预则&#xf…

mac 安装 node

brew versions node // 安装 node brew versions node14 // 安装指定版本 卸载node: sudo npm uninstall npm -g sudo rm -rf /usr/local/lib/node /usr/local/lib/node_modules /var/db/receipts/org.nodejs.* sudo rm -rf /usr/local/include/node /Users/$USER/.npm su…

(处理 Kafka 消息积压) - 高吞吐 + 零丢失的阻塞队列实战方案

一、分布式日志消费场景与挑战 在分布式日志系统中&#xff0c;Kafka 通常作为消息队列中间件&#xff0c;负责从日志生产者接收日志&#xff0c;并将其分发给日志消费者进行处理。为了平衡 Kafka 消费速度与日志处理速度&#xff0c;BlockingQueue 常被用作缓冲区&#xff0c…

【Unity】unity3D 调用LoadSceneAsync 场景切换后比较暗 部门材质丢失

解决方法&#xff1a;两个场景使用同样灯光 现象 直接进入第二个场景是可以正常显示 调用LoadSceneAsync来切换后&#xff0c;第二个场景出现比较暗的情况 解决方法&#xff1a;两个场景使用同样灯光&#xff0c;在loading 的场景中加入灯光。 Light—Directional Light 如果…

红日-VulnStack靶场一

http://vulnstack.qiyuanxuetang.net/vuln/ 一、环境部署 win7(被攻击机/关火墙) web服务器 1张外网网卡(桥接192.168.1.105)&#xff0c;一张内网网卡192.168.52.143/255.255.255.0/192.168.52.2 DNS 192.168.52.138 winser2008 域控服务器 1张…

【单片机通过蜂鸣器模拟警号 救护车 警车 等声音 】

单片机通过蜂鸣器模拟警号 救护车 警车 等声音 模拟原理实现代码 模拟原理 该函数利用定时器中断&#xff0c;通过改变 u16Compare 的值&#xff0c;并使用 Adt_SetPeriodBuf 和 Adt_SetCompareValue 函数调整定时器的周期和比较值&#xff0c;产生不同类型的声音。 SoundType…