Flink系列之:窗口聚合

Flink系列之:窗口聚合

  • 一、窗口表值函数(TVF)聚合
  • 二、窗口表值函数TVF
  • 三、分组集
  • 四、ROLLUP
  • 五、CUBE
  • 六、选择组窗口开始和结束时间戳
  • 七、多级窗口聚合
  • 八、分组窗口聚合
  • 九、时间属性
  • 十、选取分组窗口开始和结束时间戳

一、窗口表值函数(TVF)聚合

  • 适用于流批
  • 窗口聚合在 GROUP BY 子句中定义,包含应用窗口 TVF 的关系的“window_start”和“window_end”列。就像使用常规 GROUP BY 子句的查询一样,使用按窗口聚合进行分组的查询将计算每个组的单个结果行。
SELECT ...
FROM <windowed_table> -- relation applied windowing TVF
GROUP BY window_start, window_end, ...

与连续表上的其他聚合不同,窗口聚合不会发出中间结果,而只会发出最终结果,即窗口末尾的总聚合。此外,窗口聚合会在不再需要时清除所有中间状态。

二、窗口表值函数TVF

Flink 支持 TUMBLE、HOP 和 CUMULATE 类型的窗口聚合。在流模式下,窗口表值函数的时间属性字段必须位于事件或处理时间属性上。在批处理模式下,窗口表值函数的时间属性字段必须是 TIMESTAMP 或 TIMESTAMP_LTZ 类型的属性。

以下是 TUMBLE、HOP 和 CUMULATE 窗口聚合的一些示例。

-- 表必须具有时间属性,例如该表中的“bidtime”
Flink SQL> desc Bid;
+-------------+------------------------+------+-----+--------+---------------------------------+
|        name |                   type | null | key | extras |                       watermark |
+-------------+------------------------+------+-----+--------+---------------------------------+
|     bidtime | TIMESTAMP(3) *ROWTIME* | true |     |        | `bidtime` - INTERVAL '1' SECOND |
|       price |         DECIMAL(10, 2) | true |     |        |                                 |
|        item |                 STRING | true |     |        |                                 |
| supplier_id |                 STRING | true |     |        |                                 |
+-------------+------------------------+------+-----+--------+---------------------------------+Flink SQL> SELECT * FROM Bid;
+------------------+-------+------+-------------+
|          bidtime | price | item | supplier_id |
+------------------+-------+------+-------------+
| 2020-04-15 08:05 | 4.00  | C    | supplier1   |
| 2020-04-15 08:07 | 2.00  | A    | supplier1   |
| 2020-04-15 08:09 | 5.00  | D    | supplier2   |
| 2020-04-15 08:11 | 3.00  | B    | supplier2   |
| 2020-04-15 08:13 | 1.00  | E    | supplier1   |
| 2020-04-15 08:17 | 6.00  | F    | supplier2   |
+------------------+-------+------+-------------+-- 翻转窗口聚合
Flink SQL> SELECT window_start, window_end, SUM(price)FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))GROUP BY window_start, window_end;
+------------------+------------------+-------+
|     window_start |       window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
+------------------+------------------+-------+-- 跳跃窗口聚合
Flink SQL> SELECT window_start, window_end, SUM(price)FROM TABLE(HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))GROUP BY window_start, window_end;
+------------------+------------------+-------+
|     window_start |       window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
| 2020-04-15 08:05 | 2020-04-15 08:15 | 15.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
| 2020-04-15 08:15 | 2020-04-15 08:25 | 6.00  |
+------------------+------------------+-------+-- 累积窗口聚合
Flink SQL> SELECT window_start, window_end, SUM(price)FROM TABLE(CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))GROUP BY window_start, window_end;
+------------------+------------------+-------+
|     window_start |       window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:06 | 4.00  |
| 2020-04-15 08:00 | 2020-04-15 08:08 | 6.00  |
| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
| 2020-04-15 08:10 | 2020-04-15 08:12 | 3.00  |
| 2020-04-15 08:10 | 2020-04-15 08:14 | 4.00  |
| 2020-04-15 08:10 | 2020-04-15 08:16 | 4.00  |
| 2020-04-15 08:10 | 2020-04-15 08:18 | 10.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
+------------------+------------------+-------+

