如何处理 Flink 作业中的数据倾斜问题?

分析&回答

什么是数据倾斜?

由于数据分布不均匀,造成数据大量的集中到一点,造成数据热点。

举例:一个 Flink 作业包含 200 个 Task 节点,其中有 199 个节点可以在很短的时间内完成计算。但是有一个节点执行时间远超其他结果,并且随着数据量的持续增加,导致该计算节点挂掉,从而整个任务失败重启。我们可以在 Flink 的管理界面中看到任务的某一个 Task 数据量远超其他节点。

大数据框架的特性

  • 不怕数据大,怕数据倾斜。
  • jobs数比较多的作业运行效率相对比较低,如子查询比较多。
  • sum,count,max,min等聚集函数,不会有数据倾斜问题

容易数据倾斜情况

  • group by
  • count(distinct ),在数据量大的情况下,容易数据倾斜,因为count(distinct)是按group by 字段分组,按distinct字段排序。
  • 小表关联超大表

优化常用的手段

Flink 任务出现数据倾斜的直观表现是任务节点频繁出现反压,但是增加并行度后并不能解决问题;部分节点出现 OOM 异常,是因为大量的数据集中在某个节点上,导致该节点内存被爆,任务失败重启。

产生数据倾斜的原因主要有 2 个方面:

  • 业务上有严重的数据热点,比如滴滴打车的订单数据中北京、上海等几个城市的订单量远远超过其他地区;
  • 技术上大量使用了 KeyBy、GroupBy 等操作,错误的使用了分组 Key,人为产生数据热点。

因此解决问题的思路也很清晰:

  1. 业务上要尽量避免热点 key 的设计,例如我们可以把北京、上海等热点城市分成不同的区域,并进行单独处理;
  2. 技术上出现热点时,要调整方案打散原来的 key,避免直接聚合;此外 Flink 还提供了大量的功能可以避免数据倾斜。

解决数据倾斜问题

  • 减少job数(合并MapReduce,用Multi-group by)
  • 设置合理的mapreduce的task数,能有效提升性能。
  • 数据量较大的情况下,慎用count(distinct)。
  • 对小文件进行合并,针对文件数据源。

优化案例

  • join原则 将条目少的表/子查询放在 Join的左边。 原因是在 Join 操作的 Reduce 阶段,位于 Join左边的表的内容会被加载进内存,将条目少的表放在左边,可以有效减少发生内存溢出的几率。

当一个小表关联一个超大表时,容易发生数据倾斜,可以用MapJoin把小表全部加载到内存在map端进行join,避免reducer处理。

如:SELECT /*+ MAPJOIN(user)*/  l.session_id, u.username from user u join page_views lon (u. id=l.user_id) ;
复制代码
  • 笛卡尔积 当Hive设定为严格模式(hive.mapred.mode=strict)时,不允许在HQL语句中出现笛卡尔积。

当无法躲避笛卡尔积时,采用MapJoin,会在Map端完成Join操作,将Join操作的一个或多个表完全读入内存。

MapJoin的用法是在查询/子查询的SELECT关键字后面添加/*+MAPJOIN(tablelist) */提示优化器转化为MapJoin 。

其中tablelist可以是一个表,或以逗号连接的表的列表。tablelist中的表将会读入内存,应该将小表写在这里

  • 控制Map数

同时可执行的map数是有限的。

通常情况下,作业会通过input的目录产生一个或者多个map任务

主要的决定因素有: input的文件总个数,input的文件大小。

举例:a) 假设input目录下有1个文件a,大小为780M,那么hadoop会将该文件a分隔成7个块(block为128M,6个128m的块和1个12m的块),从而产生7个map数b) 假设input目录下有3个文件a,b,c,大小分别为10m,20m,130m,那么hadoop会分隔成4个块(10m,20m,128m,2m),从而产生4个map数
复制代码

两种方式控制Map数:即减少map数和增加map数

  • 减少map数可以通过合并小文件来实现,这点是对文件数据源来讲。
  • 增加map数的可以通过控制上一个job的reduer数来控制

反思&扩展

Flink 消费 Kafka 上下游并行度不一致导致的数据倾斜

通常我们在使用 Flink 处理实时业务时,上游一般都是消息系统,Kafka 是使用最广泛的大数据消息系统。当使用 Flink 消费 Kafka 数据时,也会出现数据倾斜。

