多表关联
多表关联和单表关联类似,它也是通过对原始数据进行一定的处理,从其中挖掘出关心的信息。下面进入这个实例。
1 实例描述
输入是两个文件,一个代表工厂表,包含工厂名列和地址编号列;另一个代表地址表,包含地址名列和地址编号列。要求从输入数据中找出工厂名和地址名的对应关系,输出"工厂名——地址名"表。
样例输入如下所示。
1)factory:
2)address:
样例输出如下所示。
2 设计思路
多表关联和单表关联相似,都类似于数据库中的自然连接。相比单表关联,多表关联的左右表和连接列更加清楚。所以可以采用和单表关联的相同的处理方式,map识别出输入的行属于哪个表之后,对其进行分割,将连接的列值保存在key中,另一列和左右表标识保存在value中,然后输出。reduce拿到连接结果之后,解析value内容,根据标志将左右表内容分开存放,然后求笛卡尔积,最后直接输出。
这个实例的具体分析参考单表关联实例。下面给出代码。
import java.io.IOException;
import java.lang.String;
import java.util.Iterator;
import java.util.StringTokenizer; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MTJoin {
public static int time = 0; public static class Map extends Mapper<Object, Text, Text, Text> { @Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String relationType = new String();
if (line.contains("factoryname") == true
|| line.contains("addressID") == true) {
return;
} StringTokenizer itr = new StringTokenizer(line);
String mapkey = new String();
String mapvalue = new String(); String[] split = line.split(" "); if (split.length == 2 && split[1].charAt(0) >= '0'
&& split[1].charAt(0) <= '9') {
mapkey = split[1];
mapvalue = split[0];
relationType = "1";
}
if (split.length == 2 && split[0].charAt(0) >= '0'
&& split[0].charAt(0) <= '9') {
mapkey = split[0];
mapvalue = split[1];
relationType = "2";
} context.write(new Text(mapkey), new Text(relationType + "+"
+ mapvalue)); }
} public static class Reduce extends Reducer<Text, Text, Text, Text> { @Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
if (0 == time) {
context.write(new Text("factoryname"), new Text("addressname"));
time++;
} int factorynum = 0;
String[] factory = new String[10];
int addressnum = 0;
String[] address = new String[10]; for(Text value:values ){
if (0 == value.toString().length()) {
continue;
} char relationType = value.toString().charAt(0); // left
if ('1' == relationType) {
factory[factorynum] = value.toString().substring(2);
factorynum++;
}
// right
if ('2' == relationType) {
address[addressnum] = value.toString().substring(2);
addressnum++;
}
} if (0 != factorynum && 0 != addressnum) {
for (int m = 0; m < factorynum; m++) {
for (int n = 0; n < addressnum; n++) {
context.write(new Text(factory[m]),
new Text(address[n]));
}
}
}
} } public static void main(String[] args) throws Exception {
Job job = new Job();
job.setJobName("MTJoin");
job.setJarByClass(MTJoin.class); job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class); job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}