注意:为了更好地理解窗口的行为,我们简化了时间戳值的显示,不显示尾随零,例如如果类型为 TIMESTAMP(3),2020-04-15 08:05 在 Flink SQL Client 中应显示为 2020-04-15 08:05:00.000。

三、分组集

窗口聚合还支持 GROUPING SETS 语法。分组集允许比标准 GROUP BY 描述的分组操作更复杂的分组操作。行按每个指定的分组集单独分组,并为每个组计算聚合,就像简单的 GROUP BY 子句一样。

使用 GROUPING SETS 的窗口聚合要求 window_start 和 window_end 列必须位于 GROUP BY 子句中,但不能位于 GROUPING SETS 子句中。

Flink SQL> SELECT window_start, window_end, supplier_id, SUM(price) as priceFROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))GROUP BY window_start, window_end, GROUPING SETS ((supplier_id), ());
+------------------+------------------+-------------+-------+
|     window_start |       window_end | supplier_id | price |
+------------------+------------------+-------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:10 |      (NULL) | 11.00 |
| 2020-04-15 08:00 | 2020-04-15 08:10 |   supplier2 |  5.00 |
| 2020-04-15 08:00 | 2020-04-15 08:10 |   supplier1 |  6.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 |      (NULL) | 10.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 |   supplier2 |  9.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 |   supplier1 |  1.00 |
+------------------+------------------+-------------+-------+

GROUPING SETS 的每个子列表可以指定零个或多个列或表达式,并以与直接在 GROUP BY 子句中使用相同的方式进行解释。空分组集意味着所有行都聚合到一个组,即使不存在输入行也会输出该组。

对于未出现这些列的分组集,对分组列或表达式的引用将替换为结果行中的空值。

四、ROLLUP

ROLLUP 是用于指定常见类型的分组集的简写符号。它表示给定的表达式列表和列表的所有前缀,包括空列表。

使用 ROLLUP 进行窗口聚合要求 window_start 和 window_end 列必须位于 GROUP BY 子句中,但不能位于 ROLLUP 子句中。

例如,以下查询与上面的查询等效。

SELECT window_start, window_end, supplier_id, SUM(price) as price
FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, ROLLUP (supplier_id);

五、CUBE

CUBE 是用于指定常见类型的分组集的简写符号。它表示给定的列表及其所有可能的子集 - 幂集。

使用 CUBE 的窗口聚合要求 window_start 和 window_end 列必须位于 GROUP BY 子句中,但不能位于 CUBE 子句中。

例如,以下两个查询是等效的。

SELECT window_start, window_end, item, supplier_id, SUM(price) as priceFROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))GROUP BY window_start, window_end, CUBE (supplier_id, item);SELECT window_start, window_end, item, supplier_id, SUM(price) as priceFROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))GROUP BY window_start, window_end, GROUPING SETS ((supplier_id, item),(supplier_id      ),(             item),(                 )
)

六、选择组窗口开始和结束时间戳

分组窗口的开始和结束时间戳可以通过 window_start 和 window_end 来选定.

七、多级窗口聚合

window_start 和 window_end 列是普通的时间戳字段,并不是时间属性。因此它们不能在后续的操作中当做时间属性进行基于时间的操作。 为了传递时间属性,需要在 GROUP BY 子句中添加 window_time 列。window_time 是 Windowing TVFs 产生的三列之一,它是窗口的时间属性。 window_time 添加到 GROUP BY 子句后就能被选定了。下面的查询可以把它用于后续基于时间的操作,比如:多级窗口聚合 和 Window TopN。

下面展示了一个多级窗口聚合:第一个窗口聚合后把时间属性传递给第二个窗口聚合。

-- 每个supplier_id翻滚5分钟
CREATE VIEW window1 AS
-- 注意:内部Window TVF 的窗口开始和窗口结束字段在select 子句中是可选的。但是,如果它们出现在子句中,则需要为它们起别名,以防止名称与外部窗口 TVF 的窗口开始和窗口结束发生冲突。
SELECT window_start as window_5mintumble_start, window_end as window_5mintumble_end, window_time as rowtime, SUM(price) as partial_priceFROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))GROUP BY supplier_id, window_start, window_end, window_time;-- 在第一个窗口翻滚 10 分钟
SELECT window_start, window_end, SUM(partial_price) as total_priceFROM TABLE(TUMBLE(TABLE window1, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES))GROUP BY window_start, window_end;

