当前位置:   article > 正文

Spark——一文理解SparkSQL的基础概念、函数、执行流程和优化流程_spark sql

spark sql

1、什么是SparkSQL

SparkSQLSpark的一个模块,用于处理海量结构化数据

限定: 结构化数据处理

2、为什么学习Spark

SparkSQL是非常成熟的海量结构化数据处理框架。

学习SparkSQL主要在2个点:

  • SparkSQL本身十分优秀,支持SQL语言\性能强\可以自动优化`API简单\兼容HIVE`等等
  • 企业大面积在使用SparkSQL处理业务数据
    • 离线开发

    • 数仓搭建

    • 科学计算

    • 数据分析

3、SparkSQL的特点

  • 融合性:SQL可以无缝集成在代码中,随时用SQL处理数据;
  • 统一数据访问: 一套标准API可读写不同数据源;
  • Hive兼容: 可以使用SparkSQL直接计算并生成Hive数据表;
  • 标准化连接: 支持标准化JDBC\ODBC连接,方便和各种数据库进行数据交互。
results=spark.sql("SELECT * FROM people")
names = results.map(lambda p: p.name)
  • 1
  • 2

在这里插入图片描述

4、SparkSQL的发展历史

在许多年前(2012\2013左右)Hive逐步火热起来,大片抢占分布式SQL计算市场;

Spark作为通用计算框架,也不可能放弃这一细分领域。于是,Spark官方模仿Hive推出了Shark框架(Spark 0.9版本);

Shark框架是几乎100%模仿Hive,内部的配置项\优化项等都是直接模仿而来。不同的在于将执行引擎由MapReduce更换为了Spark;

因为Shark框架太模仿HiveHive是针对MR优化,很多地方和SparkCore(RDD)水土不服, 最终被放弃;
Spark官方下决心开发一个自己的分布式SQL引擎也就是诞生了现在的SparkSQL
在这里插入图片描述

  • 2014年1.0正式发布;

  • 2015年1.3发布DataFrame数据结构,沿用至今;

  • 2016年1.6发布Dataset数据结构(带泛型的DataFrame)适用于支持泛型的语言(Java\Scala);

  • 2016年20统一了DatasetDataFrame,以后只有Dataset,Python用的DataFrame就是没有泛型的Dataset

  • 2019年3.0发布,性能大幅度提升,SparkSQL变化不大。

5、SparkSQLHive的异同

在这里插入图片描述

  • HiveSpark均是:“分布式SQL计算引擎”
  • 均是构建大规模结构化数据计算的绝佳利器,同时SparkSQL拥有更好的性能。
  • 目前,企业中使用Hive仍旧居多,但SparkSQL将会在很近的未来替代Hive成为分布式SQL计算市场的顶级。

6、SparkSQL的数据抽象

在这里插入图片描述

  • Pandas-DataFrame
    • 二维表数据结构

    • 单机(本地)集合

  • SparkCore-RDD
    • 无标准数据结构,存储什么数据均可

    • 分布式集合(分区)

  • SparkSQL-DataFrame
    • 二维表数据结构

    • 分布式集合(分区)

6.1、语言适配

在这里插入图片描述

SparkSQL 其实有3类数据抽象对象。

  • SchemaRDD对象(已废弃)
  • DataSet对象:可用于Java、Scala语言
  • DataFrame对象:可用于Java、Scala、Python、R

我们以Python开发SparkSQL,主要使用的就是 DataFrame对象作为核心数据结构

6.2、数据抽象的发展

在这里插入图片描述

SparkSQL的发展历史可以看到:

  • 14年最早的数据抽象是:SchemaRDD(内部存储二维表数据结构的RDD),SchemaRDD就是魔改的RDD,将RDD支持的存储数据,限定为二维表数据结构用以支持SQL查询。由于是魔改RDD,只是一个过渡产品,现已废弃。
  • 15年发布DataFrame对象,基于PandasDataFrame(模仿)独立于RDD进行实现,将数据以二维表结构进行存储并支持分布式运行。
  • 16年发布DataSet对象,在DataFrame之上添加了泛型的支持,用以更好的支持JavaScala这两个支持泛型的编程语言。
  • 16年,Spark2.0版本,将DataFrameDataSet进行合并。其底层均是DataSet对象,但在PythonR语言到用时,显示为DataFrame对象。和老的DataFrame对象没有区别。

