当前位置:   article > 正文

Spark编程基础(Python版)RDD编程_提交python编写的spark rdd程序需要使用什么命令()

提交python编写的spark rdd程序需要使用什么命令()

一、创建RDD

1、textFile(url)从文件系统中加载数据创建RDD:

url可以是本地文件系统的地址,也可以是分布式文件系统DFS,亦或是Amazon S3地址。

  1. from pyspark import SparkContext,SparkConf
  2. #从本地文件系统
  3. lines = sc.textFile("file:///root/class/score.txt")
  4. #从分布式文件系统,下面三种写法等价
  5. lines = sc.textFile("hdfs://localhost:9000/usr/local/score.txt")
  6. lines = sc.textFile("/usr/local/score.txt")
  7. lines = sc.textFile("score.txt")

 

        上面本地文件系统  file:///root/class/score.txt    中file://相当于网页http:// ,而后面 /root是绝对地址。

2、通过并行集合创建RDD

        parallelize()从一个已存在的集合(列表)上创建RDD

  1. array = [1,2,3,4]
  2. rdd = sc.parallelize(array)
  3. rdd.collect()

二、RDD操作

        rdd的操作分为转换(transformation)以及行动(action)。

1、转换(transformation)

        对于RDD而言,每一次转换操作都会产生不同的RDD,供给下一个“转换”使用 转换得到的RDD是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作。

         1、1 filter(func):筛选出满足函数func的元素,并返回一个新的数据

  1. lines = sc.textFile("file:///usr/local/word.txt")
  2. lines_1 = lines.filter(lambda line: "Spark" in line)
  3. lines_1.collect()

        1、2 map(func):操作将每个元素传递到函数func中,并将结果返回为一个新的数据集

  1. data = [1,2,3,4,5]
  2. rdd1 = sc.parallelize(data)
  3. rdd2 = rdd1.map(lambda x:x+10)
  4. rdd2.collect()

        1、3 flatMap(func)

  1. #map()
  2. lines = sc.textFile("file:///usr/local/word.txt")
  3. words = lines.map(lambda line:line.split(" "))
  4. words.collect()
  5. #flatMap()
  6. words = lines.flatMap(lambda line:line.split(" "))
  7. words.collect()

         1、4 groupByKey():应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集

  1. words = sc.parallelize([("Hadoop",1),("is",1),("good",1), \
  2. ... ("Spark",1),("is",1),("fast",1),("Spark",1),("is",1),("better",1)])
  3. words1 = words.groupByKey()
  4. words1.collcet()
  5. #输出
  6. ('Hadoop', <pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>)
  7. ('better', <pyspark.resultiterable.ResultIterable object at 0x7fb210552e80>)
  8. ('fast', <pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>)
  9. ('good', <pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>)
  10. ('Spark', <pyspark.resultiterable.ResultIterable object at 0x7fb210552f98>)
  11. ('is', <pyspark.resultiterable.ResultIterable object at 0x7fb210552e10>)

        可以这么理解, 例如(‘is',1)、(‘is',1)、(‘is',1)三个元素的key一样,groupByKey()将三个进行合并生成 (’is',(1,1,1)),这个时候key为is,value为(1,1,1),最后将vlaue封装成Iterable对象(可迭代对象)。

        1、6 reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合后得到的结果

  1. words = sc.parallelize([("Hadoop",1),("is",1),("good",1),("Spark",1), \
  2. ... ("is",1),("fast",1),("Spark",1),("is",1),("better",1)])
  3. \words1 = words.reduceByKey(lambda a,b:a+b)
  4. words1.collect()
  5. #输出
  6. ('good', 1)
  7. ('Hadoop', 1)
  8. ('better', 1)
  9. ('Spark', 2)
  10. ('fast', 1)
  11. ('is', 3)

 2、行动(action)

        行动操作是真正触发计算的地方。Spark程序执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操作,最终,完成行动操作得到结果。

  1. rdd = sc.parallelize([1,2,3,4,5])
  2. rdd.count() #输出:5
  3. rdd.first() #输出:1
  4. rdd.take(3) #输出:[1, 2, 3]
  5. rdd.reduce(lambda a,b:a+b) #输出:15
  6. rdd.collect() #输出:[1, 2, 3, 4, 5]
  7. rdd.foreach(print)
  8. #输出:
  9. 1
  10. 2
  11. 3
  12. 4
  13. 5

 三、持久化

        在Spark中,RDD采用惰性求值的机制,每次遇到行动操作,都会从头开始执行计算。每次调用行动操作,都会触发一次从头开始的计算。这对于迭代计算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据。

  1. list = ["Hadoop","Spark","Hive"]
  2. rdd = sc.parallelize(list)
  3. print(rdd.count()) //行动操作,触发一次真正从头到尾的计算 3
  4. print(','.join(rdd.collect())) //行动操作,触发一次真正从头到尾的计算 Hadoop,Spark,Hive

        可以通过持久化(缓存)机制避免这种重复计算的开销

        可以使用persist()方法对一个RDD标记为持久化 。之所以说“标记为持久化”,是因为出现persist()语句的地方,并不会马上计算生成RDD并把它持久化,而是要等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化 持久化后的RDD将会被保留在计算节点的内存中被后面的行动操作重复使用。

四、分区

        1、分区的作用

  •  增加并行度
  • 减少通信开销

        2、分区原则

        RDD分区的一个原则是使得分区的个数尽量等于集群中的CPU核心(core)数目

        对于不同的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),都可以通过设spark.default.parallelism这个参数的值,来配置默认的分区数目,一般而言:

         *本地模式:默认为本地机器的CPU数目,若设置了local[N],则默认为N

         *Apache Mesos:默认的分区数为8

        *Standalone或YARN:在“集群中所有CPU核心数目总和”和“2”二者中取较大值作为默认值

        3、设置分区个数

        3、1  创建RDD时手动指定分区个数

        在调用textFile()和parallelize()方法的时候手动指定分区个数即可,语法格式如下: sc.textFile(path, partitionNum) 其中,path参数用于指定要加载的文件的地址,partitionNum参数用于指定分区个数。

        3、2 使用reparititon方法重新设置分区个数

        通过转换操作得到新 RDD 时,直接调用 repartition 方法即可。例如:

  1. rdd_1 = sc.parallelize([1,2,4,5,6,7],3) #第一种方法
  2. rdd_1.glom().collect() #读取数据
  3. #输出: [[1, 2], [4, 5], [6, 7]]
  4. rdd_2 = rdd_1.repartition(1) #第二种方法
  5. rdd_2.glom().collect()
  6. #输出: [[1, 2, 4, 5, 6, 7]]
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/木道寻08/article/detail/887609
推荐阅读
相关标签
  

闽ICP备14008679号