我是Hadoop初学者。我遇到了this定制RecordReader程序,该程序一次读取3行,并输出将3行输入提供给映射器的次数。

我能够理解为什么使用RecordReader,但是当输入格式类本质上是扩展mapreduce.TextInputFormat类时,我看不到每个InputSplit如何包含3行。
根据我的理解,TextInputFormat类为每行(对于每个\ n)发出1 InputSplit。

那么RecordReader如何从每个InputSplit读取3行?请有人解释这是怎么可能的。
提前致谢!

最佳答案

您需要了解TextInputFormat的实现才能找到答案。

让我们深入研究代码。我将谈论新的mapreduce API,但“旧的” mapred API非常相似。

如您所说,从用户的 Angular 来看,TextInputFormat根据一些换行符将拆分拆分为记录。让我们check the implementation

您可以看到该类几乎为空。关键功能是createRecord,由InputFormat定义

@Override
public RecordReader<LongWritable, Text> createRecordReader(
        InputSplit split,
        TaskAttemptContext context
) {
   return new LineRecordReader();
}

通常的约定是使用InputFormat获取RecordReader。如果您查看MapperMapContextImpl,您将看到映射器仅使用RecordReader来获取下一个键和值。他什么都不知道。

映射器:
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  while (context.nextKeyValue()) {
    map(context.getCurrentKey(), context.getCurrentValue(), context);
  }
  cleanup(context);

}

MapContextImpl:
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
  return reader.nextKeyValue();
}

现在,请仔细阅读您提供的此链接。您将看到:
  • NLinesInputFormat扩展了TextInputFormat,并且仅覆盖createRecordReader。基本上,使用LineReader可以提供自己的RecordReader。您想扩展TextInputFormat而不是扩展层次结构中更高的另一个类,因为它已经处理了在此级别完成的所有工作,并且您可能需要(压缩,不可拆分格式等)
  • NLinesRecordReader做真正的工作。在initialize中,它执行所需的操作以从提供的InputStream的正确偏移处获取InputSplit。它还创建了一个LineReader,与TextInputFormat一起使用
  • nextKeyValue方法中,您将看到LineReader.readLine()被调用了三次以得到三行(加上一些逻辑来正确处理一些极端情况,例如太大的记录,行尾,拆分尾)

  • 希望对您有帮助。关键是要了解API的总体设计以及每个部分之间如何交互。

    关于java - mapreduce.TextInputFormat hadoop,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/25340373/

    10-10 13:33