赞
踩
参考文章:spark的RDD中的action(执行)和transformation(转换)两种操作中常见函数介绍
返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
将func函数作用到数据集的每个元素,生成一个新的分布式的数据集并返回
- // 定义
- def map[U: ClassTag](f: T => U): RDD[U]
-
- // 示例
- >>> a = sc.parallelize(('a', 'b', 'c'))
- >>> a.map(lambda x: x+'1').collect()
- ['a1', 'b1', 'c1']
与map相似,但是mapPartitions的输入函数单独作用于RDD的每个分区(block)上,
因此func的输入和返回值都必须是迭代器iterator。
类似于map,但独立地在RDD的每一个分区(block)上运行,因此在类型为T的RDD上运行时,
func的函数类型必须是Iterator[T] => Iterator[U]。
假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,
一个函数一次处理所有分区对于一个分区中的所有数据执行一个函数,性能比map要高。
例如:假设RDD有十个元素0~9,分成三个区,使用mapPartitions返回每个元素的平方。
如果使用map方法,map中的输入函数会被调用10次,而使用mapPartitions方法,
输入函数只会被调用3次,每个分区被调用1次(partition内部自定义函数分别会被调用3,3,4次)。
- >>> def squareFunc(a):
- . . . for i in a:
- . . . yield i*i
- . . .
-
- >>> a = sc.parallelize(range(10), 3)
- PythonRDD[1] at RDD at PythonRDD.scala:48
-
- >>> a.mapPartitions(squareFunc).collect()
- [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
与mapPartitions相似, 但是输入函数func提供了一个正式的参数,
可以用来表示分区的编号。
类似于mapPartitions, 但func带有一个整数参数表示分片的索引值,
因此在类型为T的RDD上运行时, func的函数类型必须是(Int, Interator[T]) => Iterator[U]
注意:map():每次处理一条数据。mapPartitions():每次处理一个分区的数据,
这个分区的数据处理完后,原RDD中分区的数据才能释放,可能导致OOM。
因此当内存空间较大的时候建议使用mapPartition(),以提高处理效率。
- // 定义
- def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T])
- => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
-
- // 示例
- >>> def func(index, iterator): # 返回每个分区的编号和数值
- . . . yield (‘index ‘ + str(index) + ’ is: ‘ + str(list(iterator)))
- . . .
- >>> a = sc.parallelize(range(10),3)
- >>> a.mapPartitionsWithIndex(func).collect()
- ['index 0 is: [0, 1, 2]', 'index 1 is: [3, 4, 5]', 'index 2 is: [6, 7, 8, 9]']
- >>> def squareIndex(index, iterator): # 返回每个数值所属分区的编号和数值的平方
- ... for i in iterator:
- ... yield ("The index is: " + str(index) + ", and the square is: "
- + str(i*i))
- ...
- >>> a.mapPartitionsWithIndex(squareIndex).collect()
- ['The index is: 0, and the square is: 0', 'The index is: 0, and the square is: 1',
- 'The index is: 1, and the square is: 4', 'The index is: 1, and the square is: 9',
- 'The index is: 1, and the square is: 16', 'The index is: 2, and the square is: 25',
- 'The index is: 2, and the square is: 36', 'The index is: 3, and the square is: 49',
- 'The index is: 3, and the square is: 64', 'The index is: 3, and the square is: 81']

与map相似, 但是每个输入的item能够被map到0个或多个items输出,
也就是说func的返回值应当是一个Sequence, 而不是一个单独的item
类似于map, 但是每一个输入元素可以被映射为0或多个输出元素
(所以func应该返回一个序列, 而不是单一元素)
- // 定义
- def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
-
- // 示例
- >>> l = ['I am Tom', 'She is Jenny', 'He is Ben']
- >>> a = sc.parallelize(l,3)
- >>> a.flatMap(lambda line: line.split()).collect() # 将每个字符串中的单词划分出来
- ['I', 'am', 'Tom', 'She', 'is', 'Jenny', 'He', 'is', 'Ben']
将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]
def glom(): RDD[Array[T]]
groupBy算子接收一个函数,这个函数返回的值作为key,然后通过这个key来对里面的元素进行分组。
- // 语法
- def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])]
示例:
- // 示例一
- val a = sc.parallelize(1 to 9, 3)
-
- a.groupBy(x => { if (x % 2 == 0) "new" else "old" }).collect
- // 返回的new或者old字符串作为key来group RDD里面的值,
- res42: Array[(String, Seq[Int])] = Array((new,ArrayBuffer(2, 4, 6, 8)), (old,ArrayBuffer(1, 3, 5, 7, 9)))
-
-
- // 示例二
- scala> val rdd = sc.parallelize(1 to 4)
- rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24
-
- scala> val group = rdd.groupBy(_%2)
- group: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupBy at <console>:26
-
- scala> group.collect
- res0: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4)), (1,CompactBuffer(1, 3)))

