赞
踩
要在 Eclipse 上编译和运行 MapReduce 程序,需要安装 hadoop-eclipse-plugin,可下载 Github 上的 hadoop2x-eclipse-plugin。
下载后,将 release 中的 hadoop-eclipse-kepler-plugin-2.6.0.jar 复制到 Eclipse 安装目录的 plugins 文件夹中,运行 eclipse -clean
重启 Eclipse 即可(添加插件后只需要运行一次该命令,以后按照正常方式启动就行了)。
打开eclipse,进行hadoop插件配置。
选择Window菜单下的Preference。
然后选择Hadoop Map/Reduce,选择hadoop的安装目录,并确认配置。
在输出窗口下又一个蓝色大象,点击可进行hadoop环境配置。
按如下进行设置:
其中,Localtion name可以随意填写,端口号则为9000。还有很多配置参数,为了方便,直接先创建WordCount的MapReduce工程,然后将/usr/local/hadoop/etc/hadoop中的配置文件core-site.xml ,hdfs-site.xml以及 log4j.properties 复制到 WordCount 项目下的 src 文件夹(~/workspace/WordCount/src)中:复制完成后,需要对工程文件进行刷新。
这样在运行MapReduce作业时,就会使用配置文件中的配置参数。
然后就可以进行开发了。
注:HDFS 中的内容变动后,Eclipse 不会同步刷新,需要右键点击 Project Explorer中的 MapReduce Location,选择 Refresh,才能看到变动后的文件。
功能:对指定输入的文件进行单词个数统计,然后输出到指定文件夹中。
程序代码:
import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; public class WordCount{ public WordCount(){ } public static void main(String[] args) throws Exception{ Configuration conf = new Configuration(); //指定输入文件路径input和输出文件路径output String[] otherArgs = new String[]{"/input","/output"}; if(otherArgs.length < 2){ System.err.println("没有输入输出路径!"); System.exit(2); } /** * Job:它允许用户配置作业、提交作业、控制其执行和查询状态。 * SET方法仅在提交作业之前工作, * 之后它们将引发非法LealEtExeExchange。 */ //创建没有特定集群和给定作业名的新作业。 //只有当需要时,才会从CONF参数创建一个集群。 //作业生成配置的副本,以便任何必要的内部修改不反映传入参数。 Job job = Job.getInstance(conf, "word count"); //通过找到给定类的来源来设置jar job.setJarByClass(WordCount.class); job.setMapperClass(WordCount.TokenizerMapper.class); job.setCombinerClass(WordCount.IntSumReducer.class); job.setReducerClass(WordCount.IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); for(int i = 0; i < otherArgs.length - 1; ++i) { //FileInputFormat:基于文件的输入格式的基类 //添加输入文件路径 FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } //FileOutputFormat:基于文件的输出格式的基类 //添加输出文件路径 FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1])); System.exit(job.waitForCompletion(true)?0:1); } /** * Reduce:减少一组中间值,这些值共享一组较小的值。 */ public static class IntSumReducer extends Reducer <Text, IntWritable, Text, IntWritable>{ private IntWritable result = new IntWritable(); public IntSumReducer(){ } public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException{ int sum = 0; IntWritable val = null; for(Iterator it = values.iterator(); it.hasNext(); sum += val.get()){ val = (IntWritable) it.next(); } this.result.set(sum); context.write(key, this.result); } } /** * Mapper:将输入的键/值对映射到一组中间键/值对 * 映射是将输入记录转换为中间记录的单个任务。转换后的 * 中间记录不需要与输入记录相同。给定的输入对可以映射到零或多个输出对。 */ public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ //IntWritable:一个可写的用于int型的。 //设置一个变量one为1. private static final IntWritable one = new IntWritable(1); //Text:该类使用标准UTF8编码存储文本。 private Text word = new Text(); public TokenizerMapper(){ } //map():为输入分割中的每个键/值对调用一次。 //大多数应用程序应该重写这个,但是默认是标识函数。 public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException{ // StringTokenizer这个类主要是把一个字符串按某个标记分段, //默认的情况下的分割符是空格 StringTokenizer itr = new StringTokenizer(value.toString()); while(itr.hasMoreTokens()){ this.word.set(itr.nextToken()); context.write(this.word, one); } } } }
功能:计算学生的平均成绩,每个文件包括所有的学生成绩,格式为 姓名 成绩,有多少个科目,就有多少个输入文件。
import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; //import org.apache.hadoop.util.GenericOptionsParser; /** * 计算学生的平均成绩 * 学生成绩以每科一个文件输入 * 文件内容:姓名 成绩 * 例如: 小明 80 */ public class AverageScore { public AverageScore(){ } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); //设置文件输入输出路径 String[] otherArgs = new String[]{"/input1","/output1"}; //可以用来读取输入输出文件参数,这里采用上一行代码,手动设置路径 //String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if(otherArgs.length < 2){ System.err.println("请输入至少两个文件!"); System.exit(2); } //设置工作参数 Job job = Job.getInstance(conf,"Average Score"); job.setJarByClass(AverageScore.class); job.setMapperClass(AverageScore.AverageMapper.class); job.setCombinerClass(AverageScore.AverageReduce.class); job.setReducerClass(AverageScore.AverageReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FloatWritable.class); //输入文件路径 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //输出文件路径 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true)?0:1); } /* * map():将每个输入文件,将姓名和成绩分割开。 */ public static class AverageMapper extends Mapper<Object, Text, Text, FloatWritable>{ public AverageMapper(){ } @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); //按行进行划分 StringTokenizer tokens = new StringTokenizer(line,"\n"); while(tokens.hasMoreTokens()){ String tmp = tokens.nextToken(); //按空格进行划分 StringTokenizer sz = new StringTokenizer(tmp); String name = sz.nextToken(); float score = Float.valueOf(sz.nextToken()); Text outName = new Text(name); FloatWritable outScore = new FloatWritable(score); context.write(outName, outScore); } } } /** * reduce():将同一个学的各科成绩加起来,求平均数 */ public static class AverageReduce extends Reducer<Text, FloatWritable, Text, FloatWritable>{ public AverageReduce(){ } protected void reduce(Text key, Iterable<FloatWritable> value, Context context) throws IOException, InterruptedException { float sum = 0;//刚开始总分为0 int count = 0;//记录有几科成绩 Iterator<FloatWritable> it = value.iterator();//遍历成绩 //获取各科成绩进行累加 while(it.hasNext()){ sum += it.next().get(); count++; } //求出平均值 FloatWritable averageScore = new FloatWritable(sum/count); //写人文件 context.write(key,averageScore); } } }
功能:数据重复,map中每一行做为一个key,value值任意,经过shuffle之后输入到reduce中利用key的唯一性直接输出key。
数据:
file1.txt
2016-6-1 b
2016-6-2 a
2016-6-3 b
2016-6-4 d
2016-6-5 a
2016-6-6 c
2016-6-7 d
2016-6-3 c
file2.txt
2016-6-1 a
2016-6-2 b
2016-6-3 c
2016-6-4 d
2016-6-5 a
2016-6-6 b
2016-6-7 c
2016-6-3 c
源代码:
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; //import org.apache.hadoop.util.GenericOptionsParser; /** * 数据去重 */ public class Dedup { public static class MyMapper extends Mapper<Object, Text, Text, Text>{ @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { //value:为每行数据 context.write(value, new Text("")); } } public static class MyReducer extends Reducer<Text, Text, Text, Text>{ @Override protected void reduce(Text key, Iterable<Text> value, Context context) throws IOException, InterruptedException { context.write(key, new Text("")); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{ Configuration conf = new Configuration(); //String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); String[] otherArgs = new String[]{"/input2","/output2"}; if(otherArgs.length<2){ System.out.println("parameter errors!"); System.exit(2); } Job job = Job.getInstance(conf, "Dedup"); job.setJarByClass(Dedup.class); job.setMapperClass(MyMapper.class); job.setCombinerClass(MyReducer.class); job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true)?0:1); } }
程序运行后输入文件为:
2016-6-1 a
2016-6-1 b
2016-6-2 a
2016-6-2 b
2016-6-3 b
2016-6-3 c
2016-6-4 d
2016-6-5 a
2016-6-6 b
2016-6-6 c
2016-6-7 c
2016-6-7 d
以上内容为听华为大数据培训课程和大学MOOC上厦门大学 林子雨的《大数据技术原理与应用》课程而整理的笔记。
大数据技术原理与应用: https://www.icourse163.org/course/XMU-1002335004
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。