【大数据】Flink SQL 语法篇(八):集合、Order By、Limit、TopN

Flink SQL 语法篇(八):集合、Order By、Limit、TopN

  • 1.集合操作
  • 2.Order By、Limit 子句
    • 2.1 Order By 子句
    • 2.2 Limit 子句
  • 3.TopN 子句

1.集合操作

集合操作支持 Batch / Streaming 任务。

在这里插入图片描述

  • UNION:将集合合并并且去重。
  • UNION ALL:将集合合并,不做去重。
Flink SQL> create view t1(s) as values ('c'), ('a'), ('b'), ('b'), ('c');
Flink SQL> create view t2(s) as values ('d'), ('e'), ('a'), ('b'), ('b');Flink SQL> (SELECT s FROM t1) UNION (SELECT s FROM t2);
+---+
|  s|
+---+
|  c|
|  a|
|  b|
|  d|
|  e|
+---+Flink SQL> (SELECT s FROM t1) UNION ALL (SELECT s FROM t2);
+---+
|  c|
+---+
|  c|
|  a|
|  b|
|  b|
|  c|
|  d|
|  e|
|  a|
|  b|
|  b|
+---+
  • Intersect:交集并且去重。
  • Intersect ALL:交集不做去重。
Flink SQL> create view t1(s) as values ('c'), ('a'), ('b'), ('b'), ('c');
Flink SQL> create view t2(s) as values ('d'), ('e'), ('a'), ('b'), ('b');
Flink SQL> (SELECT s FROM t1) INTERSECT (SELECT s FROM t2);
+---+
|  s|
+---+
|  a|
|  b|
+---+Flink SQL> (SELECT s FROM t1) INTERSECT ALL (SELECT s FROM t2);
+---+
|  s|
+---+
|  a|
|  b|
|  b|
+---+
  • Except:差集并且去重。
  • Except ALL:差集不做去重。
Flink SQL> (SELECT s FROM t1) EXCEPT (SELECT s FROM t2);
+---+
| s |
+---+
| c |
+---+Flink SQL> (SELECT s FROM t1) EXCEPT ALL (SELECT s FROM t2);
+---+
| s |
+---+
| c |
| c |
+---+

上述 SQL 在流式任务中,如果一条左流数据先来了,没有从右流集合数据中找到对应的数据时会直接输出,当右流对应数据后续来了之后,会下发回撤流将之前的数据给撤回。这也是一个回撤流。

  • In 子查询:这个大家比较熟悉了,但是注意,In 子查询的结果集只能有一列。
SELECT user, amount
FROM Orders
WHERE product IN (SELECT product FROM NewProducts
)

上述 SQL 的 In 子句其实就和之前介绍到的 Inner Join 类似。并且 In 子查询也会涉及到大状态问题,大家注意设置 State 的 TTL。

2.Order By、Limit 子句

2.1 Order By 子句

支持 Batch / Streaming,但在实时任务中一般用的非常少。

实时任务中,Order By 子句中 必须要有时间属性字段,并且时间属性必须为 升序 时间属性,即 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND 或者 WATERMARK FOR rowtime_column AS rowtime_column

举例:

CREATE TABLE source_table_1 (user_id BIGINT NOT NULL,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time
) WITH ('connector' = 'datagen','rows-per-second' = '10','fields.user_id.min' = '1','fields.user_id.max' = '10'
);CREATE TABLE sink_table (user_id BIGINT
) WITH ('connector' = 'print'
);INSERT INTO sink_table
SELECT user_id
FROM source_table_1
Order By row_time, user_id desc

2.2 Limit 子句

支持 Batch / Streaming,但实时场景一般不使用,但是此处依然举一个例子。

CREATE TABLE source_table_1 (user_id BIGINT NOT NULL,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time
) WITH ('connector' = 'datagen','rows-per-second' = '10','fields.user_id.min' = '1','fields.user_id.max' = '10'
);CREATE TABLE sink_table (user_id BIGINT
) WITH ('connector' = 'print'
);INSERT INTO sink_table
SELECT user_id
FROM source_table_1
Limit 3