八、分组窗口聚合

警告:分组窗口聚合已经过时。推荐使用更加强大和有效的窗口表值函数聚合。

“窗口表值函数聚合"相对于"分组窗口聚合"有如下优点:

  • 包含 性能调优 中提到的所有性能优化。
  • 支持标准的 GROUPING SETS 语法。
  • 可以在窗口聚合结果上使用 窗口 TopN。
  • 等等。

分组窗口聚合定义在 SQL 的 GROUP BY 子句中。和普通的 GROUP BY 子句一样,包含分组窗口函数的 GROUP BY 子句的查询会对各组分别计算,各产生一个结果行。批处理表和流表上的SQL支持以下分组窗口函数:

分组窗口函数

Group Window FunctionDescription
TUMBLE(time_attr, interval)定义一个滚动时间窗口。它把数据分配到连续且不重叠的固定时间区间(interval),例如:一个5分钟的滚动窗口以5分钟为间隔对数据进行分组。滚动窗口可以被定义在事件时间(流 + 批)或者处理时间(流)上。
HOP(time_attr, interval, interval)定义一个滑动时间窗口,它有窗口大小(第二个 interval 参数)和滑动间隔(第一个 interval 参数)两个参数。如果滑动间隔小于窗口大小,窗口会产生重叠。所以,数据可以被指定到多个窗口。例如:一个15分钟大小和5分钟滑动间隔的滑动窗口将每一行分配给3个15分钟大小的不同窗口,这些窗口以5分钟的间隔计算。滑动窗口可以被定义在事件时间(流 + 批)或者处理时间(流)上。
SESSION(time_attr, interval)定义一个会话时间窗口。会话时间窗口没有固定的时间区间,但其边界是通过不活动的时间 interval 定义的,即:一个会话窗口会在指定的时长内没有事件出现时关闭。例如:一个30分钟间隔的会话窗口收到一条数据时,如果之前已经30分钟不活动了(否则,这条数据会被分配到已经存在的窗口中),它会开启一个新窗口,如果30分钟之内没有新数据到来,就会关闭。会话窗口可以被定义在事件时间(流 + 批) 或者处理时间(流)上。

九、时间属性

  • 在流处理模式,分组窗口函数的 time_attr 属性必须是一个有效的处理或事件时间。
  • 在批处理模式,分组窗口函数的 time_attr 参数必须是一个 TIMESTAMP 类型的属性。

十、选取分组窗口开始和结束时间戳

分组窗口的开始和结束时间戳以及时间属性也可以通过下列辅助函数的方式获取到:

辅助函数描述
TUMBLE_START(time_attr, interval)、HOP_START(time_attr, interval, interval)、SESSION_START(time_attr, interval)返回相应的滚动,滑动或会话窗口的下限的时间戳(inclusive),即窗口开始时间。
TUMBLE_END(time_attr, interval)、HOP_END(time_attr, interval, interval)、SESSION_END(time_attr, interval)返回相应滚动窗口,跳跃窗口或会话窗口的上限的时间戳(exclusive),即窗口结束时间。注意: 上限时间戳(exlusive)不能作为 rowtime attribute 用于后续基于时间的操作,例如:interval joins 和 group window 或 over window aggregations。
TUMBLE_ROWTIME(time_attr, interval)、HOP_ROWTIME(time_attr, interval, interval)、SESSION_ROWTIME(time_attr, interval)返回相应滚动窗口,跳跃窗口或会话窗口的上限的时间戳(inclusive),即窗口事件时间,或窗口处理时间。返回的值是 rowtime attribute,可以用于后续基于时间的操作,比如:interval joins 和 group window 或 over window aggregations。
TUMBLE_PROCTIME(time_attr, interval)、HOP_PROCTIME(time_attr, interval, interval)、SESSION_PROCTIME(time_attr, interval)返回的值是 proctime attribute,可以用于后续基于时间的操作,比如: interval joins 和 group window 或 over window aggregations。

