Flink checkpoint 源码分析- Checkpoint snapshot 处理流程

背景

在上一篇博客中我们分析了代码中barrier的是如何流动改的。Flink checkpoint 源码分析- Checkpoint barrier 传递源码分析-CSDN博客

最后跟踪到了代码org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate#handleEvent 

现在我们接着跟踪相应代码,观察是如何算子接受到了barrier是如何进行下一步代码处理的。以及了解flink应对不同的消费语义(At least once, exactly once)对于checkpoint的影响是怎样的。

代码分析

org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate#handleEvent 中我们主要关注对于checkpointBarrier的处理流程。

processBarrier方法实现上就可以看出,flink barrier的处理分成两种。

在这里我们需要跟踪一下barrierHandler 是如何生成的才能知道后面所要走的流程是哪一步。

通过往上追溯barrierHandler的生成,我们跟踪到方法:org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil#createCheckpointBarrierHandler 从代码中我们可以看到 如果是 EXACTLY_ONCE 那么生成的就SingleCheckpointBarrierHandler, 如果checkpoint 模式是AT_LEAST_ONCE, 生成对应的handler就是CheckpointBarrierTracker。 但是从代码中,EXACTLY_ONCE似乎不是简单的new 一个SingleCheckpointBarrierHandler, 而是通过一个方法来生成。因此需要进一步的观察这个方法是如何实现的。

org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil#createBarrierHandler

这里针对checkpoint类型做了区分,主要是分为aligned checkpoint 和 unaliged checkpoint的差异。这里可以进一步观察一下这两类checkpoint之前的差异。

对比这两个方法参数的差异,发现主要就是两处处参数有差异。subTaskCheckpointCoordinator、barrierHandlerState。这两个的差异主要体现在flink 在aligned checkpoint超时时,会切换为unaligned checkpoint。这里可以先按下不表,回到最开始的处理历程。

总结一下就是如果是flink 设置了at least once是使用的是CheckpointBarrierTracker, 当flink模式为exactly once时是SingleCheckpointBarrierHandler。 当为exactly once时checkpoint 类型又可以分为是aligned checkpoint还是unaligned checkpoint。

At least once 下 barrier是如何处理的

at least once 下对于barrier的处理是在以下的方法中实现的。

org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker#processBarrier

