Flink之Table API

Apache Flink 的 Table API 是 Flink 提供的一种高级抽象,用于以声明式方式处理批处理和流处理数据。它是基于关系模型的 API,用户可以像编写 SQL 一样,以简洁、类型安全的方式编写数据处理逻辑。


一、基本概念

1. 什么是 Table API?

Table API 是 Flink 中用于处理结构化数据的 关系型编程接口,它支持:

  • 批处理(Batch)

  • 流处理(Streaming)

Table API 提供了类似 SQL 的语法风格,但以函数式 API 方式表达,具备更好的类型安全和 IDE 友好性。


二、核心组件

1. Table

  • Flink 中的 Table 是对结构化数据的一种抽象。

  • 相当于数据库中的表,可以进行过滤、聚合、连接等操作。

2. TableEnvironment

  • Table API 的执行上下文。

  • 创建表、注册 UDF、执行 SQL/Table API 操作等都依赖它。

3. Schema(模式)

  • 定义表结构,包括字段名、数据类型、主键、水位线等。


三、编程模型

// 1. 创建 TableEnvironment
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);// 2. 注册表(从外部数据源)
tableEnv.executeSql("""CREATE TABLE source_table (id STRING,ts TIMESTAMP(3),val INT,WATERMARK FOR ts AS ts - INTERVAL '5' SECOND) WITH ('connector' = 'kafka','topic' = 'test',...)
""");// 3. 使用 Table API 处理数据
Table result = tableEnv.from("source_table").filter($("val").isGreater(10)).groupBy($("id")).select($("id"), $("val").avg().as("avg_val"));// 4. 输出结果到目标表
tableEnv.executeSql("""CREATE TABLE sink_table (id STRING,avg_val DOUBLE) WITH ('connector' = 'print')
""");result.executeInsert("sink_table");

四、常用操作

操作类型示例
过滤table.filter($("age").isGreater(18))
投影table.select($("name"), $("age"))
聚合table.groupBy($("city")).select($("city"), $("salary").avg())
连接table1.join(table2).where(...).select(...)
去重table.distinct()
排序table.orderBy($("time").desc())
窗口table.window(...)(见下文)

五、时间和窗口支持

Table API 支持两种时间语义:

  • 事件时间(Event Time)

  • 处理时间(Processing Time)

常见的窗口类型:

  • 滚动窗口(Tumble)

  • 滑动窗口(Slide)

  • 会话窗口(Session)

示例:

table.window(Tumble.over(lit(10).minutes()).on($("ts")).as("w")).groupBy($("id"), $("w")).select($("id"), $("w").start(), $("val").sum());

六、与 SQL 的关系

Table API 与 SQL 是等价的抽象:

  • SQL 更加声明式,适合数据分析人员;

  • Table API 更加灵活、支持编程逻辑,适合开发者。

两者可以混合使用,例如:

Table result = tableEnv.sqlQuery("SELECT id, COUNT(*) FROM source GROUP BY id");

七、数据源和连接器支持

Table API 支持多种数据源和 sink,通过 Flink Connector 实现:

常见的:

  • Kafka

  • HDFS

  • MySQL / JDBC

  • Elasticsearch

  • Hive

  • Iceberg / Delta / Hudi

  • Redis 等

通过 SQL 创建表:

CREATE TABLE example (...
) WITH ('connector' = 'kafka',...
);

八、UDF 扩展

可以定义自定义函数:

  • ScalarFunction(标量函数)

  • TableFunction(表函数)

  • AggregateFunction(聚合函数)

  • TableAggregateFunction(表聚合函数)

示例:

public class HashCode extends ScalarFunction {public int eval(String s) {return s.hashCode();}
}tableEnv.createTemporarySystemFunction("HashCode", HashCode.class);
table.select(call("HashCode", $("name")));

九、批与流统一

Flink 的 Table API 实现了 批流统一语义,相同的 API 可用于处理批或流数据,只需切换 EnvironmentSettings 即可。


十、优点总结

  • 统一的 API:批流统一,开发逻辑一致

  • 类型安全:Java/Scala 函数式风格,IDE 友好

  • 与 SQL 无缝切换

  • 可插拔的连接器与格式支持

  • 强大的时间与窗口语义支持

  • 与 Flink Runtime 深度整合,性能高效

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/81013.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于Vue3.0的高德地图api教程005:实现绘制线并编辑功能

