FLINK SQL数据类型

Flink SQL支持非常完善的数据类型,以满足不同的数据处理需求。以下是对Flink SQL数据类型的详细归纳:

一、原子数据类型

  1. 字符串类型
    • CHAR、CHAR(n):定长字符串,n代表字符的定长,取值范围为[1, 2147483647]。如果不指定n,则默认为1。
    • VARCHAR、VARCHAR(n)、STRING:可变长字符串,n代表字符的最大长度,取值范围为[1, 2147483647]。如果不指定n,则默认为1。STRING等同于VARCHAR(2147483647)。
  2. 二进制字符串类型
    • BINARY、BINARY(n):定长二进制字符串,n代表定长,取值范围为[1, 2147483647]。如果不指定n,则默认为1。
    • VARBINARY、VARBINARY(n)、BYTES:可变长二进制字符串,n代表字符的最大长度,取值范围为[1, 2147483647]。如果不指定n,则默认为1。BYTES等同于VARBINARY(2147483647)。
  3. 精确数值类型
    • DECIMAL、DECIMAL§、DECIMAL(p, s)、DEC、DEC§、DEC(p, s)、NUMERIC、NUMERIC§、NUMERIC(p, s):固定长度和精度的数值类型,p代表数值位数(长度),取值范围为[1, 38];s代表小数点后的位数(精度),取值范围为[0, p]。如果不指定,p默认为10,s默认为0。
  4. 有损精度数值类型
    • TINYINT:-128到127的1字节大小的有符号整数。
    • SMALLINT:-32768到32767的2字节大小的有符号整数。
    • INT、INTEGER:-2147483648到2147483647的4字节大小的有符号整数。
    • BIGINT:-9223372036854775808到9223372036854775807的8字节大小的有符号整数。
  5. 浮点类型
    • FLOAT:4字节大小的单精度浮点数值。
    • DOUBLE、DOUBLE PRECISION:8字节大小的双精度浮点数值。
  6. 布尔类型
    • BOOLEAN。
  7. 日期、时间类型
    • DATE:由年-月-日组成的不带时区含义的日期类型,取值范围为[0000-01-01, 9999-12-31]。
    • TIME、TIME§:由小时:分钟:秒[.小数秒]组成的不带时区含义的时间数据类型,精度高达纳秒,取值范围为[00:00:00.000000000, 23:59:59.9999999]。其中p代表小数秒的位数,取值范围为[0, 9],如果不指定p,默认为0。
    • TIMESTAMP、TIMESTAMP§、TIMESTAMP WITHOUT TIME ZONE、TIMESTAMP§ WITHOUT TIME ZONE:由年-月-日 小时:分钟:秒[.小数秒]组成的不带时区含义的时间类型,取值范围为[0000-01-01 00:00:00.000000000, 9999-12-31 23:59:59.999999999]。其中p代表小数秒的位数,取值范围为[0, 9],如果不指定p,默认为6。
    • TIMESTAMP WITH TIME ZONE、TIMESTAMP§ WITH TIME ZONE:由年-月-日 小时:分钟:秒[.小数秒] 时区组成的带时区含义的时间类型,取值范围为[0000-01-01 00:00:00.000000000 +14:59, 9999-12-31 23:59:59.999999999 -14:59]。其中p代表小数秒的位数,取值范围为[0, 9],如果不指定p,默认为6。
    • TIMESTAMP_LTZ、TIMESTAMP_LTZ§:与TIMESTAMP WITH TIME ZONE类似,但时区信息不是携带在数据中的,而是由Flink SQL任务的全局配置决定的。

二、复合数据类型

  • ARRAY:数组类型,类似于Java的array。
  • MULTISET:集合类型,类似于Java的List。
  • ROW:对象类型,可以包含多个字段,每个字段有自己的类型和名称,类似于Java的Object或Scala的Case Class。
  • MAP:映射类型,包含键值对,键和值都可以是任意类型。
    样例:
  1. Tuple(元组)
// 创建一个包含String和Integer类型字段的Tuple2  
DataStream<Tuple2<String, Integer>> tupleStream = env.fromElements(  new Tuple2<>("Alice", 30),  new Tuple2<>("Bob", 25)  
);  // 访问Tuple2的字段  
tupleStream.map(tuple -> tuple.f0 + " is " + tuple.f1 + " years old")  .print();
  1. POJO(Plain Old Java Object,普通旧Java对象)