过滤。返回一个新的RDD, 该RDD由经过func函数计算后返回值为true的输入元素组成
选出所有func返回值为true的元素, 作为一个新的数据集返回
- // 定义
- def filter(f: T => Boolean): RDD[T]
-
- // 示例
- >>> a = sc.parallelize(range(10))
- >>> a.filter(lambda x: x%2==0).collect() # 选出0-9的偶数
- [0, 2, 4, 6, 8]
以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子。例子从RDD中随机且有放回的抽出50%的数据,随机种子值为3(即可能以1 2 3的其中一个起始值)比如当前时间戳,作用:在数据倾斜的时候,我们那么多数据如果想知道那个key倾斜了,就需要我们采样获取这些key,如果这些key数据不是很重要的话,可以过滤掉,这样就解决了数据倾斜。
从数据中抽样,withReplacement表示是否有放回,withReplacement=true表示有放回抽样,fraction为抽样的概率(0<=fraction<=1),seed为随机种子。
例如:从1-100之间抽取样本,被抽取为样本的概率为0.2
- >>> data = sc.parallelize(range(1,101),2)
- >>> sample = data.sample(True, 0.2)
- >>> sampleData.count()
- 19
- >>> sampleData.collect()
- [16, 19, 24, 29, 32, 33, 44, 45, 55, 56, 56, 57, 65, 65, 73, 83, 84, 92, 96]
!!!注意,Spark中的sample抽样,当withReplacement=True时,相当于采用的是泊松抽样;
当withReplacement=False时,相当于采用伯努利抽样,
fraction并不是表示抽样得到的样本占原来数据总量的百分比,而是一个元素被抽取为样本的概率。
fraction=0.2并不是说明要抽出100个数字中20%的数据作为样本,而是每个数字被抽取为样本的概率为0.2,
这些数字被认为来自同一总体,样本的大小并不是固定的,而是服从二项分布。
去除数据集中的重复元素。
对源RDD进行去重后返回一个新的RDD. 默认情况下,只有8个并行任务来操作,但是可以传入一个可选的numTasks参数改变它。
- # 定义
- def distinct(): RDD[T]
-
- >>> data1 = sc.parallelize(range(10))
- >>> data2 = sc.parallelize(range(6,15))
- >>> data1.union(data2).distinct().collect()
- [0, 8, 1, 9, 2, 10, 11, 3, 12, 4, 5, 13, 14, 6, 7]
-
- # 更改并行任务数为2
- >>> data1.union(data2).distinct(2).collect()
- [0, 8, 1, 9, 2, 10, 11, 3, 12, 4, 5, 13, 14, 6, 7]
下边的一系列transactions会用到键(Key)这一概念,在进行下列有关Key操作时使用的数据集为记录伦敦各个片区(英文称为ward)中学校和学生人数相关信息的表格,下载地址:
https://data.london.gov.uk/dataset/london-schools-atlas/resource/64f771ee-38b1-4eff-8cd2-e9ba31b90685#
下载后将其中命名为WardtoSecSchool_LDS_2015的sheet里边的数据保存为csv格式,删除第一行的表头,并重新命名为school.csv
数据格式为:
(Ward_CODE, Ward_NAME, TotalWardPupils, Ward2Sec_Flow_No., Secondary_School_URN, Secondary_School_Name, Pupil_count)
首先对数据进行一些预处理:
- >>> school = sc.textFile("file:///home/yang/下载/school.csv")
- Data = sc.textFile("file:///home/yang/下载/school.csv")
- >>> school.count() # 共有16796行数据
- 16796
- >>> import re # 引入python的正则表达式包
- >>> rows = school.map(lambda line: re.subn(',[\s]+',': ', line))
注意:
1. 从本地读取数据时,代码中要通过 “file://” 前缀指定读取本地文件。Spark shell 默认是读取 HDFS 中的文件,需要先上传文件到 HDFS 中,否则会有“org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/hadoop/school.csv”的错误。
2. 对数据集进行了一下预处理,利用正则匹配替换字符串,由于一些学校的名字的字符串中本身含有逗号,比如“The City Academy, Hackney”, 此时如果利用csv的分隔符’,’进行分割,并不能将名字分割为“The City Academy”和“Hackney”。我们注意到csv的分隔符逗号后边是没有空格的,而名字里边的逗号后边都会有空格(英语书写习惯),因此,先利用re.subn语句对逗号后边含有至少一个空格(正则表达式为’,[\s]+’)的子字符串进行替换,替换为’: ’,然后再进行后续操作。以上即为对这一数据集的预处理过程。
将RDD的分区数减小到numPartitions个。当数据集通过过滤规模减小时,使用这个操作可以提升性能。
缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。
- def coalesce(numPartitions: Int, shuffle: Boolean = false,partitionCoalescer:
- Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null) : RDD[T]
重组数据,数据被重新随机分区为numPartitions个,numPartitions可以比原来大,也可以比原来小,平衡各个分区。
这一操作会将整个数据集在网络中重新洗牌。
根据分区数,重新通过网络随机洗牌所有数据。
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
注意:
coalesce和repartition的区别
a)coalesce重新分区,可以选择是否进行shuffle过程
由参数shuffle: Boolean = false/true决定
b)repartition实际上是调用的coalesce,进行shuffle
源码如下:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
与sortByKey类似,但是更灵活,可以用func先对数据进行处理,按照处理后的数据比较结果排序。默认为正序。
- # RDD[T] 根据f函数提供可以排序的key
- def sortBy[K]( f: (T) => K, ascending: Boolean = true, numPartitions:
- Int = this.partitions.length) (implicit ord: Ordering[K],
- ctag: ClassTag[K]):
将驱动程序中的RDD交给shell处理(外部进程),例如Perl或bash脚本。
RDD元素作为标准输入传给脚本,脚本处理之后的标准输出会作为新的RDD返回给驱动程序。
对于每个分区,都执行一个perl或者shell脚本,返回输出的RDD
- // 定义
- pipe(command, [envVars])
-
- // 示例
- // Shell脚本
- #!/bin/sh
- echo "AA"
- while read LINE; do
- echo ">>>"${LINE}
- done
-
- // 调用
- scala> val rdd = sc.parallelize(List("how", "are", "you"), 1)
- scala> rdd.pipe("./pipe.sh").collect()
- res6: Array[String] = Array(AA, >>>how, >>>are, >>>you)
根据给定的partitioner函数重新将RDD分区,并在分区内排序。
这比先repartition然后在分区内sort高效,
原因是这样迫使排序操作被移到了shuffle阶段。
作用于键值对(K, V)和(K, W)上,返回元组 (K, (Iterable, Iterable))。这一操作可叫做groupWith
- >>> class1 = sc.parallelize(('Tom', 'Jenny', 'Bob'))
- .map(lambda a: (a, 'attended'))
- >>> class2 = sc.parallelize(('Tom', 'Amy', 'Alice', 'John'))
- .map(lambda a: (a, 'attended'))
- >>> group = class1.cogroup(class2)
- >>> group.collect()
- [('John', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808afd0>,
- <pyspark.resultiterable.ResultIterable object at 0x7fb7e808a1d0>)),
- ('Tom', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808a7f0>,
- <pyspark.resultiterable.ResultIterable object at 0x7fb7e808a048>)),
- ('Jenny', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808a9b0>,
- <pyspark.resultiterable.ResultIterable object at 0x7fb7e808a208>)),
- ('Bob', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808ae80>,
- <pyspark.resultiterable.ResultIterable object at 0x7fb7e8b448d0>)),
- ('Amy', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44c88>,
- <pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44588>)),
- ('Alice', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44748>,
- <pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44f98>))]
-
- >>> group.map(lambda x: {x[0]: [list(x[1][0]), list(x[1][1])]}).collect()
- [{'John': [[], ['attended']]}, {'Tom': [['attended'], ['attended']]},
- {'Jenny': [['attended'], []]}, {'Bob': [['attended'], []]},
- {'Amy': [[], ['attended']]}, {'Alice': [[], ['attended']]}]

并集操作, 将源数据集与union中的输入数据集取并集,
默认保留重复元素(如果不保留重复元素, 可以利用distinct操作去除,下边介绍distinct时会介绍)。
对源RDD和参数RDD求并集后返回一个新的RDD
- # 定义
- def union(other: RDD[T]): RDD[T]
-
- >>> data1 = sc.parallelize(range(10))
- >>> data2 = sc.parallelize(range(6,15))
- >>> data1.union(data2).collect()
- [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 6, 7, 8, 9, 10, 11, 12, 13, 14]
计算差的一种函数去除两个RDD中相同的元素,不同的RDD将保留下来
- # 定义
- def subtract(other: RDD[T]): RDD[T]
交集操作, 将源数据集与union中的输入数据集取交集, 并返回新的数据集。
对源RDD和参数RDD求交集后返回一个新的RDD
- # 定义
- def intersection(other: RDD[T]): RDD[T]
-
- >>> data1 = sc.parallelize(range(10))
- >>> data2 = sc.parallelize(range(6,15))
- >>> data1.intersection(data2).collect()
- [8, 9, 6, 7]
笛卡尔乘积, 作用于数据集T和U上, 返回(T, U), 即数据集中每个元素的两两组合
做笛卡尔积。 n * m
- # 定义
- def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
-
- >>> a = sc.parallelize(('a', 'b', 'c'))
- >>> b = sc.parallelize(('d', 'e', 'f'))
- >>> a.cartesian(b).collect()
- [('a', 'd'), ('a', 'e'), ('a', 'f'), ('b', 'd'), ('b', 'e'),
- ('b', 'f'), ('c', 'd'), ('c', 'e'), ('c', 'f')]
将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。
- # 定义
- def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
-
- 例:创建两个RDD,并将两个RDD组合到一起形成一个(k,v)RDD
- (1)创建第一个RDD
- scala> val rdd1 = sc.parallelize(Array(1,2,3),3)
- rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
- (2)创建第二个RDD(与1分区数相同)
- scala> val rdd2 = sc.parallelize(Array("a","b","c"),3)
- rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:24
- (3)第一个RDD组合第二个RDD并打印
- scala> rdd1.zip(rdd2).collect
- res1: Array[(Int, String)] = Array((1,a), (2,b), (3,c))
- (4)第二个RDD组合第一个RDD并打印
- scala> rdd2.zip(rdd1).collect
- res2: Array[(String, Int)] = Array((a,1), (b,2), (c,3))
- (5)创建第三个RDD(与1,2分区数不同)
- scala> val rdd3 = sc.parallelize(Array("a","b","c"),2)
- rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:24
- (6)第一个RDD组合第三个RDD并打印
- scala> rdd1.zip(rdd3).collect
- java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(3, 2)
- at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:57)
- at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
- at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
- at scala.Option.getOrElse(Option.scala:121)
- at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
- at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
- at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
- at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
- at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
- at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
- at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
- ... 48 elided

