我想深入了解textFile方法,但我认为
缺乏Hadoop知识使我退缩。让我布置我的
理解,也许你可以纠正任何不正确的东西

调用sc.textFile(path)时,将使用defaultMinPartitions
这实际上只是math.min(taskScheduler.defaultParallelism, 2)。让我们
假设我们正在使用SparkDeploySchedulerBackend,这是

conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(),
2))

所以,现在让我们说默认值是2,回到textFile,这是
传递给HadoopRDD。真实大小是使用getPartitions()确定的inputFormat.getSplits(jobConf, minPartitions)。但是,据我所知,
分区只是一个提示,实际上几乎被忽略,因此您将
可能会得到块的总数。

好的,这符合预期,但是,如果不使用默认值,该怎么办?
您提供的分区大小大于块大小。如果我的
研究是正确的,并且getSplits调用只是忽略了此参数,然后
所提供的分钟数最终不会被忽略,您仍然会得到
块大小?

Cross posted with the spark mailing list

最佳答案

缩写:

分割大小由mapred.min.split.sizemapreduce.input.fileinputformat.split.minsize确定,如果大于HDFS的blockSize,则同一文件内的多个块将合并为一个分割。

详细版本:

我认为您正确理解inputFormat.getSplits之前的过程是正确的。

inputFormat.getSplits内部,更具体地说,在FileInputFormat's getSplits内部,是mapred.min.split.sizemapreduce.input.fileinputformat.split.minsize最后确定拆分大小。 (我不确定哪种方法在Spark中会有效,我更愿意相信前者)。

让我们看一下代码:FileInputFormat from Hadoop 2.4.0

long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
  FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

// generate splits
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
NetworkTopology clusterMap = new NetworkTopology();

for (FileStatus file: files) {
  Path path = file.getPath();
  long length = file.getLen();
  if (length != 0) {
    FileSystem fs = path.getFileSystem(job);
    BlockLocation[] blkLocations;
    if (file instanceof LocatedFileStatus) {
      blkLocations = ((LocatedFileStatus) file).getBlockLocations();
    } else {
      blkLocations = fs.getFileBlockLocations(file, 0, length);
    }
    if (isSplitable(fs, path)) {
      long blockSize = file.getBlockSize();
      long splitSize = computeSplitSize(goalSize, minSize, blockSize);

      long bytesRemaining = length;
      while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
        String[] splitHosts = getSplitHosts(blkLocations,
            length-bytesRemaining, splitSize, clusterMap);
        splits.add(makeSplit(path, length-bytesRemaining, splitSize,
            splitHosts));
        bytesRemaining -= splitSize;
      }

      if (bytesRemaining != 0) {
        String[] splitHosts = getSplitHosts(blkLocations, length
            - bytesRemaining, bytesRemaining, clusterMap);
        splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
            splitHosts));
      }
    } else {
      String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
      splits.add(makeSplit(path, 0, length, splitHosts));
    }
  } else {
    //Create empty hosts array for zero length files
    splits.add(makeSplit(path, 0, length, new String[0]));
  }
}

在for循环中,makeSplit()用于生成每个拆分,而splitSize是有效的拆分大小。该computeSplitSize函数生成splitSize:

protected long computeSplitSize(long goalSize, long minSize,
                                   long blockSize) {
  return Math.max(minSize, Math.min(goalSize, blockSize));
}

因此,如果minSplitSize> blockSize,则输出拆分实际上是同一HDFS文件中几个块的组合,另一方面,如果minSplitSize
09-10 22:47