当前位置:   article > 正文

(二)PySpark3:SparkSQL编程_pyspark sql

pyspark sql

目录

一、SparkSQL介绍

二、创建DataFrame

1、通过ToDF方法

2、通过createDataFrame方法

3、通过读取文件或数据库

三、保存DataFrame

四、DataFrame API

1、显示数据

2、统计信息

3、类RDD操作

4、类Excel操作

5、类SQL表操作

五、DataFrame+SQL

1、注册视图

2、操作Hive表

六、总结


 PySpark系列文章:

(一)PySpark3:安装教程及RDD编程

(二)PySpark3:SparkSQL编程

(三)PySpark3:SparkSQL40题

(四)PySpark3:Mlib机器学习实战-信用卡交易数据异常检测

一、SparkSQL介绍

Spark SQL是Apache Spark生态系统的一个关键组件,专注于处理和分析结构化和半结构化的大规模数据。Spark SQL建立在Spark核心之上,为用户提供了高效且易用的数据处理接口,从而将关系型和非关系型数据融入到分布式计算环境中。

核心概念之一是DataFrame API,它提供了一个高级的、面向数据的抽象,允许用户以声明性的方式处理数据。DataFrame是一个分布式的、具有表格结构的数据集,类似于传统数据库中的表。通过DataFrame API,用户可以执行各种数据操作,包括过滤、转换、聚合、连接等,而无需深入了解底层的分布式计算模型。

RDD,DataFrame和DataSet对比:

RDD可以存储任何类型的数据,包括结构化数据、半结构化数据和非结构化数据,RDD的操作更接近底层,更适合对数据进行底层控制和自定义处理。

DataFrame构建在RDD之上,提供了更高级的抽象,是一个分布式的、以列为主的数据集合,类似于关系型数据库中的表。DataFrame可以通过多种数据源进行创建,包括结构化数据源(如JSON、CSV、Parquet)和Hive表,并且提供了丰富的SQL和DataFrameAPI,可以方便地进行数据处理和分析。

DataSet在DataFrame基础上进一步增加了数据类型信息,可以通过编程语言的类型系统来检查错误,并提供更好的编译时类型检查。

DataFrame和DataSet都支持SQL交互式查询,可以和 Hive无缝衔接。DataSet只有Scala语言和Java语言接口中才支持,在Python和R语言接口只支持DataFrame。

二、创建DataFrame

首先导包:

  1. import pandas as pd
  2. from pyspark.sql.types import *
  3. import pyspark.sql.functions as F
  4. from pyspark.sql import Row
  5. pd.DataFrame.iteritems = pd.DataFrame.items

1、通过ToDF方法

可以将RDD用toDF方法转换成DataFrame。

  1. rdd = sc.parallelize([(1,"James",27,1000),(2,"Bob",22,500),(3,"Alice",25,800),(4,"Jon",29,1200)])
  2. df = rdd.toDF(["id","name","age","sal"])
  3. df.show()

输出结果:

  1. +---+-----+---+----+
  2. | id| name|age| sal|
  3. +---+-----+---+----+
  4. | 1|James| 27|1000|
  5. | 2| Bob| 22| 500|
  6. | 3|Alice| 25| 800|
  7. | 4| Jon| 29|1200|
  8. +---+-----+---+----+

2、通过createDataFrame方法

