Flink-DataStream API

一、什么样的数据可以用于流式传输

Flink的DataStream API 允许流式传输他们可以序列化的任何内容。Flink自己的序列化程序用于

  • 基本类型:即字符串、长、整数、布尔值、数组
  • 复合类型:元组、POJO和Scala样例类

基本类型我们已经很熟悉了,下面我们看下复合类型。

1、元组

对于java,Flink定义了Tuple0Tuple25类型,例如:

Tuple2<String, Integer> person = Tuple2.apply("Fred",35);String name = person._1;
Integer age = person._2;

2、POJO

如果满足以下条件,Flink将数据类型识别为POJO类型(并允许“按名称”字段引用)

  • 该类是公共且独立的(没有非静态内部类)
  • 该类有一个公共的无参数构造函数
  • 类(以及所有超类)中的所有非静态、非瞬态字段要么是公共的(和非最终的),要么具有遵循getter和setterJavabean命名约定的公共getter和setter方法。

示例:

public class Person {public String name;  public Integer age;  public Person() {}public Person(String name, Integer age) {  . . .}
}  Person person = new Person("Fred Flintstone", 35);

3、样例类

样例类(Case classes)和普通类差不多,只有几点关键差别。样例类非常适合用于不可变的数据,多用于模式匹配。

case class Book(isbn: String)val frankenstein = Book("978-0486282114")

注意在实例化样例类Book时,并没有使用关键字new,这是因为样例类有一个默认的apply方法来负责对象的创建。

二、完整示例

该示例来自官方网站,是将有关人员的记录流作为输入,并对其进行过滤以仅包含成年人

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.FilterFunction;public class Example {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Person> flintstones = env.fromElements(new Person("Fred", 35),new Person("Wilma", 35),new Person("Pebbles", 2));DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {@Overridepublic boolean filter(Person person) throws Exception {return person.age >= 18;}});adults.print();env.execute();}public static class Person {public String name;public Integer age;public Person() {}public Person(String name, Integer age) {this.name = name;this.age = age;}public String toString() {return this.name.toString() + ": age " + this.age.toString();}}
}

1、执行环境

每个Flink应用程序都需要一个执行环境,在该例中为env。流应用程序需要使用StreamExecutionEnvironment

在应用程序中进行的DataStream API调用会构建一个附加到StreamExecutionEnvironment的作业图。当调用env.execute()时,此图会打包并发送到JobManager,JobManager会并行化作业并将其切片分发给TaskManager执行。作业的每个并行切片都将在一个任务槽中执行。

如果不调用execute(), 应用程序则不会执行。

此分布式运行时取决于您的应用程序是否可序列化。它还要求集群中的每个节点都可以使用所有依赖项。

2、source

上面的示例使用env.fromElements(...)构造DataStream<Person>。这是一种将简单流组合在一起以用于原型或测试的便捷方法。StreamExecutionEnvironment上还有一个fromCollection(Collection)方法。因此,也可以这样做:

List<Person> people = new ArrayList<Person>();people.add(new Person("Fred", 35));
people.add(new Person("Wilma", 35));
people.add(new Person("Pebbles", 2));DataStream<Person> flintstones = env.fromCollection(people);

在原型设计时将一些数据导入流的另一种方便方法是使用socket或文件

DataStream<String> lines = env.socketTextStream("localhost", 9999);
DataStream<String> lines = env.readTextFile("file:///path");

在实际应用中,最常用的数据源是那些支持低延迟、高吞吐量并行读取以及倒带和重放的数据源——这是高性能和容错的先决条件——例如Apache Kafka、Kinesis和各种文件系统。REST API和数据库也经常使用。

3、sink

上面的示例使用adults.print()将其结果打印到任务管理器日志(在IDE中运行时将显示在IDE的控制台中)。这将在流的每个元素上调用toString()

例如输出如下:

1> Fred: age 35
2> Wilma: age 35

其中1>和2>表示哪个子任务(即线程)产生了输出。

在生产中,常用的接收器包括FileSink、各种数据库和几个发布子系统。

---------------------------------------------------------------------------------------------------------------------------------

大多数高校硕博生毕业要求需要参加学术会议,发表EI或者SCI检索的学术论文会议论文:
可访问艾思科蓝官网,浏览即将召开的学术会议列表。会议如下:

 第八届大数据与应用统计国际学术研讨会(ISBDAS 2025)

