Field | Description | Details |
---|---|---|
videoId | unique video id (String) | an 11-character string |
uploader | video uploader (String) | the username of the user who uploaded the video |
age | video age (int) | the number of days the video has been on the platform, as an integer |
category | video categories (Array) | the categories assigned to the video when it was uploaded |
length | video length (int) | the video length, as an integer |
views | view count (int) | the number of times the video has been viewed |
rate | video rating (Double) | out of 5 points |
ratings | traffic (int) | the video's traffic, as an integer |
comments | comment count (int) | the number of comments on the video, as an integer |
relatedId | related video ids (Array) | the ids of related videos, at most 20 |
Field | Description | Details |
---|---|---|
uploader | uploader username | string |
videos | number of uploaded videos | int |
friends | number of friends | int |
Inspecting the raw data shows that a video can belong to multiple categories, separated by "&" with a space on each side, and that a video can also have multiple related videos, separated by "\t". To make multi-valued fields easier to work with during analysis, we first perform a data reorganization (cleaning) step: join all categories with "&" and strip the surrounding spaces, and join the multiple related video ids with "&" as well.
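For illustration only, a made-up raw line and its cleaned counterpart could look like this (\t marks a tab separator; the ids and numbers are hypothetical):
raw:     vidAAA\tuserX\t750\tMusic & Comedy\t320\t1500\t4.5\t120\t30\tvidBBB\tvidCCC
cleaned: vidAAA\tuserX\t750\tMusic&Comedy\t320\t1500\t4.5\t120\t30\tvidBBB&vidCCC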
Create the folders video and user under the hikevideo directory on the HDFS server, and upload the raw data into them.
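A minimal sketch of this step with the hdfs command-line client (the local paths in angle brackets are placeholders for wherever the raw files actually live):
hdfs dfs -mkdir -p /hikevideo/video /hikevideo/user
hdfs dfs -put <local raw video files> /hikevideo/video
hdfs dfs -put <local raw user files> /hikevideo/user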
--Add the dependencies in pom.xml
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>2.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <!-- <dependency>-->
    <!--     <groupId>org.apache.hadoop</groupId>-->
    <!--     <artifactId>hadoop-client-runtime</artifactId>-->
    <!--     <version>3.1.3</version>-->
    <!-- </dependency>-->
</dependencies>
--Create the log4j2.xml file
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error" strict="true" name="XMLConfig">
    <Appenders>
        <!-- type is Console; the name attribute is required -->
        <Appender type="Console" name="STDOUT">
            <!-- PatternLayout, producing output such as
                 [INFO] [2018-01-22 17:34:01][org.test.Console]I'm here -->
            <Layout type="PatternLayout"
                    pattern="[%p] [%d{yyyy-MM-dd HH:mm:ss}][%c{10}]%m%n" />
        </Appender>
    </Appenders>
    <Loggers>
        <!-- additivity is false -->
        <Logger name="test" level="info" additivity="false">
            <AppenderRef ref="STDOUT" />
        </Logger>
        <!-- root logger configuration -->
        <Root level="info">
            <AppenderRef ref="STDOUT" />
        </Root>
    </Loggers>
</Configuration>
package com.hike.etl;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ETLMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private Counter pass;
    private Counter fail;
    private StringBuilder sb = new StringBuilder();
    private Text result = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        pass = context.getCounter("ETL", "pass");
        fail = context.getCounter("ETL", "fail");
    }

    /**
     * Process one line of the log: remove the spaces in the fourth field,
     * change the separator of the trailing related-video fields to '&',
     * and drop lines that do not have enough fields.
     * @param key line offset
     * @param value one line of the log
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("\t");
        //Check the field count: every field except the trailing related-video ids must be present
        if (fields.length >= 9) {
            //Clean the data: remove the spaces in the fourth field
            fields[3] = fields[3].replace(" ", "");
            //Re-join the fields into one line, paying attention to the separators of the trailing fields
            sb.setLength(0);    //clear the old string first, so every line is built from an empty buffer
            for (int i = 0; i < fields.length; i++) {
                if (i == fields.length - 1) {
                    //the last field of this line: no trailing separator
                    sb.append(fields[i]);
                } else if (i <= 8) {
                    //one of the first nine fields: append it followed by "\t"
                    sb.append(fields[i]).append("\t");
                } else {
                    //a related-video field: separate with "&"
                    sb.append(fields[i]).append("&");
                }
            }
            //finally convert to a string and write it out
            result.set(sb.toString());
            context.write(result, NullWritable.get());
            pass.increment(1);
        } else {
            //discard the data by simply not writing it out
            fail.increment(1);
        }
    }
}
package com.hike.etl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ETLDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration entries = new Configuration();
        //run the MapReduce job on the Tez engine
        entries.set("mapreduce.framework.name", "yarn-tez");
        Job job = Job.getInstance(entries);

        job.setJarByClass(ETLDriver.class);
        job.setMapperClass(ETLMapper.class);
        job.setNumReduceTasks(0);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
yarn jar etltool-1.0-SNAPSHOT.jar com.hike.etl.ETLDriver /hikevideo/video /hikevideo/video_etl
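To sanity-check the job output, one quick option (a sketch, assuming the job finished successfully; part-m-00000 is the usual name of the first map-only output file, adjust to whatever the listing actually shows):
hdfs dfs -ls /hikevideo/video_etl
hdfs dfs -cat /hikevideo/video_etl/part-m-00000 | head -n 3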
Create two external tables over the data in the user and video_etl directories. The data is not queried directly through the external tables, because querying text data is inefficient; the usual practice is to create external tables first and then load their data into internal tables.
The simplest ETL work is usually done during this table-loading step.
External tables are generally used when referencing external data, because the data is shared: even if the external table is dropped, the underlying data still exists.
--Create the external table video_ori
create external table video_ori(
    videoId string,
    uploader string,
    age int,
    category array<string>,
    length int,
    views int,
    rate float,
    ratings int,
    comments int,
    relatedId array<string>)
row format delimited fields terminated by "\t"
collection items terminated by "&"
location '/hikevideo/video_etl';

--Create the external table user_ori
create external table user_ori(
    uploader string,
    videos int,
    friends int)
row format delimited fields terminated by "\t"
location '/hikevideo/user';
--Create the internal table video_orc
create table video_orc(
    videoId string,
    uploader string,
    age int,
    category array<string>,
    length int,
    views int,
    rate float,
    ratings int,
    comments int,
    relatedId array<string>)
stored as orc tblproperties("orc.compress"="SNAPPY");

--Create the internal table user_orc
create table user_orc(
    uploader string,
    videos int,
    friends int)
stored as orc tblproperties("orc.compress"="SNAPPY");
During the import, necessary data cleaning can be performed if needed.
--Insert the data from the external tables
insert into table video_orc select * from video_ori;
insert into table user_orc select * from user_ori;
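If cleaning really were needed at this step, a minimal sketch would be to filter while inserting (the condition below is purely hypothetical; the already-cleaned data does not require it):
insert into table video_orc select * from video_ori where videoId is not null and views >= 0;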
--Simply use order by to do a global sort on the views field, and show only the first 10 rows
select
videoid,
views
from
video_orc
order by
views desc
limit 10;
Define video category popularity (assume it is determined by the number of videos in the category).
Category is an array, so its elements need to be split apart.
select
videoid,
cate
from
video_orc lateral view explode(category) tbl as cate;
On top of the above, count how many videos each category has, sort, and take the top 10.
select
cate,
count(videoid) n
from
(select
videoid,
cate
from
video_orc lateral view explode(category) tbl as cate) t1
group by
cate
order by n desc limit 10;
Find the top 20 videos by views and their categories.
select
videoid,
views,
category
from
video_orc
order by
views desc
limit
20;
Split the categories apart.
select
videoid,
cate
from
t1 lateral view explode(category) tbl as cate;
Count the number of videos per category.
select
cate,
count(videoid) n
from
t2
group by
cate
order by
n desc;
Combine the three steps above.
select
    cate,
    count(videoid) n
from
    (select
        videoid,
        cate
    from
        (select
            videoid,
            views,
            category
        from
            video_orc
        order by
            views desc
        limit 20) t1
    lateral view explode(category) tbl as cate) t2
group by
    cate
order by
    n desc;
Find the videos related to the top 50 videos by view count.
select
videoid,
views,
relatedid
from
video_orc
order by
views desc
limit 50
Split the related video ids apart.
select
explode(relatedid) videoid
from
t1
Join with the original table to get the categories.
select
distinct t2.videoid,
v.category
from
t2
join
video_orc v
on
t2.videoid=v.videoid
Split the categories of the related videos apart.
select
explode(category) cate
from
t3
Join with the category popularity table and sort.
select
distinct t4.cate,
t5.n
from
t4
join
t5
on
t4.cate=t5.cate
order by
t5.n desc
Putting it all together.
select
    distinct t4.cate,
    t5.n
from
    (select
        explode(category) cate
    from
        (select
            distinct t2.videoid,
            v.category
        from
            (select
                explode(relatedid) videoid
            from
                (select
                    videoid,
                    views,
                    relatedid
                from
                    video_orc
                order by
                    views desc
                limit 50) t1) t2
        join
            video_orc v
        on
            t2.videoid=v.videoid) t3) t4
join
    (select
        cate,
        count(videoid) n
    from
        (select
            videoid,
            cate
        from
            video_orc lateral view explode(category) tbl as cate) x1
    group by
        cate) t5
on
    t4.cate=t5.cate
order by
    t5.n desc;
Split the category field of the video table apart into an intermediate table.
create table video_category
stored as orc tblproperties("orc.compress"="SNAPPY") as
select
videoid,
uploader,
age,
cate,
length,
views,
ratings,
comments,
relatedid
from
video_orc
lateral view explode(category) tbl as cate;
Query the top 10 videos in the Music category from the intermediate table.
select
videoid,
views
from
video_category
where
cate="Music"
order by
views desc
limit 10;
Query the top 10 videos by traffic (ratings) in the Music category from video_category.
select
videoid,
ratings
from
video_category
where
cate="Music"
order by
ratings desc
limit 10;
Find the Top 10 users who uploaded the most videos.
select uploader,videos from user_orc order by videos desc limit 10;
Join with video_orc to find the videos uploaded by these users.
select
t1.uploader,
v.videoid,
rank() over(partition by t1.uploader order by v.views desc) hot
from
t1
join video_orc v on t1.uploader=v.uploader;
Take each user's top 20.
select
    t2.uploader,
    t2.videoid,
    t2.hot
from
    (select
        t1.uploader,
        v.videoid,
        rank() over(partition by t1.uploader order by v.views desc) hot
    from
        (select
            uploader,
            videos
        from
            user_orc
        order by
            videos desc
        limit 10) t1
    join video_orc v on t1.uploader=v.uploader) t2
where
    hot <= 20;
Find the top 10 users who uploaded the most videos.
select uploader,videos from user_orc order by videos desc limit 10;
Find the top 20 videos in the overall view-count ranking.
select
videoid,
uploader,
views
from
video_orc
order by
views desc
limit 20;
Join the two tables.
select
    t1.uploader,
    t2.videoid
from
    (select
        uploader,
        videos
    from
        user_orc
    order by
        videos desc
    limit 10) t1
join
    (select
        videoid,
        uploader,
        views
    from
        video_orc
    order by
        views desc
    limit 20) t2
on
    t1.uploader=t2.uploader;
From the video_category table, rank the videos within each category by view count.
select
cate,
videoid,
views,
rank() over(partition by cate order by views desc) hot
from
video_category
Take the top 10 of each category.
select
    cate,
    videoid,
    views
from
    (select
        cate,
        videoid,
        views,
        rank() over(partition by cate order by views desc) hot
    from
        video_category) t1
where
    hot <= 10;