赞
踩
目录
性别分区器 Partitioner的泛型kv类型是map阶段输出的kv类型
案例
计算用户消费的总金额,根据用户的不同性别,将结果输出到不同的文件中
在之前的基础上添加
SexPartitioner
入口文件示例
- package com.igeekhome.mapreduce.sale;
-
- import com.igeekhome.mapreduce.model.UserSale;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Partitioner;
-
- /**
- * 性别分区器
- * Partitioner的泛型kv类型是map阶段输出的kv类型
- */
- public class SexPartitioner extends Partitioner<Text, UserSale>{
- @Override
- public int getPartition(Text text, UserSale userSale, int i) {
- //获取用户的性别
- String sex = userSale.getSex();
- //设置分区号
- int partitionNum = 0;
- if (sex.equals("男")){
- partitionNum = 0;
- } else if (sex.equals("女")) {
- partitionNum = 1;
- }else{
- partitionNum = 2;
- }
- return partitionNum;
- }
- }

其中返回的0,1,2表示最终输出文件的
- package com.igeekhome.mapreduce.sale;
-
- import com.igeekhome.mapreduce.model.UserSale;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
- import java.io.IOException;
-
- public class SaleDriver {
- public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
- //1.获取配置对象和job对象
- Configuration conf = new Configuration();
- Job job = Job.getInstance(conf);
- //2.设置Driver类对象
- job.setJarByClass(SaleDriver.class);
- //3.设置mapper和reducer类对象
- job.setMapperClass(SaleMapper.class);
- job.setReducerClass(SaleReducer.class);
- //4.设置map阶段输出的kv对象
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(UserSale.class);
- //5.设置最终输出的kv类对象
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(UserSale.class);
- //6.设置读取文件的路径 和 输出文件的路径
- FileInputFormat.setInputPaths(job,new Path("D:\\code\\sale_details (1).txt"));
- FileOutputFormat.setOutputPath(job,new Path("D:\\code\\output"));

-
- //7.设置自定义分区和reduce task的个数
- job.setPartitionerClass(SexPartitioner.class);
- job.setNumReduceTasks(3);
-
- //8.提交job
- boolean result = job.waitForCompletion(true);
- System.out.println(result?"计算成功":"计算失败");
- }
-
- }
计算结果
part-r-00000
part-r-00001
part-r-00002
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。