6.3、RDDDataFrame的区别

DataFrameRDD都是:弹性的、分布式的、数据集。只是,DataFrame存储的数据结构“限定”为:二维表结构化数据;而RDD可以存储的数据则没有任何限制,想处理什么就处理什么。

  • DataFrame是按照二维表格的形式存储数据;
  • RDD则是存储对象本身。

7、SparkSession

RDD阶段,程序的执行入口对象是:SparkContext

Spark2.0后,推出了SparkSession对象,作为Spark编码的统一入口对象。SparkSession对象可以:

  • 用于SparkSQL编程作为入口对象;
  • 用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext

所以,我们后续的代码,执行环境入口对象,统一变更为SparkSession对象。

在这里插入图片描述

构建SparkSession对象

# coding:utf8
#SparkSQL中的入口对象是SparkSession对象
from pyspark.sql import SparkSession

if __name__ == '__main__':
  #构建SparkSession对象,这个对象是构建器模式 通过builder方法来构建
  spark=SparkSession.builder.\
    appName("local[*]").\
    config("spark.sql.shufflepartitions","4").\
    getOrCreate()
  #appName设置程序名称,config设置一些常用属性
  #最后通过getOrCreate()方法创建SparkSession对象
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

8、SparkSQL函数

8.1、UDF函数

无论Hive还是SparkSQL分析处理数据时,往往需要使用函数,SparkSQL模块本身自带很多实现公共功能的函数,在pysparksql.functions中。
SparkSQLHive一样支持定义函数:UDFUDAF,尤其是UDF函数在实际项目中使用最为广泛。

回顾Hive中自定义函数有三种类型:

  • 第一种:UDF(User-Defined-Function)函数
    • 一对一的关系,输入一个值经过函数以后输出一个值;
    • Hive中继承UDF类,方法名称为evaluate,返回值不能为void,其实就是实现一个方法;
  • 第二种:UDAF(User-Defined Aggregation Function)聚合函数
    • 多对一的关系,输入多个值输出一个值,通常与GroupBy联合使用;
  • 第三种:UDTF(User-Defined Table-GeneratingFunctions)函数
    • 一对多的关系,输入一个值输出多个值(一行变为多行);
    • 用户自定义生成函数,有点像flatMap

在这里插入图片描述

SparkSQL中,目前仅仅支持UDF函数和UDAF函数,目前Python仅支持UDF

8.1.1、构建方式

  • sparksessionudfregister() 注册的UDF可以用于DSLSQL,返回值用于DSL风格,传参内给的名字用于SQL风格;
udf对象 = sparksession.udf.register(参数1,参数2,参数3)
参数1:UDF名称,可用于SQL风格
参数2:被注册成UDF的方法名
参数3:声明UDF的返回值类型
udf对象: 返回值对象,是一个UDF对象,可用于DSL风格

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • pyspark.sql.functions.udf 仅能用于DSL风格。
udf对象 = F.udf(参数1, 参数2)
参数1:被注册成UDF的方法名
参数2:声明UDF的返回值类型
udf对象: 返回值对象,是一个UDF对象,可用于DSL风格
  • 1
  • 2
  • 3
  • 4

其中F是:from pyspark.sql import functions as F
其中,被注册成UDF的方法名是指具体的计算方法,如:def add(x, y): x + y
add就是将要被注册成UDF的方法名

8. 1.2、构建一个Integer返回类型的UDF

# TODO 方式1注册
#注册UDF,功能:将数字都乘以10
def num ride 10(num):
  return num * 10

# 返回值用于DSL风格        内部注册的名称用于SQL(字符串表达式)风格
#参数1:UDF名称(可用于SQL风格),参数2:UDF的本体方法(处理逻辑),参数3:声明返回值类型
# 返回值可用于DSL
udf2 = spark.udf.register("udf1",num_ride_10,IntegerType())
df.select(udf2(df['num'])).show()

# select udf1(num)
df.selectExpr("udf1(num)").show()

#TODO 方式2注册,仅能用DSL风格
#参数1:UDF的本体方法(处理逻辑),参数2:声明返回值类型
udf3 = F.udf(num_ride_10,IntegerType())
df.select(udf3(df['num'])).show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

