MapReduce增强
MapReduce的分区与reduceTask的数量
- 概述
MapReduce当中的分区:物以类聚,人以群分。相同key的数据,去往同一个reduce。
ReduceTask的数量默认为一个,可以自己设定数量 job.setNumReduceTasks(3)
分区决定了我们的数据该去往哪一个ReduceTask里面去 - 用代码实现
注意:分区的案例,只能打成jar包发布到集群上面去运行,本地模式已经不能正常运行了
定义一个map类
package cn.itcast.mr.demo1;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Mapper for the partition demo: forwards every input line unchanged as the
 * map output key (k2) and uses {@link NullWritable} as the value (v2).
 */
public class PartitionMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    /**
     * Emits the incoming line as k2 with a NullWritable placeholder as v2.
     *
     * @param key     k1 (not used by this mapper)
     * @param value   v1, one line of input text
     * @param context task context used to emit the output pair
     * @throws IOException          propagated from {@code context.write}
     * @throws InterruptedException propagated from {@code context.write}
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        final NullWritable placeholder = NullWritable.get();
        context.write(value, placeholder);
    }
}
定义一个reduce类
package cn.itcast.mr.demo1;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Reducer for the partition demo: writes each distinct key once and ignores
 * the (empty) values attached to it.
 */
public class PartitionReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    /**
     * Emits the key exactly once per reduce call, paired with NullWritable.
     *
     * @param key     k2, the line of text produced by the mapper
     * @param values  v2 values grouped under the key (not iterated)
     * @param context task context used to emit the output pair
     * @throws IOException          propagated from {@code context.write}
     * @throws InterruptedException propagated from {@code context.write}
     */
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        final NullWritable placeholder = NullWritable.get();
        context.write(key, placeholder);
    }
}
自定义Partitioner
package cn.itcast.mr.demo1;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Custom partitioner that splits records into two partitions based on the
 * numeric value found in the sixth tab-separated column of the key.
 */
public class PartitionOwn extends Partitioner<Text, NullWritable> {

    /** Separator between the columns of a record. */
    private static final String FIELD_SEPARATOR = "\t";

    /** Zero-based index of the column that drives the partition decision. */
    private static final int VALUE_COLUMN = 5;

    /** Values strictly greater than this go to partition 1, all others to partition 0. */
    private static final int THRESHOLD = 15;

    /**
     * Decides which reduce task receives the record.
     *
     * @param text          k2, a tab-separated line of text
     * @param nullWritable  v2 (unused)
     * @param numReduceTask number of reduce tasks configured for the job
     * @return 1 when the sixth column is greater than 15, otherwise 0 (a value of
     *         exactly 15 also yields 0); always 0 when fewer than two reduce tasks
     *         are available, so the returned index is never out of range
     * @throws ArrayIndexOutOfBoundsException if the line has fewer than six columns
     * @throws NumberFormatException          if the sixth column is not an integer
     */
    @Override
    public int getPartition(Text text, NullWritable nullWritable, int numReduceTask) {
        // Split k2 on tabs and pick the column the decision is based on.
        String field = text.toString().split(FIELD_SEPARATOR)[VALUE_COLUMN];
        int partition = Integer.parseInt(field) > THRESHOLD ? 1 : 0;
        // Never hand back an index the job has no reduce task for.
        return numReduceTask > 1 ? partition : 0;
    }
}
程序main函数入口
package cn.itcast.mr.demo1;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
 * Driver for the partition demo. Wires the mapper, custom partitioner and
 * reducer into one job with two reduce tasks and submits it.
 */
public class PartitionMain extends Configured implements Tool {

    /** Input location read by the job. */
    private static final String INPUT_PATH = "hdfs://node01:8020/partitionin";

    /** Output location written by the job. */
    private static final String OUTPUT_PATH = "hdfs://node01:8020/partition_out";

    /**
     * Runs the tool through {@link ToolRunner} and exits with its status code.
     *
     * @param args command-line arguments forwarded to ToolRunner
     * @throws Exception propagated from {@code ToolRunner.run}
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new PartitionMain(), args);
        System.exit(exitCode);
    }

    /**
     * Builds, configures and submits the job, then waits for it to finish.
     *
     * @param args command-line arguments (not used; paths are fixed)
     * @return 0 when the job completed successfully, 1 otherwise
     */
    @Override
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job partitionJob = Job.getInstance(super.getConf(), "myPartition");
        // Needed so the cluster can locate the jar that contains these classes.
        partitionJob.setJarByClass(PartitionMain.class);

