一:自定义OutputFormat类
MapReduce默认的OutputFormat会将结果文件统一输出到我们指定的一个目录下。但如果想根据某个条件,把满足不同条件的内容分别输出到不同的目录下,

就需要自定义实现OutputFormat类,重写其getRecordWriter方法,并返回一个自定义的RecordWriter。最后在驱动类中通过job.setOutputFormatClass方法指定这个自定义的OutputFormat类。

下面的案例是一组购物评论文本数据:根据评论状态(0 好评、1 中评、2 差评),将好评(代码中连同中评)和差评分别输出到对应的好评文件夹和差评文件夹下。

二:自定义实现OutputFormat类代码实现

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Custom {@link FileOutputFormat} that hands out a {@link MyRecordWriter}
 * writing to two hard-coded local files: one for good comments and one for
 * bad comments.
 *
 * <p>NOTE(review): every call to {@link #getRecordWriter} opens the same two
 * paths, so this presumably only behaves well when a single task produces the
 * output — confirm before running with several output tasks.
 */
public class MyOutputFormat extends FileOutputFormat<Text,NullWritable> {

    /**
     * Opens the good-comment and bad-comment output streams and wraps them in
     * a {@link MyRecordWriter}.
     *
     * @param context task attempt context; only its configuration is used
     * @return a record writer bound to the two output streams
     * @throws IOException if either output file cannot be created
     * @throws InterruptedException declared by the overridden method; not thrown here
     */
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        Configuration configuration = context.getConfiguration();

        // Output file for good comments.
        Path goodComment = new Path("file:///F:\\goodComment\\1.txt");
        // Output file for bad comments.
        Path badComment = new Path("file:///F:\\badComment\\1.txt");

        // Resolve the file system from each path's own scheme instead of the
        // configured default file system: the paths are explicit file:// URIs
        // and would be rejected by a non-local default file system.
        FileSystem goodFileSystem = goodComment.getFileSystem(configuration);
        FileSystem badFileSystem = badComment.getFileSystem(configuration);

        FSDataOutputStream goodStream = goodFileSystem.create(goodComment);
        FSDataOutputStream badStream;
        try {
            badStream = badFileSystem.create(badComment);
        } catch (IOException | RuntimeException e) {
            // Do not leak the already-opened stream when the second create fails.
            try {
                goodStream.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }

        return new MyRecordWriter(goodStream, badStream);
    }
}

三:自定义实现RecordWriter类

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

/**
 * Record writer that routes each record to one of two streams depending on
 * the comment status stored in the record's tenth tab-separated field.
 */
public class MyRecordWriter extends RecordWriter<Text,NullWritable> {
    /** Line terminator appended after every record. */
    private static final byte[] LINE_SEPARATOR =
            "\r\n".getBytes(java.nio.charset.StandardCharsets.UTF_8);

    /** Field index (0-based) holding the comment status. */
    private static final int STATUS_FIELD_INDEX = 9;

    private FSDataOutputStream goodStream;
    private FSDataOutputStream badSteam;

    /** Creates a writer with no streams attached. */
    public MyRecordWriter(){

    }

    /**
     * Creates a writer bound to the two destination streams.
     *
     * @param goodStream stream receiving records with status {@code <= 1}
     * @param badSteam stream receiving all other records
     */
    public MyRecordWriter(FSDataOutputStream goodStream,FSDataOutputStream badSteam){
        this.goodStream = goodStream;
        this.badSteam = badSteam;
    }

    /**
     * Writes one record followed by {@code \r\n} to the stream selected by
     * its comment status (0 = good, 1 = neutral, 2 = bad, per the original
     * author's note). Status values {@code <= 1} go to the good stream,
     * everything else to the bad stream.
     *
     * <p>A record with fewer than ten tab-separated fields, or a non-numeric
     * status field, makes this method throw an unchecked exception.
     *
     * @param key the full input line (k3)
     * @param value unused placeholder value
     * @throws IOException if writing to the selected stream fails
     * @throws InterruptedException declared by the overridden method; not thrown here
     */
    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        String[] split = key.toString().split("\t");
        int status = Integer.parseInt(split[STATUS_FIELD_INDEX]);
        FSDataOutputStream target = status <= 1 ? goodStream : badSteam;

        // Text#getBytes() exposes the backing buffer, which is only valid up
        // to getLength(); writing the whole array can emit stale bytes left
        // over from a previous, longer record.
        target.write(key.getBytes(), 0, key.getLength());
        target.write(LINE_SEPARATOR);
    }

    /**
     * Closes both output streams.
     *
     * @param context task attempt context (unused)
     * @throws IOException declared by the overridden method
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStream(badSteam);
        IOUtils.closeStream(goodStream);
    }
}

四:自定义Map类

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Pass-through mapper: forwards every input line unchanged as the output key,
 * paired with the {@link NullWritable} singleton as the value.
 */
public class MyOutputMapper extends Mapper<LongWritable,Text,Text,NullWritable> {

    /**
     * Emits the input line as the key; the input key is ignored.
     *
     * @param key input key supplied by the input format (unused)
     * @param value the input line, emitted as the output key
     * @param context sink for the emitted pair
     * @throws IOException if the context fails to write
     * @throws InterruptedException if the write is interrupted
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        final NullWritable emptyValue = NullWritable.get();
        context.write(value, emptyValue);
    }
}

五:驱动程序

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Driver that configures and runs the job using {@link MyOutputMapper} and
 * {@link MyOutputFormat}.
 */
public class MyOutputMain extends Configured implements Tool {

    /**
     * Builds the job, submits it and waits for completion.
     *
     * @param args command-line arguments (unused; all paths are hard-coded)
     * @return {@code 0} when the job succeeds, {@code 1} otherwise
     * @throws Exception if job setup or execution fails
     */
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), "ownOutputFormat");
        // Lets Hadoop locate the jar containing the job classes when the job
        // is submitted to a cluster instead of being run locally.
        job.setJarByClass(MyOutputMain.class);

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("file:///F:\\input"));

        job.setMapperClass(MyOutputMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Declare the final output types explicitly so they match what
        // MyOutputFormat's record writer actually receives.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        job.setOutputFormatClass(MyOutputFormat.class);
        // The records themselves are written to the paths hard-coded in
        // MyOutputFormat, not to this directory. An output path is still set
        // because MyOutputFormat extends FileOutputFormat.
        // NOTE(review): job marker files may still be created here — confirm.
        MyOutputFormat.setOutputPath(job ,new Path("file:///F:\\output"));

        boolean succeeded = job.waitForCompletion(true);

        return succeeded ? 0 : 1;
    }

    /**
     * Entry point: runs the tool through {@link ToolRunner} and exits with
     * its return code.
     *
     * @param args command-line arguments forwarded to {@link #run(String[])}
     * @throws Exception if the tool fails
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new MyOutputMain(), args);
        System.exit(exitCode);
    }

}
02-10 14:34