Project data download: guiliVideo.zip (the GuliVideo project's video and user tables)
Contents:
Two folders:
The user data contains 74,702 records.
The video data is split across 5 files, each containing many records.
Compute the routine metrics and various TopN metrics for the GuliVideo video site:
--Top 10 videos by view count
--Top 10 video categories by popularity
--Categories of the Top 20 most-viewed videos
--Category ranking of the videos related to the Top 50 most-viewed videos
--Top 10 most popular videos within each category
--Top 10 videos by traffic within each category
--Top 10 users by number of uploaded videos, and the videos they uploaded
--Top 10 most-viewed videos within each category
1. Video table
Table 6-13 Video table

| Field | Meaning | Description |
| --- | --- | --- |
| videoId | Unique video ID | 11-character string |
| uploader | Video uploader | Username of the uploading user (string) |
| age | Video age | Integer number of days the video has been on the platform |
| category | Video categories | Categories assigned when the video was uploaded |
| length | Video length | Video length as an integer |
| views | View count | Number of times the video has been viewed |
| rate | Video rating | Out of 5 |
| ratings | Traffic | Video traffic, integer |
| comments | Comment count | Integer number of comments on the video |
| relatedId | Related video IDs | IDs of related videos, up to 20 |
2. User table
Table 6-14 User table

| Field | Meaning | Type |
| --- | --- | --- |
| uploader | Uploader username | string |
| videos | Number of uploaded videos | int |
| friends | Number of friends | int |
Inspecting the raw data shows that a video can belong to multiple categories, separated by the "&" character with a space on each side, and that a video can also have multiple related videos, separated by "\t". To make it easier to work with these multi-valued fields during analysis, we first clean and reshape the data: keep "&" as the category separator but strip the surrounding spaces, and re-join the multiple related-video IDs with "&" as well. For example, a category value such as `People & Blogs` becomes `People&Blogs`, and tab-separated related IDs become `id1&id2&...`.
Three pieces of code are needed:
1. ETL: ETLUtil
```java
// Package name taken from the import statement in VideoETLMapper below
package com.z.youtube.util;

public class ETLUtil {
    public static String oriString2ETLString(String ori) {
        StringBuilder etlString = new StringBuilder();
        // Split the raw line on tabs
        String[] splits = ori.split("\t");
        // Drop dirty records with fewer than 9 fields
        if (splits.length < 9) return null;
        // Remove the spaces inside the category field
        splits[3] = splits[3].replace(" ", "");
        for (int i = 0; i < splits.length; i++) {
            if (i < 9) {
                // The first 9 fields stay tab-separated
                if (i == splits.length - 1) {
                    etlString.append(splits[i]);
                } else {
                    etlString.append(splits[i]).append("\t");
                }
            } else {
                // Related-video IDs are re-joined with "&"
                if (i == splits.length - 1) {
                    etlString.append(splits[i]);
                } else {
                    etlString.append(splits[i]).append("&");
                }
            }
        }
        return etlString.toString();
    }
}
```

2. ETL: Mapper
```java
import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.z.youtube.util.ETLUtil;

public class VideoETLMapper extends Mapper<Object, Text, NullWritable, Text> {
    Text text = new Text();

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String etlString = ETLUtil.oriString2ETLString(value.toString());

        if (StringUtils.isBlank(etlString)) return;

        text.set(etlString);
        context.write(NullWritable.get(), text);
    }
}
```

3. ETL: Runner
```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class VideoETLRunner implements Tool {
    private Configuration conf = null;

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    @Override
    public int run(String[] args) throws Exception {
        conf = this.getConf();
        conf.set("inpath", args[0]);
        conf.set("outpath", args[1]);

        Job job = Job.getInstance(conf, "youtube-video-etl");

        job.setJarByClass(VideoETLRunner.class);

        job.setMapperClass(VideoETLMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setNumReduceTasks(0);

        this.initJobInputPath(job);
        this.initJobOutputPath(job);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    private void initJobOutputPath(Job job) throws IOException {
        Configuration conf = job.getConfiguration();
        String outPathString = conf.get("outpath");

        FileSystem fs = FileSystem.get(conf);

        // Delete the output directory if it already exists
        Path outPath = new Path(outPathString);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        FileOutputFormat.setOutputPath(job, outPath);
    }

    private void initJobInputPath(Job job) throws IOException {
        Configuration conf = job.getConfiguration();
        String inPathString = conf.get("inpath");

        FileSystem fs = FileSystem.get(conf);

        Path inPath = new Path(inPathString);
        if (fs.exists(inPath)) {
            FileInputFormat.addInputPath(job, inPath);
        } else {
            throw new RuntimeException("Input path does not exist on HDFS: " + inPathString);
        }
    }

    public static void main(String[] args) {
        try {
            int resultCode = ToolRunner.run(new VideoETLRunner(), args);
            if (resultCode == 0) {
                System.out.println("Success!");
            } else {
                System.out.println("Fail!");
            }
            System.exit(resultCode);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}
```

4. Run the ETL
Upload the data to HDFS:
[atguigu@hadoop102 datas]$ hadoop fs -put user /
[atguigu@hadoop102 datas]$ hadoop fs -put video /
Load the user data into the ORC table gulivideo_user_orc:
insert into table gulivideo_user_orc select * from gulivideo_user_ori;
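The tables used above and in the queries below (gulivideo_user_ori, gulivideo_user_orc, gulivideo_orc) are not defined in this excerpt. Below is a minimal sketch of what their DDL might look like, based on the table schemas above and on the gulivideo_category DDL later in this section; the storage details and the load path are assumptions.

```sql
-- Sketch only: raw user table backed by the text file uploaded to HDFS (path /user is assumed)
create table gulivideo_user_ori(
    uploader string,
    videos int,
    friends int)
row format delimited fields terminated by "\t"
stored as textfile;
load data inpath "/user" into table gulivideo_user_ori;

-- Sketch only: the ORC copy populated by the insert statement above
create table gulivideo_user_orc(
    uploader string,
    videos int,
    friends int)
stored as orc;

-- Sketch only: the video ORC table queried below; category and relatedId are arrays
-- whose items are separated by the "&" produced by the ETL step
create table gulivideo_orc(
    videoId string,
    uploader string,
    age int,
    category array<string>,
    length int,
    views int,
    rate float,
    ratings int,
    comments int,
    relatedId array<string>)
row format delimited fields terminated by "\t"
collection items terminated by "&"
stored as orc;
```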
Create a Maven project and add the following to its pom.xml:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>hdfs</artifactId>
        <groupId>Hdfs20190817</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>hive</groupId>
    <artifactId>Hive20190828</artifactId>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>1.2.1</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>

        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8</version>
            <scope>system</scope>
            <systemPath>C:/DeveloperTools/Java/jdk1.8.0_211/lib/tools.jar</systemPath>
        </dependency>

    </dependencies>

</project>
```

Create three classes for the data cleaning:
ETLMapper
```java
package guliETL;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author cherry
 * @create 2019-08-29-15:49
 */
public class ETLMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    private Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Get one line of input
        String line = value.toString();
        // 2. Clean the line
        String etlStr = ETLUtils.etlStr(line);
        // 3. Filter out empty (dirty) records
        if (StringUtils.isBlank(etlStr)) return;
        // 4. Write out the cleaned record
        k.set(etlStr);
        context.write(k, NullWritable.get());
    }
}
```

ETLUtils
```java
package guliETL;

/**
 * Utility class for cleaning the data:
 * 1. Filter out dirty records
 * 2. Replace the spaces " " in the category field with ""
 *
 * @author cherry
 * @create 2019-08-29-15:51
 */
public class ETLUtils {
    public static String etlStr(String line) {
        // 0. Split the line on tabs
        String[] split = line.split("\t");
        // 1. Filter dirty data: drop records with fewer than 9 fields
        if (split.length < 9) {
            return null;
        }
        // 2. Remove the spaces in the category field
        split[3] = split[3].replaceAll(" ", "");
        // 3. Replace the separator between related-video IDs with "&"
        StringBuffer sb = new StringBuffer();
        for (int i = 0; i < split.length; i++) {
            if (i < 9) {
                if (i == split.length - 1) sb.append(split[i]);
                else sb.append(split[i]).append("\t");
            } else {
                if (i == split.length - 1) sb.append(split[i]);
                else sb.append(split[i]).append("&");
            }
        }
        return sb.toString();
    }
}
```

ETLDriver
```java
package guliETL;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Driver wrapped with the Tool interface
 *
 * @author cherry
 * @create 2019-08-29-16:23
 */
public class ETLDriver implements Tool {
    private Configuration conf;

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new ETLDriver(), args);
        // Print the return code to the console
        System.out.println(run);
    }

    @Override
    public int run(String[] args) throws Exception {
        // 1. Get the Job instance
        Job job = Job.getInstance(getConf());

        // 2. Set the driver (jar) class
        job.setJarByClass(ETLDriver.class);

        // 3. Attach the Mapper class
        job.setMapperClass(ETLMapper.class);

        // 4. Mapper output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // 5. Final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 6. Input and output paths, passed in as arguments
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Note: set the number of reducers to 0, since no reduce phase is needed
        job.setNumReduceTasks(0);
        // 7. Submit the job and wait for completion
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }
}
```

Package the Maven project.
Rename the generated jar to ETL.jar and upload it to the cluster.
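With the jar on the cluster, the ETL job can be launched roughly as follows; this is a sketch only — the HDFS output path is an assumption, while /video is the directory uploaded above and guliETL.ETLDriver is the driver class defined above:
[atguigu@hadoop102 datas]$ hadoop jar ETL.jar guliETL.ETLDriver /video /gulivideo/video_etl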
Top 10 videos by view count — approach: do a global sort with order by on the views field and show only the first 10 rows.
hive (gulivideo)> select videoId, views, comments from gulivideo_orc order by views desc limit 10;
Final query:
```sql
select
    videoId,
    uploader,
    age,
    category,
    length,
    views,
    rate,
    ratings,
    comments
from gulivideo_orc
order by views desc
limit 10;
```

Top 10 video categories by popularity — approach:
1) Count how many videos each category contains and show the 10 categories with the most videos.
2) Group by category and count the videoId values in each group.
3) Because the current table maps one video to one or more categories, the category column must be exploded (column-to-row) before grouping and counting. The exploded result is the subquery aliased t1 in the final query:
```sql
SELECT videoId, category_name
FROM gulivideo_orc
lateral view explode(category) category_t as category_name;   -- t1
```
4) Finally, order by the count (hot) descending and show the first 10 rows.
Final query:
```sql
select
    category_name as category,
    count(t1.videoId) as hot
from (
    select
        videoId,
        category_name
    from gulivideo_orc
    lateral view explode(category) t_catetory as category_name) t1
group by t1.category_name
order by hot desc
limit 10;
```
Categories of the Top 20 most-viewed videos — approach:
1) Find the 20 most-viewed videos with all of their information, sorted in descending order (aliased t1 in the final query):
```sql
SELECT videoId, views, category
FROM gulivideo_orc
ORDER BY views DESC
LIMIT 20;   -- t1
```
2) Explode the category column of those 20 rows (column-to-row), aliased t2:
```sql
SELECT distinct(category_name)
FROM (
    SELECT videoId, views, category
    FROM gulivideo_orc
    ORDER BY views DESC
    LIMIT 20) t1
lateral VIEW explode(category) category_t AS category_name;   -- t2
```
3) Finally, for each category name, count how many of the Top 20 videos it contains.
Final query:
```sql
select
    category_name as category,
    count(t2.videoId) as hot_with_views
from (
    select
        videoId,
        category_name
    from (
        select *
        from gulivideo_orc
        order by views desc
        limit 20) t1
    lateral view explode(category) t_catetory as category_name) t2
group by category_name
order by hot_with_views desc;
```

Category ranking of the videos related to the Top 50 most-viewed videos — approach:
1) Query all information for the 50 most-viewed videos (which includes each video's related videos); call this temporary result t1.
t1: the 50 most-viewed videos
```sql
select *
from gulivideo_orc
order by views desc
limit 50;
```
2) Explode the relatedId column of those 50 rows; call the result t2.
t2: the related-video IDs after the column-to-row (explode) operation
```sql
select
    explode(relatedId) as videoId
from t1;
```
3) Inner join the related-video IDs with the gulivideo_orc table, then explode the category column.
t5: two columns, one is category and the other is the related-video id found above
```sql
(select
    distinct(t2.videoId),
    t3.category
from t2
inner join gulivideo_orc t3 on t2.videoId = t3.videoId) t4
lateral view explode(category) t_catetory as category_name;
```
4) Group by video category, count the videos in each group, then rank.
Final query:
```sql
select
    category_name as category,
    count(t5.videoId) as hot
from (
    select
        videoId,
        category_name
    from (
        select
            distinct(t2.videoId),
            t3.category
        from (
            select
                explode(relatedId) as videoId
            from (
                select *
                from gulivideo_orc
                order by views desc
                limit 50) t1) t2
        inner join gulivideo_orc t3 on t2.videoId = t3.videoId) t4
    lateral view explode(category) t_catetory as category_name) t5
group by category_name
order by hot desc;
```

Top 10 most popular videos within each category (using Music as the example) — approach:
1) To find the Top 10 most popular videos in the Music category we first need to isolate the Music category, which means exploding the category column; so create a table that stores the data with categoryId exploded.
2) Insert data into the category-exploded table.
3) Compute video popularity within the chosen category (Music).
Final code:
Create the category table:
```sql
create table gulivideo_category(
    videoId string,
    uploader string,
    age int,
    categoryId string,
    length int,
    views int,
    rate float,
    ratings int,
    comments int,
    relatedId array<string>)
row format delimited
fields terminated by "\t"
collection items terminated by "&"
stored as orc;
```
Insert data into the category table:
```sql
insert into table gulivideo_category
select
    videoId,
    uploader,
    age,
    categoryId,
    length,
    views,
    rate,
    ratings,
    comments,
    relatedId
from gulivideo_orc
lateral view explode(category) catetory as categoryId;
```
Top 10 for the Music category (any other category works the same way):
```sql
select
    videoId,
    views
from gulivideo_category
where categoryId = "Music"
order by views desc
limit 10;
```
Top 10 videos by traffic within each category (again using Music) — approach:
1) Use the category-exploded table created above (categoryId after the column-to-row operation).
2) Sort by ratings.
Final query:
```sql
select
    videoId,
    views,
    ratings
from gulivideo_category
where categoryId = "Music"
order by ratings desc
limit 10;
```
Top 10 users by number of uploaded videos, and the videos they uploaded — approach:
1) First find the 10 users who uploaded the most videos (aliased t1 in the final query):
```sql
select *
from gulivideo_user_orc
order by videos desc
limit 10;
```
2) Join with the gulivideo_orc table on the uploader field and sort the result by the views count.
Final query:
```sql
select
    t2.videoId,
    t2.views,
    t2.ratings,
    t1.videos,
    t1.friends
from (
    select *
    from gulivideo_user_orc
    order by videos desc
    limit 10) t1
join gulivideo_orc t2 on t1.uploader = t2.uploader
order by views desc
limit 20;
```

As a check, re-running with the row limit raised to 100 shows that the query above behaves correctly.
Top 10 most-viewed videos within each category — approach:
1) Start from the table with categoryId exploded.
2) In a subquery, partition by categoryId, sort within each partition, and generate an increasing row number; name that column rank.
3) From the temporary table produced by the subquery, select the rows where rank is at most 10.
Final query:
```sql
select
    t1.*
from (
    select
        videoId,
        categoryId,
        views,
        row_number() over(partition by categoryId order by views desc) rank
    from gulivideo_category) t1
where rank <= 10;
```