MR实现多表连接的原理和单表连接时一样的,甚至比单表连接还要简单。

在map阶段只需要根据文件的名称区分左表还是右表。使用关联的字段作为key2。

在reduce中对values中的值分别存储到一个左表list和右表list中。对左表list和右表list进行一个笛卡尔积完事。

 import java.io.*;
import java.util.*; import org.apache.hadoop.io.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.Tool;
public class MTjoin extends Configured implements Tool {
/*
* 多表链接,与单表链接思路类似。将关联列作为map的key值,用数字区分左表和右表。在Reduce阶段对两个表进行笛卡尔积
* */
public static class Map extends Mapper<LongWritable,Text,Text,Text>{
public void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException{
String line=value.toString();
int linelen=line.length();
//去除文件首行
if(line.indexOf("factoryname")==-1&&line.indexOf("addressID")==-1)
{
//处理factory数据
if(line.charAt(linelen-2)==' ')
{
String facstr="1"+line.substring(0, linelen-2);
String addrestr=String.valueOf(line.charAt(linelen-1));
context.write(new Text(addrestr), new Text(facstr));
}else{
String addreidstr=String.valueOf(line.charAt(0));
String addrenastr="2"+line.substring(1);
context.write(new Text(addreidstr), new Text(addrenastr));
} }
} } public static class Reduce extends Reducer<Text,Text,Text,Text>{
public void reduce(Text key,Iterable<Text> values,Context context)throws IOException, InterruptedException{
ArrayList<String> facarr=new ArrayList<String>();
ArrayList<String> addarr=new ArrayList<String>();
for(Text var:values){
if(var.toString().charAt(0)=='1')
{
facarr.add(var.toString().substring(1));
}else if(var.toString().charAt(0)=='2')
{
addarr.add(var.toString().substring(1));
} }
if(facarr.size()!=0&&addarr.size()!=0)
{
for(int i=0;i<facarr.size();i++)
{
context.write(new Text(facarr.get(i)), new Text(addarr.get(0)));
} }
}
}
@Override
public int run(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration conf=new Configuration();
Job job=new Job(conf,"MTjoin");
job.setJarByClass(MTjoin.class); job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class); job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class); job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1])); boolean success=job.waitForCompletion(true);
return success?0:1;
}
public static void main(String[] args)throws Exception{
int ret=ToolRunner.run(new MTjoin(), args);
System.exit(ret);
} }
04-27 22:21