从MapReduce的执行来看如何优化MaxCompute(原ODPS) SQL

摘要: SQL基础有这些操作(按照执行顺序来排列): from join(left join, right join, inner join, outer join ,semi join) where group by select sum distinct count order by 如果我们能理解mapreduce是怎么实现这些SQL中的基本操作的,那么我们将很容易理解怎么优化SQL写法。

点此查看原文:http://click.aliyun.com/m/41382/

SQL基础有这些操作(按照执行顺序来排列):

from
join(left join, right join, inner join, outer join ,semi join)
where
group by
select
sum
distinct
count
order by
如果我们能理解mapreduce是怎么实现这些SQL中的基本操作的,那么我们将很容易理解怎么优化SQL写法。接下来我们一个一个的谈:

from
这个操作是在解析过程中就完成了,目的就是找出输入的表(文件)。

join(left join, right join, inner join, outer join ,semi join)
这个操作需要在参与map和reduce整个阶段。下图给出了各个阶段的数据输入输出变化:
假如执行这个SQL:

select student_id, student_name, course_id 
from student left join student_course on student.student_id = student_course.student_id;

图片描述

从上面图可以看出当出现数据在某个(某些)key特别集中的时候,就会出现reduce的接收数据是不均匀的,导致reduce端数据倾斜。

where
这个地方如果有分区字段的话,会直接解析阶段就做裁剪。不会拖到后面的map和reduce阶段。如果不是分区字段,则只会涉及得到map阶段,在这个阶段直接过滤。

group by

select student_id, sum(score)
from student_course
group by student_id

将GroupBy的字段组合为map的输出key值,利用MapReduce的排序,在reduce阶段保存LastKey区分不同的key。MapReduce的过程如下(当然这里只是说明Reduce端的非Hash聚合过程)
图片描述

select

因为MaxComput(原ODPS)的文件存储是列式的,所以在select在编译解析的过程中会起到裁剪列的作用。比如一个表假如有100列,select中只出现了3列,那么其余的97列是没有进行计算的。写select尽量避免使用*,并且不需要的字段尽量删减掉。

sum

到这里开始涉及到了聚合函数,聚合函数需要区分可以拆分并行和不可以拆分并行两种。sum是典型的可拆分并行的。sum(1,2,3,1) = sum(1,2) + sum(3,1) = 7。而avg就是不可并行计算,avg(1,2,3,1) != avg(1,2) + avg(3,1) != avg(avg(1,2) + avg(3,1))。但是avg可以转化成可并行计算,比如先sum分子,再sum分母来并行化。

如果函数可并行,那么就可以在map阶段进行提前聚合,大大减少后面的发往reduce端的网络传递。

distinct

如果是单distinct的话,会把distinct的列直接附在group-by字段组后面,然后进行处理。

麻烦的是multi distinct。根据disinct的逻辑,必须保证每个分组(group-by)相同的distinct列相同的key都分在同一个reduce中,否则就没有办法完成去重工作。所以如果按照单distinct的逻辑,reduce端就需要针对每一个distinct字段进行排序和去重。这样做显然是不高效的,因为对reduce端的计算压力很大,而且也没有利用到shuffle阶段的排序。

第二种方法就是把distinct的字段都拆开,形成独立的n张表。最后再做union all的操作。过程如下:

select date, count(distinct student_id),count(distinct course), sum(score)
from student_course
group by date

图片描述

order by

在odps上和order by相似的功能在还有sort by, distribute by,cluster by。 后面的语法在普通的关系型数据库都不存在。算是mapreduce特有的功能。这里先解释下每个语句的含义:

order by —— order by会对输入做全局排序,因此只有一个Reducer(多个Reducer无法保证全局有序),然而只有一个Reducer,会导致当输入规模较大时,消耗较长的计算时间。

