聊聊flink Table的OrderBy及Limit

本文主要研究一下flink Table的OrderBy及Limit

实例

Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.orderBy("a.asc");Table in = tableEnv.fromDataSet(ds, "a, b, c");// returns the first 5 records from the sorted result
Table result1 = in.orderBy("a.asc").fetch(5); // skips the first 3 records and returns all following records from the sorted result
Table result2 = in.orderBy("a.asc").offset(3);// skips the first 10 records and returns the next 5 records from the sorted result
Table result3 = in.orderBy("a.asc").offset(10).fetch(5);
  • orderBy方法类似sql的order by;limit则由offset及fetch两个方法构成,类似sql的offset及fetch

Table

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

class Table(private[flink] val tableEnv: TableEnvironment,private[flink] val logicalPlan: LogicalNode) {//......def orderBy(fields: String): Table = {val parsedFields = ExpressionParser.parseExpressionList(fields)orderBy(parsedFields: _*)}def orderBy(fields: Expression*): Table = {val order: Seq[Ordering] = fields.map {case o: Ordering => ocase e => Asc(e)}new Table(tableEnv, Sort(order, logicalPlan).validate(tableEnv))}def offset(offset: Int): Table = {new Table(tableEnv, Limit(offset, -1, logicalPlan).validate(tableEnv))}def fetch(fetch: Int): Table = {if (fetch < 0) {throw new ValidationException("FETCH count must be equal or larger than 0.")}this.logicalPlan match {case Limit(o, -1, c) =>// replace LIMIT without FETCH by LIMIT with FETCHnew Table(tableEnv, Limit(o, fetch, c).validate(tableEnv))case Limit(_, _, _) =>throw new ValidationException("FETCH is already defined.")case _ =>new Table(tableEnv, Limit(0, fetch, logicalPlan).validate(tableEnv))}}//......
}
  • Table的orderBy方法,支持String或Expression类型的参数,其中String类型最终是转为Expression类型;orderBy方法最后使用Sort重新创建了Table;offset及fetch方法,使用Limit重新创建了Table(offset方法创建的Limit其fetch为-1;fetch方法如果之前没有指定offset则创建的Limit的offset为0)

Sort

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala

case class Sort(order: Seq[Ordering], child: LogicalNode) extends UnaryNode {override def output: Seq[Attribute] = child.outputoverride protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {child.construct(relBuilder)relBuilder.sort(order.map(_.toRexNode(relBuilder)).asJava)}override def validate(tableEnv: TableEnvironment): LogicalNode = {if (tableEnv.isInstanceOf[StreamTableEnvironment]) {failValidation(s"Sort on stream tables is currently not supported.")}super.validate(tableEnv)}
}
  • Sort继承了UnaryNode,它的构造器接收Set类型的Ordering,其construct方法使用relBuilder.sort来构建sort条件

Ordering

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/expressions/ordering.scala

abstract class Ordering extends UnaryExpression {override private[flink] def validateInput(): ValidationResult = {if (!child.isInstanceOf[NamedExpression]) {ValidationFailure(s"Sort should only based on field reference")} else {ValidationSuccess}}
}case class Asc(child: Expression) extends Ordering {override def toString: String = s"($child).asc"override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {child.toRexNode}override private[flink] def resultType: TypeInformation[_] = child.resultType
}case class Desc(child: Expression) extends Ordering {override def toString: String = s"($child).desc"override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {relBuilder.desc(child.toRexNode)}override private[flink] def resultType: TypeInformation[_] = child.resultType
}
  • Ordering是一个抽象类,它有Asc及Desc两个子类

Limit

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala

case class Limit(offset: Int, fetch: Int = -1, child: LogicalNode) extends UnaryNode {override def output: Seq[Attribute] = child.outputoverride protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {child.construct(relBuilder)relBuilder.limit(offset, fetch)}override def validate(tableEnv: TableEnvironment): LogicalNode = {if (tableEnv.isInstanceOf[StreamTableEnvironment]) {failValidation(s"Limit on stream tables is currently not supported.")}if (!child.isInstanceOf[Sort]) {failValidation(s"Limit operator must be preceded by an OrderBy operator.")}if (offset < 0) {failValidation(s"Offset should be greater than or equal to zero.")}super.validate(tableEnv)}
}
  • Limit继承了UnaryNode,它的构造器接收offset及fetch参数,它的construct方法通过relBuilder.limit来设置offset及fetch