注意: 辅助函数的参数必须和 GROUP BY 子句中的分组窗口函数一致。

下面的例子展示了在流式表上如何使用分组窗口 SQL 查询:

CREATE TABLE Orders (user       BIGINT,product    STRING,amount     INT,order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL '1' MINUTE
) WITH (...);SELECTuser,TUMBLE_START(order_time, INTERVAL '1' DAY) AS wStart,SUM(amount) FROM Orders
GROUP BYTUMBLE(order_time, INTERVAL '1' DAY),user

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/230978.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux常用命令大全(摘录整理)

1、ls命令2、cd命令3、pwd命令4、mkdir命令5、rm命令6、rmdir命令7、mv命令8、cp命令9、cat命令10、more命令11、less命令12、head命令13、tail命令14、which命令15、whereis命令16、locate命令17、find命令18、chmod命令19、tar命令20、chown命令21、df命令22、du命令23、ln命…

R语言【rgbif】——occ_search对待字符长度大于1500的WKT的特殊处理真的有必要吗?

一句话结论&#xff1a;只要有网有流量&#xff0c;直接用长WKT传递给参数【geometry】、参数【limit】配合参数【start】获取所有记录。 当我在阅读 【rgbif】 给出的用户手册时&#xff0c;注意到 【occ_search】 强调了 参数 【geometry】使用的wkt格式字符串长度。 文中如…

Linux内存管理(十七):percpu 分配器——框架实现

源码基于:Linux5.4 约定: 芯片架构:ARM64内存架构:UMACONFIG_ARM64_VA_BITS:39CONFIG_ARM64_PAGE_SHIFT:12CONFIG_PGTABLE_LEVELS :3关联博文: percpu分配器——基础概念 percpu分配器——框架实现 percpu分配器——动态分配 0. 前言 上一篇博文</

使用数组模拟栈的相关操作【栈1.1】

