import org.apache.spark.{SparkConf, SparkContext}

object ActionRDDScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ActionRDDScala").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(List(1, 2, 3, 3))

    // first() returns the first element
    println(rdd.first())
    println("----------------------")

    // take(n) returns the first n elements
    rdd.take(2).foreach(println)
    println("----------------------")

    // collect() returns all elements of the RDD
    rdd.collect.foreach(println)
    println("----------------------")

    // count() returns the number of elements in the RDD
    println(rdd.count())
    println("----------------------")

    // countByValue() returns how many times each element occurs in the RDD:
    // {(key1, count), (key2, count), ... (keyn, count)}
    rdd.countByValue().foreach(println)
    println("----------------------")

    // reduce(func) aggregates all elements of the RDD in parallel,
    // similar to reduce on Scala collections
    println(rdd.reduce(_ + _))
    println("----------------------")

    // fold(num)(func) is rarely used. It works like reduce() but takes an initial
    // value num; note that the fold is applied with num inside each partition
    // first, and then once more when the partition results are combined
    println(rdd.fold(1)(_ + _))
    println("----------------------")

    // top(n) returns the first n elements in descending order, or by a given ordering
    rdd.top(2).foreach(println)
    println("----------------------")

    // takeOrdered(n) sorts the RDD ascending and returns the first n elements;
    // a custom comparator can also be supplied (not covered here).
    // It is roughly the opposite of top
    rdd.takeOrdered(2).foreach(println)
    println("----------------------")

    // foreach applies the given function to every element of the RDD
    rdd.foreach(println)
    println("----------------------")
  }
}
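Because fold() applies the initial value once in every partition and once more when the partition results are merged, its output depends on the partition count. A minimal sketch of that effect, reusing the sc defined above (the partition counts are passed explicitly for illustration):

// Element sum is 9; fold adds the zero value once per partition
// plus once for the final merge: result = 9 + (numPartitions + 1) * 1
val onePart = sc.parallelize(List(1, 2, 3, 3), 1)
println(onePart.fold(1)(_ + _))  // 11 = 9 + (1 + 1) * 1

val twoParts = sc.parallelize(List(1, 2, 3, 3), 2)
println(twoParts.fold(1)(_ + _)) // 12 = 9 + (2 + 1) * 1

This is why fold() is rarely used with an initial value that is not the neutral element of the operation (0 for +, 1 for *): the result silently changes with the partitioning.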
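The custom comparator mentioned for takeOrdered() above is just an implicit Ordering in the Scala API. A short sketch: passing a reversed Ordering in the second parameter list makes takeOrdered() behave like top():

// takeOrdered(n) accepts an explicit Ordering; reversing the
// natural Int order returns the largest elements first
println(rdd.takeOrdered(2)(Ordering[Int].reverse).mkString(",")) // 3,3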
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;

import java.util.Arrays;

public class ActionRDDJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("ActionRDDJava").setMaster("local[3]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 3));

        // first() returns the first element
        Integer firstRDD = rdd.first();
        System.out.println(firstRDD);
        System.out.println("------------------");

        // take(n) returns the first n elements
        System.out.println(rdd.take(2));
        System.out.println("------------------");

        // collect() returns all elements of the RDD
        System.out.println(rdd.collect());
        System.out.println("------------------");

        // count() returns the number of elements
        System.out.println(rdd.count());
        System.out.println("------------------");

        // countByValue() returns the occurrence count of each element
        System.out.println(rdd.countByValue());
        System.out.println("------------------");

        // reduce(func) aggregates all elements in parallel
        Integer reduceRDD = rdd.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        System.out.println(reduceRDD);
        System.out.println("------------------");

        // fold(num, func) works like reduce() but with an initial value
        Integer foldRDD = rdd.fold(1, new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        System.out.println(foldRDD);
        System.out.println("------------------");

        // top(n) returns the first n elements in descending order
        System.out.println(rdd.top(2));
        System.out.println("------------------");

        // takeOrdered(n) returns the first n elements in ascending order
        System.out.println(rdd.takeOrdered(2));
        System.out.println("------------------");

        // foreach applies the given function to every element
        rdd.foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer integer) throws Exception {
                System.out.print(integer);
            }
        });
    }
}
import org.apache.spark.{SparkConf, SparkContext}

object ActionPairRDDScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("ActionPairRDDScala")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(Array((1, 2), (2, 4), (2, 5), (3, 4), (3, 5), (3, 6)))
    // countByKey() returns the number of elements for each key
    println(rdd.countByKey())
    // collectAsMap() collects the result as a Map for easy lookup
    println(rdd.collectAsMap())
  }
}
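One caveat with the pair-RDD actions above: collectAsMap() returns a Map, so when a key occurs more than once only a single value per key survives, and which value wins is not guaranteed. A quick sketch with the same data, reusing the sc defined above:

val pairs = sc.parallelize(Array((1, 2), (2, 4), (2, 5), (3, 4), (3, 5), (3, 6)))
// countByKey counts every occurrence: Map(1 -> 1, 2 -> 2, 3 -> 3)
println(pairs.countByKey())
// collectAsMap keeps one value per key, e.g. Map(1 -> 2, 2 -> 5, 3 -> 6)
println(pairs.collectAsMap())

If every value for a key is needed on the driver, collect() or groupByKey().collect() should be used instead.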
import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Map;

public class ActionPairRDDJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("ActionPairRDDJava");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Create a JavaPairRDD directly in one step
        JavaPairRDD<Integer, Integer> tupRDD = sc.parallelizePairs(Lists.newArrayList(
                new Tuple2<Integer, Integer>(1, 2),
                new Tuple2<Integer, Integer>(2, 4),
                new Tuple2<Integer, Integer>(2, 5),
                new Tuple2<Integer, Integer>(3, 4),
                new Tuple2<Integer, Integer>(3, 5),
                new Tuple2<Integer, Integer>(3, 6)
        ));
        // JavaPairRDD<Integer, Integer> mapRDD = JavaPairRDD.fromJavaRDD(tupRDD); // less convenient

        // countByKey
        Map<Integer, Long> countByKeyRDD = tupRDD.countByKey();
        for (Integer key : countByKeyRDD.keySet()) {
            System.out.println(key + "," + countByKeyRDD.get(key));
        }
        System.out.println("------------------------");

        // collectAsMap, printing option one: iterate the entry set
        Map<Integer, Integer> collectRDD = tupRDD.collectAsMap();
        for (Map.Entry<Integer, Integer> entry : collectRDD.entrySet()) {
            System.out.println(entry.getKey() + "->" + entry.getValue());
        }
        System.out.println("-----------------");

        // collectAsMap, printing option two: iterate the key set
        Map<Integer, Integer> collectRDD1 = tupRDD.collectAsMap();
        for (Integer key : collectRDD1.keySet()) {
            System.out.println(key + "," + collectRDD1.get(key));
        }
    }
}