我研究了NLineInputFormat的getSplitsForFile()fn。我发现为输入文件创建了InputStream,然后每n行创建它的迭代和拆分。
有效率吗?特别是在启动映射程序任务之前,此读取操作在1个节点上发生时。如果1有5GB文件怎么办。基本上,这意味着两次搜索文件数据,一次是在拆分创建期间,一次是在从映射器任务读取时。
如果这是一个瓶颈,那么hadoop作业如何覆盖它呢?
public static List<FileSplit> getSplitsForFile(FileStatus status,
Configuration conf, int numLinesPerSplit) throws IOException {
List<FileSplit> splits = new ArrayList<FileSplit> ();
Path fileName = status.getPath();
if (status.isDirectory()) {
throw new IOException("Not a file: " + fileName);
}
FileSystem fs = fileName.getFileSystem(conf);
LineReader lr = null;
try {
FSDataInputStream in = fs.open(fileName);
lr = new LineReader(in, conf);
Text line = new Text();
int numLines = 0;
long begin = 0;
long length = 0;
int num = -1;
<!-- my part of concern start -->
while ((num = lr.readLine(line)) > 0) {
numLines++;
length += num;
if (numLines == numLinesPerSplit) {
splits.add(createFileSplit(fileName, begin, length));
begin += length;
length = 0;
numLines = 0;
}
}
<!-- my part of concern end -->
if (numLines != 0) {
splits.add(createFileSplit(fileName, begin, length));
}
} finally {
if (lr != null) {
lr.close();
}
}
return splits;
}
编辑以将我的用例提供给clément-mathieu
我的数据集是每个大约2gb的大输入文件。文件中的每一行代表一条记录,需要将其插入数据库的表中(在我的情况下为cassandra)
我想将批量交易限制为每行n行。
我已经成功使用nlineinputformat做到了这一点。我唯一关心的是生产中是否存在隐藏的性能瓶颈。
最佳答案
是。
此InputFormat
的目的是为每条N行创建一个拆分。计算分割边界的唯一方法是读取此文件并查找换行符。此操作可能会很昂贵,但是如果您需要此操作,您将无法避免。
不知道该问题。
NLineInputFormat不是默认的InputFormat,很少有用例需要它。如果您阅读该类的javadoc,您将看到该类主要存在于将参数提供给尴尬的并行作业(=“小”输入文件)。
大多数InputFormat不需要读取文件即可计算拆分。他们通常使用硬性规则,例如每个HDFS块的分割应该为128MB或一个分割,而RecordReader会照顾实际的分割开始/结束偏移量。
如果NLineInputFormat.getSplitsForFile
的成本是一个问题,我会真正回顾为什么我需要使用此InputFormat
。您要做的是限制映射器中业务流程的批处理大小。使用NLineInputFormat
,每N行都会创建一个映射器,这意味着一个映射器将永远不会执行多个批量事务。您似乎不需要此功能,只想限制大宗交易的大小,却不在乎映射器是否依次执行多个交易。因此,您无需付出任何代价就付出发现的代码成本。
我将使用TextInputFormat
并在映射器中创建批处理。用伪代码:
setup() {
buffer = new Buffer<String>(1_000_000);
}
map(LongWritable key, Text value) {
buffer.append(value.toString())
if (buffer.isFull()) {
new Transaction(buffer).doIt()
buffer.clear()
}
}
cleanup() {
new Transaction(buffer).doIt()
buffer.clear()
}
默认情况下,每个HDFS块都会创建一个映射器。如果您认为这太多或太少,则
mapred.(max|min).split.size
变量允许增加或减少并行度。基本上,便利的
NLineInputFormat
太细粒度了,无法满足您的需求。您可以使用TextInputFormat
和使用*.split.size
进行几乎相同的操作,而无需读取文件即可创建拆分。