// 定义一个POJO类  
public class PersonPOJO {  public String name;  public int age;  // 无参构造方法  public PersonPOJO() {}  // 有参构造方法  public PersonPOJO(String name, int age) {  this.name = name;  this.age = age;  }  // Getter和Setter方法  public String getName() {  return name;  }  public void setName(String name) {  this.name = name;  }  public int getAge() {  return age;  }  public void setAge(int age) {  this.age = age;  }  
}  // 创建一个包含PersonPOJO对象的DataStream  
DataStream<PersonPOJO> personPOJOStream = env.fromElements(  new PersonPOJO("Alice", 30),  new PersonPOJO("Bob", 25)  
);  // 对DataStream进行处理  
personPOJOStream.map(person -> person.getName() + " is " + person.getAge() + " years old")  .print();
  1. Row
CREATE TABLE person_table (  id BIGINT,  name STRING,  age INT  
) WITH (  'connector' = '...,  ...  
);  -- 插入数据(假设已经有数据插入到person_table中)  -- 查询数据,并使用Row来表示结果集中的行  
SELECT id, name, age FROM person_table AS row(id BIGINT, name STRING, age INT);
  1. Map 和 Array
// 创建一个包含Map和Array的DataStream  
DataStream<Tuple2<Map<String, String>, Integer[]>> complexStream = env.fromElements(  new Tuple2<>(  new HashMap<String, String>() {{  put("key1", "value1");  put("key2", "value2");  }},  new Integer[]{1, 2, 3}  ),  // ... 其他元素  
);  // 对DataStream进行处理  
complexStream.map(tuple -> {  Map<String, String> map = tuple.f0;  Integer[] array = tuple.f1;  // ... 对map和array进行处理  return "Processed result"; // 示例返回值,实际应根据需求返回合适的类型  
})  
.print();

三、用户自定义数据类型

Flink SQL也支持用户自定义数据类型,用户可以根据自己的需求定义复杂的数据结构,并通过实现相应的接口或类来注册这些自定义类型。
1、定义与用途
用户自定义数据类型通常用于处理那些无法直接通过Flink内置数据类型表示的数据。例如,当需要处理一个包含多个字段的复杂数据结构时,就可以定义一个包含这些字段的用户自定义数据类型。

2、实现方式
在Flink中,实现用户自定义数据类型通常需要遵循以下步骤:

  1. 定义数据类型:首先,需要定义一个Java或Scala类来表示用户自定义数据类型。这个类应该包含所有需要的字段,并提供相应的getter和setter方法(如果是Java类,还需要一个无参构造方法)。
  2. 实现序列化与反序列化:为了使Flink能够处理用户自定义数据类型,需要实现相应的序列化器和反序列化器。这些序列化器和反序列化器负责将用户自定义数据类型转换为字节流,以及从字节流中恢复出用户自定义数据类型。
  3. 注册类型信息:在Flink中注册用户自定义数据类型的类型信息。这通常是通过在Flink的配置中指定类型信息的方式来实现的。
    3、注意事项
  4. 性能考虑:自定义数据类型的序列化与反序列化过程可能会对性能产生影响。因此,在实现自定义数据类型时,需要仔细考虑如何优化序列化与反序列化过程,以提高性能。
  5. 兼容性:当在不同的Flink集群或版本之间迁移时,需要确保自定义数据类型及其序列化器与反序列化器是兼容的。否则,可能会导致数据无法正确解析或处理。
  6. 错误处理:在处理用户自定义数据类型时,需要特别注意错误处理。例如,当遇到无法解析的数据时,应该能够优雅地处理这些错误,而不是导致整个作业失败。
    4、应用场景
    用户自定义数据类型在Flink中有广泛的应用场景。例如:
  7. 复杂数据结构处理:当需要处理包含多个字段的复杂数据结构时,可以使用用户自定义数据类型来表示这些结构。
  8. 自定义聚合函数:在实现自定义聚合函数时,可能需要使用用户自定义数据类型来存储中间结果或最终结果。
  9. 与外部系统交互:当Flink与外部系统(如数据库、消息队列等)交互时,可能需要将这些系统的数据类型转换为Flink能够处理的数据类型。这时,可以使用用户自定义数据类型来实现这种转换。
    5、样例
    1. 定义自定义数据类型
      首先,定义一个Java类来表示自定义数据类型。例如,我们定义一个名为Person的类,包含name和age两个字段。
public class Person {  private String name;  private int age;  // 无参构造方法  public Person() {}  // 有参构造方法  public Person(String name, int age) {  this.name = name;  this.age = age;  }  // Getter和Setter方法  public String getName() {  return name;  }  public void setName(String name) {  this.name = name;  }  public int getAge() {  return age;  }  public void setAge(int age) {  this.age = age;  }  // 重写toString方法,方便打印输出  @Override  public String toString() {  return "Person{name='" + name + "', age=" + age + "}";  }  
}
2. 实现序列化与反序列化

