当前位置:   article > 正文

《Hive用户指南》- Hive性能调优_hive multigroupby singlemr

hive multigroupby singlemr


Hive针对的应用场景是OLAP,通常对大数据集进行查询等操作,很多SQL要运行几十分钟到几个小时。

因此我们在使用Hive时,要注意对SQL的性能进行调优,主要包括两点——

  • 避免时间复杂度过高的SQL
  • 避免数据倾斜问题

针对Hive的性能调优,本文分为4个部分介绍,包括——

  • 数据模型相关的知识点
  • 特定场景下的优化
  • 配置相关的优化
  • 常见数据倾斜解决方案

1. 数据模型相关

对于Hive中的表,可以建立为分区表桶表,了解这两种表,在合适的场景下使用可以提高Hive的性能。

1.1 Partition 分区表

分区表是指表按照某个字段进行划分,比如日期等。

分区表需要在创建表的时候通过PARTITIONED BY指定分区字段,如下为按照年份进行分区,注意分区字段不能跟前面括号中的字段重复。

CREATE TABLE employee(id INT, name STRING)
PARTITIONED BY(year INT)
  • 1
  • 2

在Hive的数据模型中,每个表在HDFS中表现为所在数据库目录下的一个文件夹,如下图,前面的d表示文件夹。若不是分区表,则内部是一个个子文件,存储表的一部分数据。
在这里插入图片描述

当设置分区后,比如我设置employee表按照年份进行分区,则每个分区又是一个单独的文件夹,如下:
在这里插入图片描述

在设置分区后,在查询时通过WHERE子句过滤分区,就可以让Hive只取对应分区的数据进行计算,而不用取整个表的数据,提高计算效率。

1.2 Bucket 桶表

桶表是指表按照某个字段的hash值分配到不同的桶中,与分区表的区别在于同一个桶中用于划分的字段不需要指定取值,且可以有多个取值,通常用于高效采样(Sampling)。

桶表同样需要建表时指定,如下表示按照id分为4个桶,划分字段在前面括号中已经声明。

CREATE TABLE employee2(id INT, name STRING)
CLUSTERED BY(id) INTO 4 BUCKETS;
  • 1
  • 2

桶表每个分区是一个文件,而不是文件夹,如下:
在这里插入图片描述

桶表通常用于高效采样,如下采样就可以只取其中一个桶的数据,相较随机采样等方式更高效。

SELECT * FROM employee2 TABLESAMPLE(BUCKET 2 OUT OF 4);
  • 1

2. 场景优化

前面介绍了分区表和桶表,本节主要介绍在Hive使用中几个特殊场景的优化。

2.1 全排序

关于排序,在之前的笔记中《Hive用户指南》- Hive的连接join与排序有简单介绍。

当我们需要对数据进行全排序时,有两种方案——

  1. 直接使用ORDER BY,由于只有一个Reduce任务,导致性能瓶颈。

    实践中仍然是最常用的方法,因为排序的目的一般为找出TOP K个结果,因此通过增加LIMIT子句,可以将Reduce任务的数据量降低至可接受的K*Map个数

  2. 修改Partitioner,如TotalOrderPartitioner可以对数据进行有序分发,如0-9的数据在第一个分区中,10-19的数据在第二个分区中,这样使用SORT BYDISTRIBUTE BY就可以使用多个Reduce任务完成全排序

    过程需要先生成分区文件,详细的过程可以参考这篇hive 全排序优化

2.2 笛卡尔积

当Hive为严格模式hive.mapred.mode=strict时,不允许在SQL中出现笛卡尔积。

考虑笛卡尔积的使用场景,大部分是小表和大表JOIN的情况,如大表是学校的学生,小表是学校的课程,笛卡尔积得到的结果为所有可能的选课情况。

对于小表和大表JOIN的情况,可以使用MAPJOIN优化,通过将连接操作全部在Map任务中完成,大大提高效率。

Mapjoin的实现原理如下:

  • 将小表的数据,先转化成hashtable格式,再上传到分布式缓存中,每个节点都有小表的完整副本

  • 然后启动MR任务处理大表,在map阶段,依据每一条记录去与缓存中小表对应的hashtable连接得到结果

对于mapjoin的使用,需要注意:

  • 小表必须是从表,即连接时的右表(右半连接则是左表)
  • 关于小表的定义,默认阈值是25M,可以通过 hive.mapjoin.smalltable.filesize 参数来修改大小
  • Hive默认会自动将可以转化为mapjoin的任务进行转化,当然也可以按照下列语法显式指定使用
SELECT /*+MAPJOIN(B)*/ A.value1, A.value2, B.value2
FROM A JOIN B ON A.value1=B.value1;
  • 1
  • 2
