实战干货|Spark 在袋鼠云数栈的深度探索与实践

Spark 是一个快速、通用、可扩展的大数据计算引擎,具有高性能、易用、容错、可以与 Hadoop 生态无缝集成、社区活跃度高等优点。在实际使用中,具有广泛的应用场景:

· 数据清洗和预处理:在大数据分析场景下,数据通常需要进行清洗和预处理操作以确保数据质量和一致性,Spark 提供了丰富的 API,可以对数据进行清洗、过滤、转换等操作

· 批处理分析:Spark 适用于各种应用场景下的批处理任务,包括统计分析、数据挖掘、特征提取等,用户可以利用 Spark 强大的 API 和内置库进行复杂的数据处理和分析,从而挖掘数据中的内在价值

· 交互式查询:Spark 提供了支持 SQL 查询的 Spark SQL 模块,用户可以使用标准的 SQL 语句进行交互式查询和大规模数据分析

Spark 在袋鼠云的使用

在袋鼠云数栈离线开发平台,我们提供了三种使用 Spark 的方式:

● 创建 Spark SQL 任务

用户可以直接通过编写 SQL 的方式实现自己的业务逻辑。这种方式是目前数栈离线平台使用 Spark 最广泛的方式,也是最为推荐的一种方式。

● 创建 Spark Jar 任务

用户需要在 IDEA 上使用 Scala 或者 Java 语言实现业务逻辑,然后对该项目进行编译打包,并将得到的 Jar 包上传到离线平台,随后在创建 Spark Jar 任务的时候引用这个 Jar 包,最后将任务提交到调度运行即可。

对于使用 SQL 难以实现或表达的需求,或者用户有其他更深层次的需求,Spark Jar 任务无疑给用户提供了一种更为灵活的使用 Spark 的方式。

● 创建 PySpark 任务

用户可以直接编写对应的 Python 代码。在我们的客户群体中,有相当一部分客户,他们除了 SQL 之外,Python 可能是他们的主力语言。特别是针对有一定数据分析基础、算法基础的用户,他们往往会对处理好的数据进行更深层的分析,此时 PySpark 任务自然是他们的不二之选。

Spark 在袋鼠云数栈离线开发平台发挥着重要的作用,因此,我们内部对 Spark 做了也不少的优化,使客户在使用 Spark 提交任务时更加方便。我们还基于 Spark 做了一些工具来增强整个数栈离线开发平台的功能。

除此之外,在数据湖场景下,Spark 也发挥着相当重要的作用。在袋鼠云的湖仓一体模块中,已经支持了 Iceberg 和 Hudi 两大数据湖,用户可以使用 Spark 对湖表进行读写,湖表的治理底层也是通过使用 Spark 调用不同的存储过程实现。

下文就将从引擎侧和 Spark 本身两个方面来阐述袋鼠云内部所做的优化。

引擎端优化

袋鼠云内部引擎端的功能主要是用于任务提交、任务状态获取、任务日志获取、停止任务、语法校验等。每个功能点我们都做了不同程度的优化,下文通过两个例子进行简单介绍。

Spark on Yarn 提交速度提升

随着引擎端 Spark 插件上新功能的不断开发和完善,引擎端提交 Spark 任务所需的时间也在相应的增加,因此需要对提交 Spark 任务相关的代码进行优化,以缩短 Spark 任务提交的时长,提升用户体验。

为此,我们做了以下工作,对于一些公用的配置文件,如 core-site.xml、yarn-site.xml、keytab 文件、spark-sql-application.jar 等,原来每次提交任务都需要预先从服务器下载并提交这些配置文件。现在经过优化后,上述文件仅仅需要在客户端 SparkYarnClient 初始化的时候下载一次,然后上传到指定的 HDFS 路径,后续提交 Spark 任务只需要通过参数的方式指定到对应的 HDFS 路径即可。通过这种方式大大缩短了每次 Spark 任务的提交时间。

在新版本的数栈中,对于临时查询,我们还会根据自定义的规则判断待执行 SQL 的复杂度,将复杂度不高的 SQL 发送到引擎端启动的 SparkSQLEngine 运行,以加快运行速度。这个内部的 SparkSQLEngine 在以前仅仅用于语法校验,现在也承担了一部分 SQL 执行的功能,并且 SparkSQLEngine 还可以根据运行的整体情况,动态扩缩资源,实现资源的有效利用。