为了使Flink能够处理Person类型的数据,需要实现相应的序列化器和反序列化器。在Flink中,这通常通过实现TypeSerializer和TypeDeserializer接口来完成。然而,对于简单的POJO(Plain Old Java Object)类型,Flink通常能够自动推断并处理其序列化与反序列化过程,因此在这个例子中我们不需要显式实现这些接口。
3. 注册类型信息(如果需要)
在某些情况下,可能需要显式地在Flink中注册自定义数据类型的类型信息。这通常是在使用低级别的API(如DataStream API)时需要的。然而,在使用Table API或SQL时,Flink通常能够自动推断数据类型,因此不需要显式注册。
4. 使用自定义数据类型
现在,我们可以在Flink作业中使用Person类型的数据了。例如,我们可以创建一个DataStream,并向其中添加Person对象,然后对其进行处理。

// 假设已经有一个执行环境executionEnvironment  
DataStream<Person> personStream = executionEnvironment  .fromElements(new Person("Alice", 30), new Person("Bob", 25))  .name("Person Stream");  // 对DataStream进行处理,例如打印输出  
personStream.print();

综上所述,Flink SQL提供了丰富多样的数据类型,以满足不同的数据处理需求。用户可以根据实际情况选择合适的数据类型,并进行相应的数据处理操作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/56219.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

正点原子linux驱动笔记-字符设备驱动

1.linux驱动和模块加载函数 Linux驱动有两种运行方式 第一种是将驱动编译Linux内核中&#xff0c;这样当Linux内核启动的时候就会自动运行驱动程序。 第二种是将驱动编译成模块&#xff08;Linux下模块扩展名为".ko"&#xff09;&#xff0c;在Linux内核启动后使用…

《数据结构》--队列【各种实现,算法推荐】

一、认识队列 队列是一种常见的数据结构&#xff0c;按照先进先出&#xff08;FIFO&#xff0c;First In First Out&#xff09;的原则排列数据。也就是说&#xff0c;最早进入队列的元素最先被移除。队列主要支持两种基本操作&#xff1a; 入队&#xff08;enqueue&#xff0…

【微信小程序_9_WXSS模板样式】

摘要:本文主要介绍了微信小程序开发中的 WXSS。WXSS 类似于网页开发中的 CSS,具有其大部分特性同时又有扩展,如 rpx 尺寸单位、@import 样式导入等。其中 rpx 是解决屏适配的独特单位,有特定实现原理和不同设备的换算方式。@import 可导入外联样式表,有明确语法格式和示例…

爬虫设计思考之二

“所谓爬虫,其本质是一种计算机程序,它的行为看起来就像是蜘蛛在网上面爬行一样,顺着互联网这个“网”,一条线一条线地“爬行”。 一、认识爬虫 爬虫这个词对于非专业人士比较的陌生&#xff0c;但是实际却和我们的生活息息相关。例如我们国内经常使用的百度浏览器搜索&#x…

线性代数 行列式

一、行列式 1、定义 一个数学概念&#xff0c;主要用于 线性代数中&#xff0c;它是一个可以从方阵&#xff08;即行数和列数相等的矩阵&#xff09;形成的一个标量&#xff08;即一个单一的数值&#xff09; 2、二阶行列式 &#xff0c;像这样将一个式子收缩称为一个 2*2 的…

【数据结构】【链表代码】移除链表元素

