Send Pentaho MapReduce Output to Multiple Destinations

When implementing a MapReduce application using Pentaho PDI, the mapper and reducer can only send data to a single destination directory when using the Hadoop Output step. The mapper's output goes to a temporary directory on the node where it executes, and the reducer writes to the HDFS output directory configured in the Pentaho MapReduce job entry.

PDI does not support Hadoop's native output-formatter feature that allows you to write MapReduce output to multiple destinations. However, the following method can be used to write output data to multiple destinations from within a mapper or a reducer.

  1. In the mapper or reducer, use the Pentaho Hadoop File Output step (or Text Output step) to write data to files in HDFS. These files are in addition to the standard mapper/reducer output files generated by the Hadoop Output step.
  2. The directory that the Hadoop File Output step writes to cannot be the same as the output directory configured in the MapReduce job entry.
  3. If the Hadoop File Output steps in all of the mappers and reducers write to the same directory, you must programmatically create a unique filename for each instance. Multiple mappers and reducers can execute at the same time in your cluster, so each instance must create a unique file in the HDFS namespace; only one mapper or reducer can write to a given HDFS file at a time. One way to build such a unique name is sketched below.
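
A minimal sketch of one way to do this, assuming a User Defined Java Class or similar scripting step feeds the filename to the Hadoop File Output step (this is not the exact expression used in the sample KTR): combine a millisecond timestamp with a random UUID.

import java.util.UUID;

public class UniqueHdfsFileName {
    // Build a filename that is extremely unlikely to collide across
    // concurrently running mapper/reducer instances.
    public static String build(String baseDir, String prefix) {
        long ts = System.currentTimeMillis();          // millisecond timestamp
        String uuid = UUID.randomUUID().toString();    // extra uniqueness per instance
        return baseDir + "/" + prefix + "-" + ts + "-" + uuid + ".txt";
    }

    public static void main(String[] args) {
        // e.g. a hypothetical HDFS path used by the Hadoop File Output step
        System.out.println(build("/user/pdi/extra-output", "reducer"));
    }
}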

The diagram below is an example KTR that functions as a reducer to the Pentaho MapReduce job:

[Diagram: MultiOutputFileReducer reducer KTR]

Diagram notes:

  1. The Filter Rows step is used to send some rows to the standard Hadoop Output step and other rows to the Hadoop File Output step.
  2. The Hadoop Output step sends its data to the HDFS directory configured as the output directory in the Pentaho MapReduce job entry (PDI job not shown).
  3. The Hadoop File Output step creates another file in HDFS to store all rows with a value less than 10. The filename contains a millisecond-resolution timestamp, so no two mapper or reducer instances will create the same filename in the same HDFS directory.

The above design can be much more powerful than the native Hadoop API because PDI has connectors to many technologies. You can use this design to write out to a NoSQL data store such as Cassandra or MongoDB, or to publish rows to a Kafka bus.


Filed under Uncategorized

Overview of Pentaho MapReduce Integration with Hadoop

There seems to be some confusion about how Pentaho Data Integration's MapReduce functionality works with a Hadoop cluster. In this posting I will explain the basics of how Pentaho MapReduce (PMR) integrates with Hadoop's MapReduce. I assume you know the basics of PDI application development and Hadoop MapReduce development, but let's review some PDI basics that are relevant to this discussion:
  1. PDI is a Java-based ETL engine, but you do not have to write Java to implement your ETL.
  2. PDI applications are driven by the workflow(s) that you define. The workflows are stored in XML files.
  3. PDI's Kettle engine interprets the XML and executes the application.
  4. The Kettle engine is embedded in all of the following tools (for the remainder of this discussion, "PDI" refers to any of these applications):
    1. Spoon – A graphical design tool for designing, executing, and testing PDI workflows. Most commonly used during development.
    2. PDI Server – The application that runs the Kettle engine in a web container such as Apache Tomcat.
    3. Kitchen – A command-line script that uses the Kettle engine to execute PDI jobs.
    4. Pan – A command-line script that starts the Kettle engine to execute PDI transformations.
When implementing a PMR application, you must break your problem into a MapReduce application. PDI does not automagically convert an existing PDI application to a MapReduce application. So let's review the components that constitute a MapReduce application and how each is represented in PDI:
  1. MapReduce Driver Application
    1. This is the code that configures the Hadoop job and submits it to the Hadoop cluster.
    2. In PDI, this is the Pentaho MapReduce job entry. In this entry you provide the following types of information (a rough sketch of the equivalent hand-written Hadoop driver appears after this list):
      1. PDI transformations to use for the mapper, combiner, and reducer.
      2. Hadoop cluster information (e.g., NameNode hostname/port).
      3. Input/output formatters.
      4. Input/output HDFS directories.
      5. Any Hadoop parameters needed by your application.
  2. Mapper/Reducer/Combiner
    1. This is the actual application to process your data using the MapReduce paradigm.
    2. In PDI, you write a separate transformation for the mapper, combiner, and reducer. Only a mapper is mandatory.
    3. Each transformation that is used as a mapper, reducer, or combiner must have an instance of the Hadoop Input step and the Hadoop Output step. The rest of the transformation is your logic for processing your data. (There are some steps that are better suited to the Hadoop environment, but that's another topic for another day.)
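
For comparison, here is roughly what a hand-written Hadoop driver does; the PMR job entry captures the same information through configuration. This is a hedged word-count sketch (cluster address, paths, and the inline mapper/reducer classes are illustrative placeholders, not Pentaho code):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCountDriver {

    // Stand-in for the mapper transformation you would design in Spoon.
    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context ctx)
                throws IOException, InterruptedException {
            for (String token : value.toString().split("\\s+")) {
                if (!token.isEmpty()) {
                    word.set(token);
                    ctx.write(word, ONE);
                }
            }
        }
    }

    // Stand-in for the combiner/reducer transformation.
    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context ctx)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            ctx.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://namenode:8020");            // cluster information

        Job job = Job.getInstance(conf, "wordcount");                 // Hadoop job name
        job.setJarByClass(WordCountDriver.class);

        job.setMapperClass(WordCountMapper.class);                    // mapper
        job.setCombinerClass(WordCountReducer.class);                 // combiner
        job.setReducerClass(WordCountReducer.class);                  // reducer

        job.setInputFormatClass(TextInputFormat.class);               // input formatter
        job.setOutputFormatClass(TextOutputFormat.class);             // output formatter
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path("/wordcount/input"));      // input HDFS dir
        FileOutputFormat.setOutputPath(job, new Path("/wordcount/output"));   // output HDFS dir

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
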
It is important to understand where the various components of your PDI application execute. Let's say we have a sample application implemented in a PDI job (a *.kjb) that orchestrates all aspects of your application in sequential order. This main PDI job contains the Pentaho MapReduce job entry (the Hadoop driver). All the entries in your main PDI job execute on the machine where you are running the PDI server, Spoon, or Pan. When the Kettle engine interprets the PMR entry within this main job, it does the following:
  1. The first time you run a PDI application on a Hadoop cluster, PDI uploads all of its dependent binaries (JARs) to an HDFS directory and marks that directory as part of Hadoop's Distributed Cache. If the target HDFS directory already exists, then it will not upload anything. PMR uses Hadoop's Distributed Cache to distribute PDI's libraries to all the nodes in the cluster.
    The HDFS directory's name contains PDI version information, so if you upgrade PDI to a new version, PDI will automatically upload the new version to a new HDFS target directory.
  2. Once the Kettle engine and all of its dependencies are uploaded to the Hadoop cluster, PDI submits a custom Hadoop MapReduce job that instantiates the Kettle engine within the mapper, combiner, and reducer. The Kettle engine within this custom mapper/combiner/reducer is given the appropriate transformation configured in the PMR entry. The embedded Kettle engine then interprets and executes the transformation on the Hadoop cluster in the context of the mapper, combiner, or reducer process.
  3. In the PMR entry, under the Cluster tab, if the Enable Blocking option is checked, then the main PDI job that contains the PMR entry will block, waiting for the PMR Hadoop job to complete. Once the PMR Hadoop job completes, the next entry in the main PDI job will continue.
  4. If Enable Blocking is not checked, then PDI simply submits the PMR Hadoop job to the Hadoop cluster and continues with the next entry in the main PDI job. You will not get the completion status of the PMR Hadoop job. Basically, PDI fires off the PMR application on the Hadoop cluster, forgets about it, and continues with the next entry in the PDI application.
Due to the above design, the following points should be noted:
  1. The first time you execute a PMR application, there will be a delay in starting the PMR execution because PDI copies all of the PDI-related binaries (JARs) to Hadoop's Distributed Cache. This overhead occurs only the first time you run a given version of PDI. Once the PDI binaries are uploaded, PDI just confirms the existence of the uploaded directory (NOT the actual content of the directory).
  2. If you change a JAR in PDI (e.g., you got a patch that replaces a JAR file), then you should delete the HDFS directory where the PDI binaries are uploaded. This will force a fresh upload of the binaries with the changed JARs.
  3. A Pentaho MapReduce application can easily take advantage of multithreading within a mapper, reducer, or combiner because PDI executes each step in its own thread and can run multiple instances of a step.
Hopefully this gives you a good understanding of how PDI apps run on a Hadoop cluster. The above design gives PDI users some important benefits:
  1. PDI automagically deploys and maintains itself on the Hadoop cluster. You do not have to manually maintain Pentaho-specific binaries on the Hadoop cluster. No manual copying of Pentaho JARs to all the nodes.
  2. Developers of MapReduce applications can write solutions that perform faster than with other tools because you can easily process multiple rows in parallel using the threading features of a PDI transformation. Yes, you could code this in Java MapReduce, but in PDI it is just a few clicks of configuration.
  3. Since you are using the same Kettle engine in the Spoon development environment and in the Hadoop cluster, you can test the logic of your PMR transformations within Spoon. This can significantly speed up development because you can debug an application in Spoon without using a Hadoop cluster.


Filed under Big Data, Hadoop, MapReduce, PDI, Pentaho, Pentaho Data Integration

Full Outer Join using Pentaho MapReduce

This article will demonstrate how to implement a full outer join using PDI MapReduce (PMR). In one of my previous blogs (here), I discussed different methods of joining data in a Hadoop environment. That article provided general guidelines about joining data within Hadoop using PDI. This post provides a PDI application that implements a full outer join with the following features:

  • Implements the Reduce Side join design pattern (for detailed explanation of this design pattern read this)
  • Both data sets are in HDFS. You can easily extend the existing application to join more than two data sets.
  • Each row of the resulting joined data indicates which dataset(s) the joined data was taken from.

This implementation (and the Reduce Side join design pattern in general) is best for joining large data sets that are in HDFS.

Before explaining the PMR solution, I would like to point out that the easiest way to do this type of join is to use either a Pig script or a Hive/Impala SQL JOIN statement. However, if you do not want to use Pig/Hive/Impala, then the following reference PMR implementation can be used.

The sample inputs to the application are two data sets:

 Dataset A:

3,3738,New York,NY
4,12946,New York,NY
5,17556,San Diego,CA
9,3443,Oakland,CA

Dataset B:
3,35324,Hello
5,44921,Small World
5,44920,Big Universe
3,48002,Good Bye
8,48675,Thank You

The first column in both datasets is the join key. The final output of the full-outer join is:

3,X,3,3738,New York,NY,3,48002,Good Bye
3,X,3,3738,New York,NY,3,35324,Hello
4,A,4,12946,New York,NY,,,
5,X,5,17556,San Diego,CA,5,44920,Big Universe
5,X,5,17556,San Diego,CA,5,44921,Small World
8,B,,,,,8,48675,Thank You
9,A,9,3443,Oakland,CA,,,

 Where:
  • The first column is the join key.
  • Second column indicates which data set the data originated from. The value of X indicates data came from both sets. The value of A indicates that only data from dataset A was present. The value of B indicates only data from dataset B was available.
  • The rest of the fields are the joined data.

You can download this PDI solution here: full_outer_join_pmr

You should start looking at the PDI Job implemented in main.kjb. This PDI job performs the following tasks:

  1. Configures variables for Hadoop and various other parameters used in the application. You will need to configure the Hadoop cluster as needed in the Init Vars job entry.
  2. Copies the sample datasets from the local file system to HDFS.
  3. Calls the add_row_type PDI job, which runs a map-only Hadoop job on each dataset to tag each row with an identifier indicating its source dataset.
  4. Calls the PMR job to join the two data sets using the Pentaho MapReduce job entry and a User Defined Java Class transformation step (a simplified sketch of the reduce-side join logic appears below).
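
The join logic itself lives in the User Defined Java Class step of the download. As a simplified plain-Java sketch (not the exact UDJC code), the reduce-side full outer join for this sample's field counts (four fields from dataset A, three from dataset B) looks roughly like this:

import java.util.ArrayList;
import java.util.List;

public class ReduceSideJoinSketch {
    // taggedRows are rows such as "A,4,12946,New York,NY" or "B,3,35324,Hello",
    // already grouped by the join key by the MapReduce framework.
    public static List<String> joinForKey(String key, Iterable<String> taggedRows) {
        List<String> aRows = new ArrayList<>();
        List<String> bRows = new ArrayList<>();
        for (String row : taggedRows) {
            if (row.startsWith("A,")) aRows.add(row.substring(2));
            else if (row.startsWith("B,")) bRows.add(row.substring(2));
        }

        List<String> out = new ArrayList<>();
        if (!aRows.isEmpty() && !bRows.isEmpty()) {            // key present in both datasets
            for (String a : aRows)
                for (String b : bRows)
                    out.add(key + ",X," + a + "," + b);
        } else if (!aRows.isEmpty()) {                          // only dataset A had this key
            for (String a : aRows) out.add(key + ",A," + a + ",,,");
        } else {                                                // only dataset B had this key
            for (String b : bRows) out.add(key + ",B,,,,," + b);
        }
        return out;
    }
}

For example, joinForKey("8", ...) with only the B row "B,8,48675,Thank You" produces "8,B,,,,,8,48675,Thank You", matching the sample output above.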

The sample app has been tested with the following software:

  1. Pentaho Data Integration 5.3
  2. Cloudera  CDH5.2


Filed under Big Data, Hadoop, MapReduce, PDI, Pentaho, Pentaho Data Integration

Processing XML and JSON in Hadoop using PDI

Hadoop can efficiently process large amounts of data by splitting large files into smaller chunks (typically 128 MB) and processing each chunk in parallel. The "splittability" of a file is central to its efficient handling by MapReduce. A file format is splittable if you can start processing records anywhere in the file; a splittable format cannot require you to start reading from the first line of the file. Therefore, XML and JSON documents that span multiple lines are not splittable: you cannot start processing an XML document in the middle because you need the starting and ending tags of the XML hierarchy. There are four possible ways to handle these types of files in Hadoop:

  1. Store the XML in an HBase column. Then process the HBase table, extract the column that holds the XML, and process the XML as needed in a mapper. The main downside of this solution is that it forces you to use HBase. If you decide to use this method, you can use PDI's HBase Row Decoder step to get the HBase data into a Pentaho MapReduce (see this posting for how to use this step in PMR).
  2. Remove all line breaks from the document and create a single line that contains the entire XML/JSON data as a single record. If you use a text-based input formatter for the mapper/reducer, you can strip all the line breaks from the XML doc or JSON object so that it appears as a single string record in the text file. You then put the single line in an HDFS file; multiple XML docs or JSON objects go on separate lines of the same HDFS file. When your mapper function receives the record, it will get the entire XML doc or JSON object as a single line. Although I have seen this solution implemented, I do not recommend it, because if your XML/JSON contains data that requires unencoded line breaks, you will lose the line-break formatting (e.g., you may have this issue when using CDATA XML sections).
  3. Use a custom input formatter. This is probably the most popular solution. It requires you to implement Hadoop's Java classes for a custom input format and record reader. The Apache Mahout project has one of the most widely used implementations of this solution (see Mahout's XmlInputFormat implementation). Although this is probably the most popular method of processing XML files in a MapReduce application, I find it is not the most efficient, because you effectively have to parse the XML document twice: once to get the entire doc as a single record that gets passed to your mapper, and a second time in your mapper, which will probably parse it for further processing.
  4. Store the data in binary/hex format. In this solution you take the text data and put it in a binary (byte array) format that can be stored in a single line. Then write the binary format as a single string line to an HDFS file; multiple "stringified" binary representations of XML/JSON text go on separate lines of the same HDFS file. In the mapper you reverse the binary string format back to the original textual string. This preserves all the formatting and special characters that your XML/JSON contains.

The last option is the preferred method because it does not have the issues of the first few options. It is also very easy to implement in PDI. If you were writing your application in Java, you could implement it using Hadoop's sequence file format. However, the current release of PDI cannot natively write out to a sequence file. You could write custom Java code in PDI to write out to a Hadoop sequence file, but there is an easier way to get the same benefits without using sequence files.

The attached sample PDI application demonstrates how to process non-splittable text formats such as XML and JSON in a MapReduce application. This example pre-processes XML docs into a hex string that can be written out as a single line. Each line can then be read into a mapper as a single record, converted back to XML, and processed using PDI's XML parsers. It uses a Java Expression PDI step to convert between the XML string and its hex representation.
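
As a rough illustration of that round trip, here is a JDK-only sketch of the hex encode/decode; it is not necessarily the exact expression used in the attached Java Expression step:

import java.nio.charset.StandardCharsets;

public class XmlHexRoundTrip {
    // Encode an XML document as a single-line hex string (safe to store one per line in HDFS).
    public static String toHex(String xml) {
        StringBuilder sb = new StringBuilder();
        for (byte b : xml.getBytes(StandardCharsets.UTF_8)) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    // Decode the hex string back to the original XML, preserving line breaks and special characters.
    public static String fromHex(String hex) {
        byte[] bytes = new byte[hex.length() / 2];
        for (int i = 0; i < bytes.length; i++) {
            bytes[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String xml = "<doc>\n  <msg><![CDATA[line one\nline two]]></msg>\n</doc>";
        String oneLine = toHex(xml);                         // single line, safe for text input formats
        System.out.println(xml.equals(fromHex(oneLine)));    // prints true
    }
}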

You can download this PDI solution here: xml_processing

You should start looking at the PDI Job implemented in master_job.kjb. This PDI job performs the following tasks:

  • Configures variables for Hadoop and various other parameters used in the application. You will need to configure the Hadoop cluster as needed in the Set Variables job entry.
  • Calls the store_binary_hdfs transformation, which reads all the XML docs in the data directory, converts them to hex strings, and writes them out to a single HDFS file.
  • Runs a Pentaho MapReduce (map-only) application to parse the XML and extract the wanted fields to a CSV file that is sent to the mapper output.

The sample app has been tested with the following software:

  1. Pentaho Data Integration 5.2
  2. Cloudera  CDH5.1


Filed under Big Data, Hadoop, JSON, MapReduce, PDI, Pentaho, Pentaho Data Integration, XML

How to get Hadoop counters in PDI applications

Hadoop maintains various counters about MapReduce jobs in the JobTracker. These counters can be viewed using the JobTracker's web UI but are not easily accessible within a PDI application. This solution shows how you can retrieve the Hadoop counters from a PDI application. The attached solution implements the word count MapReduce application using Pentaho MapReduce. Once the Pentaho MapReduce is complete, it collects and logs all the Hadoop counters. The solution contains the following files/folders:

  1. data – directory that contains sample files that will be used for the word count. These files will be copied to HDFS for performing the word count MapReduce application.
  2. wordcount.kjb – This is the main PDI application that coordinates all the steps required to perform the word count. It performs the following tasks:
    1. Creates a unique Hadoop Job Name  for our MapReduce application.
    2. Sets variables to configure the Hadoop cluster.
    3. Executes the word count MapReduce application using Pentaho MapReduce.
    4. Retrieves the Hadoop counters and logs them to Kettle’s log file.
  3. wc_mapper.ktr – Transformation that implements the map phase of the word count application.
  4. wc_reducee.ktr – Transformation that implements the reduce phase of the word count application.
  5. capture_hadoop_metrics.ktr – This transformation collects all the Hadoop counters in a User Defined Java Class and outputs the information to the logs. The UDJC uses Hadoop's native API to retrieve the counters and outputs a single row for each counter. (A hedged sketch of this approach appears after this list.)
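
For illustration, here is a hedged sketch of how such a UDJC can look up counters by job name using Hadoop's older org.apache.hadoop.mapred JobClient API; the exact classes and methods depend on your Hadoop version, and this is not the exact code in the download:

import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.RunningJob;

public class HadoopCounterFetcher {
    // Find a non-retired job by its (unique) job name and print every counter.
    public static void printCounters(JobConf conf, String jobName) throws Exception {
        JobClient client = new JobClient(conf);
        for (JobStatus status : client.getAllJobs()) {       // non-retired jobs only
            RunningJob job = client.getJob(status.getJobID());
            if (job != null && jobName.equals(job.getJobName())) {
                Counters counters = job.getCounters();
                for (Counters.Group group : counters) {
                    for (Counters.Counter counter : group) {
                        System.out.println(group.getDisplayName() + " / "
                                + counter.getDisplayName() + " = " + counter.getCounter());
                    }
                }
            }
        }
    }
}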

In order for this solution to work, the following requirements must be met:

  1. The Hadoop job has not been retired. Hadoop's Java API only retrieves job and counter data for non-retired jobs.
  2. The Hadoop Job Name must be unique. Information about a Hadoop job is retrieved using the PMR Job Name, and this name must be unique across all non-retired Hadoop jobs.
  3. PDI uses Hadoop's Java API to retrieve the counters, so all required Hadoop libraries (JARs) must be copied to the appropriate Pentaho application lib directory. For example, if you are using CDH4.x and run the sample app in Spoon 5.x, then copy all of the Hadoop client libs from [PentahoHome]/design-tools/data-integration/plugins/pentaho-big-data-plugin/hadoop-configurations/cdh42/lib/client/* to [PentahoHome]/design-tools/data-integration/lib.

You can download this PDI solution here: get_hadoop_metrics.tar.gz

The sample app has been tested with the following software:

  1. Pentaho Data Integration 5.x
  2. Cloudera  CDH4.3


Filed under Big Data, Hadoop, MapReduce, PDI, Pentaho, Pentaho Data Integration

Uploading custom JARs to Hadoop for PDI

Many PDI applications require third-party Java libraries to perform tasks within PDI jobs and transformations. These libraries must be included in the classpath of the Hadoop mappers and reducers so PDI applications can use them in the Hadoop cluster. The best way to do this is to copy all dependent JARs to Hadoop's Distributed Cache and add the following parameters to the Pentaho MapReduce job entry (in the User Defined tab):

  • mapred.cache.files
  • mapred.job.classpath.files

The process of uploading the custom JARs can be automated by implementing a PDI transformation that does the following:

  1. Takes a list of the JARs the PDI application requires.
  2. Copies all the files from the local filesystem to a configured HDFS directory.
  3. Sets a global variable containing the list of all the JARs with their fully qualified HDFS paths. This variable is then used to set the user-defined parameters given above in the Pentaho MapReduce job entry. (A sketch of what those parameter values look like appears after this list.)
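
For illustration, this hedged sketch shows roughly what the two parameter values end up containing once the JARs are in HDFS. The paths are hypothetical, and depending on your Hadoop version the classpath property may expect a different separator than the comma shown here:

import org.apache.hadoop.conf.Configuration;

public class DistributedCacheParams {
    public static void main(String[] args) {
        // Hypothetical HDFS locations produced by the upload transformation.
        String[] jars = {
                "hdfs://namenode:8020/pdi/custom-jars/commons-lang-2.6.jar",
                "hdfs://namenode:8020/pdi/custom-jars/my-lookup-lib.jar"
        };
        String jarList = String.join(",", jars);

        // Equivalent of the two User Defined parameters on the Pentaho MapReduce job entry.
        Configuration conf = new Configuration();
        conf.set("mapred.cache.files", jarList);          // ship the JARs to every task node
        conf.set("mapred.job.classpath.files", jarList);  // add them to the task classpath
                                                          // (some Hadoop versions expect ':' here)

        System.out.println(conf.get("mapred.cache.files"));
    }
}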

You can download this PDI solution here.

This solution has been tested with PDI 4.4.


Filed under Big Data, Hadoop, MapReduce, PDI, Pentaho, Pentaho Data Integration

How to join big data sets using MapReduce and PDI

When doing lookups for data that resides inside a Hadoop cluster, it is best to have both of the data sets you want to join reside in HDFS. You have several options when using PDI and Hadoop cluster technology to join data sets inside the Hadoop cluster. The specific solution depends on the amount of data in the tables:

  1. PDI’s Hadoop File Input with Stream Value Lookup step
    1. This solution is best when the lookup data is very small
    2. Put lookup file in Hadoop’s Distributed Cache
    3. See Joins with Map Reduce for an example of how you would do this in Java. In PDI you would simply add the file to Hadoop's Distributed Cache and then use PDI's Stream Lookup step (see here for how to add to Hadoop's Distributed Cache in PDI).
  2. PDI’s HBase Input Step with Stream Value Lookup
    1. This solution is best when the lookup data is small
    2. This will perform full range scans of the HBase tables, so if the HBase table is large this could be slow.
    3. HBase configuration also impacts the performance.
  3. Directly use HBase API with PDI’s User Defined Java Class
    1. This solution is best when the lookup data is large and is in HBase tables
    2. You will need to call the HBase API from the Pentaho UDJC. You must be familiar with the HBase API features to get the best performance (use HBase connection/table pools as static variables and try to apply as many filters as possible in a single request to minimize the calls out to HBase). A hedged sketch of a simple HBase lookup appears after this list.
    3. HBase configuration also impacts the performance.
  4. MapReduce Joins
    1. This solution is best when both the input data and the lookup data are large.
    2. Both of these data sets are in HDFS.
    3. This can also be done using Pig/Impala/Hive instead of actually writing the low-level MapReduce.
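
For option 3, here is a hedged sketch of a single-row HBase lookup that could live inside a UDJC. The table, column family, and qualifier names are made up, and the API shown is the classic HTable client from the CDH4-era HBase releases:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseLookupSketch {
    // Look up a single column value by row key; returns null if the row or cell is missing.
    public static String lookup(String tableName, String rowKey) throws Exception {
        Configuration conf = HBaseConfiguration.create();   // reads hbase-site.xml from the classpath
        HTable table = new HTable(conf, tableName);          // in a UDJC, cache/pool this as a static
        try {
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("value"));   // hypothetical family/qualifier
            Result result = table.get(get);
            byte[] cell = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("value"));
            return cell == null ? null : Bytes.toString(cell);
        } finally {
            table.close();
        }
    }
}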

In this posting I will detail a solution for the fourth option. Before explaining the process of implementing an actual MapReduce join, I would like to point out that the easiest way to do this type of join is to use either a Pig script or a Hive/Impala SQL JOIN statement.

However, if you do not want to use Pig/Hive/Impala to do the joins then the following reference implementation can be used.

When you have two huge data sets in HDFS, you can join them using the MultipleInputs feature of Apache Hadoop's Java API when setting up the Hadoop job (a detailed example can be found here). This feature is not available in PDI (as of PDI 4.4 GA); a hedged sketch of the MultipleInputs approach appears after the list below. The attached example demonstrates how to implement the fourth solution in PDI. The basic design of the solution is:

  1. Create a map-only task for each data set. The mapper adds a row type to each row and writes it back out to HDFS. The row type identifies which data set the row originated from. Note that the number of reducers is set to zero; this disables the shuffle, sort, and reduce parts of the process to make it more efficient.
  2. Take the output of the mappers and copy it to a single HDFS directory.
  3. Run a MapReduce job that joins the data sets where the input dir contains both data sets with the row type appended to each row.
    1. The mapper performs any data cleanup or massaging of the data that you want. The mapper output value is the data from each row, and the key is the join key.
    2. The reducer then joins the data from both rows and outputs the key and joined data.
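
For comparison, here is a hedged sketch of the MultipleInputs approach in plain Hadoop Java, wiring a tagging mapper to each input directory. The paths and class names are hypothetical, and the reducer that performs the actual join is omitted:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MultipleInputsJoinDriver {

    // Tags each row from dataset A with "A"; the join key is assumed to be the first CSV field.
    public static class DatasetAMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context ctx)
                throws IOException, InterruptedException {
            String row = value.toString();
            ctx.write(new Text(row.split(",")[0]), new Text("A," + row));
        }
    }

    // Tags each row from dataset B with "B".
    public static class DatasetBMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context ctx)
                throws IOException, InterruptedException {
            String row = value.toString();
            ctx.write(new Text(row.split(",")[0]), new Text("B," + row));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "multiple-inputs-join");
        job.setJarByClass(MultipleInputsJoinDriver.class);

        // Each input directory gets its own mapper; this is the feature PDI does not expose.
        MultipleInputs.addInputPath(job, new Path("/data/datasetA"),
                TextInputFormat.class, DatasetAMapper.class);
        MultipleInputs.addInputPath(job, new Path("/data/datasetB"),
                TextInputFormat.class, DatasetBMapper.class);

        // A custom reducer would group the tagged rows by join key and emit the joined output.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path("/data/joined"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}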

You can download this solution here.


Filed under Big Data, Hadoop, MapReduce, PDI, Pentaho, Pentaho Data Integration