当前位置:   article > 正文

MapReduce总结 + 相关Hadoop权威指南读书笔记(未完......欢迎补充,互相学习)_hadoop相关书籍读书笔记

hadoop相关书籍读书笔记


MapReduce概述

MapReduce是一种可用于数据处理的编程模型

MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。

MapReduce的核心功能是将用户编写的业务逻辑代码自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上

MapReduce优缺点

优点

  1. MapReduce易于编程

    • 简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价PC机器上运行。
  2. 良好的扩展性

    • 当计算资源不能得到满足的时候,可以通过简单地增加机器来扩展它的计算能力。
  3. 高容错性

    • MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。比如,其中其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的。
  4. 适合PB级以上海量数据的离线处理

    • 可实现上千台服务器集群的并发工作,提供数据处理能力

缺点

  1. 不擅长实时计算

    • MapReduce无法像MySQL一样,在毫秒或者秒级内返回结果
  2. 不擅长流式计算

    • 流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的
  3. 不擅长DAG(有向无环图)计算

    • 多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下

MapReduce核心思想

MapReduce任务过程分为两个处理阶段:

  • map阶段,第一阶段的MapTask并发实例,完全并行运行,互不相干

    • map阶段可以作为数据准备阶段,通过这种方式来准备数据,使reduce函数能够继续对它进行处理

    • map阶段还是一个比较适合出去已损记录的地方

  • reduce阶段,第二阶段的ReduceTask并发实例,互不相干,但是它们的输入数据依赖于上一阶段的所有MapTask并发实例的输出

(每阶段都以键值对作为输入和输出,其类型由程序员来选择。

程序员还需要写两个函数:map函数和reduce函数。)

MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能够多个MapReduce程序,串行运行。

MapReduce进程

一个完整的MapReduce程序在分布式运行时由三类实例进程:

  • MrAppMaster,负责整个程序的的过程调度及状态协调

  • MapTask,负责Map阶段的整个数据处理流程

  • ReduceTask,负责Reduce阶段的整个数据处理流程

MapReduce编程规范

用户编写的程序分为三个部分:Mapper、Reducer、Driver

Mapper阶段

  1. 用户自定义的Mapper要继承自己的父类

  2. Mapper的输入数据是KV对的形式

    • 一般 K 是偏移量,V是这一行的内容
  3. Mapper中的业务逻辑写在map()方法中

  4. Mapper的输出数据是KV对的形式

    • K、V的类型与计算逻辑有关系
  5. map()方法(MapTask进程)对每一个K,V调用一次

    • 一般K 是偏移量,V是这一行的内容

Reducer阶段

  1. 用户自定义的Reducer,要继承自己的父类

  2. Reducer的输入数据类型对应Mapper的输出数据类型,也是KV

  3. Reducer的业务逻辑写在reduce()方法中

  4. reduce()方法(ReduceTask进程)对每一组相同k的<K,V>组调用一次reduce()方法

Driver阶段

相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象

WordCount 案例实操

本地测试

WordCount Windows本地案例实操

集群测试

用maven打jar包,显示BUILD SUCCESS,就可以在target目录下,看到两个jar包,一个包含依赖,另一个不包含依赖

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

选择不包含依赖的jar包,修改jar包的名字(方便,改不改都行),上传到集群,启动hadoop集群

sbin/start-dfs.sh
sbin/start-yarn.sh
  • 1
  • 2

然后执行指令
注意,wcinput下,事先上传了测试数据

hadoop jar wc.jar mapreduce.wordcount.WordCountDriver /wcinput / wcoutput
  • 1

执行成功以后,就会在hdfs的根目录下,发现一个新创建的wcoutput文件夹,其中的part-r-00000文件记录了统计结果

Hadoop序列化

【Hadoop】序列化、反序列化、序列化案例实操(包括Windows本地运行,hadoop集群运行)

MapReduce框架原理

InputFormat数据输入

切片与MapTask并行度决定机制

MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度。

MapTask并行度决定机制

数据块:Block是HDFS物理上把数据分成一块一块。数据块是HDFS存储单位

