Course Notes on "Big Data Technology: Principles and Applications (3rd Edition)" by Lin Ziyu (林子雨), Xiamen University

Step 1: Configure the environment

To run DateDistinct.java, the following JAR packages need to be added to the project created in Eclipse (in addition to the JARs added when the project was set up):

(1) all JAR packages under "/usr/local/hadoop/share/hadoop/yarn/sources";
(2) all JAR packages under "/usr/local/hadoop/share/hadoop/yarn/lib";
(3) all JAR packages directly under "/usr/local/hadoop/share/hadoop/yarn", not including its subdirectories.
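Before writing any MapReduce code, you can confirm that the Hadoop libraries are actually visible to the project with a minimal check like the one below. This is only a sketch: CheckHadoopClasspath is a hypothetical class name, and it assumes the hadoop-common JARs added when the project was created are still on the build path. If it compiles and prints a version, the JAR configuration is usable; a compile error means some JARs are missing.

import org.apache.hadoop.util.VersionInfo;

public class CheckHadoopClasspath {
    public static void main(String[] args) {
        // Prints the version recorded in the hadoop-common JAR on the classpath.
        System.out.println("Hadoop version: " + VersionInfo.getVersion());
    }
}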

Step 2: Upload the input file

Create an input text file on the local file system (the code below uses /home/hadoop/haha.txt).

Then, with the JAR packages configured in Eclipse as in Step 1 and Hadoop started, use the following code to upload the local file to HDFS:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MergeFile {
    public static void main(String[] args) throws IOException {
        String localFilePath = "/home/hadoop/haha.txt";        // local source file
        String hdfsFilePath = "/user/hadoop/input/haha.txt";   // destination path in HDFS
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");      // NameNode address
        FileSystem fs = FileSystem.get(conf);
        Path localPath = new Path(localFilePath);
        Path hdfsPath = new Path(hdfsFilePath);
        fs.copyFromLocalFile(localPath, hdfsPath);              // copy the local file into HDFS
        System.out.println("File uploaded to HDFS successfully!");
        fs.close();
    }
}
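If you want to avoid silently overwriting a file that is already in HDFS, the upload can first check fs.exists(). The sketch below is a hypothetical variation (MergeFileSafe is not part of the original post); it uses the four-argument copyFromLocalFile overload so that the delete-source and overwrite behaviour is explicit.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MergeFileSafe {   // hypothetical class name, for illustration only
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        FileSystem fs = FileSystem.get(conf);

        Path localPath = new Path("/home/hadoop/haha.txt");
        Path hdfsPath = new Path("/user/hadoop/input/haha.txt");

        if (fs.exists(hdfsPath)) {
            System.out.println("Target already exists in HDFS, skipping upload: " + hdfsPath);
        } else {
            // delSrc = false (keep the local copy), overwrite = false (never replace existing data)
            fs.copyFromLocalFile(false, false, localPath, hdfsPath);
            System.out.println("File uploaded to HDFS successfully!");
        }
        fs.close();
    }
}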

Then create another .java file in the same project.

Use the following code to list the files under /user/hadoop/input/ in HDFS and check whether the upload succeeded:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MergeFile2 {
    public static void main(String[] args) throws IOException {
        String hdfsFolderPath = "/user/hadoop/input/";          // HDFS directory to list
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        FileSystem fs = FileSystem.get(conf);
        Path folderPath = new Path(hdfsFolderPath);
        FileStatus[] fileStatuses = fs.listStatus(folderPath);  // one FileStatus per entry
        for (FileStatus file : fileStatuses) {
            System.out.println(file.getPath().getName());       // print the file name only
        }
        fs.close();
    }
}
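If the bare file names are not enough to confirm the upload, the same FileStatus objects also carry type and size information. A slightly more verbose listing along the same lines (ListInputDetails is a hypothetical class name, not from the original post) could look like this:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListInputDetails {   // hypothetical class name, for illustration only
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        FileSystem fs = FileSystem.get(conf);

        for (FileStatus file : fs.listStatus(new Path("/user/hadoop/input/"))) {
            // print type, size in bytes and full path for each entry
            String type = file.isDirectory() ? "dir " : "file";
            System.out.println(type + "  " + file.getLen() + " bytes  " + file.getPath());
        }
        fs.close();
    }
}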

Step 3: Run DateDistinct.java

Running the following code launches the MapReduce job: in its output every distinct token appears exactly once (together with its occurrence count), and the contents of the output file are then printed to the console.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DateDistinct {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);   // emit <token, 1> for every token
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);     // each distinct token is written exactly once
        }
    }

    public static void main(String[] args) throws Exception {
        // 1. HDFS configuration
        String namenode_ip = "localhost";
        String hdfs = "hdfs://localhost:9000";
        Configuration conf = new Configuration();                     // Hadoop configuration object
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("mapreduce.app-submission.cross-platform", "true");  // cross-platform job submission
        /* conf.set("hadoop.job.user", "hadoop");
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("mapreduce.jobtracker.address", namenode_ip + ":9001");
        conf.set("yarn.resourcemanager.hostname", namenode_ip);
        conf.set("yarn.resourcemanager.resource-tracker.address", namenode_ip + ":8031");
        conf.set("yarn.resourcemanager.address", namenode_ip + ":8032");
        conf.set("yarn.resourcemanager.admin.address", namenode_ip + ":8033");
        conf.set("yarn.resourcemanager.scheduler.address", namenode_ip + ":8034");
        conf.set("mapreduce.jobhistory.address", namenode_ip + ":10020"); */

        // 2. MapReduce job configuration
        String jobName = "WordCount";                             // job name
        Job job = Job.getInstance(conf, jobName);
        job.setJarByClass(DateDistinct.class);                    // class containing the job
        job.setJar("/usr/local/hadoop/myapp/WordCount.jar");      // local jar package to submit
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);                // combiner class
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.out.println("Hello, World!");

        // 3. Job input and output paths
        String dataDir = "/user/hadoop/input";                    // input data directory
        String outputDir = "/user/hadoop/output";                 // output directory
        Path inPath = new Path(hdfs + dataDir);
        Path outPath = new Path(hdfs + outputDir);
        FileInputFormat.addInputPath(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);
        // delete the output directory if it already exists
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        // 4. Run the job and print the result
        System.out.println("Job: " + jobName + " is running...");
        if (job.waitForCompletion(true)) {
            System.out.println("success!");
            Path resultPath = new Path(hdfs + outputDir + "/part-r-00000"); // result file written by the reducer
            BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(resultPath)));
            String line;
            while ((line = br.readLine()) != null) {
                System.out.println(line);
            }
            br.close();
            System.exit(0);
        } else {
            System.out.println("failed!");
            System.exit(1);
        }
    }
}

On a successful run, the console prints "success!" followed by the contents of the output file.
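Note that the Mapper/Reducer pair above follows the classic WordCount pattern: duplicates disappear because each key appears only once in the output, but every line also carries a count. If the goal is output containing nothing but the distinct records themselves, a common variation (a minimal sketch for illustration, not part of the original post) emits each whole input line as the key with a NullWritable value:

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class DistinctLines {
    // Mapper: emit the whole line as the key; the value carries no information.
    public static class LineMapper extends Mapper<Object, Text, Text, NullWritable> {
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    // Reducer: identical lines are grouped under one key, so writing the key once
    // per group leaves exactly one copy of every distinct line in the output.
    public static class DedupReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        public void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
}

The driver code in DateDistinct.java would stay the same apart from registering these classes and setting the output value class to NullWritable.class.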