        // Step 1: read the input and turn it into k1, v1.
        partitionJob.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(partitionJob, new Path(INPUT_PATH));

        // Step 2: map k1, v1 to k2, v2.
        partitionJob.setMapperClass(PartitionMapper.class);
        partitionJob.setMapOutputKeyClass(Text.class);
        partitionJob.setMapOutputValueClass(NullWritable.class);

        // Step 3: partitioning, handled by the custom partitioner.
        partitionJob.setPartitionerClass(PartitionOwn.class);

        // Steps 4 to 6 use the defaults.

        // Step 7: reduce k2, v2 to k3, v3.
        partitionJob.setReducerClass(PartitionReducer.class);
        partitionJob.setOutputKeyClass(Text.class);
        partitionJob.setOutputValueClass(NullWritable.class);

        // More reduce tasks than partitions leaves empty output files;
        // fewer makes one reduce task handle more of the data.
        partitionJob.setNumReduceTasks(2);

        partitionJob.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(partitionJob, new Path(OUTPUT_PATH));

        // Submit and block until the job is done.
        if (partitionJob.waitForCompletion(true)) {
            return 0;
        }
        return 1;
    }
}
排序以及序列化
概述
序列化(Serialization)是指把结构化对象转化为字节流。
反序列化(Deserialization)是序列化的逆过程,把字节流转为结构化对象。
Mapreduce是按照字典顺序对k2的值进行排序。Hadoop没有沿用java的serialize方式实现序列化,可以用自己的writable接口实现序列化。
实现Writable可以进行序列化,实现Comparable可以进行排序,如果想既实现序列化,又进行排序,可以同时实现Writable和Comparable,或者实现WritableComparable
如果以一行文本内容作为k2,不能够实现二次排序的功能,这时可以将这两个字段封装成一个JavaBean当做k2。代码实现:
a 1
a 9
b 3
a 7
b 8
b 10
a 5
将以上进行排序
自定义JavaBean并重写CompareTo
package cn.itcast.mr.demo2;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Composite map output key holding a letter ({@code first}) and a number
 * ({@code second}). Ordering is by {@code first}, then by {@code second}.
 *
 * <p>{@code equals} and {@code hashCode} are derived from both fields so that
 * equal keys hash identically in every JVM, which hash-based partitioning of
 * keys relies on.
 */
public class K2Bean implements WritableComparable<K2Bean> {

    /** The letter part of the record. */
    private String first;

    /** The numeric part of the record. */
    private int second;

    /**
     * Orders keys by {@code first} (String natural order) and, for equal
     * {@code first}, by {@code second} in ascending numeric order.
     * Negating either result would reverse the order of that field.
     *
     * @param o the key to compare against
     * @return negative, zero or positive as this key sorts before, with or after {@code o}
     */
    @Override
    public int compareTo(K2Bean o) {
        int byFirst = this.first.compareTo(o.first);
        if (byFirst != 0) {
            return byFirst;
        }
        // Primitive comparison; no boxing needed.
        return Integer.compare(this.second, o.second);
    }

    /**
     * Two keys are equal when both fields match.
     *
     * @param obj the object to compare with
     * @return true when {@code obj} is a K2Bean with the same field values
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof K2Bean)) {
            return false;
        }
        K2Bean other = (K2Bean) obj;
        boolean sameFirst = (first == null) ? other.first == null : first.equals(other.first);
        return sameFirst && second == other.second;
    }

    /**
     * Hash computed only from field values, so it is stable across JVMs.
     *
     * @return hash code consistent with {@link #equals(Object)}
     */
    @Override
    public int hashCode() {
        int firstHash = (first == null) ? 0 : first.hashCode();
        return 31 * firstHash + second;
    }

    /**
     * Serializes the key: {@code first} as UTF, then {@code second} as an int.
     *
     * @param out the stream to write to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(first);
        out.writeInt(second);
    }

    /**
     * Deserializes the key, reading fields in the order {@link #write} wrote them.
     *
     * @param in the stream to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readUTF();
        this.second = in.readInt();
    }

    /**
     * @return the two fields separated by a tab
     */
    @Override
    public String toString() {
        return first + '\t' + second;
    }

    public String getFirst() {
        return first;
    }