数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask。

  • 一个Job的Map阶段并行度由客户端在提交Job时的切片数决定
  • 每一个split切片分配一个MapTask并行实时处理
  • 默认情况下,切片大小=BlockSize
  • 切片时不考虑数据整体,而是逐个针对每一个文件单独切片

FileInputFormat切片机制

  1. 简单地按照文件的内容长度进行切片
  2. 切片大小,默认等于Block大小
  3. 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片

源码中,计算切片大小的公式:

Math.max(minSize,Math.min(maxSize,blockSize))
  • 1

maxSize(切片最大值):参数如果调得比blockSize小,则会让切片变小,而且就等于配置得这个参数的值

minSize(切片最小值):参数调得比blockSize大,则可以让切片变得比blockSize还大

在运行MapReduce程序时,输入得文件格式包括:

  • 基于行的日志文件
  • 二进制格式文件
  • 数据库表

针对不同的数据类型,MapReduce使用FileInputFormat接口的实现类来读取这些数据,FileInputFormat常见的接口实现类包括:

  • TextInputFormat
    • 默认的FileInputFormat实现类
    • 按行读取每条记录
    • 键是存储该行在整个文件中的起始字节偏移量,LongWritable类型
    • 值是这行的内容,不包括任何行终止符(回车换行符)
  • KeyValueInputFormat
  • NLineInputFormat
  • CombineTextInputFormat
  • 自定义InputFormat

CombineTextInputFormat切片机制

默认的切片机制存在的问题:

框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小、都会是一个单独的切片,都会交给一个MapTask,如果有大量小文件,就会产生大量的MapTask,处理效率极其低下

CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多样多个小文件就可以交给一个MapTask处理

虚拟存储切片最大值设置:

CombineTextInputFormat.setMaxInputSplitSize(job,4194304);//4m
  • 1

注意:虚拟存储切片最大值最好根据实际的小文件大小情况来设置具体的值

Shuffle过程

map方法之后,reduce方法之前的数据处理过程称之为Shuffle

  1. MapTask收集map()方法输出的kv对,放到内存缓冲区中
  2. 从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
  3. 多个溢出文件会被合并成大的溢出文件
  4. 在溢出过程及合并的过程中,都要调用Partition进行分区和针对key进行排序
  5. ReduceTask根据自己的分区号,去各个MapTask机器上拉取相应的结果分区数据
  6. Reducetask会抓取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序)
  7. 合并成大文件后,Shuffle的过程也就结束了,后面进去ReduceTask的逻辑运算过程(从文件中取出一个一个的键值对Group,调用用户自定义的reduce()方法)

注意

  • Shuffle中的缓冲区大小会影响到MapReduce程序的执行效率,原则上说,缓冲区越大,磁盘IO的次数越少,执行速度就越快
  • 缓冲区大小可以通过参数调整,参数:mapreduce.task.io.sort.mb,默认100M

Hadoop权威指南 中这样解释Shuffle

MapReduce确保每个reducer的输入都是按键排序的。系统执行排序、将map输出作为输入传给reduce的过程称为shuffle。

map端

map函数开始产生输出时,并不是简单地将它写到磁盘,而是利用缓冲的方式写到内存并出于效率的考虑进行预排序

每个map任务都有一个环形内存缓冲区用于存储任务输出。在默认情况下,缓冲区的大小为100M。一旦缓冲区达到阈值(默认为80%),一个后台线程便开始把内容溢出到磁盘。在溢出写到磁盘过程中,map输出继续写到缓冲区,但如果在此期间缓冲区被填满,map会被阻塞,直到写磁盘过程完成。

写磁盘之前,线程首先根据数据最终要传的reducer把数据划分成相应的分区。在每个分区中,后台线程按键进行内存中的排序,如果有一个combiner函数,它就在排序后的输出上运行。运行combiner函数使得map输出结果更紧凑,因此减少写到磁盘的数据和传递给reducer的数据。

每次内存缓冲区达到溢出阈值,就会新建一个溢出文件,因此在map任务写完其最后一个输出记录之后,会有几个溢出文件。在任务完成之前,溢出文件被合并成一个已分区且已排序的输出文件

