当前位置:   article > 正文

Hadoop WordCount统计小说每个字的出现的次数并排序_wordcount词频统计顺序

wordcount词频统计顺序

闲来无事,想看看小说中的什么字出现的频率比较高,就改了一下WordCount的程序。

原理:

  主要的核心就是WordCount,那就先说下WordCount。

WordCount:

  Map:

  对每一行的输入,扫描到一个单词就将key设置为这个字符,将value设置为1。

Combiner:

  将同一个key中的链表中的value进行求和求出暂时这个字符的次数,key不变,value为新的次数。

Reduce:

  和Combiner的工作是一样的,Combiner就是为了减少中途的数据传输量。

上面就是最基础的WordCount了,我们要改进的是两个操作,一个是能读中文字符,第二个就是按照降序排列。

一:读中文字符:

原来的WorCount是根据空格Tab符进行分割,分割后是一个个的单词,而我们要将其改为中文字符就要用正则表达式的匹配了,在“UTF-8“编码下的中文字符的正则表达式为“[\\u4e00-\\u9fa5]”。(一定要小心的编码,如果编码不对结果就会出现错误,可以将文本主动保存为utf-8格式的就可以了。)然后将所有的找出来作为key就可以了。

  1. Pattern p=Pattern.compile("[\\u4e00-\\u9fa5]");
  2. Matcher matcher = p.matcher(line);
  3. while (matcher.find()) {
  4. word.set(matcher.group());
  5. context.write(word, one);
  6. //System.out.println(word);
  7. }

二:降序排列:

  这个我一开始以为可以很简单的,将结果并没有我想象的那么简单,相当于在程序里执行了两次作业,一次统计词频,一次专门排序。排序作业首先在map中将前面结果中的key,value中的值对调,所以现在key是词频,而value是汉字,然后重写IntWritable的 Comparator 类:

  1. private static class IntWritableDecreasingComparator extends IntWritable.Comparator {
  2. public int compare(WritableComparable a, WritableComparable b) {
  3. return -super.compare(a, b);
  4. }
  5. public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
  6. return -super.compare(b1, s1, l1, b2, s2, l2);
  7. }
  8. }

然后设置一下:

sortJob.setSortComparatorClass(IntWritableDecreasingComparator.class);

 

