Flink双流(join)

 一、介绍

Join大体分类只有两种:Window Join和Interval Join

Window Join有可以根据Window的类型细分出3种:Tumbling(滚动) Window Join、Sliding(滑动) Window Join、Session(会话) Widnow Join。

        🌸Window 类型的join都是利用window的机制,先将数据缓存在Window State中,当窗口触发计算时,执行join操作。

        🌸Interval join也是利用state存储数据再处理,区别在于state中的数据有失效机制,依靠数据触发数据清理,目前Stream join的结果是数据的卡尔积。

二、Window Join

✨Tumbling Window Join

        执行翻滚窗口联接时,具有公共键和公告翻滚窗口的所有元素将成对组合联接,并传递JoinFunction或FlatJoinFunction。因为它的行为类似于内部连接,所以一个流中的元素在其滚动窗口中没有来自另一个流的元素,因此不会被发射。

        如图所示,我们定义了一个为2毫秒的翻滚窗口,结果窗口的形式为[0,1]、[2,3]..............该图显示了每个窗口中所以元素的成对组合,这些元素将传递给JoinFunction。注意在翻滚窗口[6,7]中没有发射任何东西,因为绿色流中不存在与橙色元素⑥和⑦结合的元素。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(TumblingEventTimeWindows.of(Time.milliseconds(2))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});

✨Sliding Window Join

        在执行滑动窗口联接时,具有公共键和公共滑动窗口的所以元素将作为成对组合联接,并传递JoinFunction或FlatJoinFunction。在当前滑动窗口中,一个流的元素没有来自另一个流的元素,则不会发射!请注意,某些元素可能会联接到一个滑动窗口中,但不会联接到另一个滑动窗口中!

        在本例中,我们使用大小为2毫秒的滑动窗口,并将其滑动1毫秒,从而产生滑动窗口[-1,0],[1,2],[2,3]...........x轴下方的连续元素时传递给每个滑动窗口的Join Function的元素。在这里,你还可以看到,例如在窗口[2,3]中,橙色②和绿色③连接,但在窗口[1,2]中没有与任何对象连接。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */)).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});

✨Session Window Join

        在执行会话窗口联接时,具有相同键(当“组合”满足会话条件)的所有元素以成对组合方式联接,并传递给JoinFunction或FlatJoinFunction。同样,这执行一个内部连接,所以如果有一个会话窗口只包含来自一个流的元素,则不会发出任何输出

        这里,我们定义一个会话窗口连接,其中每个会话被至少1毫秒的时间分割。有三个会话,在前两个会话中,来自两个流的连接元素被传递给JoinFunction。在第三个会话中,绿色流中没有元素,所以⑧和⑨没有连接!

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(EventTimeSessionWindows.withGap(Time.milliseconds(1))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});

三、Interval Join

        前面学习的Window Join必须要在一个Window中进行Join,那如果没有Window如何处理呢?interval join也是使用相同的key来join两个流(流A、流B),并且流B中的元素中的时间戳,和流A元素的时间戳,有一个时间间隔。

b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound] or

a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

也就是:流B的元素的时间戳 ≥ 流A的元素时间戳 + 下界,且,流B的元素的时间戳 ≤ 流A的元素时间戳

 

在上面的示例中,我们将两个流“orange”和“green”连接起来,其下限为-2毫秒,上限为+1毫秒。默认情况下,这些边界是包含的,但是可以应用.lowerBoundExclusive()和.upperBoundExclusive来更改行为orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound 

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.keyBy(<KeySelector>).intervalJoin(greenStream.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String(){@Overridepublic void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {out.collect(first + "," + second);}});

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/696876.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【OpenFeign常用配置】

OpenFeign常用配置 快速入门&#xff1a;1、引入依赖2、启用OpenFeign 实践1、引入依赖2、开启连接池功能3、模块划分4、日志5、重试 快速入门&#xff1a; OpenFeign是一个声明式的http客户端&#xff0c;是spring cloud在eureka公司开源的feign基础上改造而来。其作用及时基于…

C++ template-2