2.3 EXISTS/IN子查询

Hive不支持 IN/EXISTS 子查询,左半连接是Hive对于 IN/EXISTS 子查询的一种更高效的实现。

IN/EXISTS 子查询的使用场景如下:

SELECT A.value1, A.value2
FROM A
WHERE A.value1 IN (SELECT B.value1 
                   FROM B);
  • 1
  • 2
  • 3
  • 4

在Hive中,使用如下SQL更高效地实现:

SELECT A.value1, A.value2
FROM A LEFT SEMI JOIN B ON A.value1=B.value1;
  • 1
  • 2

对于左半连接,需要注意:

  • 左半连接只传递右表用于比较的key,因此最后的结果只会有左表的数据
  • 右表只能在on子句中设置过滤条件,在where等子句中不能过滤,原因与前面相同
  • 遇到右表重复记录,左表会跳过,而内连接会一直遍历,因此在右表有重复记录时,左半连接仅生成一条记录,与IN相同
2.4 Multi-group by

在书中提到,Multi-group by可以将查询中的多个group by操作组装到一个MapReduce任务中,起到优化作用。

我在默认配置文件中找到相关参数,组装到一个MR任务的前提是用于分组的Keys相同,所以好像用处没那么大。

<property>
    <name>hive.multigroupby.singlereducer</name>
    <value>true</value>
    <description>
      Whether to optimize multi group by query to generate single M/R  job plan. If the multi group by query has 
      common group by keys, it will be optimized to generate single M/R job.
    </description>
  </property>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

比如在下面的例子中,两次GROUP BY只会生成一个MR任务。

FROM employee
INSERT OVERWRITE temp1
SELECT id, count(1) GROUP BY id,job
INSERT OVERWRITE temp2
SELECT id,job GROUP BY id,job;
  • 1
  • 2
  • 3
  • 4
  • 5

在这里插入图片描述

3. 配置相关

上一节介绍了某些特定场景下SQL的优化,本节本来要记录Hive中的配置参数优化,不过在网上看到一篇更丰富的笔记,配置相关的优化可以直接参考hive优化

这里摘抄其中几个认为比较重要的参数。

hive.exec.parallel=true

hive.exec.parallel参数控制在同一个sql中的不同的job是否可以同时运行,默认为false。

同一个sql中的几个子查询对应的mapreduce job是可以并行执行的,这种情况下如果系统资源充足设置此参数为true可以加快执行速度。
hive.exec.parallel.thread.number=8,控制对于同一个sql来说同时可以运行的job的最大值,默认为8,此时最大可以同时运行8个job。

hive.map.aggr=true

在map中会做部分聚集操作,效率更高但需要更多的内存。

hive.groupby.skewindata=true

数据倾斜时负载均衡,当选项设定为true,生成的查询计划会有两个MRJob。

第一个MRJob 中,Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的GroupBy Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的

第二个MRJob再根据预处理的数据结果按照GroupBy Key分布到Reduce中(这个过程可以保证相同的GroupBy Key被分布到同一个Reduce中),最后完成最终的聚合操作。

Map/reduce数量

mapred.reduce.tasks:MR任务中mapper和reducer的数量

mapred.max.split.size,mapred.min.split.size:根据数据量大小划分map任务

hive.exec.reducers.bytes.per.reducer=1G:每个reduce任务处理的数据量

hive.exec.reducers.max=999:每个任务最大的reduce数目

假设表a只有一个文件,大小为120M,包含几千万的记录,如果用1个map去完成这个任务,会比较耗时。

考虑将这一个文件合理拆分成多个,这样就可以用多个map任务去完成。

set mapred.reduce.tasks=10;
create table a_1 as select * from a distribute by rand(123);

这样会将a表的记录,随机的分散到包含10个文件的a_1表中,再用a_1表代替a表,则会用10个map任务去完成。每个map任务处理大约12M(几百万记录)的数据,效率肯定会好很多。

小文件合并

hive.merge.mapfiles:是否和并 Map 输出文件,默认为 True

hive.merge.mapredfiles:是否合并 Reduce 输出文件,默认为 False

hive.merge.size.per.task:合并文件的大小

文件数目过多,会给 HDFS 带来压力,并且会影响处理效率,可以通过合并 Map和Reduce 的结果文件来消除这样的影响。

4. 数据倾斜问题

数据倾斜问题是指由于数据分布不均匀等原因,导致某个/某些Reduce任务需要处理大量数据,无法计算得到结果(任务超时失败),通常表现为Reduce执行到99%后长时间得不到结果

本节介绍常见的数据倾斜场景及解决方案,例子来源于《hive用户指南》,里面的SQL有些错误,这里有做改正。