可以将Pandas.DataFrame转换成pyspark中的DataFrame,也可直接对数据列表、schema进行转换。

  1. #将pandas.DataFrame转换为pyspark.DataFrame
  2. pdf = pd.DataFrame([(1,"James",27,1000),(2,"Bob",22,500),(3,"Alice",25,800),(4,"Jon",29,1200)]
  3. ,columns=["id","name","age","sal"])
  4. df = spark.createDataFrame(pdf)
  5. print(type(df))
  6. #将list转换为pyspark.DataFrame
  7. df = spark.createDataFrame([(1,"James",27,1000),(2,"Bob",22,500),(3,"Alice",25,800),(4,"Jon",29,1200)]
  8. ,["id","name","age","sal"])
  9. print(type(df))
  10. #根据指定rdd和schema创建pyspark.DataFrame
  11. schema = StructType([StructField("id", IntegerType(), nullable = False),
  12. StructField("name", StringType(), nullable = True),
  13. StructField("age", IntegerType(), nullable = True),
  14. StructField("sal", FloatType(), nullable = True),
  15. ])
  16. rdd = sc.parallelize([Row(1,"James",27,1000),
  17. Row(2,"Bob",22,500),
  18. Row(3,"Alice",25,800),
  19. Row(4,"Jon",29,1200),
  20. ])
  21. df = spark.createDataFrame(rdd, schema)
  22. print(type(df))

输出结果:

  1. <class 'pyspark.sql.dataframe.DataFrame'>
  2. <class 'pyspark.sql.dataframe.DataFrame'>

3、通过读取文件或数据库

可以通过读取json、csv等文件,或hive、mysql数据表得到DataFrame。

spark.read.csv(...): 从 CSV 文件中读取数据。
spark.read.json(...): 从 JSON 文件中读取数据。
spark.read.parquet(...): 从 Parquet 文件中读取数据。
spark.read.text(...): 读取文本文件。
spark.read.format(...): 使用指定的格式读取数据。

读取json文件:

  1. df = spark.read.json("test.json")
  2. df.show()

输出结果:test.json内容如下:

  1. {"id":1,"name":"James","age":27,"sal":1000}
  2. {"id":2,"name":"Bob","age":22,"sal":500}
  3. {"id":3,"name":"Alice","age":25,"sal":800}
  4. {"id":4,"name":"Jon","age":29,"sal":1200}
'
运行

读取parquet文件:

  1. df = spark.read.parquet("data/users.parquet")
  2. df.show()

读取csv文件:

  1. #方式1
  2. df = spark.read.option("header","true") \
  3. .option("inferSchema","true") \
  4. .option("delimiter", ",") \
  5. .csv("test.csv")
  6. #方式2
  7. df = spark.read.format("com.databricks.spark.csv") \
  8. .option("header", "true") \
  9. .option("inferSchema", "true") \
  10. .option("delimiter", ",") \
  11. .load("test.csv")
  12. #方式3
  13. df = spark.read.csv(path="test.csv",
  14. header=True, #指定将第一行作为列名
  15. inferSchema=True, #自动推断出每列的数据类型
  16. sep=',' #分隔符
  17. )
  18. df.show()

输出结果:

  1. +---+-----+---+----+
  2. | id| name|age| sal|
  3. +---+-----+---+----+
  4. | 1|James| 27|1000|
  5. | 2| Bob| 22| 500|
  6. | 3|Alice| 25| 800|
  7. | 4| Jon| 29|1200|
  8. +---+-----+---+----+

读取Hive数据表:

  1. spark.sql("CREATE TABLE IF NOT EXISTS test (id INT, name STRING, age INT, sal FLOAT) USING hive")
  2. spark.sql("LOAD DATA LOCAL INPATH 'data/test.txt' INTO TABLE test")
  3. df = spark.sql("SELECT * FROM test")

三、保存DataFrame

 通过df.write()对DataFrame进行保存。

  1. #保存为csv文件
  2. df.write.format("csv").option("header","true").save("data/test.csv")
  3. #保存为json文件
  4. df.write.json("data/test.json")
  5. #保存成parquet文件
  6. df.write.parquet("data/test.parquet")
  7. df.write.partitionBy("age").format("parquet").save("data/test.parquet")
  8. #保存成hive数据表
  9. df.write.bucketBy(2, "name").sortBy("age").saveAsTable("test")

四、DataFrame API

1、显示数据

①df.collect()