    public void setFirst(String first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }
}
定义一个map类
package cn.itcast.mr.demo2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Mapper for the sort demo: turns each tab-separated line into a
 * {@code K2Bean} key so the framework sorts by both columns.
 */
public class SortMapper extends Mapper<LongWritable, Text, K2Bean, NullWritable> {

    /**
     * Parses one line into a K2Bean and emits it with a NullWritable value.
     *
     * @param key     k1 (not used)
     * @param value   v1, a line whose first two tab-separated columns are a string and an integer
     * @param context task context used to emit the output pair
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Break the line into its tab-separated columns.
        String[] columns = value.toString().split("\t");
        String letter = columns[0];
        int number = Integer.parseInt(columns[1]);

        // Pack both columns into the composite key.
        K2Bean compositeKey = new K2Bean();
        compositeKey.setFirst(letter);
        compositeKey.setSecond(number);

        // Emit k2, v2.
        context.write(compositeKey, NullWritable.get());
    }
}
定义一个reduce类
package cn.itcast.mr.demo2;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Reducer for the sort demo: writes the key once for every value grouped
 * under it, so duplicate keys are kept in the output.
 */
public class SortReducer extends Reducer<K2Bean, NullWritable, K2Bean, NullWritable> {

    /**
     * Emits the key once per grouped value.
     *
     * @param key     k2, the composite sort key
     * @param values  v2 values grouped under the key
     * @param context task context used to emit the output pair
     */
    @Override
    protected void reduce(K2Bean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // One output record per value keeps repeated keys in the result.
        java.util.Iterator<NullWritable> cursor = values.iterator();
        while (cursor.hasNext()) {
            cursor.next();
            context.write(key, NullWritable.get());
        }
    }
}
程序main函数入口
package cn.itcast.mr.demo2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Driver for the sort demo. Configures a job that reads text, maps every
 * line to a {@code K2Bean} key and writes the keys back out.
 */
public class SortMain extends Configured implements Tool {

    /**
     * Runs the tool through {@link ToolRunner} and exits with its status code.
     *
     * @param args command-line arguments forwarded to ToolRunner
     * @throws Exception propagated from {@code ToolRunner.run}
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new SortMain(), args);
        System.exit(exitCode);
    }

    /**
     * Builds, configures and submits the job, then waits for it to finish.
     *
     * @param args command-line arguments (not used; paths are fixed)
     * @return 0 when the job completed successfully, 1 otherwise
     */
    @Override
    public int run(String[] args) throws Exception {
        Job sortJob = Job.getInstance(super.getConf(), "sort");

        // Input format and location.
        sortJob.setInputFormatClass(TextInputFormat.class);
        Path inputDir = new Path("file:////Volumes/赵壮备份/大数据离线课程资料/4.大数据离线第四天/排序/input");
        TextInputFormat.setInputPaths(sortJob, inputDir);

        // Map stage and its output types.
        sortJob.setMapperClass(SortMapper.class);
        sortJob.setMapOutputKeyClass(K2Bean.class);
        sortJob.setMapOutputValueClass(NullWritable.class);

        // Partitioning, sorting, combining and grouping use the defaults.

        // Reduce stage and its output types.
        sortJob.setReducerClass(SortReducer.class);
        sortJob.setOutputKeyClass(K2Bean.class);
        sortJob.setOutputValueClass(NullWritable.class);

        // Output format and location.
        sortJob.setOutputFormatClass(TextOutputFormat.class);
        Path outputDir = new Path("file:////Volumes/赵壮备份/大数据离线课程资料/4.大数据离线第四天/排序/output");
        TextOutputFormat.setOutputPath(sortJob, outputDir);

        // Submit and block until the job is done.
        if (sortJob.waitForCompletion(true)) {
            return 0;
        }
        return 1;
    }
}
计数器
- 自带计数器
- 自定义计数器
第一种是通过context上下文对象获取计数器
package cn.itcast.mr.demo2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Mapper for the sort demo with a custom counter obtained from the context.
 * Every call to {@code map} bumps the {@code MR_COUNT/MapRecordCounter}
 * counter by one before emitting the composite key.
 */
public class SortMapper extends Mapper<LongWritable, Text, K2Bean, NullWritable> {

