Pentaho Data Integration (PDI) is a powerful, high-throughput framework for moving data through a network of transformation steps, turning data from one form into another. PDI is an excellent choice for data engineers because it provides an intuitive visual UI, so the engineer gets to focus on the interesting business details. PDI is also easily extensible.
PDI traditionally focuses on large-scale re-aggregation of data for data warehousing, big data, and business intelligence, but there are lots of other interesting things you can contemplate doing with embedded PDI. Coming from a high-frequency trading background, when I looked at the default list of transformation steps I was surprised not to see a generalized UDP packet processor. Data transmitted over UDP is everywhere in the trading world: market data often comes in over UDP, and so do custom prompt trading signals. So I ended up creating a toy implementation of a pair of transformation steps, UDPSender and UDPReceiver, to send and receive data over UDP.
Part I of this blog post introduces some concepts describing what UDP is and is not, packet handling in Java, and how to write transformation steps for PDI. It is intended to be an introduction and is not exhaustive. Where applicable, links are provided to further reading. Code is provided separately in a GitHub repository for educational purposes only; no warranty is expressed or implied. Part II of this blog post will include some Kettle transformations and demonstrations of the UDPSender and UDPReceiver in action.
What is UDP?
So what is UDP anyway and why should we care about it? Information is generally sent over the internet using one of two protocols: UDP and TCP. UDP stands for User Datagram Protocol. It is a lightweight, connectionless protocol that was designed by David Reed and published in 1980 (RFC 768 - User Datagram Protocol) as an alternative to the comparatively high-overhead, connection-oriented Transmission Control Protocol, or TCP. Among the properties of UDP:
- Does not guarantee reliable transmission of packets
- Does not guarantee packet arrival in order
By contrast, TCP is connection-oriented and has protocol mechanisms to guarantee packet transmission and ordered arrival of packets. But it cannot do multicast or broadcast, and it has higher overhead associated with flow control and with maintaining connection state. So UDP is a good candidate when latency is more important than TCP's protocol guarantees, or when packet delivery guarantees can be ignored or engineered into an enterprise network or into the application level.
Much of the time, UDP packet loss occurs simply because intervening network devices are free to drop UDP packets in periods of high activity when switch and router buffers fill up. (It's rarely due to an errant backhoe, although that does happen, and when it does it's a problem for TCP as well :-)
For many traditional UDP uses like VoIP, packet loss manifests as hiss and pop and is mitigated by the filtering and natural language processing capabilities of the human brain. For many other uses, like streaming measurement data, missing measurements can be similarly imputed. It is important to note, however, that UDP packet loss may not be a problem at all on enterprise networks, provided that the hardware is deliberately selected and controlled to always have enough buffer. It should also be noted that socket communication is an excellent mode of inter-process communication (IPC) for endpoints on different platforms on the same machine.
Finally, if packet loss is a problem, a lightweight mitigation protocol may be implemented in the application on top of UDP. More information on the differences between UDP and TCP can be found here: User Datagram Protocol or Transmission Control Protocol.
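To make the idea of an application-level mitigation protocol a little more concrete, here is a minimal sketch of one common building block: prefixing every payload with a sequence number so that the receiver can at least detect loss and reordering. The class name SequenceTracker and the 8-byte header layout are my own assumptions for illustration; they are not part of the reference implementation, and a real protocol would go on to request retransmission or reconcile late arrivals.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper (not part of ggutils): tags payloads with a sequence number and spots gaps. */
final class SequenceTracker {
    private long expected = 0;    // next sequence number we expect to see
    private long missing = 0;     // sequence numbers skipped over so far
    private long outOfOrder = 0;  // packets that arrived after a later one (or duplicates)

    /** Builds a payload: an 8-byte sequence number followed by the message body. */
    static ByteBuffer encode(long seq, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + body.length);
        buf.putLong(seq).put(body);
        buf.flip();               // switch the buffer from writing to reading
        return buf;
    }

    /** Reads the sequence number from a received payload and updates the counters. */
    void onPacket(ByteBuffer packet) {
        long seq = packet.getLong();
        if (seq >= expected) {
            missing += seq - expected;   // every number in between was skipped
            expected = seq + 1;
        } else {
            outOfOrder++;                // late or duplicate; a real protocol would reconcile it
        }
    }

    long missing() { return missing; }
    long outOfOrder() { return outOfOrder; }

    public static void main(String[] args) {
        SequenceTracker t = new SequenceTracker();
        // Packet 2 is overtaken by packet 3, so it first looks lost and then shows up late.
        for (long seq : new long[] {0, 1, 3, 2}) {
            t.onPacket(encode(seq, new byte[] {42}));
        }
        System.out.println("missing=" + t.missing() + " outOfOrder=" + t.outOfOrder());
    }
}
```

Note that this sketch only counts; it does not subtract a late arrival from the missing total, so the two counters should be read together.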
Creating PDI Transformation Steps
There is excellent documentation on how to extend PDI by creating your own transformation steps at https://help.pentaho.com/Documentation/8.0/Developer_Center/PDI. The most relevant information for creating additional PDI plugins is located at the bottom of the page under the heading "Extend Pentaho Data Integration".
In a nutshell, you'll need a working knowledge of Java and common Java tools like Eclipse and Maven. First we'll explore sending and receiving packets in Java, then discuss threading models, and finally look at the implementation of the four required PDI interfaces.
Sending and Receiving UDP Packets in Java
Since buffer overflow is usually the main culprit behind lost UDP packets, it is important to grab packets from the network stack as quickly as possible. If you leave them hanging around long enough, they will get bumped by some other packet. So a common pattern for handling UDP packets is to have a dedicated thread listening for packets on a port, read the packets immediately, and queue them up for further processing on another thread. Depending on how fast packets are coming in and how bursty the incoming rate is, you may also need to adjust buffer size parameters in the software or even on the network device directly.
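The pattern described above might look something like the following sketch. It is not the code from the repository: the class name QueueingUdpListener, the 2048-byte read buffer, and the 1 MiB receive buffer request are all assumptions made for illustration. One thread does nothing but drain the socket and hand copies of the payloads to a queue, and a different thread is expected to call poll() to do the actual work.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical listener: one thread drains the socket, another thread consumes the queue. */
final class QueueingUdpListener {
    private final DatagramChannel channel;
    private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
    private final Thread reader;
    private volatile boolean running = true;

    QueueingUdpListener(int port) throws IOException {
        channel = DatagramChannel.open();
        // Ask the OS for a larger receive buffer; the OS is free to grant less than requested.
        channel.setOption(StandardSocketOptions.SO_RCVBUF, 1 << 20);
        channel.bind(new InetSocketAddress("127.0.0.1", port)); // port 0 picks any free port
        reader = new Thread(this::readLoop, "udp-reader");
        reader.setDaemon(true);
        reader.start();
    }

    private void readLoop() {
        ByteBuffer buf = ByteBuffer.allocate(2048);
        while (running) {
            try {
                buf.clear();
                channel.receive(buf);         // blocks until a datagram arrives
                buf.flip();
                byte[] copy = new byte[buf.remaining()];
                buf.get(copy);
                queue.add(copy);              // hand off immediately; no parsing on this thread
            } catch (IOException e) {
                return;                       // channel was closed by close(), or a real I/O error
            }
        }
    }

    /** The local port the listener is bound to (useful when constructed with port 0). */
    int port() throws IOException {
        return ((InetSocketAddress) channel.getLocalAddress()).getPort();
    }

    /** Waits up to timeoutMs for the next queued datagram; returns null on timeout. */
    byte[] poll(long timeoutMs) throws InterruptedException {
        return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }

    /** Clears the flag, closes the channel to unblock receive(), and waits briefly for the thread. */
    void close() throws IOException, InterruptedException {
        running = false;
        channel.close();
        reader.join(1000);
    }
}
```

Copying each payload into a fresh byte array keeps the sketch short, but it allocates on every packet; pre-allocated buffers avoid that cost.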
This implementation uses classes in the java.nio package. These classes are suited for high performance network programming. Two classes of interest are ByteBuffer and DatagramChannel. You can think of ByteBuffer as a byte array on steroids: it has get and put methods for each basic data type and an internal position pointer, so that data can be read and written very conveniently. ByteBuffer can also use native memory outside of the Java heap, via the allocateDirect() method. When such a buffer is passed to a DatagramChannel, received data can be written straight to native memory without the extra step of copying it into the Java heap first. For low latency applications this is very desirable, but it comes at the cost of allocation time and extra memory management. (For both of these reasons, it's a good idea to pre-allocate all of the ByteBuffers anticipated and use an ObjectPool.)
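Here is a small sketch of what "a byte array on steroids" means in practice. The record layout (an int id, a double price, and a length-prefixed string) and the class name ByteBufferDemo are invented for this example and are not the packet format used in the repository.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical demo of typed put/get calls on a direct (off-heap) ByteBuffer. */
final class ByteBufferDemo {

    /** Writes an int, a double and a length-prefixed UTF-8 string into a direct buffer. */
    static ByteBuffer write(int id, double price, String symbol) {
        ByteBuffer buf = ByteBuffer.allocateDirect(256);   // native memory outside the Java heap
        byte[] text = symbol.getBytes(StandardCharsets.UTF_8);
        buf.putInt(id).putDouble(price).putInt(text.length).put(text);
        buf.flip();   // set limit to what was written and rewind the position for reading
        return buf;
    }

    /** Reads the fields back in the same order they were written. */
    static String read(ByteBuffer buf) {
        int id = buf.getInt();
        double price = buf.getDouble();
        byte[] text = new byte[buf.getInt()];
        buf.get(text);
        return id + "," + price + "," + new String(text, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(read(write(7, 101.25, "ABC")));   // prints 7,101.25,ABC
    }
}
```

The flip() call is the step that is easiest to forget: without it, the reader starts at the end of the written data instead of the beginning.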
A word of caution is in order here. Multi-threaded application programming is challenging enough on its own, but when you are contemplating adding threads inside of another multi-threaded application that you didn't write, you should be extra careful! For example, PDI transformations are usually runnable within Spoon, the UI designer for PDI transformations. Spoon has start and stop buttons, and it would be extra bad to cause a transformation to not stop when the user hits the stop button because of a threading issue, or to leak resources on multiple starts and stops. Above all, don't do anything to affect the user experience.
If at all possible, when designing a transformation step that involves some threading, look at a similar example or two. When creating this implementation, I looked at the pentaho-mqtt implementation on GitHub located at pentaho-labs/pentaho-mqtt-plugin. This was very useful because I could look at how the mqtt-plugin developers married a network implementation capable of sending/receiving packets to PDI. The model I ultimately settled upon has the following characteristics:
- A startable/stoppable DatagramChannel listener with a single-use, flagged thread loop, and an "emergency" hard stop in case the join timeout for a clean stop is exceeded.
- Assumes that the putRow() method is thread safe.
- Uses an object pool of pre-allocated ByteBuffers to avoid inline object construction costs.
- Uses a Java thread pool (ExecutorService) to both process packets and to serve as a packet queue.
Looking at the pentaho-mqtt-plugin and understanding the flow of data and the initialization sequences greatly sped up my own development efforts.
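As a rough illustration of the last two characteristics in the list above, the following sketch combines a pool of pre-allocated direct ByteBuffers with an ExecutorService whose internal work queue doubles as the packet queue. The class PooledPacketProcessor and its method names are hypothetical and simplified; the object pool in ggutils is a separate implementation, and the "handler" here just counts buffers instead of decoding them.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: pre-allocated buffers are borrowed, processed on a thread pool, and returned. */
final class PooledPacketProcessor {
    private final BlockingQueue<ByteBuffer> pool;
    private final ExecutorService workers;
    private final AtomicInteger handled = new AtomicInteger();

    PooledPacketProcessor(int poolSize, int bufferSize, int threads) {
        pool = new ArrayBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            pool.add(ByteBuffer.allocateDirect(bufferSize));   // pay the allocation cost up front
        }
        workers = Executors.newFixedThreadPool(threads);
    }

    /** Borrows a cleared buffer, blocking while every buffer is in use. */
    ByteBuffer acquire() throws InterruptedException {
        ByteBuffer buf = pool.take();
        buf.clear();
        return buf;
    }

    /** Queues a filled buffer for processing; the buffer goes back to the pool afterwards. */
    void submit(ByteBuffer filled) {
        workers.execute(() -> {
            try {
                filled.flip();
                // A real handler would decode the packet here; this sketch only counts it.
                handled.incrementAndGet();
            } finally {
                pool.offer(filled);
            }
        });
    }

    /** Stops accepting work, waits for queued packets, and forces a stop if the wait times out. */
    int shutdownAndCount(long timeoutMs) throws InterruptedException {
        workers.shutdown();
        if (!workers.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
            workers.shutdownNow();
        }
        return handled.get();
    }
}
```

Because acquire() blocks when the pool is empty, the pool size also acts as a crude form of back-pressure on the reading thread; whether that is desirable depends on how much packet loss the application can tolerate.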
Each PDI transformation step has to implement all of the following four interfaces.
- StepMetaInterface: Responsible for holding step configuration information and implementing persistence to XML and PDI repository.
- StepDataInterface: Responsible for keeping processing state information.
- StepInterface: Implements the processing logic of the transformation.
- StepDialogInterface: Implements the configuration UI dialog for the step.
I'm not going to go into a lot of detail on the coding here, but the source code is provided for reference in the following GitHub repositories:
- ggraham-412/ggutils v1.0.0 contains utility classes: logging that can be injected with alternative logging implementations at runtime, an object pool implementation, and UDP packet sender/receiver objects that wrap DatagramChannel and ByteBuffer objects together with a simple packet encoder/decoder.
- ggraham-412/kettle-udp v1.0.0 contains the implementations of the UDPReceiver and UDPSender steps.
(Again, the code is provided for educational purposes only and no support or warranty is expressed or implied.)
The basic flow for UDPSender is simple. We're expecting data to come in from PDI into the UDPSenderStep. So in the processRow() method (which is invoked by the framework on every row) we basically get a row of data, find the fields we're interested in sending, and send them on a UDPSender from the ggutils library. If the row we get is the first one, we open the UDPSender ahead of processing. If the row we get is null (end of data), we close the UDPSender and signal that the step is finished by returning false.
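The control flow just described can be sketched without any PDI classes at all. In the sketch below, SenderFlowSketch stands in for the step, its processRow(Object[]) method stands in for the real processRow() (which in PDI obtains the row from the framework rather than taking it as an argument), and a plain DatagramChannel stands in for the ggutils UDPSender. The "field|field" encoding is made up for the example.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for the sender step: open on first row, send each row, close on null. */
final class SenderFlowSketch {
    private final InetSocketAddress target;
    private DatagramChannel channel;   // stand-in for the ggutils UDPSender
    private boolean first = true;
    private int sent = 0;

    SenderFlowSketch(InetSocketAddress target) {
        this.target = target;
    }

    /** Returns true while there may be more rows, false once the end of data is reached. */
    boolean processRow(Object[] row) throws IOException {
        if (row == null) {               // end of data: release resources and signal "finished"
            if (channel != null) {
                channel.close();
            }
            return false;
        }
        if (first) {                     // first row: open the sender before processing
            channel = DatagramChannel.open();
            first = false;
        }
        // Pick the fields of interest and encode them (made-up "a|b" text encoding).
        String payload = row[0] + "|" + row[1];
        channel.send(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)), target);
        sent++;
        return true;
    }

    int sent() { return sent; }

    public static void main(String[] args) throws IOException {
        // A throwaway receiver socket so the datagrams have somewhere to go.
        try (DatagramChannel rx = DatagramChannel.open()) {
            rx.bind(new InetSocketAddress("127.0.0.1", 0));
            int port = ((InetSocketAddress) rx.getLocalAddress()).getPort();
            SenderFlowSketch step = new SenderFlowSketch(new InetSocketAddress("127.0.0.1", port));
            step.processRow(new Object[] {"ABC", 101.25});
            step.processRow(new Object[] {"XYZ", 99.5});
            boolean more = step.processRow(null);
            System.out.println("sent=" + step.sent() + " more=" + more);
        }
    }
}
```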
The flow in UDPReceiver is a little more complicated because, from the point of view of PDI, it is ingesting externally generated data. Going from the pentaho-mqtt-plugin model, the processRow() method is used only to determine a stop condition to signal to the framework. Since there are no rows coming in, we use the init() method to start up a UDPReceiver from ggutils listening on the configured port. The UDPReceiver has a handler method, so that whenever a packet is received, it is passed to the handler. The handler unpacks the packet into a row and calls putRow(). The packets are handled on a thread pool, so we do potentially process fast-arriving packets out of order. When the number of packets received hits a maximum, or when a time limit expires, processRow() returns false to the framework, stopping the transformation. The UDPReceiver is stopped with a boolean flag, but just in case the thread does not stop for unknown reasons, it is hard-stopped after a join timeout to avoid leaking resources.
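The stop condition itself is simple enough to sketch in isolation. In the hypothetical class below, onPacket() is what a packet handler would call after each putRow(), and shouldContinue() is the value a processRow() implementation could return to the framework. The class and method names are mine, not the ones used in kettle-udp.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stop condition: finish after a maximum packet count or a time limit. */
final class ReceiverStopCondition {
    private final long maxPackets;
    private final long deadlineNanos;
    private final AtomicLong received = new AtomicLong();   // updated from handler threads

    ReceiverStopCondition(long maxPackets, long maxMillis) {
        this.maxPackets = maxPackets;
        this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxMillis);
    }

    /** Called by the packet handler once per received packet. */
    void onPacket() {
        received.incrementAndGet();
    }

    /** True while neither limit has been reached; false tells the framework the step is done. */
    boolean shouldContinue() {
        boolean underCount = received.get() < maxPackets;
        boolean beforeDeadline = System.nanoTime() - deadlineNanos < 0;   // overflow-safe comparison
        return underCount && beforeDeadline;
    }
}
```

The counter is an AtomicLong because, with packets handled on a thread pool, several handler threads may increment it at the same time.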
The settings UI dialogs are created using the Standard Widget Toolkit (SWT), which is a desktop UI library available for Java. Note that there are many excellent custom widgets that already exist for use with Spoon, such as the table input widget. To maintain the look and feel of PDI, it is best to work from an existing example dialog class from GitHub using these widgets.
Build and Deploy
In order to build the PDI transformation steps, import both of the above ggutils and kettle-udp projects into Eclipse. kettle-udp uses Maven, so make sure that the m2e Eclipse extension is installed. Finally, if you're not familiar with building Pentaho code using Maven, you should visit the GitHub repository Pentaho OSS Parent Poms for important configuration help. (Big hint: you should use the settings.xml file given at the above repo in the maven-support-files in your ~username/.m2 folder to make sure your build tools can find the appropriate Pentaho repositories.)
Once everything is building, export a jar file called UDPReceiver.jar. In order to see the new steps in Spoon, copy the jar file to the folder data-integration/plugins/steps/UDPReceiver. The folder should have the same name as the jar file it contains. The next time you start up Spoon, you should be able to see the UDPReceiver step in the Input category and the UDPSender step in the Output category.
i18n and l10n
Pentaho contains facilities for internationalization and localization of strings and date formats. The repository above contains en_US strings, but alternates could easily be provided depending on the language/locale of your choice. In order to provide for localized strings in the application, simply include a messages package in your project structure (it should match the name of the target package plus ".messages") containing a messages_en_US.properties file. The file should contain properties like "UDPSenderDialog.GeneralTab.Label=General", and the dialog code retrieves the string with a call to BaseMessages.getString(class, key), where class is a Java class in the target package, and key is a string in the properties file. So for example, BaseMessages.getString(UDPSenderMeta.class, "UDPSenderDialog.GeneralTab.Label") will return the string "General" if the locale in Spoon is set to en_US.
Note: to troubleshoot this during development, make sure that the messages properties file is actually deployed in the jar file by inspecting it with an archiver like 7-Zip, and that the properties files are included in the build CLASSPATH. You may also have to switch the locale in Spoon away from and back to the desired locale to flush cached strings.
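If the key/value mechanics are unfamiliar, the following sketch shows the same kind of lookup using only the standard Java library. It deliberately does not use BaseMessages: it loads the example property from an in-memory string with PropertyResourceBundle, and the "!key!" fallback for a missing key is just a convention chosen for this sketch.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.PropertyResourceBundle;
import java.util.ResourceBundle;

/** Hypothetical illustration of a properties-based string lookup (plain Java, no PDI classes). */
final class MessagesLookupSketch {
    // Stand-in for the contents of messages_en_US.properties.
    static final String MESSAGES_EN_US = "UDPSenderDialog.GeneralTab.Label=General\n";

    /** Returns the localized string for key, or "!key!" if the key is not defined. */
    static String lookup(String key) throws IOException {
        ResourceBundle bundle = new PropertyResourceBundle(new StringReader(MESSAGES_EN_US));
        return bundle.containsKey(key) ? bundle.getString(key) : "!" + key + "!";
    }

    public static void main(String[] args) throws IOException {
        System.out.println(lookup("UDPSenderDialog.GeneralTab.Label"));   // prints General
    }
}
```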
Data sent via the internet is generally sent using either the TCP or the UDP protocol. In this blog post, we looked at the characteristics of UDP and highlighted the basic techniques for handling UDP packets in Java. Then we showed how those techniques could be incorporated into a PDI transformation step and provided a reference implementation (for educational purposes only).
In Part II of this post, we will put these ideas into action with some demonstrations using Kettle transformations.