我研究了NLineInputFormat的getSplitsForFile()fn。我发现为输入文件创建了InputStream,然后每n行创建它的迭代和拆分。
有效率吗?特别是在启动映射程序任务之前,此读取操作在1个节点上发生时。如果1有5GB文件怎么办。基本上,这意味着两次搜索文件数据,一次是在拆分创建期间,一次是在从映射器任务读取时。
如果这是一个瓶颈,那么hadoop作业如何覆盖它呢?

 public static List<FileSplit> getSplitsForFile(FileStatus status,
          Configuration conf, int numLinesPerSplit) throws IOException {
        List<FileSplit> splits = new ArrayList<FileSplit> ();
        Path fileName = status.getPath();
        if (status.isDirectory()) {
          throw new IOException("Not a file: " + fileName);
        }
        FileSystem  fs = fileName.getFileSystem(conf);
        LineReader lr = null;
        try {
          FSDataInputStream in  = fs.open(fileName);
          lr = new LineReader(in, conf);
          Text line = new Text();
          int numLines = 0;
          long begin = 0;
          long length = 0;
          int num = -1;
<!-- my part of concern start -->
          while ((num = lr.readLine(line)) > 0) {
            numLines++;
            length += num;
            if (numLines == numLinesPerSplit) {
              splits.add(createFileSplit(fileName, begin, length));
              begin += length;
              length = 0;
              numLines = 0;
            }
          }
<!-- my part of concern end -->
          if (numLines != 0) {
            splits.add(createFileSplit(fileName, begin, length));
          }
        } finally {
          if (lr != null) {
            lr.close();
          }
        }
        return splits;
      }

编辑以将我的用例提供给clément-mathieu

我的数据集是每个大约2gb的大输入文件。文件中的每一行代表一条记录,需要将其插入数据库的表中(在我的情况下为cassandra)
我想将批量交易限制为每行n行。
我已经成功使用nlineinputformat做到了这一点。我唯一关心的是生产中是否存在隐藏的性能瓶颈。

最佳答案



是。

InputFormat的目的是为每条N行创建一个拆分。计算分割边界的唯一方法是读取此文件并查找换行符。此操作可能会很昂贵,但是如果您需要此操作,您将无法避免。



不知道该问题。

NLineInputFormat不是默认的InputFormat,很少有用例需要它。如果您阅读该类的javadoc,您将看到该类主要存在于将参数提供给尴尬的并行作业(=“小”输入文件)。

大多数InputFormat不需要读取文件即可计算拆分。他们通常使用硬性规则,例如每个HDFS块的分割应该为128MB或一个分割,而RecordReader会照顾实际的分割开始/结束偏移量。

如果NLineInputFormat.getSplitsForFile的成本是一个问题,我会真正回顾为什么我需要使用此InputFormat。您要做的是限制映射器中业务流程的批处理大小。使用NLineInputFormat,每N行都会创建一个映射器,这意味着一个映射器将永远不会执行多个批量事务。您似乎不需要此功能,只想限制大宗交易的大小,却不在乎映射器是否依次执行多个交易。因此,您无需付出任何代价就付出发现的代码成本。

我将使用TextInputFormat并在映射器中创建批处理。用伪代码:

setup() {
  buffer = new Buffer<String>(1_000_000);
}

map(LongWritable key, Text value) {
  buffer.append(value.toString())
  if (buffer.isFull()) {
    new Transaction(buffer).doIt()
    buffer.clear()
  }
}

cleanup() {
  new Transaction(buffer).doIt()
  buffer.clear()
}

默认情况下,每个HDFS块都会创建一个映射器。如果您认为这太多或太少,则mapred.(max|min).split.size变量允许增加或减少并行度。

基本上,便利的NLineInputFormat太细粒度了,无法满足您的需求。您可以使用TextInputFormat和使用*.split.size进行几乎相同的操作,而无需读取文件即可创建拆分。

07-26 04:24