48、Flink 的 Data Source API 详解

a)概述

本节将描述 FLIP-27 中引入的新 Source API 的主要接口。

b)Source

Source API 是一个工厂模式的接口,用于创建以下组件。

  • Split Enumerator
  • Source Reader
  • Split Serializer
  • Enumerator Checkpoint Serializer

此外,Source 还提供了 Boundedness【有界】的特性,使 Flink 可以选择合适的模式来运行 Flink 任务。

Source 实现应该是可序列化的,因为 Source 实例会在运行时被序列化并上传到 Flink 集群。

c)SplitEnumerator

SplitEnumerator 典型实现如下

  • SourceReader 的注册处理;
  • SourceReader 的失败处理;
    • SourceReader 失败时会调用 addSplitsBack() 方法;SplitEnumerator 会收回已经被分配,但尚未被该 SourceReader 确认(acknowledged)的分片。
  • SourceEvent 的处理
    • SourceEvents 是 SplitEnumerator 和 SourceReader 之间来回传递的自定义事件,可以利用此机制来执行复杂的协调任务。
  • 分片的发现以及分配
    • SplitEnumerator 可以将分片分配到 SourceReader 从而响应各种事件,包括发现新的分片、新 SourceReader 的注册、SourceReader 的失败处理等。

SplitEnumerator 可以在 SplitEnumeratorContext 的帮助下完成上述工作,SplitEnumeratorContext 会在 SplitEnumerator 创建或者恢复的时候提供给 Source。

SplitEnumeratorContext 允许 SplitEnumerator 检索到 reader 的必要信息并执行协调操作,而在 Source 的实现中会将 SplitEnumeratorContext 传递给 SplitEnumerator 实例。

SplitEnumerator 的实现可以仅采用被动工作方式,仅在其方法被调用时采取协调操作;但是一些 SplitEnumerator 的实现会采取主动的工作方式;例如 SplitEnumerator 定期寻找分片并分配给 SourceReader,这类问题使用 SplitEnumeratorContext 类中的 callAsync() 方法比较方便。

示例:如何在 SplitEnumerator 不需要自己维护线程的条件下实现这一点。

class MySplitEnumerator implements SplitEnumerator<MySplit, MyCheckpoint> {private final long DISCOVER_INTERVAL = 60_000L;/*** 一种发现分片的方法*/private List<MySplit> discoverSplits() {...}@Overridepublic void start() {...enumContext.callAsync(this::discoverSplits, splits -> {Map<Integer, List<MySplit>> assignments = new HashMap<>();int parallelism = enumContext.currentParallelism();for (MySplit split : splits) {int owner = split.splitId().hashCode() % parallelism;assignments.computeIfAbsent(owner, new ArrayList<>()).add(split);}enumContext.assignSplits(new SplitsAssignment<>(assignments));}, 0L, DISCOVER_INTERVAL);...}...
}
d)SourceReader

SourceReader 是一个运行在 Task Manager 上的组件,用于处理来自分片的记录。

SourceReader 提供了一个拉取式的(pull-based)处理接口,Flink 任务会在循环中不断调用 pollNext(ReaderOutput) 轮询来自 SourceReader 的记录,pollNext(ReaderOutput) 方法的返回值指示 SourceReader 的状态。

  • MORE_AVAILABLE - SourceReader 有可用的记录。
  • NOTHING_AVAILABLE - SourceReader 现在没有可用的记录,但是将来可能会有记录可用。
  • END_OF_INPUT - SourceReader 已经处理完所有记录,到达数据的尾部。即 SourceReader 可以终止任务了。

pollNext(ReaderOutput) 会使用 ReaderOutput 作为参数,为了提高性能且在必要情况下,SourceReader 可以在一次 pollNext() 调用中返回多条记录;例如外部系统的工作粒度为块,而一个块可以包含多个记录,但是 source 只能在块的边界处设置 Checkpoint,此时SourceReader 可以一次将一个块中的所有记录通过 ReaderOutput 发送至下游。

**注意:SourceReader 的实现应该避免在一次 pollNext(ReaderOutput) 的调用中发送多个记录;**因为对 SourceReader 轮询的任务线程工作在一个事件循环(event-loop)中,且不能阻塞。