语法校验

在较老的数栈版本,对于 SQL 进行语法校验,引擎端会先把 SQL 发送到 Spark Thrift Server。这个 Spark Thrift Server 是以 local 模式部署,不仅仅需要用于语法校验,其他平台上所有元数据的获取都是通过发送 SQL 到这个 Spark Thrift Server 执行来获取。这种方式弊端较大,为此我们做了一些优化。在 Engine 端以 local 模式启动了一个 Spark 任务,在进行语法校验的时候不再将 SQL 发送到 Spark Thrift Server,而是内部维护了一个 SparkSession,直接对 SQL 进行语法校验。

这种方式虽然可以不需要再跟外部的 Spark Thrift server 强关联,但是会给调度组件带来一定的压力,在实现的过程中 Engine-Plugins 的整体复杂度也增大了不少。

为了优化以上问题,我们做了更进一步的优化,调度组件在启动的时候,提交了一个 Spark 任务 SparkSQLEngine 到 Yarn 上。可以理解为是一个远程的运行在 Yarn 上的 Spark Thrift Server,引擎端时刻监控这个 SparkSQLEngine 的健康状态。这样,每次执行语法校验的时候,引擎端将 SQL 通过 JDBC 的形式发送给 SparkSQLEngine 进行语法校验。

通过上述的优化,使得离线开发平台与 Spark Thrift Server 解耦合,EasyManager 不需要额外部署 Spark Thrift Server,使部署更轻量化。调度侧也不用维护一个 local 模式的 Spark 常驻进程。也为离线开发平台上 Spark SQL 任务交互式查询增强做铺垫。

离线开发平台与 EasyManager 部署的 Spark Thrift Server 解耦合后会有以下好处:

· 能够真正意义上的实现 Spark 多集群多版本共存

· EasyManager 标准部署可以去除 Spark Thrift Server,为一线运维减负

· Spark SQL 语法校验变得更轻量,不用缓存 SparkContext,减少 Engine 的资源占用

Spark 功能优化

随着业务的发展深入,我们发现开源的 Spark 在一些场景并没有对应的功能实现。因此我们在开源 Spark 的基础上开发了更多新的插件,以支持数栈更多的功能应用。

任务诊断

首先,我们对 Spark 的 metric sink 做了增强。Spark 内部提供了各种 Sink,除了 ConsoleSink 之外,还有 CSVSink、JmxSink、MetricsServlet、GraphiteSink、Slf4jSink、StatsdSink 等。在 Spark3.0 之后还新增了 PrometheusServlet,但这些还不能满足我们的需求。

在开发任务诊断功能的时候,我们需要通过把 Spark 内部的指标统一推送到 PushGateway,由 Prometheus Server 周期性的从 PushGateway 中拉取指标,最后通过调用 Prometheus 提供的查询接口可以近实时地查询到 Spark 内部的指标。

file

但是 Spark 并没有实现将内部指标 sink 到 PushGateway。因此我们新增了 spark-prometheus-sink 插件,并且自定义了 PrometheusPushGatewaySink 用于将 Spark 内部的指标 push 到 PushGateway。

file

除此之外,我们还自定义了一个新的指标用来描述 Spark SQL 临时查询展示任务执行进度。具体步骤如下:

· 通过自定义 JobProgressSource 来新增用于描述离线任务进度的指标,将该指标注册到 Spark 内部管理系统中的指标管理系统中

· 自定义 JobProgressListener,并将 JobProgressListener 注册到 Spark 内部管理系统中的 ListenerBus。其中,JobProgressListener 的 onJobStart 方法的逻辑是计算当前 Job 下所有的 Task 数量;onTaskEnd 方法的逻辑是在每个 Task 完成后计算并更新当前离线任务进度;onJobEnd 方法的逻辑是在每个 Job 完成后计算并更新当前离线任务进度

对接商业版 Hadoop 集群

随着袋鼠云客户越来越多,客户的环境也是各不相同。有的客户使用的是开源版本的 Hadoop 集群,也有相当一部分客户使用的是 HDP、CDH、CDP、TDH 等。我们在对接这些客户的集群的时候,开发侧往往需要进行新的适配,运维侧每次部署升级的时候也需要配置额外的参数或者有其他额外的操作。