如果至少存在三个溢出文件,则combiner就会在输出文件写到磁盘之前再次运行。如果只有一到两个溢出文件,那么由于map输出规模减少,因而不值得调用combiner带来的开销,因此不会为该map输出再次运行combiner。

reduce端

reduce任务需要在集群上若干个map任务的map输出作为其特殊的分区文件。每个map任务的完成时间可能不同,因此在每个任务完成时,reduce任务就开始复制其输出。--------------------复制阶段

Q:Reducer如何知道从哪台机器取得map输出
A:map任务成功完成后,它们会使用心跳机制通知它们的application master。对于指定作业,application master直到map输出和主机位置之间的映射关系。reducer中的一个线程定期询问master以便获取map输出主机的位置,直到获得所有输出位置。

如果map的输出相当小,会被复制到reduce任务JVM的内存,否则,map输出被复制到磁盘。

随着磁盘上副本增多,后台线程会将它们合并为更大的、排序好的文件。这会为后面的合并节省一些时间。

复制完所有map输出后,reduce任务进入排序阶段(更恰当的说法是合并阶段,因为排序是在map端进行的),这个节点将合并map的输出,并维持其顺序。

在reduce阶段,对已排序的每个键调用reduce函数,此阶段的输出直接写到输出文件系统,一般为HDFS。

Partition分区

  • 如果ReduceTask的数量 > getPartition的结果数,则会产生几个空的输出文件part-r-000xx
  • 如果1 < ReduceTask数量 < getPartition的结果数,则有一部分分区数据无处安放,会报错
  • 如果ReduceTask数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给一个ReduceTask,最终也只会产生一个结果文件 part-r-00000
  • 分区号必须从0开始,逐一累加

WritableComparable 排序

排序是MapReduce框架中最重要的操作之一。

MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。

默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。

对于MapTask:

它会将处理的结果暂时放到环形缓冲区,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,将这些有序数据溢写到磁盘,而当数据处理完毕后,它会对磁盘上的所有文件进行归并排序。

对于ReduceTask

它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写到磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大的文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。

排序分类

部分排序

MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。

全排序

最终输出结果只有一个文件,且文件内部有序。实现方式是只设置了一个ReduceTask。但该方法在处理大型文件时,效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构

辅助排序

在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较,不相同的)的key进入同一个reduce方法时,可以采用分组排序。

二次排序

在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。

自定义排序原理

bean对象作为key传输,需要实现WritableComparable接口,重写compareTo方法,就可以实现排序

全排序案例

全排序案例 - 视频教程

【Hadoop】序列化、反序列化、序列化案例实操(包括Windows本地运行,hadoop集群运行) 这个案例的输出作为输入

排序的规则是:

  • 当总流量不同时,按照总流量的降序排列
  • 当总流量相同时,按照总上行流量升序排列

让FlowBean继承WritableComparable接口