对RDD进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区,否则会生成ShuffleRDD.
- # 定义
- partitionBy(partitioner: Partitioner): RDD[(K, V)]
作用于键值对(K, V)上,按Key分组,然后将Key相同的键值对的Value都执行func操作,得到一个值,注意func的类型必须满足
在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置
- # 定义
- def reduceByKey(func: (V, V) => V): RDD[(K, V)]
-
- # r[1]为ward的名字,r[6]为每个学校的学生数
- >>> pupils = newRows.map(lambda r: (r[1], int(r[6])))
- # 计算各个ward中的学生数
- >>> ward_pupils = pupils.reduceByKey(lambda x, y: x+y)
- # 输出各个ward中的学生数
- >>> ward_pupils.collect()
- [('Stifford Clays', 1566), ('Shenley', 1625), ('Southbury', 3526),
- ('Rainham and Wennington', 769), ('Bromley Town', 574),
- ('Waltham Abbey Honey Lane', 835), ('Telegraph Hill', 1238),
- ('Chigwell Village', 1506), ('Gooshays', 2097),
- ('Edgware', 2585), ('Camberwell Green', 1374), ('Glyndon', 4633),...]
groupByKey也是对每个key进行操作,但只生成一个sequence。即将key相同的value聚集在一起。
作用于由键值对(K, V)组成的数据集上,将Key相同的数据放在一起,返回一个由键值对(K, Iterable)组成的数据集。
注意:
1. 如果这一操作是为了后续在每个键上进行聚集(aggregation),
比如sum或者average, 此时使用reduceByKey或者aggregateByKey的效率更高。
2. 默认情况下, 输出的并行程度取决于RDD分区的数量,
但也可以通过给可选参数numTasks赋值来调整并发任务的数量。
3.reduceByKey:按照key进行聚合, 在shuffle之前有combine(预聚合)操作, 返回结果是RDD[k,v]。
groupByKey:按照key进行分组, 直接进行shuffle。
4.建议使用reduceByKey, 但是需要注意是否会影响业务逻辑
- # 定义
- def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
-
- >>> newRows = rows.map(lambda r: r[0].split(','))
- # r[1]为ward的名字,r[5]为学校的名字
- >>> ward_schoolname = newRows .map(lambda r: (r[1], r[5])).groupByKey()
- # 列出每个ward区域内所有的学校的名字
- >>> ward_schoolname.map(lambda x: {x[0]: list(x[1])}).collect()
- # 输出结果为在Stifford Clays这个ward里的学校有William Edwards School,
- # Brentwood County High School,The Coopers' Company and Coborn School等等...
- [{'Stifford Clays': ['William Edwards School', 'Brentwood County High School',
- "The Coopers' Company and Coborn School",
- 'Becket Keys Church of England Free School', ...]
对相同K,把V合并成一个集合。createCombiner: combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就 和之前的某个元素的键相同。如果这是一个新的元素,combineByKey() 会使用一个叫作 createCombiner() 的函数来创建那个键对应的累加器的初始值。mergeValue: 如果这是一个在处理当前分区之前已经遇到的键, 它会使用 mergeValue() 方法将该键的累加器对应的当前值与这个新的值进行合并。mergeCombiners: 由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器, 就需要使用用户提供的。mergeCombiners() 方法将各个分区的结果进行合并。
- # 定义
- def combineByKey[C](createCombiner: V => C,
- mergeValue: (C, V) => C,mergeCombiners: (C, C) => C,
- numPartitions: Int): RDD[(K, C)]
- (1) 创建一个pairRDD
- scala> val input = sc.parallelize(Array(("a", 88), ("b", 95),
- ("a", 91), ("b", 93), ("a", 95), ("b", 98)),2)
- input: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[52]
- at parallelize at <console>:26
-
- (2) 将相同key对应的值相加,同时记录该key出现的次数,放入一个二元组
- scala> val combine = input.combineByKey((_,1),(acc:(Int,Int),v)
- =>(acc._1+v,acc._2+1),(acc1:(Int,Int),acc2:(Int,Int))
- =>(acc1._1+acc2._1,acc1._2+acc2._2))
- combine: org.apache.spark.rdd.RDD[(String, (Int, Int))]
- = ShuffledRDD[5] at combineByKey at <console>:28
-
- (3) 打印合并后的结果
- scala> combine.collect
- res5: Array[(String, (Int, Int))] = Array((b,(286,3)), (a,(274,3)))
-
- (4) 计算平均值
- scala> val result = combine.map{case (key,value)
- => (key,value._1/value._2.toDouble)}
- result: org.apache.spark.rdd.RDD[(String, Double)]
- = MapPartitionsRDD[54] at map at <console>:30
-
- (5) 打印结果
- scala> result.collect()
- res33: Array[(String, Double)] = Array((b,95.33333333333333),
- (a,91.33333333333333))

在kv对的RDD中,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。seqOp函数用于在每一个分区中用初始值逐步迭代value,combOp函数用于合并每个分区中的结果。是CombineByKey的简化版,可以通过zeroValue直接提供一个初始值。
在键值对(K, V)的RDD中,按key将value进行分组合并,合并时,将每个value和初始值作为seqOp函数的参数,进行计算,
返回的结果作为一个新的键值对(K, V),然后再将结果按照key进行合并,
最后将每个分组的value传递给comOp函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给comOp函数,
以此类推),将key与计算结果作为一个新的键值对(K, V)输出。
- // 定义
- def aggregateByKey[U: ClassTag](zeroValue: U, partitioner:
- Partitioner)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)]
例子: 上述统计ward内学生人数的操作也可以通过aggregateByKey实现,此时,seqOp和comOp都是进行加法操作,代码如下:
- >>> ward_pupils = pupils.aggregateByKey(0, lambda x, y: x+y, lambda x, y: x+y)
- >>> ward_pupils.collect()
- [('Stifford Clays', 1566), ('Shenley', 1625), ('Southbury', 3526),
- ('Rainham and Wennington', 769), ('Bromley Town', 574),
- ('Waltham Abbey Honey Lane', 835), ('Telegraph Hill', 1238),
- ('Chigwell Village', 1506), ('Gooshays', 2097),
- ('Edgware', 2585), ('Camberwell Green', 1374), ('Glyndon', 4633),...]
该函数为aggregateByKey的简化版,seqOp和combOp一样,相同。
- // 定义
- def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD。
根据f函数提供可以排序的key
按照Key进行排序,ascending的值默认为True,True/False表示升序还是降序
例如:将上述ward按照ward名字降序排列,打印出前十个
- // 定义
- def sortBy[K]( f: (T) => K, ascending: Boolean = true, numPartitions:
- Int = this.partitions.length) (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
-
- // 示例
- >>> ward_pupils.sortByKey(False, 4).take(10)
- [('Yiewsley', 2560), ('Wormholt and White City', 1455), ('Woodside', 1204),
- ('Woodhouse', 2930), ('Woodcote', 1214), ('Winchmore Hill', 1116),
- ('Wilmington', 2243), ('Willesden Green', 1896), ('Whitefoot', 676),
- ('Whalebone', 2294)]
针对于(K,V)形式的类型只对V进行操作
- // 定义
- def mapValues[U](f: V => U): RDD[(K, U)]
在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
类似于SQL中的连接操作,即作用于键值对(K, V)和(K, W)上,返回元组 (K, (V, W)),
spark也支持外连接,包括leftOuterJoin,rightOuterJoin和fullOuterJoin。
JOIN : 只留下双方都有KEY
left JOIN: 留下左边RDD所有的数据
right JOIN: 留下右边RDD所有的数据
例子:
- >>> class1 = sc.parallelize(('Tom', 'Jenny', 'Bob'))
- .map(lambda a: (a, 'attended'))
- >>> class2 = sc.parallelize(('Tom', 'Amy', 'Alice', 'John'))
- .map(lambda a: (a, 'attended'))
- >>> class1.join(class2).collect()
- [('Tom', ('attended', 'attended'))]
- >>> class1.leftOuterJoin(class2).collect()
- [('Tom', ('attended', 'attended')), ('Jenny', ('attended', None)),
- ('Bob', ('attended', None))]
- >>> class1.rightOuterJoin(class2).collect()
- [('John', (None, 'attended')), ('Tom', ('attended', 'attended')),
- ('Amy', (None, 'attended')), ('Alice', (None, 'attended'))]
- >>> class1.fullOuterJoin(class2).collect()
- [('John', (None, 'attended')), ('Tom', ('attended', 'attended')),
- ('Jenny', ('attended', None)), ('Bob', ('attended', None)),
- ('Amy', (None, 'attended')), ('Alice', (None, 'attended'))]

Actions算子是Spark算子的一类,这一类算子会触发SparkContext提交job作业。
作用:通过func函数聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据。
使用函数func(两个输入参数,返回一个值)对数据集中的元素做聚集操作。
函数func必须是可交换的(我理解的就是两个参数互换位置对结果不影响),
并且是相关联的,从而能够正确的进行并行计算。
- // 示例1
- >>> data = sc.parallelize(range(1,101))
- >>> data.reduce(lambda x, y: x+y)
- 5050
-
- // 示例2
- 例:创建一个RDD,将所有元素聚合得到结果
- (1)创建一个RDD[Int]
- scala> val rdd1 = sc.makeRDD(1 to 10,2)
- rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[85] at makeRDD at <console>:24
- (2)聚合RDD[Int]所有元素
- scala> rdd1.reduce(_+_)
- res50: Int = 55
- (3)创建一个RDD[String]
- scala> val rdd2 = sc.makeRDD(Array(("a",1),("a",3),("c",3),("d",5)))
- rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[86] at makeRDD at <console>:24
- (4)聚合RDD[String]所有数据
- scala> rdd2.reduce((x,y)=>(x._1 + y._1,x._2 + y._2))
- res51: (String, Int) = (adca,12)

作用:在驱动程序中,以数组的形式返回数据集的所有元素。
在driver程序中以数组形式返回数据集中所有的元素。
这以action通常在执行过filter或者其他操作后返回一个较小的子数据集时非常有用。
- // 示例1:
- >>> data = sc.parallelize(range(1,101))
- >>> data.filter(lambda x: x%10==0).collect()
- [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
-
- // 示例2:
- 例:创建一个RDD,并将RDD内容收集到Driver端打印
- (1)创建一个RDD
- scala> val rdd = sc.parallelize(1 to 10)
- rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0]
- at parallelize at <console>:24
- (2)将结果收集到Driver端
- scala> rdd.collect
- res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
返回数据集中元素的个数。
- // 示例1
- >>> data.count()
- 100
-
- // 示例2
- 例:创建一个RDD,统计该RDD的条数
- (1)创建一个RDD
- scala> val rdd = sc.parallelize(1 to 10)
- rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
- (2)统计该RDD的条数
- scala> rdd.count
- res1: Long = 10
返回数据集中的第一个元素,相当于take(1)。
- // 示例1
- >>> data.first()
- 1
-
- // 示例2
- 例:创建一个RDD,返回该RDD中的第一个元素
- (1)创建一个RDD
- scala> val rdd = sc.parallelize(1 to 10)
- rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
- (2)统计该RDD的条数
- scala> rdd.first
- res2: Int = 1
返回一个由RDD的前n个元素组成的数组
以数组形式返回数据集中前n个元素。
需要注意的是,这一action并不是在多个node上并行执行,而是在driver程序所在的机器上单机执行,
会增大内存的压力,使用需谨慎。
- // 定义
- >>> data.take(10)
- [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
-
- // 示例
- 例:创建一个RDD,统计该RDD的条数
- (1)创建一个RDD
- scala> val rdd = sc.parallelize(Array(2,5,4,6,8,3))
- rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2]
- at parallelize at <console>:24
- (2)统计该RDD的条数
- scala> rdd.take(3)
- res10: Array[Int] = Array(2, 5, 4)
以数组形式返回从数据集中抽取的样本数量为num的随机样本,有替换或者无替换的进行采样。
可选参数[seed]可以允许用户自己预定义随机数生成器的种子。
- >>> data.takeSample(False, 20)
- [60, 97, 91, 62, 48, 7, 49, 89, 40, 44, 15, 2, 33, 8, 30, 82, 87, 96, 32, 31]
- >>> data.takeSample(True, 20)
- [96, 71, 20, 71, 80, 42, 70, 93, 77, 26, 14, 82, 50, 30, 30, 56, 93, 46, 70, 70]
返回该RDD排序后的前n个元素组成的数组
返回RDD的前n个元素,可以利用自然顺序或者由用户执行排序的comparator。
- // 示例1
- >>> score = [('Amy',98),('Bob',87),('David',95),('Cindy',76),('Alice',84),
- ('Alice',33)]
- >>> scoreRDD = sc.parallelize(score)
- >>> scoreRDD.takeOrdered(3)
- # 可以根据两个Alice的例子看到,当元祖中第一个元素相同时,会继续比较第二个元素,仍然按升序排列
- [('Alice', 33), ('Alice', 84), ('Amy', 98)]
- # 按照分数升序排序
- >>> scoreRDD.takeOrdered(3, key=lambda x: x[1])
- [('Alice', 33), ('Cindy', 76), ('Alice', 84)]
- # 按照分数降序排序
- >>> scoreRDD.takeOrdered(3, key=lambda x: -x[1])
- [('Amy', 98), ('David', 95), ('Bob', 87)]
-
- // 示例2
- 例:创建一个RDD,统计该RDD的条数
- (1)创建一个RDD
- scala> val rdd = sc.parallelize(Array(2,5,4,6,8,3))
- rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2]
- at parallelize at <console>:24
- (2)统计该RDD的条数
- scala> rdd.takeOrdered(3)
- res18: Array[Int] = Array(2, 3, 4)

