1. 简单介绍
reduce side join是所有join中用时最长的一种join,但是这种方法能够适用于内连接、left外连接、right外连接、full外连接和反连接等所有的join方式。
如果要做join的数据量很大,就不得不使用reduce side join了。
2. 适用场景
3.Reduce side join的架构
3.1 map 阶段
map 阶段首先从数据中提取出join的foreign key作为map输出的key,然后将输入的整条记录作为输出的value。输出的value需要根据输入的数据集打上数据集的标签,比如在value的开头加上'A'或'B'的标签。
3.2 reduce阶段
reduce端对具有相同foreign key的数据进行处理,对具有标签'A'和'B'的数据进行迭代处理,下面分别用伪代码对不同的join的处理进行说明。
// Inner join: emit the cross product of A and B only when both sides
// have at least one record for the current foreign key.
if (!listA.isEmpty() && !listB.isEmpty()) {
    for (Text A : listA) {
        for (Text B : listB) {
            context.write(A, B);
        }
    }
}
// Left outer join: every record of A is emitted; records of A without a
// partner in B are paired with an empty value.
// For each entry in A,
for (Text A : listA) {
    // If list B is not empty, join A and B
    if (!listB.isEmpty()) {
        for (Text B : listB) {
            context.write(A, B);
        }
    } else {
        // Else, output A by itself
        context.write(A, EMPTY_TEXT);
    }
}
// Right outer join: every record of B is emitted; records of B without a
// partner in A are paired with an empty value.
// For each entry in B,
for (Text B : listB) {
    // If list A is not empty, join A and B
    if (!listA.isEmpty()) {
        for (Text A : listA) {
            context.write(A, B);
        }
    } else {
        // Else, output B by itself
        context.write(EMPTY_TEXT, B);
    }
}
// Full outer join: every record of A and of B is emitted; a record without
// a partner on the other side is paired with an empty value.
// If list A is not empty
if (!listA.isEmpty()) {
    // For each entry in A
    for (Text A : listA) {
        // If list B is not empty, join A with B
        if (!listB.isEmpty()) {
            for (Text B : listB) {
                context.write(A, B);
            }
        } else {
            // Else, output A by itself
            context.write(A, EMPTY_TEXT);
        }
    }
} else {
    // If list A is empty, just output B
    for (Text B : listB) {
        context.write(EMPTY_TEXT, B);
    }
}
-反连接:输出A和B没有共同foreign key的值
// Anti join: emit only the records whose foreign key appears on exactly
// one side.
// If list A is empty and B is empty or vice versa
if (listA.isEmpty() ^ listB.isEmpty()) {
    // Iterate both A and B with null values
    // The previous XOR check will make sure exactly one of
    // these lists is empty and therefore the list will be skipped
    for (Text A : listA) {
        context.write(A, EMPTY_TEXT);
    }
    for (Text B : listB) {
        context.write(EMPTY_TEXT, B);
    }
}
以下举一个简单的样例,要求可以用reduce side join方式实现以上全部的join。
User 表
username cityid
Li lei, 1
Xiao hong, 2
Lily, 3
Lucy, 3
Daive, 4
Jake, 5
Xiao Ming, 6
City 表
cityid cityname
1, Shanghai
2, Beijing
3, Jinan
4, Guangzhou
7, Wuhan
8, Shenzhen
4.2 代码介绍
package com.study.hadoop.mapreduce; import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class ReduceJoin { //user map
public static class UserJoinMapper extends Mapper<Object, Text, Text, Text>{
private Text outKey = new Text();
private Text outValue = new Text();
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
String line = value.toString();
String[] items = line.split(","); outKey.set(items[1]);
context.write(outKey, outValue);
//city map
public static class CityJoinMapper extends Mapper<Object, Text, Text, Text>{
// TODO Auto-generated constructor stub
private Text outKey = new Text();
private Text outValue = new Text();
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
String line = value.toString();
String[] items = line.split(","); outKey.set(items[0]);
context.write(outKey, outValue);
} }
public static class JoinReducer extends Reducer<Text, Text, Text, Text>{
// TODO Auto-generated constructor stub
//Join type:{inner,leftOuter,rightOuter,fullOuter,anti}
private String joinType = null;
private static final Text EMPTY_VALUE = new Text("");
private List<Text> listA = new ArrayList<Text>();
private List<Text> listB = new ArrayList<Text>();
protected void setup(Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
joinType = context.getConfiguration().get("join.type");
} @Override
protected void reduce(Text key, Iterable<Text> values,Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
listB.clear(); Iterator<Text> iterator = values.iterator();
String value = iterator.next().toString();
listA.add(new Text(value.substring(1)));
listB.add(new Text(value.substring(1)));
} private void joinAndWrite(Context context)
throws IOException, InterruptedException{
//inner join
if(!listA.isEmpty() && !listB.isEmpty()) {
for (Text A : listA)
for(Text B : listB){
context.write(A, B);
//left outer join
for (Text A : listA){
for(Text B: listB){
context.write(A, B);
context.write(A, EMPTY_VALUE);
//right outer join
else if(joinType.equalsIgnoreCase("rightouter")){
for(Text B: listB){
for(Text A: listA)
context.write(A, B);
}else {
context.write(EMPTY_VALUE, B);
//full outer join
else if(joinType.equalsIgnoreCase("fullouter")){
for (Text A : listA){
for(Text B : listB){
context.write(A, B);
}else {
context.write(A, EMPTY_VALUE);
for(Text B : listB)
context.write(EMPTY_VALUE, B);
//anti join
else if(joinType.equalsIgnoreCase("anti")){
if(listA.isEmpty() ^ listB.isEmpty()){
for(Text A : listA)
context.write(A, EMPTY_VALUE);
for(Text B : listB)
context.write(EMPTY_VALUE, B);
} public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 4)
System.err.println("params:<UserInDir> <CityInDir> <OutDir> <join Type>");
Job job = new Job(conf,"Reduce side join Job");
MultipleInputs.addInputPath(job, new Path(otherArgs[0]), TextInputFormat.class, UserJoinMapper.class);
MultipleInputs.addInputPath(job, new Path(otherArgs[1]), TextInputFormat.class, CityJoinMapper.class);
FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
job.getConfiguration().set("join.type", otherArgs[3]); System.exit(job.waitForCompletion(true) ? 0 : 1);
} }
4.3 结果
inner join:
left outer join:
right outer join:
full outer join:
anti join:
