
(3) PySpark3: 40 Spark SQL Exercises


Contents

I. Introduction

II. Exercises

III. Summary


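PySpark series:

(1) PySpark3: Installation Guide and RDD Programming

(2) PySpark3: Spark SQL Programming

(3) PySpark3: 40 Spark SQL Exercises

(4) PySpark3: MLlib in Practice - Anomaly Detection on Credit Card Transaction Data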

I. Introduction

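This article works through the problems from another article of mine, "40 Classic SQL Interview Questions", using PySpark SQL. The setup code below starts a SparkSession and loads the four sample tables (students, courses, teachers, and scores) as DataFrames.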

    import findspark
    findspark.init()  # locate the local Spark installation before pyspark is imported

    import numpy as np
    import pandas as pd
    import pyspark
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
    import pyspark.sql.functions as F
    from pyspark.sql.window import Window
    from pyspark.sql.types import *

    # .master() sets spark.master; "master" is not a recognized .config() key
    spark = SparkSession.builder \
        .appName("test") \
        .master("local[4]") \
        .enableHiveSupport() \
        .getOrCreate()
    sc = spark.sparkContext

    # Student(sid, sname, sage = date of birth, ssex)
    StudentData = [('01', '赵雷', '1990-01-01', '男'),
                   ('02', '钱电', '1990-12-21', '男'),
                   ('03', '孙风', '1990-05-20', '男'),
                   ('04', '李云', '1990-08-06', '男'),
                   ('05', '周梅', '1991-12-01', '女'),
                   ('06', '吴兰', '1992-03-01', '女'),
                   ('07', '郑竹', '1989-07-01', '女'),
                   ('08', '王菊', '1990-01-20', '女')]
    # Course(cid, cname, tid = ID of the teaching teacher)
    CourseData = [('01', '语文', '02'),
                  ('02', '数学', '01'),
                  ('03', '英语', '03'),
                  ('04', '物理', '01')]
    # Teacher(tid, tname)
    TeacherData = [('01', '张三'),
                   ('02', '李四'),
                   ('03', '王五')]
    # SC = scores (sid, cid, score)
    SCData = [('01', '01', 80), ('01', '02', 90), ('01', '03', 99),
              ('02', '01', 70), ('02', '02', 60), ('02', '03', 80),
              ('03', '01', 80), ('03', '02', 80), ('03', '03', 80),
              ('04', '01', 50), ('04', '02', 30), ('04', '03', 20),
              ('05', '01', 76), ('05', '02', 87),
              ('06', '01', 31), ('06', '03', 34),
              ('07', '02', 89), ('07', '03', 98),
              ('04', '04', 90)]
    df_student = spark.createDataFrame(StudentData, ["sid", "sname", "sage", "ssex"])
    df_course = spark.createDataFrame(CourseData, ["cid", "cname", "tid"])
    df_teacher = spark.createDataFrame(TeacherData, ["tid", "tname"])
    df_sc = spark.createDataFrame(SCData, ["sid", "cid", "score"])

II. Exercises

1. Find the IDs of all students whose score in course "01" is higher than their score in course "02".

If you register the DataFrame as a temporary view, you can pass an ordinary SQL statement to spark.sql:

    df_sc.createOrReplaceTempView("sc")
    query = '''select distinct t1.sid as sid
    from (select * from sc where cid='01') t1
    left join
    (select * from sc where cid='02') t2
    on t1.sid=t2.sid
    where t1.score>t2.score
    '''
    spark.sql(query).show()

With the DataFrame API:

    t1 = df_sc.filter(df_sc.cid == '01').selectExpr("sid as sid1", "score as score1")
    t2 = df_sc.filter(df_sc.cid == '02').selectExpr("sid as sid2", "score as score2")
    t3 = t1.join(t2, on=(t1.sid1 == t2.sid2), how="left")
    # students without a "02" score get score2 = null, and null comparisons are filtered out
    t3.filter(t3["score1"] > t3["score2"]).select("sid1").show()

Output: for the sample data, both versions return sid 02 (70 vs. 60) and sid 04 (50 vs. 30).

To build fluency with the DataFrame API, all the remaining exercises use the DataFrame API instead of passing SQL strings to spark.sql.

2. Find the ID and average score of every student whose average score is above 60.

    result = df_sc.groupBy("sid") \
        .agg(F.mean("score").alias("mean_score")) \
        .filter(F.col('mean_score') > 60)
    result.show()

Output:

    +---+-----------------+
    |sid|       mean_score|
    +---+-----------------+
    | 01|89.66666666666667|
    | 02|             70.0|
    | 03|             80.0|
    | 05|             81.5|
    | 07|             93.5|
    +---+-----------------+

3. List every student's ID, name, number of courses taken, and total score.

    t1 = df_sc.groupBy("sid").agg(F.sum("score").alias("total_score"),
                                  F.count("cid").alias("course_cnt"))
    result = t1.join(df_student.select('sid', 'sname'), on='sid', how='inner')
    result.show()

Output:

    +---+-----------+----------+-----+
    |sid|total_score|course_cnt|sname|
    +---+-----------+----------+-----+
    | 01|        269|         3| 赵雷|
    | 02|        210|         3| 钱电|
    | 03|        240|         3| 孙风|
    | 04|        190|         4| 李云|
    | 05|        163|         2| 周梅|
    | 06|         65|         2| 吴兰|
    | 07|        187|         2| 郑竹|
    +---+-----------+----------+-----+

4. Count the teachers whose surname is "李".

    # type 1: Column.startswith
    df_teacher.filter(F.col('tname').startswith('李')).count()
    # type 2: SQL LIKE pattern
    df_teacher.where("tname like '李%'").count()

Output:

    1

5. Find the ID and name of students who have not taken any course taught by teacher "张三".

    # courses taught by 张三, then the students enrolled in them
    t1 = df_course.join(df_teacher.where("tname='张三'"), on='tid', how='inner').select('cid')
    t2 = df_sc.join(t1, on='cid', how='inner').select('sid')
    # all student IDs minus the enrolled ones
    t3 = df_student.select('sid').exceptAll(t2)
    result = t3.join(df_student.select('sid', 'sname'), on='sid', how='inner').distinct()
    result.show()

Output:

    +---+-----+
    |sid|sname|
    +---+-----+
    | 06| 吴兰|
    | 08| 王菊|
    +---+-----+

6. Find the ID and name of students who have taken both course "01" and course "02".

    t1 = df_sc.filter(df_sc.cid == '01').select(F.col('sid').alias('sid01'))
    t2 = df_sc.filter(df_sc.cid == '02').select(F.col('sid').alias('sid02'))
    # the inner join keeps students that appear in both lists
    t3 = t1.join(t2, t1.sid01 == t2.sid02, 'inner').select('sid01')
    result = df_student.join(t3, df_student.sid == t3.sid01, 'inner').select('sid01', 'sname')
    result.show()

Output:

    +-----+-----+
    |sid01|sname|
    +-----+-----+
    |   01| 赵雷|
    |   02| 钱电|
    |   03| 孙风|
    |   04| 李云|
    |   05| 周梅|
    +-----+-----+

7. Find the ID and name of students who have taken a course taught by teacher "张三".

    t1 = df_course.join(df_teacher.where("tname='张三'"), on='tid', how='inner').select('cid')
    t2 = df_sc.join(t1, on='cid', how='inner').select('sid')
    # distinct: a student may have taken more than one of 张三's courses
    result = t2.join(df_student.select('sid', 'sname'), on='sid', how='inner').distinct()
    result.show()

Output:

    +---+-----+
    |sid|sname|
    +---+-----+
    | 01| 赵雷|
    | 02| 钱电|
    | 03| 孙风|
    | 04| 李云|
    | 05| 周梅|
    | 07| 郑竹|
    +---+-----+

8. Find the ID and name of all students whose score in course "01" is lower than their score in course "02".

    t1 = df_sc.filter(df_sc.cid == '01').selectExpr("sid as sid01", "score as score01")
    t2 = df_sc.filter(df_sc.cid == '02').selectExpr("sid as sid02", "score as score02")
    t3 = t1.join(t2, t1.sid01 == t2.sid02, 'inner') \
        .filter(F.col("score01") < F.col("score02")).select('sid01')
    result = t3.join(df_student.select('sid', 'sname'), t3.sid01 == df_student.sid, how='inner').distinct()
    result.select('sid', 'sname').show()

Output:

    +---+-----+
    |sid|sname|
    +---+-----+
    | 01| 赵雷|
    | 05| 周梅|
    +---+-----+

9. Find the ID and name of students whose scores in all their courses are below 60.

    # students with at least one score of 60 or more
    t1 = df_sc.filter(df_sc.score >= 60).select('sid')
    # the remaining student IDs, collected to the driver
    t2 = df_student.select('sid').exceptAll(t1)
    t2_sid_set = set(t2.rdd.map(lambda row: row.sid).collect())
    df_student.filter(df_student.sid.isin(t2_sid_set)).select('sid', 'sname').show()

Output:

    +---+-----+
    |sid|sname|
    +---+-----+
    | 06| 吴兰|
    | 08| 王菊|
    +---+-----+

10. Find the ID and name of students who have not taken every course.

    counts = df_course.select("cid").distinct().count()  # total number of courses
    df_course_cnt = df_sc.groupBy("sid").agg(F.count("cid").alias("course_cnt"))
    result = df_course_cnt.filter(F.col("course_cnt") < counts) \
        .join(df_student, on='sid', how='inner').select("sid", "sname")
    result.show()

Output:

    +---+-----+
    |sid|sname|
    +---+-----+
    | 01| 赵雷|
    | 02| 钱电|
    | 03| 孙风|
    | 05| 周梅|
    | 06| 吴兰|
    | 07| 郑竹|
    +---+-----+

11. Find the ID and name of students who share at least one course with student "01".

    # collect student 01's (small) list of course IDs to the driver
    courses_of_01 = df_sc.filter(F.col("sid") == "01").select("cid").rdd.map(lambda x: x.cid).collect()
    t1 = df_sc.filter((df_sc.cid.isin(courses_of_01)) & (df_sc.sid != '01')).select('sid').distinct()
    result = t1.join(df_student, on='sid', how='inner').select("sid", "sname")
    result.show()

Output:

    +---+-----+
    |sid|sname|
    +---+-----+
    | 02| 钱电|
    | 03| 孙风|
    | 04| 李云|
    | 05| 周梅|
    | 06| 吴兰|
    | 07| 郑竹|
    +---+-----+

12. Find the ID and name of the other students who took exactly the same courses as student "01".

    courses_of_01 = df_sc.filter(F.col("sid") == "01").select("cid").rdd.map(lambda x: x.cid).collect()
    t1 = df_sc.filter((df_sc.cid.isin(courses_of_01)) & (df_sc.sid != '01')).select('sid', 'cid')
    # students who took every one of 01's courses
    t2 = t1.groupby('sid').agg(F.count("cid").alias("course_cnt")).filter(F.col("course_cnt") == len(courses_of_01))
    # students with the same total number of courses, i.e. no extra courses
    t3 = df_sc.groupby('sid').agg(F.count("cid").alias("course_cnt")).filter(F.col("course_cnt") == len(courses_of_01))
    result = t2.select("sid").intersect(t3.select("sid")).join(df_student, on='sid', how='inner').select("sid", "sname")
    result.show()

Output:

    +---+-----+
    |sid|sname|
    +---+-----+
    | 02| 钱电|
    | 03| 孙风|
    +---+-----+

13. In the SC table, replace every score for a course taught by "张三" with that course's average score.

    # 张三's teacher ID, fetched to the driver
    tid_of_zhangsan = df_teacher.filter(F.col("tname") == '张三').select("tid").first().asDict().get("tid")
    t1 = df_course.filter(F.col("tid") == tid_of_zhangsan)
    # average score of each of 张三's courses
    t2 = t1.join(df_sc, on='cid', how='inner').groupby('cid').agg(F.mean('score').alias("score_avg"))
    # left join: other courses get a null average, so nvl falls back to the original score
    result = df_sc.join(t2, on='cid', how='left').selectExpr("cid", "sid", "nvl(score_avg, score) as score")
    result.show()

Because DataFrames are immutable, this "update" produces a new DataFrame, result, rather than changing df_sc in place.

Output:

    +---+---+-----------------+
    |cid|sid|            score|
    +---+---+-----------------+
    | 01| 01|             80.0|
    | 02| 01|72.66666666666667|
    | 03| 01|             99.0|
    | 01| 02|             70.0|
    | 02| 02|72.66666666666667|
    | 01| 03|             80.0|
    | 03| 02|             80.0|
    | 02| 03|72.66666666666667|
    | 03| 03|             80.0|
    | 01| 04|             50.0|
    | 02| 04|72.66666666666667|
    | 01| 05|             76.0|
    | 03| 04|             20.0|
    | 02| 05|72.66666666666667|
    | 01| 06|             31.0|
    | 03| 06|             34.0|
    | 02| 07|72.66666666666667|
    | 03| 07|             98.0|
    | 04| 04|             90.0|
    +---+---+-----------------+

14. Find the names of students who have not taken any course taught by "张三".

    t1 = df_course.join(df_teacher.where("tname='张三'"), on='tid', how='inner').select('cid')
    t2 = df_sc.join(t1, on='cid', how='inner').select('sid')
    result = df_student.select('sid').exceptAll(t2).join(df_student, on='sid', how='inner').select("sid", "sname")
    result.show()

Output:

    +---+-----+
    |sid|sname|
    +---+-----+
    | 06| 吴兰|
    | 08| 王菊|
    +---+-----+

15. Find the ID, name, and average score of students who failed two or more courses.

    t1 = df_sc.filter(F.col("score") < 60).groupby("sid") \
        .agg(F.count("cid").alias("counts"),
             F.mean("score").alias("score_avg"))
    result = t1.filter(F.col("counts") > 1) \
        .join(df_student, on='sid', how='inner') \
        .select("sid", "sname", "score_avg")
    result.show()

Output:

    +---+-----+------------------+
    |sid|sname|         score_avg|
    +---+-----+------------------+
    | 04| 李云|33.333333333333336|
    | 06| 吴兰|              32.5|
    +---+-----+------------------+

16. Retrieve the students who scored below 60 in course "01", sorted by score in descending order.

    t1 = df_sc.where("score < 60 and cid = '01'") \
        .join(df_student, on='sid', how='inner') \
        .select("sid", "sname", "score")
    result = t1.orderBy(F.col("score").desc())
    result.show()

Output:

    +---+-----+-----+
    |sid|sname|score|
    +---+-----+-----+
    | 04| 李云|   50|
    | 06| 吴兰|   31|
    +---+-----+-----+

17. Show every student's average score, from highest to lowest.

    t1 = df_sc.groupby("sid").agg(F.mean("score").alias("score_avg"))
    result = t1.select("sid", "score_avg").orderBy(F.col("score_avg").desc())
    result.show()

Output:

    +---+-----------------+
    |sid|        score_avg|
    +---+-----------------+
    | 07|             93.5|
    | 01|89.66666666666667|
    | 05|             81.5|
    | 03|             80.0|
    | 02|             70.0|
    | 04|             47.5|
    | 06|             32.5|
    +---+-----------------+

18. For each course, show the highest, lowest, and average score in the form: course ID, course name, highest score, lowest score, average score, pass rate.

    t1 = df_sc.groupby("cid").agg(F.max("score").alias("score_max"),
                                  F.min("score").alias("score_min"),
                                  F.mean("score").alias("score_avg"),
                                  # pass rate = share of scores >= 60
                                  (F.sum(F.when(F.col("score") >= 60, 1).otherwise(0)) / F.count("*")).alias("pass_ratio"))
    result = t1.join(df_course, on='cid', how='left').select("cid", "cname", "score_max", "score_min", "score_avg", "pass_ratio")
    result.show()

Output:

    +---+-----+---------+---------+-----------------+------------------+
    |cid|cname|score_max|score_min|        score_avg|        pass_ratio|
    +---+-----+---------+---------+-----------------+------------------+
    | 01| 语文|       80|       31|             64.5|0.6666666666666666|
    | 02| 数学|       90|       30|72.66666666666667|0.8333333333333334|
    | 03| 英语|       99|       20|             68.5|0.6666666666666666|
    | 04| 物理|       90|       90|             90.0|               1.0|
    +---+-----+---------+---------+-----------------+------------------+

19. Sort the scores by course ID, and within each course from highest to lowest.

    result = df_sc.select("cid", "score").orderBy(F.asc("cid"), F.desc("score"))
    result.show()

Output:

    +---+-----+
    |cid|score|
    +---+-----+
    | 01|   80|
    | 01|   80|
    | 01|   76|
    | 01|   70|
    | 01|   50|
    | 01|   31|
    | 02|   90|
    | 02|   89|
    | 02|   87|
    | 02|   80|
    | 02|   60|
    | 02|   30|
    | 03|   99|
    | 03|   98|
    | 03|   80|
    | 03|   80|
    | 03|   34|
    | 03|   20|
    | 04|   90|
    +---+-----+

20. Compute each student's total score and rank the students by it.

    t1 = df_sc.groupby("sid").agg(F.sum("score").alias("score_sum"))
    # a window without partitionBy moves all rows to one partition; fine for small data
    w = Window.orderBy(F.col("score_sum").desc())
    result = t1.withColumn("rank", F.row_number().over(w))
    result.show()

Output:

    +---+---------+----+
    |sid|score_sum|rank|
    +---+---------+----+
    | 01|      269|   1|
    | 03|      240|   2|
    | 02|      210|   3|
    | 04|      190|   4|
    | 07|      187|   5|
    | 05|      163|   6|
    | 06|       65|   7|
    +---+---------+----+

21. Show the average score of each teacher's courses, from highest to lowest.

    t1 = df_sc.join(df_course, on='cid', how='left') \
        .join(df_teacher, on='tid', how='left') \
        .select("tname", "cname", "score")
    t2 = t1.groupby(["tname", "cname"]).agg(F.mean("score").alias("score_mean"))
    result = t2.orderBy(F.col("score_mean").desc())
    result.show()

Output:

    +-----+-----+-----------------+
    |tname|cname|       score_mean|
    +-----+-----+-----------------+
    | 张三| 物理|             90.0|
    | 张三| 数学|72.66666666666667|
    | 王五| 英语|             68.5|
    | 李四| 语文|             64.5|
    +-----+-----+-----------------+

22. For every course, find the students ranked 2nd and 3rd, together with their scores.

    # dense_rank: tied scores share a rank and no rank number is skipped
    w = Window.partitionBy("cid").orderBy(F.col("score").desc())
    t1 = df_sc.withColumn("rank", F.dense_rank().over(w))
    result = t1.where("rank = 2 or rank = 3") \
        .join(df_student, on='sid', how='left') \
        .select("sid", "sname", "cid", "score", "rank")
    result.show()

Output:

    +---+-----+---+-----+----+
    |sid|sname|cid|score|rank|
    +---+-----+---+-----+----+
    | 02| 钱电| 01|   70|   3|
    | 02| 钱电| 03|   80|   3|
    | 03| 孙风| 03|   80|   3|
    | 05| 周梅| 01|   76|   2|
    | 05| 周梅| 02|   87|   3|
    | 07| 郑竹| 02|   89|   2|
    | 07| 郑竹| 03|   98|   2|
    +---+-----+---+-----+----+

23. For each course, count the students in the score bands [100-85], [85-70], [70-60], and [0-60] (each band includes its lower bound), and give the share of each band. Output the course ID and course name as well.

    t1 = df_sc.join(df_course, on='cid', how='inner')
    t2 = t1.groupby(["cid", "cname"]) \
        .agg(F.sum(F.when(F.col('score') >= 85, 1).otherwise(0)).alias('between85to100'),
             F.sum(F.when((F.col('score') >= 70) & (F.col('score') < 85), 1).otherwise(0)).alias('between70to85'),
             F.sum(F.when((F.col('score') >= 60) & (F.col('score') < 70), 1).otherwise(0)).alias('between60to70'),
             F.sum(F.when(F.col('score') < 60, 1).otherwise(0)).alias('between0to60'),
             F.count('*').alias('total_students'))
    result = t2.withColumn('between85to100_ratio', F.col("between85to100") / F.col("total_students")) \
        .withColumn('between70to85_ratio', F.col("between70to85") / F.col("total_students")) \
        .withColumn('between60to70_ratio', F.col("between60to70") / F.col("total_students")) \
        .withColumn('between0to60_ratio', F.col("between0to60") / F.col("total_students"))
    result.show()

Output:

    +---+-----+--------------+-------------+-------------+------------+--------------+--------------------+-------------------+-------------------+-------------------+
    |cid|cname|between85to100|between70to85|between60to70|between0to60|total_students|between85to100_ratio|between70to85_ratio|between60to70_ratio| between0to60_ratio|
    +---+-----+--------------+-------------+-------------+------------+--------------+--------------------+-------------------+-------------------+-------------------+
    | 01| 语文|             0|            4|            0|           2|             6|                 0.0| 0.6666666666666666|                0.0| 0.3333333333333333|
    | 02| 数学|             3|            1|            1|           1|             6|                 0.5|0.16666666666666666|0.16666666666666666|0.16666666666666666|
    | 03| 英语|             2|            2|            0|           2|             6|  0.3333333333333333| 0.3333333333333333|                0.0| 0.3333333333333333|
    | 04| 物理|             1|            0|            0|           0|             1|                 1.0|                0.0|                0.0|                0.0|
    +---+-----+--------------+-------------+-------------+------------+--------------+--------------------+-------------------+-------------------+-------------------+

24. Find each student's average score and rank.

    t1 = df_sc.groupby("sid").agg(F.mean('score').alias('score_avg'))
    w = Window.partitionBy().orderBy(F.col("score_avg").desc())
    result = t1.withColumn("rank", F.row_number().over(w))
    result.show()

Output:

    +---+-----------------+----+
    |sid|        score_avg|rank|
    +---+-----------------+----+
    | 07|             93.5|   1|
    | 01|89.66666666666667|   2|
    | 05|             81.5|   3|
    | 03|             80.0|   4|
    | 02|             70.0|   5|
    | 04|             47.5|   6|
    | 06|             32.5|   7|
    +---+-----------------+----+

25. Find the top three records in each course.

    w = Window.partitionBy("cid").orderBy(F.col("score").desc())
    result = df_sc.withColumn("rank", F.row_number().over(w)).filter(F.col("rank") <= 3)
    result.show()

Output:

    +---+---+-----+----+
    |sid|cid|score|rank|
    +---+---+-----+----+
    | 01| 01|   80|   1|
    | 03| 01|   80|   2|
    | 05| 01|   76|   3|
    | 01| 02|   90|   1|
    | 07| 02|   89|   2|
    | 05| 02|   87|   3|
    | 01| 03|   99|   1|
    | 07| 03|   98|   2|
    | 02| 03|   80|   3|
    | 04| 04|   90|   1|
    +---+---+-----+----+

26. Count the students enrolled in each course.

    result = df_sc.groupby("cid").agg(F.count("*").alias("students_cnt"))
    result.show()

Output:

    +---+------------+
    |cid|students_cnt|
    +---+------------+
    | 01|           6|
    | 02|           6|
    | 03|           6|
    | 04|           1|
    +---+------------+

27. Find the ID and name of all students who took exactly two courses.

    t1 = df_sc.groupby("sid").agg(F.count("*").alias("course_cnt"))
    t2 = t1.filter(F.col("course_cnt") == 2)
    result = t2.join(df_student, on='sid').select("sid", "sname")
    result.show()

Output:

    +---+-----+
    |sid|sname|
    +---+-----+
    | 05| 周梅|
    | 06| 吴兰|
    | 07| 郑竹|
    +---+-----+

28. Count the male and female students.

    result = df_student.groupby("ssex").agg(F.count("*").alias("counts"))
    result.show()

Output:

    +----+------+
    |ssex|counts|
    +----+------+
    |  男|     4|
    |  女|     4|
    +----+------+

29. Find the students whose name contains "风".

    result = df_student.where("sname like '%风%'")
    # or
    result = df_student.filter(F.col("sname").contains("风"))
    result.show()

Output:

    +---+-----+----------+----+
    |sid|sname|      sage|ssex|
    +---+-----+----------+----+
    | 03| 孙风|1990-05-20|  男|
    +---+-----+----------+----+

30. Find cases where several students got the same score in the same course, and count the students.

    t1 = df_sc.groupby(["cid", "score"]).agg(F.count("*").alias("counts"))
    result = t1.where("counts > 1")
    result.show()

Output:

    +---+-----+------+
    |cid|score|counts|
    +---+-----+------+
    | 01|   80|     2|
    | 03|   80|     2|
    +---+-----+------+

31. List the students born in 1990.

    result = df_student.where("substr(sage, 1, 4) = '1990'")
    # or
    result = df_student.withColumn("birth_year", F.substring(F.col("sage"), 1, 4)) \
        .filter(F.col("birth_year") == '1990')
    result.show()

32. Find the average score of each course, sorted in ascending order of average.

    t1 = df_sc.groupby("cid").agg(F.mean("score").alias("score_avg"))
    result = t1.orderBy(F.col("score_avg").asc())
    result.show()

Output:

    +---+-----------------+
    |cid|        score_avg|
    +---+-----------------+
    | 01|             64.5|
    | 03|             68.5|
    | 02|72.66666666666667|
    | 04|             90.0|
    +---+-----------------+

33. List every failing score, sorted by course ID ascending and then by score descending.

    result = df_sc.where("score < 60").orderBy(F.col("cid").asc(), F.col("score").desc())
    result.show()

Output:

    +---+---+-----+
    |sid|cid|score|
    +---+---+-----+
    | 04| 01|   50|
    | 06| 01|   31|
    | 04| 02|   30|
    | 06| 03|   34|
    | 04| 03|   20|
    +---+---+-----+

34. Find the ID and name of students who scored above 60 in course "01".

    t1 = df_sc.where("score > 60 and cid = '01'")
    result = t1.join(df_student, on="sid").select("sid", "sname")
    result.show()

Output:

    +---+-----+
    |sid|sname|
    +---+-----+
    | 01| 赵雷|
    | 02| 钱电|
    | 03| 孙风|
    | 05| 周梅|
    +---+-----+

35. Among the students taking courses taught by "张三", find the name and score of the highest scorer.

    # all scores in courses taught by 张三
    t1 = df_teacher.where("tname='张三'").join(df_course, on='tid').join(df_sc, on='cid')
    max_score = t1.agg(F.max("score")).collect()[0][0]
    # filter t1, not df_sc: df_sc would also match the same score in other teachers' courses
    result = t1.where(F.col("score") == max_score).join(df_student, on="sid").select("sname", "score")
    result.show()

Output:

    +-----+-----+
    |sname|score|
    +-----+-----+
    | 赵雷|   90|
    | 李云|   90|
    +-----+-----+

36. Count the enrolled students per course, keeping only courses with more than 5 students. Output the course ID and the count, sorted by count descending and, for equal counts, by course ID ascending.

    t1 = df_sc.groupby("cid").agg(F.count("*").alias("counts")).where("counts > 5")
    result = t1.orderBy(F.col("counts").desc(), F.col("cid").asc())
    result.show()

Output:

    +---+------+
    |cid|counts|
    +---+------+
    | 01|     6|
    | 02|     6|
    | 03|     6|
    +---+------+

37. Retrieve the IDs of students who took at least two courses.

    result = df_sc.groupby("sid").agg(F.count("*").alias("counts")).where("counts >= 2")
    result.show()

Output:

    +---+------+
    |sid|counts|
    +---+------+
    | 01|     3|
    | 02|     3|
    | 03|     3|
    | 04|     4|
    | 05|     2|
    | 06|     2|
    | 07|     2|
    +---+------+

38. Compute each student's age.

    t1 = df_student.withColumn("birthday", F.to_date(F.col("sage")))
    # approximate age: days since birth / 365, rounded below
    t2 = t1.withColumn("age", F.datediff(F.current_date(), F.col("birthday")) / 365)
    result = t2.withColumn("age", F.round("age")).select("sid", "sname", "age")
    result.show()

Output:

    +---+-----+----+
    |sid|sname| age|
    +---+-----+----+
    | 01| 赵雷|34.0|
    | 02| 钱电|33.0|
    | 03| 孙风|34.0|
    | 04| 李云|34.0|
    | 05| 周梅|32.0|
    | 06| 吴兰|32.0|
    | 07| 郑竹|35.0|
    | 08| 王菊|34.0|
    +---+-----+----+

39. Find the students whose birthday falls in the current month.

    # compare only the month: the birth year never equals the current year
    # 'MM' is month-of-year in Spark date patterns ('mm' means minutes)
    result = df_student.where("month(to_date(sage, 'yyyy-MM-dd')) = month(current_date())")
    result.show()
    # or
    t1 = df_student.withColumn("birthday", F.to_date(F.col("sage")))
    result = t1.where(F.month("birthday") == F.month(F.current_date())) \
        .select("sid", "sname", "ssex", "birthday")
    result.show()

The result depends on the month in which the code runs; the output below comes from a run in March.

Output:

    +---+-----+----+----------+
    |sid|sname|ssex|  birthday|
    +---+-----+----+----------+
    | 06| 吴兰|  女|1992-03-01|
    +---+-----+----+----------+

40. Find the oldest student.

    # sage is an ISO date string (yyyy-MM-dd), so the smallest string is the earliest birth date
    sage_min = df_student.agg(F.min(F.col("sage")).alias("sage_min")).collect()[0][0]
    result = df_student.where("sage = '{}'".format(sage_min))
    result.show()

Output:

    +---+-----+----------+----+
    |sid|sname|      sage|ssex|
    +---+-----+----------+----+
    | 07| 郑竹|1989-07-01|  女|
    +---+-----+----------+----+

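III. Summary

The 40 exercises above are mostly about joining, filtering, and querying data, and the DataFrame methods used most often are select, filter, where, join, groupBy/agg, and orderBy. Because a Spark DataFrame is an immutable distributed dataset, the only way to "modify" one is to derive a new DataFrame from it.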

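Disclaimer: This content was contributed by a user and does not represent the position of the wpsshop blog. Copyright belongs to the original author, and the site assumes no legal liability for it. If you find infringing content, please contact us. When reposting, please credit the source: https://www.wpsshop.cn/w/黑客灵魂/article/detail/924152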