以 HDP 为例,在对接 HDP 的时候,我们使用的 Spark 是 HDP 自带的 Spark2.3,并且我们还需要在运维侧新增一些参数,并将 HDP 自带的 Spark 的所有 Jar包 移动到指定目录。这些操作其实会给运维带来一定的困惑和麻烦,不同类型的集群,运维需要维护不同的运维文档,部署的过程也比较容易出错。并且我们其实对 Spark 的源码做了功能增强和 bug 的修复,如果使用的是 HDP 自带的 Spark,那么就享受不到我们内部维护的 Spark 带来的所有好处。

为了解决上面这些问题,我们内部的 Spark 对现有市场上已有的、常见的发行商都做了适配。换句话来说,我们内部的 Spark 可以在所有不同的 Hadoop 集群上运行。这样,无论对接哪一种类型的 Hadoop 集群,运维只需要部署同一个 Spark 即可,这大大减轻了运维部署的压力。更重要的是,客户可以直接使用我们内部的 Spark 稳定版本,享受到更多的新特性和更大的性能提升。

Spark3.2 新特性-AQE

较老的数栈版本中,默认的 Spark 版本是 2.1.3,后来我们将 Spark 的版本升级到 2.4.8,从数栈6.0开始,Spark3.2 也可以使用了。这里着重介绍一下 AQE,这也是 Spark3.x 中最重要的新特性。

AQE 概述

Spark3.2 之前,AQE 默认是关闭的,需要通过将 spark.sql.adaptive.enabled 设置为 true,才能开启 AQE。Spark3.2 之后,AQE 默认是开启的,任务在运行过程中只要满足 AQE 的触发条件,即可享受 AQE 带来的优化。

需要注意的是,AQE 的优化只会发生在 shuffle 阶段,如果 SQL 在运行过程中并没有涉及到 shuffle 操作,那么即使 spark.sql.adaptive.enabled 的值为 true,AQE 也不会发挥作用。更准确来说,只有物理执行计划包含 exchange 节点或者包含子查询,AQE 才会生效。

AQE 在运行期间,会收集 shuffle map 阶段所生成的中间文件的信息,并将这些信息进行统计,结合已有的规则动态的调整尚未执行的 Optimized Logical Plan 和 Spark Plan,从而对原来的 SQL 语句进行运行时优化。

file

从 Spark 源码来看,AQE 涉及到以下4个优化规则:

file

我们知道,RBO 是根据一系列的规则(rule)来对 SQL 进行优化,包括谓词下推、列剪枝、常量替换等。这些静态规则本身已经内置在 Spark 中,Spark 在执行 SQL 的过程中,这些 rule 会一一作用到 SQL 中。

AQE 的优势

CBO 这个特性是 Spark2.2 之后才有的,相比于 RBO,CBO 会结合表的统计信息,并根据这些统计信息和代价模型(Cost Model)选择出较为优化的执行计划。

但是,CBO 仅仅支持注册到 Hive Metastore 的表。对于存储在分布式文件系统的 parquet、orc 等文件,CBO 是不支持的。并且,如果 Hive 表缺少元数据信息,CBO 收集统计信息的时候就会收集不到,这可能会导致 CBO 失效。

CBO 的另外一个劣势在于 CBO 在优化之前需要先执行 ANALYZE TABLE COMPUTE STATISTICS 来收集统计信息。该语句在执行过程中如果碰到大表则会较为耗时,收集效率较低。

无论是 CBO 还是 RBO,它们都属于静态优化。在物理执行计划提交后,如果任务在运行过程中,数据量、数据分布情况发生变化,CBO 也不会对已有的物理执行计划进行优化。

与 CBO、RBO 不同的是,AQE 在运行过程中,会对 shuffle map 过程中所产生的中间文件进行分析,动态的调整并优化尚未开始执行的逻辑执行计划和物理执行计划,相对静态优化的 CBO 和 RBO 而言,AQE 的处理能得到更加优化的物理执行计划。

AQE 三大特性

● 自动分区合并

Shuffle 过程分为 Map 阶段和 Reduce 两个阶段,Reduce 阶段会将 Map 阶段产生的中间临时文件拉取到对应的 Executor 下,如果 Map 阶段所处理的数据分布非常不均匀,有很多 key 其实仅仅只有几条数据,数据经过处理后可能会形成比较多的小文件。

为了避免上述情况,可以开启 AQE 的自动分区合并功能,可以避免启动过多的 reduce task 去拉取 Map 阶段生成的小文件。

