赞
踩
目录
PySpark系列文章:
(四)PySpark3:Mlib机器学习实战-信用卡交易数据异常检测
Spark SQL是Apache Spark生态系统的一个关键组件,专注于处理和分析结构化和半结构化的大规模数据。Spark SQL建立在Spark核心之上,为用户提供了高效且易用的数据处理接口,从而将关系型和非关系型数据融入到分布式计算环境中。
核心概念之一是DataFrame API,它提供了一个高级的、面向数据的抽象,允许用户以声明性的方式处理数据。DataFrame是一个分布式的、具有表格结构的数据集,类似于传统数据库中的表。通过DataFrame API,用户可以执行各种数据操作,包括过滤、转换、聚合、连接等,而无需深入了解底层的分布式计算模型。
RDD,DataFrame和DataSet对比:
RDD可以存储任何类型的数据,包括结构化数据、半结构化数据和非结构化数据,RDD的操作更接近底层,更适合对数据进行底层控制和自定义处理。
DataFrame构建在RDD之上,提供了更高级的抽象,是一个分布式的、以列为主的数据集合,类似于关系型数据库中的表。DataFrame可以通过多种数据源进行创建,包括结构化数据源(如JSON、CSV、Parquet)和Hive表,并且提供了丰富的SQL和DataFrameAPI,可以方便地进行数据处理和分析。
DataSet在DataFrame基础上进一步增加了数据类型信息,可以通过编程语言的类型系统来检查错误,并提供更好的编译时类型检查。
DataFrame和DataSet都支持SQL交互式查询,可以和 Hive无缝衔接。DataSet只有Scala语言和Java语言接口中才支持,在Python和R语言接口只支持DataFrame。
首先导包:
- import pandas as pd
- from pyspark.sql.types import *
- import pyspark.sql.functions as F
- from pyspark.sql import Row
- pd.DataFrame.iteritems = pd.DataFrame.items
可以将RDD用toDF方法转换成DataFrame。
- rdd = sc.parallelize([(1,"James",27,1000),(2,"Bob",22,500),(3,"Alice",25,800),(4,"Jon",29,1200)])
- df = rdd.toDF(["id","name","age","sal"])
- df.show()
输出结果:
- +---+-----+---+----+
- | id| name|age| sal|
- +---+-----+---+----+
- | 1|James| 27|1000|
- | 2| Bob| 22| 500|
- | 3|Alice| 25| 800|
- | 4| Jon| 29|1200|
- +---+-----+---+----+
可以将Pandas.DataFrame转换成pyspark中的DataFrame,也可直接对数据列表、schema进行转换。
- #将pandas.DataFrame转换为pyspark.DataFrame
- pdf = pd.DataFrame([(1,"James",27,1000),(2,"Bob",22,500),(3,"Alice",25,800),(4,"Jon",29,1200)]
- ,columns=["id","name","age","sal"])
- df = spark.createDataFrame(pdf)
- print(type(df))
-
- #将list转换为pyspark.DataFrame
- df = spark.createDataFrame([(1,"James",27,1000),(2,"Bob",22,500),(3,"Alice",25,800),(4,"Jon",29,1200)]
- ,["id","name","age","sal"])
- print(type(df))
-
- #根据指定rdd和schema创建pyspark.DataFrame
- schema = StructType([StructField("id", IntegerType(), nullable = False),
- StructField("name", StringType(), nullable = True),
- StructField("age", IntegerType(), nullable = True),
- StructField("sal", FloatType(), nullable = True),
- ])
-
- rdd = sc.parallelize([Row(1,"James",27,1000),
- Row(2,"Bob",22,500),
- Row(3,"Alice",25,800),
- Row(4,"Jon",29,1200),
- ])
-
- df = spark.createDataFrame(rdd, schema)
- print(type(df))

