当前位置:   article > 正文

MapReduce词频统计【自定义复杂类型、自定义Partitioner、NullWritable使用介绍】

nullwritable

一、MapReduce1.0运行模型

 

二、MapReduce编程模型之执行步骤 

1、准备map处理的输入数据

2、交给Mapper进行处理

3、Shuffle【规则可以自己控制】

4、Reduce处理[合并、归并]

5、输出

 

MapReduce处理流程

InputFormat读数据,通过Split将数据切片成InputSplit,通过RecordReader读取记录,再交给map处理,处理后输出一个临时的<k,v>键值对,再将结果交给shuffle处理,最终在reduce中将最后处理后的<k,v>键值对结果通过OutputFormat重新写回到HDFS中。

 

三、词频统计原理图:

Combiner能减少网络IO、提升作业的性能

Combiner的局限性求平均数:总数 / 个数   对于含有除法的操作,需要慎重,有可能结果会不正确

 

四、词频统计具体代码实现[读写在HDFS和本地完成]

0、pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <groupId>cn.itcats</groupId>
  7. <artifactId>hadoop-mapreduce</artifactId>
  8. <version>1.0-SNAPSHOT</version>
  9. <dependencies>
  10. <dependency>
  11. <groupId>org.apache.hadoop</groupId>
  12. <artifactId>hadoop-client</artifactId>
  13. <version>2.6.0</version>
  14. </dependency>
  15. <dependency>
  16. <groupId>junit</groupId>
  17. <artifactId>junit</artifactId>
  18. <version>4.12</version>
  19. </dependency>
  20. </dependencies>
  21. </project>

 

1、准备一个自定义的Mapper类

  1. package cn.itcats.hadoop.mapreduce.wordcount;
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Mapper;
  6. import java.io.IOException;
  7. /**
  8. * Mapper类4个泛型的含义
  9. * KEYIN: Map任务读数据的key类型,offset,是每行数据起始位置的偏移量,Long(Java)
  10. * VALUEIN:Map任务读数据的value类型,其实就是一行行的字符串,String
  11. *
  12. * 如文本中的数据为 :
  13. * hello world welcome
  14. * hello welcome
  15. *
  16. * KEYOUT: map方法自定义实现输出的key的类型,String
  17. * VALUEOUT:map方法自定义实现输出的value类型,Integer
  18. *
  19. * 词频统计: 相同单词的次数 (word,1)
  20. * Long,String,String,Integer是Java里面的数据类型
  21. * 因为涉及网络传输,需要序列化与反序列化
  22. * 使用Hadoop提供的自定义类型:
  23. * Long => LongWritable String => Text Integer => IntWritable
  24. */
  25. //词频统计
  26. public class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
  27. @Override
  28. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  29. //把value对应的行数据按照指定的分隔符拆开
  30. String[] words = value.toString().split(",");
  31. for(String word : words){
  32. //(hello,1) (word,1)
  33. //转成小写,忽略大小写
  34. context.write(new Text(word.toLowerCase()) , new IntWritable(1));
  35. }
  36. }
  37. }

 

2、准备一个自定义的Reducer类

  1. package cn.itcats.hadoop.mapreduce.wordcount;
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Reducer;
  5. import java.io.IOException;
  6. import java.util.Iterator;
  7. public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  8. /**
  9. * (hello,1) (world,1)
  10. * (hello,1) (world,1)
  11. * (hello,1) (world,1)
  12. * (welcome,1)
  13. * <p>
  14. * key 为 word values含义:
  15. * map的输出到reduce端,是按照相同的key分发到一个reduce上去执行
  16. * reduce1 : (hello,1) (hello,1) (hello,1) => (hello,[1,1,1])
  17. * reduce2 : (world,1) (world,1) (world,1) => (world,[1,1,1])
  18. * reduce3 : (welcome,1) => (welcome,[1])
  19. *
  20. * Reducer和Mapper中使用到了什么设计模式? 模板模式
  21. */
  22. protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  23. int count = 0;
  24. Iterator<IntWritable> iterator = values.iterator();
  25. while (iterator.hasNext()){
  26. IntWritable value = iterator.next();
  27. count += value.get();
  28. }
  29. context.write(key, new IntWritable(count));
  30. }
  31. }

 

读写在HDFS完成

