Using Pentaho Data Integration, Jupyter, and Python Together



The skills needed to operationalize a data science solution are typically divided between data engineers and data scientists. It is rare to find a single individual with all of the skill sets needed to build and deploy a data science solution. Take a look at the following chart from a Stitch Data blog post:


Data scientists are great at developing analytical models to achieve specific business results. However, different skills are needed to deploy a model from the data scientist’s development environment to a scalable production environment. To bring a data-science-based solution to production, the following functions are typically distributed between data scientists and data engineers:

  • Data Scientist
    • Model exploration
    • Model selection
    • Model tuning/training
  • Data Engineer
    • Data Prep/Cleansing/Normalizing
    • Data Blending
    • Scaling solution
    • Production deployment, management, and monitoring

You can significantly reduce the time it takes to bring a data science solution to market and improve the quality of the end-to-end solution by allowing each type of developer to perform the tasks they are best suited for in an environment that best meets their needs. By using Pentaho Data Integration with Jupyter and Python, data scientists can spend their time developing and tuning data science models, and data engineers can focus on data prep tasks. Using all of these tools together makes it easier to collaborate and share applications between these groups of developers. Here are the highlights of how the collaboration can work:

  1. Allow data engineers to perform all data prep activities in PDI. Use PDI to perform the following tasks:
    • Utilize the available connectors to a variety of data sources that can be easily configured instead of coded
    • Blend data from multiple sources
    • Cleanse and normalize the data
    • Tailor data sets for consumption by the data scientist’s application by implementing the following types of tasks in PDI:
      • Feature engineering
      • Statistical analytics
      • Classes and predictors identification
    • Easily migrate PDI applications from development to production environments with minimal changes
    • Easily scale applications to handle production big data volumes
  2. Allow the data scientist to use the prepared data from PDI applications to feed into Jupyter and Python scripts. Using the data engineer’s prepared data, the data scientist can focus on the following tasks in Jupyter/Python:
    1. Model Exploration
    2. Model Tuning
    3. Model Training
  3. Easily share PDI applications between data engineers and data scientists. The output of the PDI application can easily be fed into Jupyter/Python. This significantly reduces the amount of time the data scientist spends on data prep and integration tasks.

This posting will demonstrate how to use these tools together.



Pentaho requirements:

  • Pentaho PDI 8.1+ needs to be installed on the same machine as the Jupyter/Python execution environment.
  • Pentaho Server with Pentaho Data Service. The Pentaho Server can either be running remotely in a shared environment or locally on your development machine. The PDI transformation developed using the Pentaho Data Service must be stored in the Pentaho Server as required by the Pentaho Data Service feature. For details about Pentaho Data Service see the Pentaho help docs here.

Setting up the Jupyter and Python environment is beyond the scope of this article. However, you will need to make sure that the following dependencies are met in your environment:

  • Python 2.7.x or Python 3.5.x
  • Jupyter Notebook 5.6.0+
  • Python JDBC dependencies, i.e. JayDeBeApi and jpype


How to use PDI, Jupyter, and Python Together

1. Implement all of your data connection, blending, filtering, and cleansing in PDI and store the transformation in your Pentaho Server (local server or shared remote server):



2. Use PDI’s Data Service feature to export rows from the PDI transformation to Jupyter. Create a new Data Service and test it within the UI.



3. In Jupyter Notebook, implement the following as a Python script. First include the appropriate PDI libraries, then create a connection to the PDI Data Service. The sample script below assumes you have installed Pentaho Server on your local machine. If you are running the Pentaho Server on a remote shared server, change the JDBC connection information accordingly.
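As a minimal sketch of this step, the connection could be opened with JayDeBeApi roughly as follows. The driver class and URL format are those of Pentaho's thin JDBC driver for Data Services as far as I know, so verify them against the Pentaho docs for your version. The host, port, credentials, and `jar_dir` (a folder holding the Data Service JDBC driver JARs) are assumptions for illustration.

```python
import glob
import os


def build_data_service_url(host="localhost", port=8080, webapp="pentaho"):
    """Build a JDBC URL for a Pentaho Data Service (assumed thin-driver format)."""
    return "jdbc:pdi://{}:{}/kettle?webappname={}".format(host, port, webapp)


def connect_to_data_service(jar_dir, user, password, host="localhost", port=8080):
    """Open a DB-API connection to the Pentaho Server through JayDeBeApi."""
    # Imported lazily so the URL helper can be used without JayDeBeApi installed.
    import jaydebeapi

    # Assumption: jar_dir contains the Data Service JDBC driver JARs.
    jars = sorted(glob.glob(os.path.join(jar_dir, "*.jar")))
    return jaydebeapi.connect(
        "org.pentaho.di.trans.dataservice.jdbc.ThinDriver",  # assumed driver class
        build_data_service_url(host, port),
        [user, password],
        jars,
    )
```

For a remote shared server, only the `host` and `port` arguments would change; the rest of the notebook stays the same.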


4. In Jupyter Notebook’s Python script, retrieve all the rows from the PDI Data Service connection and assign them to a Python Pandas data frame.
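A hedged sketch of this step: any DB-API cursor (which is what JayDeBeApi returns) can be drained into a Pandas data frame using the column names from `cursor.description`. The helper name `fetch_dataframe` and the table `prepared_data` are hypothetical, and an in-memory SQLite database stands in for the Data Service connection so that the example runs without a Pentaho Server.

```python
import sqlite3

import pandas as pd


def fetch_dataframe(connection, query):
    """Run a query on a DB-API connection and return all rows as a DataFrame."""
    cursor = connection.cursor()
    try:
        cursor.execute(query)
        # The first element of each description entry is the column name.
        columns = [col[0] for col in cursor.description]
        rows = [tuple(row) for row in cursor.fetchall()]
    finally:
        cursor.close()
    return pd.DataFrame(rows, columns=columns)


# Stand-in for the Data Service connection (assumption for the demo only).
demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE prepared_data (age INTEGER, income REAL, label TEXT)")
demo.executemany(
    "INSERT INTO prepared_data VALUES (?, ?, ?)",
    [(25, 40000.0, "no"), (47, 85000.0, "yes")],
)
df = fetch_dataframe(demo, "SELECT * FROM prepared_data")
```

With a real Data Service, the query would select from the name you gave the Data Service in PDI instead of `prepared_data`.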


5. Now that you have the data that was prepared in your PDI transformation in a Python data frame, you can experiment with the data by using various Python data science models, libraries, and engines (such as scikit-learn, TensorFlow, and MATLAB). The example below shows the scikit-learn Decision Tree.
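As an illustration of this step, a decision tree could be trained on the data frame along these lines. The column names and values are made up for the sketch; in practice `df` would be the data frame loaded from the PDI Data Service, and the feature and target columns would come from your own transformation.

```python
import pandas as pd
from sklearn.tree import DecisionTreeClassifier

# Hypothetical prepared data; replace with the data frame loaded from PDI.
df = pd.DataFrame(
    {
        "age": [22, 25, 47, 52, 46, 56, 23, 60],
        "income": [30, 32, 85, 90, 80, 95, 28, 99],
        "purchased": [0, 0, 1, 1, 1, 1, 0, 1],
    }
)

features = df[["age", "income"]]  # predictors
target = df["purchased"]          # class to predict

# A shallow tree is enough for this toy data; random_state makes it repeatable.
model = DecisionTreeClassifier(max_depth=2, random_state=0)
model.fit(features, target)

# Score two unseen rows with the trained model.
new_rows = pd.DataFrame({"age": [24, 58], "income": [29, 97]})
predictions = model.predict(new_rows)
```

Because feature engineering already happened in PDI, the notebook code stays focused on model selection and tuning.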


The above PDI application and Jupyter/Python code is available here.


Pentaho’s Visual Development for Spark

NOTE: This blog was written when 7.1 was released. The general concepts described in this blog are still valid; however, newer versions of PDI include many enhancements. For example, starting with 8.0, ZooKeeper is no longer used by AEL.


Pentaho just announced the release of Pentaho 7.1. Don’t let the point release numbering make you think this is a small release. This is one of the most significant releases of Pentaho Data Integration! With the introduction of the Adaptive Execution Layer (AEL) and Spark, this release leapfrogs the competition for Spark application development!

The goal of AEL is to develop visually once and execute anywhere. AEL will future-proof your application against emerging engines. Today you can visually develop applications for Pentaho’s native Kettle engine and a Spark cluster. As new technologies emerge, AEL will be implemented for other engines so developers don’t need to rewrite their applications.


With the initial implementation of AEL with Spark, Pentaho brings the power and ease-of-use of PDI’s visual development environment to Spark. Virtually all PDI steps can run in Spark. This allows a developer to build their entire application on their desktop without having to access and debug a Spark cluster. Once you are done testing on your desktop, you simply point the application to Spark resources and run. This saves a huge amount of development and testing time! (Many vendors claim they can do this, but have you actually tried developing an application on your desktop without a Spark cluster to test on and then move that application to a Spark cluster…good luck with that…not possible with other vendors without major changes to your application!)

AEL-Spark Application Development

One of the major benefits of developing a Spark application with PDI is that you do not need access to a Spark cluster (or local mode installation of Spark) during development time. As a developer, you can build and test your entire Spark application on your desktop with sample data. Once you are done you simply point the application to Spark resources and execute. If you have ever debugged a cluster application you will realize the huge productivity boost this gives a developer.

Let’s use the following simple application to understand how you would develop a PDI application on your desktop and then execute it in Spark (it may not be a real-world application, but it serves to demonstrate various concepts of AEL-Spark):

Main PDI Job:


  • Use variables for all input and output files. In the Set variables job step, create variables that are used by all input and output file steps in the subsequent transformations. This will allow us to easily point to different files when testing on the desktop and then switching to the Spark cluster.
  • Executes 2 transformations in sequence. During development time, this PDI Job and all of its transformations will execute on the desktop.
    • Transformation A: For records that have a Product ID, it will look up additional product info and sort final results by product name.
    • Transformation B: Loads the error records to a database


Transformation A: Lookup and Sort


  • Filters records: Records that do not have product IDs are filtered out and saved in a separate file.
  • For all records that have a Product ID, we look up the product ID, add additional product information to the row, sort the records by Product Name, and store the results.
  • Once we are done testing on our desktop, this will be the only transformation that will get configured to execute on the Spark cluster.


Transformation B: Load RDBMS


  • Loads the error records to a database.
  • For this discussion, this transformation will always execute on the same engine as the client. We will not execute it on Spark because it is not a good idea to connect to a database from hundreds of Spark worker nodes at the same time.

The above PDI jobs and transformations are developed and debugged with a sample data file on a laptop without any access to Spark technology by configuring the following:

  • Use variables for all input and output files. In the Set variables job step, create variables and use them for all input and output file steps in the subsequent transformations. During development, all of these should resolve to a file name on any VFS (Pentaho’s virtual file system) supported source or destination. This includes the following steps:
    • Transformation A:
      • Text file in: Raw Data
      • Text file in: Product Info
      • Text file out: Error Data
      • Text file out: Sorted Product Data
    • Transformation B: Text file in: Error Data
  • Set the Run Configuration for all transformations to Pentaho local (this is the default setting):

Once you have completed testing on your local desktop, you then point the application to your Spark cluster and execute it by making the following changes:

  • Update all input and output files to reference HDFS locations:
    • Transformation A:
      • All Text file inputs (Raw Data, Product Info): Set to a file or directory in HDFS. If set to a directory, then all files in the directory will be read in.
      • Text file outputs (Error Data, Sorted Product Data): Set to a directory in HDFS. Spark will create a part file for each Spark executor that writes output. All of the part files will be stored in the given directory.
        (Note: this behavior is a bit different than running the transformation from a local system and just outputting the data to HDFS. If you output data to HDFS from a transformation that is running in a Kettle engine via Spoon, Pan, Kitchen, or Pentaho Server, you get a single file. However, if you write out to HDFS in Spark, it will always create a folder and put part files in it. This is because Spark executes multiple instances of the output step in parallel, and each instance writes to a different file in the folder you specify.)
    • Transformation B: (Text file input Error Data): Can be set to a file or directory in HDFS. In our case, we will set it to the directory where all the part files from the previous transformation are stored. All files in the directory will be read in.
  • Once you are ready to execute the application, add a new Run Configuration for Spark for Transformation A by right-clicking on Run Configuration in the View tab and selecting New:
    Note: The Spark host URL is the ZooKeeper host and port within the Spark/Hadoop cluster.
  • Set Transformation A to run on the Spark configuration you just added:
  • Then you simply run the main job as usual.
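To make the part-file behavior concrete, here is a minimal Python sketch that uses a local temporary directory as a stand-in for the HDFS output directory. The `part-00000` style names mirror Spark's usual naming, and `read_part_files` is a hypothetical helper, not a PDI or Spark API. It shows why a downstream reader such as Transformation B is pointed at the directory rather than at a single file.

```python
import os
import tempfile


def read_part_files(directory):
    """Yield every line from every part file in a directory, in file-name order."""
    for name in sorted(os.listdir(directory)):
        if not name.startswith("part-"):
            continue  # skip marker files that are not data
        with open(os.path.join(directory, name), encoding="utf-8") as handle:
            for line in handle:
                yield line.rstrip("\n")


# Simulate two executors, each writing its own part file to the same directory.
out_dir = tempfile.mkdtemp()
for index, rows in enumerate([["r1", "r2"], ["r3"]]):
    path = os.path.join(out_dir, "part-%05d" % index)
    with open(path, "w", encoding="utf-8") as handle:
        handle.write("\n".join(rows) + "\n")

all_rows = list(read_part_files(out_dir))
```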


AEL-Spark Architecture

Before explaining how the above job and transformations are executed, it is important to understand the AEL-Spark architecture as diagrammed below:


The main Pentaho components of AEL-Spark are:

  • PDI Clients
    • This includes Spoon, Kitchen, and PDI Server. Pan cannot run a transformation on AEL-Spark because Run Configurations are associated with the Transformation job entry and are not stored in the KTRs.
    • Clients use a new AEL interface to communicate with remote clusters to execute PDI jobs and transformations. For 7.1, PDI Jobs will execute on the PDI client with the existing Kettle engine, and transformations that are identified to run on Spark will be sent to the AEL-Spark Daemon.
    • A PDI Job can contain a mix of locally executed transformations (via Kettle engine) and Spark executed transformations.
  • AEL-Spark Daemon
    • Currently (in 7.1) there is a single instance of this process/JVM, which runs on an edge node of the Hadoop/Spark cluster.
    • On startup, the Daemon registers with a ZooKeeper cluster.  ZooKeeper must be deployed as part of the Spark/Hadoop cluster. Future enhancements will allow you to configure multiple Daemons to address fault tolerance and scalability.
    • The Daemon is responsible for starting and managing AEL-Spark Engines in a separate process/JVM for each PDI transformation that is executed on the Spark Cluster.
    • The Daemon creates a two-way connection to the PDI client. This connection is used to relay transformation metrics and logs from the AEL-Spark Engine to PDI clients.
    • Daemon logs are local to where the Daemon is running and are not sent back to PDI.
  • AEL-Spark Engine
    • Executes a PDI transformation on the Spark cluster. There is one instance for each PDI transformation.
    • Works as a Spark Driver in Spark Client mode to execute a transformation (see Spark Resource Manager and YARN App Models for details on Spark client and driver).
    • Parses PDI transformation and creates an execution plan to determine how each step will be executed (in parallel mode or single instance).
    • The first input step of a transformation is evaluated by the Engine to generate the initial RDD (unless the input is in HDFS…details are given below).

Note: Installation and configuration of AEL-Spark components are beyond the scope of this article. I assume that the user has set up the PDI components as detailed in the 7.1 docs.


AEL-Spark Execution

Prior to 7.1, there was only one engine supported by Pentaho, the Kettle engine. The Kettle engine performed two major tasks:

  • Loaded plugins for all jobs and transformations.
  • Executed the job entries and transformation steps by using threads and passing data between the entries/steps.

With AEL-Spark, Pentaho has completely re-written the transformation execution engine and data movement so that it loads the same plugins, but uses Spark to execute the plugins and manage the data between the steps.

When you begin executing a PDI Job, each entry in the job is executed in series with the Kettle engine of the PDI Client.  When that Kettle engine encounters a transformation entry that is configured to run on a Spark cluster, the following steps occur for that transformation:

  1. The PDI client connects to ZooKeeper to request an AEL-Spark Daemon.
  2. The PDI client connects to the AEL-Spark Daemon and provides the Daemon with the transformation KTR and PDI application environment settings. This connection between the PDI client and the AEL-Spark Daemon remains open for the duration of the transformation’s execution.
  3. The AEL-Daemon then starts a new AEL-Spark Engine in a new process/JVM. The new AEL-Spark Engine is provided the transformation KTR and PDI application environment settings.
  4. The AEL-Spark Engine creates an execution plan for the KTR. The Engine decides where to execute each PDI step based on the following characteristics:
    1. The first input step is executed within the AEL-Spark Engine:
      1. If the input step is an HDFS directory or file, then it is given to Spark to convert it into RDD(s) and distribute the RDD partitions to the cluster.
      2. If the input step is not reading from HDFS (it can be reading from local disk, a DB, or Kafka stream), then AEL-Spark Engine will generate RDD(s) and distribute the partitions of the RDD to the Spark cluster.
    2. All other steps are then executed on one or more Spark worker nodes based on the following:
      1. If a step is not allowed to run in parallel, then Spark will run it on only a single worker node. This will force all the data to be brought back to that single node.
      2. If a step is allowed to run in parallel, then Spark will execute the plugin on worker nodes where the RDD partitions are cached.
        Note: The AEL-Spark Engine determines which steps cannot be run in parallel and must execute on a single worker node by looking at a list of plugins defined by the forceCoalesceSteps parameter in the following file on the AEL-Spark Daemon system: [AEL-SparkDaemonInstallDir]/data-integration/system/karaf/etc/org.pentaho.pdi.engine.spark.cfg
        The default list only contains EE supported components that cannot be parallelized. Users can add additional steps they do not want to run in parallel (like some custom or marketplace steps).
  5. As Spark executes all of the plugins, the PDI logs are written to YARN logs and sent back to the client via the AEL-Daemon connection to the client. In addition, the AEL-Daemon also collects metrics from the AEL-Spark Engine and sends them back to the client.
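The placement rule in step 4 can be summarized with a small Python sketch. This only illustrates the decision logic described above and is not Pentaho's implementation; the plugin IDs in `FORCE_COALESCE_STEPS` and the function name `plan_execution` are hypothetical.

```python
# Hypothetical plugin IDs standing in for the forceCoalesceSteps list.
FORCE_COALESCE_STEPS = {"SortRows", "UniqueRows"}


def plan_execution(step_ids, force_coalesce=FORCE_COALESCE_STEPS):
    """Return (step, placement) pairs following the rule described in step 4."""
    plan = []
    for position, step_id in enumerate(step_ids):
        if position == 0:
            placement = "engine"         # first input step runs in the AEL-Spark Engine
        elif step_id in force_coalesce:
            placement = "single-worker"  # data is brought back to one worker node
        else:
            placement = "parallel"       # runs where the RDD partitions are cached
        plan.append((step_id, placement))
    return plan


plan = plan_execution(["TextFileInput", "FilterRows", "SortRows", "TextFileOutput"])
```

Adding a custom step ID to the set has the same effect as adding it to the configuration list: that step is no longer scheduled in parallel.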



Send Pentaho MapReduce Output to Multiple Destinations

When implementing a MapReduce application using Pentaho PDI, the mapper and reducer can only send data out to a single destination directory when using the Hadoop Output step. The mapper’s output is written to a temporary directory on the node where it is executing. The reducer outputs to the output directory in HDFS that is configured in the Pentaho MapReduce job entry.

PDI does not support Hadoop’s native feature that provides an output formatter for writing MapReduce output to multiple destinations. However, the following method can be used to write output data to multiple destinations within a mapper or a reducer.

  1. In the mapper or reducer, use the Pentaho Hadoop File Output step (or Text Output step) to write data to files in HDFS. These files will be in addition to the standard mapper/reducer output files that are generated by the Hadoop Output step.
  2. The directory that the Hadoop File Output step stores the file under cannot be the same directory as the output directory configured in the MapReduce job entry.
  3. If the Hadoop File Output steps in all the mappers and reducers write to the same directory, you need to programmatically create a unique name for each file. Multiple mappers and reducers can execute at the same time in your cluster, so you have to make sure each instance of the mapper or reducer creates a unique file in the HDFS namespace. Only one mapper or reducer can write to a given HDFS file at a time.

The diagram below is an example KTR that functions as a reducer to the Pentaho MapReduce job:


Diagram notes:

  1. The Filter Rows step is used to send some rows to the standard Hadoop Output step and other rows to the Hadoop File Output step.
  2. The Hadoop Output step will send the data to the HDFS directory that is configured as the output directory in the Pentaho MapReduce job entry (PDI Job not shown).
  3. The Hadoop File Output step creates another file in HDFS to store all rows less than 10. The actual filename contains a timestamp with millisecond resolution so that no two mappers will create the same filename in the same HDFS directory.
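The unique-name requirement can be sketched in Python as follows. The millisecond timestamp follows the approach in the diagram notes; the random suffix is an extra safeguard added here for the case where two tasks start in the same millisecond, and is not part of the original KTR. The helper name and file prefix are hypothetical.

```python
import time
import uuid


def unique_output_name(prefix="rows_lt_10", extension="txt"):
    """Build a file name that concurrent mappers/reducers are unlikely to share."""
    millis = int(time.time() * 1000)  # timestamp with millisecond resolution
    suffix = uuid.uuid4().hex         # random part guards against same-millisecond starts
    return "{}-{}-{}.{}".format(prefix, millis, suffix, extension)


# Many names generated back to back still do not collide.
names = {unique_output_name() for _ in range(1000)}
```

In a real job, a task-specific value such as the Hadoop task attempt ID would serve the same purpose as the random suffix.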

The above design can be much more powerful than the native Hadoop API because PDI has connectors to many technologies. You can use this design to write out to NoSQL data stores such as Cassandra or MongoDB, or to publish rows to a Kafka bus.


Overview of Pentaho MapReduce Integration with Hadoop

There seems to be some confusion about how Pentaho Data Integration’s MapReduce functionality works with a Hadoop cluster. In this posting I will explain the basics of how Pentaho MapReduce (PMR) integrates with Hadoop’s MapReduce. I assume you know the basics of PDI application and Hadoop MapReduce development, but let’s review some PDI basics that are relevant to this discussion:

  1. PDI is a Java-based ETL engine, but you do not have to write Java to implement your ETL.
  2. PDI applications are driven by the workflow(s) that you define. The workflows are stored in XML files.
  3. PDI’s Kettle engine interprets the XML and executes the application.
  4. The Kettle engine is embedded in all of the following tools (For the remainder of this discussion, when we say “PDI”, it will imply any of these apps):
    1. Spoon – A graphical design tool for designing, executing, and testing PDI workflows. Most commonly used during development.
    2. PDI Server – The application that runs the Kettle engine in a web container such as Apache Tomcat.
    3. Kitchen – Command-line script that uses the Kettle engine to execute PDI jobs.
    4. Pan – Command-line script that uses the Kettle engine to execute PDI transformations.

When implementing a PMR application, you must break your problem into a MapReduce application. PDI does not automagically convert an existing PDI application to a MapReduce application. So let’s review the components that constitute a MapReduce application and how each is represented in PDI:

  1. MapReduce Driver Application
    1. This is the code that configures the Hadoop job and submits it to the Hadoop cluster.
    2. In PDI, this is the Pentaho MapReduce job entry. In this entry you provide the following type of information:
      1. PDI transformations to use for mapper, combiner, and reducer.
      2. Hadoop cluster information (e.g. Name Node hostname/port)
      3. Input/Output Formatters
      4. Input/Output HDFS directories.
      5. Any Hadoop parameters needed by your application.
  2. Mapper/Reducer/Combiner
    1. This is the actual application to process your data using the MapReduce paradigm.
    2. In PDI, you write a unique transformation for mapper, combiner, and reducer. Only a mapper is mandatory.
    3. Each transformation that is used as a mapper, reducer, or combiner must have an instance of the Hadoop Input step and the Hadoop Output step. The rest of the transformation is your logic to process your data. (There are some steps that are suited for the Hadoop environment, but that is a topic for another day.)

It is important to understand where the various components of your PDI application execute. Let’s say we have a sample application that is implemented in a PDI job (a *.kjb) that will orchestrate all aspects of your application in sequential order. This main PDI job contains the Pentaho MapReduce job entry (the Hadoop driver). All the entries in your main PDI job will execute on the machine where you are running the PDI server, Spoon, or Kitchen. When the Kettle engine interprets the PMR entry within this main job, it will do the following:

  1. The first time you run a PDI application on a Hadoop cluster, PDI will upload all of its dependent binaries (JARs) to an HDFS directory and mark that directory to be part of Hadoop’s Distributed Cache. If the target HDFS directory already exists, then it will not upload anything. PMR uses Hadoop’s Distributed Cache to distribute PDI’s libraries to all the nodes in the cluster.
    The HDFS directory’s name contains PDI versioning information. So if you upgrade PDI to a new version, PDI will automatically upload the new version to a new HDFS target directory.
  2. Once the Kettle engine and all of its dependencies are uploaded to the Hadoop cluster, PDI will submit a custom Hadoop MapReduce job that will instantiate the Kettle engine within the Mapper, Combiner, and Reducer. The Kettle engine within this custom mapper/combiner/reducer is given the appropriate mapper/combiner/reducer transformation configured in the PMR entry. The Kettle engine embedded in the mapper/combiner/reducer process will then interpret and execute the transformation in the Hadoop cluster in the context of the mapper, combiner, or reducer process.
  3. In the PMR entry, under the Cluster tab, if the Enable Blocking option is checked, then the main PDI job that contains the PMR will be blocked waiting for the PMR Hadoop job to complete. Once the PMR Hadoop job completes, then the next entry in the PDI main job will continue.
  4. If Enable Blocking is not checked, then the PDI server will just submit the PMR Hadoop job to the Hadoop cluster and continue with the next entry in the PDI main job. You will not get the completion status of the PMR Hadoop job. Basically, it fires off the PMR app on the Hadoop cluster, forgets about it, and continues with the next entry in the PDI app.

Due to the above design, the following points should be noted:

  1. The first time you execute a PMR application, there will be a delay in starting the PMR execution because PDI will copy all the PDI related binaries (JARs) to Hadoop’s distributed cache. This overhead occurs only the first time you run a given version of PDI. Once the PDI binaries are uploaded, PDI just confirms the existence of the uploaded directory (NOT the actual content of the directory).
  2. If you change a JAR in PDI (e.g. you got a patch that replaces a JAR file), then you should delete the HDFS directory where the PDI binaries are uploaded. This will force the upload of the binaries with the changed JARs.
  3. A Pentaho MapReduce application can easily take advantage of multithreading within a mapper, reducer, or combiner because PDI executes each step in its own thread and can run multiple instances of a step.
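The first two points follow from the upload check being keyed on a version-named directory. A rough Python sketch of that logic, under the assumption that the check is purely an existence test as described above (the base path and function names are hypothetical, and a Python set stands in for HDFS):

```python
def target_directory(pdi_version, base_dir="/opt/pentaho/mapreduce"):
    """Version-specific HDFS directory for the PDI binaries (hypothetical base path)."""
    return "{}/{}".format(base_dir.rstrip("/"), pdi_version)


def needs_upload(pdi_version, existing_dirs, base_dir="/opt/pentaho/mapreduce"):
    """Upload only when the versioned directory is missing; contents are not inspected."""
    return target_directory(pdi_version, base_dir) not in existing_dirs


hdfs_dirs = set()                                     # stand-in for HDFS
first_run = needs_upload("5.3", hdfs_dirs)            # directory missing, so upload
hdfs_dirs.add(target_directory("5.3"))
second_run = needs_upload("5.3", hdfs_dirs)           # directory exists, so no upload
after_upgrade = needs_upload("5.4", hdfs_dirs)        # new version, new directory
hdfs_dirs.discard(target_directory("5.3"))            # manual delete after patching a JAR
after_patch_cleanup = needs_upload("5.3", hdfs_dirs)  # upload is forced again
```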

Hopefully this gives you a good understanding of how PDI apps run on a Hadoop cluster. The above design gives PDI users some important benefits:

  1. PDI automagically deploys and maintains itself on the Hadoop cluster. You do not have to manually maintain Pentaho-specific binaries on the Hadoop cluster. No manual copying of Pentaho JARs to all the nodes.
  2. Developers of MapReduce applications can write solutions that perform faster than other tools because you can easily process multiple rows in parallel using the threading features of PDI transformations. Yes, you could code this in Java MapReduce, but in PDI it is just a few clicks of configuration.
  3. Since you are using the same Kettle engine in the Spoon development environment and in the Hadoop cluster, you can test the logic of PMR transformations within Spoon. This can significantly speed up development because you can debug an app in Spoon without using a Hadoop cluster.


Full Outer Join using Pentaho MapReduce

This article will demonstrate how to implement a full outer join using PDI MapReduce (PMR). In one of my previous blogs (here), I discussed different methods of joining data in Hadoop environment. That article provided general guidelines about joining data within Hadoop using PDI.  This post will provide a PDI application that implements a full outer join with the following features:

  • Implements the Reduce Side join design pattern (for detailed explanation of this design pattern read this)
  • Both data sets are in HDFS. You can easily extend the existing application to join more than two data sets.
  • In the resulting joined data, for each row, we indicate which datasets the joined data was taken from.

This implementation (and the Reduce Side join design pattern in general) is best for joining large data sets that are in HDFS.

Before explaining the PMR solution, I would like to point out that the easiest way to do this type of join is to use either a Pig script or a Hive/Impala SQL JOIN statement. However, if you do not want to use Pig/Hive/Impala, then the following reference PMR implementation can be used.

The sample inputs to the application are two data sets:

Dataset A:

3,3738,New York,NY
4,12946,New York,NY
5,17556,San Diego,CA

Dataset B:

5,44921,Small World
5,44920,Big Universe
3,48002,Good Bye
3,35324,Hello
8,48675,Thank You

The first column in both datasets is the join key. The final output of the full-outer join is:

3,X,3,3738,New York,NY,3,48002,Good Bye
3,X,3,3738,New York,NY,3,35324,Hello
4,A,4,12946,New York,NY,,,
5,X,5,17556,San Diego,CA,5,44920,Big Universe
5,X,5,17556,San Diego,CA,5,44921,Small World
8,B,,,,,8,48675,Thank You

  • First column is the join key.
  • Second column indicates which data set the data originated from. The value of X indicates data came from both sets. The value of A indicates that only data from dataset A was present. The value of B indicates only data from dataset B was available.
  • The rest of the fields are the joined data.
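To show the logic behind this output, here is a plain Python sketch of a reduce-side full outer join. It is not the PMR implementation from the download: a dictionary stands in for Hadoop's shuffle, and the function names are hypothetical. The sample rows are the ones above, including the `3,35324,Hello` row that the published output implies for dataset B.

```python
from collections import defaultdict
from itertools import product

DATASET_A = ["3,3738,New York,NY", "4,12946,New York,NY", "5,17556,San Diego,CA"]
DATASET_B = [
    "5,44921,Small World",
    "5,44920,Big Universe",
    "3,48002,Good Bye",
    "3,35324,Hello",  # implied by the published join output
    "8,48675,Thank You",
]


def map_phase(rows, tag):
    """Tag each row with its source dataset and emit (join key, (tag, row))."""
    for row in rows:
        yield row.split(",", 1)[0], (tag, row)


def reduce_phase(key, tagged_rows, width_a=4, width_b=3):
    """Join all A rows with all B rows for one key, padding the missing side."""
    a_rows = [row for tag, row in tagged_rows if tag == "A"]
    b_rows = [row for tag, row in tagged_rows if tag == "B"]
    flag = "X" if a_rows and b_rows else ("A" if a_rows else "B")
    empty_a = "," * (width_a - 1)  # empty fields for a missing A row
    empty_b = "," * (width_b - 1)  # empty fields for a missing B row
    for a_row, b_row in product(a_rows or [empty_a], b_rows or [empty_b]):
        yield ",".join([key, flag, a_row, b_row])


def full_outer_join(rows_a, rows_b):
    groups = defaultdict(list)  # stands in for Hadoop's shuffle/sort by key
    for key, value in list(map_phase(rows_a, "A")) + list(map_phase(rows_b, "B")):
        groups[key].append(value)
    output = []
    for key in sorted(groups):
        output.extend(reduce_phase(key, groups[key]))
    return output


joined = full_outer_join(DATASET_A, DATASET_B)
```

The order of rows within a single join key is not guaranteed in MapReduce, so compare results as sets rather than line by line.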

You can download this PDI solution here: full_outer_join_pmr

You should start looking at the PDI Job implemented in main.kjb. This PDI job performs the following tasks:

  1. Configures variables for Hadoop and various other parameters used in the application. You will need to configure the Hadoop cluster as needed in the Init Vars job entry.
  2. Copies sample datasets from local file system to HDFS.
  3. Calls the add_row_type PDI job, which runs a map-only Hadoop job on each dataset to tag each row with an identifier indicating the source dataset.
  4. Calls the PMR job to join the two data sets using Pentaho MapReduce job entry and User Defined Java Class transformation step.

The sample app has been tested with the following software:

  1. Pentaho Data Integration 5.3
  2. Cloudera CDH5.2


Processing XML and JSON in Hadoop using PDI

Hadoop can efficiently process large amounts of data by splitting large files into smaller chunks (typically 128 MB) and processing each chunk in parallel. The “splittability” of a file is central to the efficient handling of the file by MapReduce. A file format is splittable if you can start processing records anywhere in the file. A splittable file cannot have a file structure that requires you to start reading from the first line of a file. Therefore XML and JSON documents that span multiple lines are not splittable. You cannot just start processing an XML document in the middle because you need the starting and ending tags of the XML hierarchy. There are four possible ways to handle these types of files in Hadoop:

  1. Store the XML in an HBase column. Then process the HBase table, extract the column that has the XML, and process the XML as needed in a Mapper. The main downside of this solution is that it forces you to use HBase. If you decide to use this method, you can use PDI’s HBase Row Decoder step to get the HBase data into a Pentaho MapReduce (see this posting for how to use this step in PMR).
  2. Remove all line breaks from the doc and create a single line that contains the entire XML/JSON data as a single record. If you are using a text-based input formatter for the mapper/reducer, you can strip all the line breaks from the XML doc or JSON object so that the entire XML doc or JSON object appears as a single string record in the text file. You then put the single line in an HDFS file, with multiple XML docs or JSON objects on separate lines in the same HDFS file. When your mapper function receives the record, it gets the entire XML doc or JSON object as a single line. Although I have seen this solution implemented, I do not recommend it: if your XML/JSON has data that requires line breaks that are not encoded, you will lose the line break formatting (for example, you may have this issue if using the XML CDATA tag).
  3. Use a custom input formatter. This is probably the most popular solution. This requires you to implement Hadoop’s Java classes for writing custom input formatter and record reader classes. The Apache Mahout project has one of the most widely used implementations of this solution (see Mahout’s XmlInputFormat implementation). Although this is probably the most popular method of processing XML files in a MapReduce application, I find it is not the most efficient because you effectively have to parse the XML document twice: once to get the entire doc as a single record that gets passed to your mapper, and a second time in your mapper when your mapper code will probably parse it for further processing.
  4. Store the data in binary/hex format. In this solution you take the text data and put it in a binary (byte array) format that can be stored in a single line. Then write the binary format as a single-line string to an HDFS file. You put multiple “stringified” binary representations of XML/JSON text on separate lines in the same HDFS file. Then in the mapper you reverse the binary string format to the same textual string format. This preserves all the formatting and special characters that your XML/JSON contains.
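
To make the splittability problem and the drawback of option 2 concrete, here is a minimal Python sketch. The sample document is hypothetical, and Python's standard XML parser stands in for whatever parser a mapper would use. It shows that a chunk starting mid-document cannot be parsed on its own, and that stripping line breaks silently changes CDATA content.

```python
import xml.etree.ElementTree as ET

# Hypothetical sample document; the CDATA section holds a meaningful line break.
XML_DOC = """<order id="42">
  <note><![CDATA[line one
line two]]></note>
</order>"""


def parses_ok(text):
    """Return True if text is a well-formed XML document on its own."""
    try:
        ET.fromstring(text)
        return True
    except ET.ParseError:
        return False


# A split that starts mid-document (here: everything after the first line)
# is missing the opening <order> tag, so it is not a usable record.
tail_of_split = XML_DOC.split("\n", 1)[1]

# Option 2: strip line breaks so the whole document fits on one line.
single_line = XML_DOC.replace("\n", "")

original_note = ET.fromstring(XML_DOC).find("note").text
flattened_note = ET.fromstring(single_line).find("note").text

print(parses_ok(XML_DOC), parses_ok(tail_of_split))
print(repr(original_note))
print(repr(flattened_note))
```

The flattened document still parses, which is what makes the data loss in option 2 easy to miss.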

The last option is the preferred method because it does not have the issues of the other options. The last option is also very easy to implement in PDI. If you are writing your application in Java, you could implement it using Hadoop’s sequence file format. However, the current release of PDI cannot natively write out to a sequence file. You could write custom Java code in PDI to write out to a Hadoop sequence file, but there is an easier way to get the same benefits without using sequence files.

The attached sample PDI application demonstrates how to process non-splittable text formats such as XML and JSON in a MapReduce application. This example pre-processes XML docs into a hex string that can be written out to a single line. Each line can then be read into a mapper as a single record and converted back to XML and processed using PDI’s XML parsers. It uses a Java Expression PDI step to convert between XML string and its hex representation.
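
The hex round trip itself is simple. The sample application does it in a Java Expression step; the sketch below is a Python stand-in rather than the code from the download, and the function names and sample documents are assumptions made for illustration.

```python
def xml_to_hex_line(xml_text, encoding="utf-8"):
    """Encode a whole document as one hex string that contains no line breaks."""
    return xml_text.encode(encoding).hex()


def hex_line_to_xml(hex_line, encoding="utf-8"):
    """Reverse xml_to_hex_line, restoring the exact original text."""
    return bytes.fromhex(hex_line.strip()).decode(encoding)


def to_record_file(docs):
    """Lay out the encoded documents one per line, as they would sit in HDFS."""
    return "".join(xml_to_hex_line(doc) + "\n" for doc in docs)


# Hypothetical multi-line documents, including a CDATA section with a line break.
docs = [
    "<order id=\"1\">\n  <note><![CDATA[line one\nline two]]></note>\n</order>",
    "<order id=\"2\">\n  <note>caf\u00e9</note>\n</order>",
]

record_file = to_record_file(docs)
records = record_file.splitlines()
restored = [hex_line_to_xml(line) for line in records]

print(len(records))
print(restored == docs)
```

Because each record is plain hexadecimal text on its own line, a standard text input format can split the file at any line boundary.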

You can download this PDI solution here: xml_processing

Start by looking at the PDI job implemented in master_job.kjb. This PDI job performs the following tasks:

  • Configures variables for Hadoop and various other parameters used in the application. You will need to configure the Hadoop cluster as needed in the Set Variables job entry.
  • Calls the store_binary_hdfs transformation that will read all the XML docs in the data directory, convert them to hex strings, and write them out to a single HDFS file.
  • Runs a Pentaho MapReduce (map only) application to parse the XML and extract the wanted fields to a CSV file that is sent to the mapper output.
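
As a rough illustration of the map-only step, the Python sketch below decodes each hex record back to XML, extracts a few fields, and emits CSV. It is not the transformation from the download: the element names (`customer`, `total`), the `id` attribute, and the sample records are hypothetical.

```python
import csv
import io
import xml.etree.ElementTree as ET


def map_record(hex_line):
    """Decode one hex-encoded record and return the wanted fields."""
    xml_text = bytes.fromhex(hex_line.strip()).decode("utf-8")
    root = ET.fromstring(xml_text)
    # Hypothetical fields: an "id" attribute plus two child elements.
    return [root.get("id"), root.findtext("customer"), root.findtext("total")]


def run_map_only(hex_lines):
    """Apply map_record to every non-empty input line and collect CSV output."""
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    for line in hex_lines:
        if line.strip():
            writer.writerow(map_record(line))
    return out.getvalue()


# Hypothetical input: two multi-line XML docs, hex-encoded one per line.
sample_docs = [
    "<order id=\"1\">\n  <customer>Acme, Inc.</customer>\n  <total>19.99</total>\n</order>",
    "<order id=\"2\">\n  <customer>Globex</customer>\n  <total>5.00</total>\n</order>",
]
hex_lines = [doc.encode("utf-8").hex() for doc in sample_docs]

csv_output = run_map_only(hex_lines)
print(csv_output, end="")
```

Each input line is processed independently, which is why no reducer is needed for this step.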

The sample app has been tested with the following software:

  1. Pentaho Data Integration 5.2
  2. Cloudera  CDH5.1

Filed under Big Data, Hadoop, JSON, MapReduce, PDI, Pentaho, Pentaho Data Integration, XML

How to get Hadoop counters in PDI applications

Hadoop maintains various counters about MapReduce jobs in the JobTracker. These counters can be viewed using the JobTracker’s web UI but are not easily accessible within a PDI application. This solution shows how you can retrieve the Hadoop counters from a PDI application. The attached solution implements the word count MapReduce application using Pentaho MapReduce. Once the Pentaho MapReduce job is complete, it collects and logs all the Hadoop counters. The solution contains the following files/folders:

  1. data – directory that contains sample files that will be used for word count. These files will be copied to HDFS for performing the word count MapReduce application.
  2. wordcount.kjb – This is the main PDI application that coordinates all the steps required to perform the word count. It performs the following tasks:
    1. Creates a unique Hadoop Job Name  for our MapReduce application.
    2. Sets variables to configure the Hadoop cluster.
    3. Executes the word count MapReduce application using Pentaho MapReduce.
    4. Retrieves the Hadoop counters and logs them to Kettle’s log file.
  3. wc_mapper.ktr – Transformation that implements the map phase of the word count application.
  4. wc_reducee.ktr – Transformation that implements the reduce phase of the word count application.
  5. capture_hadoop_metrics.ktr – This transformation collects all the Hadoop counters in a User Defined Java Class and outputs the information to logs. The UDJC uses Hadoop’s native API to retrieve the counters and output a single row for each counter.
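
The "one row per counter" output can be pictured with a small Python sketch. The real transformation reads the counters through Hadoop's Java API inside a UDJC. Here the counters are assumed to be already available as a nested mapping, and the group names, counter names, and values are illustrative placeholders, not output from a real job.

```python
def counters_to_rows(job_name, counters):
    """Flatten {group: {counter: value}} into one (job, group, counter, value) row each."""
    rows = []
    for group in sorted(counters):
        for counter in sorted(counters[group]):
            rows.append((job_name, group, counter, counters[group][counter]))
    return rows


def format_log_lines(rows):
    """Render each row as a single log line."""
    return ["%s | %s | %s = %d" % row for row in rows]


# Illustrative placeholder names and values, not real job output.
sample_counters = {
    "Map-Reduce Framework": {"Map input records": 1200, "Reduce output records": 310},
    "File System Counters": {"HDFS bytes read": 52000},
}

rows = counters_to_rows("wordcount_example", sample_counters)
for line in format_log_lines(rows):
    print(line)
```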

In order for this solution to work, the following requirements must be met:

  1. The Hadoop job has not been retired. Hadoop’s Java API only retrieves job and counter data for non-retired jobs.
  2. Hadoop Job Name must be unique. Information about a Hadoop job is retrieved using the PMR Job Name. This name must be unique for all non-retired Hadoop jobs.
  3. PDI uses Hadoop’s Java API to retrieve the counters. Therefore all required Hadoop libraries (JARs) must be copied to the appropriate Pentaho application lib directory. For example, if you are using CDH4.x and run the sample app in Spoon 5.x, then copy all of the Hadoop client libs from [PentahoHome]/design-tools/data-integration/plugins/pentaho-big-data-plugin/hadoop-configurations/cdh42/lib/client/* to [PentahoHome]/design-tools/data-integration/lib.
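
The first two requirements can be sketched in Python as follows. This is an assumption-laden illustration: the name format, the `find_job_id` helper, and the job IDs are all hypothetical, and the list of non-retired jobs is passed in as plain data instead of being fetched through Hadoop's Java API.

```python
import uuid
from datetime import datetime, timezone


def unique_job_name(prefix="wordcount"):
    """Build a job name that is very unlikely to collide with another run."""
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
    return "%s_%s_%s" % (prefix, stamp, uuid.uuid4().hex[:8])


def find_job_id(job_name, active_jobs):
    """Find a job by name among (job_id, job_name) pairs of non-retired jobs."""
    matches = [job_id for job_id, name in active_jobs if name == job_name]
    if not matches:
        # Requirement 1: a retired job is no longer visible, so nothing matches.
        raise LookupError("no non-retired job named %r" % job_name)
    if len(matches) > 1:
        # Requirement 2: an ambiguous name cannot identify a single job.
        raise LookupError("job name %r is not unique" % job_name)
    return matches[0]


name = unique_job_name()
# Hypothetical job IDs standing in for what the cluster would report.
active = [("job_0001", "some_other_job"), ("job_0002", name)]
print(find_job_id(name, active))
```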

You can download this PDI solution here: get_hadoop_metrics.tar.gz

The sample app has been tested with the following software:

  1. Pentaho Data Integration 5.x
  2. Cloudera  CDH4.3


Filed under Big Data, Hadoop, MapReduce, PDI, Pentaho, Pentaho Data Integration

Uploading custom JARs to Hadoop for PDI

Many PDI applications require third-party Java libraries to perform tasks within PDI jobs and transformations. These libraries must be included in the class path of Hadoop mappers and reducers so PDI applications can use them in the Hadoop cluster. The best way to do this is to copy all dependent JARs to Hadoop’s Distributed Cache and add the following parameters to the Pentaho Map Reduce job step (in the User Defined tab):

  • mapred.cache.files
  • mapred.job.classpath.files

The process of uploading the custom JARs can be automated by implementing a PDI transformation that does the following:

  1. Take a list of the JARs the PDI application requires.
  2. Copy all the files from the local filesystem to a configured HDFS dir.
  3. Set a global variable that has a list of all the JARs with the fully qualified path in HDFS. This variable is then used to set the user-defined parameters given above in the Pentaho MapReduce job step.
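
The path-building part of these steps can be sketched in Python. The copy to HDFS is left out, the helper names and sample paths are hypothetical, and the separators are parameters because the delimiter each property expects can differ between Hadoop versions; the defaults below are assumptions to verify against your cluster.

```python
import posixpath


def hdfs_jar_paths(local_jars, hdfs_dir):
    """Map each local JAR to its fully qualified path under hdfs_dir."""
    paths = []
    for jar in local_jars:
        file_name = posixpath.basename(jar.replace("\\", "/"))
        paths.append(posixpath.join(hdfs_dir, file_name))
    return paths


def build_cache_parameters(local_jars, hdfs_dir, cache_sep=",", classpath_sep=","):
    """Build values for the two user-defined Pentaho MapReduce parameters.

    The separator defaults are assumptions; check what your Hadoop version expects.
    """
    paths = hdfs_jar_paths(local_jars, hdfs_dir)
    return {
        "mapred.cache.files": cache_sep.join(paths),
        "mapred.job.classpath.files": classpath_sep.join(paths),
    }


# Hypothetical JAR names and target directory.
params = build_cache_parameters(
    ["/home/pdi/libs/parser.jar", "C:\\libs\\geo.jar"],
    "/opt/pentaho/custom-libs",
)
for key, value in params.items():
    print(key, "=", value)
```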

You can download this PDI solution here.

This solution has been tested with PDI 4.4.

Filed under Big Data, Hadoop, MapReduce, PDI, Pentaho, Pentaho Data Integration

How to join big data sets using MapReduce and PDI

When doing lookups for data that resides inside a Hadoop cluster, it is best to have both of the data sets you want to join reside in HDFS. You have several options when using PDI and Hadoop cluster technology to join data sets inside the Hadoop cluster. The right solution depends on the amount of data in the tables:

  1. PDI’s Hadoop File Input with Stream Value Lookup step
    1. This solution is best when the lookup data is very small
    2. Put lookup file in Hadoop’s Distributed Cache
    3. See Joins with Map Reduce for an example of how you would do this in Java. In PDI you would simply add the file to Hadoop’s distributed cache and then use PDI’s Stream Lookup step (see here for how to add to Hadoop’s distributed cache in PDI).
  2. PDI’s HBase Input Step with Stream Value Lookup
    1. This solution is best when the lookup data is small
    2. This will perform full range scans of the HBase tables, so if the HBase table is large this could be slow.
    3. HBase configuration also impacts the performance.
  3. Directly use HBase API with PDI’s User Defined Java Class
    1. This solution is best when the lookup data is large and is in HBase tables
    2. You will need to code against the HBase API in a Pentaho UDJC. You must be familiar with the HBase API features to get the best performance (use HBasePools as a static variable and apply as many filters as possible in a single request to minimize the calls out to HBase).
    3. HBase configuration also impacts the performance.
  4. MapReduce Joins
    1. This solution is best when both the number of input data rows and the amount of lookup data are large.
    2. Both of these data sets are in HDFS.
    3. This can also be done using Pig/Impala/Hive instead of actually writing the low-level MapReduce.
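
For the first option, the idea behind a stream lookup against small lookup data is an in-memory hash table that every incoming row is checked against. The Python sketch below illustrates that idea only. It is not how PDI's Stream Lookup step is implemented, and the `key,value` file layout and sample rows are assumptions.

```python
def load_lookup(lookup_lines, delimiter=","):
    """Build an in-memory {key: value} table from small 'key,value' lines."""
    table = {}
    for line in lookup_lines:
        key, value = line.rstrip("\n").split(delimiter, 1)
        table[key] = value
    return table


def stream_lookup(rows, table, key_index=0, default=None):
    """Append the looked-up value (or default) to every incoming row."""
    for row in rows:
        yield tuple(row) + (table.get(row[key_index], default),)


# Hypothetical lookup file contents and input rows.
lookup_table = load_lookup(["US,United States\n", "DE,Germany\n"])
input_rows = [("US", "100"), ("FR", "7"), ("DE", "42")]

joined = list(stream_lookup(input_rows, lookup_table))
for row in joined:
    print(row)
```

This only works while the whole lookup table fits in each mapper's memory, which is why the option is limited to very small lookup data.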

In this posting I will detail a solution for the fourth option. Before explaining the process of implementing an actual MapReduce, I would like to point out that the easiest way to do this type of join is to use either a Pig script or a Hive/Impala SQL JOIN statement.

However, if you do not want to use Pig/Hive/Impala to do the joins then the following reference implementation can be used.

When you have two huge data sets that are in HDFS, you can join them using the MultipleInputs feature of Apache Hadoop’s Java API when setting up the Hadoop job (a detailed example can be found here). This feature is not available with PDI (as of PDI 4.4 GA). The attached example demonstrates how to implement the fourth solution in PDI. The basic design of the solution is:

  1. Create a map only task for each data set. The mapper adds a row type to each row and writes it back out to HDFS. The row type identifies which data set the row originated from. Note that the number of reducers is set to zero. This will disable the shuffle, sort, and reduce parts of the process to make it more efficient.
  2. Take the output of the mappers and copy them to a single HDFS directory.
  3. Run a MapReduce job that joins the data sets where the input dir contains both data sets with the row type appended to each row.
    1. The mapper performs any data cleanup or massaging of data that you want. The mapper output value is the data from each row, and the key is the join key.
    2. The reducer then joins the data from both rows and outputs the key and joined data.
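
The design above can be simulated in plain Python to show how the row type drives the join. This sketch is not the downloadable solution: the row type values (`"A"`, `"B"`), the assumption that the join key is the first field, the inner-join behaviour, and the sample data are all choices made for illustration, and a dictionary stands in for Hadoop's shuffle.

```python
from collections import defaultdict


def tag_rows(rows, row_type):
    """Map-only step: append a row type naming the data set each row came from."""
    return [tuple(row) + (row_type,) for row in rows]


def join_mapper(tagged_row):
    """Emit (join key, (row type, remaining fields)); the key is assumed to be field 0."""
    *fields, row_type = tagged_row
    return fields[0], (row_type, tuple(fields[1:]))


def join_reducer(key, values):
    """Combine every 'A' row with every 'B' row that shares the key (inner join)."""
    left = [fields for row_type, fields in values if row_type == "A"]
    right = [fields for row_type, fields in values if row_type == "B"]
    for a in left:
        for b in right:
            yield (key,) + a + b


def run_join(dataset_a, dataset_b):
    """Simulate the final MapReduce job over the combined, tagged input."""
    combined = tag_rows(dataset_a, "A") + tag_rows(dataset_b, "B")
    groups = defaultdict(list)  # stands in for the shuffle that groups by key
    for tagged in combined:
        key, value = join_mapper(tagged)
        groups[key].append(value)
    result = []
    for key in sorted(groups):
        result.extend(join_reducer(key, groups[key]))
    return result


# Hypothetical data sets keyed by an integer id.
orders = [(1, "widget"), (2, "gadget"), (3, "gizmo")]
customers = [(1, "Ann"), (2, "Bob")]

joined_rows = run_join(orders, customers)
for row in joined_rows:
    print(row)
```

Without the row type, the reducer could not tell which side of the join a value came from once both data sets share one input directory.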

You can download this solution here.


Filed under Big Data, Hadoop, MapReduce, PDI, Pentaho, Pentaho Data Integration

How to create custom reports using data from MongoDB

The demonstration below shows how to visually create a report directly against data stored in MongoDB (with no coding required). The following topics are shown:

  1. Pentaho Data Integration tool is used to create a transformation that does the following:
    1. Connect to and query MongoDB.
    2. Sort the query results.
    3. Group the sorted results.
  2. Pentaho Report Designer is used to visually create a report by using the data from a PDI transformation.

Filed under Big Data, MongoDB, PDI, Pentaho, PRD