小结

  • Table的orderBy方法类似sql的order by;limit则由offset及fetch两个方法构成,类似sql的offset及fetch
  • Table的orderBy方法,支持String或Expression类型的参数,其中String类型最终是转为Expression类型;orderBy方法最后使用Sort重新创建了Table;offset及fetch方法,使用Limit重新创建了Table(offset方法创建的Limit其fetch为-1;fetch方法如果之前没有指定offset则创建的Limit的offset为0)
  • Sort继承了UnaryNode,它的构造器接收Set类型的Ordering,其construct方法使用relBuilder.sort来构建sort条件;Ordering是一个抽象类,它有Asc及Desc两个子类;Limit继承了UnaryNode,它的构造器接收offset及fetch参数,它的construct方法通过relBuilder.limit来设置offset及fetch

doc

  • OrderBy, Offset & Fetch

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/388353.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

binary masks_Python中的Masks概念

binary masksAll men are sculptors, constantly chipping away the unwanted parts of their lives, trying to create their idea of a masterpiece … Eddie Murphy所有的人都是雕塑家&#xff0c;不断地消除生活中不必要的部分&#xff0c;试图建立自己的杰作理念……埃迪墨…

Iphone在ScrollView下点击TextField使文本筐不被键盘遮住

1.拖一个Scroll View视图填充View窗口&#xff0c;将Scroll View视图拖大一些&#xff0c;使其超出屏幕。 2.向Scroll View拖&#xff08;添加&#xff09;多个Label视图和Text View视图。 3.在.h头文件中添加如下代码&#xff1a; [cpp] view plaincopyprint?#import <U…

python 仪表盘_如何使用Python刮除仪表板

python 仪表盘Dashboard scraping is a useful skill to have when the only way to interact with the data you need is through a dashboard. We’re going to learn how to scrape data from a dashboard using the Selenium and Beautiful Soup packages in Python. The S…

VS2015 定时服务及控制端

一. 服务端 如下图—新建项目—经典桌面—Windows服务—起名svrr2. 打到server1 改名为svrExecSqlInsert 右击对应的设计界面&#xff0c;添加安装服务目录结构如图 3. svrExecSqlInsert里有打到OnStart()方法开始写代码如下 /// <summary>/// 服务开启操作/// </su…

Iphone表视图的简单操作

1.创建一个Navigation—based—Application项目&#xff0c;这样Interface Builder中会自动生成一个Table View&#xff0c;然后将Search Bar拖放到表示图上&#xff0c;以我们要给表示图添加搜索功能&#xff0c;不要忘记将Search Bar的delegate连接到File‘s Owner项&#xf…

aws emr 大数据分析_DataOps —使用AWS Lambda和Amazon EMR的全自动,低成本数据管道

aws emr 大数据分析Progression is continuous. Taking a flashback journey through my 25 years career in information technology, I have experienced several phases of progression and adaptation.进步是连续的。 在我25年的信息技术职业生涯中经历了一次闪回之旅&…

先进的NumPy数据科学

We will be covering some of the advanced concepts of NumPy specifically functions and methods required to work on a realtime dataset. Concepts covered here are more than enough to start your journey with data.我们将介绍NumPy的一些高级概念&#xff0c;特别是…

lsof命令详解

基础命令学习目录首页 原文链接&#xff1a;https://www.cnblogs.com/ggjucheng/archive/2012/01/08/2316599.html 简介 lsof(list open files)是一个列出当前系统打开文件的工具。在linux环境下&#xff0c;任何事物都以文件的形式存在&#xff0c;通过文件不仅仅可以访问常规…

统计和冰淇淋

Photo by Irene Kredenets on UnsplashIrene Kredenets在Unsplash上拍摄的照片 摘要 (Summary) In this article, you will learn a little bit about probability calculations in R Studio. As it is a Statistical language, R comes with many tests already built in it, …