需要十分注意的是,我们 Flink 消费 Kafka 的数据时,是推荐上下游并行度保持一致,即 Kafka 的分区数等于 Flink Consumer 的并行度。

但是会有一种情况,为了加快数据的处理速度,来设置 Flink 消费者的并行度大于 Kafka 的分区数。如果你不做任何的设置则会导致部分 Flink Consumer 线程永远消费不到数据。

这时候你需要设置 Flink 的 Redistributing,也就是数据重分配。

GroupBy + Aggregation 分组聚合热点问题

业务上通过 GroupBy 进行分组,然后紧跟一个 SUM、COUNT 等聚合操作是非常常见的。我们都知道 GroupBy 函数会根据 Key 进行分组,完全依赖 Key 的设计,如果 Key 出现热点,那么会导致巨大的 shuffle,相同 key 的数据会被发往同一个处理节点;如果某个 key 的数据量过大则会直接导致该节点成为计算瓶颈,引起反压。

两阶段聚合解决 KeyBy 热点

KeyBy 是我们经常使用的分组聚合函数之一。在实际的业务中经常会碰到这样的场景:双十一按照下单用户所在的省聚合求订单量最高的前 10 个省,或者按照用户的手机类型聚合求访问量最高的设备类型等。

喵呜面试助手:一站式解决面试问题,你可以搜索微信小程序 [喵呜面试助手] 或关注 [喵呜刷题] -> 面试助手 免费刷题。如有好的面试知识或技巧期待您的共享!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/65872.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

设计模式之原型模式

文章目录 概述克隆羊问题传统方式解决克隆羊问题传统的方式的优缺点原型模式原理结构图-uml 类图原理结构图说明 原型模式解决克隆羊问题的应用实例原型模式在Spring框架中的应用深入讨论-浅拷贝和深拷贝浅拷贝的介绍深拷贝基本介绍深拷贝应用实例注意事项 概述 原型模式&…

Linux - Docker 安装使用 常用命令 教程

Docker 官方文档地址: Get Started | Docker 中文参考手册: https://docker_practice.gitee.io/zh-cn/ 1.什么是 Docker 1.1 官方定义 最新官网首页 # 1.官方介绍 - We have a complete container solution for you - no matter who you are and where you are on your contain…

Linux学习之NAS服务器搭建

NAS是Network Attached Storage的缩写,也就是网络附属存储。可以使用自己已经不怎么使用的笔记本搭建一台NAS服务器。 fdisk -l可以看一下各个磁盘的状态。 可以看到有sda、sdb、sdc和sdd等四块硬盘。 lvs、vgs和pvs结合起来看,sdb和sdc没有被使用。 …

Mac“其他文件”存放着什么?“其他文件”的清理方法

很多Mac用户在清理磁盘空间时发现,内存占用比例比较大的除了有iCloud云盘、应用程序、影片、音频、照片等项目之外,还有一个“其他文件”的项目磁盘占用比也非常大,想要清理却无从下手。那么Mac“其他文件”里存放的是什么文件?我…

文本标注技术方案(NLP标注工具)

Doccano doccano 是一个面向人类的开源文本注释工具。它为文本分类、序列标记和序列到序列任务提供注释功能。您可以创建用于情感分析、命名实体识别、文本摘要等的标记数据。只需创建一个项目,上传数据,然后开始注释。您可以在数小时内构建数据集。 支持…

并发下的Map常见面试题

HashMap 和 HashTable 有什么区别?java中的另一个线程安全的与HashMap极其类似的类是什么?同样是线程安全,它与HashTable在线程同步上有什么不同?HashMap 与 ConcurrentHashMap的区别?为什么 ConcurrentHashMap 比 Has…

开源且强大的网络嗅探分析工具——Wireshark

Wireshark是一款强大的开源网络协议分析工具,旨在帮助用户深入了解网络通信的细节。通过捕获、解析和展示网络数据包,Wireshark能够帮助工程师诊断问题、优化性能,以及解决各种网络难题。无论是深入分析还是快速调试,Wireshark都是…

SpringCloud学习笔记(十三)_Zipkin使用SpringCloud Stream以及Elasticsearch

