Flink SQL 基于Update流出现空值无法过滤问题

问题背景

  • 问题描述
基于Flink-CDC ,Flink SQL的实时计算作业在运行一段时间后,突然发现插入数据库的计算结果发生部分主键属性发生失败,导致后续计算结果无法插入,
超过失败次数失败的情况
  • 问题报错
	Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO dm_hljy.dws_table_name (op_date, school_year, campus_name, school_name, depart_name, total_opfare, ids, update_time) VALUES ('2024-03-11 00:00:00+08', '2023', 'xxxx', 'xxxx学校', 'xxxx小学部', '203333300000', '57', '2024-03-21 09:31:08.47+08') ON DUPLICATE KEY UPDATE school_year=VALUES(school_year), total_opfare=VALUES(total_opfare), ids=VALUES(ids), update_time=VALUES(update_time) was aborted: ERROR: dn_6007_6008: null value in column "depart_name" violates not-null constraint  Call getNextException to see other errors in the batch.at com.huawei.gauss200.jdbc.jdbc.BatchResultHandler.handleCompletion(BatchResultHandler.java:171) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.executeBatch(QueryExecutorImpl.java:586) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.jdbc.PgStatement.executeBatch(PgStatement.java:883) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1580) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:266) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:236) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:159) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_332]at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_332]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_332]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_332]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_332]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_332]... 1 moreCaused by: com.huawei.gauss200.jdbc.util.PSQLException: ERROR: dn_6007_6008: null value in column "depart_name" violates not-null constraintat com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2856) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2587) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.executeBatch(QueryExecutorImpl.java:575) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.jdbc.PgStatement.executeBatch(PgStatement.java:883) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at com.huawei.gauss200.jdbc.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1580) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:266) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:236) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:159) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_332]at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_332]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_332]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_332]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_332]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_332

在这里插入图片描述

定位

定位思路

1.方向一:怀疑数据库插入存在数据处理时,造成数据处理出现空值的情况,即数据本身不为空,但是数据插入却出现了空
2.方向二:Flink-SQL在消费kafka数据时存在了空值,故加工的数据计算结果存在空值

定位过程

  • 因插入数据库定位比较麻烦,且数据库已经设置该字段为主属性,故出现插入时处理为空值的概率较小。故先从较为简单的Flink SQL查询数据
  • 定位方法一,查询该字段为空的记录,待作业执行完成后,未查询到空值对应记录
 select  select * from table_name where depart_name is null or depart_name = '' or char_length(depart_name) = 0;
  • 因考虑到使用Flink-CDC进行变更数据捕获,故对应的update流存在-U,+U,-D,+I记录,因此随着插入记录存在空值被记录进去的情况,故采用view的方式,先将宽表的加工、关联方式创建为view,然后进行空值的过滤。实施如下
create view view_prd as 
select a.* ,b.*  from a join b on a.id = b.idselect * from view_prd where depart_name is null or depart_name = '' or char_length(depart_name) = 0;
  • 通过查询结果,发现存在最后一条记录存在空值的原因,往源头定位,发现该字段之前为空,后面进行更新填充到值出现-U记录,导致数据插入持续失败
    在这里插入图片描述

原因

  • 因为flink-SQL消费的数据时kafka topic,flink以upsert-kafka形式的connector进行写入,故存在changelog 流中数据更新存在-U,+U的记录(按照Key进行区分唯一条记录),value 为空(-U)的记录kafka也,导致出现空值,
    在这里插入图片描述

解决

通过在DWS宽表创建一层View(如上),在写入DWS宽表的kafka topic之前,现将该字段空值过滤,即可排除空值涉及记录被纳入结果指标计算的范围中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/782628.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

智慧公厕:让公共厕所变得更智能、更卫生、更舒适的解决方案

近年来,随着城市发展的不断壮大,公共设施的建设也越来越受到重视。而公共厕所作为城市基础设施的一部分,是城市文明程度的重要体现。然而,传统的公共厕所在使用、运行、管理、养护等方面存在诸多问题,严重影响了市民的…

特征选择集大成的包-arfs(python)

特征选择集大成的包-arfs(python) 一、介绍 arfs介绍文档https://arfs.readthedocs.io/en/latest/Introduction.html 英文好的朋友可以阅读作者写的介绍: All relevant feature selection means trying to find all features carrying info…

YOLOv5改进系列:升级版ResNet的新主干网络DenseNet

一、论文理论 论文地址:Densely Connected Convolutional Networks 1.理论思想 DenseNet最大化前后层信息交流,通过建立前面所有层与后面层的密集连接,实现了特征在通道维度上的复用,不但减缓了梯度消失的现象,也使其…

【二分图】【二分图最大匹配】LCP 04. 覆盖

作者推荐 视频算法专题 本文涉及知识点 二分图 二分图最大匹配 LeetCode LCP 04. 覆盖 你有一块棋盘,棋盘上有一些格子已经坏掉了。你还有无穷块大小为1 * 2的多米诺骨牌,你想把这些骨牌不重叠地覆盖在完好的格子上,请找出你最多能在棋盘…

vue 透传 Attributes(二)

禁用 Attributes 继承​ 如果你不想要一个组件自动地继承 attribute&#xff0c;你可以在组件选项中设置 inheritAttrs: false。 从 3.3 开始你也可以直接在 <script setup> 中使用 defineOptions&#xff1a; <script setup> defineOptions({inheritAttrs: fal…

2024年京东云主机租用价格_京东云服务器优惠价格表

2024年京东云服务器优惠价格表&#xff0c;轻量云主机优惠价格5.8元1个月、轻量云主机2C2G3M价格50元一年、196元三年&#xff0c;2C4G5M轻量云主机165元一年&#xff0c;4核8G5M云主机880元一年&#xff0c;游戏联机服务器4C16G配置26元1个月、4C32G价格65元1个月、8核32G费用…

