赞
踩
在之前的分享中,曾系统地介绍了Spark的基本原理和使用方式,感兴趣的可以翻看之前的分享文章。在本篇分享中,将介绍一个完整的项目案例,该案例会真实还原企业中SparkSQL的开发流程,手把手教你构建一个基于SparkSQL的分析系统。为了讲解方便,我会对代码进行拆解,完整的代码已上传至GitHub,想看完整代码可以去clone,记得给个Star。以下是全文,希望本文对你有所帮助。
https://github.com/jiamx/spark_project_practise
公众号『大数据技术与数仓』,回复『资料』领取大数据资料包
使用MovieLens的名称为ml-25m.zip的数据集,使用的文件时movies.csv和ratings.csv,上述文件的下载地址为:
http://files.grouplens.org/datasets/movielens/ml-25m.zip
该文件是电影数据,对应的为维表数据,大小为2.89MB,包括6万多部电影,其数据格式为[movieId,title,genres],分别对应**[电影id,电影名称,电影所属分类]**,样例数据如下所示:逗号分隔
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
该文件为定影评分数据,对应为事实表数据,大小为646MB,其数据格式为:[userId,movieId,rating,timestamp],分别对应**[用户id,电影id,评分,时间戳]**,样例数据如下所示:逗号分隔
1,296,5,1147880044
该类是程序执行的入口,主要是获取数据源,转换成DataFrame,并调用封装好的业务逻辑类。
object DemoMainApp { // 文件路径 private val MOVIES_CSV_FILE_PATH = "file:///e:/movies.csv" private val RATINGS_CSV_FILE_PATH = "file:///e:/ratings.csv" def main(args: Array[String]): Unit = { // 创建spark session val spark = SparkSession .builder .master("local[4]") .getOrCreate // schema信息 val schemaLoader = new SchemaLoader // 读取Movie数据集 val movieDF = readCsvIntoDataSet(spark, MOVIES_CSV_FILE_PATH, schemaLoader.getMovieSchema) // 读取Rating数据集 val ratingDF = readCsvIntoDataSet(spark, RATINGS_CSV_FILE_PATH, schemaLoader.getRatingSchema) // 需求1:查找电影评分个数超过5000,且平均评分较高的前十部电影名称及其对应的平均评分 val bestFilmsByOverallRating = new BestFilmsByOverallRating //bestFilmsByOverallRating.run(movieDF, ratingDF, spark) // 需求2:查找每个电影类别及其对应的平均评分 val genresByAverageRating = new GenresByAverageRating //genresByAverageRating.run(movieDF, ratingDF, spark) // 需求3:查找被评分次数较多的前十部电影 val mostRatedFilms = new MostRatedFilms mostRatedFilms.run(movieDF, ratingDF, spark) spark.close() } /** * 读取数据文件,转成DataFrame * * @param spark * @param path * @param schema * @return */ def readCsvIntoDataSet(spark: SparkSession, path: String, schema: StructType) = { val dataSet = spark.read .format("csv") .option("header", "true") .schema(schema) .load(path) dataSet } }
该类为实体类,封装了数据源的样例类和结果表的样例类
class Entry { } case class Movies( movieId: String, // 电影的id title: String, // 电影的标题 genres: String // 电影类别 ) case class Ratings( userId: String, // 用户的id movieId: String, // 电影的id rating: String, // 用户评分 timestamp: String // 时间戳 ) // 需求1MySQL结果表 case class tenGreatestMoviesByAverageRating( movieId: String, // 电影的id title: String, // 电影的标题 avgRating: String // 电影平均评分 ) // 需求2MySQL结果表 case class topGenresByAverageRating( genres: String, //电影类别 avgRating: String
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。