file

● 自动数据倾斜处理

应用场景主要在 Data Joins 中,当发生数据倾斜,AQE 能够自动检测到倾斜分区,并对倾斜分区按照一定的规则进行拆分。目前,在 Spark3.2 中,对 SortMergeJoin 和 ShuffleHashJoin 都支持自动数据倾斜处理。

● Join 策略调整

AQE 会动态的将 Hash Join、Sort Merge Join,降级调整为 Broadcast Join。

我们知道,Spark 任务一旦开始执行,并行度就已经确定。比如说,shuffle map 阶段,并行度为分区的个数;shuffle reduce 阶段并行度则为 spark.sql.shuffle.partitions 的值,默认为200。如果 Spark 任务在运行的过程中数据量变小导致大部分的分区的大小变小,这时如果仍然启动那么多的线程去处理小的数据集就会导致资源的浪费。

而 AQE 在执行过程会根据 shuffle 后生成的中间临时结果,在一定条件下,通过应用 CoalesceShufflePartitions 规则,结合用户提供的参数自动合并分区,其实就是调整 reducer 的数量。原来一个 reduce 线程只会拉取一个处理后的分区的数据,现在一个 reduce 线程会根据实际情况拉取更多的分区的数据,这样就能减少资源的浪费,提高任务执行效率。 《行业指标体系白皮书》下载地址:https://www.dtstack.com/resources/1057?src=szsm

《数栈产品白皮书》下载地址:https://www.dtstack.com/resources/1004?src=szsm

《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001?src=szsm

想了解或咨询更多有关大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=szcsdn

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/830174.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

后台架构总结

前言 疫情三年,全国各地的健康码成为了每个人的重要生活组成部分。虽然过去一年,但是回想起来任然历历在目。 今天我就通过当时基于小程序的健康码架构,来给大家讲一下如何基于java,springboot等技术来快速搭建一个后台业务系统…

Pixelmator Pro for Mac:简洁而强大的图像编辑软件

Pixelmator Pro for Mac是一款专为Mac用户设计的图像编辑软件,它集简洁的操作界面与强大的功能于一身,为用户提供了卓越的图像编辑体验。 Pixelmator Pro for Mac v3.5.9中文激活版下载 该软件支持多种文件格式,包括常见的JPEG、PNG、TIFF等&…

系统触发器

目录 数据库触发器 常见触发器,记录登录和退出数据库事件 模式触发器 创建一个模式触发器,记录各种 DDL 操作的日志 Oracle从入门到总裁:​​​​​​https://blog.csdn.net/weixin_67859959/article/details/135209645 前面已经介绍过,…

WEB攻防-PHP特性-函数缺陷对比

目录 和 MD5函数 intval ​strpos in_array preg_match str_replace 和 使用 时,如果两个比较的操作数类型不同,PHP 会尝试将它们转换为相同的类型,然后再进行比较。 使用 进行比较时,不仅比较值,还比较变量…

MATLAB非均匀网格梯度计算

在matlab中,gradient函数可以很方便的对均匀网格进行梯度计算,但是对于非均匀网格,但是gradient却无法求解非均匀网格的梯度,这一点我之前犯过错误。我之前以为在gradient函数中指定x,y等坐标,其求解的就是…

Metasploit 溢出 samba 提权漏洞

一、信息收集 1.1 右键单击桌面,选择 Open Terminal Here ,打开终端。 1.2 输入命令 nmap -sS -p 139,445 -A 192.168.1.254 ,对目标主机进行扫描,发现 139、445 端口开放。 1.3 输入命令“msfconsole”,启动 MSF 终端。 1.4 输入命令“searc…

电脑录制视频快捷键,一键开启录屏新时代(干货)

“最近尝试录制一些电脑上的操作视频,用来制作教学教程。不过,每次录制都要通过菜单或搜索来打开录屏软件,实在是有些繁琐。有没有人知道哪些电脑录制视频的快捷键呀?或者有没有通用的快捷键设置方法?” 在当今数字时…

免费语音转文字:自建Whisper,贝锐花生壳3步远程访问

Whisper是OpenAI开发的自动语音识别系统(语音转文字)。 OpenAI称其英文语音辨识能力已达到人类水准,且支持其它98中语言的自动语音辨识,Whisper神经网络模型被训练来运行语音辨识与翻译任务。 此外,与其他需要联网运行…