新书速递——《可解释AI实战(PyTorch版)》

本书旨在帮助你实施最新的可解释AI技术&#xff0c;以构建公平且可解释的AI系统。可解释AI是当今AI研究中的热门话题&#xff0c;但只有少数资源和指南涵盖了所有重要技术&#xff0c;这些技术对实践者来说非常有价值。本书旨在填补这一空白。 本书读者对象 本书既适合那些有兴…

当代深度学习模型介绍--长短期记忆网络(LSTMs)

AI大模型学习 方向一&#xff1a;AI大模型学习的理论基础 模型和应用的多样化&#xff1a;随着研究的深入&#xff0c;深度学习领域出现了多种创新的模型架构&#xff1a; 卷积神经网络&#xff08;CNNs&#xff09;专门针对图像处理任务进行了优化&#xff0c;通过模拟生物视…

实验4(数据结构课程实验)

题目&#xff1a; 设计一个算法&#xff0c;将不带头节点的单链表所有结点的连接方向“原地”逆转&#xff0c;即要求利用原表的存储空间。 代码&#xff1a; /* 实验4 设计一个算法&#xff0c;将不带头节点的单链表所有结点的连接方向“原地”逆转&#xff0c; 即要求利用…

GIt的原理和使用(五):模拟多人协作的两种情况

目录 多人协作 多人协作一 准备工作 协作开发 多人协作二 准备工作 额外场景 申请单合并分支 更推荐写法 远程分支删除后&#xff0c;本地git branch -a依然能看到的解决办法 多人协作 多人协作一 目标&#xff1a;在远程master分支下的file.txt文件新增代码“aaa”…

鸿蒙OS开发实例:【窥探网络请求】

HarmonyOS 平台中使用网络请求&#xff0c;需要引入 "ohos.net.http", 并且需要在 module.json5 文件中申请网络权限, 即 “ohos.permission.INTERNET” 本篇文章将尝试使用 ohos.net.http 来实现网络请求 场景设定 WeiBo UniDemo HuaWei : 请求顺序WeiBo1 UniDem…

华为数通 HCIP-Datacom H12-831 题库补充(3/27)

2024年 HCIP-Datacom&#xff08;H12-831&#xff09;最新题库&#xff0c;完整题库请扫描上方二维码&#xff0c;持续更新。 如图所示&#xff0c;关于R4路由器通过IS-IS计算出来的IPv6路由&#xff0c;哪一选项的描述是错误的&#xff1f; A&#xff1a;R4通过IS—IS只学习到…

基于SpringBoot的“校园台球厅人员与设备管理系统”的设计与实现(源码+数据库+文档+PPT)

基于SpringBoot的“校园台球厅人员与设备管理系统”的设计与实现&#xff08;源码数据库文档PPT) 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;SpringBoot 工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 系统功能结构图 系统首页界面图…

【OpenEuler】Docker部署Oracle和SQL Server

背景 国产化的浪潮&#xff0c;也打到了我的头上 安装步骤 Oracle docker run -d -p 1521:1521 -p 8080:8080 --name oracle_11g -e ORACLE_HOME/home/oracle/app/oracle/product/11.2.0/dbhome_2 -e ORACLE_SIDhelowin registry.cn-hangzhou.aliyuncs.com/helowin/oracle_…

达梦数据库的会话数限制

达梦可以通过初ini参数MAX_SESSIONS在全局设置数据库的最大连接数&#xff0c;也可以在创建用户时指定SESSION_PER_USER选项针对特定用户进行限制。 无论哪种限定&#xff0c;到达规定的会话数时客户端都将报如下错误&#xff1a; [-6001]:Exceed maximum session limitation.…

PCL拟合并绘制平面(二)

使用RANSAC拟合点云平面 1、C实现2、效果图 普通的点云平面拟合方式在一般情况下可以得到较好的平面拟合效果&#xff0c;但是容易出现平面拟合错误或是拟合的平面不是最优的情况。此时就需要根据自己的实际使用情况&#xff0c;调整平面拟合的迭代次数以及收敛条件。 使用RAN…

智慧工地整体解决方案(1)

背景 建筑行业是我国国民经济的重要物质生产部门和支柱产业之一,在改善居住条件、完善基础设施、吸纳劳动力就业、推动经济增长等方面发挥着重要作用。与此同时,建筑业也是一个安全事故多发的高危行业。近年来,在国家、各级地方政府主管部门和行业主体的高度关注和共同努力下…

亚马逊测评新策略:解决底层环境防关联,提升下单成功率

对于做测评的环境系统&#xff0c;确保稳定性和成功率是非常重要的。市面上有各种环境方案&#xff0c;如虚拟机、模拟机、gcs、云手机、VPS等。然而&#xff0c;这些方案不仅成本高&#xff0c;而且成功率很低。因此&#xff0c;一个好的环境系统是成功的基础。 亚马逊平台的…

Python 从0开始 一步步基于Django创建项目(8)使用表单编辑既有条目

与《Python 从0开始 一步步基于Django创建项目&#xff08;6&#xff09;》中的表单应用不同。 前者&#xff0c;是使用表单&#xff0c;提交新数据&#xff0c;新增内容。 本文&#xff0c;是使用表单&#xff0c;对既有数据&#xff0c;进行修改。 因为是对既有数据进行修…

vue v-model(二)

v-model 的参数​ 组件上的 v-model 也可以接受一个参数&#xff1a; <MyComponent v-model:title"bookTitle" /> 在子组件中&#xff0c;我们可以通过将字符串作为第一个参数传递给 defineModel() 来支持相应的参数&#xff1a; <!-- MyComponent.vue …