我想深入了解textFile
方法,但我认为
缺乏Hadoop知识使我退缩。让我布置我的
理解,也许你可以纠正任何不正确的东西
调用sc.textFile(path)
时,将使用defaultMinPartitions
,
这实际上只是math.min(taskScheduler.defaultParallelism, 2)
。让我们
假设我们正在使用SparkDeploySchedulerBackend
,这是
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(),
2))
所以,现在让我们说默认值是2,回到
textFile
,这是传递给
HadoopRDD
。真实大小是使用getPartitions()
确定的inputFormat.getSplits(jobConf, minPartitions)
。但是,据我所知,分区只是一个提示,实际上几乎被忽略,因此您将
可能会得到块的总数。
好的,这符合预期,但是,如果不使用默认值,该怎么办?
您提供的分区大小大于块大小。如果我的
研究是正确的,并且getSplits调用只是忽略了此参数,然后
所提供的分钟数最终不会被忽略,您仍然会得到
块大小?
Cross posted with the spark mailing list
最佳答案
缩写:
分割大小由mapred.min.split.size
或mapreduce.input.fileinputformat.split.minsize
确定,如果大于HDFS的blockSize,则同一文件内的多个块将合并为一个分割。
详细版本:
我认为您正确理解inputFormat.getSplits
之前的过程是正确的。
在inputFormat.getSplits
内部,更具体地说,在FileInputFormat's getSplits
内部,是mapred.min.split.size
或mapreduce.input.fileinputformat.split.minsize
最后确定拆分大小。 (我不确定哪种方法在Spark中会有效,我更愿意相信前者)。
让我们看一下代码:FileInputFormat from Hadoop 2.4.0
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
// generate splits
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
NetworkTopology clusterMap = new NetworkTopology();
for (FileStatus file: files) {
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
FileSystem fs = path.getFileSystem(job);
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
if (isSplitable(fs, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
String[] splitHosts = getSplitHosts(blkLocations,
length-bytesRemaining, splitSize, clusterMap);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
splitHosts));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
String[] splitHosts = getSplitHosts(blkLocations, length
- bytesRemaining, bytesRemaining, clusterMap);
splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
splitHosts));
}
} else {
String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
splits.add(makeSplit(path, 0, length, splitHosts));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
在for循环中,
makeSplit()
用于生成每个拆分,而splitSize
是有效的拆分大小。该computeSplitSize函数生成splitSize
:protected long computeSplitSize(long goalSize, long minSize,
long blockSize) {
return Math.max(minSize, Math.min(goalSize, blockSize));
}
因此,如果minSplitSize> blockSize,则输出拆分实际上是同一HDFS文件中几个块的组合,另一方面,如果minSplitSize