    /**
     * Counts the record, parses the line into a K2Bean and emits it.
     *
     * @param key     k1 (not used)
     * @param value   v1, a line whose first two tab-separated columns are a string and an integer
     * @param context task context used for the counter and for emitting output
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Custom counter: number of records seen by the map stage.
        // The call is chained so no Counter type (and therefore no extra import) is needed.
        context.getCounter("MR_COUNT", "MapRecordCounter").increment(1L);

        // Break the line into its tab-separated columns.
        String[] split = value.toString().split("\t");

        // Pack both columns into the composite key.
        K2Bean k2Bean = new K2Bean();
        k2Bean.setFirst(split[0]);
        k2Bean.setSecond(Integer.parseInt(split[1]));

        // Emit k2, v2.
        context.write(k2Bean, NullWritable.get());
    }
}
第二种是通过enum枚举类型定义计数器
package cn.itcast.mr.demo2;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Reducer for the sort demo with counters declared through an enum.
 */
public class SortReducer extends Reducer<K2Bean, NullWritable, K2Bean, NullWritable> {

    /** Counters maintained by this reducer. */
    public enum Counter {
        REDUCE_INPUT_RECORDS,
        REDUCE_INPUT_VAL_NUMS
    }

    /**
     * Counts the key, then counts and emits one record per grouped value.
     *
     * @param key     k2, the composite sort key
     * @param values  v2 values grouped under the key
     * @param context task context used for the counters and for emitting output
     */
    @Override
    protected void reduce(K2Bean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // One increment per reduce call, i.e. per input key group.
        context.getCounter(Counter.REDUCE_INPUT_RECORDS).increment(1L);

        // One output record per value keeps repeated keys in the result.
        for (NullWritable ignored : values) {
            // One increment per value seen for this key.
            context.getCounter(Counter.REDUCE_INPUT_VAL_NUMS).increment(1L);
            context.write(key, NullWritable.get());
        }
    }
}
Combiner
- 概述
combiner可以先对相同k2进行合并,减少发送到reduce阶段的k2的数量,这么做的好处是可以节约网络带宽。
combiner其实就是一个reducer类,但是这个reducer类的输入和输出比较特殊,输入是k2,v2,输出还是k2,v2。
combiner不能改变数据结果值,只是用于减少发送到reduce端的数据量。
注意:求平均值不能用combiner。
MapReduce综合练习之上网流量统计
- 统计求和
自定义JavaBean(FlowNum)
package cn.itcast.mr.demo3;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Writable value object carrying four traffic figures for one phone number:
 * upload, download, total upload and total download.
 */
public class FlowNum implements Writable {

    private Integer upload;
    private Integer download;
    private Integer uploadSum;
    private Integer downloadSum;

    /**
     * Serializes the four fields as ints, in declaration order.
     * A field that was never set is written as 0 instead of failing with a
     * NullPointerException during auto-unboxing.
     *
     * @param out the stream to write to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(orZero(upload));
        out.writeInt(orZero(download));
        out.writeInt(orZero(uploadSum));
        out.writeInt(orZero(downloadSum));
    }

    /**
     * Deserializes the four fields in the order {@link #write} wrote them.
     *
     * @param in the stream to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upload = in.readInt();
        this.download = in.readInt();
        this.uploadSum = in.readInt();
        this.downloadSum = in.readInt();
    }

    /** Maps an unset (null) figure to 0 so it can be written as a primitive int. */
    private static int orZero(Integer figure) {
        return figure == null ? 0 : figure;
    }

    public Integer getUpload() {
        return upload;
    }

    public void setUpload(Integer upload) {
        this.upload = upload;
    }

    public Integer getDownload() {
        return download;
    }

    public void setDownload(Integer download) {
        this.download = download;
    }

    public Integer getUploadSum() {
        return uploadSum;
    }

    public void setUploadSum(Integer uploadSum) {
        this.uploadSum = uploadSum;
    }

    public Integer getDownloadSum() {
        return downloadSum;
    }

    public void setDownloadSum(Integer downloadSum) {
        this.downloadSum = downloadSum;
    }

    /**
     * @return the four figures separated by tabs, in declaration order
     */
    @Override
    public String toString() {
        return upload + "\t" + download + "\t" + uploadSum + "\t" + downloadSum;
    }
}
定义一个map类
package cn.itcast.mr.demo3;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Mapper for the traffic-sum job: extracts the phone number and the four
 * traffic columns from each tab-separated line.
 */
public class FlowNumMapper extends Mapper<LongWritable, Text, Text, FlowNum> {

