我有一个巨大的文本文件,我想分割该文件,以便每个块有5行。我实现了自己的GWASInputFormat和GWASRecordReader类。但是我的问题是,在下面的代码(是从http://bigdatacircus.com/2012/08/01/wordcount-with-custom-record-reader-of-textinputformat/复制的)中,在initialize()方法中,我有以下几行

FileSplit split = (FileSplit) genericSplit;
final Path file = split.getPath();
Configuration conf = context.getConfiguration();

我的问题是,在GWASRecordReader类中调用initialize()方法时,文件是否已经拆分?我以为是在GWASRecordReader类中进行的(拆分)。让我知道我的思考过程是否就在这里。
package com.test;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class GWASRecordReader extends RecordReader<LongWritable, Text> {

private final int NLINESTOPROCESS = 5;
private LineReader in;
private LongWritable key;
private Text value = new Text();
private long start = 0;
private long pos = 0;
private long end = 0;
private int maxLineLength;

public void close() throws IOException {
    if(in != null) {
        in.close();
    }
}

public LongWritable getCurrentKey() throws IOException, InterruptedException {
    return key;
}

public Text getCurrentValue() throws IOException, InterruptedException {
    return value;
}

public float getProgress() throws IOException, InterruptedException {
    if(start == end) {
        return 0.0f;
    }
    else {
        return Math.min(1.0f, (pos - start)/(float) (end - start));
    }
}

public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    final Path file = split.getPath();
    Configuration conf = context.getConfiguration();
    this.maxLineLength = conf.getInt("mapred.linerecordreader.maxlength",Integer.MAX_VALUE);
    FileSystem fs = file.getFileSystem(conf);
    start = split.getStart();
    end = start + split.getLength();
    System.out.println("---------------SPLIT LENGTH---------------------" + split.getLength());
    boolean skipFirstLine = false;
    FSDataInputStream filein = fs.open(split.getPath());

    if(start != 0) {
        skipFirstLine = true;
        --start;
        filein.seek(start);
    }

    in = new LineReader(filein, conf);
    if(skipFirstLine) {
        start += in.readLine(new Text(),0,(int)Math.min((long)Integer.MAX_VALUE, end - start));
    }
    this.pos = start;
}

public boolean nextKeyValue() throws IOException, InterruptedException {
    if (key == null) {
        key = new LongWritable();
    }

    key.set(pos);

    if (value == null) {
        value = new Text();
    }
    value.clear();
    final Text endline = new Text("\n");
    int newSize = 0;
    for(int i=0; i<NLINESTOPROCESS;i++) {
        Text v = new Text();
        while( pos < end) {
            newSize = in.readLine(v ,maxLineLength, Math.max((int)Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
            value.append(v.getBytes(), 0, v.getLength());
            value.append(endline.getBytes(),0,endline.getLength());
            if(newSize == 0) {
                break;
            }
            pos += newSize;
            if(newSize < maxLineLength) {
                break;
            }
        }
    }

    if(newSize == 0) {
        key = null;
        value = null;
        return false;
    } else {
        return true;
    }
}
}

最佳答案

是的,输入文件已经被分割了。它基本上是这样的:
your input file(s) -> InputSplit -> RecordReader -> Mapper...
基本上,InputSplit将输入分成多个块,RecordReader将这些块分成键/值对。请注意,InputSplitRecordReader将由您使用的InputFormat决定。例如,TextInputFormat使用FileSplit分解输入,然后使用LineRecordReader处理以位置为键,而行本身为值的每一行。
因此,在GWASInputFormat中,您需要查看使用哪种FileSplit,以查看传递给GWASRecordReader的内容。

我建议研究 NLineInputFormat ,它“将N行输入拆分为一个拆分”。它可能能够完全按照自己的意愿去做。

如果您想一次获取5行作为值,而第一行的行号作为键,我想说您可以使用自定义的NLineInputFormat和自定义的LineRecordReader来实现。我认为您不必担心输入拆分,因为输入格式可以将其拆分为这5个行块。您的RecordReaderLineRecordReader非常相似,但是您无需获取块开头的字节位置,而可以获取行号。因此,除了小小的改动外,代码几乎是相同的。因此,您基本上可以复制并粘贴NLineInputFormatLineRecordReader,但随后让输入格式使用记录读取器来获取行号。该代码将非常相似。

关于java - Hadoop Map Reduce CustomSplit/CustomRecordReader,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/13346225/

10-12 23:48