注意,数据倾斜通常是在Shuffle过程中相同key的键值对过多、且被分配到同一个Reduce任务中,因此在JOINGROUP BYDISTINCT这类需要根据key来Shuffle的任务要注意数据倾斜问题。

4.1 空值导致的数据倾斜

JOINGROUP BYDISTINCT等需要Shuffle的操作中,需要注意表中是否有空值或无效值。

比如下面SQL中,如果A表较大,且A中key列空值较多(比如50%),则在MR任务的Shuffle过程,所有空值的键值对被分配到一个Reduce任务中,相对其他Reduce任务工作量大得多,导致数据倾斜。

SELECT * 
FROM A JOIN B ON A.key=B.key
  • 1
  • 2

解决方法1:把空值的运算独立出来(如果空值不需要就直接过滤掉),如可修改为下面的SQL,注意两个SELECT子句的列要相同,所以空值部分要把B表相应的列也手动赋值

SELECT A1.*, B.value 
FROM (SELECT * 
      FROM A 
      WHERE A.key is not null) A1
JOIN B ON A1.key=B.key

UNION ALL
               
SELECT A.*, null as value FROM A WHERE A.key is null;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

解决方法2:把空值的key变成一个字符串加上随机数,就能把倾斜的数据分到不同的reduce上,如可修改为下面的SQL,case end子句对key进行判断重新赋值

SELECT A.*, B.value 
FROM A JOIN B
on case when A.key is null then concat('dp_hive',rand()) else A.key end =B.key;
  • 1
  • 2
  • 3
4.2 不同数据类型关联导致的数据倾斜

网上很多笔记举的例子都是下面这一个:

注册表中ID字段为int类型,登录表中ID字段即有string类型,也有int类型。当按照ID字段进行两表之间的join操作时,默认的Hash操作会按int类型的ID来进行分配,这样会导致所有string类型ID的记录都分配到一个Reduce

实际上跟Hash过程无关,这里解释下原因:

问题描述中,登录表中ID字段为string类型,string中有一些数字也有字符串,比如A12B2557843332

由于数据类型不同,Hive会把注册表和登录表的ID字段都转化为double类型,导致字符串的部分都变成NULL值,所以本质上也是空值导致的数据倾斜。

这里随便写一个int和string进行关联的例子,explain一下,可以看到两个表均转化为double类型。

explain select p .userid, e.job from page_view p join employee e on p.userid=e.job;
  • 1

<img src="/ alt="image-20200411120647472" style="zoom:50%;" />

4.3 大表关联导致的数据倾斜

在表的关联中,如果是小表和大表关联,则可以使用2.2中提到的Mapjoin优化,由于在Map端完成,不会产生数据倾斜。

对于大表与大表的关联、如果key分布不均,则容易产生数据倾斜,目前有几种解决思路:

解决思路1:修改Hive配置,hive.groupby.skewindata=true,用于数据倾斜时实现负载均衡,可以在第3节中查看具体解释。

解决思路2:考虑先对其中的一个表去重,过滤无用信息。这样可以得到小表,再使用Mapjoin解决。

一个实例是广告投放效果分析,例如将广告投放者信息表中的信息填充到广告曝光日志表中,使用投放者 id关联。

因为实际广告投放者数量很少,因此可以考虑先去重查询所有实际广告投放者的 id 列表,得到一个小表,就可以使用 Mapjoin。

解决思路3:对其中一个大表进行拆分,得到多个小表,进行多次Mapjoin

一个实例是商品浏览日志分析,例如将商品信息表中的信息填充到商品浏览日志表中,使用商品 id 关联。但是某些热卖商品浏览量很大,造成数据倾斜。

通过抽样,把商品信息表拆分,再分别Mapjoin,最后把结果做组合即可。

抽样可以用tablesample或者随机抽样,参考【Hive】HiveQL实战之数据抽样Sample。若表不是桶表,tablesample也需要扫描整个表,但相对随机抽样(先排序)应该还是更高效。

示例SQL如下,通过tablesample抽样变成两个Mapjoin。

SELECT *
FROM A JOIN B tablesample(BUCKET 1 out of 2 on key) B1 ON A.key=B1.key

union all

SELECT *
FROM A JOIN B tablesample(BUCKET 2 out of 2 on key) B2 ON A.key=B2.key;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

Reference

  1. 《Hive用户指南》

  2. 《Hive用户指南》- Hive的连接join与排序

  3. hive 全排序优化

  4. hive优化

  5. hive中bigint和string比较join,我explain发现他们都转换成double类型。?

  6. 【Hive】HiveQL实战之数据抽样Sample

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号