    /**
     * Emits the phone number (column 1) as key and a {@code FlowNum} built from
     * columns 6 to 9 as value.
     *
     * @param key     k1 (not used)
     * @param value   v1, one tab-separated line of input
     * @param context task context used to emit the output pair
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Break the line into its tab-separated columns.
        String[] columns = value.toString().split("\t");

        // Column 1 holds the phone number.
        Text phoneKey = new Text(columns[1]);

        // Columns 6, 7, 8, 9: upload, download, total upload, total download.
        FlowNum traffic = new FlowNum();
        traffic.setUpload(Integer.parseInt(columns[6]));
        traffic.setDownload(Integer.parseInt(columns[7]));
        traffic.setUploadSum(Integer.parseInt(columns[8]));
        traffic.setDownloadSum(Integer.parseInt(columns[9]));

        // Emit k2, v2.
        context.write(phoneKey, traffic);
    }
}
定义reducer类
package cn.itcast.mr.demo3;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Reducer for the traffic-sum job: adds up the four traffic figures of all
 * records that share a phone number.
 */
public class FlowNumReducer extends Reducer<Text, FlowNum, Text, FlowNum> {

    /**
     * Sums each figure over the grouped values and emits one total per key.
     *
     * @param key     k2, the phone number
     * @param values  v2 traffic records grouped under the phone number
     * @param context task context used to emit the output pair
     */
    @Override
    protected void reduce(Text key, Iterable<FlowNum> values, Context context) throws IOException, InterruptedException {
        // Running totals, one per figure.
        int totalUpload = 0;
        int totalDownload = 0;
        int totalUploadSum = 0;
        int totalDownloadSum = 0;

        for (FlowNum record : values) {
            totalUpload = totalUpload + record.getUpload();
            totalDownload = totalDownload + record.getDownload();
            totalUploadSum = totalUploadSum + record.getUploadSum();
            totalDownloadSum = totalDownloadSum + record.getDownloadSum();
        }

        // Pack the totals into the output value.
        FlowNum totals = new FlowNum();
        totals.setUpload(totalUpload);
        totals.setDownload(totalDownload);
        totals.setUploadSum(totalUploadSum);
        totals.setDownloadSum(totalDownloadSum);

        // Emit k3, v3.
        context.write(key, totals);
    }
}
程序main函数入口
package cn.itcast.mr.demo3;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Driver for the traffic-sum job: reads the raw traffic log and writes one
 * summed {@code FlowNum} per phone number.
 */
public class FlowNumMain extends Configured implements Tool {

    /**
     * Builds, configures and submits the job, then waits for it to finish.
     *
     * @param args command-line arguments (not used; paths are fixed)
     * @return 0 when the job completed successfully, 1 otherwise
     * @throws Exception propagated from job setup or execution
     */
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), "flowSum");

        // Input format and location.
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path("file:////Volumes/赵壮备份/大数据离线课程资料/4.大数据离线第四天/流量统计/input"));

        // Map stage; k2, v2 types are set with the map-output setters
        // rather than the final-output setters.
        job.setMapperClass(FlowNumMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowNum.class);

        // Reduce stage; k3, v3 types.
        job.setReducerClass(FlowNumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowNum.class);

        // Output format and location.
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("file:////Volumes/赵壮备份/大数据离线课程资料/4.大数据离线第四天/流量统计/output2"));

        // Submit and block until the job is done.
        boolean succeeded = job.waitForCompletion(true);
        return succeeded ? 0 : 1;
    }

    /**
     * Runs the tool through {@link ToolRunner} and exits with its status code.
     *
     * @param args command-line arguments forwarded to ToolRunner
     * @throws Exception propagated from {@code ToolRunner.run}
     */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int run = ToolRunner.run(configuration, new FlowNumMain(), args);
        System.exit(run);
    }
}
- 上行流量倒序排序
自定义JavaBean并重写CompareTo
package cn.itcast.mr.demo4;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Composite key for the traffic sort job. Keys are ordered by
 * {@code upFlow} in descending order; the other three figures take no part
 * in the comparison.
 *
 * <p>NOTE(review): {@code equals}/{@code hashCode} are not overridden here.
 */
public class FlowNumSort implements WritableComparable<FlowNumSort> {

    private Integer upFlow;
    private Integer downFlow;
    private Integer upCountFlow;
    private Integer downCountFlow;