第 5 章 基础技巧 5.1 typename 关键字 关键字typename在C标准化过程中被引入进来&#xff0c;用来澄清模板内部的一个标识符代表的 是某种类型&#xff0c;而不是数据成员。考虑下面这个例子&#xff1a; template<typename T> class MyClass { public:void foo() {t…

【代码随想录算法训练营Day09】28.实现 strStr(); 459.重复的子字符串

文章目录 Day 9 第四章 字符串part0228. 实现 strStr() &#xff08;本题可以跳过&#xff09;KMP 思路KMP 代码 459.重复的子字符串 &#xff08;本题可以跳过&#xff09;字符串总结双指针回顾 Day 9 第四章 字符串part02 今日任务 28.实现 strStr(); 459.重复的子字符串; 字…

题目:C++快速找到未知长度单链表的中间节点。普通方法和高级方法2种解题思路解析。

在数据结构的面试中&#xff0c;经常会出现这样的问题&#xff1a;如何快速找到未知长度单链表的中间节点&#xff1f;通常&#xff0c;面试官会期待你提供两种解法&#xff1a;一种是最基本的普通方法&#xff0c;另一种是更高效的 advanced 方法。本文将详细介绍这两种方法。…

Nginx -2

接着上文写 5.4.7 验证模块 需要输入用户名和密码 模块名称&#xff1a;ngx_http_auth_basic_module 访问控制基于模块 ngx_http_auth_basic_module 实现&#xff0c;可以通过匹配客户端资源进行限制 语法&#xff1a; Syntax: auth_basic string | off; Default: auth_ba…

威尔金森功分器基本原理学习笔记

威尔金森功分器基本原理 威尔金森功率分配器的功能是将输入信号等分或不等分的分配到各个输出端口&#xff0c;并保持相同输出相位。环形器虽然有类似功能&#xff0c;但威尔金森功率分配器在应用上具有更宽的带宽。微带形功分器的电路结构如图所示&#xff0c;其中&#xff0…

【OpenAI Sora】何时开放使用?付费课程已上线(sora什么时候开放使用 )

Sora何时开放使用 根据提供的信息&#xff0c;Sora目前还未对广大用户开放。OpenAI在2024年2月15日展示了Sora的视频&#xff0c;但没有设立等待名单或提供API访问。Sora仍在开发中&#xff0c;正在接受安全测试&#xff0c;并且尚未向公众开放使用。 付费课程已上线 根据最…

Vue图片浏览组件v-viewer,支持旋转、缩放、翻转等操作

Vue图片浏览组件v-viewer&#xff0c;支持旋转、缩放、翻转等操作 之前用过viewer.js&#xff0c;算是市场上用过最全面的图片预览。v-viewer&#xff0c;是基于viewer.js的一个图片浏览的Vue组件&#xff0c;支持旋转、缩放、翻转等操作。 基本使用 安装&#xff1a;npm安装…

费舍尔FISHER金属探测器探测仪维修F70

美国FISHER LABS费舍尔地下金属探测器&#xff0c;金属探测仪等维修&#xff08;考古探金银铜探宝等仪器&#xff09;。 费舍尔F70视听目标ID金属探测器&#xff0c;Fisher 金属探测器公司成立于1931年&#xff0c;在实验条件很艰苦的情况下&#xff0c;研发出了地下金属探测器…

【Python】实现一个类似于Glass2k的Windows窗口透明化软件

一 背景说明 网上看到一款Windows下的窗口透明化工具Glass2k&#xff08;Glass2k官网&#xff09;&#xff0c;可以简单地通过快捷键实现任意窗口的透明化&#xff0c;还挺方便的&#xff0c;想用Python自己实现一下类似的功能。 软件已经开源到&#xff1a;窗口透明化小工具开…

【Leetcode】889. 根据前序和后序遍历构造二叉树

文章目录 题目思路代码结果 题目 题目链接 给定两个整数数组&#xff0c;preorder 和 postorder &#xff0c;其中 preorder 是一个具有 无重复 值的二叉树的前序遍历&#xff0c;postorder 是同一棵树的后序遍历&#xff0c;重构并返回二叉树。 如果存在多个答案&#xff0c;…

