package com.bigdata.hadoop.wordcount;

 import java.io.IOException;

 import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCount {
/**
* 设置Map方法
* @author hxiuz
*
*/
private static class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ private Text mapOutkey = new Text(); //设置输出key的格式
private final static IntWritable mapOutvalue = new IntWritable(1); //设置输出value的格式并赋值1
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException { //key即行偏移量
String input = value.toString(); //读入value数据
String[] inArr = input.split(" "); //按空格分割
for(String str:inArr) {
mapOutkey.set(str); //给key赋值
context.write(mapOutkey, mapOutvalue); //写入
}
}
} /**
* 设置Reduce方法
* @author hxiuz
*
*/
private static class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
private IntWritable redOutvalue = new IntWritable(); @Override
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0; //计数变量
for(IntWritable value:values) {
sum += value.get(); //遍历集合values并将计数累加
} redOutvalue.set(sum); //给输出value赋值为sum
context.write(key, redOutvalue); //写入
}
} /**
* 主方法入口
* @param args
*/
public static void main(String[] args) {
// TODO Auto-generated method stub
if(args.length!=2) {
System.out.println("Usage:wordcount <in> <out>");
return ;
}
Configuration conf = new Configuration(); //读取配置文件
try {
//新建一个job任务实例 并通过类设置jar
Job job = Job.getInstance(conf, WordCount.class.getSimpleName());
job.setJarByClass(WordCount.class); //设置输入路径
Path inputPath = new Path(args[0]);
FileInputFormat.addInputPath(job, inputPath); //设置map类
job.setMapperClass(WCMapper.class);
//设置map输出的格式
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class); //设置reduce类
job.setReducerClass(WCReducer.class);
//设置reduce输出的格式
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class); //设置输出路径
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath); //提交任务
boolean jobStatus = job.waitForCompletion(true); //判断程序是否正常退出
System.exit(jobStatus ? 0 : 1); } catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} } }
05-11 10:53