在前面的文章中,我们已经成功的使用Zipkin收集了项目的调用链日志。但是呢,由于我们收集链路信息时采用的是http请求方式收集的,而且链路信息没有进行保存,ZipkinServer一旦重启后就会所有信息都会消失了。基于性能的考虑&#xf…

封装(个人学习笔记黑马学习)

1、格式 #include <iostream> using namespace std;const double PI 3.14;//设计一个圆类&#xff0c;求圆的周长 class Circle {//访问权限//公共权限 public://属性//半径int m_r;//行为//获取圆的周长double calculateZC() {return 2 * PI * m_r;} };int main() {//通…

QT day1登录界面设计

要设计如下图片&#xff1a; 代码如下&#xff1a; main.cpp widget.h widget.cpp 运行效果&#xff1a; 2&#xff0c;思维导图

DEAP库文档教程四——操作与算法

本节将将在初始化的基础上&#xff0c;进一步说明操作与算法。 1、Using the Toolbox toolbox(base.Toolbox())是包含所有进化操作的工具箱&#xff0c;从目标初始化到适应度计算。它允许在每个算法中实现简单的构造。toolbox基本上由两种方法组成&#xff0c;register()和un…

Linux和Windows下防火墙、端口和进程相关命令

&#x1f680;1 防火墙 1.1 firewall systemctl stop firewalld.service # 关闭防火墙 systemctl start firewalld.service # 开启防火墙 systemctl restart firewalld.service # 重启防火墙 systemctl status firewalld.service # 防火墙状态 firewall-cmd --reload # 重…

一些自己整理的工具实用参数

工具实用参数 sqlmap -u: 指定需要测试的目标URL&#xff08;格式&#xff1a;http://www.example.com/test.php?id1&#xff09; --cookie: 设置需要发送的 HTTP Cookie&#xff0c;例如&#xff1a;--cookie"sid123456;PHPSESSID654321" --threads&#xff1a;…

Shell脚本练习——系统应用相关

显示系统信息 [rootwenzi data]#cat systemInfo.sh #/bin/bash RED"\E[1;31m" GREEN"\E[1;32m" END"\E[0m" echo -e "$GREEN----------------------Host systeminfo--------------------$END" echo -e "HOSTNAME: $REDho…

Glide的使用及源码分析

前言 依赖 implementation com.github.bumptech.glide:glide:4.16.0 github: GitHub - bumptech/glide: An image loading and caching library for Android focused on smooth scrolling 基本使用 //加载url Glide.with(this) .load(url) .placeholder(R.drawable.placehol…

Springboot - 4.ApplicationContext

✍1. ApplicationContext&#xff1a; 当在Spring Boot中使用ApplicationContext时&#xff0c;您可以遵循以下详细示例来了解每个概念和用法&#xff0c;同时考虑了Spring Boot的自动配置和便利性&#xff1a; &#x1f3b7;1. ApplicationContext接口&#xff1a; Applica…

读word模板批量生成制式文件

文章目录 1、Maven依赖2、.docx或.doc格式的word模板准备3、读word模板&#xff0c;批量替换代码域&#xff0c;生成文件&#xff0c;demo4、结果展示 1、Maven依赖 <dependency><groupId>fr.opensagres.xdocreport</groupId><artifactId>fr.opensagre…

Cargo 静态编译

git clone --recursive https://github.com/kornelski/pngquant.git vi ~/.cargo/config.toml[http] debug true proxy "127.0.0.1:1080" 1.apt 更新 2.apt install cargo 3.修改源码的Cargo.toml [source.crates-io] #registry "https://code.aliyun.com…

一键替换工程文件和场景中的UI对象字体

具体流程&#xff1a; 找到工程中使用到的所有字体找到工程和场景中包含Text的所有对象展示要替换的字体名字让用户选择通过用户选择的字体&#xff0c;展示响应的物体对象一键替换 通过AssetDatabase.FindAssets找到工程中包含的所有字体&#xff1a; private List<strin…

nnUNet v2数据准备及格式转换 (二)

如果你曾经使用过nnUNet V1&#xff0c;那你一定明白数据集的命名是有严格要求的&#xff0c;必须按照特定的格式来进行命名才能正常使用。 这一节的学习需要有数据&#xff0c;如果你有自己的数据&#xff0c;可以拿自己的数据来实验&#xff0c;如果没有&#xff0c;可以用十…