CSS基础属性

【三】基础属性 【1】高度和宽度 &#xff08;1&#xff09;参数 width&#xff08;宽度&#xff09;&#xff1a;用于设置元素的宽度。可以使用具体的数值&#xff08;如像素值&#xff09;或百分比来指定宽度。 height&#xff08;高度&#xff09;&#xff1a;用于设置元…

Kubernetes 卷存储 NFS | nfs搭建配置 原理介绍 nfs作为存储卷使用

目录 1、NFS介绍2、NFS服务部署2.1安装nfs服务 (服务端配置)2.2启动NFS服务2.3 服务检查2.4 客户端配置 3、nfs作为存储卷使用3.1 nfs作为volume3.2 nfs存储的缺点3.3 nfs作为PersistentVolum 4、nfs作为动态存储提供5、总结 1、NFS介绍 NFS&#xff08;Network File System&a…

4.pom文件介绍Maven常用命令

1.pom.xml文件介绍. 1.1project标签和modelVersion标签介绍. pom.xml文件是maven的核心文件&#xff0c;POM(Project Object Model&#xff0c;项目对象模型)定义了项目的基本信息&#xff0c;用于描述如何构建&#xff0c;声明项目依赖;&#xff1b; 1.2依赖坐标介绍. 依赖的…

得物面试:Kafka消息0丢失,如何实现?

得物面试&#xff1a;Kafka消息0丢失&#xff0c;如何实现&#xff1f; 尼恩说在前面 在40岁老架构师 尼恩的读者交流群(50)中&#xff0c;最近有小伙伴拿到了一线互联网企业如得物、阿里、滴滴、极兔、有赞、希音、百度、网易、美团的面试资格&#xff0c;遇到很多很重要的面…

新版Java面试专题视频教程——多线程篇②

新版Java面试专题视频教程——多线程篇② 0. 问题汇总0.1 线程的基础知识0.2 线程中并发安全0.3 线程池0.4 使用场景 1.线程的基础知识2.线程中并发锁3.线程池3.1 说一下线程池的核心参数&#xff08;线程池的执行原理知道嘛&#xff09;3.2 线程池中有哪些常见的阻塞队列Array…

高级语言期末2014级A卷

1.编写函数 int delarr(int a[] ,int n)&#xff0c;删除有n个元素的正整型数组a中所有素数&#xff0c;要求&#xff1a; 1&#xff09;数组a中剩余元素保持原来次序&#xff1b; 2&#xff09;将处理后的数组输出&#xff1b; 3&#xff09;函数值返回剩余元素个数&#xff1…

MySQL索引面试题(高频)

文章目录 前言什么时候需要&#xff08;不需要&#xff09;)使用索引&#xff1f;有哪些优化索引的方法前缀索引优化索引覆盖优化索引失效场景 总结 前言 今天来讲一讲 MySQL 索引的高频面试题。主要是针对前一篇文章 MySQL索引入门&#xff08;一文搞定&#xff09;进行查漏补…

虚拟机的内存结构

一、摘要 熟悉 Java 语言特性的同学都知道&#xff0c;相比 C、C 等编程语言&#xff0c;Java 无需通过手动方式回收内存&#xff0c;内存中所有的对象都可以交给 Java 虚拟机来帮助自动回收&#xff1b;而像 C、C 等编程语言&#xff0c;需要开发者通过代码手动释放内存资源&…

MedicalGPT 训练医疗大模型,实现了包括增量预训练、有监督微调、RLHF(奖励建模、强化学习训练)和DPO(直接偏好优化)

MedicalGPT 训练医疗大模型&#xff0c;实现了包括增量预训练、有监督微调、RLHF(奖励建模、强化学习训练)和DPO(直接偏好优化)。 MedicalGPT: Training Your Own Medical GPT Model with ChatGPT Training Pipeline. 训练医疗大模型&#xff0c;实现了包括增量预训练、有监督微…