结果如下,只有 3 条输出:

+I[5]
+I[9]
+I[4]

3.TopN 子句

TopN 定义(支持 Batch / Streaming):TopN 其实就是对应到离线数仓中的 row_number(),可以使用 row_number() 对某一个分组的数据进行排序。

应用场景:根据 某个排序 条件,计算 某个分组 下的排行榜数据。

SQL 语法标准:

SELECT [column_list]
FROM (SELECT [column_list],ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownumFROM table_name)
WHERE rownum <= N [AND conditions]
  • ROW_NUMBER():标识 TopN 排序子句。
  • PARTITION BY col1[, col2...]:标识分区字段,代表按照这个 col 字段作为分区粒度对数据进行排序取 TopN,比如下述案例中的 partition by key,就是根据需求中的搜索关键词(key)做为分区。
  • ORDER BY col1 [asc|desc][, col2 [asc|desc]...]:标识 TopN 的排序规则,是按照哪些字段、顺序或逆序进行排序。
  • WHERE rownum <= N:这个子句是一定需要的,只有加上了这个子句,Flink 才能将其识别为一个 TopN 的查询,其中 N 代表 TopN 的条目数。
  • [AND conditions]:其他的限制条件也可以加上。

实际案例:取某个搜索关键词下的搜索热度前 10 名的词条数据。

输入数据为搜索词条数据的搜索热度数据,当搜索热度发生变化时,会将变化后的数据写入到数据源的 Kafka 中:

-- 数据源 schema-- 字段名         备注
-- key          搜索关键词
-- name         搜索热度名称
-- search_cnt    热搜消费热度(比如 3000)
-- timestamp       消费词条时间戳CREATE TABLE source_table (name BIGINT NOT NULL,search_cnt BIGINT NOT NULL,key BIGINT NOT NULL,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time
) WITH (...
);-- 数据汇 schema-- key          搜索关键词
-- name         搜索热度名称
-- search_cnt    热搜消费热度(比如 3000)
-- timestamp       消费词条时间戳CREATE TABLE sink_table (key BIGINT,name BIGINT,search_cnt BIGINT,`timestamp` TIMESTAMP(3)
) WITH (...
);-- DML 逻辑
INSERT INTO sink_table
SELECT key, name, search_cnt, row_time as `timestamp`
FROM (SELECT key, name, search_cnt, row_time, -- 根据热搜关键词 key 作为 partition key,然后按照 search_cnt 倒排取前 100 名ROW_NUMBER() OVER (PARTITION BY keyORDER BY search_cnt desc) AS rownumFROM source_table)
WHERE rownum <= 100

输出结果:

-D[关键词1, 词条1, 4944]
+I[关键词1, 词条1, 8670]
+I[关键词1, 词条2, 1735]
-D[关键词1, 词条3, 6641]
+I[关键词1, 词条3, 6928]
-D[关键词1, 词条4, 6312]
+I[关键词1, 词条4, 7287]

可以看到输出数据是有回撤数据的,为什么会出现回撤,我们来看看 SQL 语义。

上面的 SQL 会翻译成以下三个算子:

  • 数据源:数据源即最新的词条下面的搜索词的搜索热度数据,消费到 Kafka 中数据后,按照 partition key 将数据进行 Hash 分发到下游排序算子,相同的 Key 数据将会发送到一个并发中。
  • 排序算子:为每个 Key 维护了一个 TopN 的榜单数据,接受到上游的一条数据后,如果 TopN 榜单还没有到达 N 条,则将这条数据加入 TopN 榜单后,直接下发数据,如果到达 N 条之后,经过 TopN 计算,发现这条数据比原有的数据排序靠前,那么新的 TopN 排名就会有变化,就变化了的这部分数据之前下发的排名数据撤回(即回撤数据),然后下发新的排名数据。
  • 数据汇:接收到上游的数据之后,然后输出到外部存储引擎中。

上面三个算子也是会 24 小时一直运行的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/708300.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

什么是索引优化?如何确定何时添加或删除索引?如何监控MySQL的性能?

