Full Outer Join using Pentaho MapReduce

This article demonstrates how to implement a full outer join using PDI MapReduce (PMR). In one of my previous blog posts (here), I discussed different methods of joining data in a Hadoop environment. That article provided general guidelines for joining data within Hadoop using PDI. This post provides a PDI application that implements a full outer join with the following features:

  • Implements the Reduce Side join design pattern (for a detailed explanation of this design pattern, read this).
  • Both data sets are in HDFS. You can easily extend the existing application to join more than two data sets.
  • Each row of the joined output indicates which datasets its data was taken from.

This implementation (and the Reduce Side join design pattern in general) is best for joining large data sets that are in HDFS.

Before explaining the PMR solution, I would like to point out that the easiest way to do this type of join is to use either a Pig script or a Hive/Impala SQL JOIN statement. However, if you do not want to use Pig/Hive/Impala, then the following reference PMR implementation can be used.

The sample inputs to the application are two data sets:

Dataset A:

3,3738,New York,NY
4,12946,New York,NY
5,17556,San Diego,CA
9,3443,Oakland,CA

Dataset B:

3,35324,Hello
5,44921,Small World
5,44920,Big Universe
3,48002,Good Bye
8,48675,Thank You

The first column in both datasets is the join key. The final output of the full outer join is:

3,X,3,3738,New York,NY,3,48002,Good Bye
3,X,3,3738,New York,NY,3,35324,Hello
4,A,4,12946,New York,NY,,,
5,X,5,17556,San Diego,CA,5,44920,Big Universe
5,X,5,17556,San Diego,CA,5,44921,Small World
8,B,,,,,8,48675,Thank You
9,A,9,3443,Oakland,CA,,,

Where:
  • The first column is the join key.
  • Second column indicates which data set the data originated from. The value of X indicates data came from both sets. The value of A indicates that only data from dataset A was present. The value of B indicates only data from dataset B was available.
  • The rest of the fields are the joined data.
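
To make the row layout concrete, here is a minimal Python sketch of how the output rows for a single join key could be assembled. It is not the PDI implementation from the download. The function name join_group and the field counts (4 for dataset A, 3 for dataset B) are assumptions taken from the sample data above.

```python
from itertools import product

A_WIDTH = 4  # fields per dataset A row in the sample data (assumption)
B_WIDTH = 3  # fields per dataset B row in the sample data (assumption)


def join_group(key, a_rows, b_rows):
    """Build the full outer join output rows for one join key.

    a_rows and b_rows are lists of field lists that share the given key.
    At least one of them is expected to be non-empty.
    """
    if a_rows and b_rows:
        indicator = "X"  # key present in both datasets
    elif a_rows:
        indicator = "A"  # key present only in dataset A
    else:
        indicator = "B"  # key present only in dataset B

    # Pad the missing side with one row of empty fields so that every
    # output row has the same number of columns.
    a_side = a_rows or [[""] * A_WIDTH]
    b_side = b_rows or [[""] * B_WIDTH]

    # Pair every A row with every B row for this key.
    return [",".join([key, indicator] + a + b) for a, b in product(a_side, b_side)]


print(join_group("4", [["4", "12946", "New York", "NY"]], []))
print(join_group("8", [], [["8", "48675", "Thank You"]]))
```

When a key has several rows on one side, as keys 3 and 5 do in dataset B, the pairing yields one output row per combination. The order of those rows within a key is not guaranteed to match the listing above.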

You can download this PDI solution here: full_outer_join_pmr

Start by looking at the PDI job implemented in main.kjb. This PDI job performs the following tasks:

  1. Configures variables for Hadoop and various other parameters used in the application. You will need to configure the Hadoop cluster settings in the Init Vars job entry.
  2. Copies the sample datasets from the local file system to HDFS.
  3. Calls the add_row_type PDI job, which runs a map-only Hadoop job on each dataset to tag each row with an identifier indicating its source dataset.
  4. Calls the PMR job to join the two data sets using the Pentaho MapReduce job entry and the User Defined Java Class transformation step.
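
The idea behind steps 3 and 4 can be sketched outside of PDI. The following Python snippet is only an illustration under stated assumptions, not the code in the downloadable solution: tag_rows stands in for the map-only tagging job, and group_by_key stands in for the shuffle that hands all rows with the same key to one reducer. Both function names are hypothetical, and the simple split on commas assumes no field contains a quoted comma.

```python
from collections import defaultdict


def tag_rows(lines, tag):
    """Map-only step: emit (join key, source tag, fields) for each line."""
    for line in lines:
        fields = line.split(",")
        yield fields[0], tag, fields  # first column is the join key


def group_by_key(tagged):
    """Stand-in for the shuffle: collect rows per key, split by source tag."""
    groups = defaultdict(lambda: {"A": [], "B": []})
    for key, tag, fields in tagged:
        groups[key][tag].append(fields)
    return groups


# A subset of the sample data from this article.
dataset_a = ["3,3738,New York,NY", "4,12946,New York,NY"]
dataset_b = ["3,35324,Hello", "8,48675,Thank You"]

tagged = list(tag_rows(dataset_a, "A")) + list(tag_rows(dataset_b, "B"))
groups = group_by_key(tagged)

# Show how many rows each key received from each dataset.
for key in sorted(groups):
    print(key, {tag: len(rows) for tag, rows in groups[key].items()})
```

Because every row carries its source tag, the reducer can tell for each key whether it saw rows from dataset A, dataset B, or both, which is the information the second output column needs.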

The sample app has been tested with the following software:

  1. Pentaho Data Integration 5.3
  2. Cloudera CDH5.2