移除链表元素 /*** Definition for singly-linked list.* struct ListNode {* int val;* struct ListNode *next;* };*/struct ListNode* removeElements(struct ListNode* head, int val) { // 创建一个虚拟头节点&#xff0c;以处理头节点可能被删除的情况 struct…

【mysql 截断订单表order 报错】

truncate table order;这个是一个截断订单表的sql语句 看起来没有什么问题 但是实际执行的时候是会报错的 SQLSTATE[42000]: Syntax error or access violation: 1064 You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version…

27-云计算下一个十年技术Serverless

├──27-云计算下一个十年技术Serverless | ├──1-Serverless深度实战之Knative | | ├──1-使用Knative平台环境说明 | | ├──2-现阶段云原生应用领域介绍 | | ├──3-为什么要引入Serverless | | ├──4-Serverless应用场景 | | ├──5-Serve…

4. 单例模式线程安全问题--是否加锁

单例模式线程安全问题--是否加锁 是否加锁问题指什么&#xff1f;解决多线程并发来带的问题继承MonoBehaviour的单例模式不继承MonoBehaviour的单例模式 总结 是否加锁问题指什么&#xff1f; 如果程序当中存在多线程&#xff0c;我们需要考虑当多个线程同时访问同一个内存空间…

计算机毕业设计 内蒙古旅游景点数据分析系统的设计与实现 Python毕业设计 Python毕业设计选题 Spark 大数据【附源码+安装调试】

博主介绍&#xff1a;✌从事软件开发10年之余&#xff0c;专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精…

筛斗数据:如何提高数据治理的安全性

提高数据治理的安全性是一个多层次、多维度的任务&#xff0c;涉及技术、管理、法律等多个方面。以下是一些具体的策略和建议&#xff1a; 一、技术层面 数据加密&#xff1a;采用先进的加密算法对敏感数据进行加密存储和传输&#xff0c;确保数据在各个环节中的安全性。这包…

element-plus组件之Upload(2.0)

接上篇 下面的属性就对应着回调函数&#xff0c;下面就一一进行介绍。 因为element-plus在封装upload组件时就自带了一个预览和删除的图标&#xff0c;只是没有方法实现&#xff0c;这里进行指明。 就是在图片墙列表中&#xff0c;自动就带了这两个图标和遮罩&#xff0c;下面…

pip安装指定版本的tensorflow

安装CPU版本&#xff1a;(以2.9.0版本为例) pip install tensorflow2.9.0安装GPU版本&#xff1a;(以2.9.0版本为例) pip install tensorflow-gpu2.9.0若下载缓慢&#xff0c;使用阿里国内镜像源加速下载&#xff1a;(以2.9.0版本为例) pip install -i https://mirrors.aliy…

[C#]使用纯opencvsharp部署yolov11-onnx图像分类模型

【官方框架地址】 https://github.com/ultralytics/ultralytics.git 【算法介绍】 使用纯OpenCvSharp部署YOLOv11-ONNX图像分类模型是一项复杂的任务&#xff0c;但可以通过以下步骤实现&#xff1a; 准备环境&#xff1a;首先&#xff0c;确保开发环境已安装OpenCvSharp和必…

人脸识别face-api.js应用简介

前阵子学习了一下face-api.js &#xff0c;偶有心得&#xff0c;跟大家分享一下。 face-api.js的原始项目是https://github.com/justadudewhohacks/face-api.js &#xff0c;最后一个release是2020年3月22日的0.22.2版&#xff0c;组件较老&#xff0c;API文档很全&#xff0c;…

鸿蒙网络管理模块07——网络质量管理

如果你也对鸿蒙开发感兴趣&#xff0c;加入“Harmony自习室”吧&#xff01;扫描下方名片&#xff0c;关注公众号&#xff0c;公众号更新更快&#xff0c;同时也有更多学习资料和技术讨论群。 1、概述 HarmonyOS提供了一套网络网络质量管理的套件&#xff08;Network Boost Ki…

[论文笔记]DAPR: A Benchmark on Document-Aware Passage Retrieval

引言 今天带来论文DAPR: A Benchmark on Document-Aware Passage Retrieval的笔记。 本文提出了一个基准&#xff1a;文档感知段落检索(Document-Aware Passage Retrieval,DAPR)以及介绍了一些上下文段落表示的方法。 为了简单&#xff0c;下文中以翻译的口吻记录&#xff0c…

麒麟信安CentOS安全加固案例获评中国信通院第三届“鼎新杯”数字化转型应用奖

“鼎新杯”数字化转型应用大赛&#xff0c;由中国通信标准化协会主办、中国信息通信研究院承办&#xff0c;以落实国家“十四五”规划关于“加快数字化发展&#xff0c;建设数字中国”的总体要求为目标&#xff0c;意在打造一批具有产业引领与推广应用效应的企业数字化转型应用…

conda打包

tar 是一个在 Unix 和类 Unix 系统中常用的命令行工具&#xff0c;用于打包多个文件和目录到一个归档文件&#xff08;通常称为 tarball&#xff09;&#xff0c;以及从这些归档文件中解包文件和目录。 以下是使用 tar 进行打包和解包的基本用法&#xff1a; 打包&#xff08;…

OCR+PDF解析配套前端工具开源详解!

目录 一、项目简介 TextIn为相关领域的前端开发提供了优秀的范本。 目前项目已在Github上开源&#xff01; 二、性能特色 三、安装使用 安装依赖启动项目脚本命令项目结构 四、效果展示 面对日常生活和工作中常见的OCR识别、PDF解析、翻译、校对等场景&#xff0c;配套的…