3、准备一个任务驱动类

  1. package cn.itcats.hadoop.mapreduce.wordcount;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.FileSystem;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.io.IntWritable;
  6. import org.apache.hadoop.io.Text;
  7. import org.apache.hadoop.mapreduce.Job;
  8. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  9. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  10. import java.net.URI;
  11. /**
  12. * 使用MapReduce统计HDFS上文件对应的词频
  13. * <p>
  14. * Driver: 配置Mapper,Reducer的相关属性
  15. * <p>
  16. * 提交到HDFS运行
  17. *
  18. * 含有Combiner操作
  19. */
  20. public class WordCountCombinerApp {
  21. public static void main(String[] args) throws Exception {
  22. System.setProperty("HADOOP_USER_NAME", "root");
  23. Configuration configuration = new Configuration();
  24. configuration.set("fs.defaultFS", "hdfs://hdp-01:9000");
  25. // 创建一个Job
  26. Job job = Job.getInstance(configuration);
  27. // 设置Job对应的主类、Mapper、Reducer类
  28. job.setJarByClass(WordCountCombinerApp.class);
  29. job.setMapperClass(WordCountMapper.class);
  30. job.setReducerClass(WordCountReducer.class);
  31. //添加Combiner的设置
  32. job.setCombinerClass(WordCountReducer.class);
  33. //设置Job对应的参数: Mapper输出key和value的类型
  34. job.setMapOutputKeyClass(Text.class);
  35. job.setMapOutputValueClass(IntWritable.class);
  36. //设置Job对应的参数: Reducer输出key和value的类型
  37. job.setOutputKeyClass(Text.class);
  38. job.setOutputValueClass(IntWritable.class);
  39. //如果文件已经存在则先删除,否则会报错org.apache.hadoop.mapred.FileAlreadyExistsException
  40. //获取FileSystem对象进行exists/delete操作
  41. FileSystem fileSystem = FileSystem.get(new URI("hdfs://hdp-01:9000"), configuration, "root");
  42. Path outputPath = new Path("/wordcount/output");
  43. if (fileSystem.exists(outputPath)) {
  44. fileSystem.delete(outputPath,true);
  45. }
  46. //设置job作业输入和输出的路径
  47. FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
  48. FileOutputFormat.setOutputPath(job, outputPath);
  49. //提交job
  50. boolean resullt = job.waitForCompletion(true);
  51. System.exit(resullt ? 0 : 1);
  52. }
  53. }

 

4、将文件上传到HDFS的/wordcount/input中(没有创建提前创建好该文件夹)

1.txt

hello,world,welcome,hello,welcome,Welcome,Hello,haha,Haha,hAha

 

 

读写在本地中完成

将上述的第四步改为:

在工程下创建一个input文件夹

 

修改上述第三步的代码,只new Configuration()即可,最后把输入路径改成input、输出路径改成output即可

  1. package cn.itcats.hadoop.mapreduce.wordcount;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.IntWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  8. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  9. import java.io.File;
  10. /**
  11. * 使用MapReduce统计HDFS上文件对应的词频
  12. *
  13. * Driver: 配置Mapper,Reducer的相关属性
  14. *
  15. * 提交到本地运行运行(使用本地文件进行统计,统计结果输出到本地路径)
  16. */
  17. public class WordCountLocalApp {
  18. public static void main(String[] args) throws Exception {
  19. Configuration configuration = new Configuration();
  20. // 创建一个Job
  21. Job job = Job.getInstance(configuration);
  22. // 设置Job对应的主类、Mapper、Reducer类
  23. job.setJarByClass(WordCountLocalApp.class);
  24. job.setMapperClass(WordCountMapper.class);
  25. job.setReducerClass(WordCountReducer.class);
  26. //设置Job对应的参数: Mapper输出key和value的类型
  27. job.setMapOutputKeyClass(Text.class);
  28. job.setMapOutputValueClass(IntWritable.class);
  29. //设置Job对应的参数: Reducer输出key和value的类型
  30. job.setOutputKeyClass(Text.class);
  31. job.setOutputValueClass(IntWritable.class);
  32. //设置job作业输入和输出的路径
  33. FileInputFormat.setInputPaths(job,new Path("input"));
  34. FileOutputFormat.setOutputPath(job,new Path("output"));
  35. //提交job
  36. boolean resullt = job.waitForCompletion(true);
  37. System.exit(resullt ? 0 : 1);
  38. }
  39. }

 

运行测试

运行WordCountLocalApp

得到最终结果输出:

输出结果:

 

 

五、关于自定义复杂类型的介绍

  1. package cn.itcats.hadoop.mapreduce.access;
  2. /*
  3. * 自定义复杂的数据类型
  4. * 对此Hadoop有一些规范
  5. * 1、需要实现Writable
  6. * 2、需要实现write和readFields这两个方法
  7. * 3、需要定义默认的构造方法
  8. */
  9. import lombok.Data;
  10. import org.apache.hadoop.io.Writable;
  11. import java.io.DataInput;
  12. import java.io.DataOutput;
  13. import java.io.IOException;
  14. @Data
  15. public class Access implements Writable {
  16. private String phone;
  17. private long up;
  18. private long down;
  19. private long sum;
  20. public void write(DataOutput dataOutput) throws IOException {
  21. dataOutput.writeUTF(phone); //字符串
  22. dataOutput.writeLong(up); //Long
  23. dataOutput.writeLong(down);
  24. dataOutput.writeLong(sum);
  25. }
  26. public void readFields(DataInput dataInput) throws IOException {
  27. //规范: 严格按照上面写的顺序
  28. this.phone = dataInput.readUTF();
  29. this.up = dataInput.readLong();
  30. this.down = dataInput.readLong();
  31. this.sum = dataInput.readLong();
  32. }
  33. public Access() {
  34. }
  35. }

 