用于将DataFrame中的所有行收集到Driver节点上,并以列表的形式返回这些行。

  1. df = spark.createDataFrame([(1,"James",27,1000),
  2. (2,"Bob",22,500),
  3. (3,"Alice",25,800),
  4. (4,"Jon",29,1200)]
  5. ,["id","name","age","sal"])
  6. df.collect()

输出结果:

  1. [Row(id=1, name='James', age=27, sal=1000),
  2. Row(id=2, name='Bob', age=22, sal=500),
  3. Row(id=3, name='Alice', age=25, sal=800),
  4. Row(id=4, name='Jon', age=29, sal=1200)]

②df.first()

获取第一行数据。

df.first()

输出结果:

Row(id=1, name='James', age=27, sal=1000)

③df.head(n)

获取前n行数据。

df.head(2)

输出结果:

  1. [Row(id=1, name='James', age=27, sal=1000),
  2. Row(id=2, name='Bob', age=22, sal=500)]

④df.show(n)

与df.head(n)类似,但是df.show(n)是打印成表格。

df.show(2)

输出结果:

  1. +---+-----+---+----+
  2. | id| name|age| sal|
  3. +---+-----+---+----+
  4. | 1|James| 27|1000|
  5. | 2| Bob| 22| 500|
  6. +---+-----+---+----+
  7. only showing top 2 rows

⑤df.printSchema()

用于打印DataFrame的模式schema,定义了各列的名称和类型。

df.printSchema()

输出结果:

  1. root
  2. |-- id: long (nullable = true)
  3. |-- name: string (nullable = true)
  4. |-- age: long (nullable = true)
  5. |-- sal: long (nullable = true)

2、统计信息

①df.describe()

一般与df.show()连用,用于查看DataFrame的数据分布。

df.describe().show()

输出结果:

  1. +-------+------------------+-----+------------------+----------------+
  2. |summary| id| name| age| sal|
  3. +-------+------------------+-----+------------------+----------------+
  4. | count| 4| 4| 4| 4|
  5. | mean| 2.5| null| 25.75| 875.0|
  6. | stddev|1.2909944487358056| null|2.9860788111948193|298.607881119482|
  7. | min| 1|Alice| 22| 500|
  8. | max| 4| Jon| 29| 1200|
  9. +-------+------------------+-----+------------------+----------------+

若只想查看某一列的数据分布,如:df.describe('age').show()。

②df.count()

返回数据总行数。

df.count()

输出结果:

4

③聚合函数

一些常用的聚合函数如下sum()、mean()、min()、max()、avg()

  1. #求最小工资
  2. df.select(F.min(df['sal'])).show()
  3. #输出结果:
  4. +--------+
  5. |min(sal)|
  6. +--------+
  7. | 500|
  8. +--------+
  9. #求最大工资
  10. df.select(F.max(df['sal'])).show()
  11. #输出结果:
  12. +--------+
  13. |max(sal)|
  14. +--------+
  15. | 1200|
  16. +--------+
  17. #求总工资
  18. df.select(F.sum(df['sal'])).show()
  19. #输出结果:
  20. +--------+
  21. |sum(sal)|
  22. +--------+
  23. | 3500|
  24. +--------+
  25. #求平均工资
  26. df.select(F.avg(df['sal'])).show()
  27. #输出结果:
  28. +--------+
  29. |avg(sal)|
  30. +--------+
  31. | 875.0|
  32. +--------+

同时对多列操作:

  1. df.agg({"name":"count","age":"max","sal":"avg"}).show()
  2. #输出:
  3. +-----------+--------+--------+
  4. |count(name)|max(age)|avg(sal)|
  5. +-----------+--------+--------+
  6. | 4| 29| 875.0|
  7. +-----------+--------+--------+

④df.stat.freqItems()

统计值出现的频率。

  1. #统计age、name两列出现频率超过0.25的值
  2. df.stat.freqItems(("age","name"),0.25).show()

