我正在研究类似于MapReduce示例的示例-字数统计,但是有一点曲折,我希望仅获得前N个结果。

假设我在HDFS中有大量文本数据。有许多示例显示了如何构建Hadoop MapReduce作业,该作业将为您提供该文本中每个单词的单词计数。例如,如果我的语料库是:



标准MapReduce字数统计工作的结果集为:



但是,如果我想要获得整个数据集中使用的前3个单词怎么办?

我仍然可以运行完全相同的标准MapReduce字数统计工作,然后在准备就绪并吐出每个字数的计数后才取前3个结果,但这似乎效率不高,因为需要处理大量数据在随机播放阶段移动。

我在想的是,如果此样本足够大,并且数据在HDFS中很好地随机分布并且分布良好,则每个Mapper都不需要将其所有字数发送给Reducers,而是,仅将其中一些最重要的数据。因此,如果一个映射器具有以下功能:



然后,我只想将每个Mapper的前100个左右的单词发送到Reducer阶段-因为说了一切都完成之后,“rareword”突然出现在前3名中的可能性很小。这似乎可以节省带宽并减少Reducer的处理时间。

可以在组合器阶段完成此操作吗?通常在洗牌阶段之前进行这种优化吗?

最佳答案

这是一个很好的问题,因为您遇到了Hadoop的字数统计示例效率低下的问题。

优化您的问题的技巧如下:

在本地 map 阶段进行基于HashMap的分组,您也可以为此使用组合器。看起来像这样,我使用的是Guava的HashMultiSet,它提供了一种不错的计数机制。

    public static class WordFrequencyMapper extends
      Mapper<LongWritable, Text, Text, LongWritable> {

    private final HashMultiset<String> wordCountSet = HashMultiset.create();

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {

      String[] tokens = value.toString().split("\\s+");
      for (String token : tokens) {
        wordCountSet.add(token);
      }
    }

然后在清理阶段发出结果:
@Override
protected void cleanup(Context context) throws IOException,
    InterruptedException {
  Text key = new Text();
  LongWritable value = new LongWritable();
  for (Entry<String> entry : wordCountSet.entrySet()) {
    key.set(entry.getElement());
    value.set(entry.getCount());
    context.write(key, value);
  }
}

因此,您已将单词分组在本地工作区中,从而通过使用一些RAM来减少网络使用量。您也可以对Combiner进行相同的操作,但是它正在按组进行排序-因此,这比使用HashMultiset慢(尤其是对于字符串!)。

要仅获得前N个,您只需要将本地HashMultiset中的前N个写入输出收集器,并在化简方面以常规方式汇总结果。
这也节省了很多网络带宽,唯一的缺点是您需要在清理方法中对单词计数元组进行排序。

代码的一部分可能看起来像这样:
  Set<String> elementSet = wordCountSet.elementSet();
  String[] array = elementSet.toArray(new String[elementSet.size()]);
  Arrays.sort(array, new Comparator<String>() {

    @Override
    public int compare(String o1, String o2) {
      // sort descending
      return Long.compare(wordCountSet.count(o2), wordCountSet.count(o1));
    }

  });
  Text key = new Text();
  LongWritable value = new LongWritable();
  // just emit the first n records
  for(int i = 0; i < N, i++){
    key.set(array[i]);
    value.set(wordCountSet.count(array[i]));
    context.write(key, value);
  }

希望您能在本地做尽可能多的单词,然后再汇总前N个中的前N个;

10-06 10:42