
A Brief Look at Spark's RangePartitioner


As of Spark 2.0, the partitioners Spark provides for shuffles are mainly HashPartitioner and RangePartitioner. This article gives a brief walkthrough of the RangePartitioner implementation.
 A Partitioner mainly needs to implement two methods:

  def numPartitions: Int
  def getPartition(key: Any): Int

RangePartitioner's implementation of these two methods relies on the array variable rangeBounds: Array[K], which stores a sorted sequence of K values (ordered by K's ordering). These values decide which partition each RDD element is written to after the shuffle. The code that builds rangeBounds is shown below:

  private var rangeBounds: Array[K] = {
    if (partitions <= 1) {
      Array.empty
    } else {
      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      val sampleSize = math.min(20.0 * partitions, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        Array.empty
      } else {
        // If a partition contains much more than the average number of items, we re-sample from it
        // to ensure that enough items are collected from that partition.
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        val candidates = ArrayBuffer.empty[(K, Float)]
        val imbalancedPartitions = mutable.Set.empty[Int]
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            imbalancedPartitions += idx
          } else {
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.size).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          val seed = byteswap32(-rdd.id - 1)
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }
        RangePartitioner.determineBounds(candidates, partitions)
      }
    }
  }

 The range boundaries are determined by sampling: a sample is drawn from the RDD, and rangeBounds is derived from that sample. The concrete steps are:
1. First, decide how many samples to draw from each partition:

      val sampleSize = math.min(20.0 * partitions, 1e6)
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt

This value is determined jointly by the number of post-shuffle partitions and the number of partitions in the current RDD. Since all samples are collected to the driver for computation, the 1e6 cap guards against an OOM on the driver.
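To make the two formulas above concrete, here is the same arithmetic with made-up numbers (8 output partitions, 4 input partitions; the variable names mirror the source but the values are purely illustrative):

```scala
// Hypothetical example: shuffling into 8 output partitions from an RDD
// that currently has 4 input partitions.
val partitions = 8      // number of output partitions
val inputPartitions = 4 // rdd.partitions.size

// Total sample size: 20 samples per output partition, capped at 1e6.
val sampleSize = math.min(20.0 * partitions, 1e6)

// Over-sample each input partition by 3x, assuming roughly balanced input.
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / inputPartitions).toInt
```

With these numbers, sampleSize is 160 and each of the 4 input partitions is asked for 120 samples, so even a partition three times larger than average still yields enough keys.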
2. The drawn samples come back in the format (Long, Array[(Int, Long, Array[K])]): the total number of elements in the RDD, and, for each partition, its partitionId, its element count, and the array of sampled keys.
3. From the per-partition sketches, the factor val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0) is computed; it is the overall sampling probability. For each partition, fraction * n estimates how many samples that partition should have contributed; if this exceeds sampleSizePerPartition (i.e., the partition holds far more elements than average), the partition is marked imbalanced and re-sampled with probability fraction. Every sampled key is then assigned a weight equal to the inverse of its sampling probability, so keys sampled from the same partition all share the same weight.

val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
val seed = byteswap32(-rdd.id - 1)
val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
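The weighting and imbalance check can be reproduced without Spark. The sketch below mirrors the foreach loop from the source using two made-up partition sketches (the sizes, keys, and sample budgets are illustrative, not real Spark output):

```scala
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

// Made-up sketch results: (partitionId, partitionSize, sampledKeys).
val sketched = Seq(
  (0, 100L, Array(5, 20, 40)),
  (1, 10000L, Array(7, 8, 9)) // far larger than average
)
val numItems = sketched.map(_._2).sum
val sampleSize = 6.0
val sampleSizePerPartition = 3

// Overall sampling probability.
val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
val candidates = ArrayBuffer.empty[(Int, Float)]
val imbalancedPartitions = mutable.Set.empty[Int]

sketched.foreach { case (idx, n, sample) =>
  if (fraction * n > sampleSizePerPartition) {
    // This partition should have contributed more samples than the
    // per-partition budget allows: mark it for re-sampling.
    imbalancedPartitions += idx
  } else {
    // Weight = 1 / sampling probability, identical for all keys
    // drawn from the same partition.
    val weight = (n.toDouble / sample.size).toFloat
    sample.foreach(key => candidates += ((key, weight)))
  }
}
```

Here partition 1 (10000 elements) trips the imbalance check and would be re-sampled, while partition 0's three keys each enter candidates with weight 100/3.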

4. All candidate samples are sorted by key, and an average weight step (total weight divided by the number of partitions) is computed. Walking through the sorted samples while accumulating their weights, a key is taken as a boundary each time the running total crosses the next multiple of the step. This ensures that most of the values in the array come from heavily weighted partitions, so the resulting ranges are balanced by data volume rather than by raw sample count.
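Step 4 can be sketched as a standalone function. This is a simplified, Spark-free rendering of the weight-accumulation idea behind RangePartitioner.determineBounds, not the exact Spark source:

```scala
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

// Choose (partitions - 1) boundary keys so that each range carries a
// roughly equal share of the total sample weight.
def determineBounds[K: Ordering: ClassTag](
    candidates: Seq[(K, Float)],
    partitions: Int): Array[K] = {
  val ord = implicitly[Ordering[K]]
  val ordered = candidates.sortBy(_._1) // sort samples by key
  val sumWeights = ordered.map(_._2.toDouble).sum
  val step = sumWeights / partitions    // target weight per range
  val bounds = ArrayBuffer.empty[K]
  var cumWeight = 0.0
  var target = step
  var i = 0
  while (i < ordered.length && bounds.length < partitions - 1) {
    val (key, weight) = ordered(i)
    cumWeight += weight
    if (cumWeight >= target) {
      // Skip duplicate keys so that every boundary is distinct.
      if (bounds.isEmpty || ord.gt(key, bounds.last)) {
        bounds += key
        target += step
      }
    }
    i += 1
  }
  bounds.toArray
}
```

With eight uniformly weighted keys 1..8 and 4 partitions, the running weight crosses the targets at keys 2, 4, and 6, yielding evenly sized ranges; a single heavily weighted key would instead pull a boundary onto itself.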

The steps above produce the desired rangeBounds, on top of which the numPartitions and getPartition(key: Any) methods are implemented.
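To close the loop, here is a toy sketch of how rangeBounds backs those two methods. The bounds array below is made up; the lookup logic matches the idea in Spark (a linear scan over the bounds, with Spark itself switching to a binary search once there are more than 128 bounds):

```scala
// Made-up bounds: keys <= 10 go to partition 0, keys in (10, 20] to
// partition 1, (20, 30] to partition 2, and everything above to 3.
val rangeBounds: Array[Int] = Array(10, 20, 30)

def numPartitions: Int = rangeBounds.length + 1

def getPartition(key: Int): Int = {
  // Advance past every bound the key exceeds; the index where we stop
  // is the target partition.
  var partition = 0
  while (partition < rangeBounds.length && key > rangeBounds(partition)) {
    partition += 1
  }
  partition
}
```

Because the bounds are sorted, equal keys always map to the same partition, and the output partitions are themselves ordered by key, which is what makes RangePartitioner suitable for sortByKey.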