注意,第2个参数这里是一个匿名函数,这个匿名函数并不会改变scoreRDD中的值,也就是第3个例子中,并不是将每个人的分数变为负数,而是提供一个排序的依据,说明此时为降序排序。如果是想要改变RDD中的值,可以进行如下操作:
- >>> scoreRDD.map(lambda x: (x[0], -x[1])).takeOrdered(3, lambda x: x[1])
- [('Amy', -98), ('David', -95), ('Bob', -87)]
这个例子并没有什么实际意义,只是提醒takeOrdered算子中第二个参数的作用。
参数:(zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)
作用:aggregate函数将每个分区里面的元素通过seqOp和初始值进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。
- 例:创建一个RDD,将所有元素相加得到结果
- (1)创建一个RDD
- scala> var rdd1 = sc.makeRDD(1 to 10,2)
- rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[88] at makeRDD at <console>:24
- (2)将该RDD所有元素相加得到结果
- scala> rdd.aggregate(0)(_+_,_+_)
- res22: Int = 55
作用:折叠操作,aggregate的简化操作,seqop和combop一样。
- 例:创建一个RDD,将所有元素相加得到结果
- (1)创建一个RDD
- scala> var rdd1 = sc.makeRDD(1 to 10,2)
- rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[88] at makeRDD at <console>:24
- (2)将该RDD所有元素相加得到结果
- scala> rdd.fold(0)(_+_)
- res24: Int = 55
作用:将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,
对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
将数据集中的元素以文本文件(或者文本文件的一个集合)的形式写入本地文件系统,
或者HDFS,或者其他Hadoop支持的文件系统的指定路径path下。
Spark会调用每个元素的toString方法,将其转换为文本文件中的一行。
作用:将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,
可以使HDFS或者其他Hadoop支持的文件系统。
将数据集中的元素以Hadoop SequenceFile的形式写入本地文件系统,
或者HDFS,或者其他Hadoop支持的文件系统的指定路径path下。
RDD的元素必须由实现了Hadoop的Writable接口的key-value键值对组成。
在Scala中,也可以是隐式可以转换为Writable的键值对
(Spark包括了基本类型的转换,例如Int,Double,String等等)
作用: 用于将RDD中的元素序列化成对象, 存储到文件中。
利用Java序列化, 将数据集中的元素以一种简单的形式进行写操作,
并能够利用SparkContext.objectFile()加载数据。(适用于Java和Scala)
作用: 针对(K,V)类型的RDD, 返回一个(K,Int)的map, 表示每一个key对应的元素个数。
只能作用于键值对(K, V)形式的RDDs上。按照Key进行计数, 返回键值对(K, int)的哈希表。
- // 示例1
- 例:创建一个PairRDD,统计每种key的个数
- (1)创建一个PairRDD
- scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
- rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[95] at parallelize at <console>:24
- (2)统计每种key的个数
- scala> rdd.countByKey
- res63: scala.collection.Map[Int,Long] = Map(3 -> 2, 1 -> 3, 2 -> 1)
-
-
- // 示例2
- # 一组学生对应的成绩
- >>> score = [('Amy',98),('Bob',87),('David',95),('Cindy',76),
- ('Alice',84),('Alice',33)]
- >>> scoreRDD = sc.parallelize(score)
- >>> scoreRDD.countByKey()
- defaultdict(<class 'int'>, {'Cindy': 1, 'Alice': 2, 'Bob': 1, 'Amy': 1,
- 'David': 1})
- >>> result = scoreRDD.countByKey()
- # 查看返回值类型
- >>> type(result)
- <class 'collections.defaultdict'>
- >>> result['Alice']
- 2
- >>> result['Sunny']
- 0
- >>> testDict = {'Cindy': 1, 'Alice': 2, 'Bob': 1, 'Amy': 1, 'David': 1}
- >>> testDict['Sunny']
- Traceback (most recent call last):
- File "<stdin>", line 1, in <module>
- KeyError: 'Sunny'