public class ArrayStackDemo {public static void main(String[] args) {ArrayStack arrayStack new ArrayStack(4);Scanner sc new Scanner(System.in);boolean loop true;char key ;while (loop) {System.out.println("栈操作菜单项");System.out.println(&q…

基于SpringBoot和微信小程序网上购物商城

文章目录 项目介绍主要功能截图:部分代码展示设计总结项目获取方式🍅 作者主页:超级无敌暴龙战士塔塔开 🍅 简介:Java领域优质创作者🏆、 简历模板、学习资料、面试题库【关注我,都给你】 🍅文末获取源码联系🍅 项目介绍 基于SpringBoot和微信小程序网上购物商城…

范仲淹仅存五首词作,篇篇经典

范仲淹&#xff0c;北宋时期杰出的政治家&#xff0c;文学家&#xff0c;提起他首先想到初中那篇《岳阳楼记》&#xff0c;难倒无数背诵困难户&#xff0c;它虽然难背&#xff0c;但却是一篇文字排列极美&#xff0c;意境极佳的美文。 “不以物喜&#xff0c;不以己悲&#xf…

Android hilt使用

一&#xff0c;添加依赖库 添加依赖库app build.gradle.kts implementation("com.google.dagger:hilt-android:2.49")annotationProcessor("com.google.dagger:hilt-android:2.49")annotationProcessor("com.google.dagger:hilt-compiler:2.49"…

Python顶级组件

顶级组件 Python 解释器可以从多种源获得输入&#xff1a;作为标准输入或程序参数传入的脚本&#xff0c;以交互方式键入的语句&#xff0c;导入的模块源文件等等。 这一章将给出在这些情况下所用的语法。 9.1. 完整的 Python 程序 虽然语言规范描述不必规定如何发起调用语言…

【leetcode刷题之MySQL】

175. 组合两个表 select firstName,lastName,city, state from Person left join Address on Person.personIdAddress.personId LEFT JOIN&#xff1a;保留左表内容&#xff0c;右表不存在的列使用 Null 代替 RIGHT JOIN&#xff1a;保留右表中连接字段的内容&#xff0c;左表…

【ubuntu】Linux常用截屏软件

Linux截屏软件及安装教程 截屏软件简介 在Linux系统中&#xff0c;有多种强大的截屏工具可供选择。以下是一些常用的截屏软件&#xff1a; Shutter 功能强大&#xff0c;支持全屏、窗口、选区等多种截屏方式。可以添加注释、标记和编辑截图。 Flameshot 轻量级截屏工具&…

程序人生,由“小作文”事件想到的

时势造英雄。自媒体时代&#xff0c;火出圈是靠大众的审美和爱好&#xff0c;自己能做的关键&#xff0c;其实是做好自己&#xff0c;选择向上生长&#xff0c;持续不断的读书、学习。同时保持一份好奇心&#xff0c;培养一个兴趣爱好并自得其乐。 展示自我 回想起我小时候&am…

Linux内核实现AES加密

本文涉及到编写一个内核模块&#xff0c;扩展内核密钥类型并使用该密钥实现AES加密。以下是一个简单的示例代码&#xff0c;演示如何在C语言中实现一个内核模块以及在内核中使用密钥进行AES加密。 c #include <linux/module.h> #include <linux/kernel.h> #includ…

Promise执行顺序

小编建议小伙伴们不要跳点看&#xff0c;每一点都是衔接&#xff0c;有比较的 本篇文章考查 ①promise是同步任务还是微任务 ②promise.then()什么时候执行&#xff0c;是微任务还是宏任务 ③如何控制状态变化&#xff0c;不同状态变化&#xff0c;会执行哪个回调函数 1、以下代…

进制之间的转换——n进制转换为m进制(C/C++实现,简单易懂)

目录 &#x1f308;前言&#xff1a; &#x1f4c1; 什么是进制转换&#xff1a; &#x1f4c1;其他进制转换成十进制&#xff1a; &#x1f4c2;二进制( B ) ——> 十进制( D ) &#x1f4c2;八进制( O ) ——> 十进制( D ) &#x1f4c2;十六进制( H ) ——> 十进制…

运维实践|采集MySQL数据出现many connection errors

文章目录 问题出现问题分析当前环境问题分析 解决方案1 检查调度事件任务是否开启2 开启调度事件任务3 创建一张日志表4 创建函数存储过程5 创建事件定时器6 开启事件调度任务7 检查核实是否创建 总结 问题出现 最近在做OGG结构化数据采集工作&#xff0c;在数据采集过程中&am…

抖音商品详情API接口在电商行业中的重要性及实时数据获取实现

随着移动互联网的快速发展&#xff0c;电商行业不断壮大。抖音作为一款短视频社交应用&#xff0c;近年来在电商领域取得了显著成果。本文将探讨抖音商品详情API接口在电商行业中的重要性&#xff0c;以及如何通过实时数据获取提高业务效率。我们将介绍相关的技术背景、API接口…

Linux 操作系统 004-远程连接

Linux 操作系统 004-远程连接 本节关键字&#xff1a;Linux、远程连接、XManager、Xshell、ssh 本节相关指令&#xff1a; XManager的下载 XManager官网 Xmanager的安装 1、双击下载好的安装包&#xff0c;下一步 2、接受许可&#xff0c;下一步 3、产品密钥&#xff08;试…

AngularJS

理解实现代码的逻辑为主要&#xff0c;代码怎么写为次要。 参考资料&#xff1a; 《AngularJS入门与进阶》&#xff0c;江荣波著 前端开发常用框架 React&#xff1a;由Facebook开发&#xff0c;用于构建用户界面的JavaScript库&#xff0c;以组件化和虚拟DOM著称。 Angular&…

2024最新FL Studio21.2MAC电脑版中文版下载安装步骤教程

FL Studio 简称FL&#xff0c;全称Fruity Loops Studio&#xff0c;因此国人习惯叫它"水果"。目前最新版本是FL Studio21.1.1.3750版本&#xff0c;它让你的计算机就像是全功能的录音室&#xff0c;大混音盘&#xff0c;非常先进的制作工具&#xff0c;让你的音乐突破…

docker的资源限制及容器应用

一、docker资源限制 在使用 docker 运行容器时&#xff0c;一台主机上可能会运行几百个容器&#xff0c;这些容器虽然互相隔离&#xff0c;但是底层却使用着相同的 CPU、内存和磁盘资源。如果不对容器使用的资源进行限制&#xff0c;那么容器之间会互相影响&#xff0c;小的来说…