输出结果:

  1. +----------------+--------------------+
  2. | age_freqItems| name_freqItems|
  3. +----------------+--------------------+
  4. |[29, 22, 25, 27]|[Bob, Jon, Alice,...|
  5. +----------------+--------------------+

3、类RDD操作

可以把DataFrame当做数据类型为Row的RDD来进行操作。

部分操作需要先转换为RDD才能运行,如map、flatMap等等。

部分操作可以直接在DataFrame上进行,如filter、distinct、sample、cache、intersect等等。

①df.map()

  1. #所有人age+1
  2. rdd = df.rdd.map(lambda x:Row(x[2]+1))
  3. rdd.toDF(["age"]).show()

输出结果:

  1. +---+
  2. |age|
  3. +---+
  4. | 28|
  5. | 23|
  6. | 26|
  7. | 30|
  8. +---+

②df.flatMap()

  1. rdd = df.rdd.flatMap(lambda x:x[1].split('o')).map(lambda x:Row(x))
  2. rdd.toDF(["name"]).show()

输出结果:

  1. +-----+
  2. | name|
  3. +-----+
  4. |James|
  5. | B|
  6. | b|
  7. |Alice|
  8. | J|
  9. | n|
  10. +-----+

③df.filter()

  1. #筛选工资大于800的
  2. df.filter(F.col("sal")>800).show()
  3. +---+-----+---+----+
  4. | id| name|age| sal|
  5. +---+-----+---+----+
  6. | 1|James| 27|1000|
  7. | 4| Jon| 29|1200|
  8. +---+-----+---+----+
  9. #筛选姓名为Bob,以下三种方式输出结果一致
  10. df.filter(F.col("name")=="Bob").show()
  11. df.filter(df["name"]=="Bob").show()
  12. df.filter("name='Bob'").show()
  13. +---+----+---+---+
  14. | id|name|age|sal|
  15. +---+----+---+---+
  16. | 2| Bob| 22|500|
  17. +---+----+---+---+
  18. #筛选姓名以J开头的
  19. df.filter(F.col("name").startswith("J")).show()
  20. +---+-----+---+----+
  21. | id| name|age| sal|
  22. +---+-----+---+----+
  23. | 1|James| 27|1000|
  24. | 4| Jon| 29|1200|
  25. +---+-----+---+----+
  26. #筛选除指定值外的其他数据
  27. broads = sc.broadcast(["James","Bob"])
  28. df.filter(~F.col("name").isin(broads.value)).show()
  29. +---+-----+---+----+
  30. | id| name|age| sal|
  31. +---+-----+---+----+
  32. | 3|Alice| 25| 800|
  33. | 4| Jon| 29|1200|
  34. +---+-----+---+----+

④df.distinct()

  1. #去重
  2. df.distinct().show()

⑤df.cache()

  1. #cache缓存
  2. df.cache()
  3. #释放缓存
  4. df.unpersist()

⑥df.sample()

随机抽样。

  1. #withReplacement=False表示无放回,即抽取不重复的数据
  2. #fraction=0.5表示抽样的比例为50%
  3. #seed为随机种子,用于复现
  4. df_sample = df.sample(withReplacement=False, fraction=0.5, seed=2)
  5. df_sample.show()

输出结果:

  1. +---+-----+---+----+
  2. | id| name|age| sal|
  3. +---+-----+---+----+
  4. | 1|James| 27|1000|
  5. | 3|Alice| 25| 800|
  6. +---+-----+---+----+

⑦df.intersect(df)

取两个DataFrame所有交集的行,返回结果不包含重复行

df.intersect(df_sample).show()

输出结果:

  1. +---+-----+---+----+
  2. | id| name|age| sal|
  3. +---+-----+---+----+
  4. | 3|Alice| 25| 800|
  5. | 1|James| 27|1000|
  6. +---+-----+---+----+

⑧df.exceptAll()

求差集。

  1. #从df中移除与df_sample相同的行,返回一个新的DataFrame
  2. df.exceptAll(df_sample).show()

输出结果:

  1. +---+----+---+----+
  2. | id|name|age| sal|
  3. +---+----+---+----+
  4. | 2| Bob| 22| 500|
  5. | 4| Jon| 29|1200|
  6. +---+----+---+----+

4、类Excel操作

①df.withColumn()

增加列。

  1. df = df.withColumn("birthyear",-df["age"]+2024)
  2. df.show()

输出结果:

  1. +---+-----+---+----+---------+
  2. | id| name|age| sal|birthyear|
  3. +---+-----+---+----+---------+
  4. | 1|James| 27|1000| 1997|
  5. | 2| Bob| 22| 500| 2002|
  6. | 3|Alice| 25| 800| 1999|
  7. | 4| Jon| 29|1200| 1995|
  8. +---+-----+---+----+---------+

②df.select()

筛选列。

df.select("name","age").show()

输出结果:

  1. +-----+---+
  2. | name|age|
  3. +-----+---+
  4. |James| 27|
  5. | Bob| 22|
  6. |Alice| 25|
  7. | Jon| 29|
  8. +-----+---+

③df.drop()

删除列。

  1. #删除一列
  2. df.drop("age").show()
  3. #删除多列
  4. df.drop(*["age","birthyear"]).show()

④df.withColumnRenamed()

对列进行重命名。

  1. #对一列进行重命名
  2. df.withColumnRenamed("sal","salary").show()
  3. #对多列进行重命名
  4. df.withColumnRenamed("sal","salary").withColumnRenamed("birthyear","year").show()

⑤df.sort()、df.orderBy()

按照某一列或某几列进行排序。

  1. #按照某一列进行排序
  2. df.sort(df["age"].desc()).show() #降序
  3. df.sort(df["age"].asc()).show() #升序
  4. #按照某几列进行排序
  5. df.orderBy(F.col("age").asc(), F.col("sal").desc()).show()

⑥df.na.drop()、df.na.fill()

处理带空值的行。

注意,在填充空值时,只能对相同数据类型的列的空值进行填充。

  1. df = spark.createDataFrame([(1,"James",27,1000),(2,"Bob",22,500),(3,"Alice",25,800),(4,None,29,None)]
  2. ,["id","name","age","sal"])
  3. df.show()
  4. +---+-----+---+----+
  5. | id| name|age| sal|
  6. +---+-----+---+----+
  7. | 1|James| 27|1000|
  8. | 2| Bob| 22| 500|
  9. | 3|Alice| 25| 800|
  10. | 4| null| 29|null|
  11. +---+-----+---+----+
  12. #删除带有nan值的行
  13. df.na.drop().show()
  14. +---+-----+---+----+
  15. | id| name|age| sal|
  16. +---+-----+---+----+
  17. | 1|James| 27|1000|
  18. | 2| Bob| 22| 500|
  19. | 3|Alice| 25| 800|
  20. +---+-----+---+----+
  21. #填充nan值
  22. df.na.fill("Jon").show()
  23. +---+-----+---+----+
  24. | id| name|age| sal|
  25. +---+-----+---+----+
  26. | 1|James| 27|1000|
  27. | 2| Bob| 22| 500|
  28. | 3|Alice| 25| 800|
  29. | 4| Jon| 29|null|
  30. +---+-----+---+----+
  31. df.na.fill(0).show()
  32. +---+-----+---+----+
  33. | id| name|age| sal|
  34. +---+-----+---+----+
  35. | 1|James| 27|1000|
  36. | 2| Bob| 22| 500|
  37. | 3|Alice| 25| 800|
  38. | 4| null| 29| 0|
  39. +---+-----+---+----+

⑦df.replace()

替换指定的值。

  1. #注意,不能同时对不同数据类型的值进行替换
  2. #例如,这句代码会报错:df.replace({"James": "Jim",1000: 100}).show()
  3. df.replace({"James": "Jim", "Bob":"Bieber" }).show()
  4. df.replace({1000: 100}).show()

⑧df.dropDuplicates()

跟distinct方法不同的是,dropDuplicates方法接收传参,可以根据指定字段去重。

df.dropDuplicates(["name"]).show()

5、类SQL表操作

类SQL表操作主要包括表查询(select,selectExpr,where),表连接(join,union,unionAll),表分组(groupby,agg,pivot)等操作。

①df.select()

df.select()用于对DataFrame进行列的表达式操作,允许使用SQL表达式来筛选、计算列。

  1. #筛选两列,并限制输出前两行
  2. #df.limit(2)与df.take(2)类似,不过limit输出的是DataFrame,take输出的是list
  3. df.select("age","name").limit(2).show()
  4. +---+-----+
  5. |age| name|
  6. +---+-----+
  7. | 27|James|
  8. | 22| Bob|
  9. +---+-----+
  10. #可以对列进行操作
  11. df.select("name",df["age"] + 1).show()
  12. +-----+---------+
  13. | name|(age + 1)|
  14. +-----+---------+
  15. |James| 28|
  16. | Bob| 23|
  17. |Alice| 26|
  18. | Jon| 30|
  19. +-----+---------+
  20. #通过toDF()对列进行重命名
  21. df.select("name",-df["age"]+2024).toDF("name","birth_year").show()
  22. +-----+----------+
  23. | name|birth_year|
  24. +-----+----------+
  25. |James| 1997|
  26. | Bob| 2002|
  27. |Alice| 1999|
  28. | Jon| 1995|
  29. +-----+----------+

②df.selectExpr()

df.selectExpr()用于对DataFrame进行列的表达式操作,允许使用SQL表达式来筛选、计算和重命名列。参数是一个字符串列表,其中每个字符串都是一个SQL表达式。

  1. #创建一个UDF函数
  2. spark.udf.register("getBirthYear",lambda x:2024-x)
  3. #调用函数对列进行转换并重命名
  4. df.selectExpr("name",
  5. "getBirthYear(age) as birth_year" ,
  6. "UPPER(name) as NAME" ).show()
  7. +-----+----------+-----+
  8. | name|birth_year| NAME|
  9. +-----+----------+-----+
  10. |James| 1997|JAMES|
  11. | Bob| 2002| BOB|
  12. |Alice| 1999|ALICE|
  13. | Jon| 1995| JON|
  14. +-----+----------+-----+
  15. #使用row_number()函数
  16. df.selectExpr("name","age",
  17. "row_number() over (order by age desc) as order").show()
  18. +-----+---+-----+
  19. | name|age|order|
  20. +-----+---+-----+
  21. | Jon| 29| 1|
  22. |James| 27| 2|
  23. |Alice| 25| 3|
  24. | Bob| 22| 4|
  25. +-----+---+-----+

使用df.selectExpr()还可以将DataFrame转换为复合类型。

  1. #array类型
  2. dfarray = df.selectExpr("name","array(age,sal) as info")
  3. dfarray.selectExpr("name","info[0] as age","info[1] as sal").show()
  4. #struct类型
  5. df_struct = df.selectExpr("name","struct(age,sal) as info")
  6. df_struct.selectExpr("name","info.age","info.sal").show()
  7. #map类型
  8. df_map = df.selectExpr("name","map('age',age,'sal',sal) as info")
  9. df_map.selectExpr("name","info['age'] as age","info['sal'] as sal").show()
  10. #输出结果
  11. +-----+---+----+
  12. | name|age| sal|
  13. +-----+---+----+
  14. |James| 27|1000|
  15. | Bob| 22| 500|
  16. |Alice| 25| 800|
  17. | Jon| 29|1200|
  18. +-----+---+----+
  19. #构造named_struct类型
  20. df_named_struct = df.selectExpr("name","named_struct('age',age,'sal',sal) as info")
  21. df_named_struct.show()
  22. +-----+----------+
  23. | name| info|
  24. +-----+----------+
  25. |James|{27, 1000}|
  26. | Bob| {22, 500}|
  27. |Alice| {25, 800}|
  28. | Jon|{29, 1200}|
  29. +-----+----------+
  30. #转换为json类型
  31. df_named_struct.selectExpr("name","to_json(info) as json_info").show()
  32. +-----+--------------------+
  33. | name| json_info|
  34. +-----+--------------------+
  35. |James|{"age":27,"sal":1...|
  36. | Bob|{"age":22,"sal":500}|
  37. |Alice|{"age":25,"sal":800}|
  38. | Jon|{"age":29,"sal":1...|
  39. +-----+--------------------+

③df.where()

用法与SQL中的where一致,注意书写时,等于是一个“=”。

df.where("name='Bob' or age=27").show()

输出结果:

  1. +---+-----+---+----+
  2. | id| name|age| sal|
  3. +---+-----+---+----+
  4. | 1|James| 27|1000|
  5. | 2| Bob| 22| 500|
  6. +---+-----+---+----+

④df.join(df)

df.join(df, on='列名', how='inner')。on参数可以指定连接方式为inner、left、right、outer、semi、full、leftanti、anti"等多种方式。关联的列如果有多列,则传入一个列名list。

  1. scores = spark.createDataFrame([("James","English",90),("James","Math",60),
  2. ("Bob","Math",50),("Bob","Physics",50),
  3. ("Alice","Math",70),("Alice","Physics",80),
  4. ("Jon","English",40),("Jon","Math",80)
  5. ]) \
  6. .toDF("name","subject","score")
  7. scores.show()
  8. df.join(scores,on="name",how="inner").show()
  9. df.join(scores,df["name"]==scores["name"],"inner").show()
  10. +-----+---+---+----+-------+-----+
  11. | name| id|age| sal|subject|score|
  12. +-----+---+---+----+-------+-----+
  13. |Alice| 3| 25| 800| Math| 70|
  14. |Alice| 3| 25| 800|Physics| 80|
  15. | Bob| 2| 22| 500| Math| 50|
  16. | Bob| 2| 22| 500|Physics| 50|
  17. |James| 1| 27|1000|English| 90|
  18. |James| 1| 27|1000| Math| 60|
  19. | Jon| 4| 29|1200|English| 40|
  20. | Jon| 4| 29|1200| Math| 80|
  21. +-----+---+---+----+-------+-----+

⑤df.union(df)、df.unionAll(df)

  1. df.where("name='Jon'").union(df.limit(2)).show()
  2. df.where("name='Jon'").unionAll(df.limit(2)).show()

输出结果:

  1. +---+-----+---+----+
  2. | id| name|age| sal|
  3. +---+-----+---+----+
  4. | 4| Jon| 29|1200|
  5. | 1|James| 27|1000|
  6. | 2| Bob| 22| 500|
  7. +---+-----+---+----+

⑥df.groupBy()

  1. df_join = df.join(scores,on="name",how="inner")
  2. df_join.groupBy("name").mean("score").show()
  3. +-----+----------+
  4. | name|avg(score)|
  5. +-----+----------+
  6. |Alice| 75.0|
  7. | Bob| 50.0|
  8. |James| 75.0|
  9. | Jon| 60.0|
  10. +-----+----------+
  11. #多列聚合,并重命名
  12. df_join.groupBy("name")\
  13. .agg(F.mean("age").alias("mean_age"),
  14. F.collect_list("score").alias("scores")
  15. ).show()
  16. +-----+----------+--------+
  17. | name|mean_score| scores|
  18. +-----+----------+--------+
  19. |Alice| 75.0|[70, 80]|
  20. | Bob| 50.0|[50, 50]|
  21. |James| 75.0|[90, 60]|
  22. | Jon| 60.0|[40, 80]|
  23. +-----+----------+--------+
  24. #与以上输出结果一致
  25. df_join.groupBy("name").agg(F.expr("mean(score) as mean_score"),
  26. F.expr("collect_list(score) as scores")
  27. ).show()
  28. #数据透视表(行转列)
  29. df_join.groupBy("subject").pivot("name").max("score").show()
  30. +-------+-----+----+-----+----+
  31. |subject|Alice| Bob|James| Jon|
  32. +-------+-----+----+-----+----+
  33. | Math| 70| 50| 60| 80|
  34. |English| null|null| 90| 40|
  35. |Physics| 80| 50| null|null|
  36. +-------+-----+----+-----+----+

五、DataFrame+SQL

将DataFrame注册为临时表视图或者全局表视图后,可以使用SQL select语句对DataFrame进行操作,从而方便地实现对数据的查询、排序,不过由于DataFrame不可变,所以不支持delete、truncate、update等语句。不过可以通过SparkSQL对Hive表直接进行增删改查等操作。

1、注册视图

当使用createOrReplaceTempView()方法时,会创建一个临时表视图。这个视图只在当前的SparkSession中有效,当会话结束或者程序终止时,该视图也会随之消失。

如果使用createOrReplaceGlobalTempView方法,则会创建一个全局临时表视图。与临时表视图不同,全局临时表在整个Spark应用程序中都是有效的,不会因为单个SparkSession的结束而失效。

无论是临时表视图还是全局表视图,都不会占用额外的内存空间,它们实际上是对现有DataFrame的一种引用或者说是一种命名方式,方便用户通过SQL语句来进行数据操作。

①临时表视图

  1. df.createOrReplaceTempView("test")
  2. query='''select * from test
  3. where age>26
  4. '''
  5. spark.sql(query).show()
  6. +---+-----+---+----+
  7. | id| name|age| sal|
  8. +---+-----+---+----+
  9. | 1|James| 27|1000|
  10. | 4| Jon| 29|1200|
  11. +---+-----+---+----+

②全局表视图

  1. df.createOrReplaceGlobalTempView("test")
  2. query='''select * from global_temp.test
  3. where age>26
  4. '''
  5. spark.sql(query).show()
  6. #创建一个新Session也能使用全局表
  7. spark.newSession().sql(query).show()
  8. +---+-----+---+----+
  9. | id| name|age| sal|
  10. +---+-----+---+----+
  11. | 1|James| 27|1000|
  12. | 4| Jon| 29|1200|
  13. +---+-----+---+----+

2、操作Hive表

①创建表

  1. hsql = """CREATE TABLE IF NOT EXISTS `test`(
  2. `name` STRING COMMENT '姓名',
  3. `age` INT COMMENT '年龄'
  4. )
  5. PARTITIONED BY ( `sex` STRING COMMENT '性别')
  6. """.replace("\n"," ")
  7. spark.sql(hsql)

②删除表

  1. hsql= "DROP TABLE IF EXISTS test"
  2. spark.sql(hsql)

③写入表

  1. #动态写入数据到hive分区表
  2. spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
  3. df = spark.createDataFrame([("James",27,"1"),
  4. ("Bob",22,"1"),
  5. ("Alice",25,"0"),
  6. ("Jon",29,"1")]).toDF("name","age","sex")
  7. df.write.mode("overwrite").format("hive")\
  8. .partitionBy("sex").saveAsTable("test")

六、总结

总的来说,Spark SQL是一个功能强大的工具,适合于处理大规模数据集和进行复杂的数据分析。Spark SQL能够访问多种数据源,包括本地数据集、HDFS、Hive、HBase等,并且通过集成类RDD、类Excel、类SQL的数据处理操作,增强了数据处理的易用性和多样性。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小惠珠哦/article/detail/924150
推荐阅读
相关标签
  

闽ICP备14008679号