我正在尝试研究hadoop,并阅读了很多有关如何进行自然连接的知识。我有两个包含键和信息的文件,我想将其显示为(a,b,c)。

我的问题是,映射器正在为每个文件调用reducer。我原本希望收到类似(10,[R1,S10,S22])的内容(因为10键,1、10、22是具有10作为键的不同行的值,而R和S在标记中,所以我可以识别他们来自哪个表)。

问题是我的减速器收到了(10,[S10,S22]),只有在完成所有S文件后,我才得到另一个键值对,如(10,[R1])。这意味着,它针对每个文件分别按键分组并调用化简器

我不确定这种行为是否正确,是否必须以其他方式进行配置或者我做错了什么。

我也是java的新手,所以代码可能对您不利。

我避免使用TextPair数据类型,因为我自己还不能提出这个建议,我想这将是另一种有效的方式(以防万一,您想知道)。谢谢

根据WordCount示例运行hadoop 2.4.1。

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleInputs;

public class TwoWayJoin {

    public static class FirstMap extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);

            Text a = new Text();
            Text b = new Text();

            a.set(tokenizer.nextToken());
            b.set(tokenizer.nextToken());

            output.collect(b, relation);
        }
    }

    public static class SecondMap extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);

            Text b = new Text();
            Text c = new Text();

            b.set(tokenizer.nextToken());
            c.set(tokenizer.nextToken());

            Text relation = new Text("S"+c.toString());

            output.collect(b, relation);

        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {

            ArrayList < Text > RelationS = new ArrayList < Text >() ;
            ArrayList < Text > RelationR = new ArrayList < Text >() ;

            while (values.hasNext()) {
                String relationValue = values.next().toString();
                if (relationValue.indexOf('R') >= 0){
                    RelationR.add(new Text(relationValue));
                } else {
                    RelationS.add(new Text(relationValue));
                }
            }

            for( Text r : RelationR ) {
                for (Text s : RelationS) {
                    output.collect(key, new Text(r + "," + key.toString() + "," + s));
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(MultipleInputs.class);
        conf.setJobName("TwoWayJoin");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        MultipleInputs.addInputPath(conf, new Path(args[0]), TextInputFormat.class, FirstMap.class);
        MultipleInputs.addInputPath(conf, new Path(args[1]), TextInputFormat.class, SecondMap.class);

        Path output = new Path(args[2]);

        FileOutputFormat.setOutputPath(conf, output);

        FileSystem.get(conf).delete(output, true);

        JobClient.runJob(conf);

    }
}


R.txt

(a  b(key))
2   46
1   10
0   24
31  50
11  2
5   31
12  36
9   46
10  34
6   31


S.txt

(b(key)  c)
45  32
45  45
46  10
36  15
45  21
45  28
45  9
45  49
45  18
46  21
45  45
2   11
46  15
45  33
45  6
45  20
31  28
45  32
45  26
46  35
45  36
50  49
45  13
46  3
46  8
31  45
46  18
46  21
45  26
24  15
46  31
46  47
10  24
46  12
46  36


此代码的输出是成功的,但为空,因为我将Array R清空或将Array S清空。

如果我简单地一个接一个地收集它们而不进行任何处理,那么我将映射所有行。

预期输出为

key  "a,b,c"

最佳答案

问题出在组合器上。请记住,组合器在地图输出上应用reduce函数。因此,间接地是
reduce函数分别应用于R和S关系,这就是在不同的reduce调用中获得R和S关系的原因。
注释掉

conf.setCombinerClass(Reduce.class);


并尝试再次运行应该没有任何问题。顺便说一句,仅当您认为映射输出在完成排序和混洗后应用到输入中的聚合结果相同时,合并器功能才有用。

10-08 01:49