Reduce Side Join

As name suggests, in reduce side join join happens at the reducer side.

We will read Employee data in mapper and project data in reduce side through distributed cache. Below are the three classes we will write for achieving this.

  1. JoinReduceSideDCDriver.java
  2. JoinReduceSideDCMapper.java
  3. JoinReduceSideDCReducer.java

Lets have a look at codes one by one.

  1. JoinReduceSideDCDriver.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class JoinReduceSideDCDriver extends Configured implements Tool {

public static void main(String[] args) throws Exception {

int exitCode = ToolRunner.run(new Configuration(), new JoinReduceSideDCDriver(), args);
System.exit(exitCode);

}

@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();

Job job = new Job(conf, “Map Side Join Using Distributed Cache”);
job.setJarByClass(JoinReduceSideDCDriver.class);
DistributedCache.addCacheFile(new Path(args[0]).toUri(), job.getConfiguration());

FileInputFormat.addInputPath(job, new Path(args [1]));
FileOutputFormat.setOutputPath(job, new Path(args [2]));

job.setMapperClass(JoinReduceSideDCMapper.class);
job.setReducerClass(JoinReduceSideDCReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// job.setMapOutputKeyClass(Text.class);
// job.setMapOutputValueClass(Text.class);

// job.setNumReduceTasks(0);

return (job.waitForCompletion(true) ? 0 : 1);
}

}

 2. JoinReduceSideDCMapper.java

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class JoinReduceSideDCMapper extends Mapper<LongWritable, Text, Text, Text> {

@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {

String line = value.toString();

String lineSplits[] = line.split(“\t”);
String lineinCacheSplits[];

context.write(new Text(lineSplits[4]), new Text(lineSplits[0] + “\t” + lineSplits[1] + “\t” + lineSplits[2] + “\t” + lineSplits[3]));

}
}

       3. JoinReduceSideDCReducer.java

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
public class JoinReduceSideDCReducer extends Reducer<Text, Text, Text, Text>{

Path[] cachefiles = new Path[0];
HashMap<String, String> projectList = new HashMap<String, String>();

@Override
protected void setup(Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();

cachefiles = DistributedCache.getLocalCacheFiles(conf);
BufferedReader reader = new BufferedReader(new FileReader(cachefiles[0].toString()));

String lineInCache,lineInCacheSplits[];

while ((lineInCache = reader.readLine()) != null) {

lineInCacheSplits = lineInCache.split(“\t”);
projectList.put(lineInCacheSplits[0], lineInCacheSplits[1] + “\t” + lineInCacheSplits[2]); // Data of lookup files get stored in list

}

}

@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
String keyTest = key.toString();

for (Text value : values) {

if( projectList.containsKey(keyTest) )
{
context.write(new Text(key), new Text(value + “\t” + projectList.get(key)));
}

}

}
}

Leave a comment