
Hive Project: Gulivideo — ETL cleaning of the raw data, plus Hive statistics: top 10 most-viewed videos, top video categories, categories of the top-viewed videos, top categories by traffic, top categories by heat, top users by number of uploads, and top-viewed videos per category

Gulivideo

Hive in Action: Gulivideo

Project data download: guiliVideo.zip (the Gulivideo project video table and user table)

Contents:

Two folders

74,702 records in the user table

5 data files of video data, each containing many records

Requirements

Compute the routine metrics and various Top-N metrics for the Gulivideo video site:

-- Top 10 most-viewed videos

-- Top 10 hottest video categories

-- Categories of the top 20 most-viewed videos

-- Category rank of the videos related to the top 50 most-viewed videos

-- Top 10 hottest videos in each category

-- Top 10 videos by traffic in each category

-- Top 10 users with the most uploaded videos, and their uploaded videos

-- Top 10 most-viewed videos in each category

Project data structure

1. Video table

Table 6-13 Video table

Field       | Remark            | Description
video id    | unique video ID   | 11-character string
uploader    | video uploader    | username of the uploading user, string
age         | video age         | integer number of days the video has been on the platform
category    | video category    | the categories assigned when the video was uploaded
length      | video length      | video length as an integer
views       | view count        | number of times the video has been viewed
rate        | video rating      | out of 5
ratings     | traffic           | video traffic, integer
comments    | comment count     | integer number of comments on the video
related ids | related video IDs | IDs of related videos, at most 20

2. User table

Table 6-14 User table

Field    | Remark                     | Type
uploader | uploader username          | string
videos   | number of uploaded videos  | int
friends  | number of friends          | int

Looking at the raw data, you can see that a video may belong to several categories, separated by the "&" character with spaces on either side of it, and that a video may also have several related videos, which are separated by "\t". To make these multi-valued fields easier to work with during analysis, we first reorganize and clean the data: keep "&" as the category separator but strip the surrounding spaces, and join the multiple related video IDs with "&" as well.

ETL of the raw data

Three tasks:

  1. Drop rows that have fewer than 9 fields
  2. Remove the spaces from the video category field
  3. Change the separator of the related video IDs (an illustrative example follows this list)
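
For illustration, here is a made-up example of the two fields that change (not an actual row from the dataset):

  category field:    "People & Blogs"       ->  "People&Blogs"
  related video ids: id1<TAB>id2<TAB>id3    ->  id1&id2&id3   (still separated from the ninth field by a single tab)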

1. ETL: ETLUtil

public class ETLUtil {
    public static String oriString2ETLString(String ori){
        StringBuilder etlString = new StringBuilder();
        // split the raw line on tabs
        String[] splits = ori.split("\t");
        // drop dirty rows with fewer than 9 fields
        if(splits.length < 9) return null;
        // remove spaces from the category field
        splits[3] = splits[3].replace(" ", "");
        for(int i = 0; i < splits.length; i++){
            if(i < 9){
                // the first 9 fields stay tab-separated
                if(i == splits.length - 1){
                    etlString.append(splits[i]);
                }else{
                    etlString.append(splits[i] + "\t");
                }
            }else{
                // related video ids are re-joined with "&"
                if(i == splits.length - 1){
                    etlString.append(splits[i]);
                }else{
                    etlString.append(splits[i] + "&");
                }
            }
        }
        return etlString.toString();
    }
}

 

2. ETL: Mapper

import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import com.z.youtube.util.ETLUtil;

public class VideoETLMapper extends Mapper<Object, Text, NullWritable, Text>{
    Text text = new Text();

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String etlString = ETLUtil.oriString2ETLString(value.toString());
        if(StringUtils.isBlank(etlString)) return;
        text.set(etlString);
        context.write(NullWritable.get(), text);
    }
}

3. ETL: Runner

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class VideoETLRunner implements Tool {
    private Configuration conf = null;

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    @Override
    public int run(String[] args) throws Exception {
        conf = this.getConf();
        conf.set("inpath", args[0]);
        conf.set("outpath", args[1]);

        Job job = Job.getInstance(conf, "youtube-video-etl");
        job.setJarByClass(VideoETLRunner.class);
        job.setMapperClass(VideoETLMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        // map-only job
        job.setNumReduceTasks(0);
        this.initJobInputPath(job);
        this.initJobOutputPath(job);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    private void initJobOutputPath(Job job) throws IOException {
        Configuration conf = job.getConfiguration();
        String outPathString = conf.get("outpath");
        FileSystem fs = FileSystem.get(conf);
        Path outPath = new Path(outPathString);
        // delete the output directory if it already exists
        if(fs.exists(outPath)){
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);
    }

    private void initJobInputPath(Job job) throws IOException {
        Configuration conf = job.getConfiguration();
        String inPathString = conf.get("inpath");
        FileSystem fs = FileSystem.get(conf);
        Path inPath = new Path(inPathString);
        if(fs.exists(inPath)){
            FileInputFormat.addInputPath(job, inPath);
        }else{
            throw new RuntimeException("Input directory does not exist on HDFS: " + inPathString);
        }
    }

    public static void main(String[] args) {
        try {
            int resultCode = ToolRunner.run(new VideoETLRunner(), args);
            if(resultCode == 0){
                System.out.println("Success!");
            }else{
                System.out.println("Fail!");
            }
            System.exit(resultCode);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}

 

4. Run the ETL

Upload the data to HDFS:

[atguigu@hadoop102 datas]$ hadoop fs -put user /

[atguigu@hadoop102 datas]$ hadoop fs -put video /
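
The Hive tables that will hold this data are not shown in this write-up. A minimal DDL sketch, assuming the field layout from the data-structure tables above, the "&" collection delimiter produced by the ETL, and the table names used later in this article (gulivideo_ori, gulivideo_orc, gulivideo_user_ori, gulivideo_user_orc), might look like this:

create table gulivideo_ori(
    videoId string,
    uploader string,
    age int,
    category array<string>,
    length int,
    views int,
    rate float,
    ratings int,
    comments int,
    relatedId array<string>)
row format delimited
fields terminated by "\t"
collection items terminated by "&"
stored as textfile;

create table gulivideo_user_ori(
    uploader string,
    videos int,
    friends int)
row format delimited
fields terminated by "\t"
stored as textfile;

-- ORC counterparts with the same columns
create table gulivideo_orc(
    videoId string,
    uploader string,
    age int,
    category array<string>,
    length int,
    views int,
    rate float,
    ratings int,
    comments int,
    relatedId array<string>)
row format delimited
fields terminated by "\t"
collection items terminated by "&"
stored as orc;

create table gulivideo_user_orc(
    uploader string,
    videos int,
    friends int)
row format delimited
fields terminated by "\t"
stored as orc;

After the cleaned video output and the user file are loaded into the _ori tables (for example with load data inpath), an insert ... select copies each of them into its ORC counterpart, as the next statement does for gulivideo_user_orc (gulivideo_orc is populated the same way from gulivideo_ori).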

 

gulivideo_user_orc:

insert into table gulivideo_user_orc select * from gulivideo_user_ori;

Full steps:

Create a Maven project and add the following to the pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>hdfs</artifactId>
        <groupId>Hdfs20190817</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <groupId>hive</groupId>
    <artifactId>Hive20190828</artifactId>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>1.2.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8</version>
            <scope>system</scope>
            <systemPath>C:/DeveloperTools/Java/jdk1.8.0_211/lib/tools.jar</systemPath>
        </dependency>
    </dependencies>
</project>

Create three classes for the data cleaning:


ETLMapper

package guliETL;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

/**
 * @author cherry
 * @create 2019-08-29-15:49
 */
public class ETLMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    private Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. read one line of raw data
        String line = value.toString();
        // 2. clean the line
        String etlStr = ETLUtils.etlStr(line);
        // 3. skip rows that were filtered out
        if (StringUtils.isBlank(etlStr)) return;
        // 4. write out the cleaned line
        k.set(etlStr);
        context.write(k, NullWritable.get());
    }
}

ETLUtils

package guliETL;

/**
 * Utility class for cleaning the data:
 * 1. filter out dirty rows
 * 2. replace the spaces " " in the category field with ""
 * 3. change the separator of the related video IDs
 *
 * @author cherry
 * @create 2019-08-29-15:51
 */
public class ETLUtils {
    public static String etlStr(String line) {
        // 0. split the line on tabs
        String[] split = line.split("\t");
        // 1. filter dirty data: drop rows with fewer than 9 fields
        if (split.length < 9) {
            return null;
        }
        // 2. remove spaces from the category field
        split[3] = split[3].replaceAll(" ", "");
        // 3. change the separator of the related videos
        StringBuffer sb = new StringBuffer();
        for (int i = 0; i < split.length; i++) {
            if (i < 9) {
                // the first 9 fields stay tab-separated
                if (i == split.length - 1) sb.append(split[i]);
                else sb.append(split[i]).append("\t");
            } else {
                // related video ids are re-joined with "&"
                if (i == split.length - 1) sb.append(split[i]);
                else sb.append(split[i]).append("&");
            }
        }
        return sb.toString();
    }
}

ETLDriver

package guliETL;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Driver wrapped with the Tool interface
 *
 * @author cherry
 * @create 2019-08-29-16:23
 */
public class ETLDriver implements Tool {
    private Configuration conf;

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new ETLDriver(), args);
        // print the exit code to the console
        System.out.println(run);
    }

    @Override
    public int run(String[] args) throws Exception {
        // 1. get the Job object
        Job job = Job.getInstance(getConf());
        // 2. set the driver class
        job.setJarByClass(ETLDriver.class);
        // 3. set the Mapper class
        job.setMapperClass(ETLMapper.class);
        // 4. Mapper output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 5. final output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // 6. input and output paths, passed in as arguments
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // note: a map-only job (0 reducers) is enough and is more efficient
        job.setNumReduceTasks(0);
        // 7. submit
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }
}

Package the Maven project.

Rename the generated jar to ETL.jar and upload it to the cluster.
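
Assuming the data was uploaded to /video as above, the job can then be run roughly as follows; guliETL.ETLDriver is the driver class from the code above, and the output directory is only a placeholder (it must not already exist, since the driver does not delete it):

[atguigu@hadoop102 datas]$ hadoop jar ETL.jar guliETL.ETLDriver /video /guliVideoEtlOut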

Business analysis

1 Top 10 most-viewed videos

Idea: do a global sort on the views field with order by and show only the first 10 rows.

hive (gulivideo)> select videoId, views, comments  from gulivideo_orc order by views desc limit 10;


Final query:

select
    videoId,
    uploader,
    age,
    category,
    length,
    views,
    rate,
    ratings,
    comments
from
    gulivideo_orc
order by
    views desc
limit 10;


 

2 Top 10 hottest video categories

Idea:

1) That is, count how many videos each category contains and show the 10 categories with the most videos.

SELECT
    videoId, category_name
FROM
    gulivideo_orc
lateral view explode(category) category_t as category_name;   -- referred to as t1 below

 

2) We need to group by category and count the videoIds within each group.

3) In the current table structure a video maps to one or more categories, so before we can group by category we first have to explode the category array (turn the column into rows) and then do the count.

4) Finally sort by heat and show the first 10 rows.

Final query:

select
    category_name as category,
    count(t1.videoId) as hot
from (
    select
        videoId,
        category_name
    from
        gulivideo_orc lateral view explode(category) t_catetory as category_name) t1
group by
    t1.category_name
order by
    hot desc
limit 10;


 

3 Categories of the 20 most-viewed videos, and how many Top-20 videos each category contains

Idea:

1) First get all the information for the 20 most-viewed videos, in descending order:

SELECT
    videoId, views, category
FROM
    gulivideo_orc
ORDER BY
    views DESC
LIMIT 20;   -- referred to as t1 below

2) Explode the category column of those 20 rows (column to rows):

SELECT
    distinct(category_name)
FROM (
    SELECT
        videoId,
        views,
        category
    FROM
        gulivideo_orc
    ORDER BY
        views DESC
    LIMIT 20
) t1 lateral VIEW explode(category) category_t AS category_name;   -- referred to as t2 below

3) Finally query the category names and how many of the Top-20 videos fall into each category.

Final query:

select
    category_name as category,
    count(t2.videoId) as hot_with_views
from (
    select
        videoId,
        category_name
    from (
        select
            *
        from
            gulivideo_orc
        order by
            views desc
        limit 20) t1 lateral view explode(category) t_catetory as category_name) t2
group by
    category_name
order by
    hot_with_views desc;

4 Category rank of the videos related to the top 50 most-viewed videos

1) Query all the information for the 50 most-viewed videos (which of course includes each video's related videos); call this temporary table t1.

t1: the 50 most-viewed videos

select
    *
from
    gulivideo_orc
order by
    views desc
limit 50;

2) Explode the relatedId column of those 50 rows; call the result temporary table t2.

t2: the related video IDs, one per row

select
    explode(relatedId) as videoId
from
    t1;

 

3) Inner join the related video IDs with the gulivideo_orc table.

t5: two columns, the category and the related video IDs found above

(select
    distinct(t2.videoId),
    t3.category
from
    t2
inner join
    gulivideo_orc t3 on t2.videoId = t3.videoId) t4 lateral view explode(category) t_catetory as category_name;

 

4) Group by video category, count the videos in each group, and rank them.

Final query:

select
    category_name as category,
    count(t5.videoId) as hot
from (
    select
        videoId,
        category_name
    from (
        select
            distinct(t2.videoId),
            t3.category
        from (
            select
                explode(relatedId) as videoId
            from (
                select
                    *
                from
                    gulivideo_orc
                order by
                    views desc
                limit 50) t1) t2
        inner join
            gulivideo_orc t3 on t2.videoId = t3.videoId) t4 lateral view explode(category) t_catetory as category_name) t5
group by
    category_name
order by
    hot desc;

5 Top 10 hottest videos in each category, using Music as an example

Idea:

1) To get the Top 10 hottest videos of the Music category we first need to isolate the Music category, which means exploding the category array; so we create a table that stores the data with categoryId exploded.

2) Insert data into the exploded-category table.

3) Compute the video heat for the chosen category (Music).

Final queries:

Create the category table:

create table gulivideo_category(
    videoId string,
    uploader string,
    age int,
    categoryId string,
    length int,
    views int,
    rate float,
    ratings int,
    comments int,
    relatedId array<string>)
row format delimited
fields terminated by "\t"
collection items terminated by "&"
stored as orc;


Insert data into the category table:

insert into table gulivideo_category
select
    videoId,
    uploader,
    age,
    categoryId,
    length,
    views,
    rate,
    ratings,
    comments,
    relatedId
from
    gulivideo_orc lateral view explode(category) catetory as categoryId;

 

Top 10 for the Music category (other categories work the same way):

select
    videoId,
    views
from
    gulivideo_category
where
    categoryId = "Music"
order by
    views desc
limit 10;


 

6 Top 10 videos by traffic in each category, using Music as an example

Idea:

1) Use the exploded category table (categoryId turned from column to rows).

2) Sort by ratings.

Final query:

select
    videoId,
    views,
    ratings
from
    gulivideo_category
where
    categoryId = "Music"
order by
    ratings desc
limit 10;

7 Top 10 users with the most uploaded videos and their 20 most-viewed uploads

Idea:

1) First find the user information of the 10 users who uploaded the most videos:

select
    *
from
    gulivideo_user_orc
order by
    videos desc
limit 10;

2) Join on the uploader field with the gulivideo_orc table and sort the result by views.

Final query:

select
    t2.videoId,
    t2.views,
    t2.ratings,
    t1.videos,
    t1.friends
from (
    select
        *
    from
        gulivideo_user_orc
    order by
        videos desc
    limit 10) t1
join
    gulivideo_orc t2
on
    t1.uploader = t2.uploader
order by
    views desc
limit 20;

Raising the final limit (for example to 100) as a spot check shows that the query above behaves correctly.

8 Top 10 most-viewed videos in each category

Idea:

1) Start from the table with categoryId already exploded.

2) In a subquery, partition by categoryId, sort within each partition, and generate an increasing row number; name this column rank.

3) From that subquery, keep only the rows whose rank is at most 10.

Final query:

select
    t1.*
from (
    select
        videoId,
        categoryId,
        views,
        row_number() over(partition by categoryId order by views desc) rank
    from
        gulivideo_category) t1
where
    rank <= 10;