文章目录 6、绘制多段线6.1 绘制多段线6.1.1 开启绘制功能6.1.2 双击完成绘制6.1.3 保存到数据库6.2 修改多段线6.2.1 点击线,进入编辑模式6.2.2 编辑线6.3 完整代码6、绘制多段线 6.1 绘制多段线 6.1.1 开启绘制功能 实现代码: const changeSwitchDrawPolyline = ()=>…

“redis 目标计算机积极拒绝,无法连接” 解决方法,每次开机启动redis

如果遇到以上问题 先打开“服务” 找到App Readiness 右击-启动 以管理员身份运行cmd,跳转到 安装redis的目录 运行:redis-server.exe redis.windows.conf 以管理员身份打开另一cmd窗口,跳转到安装redis的目录 运行:redis-…

Java 大视界——Java 大数据在智慧交通智能停车诱导系统中的数据融合与实时更新

面对城市停车资源错配导致的30%以上交通拥堵问题&#xff0c;本文以某新一线城市智慧交通项目为蓝本&#xff0c;深度解析Java大数据技术如何实现多源停车数据融合、动态路径规划与诱导策略优化。通过构建“感知-计算-决策”全链路系统&#xff0c;实现车位状态更新延迟<200…

牛客周赛 Round 92(再现京津冀蓝桥杯???)

1. 小红的签到题 现在小红希望你写出一个长度为 nnn 的、使用了下划线命名法命名的变量。为了显出特征&#xff0c;请保证该变量至少由两个单词组成。 输入描述: 输入一个正整数 n(3≦n≦100)&#xff0c;代表需要构造的变量长度。 输出描述: 输出一个长度为 n 的字符串&#x…

2025-05-11 Unity 网络基础11——UnityWebRequest

文章目录 1 UnityWebRequest 介绍2 搭建 HTTP 服务器3 常用操作3.1下载资源3.1.1 下载文本3.1.2 下载图片3.1.3 下载 AB 包 3.2 上传资源3.2.1 上传数据类3.2.2 POST 上传3.3.3 PUT 上传 4 自定义操作4.1 下载资源4.1.1 Unity 内置 Handler4.1.2 自定义 Handler 4.2 上传资源4.…

GitHub 趋势日报 (2025年05月09日)

本日报由 TrendForge 系统生成 https://trendforge.devlive.org/ &#x1f310; 本日报中的项目描述已自动翻译为中文 &#x1f4c8; 今日整体趋势 Top 10 排名项目名称项目描述今日获星总星数语言1voideditor/void⭐ 1879⭐ 15214TypeScript2ruanyf/weekly科技爱好者周刊&…

.NET MAUI 基础知识

文章目录 什么是 .NET MAUI&#xff1f;MAUI的核心特点与Xamarin.Forms的区别 开发环境搭建安装Visual Studio 2022安装必要组件配置Android开发环境配置iOS开发环境验证安装 创建第一个MAUI应用创建新项目MAUI项目结构解析理解关键文件运行应用程序简单修改示例使用热重载 MAU…

卷积神经网络全连接层详解:特征汇总、FCN替代与性能影响分析

【内容摘要】 本文聚焦卷积神经网络&#xff08;CNN&#xff09;的全连接层&#xff0c;详细介绍其将二维特征图转化为一维向量的过程&#xff0c;阐述全卷积网络&#xff08;FCN&#xff09;如何通过转置卷积替代全连接层以实现像素级分类&#xff0c;并分析全连接层对图像分类…

在C++中进行套接字编程时,主要使用以下头文件

目录 一.基本套接字头文件<sys/socket.h><netinet/in.h><arpa/inet.h><unistd.h><netdb.h> 二. 完整示例头文件包含三. 注意事项 在C中进行套接字编程时&#xff0c;主要使用以下头文件&#xff1a; 一.基本套接字头文件 <sys/socket.h>…

【Linux网络】HTTP

