赞
踩
需求分析:
商品的点击量可以反映商品的受欢迎程度。本文将"最受欢迎的商品"定义为所有商品中点击量最高的前5名(Top5)。实现思路:Map阶段对每条记录输出(商品, 1);Reduce阶段累加得到每个商品的点击次数;最后在cleanup阶段按点击次数降序排序,取Top5输出到结果文件。
数据来源:
数据来自天池平台下载的淘宝2015年7月1日至2015年11月30日的用户行为数据,用于分析点击量最高的商品。
部分数据如下
结果:
可视化:如下图1所示
完整代码:
Map阶段
package clickrate;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Map stage of the click-count job.
 *
 * <p>Each input line is split on tab characters and the third column is emitted
 * as the key with a count of 1, so that the reduce stage can sum the
 * occurrences per key.
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /** Zero-based index of the tab-separated column used as the output key. */
    private static final int KEY_FIELD_INDEX = 2;

    // Output objects are reused across map() calls to avoid per-record allocation.
    Text k = new Text();
    IntWritable v = new IntWritable(1);

    /**
     * Emits {@code (third column, 1)} for one input line.
     *
     * <p>Lines with fewer than three tab-separated columns are skipped and
     * counted in the {@code clickrate/MALFORMED_LINES} job counter instead of
     * failing the task with an {@link ArrayIndexOutOfBoundsException}.
     *
     * @param key     the input key supplied by the input format (not used here)
     * @param value   one line of input text
     * @param context the Hadoop context used to emit output and update counters
     * @throws IOException          if writing the output record fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Read the line.
        String line = value.toString();

        // 2. Split it into tab-separated columns.
        String[] fields = line.split("\t");
        if (fields.length <= KEY_FIELD_INDEX) {
            // Malformed line: record it and move on rather than crash the task.
            context.getCounter("clickrate", "MALFORMED_LINES").increment(1);
            return;
        }
        k.set(fields[KEY_FIELD_INDEX]);

        // 3. Emit (key, 1).
        context.write(k, v);
    }
}
Reduce阶段
package clickrate;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reduce stage of the click-count job.
 *
 * <p>{@link #reduce} sums the counts for each key and stores the total in an
 * in-memory map; nothing is written until {@link #cleanup}, which sorts the
 * totals in descending order and writes at most the first {@code TOP_N}
 * entries.
 *
 * <p>NOTE: each reducer instance only sees its own partition of the keys, so
 * the output is a global Top N only when the job runs with a single reduce
 * task. All distinct keys of the partition are held in memory until cleanup.
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /** Maximum number of entries written by {@link #cleanup}. */
    private static final int TOP_N = 5;

    // Reused output value.
    IntWritable v = new IntWritable();

    // Total count per key, filled by reduce() and consumed by cleanup().
    Map<String, Integer> map = new HashMap<String, Integer>();

    /**
     * Sums all counts of one key and remembers the total for {@link #cleanup}.
     *
     * @param key     the key whose counts are being aggregated
     * @param values  the counts emitted for this key
     * @param context the Hadoop context (not written to in this method)
     * @throws IOException          declared by the overridden method
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // 1. Accumulate the total.
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }

        // 2. Store it. The key is copied to a String because Hadoop reuses the Text instance.
        map.put(key.toString(), sum);
    }

    /**
     * Sorts the collected totals by count, highest first, and writes the top
     * entries. Writes fewer than {@code TOP_N} records when fewer keys were seen.
     *
     * @param context the Hadoop context used to emit the result records
     * @throws IOException          if writing an output record fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Copy the entries into a random-access list so they can be sorted and indexed cheaply.
        List<Map.Entry<String, Integer>> list =
                new ArrayList<Map.Entry<String, Integer>>(map.entrySet());

        // Descending by count; ties are broken by key so the output is deterministic.
        Collections.sort(list, new Comparator<Map.Entry<String, Integer>>() {
            @Override
            public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
                // compareTo instead of subtraction: subtraction can overflow and flip the sign.
                int byCount = o2.getValue().compareTo(o1.getValue());
                if (byCount != 0) {
                    return byCount;
                }
                return o1.getKey().compareTo(o2.getKey());
            }
        });

        // Never read past the end of the list when fewer than TOP_N keys exist.
        int limit = Math.min(TOP_N, list.size());
        for (int i = 0; i < limit; i++) {
            Map.Entry<String, Integer> entry = list.get(i);
            v.set(entry.getValue());
            context.write(new Text(entry.getKey()), v);
        }
    }
}
Driver
package clickrate;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver that configures and runs the click-count MapReduce job.
 */
public class ClickRate {

    /**
     * Configures the job, runs it, and exits with 0 on success or 1 on failure.
     * Exits with 2 and prints a usage message when fewer than two arguments
     * are supplied.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws IOException            if the job cannot be created or submitted
     * @throws InterruptedException   if the wait for job completion is interrupted
     * @throws ClassNotFoundException if a configured job class cannot be loaded
     */
    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        if (args.length < 2) {
            System.err.println("Usage: ClickRate <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();

        // 1. Obtain the job object.
        Job job = Job.getInstance(conf);

        // 2. Locate the jar through this class.
        job.setJarByClass(ClickRate.class);

        // 3. Wire up the mapper and reducer classes.
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 4. Key/value types of the mapper output.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5. Key/value types of the final output.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // The reducer picks its top entries in cleanup(), per reducer instance.
        // Pin the job to one reduce task so the result is a single global ranking
        // regardless of the cluster's default reducer count.
        job.setNumReduceTasks(1);

        // 6. Input and output paths.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7. Submit the job and block until it finishes.
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。