https://ais.cn/u/fEzmy2

第二届生成式人工智能与信息安全国际学术会议(GAIIS 2025)

https://ais.cn/u/uAbENn

第四届电子技术与人工智能国际学术会议(ETAI 2025)

https://ais.cn/u/vqM7Nj

第四届网络安全、人工智能与数字经济国际学术会议(CSAIDE 2025)

https://ais.cn/u/ZrERn2

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/70915.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

渗透利器:Burp Suite 联动 XRAY 图形化工具.(主动扫描+被动扫描)

Burp Suite 联动 XRAY 图形化工具.&#xff08;主动扫描被动扫描&#xff09; Burp Suite 和 Xray 联合使用&#xff0c;能够将 Burp 的强大流量拦截与修改功能&#xff0c;与 Xray 的高效漏洞检测能力相结合&#xff0c;实现更全面、高效的网络安全测试&#xff0c;同时提升漏…

AI时代,职场人如何开启学习之旅

为什么要学习 AI 在当今数字化时代&#xff0c;AI 正以前所未有的速度改变着我们的工作和生活方式。从智能客服到自动化生产&#xff0c;从数据分析到个性化推荐&#xff0c;AI 已经广泛渗透到各个行业和领域。学习 AI&#xff0c;对于工作人员来说&#xff0c;不仅是提升工作…

2.3 Transformer架构革命:从自注意力到万亿参数大模型的演进之路

Transformer架构革命&#xff1a;从自注意力到万亿参数大模型的演进之路 一、Transformer核心突破&#xff1a;彻底颠覆序列建模范式 1.1 传统序列模型的致命瓶颈 # RNN/LSTM的串行计算缺陷示例 hidden_state torch.zeros(seq_len, batch_size, hidden_dim) for t in ra…

Java 大视界 -- 深入剖析 Java 在大数据内存管理中的优化策略(49)

&#x1f496;&#x1f496;&#x1f496;亲爱的朋友们&#xff0c;热烈欢迎你们来到 青云交的博客&#xff01;能与你们在此邂逅&#xff0c;我满心欢喜&#xff0c;深感无比荣幸。在这个瞬息万变的时代&#xff0c;我们每个人都在苦苦追寻一处能让心灵安然栖息的港湾。而 我的…

JVM ②-双亲委派模型 || 垃圾回收GC

这里是Themberfue 在上节课对内存区域划分以及类加载的过程有了简单的了解后&#xff0c;我们再了解其他两个较为重要的机制&#xff0c;这些都是面试中常考的知识点&#xff0c;有必要的话建议背出来&#xff0c;当然不是死记硬背&#xff0c;而是要有理解的背~~~如果对 JVM …

html文件怎么转换成pdf文件,2025最新教程

将HTML文件转换成PDF文件&#xff0c;可以采取以下几种方法&#xff1a; 一、使用浏览器内置功能 打开HTML文件&#xff1a;在Chrome、Firefox、IE等浏览器中打开需要转换的HTML文件。打印对话框&#xff1a;按下CtrlP&#xff08;Windows&#xff09;或CommandP&#xff08;M…

【Redis】 - Redis的Bitmap实现用户签到

Redis的Bitmap实现用户签到 使用Redis的Bitmap数据结构来记录用户的每日签到状态是一种高效且节省空间的方法。通过将用户ID和日期结合生成动态Key&#xff0c;可以轻松管理不同用户在不同日期的签到情况。下面详细介绍如何设计这一方案。 设计思路 动态Key生成&#xff1a;根…

系统开发:大文件下载报错问题

问题描述&#xff1a;在个人开发文件上传管理系统时&#xff0c;遇到小文件可以直接下载&#xff0c;遇到大文件只能在刚登陆成功时下载一次&#xff0c;再次下载别的大文件就会报错&#xff0c;具体错误信息是这样&#xff1a; AxiosError {message: Network Error, name: Ax…

蓝桥杯备赛笔记(二)

这里的笔记是关于蓝桥杯关键知识点的记录&#xff0c;有别于基础语法&#xff0c;很多内容只要求会用就行&#xff0c;无需深入掌握。 文章目录 前言一、时间复杂度1.1 时间复杂度⭐1.2 空间复杂度1.3 分析技巧 总结 前言 持续更新&#xff0c;千里之行始于足下 一、时间复杂度…