sort by —— sort by不是全局排序,其在数据进入reducer前完成排序,因此,如果用sort by进行排序,并且设置mapred.reduce.tasks>1,则sort by只会保证每个reducer的输出有序,并不保证全局有序。sort by不同于order by,它不受Hive.mapred.mode属性的影响,sort by的数据只能保证在同一个reduce中的数据可以按指定字段排序。使用sort by你可以指定执行的reduce个数(通过set mapred.reduce.tasks=n来指定),对输出的数据再执行归并排序,即可得到全部结果。

distribute by —— distribute by是控制在map端如何拆分数据给reduce端的。hive会根据distribute by后面列,对应reduce的个数进行分发,默认是采用hash算法。sort by为每个reduce产生一个排序文件。在有些情况下,你需要控制某个特定行应该到哪个reducer,这通常是为了进行后续的聚集操作。distribute by刚好可以做这件事。因此,distribute by经常和sort by配合使用。

cluster by —— cluster by除了具有distribute by的功能外还兼具sort by的功能。但是排序只能是倒叙排序,不能指定排序规则为ASC或者DESC。

MapReduce的几个阶段

input
split
map
shuffle
reduce
output 这每个阶段都会出现各种问题,我们依次从前到后来讲怎么处理各个阶段出现的问题。

Input & split

根据MaxCompute的功能,input可以是本地文件,也可以是数据库的表。可以通过InputFormat借口来定义。但是这个Format和后面的split阶段息息相关。因为split只切割比block小的文件,对于小文件则不作处理。所以当存在大量的小文件(特指大小达不到block大小的文件),会生成大量的split块,同时也会启动大量map任务。

可能出现的问题

分区裁剪中出现问题 > 解决方法是让odps在生成任务之前就能确定好读区到分区的范围
输入存在大量小文件,导致map instance数量超标 > 解决办法是读取时候设定块大小,可以使用setSplitSize来控制读取文件总大小 > 解决方案二是提前就把这些小文件给合并了
输入文件大小分布非常不均匀,导致split的块大小分布不均匀,从而导致map端倾斜 > 可以使用setSplitSize来控制读取文件总大小
输入的文件不能被切割,导致split块大小不均匀

暂时没有找到解法

相比于hadoop,odps系统在小文件处理方面的功能已经比较完善,主要体现在以下两个方面:
(1) 默认情况下,当Job完成之后,如果满足一定的条件,系统会自动分配一个FuxiTask(调度任务)进行小文件合并,即我们经常看到的MergeTask;

map

map阶段的输入是上面Input&split阶段来保障的,一个分片一个map任务。所以当分片处理的不合理,map阶段就会出现问题。而map端经过shuffle和combianer(可选)后,会把数据交给reduce端。

从input&split 到map可能出现的问题
输入存在大量小文件,导致map instance数量超标 > 同上
因为ODPS的SQL或者其他任务会解析成一个Task DAG。所以从最初输入到最终输出会有很多的中间计算。而这些中间计算之间也是对应着一个个的map reduce。如果当上一个map/reduce任务产生的输入可能形成一个种长尾分布,导致下一个mapreduce输入出现长尾。也就是map端任务倾斜。

shuffle

这个阶段是mapreduce的核心,设计到sort,group和数据分发。

可能出现的问题
数据量特别大,可以使用combinar来进行mapper端的聚合。odps的参数是

reduce

知道mapreduce计算模型的人都知道,map阶段输入是非结构化的,并不需要实现规定好输入的内容,输出则是一块块分区好的pair。而到reduce则有要求,那就是同样key的map处理的pair需要发送到同样的reduce中。这样就会出现某key数据量很大,某key数据量很小的时候对应的reduce处理的数据量大小也是不均匀的。一旦出现这种情,任务执行的结束时间必然会受到最长任务的拖累。,v>,v>

能产生reduce数据分布不均匀的操作,最长出现的有两分类:

  1. join

这里推荐本书《mapreduce设计模式》,其中的连接模式篇章把各种join的描述。在这里大概说下join的类型:

reduce端连接
map端连接(在odps中使用mapjoin即可),这个操作的前提是存在一个小表能放入到mapreduce中的环形内存中。而且大表必须作为“主表”(比如left join的话就必须是左表,而right join就必须是右表)。
所以到底为什么会产生倾斜呢?map端连接肯定是不会产生数据倾斜的,那么倾斜的必然是reduce连接。当一张表出现数据热点。这样就会出现热点reduce的运行远远大于其它的长尾,导致数据不均衡。

大概总结下就是:

  • 如果存在小表,且如果左外连接时候小表是右表(或者是右外连接,小表必须是左表),可以使用mapjoin。
  • 如果都是大表且有热点,这样会出现倾斜,这时候需要剔除热点数据单独处理。
  • 如果都是大表没有热点,这样不会出现倾斜,这样还需要怎么优化?——这里首选想办法减小数据集合,如果不能在查看是否出现某些热门的数据,如果有,则对数据进行分桶。

count(distinct) 对于distinct的实现,单键的时候会被直接附到group by的字段后,同时作为map输出的key值来处理。这样转化成了group by处理,一般是没有问题的。但是麻烦的是多键值count(distinct),这个没有办法直接把所有的distinct的字段附到group by后面了。因为这样无法利用shuffle阶段的排序,到了reduce阶段需要做很多遍的去重操作。所有一般对于multi distinct都是采用给distinct 字段做编号,然后复制数据。比如输入数据是这样:
可以看到distinct会导致数据翻倍膨胀,而这些膨胀的数据后会通过网络传输到reduce,必然会造成很大的浪费。所以要治理,方法一是首先把distinct转成group by放在子查询中,然后外层再套一层查询进行分组count。

select user_id,count(deal_id),count(item) 
from
(select  user_id,deal_id, item from deal_list group by user_id,deal_id, item
) group by user_id;

方法二:设置参数——odps.sql.groupby.skewindata=true
当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/452760.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

套接字(socket)基本知识与工作原理

套接字(socket)基本知识与工作原理 一、Socket相关概念 Socket通常也称作“套接字”,用于描述IP地址和端口,是一个通信链的句柄。(其实就是两个程序通信用的。) SOCKET用于在两个基于TCP/IP协议的应用程序之…

python 多线程--重点知识

1.全局变量global的用法 2.多线程共享全局变量-args参数 注意args参数类型为元组,逗号不能少!

当导用模块与包的import与from的问题(模块与包的调用)

当在views.py里写impor models会不会报错呢? 1、Python里面的py文件都是每一行的代码。2、Python解释器去找一个模块的时候,只去sys.path的路径里找3、django项目启动(django项目的启动文件是manage.py)启动项目是将manage.py的路…

Python多线程--互斥锁、死锁

1、互斥锁 为解决资源抢夺问题,使用mutex Threading.Lock()创建锁,使用mutex.acquire()锁定,使用mutex.release()释放锁。 代码一: import threading import time# 定义一个全局变量 g_num 0def test1(num):global g_num# 上锁…

freemind 要下载java_Freemind

动手编辑先按Ctrln,新建一个文件。这时出现了一个根节点。用光标单击它,改成“我学FreeMind”,然后在节点之外任一地方点击鼠标(或按Enter)完成编辑。然后,按Insert键,输入“下载安装”,按Enter键&#xff…

pyecharts对于经纬度_一文带你掌握Pyecharts地理数据可视化的方法

本文主要介绍了Pyecharts地理数据可视化,分享给大家,具体如下:一、Pyecharts简介和安装1. 简介Echarts 是一个由百度开源的数据可视化,凭借着良好的交互性,精巧的图表设计,得到了众多开发者的认可。而 Pyth…

使用Sqlmap对dvwa进行sql注入测试(初级阶段)

0.测试准备 1)打开Kali虚拟机终端; 2)打开靶机OWASP,并通过浏览器,输入IP地址进入dvwa的主页,然后选择SQL injection进入SQL注入的测试页面 1.获取DVWA的url和cookie 在输入框中输入1,显示有内容&…

如果备份还原SecureCRT、Xshell远程工具远程