输出结果:
- <class 'pyspark.sql.dataframe.DataFrame'>
- <class 'pyspark.sql.dataframe.DataFrame'>
可以通过读取json、csv等文件,或hive、mysql数据表得到DataFrame。
spark.read.csv(...): 从 CSV 文件中读取数据。
spark.read.json(...): 从 JSON 文件中读取数据。
spark.read.parquet(...): 从 Parquet 文件中读取数据。
spark.read.text(...): 读取文本文件。
spark.read.format(...): 使用指定的格式读取数据。
读取json文件:
- df = spark.read.json("test.json")
- df.show()
输出结果:test.json内容如下:
- {"id":1,"name":"James","age":27,"sal":1000}
- {"id":2,"name":"Bob","age":22,"sal":500}
- {"id":3,"name":"Alice","age":25,"sal":800}
- {"id":4,"name":"Jon","age":29,"sal":1200}
'运行
读取parquet文件:
- df = spark.read.parquet("data/users.parquet")
- df.show()
读取csv文件:
- #方式1
- df = spark.read.option("header","true") \
- .option("inferSchema","true") \
- .option("delimiter", ",") \
- .csv("test.csv")
-
- #方式2
- df = spark.read.format("com.databricks.spark.csv") \
- .option("header", "true") \
- .option("inferSchema", "true") \
- .option("delimiter", ",") \
- .load("test.csv")
-
- #方式3
- df = spark.read.csv(path="test.csv",
- header=True, #指定将第一行作为列名
- inferSchema=True, #自动推断出每列的数据类型
- sep=',' #分隔符
- )
- df.show()

输出结果:
- +---+-----+---+----+
- | id| name|age| sal|
- +---+-----+---+----+
- | 1|James| 27|1000|
- | 2| Bob| 22| 500|
- | 3|Alice| 25| 800|
- | 4| Jon| 29|1200|
- +---+-----+---+----+
读取Hive数据表:
- spark.sql("CREATE TABLE IF NOT EXISTS test (id INT, name STRING, age INT, sal FLOAT) USING hive")
- spark.sql("LOAD DATA LOCAL INPATH 'data/test.txt' INTO TABLE test")
- df = spark.sql("SELECT * FROM test")
通过df.write()对DataFrame进行保存。
- #保存为csv文件
- df.write.format("csv").option("header","true").save("data/test.csv")
-
- #保存为json文件
- df.write.json("data/test.json")
-
- #保存成parquet文件
- df.write.parquet("data/test.parquet")
- df.write.partitionBy("age").format("parquet").save("data/test.parquet")
-
- #保存成hive数据表
- df.write.bucketBy(2, "name").sortBy("age").saveAsTable("test")
①df.collect()
用于将DataFrame中的所有行收集到Driver节点上,并以列表的形式返回这些行。
- df = spark.createDataFrame([(1,"James",27,1000),
- (2,"Bob",22,500),
- (3,"Alice",25,800),
- (4,"Jon",29,1200)]
- ,["id","name","age","sal"])
- df.collect()
输出结果:
- [Row(id=1, name='James', age=27, sal=1000),
- Row(id=2, name='Bob', age=22, sal=500),
- Row(id=3, name='Alice', age=25, sal=800),
- Row(id=4, name='Jon', age=29, sal=1200)]
②df.first()
获取第一行数据。
df.first()
输出结果:
Row(id=1, name='James', age=27, sal=1000)
③df.head(n)
获取前n行数据。
df.head(2)
输出结果:
- [Row(id=1, name='James', age=27, sal=1000),
- Row(id=2, name='Bob', age=22, sal=500)]
④df.show(n)
与df.head(n)类似,但是df.show(n)是打印成表格。
df.show(2)
输出结果:
- +---+-----+---+----+
- | id| name|age| sal|
- +---+-----+---+----+
- | 1|James| 27|1000|
- | 2| Bob| 22| 500|
- +---+-----+---+----+
- only showing top 2 rows
⑤df.printSchema()
用于打印DataFrame的模式schema,定义了各列的名称和类型。
df.printSchema()
输出结果:
- root
- |-- id: long (nullable = true)
- |-- name: string (nullable = true)
- |-- age: long (nullable = true)
- |-- sal: long (nullable = true)
①df.describe()
一般与df.show()连用,用于查看DataFrame的数据分布。
df.describe().show()
输出结果:
- +-------+------------------+-----+------------------+----------------+
- |summary| id| name| age| sal|
- +-------+------------------+-----+------------------+----------------+
- | count| 4| 4| 4| 4|
- | mean| 2.5| null| 25.75| 875.0|
- | stddev|1.2909944487358056| null|2.9860788111948193|298.607881119482|
- | min| 1|Alice| 22| 500|
- | max| 4| Jon| 29| 1200|
- +-------+------------------+-----+------------------+----------------+
若只想查看某一列的数据分布,如:df.describe('age').show()。
②df.count()
返回数据总行数。
df.count()
输出结果:
4
③聚合函数
一些常用的聚合函数如下sum()、mean()、min()、max()、avg()
- #求最小工资
- df.select(F.min(df['sal'])).show()
- #输出结果:
- +--------+
- |min(sal)|
- +--------+
- | 500|
- +--------+
-
-
- #求最大工资
- df.select(F.max(df['sal'])).show()
- #输出结果:
- +--------+
- |max(sal)|
- +--------+
- | 1200|
- +--------+
-
-
- #求总工资
- df.select(F.sum(df['sal'])).show()
- #输出结果:
- +--------+
- |sum(sal)|
- +--------+
- | 3500|
- +--------+
-
- #求平均工资
- df.select(F.avg(df['sal'])).show()
- #输出结果:
- +--------+
- |avg(sal)|
- +--------+
- | 875.0|
- +--------+