2025 西湖论剑wp

web Rank-l 打开题目环境&#xff1a; 发现一个输入框&#xff0c;看一下他是用上面语言写的 发现是python&#xff0c;很容易想到ssti 密码随便输&#xff0c;发现没有回显 但是输入其他字符会报错 确定为ssti注入 开始构造payload&#xff0c; {{(lipsum|attr(‘global…

Web前端开发--HTML

HTML快速入门 1.新建文本文件&#xff0c;后缀名改为.html 2.编写 HTML结构标签 3.在<body>中填写内容 HTML结构标签 特点 1.HTML标签中不区分大小写 2.HTML标签属性值中可以使用单引号也可使用双引号 3.HTML语法结构比较松散&#xff08;但在编写时要严格一点&…

加油口,电梯门的对称性对 TCP/IP 传输协议的启示

春节期间河南穷游屡次加油站排队加油之启示。 不考虑有意的设计因素&#xff0c;汽车加油口概率性分布在车身的左边或者右边&#xff0c;这个偶然的小细节让加油机同时为两辆车加油而无需额外的加油管。 如果所有车辆加油口都在同一侧&#xff0c;加油站的加油机就只能给一边的…

网络工程师 (30)以太网技术

一、起源与发展 以太网技术起源于20世纪70年代&#xff0c;最初由Xerox公司的帕洛阿尔托研究中心&#xff08;PARC&#xff09;开发。最初的以太网采用同轴电缆作为传输介质&#xff0c;数据传输速率为2.94Mbps&#xff08;后发展为10Mbps&#xff09;&#xff0c;主要用于解决…

ONES 功能上新|ONES Copilot、ONES TestCase、ONES Wiki 新功能一览

ONES Copilot 支持基于当前查看的工作项相关信息&#xff0c;利用 AI 模型&#xff0c;在系统中进行相似工作项的查找&#xff0c;包括基于已关联工作项的相似数据查找。 应用场景&#xff1a; 在查看工作项时&#xff0c;可利用 AI 模型&#xff0c;基于语义相似度&#xff0c…

从 X86 到 ARM :工控机迁移中的核心问题剖析

在工业控制领域&#xff0c;技术的不断演进促使着工控机从 X86 架构向 ARM 架构迁移。然而&#xff0c;这一过程并非一帆风顺&#xff0c;面临着诸多关键挑战。 首先&#xff0c;软件兼容性是一个重要问题。许多基于 X86 架构开发的工业控制软件可能无法直接在 ARM 架构上运行…

《qt open3d网格平滑》

qt open3d网格平滑 效果展示二、流程三、代码效果展示 二、流程 创建动作,链接到槽函数,并把动作放置菜单栏 参照前文 三、代码 1、槽函数实现 void on_actionFilterSmoothSimple_triggered();void MainWindow::on_actionF

DeepSeek自然语言处理(NLP)基础与实践

自然语言处理(Natural Language Processing, NLP)是人工智能领域的一个重要分支,专注于让计算机理解、生成和处理人类语言。NLP技术广泛应用于机器翻译、情感分析、文本分类、问答系统等场景。DeepSeek提供了强大的工具和API,帮助我们高效地构建和训练NLP模型。本文将详细介…

Redis 的缓存雪崩、缓存穿透和缓存击穿详解,并提供多种解决方案

本文是对 Redis 知识的补充&#xff0c;在了解了如何搭建多种类型的 Redis 集群&#xff0c;并清楚了 Redis 集群搭建的过程的原理和注意事项之后&#xff0c;就要开始了解在使用 Redis 时可能出现的突发问题和对应的解决方案。 引言&#xff1a;虽然 Redis 是单线程的&#xf…

路由过滤方法与常用工具

引言 在前面我们已经学习了路由引入&#xff0c;接下来我们就更进一步来学习路由过滤 前一篇文章&#xff1a;重发布&#xff1a;路由引入&#xff08;点击即可&#xff09; 路由过滤 定义&#xff1a;路由器在发布或者接收消息时&#xff0c;可能需要对路由信息进行过滤。 作用…

温故知新LS

这里写目录标题 chapter 1 numpychapter 2 torchchapter 3 chapter 1 numpy import numpy as np import torch ######################### chapter 1 numpy ######################### # 1 numpy与torch类型数据相互转换 a np.random.randint(0,10,[3,3]) a_t torch.from_nu…