    /**
     * Descending order on {@code upFlow}: the operands are swapped instead of
     * negating an ascending comparison.
     *
     * @param o the key to compare against
     * @return negative when this key has the larger {@code upFlow}, zero when
     *         equal, positive otherwise
     */
    @Override
    public int compareTo(FlowNumSort o) {
        return o.upFlow.compareTo(this.upFlow);
    }

    /**
     * Serializes the four figures as ints, in declaration order.
     *
     * @param out the stream to write to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.upFlow);
        out.writeInt(this.downFlow);
        out.writeInt(this.upCountFlow);
        out.writeInt(this.downCountFlow);
    }

    /**
     * Deserializes the four figures in the order {@link #write} wrote them.
     *
     * @param in the stream to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readInt();
        downFlow = in.readInt();
        upCountFlow = in.readInt();
        downCountFlow = in.readInt();
    }

    public Integer getUpFlow() {
        return this.upFlow;
    }

    public void setUpFlow(Integer upFlow) {
        this.upFlow = upFlow;
    }

    public Integer getDownFlow() {
        return this.downFlow;
    }

    public void setDownFlow(Integer downFlow) {
        this.downFlow = downFlow;
    }

    public Integer getUpCountFlow() {
        return this.upCountFlow;
    }

    public void setUpCountFlow(Integer upCountFlow) {
        this.upCountFlow = upCountFlow;
    }

    public Integer getDownCountFlow() {
        return this.downCountFlow;
    }

    public void setDownCountFlow(Integer downCountFlow) {
        this.downCountFlow = downCountFlow;
    }

    /**
     * @return the four figures separated by tabs, in declaration order
     */
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + upCountFlow + "\t" + downCountFlow;
    }
}
定义一个Mapper类
package cn.itcast.mr.demo4;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Mapper for the traffic sort job: builds a {@code FlowNumSort} key from
 * columns 1 to 4 of each line and uses column 0 as the value.
 */
public class FlowNumSortMapper extends Mapper<LongWritable, Text, FlowNumSort, Text> {

    /**
     * Parses one tab-separated line and emits (traffic figures, column 0).
     *
     * @param key     k1 (not used)
     * @param value   v1, one tab-separated line of input
     * @param context task context used to emit the output pair
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] columns = value.toString().split("\t");

        // Columns 1..4 carry the four integer figures that make up the key.
        int up = Integer.parseInt(columns[1]);
        int down = Integer.parseInt(columns[2]);
        int upCount = Integer.parseInt(columns[3]);
        int downCount = Integer.parseInt(columns[4]);

        FlowNumSort sortKey = new FlowNumSort();
        sortKey.setUpFlow(up);
        sortKey.setDownFlow(down);
        sortKey.setUpCountFlow(upCount);
        sortKey.setDownCountFlow(downCount);

        // Column 0 travels along as the value.
        Text firstColumn = new Text(columns[0]);
        context.write(sortKey, firstColumn);
    }
}
定义一个Reducer类
package cn.itcast.mr.demo4;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Reducer for the traffic sort job: passes every (key, value) pair through
 * unchanged, one output record per grouped value.
 */
public class FlowNumSortReducer extends Reducer<FlowNumSort, Text, FlowNumSort, Text> {

    /**
     * Writes the key together with each of its grouped values.
     *
     * @param key     k2, the traffic sort key
     * @param values  v2 values grouped under the key
     * @param context task context used to emit the output pairs
     */
    @Override
    protected void reduce(FlowNumSort key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        java.util.Iterator<Text> cursor = values.iterator();
        while (cursor.hasNext()) {
            Text current = cursor.next();
            context.write(key, current);
        }
    }
}
程序main函数入口
package cn.itcast.mr.demo4;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Driver for the traffic sort job: reads tab-separated traffic lines and
 * writes them back out ordered by the {@code FlowNumSort} key.
 */
public class FlowNumSortMain extends Configured implements Tool {

