Greg Graham

Analyzing Irregular Business Workflows in PDI

Blog post created by Greg Graham on Nov 13, 2017

All businesses use workflow processes to get things done.  Understanding the timing of the workflow steps is of great interest to those charged with business improvement and efficiency.  Some businesses have tightly defined and strictly controlled process implementations, but others take a more informal approach.  Last spring, I worked with a historically small, privately owned client that was expanding through acquisitions.  Some of their business processes were ill-defined, both because of platform integration issues and because of loose controls on data entry.  We ended up using a User Defined Java Class to analyze the workflow timings because of the complex rules around what constituted an interesting measure in the workflow.  Even for many irregular cases, however, a Row Denormalizer can be used with a Group By step to calculate timings.


To make the example more concrete, let's imagine a workflow process comprising four steps that should happen in sequence A -> B -> C -> D.

  • Event A: A finished piece leaves the factory floor for inspection
  • Event B: A finished piece leaves inspection for shipping
  • Event C: A finished piece leaves shipping
  • Event D: The finished piece arrives at the customer

We further assume that the rows are coming in, one event per row, with three pieces of information: an EventName that will contain one of the four events above, an EventDate with the date and time the event occurred, and a WorkflowID which identifies the piece going through the factory.  In an irregular workflow, we might have several entries per event per piece, because pieces can be sent back and forth through the process: the inspector sends a piece back to the floor, the shipping department sends a piece back to the inspector, and so on.


Figure 1 shows the Row Denormalizer with a Group By step.  The input is sorted on WorkflowID and then on EventName, and then grouped on the same attributes.  The Group By also emits minimum and maximum aggregates over EventDate.  The Row Denormalizer groups on WorkflowID and keys on EventName, placing the aggregates into fields FirstA, LastA, FirstB, LastB, and so on.  The Calculator step can then easily compute measures like "the amount of time between the last time a piece left the factory floor and the first time it arrived at the customer".

Figure 1: The Row Denormalizer method for obtaining workflow timings.  (The full transformation is attached in the file t_workflow_times.ktr.)
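
To make the data flow concrete, here is a minimal plain-Java sketch of what the Group By, Row Denormalizer, and Calculator chain computes.  It is not taken from the attached transformation: the class and method names (FirstLastTimings, Event, denormalize, lastAToFirstD) are hypothetical, and unlike the PDI steps it does not require sorted input because it keeps everything in maps.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch of the Group By + Row Denormalizer + Calculator chain. */
final class FirstLastTimings {

    /** One input row, using the field names from the post. */
    static final class Event {
        final String workflowId;
        final String eventName;
        final LocalDateTime eventDate;

        Event(String workflowId, String eventName, LocalDateTime eventDate) {
            this.workflowId = workflowId;
            this.eventName = eventName;
            this.eventDate = eventDate;
        }
    }

    /**
     * Group By: keeps the minimum and maximum EventDate per (WorkflowID, EventName).
     * Row Denormalizer: stores them in one row per WorkflowID under the keys
     * FirstA, LastA, FirstB, LastB, and so on.
     */
    static Map<String, Map<String, LocalDateTime>> denormalize(List<Event> events) {
        Map<String, Map<String, LocalDateTime>> rows = new TreeMap<>();
        for (Event e : events) {
            Map<String, LocalDateTime> row =
                rows.computeIfAbsent(e.workflowId, id -> new TreeMap<>());
            row.merge("First" + e.eventName, e.eventDate, (a, b) -> a.isBefore(b) ? a : b);
            row.merge("Last" + e.eventName, e.eventDate, (a, b) -> a.isAfter(b) ? a : b);
        }
        return rows;
    }

    /** Calculator: time from the last A to the first D, or null if either is missing. */
    static Duration lastAToFirstD(Map<String, LocalDateTime> row) {
        LocalDateTime lastA = row.get("LastA");
        LocalDateTime firstD = row.get("FirstD");
        return (lastA == null || firstD == null) ? null : Duration.between(lastA, firstD);
    }

    public static void main(String[] args) {
        LocalDateTime t = LocalDateTime.of(2017, 11, 13, 8, 0);
        List<Event> events = Arrays.asList(
            new Event("P1", "A", t),
            new Event("P1", "B", t.plusHours(1)),
            new Event("P1", "A", t.plusHours(2)),   // piece sent back to the floor
            new Event("P1", "B", t.plusHours(3)),
            new Event("P1", "C", t.plusHours(4)),
            new Event("P1", "D", t.plusHours(7)));
        Map<String, LocalDateTime> p1 = denormalize(events).get("P1");
        System.out.println("Last A to first D: " + lastAToFirstD(p1));
    }
}
```

The important property is that the output has a fixed set of columns (First and Last for each of the four events), no matter how many times a piece bounces between steps.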


But what if the desired measures are more complex?  For example, say that a client wants to know all of the times between when a piece left the factory floor and was subsequently received at the shipping department.  Now it is more difficult to use a Row Denormalizer since there could be a variable number of columns.  In this case, we can turn to a User Defined Java Class to compute the desired measures and emit them row by row.  How does this work?


A User Defined Java Class contains actual Java code that implements logic inside of a standard PDI transformation step.  It is simpler than creating a new transformation step from scratch, but it provides much of the same functionality, and it has decent performance because it uses Janino to actually compile the code on the fly.  To create a User Defined Java Class step, it is helpful to understand the basic interfaces.  Some of these are listed below, but the complete reference is in the javadoc.

  • processRow():  This is the main workhorse method that is called by PDI once per input row.  This method is also responsible for initializing the step right after startup by checking the boolean variable first, which is set to true for you by PDI when the transformation starts.  processRow() should always return true when processing should continue and false when processing is finished.
  • getRow(): When called from within processRow(), this will return an object array with the field values in it, or null if there is no more input.
  • setOutputDone(): When getRow() returns null, this should be called before returning false.
  • get(): This method returns field accessors that can be used with an object array to return particular field values.  The field accessors are preferred to indexing directly into the Object array returned by getRow() because they keep track of indexes for you.  Obtain accessors with the field name and one of the constants Fields.In for input fields, Fields.Out for output fields, or Fields.Info for info fields.
  • createOutputRow(): This method is called to produce an output row.  Typically, it is called with both the input row and an output row size, and in this case it will copy the input row values to the output row.  The field data is a Java object that itself contains two fields set by PDI for you: data.inputRowMeta and data.outputRowMeta.  These objects can be used to obtain further information about row structure, like size, type of fields, etc.
  • findInfoRowSet():  If your User Defined Java Class has an info step, this step must be defined on the Info Steps pane.  This function will return a Row Set given the tag name provided.  Calls to getRowFrom() with this Row Set (instead of plain old getRow()) will return rows from this info step.


Figure 2: The Info Steps pane in the User Defined Java Class. 


  • putRow(): When output has been calculated, it should be returned to the PDI framework for further processing with a call to putRow().  The Object array passed to putRow should be initialized with a call to createOutputRow() as described above to ensure it has the right number of elements consistent with the metadata.
  • Error Handling / Troubleshooting: If an error comes up during initialization and the step cannot begin, or if an unrecoverable error comes up during row processing, a KettleException should be thrown.  However, if an error comes up during row processing that is related to data content of a row, it should be sent to an error handling step.  Use the method putError() to send a row to error handling, and don't forget to return true from processRow() to continue processing.  The method logBasic() can be used to send a message to the Spoon log window to aid in debugging.
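
The control flow described in the list above can be sketched in plain Java so that it runs outside Spoon.  This is only a simulation and not PDI code: getRow(), createOutputRow(), putRow(), and setOutputDone() below are simplified stand-ins that mimic the behavior described above, and the class name ProcessRowContractSketch is hypothetical.  In a real User Defined Java Class these methods are provided by the framework and take PDI metadata arguments (putRow(), for example, also takes the output row metadata), and fields should be read with get() accessors rather than by index.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Plain-Java simulation of the processRow() contract; NOT the PDI API. */
final class ProcessRowContractSketch {
    private final Iterator<Object[]> input;
    final List<Object[]> output = new ArrayList<>();
    boolean first = true;          // PDI sets this flag for you; simulated here
    boolean outputDone = false;
    private int outputRowSize;

    ProcessRowContractSketch(List<Object[]> rows) {
        this.input = rows.iterator();
    }

    // --- Simplified stand-ins for the framework methods ---
    Object[] getRow() { return input.hasNext() ? input.next() : null; }
    Object[] createOutputRow(Object[] in, int size) { return Arrays.copyOf(in, size); }
    void putRow(Object[] row) { output.add(row); }
    void setOutputDone() { outputDone = true; }

    /** Called once per input row; returns false when processing is finished. */
    boolean processRow() {
        Object[] r = getRow();
        if (r == null) {               // no more input
            setOutputDone();
            return false;
        }
        if (first) {                   // one-time initialization on the first row
            first = false;
            outputRowSize = r.length + 1;   // assumption: one extra output field
        }
        Object[] out = createOutputRow(r, outputRowSize);   // copies the input values
        out[r.length] = ((String) r[1]).toLowerCase();      // illustrative new field
        putRow(out);
        return true;                   // ask to be called again for the next row
    }

    public static void main(String[] args) {
        ProcessRowContractSketch step = new ProcessRowContractSketch(
            Arrays.<Object[]>asList(new Object[] {"P1", "A"}, new Object[] {"P1", "B"}));
        while (step.processRow()) {
            // the framework drives this loop in a real transformation
        }
        System.out.println(step.output.size() + " rows emitted");
    }
}
```

The shape to notice is the order of checks: test for end of input first, do one-time setup when first is true, then build and emit the output row and return true.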


Figure 3 below shows the basic setup of the User Defined Java Class, which we have called java: aggregator.  The java: aggregator, like the Row Denormalizer above, assumes input is sorted, but this time we're sorting on WorkflowID and then EventDate.  The aggregator collects all events per WorkflowID in ArrayList instances, and when it sees a new WorkflowID (or end of input) it calculates the measures and flushes them out to PDI using putRow().  The measures in the example are the time between each instance of an A event and the next B event, and the time between each instance of an A event and the last C event occurring before another A.

Figure 3: The User Defined Java Class method for obtaining workflow timings.  (The full transformation is attached in the file t_aggregator_engine.ktr.)
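
The buffer-and-flush logic can be illustrated outside PDI with a short plain-Java sketch.  It is not the code from the attached transformation: the names AggregatorSketch, Event, Timing, aggregate, and flush are hypothetical, and it implements only the first measure, under the assumption that every A event is paired with the first B event that follows it within the same WorkflowID.  In the real step, the loop body would live in processRow() and each Timing would be emitted with putRow().

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of the aggregator: buffer per WorkflowID, flush on change. */
final class AggregatorSketch {

    static final class Event {
        final String workflowId;
        final String name;
        final LocalDateTime date;

        Event(String workflowId, String name, LocalDateTime date) {
            this.workflowId = workflowId;
            this.name = name;
            this.date = date;
        }
    }

    /** One output row: the elapsed time from an A event to the next B event. */
    static final class Timing {
        final String workflowId;
        final LocalDateTime start;
        final Duration elapsed;

        Timing(String workflowId, LocalDateTime start, Duration elapsed) {
            this.workflowId = workflowId;
            this.start = start;
            this.elapsed = elapsed;
        }
    }

    /** Input must already be sorted by WorkflowID and then by EventDate. */
    static List<Timing> aggregate(List<Event> sorted) {
        List<Timing> out = new ArrayList<>();
        List<Event> buffer = new ArrayList<>();
        String currentId = null;
        for (Event e : sorted) {
            if (currentId != null && !currentId.equals(e.workflowId)) {
                flush(buffer, out);          // a new WorkflowID has started
            }
            currentId = e.workflowId;
            buffer.add(e);
        }
        flush(buffer, out);                  // end of input
        return out;
    }

    /** Emits one Timing per A that has a later B, then clears the buffer. */
    private static void flush(List<Event> buffer, List<Timing> out) {
        for (int i = 0; i < buffer.size(); i++) {
            Event a = buffer.get(i);
            if (!"A".equals(a.name)) {
                continue;
            }
            for (int j = i + 1; j < buffer.size(); j++) {
                Event b = buffer.get(j);
                if ("B".equals(b.name)) {
                    out.add(new Timing(a.workflowId, a.date, Duration.between(a.date, b.date)));
                    break;                   // only the next B counts
                }
            }
        }
        buffer.clear();
    }
}
```

Because each workflow produces as many rows as it has qualifying A events, the output is naturally row-oriented, which is what makes this approach fit where a fixed set of denormalized columns does not.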


Examples of both transformations are attached.  Each includes a small generator that produces random workflow events over a given period of time for testing.  Although completely random workflow events are not strictly realistic, they serve here for demonstration purposes.


Some things to avoid in creating a User Defined Java Class: avoid overly clever or obscure implementations.  The code should be readable and well commented for maintainers who come after you and who are expecting to maintain PDI.  Avoid spinning up any threads within a User Defined Java Class, or opening up any network or database connections.  If you need threads or connections to implement a step, you should consider creating a full transformation step.


In closing, when is it best to use a pure PDI solution versus a solution with a User Defined Java Class in it?  The User Defined Java Class has some practical limitations.  Debugging can be difficult and may require logging statements, which is slow and tedious for programs of more than a few hundred lines of code.  Furthermore, if something can be done in PDI, it should be done in PDI.  I tend to look at two criteria: if the logic is very difficult to implement in pure PDI, and if the User Defined Java Class is well limited in scope to solve a single, particular data transformation need, then I consider a User Defined Java Class a viable option.  But remember, the User Defined Java Class will look like a magic black box to maintainers who come after you.  If it is doing too many different things or is implemented haphazardly, it may not be worth it.