public void processBarrier(CheckpointBarrier receivedBarrier, InputChannelInfo channelInfo) throws IOException {final long barrierId = receivedBarrier.getId();// fast path for single channel trackersif (totalNumberOfInputChannels == 1) {markAlignmentStartAndEnd(receivedBarrier.getTimestamp());notifyCheckpoint(receivedBarrier);return;}// general path for multiple input channelsif (LOG.isDebugEnabled()) {LOG.debug("Received barrier for checkpoint {} from channel {}", barrierId, channelInfo);}// find the checkpoint barrier in the queue of pending barriersCheckpointBarrierCount barrierCount = null;int pos = 0;for (CheckpointBarrierCount next : pendingCheckpoints) {if (next.checkpointId == barrierId) {barrierCount = next;break;}pos++;}if (barrierCount != null) {// add one to the count to that barrier and check for completionint numBarriersNew = barrierCount.incrementBarrierCount();if (numBarriersNew == totalNumberOfInputChannels) {// checkpoint can be triggered (or is aborted and all barriers have been seen)// first, remove this checkpoint and all all prior pending// checkpoints (which are now subsumed)for (int i = 0; i <= pos; i++) {pendingCheckpoints.pollFirst();}// notify the listenerif (!barrierCount.isAborted()) {if (LOG.isDebugEnabled()) {LOG.debug("Received all barriers for checkpoint {}", barrierId);}markAlignmentEnd();notifyCheckpoint(receivedBarrier);}}}else {// first barrier for that checkpoint ID// add it only if it is newer than the latest checkpoint.// if it is not newer than the latest checkpoint ID, then there cannot be a// successful checkpoint for that ID anywaysif (barrierId > latestPendingCheckpointID) {markAlignmentStart(receivedBarrier.getTimestamp());latestPendingCheckpointID = barrierId;pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));// make sure we do not track too many checkpointsif (pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {pendingCheckpoints.pollFirst();}}}}

如果只有一个inputchannel的情况下,在收到这一个barrier的时候,就可以做snapshot.

在这个中间会经过triggerCheckpointOnBarrier 等方法, 最后实际还是调到了org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl#checkpointState ,看到这里其实这很长的链路实际是一个循环,下一个算子会生成barrier,接着传递这个barrier。

实际情况是作业并行度不唯一,一个subtask往往是有多个inputchannel. 可以继续看看是如何处理的。

这里面当收取到第一个barrier,会将这个barrier信息存在个队列中。

当收取到时非第一个barrier的时候会进行计数,当收取到的是最后一个barrier的时候就会将barrier队列中在这个barrier之前的barrier全部清除,之后就可以通知做checkpoint snapshot, 这个流程就和之前的一个信道的checkpoint流程是一致的。

总结而言:at least 类型的checkpoint是在收到最后一个barrier的时候开始做snapshot的。

Exactly once checkpoint是如何处理的

首先看这一段的代码

@Overridepublic void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelInfo) throws IOException {long barrierId = barrier.getId();LOG.debug("{}: Received barrier from channel {} @ {}.", taskName, channelInfo, barrierId);if (currentCheckpointId > barrierId|| (currentCheckpointId == barrierId && !isCheckpointPending())) {if (!barrier.getCheckpointOptions().isUnalignedCheckpoint()) {inputs[channelInfo.getGateIdx()].resumeConsumption(channelInfo);}return;}checkNewCheckpoint(barrier);checkState(currentCheckpointId == barrierId);if (numBarriersReceived++ == 0) {if (getNumOpenChannels() == 1) {markAlignmentStartAndEnd(barrier.getTimestamp());} else {markAlignmentStart(barrier.getTimestamp());}}// we must mark alignment end before calling currentState.barrierReceived which might// trigger a checkpoint with unfinished future for alignment durationif (numBarriersReceived == numOpenChannels) {if (getNumOpenChannels() > 1) {markAlignmentEnd();}}try {currentState = currentState.barrierReceived(context, channelInfo, barrier);} catch (CheckpointException e) {abortInternal(barrier.getId(), e);} catch (Exception e) {ExceptionUtils.rethrowIOException(e);}if (numBarriersReceived == numOpenChannels) {numBarriersReceived = 0;lastCancelledOrCompletedCheckpointId = currentCheckpointId;LOG.debug("{}: Received all barriers for checkpoint {}.", taskName, currentCheckpointId);resetAlignmentTimer();allBarriersReceivedFuture.complete(null);}}

这里需要关注一下currentState, 在最开始我们看了他的构造函数AlternatingWaitingForFirstBarrier, 因此可以可以看这个方法具体是现实。

这里可以看到这里会block 住收到barrier的信道,如果barrier 都收齐了,之后会检查是不是unaligned的checkpoint, 如果不是可以直接做一次checkpoint。这个checkpoint和之前的流程是一致的。

这里的下一个分支是超时转化,比如设置为30s,前30s是做aligned checkpoint, 如果30s还没有完成,就会转化为unaligned checkpoint。 当然,你如果不想有超时时间,可以直接设置为0.

如果是unaligned checkpoint, 会将channel 里面的数据也写会到远端。

这个中间会有一些状态转化,每次barrier的到达都会触发不同的状态变化。其中我们看到对于uc来说,uc的第一个barrier到达了,就会触发一次global checkpoint。org.apache.flink.streaming.runtime.io.checkpointing.AlternatingWaitingForFirstBarrierUnaligned#barrierReceived

org.apache.flink.streaming.runtime.io.checkpointing.AlternatingCollectingBarriersUnaligned#barrierReceived

最后如果收到所有的barrier之后会finished checkpoint。状态恢复到原位。

总结一下:在exactly once的语义下,aligned checkpoint的做法是,收到一个barrier的时候会将对应的channel block住。当收到最后一个barrier的时候再做一次checkpoint。

unaligned的做法是,收到barrier的时候,第一步就会触发一次checkpoint, 之后会不断上传channel state, 当收到最后一个barrier则表示checkpoint结束。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/8465.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

FTTR(光猫)ITMS注册NCE纳管

ITMS注册 TR069交互过程&#xff1a; 1.1. TR069交互—主动连接机制 主动连接机制是指CPE主动发出请求连接事件(事件可以为&#xff1a; 0 BOOTSTRAP&#xff1b; 1 BOOT; PERIODIC等等)给ACS。在连接建立之后才能进行业务处理(通过调用RPC方法实现)。 备注&#xff1a;政企…

2024.5.8

聊天框完善 #include "mywidget.h" #include "ui_mywidget.h"MyWidget::MyWidget(QWidget *parent): QWidget(parent), ui(new Ui::MyWidget) {ui->setupUi(this);//设置窗口大小this->resize(400,560);//设置窗口图标和标题this->setWindowTit…

Android C++ 开发调试 LLDB 工具的使用

文章目录 调试环境准备基础命令Breakpoint CommandsWatchpoint CommandsExamining VariablesEvaluating ExpressionsExamining Thread StateExecutable and Shared Library Query Commands 参考&#xff1a; Android 中在进行 NDK 开发的时候&#xff0c;我们经常需要进行 C 代…

隐式3D形状表示:Occupancy Networks

OccNet 的关键思想是隐式地表示3D形状&#xff0c;而不是显式地表示。与直接编码形状几何信息不同&#xff0c;OccNet 将形状的表面建模为非线性分类器的决策边界。 隐式表示&#xff1a;Occupancy Networks 将 3D 形状表示为非线性分类器函数的决策边界 f θ : R 3 X → [ 0…

crmeb知识付费系统正式上线,分屏录制网课用什么软件?教程有啥?

现在很多人为了提升自己知识储备&#xff0c;都会选择线上课程来提升自己&#xff0c;因为线上课程不受时间、地点的限制&#xff0c;大家可以在家就学习&#xff0c;也有很多人想做自己的网络课程&#xff0c;那分屏录制网课用什么软件好&#xff1f; 目前市面上有很多可以录屏…

高斯赛德尔迭代程序

高斯赛德尔迭代非常常用&#xff0c;看到网上很多例子写的不够简洁&#xff0c;这里我写了一个&#xff0c;供参考 import numpy as npdef gauss_seidel(A,b,x1,eps1.e-6):n len(A)max_iter 200iters 0while abs(np.dot(A[0,:],x1) - b)[0] > eps and iters < max_…

2024年颠覆商业模式《本草生活》项目,巧妙三招营销引流裂变套路

2024年颠覆商业模式《本草生活》项目&#xff0c;巧妙三招营销引流裂变套路 文丨微三云营销总监胡佳东&#xff0c;点击上方“关注”&#xff0c;为你分享市场商业模式电商干货。 - 引言&#xff1a;现如今流量枯竭、降本增效、红利不再已是线上营销的常态&#xff0c;互联网…

静态照片怎么合成gif?详细介绍一个方法

我们在各大平台中都能看到各种样式的gif动图。Gif动图其实就是由一帧一帧的静态图片合成的动态效果的gif&#xff0c;想要制作gif动画可以通过使用在线图片合成&#xff08;https://www.gif5.net/&#xff09;工具-GIF5工具网&#xff0c;手机、pc均可操作&#xff0c;只需要上…

网络代理与网络安全:解析 SOCKS5、代理IP、HTTP 的关系与应用

在当今数字化时代&#xff0c;网络代理技术成为了保护隐私、绕过地理限制、加强网络安全的重要工具。本文将探讨 SOCKS5 代理、代理IP、HTTP 代理等关键概念&#xff0c;并着重讨论它们在网络安全方面的应用与挑战。 SOCKS5 代理 定义与特点&#xff1a;SOCKS5 是一种网络代理…

nestjs 全栈进阶--自定义装饰器

视频教程 20_nest中自定义装饰器_哔哩哔哩_bilibili nest new custom-decorator -p pnpm pnpm start:dev 在Nestjs 中我们使用了大量装饰器 decorator &#xff0c;所以Nestjs 也允许我们去自定义装饰器。 1. 自定义方法装饰器 nest g decorator aaa --flat 它生产的代码…

C++进阶——浅谈隐式转化

在代码里我们或多或少都会依赖c的隐式类型转换。 然而不幸的是隐式类型转换也是c的一大坑点&#xff0c;稍不注意很容易写出各种奇妙的bug。 因此我梳理一遍c的隐式类型转换 一、什么是隐式类型转换 概念&#xff1a;就是当你只有一个类型T1&#xff0c;但是当前表达式需要类…

详细分析McCabe环路复杂度(附例题)

目录 前言1. 基本知识2. 例题 前言 该知识点常出在408或者软考中&#xff0c;对此此文重点讲讲理论知识以及例题 对于例题平时看到也会更新 1. 基本知识 McCabe环路复杂度是一种用于衡量软件代码复杂性的指标&#xff0c;主要是通过计算代码中的控制流图中的环路数量来衡量…

机房——蓝桥杯十三届2022国赛大学B组真题

问题分析 这题用深搜广搜都能做&#xff0c;不过我更倾向于用广搜&#xff0c;因为广搜能更容易找到目标点。那么是采用结构体存储边还是采用二维数组存储临接矩阵呢&#xff1f;我们注意到n的取值范围为1e5,用二维数组哪怕是bool类型就需要至少1e10Byte的连续空间,这个空间太大…

【C++PCL】点云处理3D-Harris关键点提取

作者:迅卓科技 简介:本人从事过多项点云项目,并且负责的项目均已得到好评! 公众号:迅卓科技,一个可以让您可以学习点云的好地方 重点:每个模块都有参数如何调试的讲解,即调试某个参数对结果的影响是什么,大家有问题可以评论哈,如果文章有错误的地方,欢迎来指出错误的…

2022 年全国职业院校技能大赛高职组云计算赛项试卷(公有云)

#需要资源&#xff08;软件包及镜像&#xff09;或有问题的&#xff0c;可私聊博主&#xff01;&#xff01;&#xff01; #需要资源&#xff08;软件包及镜像&#xff09;或有问题的&#xff0c;可私聊博主&#xff01;&#xff01;&#xff01; #需要资源&#xff08;软件包…

5V升8.4V2A升压恒压WT3231

5V升8.4V2A升压恒压WT3231 WT3231 是一种高性能直流-直流&#xff08;DC-DC&#xff09;转换器&#xff0c;集成了能够承受10A电流和26mΩ低导通电阻的功率MOSFET。该转换器能提供高达12V的稳定输出电压&#xff0c;并具有固定600KHz开关频率&#xff0c;使得小型外部电感和电…

解决github无法克隆私有仓库,Repository not found问题(2024最新)

一、背景 这个问题出现&#xff0c;是你用了其他主机设备&#xff0c;需要重新clone私有库时&#xff0c;发现一直报找不到仓库&#xff0c;如下报错&#xff1a; remote: Repository not found.二、解决方法 &#xff08;1&#xff09;账号密码方式&#xff08;已不支持&am…

构建自己的docker镜像node.js

学习资源&#xff1a; 构建自己的 Docker 镜像_哔哩哔哩_bilibili 针对其中的一些比较困难的点写篇文章。 以下是对app.js的注释&#xff1a; // 使用 Koa 框架搭建 Node.js 应用的示例代码// 这两行代码引入了 koa 模块&#xff0c;并创建了一个新的 Koa 应用实例&#xf…

C++之QT文本处理QDir、QFileDialog、QStringList、QFile

一、相应的头文件 #include <QFileDialog> #include <QDir> #include <QStringList> 二、简介 1.QFileDialog 实际效果如下&#xff1a;比如需要选择打开的文件夹或者文件名&#xff0c;通过调用资源管理器的方式进行可视化操作。 代码示例为&#xff1a…

gitlab集群高可用架构拆分部署

目录 前言 负载均衡器准备 外部负载均衡器 内部负载均衡器 (可选)Consul服务 Postgresql拆分 1.准备postgresql集群 手动安装postgresql插件 2./etc/gitlab/gitlab.rb配置 3.生效配置文件 Redis拆分 1./etc/gitlab/gitlab.rb配置 2.生效配置文件 Gitaly拆分 1.…