信息流服务器哪种好,选购存储服务器需要注意六大关键因素,你知道几个?

原标题&#xff1a;选购存储服务器需要注意六大关键因素&#xff0c;你知道几个&#xff1f;信息技术的飞速发展带动了整个信息产业的发展。越来越多的电子商务平台和虚拟化环境出现在企业的日常应用中。存储服务器作为企业建设环境的核心设备&#xff0c;在整个信息流中承担着…

t3 深入Tornado

3.1 Application settings 前面的学习中&#xff0c;在创建tornado.web.Application的对象时&#xff0c;传入了第一个参数——路由映射列表。实际上Application类的构造函数还接收很多关于tornado web应用的配置参数。 参数&#xff1a; debug&#xff0c;设置tornado是否工作…

对数据仓库进行数据建模_确定是否可以对您的数据进行建模

对数据仓库进行数据建模Some data sets are just not meant to have the geospatial representation that can be clustered. There is great variance in your features, and theoretically great features as well. But, it doesn’t mean is statistically separable.某些数…

15 并发编程-(IO模型)

一、IO模型介绍 1、阻塞与非阻塞指的是程序的两种运行状态 阻塞&#xff1a;遇到IO就发生阻塞&#xff0c;程序一旦遇到阻塞操作就会停在原地&#xff0c;并且立刻释放CPU资源 非阻塞&#xff08;就绪态或运行态&#xff09;&#xff1a;没有遇到IO操作&#xff0c;或者通过某种…

不提拔你,就是因为你只想把工作做好

2019独角兽企业重金招聘Python工程师标准>>> 我有个朋友&#xff0c;他30出头&#xff0c;在500强公司做技术经理。他戴无边眼镜&#xff0c;穿一身土黄色的夹克&#xff0c;下面是一条常年不洗的牛仔裤加休闲皮鞋&#xff0c;典型技术高手范。 三 年前&#xff0c;…

python内置函数多少个_每个数据科学家都应该知道的10个Python内置函数

python内置函数多少个Python is the number one choice of programming language for many data scientists and analysts. One of the reasons of this choice is that python is relatively easier to learn and use. More importantly, there is a wide variety of third pa…

C#使用TCP/IP与ModBus进行通讯

C#使用TCP/IP与ModBus进行通讯1. ModBus的 Client/Server模型 2. 数据包格式及MBAP header (MODBUS Application Protocol header) 3. 大小端转换 4. 事务标识和缓冲清理 5. 示例代码 0. MODBUS MESSAGING ON TCP/IP IMPLEMENTATION GUIDE 下载地址&#xff1a;http://www.modb…

Hadoop HDFS常用命令

1、查看hdfs文件目录 hadoop fs -ls / 2、上传文件 hadoop fs -put 文件路径 目标路径 在浏览器查看:namenodeIP:50070 3、下载文件 hadoop fs -get 文件路径 保存路径 4、设置副本数量 -setrep 转载于:https://www.cnblogs.com/chaofan-/p/9742633.html

SAP UI 搜索分页技术

搜索分页技术往往和另一个术语Lazy Loading&#xff08;懒加载&#xff09;联系起来。今天由Jerry首先介绍S/4HANA&#xff0c;CRM Fiori和S4CRM应用里的UI搜索分页的实现原理。后半部分由SAP成都研究院菜园子小哥王聪向您介绍Twitter的懒加载实现。 关于王聪的背景介绍&#x…

万彩录屏服务器不稳定,万彩录屏 云服务器

万彩录屏 云服务器 内容精选换一换内网域名是指仅在VPC内生效的虚拟域名&#xff0c;无需购买和注册&#xff0c;无需备案。云解析服务提供的内网域名功能&#xff0c;可以让您在VPC中拥有权威DNS&#xff0c;且不会将您的DNS记录暴露给互联网&#xff0c;解析性能更高&#xf…

针对数据科学家和数据工程师的4条SQL技巧

SQL has become a common skill requirement across industries and job profiles over the last decade.在过去的十年中&#xff0c;SQL已成为跨行业和职位描述的通用技能要求。 Companies like Amazon and Google will often demand that their data analysts, data scienti…