Spark概述
根据官方网站, “ Apache Spark是用于大规模数据处理的快速通用引擎”
最好与群集环境一起使用,在群集环境中,数据处理任务或作业被拆分为可以快速,高效地在多台计算机或节点上运行。 它声称运行程序的速度比Hadoop平台快100倍。
Spark使用称为RDD(弹性分布式数据集)的对象来处理和过滤数据。 RDD对象提供了各种有用的功能来以分布式方式处理数据。 Spark的优点在于您无需了解它如何在群集中的各个节点之间分布或拆分数据。 作为开发人员,您只专注于编写RDD函数来处理和转换数据。 Spark是本机使用Scala语言构建的。 但是您可以使用Java,Python或Scala编写Spark程序。
Spark由可以用于其数据处理需求的不同模块或组件组成。 Spark的组件模型如下:
主要组件是Spark Core ,它使用RDD在分布式环境中处理(映射和减少)数据。 基于Spark Core构建的其他组件是:
- Spark Stream:用于分析实时数据流
- Spark SQL:人们可以使用Hive上下文编写基于SQL的查询来处理数据
- MLLib:支持机器学习算法和工具来训练您的数据
- GraphX:创建数据图并执行诸如转换和联接之类的操作
搭建环境
首先安装JDK 8,并设置指向JDK home的JAVA_HOME
环境变量。 对于Python,您可以安装流行的IDE,例如Enthought Canopy或Anaconda,也可以从python.org网站安装基本的Python。 对于Spark,请从Apache Spark官方网站下载并安装最新版本。
安装完成后,将SPARK_HOME
环境变量设置为指向Spark主文件夹。 还要确保将PATH变量设置为指向相应的bin文件夹: JAVA_HOME\bin
和SPARK_HOME\bin.
接下来,创建一个样本数据集,我们将使用它来处理Spark。 数据集将表示产品评级的收集。 数据格式如下:
第一栏代表订单ID,第二栏代表产品ID,第三栏代表评级。 它会在每个订单中告诉您产品等级。 我们将处理数据以找出每个评级下的总产品。 上面只是一小部分数据,使您了解使用Spark进行处理有多么容易。 您可能需要下载或生成现实生活的数据以实现类似的用例。 将此数据文件存储在文件夹中(例如/spark/data
)。 它可以是您选择的文件夹。
看代码
- from pyspark import SparkConf, SparkContext
- import collections
-
- conf = SparkConf().setMaster("local").setAppName("Product Ratings")
- sc = SparkContext(conf = conf)
-
- rows = sc.textFile("file:///spark/data/product-ratings-data.py")
- ratings = rows.map(lambda x: x.split()[2])
- output = ratings.countByValue()
- sortedOutput = collections.OrderedDict(sorted(output.items()))
- for key, value in sortedOutput.items():
- print("%s %i" % (key, value))
Python通过使用pyspark
模块与Spark集成。 您将需要此模块中的两个核心类: SparkConf
和SparkContext
。 SparkContext
用于创建RDD对象。 SparkConf
用于配置或设置应用程序属性。 第一招表明我们的Spark作业将使用集群(分布式)环境还是在一台机器上。 我们现在可以将其设置为local
,因为这是基本程序,我们将运行该程序以大致了解Spark,因此我们不需要分布式环境。 我们还设置了应用程序的名称,我们将其称为“ Basic Spark App”。 如果使用Web UI管理Spark作业,则显示该作业的名称。 然后,从已配置的SparkConf
对象中获取SparkContext
( sc
)对象。
加载数据
rows = sc.textFile("file:///spark/data/product-ratings-data")
下一个代码段将从本地文件系统加载数据。 我们已经有一个文件存储在spark/data
文件夹中。 该文件包含产品等级信息,我们将加载该信息以查找特定等级下的产品总数。 的textFile()
函数是用来从指定的文件路径加载初始数据,并将它返回一个火花RDD对象- rows
。 这是我们将用于进一步处理和转换数据的核心对象。 该RDD对象将包含行记录形式的数据。 文件中的每一行都是RDD对象中的记录或值。
转换数据
- ratings = rows.map(lambda x: x.split()[2])
- output = ratings.countByValue()
map()
函数使用lambda功能(内部函数定义为lambda)按空间划分数据行并获取第三列(索引从0开始,因此2成为第三列)。 如此提取的第三列是评级列。 拆分将在每一行上执行,结果输出将存储在新的RDD对象中,我们称其为rating
。
现在,我们的新ratings
RDD将保留如下所示的评级:
现在,要查找每个评级的产品总数,我们将使用countByValue()
函数。 它将对每个重复的等级进行计数,并将计数值放在每个等级旁边。 最终输出将如下所示。
您可以使用spark-submit
应用程序运行python程序,如下所示:
spark-submit product-ratings-data.py
共有3个产品的评级为4,两个产品的评级为3,一个产品的评级为1和2。 countByValue()
函数将返回一个普通的Python集合对象,该对象最终可以进行迭代以显示结果。
翻译自: https://www.javacodegeeks.com/2016/11/apache-spark-quick-start-python.html