Join with Multiple Input

In a join with multiple inputs, we write two mappers to read the data: one for the employee file and one for the project file.

Now the obvious question is how to recognise, on the reducer side, which mapper a record came from. To do this, each mapper prepends a tag to every record it reads. We will use ‘EF~’ and ‘PF~’ as prefixes to identify employee and project records respectively.

Below are the four classes used for this exercise.

  1. JoinMultipleInputDriver.java
  2. EmployeeFileMapper.java
  3. ProjectFileMapper.java
  4. JoinMultipleInputReducer.java

Let's have a look at the code for each class, one by one.

  1. JoinMultipleInputDriver.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class JoinMultipleInputDriver extends Configured implements Tool{
public static void main(String[] args) throws Exception {

int exitCode = ToolRunner.run(new Configuration() , new JoinMultipleInputDriver(), args);

System.exit(exitCode);
}

@Override
public int run(String[] args) throws Exception {

if (args.length != 3) {
System.out.printf(“Two parameters are required for Joins- <input dir for Employee file> <input dir for Project file> <output dir>\n”);
return -1;
}

Job job = new Job(getConf());
job.setJobName(” Joins example”);

job.setJarByClass(JoinMultipleInputDriver.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileInputFormat.setInputPaths(job, new Path(args[1]));
FileOutputFormat.setOutputPath(job, new Path(args[2]));

MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, EmployeeFileMapper.class);
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, ProjectFileMapper.class);

job.setReducerClass(JoinMultipleInputReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(0);

//job.setNumReduceTasks(1);

boolean success = job.waitForCompletion(true);
return success ? 0 : 1;

}
}

2.  EmployeeFileMapper.java

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class EmployeeFileMapper extends Mapper<LongWritable, Text, Text, Text> {

final String fileTag = “EF~”; //EF means Employee File

@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(“\t”);

System.out.println(fileTag+line);
context.write(new Text(words[4]), new Text(fileTag+line));
}
}

3. ProjectFileMapper.java

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class ProjectFileMapper extends Mapper<LongWritable, Text, Text, Text>{

final String fileTag = “PF~”; //PF means Project File
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(“\t”);

System.out.println(fileTag+line);
context.write(new Text(words[0]), new Text(fileTag+line));
}
}

4.  JoinMultipleInputReducer.java

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class JoinMultipleInputReducer extends Reducer<Text, Text, Text, Text> {
//@Override
protected void reduce(Text key, Iterable<Text> values,Context context)
throws IOException, InterruptedException {

String project = “”;
ArrayList<String> Employees = new ArrayList<String>();

for (Text value : values) {

String valueStr = value.toString();
String splits[] = valueStr.split(“~”);
if(splits[0].equalsIgnoreCase(“PF”)){
project = splits[1];
System.out.println(“project” +project);
}
else if(splits[0].equalsIgnoreCase(“EF”)){
Employees.add(splits[1]);
}
}

for(String employee : Employees){
System.out.println(“inside”);
context.write(key, new Text(employee + “\t” + project)); //)+”—“+ employee
}

}
}

Leave a comment