8. 1.3、注册一个Float返回值类型

rdd = sc.parallelize([1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7]).map(lambda x: [x])
df = rdd.toDF(["num"])

#TODO 方式1注册
#注册UDF,功能:将数字都乘以10
def num_ride_10(num):
  return num * 10
# 返回值用于DSL风格     内部注册的名称用于SQL(字符串表达式)风格
udf2 = spark.udf.register("udf1", num_ride_10, FloatType())
df.select(udf2(df['num'])).show()

# select udf1(num)
df.selectExpr("udf1(num)").show()

#TODO方式2注册,仅能用DSL风格
udf3 = F.udf(num_ride_10,FloatType())
df.select(udf3(df['num'])).show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

8. 1.4、注册一个ArrayType(数字\ list)类型的返回值UDF

rdd = sc.parallelize([["hadoop spark flink"], ["hadoop flink java"]])
df = rdd.toDF(["line"])
#TODO 方式1注册
#注册UDF,功能:将数字都乘以10
def split line(line):
  return line.split(" ")

# 返回值用于DSL风格      内部注册的名称用于SQL(字符串表达式)风格
udf2 = spark.udf.register("udf1",split_line,ArrayType(StringType()))
df.select(udf2(df['line'])).show()

# select udf1(num)
df.selectExpr("udf1(line)").show(()

#TODO 方式2注册,仅能用DSL风格
udf3 = F.udf(splitline, ArrayType(StringType()))
df.select(udf3(df['line'])).show(truncate = False)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

数组或者list类型,可以使用Spark的ArrayType来描述即可。
声明ArrayType要类似这样:ArrayType(StringType()),在ArrayType中传入数组内的数据类型。

8. 1.5、定义一个字典类型的返回值的UDF

rdd = sc.parallelize([[1], [2], [3]])
df = rdd.toDF(["num"])
# TODO 方式1注册
# 注册UDF,功能:将数字都乘以10
def split_line(num):
  return {"num": num,"letter str": string.ascii letters[num]}
structtype=StructType().add("num", IntegerType(), nullable=True).\
  add("letter_str", StringType(), nullable=True)

# 返回值用于DSL风格     路内部注册的名称用于SQL(字符串表达式)风格
udf2=spark.udf.register("udf1", split_line, struct_type)
df.select(udf2(df['num'])).show()

# select udf1(num)
df.selectExpr("udf1(num)").show()

#TODO 方式2注册,仅能用DSL风格
udf3=F.udf(split_line, struct_type)
df.select(udf3(df['num'])).show(truncate=False)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

字典类型返回值,可以用StructType来进行描述,StructType是一个普通的Spark支持的结构化类型。只是可以用在:
(1)DF中用于描述Schema
(2)UDF中用于描述返回值是字典的数据。

8. 1.6、注意事项

使用UDF两种方式的注册均可以。唯一需要注意的就是:返回值类型一定要有合适的类型来声明。

  • 返回int可以用IntergerType
  • 返回值小数,可以用FolatType或者DoubleType
  • 返回数组list可用ArrayType描述;
  • 返回字典可用StructType描述。

pyspark.sql.types包中。

8. 2、窗口函数

8. 2.1、介绍

开窗函数的引入是为了既显示聚集前的数据,又显示聚集后的数据。即在每一行的最后一列添加聚合函数的结果。

开窗用于为行定义一个窗口(这里的窗口是指运算将要操作的行的集合),它对一组值进行操作,不需要使用GROUP BY子句对数据进行分组,能够在同一行中同时返回基础行的列和聚合列。

8. 2.2、聚合函数和开窗函数

  • 聚合函数是将多行变成一行,countavg…;
  • 开窗函数是将一行变成多行;
  • 聚合函数如果要显示其他的列必须将列加入到group by中;
  • 开窗函数可以不使用group by,直接将所有信息显示出来。

8. 2.3、开窗函数分类

  • 聚合开窗函数: 聚合函数(列)OVER(选项),这里的选项可以是PARTITION BY 子句,但不可以是 ORDER BY子句。
  • 排序开窗函数: 排序函数(列)OVER(选项),这里的选项可以是ORDER BY子句,也可以是OVERPARTITION BY子句 ORDER BY子句),但不可以是PARTITION BY 子句。
  • 分区类型NTILE的窗口函数。
