Custom InputFormat
OutputFormat
Example code
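The example consists of five classes: ScoreWritable, a custom value type; ScoreCount, the driver with its mapper and reducer; ScoreInputFormat, a custom input format; ScoreRecordReader, the record reader it creates; and MapReduceCaseEmail, a second job that counts email addresses and uses MultipleOutputs to write each mail provider to its own file.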
package com.vip09;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class ScoreWritable implements WritableComparable<ScoreWritable>{
//In a custom data type, it is recommended to use Java primitive types
private float chinese ;
private float math ;
private float english ;
private float physics ;
private float chemistry ;
//A custom data type must have a no-argument constructor
public ScoreWritable(){}
public ScoreWritable(float chinese, float math, float english, float physics, float chemistry) {
this.chinese = chinese;
this.math = math;
this.english = english;
this.physics = physics;
this.chemistry = chemistry;
}
public void set(float chinese, float math, float english, float physics, float chemistry){
this.chinese = chinese;
this.math = math;
this.english = english;
this.physics = physics;
this.chemistry = chemistry;
}
public float getChinese() {
return chinese;
}
public float getMath() {
return math;
}
public float getEnglish() {
return english;
}
public float getPhysics() {
return physics;
}
public float getChemistry() {
return chemistry;
}
//Called when the object is written out, i.e. during serialization
@Override
public void write(DataOutput out) throws IOException {
out.writeFloat(chinese);
out.writeFloat(math);
out.writeFloat(english);
out.writeFloat(physics);
out.writeFloat(chemistry);
}
//Called when the data is read back, i.e. during deserialization, to rebuild the object
@Override
public void readFields(DataInput in) throws IOException {
chinese = in.readFloat() ;
math = in.readFloat() ;
english = in.readFloat() ;
physics = in.readFloat() ;
chemistry = in.readFloat() ;
}
//Order by total score (one reasonable choice); in this job ScoreWritable is only used as a value,
//so this comparison is never used for sorting
@Override
public int compareTo(ScoreWritable o) {
float thisTotal = chinese + math + english + physics + chemistry;
float thatTotal = o.chinese + o.math + o.english + o.physics + o.chemistry;
return Float.compare(thisTotal, thatTotal);
}
}
package com.vip09;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class ScoreCount extends Configured implements Tool{
//map and reduce
//The mapper simply forwards each (student, scores) pair produced by the custom RecordReader
public static class ScoreMapper extends Mapper<Text, ScoreWritable, Text, ScoreWritable>{
@Override
protected void map(Text key, ScoreWritable value,
Context context)
throws IOException, InterruptedException {
context.write(key, value);
}
}
public static class ScoreReducer extends Reducer<Text, ScoreWritable, Text, Text>{
private Text text = new Text() ;
@Override
protected void reduce(Text key, Iterable<ScoreWritable> value,
Context context) throws IOException, InterruptedException {
float totalScore = 0.0f ;
float avgScore = 0.0f ;
//Each student is expected to have exactly one score record, so this loop runs once per key
for (ScoreWritable sw : value) {
totalScore = sw.getChinese() + sw.getEnglish() + sw.getMath() + sw.getPhysics() + sw.getChemistry() ;
avgScore = totalScore/5 ;
}
text.set(totalScore + "\t" + avgScore);
context.write(key, text);
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration() ;
//Delete the output directory if it already exists
Path mypath = new Path(args[1]) ;
FileSystem hdfs = mypath.getFileSystem(conf);
if(hdfs.isDirectory(mypath)){
hdfs.delete(mypath, true) ;
}
Job job = Job.getInstance(conf, "scorecount") ;
job.setJarByClass(ScoreCount.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(ScoreMapper.class);
job.setReducerClass(ScoreReducer.class);
//Custom types used in the map output must be set explicitly
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(ScoreWritable.class);
//The reducer emits plain text for both key and value
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//Set the custom input format
job.setInputFormatClass(ScoreInputFormat.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
String[] args0 = {"hdfs://192.168.153.111:9000/input5",
"hdfs://192.168.153.111:9000/output15"} ;
int res = ToolRunner.run(new ScoreCount(), args0) ;
System.exit(res);
}
}
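Assuming the project is packaged as a runnable jar (the jar name below is only illustrative), the job could be submitted with something like hadoop jar scorecount.jar com.vip09.ScoreCount; the input and output paths are hard-coded in main().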
package com.vip09;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class ScoreInputFormat extends FileInputFormat<Text, ScoreWritable> {
//Note:
/*
* Every input format needs a matching RecordReader.
* Overriding createRecordReader() really means overriding the object it returns,
* here the custom ScoreRecordReader class, which must extend RecordReader and do the actual reading.
* */
@Override
public RecordReader<Text, ScoreWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
return new ScoreRecordReader();
}
//ScoreRecordReader always reads its file from the beginning, so do not allow files to be split
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
}
package com.vip09;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;
public class ScoreRecordReader extends RecordReader<Text, ScoreWritable>{
public LineReader in ; //line reader
public Text lineKey ; //custom key
public ScoreWritable linevalue ; //custom value
public Text line ; //current input line
//Initialization method; executed only once
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
FileSplit fsplit = (FileSplit)split ;
Configuration conf = context.getConfiguration();
Path file = fsplit.getPath();
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream filein = fs.open(file);
in = new LineReader(filein, conf) ;
line = new Text() ;
lineKey = new Text() ;
linevalue = new ScoreWritable() ;
}
//Called each time a line of data is read
//This is the method to tailor to our own needs; the other methods are fairly fixed and can follow this template
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
int linesize = in.readLine(line);
if(linesize == 0){
return false ;
}
String[] pieces = line.toString().split("\\s+") ;
if(pieces.length != 7){
throw new IOException("Invalid record: " + line) ;
}
//Convert each of the student's subject scores to a float
float a = 0, b = 0, c = 0, d = 0, e = 0 ;
try{
a = Float.parseFloat(pieces[2].trim()) ;
b = Float.parseFloat(pieces[3].trim()) ;
c = Float.parseFloat(pieces[4].trim()) ;
d = Float.parseFloat(pieces[5].trim()) ;
e = Float.parseFloat(pieces[6].trim()) ;
}catch(NumberFormatException nfe){
nfe.printStackTrace();
}
lineKey.set(pieces[0] + "\t" + pieces[1]); //build the custom key from the first two fields
linevalue.set(a, b, c, d, e); //package the five scores into the custom value
return true;
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return lineKey;
}
@Override
public ScoreWritable getCurrentValue() throws IOException, InterruptedException {
return linevalue;
}
@Override
public float getProgress() throws IOException, InterruptedException {
//Progress reporting is not implemented for this simple reader
return 0;
}
@Override
public void close() throws IOException {
if(in != null){
in.close();
}
}
}
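For reference, nextKeyValue() above expects every input line to hold seven whitespace-separated fields: two identifying fields that together form the key (for example a name and a student number) followed by five subject scores. A made-up sample line such as

zhangsan 001 85 90 88 75 92

would make the ScoreCount job emit the key zhangsan 001 with the total score 430.0 and the average 86.0.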
package com.vip09;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MapReduceCaseEmail extends Configured implements Tool{
public static class EmailMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1) ;
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
//Each line of input is expected to contain one email address
context.write(value, one);
}
}
public static class EmailReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
private IntWritable result = new IntWritable() ;
//To write to multiple files or directories, use MultipleOutputs
private MultipleOutputs<Text, IntWritable> mout ;
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
mout = new MultipleOutputs<Text, IntWritable>(context) ;
}
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int begin = key.toString().indexOf("@") ;
int end = key.toString().indexOf(".", begin) ; //first dot after the @
//Skip malformed addresses
if(begin < 0 || end < 0){
return ;
}
//Extract the mail provider, e.g. qq or 163
String name = key.toString().substring(begin + 1, end);
int sum = 0 ;
for (IntWritable value : values) {
sum += value.get() ;
}
result.set(sum);
//Output files are named <baseOutputPath>-r-nnnnn, e.g. qq-r-00000
mout.write(key, result, name);
}
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
mout.close();
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration() ;
//Delete the output directory if it already exists
Path mypath = new Path(args[1]) ;
FileSystem hdfs = mypath.getFileSystem(conf);
if(hdfs.isDirectory(mypath)){
hdfs.delete(mypath, true) ;
}
Job job = Job.getInstance(conf, "emailcount") ;
job.setJarByClass(MapReduceCaseEmail.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(EmailMapper.class);
job.setReducerClass(EmailReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
String[] args0 = {"hdfs://192.168.153.111:9000/input6",
"hdfs://192.168.153.111:9000/output16"} ;
int res = ToolRunner.run(new MapReduceCaseEmail(), args0) ;
System.exit(res);
}
}
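The heading above also mentions OutputFormat, but the sample code only uses the built-in TextOutputFormat together with MultipleOutputs. A custom output format mirrors the custom input format: extend FileOutputFormat and return a RecordWriter from getRecordWriter(). Below is a minimal sketch, not part of the original example; the class name ScoreOutputFormat and the tab-separated layout are illustrative choices.
package com.vip09;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class ScoreOutputFormat extends FileOutputFormat<Text, Text> {
    //Just as a custom input format needs a RecordReader, a custom output format needs a RecordWriter
    @Override
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        //One output file per reduce task, e.g. part-r-00000
        Path file = getDefaultWorkFile(context, "");
        FileSystem fs = file.getFileSystem(context.getConfiguration());
        final FSDataOutputStream out = fs.create(file, false);
        return new RecordWriter<Text, Text>() {
            @Override
            public void write(Text key, Text value) throws IOException {
                //Write "key<TAB>value" followed by a newline
                out.write(key.copyBytes());
                out.writeBytes("\t");
                out.write(value.copyBytes());
                out.writeBytes("\n");
            }
            @Override
            public void close(TaskAttemptContext c) throws IOException {
                out.close();
            }
        };
    }
}
With key and value types matching the ScoreCount reducer, it would be enabled in the driver with job.setOutputFormatClass(ScoreOutputFormat.class).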