A MapReduce job runs in two phases. In the first phase the MapTask instances run fully in parallel and are independent of one another. In the second phase the ReduceTask instances are likewise independent of one another, but each of them depends on the output of every MapTask from the first phase. Because the Reduce phase cannot start until the Map phase has finished, a large amount of intermediate data has to be staged temporarily between the two phases. In the word-count example below, for instance, every mapper emits (word, 1) pairs, and the shuffle groups all pairs with the same key, so a single reduce() call sees something like ("zhang", [1, 1, 1, 1]) and sums it to 4.
Example: the following MapReduce program counts how often each word appears in a file.
The Mapper class:

```java
package com.lzj.hadoop.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
 * LongWritable - input key: offset of the line being read
 * Text         - input value: content of that line
 * Text         - output key emitted by the mapper
 * IntWritable  - output value: the count for that key
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

	Text k = new Text();                // output key
	IntWritable v = new IntWritable(1); // output value (always 1)

	/* the context collects the mapper's output */
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		/* 1. get the content of the current line */
		String line = value.toString();
		/* 2. split the line into words on spaces */
		String[] words = line.split(" ");
		/* 3. emit the mapper's output */
		for (String word : words) {
			/* set the key */
			k.set(word);
			/* write the processed key/value pair to the context */
			context.write(k, v);
		}
	}
}
```
The Reducer class:

```java
package com.lzj.hadoop.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/*
 * Text        - input key   (the key emitted by the Mapper)
 * IntWritable - input value (the count emitted by the Mapper)
 * Text        - output key
 * IntWritable - output value
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

	/* text is the input key, values holds all counts collected for that key */
	@Override
	protected void reduce(Text text, Iterable<IntWritable> values, Context context)
			throws IOException, InterruptedException {
		/* 1. sum up the counts for this key */
		int sum = 0;
		for (IntWritable value : values) {
			sum = sum + value.get();
		}
		/* 2. emit the reducer's output */
		IntWritable v = new IntWritable();
		v.set(sum);
		context.write(text, v);
	}
}
```
The Driver class acts as the client of the YARN cluster: it submits the whole program to the cluster as a Job object that bundles the runtime parameters of the MapReduce program.
```java
package com.lzj.hadoop.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		/* 1. get the job configuration */
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		/* 2. set the jar to load, by class */
		job.setJarByClass(WordCountDriver.class);
		/* 3. set the Mapper and Reducer classes */
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);
		/* 4. set the map output types */
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		/* 5. set the final output key/value types */
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		/* 6. set the input and output paths */
		FileInputFormat.setInputPaths(job, new Path("D:/tmp/WordCountIn.txt"));
		FileOutputFormat.setOutputPath(job, new Path("D:/tmp/WordCountOut"));
		/* 7. submit the job and wait for completion */
		boolean flag = job.waitForCompletion(true);
		System.out.println("flag : " + flag);
	}
}
```
To test it, WordCountIn.txt contains:
zhang xue you
xie ting feng
zhang xin zhe
yu cheng qing
xiong nai jin
bao jian feng
zhang jie
zhang san feng
After the program finishes, open part-r-00000 in the WordCountOut directory; it contains:
bao	1
cheng	1
feng	3
jian	1
jie	1
jin	1
nai	1
qing	1
san	1
ting	1
xie	1
xin	1
xiong	1
xue	1
you	1
yu	1
zhang	4
zhe	1
Next, package the code above into a jar and run it on a Hadoop cluster. Change the input and output paths in WordCountDriver so that they are passed in as program arguments, which makes testing easier:
```java
/* 6. set the input and output paths from the program arguments */
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
```
The jar is built with Maven; the pom dependencies are as follows:
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.lzj</groupId>
	<artifactId>hdfs</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-core</artifactId>
			<version>2.8.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-common</artifactId>
			<version>2.7.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>2.7.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-hdfs</artifactId>
			<version>2.7.2</version>
		</dependency>
		<dependency>
			<groupId>jdk.tools</groupId>
			<artifactId>jdk.tools</artifactId>
			<version>1.8</version>
			<scope>system</scope>
			<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>2.3.2</version>
				<configuration>
					<source>1.8</source>
					<target>1.8</target>
				</configuration>
			</plugin>
			<plugin>
				<artifactId>maven-assembly-plugin</artifactId>
				<configuration>
					<descriptorRefs>
						<descriptorRef>jar-with-dependencies</descriptorRef>
					</descriptorRefs>
					<archive>
						<manifest>
							<mainClass>com.lzj.hadoop.wordcount.WordCountDriver</mainClass>
						</manifest>
					</archive>
				</configuration>
				<executions>
					<execution>
						<id>make-assembly</id>
						<phase>package</phase>
						<goals>
							<goal>single</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>
```
Run mvn install, rename the generated jar to wordcount.jar, copy it to the cluster, and upload the test input file WordCountIn.txt to the /user/lzj directory on HDFS. Then run the jar on the Hadoop cluster.
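The build and the upload can be done with the standard Maven and HDFS shell commands; a minimal sketch, assuming WordCountIn.txt is in the current local directory and that /user/lzj may not exist yet:

```shell
# build the jar (the assembly plugin produces a *-jar-with-dependencies.jar under target/)
mvn clean install

# upload the test input to HDFS, creating the target directory if needed
hadoop fs -mkdir -p /user/lzj
hadoop fs -put WordCountIn.txt /user/lzj
```

Then submit the job: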
hadoop jar wordcount.jar com.lzj.hadoop.wordcount.WordCountDriver /user/lzj/WordCountIn.txt /user/lzj/output
After the job completes successfully, the results are written to the /user/lzj/output directory on HDFS.
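To check the result you can read the output files back with the HDFS shell; for example (the part file name may differ if the number of reducers is changed):

```shell
# list the output directory and print the result file
hadoop fs -ls /user/lzj/output
hadoop fs -cat /user/lzj/output/part-r-00000
```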