当前位置:   article > 正文

SparkRDD算子(四) Action操作take, collect, count, countByValue, reduce, fold,top,countByKey,collectAsMap_spark rdd take

spark rdd take

Spark RDD算子(四)基本的Action操作 first, take, collect, count, countByValue, reduce, fold,top

1. first,take, collect, count,countByValue, reduce, fold,top

scala版本
object ActionRDDScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ActionRDDScala").setMaster("local")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(List(1,2,3,3))

   //返回第一个元素 
    println(rdd.first())
    println("----------------------")

   //rdd.take(n)返回前n个元素 
    rdd.take(2).foreach(println)
    println("----------------------")

   //rdd.collect() 返回 RDD 中的所有元素 
    rdd.collect.foreach(println)
    println("----------------------")

   //rdd.count() 返回 RDD 中的元素个数 
    println(rdd.count())
    println("----------------------")


	//各元素在 RDD 中出现的次数 返回{(key1,次数),(key2,次数),…(keyn,次数)} 
    rdd.countByValue().foreach(println)
    println("----------------------")


	//rdd.reduce(func) 
	//并行整合RDD中所有数据, 类似于是scala中集合的reduce 
    println(rdd.reduce(_ + _))
    println("----------------------")

	//rdd.fold(num)(func) 一般不用这个函数 
	//和 reduce() 一 样, 但是提供了初始值num,每个元素计算时,先要合这个初始值进行折叠, 注意,这里会按照每个分区进行fold,然后分区之间还会再次进行fold 
	//提供初始值 
    println(rdd.fold(1)(_ + _))
    println("----------------------")

	//rdd.top(n) 
	//按照降序的或者指定的排序规则,返回前n个元素 
    rdd.top(2).foreach(println)
    println("----------------------")
	
	//rdd.take(n) 
	//对RDD元素进行升序排序,取出前n个元素并返回,也可以自定义比较器(这里不介绍),类似于top的相反的方法 
    rdd.takeOrdered(2)
    println("----------------------")

	//对 RDD 中的每个元素使用给 
	//定的函数 
    rdd.foreach(println)
    println("----------------------")
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
java版本
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import java.util.Arrays;


public class ActionRDDJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("ActionRDDJava").setMaster("local[3]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 3));

        Integer firstRDD = rdd.first();
        System.out.println(firstRDD);
        System.out.println("------------------");

        System.out.println(rdd.take(2));
        System.out.println("------------------");

        System.out.println(rdd.collect());
        System.out.println("------------------");

        System.out.println(rdd.count());
        System.out.println("------------------");

        System.out.println(rdd.countByValue());
        System.out.println("------------------");

        Integer reduceRDD = rdd.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        System.out.println(reduceRDD);
        System.out.println("------------------");

        Integer foldRDD = rdd.fold(1,new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        System.out.println(foldRDD);
        System.out.println("------------------");

        System.out.println(rdd.top(2));
        System.out.println("------------------");

        System.out.println(rdd.takeOrdered(2));
        System.out.println("------------------");

        rdd.foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer integer) throws Exception {
                System.out.print(integer);
            }
        });
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63

2. countByKey, collectAsMap

scala版本
object ActionPairRDDScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("ActionPairRDDScala")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(Array((1,2),(2,4),(2,5),(3,4),(3,5),(3,6)))
    println(rdd.countByKey())
    println(rdd.collectAsMap())
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
java版本
import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Map;


public class ActionPairRDDJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("ActionPairRDDJava");
        JavaSparkContext sc = new JavaSparkContext(conf);

        //一步到位JavaPairRDD
        JavaPairRDD<Integer,Integer> tupRDD = sc.parallelizePairs(Lists.newArrayList(new Tuple2<Integer,Integer>(1, 2),
                new Tuple2<Integer,Integer>(2, 4),
                new Tuple2<Integer,Integer>(2, 5),
                new Tuple2<Integer,Integer>(3, 4),
                new Tuple2<Integer,Integer>(3, 5),
                new Tuple2<Integer,Integer>(3, 6)
        ));

//        JavaPairRDD<Integer, Integer> mapRDD = JavaPairRDD.fromJavaRDD(tupRDD);   //不好用

        //countByKey
        Map<Integer, Long> countByKeyRDD =tupRDD.countByKey();
        for (Integer key :
                countByKeyRDD.keySet()) {
            System.out.println(key+","+countByKeyRDD.get(key));
        }
        System.out.println("------------------------");

        //collectAsMap 打印输出方式一
        Map<Integer, Integer> collectRDD = tupRDD.collectAsMap();
        for (Map.Entry<Integer,Integer> entry :
                collectRDD.entrySet()) {
            System.out.println(entry.getKey()+"->"+entry.getValue());
        }

        //collectAsMap  打印输出方式二
        System.out.println("-----------------");
        Map<Integer, Integer> collectRDD1 = tupRDD.collectAsMap();
        for (Integer key :
                collectRDD1.keySet()) {
            System.out.println(key+","+collectRDD1.get(key));
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/码创造者/article/detail/736386
推荐阅读
相关标签
  

闽ICP备14008679号