MapReduce Enhancements


Partitioning in MapReduce and the Number of ReduceTasks

  • Overview

    Partitioning in MapReduce: birds of a feather flock together. Records with the same key are sent to the same reduce.

    The number of ReduceTasks defaults to one; you can set it yourself with job.setNumReduceTasks(3).

    The partition decides which ReduceTask each record is sent to (a sketch of the default partitioner follows this list).
  • Code implementation

    Note: the partitioning example can only be packaged as a jar and run on the cluster (e.g. with hadoop jar); it no longer runs correctly in local mode.
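
For reference, when no custom partitioner is set, Hadoop falls back to its built-in HashPartitioner, which is what guarantees that records with the same key reach the same reduce. Its behavior is essentially the following (a simplified sketch for illustration only; the class name is made up and it is not part of this example's code):

import org.apache.hadoop.mapreduce.Partitioner;

// Simplified sketch of the stock HashPartitioner: the same key always hashes
// to the same partition number, so identical keys land on the same reducer.
public class HashPartitionerSketch<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // mask off the sign bit so the partition number is never negative
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}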

Define a Mapper class

package cn.itcast.mr.demo1;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class PartitionMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // emit k2,v2: k2 is the whole line of text, v2 is NullWritable
        context.write(value, NullWritable.get());
    }
}

Define a Reducer class

package cn.itcast.mr.demo1;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class PartitionReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}

Define a custom Partitioner

package cn.itcast.mr.demo1;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class PartitionOwn extends Partitioner<Text, NullWritable> {
    /**
     * This method decides which reduce each record goes to
     *
     * @param text          k2
     * @param nullWritable  v2
     * @param numReduceTask number of reduce tasks
     * @return the partition number
     */
    @Override
    public int getPartition(Text text, NullWritable nullWritable, int numReduceTask) {
        // split k2 on "\t" and take the sixth field
        String result = text.toString().split("\t")[5];
        System.out.println(result);
        // values greater than 15 go to one partition, the rest go to the other
        if (Integer.parseInt(result) > 15) {
            return 1;
        } else {
            return 0;
        }
    }
}

Program main entry point

package cn.itcast.mr.demo1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class PartitionMain extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new PartitionMain(), args);
        System.exit(run);
    }

    @Override
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // get the job object that wraps our job definition
        Job job = Job.getInstance(super.getConf(), "myPartition");
        // required when the job is packaged as a jar and run on the cluster
        job.setJarByClass(PartitionMain.class);

        // step 1: read the file and parse it into k1,v1
        job.setInputFormatClass(TextInputFormat.class);
        // set the input path
        TextInputFormat.setInputPaths(job, new Path("hdfs://node01:8020/partitionin"));

        // step 2: custom map logic, receive k1,v1 and turn them into k2,v2
        job.setMapperClass(PartitionMapper.class);
        // set the k2,v2 types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // step 3: partitioning, records with the same key go to the same reduce
        job.setPartitionerClass(PartitionOwn.class);

        // steps 4 to 6 omitted

        // step 7: custom reduce logic
        job.setReducerClass(PartitionReducer.class);
        // set the k3,v3 types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // set the number of reduce tasks
        // if there are more reduce tasks than partitions, some output files will be empty
        // if there are fewer reduce tasks than partitions, one reduce will have to handle more data
        job.setNumReduceTasks(2);

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/partition_out"));

        // submit the job
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }
}

Sorting and Serialization

  • Overview

    Serialization is the process of turning a structured object into a byte stream.

    Deserialization is the reverse of serialization: turning a byte stream back into a structured object.

    MapReduce sorts the values of k2 in dictionary order. Hadoop does not reuse Java's built-in serialization; it implements serialization through its own Writable interface.

    Implementing Writable enables serialization, and implementing Comparable enables sorting. To get both serialization and sorting, implement both Writable and Comparable, or simply implement WritableComparable.

    If a whole line of text is used as k2, secondary sorting is not possible; in that case the two fields can be wrapped into a JavaBean and used as k2.

  • Code implementation

a	1
a	9
b	3
a	7
b	8
b	10
a	5
Sort the data above (fields separated by a tab, matching the "\t" split in the Mapper).
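
With the compareTo logic defined below (ascending on the letter field, then ascending on the number field), the expected output for this small data set would be:

a	1
a	5
a	7
a	9
b	3
b	8
b	10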

Define a custom JavaBean and override compareTo

package cn.itcast.mr.demo2;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class K2Bean implements WritableComparable<K2Bean> {
    // the letter from the input goes into the first field, the number into the second
    private String first;
    private int second;

    /**
     * compareTo method, used to compare and sort the data
     *
     * @param o the K2Bean to compare against
     * @return the comparison result
     */
    @Override
    public int compareTo(K2Bean o) {
        // compare the first field first
        int i = this.first.compareTo(o.first);
        if (i == 0) {
            // if the first fields are equal, compare the second field
            int i1 = Integer.valueOf(this.second).compareTo(Integer.valueOf(o.second));
            return i1; // returning -i1 instead would sort the second field in descending order
        } else {
            // if the first fields differ, return the result directly
            return i; // returning -i instead would sort the first field in descending order
        }
    }

    /**
     * Writable serialization method
     *
     * @param out data output
     * @throws IOException on write failure
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(first);
        out.writeInt(second);
    }

    /**
     * deserialization method
     *
     * @param in data input
     * @throws IOException on read failure
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readUTF();
        this.second = in.readInt();
    }

    // toString method
    @Override
    public String toString() {
        return first + '\t' + second;
    }

    // getters and setters
    public String getFirst() {
        return first;
    }

    public void setFirst(String first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }
}

Define a Mapper class

package cn.itcast.mr.demo2;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SortMapper extends Mapper<LongWritable, Text, K2Bean, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // split the incoming line into an array of strings
        String[] split = value.toString().split("\t");
        // convert the strings into a K2Bean
        K2Bean k2Bean = new K2Bean();
        k2Bean.setFirst(split[0]);
        k2Bean.setSecond(Integer.parseInt(split[1]));
        // emit k2,v2
        context.write(k2Bean, NullWritable.get());
    }
}

Define a Reducer class

package cn.itcast.mr.demo2;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SortReducer extends Reducer<K2Bean, NullWritable, K2Bean, NullWritable> {
    @Override
    protected void reduce(K2Bean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // looping over the values makes duplicate k2 keys get written out once each
        for (NullWritable value : values) {
            context.write(key, NullWritable.get());
        }
    }
}

Program main entry point

package cn.itcast.mr.demo2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SortMain extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        // create a Configuration object
        Configuration configuration = new Configuration();
        // run through ToolRunner and get an int status code back
        int run = ToolRunner.run(configuration, new SortMain(), args);
        // exit with that status code
        System.exit(run);
    }

    @Override
    public int run(String[] args) throws Exception {
        // create the job object
        Job job = Job.getInstance(super.getConf(), "sort");

        // read the input data, set the input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path("file:////Volumes/赵壮备份/大数据离线课程资料/4.大数据离线第四天/排序/input"));

        // custom map logic
        job.setMapperClass(SortMapper.class);
        job.setMapOutputKeyClass(K2Bean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // partition, sort, combine and group steps omitted

        // custom reduce logic
        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(K2Bean.class);
        job.setOutputValueClass(NullWritable.class);

        // write the output data, set the output path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("file:////Volumes/赵壮备份/大数据离线课程资料/4.大数据离线第四天/排序/output"));

        // submit the job
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }
}

Counters

  • Built-in counters

    (Screenshot of Hadoop's built-in counters from the original post; not reproduced here.)
  • Custom counters

The first approach obtains a counter through the context object

package cn.itcast.mr.demo2;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SortMapper extends Mapper<LongWritable, Text, K2Bean, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // custom counter: here it counts how many records the map processes
        Counter counter = context.getCounter("MR_COUNT", "MapRecordCounter");
        counter.increment(1L);

        // split the incoming line into an array of strings
        String[] split = value.toString().split("\t");
        // convert the strings into a K2Bean
        K2Bean k2Bean = new K2Bean();
        k2Bean.setFirst(split[0]);
        k2Bean.setSecond(Integer.parseInt(split[1]));
        // emit k2,v2
        context.write(k2Bean, NullWritable.get());
    }
}

The second approach defines counters with an enum type

package cn.itcast.mr.demo2;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SortReducer extends Reducer<K2Bean, NullWritable, K2Bean, NullWritable> {
    public static enum Counter {
        REDUCE_INPUT_RECORDS, REDUCE_INPUT_VAL_NUMS,
    }

    @Override
    protected void reduce(K2Bean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // count how many keys arrive at the reduce
        context.getCounter(Counter.REDUCE_INPUT_RECORDS).increment(1L);
        // looping over the values writes duplicate k2 keys out once each
        for (NullWritable value : values) {
            // count how many values are processed
            context.getCounter(Counter.REDUCE_INPUT_VAL_NUMS).increment(1L);
            context.write(key, NullWritable.get());
        }
    }
}
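
Both kinds of counters are printed with the job's summary when it finishes. As a minimal sketch (not part of the original code), the values can also be read back in the driver after waitForCompletion returns, for example inside SortMain.run():

// sketch: reading the counters back in the driver once the job has finished
boolean b = job.waitForCompletion(true);
long reduceKeys = job.getCounters()
        .findCounter(SortReducer.Counter.REDUCE_INPUT_RECORDS).getValue();
long mapRecords = job.getCounters()
        .findCounter("MR_COUNT", "MapRecordCounter").getValue();
System.out.println("reduce input keys = " + reduceKeys + ", map records = " + mapRecords);
return b ? 0 : 1;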

Combiner

  • Overview

    A combiner pre-merges records that share the same k2, reducing the number of k2 records sent on to the reduce stage; the benefit is that it saves network bandwidth.

    A combiner is really just a reducer class, but one whose input and output are special: the input is k2,v2 and the output is still k2,v2.

    A combiner must not change the final result; it is only there to cut down the amount of data sent to the reduce side.

    Note: a combiner cannot be used when computing an average, because an average of partial averages is generally not the overall average. A sketch of wiring in a combiner follows below.
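
A minimal sketch of registering a combiner (the class name below is made up for illustration and is not taken from the examples in this post): a summing reducer whose input and output are both k2,v2 can double as the combiner.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// A word-count style combiner: input and output types are both (Text, LongWritable),
// and summing partial sums gives the same final result as summing everything at once.
public class SumCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        context.write(key, new LongWritable(sum));
    }
}

// registered in the driver, in addition to setMapperClass and setReducerClass:
// job.setCombinerClass(SumCombiner.class);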

Comprehensive MapReduce Exercise: Internet Traffic Statistics

  • Summing the traffic

Define a custom JavaBean (FlowNum)

package cn.itcast.mr.demo3;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowNum implements Writable {
    // upstream, downstream, total upstream and total downstream traffic
    private Integer upload;
    private Integer download;
    private Integer uploadSum;
    private Integer downloadSum;

    /**
     * serialization
     *
     * @param out data output
     * @throws IOException on write failure
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(upload);
        out.writeInt(download);
        out.writeInt(uploadSum);
        out.writeInt(downloadSum);
    }

    /**
     * deserialization
     *
     * @param in data input
     * @throws IOException on read failure
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upload = in.readInt();
        this.download = in.readInt();
        this.uploadSum = in.readInt();
        this.downloadSum = in.readInt();
    }

    public Integer getUpload() {
        return upload;
    }

    public void setUpload(Integer upload) {
        this.upload = upload;
    }

    public Integer getDownload() {
        return download;
    }

    public void setDownload(Integer download) {
        this.download = download;
    }

    public Integer getUploadSum() {
        return uploadSum;
    }

    public void setUploadSum(Integer uploadSum) {
        this.uploadSum = uploadSum;
    }

    public Integer getDownloadSum() {
        return downloadSum;
    }

    public void setDownloadSum(Integer downloadSum) {
        this.downloadSum = downloadSum;
    }

    @Override
    public String toString() {
        return upload + "\t" + download + "\t" + uploadSum + "\t" + downloadSum;
    }
}

Define a Mapper class

package cn.itcast.mr.demo3;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowNumMapper extends Mapper<LongWritable, Text, Text, FlowNum> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // split the line on "\t"
        String[] split = value.toString().split("\t");
        // the phone number
        String phoneNum = split[1];
        // indexes 6, 7, 8 and 9 hold upstream, downstream, total upstream and total downstream traffic
        Integer upload = Integer.parseInt(split[6]);
        Integer download = Integer.parseInt(split[7]);
        Integer uploadSum = Integer.parseInt(split[8]);
        Integer downloadSum = Integer.parseInt(split[9]);
        FlowNum flowNum = new FlowNum();
        flowNum.setUpload(upload);
        flowNum.setDownload(download);
        flowNum.setUploadSum(uploadSum);
        flowNum.setDownloadSum(downloadSum);
        // emit k2,v2
        context.write(new Text(phoneNum), flowNum);
    }
}

Define a Reducer class

package cn.itcast.mr.demo3;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowNumReducer extends Reducer<Text, FlowNum, Text, FlowNum> {
    @Override
    protected void reduce(Text key, Iterable<FlowNum> values, Context context) throws IOException, InterruptedException {
        // define and initialize upstream, downstream, total upstream and total downstream traffic
        int upload = 0;
        int download = 0;
        int uploadSum = 0;
        int downloadSum = 0;
        // accumulate the traffic while iterating
        for (FlowNum value : values) {
            upload += value.getUpload();
            download += value.getDownload();
            uploadSum += value.getUploadSum();
            downloadSum += value.getDownloadSum();
        }
        FlowNum flowNum = new FlowNum();
        flowNum.setUpload(upload);
        flowNum.setDownload(download);
        flowNum.setUploadSum(uploadSum);
        flowNum.setDownloadSum(downloadSum);
        // emit k3,v3
        context.write(key, flowNum);
    }
}

Program main entry point

package cn.itcast.mr.demo3;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FlowNumMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // get the job object
        Job job = Job.getInstance(super.getConf(), "flowSum");

        // read the input data, set the input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path("file:////Volumes/赵壮备份/大数据离线课程资料/4.大数据离线第四天/流量统计/input"));

        // custom map logic
        job.setMapperClass(FlowNumMapper.class);
        // set the k2,v2 types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowNum.class);

        // custom reduce logic
        job.setReducerClass(FlowNumReducer.class);
        // set the k3,v3 types
        job.setOutputValueClass(FlowNum.class);
        job.setOutputKeyClass(Text.class);

        // write the output data, set the output path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("file:////Volumes/赵壮备份/大数据离线课程资料/4.大数据离线第四天/流量统计/output2"));

        // submit the job
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // get a Configuration object
        Configuration configuration = new Configuration();
        // use ToolRunner to get a status code back
        int run = ToolRunner.run(configuration, new FlowNumMain(), args);
        // exit with that status code
        System.exit(run);
    }
}

  • Sorting by upstream traffic in descending order

Define a custom JavaBean and override compareTo

package cn.itcast.mr.demo4;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowNumSort implements WritableComparable<FlowNumSort> {
    private Integer upFlow;
    private Integer downFlow;
    private Integer upCountFlow;
    private Integer downCountFlow;

    // because we want descending order, return "-i"
    @Override
    public int compareTo(FlowNumSort o) {
        int i = this.upFlow.compareTo(o.upFlow);
        return -i;
    }

    /**
     * serialization
     *
     * @param out data output
     * @throws IOException on write failure
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(upFlow);
        out.writeInt(downFlow);
        out.writeInt(upCountFlow);
        out.writeInt(downCountFlow);
    }

    /**
     * deserialization
     *
     * @param in data input
     * @throws IOException on read failure
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readInt();
        this.downFlow = in.readInt();
        this.upCountFlow = in.readInt();
        this.downCountFlow = in.readInt();
    }

    public Integer getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(Integer upFlow) {
        this.upFlow = upFlow;
    }

    public Integer getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(Integer downFlow) {
        this.downFlow = downFlow;
    }

    public Integer getUpCountFlow() {
        return upCountFlow;
    }

    public void setUpCountFlow(Integer upCountFlow) {
        this.upCountFlow = upCountFlow;
    }

    public Integer getDownCountFlow() {
        return downCountFlow;
    }

    public void setDownCountFlow(Integer downCountFlow) {
        this.downCountFlow = downCountFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + upCountFlow + "\t" + downCountFlow;
    }
}

Define a Mapper class

package cn.itcast.mr.demo4;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowNumSortMapper extends Mapper<LongWritable, Text, FlowNumSort, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split("\t");
        FlowNumSort flowNumSort = new FlowNumSort();
        flowNumSort.setUpFlow(Integer.parseInt(split[1]));
        flowNumSort.setDownFlow(Integer.parseInt(split[2]));
        flowNumSort.setUpCountFlow(Integer.parseInt(split[3]));
        flowNumSort.setDownCountFlow(Integer.parseInt(split[4]));
        context.write(flowNumSort, new Text(split[0]));
    }
}

Define a Reducer class

package cn.itcast.mr.demo4;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowNumSortReducer extends Reducer<FlowNumSort, Text, FlowNumSort, Text> {
    @Override
    protected void reduce(FlowNumSort key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            context.write(key, value);
        }
    }
}

Program main entry point

package cn.itcast.mr.demo4;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FlowNumSortMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), "FlowSort");

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path("file:////Volumes/赵壮备份/大数据离线课程资料/4.大数据离线第四天/流量统计/output"));

        job.setMapperClass(FlowNumSortMapper.class);
        job.setMapOutputKeyClass(FlowNumSort.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(FlowNumSortReducer.class);
        job.setOutputKeyClass(FlowNumSort.class);
        job.setOutputValueClass(Text.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("file:////Volumes/赵壮备份/大数据离线课程资料/4.大数据离线第四天/流量统计/flowout_put"));

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int run = ToolRunner.run(configuration, new FlowNumSortMain(), args);
        System.exit(run);
    }
}

  • Partitioning by phone number

    Building on requirement one, extend the job so that different phone numbers are written to different output files.

Define a Partitioner class

package cn.itcast.mr.demo3;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class PhonePartition extends Partitioner<Text, FlowNum> {
    @Override
    public int getPartition(Text text, FlowNum flowNum, int i) {
        String phoneNum = text.toString();
        if (phoneNum.startsWith("135")) {
            return 0;
        } else if (phoneNum.startsWith("136")) {
            return 1;
        } else if (phoneNum.startsWith("137")) {
            return 2;
        } else if (phoneNum.startsWith("138")) {
            return 3;
        } else if (phoneNum.startsWith("139")) {
            return 4;
        } else {
            return 5;
        }
    }
}

The Mapper and Reducer classes are the same as in requirement one.

The main program needs to add the jar packaging call, register the partitioner class, set the number of reduce tasks, and change the input and output paths.

package cn.itcast.mr.demo3;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FlowNumMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // get the job object
        Job job = Job.getInstance(super.getConf(), "flowSum");

        // read the input data, set the input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path(args[0]));

        // required when running from a jar
        job.setJarByClass(FlowNumMain.class);

        // custom map logic
        job.setMapperClass(FlowNumMapper.class);
        // set the k2,v2 types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowNum.class);

        // partitioning
        job.setPartitionerClass(PhonePartition.class);

        // custom reduce logic
        job.setReducerClass(FlowNumReducer.class);
        // set the k3,v3 types
        job.setOutputValueClass(FlowNum.class);
        job.setOutputKeyClass(Text.class);

        // set the number of reduce tasks
        job.setNumReduceTasks(6);

        // write the output data, set the output path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(args[1]));

        // submit the job to the cluster
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // get a Configuration object
        Configuration configuration = new Configuration();
        // use ToolRunner to get a status code back
        int run = ToolRunner.run(configuration, new FlowNumMain(), args);
        // exit with that status code
        System.exit(run);
    }
}