!!!特别注意,PySpark中返回的是一个collections.defaultdict()类,collections是python的一个模块,是一个数据类型容器模块,需要注意defaultdict与dict还是有区别的。
defaultdict是Python内建函数dict的一个子类,构建的是一个类似dictionary的对象,其中key的值是自行赋值,但是value的类型,是function_factory(工厂函数)的类实例,即使对于一个key,它的value值有缺失,也会有一个默认值。
上述代码中最后的例子可以看书,虽然result和testDict中都没有key为‘Sunny’这个键值对,但是result会返回一个默认值0,而testDict就出现了KeyError的错误。关于defaultdict和dict的区别,这里就不做过多的解释,但是大家需要注意这里返回的类型并不是dict。
作用:在数据集的每一个元素上,运行函数func进行更新。
在数据集的每个元素上调用函数func。这一操作通常是为了实现一些副作用,比如更新累加器或者与外部存储系统进行交互。
注意:在foreach()之外修改除了累加器以外的变量可能造成一些未定义的行为。
更多内容请参阅闭包进行理解。
- 例: 创建一个RDD, 对每个元素进行打印
- (1)创建一个RDD
- scala> var rdd = sc.makeRDD(1 to 5,2)
- rdd: org.apache.spark.rdd.RDD[Int] =
- ParallelCollectionRDD[107] at makeRDD at <console>:24
- (2)对该RDD每个元素进行打印
- scala> rdd.foreach(println(_))
关于 foreachRDD、foreachPartition和foreach:
foreachRDD、foreachPartition和foreach的不同之处主要在于它们的作用范围不同,
foreachRDD作用于DStream中每一个时间间隔的RDD,
foreachPartition作用于每一个时间间隔的RDD中的每一个partition,
foreach作用于每一个时间间隔的RDD中的每一个元素。
https://www.cnblogs.com/adienhsuan/p/5654485.html
http://blog.csdn.net/zhangyang10d/article/details/53239404
http://blog.csdn.net/zhangyang10d/article/details/53146953?locationNum=13&fps=1
http://blog.csdn.net/egraldloi/article/details/16343733
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。