To run DateDistinct.java, the following JAR packages must be added to the Eclipse project on top of the basic project setup:
(1) all JAR packages under "/usr/local/hadoop/share/hadoop/yarn/sources";
(2) all JAR packages under "/usr/local/hadoop/share/hadoop/yarn/lib";
(3) all JAR packages directly under "/usr/local/hadoop/share/hadoop/yarn" (the JAR files only, not its subdirectories).
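As a quick sanity check that the JARs really ended up on the project's build path, a minimal throwaway class can be run from Eclipse. This is my own sketch, not part of the original tutorial; the two class names below are standard Hadoop classes shipped in those JARs:

    public class ClasspathCheck {
        public static void main(String[] args) throws ClassNotFoundException {
            // Each lookup throws ClassNotFoundException if the
            // corresponding JAR is missing from the build path.
            Class.forName("org.apache.hadoop.conf.Configuration");          // hadoop-common
            Class.forName("org.apache.hadoop.yarn.conf.YarnConfiguration"); // hadoop-yarn JARs
            System.out.println("Hadoop core and YARN JARs are on the classpath.");
        }
    }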
Next, create an input text file on the local file system; the upload code below expects it at /home/hadoop/haha.txt. A sketch for generating a small sample file follows.
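For illustration only (the file contents here are hypothetical, not from the original post), a few duplicated lines are enough to exercise deduplication later:

    import java.io.FileWriter;
    import java.io.IOException;

    public class MakeSampleInput {
        public static void main(String[] args) throws IOException {
            // Hypothetical sample data with a deliberate duplicate line,
            // written to the path the upload code below expects.
            try (FileWriter w = new FileWriter("/home/hadoop/haha.txt")) {
                w.write("2024-01-01\n");
                w.write("2024-01-02\n");
                w.write("2024-01-01\n"); // duplicate on purpose
                w.write("2024-01-03\n");
            }
        }
    }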
Then, with the JAR packages configured in Eclipse and Hadoop started, use the following code to upload the local file to HDFS:
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class MergeFile {

        public static void main(String[] args) throws IOException {
            String localFilePath = "/home/hadoop/haha.txt";
            String hdfsFilePath = "/user/hadoop/input/haha.txt";

            // Connect to the local pseudo-distributed HDFS instance
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            FileSystem fs = FileSystem.get(conf);

            Path localPath = new Path(localFilePath);
            Path hdfsPath = new Path(hdfsFilePath);

            // Copy the local file into HDFS
            fs.copyFromLocalFile(localPath, hdfsPath);
            System.out.println("File uploaded to HDFS successfully!");
            fs.close();
        }
    }
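Equivalently, assuming the /usr/local/hadoop install directory used throughout this setup, the same upload can be done from the shell:

    ./bin/hdfs dfs -put /home/hadoop/haha.txt /user/hadoop/input/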

Next, create another .java file and use the following code to list the HDFS files under /user/hadoop/input/ and confirm that the upload succeeded:
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class MergeFile2 {

        public static void main(String[] args) throws IOException {
            String hdfsFolderPath = "/user/hadoop/input/";

            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            FileSystem fs = FileSystem.get(conf);

            Path folderPath = new Path(hdfsFolderPath);

            // List every entry in the directory and print its file name
            FileStatus[] fileStatuses = fs.listStatus(folderPath);
            for (FileStatus file : fileStatuses) {
                System.out.println(file.getPath().getName());
            }
            fs.close();
        }
    }
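The same check can be made from the shell with ./bin/hdfs dfs -ls /user/hadoop/input; either way, haha.txt should be listed if the upload in the previous step succeeded.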

Finally, run the following code to deduplicate the data and display the result. Note that the job is word-count style: the mapper emits each token with a count of 1 and the reducer sums the counts, so every distinct token appears exactly once in the output, alongside its frequency.
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.StringTokenizer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class DateDistinct {

        // Mapper: split each line into tokens and emit (token, 1)
        public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();

            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }

        // Reducer: sum the counts, so each distinct token is written exactly once
        public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();

            public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }

        public static void main(String[] args) throws Exception {
            // 1. HDFS configuration
            String namenode_ip = "localhost";
            String hdfs = "hdfs://localhost:9000";
            Configuration conf = new Configuration();                    // Hadoop configuration object
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            conf.set("mapreduce.app-submission.cross-platform", "true"); // cross-platform job submission
            /* Settings for submitting to a YARN cluster; not needed in local pseudo-distributed mode:
            conf.set("hadoop.job.user", "hadoop");
            conf.set("mapreduce.framework.name", "yarn");
            conf.set("mapreduce.jobtracker.address", namenode_ip + ":9001");
            conf.set("yarn.resourcemanager.hostname", namenode_ip);
            conf.set("yarn.resourcemanager.resource-tracker.address", namenode_ip + ":8031");
            conf.set("yarn.resourcemanager.address", namenode_ip + ":8032");
            conf.set("yarn.resourcemanager.admin.address", namenode_ip + ":8033");
            conf.set("yarn.resourcemanager.scheduler.address", namenode_ip + ":8034");
            conf.set("mapreduce.jobhistory.address", namenode_ip + ":10020"); */

            // 2. MapReduce job configuration
            String jobName = "WordCount";                        // job name (kept from the original tutorial)
            Job job = Job.getInstance(conf, jobName);
            job.setJarByClass(DateDistinct.class);               // main job class
            job.setJar("/usr/local/hadoop/myapp/WordCount.jar"); // pre-built local JAR to submit
            job.setMapperClass(TokenizerMapper.class);
            job.setCombinerClass(IntSumReducer.class);           // combiner (same logic as the reducer)
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // 3. Input and output paths
            String dataDir = "/user/hadoop/input";               // input data directory
            String outputDir = "/user/hadoop/output";            // output directory
            Path inPath = new Path(hdfs + dataDir);
            Path outPath = new Path(hdfs + outputDir);
            FileInputFormat.addInputPath(job, inPath);
            FileOutputFormat.setOutputPath(job, outPath);
            // Delete the output directory if it already exists
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(outPath)) {
                fs.delete(outPath, true);
            }

            // 4. Run the job and, on success, print the result file
            System.out.println("Job: " + jobName + " is running...");
            if (job.waitForCompletion(true)) {
                System.out.println("success!");
                Path resultPath = new Path(hdfs + outputDir + "/part-r-00000"); // result file path
                BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(resultPath)));
                String line;
                while ((line = br.readLine()) != null) {
                    System.out.println(line);
                }
                br.close();
                System.exit(0);
            } else {
                System.out.println("failed!");
                System.exit(1);
            }
        }
    }
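If pure deduplication is wanted instead (each distinct line emitted once, with no count column), the standard technique is to emit the whole line as the key with a NullWritable value, so that the shuffle phase collapses duplicates. The following is my own sketch of that variant, not code from the original post:

    import java.io.IOException;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    public class DedupOnly {

        // Mapper: emit the entire line as the key; the value carries no information
        public static class LineMapper extends Mapper<Object, Text, Text, NullWritable> {
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                context.write(value, NullWritable.get());
            }
        }

        // Reducer: MapReduce groups identical keys, so writing each key once deduplicates
        public static class DedupReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
            public void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
                context.write(key, NullWritable.get());
            }
        }

        // Job setup is the same as in DateDistinct, except:
        //   job.setOutputKeyClass(Text.class);
        //   job.setOutputValueClass(NullWritable.class);
    }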

A successful run prints "success!" followed by the contents of part-r-00000: one line per distinct token, together with its count.