当前位置:   article > 正文

Spark 【RDD基础编程(一)RDD的创建、转换操作】_rdd的创建 -java 第1关:集合并行化创建rdd

rdd的创建 -java 第1关:集合并行化创建rdd

RDD

简介

        在Spark中,RDD是弹性分布式数据集(Resilient Distributed Dataset)的缩写。通俗来讲,RDD是一种抽象的数据结构,用于表示分布式计算中的数据集合。它是Spark中最基本的数据模型,可以看作是一个不可变的、可分区、可并行处理的数据集合。这个数据集的全部或部分可以缓存在内存中,可在多次计算中重用。
        RDD是由一系列的记录(或元素)组成的,这些记录可以分散存储在集群的多个节点上,每个节点上的数据可以被并行处理。RDD提供了一系列的操作函数,例如map、reduce、filter等,可以对数据进行转换和计算。RDD的特点是具有容错性和弹性,即使在节点故障的情况下,也能自动恢复数据和计算过程。 

RDD编程基础

1、RDD 创建

Spark 通过 textFile() 从文件系统(本地系统、HDFS、集合)中加载数据来创建RDD。

1.1、从文件系统中加载数据创建 RDD

  1. import org.apache.spark.{SparkConf, SparkContext}
  2. object CreateRddByFileScala {
  3. def main(args: Array[String]): Unit = {
  4. //创建SparkContext对象
  5. val conf = new SparkConf()
  6. conf.setAppName("CreateRddByFileScala")
  7. .setMaster("local")
  8. val sc = new SparkContext(conf)
  9. //windows
  10. val path = "D:\\test\\data"
  11. //linux
  12. // val path = "file:///usr/local/test/data/"
  13. //读取文件数据,可以在textFile中生成的RDD分区数量
  14. val rdd = sc.textFile(path,2)
  15. //获取每一行数据的长度,计算文件内数据的总长度
  16. val length = rdd.map(_.length).reduce(_+_)
  17. println(length)
  18. //关闭SparkContext
  19. sc.stop()
  20. }
  21. }

1.2、从HDFS中加载数据

只需要修改路径如下:

  1. val path = "hadoop101:9000/test/"
  2. //读取文件数据,可以在textFile中生成的RDD分区数量
  3. val rdd = sc.textFile(path,2)

1.3、通过并行集合(数组)创建RDD

调用 SparkContext 的 parallelize() 方法,通过一个已经存在的集合(数组)来创建RDD。

  1. //创建SparkContext
  2. val conf = new SparkConf()
  3. conf.setAppName("CreateRddByArrayScala")
  4. .setMaster("local") //local表示在本地执行
  5. val sc = new SparkContext(conf)
  6. //创建集合
  7. val arr = Array(1,2,3,4,5)
  8. //基于集合创建RDD
  9. val rdd = sc.parallelize(arr)

2、RDD 操作

        RDD 的操作包括两种类型:转换操作和行动操作。其中,转换操作主要有map()、filter()、groupBy()、join()等,对RDD而言,每次转换都会产生一个新的RDD,供下一次操作使用。而行动操作(如count()、collect()等)返回的一般都是一个值。

2.1、转换操作

        RDD 的真个转换过程是采用惰性机制的,也就是说,整个转换过程只记录了转换的轨迹,并不会真正的运算,只有遇到行动操作才会触发从头到尾的真正计算。

1、filter(f: String => Boolean)

用法和Scala中的filter一致。

输入文档:

  1. Hadoop is good
  2. Spark is better
  3. Spark is fast
  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object RDDAction {
  4. def main(args: Array[String]): Unit = {
  5. // 创建 SparkContext 对象
  6. val conf = new SparkConf()
  7. conf.setAppName("filter-test").setMaster("local")
  8. val sc = new SparkContext(conf)
  9. // 通过加载数据创建RDD对象
  10. val rdd: RDD[String] = sc.textFile("data/word.txt")
  11. //filter 的参数是一个匿名函数 要求返回一个Boolean 类型的值 true-留下 false-过滤
  12. val lineWithSpark: RDD[String] = rdd.filter(line => {
  13. line.contains("Spark")
  14. })
  15. lineWithSpark.foreach(println)
  16. // 关闭sc对象
  17. sc.stop()
  18. }
  19. }

 运行结果:
 

  1. Spark is better
  2. Spark is fast
