Many PDI applications require third-party Java libraries to perform tasks within PDI jobs and transformations. These libraries must be included in the classpath of the Hadoop mappers and reducers so that PDI applications can use them in the Hadoop cluster. The best way to do this is to copy all dependent JARs to Hadoop’s Distributed Cache and set the corresponding distributed-cache parameters on the Pentaho MapReduce job step (in the User Defined tab).
The process of uploading the Custom JARs can be automated by implementing a PDI transformation that does the following:
- Take a list of the JARs the PDI application requires.
- Copy all the files from the local filesystem to a configured HDFS directory.
- Set a global variable containing the list of all the JARs with their fully qualified HDFS paths. This variable is then used to set the user-defined parameter on the Pentaho MapReduce job step.
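The variable-building step of that transformation boils down to turning a list of JAR names into one comma-separated string of fully qualified HDFS paths. A minimal Java sketch of that logic (the HDFS directory and JAR names are examples, not required locations):

```java
import java.util.List;
import java.util.stream.Collectors;

public class JarListBuilder {
    // Build the comma-separated list of fully qualified HDFS JAR paths
    // that is placed into the global variable and passed to the
    // Pentaho MapReduce job step.
    static String buildJarList(String hdfsDir, List<String> jarNames) {
        return jarNames.stream()
                .map(name -> hdfsDir + "/" + name)
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        // Example directory and JARs -- in the real transformation these
        // come from the configured HDFS dir and the application's JAR list.
        String list = buildJarList("hdfs://namenode:8020/opt/pentaho/libs",
                List.of("commons-lang.jar", "custom-app.jar"));
        System.out.println(list);
    }
}
```

In the actual PDI transformation this string is produced by ordinary steps (a file listing plus a group/concatenate), but the resulting variable has exactly this shape.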
You can download this PDI solution here.
This solution has been tested with PDI 4.4.
When doing lookups for data that resides inside a Hadoop cluster, it is best to have both of the data sets you want to join reside in HDFS. You have several options for joining data sets inside the Hadoop cluster with PDI. The right solution depends on the amount of data in the tables:
- PDI’s Hadoop File Input with Stream Value Lookup step
- This solution is best when the lookup data is very small
- Put lookup file in Hadoop’s Distributed Cache
- See Joins with Map Reduce for an example of how you would do this in Java. In PDI you would simply add the file to Hadoop’s Distributed Cache and then use PDI’s Stream Lookup step (see here for how to add files to Hadoop’s Distributed Cache in PDI).
- PDI’s HBase Input Step with Stream Value Lookup
- This solution is best when the lookup data is small
- This will perform full range scans of the HBase tables, so if the HBase table is large this could be slow.
- HBase configuration also impacts the performance.
- Directly use HBase API with PDI’s User Defined Java Class
- This solution is best when the lookup data is large and is in HBase tables
- You will need to write the HBase API calls within the Pentaho UDJC. You must be familiar with the HBase API to get the best performance (use a table pool, such as HTablePool, as a static variable, and apply as many filters as possible in a single request to minimize the calls out to HBase).
- HBase configuration also impacts the performance.
- MapReduce Joins
- This solution is best when both the input data set and the lookup data set are large.
- Both of these data sets are in HDFS.
- This can also be done using Pig/Impala/Hive instead of actually writing the low-level MapReduce.
In this posting I will detail a solution for the fourth option. Before explaining the process of implementing an actual MapReduce join, I would like to point out that the easiest way to do this type of join is to use either a Pig script or a Hive/Impala SQL JOIN statement.
However, if you do not want to use Pig/Hive/Impala to do the joins then the following reference implementation can be used.
When you have two huge data sets in HDFS, you can join them using the MultipleInputs feature of Apache Hadoop’s Java API when setting up the Hadoop job (a detailed example can be found here). This feature is not available in PDI (as of PDI 4.4 GA). The attached example demonstrates how to implement the fourth solution in PDI. The basic design of the solution is:
- Create a map-only job for each data set. The mapper appends a row type to each row and writes it back out to HDFS. The row type identifies which data set the row originated from. Note that the number of reducers is set to zero; this disables the shuffle, sort, and reduce phases, making these jobs more efficient.
- Take the outputs of the mappers and copy them to a single HDFS directory.
- Run a MapReduce job that joins the data sets, where the input directory contains both data sets with the row type appended to each row.
- The mapper performs any data cleanup or massaging that you want. The mapper output value is the data from each row, and the key is the join key.
- The reducer then joins the data from both row types and outputs the key and the joined data.
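The tag-and-join design above can be sketched in plain Java. The row-type tags, keys, and payloads here are illustrative, and the grouping step stands in for Hadoop’s shuffle/sort (in the attached solution the mapper and reducer are PDI transformations):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ReduceSideJoinSketch {
    public static void main(String[] args) {
        // Output of the two map-only jobs: {rowType, joinKey, payload}.
        // Row type "A" and "B" identify which data set a row came from.
        List<String[]> tagged = List.of(
                new String[]{"A", "k1", "customer-1"},
                new String[]{"B", "k1", "order-9"},
                new String[]{"A", "k2", "customer-2"});

        // Stand-in for the shuffle/sort: group all rows by join key.
        Map<String, List<String[]>> byKey = new TreeMap<>();
        for (String[] row : tagged)
            byKey.computeIfAbsent(row[1], k -> new ArrayList<>()).add(row);

        // "Reducer": for each key, pair every A-row with every B-row
        // (an inner join -- k2 produces nothing, having no B-row).
        for (Map.Entry<String, List<String[]>> e : byKey.entrySet()) {
            List<String> a = new ArrayList<>(), b = new ArrayList<>();
            for (String[] row : e.getValue())
                (row[0].equals("A") ? a : b).add(row[2]);
            for (String left : a)
                for (String right : b)
                    System.out.println(e.getKey() + "," + left + "," + right);
        }
    }
}
```

The row type is what lets a single reducer tell the two data sets apart once their files sit in the same input directory.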
You can download this solution here.
The demonstration below shows how to visually create a report directly against data stored in MongoDB (with no coding required). The following topics are covered:
- The Pentaho Data Integration tool is used to create a transformation that does the following:
- Connects to and queries MongoDB.
- Sorts the query results.
- Groups the sorted results.
- Pentaho Report Designer is used to visually create a report using the data from the PDI transformation.
The demo below shows how to use the Pentaho Data Integration tool to perform ad-hoc analysis of data stored in MongoDB and MySQL. The following items are shown:
- Query data in MongoDB.
- Convert a MongoDB document from JSON to a basic row of columns using the JSON Input step.
- Lookup a field in the MongoDB document. The lookup table is stored in MySQL.
- Store the document and the lookup data into a table in MySQL.
- Perform ad-hoc analysis using PDI’s modeler perspective.
- Display results using PDI’s visualizer.
This demo shows how to import data from a CSV file into MongoDB using the Pentaho Data Integration tool (a.k.a. Kettle). The following items are demonstrated:
- Basics of how to map columns from a CSV file to fields in a MongoDB JSON document.
- How to handle variable/optional columns.
- Perform basic data scrubbing before adding data into MongoDB.
Although this demo uses a CSV file as input data, PDI can just as easily import data from many JDBC-compliant databases using the Table Input step.
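The column-to-field mapping, optional-column handling, and scrubbing shown in the demo amount to building a document per CSV row, trimming values, and leaving out optional fields that are blank. A minimal Java sketch of that mapping (field names and the sample row are made up; in PDI this is done by the CSV Input and MongoDB Output steps, with no code):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CsvToDocumentSketch {
    // Map one CSV row onto the field names of a MongoDB-style document.
    // Values are trimmed (basic scrubbing), and optional columns that
    // are blank are dropped so the stored document only contains fields
    // that actually have data.
    static Map<String, String> toDocument(String[] header, String[] row) {
        Map<String, String> doc = new LinkedHashMap<>();
        for (int i = 0; i < header.length; i++) {
            String value = i < row.length ? row[i].trim() : "";
            if (!value.isEmpty())   // optional column left out when blank
                doc.put(header[i], value);
        }
        return doc;
    }

    public static void main(String[] args) {
        String[] header = {"name", "city", "phone"};
        String[] row = {" Alice ", "Boston", ""};  // phone is optional and blank
        System.out.println(toDocument(header, row));
    }
}
```

Because MongoDB documents are schemaless, rows with different optional columns can simply produce documents with different fields, which is what makes the variable-column case straightforward.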