赞
踩
因此我们在使用Hive时,要注意对SQL的性能进行调优,主要包括两点——
针对Hive的性能调优,本文分为4个部分介绍,包括——
对于Hive中的表,可以建立为分区表或桶表,了解这两种表,在合适的场景下使用可以提高Hive的性能。
分区表是指表按照某个字段进行划分,比如日期等。
分区表需要在创建表的时候通过PARTITIONED BY
指定分区字段,如下为按照年份进行分区,注意分区字段不能跟前面括号中的字段重复。
CREATE TABLE employee(id INT, name STRING)
PARTITIONED BY(year INT);
在Hive的数据模型中,每个表在HDFS中表现为所在数据库目录下的一个文件夹,如下图,前面的d
表示文件夹。若不是分区表,则内部是一个个子文件,存储表的一部分数据。
当设置分区后,比如我设置employee表按照年份进行分区,则每个分区又是一个单独的文件夹,如下:
在设置分区后,在查询时通过WHERE
子句过滤分区,就可以让Hive只取对应分区的数据进行计算,而不用取整个表的数据,提高计算效率。
桶表是指表按照某个字段的hash值分配到不同的桶中,与分区表的区别在于同一个桶中用于划分的字段不需要指定取值,且可以有多个取值,通常用于高效采样(Sampling)。
桶表同样需要建表时指定,如下表示按照id分为4个桶,划分字段在前面括号中已经声明。
CREATE TABLE employee2(id INT, name STRING)
CLUSTERED BY(id) INTO 4 BUCKETS;
桶表每个分区是一个文件,而不是文件夹,如下:
桶表通常用于高效采样,如下采样就可以只取其中一个桶的数据,相较随机采样等方式更高效。
SELECT * FROM employee2 TABLESAMPLE(BUCKET 2 OUT OF 4);
前面介绍了分区表和桶表,本节主要介绍在Hive使用中几个特殊场景的优化。
关于排序,在之前的笔记中《Hive用户指南》- Hive的连接join与排序有简单介绍。
当我们需要对数据进行全排序时,有两种方案——
直接使用ORDER BY
,由于只有一个Reduce任务,导致性能瓶颈。
实践中仍然是最常用的方法,因为排序的目的一般为找出TOP K个结果,因此通过增加LIMIT
子句,可以将Reduce任务的数据量降低至可接受的K*Map个数
修改Partitioner,如TotalOrderPartitioner可以对数据进行有序分发,如0-9的数据在第一个分区中,10-19的数据在第二个分区中,这样使用SORT BY
和DISTRIBUTE BY
就可以使用多个Reduce任务完成全排序
过程需要先生成分区文件,详细的过程可以参考这篇hive 全排序优化。
当Hive为严格模式hive.mapred.mode=strict
时,不允许在SQL中出现笛卡尔积。
考虑笛卡尔积的使用场景,大部分是小表和大表JOIN
的情况,如大表是学校的学生,小表是学校的课程,笛卡尔积得到的结果为所有可能的选课情况。
对于小表和大表JOIN
的情况,可以使用MAPJOIN
优化,通过将连接操作全部在Map任务中完成,大大提高效率。
Mapjoin的实现原理如下:
将小表的数据,先转化成hashtable格式,再上传到分布式缓存中,每个节点都有小表的完整副本
然后启动MR任务处理大表,在map阶段,依据每一条记录去与缓存中小表对应的hashtable连接得到结果
对于mapjoin的使用,需要注意:
hive.mapjoin.smalltable.filesize
参数来修改大小SELECT /*+MAPJOIN(B)*/ A.value1, A.value2, B.value2
FROM A JOIN B ON A.value1=B.value1;
Hive不支持 IN/EXISTS 子查询,左半连接是Hive对于 IN/EXISTS 子查询的一种更高效的实现。
IN/EXISTS 子查询的使用场景如下:
SELECT A.value1, A.value2
FROM A
WHERE A.value1 IN (SELECT B.value1
FROM B);
在Hive中,使用如下SQL更高效地实现:
SELECT A.value1, A.value2
FROM A LEFT SEMI JOIN B ON A.value1=B.value1;
对于左半连接,需要注意:
在书中提到,Multi-group by可以将查询中的多个group by操作组装到一个MapReduce任务中,起到优化作用。
我在默认配置文件中找到相关参数,组装到一个MR任务的前提是用于分组的Keys相同,所以好像用处没那么大。
<property>
<name>hive.multigroupby.singlereducer</name>
<value>true</value>
<description>
Whether to optimize multi group by query to generate single M/R job plan. If the multi group by query has
common group by keys, it will be optimized to generate single M/R job.
</description>
</property>
比如在下面的例子中,两次GROUP BY
只会生成一个MR任务。
FROM employee
INSERT OVERWRITE temp1
SELECT id, count(1) GROUP BY id,job
INSERT OVERWRITE temp2
SELECT id,job GROUP BY id,job;
上一节介绍了某些特定场景下SQL的优化,本节本来要记录Hive中的配置参数优化,不过在网上看到一篇更丰富的笔记,配置相关的优化可以直接参考hive优化。
这里摘抄其中几个认为比较重要的参数。
hive.exec.parallel=true
hive.exec.parallel参数控制在同一个sql中的不同的job是否可以同时运行,默认为false。
同一个sql中的几个子查询对应的mapreduce job是可以并行执行的,这种情况下如果系统资源充足设置此参数为true可以加快执行速度。
hive.exec.parallel.thread.number=8,控制对于同一个sql来说同时可以运行的job的最大值,默认为8,此时最大可以同时运行8个job。
hive.map.aggr=true
在map中会做部分聚集操作,效率更高但需要更多的内存。
hive.groupby.skewindata=true
数据倾斜时负载均衡,当选项设定为true,生成的查询计划会有两个MRJob。
第一个MRJob 中,Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的GroupBy Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;
第二个MRJob再根据预处理的数据结果按照GroupBy Key分布到Reduce中(这个过程可以保证相同的GroupBy Key被分布到同一个Reduce中),最后完成最终的聚合操作。
Map/reduce数量
mapred.reduce.tasks:MR任务中mapper和reducer的数量
mapred.max.split.size,mapred.min.split.size:根据数据量大小划分map任务
hive.exec.reducers.bytes.per.reducer=1G:每个reduce任务处理的数据量
hive.exec.reducers.max=999:每个任务最大的reduce数目
假设表a只有一个文件,大小为120M,包含几千万的记录,如果用1个map去完成这个任务,会比较耗时。
考虑将这一个文件合理拆分成多个,这样就可以用多个map任务去完成。
set mapred.reduce.tasks=10;
create table a_1 as select * from a distribute by rand(123);这样会将a表的记录,随机的分散到包含10个文件的a_1表中,再用a_1表代替a表,则会用10个map任务去完成。每个map任务处理大约12M(几百万记录)的数据,效率肯定会好很多。
小文件合并
hive.merge.mapfiles:是否和并 Map 输出文件,默认为 True
hive.merge.mapredfiles:是否合并 Reduce 输出文件,默认为 False
hive.merge.size.per.task:合并文件的大小
文件数目过多,会给 HDFS 带来压力,并且会影响处理效率,可以通过合并 Map和Reduce 的结果文件来消除这样的影响。
数据倾斜问题是指由于数据分布不均匀等原因,导致某个/某些Reduce任务需要处理大量数据,无法计算得到结果(任务超时失败),通常表现为Reduce执行到99%后长时间得不到结果。
本节介绍常见的数据倾斜场景及解决方案,例子来源于《hive用户指南》,里面的SQL有些错误,这里有做改正。
注意,数据倾斜通常是在Shuffle过程中相同key的键值对过多、且被分配到同一个Reduce任务中,因此在JOIN
、GROUP BY
、DISTINCT
这类需要根据key来Shuffle的任务要注意数据倾斜问题。
在JOIN
、GROUP BY
、DISTINCT
等需要Shuffle的操作中,需要注意表中是否有空值或无效值。
比如下面SQL中,如果A表较大,且A中key列空值较多(比如50%),则在MR任务的Shuffle过程,所有空值的键值对被分配到一个Reduce任务中,相对其他Reduce任务工作量大得多,导致数据倾斜。
SELECT *
FROM A JOIN B ON A.key=B.key
解决方法1:把空值的运算独立出来(如果空值不需要就直接过滤掉),如可修改为下面的SQL,注意两个SELECT子句的列要相同,所以空值部分要把B表相应的列也手动赋值
SELECT A1.*, B.value
FROM (SELECT *
FROM A
WHERE A.key is not null) A1
JOIN B ON A1.key=B.key
UNION ALL
SELECT A.*, null as value FROM A WHERE A.key is null;
解决方法2:把空值的key变成一个字符串加上随机数,就能把倾斜的数据分到不同的reduce上,如可修改为下面的SQL,case end
子句对key进行判断重新赋值
SELECT A.*, B.value
FROM A JOIN B
on case when A.key is null then concat('dp_hive',rand()) else A.key end =B.key;
网上很多笔记举的例子都是下面这一个:
注册表中ID字段为int类型,登录表中ID字段即有string类型,也有int类型。当按照ID字段进行两表之间的join操作时,默认的Hash操作会按int类型的ID来进行分配,这样会导致所有string类型ID的记录都分配到一个Reduce。
实际上跟Hash过程无关,这里解释下原因:
问题描述中,登录表中ID字段为string类型,string中有一些数字也有字符串,比如A12B255
,7843332
。
由于数据类型不同,Hive会把注册表和登录表的ID字段都转化为double类型,导致字符串的部分都变成NULL值,所以本质上也是空值导致的数据倾斜。
这里随便写一个int和string进行关联的例子,explain
一下,可以看到两个表均转化为double类型。
explain select p .userid, e.job from page_view p join employee e on p.userid=e.job;
在表的关联中,如果是小表和大表关联,则可以使用2.2中提到的Mapjoin优化,由于在Map端完成,不会产生数据倾斜。
对于大表与大表的关联、如果key分布不均,则容易产生数据倾斜,目前有几种解决思路:
解决思路1:修改Hive配置,hive.groupby.skewindata=true,用于数据倾斜时实现负载均衡,可以在第3节中查看具体解释。
解决思路2:考虑先对其中的一个表去重,过滤无用信息。这样可以得到小表,再使用Mapjoin解决。
一个实例是广告投放效果分析,例如将广告投放者信息表中的信息填充到广告曝光日志表中,使用投放者 id关联。
因为实际广告投放者数量很少,因此可以考虑先去重查询所有实际广告投放者的 id 列表,得到一个小表,就可以使用 Mapjoin。
解决思路3:对其中一个大表进行拆分,得到多个小表,进行多次Mapjoin。
一个实例是商品浏览日志分析,例如将商品信息表中的信息填充到商品浏览日志表中,使用商品 id 关联。但是某些热卖商品浏览量很大,造成数据倾斜。
通过抽样,把商品信息表拆分,再分别Mapjoin,最后把结果做组合即可。
抽样可以用tablesample或者随机抽样,参考【Hive】HiveQL实战之数据抽样Sample。若表不是桶表,tablesample也需要扫描整个表,但相对随机抽样(先排序)应该还是更高效。
示例SQL如下,通过tablesample抽样变成两个Mapjoin。
SELECT *
FROM A JOIN B tablesample(BUCKET 1 out of 2 on key) B1 ON A.key=B1.key
union all
SELECT *
FROM A JOIN B tablesample(BUCKET 2 out of 2 on key) B2 ON A.key=B2.key;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。