在创建 SourceReader 时,相应的 SourceReaderContext 会提供给 Source,而 Source 会将相应的上下文传递给 SourceReader 实例;SourceReader 可以通过 SourceReaderContextSourceEvent 传递给相应的 SplitEnumeratorSource 的一个典型设计模式是让 SourceReader 发送它们的本地信息给 SplitEnumerator,后者则会全局性地做出决定。

SourceReader API 是一个底层(low-level) API,允许用户自行处理分片,并使用自己的线程模型来获取和移交记录;为了帮助实现 SourceReader,Flink 提供了 SourceReaderBase 类,可以显著减少编写 SourceReader 所需要的工作量。

强烈建议连接器开发人员充分利用 SourceReaderBase 而不是从头开始编写 SourceReader

e)Source 使用方法

为了通过 Source 创建 DataStream,需要将 Source 传递给 StreamExecutionEnvironment

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Source mySource = new MySource(...);DataStream<Integer> stream = env.fromSource(mySource,WatermarkStrategy.noWatermarks(),"MySourceName");

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/23197.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

D-Day 上海站回顾丨以科技赋能量化机构业务

5月31日下午&#xff0c;DolphinDB 携手光大证券&#xff0c;在上海成功举办 D-Day 行业交流会。三十余位来自私募机构的核心策略研发、量化交易员、数据分析专家们齐聚现场&#xff0c;深入交流量化投研交易过程中的经验、挑战及解决方案。 DolphinDB 赋能机构业务平台 来自光…

1877java项目建设平台管理系统Myeclipse开发mysql数据库web结构java编程计算机网页项目

一、源码特点 java 项目建设平台管理系统 是一套完善的web设计系统&#xff0c;对理解JSP java编程开发语言有帮助采用了java设计&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统采用web模式&#xff0c;系统主要采用B/S模式开 发。开发环境为TOMCAT7.0,Myeclipse8.…

java表实体 蛇形转驼峰 正则匹配替换

java表实体 蛇形转驼峰 正则匹配替换 1.匹配寻找正则&#xff1a;([a-z])_([a-z])2.替换结果正则&#xff1a;$1\U$2\E效果如下图所示&#xff1a;

Python第二语言(三、Python函数def)

目录 1. Python函数&#xff08;def 函数名():&#xff09; 1.1 sorted对容器进行排序&#xff1a;无法指定排序规则 1.2 sort对容器自定义排序&#xff1a;可以指定排序规则 1.3 获取变量长度函数&#xff08;len&#xff09; 1.4 函数的定义 1.5 函数-传参定义 1.6 函…

如何使用 Systemd 和 Nginx 部署 Node.js 应用程序

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。 介绍 在将 Web 应用部署到 Droplet 时,可能会诱人地简单地使用与开发中相同的设置,即在终端中运行“ruby app.rb”或“node server.js”来启动服务器。这样做简单易行…

C#-for循环语句

