Monthly Archives: April 2015

Overview of Pentaho MapReduce Integration with Hadoop

There seems to be some confusion about how Pentaho Data Integration’s MapReduce functionality works with a Hadoop cluster. In this post I will explain the basics of how Pentaho MapReduce (PMR) integrates with Hadoop’s MapReduce. I assume you know the basics of PDI and of Hadoop MapReduce development, but let’s review some PDI basics that are relevant to this discussion:
  1. PDI is a Java-based ETL engine, but you do not have to write Java to implement your ETL.
  2. PDI applications are driven by the workflow(s) that you define. The workflows are stored in XML files.
  3. PDI’s Kettle engine interprets the XML and executes the application.
  4. The Kettle engine is embedded in all of the following tools (For the remainder of this discussion, when we say “PDI”, it will imply any of these apps):
    1. Spoon – A graphical tool for designing, executing, and testing PDI workflows. Most commonly used during development.
    2. PDI Server – The application that runs the Kettle engine in a web container such as Apache Tomcat.
    3. Kitchen – Command-line script that uses the Kettle engine to execute PDI jobs.
    4. Pan – Command-line script that uses the Kettle engine to execute PDI transformations.
When implementing a PMR application, you must break your problem down into a MapReduce application yourself. PDI does not automagically convert an existing PDI application into a MapReduce application. So let’s review the components that constitute a MapReduce application and how each is represented in PDI:
  1. MapReduce Driver Application
    1. This is the code that configures the Hadoop job and submits it to the Hadoop cluster.
    2. In PDI, this is the Pentaho MapReduce job entry. In this entry you provide the following types of information:
      1. PDI transformations to use for the mapper, combiner, and reducer.
      2. Hadoop cluster information (e.g. NameNode hostname and port).
      3. Input and output formats.
      4. Input and output HDFS directories.
      5. Any Hadoop parameters needed by your application.
  2. Mapper/Reducer/Combiner
    1. This is the actual application that processes your data using the MapReduce paradigm.
    2. In PDI, you write a separate transformation for each of the mapper, combiner, and reducer. Only the mapper is mandatory.
    3. Each transformation that is used as a mapper, reducer, or combiner must have an instance of the MapReduce Input step and the MapReduce Output step. The rest of the transformation is your logic to process your data. (There are some steps that are particularly suited to a Hadoop environment, but that’s another topic for another day.)
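To make the split into mapper, combiner, and reducer concrete, here is a minimal sketch in plain Python. It is not PDI or Hadoop code: the function names, the `group_by_key` helper (a stand-in for Hadoop's shuffle/sort), and the word-count problem are all assumptions chosen for illustration. In PMR, each of the three roles would be its own transformation.

```python
from collections import defaultdict

def mapper(line):
    """Emit a (word, 1) pair for every word in one input line."""
    for word in line.lower().split():
        yield word, 1

def combiner(key, values):
    """Optional pre-aggregation of one mapper's output before the shuffle."""
    yield key, sum(values)

def reducer(key, values):
    """Final aggregation of all values that share a key."""
    yield key, sum(values)

def group_by_key(pairs):
    """Stand-in for Hadoop's shuffle/sort: collect values per key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return sorted(grouped.items())

def run_job(splits, use_combiner=True):
    """Run map, optional combine, and reduce over a list of input splits."""
    shuffled = []
    for split in splits:  # each split is handled by one simulated mapper task
        mapped = [pair for line in split for pair in mapper(line)]
        if use_combiner:
            mapped = [out for key, values in group_by_key(mapped)
                      for out in combiner(key, values)]
        shuffled.extend(mapped)
    return dict(out for key, values in group_by_key(shuffled)
                for out in reducer(key, values))

splits = [["pdi runs on hadoop", "hadoop runs jobs"], ["pdi jobs"]]
result = run_job(splits)
```

The combiner changes how much data crosses the shuffle, not the answer: `run_job(splits, use_combiner=False)` returns the same counts.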
It is important to understand where the various components of your PDI application execute. Let’s say we have a sample application implemented as a PDI job (a *.kjb file) that orchestrates all aspects of the application in sequential order. This main PDI job contains the Pentaho MapReduce job entry (the Hadoop driver). All the entries in the main PDI job execute on the machine where you are running the PDI Server, Spoon, or Kitchen. When the Kettle engine interprets the PMR entry within this main job, it does the following:
  1. The first time you run a PDI application on a Hadoop cluster, PDI uploads all of its dependent binaries (JARs) to an HDFS directory and marks that directory as part of Hadoop’s Distributed Cache. If the target HDFS directory already exists, nothing is uploaded. PMR uses Hadoop’s Distributed Cache to distribute PDI’s libraries to all the nodes in the cluster.
    The HDFS directory’s name contains PDI version information, so if you upgrade PDI to a new version, PDI automatically uploads the new version to a new HDFS target directory.
  2. Once the Kettle engine and all of its dependencies are uploaded to the Hadoop cluster, PDI submits a custom Hadoop MapReduce job that instantiates the Kettle engine within the mapper, combiner, and reducer. The Kettle engine within each of these is given the corresponding transformation configured in the PMR entry. It then interprets and executes that transformation on the Hadoop cluster, in the context of the mapper, combiner, or reducer process.
  3. In the PMR entry, under the Cluster tab, if the Enable Blocking option is checked, the main PDI job that contains the PMR entry blocks until the PMR Hadoop job completes. Once it completes, the main job continues with its next entry.
  4. If Enable Blocking is not checked, PDI just submits the PMR Hadoop job to the Hadoop cluster and continues with the next entry in the main PDI job. You will not get the completion status of the PMR Hadoop job. Basically, PDI fires off the PMR app on the Hadoop cluster, forgets about it, and continues with the next entry in the PDI app.
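The difference between steps 3 and 4 can be pictured with a small simulation. This is only a sketch under stated assumptions: a thread pool stands in for the Hadoop cluster, and `hadoop_job` and `run_main_job` are hypothetical names, not PDI APIs.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def hadoop_job():
    """Stand-in for the PMR Hadoop job running on the cluster."""
    time.sleep(0.05)
    return "SUCCEEDED"

def run_main_job(enable_blocking, cluster):
    """Simulate a main job with two entries: the PMR entry, then a follow-up.

    Returns the events in the order the main job saw them.
    """
    log = []
    future = cluster.submit(hadoop_job)  # the PMR entry submits the job
    if enable_blocking:
        # Blocking: wait for the job, so its final status is known here.
        log.append(f"PMR finished: {future.result()}")
    else:
        # Non-blocking: move on immediately, the status is never read.
        log.append("PMR submitted, status unknown")
    log.append("next entry started")
    return log

with ThreadPoolExecutor(max_workers=1) as cluster:
    blocking_log = run_main_job(True, cluster)
    non_blocking_log = run_main_job(False, cluster)
```

In the non-blocking case the follow-up entry cannot depend on the job's result, which is why any entry that needs the output of the PMR job calls for Enable Blocking.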
Because of the above design, the following points should be noted:
  1. The first time you execute a PMR application, there will be a delay before PMR execution starts because PDI copies all the PDI-related binaries (JARs) to Hadoop’s Distributed Cache. This overhead is incurred only the first time you run a given version of PDI. Once the PDI binaries are uploaded, PDI just confirms the existence of the upload directory (NOT the actual contents of the directory).
  2. If you change a JAR in PDI (e.g. you got a patch that replaces a JAR file), you should delete the HDFS directory where the PDI binaries are uploaded. This forces a fresh upload of the binaries, including the changed JARs.
  3. A Pentaho MapReduce application can easily take advantage of multithreading within a mapper, reducer, or combiner, because PDI executes each step in its own thread and can run multiple instances of a step.
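Points 1 and 2 above come down to an existence check on a version-named directory. The sketch below models that behaviour with an in-memory stand-in for HDFS. Everything in it is an assumption for illustration: the `FakeHdfs` class, the `/example/pdi-install/` path layout, the version strings, and the JAR name are hypothetical, and the real directory name is chosen by PDI.

```python
class FakeHdfs:
    """In-memory stand-in for HDFS: maps a directory path to its files."""

    def __init__(self):
        self.dirs = {}

    def exists(self, path):
        return path in self.dirs

    def upload(self, path, files):
        self.dirs[path] = dict(files)

    def delete(self, path):
        self.dirs.pop(path, None)

def install_dir(pdi_version):
    # Hypothetical layout: one directory per PDI version.
    return f"/example/pdi-install/{pdi_version}"

def ensure_installed(fs, pdi_version, local_jars):
    """Upload the JARs only if the versioned directory is missing.

    Returns True when an upload happened.
    """
    target = install_dir(pdi_version)
    if fs.exists(target):  # only the directory is checked, not its contents
        return False
    fs.upload(target, local_jars)
    return True

fs = FakeHdfs()
first_run = ensure_installed(fs, "1.0", {"engine.jar": "build-1"})
second_run = ensure_installed(fs, "1.0", {"engine.jar": "build-1"})
patched_run = ensure_installed(fs, "1.0", {"engine.jar": "build-2"})
stale = fs.dirs[install_dir("1.0")]["engine.jar"]  # still the old build

fs.delete(install_dir("1.0"))  # the manual fix from point 2
after_delete = ensure_installed(fs, "1.0", {"engine.jar": "build-2"})
upgrade_run = ensure_installed(fs, "2.0", {"engine.jar": "build-3"})
```

The patched run uploads nothing because the directory already exists, so the cluster keeps using the old JAR until the directory is deleted. A version upgrade does not have this problem because it targets a new directory.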
Hopefully this gives you a good understanding of how PDI apps run on a Hadoop cluster. The above design gives PDI users some important benefits:
  1. PDI automagically deploys and maintains itself on the Hadoop cluster. You do not have to manually maintain Pentaho-specific binaries on the Hadoop cluster. No manual copying of Pentaho JARs to all the nodes.
  2. Developers of MapReduce applications can write solutions that may perform faster than those built with other tools, because you can easily process multiple rows in parallel using the threading features of PDI transformations. Yes, you could code this in Java MapReduce, but in PDI it is just a few clicks of configuration.
  3. Since you are using the same Kettle engine in the Spoon development environment and in the Hadoop cluster, you can test the logic of PMR transformations within Spoon. This can significantly speed up development because you can debug an app in Spoon without using a Hadoop cluster.
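The row-level parallelism mentioned in the second benefit can be pictured as several copies of one step pulling rows from the same stream. The sketch below is a loose analogy in Python, not PDI code: `enrich`, `run_step`, and the row fields are hypothetical. Two caveats apply. `pool.map` returns results in input order, which is a property of this sketch and should not be assumed of parallel step copies in the real engine. And in Python, threads mainly help with I/O-bound work, so this shows the structure rather than the speedup.

```python
from concurrent.futures import ThreadPoolExecutor

def enrich(row):
    """Stand-in for a row-level step, such as a lookup or calculation."""
    return {**row, "total": row["qty"] * row["price"]}

def run_step(rows, copies):
    """Process rows with `copies` worker threads, like copies of one step."""
    with ThreadPoolExecutor(max_workers=copies) as pool:
        return list(pool.map(enrich, rows))  # map() keeps input order

rows = [{"qty": q, "price": 2.5} for q in range(1, 6)]
totals = [row["total"] for row in run_step(rows, copies=3)]
```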

Filed under Big Data, Hadoop, MapReduce, PDI, Pentaho, Pentaho Data Integration