我有一个巨大的文本文件,我想分割该文件,以便每个块有5行。我实现了自己的GWASInputFormat和GWASRecordReader类。但是我的问题是,在下面的代码(是从http://bigdatacircus.com/2012/08/01/wordcount-with-custom-record-reader-of-textinputformat/复制的)中,在initialize()方法中,我有以下几行
FileSplit split = (FileSplit) genericSplit;
final Path file = split.getPath();
Configuration conf = context.getConfiguration();
我的问题是,在GWASRecordReader类中调用initialize()方法时,文件是否已经拆分?我以为是在GWASRecordReader类中进行的(拆分)。让我知道我的思考过程是否就在这里。
package com.test;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;
public class GWASRecordReader extends RecordReader<LongWritable, Text> {
private final int NLINESTOPROCESS = 5;
private LineReader in;
private LongWritable key;
private Text value = new Text();
private long start = 0;
private long pos = 0;
private long end = 0;
private int maxLineLength;
public void close() throws IOException {
if(in != null) {
in.close();
}
}
public LongWritable getCurrentKey() throws IOException, InterruptedException {
return key;
}
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}
public float getProgress() throws IOException, InterruptedException {
if(start == end) {
return 0.0f;
}
else {
return Math.min(1.0f, (pos - start)/(float) (end - start));
}
}
public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
final Path file = split.getPath();
Configuration conf = context.getConfiguration();
this.maxLineLength = conf.getInt("mapred.linerecordreader.maxlength",Integer.MAX_VALUE);
FileSystem fs = file.getFileSystem(conf);
start = split.getStart();
end = start + split.getLength();
System.out.println("---------------SPLIT LENGTH---------------------" + split.getLength());
boolean skipFirstLine = false;
FSDataInputStream filein = fs.open(split.getPath());
if(start != 0) {
skipFirstLine = true;
--start;
filein.seek(start);
}
in = new LineReader(filein, conf);
if(skipFirstLine) {
start += in.readLine(new Text(),0,(int)Math.min((long)Integer.MAX_VALUE, end - start));
}
this.pos = start;
}
public boolean nextKeyValue() throws IOException, InterruptedException {
if (key == null) {
key = new LongWritable();
}
key.set(pos);
if (value == null) {
value = new Text();
}
value.clear();
final Text endline = new Text("\n");
int newSize = 0;
for(int i=0; i<NLINESTOPROCESS;i++) {
Text v = new Text();
while( pos < end) {
newSize = in.readLine(v ,maxLineLength, Math.max((int)Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
value.append(v.getBytes(), 0, v.getLength());
value.append(endline.getBytes(),0,endline.getLength());
if(newSize == 0) {
break;
}
pos += newSize;
if(newSize < maxLineLength) {
break;
}
}
}
if(newSize == 0) {
key = null;
value = null;
return false;
} else {
return true;
}
}
}
最佳答案
是的,输入文件已经被分割了。它基本上是这样的:your input file(s) -> InputSplit -> RecordReader -> Mapper...
基本上,InputSplit
将输入分成多个块,RecordReader
将这些块分成键/值对。请注意,InputSplit
和RecordReader
将由您使用的InputFormat
决定。例如,TextInputFormat
使用FileSplit
分解输入,然后使用LineRecordReader
处理以位置为键,而行本身为值的每一行。
因此,在GWASInputFormat
中,您需要查看使用哪种FileSplit
,以查看传递给GWASRecordReader
的内容。
我建议研究 NLineInputFormat
,它“将N行输入拆分为一个拆分”。它可能能够完全按照自己的意愿去做。
如果您想一次获取5行作为值,而第一行的行号作为键,我想说您可以使用自定义的NLineInputFormat
和自定义的LineRecordReader
来实现。我认为您不必担心输入拆分,因为输入格式可以将其拆分为这5个行块。您的RecordReader
与LineRecordReader
非常相似,但是您无需获取块开头的字节位置,而可以获取行号。因此,除了小小的改动外,代码几乎是相同的。因此,您基本上可以复制并粘贴NLineInputFormat
和LineRecordReader
,但随后让输入格式使用记录读取器来获取行号。该代码将非常相似。
关于java - Hadoop Map Reduce CustomSplit/CustomRecordReader,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/13346225/