for循环语句 语法: for(初始化变量; 判断条件; 增量表达式) { // 内部代码 } 第一个空(初始表达式): 一般用来声明一个临时的局部变量 用来计数第二个空(条件表达式): 表明进入循环的条件 一个bool类型的值(bool类型 条件表达式 逻辑运算符)第三个空(增量表达式): 使用第一个空…

Python怎么翻译:探索Python在翻译领域的无限可能

Python怎么翻译&#xff1a;探索Python在翻译领域的无限可能 Python&#xff0c;这门强大而灵活的编程语言&#xff0c;已经在众多领域展现了其独特的魅力。然而&#xff0c;当谈到翻译这一领域时&#xff0c;许多人可能会感到困惑&#xff1a;Python怎么能用于翻译呢&#xf…

OpenCV如何判断一张图片是否有过高的明暗变化

操作系统&#xff1a;ubuntu22.04OpenCV版本&#xff1a;OpenCV4.9IDE:Visual Studio Code编程语言&#xff1a;C11 前言 判断一张图片是否有过高的明暗变化&#xff0c;可以通过分析图像的亮度分布一致性来实现。一种常见的做法是计算图像的亮度标准差&#xff08;Standard …

免费,C++蓝桥杯等级考试真题--第7级(含答案解析和代码)

C蓝桥杯等级考试真题--第7级 答案&#xff1a;D 解析&#xff1a;步骤如下&#xff1a; 首先&#xff0c;--a 操作会使 a 的值减1&#xff0c;因此 a 变为 3。判断 a > b 即 3 > 3&#xff0c;此时表达式为假&#xff0c;因为 --a 后 a 并不大于 b。因此&#xff0c;程…

ESP32-C3模组上跑通NVS(4)

接前一篇文章&#xff1a;ESP32-C3模组上跑通NVS&#xff08;3&#xff09; 本文内容参考&#xff1a; 非易失性存储库 - ESP32 - — ESP-IDF 编程指南 latest 文档 ESP32-C3入门教程 基础篇&#xff08;八、NVS — 非易失性存储库的使用&#xff09;_esp入门教学-CSDN博客 …

STM32 启用指令缓存 HAL_ICACHE_Enable

函数在 STM32 的 HAL&#xff08;硬件抽象层&#xff09;库中通常用于启用指令缓存&#xff08;I-Cache&#xff09;。以下是该函数的主要功能&#xff1a; 启用指令缓存&#xff1a; 当调用 HAL_ICACHE_Enable 函数时&#xff0c;STM32 的 Cortex-M 处理器&#xff08;特别是…

ElementUI的Table组件在无数据情况下让“暂无数据”文本居中显示

::v-deep .el-table__empty-block {width: 100%;min-width: 100%;max-width: 100%; }

如何在npm上发布自己的包

如何在npm上发布自己的包 npm创建自己的包 一、一个简单的创建 1、创建npm账号 官网&#xff1a;https://www.npmjs.com/创建账号入口&#xff1a;https://www.npmjs.com/signup 注意&#xff1a;需要进入邮箱验证 2、创建目录及初始化 $ mkdir ufrontend-test $ cd ufron…

今日科普:了解、预防、控制高血压

高血压&#xff0c;常被称为“隐形的健康威胁”&#xff0c;许多患者可能在毫无预警的情况下发病&#xff0c;且患病率逐年攀升&#xff0c;同时患者群体逐渐年轻化&#xff0c;高血压虽然难以根治&#xff0c;但并不可怕&#xff0c;真正可怕的是血压长期居高不下&#xff0c;…

STM32(七):ADC电位检测 (标准库函数)

前言 上一篇文章已经介绍了如何用STM32单片机中的定时器的PWM波来实现LED的“呼吸”。这篇文章我们来介绍一下如何用STM32单片机中ADC进行电位检测&#xff0c;并发送到XCOM串口中显示。 一、实验原理 1.ADC模数转换的介绍 首先&#xff0c;我们先介绍一下AD模数模块&#…

arcpy批量导出图且图名为shp属性值

1.打开arcmap加载需要导出的图。需求是逐村显示“村界内图斑”并导出为图&#xff0c;在导出每个村时不显示周围的村和“村界内图斑” 2.arcmap上方空白处右键打开“数据驱动页面” 3.在“数据驱动页面”工具条点击第一个图标&#xff0c;打开“设置数据驱动页面” 4.在“设置…

Oracle触发器的用途

0.DDL和DML DDL:data manipulation language,数据操作语言&#xff0c;主要命令有select&#xff0c;update&#xff0c;insert,delete,主要用于对数据库中的数据进行操作 DDL:data definition language,数据定义语言&#xff0c;主要命令有create&#xff0c;alter&#xff…

Spring (40)Spring Cloud和Spring Boot

Spring Boot和Spring Cloud是两个在现代Java应用开发中广泛使用的项目&#xff0c;它们共同构成了创建云原生、微服务架构应用的强大基础。虽然它们紧密协作&#xff0c;但服务于不同的目的。下面&#xff0c;我们将深入探讨它们之间的关系&#xff0c;结合源码和代码示例来进行…

Java1.8全套家政上门服务+springboot+ mysql +Thymeleaf 技术架构开发,家政APP系统在线派单,师傅入驻全套商业源码

Java1.8全套家政上门服务springboot mysql Thymeleaf 技术架构开发&#xff0c;家政APP系统在线派单&#xff0c;师傅入驻全套商业源码 家政预约上门小程序的实用性&#xff1f; 家政预约上门小程序的实用性主要体现在以下几个方面&#xff1a; 一、方便快捷的预约体验&#…

尚硅谷2024新版3小时速通Docker教程

尚硅谷2024新版3小时速通Docker教程 百度网盘&#xff1a;https://pan.baidu.com/s/1SncgHbdJehvZspjcrrbLSw?pwd6c27