2、map()

同样和Scala中的map()用法一致。

  1. //省略创建AparkContext对象的代码...
  2. // 使用并行集合创建 RDD
  3. val arr = Array(1,2,3,4,5)
  4. val rdd1: RDD[Int] = sc.parallelize(arr)
  5. //转换操作
  6. val rdd2 = rdd1.map(num => num*2)
  7. rdd2.foreach(println)

运行结果:

  1. 2
  2. 4
  3. 6
  4. 8
  5. 10
  1. //使用本地文件作为数据加载创建RDD 对象
  2. val rdd1: RDD[String] = sc.textFile("data/word.txt")
  3. val rdd2: RDD[Array[String]] = rdd1.map(line => {
  4. line.split(" ")
  5. })

解析:

输入:

  1. Hadoop is good
  2. Spark is better
  3. Spark is fast

Spark 读取进来后,就变成了 RDD("Hadoop is good","Spark is better","Spark is fast"),我们知道,Scala中要进行扁平化操作的话,对象必须是一个多维数组,所以我们要通过 map() 对读取进来的格式进行处理,处理后的格式:RDD(Array("Hadoop is good"),Array("Spark is better"),Array("Spark is fast"))

RDD("Hadoop is good","Spark is better","Spark is fast") => RDD(Array("Hadoop is good"),Array("Spark is better"),Array("Spark is fast"))
3、flatMap()

和Scala中用法基本一样。

  1. //使用本地文件作为数据加载创建RDD 对象
  2. val rdd1: RDD[String] = sc.textFile("data/word.txt")
  3. val rdd2: RDD[String] = rdd1.flatMap(line => line.split(" "))

flatMap 的过程:

  1. RDD("Hadoop is good","Spark is better","Spark is fast")
  2. 先进行 map() =>
  3. RDD(Array("Hadoop is good"),Array("Spark is better"),Array("Spark is fast"))
  4. 在进行 flatten =>
  5. RDD("Hadoop","is",good","Spark","is","better","Spark","is","fast"))

扁平化后我们的数据又变为了一维集合的数据结构(RDD)了。

4、groupByKey()

        这个函数十分重要,上面我们得到了关于每次单词的一个RDD集合,现在我们要进行wordcount 的话肯定还需要对相同的键进行一个分类,这样会生成一个RDD集合(key:String,valut_list:Interable[Int])。

我们同样基于上面的结果进行操作:

  1. val rdd3: RDD[(String, Int)] = rdd2.map(word => {
  2. (word, 1)
  3. })
  4. //RDD(("Hadoop",1),("is",1),("good",1),("Spark",1),("is",1),("better",1),("Spark",1),("is",1),("fast",1)))
  5. val rdd4: RDD[(String, Iterable[Int])] = rdd3.groupByKey()
  6. //RDD(("Hadoop",1),("is",1,1,1),("good",1),("Spark",1,1),("better",1),("fast",1)))
5、reduceByKey()

需要注意的是,reduceByKey是对(key:String,value:Int)这种相同键值对元素的合并,而不是对上面groupByKey()的结果(key:String,value_list:Interable[Int])进行操作,这个粗心让我找了半天。

  1. //rdd5和6效果都一样
  2. val rdd5: RDD[(String,Int)] = rdd4.map(t => {
  3. (t._1, t._2.size)
  4. })
  5. //RDD(("Hadoop",1),("is",3),("good",1),("Spark",2),("better",1),("fast",1)))
  6. // rdd3.reduceByKey((v1,v2)=>v1+v2) //v1 v2代表发现key相同的键值对的值 参数按照顺序在函数体中只出现了一次 那么可以用下划线代替
  7. val rdd6: RDD[(String, Int)] = rdd3.reduceByKey(_ + _)
  8. //RDD(("Hadoop",1),("is",3),("good",1),("Spark",2),("better",1),("fast",1)))
  9. //打印结果
  10. rdd6.foreach(println)

运行结果:
 

  1. (Spark,2)
  2. (is,3)
  3. (fast,1)
  4. (good,1)
  5. (better,1)
  6. (Hadoop,1)
  7. Process finished with exit code 0

总结

剩下的RDD转换操作下午再新开一篇,以及RDD的行动操作篇、持久化、分区和综合实例后续更新。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小舞很执着/article/detail/749263
推荐阅读
相关标签
  

闽ICP备14008679号