窗口函数的用法
# 聚合类型SUM\MIN\MAX\AVG\COUNT
sum() OVER([PARTITION BY XXX][ORDER BYXXX[DESC]])

#排序类型:ROW NUMBER|RANK|DENSE RANK
ROW NUMBER() OVER([PARTITION BY XXX][ORDER BYXXX[DESC]])

#分区类型:NTILE
NTILE(number) OVER([PARTITION BY XXX][ORDER BY XXX[DESC]])
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

9、RDD的执行流程回顾

在这里插入图片描述

代码 ⇒ DAG调度器逻辑任务 ⇒ Task调度器任务分配和管理监控 ⇒ Worker干活

10、SparkSQL的自动优化

RDD的运行会完全按照开发者的代码执行,如果开发者水平有限,RDD的执行效率也会受到影响。而SparkSQL会对写完的代码,执行“自动优化”,以提升代码运行效率,避免开发者水平影响到代码执行效率。

为什么SparkSQL可以自动优化,而RDD不可以?

  • RDD 内含数据类型不限格式和结构

  • DataFrame 100% 是二维表结构,可以被针对

  • SparkSQL的自动优化,依赖于:Catalyst优化器

11、Catalyst优化器

为了解决过多依赖Hive的问题,SparkSQL使用了一个新的SQL优化器替代Hive中的的优化器,这个优化器就是Catalyst整个SparkSQL的架构大致如下:

在这里插入图片描述

  • API层简单的说就是Spark会通过一些API接受SQL语句;
  • 收到SQL语句以后,将其交给CatalystCatalyst负责解析SQL,生成执行计划等;
  • Catalyst的输出应该是RDD的执行计划;
  • 最终交由集群运行。

12、Catalyst优化流程

在这里插入图片描述

  • 提交SparkSQL代码
  • catalyst优化
    • 生成原始AST语法数
    • 标记AST元数据
    • 进行断言下推和列值裁剪以及其它方面的优化作用在AST
    • 将最终AST得到,生成执行计划
    • 将执行计划翻译为RDD代码
  • Driver执行环境入口构建(SparkSession
  • DAG 调度器规划逻辑任务
  • TASK调度区分配逻辑任务到具体Executor上工作并监控管理任务
  • Worker干活.

12.1、解析SQL,并生成AST(抽象语法树)

在这里插入图片描述

12.2、在AST中加入元数据信息,做这一步主要是为了一些优化

在这里插入图片描述

  • score.idid#1#Lscore.id 生成id为1,类型是Long
  • score.math_scoremath_score#2#Lscoremath_score生成id为2,类型为Long
  • people.idid#3#Lpeopleid生成id为3,类型为Long
  • peopleageage#4#Lpeopleage生成id为4类型为Long

12.3、对已经加入元数据的AST,输入优化器,进行优化,从两种常见的优化开始

在这里插入图片描述

  • 断言下推 Predicate Pushdown,将Filter这种可以减小数据集的操作下推,放在Scan的位置,这样可以减少操作时候的数据量;
  • 列值裁剪ColumnPruning,在断言下推后执行裁剪,由于people表之上的操作只用到了id列,所以可以把其它列裁剪掉,这样可以减少处理的数据量,从而优化处理速度。

12.4、上面的过程生成的AST其实最终还没办法直接运行,这个AST 叫做逻辑计划,结束后,需要生成物理计划,从而生成RDD

  • 在生成物理计划的时候,会经过成本模型对整棵树再次执行优化,选择一个更好的计划
  • 在生成物理计划以后,因为考虑到性能,所以会使用代码生成,在机器中运行

可以使用quervExecution方法查看逻辑执行计划,使用explain 方法查看物理执行计划。

12.5、总结

catalyst的各种优化细节非常多,大方面的优化点有2个:

  • 谓词下推(PredicatePushdown)\断言下推: 将逻辑判断提前到前面,以减少shuffle阶段的数据量;
  • 列值裁剪(Column Pruning): 将加载的列进行裁剪,尽量减少被处理数据的宽度。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Guff_9hys/article/detail/920941
推荐阅读
相关标签
  

闽ICP备14008679号