c001.txt

------------------------------

filetype|commid|commname|addressid
comm|1|罗湖小区1|1
comm|2|罗湖小区2|1
comm|3|宝安小区1|4
comm|4|南山小区1|3
comm|5|南山小区2|3
comm|6|福田小区1|2
comm|7|福田小区2|2
comm|8|宝安2|4
comm|9|南山3|3

c002.txt

----------------------------

filetype|commid|commname|addressid
comm|10|罗湖小区7|1
comm|11|罗湖小区8|1
comm|12|宝安小区5|4
comm|13|南山小区6|3
comm|14|南山小区7|3
comm|15|福田小区6|2
comm|16|福田小区8|2

a001.txt

-------------------------

filetype|addressid|address
addr|1|罗湖
addr|2|福田
addr|3|南山
addr|4|宝安

输出结果:

-----------------------

commid commname addr
15 福田小区6 福田
16 福田小区8 福田
6 福田小区1 福田
7 福田小区2 福田
13 南山小区6 南山
14 南山小区7 南山
4 南山小区1 南山
5 南山小区2 南山
9 南山3 南山
3 宝安小区1 宝安
8 宝安2 宝安
12 宝安小区5 宝安

----------------------------

代码:

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.fs.Path; public class TestUnion { public static int count=0;
public static class TestUnionMapper extends Mapper<Object,Text,Text,Text>
{
public void map(Object key,Text values,Context context) throws IOException,InterruptedException
{
if(values.toString().indexOf("filetype")>=0)
{
return;
}
StringTokenizer itr=new StringTokenizer(values.toString(),"|");
String fileType="";
String fileTypeId="";
while(itr.hasMoreTokens())
{
fileType=itr.nextToken();
if(fileType.compareToIgnoreCase("addr")==0)
{
String addressId=itr.nextToken();
String addressName=itr.nextToken();
fileTypeId="2"; //标记为地址
context.write(new Text(addressId),new Text(fileTypeId+"|"+addressName));
}
else if(fileType.compareToIgnoreCase("comm")==0)
{
String commId=itr.nextToken();
String commName=itr.nextToken();
String addressId=itr.nextToken();
fileTypeId="1"; //标记为小区
context.write(new Text(addressId),new Text(fileTypeId+"|"+commId+"|"+commName));
}
}
}
}
public static class TestUnionReducer extends Reducer<Text,Text,Text,Text>
{
public void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException
{
List<String> addrs=new ArrayList<String>();
List<String> comms=new ArrayList<String>();
if(count<=0)
{
count++;
context.write(new Text("commid"),new Text("commname addr"));
return;
}
else
{       
for(Text val:values)
{
String []astr=val.toString().trim().split("\\|"); // | 为特殊字符,必须转义
String fileTypeId=astr[0];
if(fileTypeId.compareToIgnoreCase("1")==0) //comm
{
String commId=astr[1];
String commName=astr[2];
comms.add(commId+" "+commName);
}
else if(fileTypeId.compareToIgnoreCase("2")==0) //addr
{
String addr=astr[1];
addrs.add(addr);
}
}
}
if(comms.size()>0 && addrs.size()>0)
{
for(int m=0;m<comms.size();m++)
for(int n=0;n<addrs.size();n++) //其实只有一条记录对应上面的
context.write(new Text(comms.get(m)),new Text(addrs.get(n)));
}
}
} public static void main(String[] args) throws Exception{
// TODO Auto-generated method stub
if(args.length!=2)
{
System.err.println("please input two agrs:<in> <out>");
System.exit(2);
}
Configuration conf=new Configuration();
Job job=new Job(conf,"union data");
job.setJarByClass(TestUnion.class);
job.setMapperClass(TestUnionMapper.class);
job.setReducerClass(TestUnionReducer.class);
//job.setNumReduceTasks(0);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
System.exit(job.waitForCompletion(true)?0:1);
} }

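主要利用了 MapReduce 会把相同 KEY 的值聚合到同一次 reduce 调用中的规则:小区记录和地址记录在 map 阶段都以 addressid 作为 KEY 输出,因此同一地址下的小区和地址名称会在同一次 reduce 调用中相遇,从而完成关联。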

05-11 11:17