完整代码:

  1. import java.io.IOException;
  2. import java.util.Random;
  3. import java.util.StringTokenizer;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.fs.FileSystem;
  6. import org.apache.hadoop.fs.Path;
  7. import org.apache.hadoop.io.IntWritable;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.io.WritableComparable;
  10. import org.apache.hadoop.mapreduce.Job;
  11. import org.apache.hadoop.mapreduce.Mapper;
  12. import org.apache.hadoop.mapreduce.Reducer;
  13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  14. import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
  15. import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
  16. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  17. import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
  18. import org.apache.hadoop.util.GenericOptionsParser;
  19. import java.util.regex.*;
  20. public class Dedup {
  21. public static class TokenizerMapper extends
  22. Mapper<Object, Text, Text, IntWritable> {
  23. private final static IntWritable one = new IntWritable(1);
  24. private Text word = new Text();
  25. private String pattern = "[^\\u4e00-\\u9fa5]"; // 正则表达式,代表不是0-9, a-z, A-Z的所有其它字符,其中还有下划线
  26. Pattern p=Pattern.compile("[\\u4e00-\\u9fa5]");
  27. public void map(Object key, Text value, Context context)
  28. throws IOException, InterruptedException {
  29. String line = value.toString().toLowerCase(); // 全部转为小写字母
  30. // line = line.replaceAll(pattern, " "); // 将非0-9, a-z, A-Z的字符替换为空格
  31. //String itr[] = line.split(" ");
  32. //for (int i=0;i<itr.length;i++) {
  33. Matcher matcher = p.matcher(line);
  34. while (matcher.find()) {
  35. word.set(matcher.group());
  36. context.write(word, one);
  37. //System.out.println(word);
  38. }
  39. //}
  40. }
  41. }
  42. public static class IntSumReducer extends
  43. Reducer<Text, IntWritable, Text, IntWritable> {
  44. private IntWritable result = new IntWritable();
  45. public void reduce(Text key, Iterable<IntWritable> values,
  46. Context context) throws IOException, InterruptedException {
  47. int sum = 0;
  48. for (IntWritable val : values) {
  49. sum += val.get();
  50. }
  51. result.set(sum);
  52. context.write(key, result);
  53. }
  54. }
  55. private static class IntWritableDecreasingComparator extends IntWritable.Comparator {
  56. public int compare(WritableComparable a, WritableComparable b) {
  57. return -super.compare(a, b);
  58. }
  59. public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
  60. return -super.compare(b1, s1, l1, b2, s2, l2);
  61. }
  62. }
  63. public static void main(String[] args) throws Exception {
  64. Configuration conf = new Configuration();
  65. String[] otherArgs = new GenericOptionsParser(conf, args)
  66. .getRemainingArgs();
  67. FileSystem fs = FileSystem.get(conf);
  68. if (otherArgs.length != 2) {
  69. System.err.println("Usage: wordcount <in> <out>");
  70. System.exit(2);
  71. }
  72. if(fs.exists(new Path(otherArgs[1])))
  73. {
  74. fs.delete(new Path(otherArgs[1]));
  75. }
  76. Path tempDir = new Path("wordcount-temp-" + Integer.toString(
  77. new Random().nextInt(Integer.MAX_VALUE))); //定义一个临时目录
  78. Job job = new Job(conf, "word count");
  79. job.setJarByClass(Dedup.class);
  80. try{
  81. job.setMapperClass(TokenizerMapper.class);
  82. job.setCombinerClass(IntSumReducer.class);
  83. job.setReducerClass(IntSumReducer.class);
  84. job.setOutputKeyClass(Text.class);
  85. job.setOutputValueClass(IntWritable.class);
  86. FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  87. FileOutputFormat.setOutputPath(job, tempDir);//先将词频统计任务的输出结果写到临时目
  88. //录中, 下一个排序任务以临时目录为输入目录。
  89. job.setOutputFormatClass(SequenceFileOutputFormat.class);
  90. if(job.waitForCompletion(true))
  91. {
  92. Job sortJob = new Job(conf, "sort");
  93. sortJob.setJarByClass(Dedup.class);
  94. FileInputFormat.addInputPath(sortJob, tempDir);
  95. sortJob.setInputFormatClass(SequenceFileInputFormat.class);
  96. /*InverseMapper由hadoop库提供,作用是实现map()之后的数据对的key和value交换*/
  97. sortJob.setMapperClass(InverseMapper.class);
  98. /*将 Reducer 的个数限定为1, 最终输出的结果文件就是一个。*/
  99. sortJob.setNumReduceTasks(1);
  100. FileOutputFormat.setOutputPath(sortJob, new Path(otherArgs[1]));
  101. sortJob.setOutputKeyClass(IntWritable.class);
  102. sortJob.setOutputValueClass(Text.class);
  103. /*Hadoop 默认对 IntWritable 按升序排序,而我们需要的是按降序排列。
  104. * 因此我们实现了一个 IntWritableDecreasingComparator 类,
  105. * 并指定使用这个自定义的 Comparator 类对输出结果中的 key (词频)进行排序*/
  106. sortJob.setSortComparatorClass(IntWritableDecreasingComparator.class);
  107. //fs.delete(tempDir);
  108. if(sortJob.waitForCompletion(true))//删除中间文件
  109. fs.delete(tempDir);
  110. System.exit(sortJob.waitForCompletion(true) ? 0 : 1);
  111. }
  112. }finally{
  113. fs.delete(tempDir);
  114. }
  115. }
  116. }

最后的结果:

看看前面的“萧”和“炎”字,没错这就是斗破苍穹。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/731492
推荐阅读
相关标签
  

闽ICP备14008679号