How to join big data sets using MapReduce and PDI

When doing lookups for data that resides inside a Hadoop cluster, it is best to have both of the data sets you want to join reside in HDFS. You have several options when using PDI and Hadoop cluster technology to join data sets inside the cluster. The right choice depends on the amount of data involved:

  1. PDI’s Hadoop File Input with Stream Value Lookup step
    1. This solution is best when the lookup data is very small
    2. Put the lookup file in Hadoop’s Distributed Cache
    3. See Joins with Map Reduce for an example of how you would do this in Java. In PDI you would simply add the file to Hadoop’s distributed cache and then use PDI’s Stream Lookup step (see here for how to add a file to Hadoop’s distributed cache in PDI).
  2. PDI’s HBase Input Step with Stream Value Lookup
    1. This solution is best when the lookup data is small
    2. This will perform full range scans of the HBase tables, so if the HBase table is large this could be slow.
    3. HBase configuration also impacts the performance.
  3. Directly use HBase API with PDI’s User Defined Java Class
    1. This solution is best when the lookup data is large and is in HBase tables
    2. You will need to call the HBase API from the Pentaho UDJC. You must be familiar with the HBase API features to get the best performance (use a table pool such as HTablePool as a static variable, and apply as many filters as possible in a single request to minimize the calls out to HBase).
    3. HBase configuration also impacts the performance.
  4. MapReduce Joins
    1. This solution is best when both the input data set and the lookup data set are large.
    2. Both data sets reside in HDFS.
    3. This can also be done using Pig/Impala/Hive instead of actually writing the low-level MapReduce.
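To make the trade-offs above concrete, option 1 is essentially a map-side hash join: the small lookup set is held in memory (via the distributed cache, or PDI's Stream Lookup step) while the large data set streams past it. Here is a minimal plain-Python sketch of that idea; the field names (`product_id`, `product_name`, `qty`) are illustrative only, not part of any PDI example:

```python
# Map-side (stream) lookup: the small data set is held in memory,
# the large data set is streamed past it one row at a time.

def stream_lookup(big_rows, lookup_rows, key, lookup_field, default=None):
    """Enrich each row of big_rows with lookup_field from lookup_rows."""
    # Build an in-memory hash table from the small lookup set
    # (this is what the distributed cache / Stream Lookup step provides).
    table = {r[key]: r[lookup_field] for r in lookup_rows}
    for row in big_rows:
        enriched = dict(row)
        enriched[lookup_field] = table.get(row[key], default)
        yield enriched

# Illustrative data; these field names are made up for the example.
sales = [{"product_id": 1, "qty": 3}, {"product_id": 2, "qty": 5}]
products = [{"product_id": 1, "product_name": "widget"},
            {"product_id": 2, "product_name": "gadget"}]

joined = list(stream_lookup(sales, products, "product_id", "product_name"))
```

This only works when the lookup set fits comfortably in memory, which is exactly why options 2 through 4 exist for larger lookup data.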

In this post I will detail a solution for the fourth option. Before explaining the process of implementing an actual MapReduce job, I would like to point out that the easiest way to do this type of join is to use either a Pig script or a Hive/Impala SQL JOIN statement.

However, if you do not want to use Pig/Hive/Impala to do the joins then the following reference implementation can be used.

When you have two huge data sets in HDFS, you can join them using the MultipleInputs feature of Apache Hadoop’s Java API when setting up the Hadoop job (a detailed example can be found here). This feature is not available in PDI (as of PDI 4.4 GA). The attached example demonstrates how to implement the fourth option in PDI. The basic design of the solution is:

  1. Create a map-only job for each data set. The mapper adds a row type to each row and writes it back out to HDFS; the row type identifies which data set the row originated from. Note that the number of reducers is set to zero, which disables the shuffle, sort, and reduce phases and makes the job more efficient.
  2. Copy the output of both mappers into a single HDFS directory.
  3. Run a MapReduce job over that directory, which now contains both data sets with the row type appended to each row.
    1. The mapper performs any data cleanup or massaging that you need. Its output value is the data from each row, and its output key is the join key.
    2. The reducer then joins the rows from both data sets and outputs the key along with the joined data.
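The three-phase design above can be simulated in plain Python to see what the framework is doing under the hood: the mappers tag each row with its data-set type, the shuffle groups tagged rows by the join key, and the reducer stitches the two sides together. All names below are illustrative, not taken from the downloadable solution:

```python
from collections import defaultdict

def map_tag(rows, row_type, key_index):
    """Map-only pass: emit (join_key, (row_type, row)) for each row."""
    for row in rows:
        yield row[key_index], (row_type, row)

def shuffle(tagged):
    """Group mapper output by join key (what Hadoop's shuffle/sort does)."""
    groups = defaultdict(list)
    for key, value in tagged:
        groups[key].append(value)
    return groups

def reduce_join(groups):
    """Join rows tagged 'A' with rows tagged 'B' for each key."""
    for key, values in sorted(groups.items()):
        a_rows = [r for t, r in values if t == "A"]
        b_rows = [r for t, r in values if t == "B"]
        for a in a_rows:
            for b in b_rows:
                yield key, a + b  # inner join: emit the combined row

# Illustrative inputs; column 0 is the join key in both data sets.
orders = [("k1", "order-100"), ("k2", "order-200")]
customers = [("k1", "alice"), ("k2", "bob")]

tagged = list(map_tag(orders, "A", 0)) + list(map_tag(customers, "B", 0))
result = list(reduce_join(shuffle(tagged)))
```

In the real solution the tagging mappers and the joining reducer run as PDI transformations inside Hadoop; the Python above only mirrors the data flow.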

You can download this solution here.


Filed under Big Data, Hadoop, MapReduce, PDI, Pentaho, Pentaho Data Integration

How to create custom reports using data from MongoDB

The demo below shows how to visually create a report directly against data stored in MongoDB (with no coding required). The following topics are covered:

  1. The Pentaho Data Integration tool is used to create a transformation that does the following:
    1. Connects to and queries MongoDB.
    2. Sorts the query results.
    3. Groups the sorted results.
  2. Pentaho Report Designer is used to visually create a report by using the data from a PDI transformation.
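The sort-then-group pattern in that transformation mirrors Python's `itertools.groupby`, which, like PDI's Group By step, requires its input to be sorted on the grouping key first. A small stand-alone sketch with made-up field names (`region`, `amount`):

```python
from itertools import groupby
from operator import itemgetter

# Rows as they might come back from a MongoDB query (fields are illustrative).
rows = [
    {"region": "east", "amount": 10},
    {"region": "west", "amount": 7},
    {"region": "east", "amount": 5},
]

# Step 1: sort on the grouping key (PDI's Sort rows step).
rows.sort(key=itemgetter("region"))

# Step 2: group the sorted rows and aggregate (PDI's Group by step).
totals = {region: sum(r["amount"] for r in group)
          for region, group in groupby(rows, key=itemgetter("region"))}
```

Skipping the sort would silently produce multiple partial groups per key, in Python and in PDI alike, which is why the demo sorts before grouping.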


Filed under Big Data, MongoDB, PDI, Pentaho, PRD

How to perform analysis of data in MongoDB using PDI

The demo below shows how to use the Pentaho Data Integration tool to perform ad-hoc analysis of data stored in MongoDB and MySQL. The following items are shown:

  1. Query data in MongoDB.
  2. Convert a MongoDB document from JSON to a basic row of columns using the JSON Input step.
  3. Look up a field from the MongoDB document. The lookup table is stored in MySQL.
  4. Store the document and the lookup data into a table in MySQL.
  5. Perform ad-hoc analysis using the PDI modeler perspective.
  6. Display results using the PDI visualizer.
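Steps 2 and 3 above, flattening a JSON document into a row of columns and then enriching it with a relational lookup, can be sketched in plain Python. The document shape and the lookup table below are invented for illustration:

```python
import json

def flatten(doc, prefix=""):
    """Flatten a nested JSON document into a single row of columns,
    similar in spirit to what PDI's JSON Input step produces."""
    row = {}
    for key, value in doc.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            row.update(flatten(value, prefix=name + "."))
        else:
            row[name] = value
    return row

# An illustrative MongoDB-style document.
doc = json.loads('{"_id": "a1", "user": {"name": "ann", "country_code": "US"}}')
row = flatten(doc)

# Illustrative lookup table, as it might be read from a MySQL table.
countries = {"US": "United States", "FR": "France"}
row["country_name"] = countries.get(row["user.country_code"], "unknown")
```

In the demo the same enrichment is done with PDI's database lookup against MySQL rather than an in-memory dictionary.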



Filed under Big Data, MongoDB, PDI, Pentaho

How to get data into MongoDB using PDI

This demo will show how to import data from a CSV file into MongoDB using the Pentaho Data Integration tool (a.k.a. Kettle). The following items will be demonstrated:

  1. The basics of how to map columns from a CSV file to fields in a MongoDB JSON document.
  2. How to handle variable/optional columns.
  3. How to perform basic data scrubbing before loading data into MongoDB.
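The column-to-field mapping, optional-column handling, and scrubbing described above can be sketched in plain Python using only the standard library. The column names and scrubbing rules here are illustrative, not the ones from the demo:

```python
import csv
import io

def csv_to_docs(text):
    """Turn CSV rows into MongoDB-style documents, omitting empty
    optional columns and doing basic scrubbing along the way."""
    for rec in csv.DictReader(io.StringIO(text)):
        doc = {}
        for col, value in rec.items():
            if value is None or value.strip() == "":
                continue  # optional column absent: omit the field entirely
            doc[col] = value.strip()  # basic scrubbing: trim whitespace
        if "email" in doc:
            doc["email"] = doc["email"].lower()  # normalize case
        yield doc

# Illustrative input: 'phone' is missing in row 1, 'email' in row 2.
data = "name,email,phone\n Ann ,ANN@EXAMPLE.COM,\nBob,,555-0100\n"
docs = list(csv_to_docs(data))
```

Omitting empty fields rather than storing blanks plays to MongoDB's schemaless documents, which is the same idea the demo applies through PDI steps.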

Although this demo uses a CSV file as the input data, PDI can just as easily import data from any JDBC-compliant database by using the Table Input step.


Filed under Big Data, MongoDB, PDI, Pentaho