在compareTo方法中编写排序的逻辑

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() {
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    //重写toString方法
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    //实现序列化和反序列化方法,注意顺序一定要一致
    @Override
    public void write(DataOutput out) throws IOException {

        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);

    }

    @Override
    public void readFields(DataInput in) throws IOException {

        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();

    }

    //二次排序
    @Override
    public int compareTo(FlowBean o) {
        if (this.sumFlow > o.sumFlow){
            //按照降序排列
            return -1;
        }else if (this.sumFlow < o.sumFlow){
            return 1;
        }else {
            //在总流量相同的情况下,按上行流量的升序排序
            if (this.upFlow > o.upFlow){
                return 1;
            }else if (this.upFlow < o.upFlow){
                return -1;
            }else {
                return 0;
            }
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90

Mapper类中,map方法是要获取每一行的数据,提取出每个电话号码的总上行流量,总下行流量,总流量,封装到FlowBean对象中,然后context.write()写出,需要注意的是:

排序是对key进行排序,所以,Mapper的输出的key得是FlowBean,输出的Value是 Text类型的电话号码

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    private FlowBean outK = new FlowBean();
    private Text outV = new Text();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, FlowBean, Text>.Context context) throws IOException, InterruptedException {

        String str = value.toString();

        String[] split = str.split("\t");

        outK.setUpFlow(Long.parseLong(split[1]));
        outK.setDownFlow(Long.parseLong(split[2]));
        outK.setSumFlow();

        outV.set(split[0]);

        context.write(outK, outV);

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

相同的key会传入一个Reduce方法,总上行流量和总流量相同的FlowBean对象,会进入同一个reduce方法,为了避免出现有多个相同的情况,使用for循环

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Reducer<FlowBean, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {

        //在FlowBean中写了两个比较条件
        //当总流量不同的时候,按照总流量的降序排列
        //当总流量相同的时候,按照总上行流量的升序排列
        //所以这里传进来的,应该是总流量和上行流量相同的

        for (Text value : values) {
            context.write(value,key);
        }

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

Driver类中需要指出Map阶段的输出类型,和序列化、反序列化时的案例输出类型刚好颠倒

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(FlowDriver.class);

        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

区内排序案例

在全排序案例的基础上,又增加了一个需求,即根据手机号的前三位数字,将最终的结果分别写入多个文件中

增加自定义分区类

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class PhonePartitoner extends Partitioner<FlowBean, Text> {
    @Override
    public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
        String prePhone = text.toString().substring(0, 3);

        switch (prePhone) {
            case "136":
                return 0;
            case "137":
                return 1;
            case "138":
                return 2;
            case "139":
                return 3;
            default:
                return 4;
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

在驱动类中设置自定义分区类

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(FlowDriver.class);

        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        //指定自定义分区的类
        job.setPartitionerClass(PhonePartitoner.class);
        job.setNumReduceTasks(5);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

Combiner合并

  • Combiner是MR程序中Mapper和Reducer之外的一种组件
  • Combiner组件的父类就是Reducer
  • Combiner和Reducer的区别在于运行的位置
    • Combiner是在每一个MapTask所在的节点运行
    • Reducer是接收全局所有Mapper的输出结果
  • Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输
  • Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner输出的KV应该跟Reducer的输出KV类型要对应起来

自定义Combiner类

需要继承Reducer类,重写reduce方法

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable outV = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        outV.set(sum);
        context.write(key, outV);

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

Combiner合并案例

需求:统计过程中对每一个MapTask的输出进行局部汇总,以减小网络传输量,即采用Combiner功能。

期望:Combiner输入数据越多,输出时经过合并,输出数据降低。

具体步骤:

  1. 在已有的WordCount基础上,增加一个Combiner类,继承Reducer
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable outV = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        outV.set(sum);
        context.write(key, outV);

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  1. 在WordCountDriver中驱动类中,指定Combiner
job.setCombinerClass(WordCountCombiner.class);
  • 1

OutputFormat数据输出

OutputFormat接口实现类

OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口。

下面是几种常见的OutputFormat实现类:

  • MapFileOuputFormat
  • SequenceFileOutputFormat
  • TextOutputFormat
  • DBOutputFormat

自定义OutputFormat

步骤:

  1. 自定义一个类继承FileOutputFormat
  2. 改写RecordWriter,具体改写输出数据的方法write()

自定义OutputFormat案例

1 需求

过滤输入的log日志,包含atguigu的网站输出到 atguigu.log 这个文件中,不包含atguigu的网站输出到other.log 文件中

2 输入数据

http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.atguigu.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

3 输出数据

atguigu.log 文件中,包含

http://www.atguigu.com
  • 1

other.log 文件中,包含

MapReduce默认对key进行排序,所以输出的内容是有序的,默认按照字典序

http://cn.bing.com
http://www.baidu.com
http://www.google.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sina.com
http://www.sindsafa.com
http://www.sohu.com
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

4 自定义一个OutputFormat类

创建一个类LogRecordWriter继承RecordWriter

  • 创建两个文件的输出流:atguiguOut、otherOut
  • 如果输出数据包含atguigu,输出到atguiguOut流
  • 如果输出数据不包含atguigu,输出到otherOut流
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {

        return new LogRecordWriter(job);

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class LogRecordWriter extends RecordWriter<Text, NullWritable> {

    private FSDataOutputStream atguiguOut;
    private FSDataOutputStream otherOut;

    public LogRecordWriter(TaskAttemptContext job) {
        //获取文件系统对象
        try {
            FileSystem fs = FileSystem.get(job.getConfiguration());
            //用文件系统创建两个流对应的目录
            atguiguOut = fs.create(new Path("D:\\output\\log\\atguigu.log"));
            otherOut = fs.create(new Path("D:\\output\\log\\other.log"));

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {

        String log = key.toString();

        if (log.contains("atguigu")) {
            atguiguOut.writeBytes(log + "\n");
        } else {
            otherOut.writeBytes(log + "\n");
        }

    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {

        //关流
        IOUtils.closeStream(atguiguOut);
        IOUtils.closeStream(otherOut);

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52

5 驱动类

要将自定义的输出格式组件设置到job中

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class LogDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(LogDriver.class);
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        //设置自定义的outputformat
        job.setOutputFormatClass(LogOutputFormat.class);

        //设置的输入和输出路径
        FileInputFormat.setInputPaths(job, new Path("D:\\input\\log\\"));
        
        //虽然定义了outputformat,但是因为自定义的outputformat继承自fileoutputformat
        //而fileoutputformat需要输出一个_SUCCESS文件,所以还是得指定一个输出目录
        FileOutputFormat.setOutputPath(job, new Path("D:\\output\\log\\"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

MapTask工作机制

MapTask可分为五个阶段:

  • Read阶段

    • MapTask通过InputFormat获得RecordReader,从输入InputSplit中解析出一个个key/value
  • Map阶段

    • 该阶段主要是将解析出的key/value交给用户编写的map()函数处理,并产生一系列新的key/value
  • Collect收集阶段

    • 在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它将会生成key/value分区(调用Patitioner),并写入一个环形内存缓冲区中
  • Spill(溢写)阶段

    • 当缓冲区满后,或所有文件全部读取完之后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,需要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
  • Merge阶段

    • 当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。

    • 当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index

    • 在文件合并的过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并mapreduce.task.io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件

    • 让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销

溢写阶段详情

  1. 利用快速排序算法对缓存区内的数据进行排序,排序方式是:先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key排序。

  2. 按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写的次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。

  3. 将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括:在临时文件中的偏移量、压缩前数据大小、压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到output/spillN.out.index中。

ReduceTask工作机制

ReduceTask分为三个阶段:

  • Copy阶段

    • ReduceTask从各个MapTask上远程拷贝一片数据(这是ReduceTask主动拉取的过程),并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中
  • Sort阶段

    • 在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用基于排序的策略。由于各个MapTask已经实现了对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。
  • Reduce阶段

    • reduce()函数将计算结果写到HDFS上。

数据清洗

ETL Extract-Transform-Load,用来描述将数据从来源端经过抽取(Extract)、转换(Transform)、加载(Load)至目的端的过程。ETL一词较常用在数据仓库,但其对象并不限于数据仓库

在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只运行Mapper程序,不需要运行Reduce程序

数据清洗案例

需求 : 去除日志中字段个数小于等于11的日志

期望 : 每行字段长度都大于11

需求分析 : 需要在Map阶段对输入的数据根据规则进行过滤清洗

Mapper

package mapreduce.etl;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WebLogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {

        String line = value.toString();

        boolean res = parseLog(line, context);

        if (!res) {
            return;
        }

        context.write(value, NullWritable.get());

    }

    private boolean parseLog(String line, Context context) {

        String[] split = line.split(" ");
        return split.length > 11;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

Driver

package mapreduce.etl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WebLogDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        args = new String[]{"D:\\input\\etl\\", "D:\\output\\etl\\"};

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(WebLogDriver.class);

        job.setMapperClass(WebLogMapper.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        job.setNumReduceTasks(0);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //前者表示运行进度信息将输出给用户,后者表示仅仅等待作业结束.
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/865704
推荐阅读
相关标签
  

闽ICP备14008679号