

Pentaho Data Integration (PDI) provides a powerful, high-throughput framework for moving data through a network of transformation steps, turning data from one form into another.  PDI is an excellent choice for data engineers because its intuitive visual UI lets the engineer focus on the interesting business details.  PDI is also easily extensible.

 

PDI traditionally focuses on large-scale re-aggregation of data for data warehousing, big data, and business intelligence, but there are lots of other interesting things you can do with embedded PDI.  Coming from a high-frequency trading background, when I looked at the default list of transformation steps I was surprised not to see a generalized UDP packet processor.  Data transmitted over UDP is everywhere in the trading world: market data often arrives over UDP, and so do custom, time-sensitive trading signals.  So I ended up creating toy implementations of two transformation steps, a UDPPacketSender and a UDPPacketReceiver, to send and receive data over UDP.

 

Part I of this blog post introduces what UDP is and is not, how to handle packets in Java, and how to write transformation steps for PDI.  It is intended as an introduction and is not exhaustive; where applicable, links are provided for further reading.  Code is provided separately in a GitHub repository for educational purposes only; no warranty is expressed or implied.  Part II of this blog post will include some kettle transformations and demonstrations of the UDPSender and UDPReceiver in action.

 

What is UDP?

 

So what is UDP anyway, and why should we care about it?  Information is generally sent over the internet using one of two protocols: UDP and TCP.  UDP stands for User Datagram Protocol.  It is a lightweight, connectionless protocol introduced in 1980 by David Reed (RFC 768 - User Datagram Protocol) as an alternative to the comparatively high-overhead, connection-oriented Transmission Control Protocol, or TCP.  Among the properties of UDP:

  • Connectionless
  • Does not guarantee reliable transmission of packets
  • Does not guarantee packet arrival in order
  • Supports multicast and broadcast

By contrast, TCP is connection-oriented and has protocol mechanisms to guarantee packet transmission and ordered arrival of packets.  But it cannot do multicast or broadcast, and it has higher overhead associated with flow control and with maintaining connection state.  So UDP is a good candidate when latency matters more than TCP's protocol guarantees, or when delivery guarantees can be ignored or engineered into an enterprise network or into the application layer.

Much of the time, UDP packet loss occurs simply because intervening network devices are free to drop UDP packets in periods of high activity when switch and router buffers fill up.  (It's rarely due to an errant backhoe, although that does happen, and when it does it's a problem for TCP as well :-)

 

For many traditional UDP uses like VoIP, packet loss manifests as hiss and pop and is mitigated by the filtering and language-processing capabilities of the human brain.  For many other uses, like streaming measurement data, missing measurements can similarly be imputed.  It is important to note that UDP packet loss may not be a problem at all on an enterprise network, provided the hardware is deliberately selected and configured to always have enough buffer.  Also, socket communication is an excellent mode of inter-process communication (IPC) between endpoints on different platforms on the same machine.

 

Finally, if packet loss is a problem, a lightweight mitigation protocol may be implemented in the application on top of UDP.  More information on the differences between UDP and TCP can be found here: User Datagram Protocol or Transmission Control Protocol.

 

Creating PDI Transformation Steps

 

There is excellent documentation on how to extend PDI by creating your own transformation steps at https://help.pentaho.com/Documentation/8.0/Developer_Center/PDI.  The most relevant information for creating additional PDI plugins is located at the bottom of that page under the heading "Extend Pentaho Data Integration".

 

In a nutshell, you'll need a working knowledge of Java and common Java tools like Eclipse and Maven.  First we'll explore sending and receiving packets in Java and discuss threading models, and then we'll move on to the implementation of the four required PDI interfaces.

 

Sending and Receiving UDP Packets in Java

 

Since buffer overflow is usually the main culprit behind lost UDP packets, it is important to grab packets from the network stack as quickly as possible.  If you leave them hanging around long enough, they will get bumped by some other packet.  So a common pattern for handling UDP packets is to have a dedicated thread listening for packets on a port, read the packets immediately, and queue them up for further processing on another thread.  Depending on how fast packets are coming in and how bursty the incoming rate is, you may also need to adjust buffer size parameters in the software or even on the network device directly.

This implementation uses classes in the java.nio package, which are well suited to high-performance network programming.  Two classes of interest are ByteBuffer and DatagramChannel.  You can think of ByteBuffer as a byte array on steroids: it has get and put methods for each basic data type and an internal pointer so that data can be read and written very conveniently.  ByteBuffer can also use native memory outside of the Java heap via the allocateDirect() method.  When such a buffer is passed to a DatagramChannel, received data can be written directly into native memory without the extra step of first copying it into the Java heap.  For low-latency applications this is very desirable, but it comes at the cost of allocation time and extra memory management.  (For both of these reasons, it's a good idea to pre-allocate all of the ByteBuffers you anticipate needing and manage them with an object pool.)
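A bare-bones sketch of that receive path (the port number and payload layout here are illustrative, not taken from the repository):

```java
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;

public class UdpReceiveSketch {
    public static void main(String[] args) throws Exception {
        // Bind a channel to an illustrative port.
        DatagramChannel channel = DatagramChannel.open()
                .bind(new InetSocketAddress(5555));

        // A direct buffer lives in native memory, so received bytes skip the
        // extra copy into the Java heap.
        ByteBuffer buffer = ByteBuffer.allocateDirect(1500);

        while (true) {
            buffer.clear();               // reset position/limit for the next datagram
            channel.receive(buffer);      // blocks until a datagram arrives
            buffer.flip();                // switch the buffer from writing to reading
            if (buffer.remaining() >= Long.BYTES) {
                long firstValue = buffer.getLong();   // typed reads advance the internal pointer
                System.out.println("received: " + firstValue);
            }
        }
    }
}
```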

 

Threading Model

 

A word of caution is in order here.  Multi-threaded application programming is challenging enough on its own, but when you are contemplating adding threads inside another multi-threaded application that you didn't write, you should be extra careful!  For example, PDI transformations are usually run within Spoon, the UI designer for PDI transformations.  Spoon has start and stop buttons, and it would be very bad if a threading issue caused a transformation not to stop when the user hits the stop button, or leaked resources across multiple starts and stops.  Above all, don't do anything to degrade the user experience.

If at all possible, when designing a transformation step that involves threading, look at a similar example or two.  When creating this implementation, I looked at the pentaho-mqtt implementation on GitHub at pentaho-labs/pentaho-mqtt-plugin.  This was very useful because I could see how the mqtt-plugin developers married a network implementation capable of sending and receiving packets to PDI.  The model I ultimately settled on has the following characteristics:

  • A startable/stoppable DatagramChannel listener with a single-use, flagged thread loop, and an "emergency" hard stop in case a clean join timeout limit was exceeded.
  • Assumes that the putRow() method is thread safe.
  • Uses an object pool of pre-allocated ByteBuffers to avoid inline object construction costs.
  • Uses a Java thread pool (ExecutorService) to both process packets and to serve as a packet queue.

Looking at the pentaho-mqtt-plugin and understanding the flow of data and the initialization sequences greatly sped up my own development efforts.
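A minimal sketch of that threading model, much simplified relative to the ggutils implementation (all class and method names here are illustrative):

```java
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class UdpListener {
    private final int port;
    private final ExecutorService workers = Executors.newFixedThreadPool(4); // packet queue + processors
    private volatile boolean running;       // the "flag" in the flagged thread loop
    private Thread listenerThread;
    private DatagramChannel channel;

    public UdpListener(int port) { this.port = port; }

    public synchronized void start() throws Exception {
        channel = DatagramChannel.open().bind(new InetSocketAddress(port));
        running = true;
        listenerThread = new Thread(() -> {
            while (running) {
                try {
                    ByteBuffer buffer = ByteBuffer.allocateDirect(1500); // ideally drawn from an object pool
                    channel.receive(buffer);              // blocks; closing the channel unblocks it
                    buffer.flip();
                    workers.submit(() -> handle(buffer)); // hand off so the listener can return to the socket
                } catch (Exception e) {
                    if (running) { e.printStackTrace(); } // ignore the exception caused by a deliberate close
                }
            }
        }, "udp-listener");
        listenerThread.start();
    }

    public synchronized void stop() throws Exception {
        running = false;                 // ask the loop to exit
        channel.close();                 // unblock the pending receive()
        listenerThread.join(5000);       // clean join with a timeout
        if (listenerThread.isAlive()) {
            listenerThread.interrupt();  // "emergency" stop if the clean join timed out
        }
        workers.shutdown();
        workers.awaitTermination(5, TimeUnit.SECONDS);
    }

    // Subclasses unpack the datagram and push it downstream.
    protected void handle(ByteBuffer packet) { }
}
```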

 

Implementation

 

Each PDI transformation step has to implement all of the following four interfaces.

  • StepMetaInterface: Responsible for holding step configuration information and implementing persistence to XML and PDI repository.
  • StepDataInterface: Responsible for keeping processing state information.
  • StepInterface: Implements the processing logic of the transformation.
  • StepDialogInterface: Implements the configuration UI dialog for the step.

I'm not going to go into a lot of detail on the coding here, but the source code is provided for reference at the following GitHub repositories:

  • ggraham-412/ggutils v1.0.0 contains utility classes: logging that can be injected with alternative logging implementations at runtime, an object pool implementation, and UDP packet sender/receiver objects that wrap DatagramChannel and ByteBuffer objects together with a simple packet encoder/decoder.
  • ggraham-412/kettle-udp v1.0.0 contains the implementations of the UDPReceiver and UDPSender steps.

(Again, the code is provided for educational purposes only and no support or warranty is expressed or implied.)

The basic flow for UDPSender is simple.  Data comes into the UDPSenderStep from PDI, so in the processRow() method (which is invoked by the framework for every row) we get a row of data, find the fields we want to send, and send them with a UDPSender from the ggutils library.  If the row is the first one, we open the UDPSender ahead of processing.  If the row is null (end of data), we close the UDPSender and signal that the step is finished by returning false.
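In outline, and paraphrasing the ggutils API, the step's processRow() looks roughly like this (the sender, meta, and data members shown here are illustrative stand-ins, not the exact classes in the repository):

```java
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
    Object[] row = getRow();                 // next row from the previous step, or null at end of data

    if (row == null) {                       // end of data: clean up and finish
        data.sender.close();                 // "sender" stands in for the ggutils UDPSender
        setOutputDone();
        return false;                        // tells the framework this step is done
    }

    if (first) {                             // "first" is set by the framework before the first row
        first = false;
        data.sender.open(meta.getHost(), meta.getPort());               // open the socket once, up front
        data.fieldIndex = getInputRowMeta().indexOfValue(meta.getFieldToSend());
    }

    data.sender.send(row[data.fieldIndex]);  // pack the configured field into a datagram and send it

    putRow(getInputRowMeta(), row);          // pass the row through unchanged
    return true;                             // ask the framework for the next row
}
```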

 

The flow in UDPReceiver is a little more complicated because, from the point of view of PDI, it is ingesting externally generated data.  Following the pentaho-mqtt-plugin model, the processRow() method is used only to determine a stop condition to signal to the framework.  Since there are no rows coming in, we use the init() method to start a ggutils UDPReceiver listening on the configured port.  The UDPReceiver has a handler method: whenever a packet is received, it is passed to the handler, which unpacks the packet into a row and calls putRow().  The packets are handled on a thread pool, so fast-arriving packets may be processed out of order.  When the number of packets received hits a maximum, or when a time limit expires, processRow() returns false to the framework, stopping the transformation.  The UDPReceiver is stopped with a boolean flag, but in case the thread does not stop for unknown reasons, it is hard-stopped after a join timeout to avoid leaking resources.
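Sketching the receiver side of the step (this builds on the hypothetical UdpListener class from the threading sketch above; the data and meta members, packetCount(), getMaxPackets(), and timeLimitExpired() helpers are illustrative stand-ins, not the actual ggutils or kettle-udp API):

```java
// init() starts the listener; the packet handler turns each datagram into a PDI row.
public boolean init(StepMetaInterface smi, StepDataInterface sdi) {
    if (!super.init(smi, sdi)) {
        return false;
    }
    data.receiver = new UdpListener(meta.getPort()) {      // paraphrase of the ggutils receiver
        protected void handle(ByteBuffer packet) {
            Object[] row = RowDataUtil.allocateRowData(data.outputRowMeta.size());
            row[0] = packet.getLong();                     // unpack fields per the configured layout
            try {
                putRow(data.outputRowMeta, row);           // assumed thread safe, per the model above
            } catch (KettleStepException e) {
                logError("Failed to put row", e);
            }
        }
    };
    try {
        data.receiver.start();
        return true;
    } catch (Exception e) {
        logError("Could not start the UDP listener", e);
        return false;
    }
}

// processRow() emits nothing itself; it only watches for the stop conditions.
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
    if (data.packetCount() >= meta.getMaxPackets() || data.timeLimitExpired()) {
        data.receiver.stop();     // flagged stop, with the hard stop after a join timeout inside
        setOutputDone();
        return false;             // stops the transformation
    }
    return true;
}
```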

 

The settings UI dialogs are created using the Standard Widget Toolkit (SWT), a desktop UI library for Java.  Note that many excellent custom widgets already exist for use with Spoon, such as the table input widget.  To maintain the look and feel of PDI, it is best to work from an existing dialog class on GitHub that uses these widgets.

 

Build and Deploy

 

In order to build the PDI transformation steps, import both the ggutils and kettle-udp projects above into Eclipse.  kettle-udp uses Maven, so make sure the m2e Eclipse extension is installed.  Finally, if you're not familiar with building Pentaho code using Maven, visit the GitHub repository Pentaho OSS Parent Poms for important configuration help.  (Big hint: use the settings.xml file from the maven-support-files folder in that repository in your ~username/.m2 folder so that your build tools can find the appropriate Pentaho repositories.)

 

Once everything is building, export a jar file called UDPReceiver.jar.  To see the new steps in Spoon, copy the jar file to the folder data-integration/plugins/steps/UDPReceiver; the folder should have the same name as the jar file it contains.  The next time you start Spoon, you should see the UDPReceiver step in the Input category and the UDPSender step in the Output category.

 

i18n and l10n

 

Pentaho contains facilities for internationalization and localization of strings and date formats.  The repository above contains en_US strings, but alternatives could easily be provided for the language/locale of your choice.  To provide localized strings, simply include a messages package in your project structure (its name should match the target package plus ".messages") containing a messages_en_US.properties file.  The file should contain properties like "UDPSenderDialog.GeneralTab.Label=General", and the dialog code retrieves a string with a call to BaseMessages.getString(class, key), where class is a Java class in the target package and key is a property key in the file.  For example, BaseMessages.getString( UDPSenderMeta.class, "UDPSenderDialog.GeneralTab.Label") will return the string "General" if the locale in Spoon is set to en_US.
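For example, a small lookup helper might look like this (the properties line and key are the ones quoted above; the PKG constant is a common Pentaho idiom rather than a requirement, and the helper class itself is hypothetical):

```java
// messages/messages_en_US.properties, inside the step's package:
//   UDPSenderDialog.GeneralTab.Label=General

import org.pentaho.di.i18n.BaseMessages;

public class LabelLookupExample {
    // Convention: pass any class from the package that owns the messages bundle.
    private static final Class<?> PKG = UDPSenderMeta.class;

    public static String generalTabLabel() {
        // Resolves to "General" when the Spoon locale is en_US.
        return BaseMessages.getString(PKG, "UDPSenderDialog.GeneralTab.Label");
    }
}
```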

Note: to troubleshoot this during development, make sure that the messages properties file is actually deployed in the jar file (inspect it with an archiver like 7-Zip) and that the properties files are included in the build CLASSPATH.  You may also have to switch the locale in Spoon away from and back to the desired locale to flush cached strings.

 

Conclusions

 

Data sent over the internet generally travels using either TCP or UDP.  In this blog post, we looked at the characteristics of UDP and highlighted the basic techniques for handling UDP packets in Java.  Then we showed how those techniques can be incorporated into a PDI transformation step and provided a reference implementation (for educational purposes only).

 

In part II of this post, we will put these ideas into action with some demonstrations using Kettle transformations.

Back when I took the Johns Hopkins Data Science track on Coursera, one of my homework assignments for the Developing Data Products course was to create a dynamic tool using R and Shiny that would graphically demonstrate the Central Limit Theorem (CLT).  The CLT says that if you take the means of groups of random numbers, those means will be approximately normally distributed no matter what the underlying distribution of the random numbers looks like.  The finished assignment included a histogram that reacted dynamically to user input, including the number of random numbers to generate, how many per group, and so on.  The Shiny application would detect user input and dynamically update the histograms in R.

 

A couple of weeks ago, I wrote about implementing histograms using PDI and CTools: Creating CDE Histograms using PDI and a User Defined Java Class.  If we want a CTools histogram to be dynamic in the sense of my R/Shiny homework assignment, we need to send parameters from the CTools dashboard to the kettle transformation.  Fortunately, this is easy to do.  The following example was produced using Pentaho BA Server 7.1, and the complete source is attached.  To see it work, simply upload the attached CentralLimitTheorem zip file to an empty folder on the BA Server.

 

The first step is to rework the histogram kettle transform slightly from last week's example.  To demonstrate the CLT, we'll use only the exponentially distributed random numbers and remove the rest.  Next, we'll add group structure to the random number generation.  As shown in Figure 1, we do this by adding an incrementing row id with a Sequence step, adding a group number (= row id modulo the number of rows in a group) with a Calculator step, and finally emitting a group average with a Group By step.  We also use a Get Variables step and an Add Constants step to define the histogram dimensions from user input instead of a Data Grid.

Figure 1: Taking average over groups of rows using Sequence, Calculator, and Group By steps.
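For reference, the same computation in plain Java shows why the demo works: the group means of exponentially distributed values come out approximately normal.  (This is a sketch for illustration only; the transformation itself does this with the Sequence, Calculator, and Group By steps above, and the numbers of groups and rows per group are just example values.)

```java
import java.util.Arrays;
import java.util.Random;

public class GroupMeansSketch {
    public static void main(String[] args) {
        Random rng = new Random();
        int numGroups = 400;    // how many group averages to produce
        int groupSize = 25;     // rows per group, as configured in the transformation
        double[] groupMeans = new double[numGroups];

        for (int g = 0; g < numGroups; g++) {
            double sum = 0.0;
            for (int i = 0; i < groupSize; i++) {
                sum += -Math.log(1.0 - rng.nextDouble());  // exponentially distributed value (scale 1)
            }
            groupMeans[g] = sum / groupSize;               // group average, as in the Group By step
        }

        // The means cluster around 1.0 and are approximately normally distributed.
        System.out.println(Arrays.stream(groupMeans).average().getAsDouble());
    }
}
```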

 

Next, we want to create a dashboard that uses this transformation.  The basics of how to do this are described in my blog post from last week; briefly, the new kettle transformation is uploaded through the Pentaho User Console, a new CDE dashboard is created, and a kettleTransFromFile datasource is created pointing to the new kettle transformation.  The major addition in the current example is that we want CTools to pass user input back to the kettle transformation and update the histogram in response.  To do this, we need:

  1. to configure the datasource to accept parameters and map them to kettle transformation input parameters,
  2. to create user changeable parameters in the dashboard,
  3. to have the chart update when the user parameters change.

There are several places where parameters have to be defined, and these are demonstrated in the following.

 

Kettle Datasource Configuration with Parameters

 

Let's consider the kettleTransFromFile datasource.  In the example, the transformation is called t_Central_Limit_Theorem.ktr and the output step with the histogram is "out: 1".  IMPORTANT: Make sure Cache is set to false, since the chart is to be interactive.  The datasource here describes a CDA datasource with its own parameters and defaults, and these need to be mapped to the kettle transformation input parameters.  This is done by clicking on the "Variables" item in the datasource properties (see Figure 2).  The entries on the left, under the "Arg" title, are the CDA parameters; these are the parameters that the chart components use when they pull data down from a datasource.  The entries on the right, under the "Value" title, are the corresponding kettle input parameters defined in the transformation.

Figure 2: The datasource Variables dialog in CDE.

 

If an entry appears in the Variables table, then a value can be passed from the dashboard to the kettle transform.  If an entry does not appear in this table, then the dashboard cannot see that kettle input parameter, and the default value defined in the kettle transform will be used.

 

Next, the data type and default value need to be specified for each CDA parameter.  This is done by clicking on the "Parameters" item in the datasource configuration list (See Figure 3).

Figure 3: The datasource Parameters dialog in CDE.

 

Note that these are "CDA" parameters and defaults - you must use the same name here that you used in the "Arg" column of the Variables configuration table. And the default value given here is what will be passed to the CDQ query (and ultimately to the kettle transformation) if the dashboard does not specify it.

 

Creating Dashboard Parameters and Inputs

 

In our example, we want the user to be able to change each parameter in the CDA.  The parameters are going to be used by multiple components, including input text boxes and one or more CCC charting components, so they live at the dashboard level.  In the Components tab, add a parameter for each and name them accordingly.

 

Figure 4: Parameters defined in the Components tab.

 

In order to allow the user to see and change the values of these parameters, some kind of input control must be chosen.  For this example, I'm using simple Text Input Components (text boxes), but other input controls like dropdown lists, date ranges, and buttons are available as well.  Each input should be mapped to a parameter via the properties of the input (see Figure 5), and these come from the pool of dashboard parameters.  In fact, if you start typing the parameter name, CDE will pull up a context menu of prompts drawn from the list of dashboard parameters created so far.

 

Figure 5: Mapping parameters to Text Input Components.

 

 

Adding Charting Components

 

This is where it gets really interesting.  We're going to work with a bar chart again since we're making histograms.  In order for the histogram to change in response to user input, we have to do two things: we need to tell the chart component to react to changes in a dashboard parameter, and we need to tell the chart component how to update from its datasource.  That's done in the Listeners and Parameters dialogs, respectively, of the charting component configuration shown in Figure 6.

Figure 6: The configuration section for a Community Bar Chart component.

 

To have the chart react to parameter changes, click on the Listeners dialog.  A list of available dashboard parameters will be presented with checkboxes.  Simply check the parameters of interest.  If a parameter is not checked as a listener, then the component will not update when that parameter changes.

 

What does it mean for a chart to react to a parameter change?  It means that it refreshes its data from its datasource, so it needs to know how to map the dashboard parameters to the CDA parameters of that datasource.  This is done by clicking on the Parameters dialog, as shown in Figure 7.

Figure 7: The Parameters dialog box of a Community Chart Component.

 

When updating in reaction to a parameter change, the chart component will use the values of the parameters on the right for the CDA datasource arguments on the left.  Note: if a parameter is not listed here, then CDA will use the default value from Figure 3.  Also, it is not necessary to use a dashboard parameter here; a constant value could be used instead if, for instance, the chart has some default value that is not shared by other chart components.

 

Tying it all Together

 

When we look at the final product, we have the result shown in Figure 8.  Returning to the Central Limit Theorem, the width of the resulting normal distribution of means is inversely proportional to the square root of the number of entries in each group (the standard deviation of the group means is σ/√n), so it is useful to be able to change the histogram definition to track changes in the other input parameters.  Note that the user can also see the underlying exponential distribution by simply setting the group size to 1.  Lots of interactive investigations are possible with this, and the whole power of PDI is behind it!

 

In closing, a note about the parameter mappings we encountered above.  CTools is a collection of distinct technologies: CDA (Community Data Access), which gets data from backend sources like a database or a PDI transformation and returns it in a usable format; CCC (Community Charting Components), which displays graphical information; and CDF (Community Dashboard Framework), which provides the backbone that knits components together.  The parameter mappings fundamentally exist at the boundaries of these technologies and reflect the immense flexibility of CTools.  The ability to define mappings and defaults at every level allows a rich variety of dashboard configurations.  For example, to enforce a common parameter value across all dashboard components, simply specify its default in CDA and leave it out of the component mappings.  Or, to change the user experience of the current example so the chart updates in response to a button press rather than reacting to each parameter change individually, simply add a button and make the histogram listen only to the button.

 

UPDATED 12/11/2017: The layout of the attached example site (Central Limit Theorem.zip) was updated using some best practices that I learned about at the CT1000: CTools Fundamentals online course last week.

Histograms are a great way to probe density functions.  Visually they look like ordinary bar charts, but the bars (or bins, in histogram-speak) are always quantitative and ordered rather than qualitative.  A histogram is defined by n mutually exclusive bins covering a continuous region of possible measure values from low to high.  An instance of a histogram is specified by a value associated with each bin, plus a value for underflows and another for overflows.  The value in each bin represents the number of times a measure value fell in that bin, the underflow represents the number of times a measure value fell below the low edge of the first bin, and the overflow represents the number of times a measure value fell above the high edge of the last bin.  Observations might be weighted, in which case the values represent sums of weights.

 

In my last blog post, I demonstrated using a User Defined Java Class to measure the time between different workflow steps when a workflow is ill defined or irregular.  In this blog post, we'll explore creating a User Defined Java Class in a transformation, t_histogram_example.ktr, that creates and fills histograms defined in an info step.  Then we'll see how these can be used to drive a CCC Bar Chart in a dashboard created with the Community Dashboard Editor (CDE).  In the current example, we'll use a Data Grid to bring in hard-coded histogram definitions, but these could come from a configuration database as well.

 

 

Figure 1: In this example, the info step is a Data Grid providing hard-coded histogram definitions, each containing an Integer ID, a name, a low specification, a high specification, a number of bins, and an optional field name containing a fill weight for the histogram (default is 1.0).   

 

The measure values that we will be looking at are random numbers generated according to several simple distributions which can be generated with inverse methods: uniform, Gaussian (or Normal), exponential, and logistic.  (See http://luc.devroye.org/handbooksimulation1.pdf for more information on how these were generated.)  The input parameters of the transformation define the ranges of these random numbers: a low and high for the uniform range, a mean and standard deviation for each Gaussian, and a scale factor for the exponential distribution.  These measures come in as data rows.
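As an aside, the inverse-transform recipe for two of those distributions takes only a couple of lines of Java (a sketch; in practice the Gaussian values are more commonly drawn with Random.nextGaussian() than with a closed-form inverse):

```java
import java.util.Random;

public class InverseSampling {
    private static final Random RNG = new Random();

    // Exponential with the given scale (mean): invert F(x) = 1 - exp(-x / scale).
    static double nextExponential(double scale) {
        double u = RNG.nextDouble();            // u in [0, 1), so 1 - u is never zero
        return -scale * Math.log(1.0 - u);
    }

    // Logistic with location mu and scale s: invert F(x) = 1 / (1 + exp(-(x - mu) / s)).
    static double nextLogistic(double mu, double s) {
        double u = Math.max(RNG.nextDouble(), Double.MIN_VALUE);  // keep u strictly positive
        return mu + s * Math.log(u / (1.0 - u));
    }

    public static void main(String[] args) {
        System.out.println(nextExponential(2.0));
        System.out.println(nextLogistic(0.0, 1.0));
    }
}
```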

 

During initialization of the User Defined Java Class, we get the row set containing the histogram definitions using the method findInfoRowSet().  Next, we loop over the info rows and create the histograms.  The histograms are implemented in a Java static nested class and used with the following methods:

  • fill(x): Increments the value of the bin corresponding to x by 1.0
  • fill(x, w): Increments the value of the bin corresponding to x by the number w.
  • getBins(): returns all of the bin values in an array.

The name field is used to identify a measure in the data rows, which is used to fill the corresponding histogram.  If a weight field is specified, the fill(x, w) method is used with the value w taken from the weight field; otherwise, the fill(x) method is used.
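A minimal sketch of such a nested histogram class, matching the fill/getBins description above but not the exact code in the repository:

```java
public class HistogramExample {

    static class Histogram {
        private final double low, high, binWidth;
        private final double[] bins;
        private double underflow, overflow;

        Histogram(int nBins, double low, double high) {
            this.low = low;
            this.high = high;
            this.binWidth = (high - low) / nBins;
            this.bins = new double[nBins];
        }

        void fill(double x) { fill(x, 1.0); }               // unweighted fill

        void fill(double x, double w) {                     // weighted fill
            if (x < low)        { underflow += w; }
            else if (x >= high) { overflow  += w; }
            else                { bins[(int) ((x - low) / binWidth)] += w; }
        }

        double[] getBins() { return bins.clone(); }         // bin values, in order
        double getUnderflow() { return underflow; }
        double getOverflow()  { return overflow; }
    }

    public static void main(String[] args) {
        Histogram h = new Histogram(11, -5.5, 5.5);
        h.fill(0.3);
        h.fill(7.0);    // lands in the overflow
        System.out.println(java.util.Arrays.toString(h.getBins()));
    }
}
```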

 

The User Defined Java Class accumulates data until no more data rows are detected.  At that point it dumps all histograms to the output.  Each output row contains the histogram ID, the measure, the bin number, and the bin value.  In the example, we split the output rows by histogram ID and send them to different dummy steps that can be referenced later.

 

Figure 2: Output is split by histogram ID.

 

The output of the first Gaussian using the step preview is shown in Figure 3.

 

Figure 3: The output of a histogram containing a standard normal distribution with 11 bins from -5.5 to +5.5.

 

The following assumes that a Pentaho Business Analytics server is installed and running with the Community Dashboard Editor enabled.  This example was done on version 7.1, and installation instructions can be found here: 7.1 - Pentaho Documentation.


To create a visual using the CDE, log into the Pentaho User Console (PUC) and create a new folder.  In this example, the folder is called "/home/Histograms".  Into this folder, upload the transformation t_histogram_example.ktr and create a new CDE dashboard.  In the data sources panel, create a new "kettleTransFromFile" datasource from the Kettle Queries tab.  This type of datasource will silently create a Community Data Access (CDA) query, which will be the conduit between the PDI transformation and the CDE dashboard.  To configure the datasource, specify:

  • A unique name for the datasource
  • The PDI transformation file containing the histograms, uploaded through the PUC
  • The PDI step that contains the histogram of interest, in this case out: 2.
  • Output Columns, which should be set to the bin number and the bin value.  The CCC Bar Chart expects two input columns: the first should supply the X-axis categories of the bar chart, and the second the bin values.  The structure of the output can be seen in Figure 3; counting from 0, these are columns 3 and 6.

The rest of the options can be left at their defaults.

 

Figure 4: Options needed for the kettleTransFromFile to set up a CCC Bar Chart.

 

In the layout panel, create a row and a column, and give the column a name.  Change the layout parameters as needed to create a reasonably large chart.  Finally, in the components panel, choose a CCC Bar Chart component and connect it to the layout (using the name given to the column as the "htmlObject") and to the datasource using the datasource name.  Save the dashboard, and then preview it.  If configured correctly, you should see a chart like the one shown in Figure 5.

Figure 5: The output of the histogram from step out: 2, which has a Standard Normal distribution.

 

In conclusion, we've demonstrated generating random data and binning it in histograms using a User Defined Java Class in PDI.  We also showed how to display these histograms in a CCC Bar Chart in a CDE dashboard.  The techniques demonstrated here can be extended to get histogram definitions from a configuration database managed by a web application, or even to output and clear histograms periodically to provide real-time displays.

 

Update 2017-01-05: When using an info step with a User Defined Java Class that is set to start more than one copy, make sure the Data Movement is set to "Copy Data to Next Steps" on the info step.  You can control this setting by right-clicking on the info step.  By default the data movement is "Round Robin", so if the User Defined Java Class step is started with more than one copy, some rows from the info step will appear to be missing.  Thanks to @Jeffrey_Hair for the explanation.

All businesses use workflow processes to get things done.  Understanding the timing of the workflow steps is of great interest to those charged with business improvement and efficiency.  Some businesses have tightly defined and strictly controlled process implementations, but others take a more informal approach.  Last spring, I worked with a historically small, privately owned client that was expanding through acquisitions, and some of their business processes were ill defined, both because of platform integration issues and because of loose controls on data entry.  We ended up using a User Defined Java Class to analyze the workflow timings because of the complex rules around what constituted an interesting measure in the workflow.  But even for many irregular cases, a Row Denormalizer can be used with a Group By step to calculate timings.

 

To make the example more concrete, let's imagine a workflow process comprising four steps that should happen in sequence A -> B -> C -> D.

  • Event A: A finished piece leaves the factory floor for inspection
  • Event B: A finished piece leaves inspection for shipping
  • Event C: A finished piece leaves shipping
  • Event D: The finished piece arrives at the customer

We further assume that the rows come in one event per row, with three pieces of information: an EventName containing one of the four events above, an EventDate with the date and time the event occurred, and a WorkflowID which identifies the piece going through the factory.  In an irregular workflow, we might have several entries per event per piece if pieces are sent back and forth through the process: the inspector sends a piece back to the floor, the shipping department sends a piece back to the inspector, and so on.

 

Figure 1 shows the Row Denormalizer with a Group By step.  The input is sorted on WorkflowID and then by EventName, and then grouped on the same attributes.  The Group By step also emits minimum and maximum aggregates over EventDate.  The Row Denormalizer groups on WorkflowID and keys on EventName, placing the aggregates into fields FirstA, LastA, FirstB, LastB, and so on.  A Calculator step can then easily compute measures like "the amount of time between the last time a piece left the factory floor and the first time it arrived at the customer".

Figure 1: The Row Denormalizer method for obtaining workflow timings.  (The full transformation is attached in the file t_workflow_times.ktr.)

 

But what if the desired measures are more complex?  For example, say a client wants to know all of the times between when a piece left the factory floor and when it was subsequently received at the shipping department.  Now it is more difficult to use a Row Denormalizer, since there could be a variable number of columns.  In this case, we can turn to a User Defined Java Class to compute the desired measures and emit them row by row.  How does this work?

 

A User Defined Java Class contains actual Java code that implements logic inside a standard PDI transformation step.  It is simpler than creating a new transformation step from scratch, but it provides much of the same functionality, and it has decent performance because it uses Janino to compile the code on the fly.  To create a User Defined Java Class step, it is helpful to understand the basic interfaces.  Some of these are listed below; the complete reference is in the Javadoc.

  • processRow(): This is the main workhorse method, called by PDI once per input row.  This method is also responsible for initializing the step right after startup by checking the boolean variable first, which PDI sets to true for you when the transformation starts.  processRow() should return true while processing should continue and false when processing is finished.
  • getRow(): When called from within processRow(), this will return an object array with the field values in it, or null if there is no more input.
  • setOutputDone(): When getRow() returns null, this should be called before returning false.
  • get(): This method returns field accessors that can be used with an object array to read or write particular field values.  The field accessors are preferred to indexing directly into the Object array returned by getRow() because they keep track of indexes for you.  Obtain an accessor with the field name and one of the constants Fields.In for input fields, Fields.Out for output fields, or Fields.Info for info fields.
  • createOutputRow(): This method is called to produce an output row.  Typically it is called with both the input row and the output row size, in which case it copies the input row values into the output row.  The field data is a Java object that itself contains two fields set by PDI for you: data.inputRowMeta and data.outputRowMeta.  These objects can be used to obtain further information about the row structure, such as the number and types of fields.
  • findInfoRowSet(): If your User Defined Java Class has an info step, that step must be defined on the Info Steps pane.  This function returns a Row Set for the tag name provided.  Calls to getRowFrom() with this Row Set (instead of plain old getRow()) will return rows from the info step.

 

Figure 2: The Info Steps pane in the User Defined Java Class. 

 

  • putRow(): When an output row has been calculated, it should be returned to the PDI framework for further processing with a call to putRow().  The Object array passed to putRow() should be initialized with a call to createOutputRow() as described above, to ensure it has the right number of elements, consistent with the output metadata.
  • Error Handling / Troubleshooting: If an error comes up during initialization and the step cannot begin, or if an unrecoverable error comes up during row processing, a KettleException should be thrown.  However, if an error related to the data content of a row comes up during processing, that row should be sent to an error handling step.  Use the method putError() to send a row to error handling, and don't forget to return true from processRow() to continue processing.  The method logBasic() can be used to send a message to the Spoon log window to aid in debugging.
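Pulling these pieces together, a skeletal processRow() inside a User Defined Java Class might look like the following sketch (the field names "Measure" and "MeasureSquared" and the info step tag are purely illustrative):

```java
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
    Object[] r = getRow();
    if (r == null) {              // no more input rows
        setOutputDone();
        return false;
    }

    if (first) {                  // one-time setup on the first row
        first = false;
        // e.g. read definitions here via findInfoRowSet("definitions") and getRowFrom(...)
    }

    // Read an input field through an accessor rather than indexing into r directly.
    Double measure = get(Fields.In, "Measure").getNumber(r);
    if (measure == null) {
        // Bad data content: route the row to error handling and keep going.
        putError(data.inputRowMeta, r, 1, "Missing measure value", "Measure", "MISSING_MEASURE");
        return true;
    }

    // Size the output row to the output metadata, copy the input values, set a new field, emit it.
    Object[] out = createOutputRow(r, data.outputRowMeta.size());
    get(Fields.Out, "MeasureSquared").setValue(out, Double.valueOf(measure.doubleValue() * measure.doubleValue()));
    putRow(data.outputRowMeta, out);
    return true;
}
```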

 

Figure 3 below shows the basic setup of the User Defined Java Class, which we have called java: aggregator.  Like the Row Denormalizer above, java: aggregator assumes the input is sorted, but this time we sort on WorkflowID and then EventDate.  The aggregator collects all events per WorkflowID in ArrayList instances, and when it sees a new WorkflowID (or the end of input) it calculates the measures and flushes them out to PDI using putRow().  The measures in the example are the time between each A event and the next B event, and the time between each A and the last C occurring before another A.

Figure 3: The User Defined Java Class method for obtaining workflow timings.  (The full transformation is attached in the file t_aggregator_engine.ktr.)
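The heart of the aggregator, computing the time between each A and the next B for one WorkflowID's sorted event list, can be sketched in plain Java like this (class and method names are illustrative; the real step wraps this kind of logic in processRow() and emits each measure with putRow()):

```java
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class WorkflowMeasures {

    static class Event {
        final String name;   // "A", "B", "C" or "D"
        final Date when;
        Event(String name, Date when) { this.name = name; this.when = when; }
    }

    // For one WorkflowID, with events already sorted by EventDate: the elapsed
    // milliseconds between each A and the first B that follows it.
    static List<Long> timesFromAToNextB(List<Event> events) {
        List<Long> measures = new ArrayList<Long>();
        for (int i = 0; i < events.size(); i++) {
            if (!"A".equals(events.get(i).name)) {
                continue;
            }
            for (int j = i + 1; j < events.size(); j++) {
                if ("B".equals(events.get(j).name)) {
                    measures.add(events.get(j).when.getTime() - events.get(i).when.getTime());
                    break;   // only the first B after this A counts
                }
            }
        }
        return measures;
    }
}
```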

 

Examples of both transformations are attached.  Each comes with a small generator of random workflow events over a given period of time for testing.  Although completely random workflow events are not strictly realistic, they serve here for demonstration purposes.

 

Some things to avoid when creating a User Defined Java Class: avoid overly clever or obtuse implementations.  The code should be readable and well commented for the maintainers who come after you, who will be expecting to maintain PDI transformations rather than custom Java.  Avoid spinning up threads within a User Defined Java Class, or opening network or database connections.  If you need threads or connections to implement a step, consider creating a full transformation step.

 

In closing, when is it best to use a pure PDI solution versus one with a User Defined Java Class?  The User Defined Java Class has some practical limitations.  Debugging can be difficult and may require logging statements, which becomes slow and tedious for programs of more than a few hundred lines of code.  Furthermore, if something can be done in pure PDI, it should be done in PDI.  I tend to look at two criteria: if the logic is very difficult to implement in pure PDI, and if the User Defined Java Class is limited in scope to a single, particular data transformation need, then I consider a User Defined Java Class a viable option.  But remember, the User Defined Java Class will look like a magic black box to maintainers who come after you; if it is doing too many different things or is implemented haphazardly, it may not be worth it.