应用层协议 HTTP 前置知识 我们上网的所有行为都是在做IO&#xff0c;&#xff08;我的数据给别人&#xff0c;别人的数据给我&#xff09;图片。视频&#xff0c;音频&#xff0c;文本等等&#xff0c;都是资源答复前需要先确认我要的资源在哪台服务器上&#xff08;网络IP&…

JAVA异常体系

在 Java 里&#xff0c;异常体系是其错误处理机制的核心内容&#xff0c;它能够帮助开发者有效应对程序运行时出现的各种意外状况。 异常体系的基本架构 它主要包含两个重要分支&#xff1a; Error&#xff08;错误&#xff09;&#xff1a;这类异常是程序自身无法处理的严重…

vue 去掉右边table的下拉条与下面的白色边框并补充满

::v-deep table {width: 100% !important; } ::v-deep .el-table::after, .el-table::before {display: none !important; }/* 隐藏滚动条但保留滚动功能 */ ::v-deep .el-table__body-wrapper::-webkit-scrollbar {width: 0 !important;height: 0 !important; }::v-deep .el-t…

uniapp+vue3+uview来开发我们的项目

前言&#xff1a; 就像我们vue的web的框架element、iview等一样&#xff0c;我们的uni-app开发也有适合的他的框架&#xff0c;除了他本身的扩展组件以外&#xff0c;第三方好用的就是就是uview了。 实现效果&#xff1a; 官网信息&#xff1a; vue2版本&#xff1a;uview-ui …

数据仓库:企业数据管理的核心引擎

一、数据仓库的由来 数据仓库&#xff08;Data Warehouse, DW&#xff09;概念的诞生源于企业对数据价值的深度挖掘需求。在1980年代&#xff0c;随着OLTP&#xff08;联机事务处理&#xff09;系统在企业中的普及&#xff0c;传统关系型数据库在处理海量数据分析时显露出明显瓶…

YOLOv12模型部署(保姆级)

一、下载YOLOv12源码 1.通过网盘分享的文件&#xff1a;YOLOv12 链接: https://pan.baidu.com/s/12-DEbWx1Gu7dC-ehIIaKtQ 提取码: sgqy &#xff08;网盘下载&#xff09; 2.进入github克隆YOLOv12源码包 二、安装Anaconda/pycharm 点击获取官网链接(anaconda) 点击获取…

一篇解决Redis:持久化机制

目录 认识持久化 持久化方案 RDB&#xff08;Redis DataBase&#xff09; 手动触发 自动触发 小结 AOF(Append-Only File) AOF缓冲区刷新机制 AOF重写机制 AOF重写流程 ​编辑 混合持久化 认识持久化 我们都知道Mysql有四大特征&#xff0c;原子性&#xff0c;持久…

从 Vue3 回望 Vue2:事件总线的前世今生

从 Vue3 回望 Vue2&#xff1a;事件总线的前世今生 以 Vue3 开发者视角回顾 Vue2 中事件总线机制 的文章。文章将围绕事件总线的缘起、用法、局限与演进展开&#xff0c;帮助 Vue3 开发者理解 Vue2 通信方式的历史意义及现代替代方案。 一、前言&#xff1a;Vue3 时代&#xff…

CSS结构性伪类、UI伪类与动态伪类全解析:从文档结构到交互状态的精准选择

一、结构性伪类选择器&#xff1a;文档树中的位置导航器 结构性伪类选择器是CSS中基于元素在HTML文档树中的层级关系、位置索引或结构特征进行匹配的一类选择器。它们无需依赖具体的类名或ID&#xff0c;仅通过文档结构即可精准定位元素&#xff0c;是实现响应式布局和复杂文档…

【SSL证书系列】SSL证书工作原理解读

SSL&#xff08;Secure Sockets Layer&#xff09;及其继任者TLS&#xff08;Transport Layer Security&#xff09;是用于保护网络通信安全的加密协议。SSL证书是实现HTTPS协议的核心&#xff0c;其工作原理涉及加密技术、身份验证和信任机制。以下是其工作原理的详细分步解析…

第二十四天打卡

import os os.getcwd() os.listdir() path_a r"C:\Users\renshuaicheng\Documents" path_b "MyProjectData" file "results.csv" file_path os.path.join(path_a,path_b,file) file_path import osstart_directory os.getcwd() # 假设这个目…