当前位置:   article > 正文

用PySpark开发时的调优思路(下)

pyspark调优 repartition

上期回顾:用PySpark开发时的调优思路(上)
2. 资源参数调优

如果要进行资源调优,我们就必须先知道Spark运行的机制与流程。

下面我们就来讲解一些常用的Spark资源配置的参数吧,了解其参数原理便于我们依据实际的数据情况进行配置。

1)num-executors

指的是执行器的数量,数量的多少代表了并行的stage数量(假如executor是单核的话),但也并不是越多越快,受你集群资源的限制,所以一般设置50-100左右吧。

2)executor-memory

这里指的是每一个执行器的内存大小,内存越大当然对于程序运行是很好的了,但是也不是无节制地大下去,同样受我们集群资源的限制。假设我们集群资源为500core,一般1core配置4G内存,所以集群最大的内存资源只有2000G左右。num-executors x executor-memory 是不能超过2000G的,但是也不要太接近这个值,不然的话集群其他同事就没法正常跑数据了,一般我们设置4G-8G。

3)executor-cores

这里设置的是executor的CPU core数量,决定了executor进程并行处理task的能力。

4)driver-memory

设置driver的内存,一般设置2G就好了。但如果想要做一些Python的DataFrame操作可以适当地把这个值设大一些。

5)driver-cores

与executor-cores类似的功能。

6)spark.default.parallelism

设置每个stage的task数量。一般Spark任务我们设置task数量在500-1000左右比较合适,如果不去设置的话,Spark会根据底层HDFS的block数量来自行设置task数量。有的时候会设置得偏少,这样子程序就会跑得很慢,即便你设置了很多的executor,但也没有用。