    /**
     * Builds, configures and submits the job, then waits for it to finish.
     *
     * @param args command-line arguments (not used; paths are fixed)
     * @return 0 when the job completed successfully, 1 otherwise
     * @throws Exception propagated from job setup or execution
     */
    @Override
    public int run(String[] args) throws Exception {
        Job sortJob = Job.getInstance(super.getConf(), "FlowSort");

        // Input format and location.
        sortJob.setInputFormatClass(TextInputFormat.class);
        Path inputDir = new Path("file:////Volumes/赵壮备份/大数据离线课程资料/4.大数据离线第四天/流量统计/output");
        TextInputFormat.setInputPaths(sortJob, inputDir);

        // Map stage and its output types.
        sortJob.setMapperClass(FlowNumSortMapper.class);
        sortJob.setMapOutputKeyClass(FlowNumSort.class);
        sortJob.setMapOutputValueClass(Text.class);

        // Reduce stage and its output types.
        sortJob.setReducerClass(FlowNumSortReducer.class);
        sortJob.setOutputKeyClass(FlowNumSort.class);
        sortJob.setOutputValueClass(Text.class);

        // Output format and location.
        sortJob.setOutputFormatClass(TextOutputFormat.class);
        Path outputDir = new Path("file:////Volumes/赵壮备份/大数据离线课程资料/4.大数据离线第四天/流量统计/flowout_put");
        TextOutputFormat.setOutputPath(sortJob, outputDir);

        // Submit and block until the job is done.
        if (sortJob.waitForCompletion(true)) {
            return 0;
        }
        return 1;
    }

    /**
     * Runs the tool through {@link ToolRunner} and exits with its status code.
     *
     * @param args command-line arguments forwarded to ToolRunner
     * @throws Exception propagated from {@code ToolRunner.run}
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new FlowNumSortMain(), args);
        System.exit(exitCode);
    }
}
- 手机号码分区
在需求一的基础上,继续完善,将不同的手机号分到不同的数据文件当中去
定义一个Partitioner类
package cn.itcast.mr.demo3;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Partitioner that routes records by the leading three digits of the phone
 * number: 135..139 map to partitions 0..4, everything else to partition 5.
 */
public class PhonePartition extends Partitioner<Text,FlowNum> {

    /** Known prefixes; the array index of a prefix is its partition number. */
    private static final String[] PREFIXES = {"135", "136", "137", "138", "139"};

    /**
     * Picks the partition for a phone number.
     *
     * @param text    k2, the phone number
     * @param flowNum v2 (unused)
     * @param i       number of partitions (unused)
     * @return the index of the matching prefix, or 5 when none matches
     */
    @Override
    public int getPartition(Text text, FlowNum flowNum, int i) {
        final String phone = text.toString();
        for (int idx = 0; idx < PREFIXES.length; idx++) {
            if (phone.startsWith(PREFIXES[idx])) {
                return idx;
            }
        }
        // Catch-all partition for every other prefix.
        return PREFIXES.length;
    }
}
Mapper类和Reducer类和需求一的相同
程序main函数要添加打包方式、调用分区类、设置reduce数,修改输入和输出数据路径
package cn.itcast.mr.demo3;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Driver for the partitioned traffic-sum job. Input and output locations are
 * taken from the command line, and the output is split into six partitions
 * by {@code PhonePartition}.
 */
public class FlowNumMain extends Configured implements Tool {

    /**
     * Builds, configures and submits the job, then waits for it to finish.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 when the job completed successfully, 1 when it failed,
     *         2 when the required arguments are missing
     * @throws Exception propagated from job setup or execution
     */
    @Override
    public int run(String[] args) throws Exception {
        // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (args == null || args.length < 2) {
            System.err.println("Usage: FlowNumMain <input path> <output path>");
            return 2;
        }

        Job job = Job.getInstance(super.getConf(), "flowSum");

        // Needed so the cluster can locate the jar that contains these classes.
        job.setJarByClass(FlowNumMain.class);

        // Input format and location.
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path(args[0]));

        // Map stage; k2, v2 types are set with the map-output setters
        // rather than the final-output setters.
        job.setMapperClass(FlowNumMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowNum.class);

        // Partitioning by phone-number prefix.
        job.setPartitionerClass(PhonePartition.class);

        // Reduce stage; k3, v3 types.
        job.setReducerClass(FlowNumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowNum.class);

        // PhonePartition returns partition numbers 0..5, so six reduce tasks are required.
        job.setNumReduceTasks(6);

        // Output format and location.
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit and block until the job is done.
        boolean succeeded = job.waitForCompletion(true);
        return succeeded ? 0 : 1;
    }

    /**
     * Runs the tool through {@link ToolRunner} and exits with its status code.
     *
     * @param args command-line arguments forwarded to ToolRunner
     * @throws Exception propagated from {@code ToolRunner.run}
     */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int run = ToolRunner.run(configuration, new FlowNumMain(), args);
        System.exit(run);
    }
}