同时对多列操作:
- df.agg({"name":"count","age":"max","sal":"avg"}).show()
-
- #输出:
- +-----------+--------+--------+
- |count(name)|max(age)|avg(sal)|
- +-----------+--------+--------+
- | 4| 29| 875.0|
- +-----------+--------+--------+
④df.stat.freqItems()
统计值出现的频率。
- #统计age、name两列出现频率超过0.25的值
- df.stat.freqItems(("age","name"),0.25).show()
输出结果:
- +----------------+--------------------+
- | age_freqItems| name_freqItems|
- +----------------+--------------------+
- |[29, 22, 25, 27]|[Bob, Jon, Alice,...|
- +----------------+--------------------+
可以把DataFrame当做数据类型为Row的RDD来进行操作。
部分操作需要先转换为RDD才能运行,如map、flatMap等等。
部分操作可以直接在DataFrame上进行,如filter、distinct、sample、cache、intersect等等。
①df.map()
- #所有人age+1
- rdd = df.rdd.map(lambda x:Row(x[2]+1))
- rdd.toDF(["age"]).show()
输出结果:
- +---+
- |age|
- +---+
- | 28|
- | 23|
- | 26|
- | 30|
- +---+
②df.flatMap()
- rdd = df.rdd.flatMap(lambda x:x[1].split('o')).map(lambda x:Row(x))
- rdd.toDF(["name"]).show()
输出结果:
- +-----+
- | name|
- +-----+
- |James|
- | B|
- | b|
- |Alice|
- | J|
- | n|
- +-----+
③df.filter()
- #筛选工资大于800的
- df.filter(F.col("sal")>800).show()
-
- +---+-----+---+----+
- | id| name|age| sal|
- +---+-----+---+----+
- | 1|James| 27|1000|
- | 4| Jon| 29|1200|
- +---+-----+---+----+
-
- #筛选姓名为Bob,以下三种方式输出结果一致
- df.filter(F.col("name")=="Bob").show()
- df.filter(df["name"]=="Bob").show()
- df.filter("name='Bob'").show()
-
- +---+----+---+---+
- | id|name|age|sal|
- +---+----+---+---+
- | 2| Bob| 22|500|
- +---+----+---+---+
-
- #筛选姓名以J开头的
- df.filter(F.col("name").startswith("J")).show()
-
- +---+-----+---+----+
- | id| name|age| sal|
- +---+-----+---+----+
- | 1|James| 27|1000|
- | 4| Jon| 29|1200|
- +---+-----+---+----+
-
- #筛选除指定值外的其他数据
- broads = sc.broadcast(["James","Bob"])
- df.filter(~F.col("name").isin(broads.value)).show()
-
- +---+-----+---+----+
- | id| name|age| sal|
- +---+-----+---+----+
- | 3|Alice| 25| 800|
- | 4| Jon| 29|1200|
- +---+-----+---+----+

④df.distinct()
- #去重
- df.distinct().show()
⑤df.cache()
- #cache缓存
- df.cache()
-
- #释放缓存
- df.unpersist()
⑥df.sample()
随机抽样。
- #withReplacement=False表示无放回,即抽取不重复的数据
- #fraction=0.5表示抽样的比例为50%
- #seed为随机种子,用于复现
- df_sample = df.sample(withReplacement=False, fraction=0.5, seed=2)
- df_sample.show()
输出结果:
- +---+-----+---+----+
- | id| name|age| sal|
- +---+-----+---+----+
- | 1|James| 27|1000|
- | 3|Alice| 25| 800|
- +---+-----+---+----+
⑦df.intersect(df)
取两个DataFrame所有交集的行,返回结果不包含重复行
df.intersect(df_sample).show()
输出结果:
- +---+-----+---+----+
- | id| name|age| sal|
- +---+-----+---+----+
- | 3|Alice| 25| 800|
- | 1|James| 27|1000|
- +---+-----+---+----+
⑧df.exceptAll()
求差集。
- #从df中移除与df_sample相同的行,返回一个新的DataFrame
- df.exceptAll(df_sample).show()
输出结果:
- +---+----+---+----+
- | id|name|age| sal|
- +---+----+---+----+
- | 2| Bob| 22| 500|
- | 4| Jon| 29|1200|
- +---+----+---+----+
①df.withColumn()
增加列。
- df = df.withColumn("birthyear",-df["age"]+2024)
- df.show()
输出结果:
- +---+-----+---+----+---------+
- | id| name|age| sal|birthyear|
- +---+-----+---+----+---------+
- | 1|James| 27|1000| 1997|
- | 2| Bob| 22| 500| 2002|
- | 3|Alice| 25| 800| 1999|
- | 4| Jon| 29|1200| 1995|
- +---+-----+---+----+---------+
②df.select()
筛选列。
df.select("name","age").show()
输出结果:
- +-----+---+
- | name|age|
- +-----+---+
- |James| 27|
- | Bob| 22|
- |Alice| 25|
- | Jon| 29|
- +-----+---+
③df.drop()
删除列。
- #删除一列
- df.drop("age").show()
-
- #删除多列
- df.drop(*["age","birthyear"]).show()
④df.withColumnRenamed()
对列进行重命名。
- #对一列进行重命名
- df.withColumnRenamed("sal","salary").show()
-
- #对多列进行重命名
- df.withColumnRenamed("sal","salary").withColumnRenamed("birthyear","year").show()
⑤df.sort()、df.orderBy()
按照某一列或某几列进行排序。
- #按照某一列进行排序
- df.sort(df["age"].desc()).show() #降序
- df.sort(df["age"].asc()).show() #升序
-
- #按照某几列进行排序
- df.orderBy(F.col("age").asc(), F.col("sal").desc()).show()
⑥df.na.drop()、df.na.fill()
处理带空值的行。
注意,在填充空值时,只能对相同数据类型的列的空值进行填充。
- df = spark.createDataFrame([(1,"James",27,1000),(2,"Bob",22,500),(3,"Alice",25,800),(4,None,29,None)]
- ,["id","name","age","sal"])
- df.show()
-
- +---+-----+---+----+
- | id| name|age| sal|
- +---+-----+---+----+
- | 1|James| 27|1000|
- | 2| Bob| 22| 500|
- | 3|Alice| 25| 800|
- | 4| null| 29|null|
- +---+-----+---+----+
-
- #删除带有nan值的行
- df.na.drop().show()
-
- +---+-----+---+----+
- | id| name|age| sal|
- +---+-----+---+----+
- | 1|James| 27|1000|
- | 2| Bob| 22| 500|
- | 3|Alice| 25| 800|
- +---+-----+---+----+
-
- #填充nan值
- df.na.fill("Jon").show()
-
- +---+-----+---+----+
- | id| name|age| sal|
- +---+-----+---+----+
- | 1|James| 27|1000|
- | 2| Bob| 22| 500|
- | 3|Alice| 25| 800|
- | 4| Jon| 29|null|
- +---+-----+---+----+
-
- df.na.fill(0).show()
-
- +---+-----+---+----+
- | id| name|age| sal|
- +---+-----+---+----+
- | 1|James| 27|1000|
- | 2| Bob| 22| 500|
- | 3|Alice| 25| 800|
- | 4| null| 29| 0|
- +---+-----+---+----+
-

⑦df.replace()
替换指定的值。
- #注意,不能同时对不同数据类型的值进行替换
- #例如,这句代码会报错:df.replace({"James": "Jim",1000: 100}).show()
- df.replace({"James": "Jim", "Bob":"Bieber" }).show()
- df.replace({1000: 100}).show()
⑧df.dropDuplicates()
跟distinct方法不同的是,dropDuplicates方法接收传参,可以根据指定字段去重。
df.dropDuplicates(["name"]).show()
类SQL表操作主要包括表查询(select,selectExpr,where),表连接(join,union,unionAll),表分组(groupby,agg,pivot)等操作。
①df.select()
df.select()用于对DataFrame进行列的表达式操作,允许使用SQL表达式来筛选、计算列。
- #筛选两列,并限制输出前两行
- #df.limit(2)与df.take(2)类似,不过limit输出的是DataFrame,take输出的是list
- df.select("age","name").limit(2).show()
- +---+-----+
- |age| name|
- +---+-----+
- | 27|James|
- | 22| Bob|
- +---+-----+
-
- #可以对列进行操作
- df.select("name",df["age"] + 1).show()
- +-----+---------+
- | name|(age + 1)|
- +-----+---------+
- |James| 28|
- | Bob| 23|
- |Alice| 26|
- | Jon| 30|
- +-----+---------+
-
- #通过toDF()对列进行重命名
- df.select("name",-df["age"]+2024).toDF("name","birth_year").show()
- +-----+----------+
- | name|birth_year|
- +-----+----------+
- |James| 1997|
- | Bob| 2002|
- |Alice| 1999|
- | Jon| 1995|
- +-----+----------+

②df.selectExpr()
df.selectExpr()用于对DataFrame进行列的表达式操作,允许使用SQL表达式来筛选、计算和重命名列。参数是一个字符串列表,其中每个字符串都是一个SQL表达式。
- #创建一个UDF函数
- spark.udf.register("getBirthYear",lambda x:2024-x)
-
- #调用函数对列进行转换并重命名
- df.selectExpr("name",
- "getBirthYear(age) as birth_year" ,
- "UPPER(name) as NAME" ).show()
- +-----+----------+-----+
- | name|birth_year| NAME|
- +-----+----------+-----+
- |James| 1997|JAMES|
- | Bob| 2002| BOB|
- |Alice| 1999|ALICE|
- | Jon| 1995| JON|
- +-----+----------+-----+
-
- #使用row_number()函数
- df.selectExpr("name","age",
- "row_number() over (order by age desc) as order").show()
- +-----+---+-----+
- | name|age|order|
- +-----+---+-----+
- | Jon| 29| 1|
- |James| 27| 2|
- |Alice| 25| 3|
- | Bob| 22| 4|
- +-----+---+-----+

使用df.selectExpr()还可以将DataFrame转换为复合类型。
- #array类型
- dfarray = df.selectExpr("name","array(age,sal) as info")
- dfarray.selectExpr("name","info[0] as age","info[1] as sal").show()
-
- #struct类型
- df_struct = df.selectExpr("name","struct(age,sal) as info")
- df_struct.selectExpr("name","info.age","info.sal").show()
-
- #map类型
- df_map = df.selectExpr("name","map('age',age,'sal',sal) as info")
- df_map.selectExpr("name","info['age'] as age","info['sal'] as sal").show()
-
- #输出结果
- +-----+---+----+
- | name|age| sal|
- +-----+---+----+
- |James| 27|1000|
- | Bob| 22| 500|
- |Alice| 25| 800|
- | Jon| 29|1200|
- +-----+---+----+
-
- #构造named_struct类型
- df_named_struct = df.selectExpr("name","named_struct('age',age,'sal',sal) as info")
- df_named_struct.show()
- +-----+----------+
- | name| info|
- +-----+----------+
- |James|{27, 1000}|
- | Bob| {22, 500}|
- |Alice| {25, 800}|
- | Jon|{29, 1200}|
- +-----+----------+
-
- #转换为json类型
- df_named_struct.selectExpr("name","to_json(info) as json_info").show()
- +-----+--------------------+
- | name| json_info|
- +-----+--------------------+
- |James|{"age":27,"sal":1...|
- | Bob|{"age":22,"sal":500}|
- |Alice|{"age":25,"sal":800}|
- | Jon|{"age":29,"sal":1...|
- +-----+--------------------+

③df.where()
用法与SQL中的where一致,注意书写时,等于是一个“=”。
df.where("name='Bob' or age=27").show()
输出结果:
- +---+-----+---+----+
- | id| name|age| sal|
- +---+-----+---+----+
- | 1|James| 27|1000|
- | 2| Bob| 22| 500|
- +---+-----+---+----+
④df.join(df)
df.join(df, on='列名', how='inner')。on参数可以指定连接方式为inner、left、right、outer、semi、full、leftanti、anti"等多种方式。关联的列如果有多列,则传入一个列名list。
- scores = spark.createDataFrame([("James","English",90),("James","Math",60),
- ("Bob","Math",50),("Bob","Physics",50),
- ("Alice","Math",70),("Alice","Physics",80),
- ("Jon","English",40),("Jon","Math",80)
- ]) \
- .toDF("name","subject","score")
- scores.show()
-
-
- df.join(scores,on="name",how="inner").show()
- df.join(scores,df["name"]==scores["name"],"inner").show()
- +-----+---+---+----+-------+-----+
- | name| id|age| sal|subject|score|
- +-----+---+---+----+-------+-----+
- |Alice| 3| 25| 800| Math| 70|
- |Alice| 3| 25| 800|Physics| 80|
- | Bob| 2| 22| 500| Math| 50|
- | Bob| 2| 22| 500|Physics| 50|
- |James| 1| 27|1000|English| 90|
- |James| 1| 27|1000| Math| 60|
- | Jon| 4| 29|1200|English| 40|
- | Jon| 4| 29|1200| Math| 80|
- +-----+---+---+----+-------+-----+
-

⑤df.union(df)、df.unionAll(df)
- df.where("name='Jon'").union(df.limit(2)).show()
-
- df.where("name='Jon'").unionAll(df.limit(2)).show()
输出结果:
- +---+-----+---+----+
- | id| name|age| sal|
- +---+-----+---+----+
- | 4| Jon| 29|1200|
- | 1|James| 27|1000|
- | 2| Bob| 22| 500|
- +---+-----+---+----+
⑥df.groupBy()
- df_join = df.join(scores,on="name",how="inner")
- df_join.groupBy("name").mean("score").show()
- +-----+----------+
- | name|avg(score)|
- +-----+----------+
- |Alice| 75.0|
- | Bob| 50.0|
- |James| 75.0|
- | Jon| 60.0|
- +-----+----------+
-
- #多列聚合,并重命名
- df_join.groupBy("name")\
- .agg(F.mean("age").alias("mean_age"),
- F.collect_list("score").alias("scores")
- ).show()
- +-----+----------+--------+
- | name|mean_score| scores|
- +-----+----------+--------+
- |Alice| 75.0|[70, 80]|
- | Bob| 50.0|[50, 50]|
- |James| 75.0|[90, 60]|
- | Jon| 60.0|[40, 80]|
- +-----+----------+--------+
-
- #与以上输出结果一致
- df_join.groupBy("name").agg(F.expr("mean(score) as mean_score"),
- F.expr("collect_list(score) as scores")
- ).show()
-
-
- #数据透视表(行转列)
- df_join.groupBy("subject").pivot("name").max("score").show()
- +-------+-----+----+-----+----+
- |subject|Alice| Bob|James| Jon|
- +-------+-----+----+-----+----+
- | Math| 70| 50| 60| 80|
- |English| null|null| 90| 40|
- |Physics| 80| 50| null|null|
- +-------+-----+----+-----+----+

将DataFrame注册为临时表视图或者全局表视图后,可以使用SQL select语句对DataFrame进行操作,从而方便地实现对数据的查询、排序,不过由于DataFrame不可变,所以不支持delete、truncate、update等语句。不过可以通过SparkSQL对Hive表直接进行增删改查等操作。
当使用createOrReplaceTempView()方法时,会创建一个临时表视图。这个视图只在当前的SparkSession中有效,当会话结束或者程序终止时,该视图也会随之消失。
如果使用createOrReplaceGlobalTempView方法,则会创建一个全局临时表视图。与临时表视图不同,全局临时表在整个Spark应用程序中都是有效的,不会因为单个SparkSession的结束而失效。
无论是临时表视图还是全局表视图,都不会占用额外的内存空间,它们实际上是对现有DataFrame的一种引用或者说是一种命名方式,方便用户通过SQL语句来进行数据操作。
①临时表视图
- df.createOrReplaceTempView("test")
- query='''select * from test
- where age>26
- '''
- spark.sql(query).show()
- +---+-----+---+----+
- | id| name|age| sal|
- +---+-----+---+----+
- | 1|James| 27|1000|
- | 4| Jon| 29|1200|
- +---+-----+---+----+
②全局表视图
- df.createOrReplaceGlobalTempView("test")
- query='''select * from global_temp.test
- where age>26
- '''
- spark.sql(query).show()
-
-
- #创建一个新Session也能使用全局表
- spark.newSession().sql(query).show()
-
- +---+-----+---+----+
- | id| name|age| sal|
- +---+-----+---+----+
- | 1|James| 27|1000|
- | 4| Jon| 29|1200|
- +---+-----+---+----+

①创建表
- hsql = """CREATE TABLE IF NOT EXISTS `test`(
- `name` STRING COMMENT '姓名',
- `age` INT COMMENT '年龄'
- )
- PARTITIONED BY ( `sex` STRING COMMENT '性别')
- """.replace("\n"," ")
- spark.sql(hsql)
②删除表
- hsql= "DROP TABLE IF EXISTS test"
- spark.sql(hsql)
③写入表
- #动态写入数据到hive分区表
- spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
- df = spark.createDataFrame([("James",27,"1"),
- ("Bob",22,"1"),
- ("Alice",25,"0"),
- ("Jon",29,"1")]).toDF("name","age","sex")
- df.write.mode("overwrite").format("hive")\
- .partitionBy("sex").saveAsTable("test")
总的来说,Spark SQL是一个功能强大的工具,适合于处理大规模数据集和进行复杂的数据分析。Spark SQL能够访问多种数据源,包括本地数据集、HDFS、Hive、HBase等,并且通过集成类RDD、类Excel、类SQL的数据处理操作,增强了数据处理的易用性和多样性。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。