下面说一个基本的参数设置的shell脚本,一般我们都是通过一个shell脚本来设置资源参数配置,接着就去调用我们的主函数。

  1. #!/bin/bash
  2. basePath=$(cd "$(dirname )"$(cd "$(dirname "$0"): pwd)")": pwd)
  3. spark-submit \
  4.     --master yarn \
  5.     --queue samshare \
  6.     --deploy-mode client \
  7.     --num-executors 100 \
  8.     --executor-memory 4G \
  9.     --executor-cores 4 \
  10.     --driver-memory 2G \
  11.     --driver-cores 2 \
  12.     --conf spark.default.parallelism=1000 \
  13.     --conf spark.yarn.executor.memoryOverhead=8G \
  14.     --conf spark.sql.shuffle.partitions=1000 \
  15.     --conf spark.network.timeout=1200 \
  16.     --conf spark.python.worker.memory=64m \
  17.     --conf spark.sql.catalogImplementation=hive \
  18.     --conf spark.sql.crossJoin.enabled=True \
  19.     --conf spark.dynamicAllocation.enabled=True \
  20.     --conf spark.shuffle.service.enabled=True \
  21.     --conf spark.scheduler.listenerbus.eventqueue.size=100000 \
  22.     --conf spark.pyspark.driver.python=python3 \
  23.     --conf spark.pyspark.python=python3 \
  24.     --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3 \
  25.     --conf spark.sql.pivotMaxValues=500000 \
  26.     --conf spark.hadoop.hive.exec.dynamic.partition=True \
  27.     --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict \
  28.     --conf spark.hadoop.hive.exec.max.dynamic.partitions.pernode=100000 \
  29.     --conf spark.hadoop.hive.exec.max.dynamic.partitions=100000 \
  30.     --conf spark.hadoop.hive.exec.max.created.files=100000 \
  31.     ${bashPath}/project_name/main.py $v_var1 $v_var2

3. 数据倾斜调优

相信我们对于数据倾斜并不陌生了,很多时间数据跑不出来有很大的概率就是出现了数据倾斜,在Spark开发中无法避免的也会遇到这类问题,而这不是一个崭新的问题,成熟的解决方案也是有蛮多的,今天来简单介绍一些比较常用并且有效的方案。

首先我们要知道,在Spark中比较容易出现倾斜的操作,主要集中在distinct、groupByKey、reduceByKey、aggregateByKey、join、repartition等,可以优先看这些操作的前后代码。而为什么使用了这些操作就容易导致数据倾斜呢?大多数情况就是进行操作的key分布不均,然后使得大量的数据集中在同一个处理节点上,从而发生了数据倾斜。

查看Key 分布
  1. # 针对Spark SQL
  2. hc.sql("select key, count(0) nums from table_name group by key")
  3. # 针对RDD
  4. RDD.countByKey()
Plan A: 过滤掉导致倾斜的key

这个方案并不是所有场景都可以使用的,需要结合业务逻辑来分析这个key到底还需要不需要,大多数情况可能就是一些异常值或者空串,这种就直接进行过滤就好了。

Plan B: 提前处理聚合

如果有些Spark应用场景需要频繁聚合数据,而数据key又少的,那么我们可以把这些存量数据先用hive算好(每天算一次),然后落到中间表,后续Spark应用直接用聚合好的表+新的数据进行二度聚合,效率会有很高的提升。

Plan C:调高shuffle并行度
  1. # 针对Spark SQL 
  2. --conf spark.sql.shuffle.partitions=1000  # 在配置信息中设置参数
  3. # 针对RDD
  4. rdd.reduceByKey(1000) # 默认是200
Plan D:分配随机数再聚合

大概的思路就是对一些大量出现的key,人工打散,从而可以利用多个task来增加任务并行度,以达到效率提升的目的,下面是代码demo,分别从RDD 和 SparkSQL来实现。

  1. # Way1: PySpark RDD实现
  2. import pyspark
  3. from pyspark import SparkContext, SparkConf, HiveContext
  4. from random import randint
  5. import pandas as pd
  6. # SparkSQL的许多功能封装在SparkSession的方法接口中, SparkContext则不行的。
  7. from pyspark.sql import SparkSession
  8. spark = SparkSession.builder \
  9.     .appName("sam_SamShare") \
  10.     .config("master""local[4]") \
  11.     .enableHiveSupport() \
  12.     .getOrCreate()
  13. conf = SparkConf().setAppName("test_SamShare").setMaster("local[4]")
  14. sc = SparkContext(conf=conf)
  15. hc = HiveContext(sc)
  16. # 分配随机数再聚合
  17. rdd1 = sc.parallelize([('sam'1), ('sam'1), ('sam'1), ('sam'1), ('sam'1), ('sam'1)])
  18. # 给key分配随机数后缀
  19. rdd2 = rdd1.map(lambda x: (x[0] + "_" + str(randint(1,5)), x[1]))
  20. print(rdd.take(10))
  21. # [('sam_5'1), ('sam_5'1), ('sam_3'1), ('sam_5'1), ('sam_5'1), ('sam_3'1)]
  22. # 局部聚合
  23. rdd3 = rdd2.reduceByKey(lambda x,y : (x+y))
  24. print(rdd3.take(10))
  25. # [('sam_5'4), ('sam_3'2)]
  26. # 去除后缀
  27. rdd4 = rdd3.map(lambda x: (x[0][:-2], x[1]))
  28. print(rdd4.take(10))
  29. # [('sam'4), ('sam'2)]
  30. # 全局聚合
  31. rdd5 = rdd4.reduceByKey(lambda x,y : (x+y))
  32. print(rdd5.take(10))
  33. # [('sam'6)]
  34. # Way2: PySpark SparkSQL实现
  35. df = pd.DataFrame(5*[['Sam'1],['Flora'1]],
  36.                   columns=['name''nums'])
  37. Spark_df = spark.createDataFrame(df)
  38. print(Spark_df.show(10))
  39. Spark_df.createOrReplaceTempView("tmp_table") # 注册为视图供SparkSQl使用
  40. sql = """
  41. with t1 as (
  42.     select concat(name,"_",int(10*rand())) as new_name, name, nums
  43.     from tmp_table
  44. ),
  45. t2 as (
  46.     select new_name, sum(nums) as n
  47.     from t1
  48.     group by new_name
  49. ),
  50. t3 as (
  51.     select substr(new_name,0,length(new_name) -2) as name, sum(n) as nums_sum 
  52.     from t2
  53.     group by substr(new_name,0,length(new_name) -2)
  54. )
  55. select *
  56. from t3
  57. """
  58. tt = hc.sql(sql).toPandas()
  59. tt

下面是原理图。


All Done!

????学习资源推荐:

1)《Spark性能优化指南——基础篇》

https://tech.meituan.com/2016/04/29/spark-tuning-basic.html

2)《Spark性能优化指南——高级篇》

https://tech.meituan.com/2016/05/12/spark-tuning-pro.html

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/66450
推荐阅读
相关标签
  

闽ICP备14008679号