什么是索引优化&#xff1f;如何确定何时添加或删除索引&#xff1f; 索引优化是数据库性能调优的一个重要环节&#xff0c;它涉及到对数据库索引的创建、修改和删除等操作&#xff0c;以提高数据库查询性能和数据访问效率。索引优化主要包括选择合适的索引类型、索引列、索引顺…

DataGrip 2023:让数据库开发变得更简单、更高效 mac/win版

JetBrains DataGrip 2023是一款功能强大的数据库IDE&#xff0c;专为数据库开发和管理而设计。通过DataGrip&#xff0c;您可以连接到各种关系型数据库管理系统(RDBMS)&#xff0c;并使用其提供的一组工具来查询、管理、编辑和开发数据库。 DataGrip 2023 软件获取 DataGrip …

[unity]lua热更新——个人复习笔记【侵删/有不足之处欢迎斧正】

一、AssetBundle AB包是特定于平台的资产压缩包&#xff0c;类似于压缩文件 相对于RESOURCES下的资源&#xff0c;AB包更加灵活轻量化&#xff0c;用于减小包体大小和热更新 可以在unity2019环境中直接下载Asset Bundle Browser 可以在其中设置关联 AB包生成的文件 AB包文件…

【Linux】云服务器的Redis被黑

&#x1f4dd;个人主页&#xff1a;五敷有你 &#x1f525;系列专栏&#xff1a;Linux ⛺️稳中求进&#xff0c;晒太阳 攻击发现&#xff1a; 这个异常情况是在腾讯云被入侵后&#xff0c;短信提醒发现的。并没有系统的学习过关于服务器安防相关的知识&#xff0c;遇到…

嵌入式学习日记 22

1.进程间的通信: 1.管道 2.信号 3.消息队列 4.共享内存 5.信号灯 6.套接字 1.管道: 1.无名管道 无名管道只能用于具有亲缘关系的进程间通信 pipe int pipe(int pipefd[2]); 功能: 创建一个无名管道 参数: …

【Hudi】并发控制

MVCC Hudi的表操作&#xff0c;如压缩、清理、提交&#xff0c;hudi会利用多版本并发控制来提供多个表操作写入和查询之间的快照隔离。使用MVCC这种模型&#xff0c;Hudi支持并发任意数量的操作作业&#xff0c;并保证不会发生任何冲突。Hudi默认这种模型。MVCC方式所有的tabl…

国产动漫|基于Springboot的国产动漫网站设计与实现(源码+数据库+文档)

国产动漫网站目录 目录 基于Springboot的国产动漫网站设计与实现 一、前言 二、系统功能设计 三、系统功能设计 1、用户信息管理 2、国漫先驱管理 3、国漫之最管理 4、公告信息管理 四、数据库设计 1、实体ER图 五、核心代码 六、论文参考 七、最新计算机毕设选题…

LVS-DR实验.

IP规划: servera(LVS-DR):192.168.145.128 serverb(WB1):192.168.145.129 servere(WB2):192.168.145.131 servera(LVS-DR): 添加VIP: # 使用ifconfig命令配置虚拟网卡ens160:0的IP地址为192.168.145.123&#xff0c;广播地址…

C语言中如何进行内存管理

主页&#xff1a;17_Kevin-CSDN博客 收录专栏&#xff1a;《C语言》 C语言是一种强大而灵活的编程语言&#xff0c;但与其他高级语言不同&#xff0c;它要求程序员自己负责内存的管理。正确的内存管理对于程序的性能和稳定性至关重要。 一、引言 C 语言是一门广泛使用的编程语…

VPX基于全国产飞腾FT-2000+/64核+复旦微FPGA的计算刀片

6U VPX计算板 产品简介 产品特点 飞腾计算平台&#xff0c;国产化率100% VPX-MPU6902是一款基于飞腾FT-2000/64核的计算刀片&#xff0c;主频2.2GHz&#xff0c;负责业务数据流的管控和调度。搭配自带独立显示芯片的飞腾X100芯片&#xff0c;可用于于各类终端及服务器类应用场…

