赞
踩
SparkSQL
SparkSQL
是Spark的一个模块,用于处理海量结构化数据。
限定: 结构化数据处理
Spark
SparkSQL
是非常成熟的海量结构化数据处理框架。
学习SparkSQL
主要在2个点:
SparkSQL
本身十分优秀,支持SQL
语言\性能强\可以自动优化`API简单\兼容
HIVE`等等SparkSQL
处理业务数据
离线开发
数仓搭建
科学计算
数据分析
SparkSQL
的特点SQL
可以无缝集成在代码中,随时用SQL
处理数据;API
可读写不同数据源;SparkSQL
直接计算并生成Hive
数据表;JDBC\ODBC
连接,方便和各种数据库进行数据交互。results=spark.sql("SELECT * FROM people")
names = results.map(lambda p: p.name)
SparkSQL
的发展历史在许多年前(2012\2013左右)Hive
逐步火热起来,大片抢占分布式SQL
计算市场;
Spark作为通用计算框架,也不可能放弃这一细分领域。于是,Spark官方模仿Hive
推出了Shark
框架(Spark 0.9版本);
Shark框架是几乎100%模仿Hive
,内部的配置项\优化项等都是直接模仿而来。不同的在于将执行引擎由MapReduce
更换为了Spark;
因为Shark
框架太模仿Hive
,Hive
是针对MR
优化,很多地方和SparkCore(RDD)
水土不服, 最终被放弃;
Spark官方下决心开发一个自己的分布式SQL
引擎也就是诞生了现在的SparkSQL
。
2014年1.0正式发布;
2015年1.3发布DataFrame
数据结构,沿用至今;
2016年1.6发布Dataset
数据结构(带泛型的DataFrame
)适用于支持泛型的语言(Java\Scala
);
2016年20统一了Dataset
和DataFrame
,以后只有Dataset
,Python用的DataFrame
就是没有泛型的Dataset
;
2019年3.0发布,性能大幅度提升,SparkSQL
变化不大。
SparkSQL
和Hive
的异同Hive
和Spark
均是:“分布式SQL
计算引擎”SparkSQL
拥有更好的性能。Hive
仍旧居多,但SparkSQL
将会在很近的未来替代Hive
成为分布式SQL
计算市场的顶级。SparkSQL
的数据抽象Pandas-DataFrame
二维表数据结构
单机(本地)集合
SparkCore-RDD
无标准数据结构,存储什么数据均可
分布式集合(分区)
SparkSQL-DataFrame
二维表数据结构
分布式集合(分区)
SparkSQL
其实有3类数据抽象对象。
SchemaRDD
对象(已废弃)DataSet
对象:可用于Java、Scala语言DataFrame
对象:可用于Java、Scala、Python、R我们以Python开发SparkSQL
,主要使用的就是 DataFrame
对象作为核心数据结构。
从SparkSQL
的发展历史可以看到:
SchemaRDD
(内部存储二维表数据结构的RDD
),SchemaRDD
就是魔改的RDD
,将RDD
支持的存储数据,限定为二维表数据结构用以支持SQL
查询。由于是魔改RDD
,只是一个过渡产品,现已废弃。DataFrame
对象,基于Pandas
的DataFrame
(模仿)独立于RDD
进行实现,将数据以二维表结构进行存储并支持分布式运行。DataSet
对象,在DataFrame
之上添加了泛型的支持,用以更好的支持Java
和Scala
这两个支持泛型的编程语言。DataFrame
和DataSet
进行合并。其底层均是DataSet
对象,但在Python
和R
语言到用时,显示为DataFrame
对象。和老的DataFrame
对象没有区别。RDD
和DataFrame
的区别DataFrame
和RDD
都是:弹性的、分布式的、数据集。只是,DataFrame
存储的数据结构“限定”为:二维表结构化数据;而RDD
可以存储的数据则没有任何限制,想处理什么就处理什么。
DataFrame
是按照二维表格的形式存储数据;RDD
则是存储对象本身。SparkSession
在RDD
阶段,程序的执行入口对象是:SparkContext
。
在Spark2.0
后,推出了SparkSession
对象,作为Spark编码的统一入口对象。SparkSession
对象可以:
SparkSQL
编程作为入口对象;SparkCore
编程,可以通过SparkSession
对象中获取到SparkContext
。所以,我们后续的代码,执行环境入口对象,统一变更为SparkSession
对象。
SparkSession
对象# coding:utf8
#SparkSQL中的入口对象是SparkSession对象
from pyspark.sql import SparkSession
if __name__ == '__main__':
#构建SparkSession对象,这个对象是构建器模式 通过builder方法来构建
spark=SparkSession.builder.\
appName("local[*]").\
config("spark.sql.shufflepartitions","4").\
getOrCreate()
#appName设置程序名称,config设置一些常用属性
#最后通过getOrCreate()方法创建SparkSession对象
SparkSQL
函数UDF
函数无论Hive
还是SparkSQL
分析处理数据时,往往需要使用函数,SparkSQL
模块本身自带很多实现公共功能的函数,在pysparksql.functions
中。
SparkSQL
与Hive
一样支持定义函数:UDF
和UDAF
,尤其是UDF
函数在实际项目中使用最为广泛。
回顾Hive中自定义函数有三种类型:
UDF(User-Defined-Function)
函数
Hive
中继承UDF
类,方法名称为evaluate
,返回值不能为void
,其实就是实现一个方法;UDAF(User-Defined Aggregation Function)
聚合函数
GroupBy
联合使用;UDTF(User-Defined Table-GeneratingFunctions)
函数
flatMap
;在SparkSQL
中,目前仅仅支持UDF
函数和UDAF
函数,目前Python仅支持UDF
。
sparksessionudfregister()
: 注册的UDF
可以用于DSL
和SQL
,返回值用于DSL
风格,传参内给的名字用于SQL
风格;udf对象 = sparksession.udf.register(参数1,参数2,参数3)
参数1:UDF名称,可用于SQL风格
参数2:被注册成UDF的方法名
参数3:声明UDF的返回值类型
udf对象: 返回值对象,是一个UDF对象,可用于DSL风格
pyspark.sql.functions.udf
: 仅能用于DSL
风格。udf对象 = F.udf(参数1, 参数2)
参数1:被注册成UDF的方法名
参数2:声明UDF的返回值类型
udf对象: 返回值对象,是一个UDF对象,可用于DSL风格
其中F是:
from pyspark.sql import functions as F
其中,被注册成UDF
的方法名是指具体的计算方法,如:def add(x, y): x + y
add
就是将要被注册成UDF
的方法名
Integer
返回类型的UDF
# TODO 方式1注册 #注册UDF,功能:将数字都乘以10 def num ride 10(num): return num * 10 # 返回值用于DSL风格 内部注册的名称用于SQL(字符串表达式)风格 #参数1:UDF名称(可用于SQL风格),参数2:UDF的本体方法(处理逻辑),参数3:声明返回值类型 # 返回值可用于DSL udf2 = spark.udf.register("udf1",num_ride_10,IntegerType()) df.select(udf2(df['num'])).show() # select udf1(num) df.selectExpr("udf1(num)").show() #TODO 方式2注册,仅能用DSL风格 #参数1:UDF的本体方法(处理逻辑),参数2:声明返回值类型 udf3 = F.udf(num_ride_10,IntegerType()) df.select(udf3(df['num'])).show()
Float
返回值类型rdd = sc.parallelize([1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7]).map(lambda x: [x]) df = rdd.toDF(["num"]) #TODO 方式1注册 #注册UDF,功能:将数字都乘以10 def num_ride_10(num): return num * 10 # 返回值用于DSL风格 内部注册的名称用于SQL(字符串表达式)风格 udf2 = spark.udf.register("udf1", num_ride_10, FloatType()) df.select(udf2(df['num'])).show() # select udf1(num) df.selectExpr("udf1(num)").show() #TODO方式2注册,仅能用DSL风格 udf3 = F.udf(num_ride_10,FloatType()) df.select(udf3(df['num'])).show()
ArrayType
(数字\ list
)类型的返回值UDF
rdd = sc.parallelize([["hadoop spark flink"], ["hadoop flink java"]]) df = rdd.toDF(["line"]) #TODO 方式1注册 #注册UDF,功能:将数字都乘以10 def split line(line): return line.split(" ") # 返回值用于DSL风格 内部注册的名称用于SQL(字符串表达式)风格 udf2 = spark.udf.register("udf1",split_line,ArrayType(StringType())) df.select(udf2(df['line'])).show() # select udf1(num) df.selectExpr("udf1(line)").show(() #TODO 方式2注册,仅能用DSL风格 udf3 = F.udf(splitline, ArrayType(StringType())) df.select(udf3(df['line'])).show(truncate = False)
数组或者
list
类型,可以使用Spark的ArrayType
来描述即可。
声明ArrayType
要类似这样:ArrayType(StringType())
,在ArrayType
中传入数组内的数据类型。
rdd = sc.parallelize([[1], [2], [3]]) df = rdd.toDF(["num"]) # TODO 方式1注册 # 注册UDF,功能:将数字都乘以10 def split_line(num): return {"num": num,"letter str": string.ascii letters[num]} structtype=StructType().add("num", IntegerType(), nullable=True).\ add("letter_str", StringType(), nullable=True) # 返回值用于DSL风格 路内部注册的名称用于SQL(字符串表达式)风格 udf2=spark.udf.register("udf1", split_line, struct_type) df.select(udf2(df['num'])).show() # select udf1(num) df.selectExpr("udf1(num)").show() #TODO 方式2注册,仅能用DSL风格 udf3=F.udf(split_line, struct_type) df.select(udf3(df['num'])).show(truncate=False)
字典类型返回值,可以用
StructType
来进行描述,StructType
是一个普通的Spark支持的结构化类型。只是可以用在:
(1)DF
中用于描述Schema
;
(2)UDF
中用于描述返回值是字典的数据。
使用UDF
两种方式的注册均可以。唯一需要注意的就是:返回值类型一定要有合适的类型来声明。
int
可以用IntergerType
;FolatType
或者DoubleType
;list
可用ArrayType
描述;StructType
描述。
pyspark.sql.types
包中。
开窗函数的引入是为了既显示聚集前的数据,又显示聚集后的数据。即在每一行的最后一列添加聚合函数的结果。
开窗用于为行定义一个窗口(这里的窗口是指运算将要操作的行的集合),它对一组值进行操作,不需要使用GROUP BY
子句对数据进行分组,能够在同一行中同时返回基础行的列和聚合列。
count
,avg
…;group by
中;group by
,直接将所有信息显示出来。OVER
(选项),这里的选项可以是PARTITION BY
子句,但不可以是 ORDER BY
子句。OVER
(选项),这里的选项可以是ORDER BY
子句,也可以是OVER
(PARTITION BY
子句 ORDER BY
子句),但不可以是PARTITION BY
子句。NTILE
的窗口函数。# 聚合类型SUM\MIN\MAX\AVG\COUNT
sum() OVER([PARTITION BY XXX][ORDER BYXXX[DESC]])
#排序类型:ROW NUMBER|RANK|DENSE RANK
ROW NUMBER() OVER([PARTITION BY XXX][ORDER BYXXX[DESC]])
#分区类型:NTILE
NTILE(number) OVER([PARTITION BY XXX][ORDER BY XXX[DESC]])
RDD
的执行流程回顾代码 ⇒ DAG
调度器逻辑任务 ⇒ Task
调度器任务分配和管理监控 ⇒ Worker
干活
SparkSQL
的自动优化RDD
的运行会完全按照开发者的代码执行,如果开发者水平有限,RDD
的执行效率也会受到影响。而SparkSQL
会对写完的代码,执行“自动优化”,以提升代码运行效率,避免开发者水平影响到代码执行效率。
SparkSQL
可以自动优化,而RDD
不可以?RDD
: 内含数据类型不限格式和结构
DataFrame
: 100% 是二维表结构,可以被针对
SparkSQL
的自动优化,依赖于:Catalyst
优化器
为了解决过多依赖Hive
的问题,SparkSQL
使用了一个新的SQL
优化器替代Hive
中的的优化器,这个优化器就是Catalyst
整个SparkSQL
的架构大致如下:
API
层简单的说就是Spark会通过一些API
接受SQL
语句;SQL
语句以后,将其交给Catalyst
,Catalyst
负责解析SQL
,生成执行计划等;Catalyst
的输出应该是RDD
的执行计划;SparkSQL
代码catalyst
优化
AST
语法数AST
元数据AST
上AST
得到,生成执行计划RDD
代码Driver
执行环境入口构建(SparkSession
)DAG
调度器规划逻辑任务TASK
调度区分配逻辑任务到具体Executor
上工作并监控管理任务Worker
干活.AST
中加入元数据信息,做这一步主要是为了一些优化score.id
⇒ id#1#L
为 score.id
生成id
为1,类型是Long
;score.math_score
⇒ math_score#2#L
为scoremath_score
生成id
为2,类型为Long
;people.id
⇒ id#3#L
为peopleid
生成id
为3,类型为Long
;peopleage
⇒ age#4#L
为peopleage
生成id
为4类型为Long
。Predicate Pushdown
,将Filter
这种可以减小数据集的操作下推,放在Scan
的位置,这样可以减少操作时候的数据量;ColumnPruning
,在断言下推后执行裁剪,由于people
表之上的操作只用到了id
列,所以可以把其它列裁剪掉,这样可以减少处理的数据量,从而优化处理速度。AST
叫做逻辑计划,结束后,需要生成物理计划,从而生成RDD
来物理计划的时候,会经过成本模型
对整棵树再次执行优化,选择一个更好的计划物理计划
以后,因为考虑到性能,所以会使用代码生成,在机器中运行可以使用quervExecution
方法查看逻辑执行计划,使用explain
方法查看物理执行计划。
catalyst
的各种优化细节非常多,大方面的优化点有2个:
PredicatePushdown
)\断言下推: 将逻辑判断提前到前面,以减少shuffle
阶段的数据量;Column Pruning
): 将加载的列进行裁剪,尽量减少被处理数据的宽度。Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。