
MapReduce setup() and cleanup() explained

The cleanup function in Hadoop


Hadoop's MapReduce framework predefines a number of lifecycle hooks, among them the setup() and cleanup() methods of the Mapper class.
  • setup() is invoked by the framework exactly once, before the map task processes any input. It is the place to centralize the initialization of variables and other resources. If that initialization were done inside map() instead, it would be repeated for every input line the Mapper parses, wasting work and slowing the program down.
  • cleanup() is invoked by the framework exactly once, after the map task has processed all of its input. It is the place to release those variables and resources. Releasing them inside map() would free them after every line and force re-initialization before the next one, again wasting work and slowing the program down.
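This call sequence can be illustrated with a small framework-free sketch. Everything below is hypothetical stand-in code (the class name LifecycleDemo and its methods are invented for illustration); it only mimics the order in which the real framework calls setup(), map(), and cleanup():

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in that mimics the Mapper lifecycle: the framework
// calls setup() once, then map() once per input record, then cleanup() once.
public class LifecycleDemo {
    static StringBuilder log = new StringBuilder();

    static void setup()            { log.append("setup;"); }            // acquire resources once
    static void map(String record) { log.append("map(" + record + ");"); } // per-record work only
    static void cleanup()          { log.append("cleanup;"); }          // release resources once

    public static void run(List<String> records) {
        setup();
        for (String r : records) map(r);
        cleanup();
    }

    public static void main(String[] args) {
        run(Arrays.asList("a", "b", "c"));
        System.out.println(log);   // setup;map(a);map(b);map(c);cleanup;
    }
}
```

The point of the sketch: however many records arrive, setup() and cleanup() run once, so per-task initialization and teardown belong there rather than in map().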
The following code shows setup() in use:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MatrixMultiply {

    public static class MatrixMapper extends Mapper<Object, Text, Text, Text> {
        private Text map_key = new Text();
        private Text map_value = new Text();
        private int columnN;
        private int rowM;

        // setup() runs once before map() and copies the matrix dimensions
        // from the job configuration into instance fields.
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            columnN = Integer.parseInt(conf.get("columnN"));
            rowM = Integer.parseInt(conf.get("rowM"));
        }

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            String filename = fileSplit.getPath().getName();
            if (filename.contains("M")) {
                // Input line for M: "i,j\tM[i][j]"
                String[] tuple = value.toString().split(",");
                int i = Integer.parseInt(tuple[0]);
                String[] tuples = tuple[1].split("\t");
                int j = Integer.parseInt(tuples[0]);
                int Mij = Integer.parseInt(tuples[1]);
                for (int k = 1; k < columnN + 1; k++) {
                    map_key.set(i + "," + k);
                    map_value.set("M" + "," + j + "," + Mij);
                    context.write(map_key, map_value);
                }
            } else if (filename.contains("N")) {
                // Input line for N: "j,k\tN[j][k]"
                String[] tuple = value.toString().split(",");
                int j = Integer.parseInt(tuple[0]);
                String[] tuples = tuple[1].split("\t");
                int k = Integer.parseInt(tuples[0]);   // was tuples[1], which reread the value
                int Njk = Integer.parseInt(tuples[1]);
                for (int i = 1; i < rowM + 1; i++) {   // iterate over rows of M, not columnN
                    map_key.set(i + "," + k);
                    map_value.set("N" + "," + j + "," + Njk);
                    context.write(map_key, map_value);
                }
            }
        }
    }
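The fan-out performed in the M branch above can be checked in isolation. Here is a minimal plain-Java sketch of that emission pattern (the class EmitDemo and helper emitPairsForM are hypothetical names invented for this illustration, not part of the job):

```java
import java.util.ArrayList;
import java.util.List;

public class EmitDemo {
    // For one element M[i][j], the mapper emits columnN key/value pairs:
    // key "i,k" and value "M,j,Mij" for every column k of N, because
    // M[i][j] contributes to every product cell C[i][k].
    static List<String[]> emitPairsForM(int i, int j, int Mij, int columnN) {
        List<String[]> pairs = new ArrayList<>();
        for (int k = 1; k <= columnN; k++) {
            pairs.add(new String[] { i + "," + k, "M," + j + "," + Mij });
        }
        return pairs;
    }

    public static void main(String[] args) {
        // M[2][3] = 7, N has 2 columns -> keys "2,1" and "2,2"
        for (String[] p : emitPairsForM(2, 3, 7, 2)) {
            System.out.println(p[0] + " -> " + p[1]);
        }
        // prints:
        // 2,1 -> M,3,7
        // 2,2 -> M,3,7
    }
}
```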
    private static int rowM = 0;
    private static int columnM = 0;
    private static int columnN = 0;

    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            System.err
                    .println("Usage: MatrixMultiply <inputPathM> <inputPathN> <outputPath>");
            System.exit(2);
        } else {
            // Matrix dimensions are encoded in the input file names, e.g. "M_300_500"
            String[] infoTupleM = args[0].split("_");
            rowM = Integer.parseInt(infoTupleM[1]);
            columnM = Integer.parseInt(infoTupleM[2]);
            String[] infoTupleN = args[1].split("_");
            columnN = Integer.parseInt(infoTupleN[2]);
        }
        Configuration conf = new Configuration();
        /* Publish the three dimensions as job-wide configuration values */
        conf.setInt("rowM", rowM);
        conf.setInt("columnM", columnM);
        conf.setInt("columnN", columnN);
        Job job = Job.getInstance(conf, "MatrixMultiply");
        job.setJarByClass(MatrixMultiply.class);
        job.setMapperClass(MatrixMapper.class);
        job.setReducerClass(MatrixReducer.class); // reducer implementation not shown in this post
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]), new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
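The driver registers a MatrixReducer whose code is not shown in the post. For each key "i,k" the reducer receives the "M,j,Mij" and "N,j,Njk" values emitted above and must compute the dot product sum over j of M[i][j] * N[j][k]. A plain-Java sketch of that combine step (the class ReduceDemo and method combine are hypothetical names for illustration, not the actual reducer):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReduceDemo {
    // Values for one key "i,k" arrive as "M,j,Mij" and "N,j,Njk".
    // Pair them up by the shared index j and sum M[i][j] * N[j][k].
    static int combine(List<String> values) {
        Map<Integer, Integer> mByJ = new HashMap<>();
        Map<Integer, Integer> nByJ = new HashMap<>();
        for (String v : values) {
            String[] parts = v.split(",");
            int j = Integer.parseInt(parts[1]);
            int x = Integer.parseInt(parts[2]);
            if (parts[0].equals("M")) mByJ.put(j, x); else nByJ.put(j, x);
        }
        int sum = 0;
        for (Map.Entry<Integer, Integer> e : mByJ.entrySet()) {
            Integer njk = nByJ.get(e.getKey());     // matching N value for this j
            if (njk != null) sum += e.getValue() * njk;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Row (1, 2) of M times column (3, 4) of N -> 1*3 + 2*4 = 11
        int cik = combine(java.util.Arrays.asList("M,1,1", "M,2,2", "N,1,3", "N,2,4"));
        System.out.println(cik);   // 11
    }
}
```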

