Common MapReduce Patterns
Design patterns are solution templates for solving specific problems. Developers can reuse these templates for similar problems across domains, which saves them time. If you are a programmer, you have probably used the abstract factory pattern, builder pattern, observer pattern, and so on. These patterns were discovered by people who have been solving similar problems for many years. The MapReduce framework has existed for almost a decade now. Let's look at a few of the MapReduce design patterns that are commonly used across industries.
Summarization patterns
The summarization pattern is widely used across domains. It is all about grouping similar data together and then performing an operation such as calculating a minimum, maximum, count, average, median, or standard deviation, building an index, or simply counting based on a key. For example, we might want to calculate the total amount of money our website has made by country.
As another example, let's say we want to get the average number of times users log on to our website. Yet another example is finding the minimum and maximum number of users by state. MapReduce works with key-value pairs, so operations on keys are the most common. The mapper emits key-value pairs and the values for each key are aggregated on the reducer. The following are a few commonly used examples of the summarization pattern.
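To illustrate the averaging case, the reducer can sum the values it receives for a key and divide by their count. The following is a minimal sketch, assuming a mapper that emits an IntWritable count per key; the LoginAverageReducer name is illustrative and not part of the original example:
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

// Minimal sketch: assumes the mapper emits (key, count) pairs, for example (userId, loginCount).
public class LoginAverageReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        long count = 0;
        for (IntWritable value : values) {
            sum += value.get();
            count++;
        }
        // Emit the average of all values seen for this key.
        context.write(key, new DoubleWritable((double) sum / count));
    }
}
Note that a plain average is not associative, so this reducer cannot simply be reused as a combiner; a combiner would have to emit partial sums and counts instead.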
Word count example
Many people who start learning MapReduce development write word count as their first program; hence, it is sometimes known as the Hello World program of MapReduce. The basic idea of this program is to show how the MapReduce framework works. The word count pattern can be applied to use cases such as counting population by state, counting the total number of crimes by state, finding total spending per person, and so on. Let's briefly discuss the word count program with a Mapper, Reducer, and combiner example.
Mapper
The job of the Mapper is to split the record, get each word from the record, and emit each word with a value of one. The output key and the output value are of type Text and IntWritable, as shown in the following code:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
public static final IntWritable ONE = new IntWritable(1);
@Override
protected void map(LongWritable offset, Text line, Context context)
throws IOException, InterruptedException {
String[] result = line.toString().split(" ");
for (String word : result) {
context.write(new Text(word), ONE);
}
}
}
Reducer
The MapReduce framework uses the partitioner to make sure that all the records with the same key always go to the same reducer. The reducer receives the list of values for each key and can therefore easily perform aggregate operations such as count and sum, as follows:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int count = 0;
for (IntWritable current : values) {
count += current.get();
}
context.write(key, new IntWritable(count));
}
}
Combiner
In most cases, the combiner is the same as the Reducer, and it can be registered in the Driver class by passing the reducer class itself. The advantage of the combiner is that it works as a mini reducer and runs on the same machine as the mapper, thus reducing the amount of data shuffled over the network. The Driver class of the word count application is as follows:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Driver extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), (Tool) new Driver(), args);
System.exit(res);
}
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "WordCount");
job.setJarByClass(Driver.class);
if (args.length < 2) {
System.out.println("Jar requires 2 paramaters : \""
+ job.getJar()
+ " input_path output_path");
return 1;
}
job.setMapperClass(WordcountMapper.class);
job.setReducerClass(WordcountReducer.class);
job.setCombinerClass(WordcountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
Path filePath = new Path(args[0]);
FileInputFormat.setInputPaths(job, filePath);
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
job.waitForCompletion(true);
return 0;
}
}
Minimum and maximum
Calculating the minimum and maximum values of a specific field is a common MapReduce use case. Once the mapper completes its operation, the reducer simply iterates through all the values of each key and finds the minimum and maximum within that key grouping:
- Writables: The idea behind writing a custom Writable is to save the extra effort of splitting data on the reducer side and to avoid the problems that a delimiter can cause. Most of the time, the chosen delimiter is already present somewhere in the record, which leads to fields being mapped incorrectly.
We will use the following import packages:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
The custom Writable class encapsulates the details inside the Writable object, which can be used at the reducer side to fetch values for the records:
public class PlayerDetail implements Writable {
// Fields are initialized so that readFields() can populate them without a NullPointerException
private Text playerName = new Text();
private IntWritable score = new IntWritable();
private Text opposition = new Text();
private LongWritable timestamps = new LongWritable();
private IntWritable ballsTaken = new IntWritable();
private IntWritable fours = new IntWritable();
private IntWritable six = new IntWritable();
public void readFields(DataInput dataInput) throws IOException {
playerName.readFields(dataInput);
score.readFields(dataInput);
opposition.readFields(dataInput);
timestamps.readFields(dataInput);
ballsTaken.readFields(dataInput);
fours.readFields(dataInput);
six.readFields(dataInput);
}
public void write(DataOutput dataOutput) throws IOException {
playerName.write(dataOutput);
score.write(dataOutput);
opposition.write(dataOutput);
timestamps.write(dataOutput);
ballsTaken.write(dataOutput);
fours.write(dataOutput);
six.write(dataOutput);
}
public Text getPlayerName() {
return playerName;
}
public void setPlayerName(Text playerName) {
this.playerName = playerName;
}
public IntWritable getScore() {
return score;
}
public void setScore(IntWritable score) {
this.score = score;
}
public Text getOpposition() {
return opposition;
}
public void setOpposition(Text opposition) {
this.opposition = opposition;
}
public LongWritable getTimestamps() {
return timestamps;
}
public void setTimestamps(LongWritable timestamps) {
this.timestamps = timestamps;
}
public IntWritable getBallsTaken() {
return ballsTaken;
}
public void setBallsTaken(IntWritable ballsTaken) {
this.ballsTaken = ballsTaken;
}
public IntWritable getFours() {
return fours;
}
public void setFours(IntWritable fours) {
this.fours = fours;
}
public IntWritable getSix() {
return six;
}
public void setSix(IntWritable six) {
this.six = six;
}
@Override
public String toString() {
return playerName +
"\t" + score +
"\t" + opposition +
"\t" + timestamps +
"\t" + ballsTaken +
"\t" + fours +
"\t" + six;
}
}
We will import the following packages and implement another custom Writable class, which holds the aggregated report for each player:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class PlayerReport implements Writable {
private Text playerName = new Text();
private IntWritable maxScore = new IntWritable();
private Text maxScoreopposition = new Text();
private IntWritable minScore = new IntWritable();
private Text minScoreopposition = new Text();
public void write(DataOutput dataOutput) throws IOException {
playerName.write(dataOutput);
maxScore.write(dataOutput);
maxScoreopposition.write(dataOutput);
minScore.write(dataOutput);
minScoreopposition.write(dataOutput);
}
public void readFields(DataInput dataInput) throws IOException {
playerName.readFields(dataInput);
maxScore.readFields(dataInput);
maxScoreopposition.readFields(dataInput);
minScore.readFields(dataInput);
minScoreopposition.readFields(dataInput);
}
public Text getPlayerName() {
return playerName;
}
public void setPlayerName(Text playerName) {
this.playerName = playerName;
}
public IntWritable getMaxScore() {
return maxScore;
}
public void setMaxScore(IntWritable maxScore) {
this.maxScore = maxScore;
}
public Text getMaxScoreopposition() {
return maxScoreopposition;
}
public void setMaxScoreopposition(Text maxScoreopposition) {
this.maxScoreopposition = maxScoreopposition;
}
public IntWritable getMinScore() {
return minScore;
}
public void setMinScore(IntWritable minScore) {
this.minScore = minScore;
}
public Text getMinScoreopposition() {
return minScoreopposition;
}
public void setMinScoreopposition(Text minScoreopposition) {
this.minScoreopposition = minScoreopposition;
}
@Override
public String toString() {
return playerName +
"\t" + maxScore +
"\t" + maxScoreopposition +
"\t" + minScore +
"\t" + minScoreopposition;
}
}
- Mapper class: The Mapper class in the MinMax algorithm maps each record to a custom writable object and emits a record for every player, using the player name as the key and PlayerDetail as the value, as follows:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class MinMaxMapper extends
Mapper<LongWritable, Text, Text, PlayerDetail> {
private PlayerDetail playerDetail = new PlayerDetail();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] player = value.toString().split(",");
playerDetail.setPlayerName(new Text(player[0]));
playerDetail.setScore(new IntWritable(Integer.parseInt(player[1])));
playerDetail.setOpposition(new Text(player[2]));
playerDetail.setTimestamps(new LongWritable(Long.parseLong(player[3])));
playerDetail.setBallsTaken(new IntWritable(Integer.parseInt(player[4])));
playerDetail.setFours(new IntWritable(Integer.parseInt(player[5])));
playerDetail.setSix(new IntWritable(Integer.parseInt(player[6])));
context.write(playerDetail.getPlayerName(), playerDetail);
}
}
- Reducer class: The Reducer is responsible for calculating the minimum and maximum scores of each individual player by iterating through that player's list of records and emitting the result as a PlayerReport writable object, as follows:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class MinMaxReducer extends Reducer<Text, PlayerDetail, Text, PlayerReport> {
PlayerReport playerReport = new PlayerReport();
@Override
protected void reduce(Text key, Iterable<PlayerDetail> values, Context context) throws IOException, InterruptedException {
playerReport.setPlayerName(key);
playerReport.setMaxScore(new IntWritable(Integer.MIN_VALUE));
playerReport.setMinScore(new IntWritable(Integer.MAX_VALUE));
for (PlayerDetail playerDetail : values) {
int score = playerDetail.getScore().get();
if (score > playerReport.getMaxScore().get()) {
playerReport.setMaxScore(new IntWritable(score));
// Copy the Text because the framework reuses the value object across iterations
playerReport.setMaxScoreopposition(new Text(playerDetail.getOpposition()));
}
if (score < playerReport.getMinScore().get()) {
playerReport.setMinScore(new IntWritable(score));
playerReport.setMinScoreopposition(new Text(playerDetail.getOpposition()));
}
}
context.write(key, playerReport);
}
}
- Driver class: The Driver class provides the basic configuration to run the MapReduce application and defines a contract that the job cannot violate. For example, if the Driver class declares the output key class as IntWritable and the value as Text, but the reducer tries to emit the key as Text and the value as IntWritable, the job will fail and an error will be thrown. The Driver class is as follows:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MinMaxDriver extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new MinMaxDriver(), args);
System.exit(res);
}
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "MinMax");
job.setJarByClass(MinMaxDriver.class);
if (args.length < 2) {
System.out.println("Jar requires 2 paramaters : \""
+ job.getJar()
+ " input_path output_path");
return 1;
}
job.setMapperClass(MinMaxMapper.class);
job.setReducerClass(MinMaxReducer.class);
// MinMaxReducer cannot be used as a combiner here: it emits PlayerReport values,
// which do not match the mapper's PlayerDetail output value type
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(PlayerDetail.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(PlayerReport.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
Path filePath = new Path(args[0]);
FileInputFormat.setInputPaths(job, filePath);
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
job.waitForCompletion(true);
return 0;
}
}
Filtering patterns
The filtering pattern simply filters out records based on a particular condition. Data cleansing is one of the most common applications of the filtering pattern. Raw data may contain records in which a few fields are missing, or records that are simply junk which we cannot use in further analysis. Filtering logic can be used to validate each record and remove any junk records. Another example is filtering web articles based on particular word or regex matches; these articles can then be used in classification, tagging, or machine learning use cases. Yet another use case is filtering out all the customers whose purchases do not exceed 500 dollars in value and then processing the remainder further for other analysis. Let's look at the following regex filtering example:
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class RegexFilteringMapper extends Mapper<Object, Text, NullWritable, Text> {
private String regexPattern = "/* REGEX PATTERN HERE */";
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
if (value.toString().matches(regexPattern)) {
context.write(NullWritable.get(), value);
}
}
}
Another example is the random sampling of data, which is required in many use cases such as preparing data for testing applications or training machine learning models. Another common use case is finding the top-k records based on a specific condition. In most organizations, it is important to identify outliers, customers who are genuinely loyal to the merchant, so that they can be offered good rewards, or customers who have not used the application for a long time, so that they can be offered a good discount to re-engage them. Let's look at how we can find the top-k records using MapReduce based on a particular condition.
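Before moving on to top-k, here is a quick illustration of the sampling case: the mapper below keeps roughly a configurable fraction of the input records and drops the rest. This is a minimal sketch; the RandomSamplingMapper name and the sampling.fraction property are assumptions made for illustration:
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.Random;

public class RandomSamplingMapper extends Mapper<Object, Text, NullWritable, Text> {
    private final Random random = new Random();
    private double samplingFraction;

    @Override
    protected void setup(Context context) {
        // Assumed job property; for example, 0.01 keeps roughly 1% of the records.
        samplingFraction = context.getConfiguration().getDouble("sampling.fraction", 0.01);
    }

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Keep the record with probability samplingFraction, drop it otherwise.
        if (random.nextDouble() < samplingFraction) {
            context.write(NullWritable.get(), value);
        }
    }
}
Because each record is kept or dropped independently, no reducer is needed and the job can run map-only.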
Top-k MapReduce implementation
The top-k algorithm is a popular MapReduce algorithm. Each mapper is responsible for emitting the top-k records at its level, and the reducer then filters the top-k records out of all the records it receives from the mappers. We will use the player score example; the objective is to find the top-k players with the lowest scores. Let's look at the mapper implementation. We are assuming that each player has a unique score; otherwise, the logic would need a small change, keeping a list of player details as the value for each score and emitting only 10 records from the cleanup method.
The code for TopKMapper can be seen as follows:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
public class TopKMapper extends
Mapper<LongWritable, Text, IntWritable, PlayerDetail> {
private int K = 10;
private TreeMap<Integer, PlayerDetail> topKPlayerWithLessScore = new TreeMap<Integer, PlayerDetail>();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// Create a new PlayerDetail per record; reusing a single instance would make every TreeMap entry point to the same object
PlayerDetail playerDetail = new PlayerDetail();
String[] player = value.toString().split(",");
playerDetail.setPlayerName(new Text(player[0]));
playerDetail.setScore(new IntWritable(Integer.parseInt(player[1])));
playerDetail.setOpposition(new Text(player[2]));
playerDetail.setTimestamps(new LongWritable(Long.parseLong(player[3])));
playerDetail.setBallsTaken(new IntWritable(Integer.parseInt(player[4])));
playerDetail.setFours(new IntWritable(Integer.parseInt(player[5])));
playerDetail.setSix(new IntWritable(Integer.parseInt(player[6])));
topKPlayerWithLessScore.put(playerDetail.getScore().get(), playerDetail);
if (topKPlayerWithLessScore.size() > K) {
topKPlayerWithLessScore.remove(topKPlayerWithLessScore.lastKey());
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
for (Map.Entry<Integer, PlayerDetail> playerDetailEntry : topKPlayerWithLessScore.entrySet()) {
context.write(new IntWritable(playerDetailEntry.getKey()), playerDetailEntry.getValue());
}
}
}
The TopKReducer has almost the same logic as the mapper, and we are again assuming that player scores are unique. We could also add logic to handle duplicate player scores and emit those records as well. The code for TopKReducer is as follows:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
public class TopKReducer extends Reducer<IntWritable, PlayerDetail, IntWritable, PlayerDetail> {
private int K = 10;
private TreeMap<Integer, PlayerDetail> topKPlayerWithLessScore = new TreeMap<Integer, PlayerDetail>();
@Override
protected void reduce(IntWritable key, Iterable<PlayerDetail> values, Context context) throws IOException, InterruptedException {
for (PlayerDetail playerDetail : values) {
// Clone the object because the framework reuses the PlayerDetail instance across iterations
topKPlayerWithLessScore.put(key.get(), WritableUtils.clone(playerDetail, context.getConfiguration()));
if (topKPlayerWithLessScore.size() > K) {
topKPlayerWithLessScore.remove(topKPlayerWithLessScore.lastKey());
}
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
for (Map.Entry<Integer, PlayerDetail> playerDetailEntry : topKPlayerWithLessScore.entrySet()) {
context.write(new IntWritable(playerDetailEntry.getKey()), playerDetailEntry.getValue());
}
}
}
The Driver class sets job.setNumReduceTasks(1), which means that only one reducer runs to find the top-k records; with multiple reducers, we would end up with multiple top-k files. The code for TopKDriver is as follows:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class TopKDriver extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new TopKDriver(), args);
System.exit(res);
}
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "TopK");
job.setNumReduceTasks(1);
job.setJarByClass(TopKDriver.class);
if (args.length < 2) {
System.out.println("Jar requires 2 paramaters : \""
+ job.getJar()
+ " input_path output_path");
return 1;
}
job.setMapperClass(TopKMapper.class);
job.setReducerClass(TopKReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(PlayerDetail.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
Path filePath = new Path(args[0]);
FileInputFormat.setInputPaths(job, filePath);
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
job.waitForCompletion(true);
return 0;
}
}
Join pattern
Joins are commonly used across companies when reports are being created. Two datasets are joined together to extract meaningful insights that can help decision makers. Join queries are simple in SQL, but achieving the same in MapReduce is a bit complex, because both mappers and reducers operate on a single key at a time. Joining two datasets of equal size requires twice the network bandwidth, as all the data from both datasets has to be sent to the reducers for joining.
The join operation is very costly in Hadoop as it requires data traversal from one machine to another over the network and thus it is important to make sure that enough effort is made to save network bandwidth. Let's look into a few join patterns.
Reduce side join
Reduce side join is the simplest form of join available in the MapReduce framework, and nearly any type of SQL join, such as inner, left outer, and full outer, can be done this way. The only difficulty is that nearly all the data is shuffled across the network to reach the reducers. Two or more datasets are joined together using a common key, and multiple large datasets can be joined by a foreign key. Remember that you should go with a map side join if one of the datasets can fit into memory; a reduce side join should be used when neither dataset fits into memory.
MapReduce has the capability of reading data from multiple inputs and different formats in the same MapReduce program and it also allows different mappers to be used for a specific InputFormat. The following configuration needs to be added to the Driver class so that the MapReduce program reads the input from multiple paths and redirects to the specific mapper for processing, for example:
MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, UserMapper.class);
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, PurchaseReportMapper.class);
Let's look at some sample code for the reduce side join and see how it works. The mappers emit records with the userId as the key and the whole record, prefixed with an identifier, as the value. The X identifier is prefixed to the record so that the reducer can easily tell which mapper a record came from. The UserMapper class looks as follows:
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class UserMapper extends Mapper<Object, Text, Text, Text> {
private Text outputkey = new Text();
private Text outputvalue = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String[] userRecord = value.toString().split(",");
String userId = userRecord[0];
outputkey.set(userId);
outputvalue.set("X" + value.toString());
context.write(outputkey, outputvalue);
}
}
Similarly, the second Mapper processes the purchase history of the users, emits the ID of the user who purchased the goods, and prefixes Y to the value as the identifier, as follows:
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class PurchaseReportMapper extends Mapper<Object, Text, Text, Text> {
private Text outputkey = new Text();
private Text outputvalue = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String[] purchaseRecord = value.toString().split(",");
String userId = purchaseRecord[1];
outputkey.set(userId);
outputvalue.set("Y" + value.toString());
context.write(outputkey, outputvalue);
}
}
On the Reducer side, the idea is to simply keep two lists and add user records to one list and purchase records to the other list, then perform a join based on the condition. The sample Reducer code will look as follows:
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
public class UserPurchaseJoinReducer extends Reducer<Text, Text, Text, Text> {
private Text tmp = new Text();
private ArrayList<Text> userList = new ArrayList<Text>();
private ArrayList<Text> purchaseList = new ArrayList<Text>();
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
userList.clear();
purchaseList.clear();
for (Text value : values) {
tmp = value;
if (tmp.charAt(0) == 'X') {
userList.add(new Text(tmp.toString().substring(1)));
} else if (tmp.charAt(0) == 'Y') {
purchaseList.add(new Text(tmp.toString().substring(1)));
}
}
/* Joining both dataset */
if (!userList.isEmpty() && !purchaseList.isEmpty()) {
for (Text user : userList) {
for (Text purchase : purchaseList) {
context.write(user, purchase);
}
}
}
}
}
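For completeness, the following Driver sketch wires the two mappers and the join reducer together using MultipleInputs; the ReduceSideJoinDriver class name and the argument layout are assumptions for illustration rather than part of the original example:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReduceSideJoinDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new ReduceSideJoinDriver(), args));
    }

    public int run(String[] args) throws Exception {
        // args[0]: user data, args[1]: purchase data, args[2]: output path
        Job job = Job.getInstance(getConf(), "ReduceSideJoin");
        job.setJarByClass(ReduceSideJoinDriver.class);

        // Each input path gets its own mapper.
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, UserMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, PurchaseReportMapper.class);

        job.setReducerClass(UserPurchaseJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}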
The join operation is costly because it requires shuffling data over the network. If there is scope, the data should be filtered on the mapper side to avoid unnecessary data movement, as in the hypothetical sketch below.
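As a hypothetical illustration, a variant of UserMapper could drop records before anything is emitted; the assumption that the third column holds an account status is purely illustrative:
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

// Hypothetical variant of UserMapper that filters on the mapper side.
public class FilteringUserMapper extends Mapper<Object, Text, Text, Text> {
    private Text outputkey = new Text();
    private Text outputvalue = new Text();

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] userRecord = value.toString().split(",");
        // Assumed layout: the third column holds an account status.
        if (!"ACTIVE".equals(userRecord[2])) {
            return; // dropped here, so the record is never shuffled to the reducers
        }
        outputkey.set(userRecord[0]);
        outputvalue.set("X" + value.toString());
        context.write(outputkey, outputvalue);
    }
}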
Map side join (replicated join)
If one of the datasets is small enough to fit into main memory, then a map side join can be a good choice. In a map side join, the small dataset is loaded into an in-memory map during the setup phase of the mapper. The large dataset is read as the mapper input so that each record gets joined with the small dataset, and the output is then emitted to a file. There is no reduce phase and therefore no shuffle and sort phase. Map side joins are widely used for left outer join and inner join use cases. Let's look at examples of how we can create the Mapper and Driver classes for a map side join:
- Mapper class: The following Mapper class is a template for a map side join; you can reuse it and modify the logic according to your input dataset. The data read from the distributed cache is stored in RAM, so it can throw an out-of-memory exception if the file does not fit into memory; the only way to solve this problem is to increase the available memory. The setup method is executed only once during the mapper life cycle, whereas the map function is called for each record. Inside the map function, each record is processed and checked against the in-memory data for a matching record in order to perform the join operation.
Let's look at the Mapper class template. The following is the code for the Mapper class:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.*;
import java.net.URI;
import java.util.HashMap;
public class UserPurchaseMapSideJoinMapper extends
Mapper<LongWritable, Text, Text, Text> {
private HashMap<String, String> userDetails = new HashMap<String, String>();
private Configuration conf;
public void setup(Context context) throws IOException {
conf = context.getConfiguration();
URI[] URIs = Job.getInstance(conf).getCacheFiles();
for (URI patternsURI : URIs) {
Path filePath = new Path(patternsURI.getPath());
String userDetailFile = filePath.getName();
readFile(userDetailFile);
}
}
private void readFile(String filePath) {
try {
BufferedReader bufferedReader = new BufferedReader(new FileReader(filePath));
String userInfo = null;
while ((userInfo = bufferedReader.readLine()) != null) {
/* Add Record to map here. You can modify value and key accordingly.*/
userDetails.put(userInfo.split(",")[0], userInfo.toLowerCase());
}
} catch (IOException ex) {
System.err.println("Exception while reading stop words file: " + ex.getMessage());
}
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String purchaseDetailUserId = value.toString().split(",")[0];
String userDetail = userDetails.get(purchaseDetailUserId);
/*Perform the join operation here*/
}
}
- Driver class: In the Driver class, we add the path of the file to the distributed cache so that it is shipped to each mapper during execution. Let's look at the Driver class template, as follows:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MapSideJoinDriver extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new MapSideJoinDriver(), args);
System.exit(res);
}
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "map join");
job.setJarByClass(MapSideJoinDriver.class);
if (args.length < 3) {
System.out.println("Jar requires 3 paramaters : \""
+ job.getJar()
+ " input_path output_path distributedcachefile");
return 1;
}
job.addCacheFile(new Path(args[2]).toUri());
job.setMapperClass(UserPurchaseMapSideJoinMapper.class);
// Map-only job: a map side join does not need a reduce phase
job.setNumReduceTasks(0);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
Path filePath = new Path(args[0]);
FileInputFormat.setInputPaths(job, filePath);
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
job.waitForCompletion(true);
return 0;
}
}
Composite join
A map side join performed on very large datasets is known as a composite join. Its advantage is that the shuffle and sort phase is skipped, as there is no reducer. The only condition for a composite join is that the data needs to be prepared in a specific way before it gets processed.
One of the conditions is that both datasets must be sorted by the key used for the join. They must also be partitioned by that key, and both datasets must have the same number of partitions. Hadoop provides a special InputFormat, CompositeInputFormat, to read such datasets.
Before using the following template, you must preprocess your input data by sorting and partitioning it on the join key so that it is in the format required for a composite join. The first step, therefore, is to prepare the input data. Let's look at the mapper and reducer that sort and partition the input data.
Sorting and partitioning
The following Mapper moves the join key to the first position in the record. In our case, the join key is already in the first position, so we may not need getRecordInCompositeJoinFormat() here:
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.List;
public class PrepareCompositeJoinRecordMapper extends Mapper<LongWritable, Text, Text, Text> {
private int indexOfKey=0;
private Splitter splitter;
private Joiner joiner;
private Text joinKey = new Text();
String separator=",";
@Override
protected void setup(Context context) throws IOException, InterruptedException {
splitter = Splitter.on(separator);
joiner = Joiner.on(separator);
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
Iterable<String> recordColumns = splitter.split(value.toString());
joinKey.set(Iterables.get(recordColumns, indexOfKey));
if(indexOfKey != 0){
value.set(getRecordInCompositeJoinFormat(recordColumns, indexOfKey));
}
context.write(joinKey,value);
}
private String getRecordInCompositeJoinFormat(Iterable<String> value, int index){
List<String> temp = Lists.newArrayList(value);
String originalFirst = temp.get(0);
String newFirst = temp.get(index);
temp.set(0,newFirst);
temp.set(index,originalFirst);
return joiner.join(temp);
}
}
Reducer: The reducer emits each record with the join key as the output key and the entire record as the value. The join key is kept as the output key because, in the composite join Driver class, we are going to use the KeyValueTextInputFormat class as the input format for CompositeInputFormat, as shown in the following code:
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class PrepareCompositeJoinRecordReducer extends Reducer<Text,Text,Text,Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text value : values) {
context.write(key,value);
}
}
}
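A minimal sketch of a driver for this preparation job is shown below, assuming both datasets are run through it separately with the same number of reducers so that their partitions line up; the PrepareCompositeJoinDriver name and the hard-coded reducer count are illustrative assumptions:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class PrepareCompositeJoinDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new PrepareCompositeJoinDriver(), args));
    }

    public int run(String[] args) throws Exception {
        // args[0]: raw dataset, args[1]: sorted and partitioned output
        Job job = Job.getInstance(getConf(), "PrepareCompositeJoinInput");
        job.setJarByClass(PrepareCompositeJoinDriver.class);

        job.setMapperClass(PrepareCompositeJoinRecordMapper.class);
        job.setReducerClass(PrepareCompositeJoinRecordReducer.class);
        // Run the same job over both datasets with an identical reducer count
        // so that both end up with the same number of partitions on the join key.
        job.setNumReduceTasks(4);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        // TextOutputFormat writes key<TAB>value lines, which KeyValueTextInputFormat can read back.
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}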
Composite join template: The following template can be used to create and run your composite join example. You can modify the logic with respect to your use case. Let's look into its implementation.
Driver class: The Driver class takes four input arguments. The first two are the input data files, the third is the output file path, and the fourth is the join type. The composite join supports only the inner and outer join types, as follows:
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.join.CompositeInputFormat;
public class CompositeJoinExampleDriver {
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf("CompositeJoin");
conf.setJarByClass(CompositeJoinExampleDriver.class);
if (args.length < 4) {
System.out.println("Jar requires 4 parameters : \""
+ conf.getJar()
+ " input_path1 input_path2 output_path jointype[outer or inner]\"");
System.exit(1);
}
conf.setMapperClass(CompositeJoinMapper.class);
conf.setNumReduceTasks(0);
conf.setInputFormat(CompositeInputFormat.class);
conf.set("mapred.join.expr", CompositeInputFormat.compose(args[3],
KeyValueTextInputFormat.class, new Path(args[0]), new Path(args[1])));
TextOutputFormat.setOutputPath(conf,new Path(args[2]));
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
RunningJob job = JobClient.runJob(conf);
System.exit(job.isSuccessful() ? 0 : 1);
}
}
Mapper class: The Mapper class takes the join key as its input key and a TupleWritable as the value. Remember that the join key is fetched from the input files, which is why we said the input data has to be in a specific format, for example:
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.join.TupleWritable;
import java.io.IOException;
public class CompositeJoinMapper extends MapReduceBase implements
Mapper<Text, TupleWritable, Text, Text> {
public void map(Text text, TupleWritable value, OutputCollector<Text, Text> outputCollector, Reporter reporter) throws IOException {
outputCollector.collect((Text) value.get(0), (Text) value.get(1));
}
}
There are many more MapReduce design patterns that you can explore. We hope you found this article interesting; you can refer to Mastering Hadoop 3 as a comprehensive guide to mastering the most advanced Hadoop 3 concepts. Mastering Hadoop 3 will help you learn how Hadoop works internally, study advanced concepts of different ecosystem tools, discover solutions to real-world use cases, and understand how to secure your cluster.