我正在研究类似于MapReduce示例的示例-字数统计,但是有一点曲折,我希望仅获得前N个结果。
假设我在HDFS中有大量文本数据。有许多示例显示了如何构建Hadoop MapReduce作业,该作业将为您提供该文本中每个单词的单词计数。例如,如果我的语料库是:
标准MapReduce字数统计工作的结果集为:
但是,如果我仅想要获得整个数据集中使用的前3个单词怎么办?
我仍然可以运行完全相同的标准MapReduce字数统计工作,然后在准备就绪并吐出每个字数的计数后才取前3个结果,但这似乎效率不高,因为需要处理大量数据在随机播放阶段移动。
我在想的是,如果此样本足够大,并且数据在HDFS中很好地随机分布并且分布良好,则每个Mapper都不需要将其所有字数发送给Reducers,而是,仅将其中一些最重要的数据。因此,如果一个映射器具有以下功能:
然后,我只想将每个Mapper的前100个左右的单词发送到Reducer阶段-因为说了一切都完成之后,“rareword”突然出现在前3名中的可能性很小。这似乎可以节省带宽并减少Reducer的处理时间。
可以在组合器阶段完成此操作吗?通常在洗牌阶段之前进行这种优化吗?
最佳答案
这是一个很好的问题,因为您遇到了Hadoop的字数统计示例效率低下的问题。
优化您的问题的技巧如下:
在本地 map 阶段进行基于HashMap
的分组,您也可以为此使用组合器。看起来像这样,我使用的是Guava的HashMultiSet
,它提供了一种不错的计数机制。
public static class WordFrequencyMapper extends
Mapper<LongWritable, Text, Text, LongWritable> {
private final HashMultiset<String> wordCountSet = HashMultiset.create();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] tokens = value.toString().split("\\s+");
for (String token : tokens) {
wordCountSet.add(token);
}
}
然后在清理阶段发出结果:
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
Text key = new Text();
LongWritable value = new LongWritable();
for (Entry<String> entry : wordCountSet.entrySet()) {
key.set(entry.getElement());
value.set(entry.getCount());
context.write(key, value);
}
}
因此,您已将单词分组在本地工作区中,从而通过使用一些RAM来减少网络使用量。您也可以对
Combiner
进行相同的操作,但是它正在按组进行排序-因此,这比使用HashMultiset
慢(尤其是对于字符串!)。要仅获得前N个,您只需要将本地
HashMultiset
中的前N个写入输出收集器,并在化简方面以常规方式汇总结果。这也节省了很多网络带宽,唯一的缺点是您需要在清理方法中对单词计数元组进行排序。
代码的一部分可能看起来像这样:
Set<String> elementSet = wordCountSet.elementSet();
String[] array = elementSet.toArray(new String[elementSet.size()]);
Arrays.sort(array, new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
// sort descending
return Long.compare(wordCountSet.count(o2), wordCountSet.count(o1));
}
});
Text key = new Text();
LongWritable value = new LongWritable();
// just emit the first n records
for(int i = 0; i < N, i++){
key.set(array[i]);
value.set(wordCountSet.count(array[i]));
context.write(key, value);
}
希望您能在本地做尽可能多的单词,然后再汇总前N个中的前N个;