六、NullWritable介绍

若Mapper或Reducer中某个输入或输出不想输出显示,则使用NullWritable替换我们常用的类型(如Text、LongWritable)等

Plus:NullWritable.get(),返回NullWritable类型

 

 

七、自定义Partitioner介绍

默认情况下Map的输出需要做shuffle操作,将key根据一定的算法分发到Reduce上执行【如相同的key,或者具有相似特征的key】,我们也可以自定义分区写数据

numReduceTasks:你的作业所指定的reducer的个数,决定了reduce作业输出文件的个数

HashPartitioner是MapReduce默认的分区规则

  1. //泛型对应map的输出(KEYOUT和VALUEOUT)
  2. public class AccessPartitioner extends Partitioner<Text,Access>{
  3. //根据手机号开头数字,分派到不同的分区
  4. public int getPartition(Text phone, Access access, int numPartitions) {
  5. //总共定义了4种分区规则,后面设置分区数也要填4
  6. if(phone.toString().startsWith("13")){
  7. return 0;
  8. }else if(phone.toString().startsWith("18")){
  9. return 1;
  10. }else if(phone.toString().startsWith("15")){
  11. return 2;
  12. }else{
  13. return 3;
  14. }
  15. }
  16. }

在job任务驱动类中加上一行代码:

  1. //设置自定义分区规则
  2. job.setPartitionerClass(AccessPartitioner.class);
  3. //设置reduce个数
  4. job.setNumReduceTasks(4);

观察output文件输出:【总共4个文件】

  1. part-r-00000
  2. part-r-00001
  3. part-r-00002
  4. part-r-00003

 

总结:Partitioner决定maptask输出的数据交由哪个reducetask处理

默认实现:分发的key的hash值与reduce task个数取模

 

八、程序修改为在Yarn上运行

第一步

那么outputPath、和inputPath则不能写死,修改为args[0]、args[1]

  1. package cn.itcats.hadoop.mapreduce.wordcount;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.FileSystem;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.io.IntWritable;
  6. import org.apache.hadoop.io.Text;
  7. import org.apache.hadoop.mapreduce.Job;
  8. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  9. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  10. import java.net.URI;
  11. /**
  12. * 使用MapReduce统计HDFS上文件对应的词频
  13. * <p>
  14. * Driver: 配置Mapper,Reducer的相关属性
  15. * <p>
  16. * 提交到HDFS运行
  17. *
  18. * 含有Combiner操作
  19. */
  20. public class WordCountYarnApp {
  21. public static void main(String[] args) throws Exception {
  22. System.setProperty("HADOOP_USER_NAME", "root");
  23. Configuration configuration = new Configuration();
  24. configuration.set("fs.defaultFS", "hdfs://hdp-01:9000");
  25. // 创建一个Job
  26. Job job = Job.getInstance(configuration);
  27. // 设置Job对应的主类、Mapper、Reducer类
  28. job.setJarByClass(WordCountYarnApp.class);
  29. job.setMapperClass(WordCountMapper.class);
  30. job.setReducerClass(WordCountReducer.class);
  31. //添加Combiner的设置
  32. job.setCombinerClass(WordCountReducer.class);
  33. //设置Job对应的参数: Mapper输出key和value的类型
  34. job.setMapOutputKeyClass(Text.class);
  35. job.setMapOutputValueClass(IntWritable.class);
  36. //设置Job对应的参数: Reducer输出key和value的类型
  37. job.setOutputKeyClass(Text.class);
  38. job.setOutputValueClass(IntWritable.class);
  39. //如果文件已经存在则先删除,否则会报错org.apache.hadoop.mapred.FileAlreadyExistsException
  40. //获取FileSystem对象进行exists/delete操作
  41. FileSystem fileSystem = FileSystem.get(new URI("hdfs://hdp-01:9000"), configuration, "root");
  42. Path outputPath = new Path(args[0]);
  43. if (fileSystem.exists(outputPath)) {
  44. fileSystem.delete(outputPath,true);
  45. }
  46. //设置job作业输入和输出的路径
  47. FileInputFormat.setInputPaths(job, new Path(args[1]));
  48. FileOutputFormat.setOutputPath(job, outputPath);
  49. //提交job
  50. boolean result = job.waitForCompletion(true);
  51. System.exit(result ? 0 : 1);
  52. }
  53. }

 

第二步

进入工程,执行maven打包命令

mvn clean package -DskipTests

打包完成后jar包在当前项目的 target/文件夹内 

 

在hadoop机器上执行

  1. hadoop jar hadoop-mapreduce-1.0-SNAPSHOT.jar 完整类名 args[0] arg[1]
  2. //其中上面的args[0]、args[1]都是我们修改源码后的参数,对应输入路径和输出路径,填上执行即可

 

总结:

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小舞很执着/article/detail/747946
推荐阅读
相关标签
  

闽ICP备14008679号