赞
踩
Map端的主要工作:为来自不同表或文件的key/value对,`打标签以区别不同来源的记录`。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些`来源于不同文件的记录(在Map阶段已经打标志)分开`,最后进行合并就ok了。
1001 01 1
1002 02 2
1003 03 3
1004 01 4
1005 02 5
1006 03 6
id | pid | amount |
---|---|---|
1001 | 01 | 1 |
1002 | 02 | 2 |
1003 | 03 | 3 |
1004 | 01 | 4 |
1005 | 02 | 5 |
1006 | 03 | 6 |
01 小米
02 华为
03 格力
pid | pname |
---|---|
01 | 小米 |
02 | 华为 |
03 | 格力 |
id | pname | amount |
---|---|---|
1001 | 小米 | 1 |
1004 | 小米 | 4 |
1002 | 华为 | 2 |
1005 | 华为 | 5 |
1003 | 格力 | 3 |
1006 | 格力 | 6 |
![(pho\reducejoin案例需求分析.png)
package com.saddam.bigdata.ShangGuiGu.Join; import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class TableBean implements Writable { private String order_id; // 订单id private String p_id; // 产品id private int amount; // 产品数量 private String pname; // 产品名称 private String flag; // 表的标记 public TableBean() { super(); } public TableBean(String order_id, String p_id, int amount, String pname, String flag) { this.order_id = order_id; this.p_id = p_id; this.amount = amount; this.pname = pname; this.flag = flag; } public String getOrder_id() { return order_id; } public void setOrder_id(String order_id) { this.order_id = order_id; } public String getP_id() { return p_id; } public void setP_id(String p_id) { this.p_id = p_id; } public int getAmount() { return amount; } public void setAmount(int amount) { this.amount = amount; } public String getPname() { return pname; } public void setPname(String pname) { this.pname = pname; } public String getFlag() { return flag; } public void setFlag(String flag) { this.flag = flag; } @Override public String toString() { return order_id+"\t"+amount+"\t"+pname; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(order_id); out.writeUTF(p_id); out.writeInt(amount); out.writeUTF(pname); out.writeUTF(flag); } @Override public void readFields(DataInput in) throws IOException { order_id=in.readUTF(); p_id=in.readUTF(); amount=in.readInt(); pname=in.readUTF(); flag=in.readUTF(); } }
package com.saddam.bigdata.ShangGuiGu.Join; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import java.io.IOException; public class TableMapper extends Mapper<LongWritable,Text,Text,TableBean> { String name; Text outK=new Text(); TableBean tableBean=new TableBean(); @Override protected void setup(Context context) throws IOException, InterruptedException { FileSplit fileSplit=(FileSplit)context.getInputSplit(); name = fileSplit.getPath().getName(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //获取一行数据 String line = value.toString(); //分文件处理 if (name.startsWith("order")){//order.txt /* 1001 01 1 1002 02 2 */ String[] fields = line.split("\t"); /* private String order_id; // 订单id private String p_id; // 产品id private int amount; // 产品数量 private String pname; // 产品名称 private String flag; // 表的标记 */ //封装对象 tableBean.setOrder_id(fields[0]); tableBean.setP_id(fields[1]); tableBean.setAmount(Integer.parseInt(fields[2])); tableBean.setPname(""); tableBean.setFlag("order"); //p_id作为key 两个文件都有 outK.set(fields[1]); }else {//pd.txt /* 01 小米 02 华为 03 格力 */ String[] fields=line.split("\t"); tableBean.setOrder_id(""); tableBean.setP_id(fields[0]); tableBean.setAmount(0); tableBean.setPname(fields[1]); tableBean.setFlag("pd"); outK.set(fields[0]); } context.write(outK,tableBean); } }
package com.saddam.bigdata.ShangGuiGu.Join; import org.apache.commons.beanutils.BeanUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; public class TableReducer extends Reducer<Text,TableBean,TableBean, NullWritable> { @Override protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException { // 1准备存储订单的集合 ArrayList<TableBean> orderBeans = new ArrayList<>(); // 2 准备bean对象 TableBean pdBean=new TableBean(); for(TableBean bean:values){ if ("order".equals(bean.getFlag())){ TableBean orderBean=new TableBean(); try { BeanUtils.copyProperties(orderBean, bean); } catch (Exception e) { e.printStackTrace(); } orderBeans.add(orderBean); }else { try { BeanUtils.copyProperties(pdBean, bean); } catch (Exception e) { e.printStackTrace(); } } } 3 表的拼接 for(TableBean bean:orderBeans){ bean.setPname (pdBean.getPname()); // 4 数据写出去 context.write(bean, NullWritable.get()); } } }
package com.saddam.bigdata.ShangGuiGu.Join; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.log4j.BasicConfigurator; public class TableDriver{ public static void main(String[] args)throws Exception { BasicConfigurator.configure(); // 1 获取配置信息,或者job对象实例 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 2 指定本程序的jar包所在的本地路径 job.setJarByClass(TableDriver.class); // 3 指定本业务job要使用的Mapper/Reducer业务类 job.setMapperClass(TableMapper.class); job.setReducerClass(TableReducer.class); // 4 指定Mapper输出数据的kv类型 job.setMapOutputKeyClass(Text .class); job.setMapOutputValueClass(TableBean.class); // 5 指定最终输出的数据的kv类型 job.setOutputKeyClass(TableBean.class); job.setOutputValueClass(NullWritable .class); // 6 指定job的输入原始文件所在目录 String inputPath1 ="D:\\MR\\MapReduce\\InputDatas\\Reduce Join\\order"; String inputPath2 ="D:\\MR\\MapReduce\\InputDatas\\Reduce Join\\pd"; String outputPath ="D:\\MR\\MapReduce\\OutputDatas\\output_join"; FileInputFormat.setInputPaths(job,new Path(inputPath1),new Path(inputPath2)); FileOutputFormat.setOutputPath(job, new Path(outputPath)); // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
Map Join适用于一张表十分小、一张表很大的场景。
思考:在Reduce端处理过多的表,非常容易产生数据倾斜。怎么办?
在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。
采用DistributedCache
(1)在Mapper的setup阶段,将文件读取到缓存集合中。
(2)在驱动函数中加载缓存。
// 缓存普通文件到Task运行节点。
job.addCacheFile(new URI("file://e:/cache/pd.txt"));
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。