当前位置:   article > 正文

第十一篇|基于SparkSQL的电影分析项目实战_基于spark的电影数据分析

基于spark的电影数据分析

在之前的分享中,曾系统地介绍了Spark的基本原理和使用方式,感兴趣的可以翻看之前的分享文章。在本篇分享中,将介绍一个完整的项目案例,该案例会真实还原企业中SparkSQL的开发流程,手把手教你构建一个基于SparkSQL的分析系统。为了讲解方便,我会对代码进行拆解,完整的代码已上传至GitHub,想看完整代码可以去clone,记得给个Star。以下是全文,希望本文对你有所帮助。

https://github.com/jiamx/spark_project_practise
  • 1

公众号『大数据技术与数仓』,回复『资料』领取大数据资料包

项目介绍

数据集介绍

使用MovieLens的名称为ml-25m.zip的数据集,使用的文件时movies.csvratings.csv,上述文件的下载地址为:

http://files.grouplens.org/datasets/movielens/ml-25m.zip
  • 1
  • movies.csv

该文件是电影数据,对应的为维表数据,大小为2.89MB,包括6万多部电影,其数据格式为[movieId,title,genres],分别对应**[电影id,电影名称,电影所属分类]**,样例数据如下所示:逗号分隔

1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
  • 1
  • ratings.csv

该文件为定影评分数据,对应为事实表数据,大小为646MB,其数据格式为:[userId,movieId,rating,timestamp],分别对应**[用户id,电影id,评分,时间戳]**,样例数据如下所示:逗号分隔

1,296,5,1147880044
  • 1

项目代码结构

需求分析

  • 需求1:查找电影评分个数超过5000,且平均评分较高的前十部电影名称及其对应的平均评分

  • 需求2:查找每个电影类别及其对应的平均评分

  • 需求3:查找被评分次数较多的前十部电影

代码讲解

  • DemoMainApp

该类是程序执行的入口,主要是获取数据源,转换成DataFrame,并调用封装好的业务逻辑类。

object DemoMainApp {
   
  // 文件路径
  private val MOVIES_CSV_FILE_PATH = "file:///e:/movies.csv"
  private val RATINGS_CSV_FILE_PATH = "file:///e:/ratings.csv"

  def main(args: Array[String]): Unit = {
   
    // 创建spark session
    val spark = SparkSession
      .builder
      .master("local[4]")
      .getOrCreate
    // schema信息
    val schemaLoader = new SchemaLoader
    // 读取Movie数据集
    val movieDF = readCsvIntoDataSet(spark, MOVIES_CSV_FILE_PATH, schemaLoader.getMovieSchema)
    // 读取Rating数据集
    val ratingDF = readCsvIntoDataSet(spark, RATINGS_CSV_FILE_PATH, schemaLoader.getRatingSchema)

    // 需求1:查找电影评分个数超过5000,且平均评分较高的前十部电影名称及其对应的平均评分
    val bestFilmsByOverallRating = new BestFilmsByOverallRating
    //bestFilmsByOverallRating.run(movieDF, ratingDF, spark)

    // 需求2:查找每个电影类别及其对应的平均评分
    val genresByAverageRating = new GenresByAverageRating
    //genresByAverageRating.run(movieDF, ratingDF, spark)

    // 需求3:查找被评分次数较多的前十部电影
    val mostRatedFilms = new MostRatedFilms
    mostRatedFilms.run(movieDF, ratingDF, spark)

    spark.close()

  }
  /**
    * 读取数据文件,转成DataFrame
    *
    * @param spark
    * @param path
    * @param schema
    * @return
    */
  def readCsvIntoDataSet(spark: SparkSession, path: String, schema: StructType) = {
   

    val dataSet = spark.read
      .format("csv")
      .option("header", "true")
      .schema(schema)
      .load(path)
    dataSet
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • Entry

该类为实体类,封装了数据源的样例类和结果表的样例类

class Entry {
   

}

case class Movies(
                   movieId: String, // 电影的id
                   title: String, // 电影的标题
                   genres: String // 电影类别
                 )

case class Ratings(
                    userId: String, // 用户的id
                    movieId: String, // 电影的id
                    rating: String, // 用户评分
                    timestamp: String // 时间戳
                  )

// 需求1MySQL结果表
case class tenGreatestMoviesByAverageRating(
                                             movieId: String, // 电影的id
                                             title: String, // 电影的标题
                                             avgRating: String // 电影平均评分
                                           )

// 需求2MySQL结果表
case class topGenresByAverageRating(
                                     genres: String, //电影类别
                                     avgRating: String 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Guff_9hys/article/detail/780268
推荐阅读
相关标签
  

闽ICP备14008679号