因为有时候电脑操作系统要重新安装,需要将远程备份下来。或者要将远程发给其他同事。一、如何备份还原SecureCRT远程1、打开options-global options---general---configuration paths找到配置文件保存路径,如下图:2、打开C:\Users\NUC\AppDat…

Centos7 下yum安装mysql

转载于:https://www.cnblogs.com/nbjjy/p/9023991.html

Python协程--实现斐波那契数列(Fibonacci)的几种方式

1.使用for遍历list数组 # 使用for遍历list数组 nums list() a 0 b 1 i 0while i < 10:nums.append(a)a, b b, abi 1for num in nums:print(num)2.使用迭代器完成 class Fibonacci(object):def __init__(self, all_num):self.all_num all_numself.current_num 0sel…

FTP服务的简介和配置详解

FTP服务的简介和配置详解注意&#xff1a;配置FTP服务时&#xff0c;最好关闭防火墙和selinux1、FTP服务简介FTP 是File Transfer Protocol&#xff08;文件传输协议&#xff09;的英文简称&#xff0c;而中文简称为“文件传输协议”。用于Internet上的控制文件的双向传输。同时…

Python协程--生成器(实现多任务)

0.生成器 1.使用yield完成多任务 import timedef task_1():while True:print("---1----")time.sleep(0.1)yielddef task_2():while True:print("---2----")time.sleep(0.1)yielddef main():t1 task_1()t2 task_2()# 先让t1运行一会&#xff0c;当t1中遇…

技术分享连载(六十一)

资源管理 Q1&#xff1a;Unity5.4.1中&#xff0c;我将需要的Shader打到一个AssetBundle包中&#xff08;包含一个关联了所有Shader的Shader Variants&#xff09;&#xff0c;分别用Shader.WarmupAllShaders和ShaderVariantCollection.WarmUp两种方式进行预加载&#xff0c;后…

SNF软件开发机器人-子系统-导出-导入功能-多人合作时这个功能经常用到

导出 导出可以将资源表和子系统导出并形成一个json文件。 1.效果展示&#xff1a; 2.使用说明&#xff1a; 点击导出按钮后会弹出一个导出页面。页面的左侧可以选择功能&#xff0c;右侧可以选择资源表&#xff0c;选择功能的同时右侧中功能所需的资源表也会被选择。当功能之间…

基于物理的渲染-用真实的环境光照亮物体

目前&#xff0c;在游戏引擎中用于照亮物体的光源非常丰富。其中&#xff0c;比较常用的有&#xff1a;平行方向光、点光源、聚光灯以及体积光等&#xff0c;但它们都是对真实光源的近似&#xff0c;并不能很好地模拟真实世界中的复杂光照情况。为了增加光照效果的真实感&#…

克隆CentOS6虚拟机eth0被修改为eth1如何修改eth0

2019独角兽企业重金招聘Python工程师标准>>> 直接修改 /etc/sysconfig/network-script/ifcfg-eth0 删掉UUID HWADDR 配置静态地址 然后&#xff1a; rm -rf  /etc/udev/rules.d/70-persistent-net.rules然后reboot 转载于:https://my.oschina.net/hengbao666/blog/…

如何高效的编写与同步博客 (.NET Core 小工具实现)

系列目录 [如何高效的编写与同步博客&#xff08;一&#xff09;- 编写 ]如何高效的编写与同步博客&#xff08;二&#xff09;- 快速发布到多个渠道一.前言 写博客&#xff0c;可以带给我们很多好处&#xff0c;比如可以让我们结识更多志同道合的人&#xff1b;在写博客过程中…

mxf高速发展和数字电影母版制作技术

1.实现MXF的诺言——格式只是迈向可互操作内容管理的一步 当把元数据加到数字内容的MXF&#xff08;素材交换格式&#xff09;标准通过EBU于2002年推出时&#xff0c;当时曾预期广播机构将会迅速无缝地管理其所有来自不同厂家的制作、后期和分配系统上的数字内容。 六年过…