Monthly Archives: April 2013

How to join big data sets using MapReduce and PDI

When doing lookups for data that resides inside a Hadoop cluster, it is best to have both of the data sets you want to join reside in HDFS. You have several options when using PDI and Hadoop to join data sets inside the cluster. The best choice depends on the size of the data sets:

  1. PDI’s Hadoop File Input with Stream Value Lookup step
    1. This solution is best when the lookup data is very small
    2. Put lookup file in Hadoop’s Distributed Cache
    3. See Joins with Map Reduce for an example of how you would do this in Java. In PDI you would simply add the file to Hadoop’s distributed cache and then use PDI’s Stream Lookup step (see here for how to add a file to Hadoop’s distributed cache in PDI)
  2. PDI’s HBase Input Step with Stream Value Lookup
    1. This solution is best when the lookup data is small
    2. This will perform full range scans of the HBase tables, so if the HBase table is large this could be slow.
    3. HBase configuration also impacts the performance.
  3. Directly use HBase API with PDI’s User Defined Java Class
    1. This solution is best when the lookup data is large and is in HBase tables
    2. You will need to write code against the HBase API inside the Pentaho UDJC. You must be familiar with the HBase API to get the best performance (keep the HBase connection pool in a static variable and apply as many filters as possible in a single request to minimize the calls out to HBase).
    3. HBase configuration also impacts the performance.
  4. MapReduce Joins
    1. This solution is best when both the input data set and the lookup data set are large.
    2. Both of these data sets are in HDFS.
    3. This can also be done using Pig/Impala/Hive instead of actually writing the low-level MapReduce.
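The first two options above amount to a map-side join: the small lookup set is held in memory on each mapper (arriving via the distributed cache or an HBase scan) and every incoming row is matched against it. A minimal plain-Java sketch of that idea, with hypothetical names and tab-separated rows (not PDI’s actual API):

```java
import java.util.HashMap;
import java.util.Map;

public class MapSideJoin {
    // Small lookup set, as it would arrive via Hadoop's distributed cache
    // or an HBase scan: join key -> lookup value.
    static Map<String, String> lookup = new HashMap<>();

    // Join one large-side row against the in-memory lookup table.
    // Rows with no match return null (inner-join semantics).
    static String join(String key, String bigRowValue) {
        String small = lookup.get(key);
        return small == null ? null : key + "\t" + bigRowValue + "\t" + small;
    }

    public static void main(String[] args) {
        lookup.put("cust42", "Gold");
        System.out.println(join("cust42", "order-1001")); // joined row
        System.out.println(join("cust99", "order-1002")); // null: no match
    }
}
```

This only works while the lookup set fits comfortably in each mapper’s heap, which is exactly why the large/large case needs a different approach.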

In this posting I will detail a solution for the fourth option. Before explaining how to implement the MapReduce join, I would like to point out that the easiest way to do this type of join is to use either a Pig script or a Hive/Impala SQL JOIN statement.

However, if you do not want to use Pig/Hive/Impala to do the joins then the following reference implementation can be used.

When you have two huge data sets that are in HDFS, you can join them using the MultipleInputs feature of Apache Hadoop’s Java API when setting up the Hadoop job (a detailed example can be found here). This feature is not available in PDI (as of PDI 4.4 GA). The attached example demonstrates how to implement the fourth solution in PDI. The basic design of the solution is:

  1. Create a map-only job for each data set. The mapper adds a row type to each row and writes it back out to HDFS. The row type identifies which data set the row originated from. Note that the number of reducers is set to zero; this disables the shuffle, sort, and reduce parts of the process to make it more efficient.
  2. Take the output of the mappers and copy them to a single HDFS directory.
  3. Run a MapReduce job that joins the data sets where the input dir contains both data sets with the row type appended to each row.
    1. The mapper performs any data cleanup or massaging that you want. The mapper’s output value is the data from each row, and its key is the join key.
    2. The reducer then joins the data from both rows and outputs the key and joined data.
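The steps above can be sketched in plain Java. In a real job the Hadoop framework handles the I/O and the grouping by key; the names and the tab-separated row format here are illustrative only:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReduceSideJoin {
    // Step 1: a map-only pass tags each row with its data-set id ("A" or "B").
    static String tag(String setId, String key, String value) {
        return key + "\t" + setId + "\t" + value;
    }

    // Step 3: after the shuffle groups the tagged rows by join key, the
    // reducer pairs every A-row with every B-row that shares that key.
    static List<String> reduce(String key, List<String> taggedValues) {
        List<String> a = new ArrayList<>(), b = new ArrayList<>();
        for (String v : taggedValues) {
            String[] parts = v.split("\t", 2); // [setId, rowData]
            (parts[0].equals("A") ? a : b).add(parts[1]);
        }
        List<String> out = new ArrayList<>();
        for (String left : a)
            for (String right : b)
                out.add(key + "\t" + left + "\t" + right);
        return out;
    }

    public static void main(String[] args) {
        // Step 2 is simply copying both tagged outputs into one HDFS input
        // directory; here the merged input is a single in-memory list.
        System.out.println(tag("A", "cust42", "order-1001"));
        System.out.println(tag("B", "cust42", "Gold"));
        // The framework would group by key before calling reduce:
        System.out.println(reduce("cust42",
            Arrays.asList("A\torder-1001", "B\tGold")));
    }
}
```

Note that the reducer buffers one side per key in memory; if a single join key can have a very large number of rows on both sides, a real implementation would stream one side instead.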

You can download this solution here.


Filed under Big Data, Hadoop, MapReduce, PDI, Pentaho, Pentaho Data Integration