MySQL中脏读与幻读

一般对于我们的业务系统去访问数据库而言,它往往是多个线程并发执行多个事务的,对于数据库而言,它会有多个事务同时执行,可能这多个事务还会同时更新和查询同一条数据,所以这里会有一些问题需要数据库来解决 我们来看…

centos 7使用源码编译安装Python 3.12.2(最新版本)

(一)、说明 在centos 7上,默认安装出来的python是:2.7.5版本 1.查看python版本: python --version 2.通过yum安装出来的,适合当前操作系统的,最新的python版本是:3.6.8 python3…

云手机对出海企业有什么帮助?

近些年,越来越多的企业开始向海外拓展,意图发掘更广阔的市场。在这过程中,云手机作为一个新型工具为很多企业提供了助力,尤其在解决海外市场拓展过程中的诸多挑战方面发挥着作用。 首先,云手机的出现解决了企业在海外拓…

【Linux系统化学习】死锁 | 线程同步

目录 死锁 死锁的必要条件 避免死锁 线程同步 条件变量 同步概念和竞态条件 条件变量接口 创建和初始化条件变量 等待条件满足 唤醒等待 毁条件变量 为什么 pthread_cond_wait 需要互斥量? 条件变量使用规范 等待条件代码 给条件发送信号代码 死锁 死锁是指在一…

扭蛋机小程序带来了什么优势?扭蛋机收益攻略

在当下的潮流消费时代,人们对潮玩也日益个性化,扭蛋机作为一种新型的娱乐消费模式,深受大众喜爱。扭蛋机的价格低,各个年龄层的玩家都可以进行购买,潜在玩家量非常大。扭蛋机商品主打热门IP周边等,种类繁多…

【PostgreSQL】Postgres数据库安装、配置、使用DBLink详解

目录 一、技术背景1.1 背景1.2 什么是 DBLink 二、安装配置 DBLink2.1 安装 DBLink2.2 配置 DBLink1. 修改 postgresql.conf2. 修改 pg_hba.conf 三、DBLink 使用3.1 数据准备3.2 DBLink 使用1. 创建 DBLink 连接2. 使用 DBLink 进行查询3. 使用 DBLink 进行增删改4. 使用 DBLi…

python代码实现kmeans对鸢尾花聚类

导入第三方库和模型 from sklearn import datasets import numpy as np import matplotlib.pyplot as plt from sklearn.cluster import KMeans2、创建画图函数 def draw_result(train_x, labels, cents, title):n_clusters np.unique(labels).shape[0]#获取类别个数color …

用vue3实现留言板功能

效果图&#xff1a; 代码&#xff1a; <script setup lang"ts"> import { ref } from vue;interface Message {name: string;phone: string;message: string; }const name ref<string>(); const phone ref<string>(); const message ref<st…

Llama 3 安装使用方法

Llama3简介&#xff1a; llama3是一种自回归语言模型&#xff0c;采用了transformer架构&#xff0c;目前开源了8b和70b参数的预训练和指令微调模型&#xff0c;400b正在训练中&#xff0c;性能非常强悍&#xff0c;并且在15万亿个标记的公开数据进行了预训练&#xff0c;比ll…

python——井字棋游戏——登入注册界面

本篇文章只讲解登入和注册页面&#xff0c;在后面的文章中会讲解井字棋游戏&#xff0c;然后把井字棋和登入界面进行连接&#xff0c;整合成一个完整的游戏。 登入注册界面在本篇文章的末尾。 1.实现登入界面 &#xff08;1&#xff09;导入图片 把这张图片存储在与代码路径…

Rundeck(四)安全配置

自动化运维工具rundeck GitHub - rundeck 是java开发的开源自动化服务&#xff0c;具有 Web 控制台、命令行工具和 WebAPI。它使您可以轻松地跨一组节点运行自动化任务&#xff0c;适合运维自动化管理、自动发布管理、运维数据分析等 网站&#xff1a;https://www.rundeck.co…

人人开源框架运行

Getting started renrenio/renren-fast-vue Wiki GitHub 人人开源 1.启动navicat&#xff1a;新建一个数据库renren-fast&#xff0c;字符集为utf-8,utf-8mb3或者utf-8mb4&#xff0c;排序规则不选 2.数据库操作在renren-fast数据库中选择表&#xff0c;运行renren-fast-ma…