





public class MRDriver  extends Configured implements Tool {

public int run(String[] args) throws Exception {
    FileSystem fs = new Path(".").getFileSystem(getConf());
    Job job = new Job(getConf());
    job.setJobName("Enron MR");
    RawCombineFileInputFormat.addInputPath(job, new Path(args[0]));
    TextOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 :1;

public static void main(String[] args) throws Exception  {
    int exitCode = ToolRunner.run(new MRDriver(), args);


public class SingleFileRecordReader extends RecordReader<LongWritable, Text> {
  private static final Log LOG = LogFactory.getLog(SingleFileRecordReader.class);

  private long start;
  private long pos;
  private long end;
  private LineReader in;
  private int maxLineLength;
  private LongWritable key = null;
  private Text value = null;

  public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();

    // open the file and seek to the start of the split
    FileSystem fs = file.getFileSystem(job);
    FSDataInputStream fileIn = fs.open(split.getPath());

      in = new LineReader(fileIn, job);
    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    this.pos = start;

  private int maxBytesToConsume(long pos) {
    return (int) Math.min(Integer.MAX_VALUE, end - pos);

  private long getFilePosition() throws IOException {
    long retVal= pos;
    return retVal;

  public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    if (value == null) {
      value = new Text();
    int newSize = 0;
    StringBuffer totalValue = new StringBuffer();
    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    while (getFilePosition() <= end) {
      newSize = in.readLine(value, maxLineLength,
          Math.max(maxBytesToConsume(pos), maxLineLength));
      if (newSize == 0) {
      pos += newSize;
      if (newSize < maxLineLength) {

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " +
               (pos - newSize));
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    } else {
        value = new Text(totalValue.toString());
      return true;

  public LongWritable getCurrentKey() {
    return key;

  public Text getCurrentValue() {
    return value;

   * Get the progress within the split
  public float getProgress() throws IOException {
    if (start == end) {
      return 0.0f;
    } else {
      return Math.min(1.0f,
        (getFilePosition() - start) / (float)(end - start));

  public synchronized void close() throws IOException {
    try {
      if (in != null) {
    } finally {



public class RawCombineFileInputFormat extends CombineFileInputFormat <LongWritable,Text>{

public RecordReader<LongWritable, Text> createRecordReader(
        InputSplit split, TaskAttemptContext context) throws IOException {
    return new CombineFileRecordReader< LongWritable, Text >((CombineFileSplit) split, context, MultiFileRecordReader.class);


public class MultiFileRecordReader extends RecordReader < LongWritable, Text > {

 private CombineFileSplit split;
 private TaskAttemptContext context;
 private int index;
 private RecordReader< LongWritable, Text > rr;

public MultiFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) {
    this.split = split;
    this.context = context;
    this.index = index;
    this.rr = new SingleFileRecordReader();
public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException {
    this.split = (CombineFileSplit) split;
      this.context = context;

      if (null == rr) {
       rr = new SingleFileRecordReader();

      FileSplit fileSplit = new FileSplit(this.split.getPath(index),
      this.rr.initialize(fileSplit, this.context);


public boolean nextKeyValue() throws IOException, InterruptedException {
    // TODO Auto-generated method stub
    return this.rr.nextKeyValue();

public LongWritable getCurrentKey() throws IOException, InterruptedException {
    // TODO Auto-generated method stub
    return this.rr.getCurrentKey();

public Text getCurrentValue() throws IOException, InterruptedException {
    // TODO Auto-generated method stub
    return this.rr.getCurrentValue();

public float getProgress() throws IOException, InterruptedException {
    // TODO Auto-generated method stub
    return this.rr.getProgress();

public void close() throws IOException {
    if (rr != null) {
           rr = null;



请看下面这种输入格式:它在单个 map 任务中读取多个文件,传递给 mapper 的每条记录都对应一个完整的(不拆分的)文件。WholeFileRecordReader 负责把一个文件的全部内容作为一个值发送出去:返回的键为 NullWritable,值为该文件的完整内容。由于每个文件都会被整体读入内存,单个文件的大小不能超过 Integer.MAX_VALUE 字节(约 2 GB)。现在可以用它运行 MapReduce 作业,观察实际启动了多少个 mapper,并检查得到的输出是否正确。


    public class WholeFileInputFormat extends CombineFileInputFormat<NullWritable, Text>{

        protected boolean isSplitable(JobContext context, Path file) {
            return false;

   * Creates a CombineFileRecordReader to read each file assigned to this InputSplit.
   * Note, that unlike ordinary InputSplits, split must be a CombineFileSplit, and therefore
   * is expected to specify multiple files.
   * @param split The InputSplit to read.  Throws an IllegalArgumentException if this is
   *        not a CombineFileSplit.
   * @param context The context for this task.
   * @return a CombineFileRecordReader to process each file in split.
   *         It will read each file with a WholeFileRecordReader.
   * @throws IOException if there is an error.

    public RecordReader<NullWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException {

        if (!(split instanceof CombineFileSplit)) {
              throw new IllegalArgumentException("split must be a CombineFileSplit");
            return new CombineFileRecordReader<NullWritable, Text>((CombineFileSplit) split, context, WholeFileRecordReader.class);


public class WholeFileRecordReader extends RecordReader<NullWritable, Text> {
    private static final Logger LOG = Logger.getLogger(WholeFileRecordReader.class);

      /** The path to the file to read. */
      private final Path mFileToRead;
      /** The length of this file. */
      private final long mFileLength;

      /** The Configuration. */
      private final Configuration mConf;

      /** Whether this FileSplit has been processed. */
      private boolean mProcessed;
      /** Single Text to store the file name of the current file. */
    //  private final Text mFileName;
      /** Single Text to store the value of this file (the value) when it is read. */
      private final Text mFileText;

       * Implementation detail: This constructor is built to be called via
       * reflection from within CombineFileRecordReader.
       * @param fileSplit The CombineFileSplit that this will read from.
       * @param context The context for this task.
       * @param pathToProcess The path index from the CombineFileSplit to process in this record.
      public WholeFileRecordReader(CombineFileSplit fileSplit, TaskAttemptContext context,
          Integer pathToProcess) {
        mProcessed = false;
        mFileToRead = fileSplit.getPath(pathToProcess);
        mFileLength = fileSplit.getLength(pathToProcess);
        mConf = context.getConfiguration();

        assert 0 == fileSplit.getOffset(pathToProcess);
        if (LOG.isDebugEnabled()) {
          LOG.debug("FileToRead is: " + mFileToRead.toString());
          LOG.debug("Processing path " + pathToProcess + " out of " + fileSplit.getNumPaths());

          try {
            FileSystem fs = FileSystem.get(mConf);
            assert fs.getFileStatus(mFileToRead).getLen() == mFileLength;
          } catch (IOException ioe) {
            // oh well, I was just testing.

    //    mFileName = new Text();
        mFileText = new Text();

      /** {@inheritDoc} */
      public void close() throws IOException {

       * Returns the absolute path to the current file.
       * @return The absolute path to the current file.
       * @throws IOException never.
       * @throws InterruptedException never.
      public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();

       * <p>Returns the current value.  If the file has been read with a call to NextKeyValue(),
       * this returns the contents of the file as a BytesWritable.  Otherwise, it returns an
       * empty BytesWritable.</p>
       * <p>Throws an IllegalStateException if initialize() is not called first.</p>
       * @return A BytesWritable containing the contents of the file to read.
       * @throws IOException never.
       * @throws InterruptedException never.
      public Text getCurrentValue() throws IOException, InterruptedException {
        return mFileText;

       * Returns whether the file has been processed or not.  Since only one record
       * will be generated for a file, progress will be 0.0 if it has not been processed,
       * and 1.0 if it has.
       * @return 0.0 if the file has not been processed.  1.0 if it has.
       * @throws IOException never.
       * @throws InterruptedException never.
      public float getProgress() throws IOException, InterruptedException {
        return (mProcessed) ? (float) 1.0 : (float) 0.0;

       * All of the internal state is already set on instantiation.  This is a no-op.
       * @param split The InputSplit to read.  Unused.
       * @param context The context for this task.  Unused.
       * @throws IOException never.
       * @throws InterruptedException never.
      public void initialize(InputSplit split, TaskAttemptContext context)
          throws IOException, InterruptedException {
        // no-op.

       * <p>If the file has not already been read, this reads it into memory, so that a call
       * to getCurrentValue() will return the entire contents of this file as Text,
       * and getCurrentKey() will return the qualified path to this file as Text.  Then, returns
       * true.  If it has already been read, then returns false without updating any internal state.</p>
       * @return Whether the file was read or not.
       * @throws IOException if there is an error reading the file.
       * @throws InterruptedException if there is an error.
      public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!mProcessed) {
          if (mFileLength > (long) Integer.MAX_VALUE) {
            throw new IOException("File is longer than Integer.MAX_VALUE.");
          byte[] contents = new byte[(int) mFileLength];

          FileSystem fs = mFileToRead.getFileSystem(mConf);
          FSDataInputStream in = null;
          try {
            // Set the contents of this file.
            in = fs.open(mFileToRead);
            IOUtils.readFully(in, contents, 0, contents.length);
            mFileText.set(contents, 0, contents.length);

          } finally {
          mProcessed = true;
          return true;
        return false;


public int run(String[] arg) throws Exception {
    Configuration conf=getConf();
    FileSystem fs = FileSystem.get(conf);
    //estimate reducers
    Job job = new Job(conf);

    FileInputFormat.addInputPath(job, new Path(arg[0]));
    Path output=new Path(arg[1]);
    try {
        fs.delete(output, true);
    } catch (IOException e) {
        LOG.warn("Failed to delete temporary path", e);
    FileOutputFormat.setOutputPath(job, output);

    boolean ret=job.waitForCompletion(true);
        throw new Exception("Job Failed");

09-30 14:31