赞
踩
MapReduce是一种可用于数据处理的编程模型
MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。
MapReduce的核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上
优点
MapReduce易于编程
良好的扩展性
高容错性
适合PB级以上海量数据的离线处理
缺点
不擅长实时计算
不擅长流式计算
不擅长DAG(有向无环图)计算
MapReduce任务过程分为两个处理阶段:
map阶段,第一阶段的MapTask并发实例,完全并行运行,互不相干
map阶段可以作为数据准备阶段,通过这种方式来准备数据,使reduce函数能够继续对它进行处理
map阶段还是一个比较适合出去已损记录的地方
reduce阶段,第二阶段的ReduceTask并发实例,互不相干,但是它们的输入数据依赖于上一阶段的所有MapTask并发实例的输出
(每阶段都以键值对作为输入和输出,其类型由程序员来选择。
程序员还需要写两个函数:map函数和reduce函数。)
MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能够多个MapReduce程序,串行运行。
一个完整的MapReduce程序在分布式运行时由三类实例进程:
MrAppMaster,负责整个程序的的过程调度及状态协调
MapTask,负责Map阶段的整个数据处理流程
ReduceTask,负责Reduce阶段的整个数据处理流程
用户编写的程序分为三个部分:Mapper、Reducer、Driver
Mapper阶段:
用户自定义的Mapper要继承自己的父类
Mapper的输入数据是KV对的形式
Mapper中的业务逻辑写在map()方法中
Mapper的输出数据是KV对的形式
map()方法(MapTask进程)对每一个K,V调用一次
Reducer阶段
用户自定义的Reducer,要继承自己的父类
Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
Reducer的业务逻辑写在reduce()方法中
reduce()方法(ReduceTask进程)对每一组相同k的<K,V>组调用一次reduce()方法
Driver阶段
相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象
用maven打jar包,显示BUILD SUCCESS
,就可以在target目录下,看到两个jar包,一个包含依赖,另一个不包含依赖
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
选择不包含依赖的jar包,修改jar包的名字(方便,改不改都行),上传到集群,启动hadoop集群
sbin/start-dfs.sh
sbin/start-yarn.sh
然后执行指令
注意,wcinput下,事先上传了测试数据
hadoop jar wc.jar mapreduce.wordcount.WordCountDriver /wcinput / wcoutput
执行成功以后,就会在hdfs的根目录下,发现一个新创建的wcoutput文件夹,其中的part-r-00000文件记录了统计结果
【Hadoop】序列化、反序列化、序列化案例实操(包括Windows本地运行,hadoop集群运行)
MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度。
MapTask并行度决定机制
数据块:Block是HDFS物理上把数据分成一块一块。数据块是HDFS存储单位
数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask。
FileInputFormat切片机制
源码中,计算切片大小的公式:
Math.max(minSize,Math.min(maxSize,blockSize))
maxSize(切片最大值):参数如果调得比blockSize小,则会让切片变小,而且就等于配置得这个参数的值
minSize(切片最小值):参数调得比blockSize大,则可以让切片变得比blockSize还大
在运行MapReduce程序时,输入得文件格式包括:
针对不同的数据类型,MapReduce使用FileInputFormat接口的实现类来读取这些数据,FileInputFormat常见的接口实现类包括:
CombineTextInputFormat切片机制
默认的切片机制存在的问题:
框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小、都会是一个单独的切片,都会交给一个MapTask,如果有大量小文件,就会产生大量的MapTask,处理效率极其低下
CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多样多个小文件就可以交给一个MapTask处理
虚拟存储切片最大值设置:
CombineTextInputFormat.setMaxInputSplitSize(job,4194304);//4m
注意:虚拟存储切片最大值最好根据实际的小文件大小情况来设置具体的值
map方法之后,reduce方法之前的数据处理过程称之为Shuffle
注意:
MapReduce确保每个reducer的输入都是按键排序的。系统执行排序、将map输出作为输入传给reduce的过程称为shuffle。
map函数开始产生输出时,并不是简单地将它写到磁盘,而是利用缓冲的方式写到内存并出于效率的考虑进行预排序。
每个map任务都有一个环形内存缓冲区用于存储任务输出。在默认情况下,缓冲区的大小为100M。一旦缓冲区达到阈值(默认为80%),一个后台线程便开始把内容溢出到磁盘。在溢出写到磁盘过程中,map输出继续写到缓冲区,但如果在此期间缓冲区被填满,map会被阻塞,直到写磁盘过程完成。
在写磁盘之前,线程首先根据数据最终要传的reducer把数据划分成相应的分区。在每个分区中,后台线程按键进行内存中的排序,如果有一个combiner函数,它就在排序后的输出上运行。运行combiner函数使得map输出结果更紧凑,因此减少写到磁盘的数据和传递给reducer的数据。
每次内存缓冲区达到溢出阈值,就会新建一个溢出文件,因此在map任务写完其最后一个输出记录之后,会有几个溢出文件。在任务完成之前,溢出文件被合并成一个已分区且已排序的输出文件。
如果至少存在三个溢出文件,则combiner就会在输出文件写到磁盘之前再次运行。如果只有一到两个溢出文件,那么由于map输出规模减少,因而不值得调用combiner带来的开销,因此不会为该map输出再次运行combiner。
reduce任务需要在集群上若干个map任务的map输出作为其特殊的分区文件。每个map任务的完成时间可能不同,因此在每个任务完成时,reduce任务就开始复制其输出。--------------------复制阶段
Q:Reducer如何知道从哪台机器取得map输出
A:map任务成功完成后,它们会使用心跳机制通知它们的application master。对于指定作业,application master直到map输出和主机位置之间的映射关系。reducer中的一个线程定期询问master以便获取map输出主机的位置,直到获得所有输出位置。
如果map的输出相当小,会被复制到reduce任务JVM的内存,否则,map输出被复制到磁盘。
随着磁盘上副本增多,后台线程会将它们合并为更大的、排序好的文件。这会为后面的合并节省一些时间。
复制完所有map输出后,reduce任务进入排序阶段(更恰当的说法是合并阶段,因为排序是在map端进行的),这个节点将合并map的输出,并维持其顺序。
在reduce阶段,对已排序的每个键调用reduce函数,此阶段的输出直接写到输出文件系统,一般为HDFS。
排序是MapReduce框架中最重要的操作之一。
MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。
默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
它会将处理的结果暂时放到环形缓冲区,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,将这些有序数据溢写到磁盘,而当数据处理完毕后,它会对磁盘上的所有文件进行归并排序。
它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写到磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大的文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。
最终输出结果只有一个文件,且文件内部有序。实现方式是只设置了一个ReduceTask。但该方法在处理大型文件时,效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构
在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较,不相同的)的key进入同一个reduce方法时,可以采用分组排序。
在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。
自定义排序原理
bean对象作为key传输,需要实现WritableComparable接口,重写compareTo方法,就可以实现排序
把【Hadoop】序列化、反序列化、序列化案例实操(包括Windows本地运行,hadoop集群运行) 这个案例的输出作为输入
排序的规则是:
让FlowBean继承WritableComparable接口
在compareTo方法中编写排序的逻辑
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class FlowBean implements WritableComparable<FlowBean> {
private long upFlow;
private long downFlow;
private long sumFlow;
public FlowBean() {
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
public void setSumFlow() {
this.sumFlow = this.upFlow + this.downFlow;
}
//重写toString方法
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
//实现序列化和反序列化方法,注意顺序一定要一致
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.sumFlow = in.readLong();
}
//二次排序
@Override
public int compareTo(FlowBean o) {
if (this.sumFlow > o.sumFlow){
//按照降序排列
return -1;
}else if (this.sumFlow < o.sumFlow){
return 1;
}else {
//在总流量相同的情况下,按上行流量的升序排序
if (this.upFlow > o.upFlow){
return 1;
}else if (this.upFlow < o.upFlow){
return -1;
}else {
return 0;
}
}
}
}
Mapper类中,map方法是要获取每一行的数据,提取出每个电话号码的总上行流量,总下行流量,总流量,封装到FlowBean对象中,然后context.write()写出,需要注意的是:
排序是对key进行排序,所以,Mapper的输出的key得是FlowBean,输出的Value是 Text类型的电话号码
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
private FlowBean outK = new FlowBean();
private Text outV = new Text();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, FlowBean, Text>.Context context) throws IOException, InterruptedException {
String str = value.toString();
String[] split = str.split("\t");
outK.setUpFlow(Long.parseLong(split[1]));
outK.setDownFlow(Long.parseLong(split[2]));
outK.setSumFlow();
outV.set(split[0]);
context.write(outK, outV);
}
}
相同的key会传入一个Reduce方法,总上行流量和总流量相同的FlowBean对象,会进入同一个reduce方法,为了避免出现有多个相同的情况,使用for循环
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
@Override
protected void reduce(FlowBean key, Iterable<Text> values, Reducer<FlowBean, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
//在FlowBean中写了两个比较条件
//当总流量不同的时候,按照总流量的降序排列
//当总流量相同的时候,按照总上行流量的升序排列
//所以这里传进来的,应该是总流量和上行流量相同的
for (Text value : values) {
context.write(value,key);
}
}
}
Driver类中需要指出Map阶段的输出类型,和序列化、反序列化时的案例输出类型刚好颠倒
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(FlowDriver.class);
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean res = job.waitForCompletion(true);
System.exit(res ? 0 : 1);
}
}
在全排序案例的基础上,又增加了一个需求,即根据手机号的前三位数字,将最终的结果分别写入多个文件中
增加自定义分区类
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class PhonePartitoner extends Partitioner<FlowBean, Text> {
@Override
public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
String prePhone = text.toString().substring(0, 3);
switch (prePhone) {
case "136":
return 0;
case "137":
return 1;
case "138":
return 2;
case "139":
return 3;
default:
return 4;
}
}
}
在驱动类中设置自定义分区类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(FlowDriver.class);
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//指定自定义分区的类
job.setPartitionerClass(PhonePartitoner.class);
job.setNumReduceTasks(5);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean res = job.waitForCompletion(true);
System.exit(res ? 0 : 1);
}
}
需要继承Reducer类,重写reduce方法
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable outV = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
outV.set(sum);
context.write(key, outV);
}
}
需求:统计过程中对每一个MapTask的输出进行局部汇总,以减小网络传输量,即采用Combiner功能。
期望:Combiner输入数据越多,输出时经过合并,输出数据降低。
具体步骤:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable outV = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
outV.set(sum);
context.write(key, outV);
}
}
job.setCombinerClass(WordCountCombiner.class);
OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口。
下面是几种常见的OutputFormat实现类:
步骤:
1 需求
过滤输入的log日志,包含atguigu的网站输出到 atguigu.log 这个文件中,不包含atguigu的网站输出到other.log 文件中
2 输入数据
http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.atguigu.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com
3 输出数据
atguigu.log 文件中,包含
http://www.atguigu.com
other.log 文件中,包含
MapReduce默认对key进行排序,所以输出的内容是有序的,默认按照字典序
http://cn.bing.com
http://www.baidu.com
http://www.google.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sina.com
http://www.sindsafa.com
http://www.sohu.com
4 自定义一个OutputFormat类
创建一个类LogRecordWriter继承RecordWriter
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {
@Override
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
return new LogRecordWriter(job);
}
}
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
public class LogRecordWriter extends RecordWriter<Text, NullWritable> {
private FSDataOutputStream atguiguOut;
private FSDataOutputStream otherOut;
public LogRecordWriter(TaskAttemptContext job) {
//获取文件系统对象
try {
FileSystem fs = FileSystem.get(job.getConfiguration());
//用文件系统创建两个流对应的目录
atguiguOut = fs.create(new Path("D:\\output\\log\\atguigu.log"));
otherOut = fs.create(new Path("D:\\output\\log\\other.log"));
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
String log = key.toString();
if (log.contains("atguigu")) {
atguiguOut.writeBytes(log + "\n");
} else {
otherOut.writeBytes(log + "\n");
}
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
//关流
IOUtils.closeStream(atguiguOut);
IOUtils.closeStream(otherOut);
}
}
5 驱动类
要将自定义的输出格式组件设置到job中
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class LogDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(LogDriver.class);
job.setMapperClass(LogMapper.class);
job.setReducerClass(LogReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//设置自定义的outputformat
job.setOutputFormatClass(LogOutputFormat.class);
//设置的输入和输出路径
FileInputFormat.setInputPaths(job, new Path("D:\\input\\log\\"));
//虽然定义了outputformat,但是因为自定义的outputformat继承自fileoutputformat
//而fileoutputformat需要输出一个_SUCCESS文件,所以还是得指定一个输出目录
FileOutputFormat.setOutputPath(job, new Path("D:\\output\\log\\"));
boolean res = job.waitForCompletion(true);
System.exit(res ? 0 : 1);
}
}
MapTask可分为五个阶段:
Read阶段
Map阶段
Collect收集阶段
Spill(溢写)阶段
Merge阶段
当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index
在文件合并的过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并mapreduce.task.io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件
让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销
溢写阶段详情
利用快速排序算法对缓存区内的数据进行排序,排序方式是:先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key排序。
按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写的次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。
将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括:在临时文件中的偏移量、压缩前数据大小、压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到output/spillN.out.index中。
ReduceTask分为三个阶段:
Copy阶段
Sort阶段
Reduce阶段
ETL Extract-Transform-Load,用来描述将数据从来源端经过抽取(Extract)、转换(Transform)、加载(Load)至目的端的过程。ETL一词较常用在数据仓库,但其对象并不限于数据仓库
在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只运行Mapper程序,不需要运行Reduce程序。
需求 : 去除日志中字段个数小于等于11的日志
期望 : 每行字段长度都大于11
需求分析 : 需要在Map阶段对输入的数据根据规则进行过滤清洗
Mapper
package mapreduce.etl;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WebLogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
String line = value.toString();
boolean res = parseLog(line, context);
if (!res) {
return;
}
context.write(value, NullWritable.get());
}
private boolean parseLog(String line, Context context) {
String[] split = line.split(" ");
return split.length > 11;
}
}
Driver
package mapreduce.etl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WebLogDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
args = new String[]{"D:\\input\\etl\\", "D:\\output\\etl\\"};
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(WebLogDriver.class);
job.setMapperClass(WebLogMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setNumReduceTasks(0);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//前者表示运行进度信息将输出给用户,后者表示仅仅等待作业结束.
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。