蚓链数字化生态系统助力企业业务数字化转型

在当今数字化浪潮中&#xff0c;企业数字化转型已成为提升竞争力的关键。蚓链数字化生态系统致力于通过业务和技术的协同推动企业数字化转型&#xff0c;为企业提供了一套系统化的业务数字化解决方案。 业务数字化的第一步是根据企业当前业务分布&#xff0c;划分业务板块&…

spring boot整合cache使用memcached

之前讲了 spring boot 整合 cache 做 simple redis Ehcache 三种工具的缓存 上文 windows系统下载安装 memcached 我们装了memcached 但spring boot没有将它的整合纳入进来 那么 我们就要自己来处理客户端 java历史上 有过三种客户端 那么 我们用肯定是用最好的 Xmemcached …

Debian更改主机名重启失效怎么解决

要在Debian系统上更改主机名并使其生效&#xff0c;您可以按照以下步骤进行操作&#xff1a; 使用以下命令更改主机名&#xff1a; sudo hostnamectl set-hostname 新主机名 编辑 /etc/hosts 文件&#xff0c;将旧主机名替换为新主机名。您可以使用文本编辑器打开该文件进行编辑…

vue2 + axios + mock.js封装过程,包含mock.js获取数据时报404状态的解决记录,带图文,超详细!!!

vue axios mock.js 以下是封装的过程&#xff0c;记录一下 1、首先先了解什么是mock.js的用途及特点 官网地址&#xff1a;Mock.js (mockjs.com) 作用&#xff1a;生成随机数据&#xff0c;拦截 Ajax 请求 优势&#xff1a; 2、了解axios的原理及使用 官网地址&#xff1a…

Oracle之缓存融合

CACHE FUSION 原理 为了更深入的了解Oracle的后台进程的工作原理&#xff0c;需要先了解一下 RAC 中多节点对共享数据文件访问的管理是如何进行的。要了解 RAC 工作原理的中心&#xff0c;需要知道 Cache Fusion 这个重要的概念&#xff0c;要发挥 Cache Fusion 的作用&#xf…

大模型(LLM)的token学习记录-I

文章目录 基本概念什么是token?如何理解token的长度&#xff1f;使用openai tokenizer 观察token的相关信息open ai的模型 token的特点token如何映射到数值&#xff1f;token级操作&#xff1a;精确地操作文本token 设计的局限性 tokenizationtoken 数量对LLM 的影响训练模型参…

转转高效改表平台的演进之路

## 前言 我们假设这么一个场景&#xff0c;你管理着一个渡口&#xff0c;现在有一个人要过河&#xff0c;在没有桥的情况下&#xff0c;不管是早上晚上还是凌晨&#xff0c;你都要用船驮着他过河。随着时间的推移&#xff0c;越来越多的人过河。你会忙不过来&#xff0c;甚至崩…

研发日记,MatlabSimulink开箱报告(九)——Simulink Test模块

文章目录 前言 Simulink Test模块 静态测试 动态测试 逻辑测试 前言 见《开箱报告&#xff0c;Simulink Toolbox库模块使用指南&#xff08;四&#xff09;——S-Fuction模块》 见《开箱报告&#xff0c;Simulink Toolbox库模块使用指南&#xff08;五&#xff09;——S-F…

练习 2 Web [ACTF2020 新生赛]BackupFile 1

[ACTF2020 新生赛]BackupFile 1 Web常规题目 首先尝试查找常见的前端页面index.php之类的&#xff0c;没找到 题目有个“BackupFile”——备份文件 尝试用工具遍历查找相关的文件 御剑没扫出来&#xff0c;搜索搭建好dirsearch后&#xff0c;扫出来的index.php.bak 扫描工…

每天一个数据分析题(一百七十八)

在大样本&#xff08;样本量为n&#xff09;下进行某一列数据&#xff08;A列&#xff09;均值的区间估计时&#xff0c;假设点估计的值计算为a&#xff0c;显著性水平为0.05&#xff0c;z0.025为给定的显著性水平下的正态分布的临界值&#xff0c;则使用EXCEL的计算方法正确的…