Java 中自定义 Hadoop OutputFormat 的实例详解

实例代码:

package com.ccse.hadoop.outputformat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;


/**
 * Word-count MapReduce job whose result is written through a hand-written {@link OutputFormat}.
 *
 * <p>The mapper splits each input line on whitespace and emits {@code (word, 1)}; the reducer sums
 * the counts per word; {@link MyselfOutputFormat} writes every {@code (word, count)} pair as a
 * {@code word<TAB>count} text line into the single file {@code OUTPUT_PATH + OUTPUT_FILENAME}.
 */
public class MySelfOutputFormatApp {

  /** HDFS location the job reads its input from. */
  public final static String INPUT_PATH = "hdfs://chaoren1:9000/mapinput";
  /** HDFS directory the job writes into; {@link #main} deletes it at the start of every run. */
  public final static String OUTPUT_PATH = "hdfs://chaoren1:9000/mapoutput";
  /** File name (with leading slash) appended to {@link #OUTPUT_PATH} to form the output file. */
  public final static String OUTPUT_FILENAME = "/abc";

  /**
   * Configures and runs the job, then exits with status 0 on success and 1 on failure.
   *
   * @param args unused
   * @throws IOException if the file system cannot be reached or the job cannot be submitted
   * @throws URISyntaxException if {@link #OUTPUT_PATH} is not a valid URI
   * @throws ClassNotFoundException if job classes cannot be loaded during submission
   * @throws InterruptedException if interrupted while waiting for the job to finish
   */
  public static void main(String[] args) throws IOException, URISyntaxException,
      ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();

    // Remove output left over from a previous run; the record writer creates its file with
    // overwrite disabled, so a stale file would make the job fail.
    FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);
    fileSystem.delete(new Path(OUTPUT_PATH), true);

    Job job = new Job(conf, MySelfOutputFormatApp.class.getSimpleName());
    job.setJarByClass(MySelfOutputFormatApp.class);

    FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
    job.setMapperClass(MyMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);

    job.setReducerClass(MyReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    job.setOutputFormatClass(MyselfOutputFormat.class);

    // Propagate the job outcome to the process exit status so scripts can detect failure.
    boolean succeeded = job.waitForCompletion(true);
    System.exit(succeeded ? 0 : 1);
  }

  /** Splits each input line on whitespace and emits {@code (token, 1)} for every token. */
  public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    // Reused across calls; context.write consumes the current contents before the next set().
    private final Text word = new Text();
    private final LongWritable one = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value,
        Mapper<LongWritable, Text, Text, LongWritable>.Context context)
        throws IOException, InterruptedException {
      if (value == null) {
        return;
      }
      StringTokenizer tokenizer = new StringTokenizer(value.toString());
      while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken());
        context.write(word, one);
      }
    }
  }

  /** Sums all counts received for a word and emits {@code (word, total)}. */
  public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    // Reused output object, following the usual Hadoop word-count pattern.
    private final LongWritable total = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,
        Reducer<Text, LongWritable, Text, LongWritable>.Context context)
        throws IOException, InterruptedException {
      long sum = 0;
      for (LongWritable value : values) {
        sum += value.get();
      }
      total.set(sum);
      context.write(key, total);
    }
  }

  /**
   * OutputFormat that sends every pair to one fixed file, {@code OUTPUT_PATH + OUTPUT_FILENAME},
   * instead of per-task part files.
   *
   * <p>NOTE(review): every task that calls {@link #getRecordWriter} opens that same path with
   * overwrite disabled, so this appears to support only a single reduce task — confirm before
   * raising the reducer count.
   */
  public static class MyselfOutputFormat extends OutputFormat<Text, LongWritable> {

    /**
     * Creates the output file and returns a writer over it.
     *
     * @param context the task attempt; only its configuration is used
     * @return a record writer bound to the newly created output file
     * @throws IOException if the file system cannot be resolved or the file cannot be created
     */
    @Override
    public RecordWriter<Text, LongWritable> getRecordWriter(TaskAttemptContext context)
        throws IOException, InterruptedException {
      // Destination file of all records.
      Path path = new Path(MySelfOutputFormatApp.OUTPUT_PATH
          + MySelfOutputFormatApp.OUTPUT_FILENAME);
      // Resolve the file system from the path itself. The previous URI-based lookup swallowed
      // URISyntaxException and then handed a null stream to the writer.
      FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
      FSDataOutputStream outputStream = fileSystem.create(path, false);
      return new MySelfRecordWriter(outputStream);
    }

    /** Performs no validation; {@code main} deletes the output directory before submitting. */
    @Override
    public void checkOutputSpecs(JobContext context) throws IOException,
        InterruptedException {
    }

    /**
     * Returns a {@link FileOutputCommitter} rooted at {@code OUTPUT_PATH}.
     *
     * <p>Note that {@link #getRecordWriter} writes directly to the final path rather than to the
     * committer's work directory.
     */
    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
        throws IOException, InterruptedException {
      return new FileOutputCommitter(new Path(MySelfOutputFormatApp.OUTPUT_PATH), context);
    }
  }

  /**
   * Writes each pair to the supplied stream as one text line.
   *
   * <p>Each line is the key's UTF-8 bytes, a tab, the value in decimal, and a newline.
   */
  public static class MySelfRecordWriter extends RecordWriter<Text, LongWritable> {

    private final FSDataOutputStream outputStream;

    /** @param outputStream destination stream; closed by {@link #close}. */
    public MySelfRecordWriter(FSDataOutputStream outputStream) {
      this.outputStream = outputStream;
    }

    @Override
    public void write(Text key, LongWritable value) throws IOException,
        InterruptedException {
      // Text already stores UTF-8. Writing its raw bytes keeps non-ASCII words intact, whereas
      // writeBytes(String) keeps only the low byte of each char. Only the first getLength()
      // bytes of the backing array are valid.
      outputStream.write(key.getBytes(), 0, key.getLength());
      outputStream.writeBytes("\t");
      // Emit the count as decimal text (writeLong would write 8 binary bytes) and terminate the
      // record so the file holds one "word<TAB>count" line per key.
      outputStream.writeBytes(Long.toString(value.get()));
      outputStream.writeBytes("\n");
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException,
        InterruptedException {
      outputStream.close();
    }
  }
}

2. OutputFormat 用于处理作业输出的各种目的地。

2.1 OutputFormat 要写出的键值对来自 Reducer 的输出，并由 OutputFormat 提供的 RecordWriter 负责写出。

2.2 RecordWriter 的 write(...) 方法只接收 k 和 v，那么数据写到哪里去呢？这需要在创建 RecordWriter 时单独传入一个 OutputStream 来解决：write 就是把 k 和 v 写入这个 OutputStream。

2.3 RecordWriter 是由 OutputFormat 的 getRecordWriter(...) 方法创建并返回的。因此，我们自定义的 OutputFormat 必须继承 OutputFormat 类，并在 getRecordWriter(...) 方法中打开输出流，再把它交给 RecordWriter。

以上就是java 中自定义OutputFormat的实例,如有疑问请留言或者到本站社区交流讨论,感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!

02-08 17:05