Using Scala to write a custom partitioner, for cases where you want to partition the data by a specific key combination.
There are two ways to handle this: one is a SQL statement, which is simple and quick; the other is a custom partitioner, which people use far less often.
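For reference, below is a minimal sketch of the SQL-statement approach to the same top-2-teachers-per-subject problem. It assumes Spark SQL is available and the same URL-formatted teacher.txt input as the custom-partitioner version; the object name `teacher_sql_sketch`, the view name `t`, and the column names are my own choices, not from the original post.

```scala
import org.apache.spark.sql.SparkSession

// Sketch of the "SQL statement" alternative mentioned above (assumed names).
object teacher_sql_sketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("t_sql_sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Lines look like: http://bigdata.baway.cn/shifo
    val lines = spark.read.textFile("C:\\Users\\Administrator\\Desktop\\teacher.txt")

    // Parse each URL into (subject, teacher)
    val pairs = lines.map(line => {
      val fields = line.split("/")
      (fields(2).split("[.]")(0), fields(3))
    }).toDF("subject", "teacher")
    pairs.createOrReplaceTempView("t")

    // Top 2 teachers per subject via a window function
    spark.sql(
      """SELECT subject, teacher, cnt FROM (
        |  SELECT subject, teacher, cnt,
        |         ROW_NUMBER() OVER (PARTITION BY subject ORDER BY cnt DESC) AS rk
        |  FROM (SELECT subject, teacher, COUNT(*) AS cnt FROM t GROUP BY subject, teacher) g
        |) ranked
        |WHERE rk <= 2""".stripMargin).show()

    spark.stop()
  }
}
```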
```scala
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

import scala.collection.mutable

object teacher_test2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("t_test2").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // Data format: http://bigdata.baway.cn/shifo
    val rdd = sc.textFile("C:\\Users\\Administrator\\Desktop\\teacher.txt")
    // Parse each URL into a ((subject, teacher), 1) pair
    val sub_teardd = rdd.map(line => {
      val fields = line.split("/")
      val sub = fields(2).split("[.]")(0)
      val teacher = fields(3)
      ((sub, teacher), 1)
    })
    // Count how many times each (subject, teacher) pair appears
    val rdd2 = sub_teardd.reduceByKey(_ + _)
    // Collect the distinct subjects; one partition will be created per subject
    val subarr = rdd2.map(_._1._1).distinct().collect()
    // Instantiate myPartition
    val partition = new myPartition(subarr)
    val partitioned = rdd2.partitionBy(partition)
    // Within each partition (i.e. each subject), take the top 2 teachers by count
    val res = partitioned.mapPartitions(it => {
      it.toList.sortBy(_._2).reverse.take(2).toIterator
    })
    println(res.collect().toBuffer)
  }
}

class myPartition(subs: Array[String]) extends Partitioner {
  // Build the rules: map each subject to a partition index
  val rules = new mutable.HashMap[String, Int]()
  var i = 0
  for (sub <- subs) {
    rules.put(sub, i)
    i += 1
  }

  // Number of partitions
  override def numPartitions = subs.length

  // Get the partition index for a key
  override def getPartition(key: Any): Int = {
    val subject = key.asInstanceOf[(String, String)]._1
    rules(subject)
  }
}
```
**This is the custom partitioner I wrote; if anything looks off, please point it out.**
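To sanity-check that the partitioner really sends each subject to its own partition, here is a small verification sketch. It assumes the `myPartition` class above is compiled alongside it; the object name `partition_check_sketch` and the sample records are hypothetical, used only for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: print which partition each (subject, teacher) record lands in,
// using the myPartition class from the code above on in-memory sample data.
object partition_check_sketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("p_check").setMaster("local[*]"))

    // Hypothetical sample of ((subject, teacher), count) records
    val sample = sc.parallelize(Seq(
      (("bigdata", "shifo"), 5),
      (("bigdata", "laoli"), 3),
      (("javaee", "laowang"), 4)
    ))

    // Build the partitioner from the distinct subjects, as in the main program
    val subs = sample.map(_._1._1).distinct().collect()
    val partitioned = sample.partitionBy(new myPartition(subs))

    // Tag every record with its partition index to confirm one subject per partition
    partitioned.mapPartitionsWithIndex((idx, it) =>
      it.map { case ((sub, teacher), cnt) => s"partition $idx: $sub/$teacher -> $cnt" }
    ).collect().foreach(println)

    sc.stop()
  }
}
```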