
Histograms are a great way to probe density functions.  Visually they look like ordinary bar charts, but the bars (or bins in histogram-speak) are always quantitative and ordered rather than qualitative.  A histogram is defined by n mutually exclusive bins covering a continuous region of possible measure values from low to high.  An instance of a histogram is specified by a value associated with each bin, plus a value for underflows and another value for overflows.  The value in each bin represents the number of times a measure value fell in that bin, the underflow represents the number of times a measure value fell below the low edge of the first bin, and the overflow represents the number of times a measure value fell above the high edge of the last bin.  Observations might be weighted, in which case the values represent the sum of the weights.

 

In my last blog post, I demonstrated using a User Defined Java Class to measure the time between different workflow steps when a workflow is ill-defined or irregular.  In this blog post, we'll use a User Defined Java Class in a transformation, t_histogram_example.ktr, to create and fill histograms defined in an info step.  Then we'll see how these histograms can drive a CCC Bar Chart in a dashboard created with the Community Dashboard Editor (CDE).  In the current example, we'll use a data grid to bring in hard-coded histogram definitions, but these could come from a configuration database as well.

 

 

Figure 1: In this example, the info step is a Data Grid providing hard-coded histogram definitions, each containing an Integer ID, a name, a low specification, a high specification, a number of bins, and an optional field name containing a fill weight for the histogram (default is 1.0).   

 

The measure values that we will be looking at are random numbers generated according to several simple distributions which can be generated with inverse methods: uniform, Gaussian (or Normal), exponential, and logistic.  (See http://luc.devroye.org/handbooksimulation1.pdf for more information on how these were generated.)  The input parameters of the transformation define the ranges of these random numbers: a low and high for the uniform range, a mean and standard deviation for each Gaussian, and a scale factor for the exponential distribution.  These measures come in as data rows.
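
As an aside, a minimal Java sketch of how such measures can be generated with the inverse method is shown below.  The parameter names (low, high, mean, sigma, scale) are hypothetical stand-ins for the transformation parameters, and the Gaussian here uses Java's built-in nextGaussian() rather than a true inverse transform:

import java.util.Random;

public class InverseSampling {
    public static void main(String[] args) {
        Random rng = new Random();
        double low = 0.0, high = 10.0;   // hypothetical uniform range
        double mean = 0.0, sigma = 1.0;  // hypothetical Gaussian parameters
        double scale = 2.0;              // hypothetical exponential/logistic scale

        double u = rng.nextDouble();                                  // U ~ Uniform(0, 1)
        double uniform     = low + (high - low) * u;                  // inverse CDF of Uniform(low, high)
        double exponential = -scale * Math.log(1.0 - u);              // inverse CDF of Exponential(scale)
        double logistic    = mean + scale * Math.log(u / (1.0 - u));  // inverse CDF of the logistic distribution
        double gaussian    = mean + sigma * rng.nextGaussian();       // Gaussian via the library's polar method

        System.out.printf("%f %f %f %f%n", uniform, exponential, logistic, gaussian);
    }
}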

 

During initialization of the User Defined Java Class, we get the row set containing histogram definitions using the method findInfoRowSet().  Next, we loop over the info step rows and create the histograms.  The histograms are implemented in a Java static nested class and used with the following methods.

  • fill(x): Increments the value of the bin corresponding to x by 1.0.
  • fill(x, w): Increments the value of the bin corresponding to x by the number w.
  • getBins(): Returns all of the bin values in an array.

The name field is used to identify a measure in the data rows, which is used to fill the corresponding histogram.  If no weight field is specified, the fill(x) method is used.  Otherwise, fill(x, w) is used with the value w taken from the weight field.

 

The User Defined Java Class accumulates data until no more data rows are detected.  At that point it dumps all histograms to the output.  Each row contains the histogram ID, the measure, the bin number, and the bin value.  In the example, we split the output rows by histogram ID and send them to different dummy steps that can be referenced later.
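
The histogram implementation itself is plain Java.  A minimal sketch of what such a static nested class might look like follows; it is illustrative, not the exact code shipped in t_histogram_example.ktr:

public static class Histogram {
    private final double low, high;
    private final int nBins;
    private final double[] bins;
    private double underflow = 0.0;
    private double overflow = 0.0;

    public Histogram(double low, double high, int nBins) {
        this.low = low;
        this.high = high;
        this.nBins = nBins;
        this.bins = new double[nBins];
    }

    public void fill(double x) {
        fill(x, 1.0);                 // unweighted fill increments by 1.0
    }

    public void fill(double x, double w) {
        if (x < low) {
            underflow += w;           // below the low edge of the first bin
        } else if (x >= high) {
            overflow += w;            // at or above the high edge of the last bin
        } else {
            int bin = (int) Math.floor(nBins * (x - low) / (high - low));
            bins[bin] += w;
        }
    }

    public double[] getBins() {
        return bins;                  // all bin values, in bin order
    }

    public double getUnderflow() { return underflow; }
    public double getOverflow()  { return overflow; }
}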

 

Figure 2: Output is split by histogram ID.

 

The output of the first Gaussian using the step preview is shown in Figure 3.

 

Figure 3: The output of a histogram containing a standard normal distribution with 11 bins from -5.5 to +5.5.

 

The following assumes that a Pentaho Business Analytics server is installed and running with the Community Dashboard Editor enabled.  This example was done on version 7.1; instructions can be found in the 7.1 Pentaho Documentation.


To create a visual using the CDE, log into the Pentaho User Console (PUC) and create a new folder.  In this example, the folder is called "/home/Histograms".  Into this folder, upload the transformation t_histogram_example.ktr, and create a new CDE dashboard.  In the data sources panel, create a new "kettleTransFromFile" data source from the Kettle Queries tab.  This type of datasource will silently create a Community Data Access (CDA) query which will be the conduit between the PDI transformation and the CDE dashboard.  To configure the datasource, specify

  • A unique name for the datasource
  • The PDI transformation file containing the histograms, uploaded through the PUC
  • The PDI step that contains the histogram of interest, in this case out: 2.
  • The Output Columns should be set to the bin number and the bin value.  The CCC Bar Chart expects two input columns: the first should be the X-axis categories of the bar chart, and the second should be the bin values.  The structure of the output can be seen in Figure 3; counting from 0, these are the 3rd and 6th columns.

The rest of the options can be left at their defaults.

 

Figure 4: Options needed for the kettleTransFromFile to set up a CCC Bar Chart.

 

In the layout panel, create a row and a column and give the column a name.  Change layout parameters as needed to create a reasonably large chart.  Finally, in the components panel, choose a CCC Bar Chart component and connect it to the layout (using the name given to the column as the "htmlObject") and to the datasource using the datasource name.  Save the dashboard, and then preview it.  If configured correctly, you should see a chart like the one shown in Figure 5.

Figure 5: The output of the histogram from step out: 2, which has a Standard Normal distribution.

 

In conclusion, we've demonstrated generating random data and binning it in histograms using a User Defined Java Class in PDI.  We also showed how to display these histograms in a CCC Bar Chart in a CDE dashboard.  The techniques demonstrated here can be extended to get histogram definitions from a configuration database managed by a web application, or even to output and clear histograms periodically to provide real-time displays.

 

Update 2017-01-05: When using an Info step with a User Defined Java Class that is set to start more than one copy, make sure the Data Movement is set to "Copy Data to Next Steps" on the info step.  You can control this setting by right-clicking on the Info step.  By default the data movement is "Round Robin", so if the User Defined Java Class step is started with more than one copy, some rows from the Info step will appear to be missing.  Thanks to @Jeffrey_Hair for the explanation.

 

 

I recently wrote about everything you needed to know about Pentaho 8. And now it is available! Go get your Enterprise Edition or trial version from the usual places.

 

For CE, you can find it on the new community home!

 


Enjoy!

 


-pedro

Hi everyone,

 

thank you for attending Pentaho Community Meeting 2017! With more than 200 attendees from 40 countries and 24 speakers it was a great event with an awesome vibe.

 

This page contains all PCM17 resources - summaries of the talks, presentation slides and pictures.

 

Summaries of the event:

 

Presentation slides:

 

Videos of presentations

 

Interviews with the speakers (you have to scroll down a little bit to see all interviews)

 

Pictures:

 

Btw: You can upload your pictures directly in the picture folders but please only a best-of!

 

And just in case you weren't lucky enough to win the Pentaho boarding pass and golf balls, you'll find the auction and the quiz presentations in this folder.

 

Do you think there is content missing, or would you like to add yours? Please leave a comment below.

 

Thank you again all for making this the best PCM ever! We at it-novum might have been good organizers but without your contributions, commitment and positive vibe PCM17 would just have been another boring software conference.

Pentaho is a great product made by a highly committed community. It was an honour to bring this community together for three days.

 

Keep the good spirit and see you at PCM18!

 

PS: Save the date for German Pentaho User Meeting on March 6, 2018 in Frankfurt!

All businesses utilize workflow processes to get things done.  Understanding the timing of the workflow steps is of great interest to those charged with business improvement and efficiency.  Some businesses have tightly defined and strictly controlled process implementations, but others take a more informal approach.  Last spring, I worked with a historically small, privately-owned client that was expanding through acquisitions, and some of their business processes were ill-defined both because of platform integration issues and because of loose controls on data entry.  We ended up working with a User Defined Java Class to analyze the workflow timings because of the complex rules around what constituted an interesting measure in the workflow.  But even for many irregular cases, a Row Denormalizer can be used with a Group By step to calculate timings.

 

To make the example more concrete, let's imagine a workflow process comprising four steps that should happen in sequence A -> B -> C -> D.

  • Event A: A finished piece leaves the factory floor for inspection
  • Event B: A finished piece leaves inspection for shipping
  • Event C: A finished piece leaves shipping
  • Event D: The finished piece arrives at the customer

We further assume that the rows are coming in, one event per row, with three pieces of information: an EventName containing one of the four events above, an EventDate with the date and time the event occurred, and a WorkflowID identifying the piece going through the factory.  In an irregular workflow, we might have several entries per event per piece when pieces are sent back and forth through the process: the inspector sends a piece back to the floor, the shipping department sends a piece back to the inspector, and so on.

 

Figure 1 shows the Row Denormalizer with a Group By step.  The input is sorted on WorkflowID and then by EventName, and then grouped on the same attributes.  The Group By also emits minimum and maximum aggregates over EventDate.  The Row Denormalizer groups on WorkflowID and keys on EventName, placing the aggregates into fields FirstA, LastA, FirstB, LastB, and so on.  A Calculator step can then easily compute measures like "the amount of time between the last time a piece left the factory floor and the first time it arrived at the customer".

Figure 1: The Row Denormalizer method for obtaining workflow timings.  (The full transformation is attached in the file t_workflow_times.ktr.)
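
For illustration (these rows are invented, not taken from the attached transformation), a hypothetical piece with WorkflowID 1001 might generate the following events:

WorkflowID  EventName  EventDate
1001        A          2017-03-01 08:00
1001        A          2017-03-02 09:30
1001        B          2017-03-02 14:00
1001        C          2017-03-03 10:00
1001        D          2017-03-06 16:00

After the Group By and the Row Denormalizer, this collapses to a single row for WorkflowID 1001 with FirstA = 2017-03-01 08:00, LastA = 2017-03-02 09:30, FirstB = LastB = 2017-03-02 14:00, and so on, so the time between the last factory release and arrival at the customer is simply FirstD minus LastA.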

 

But what if the desired measures are more complex?  For example, say that a client wants to know all of the times between when a piece left the factory floor and when it was subsequently received at the shipping department.  Now it is more difficult to use a Row Denormalizer, since there could be a variable number of columns.  In this case, we can turn to a User Defined Java Class to compute the desired measures and emit them row by row.  How does this work?

 

A User Defined Java Class contains actual Java code that implements logic inside of a standard PDI transformation step.  It is simpler than creating a new transformation step from scratch, but it provides much of the same functionality, and it has decent performance because it uses Janino to actually compile the code on the fly.  To create a User Defined Java Class step, it is helpful to understand the basic interfaces.  Some of these are listed below, but the complete reference is in the javadoc.

  • processRow():  This is the main workhorse method that is called by PDI once per input row.  This method is also responsible for initializing the step right after startup by checking the boolean variable first, which is set to true for you by PDI when the transformation starts.  processRow() should always return true when processing should continue and false when processing is finished.
  • getRow(): When called from within processRow(), this will return an object array with the field values in it, or null if there is no more input.
  • setOutputDone(): When getRow() returns null, this should be called before returning false.
  • get(): This method returns field accessors that can be used with an object array to return particular field values.  The field accessors are preferred to indexing directly into the Object array returned by getRow() and keep track of indexes for you.  Obtain accessors with the field name, together with the constants Fields.In for input fields, Fields.Out for output fields, or Fields.Info for info fields.
  • createOutputRow(): This method is called to produce an output row.  Typically, it is called with both the input row and an output row size, and in this case it will copy the input row values to the output row.  The field data is a Java object that itself contains two fields set by PDI for you: data.inputRowMeta and data.outputRowMeta.  These objects can be used to obtain further information about row structure, like size, type of fields, etc.
  • findInfoRowSet():  If your User Defined Java Class has an info step, this step must be defined on the Info Steps pane.  This function will return a Row Set given the tag name provided.  Calls to getRowFrom() with this Row Set (instead of plain old getRow()) will return rows from this info step.

 

Figure 2: The Info Steps pane in the User Defined Java Class. 

 

  • putRow(): When output has been calculated, it should be returned to the PDI framework for further processing with a call to putRow().  The Object array passed to putRow should be initialized with a call to createOutputRow() as described above to ensure it has the right number of elements consistent with the metadata.
  • Error Handling / Troubleshooting: If an error comes up during initialization and the step cannot begin, or if an unrecoverable error comes up during row processing, a KettleException should be thrown.  However, if an error comes up during row processing that is related to data content of a row, it should be sent to an error handling step.  Use the method putError() to send a row to error handling, and don't forget to return true from processRow() to continue processing.  The method logBasic() can be used to send a message to the Spoon log window to aid in debugging.
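
Putting these methods together, a schematic processRow() might look like the sketch below.  This is a hand-written illustration of the pattern described above, not code lifted from the attached transformation; the exact signatures, the Fields accessor constants, and the names used here (EventName, EventDate, WorkflowID, the output field Measure, and the error code DATA_ERROR) are assumptions to be checked against the javadoc and your own row layout.

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
    Object[] r = getRow();                        // next input row, or null when input is exhausted

    if (first) {
        first = false;
        // One-time setup could go here, e.g. reading definitions from an info step:
        //   RowSet info = findInfoRowSet("definitions");
        //   Object[] infoRow;
        //   while ((infoRow = getRowFrom(info)) != null) { /* build lookup structures */ }
    }

    if (r == null) {
        // End of input: flush anything still buffered, then tell PDI we are finished.
        setOutputDone();
        return false;
    }

    try {
        String eventName         = get(Fields.In, "EventName").getString(r);
        java.util.Date eventDate = get(Fields.In, "EventDate").getDate(r);
        Long workflowId          = get(Fields.In, "WorkflowID").getInteger(r);
        logBasic("Saw event " + eventName + " for workflow " + workflowId);

        // ... accumulate events here; when a measure is complete, emit an output row:
        Object[] out = createOutputRow(r, data.outputRowMeta.size());
        get(Fields.Out, "Measure").setValue(out, Long.valueOf(eventDate.getTime()));  // placeholder value
        putRow(data.outputRowMeta, out);
    } catch (Exception e) {
        // A data-level problem: route the row to error handling and keep processing.
        putError(data.inputRowMeta, r, 1L, e.getMessage(), null, "DATA_ERROR");
    }

    return true;                                  // keep processing rows
}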

 

Figure 3 below shows the basic setup of the User Defined Java Class, which we have called java: aggregator.  The java: aggregator, like the Row Denormalizer above, assumes input is sorted, but this time we're sorting on WorkflowID and then EventDate.  The aggregator collects all events per WorkflowID in ArrayList instances, and when it sees a new WorkflowID (or the end of input) it calculates measures and flushes them out to PDI using putRow().  The measures in the example are the time between each A event and the next B event, and the time between each A and the last C occurring before the next A.

Figure 3: The User Defined Java Class method for obtaining workflow timings.  (The full transformation is attached in the file t_aggregator_engine.ktr.)
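
The measure calculation itself is ordinary Java, independent of the PDI plumbing.  As a minimal sketch of the first measure described above (class and method names are invented for illustration; both lists are assumed sorted by date, matching the sort on WorkflowID and EventDate):

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class WorkflowMeasures {
    // For each A event, find the first B event at or after it and return the elapsed milliseconds.
    public static List<Long> timesFromAToNextB(List<Date> aEvents, List<Date> bEvents) {
        List<Long> measures = new ArrayList<Long>();
        for (Date a : aEvents) {
            for (Date b : bEvents) {
                if (!b.before(a)) {               // first B at or after this A
                    measures.add(b.getTime() - a.getTime());
                    break;
                }
            }
        }
        return measures;
    }
}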

 

Examples of both transformations are attached.  Each includes a small generator of random workflow events over a given period of time for testing.  Although completely random workflow events are not strictly realistic, they serve here for demonstration purposes.

 

Some things to avoid in creating a User Defined Java Class: avoid overly clever or obtuse implementations.  The code should be readable and well commented for the maintainers who come after you, who are expecting to maintain PDI rather than Java.  Avoid spinning up any threads within a User Defined Java Class, or opening any network or database connections.  If you need threads or connections to implement a step, you should consider creating a full transformation step.

 

In closing, when is it best to use a pure PDI solution versus a solution with a User Defined Java Class in it?  The User Defined Java Class has some practical limitations.  Debugging can be difficult and may require logging statements, which is slow and tedious for programs of more than a few hundred lines of code.  Furthermore, if something can be done in PDI, it should be done in PDI.  I tend to look at two criteria: if the logic is very difficult to implement in pure PDI, and if the User Defined Java Class is well limited in scope to solve a single, particular data transformation need, then I consider a User Defined Java Class a viable option.  But remember, the User Defined Java Class will look like a magic black box to maintainers who come after you: if it is doing too many different things or is implemented haphazardly, it may not be worth it.

 

With the move to Hitachi Vantara we're not letting the community go away; quite the contrary. And one of the first things is trying to give the community a new home, here: http://community.pentaho.com

 

 

We're trying to gather people from the forums, user groups, and elsewhere, and give them a better and more modern collaboration space. This space will remain open, also because the content is extremely valuable, so the ultimate decision is yours.

 

Your mission, should you choose to accept it, is to register and try this new home. We're counting on your help to make it a better space.

 

 

See you in http://community.pentaho.com

 

 

Cheers!

 

 

-pedro

This post was written by Kevin Haas and originally published on Tuesday, July 14, 2015

 

When working with our clients, we find a growing number who regard their customer or transaction data as not just an internally leveraged asset, but one that can enrich their relationships with customers and supporting partners. They need to systematically share data and analytics outside their firewall.

 

One of the ways we help them achieve this goal is with Custom Data API's. In this article, we'll show you how to build Data API's using visual programming within the Pentaho Data Integration Interface. Along the way, we'll expose capabilities of the platform that are not always seen by the average user.

 

Inspecting The Field: Inside Pentaho Data Integration

 

For those who use Pentaho's Data Integration (PDI aka Kettle) platform regularly, you're probably aware of the amazing things you can do. For the uninitiated, PDI is a very powerful Extract-Transform-Load (ETL) technology that lets you access data from virtually any source, perform complex transformations, and push/load that data to virtually any destination. It's commonly used for integrating and shaping data into new forms optimized for analytical purposes (data warehouses, data marts, etc.).

 

    However, Pentaho experts know that Kettle is more than an ETL platform. It is powered by an amazing, open, and flexible data management platform that can not only read and write data from relational databases, Hadoop, NoSQL databases, and Web API's, but it can also serve data to other applications.

 

    Enter one of the most powerful (but maybe underutilized) components of the Pentaho Data Integration Platform: the Data Integration Server. It can power your API's.

 

Sowing The Seeds: The Data Integration Server

 

If you're an Enterprise Edition Pentaho customer, you're probably aware of the capabilities of the Data Integration Server. It's one of the foundational reasons why customers choose the Enterprise Edition of PDI. The Data Integration Server is a server application that allows you to schedule and run jobs and transformations created with PDI's "Spoon" visual development tool.

 

      Community Edition users have a similar (but not as sophisticated) version of the Data Integration Server with the included Carte server. If you're a community edition user, it's easy enough to start up your own local Carte server on a port of your choosing.  You could start Carte on port 8081 like this: 

 

carte.sh 127.0.0.1 8081

 

or

 

carte.bat 127.0.0.1 8081

 

There are many configuration options for the Data Integration Server and Carte server that allow them to operate in all kinds of forms. See here for the current list of options.

 

Bringing It To Life: Calling The Transformation on the Server with ExecuteTrans 

 

The first step to create a Data API is to develop a Pentaho Data Integration transformation that gathers data from virtually any source: RDBMS, Hadoop, NoSQL databases, Web APIs, etc.  Since transformations can be parameterized, you can apply filtering, sorting, or any other customizable logic to your code. Again, this can all be developed using PDI's graphical development tool, Spoon.

 

      Once you have installed and launched your Data Integration or Carte server, you can execute your transformations with a simple HTTP call using the ExecuteTrans method. For example, if you have Carte running on port 8081 and want to execute a transformation that is stored in /srv/pentaho/sample.ktr and accepts a parameter named "parm", you can simply call: 

 

http://127.0.0.1:8081/kettle/executeTrans/?trans=/srv/pentaho/sample.ktr&parm=parmvalue

 

When you launch this URL within a browser, you won't see a lot of information in return. In fact, you'll get nothing back! But, if you look at the PDI logs on the server, you'll see that the sample.ktr transformation did run.  This is all well and good -- you were able to launch a transformation on a server via an HTTP request. But, now you want it to return some data!

 

Growing a Transformation into an API: The Output To Servlet Option 

 

To provide an output, the transformation must produce one or more rows of data through a configured output step.  To send rows of data out as a response from the ExecuteTrans method, your transformation must pass this data to a Text File Output (or JSON Output) Step, configuring the step to "Pass output to servlet."

 

         

By checking the "Pass output to servlet" box, the transform will take the stream of data entering this step and push it out as your response. Note that the exact data contents of the stream are output as specified by the steps formatting options (e.g. fields, headers, separators).  You'll want to configure depending on your needs. 

 

      Bringing it to Market: A Taste of Farmers Market Data In A Custom Data API 

 

Let's look at an example. With harvest season upon us, more and more farmers markets are filling up with their bounty. But where can you find local farmers markets? Thankfully, the USDA provides an API that lets you search for nearby markets.

 

http://catalog.data.gov/dataset/farmers-markets-search

 

This is a powerful set of data, but it's not always easy to use the API (or any API, as provided). In this case, to get a market, you first provide a zip code.  The API returns a list of markets with their associated IDs, which you then must use to query the API again in order to get detailed information on each market. Also, we'd like to handle the result data as JSON, but some elements of the API return comma-separated values. Finally, we need to format the returned address a bit better, attaching latitude and longitude for mapping.

 

      To begin, we'll create a transform that accepts a parameter called "zip", and does the following: 

1. Generate Rows: Initialize with a row for Pentaho to process input parameters.     
2. HTTP Client: Call the USDA API to get the list of markets with the input parameters.
3. JSON Output: Convert the JSON that comes back into a list of markets.
4. JSON Output: Extract the Market IDs from that list of markets.
5. Calculator: Format a URL string to re-query the API to get detailed market info for each market.
6. HTTP Client: Call the USDA API again for each of the markets to get the details.
7. JSON Output: Extract the detailed address info on the market.
8. HTTP Client: Call the Google Geocoding API to get the latitude and longitude.
9. JSON Output: Extract the latitude and longitude from Google's response.
10. Javascript: Clean up and format the Address and Market info from the source APIs into a nice JSON format.
11. Group By: Group all of the detailed Markets from the API into a single row.
12. Javascript: Format the final JSON.
13. Text File Output: Output the single data element as a response to the servlet to show in the server.

  
 

A lot of steps, for sure, but their configuration is relatively simple. This is what the transformation looks like in the Spoon IDE: 

 

     

 

Once we have developed and tested our transformation using Spoon, we are ready to use our Carte server and the ExecuteTrans method.   We open a browser and enter the following URL: 

 

http://127.0.0.1:8081/kettle/executeTrans/?trans=/mnt/pentaho/pdi-ee/repository/api/farmers-market.ktr&zip=60634 

 

Note, we can even front end the Carte or DI server with Apache and do a little URL rewriting to make it a friendlier URL to call.  We did this. So instead of the above executeTrans URL, we can call this: 

 

http://127.0.0.1:8081/api/farmers-market?zip=60634 

 

When you execute the API, it does all the work for you. It calls all the various child APIs, does all the cleanup and formatting, and returns delicious looking JSON, with all of the added items from the Transformation as a result. For example: 

    {
   market:[
      {
         ami_id:"1010508",
         marketname:"Eli's Cheesecake and Wright College Farmer's Market",
         location:"6701 W. Forest Preserve Drive, Chicago, Illinois, 60634",
         address1:"6701 West Forest Preserve Drive",
         address2:"",
         city:"Chicago",
         state:"IL",
         postal_code:"60634",
         latitude:"41.9588993",
         longitude:"-87.7944428",
         schedule:[
            "06/12/2015 to 09/25/2015 Thu: 7:00 AM-1:00 PM"
         ],
         products:[
            "Baked goods",
            "Crafts and/or woodworking items",
            "Cut flowers",
            "Fresh fruit and vegetables",
            "Honey",
            "Canned or preserved fruits, vegetables, jams, jellies, preserves, dried fruit, etc.",
            "Maple syrup and/or maple products",
            "Poultry",
            "Soap and/or body care products"
         ]
      }
   ]
}

 

Taking It Home: Building Your Own Data APIs 

 

This is a fun illustration of how to pull data from various sources and craft it together into your very own API. All of this was done with out-of-the-box Pentaho Data Integration capabilities, no extra parts required.

 

Clearly, you're not limited in what you can do with this capability. You can easily get data from virtually any source and wrap a custom data API around it, giving you new capabilities to leverage your existing infrastructure and share data in new ways. Pentaho has a lot of capabilities inside.  We know that once you take PDI home from the market and look inside the box, you'll be inspired.

This post was written by Chris Deptula and originally published on Tuesday, February 24, 2015

 

This is the third in a three part blog on working with small files in Hadoop.  The first two parts may be found on our blog. In my previous blogs, we defined what constitutes a small file and why Hadoop prefers fewer, larger files.   We then elaborated on the specific issues that small files cause, specifically inordinate NameNode memory usage and MapReduce performance degradation.  We then began exploring common solutions to these problems.   In this blog, I’ll close this series by examining some less commonly used alternatives for solving the MapReduce performance problem and the factors to consider when choosing a solution.

 

Solving the MapReduce Performance Problem

In my previous blog, I listed the following solutions for mitigating the MapReduce performance problem:

  • Change the ingestion process/interval
  • Batch file consolidation
  • Sequence files
  • HBase
  • S3DistCp (If using Amazon EMR)
  • Using a CombineFileInputFormat
  • Hive configuration settings
  • Using Hadoop’s append capabilities

 

My second blog already discussed changing the ingestion process, batch file consolidation, and sequence files.  I will cover the remaining options here.

 

HBase

If you are producing numerous small files, storing your data as files in HDFS may not be the best solution.  Instead you might consider using an HBase column store.   Using HBase changes your ingestion process from producing many small HDFS files to writing individual records into HBase tables.    If your data access pattern is based on well-defined, random-access lookups, HBase may be your best alternative.   It is architecturally tuned for high-velocity data record inserts, high-volume, individual record lookups and streaming based analytics.  However, if your data access pattern tends toward full file/table scans, then HBase may not be optimal.

 

It is possible to create a Hive table that maps to HBase data; however, query performance in this design will vary.  Hive on HBase will shine when a single row or a range of rows is selected, but if your queries tend toward full table scans HBase is very inefficient.  Most analytical queries, especially those using group bys, require full table scans.

 

HBase provides the best ability to stream data into Hadoop and make it available for processing in real time.  However, balancing the needs of HBase with other cluster processes can be challenging and requires advanced system administration. Additionally, HBase performance is largely dependent on data access patterns and these should be carefully considered before choosing HBase to solve the small file problem.

 

S3DistCp

This solution is only available for users of Amazon EMR.  Amazon EMR clusters are designed to be short lived, persisting their data in Amazon S3.  Even with Amazon S3, processing a large number of small files still results in launching more map tasks than necessary --  decreasing performance.  Enter S3DistCp...

 

S3DistCp is a utility provided by Amazon for distributed copying of data from S3 to ephemeral HDFS or even other S3 buckets.  The utility provides the capability to concatenate files together through the use of groupBy and targetSize options.  This is useful when you have thousands of small files stored in S3 that you want to process using Amazon EMR. S3DistCp kills two birds with one stone by concatenating many small files and making them appear in faster, ephemeral HDFS storage.  There have been reports of as much as 15x performance improvement using this mechanism.

 

For all practical purposes S3DistCp does the same task as the batch file consolidation approach I mentioned in my previous blog.  If using Amazon EMR, note that you have a pre-built tool that accomplishes this task.

 

Using a CombineFileInputFormat

The CombineFileInputFormat is an abstract class provided by Hadoop that merges small files at MapReduce read time.  The merged files are not persisted to disk.  Instead, the process reads multiple files and merges them “on the fly” for consumption by a single map task.   You gain the benefits of not launching one map task per file and not requiring that multiple files be merged into a single persisted file as part of a preparatory step.   This solves the problem of a MapReduce job launching too many map tasks; however, since the job is still reading in multiple small files random disk IO is still a problem.  Additionally, most implementations of the CombineFileInputFormat do not account for data locality and will often pull data from a variety of data nodes over the network.

 

In order to implement this, the CombineFileInputFormat must be extended in Java for different file types.  This requires significant development expertise to write your own custom input format classes.  However, once written, you can configure a maximum split size and it will merge files until this size is met.
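
For orientation, a sketch of the extension pattern is shown below.  The subclass and the per-file reader named here (SmallTextRecordReader) are hypothetical; the record reader wrapper is the part you still have to write.  Note also that recent Hadoop releases ship a ready-made CombineTextInputFormat for plain text input.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

// Packs many small text files into each split so one map task reads several files.
public class CombinedTextInputFormat extends CombineFileInputFormat<LongWritable, Text> {

    public CombinedTextInputFormat() {
        setMaxSplitSize(128 * 1024 * 1024);   // merge files until roughly one block's worth per split
    }

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException {
        // CombineFileRecordReader delegates to a per-file reader; SmallTextRecordReader is a
        // hypothetical wrapper you would write around an ordinary line reader.
        return new CombineFileRecordReader<LongWritable, Text>(
                (CombineFileSplit) split, context, SmallTextRecordReader.class);
    }
}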

 

Note that since the merged data is not persisted in HDFS, the CombineFileInputFormat does not alleviate the NameNode memory problem.  A good example of an implementation of a CombineFileInputFormat may be found here.

 

Hive Configuration Settings

If you notice that Hive creates small files in your Hadoop cluster through “create table as” or “insert overwrite” statements, you can adjust a few Hive specific configuration settings to mitigate.  When used, these settings tell Hive to merge any small files that were created into larger files.  However, there is a penalty.  Hive will launch an additional MapReduce job, post-query, to perform the merge.  Further, the merge is done before Hive indicates to the user that the query has finished processing instead of occurring asynchronously.

 

It should be noted that these settings only work for files that are created by Hive. If, for example, you create files outside of Hive using another tool such as Sqoop, or copy them into the Hive table's folder with an hdfs dfs -mv command, Hive will not merge the files.  Therefore, this solution does not work when the files ingested into Hadoop are small.  This solution is only recommended in Hive-centric architectures where a small performance penalty in insert overwrite and create table as statements is acceptable.

 

The settings to be used are Hive's merge settings, typically hive.merge.mapfiles, hive.merge.mapredfiles, hive.merge.smallfiles.avgsize, and hive.merge.size.per.task.

Using Hadoop’s Append Capabilities

I would be remiss if I did not mention this final option.   Invariably, when I discuss how to handle small files, I hear the question “I was reading a blog and saw that Hadoop has the ability to append to files.  Why can’t we just append to existing files?”

 

The story of appends in Hadoop is rather rocky.  Append was added in July of 2008 as part of Hadoop 0.19.  However, after implementation (as early as October 2008) many issues were found and append was disabled in 0.19.1.  However, in order to support HBase without risk of data loss append capabilities were added back to Hadoop in 0.20.2.  So, finally, after 0.20.2 it was technically possible to perform appends in Hadoop.

 

Append may be available, but none of the major tools in the Hadoop ecosystem support it: Flume, Sqoop, Pig, Hive, Spark, and Java MapReduce.  MapReduce enforces a rule that the output location of a MapReduce job must not exist prior to execution.  Due to this rule it is obviously not possible for MapReduce to append to pre-existing files with its output.  Since Sqoop, Pig, and Hive all use MapReduce under the covers, it is also not possible for these tools to support append.  Flume does not support append largely because it operates under the assumption that after some threshold, whether in seconds, bytes, number of events, or seconds of inactivity, Flume will close the file and never open it again.  The Flume community has deemed this sufficient and has not demanded append support.

 

If you truly must use appends in Hadoop, you have to write your own system to perform the ingestion and append to existing files.  Additionally, if any of your in-cluster processing requires appending to existing files, you will not be able to use Spark or MapReduce.  Therefore using HDFS append capabilities is very complex and should only be used by the most technologically savvy organizations.  Without a significant engineering team and support commitment, this option is not recommended.

 

Choosing a Solution

Choosing the best solution for working with small files depends on a variety of questions.  It may be necessary to use a combination of these solutions based on access patterns and data requirements.  The questions that should be considered include:

  • At what point in the data flow are the small files being generated?  Are the small files being created at ingestion, or via in-cluster processing?
  • What tool is generating the small files?  Can changing tool configuration reduce the number of small files?
  • What level of technical skill exists within your organization?  Do you have the capabilities to maintain input formats or to write your own ingestion engine?
  • How often are the small files being generated?  How often can small files be merged in order to create large files?
  • What sort of data access is required for these small files?  Do the files need to be accessible through Hive?
  • What type of administrative periods might exist where processes can be run inside the cluster to alleviate small files?
  • What level of latency is acceptable from MapReduce processes?

 

Once you answer these questions, then you can evaluate and choose the best options.

This post was written by Chris Deptula and originally published on Wednesday, February 18, 2015

 

This is the second in a three part blog on working with small files in Hadoop.  In my first blog, I discussed what constitutes a small file and why Hadoop has problems with small files.  I defined a small file as any file smaller than 75% of the Hadoop block size, and explained that Hadoop prefers fewer larger files due to NameNode memory usage and MapReduce performance.  In this blog, I will discuss solutions to these challenges when small files are truly unavoidable.

 

Solving the NameNode Memory Problem

 

As discussed in my previous blog, the metadata for every block in Hadoop must be stored in the NameNode’s memory.  This leads to a practical limit on how many objects can be stored in Hadoop and also has implications on startup time and network bandwidth.  There are two solutions to this: decrease the number of objects in your Hadoop cluster, or somehow enable greater memory use by the NameNode -- but without causing excessive startup times.  The most common approaches to solve this memory problem involve Hadoop Archive (HAR) Files and Federated NameNodes.

 

Hadoop Archive Files

Hadoop archive files alleviate the NameNode memory problem by packing many small files into a larger HAR file, similar to TAR files on Linux.  This causes the NameNode to retain knowledge of a single HAR file instead of dozens or hundreds of small files.   The files within a HAR file can be accessed using the har:// prefix instead of hdfs://.

 

HAR files are created from files that exist in HDFS.  Therefore, a HAR file can consolidate both ingested data as well as data created through normal MapReduce processing.  HAR files can be used independent of the technology used to create the small files.  There is no common dependency other than HDFS.

 

Although HAR files reduce the NameNode memory footprint for many small files, accessing and processing HAR file content will likely be less efficient.  HAR files are still stored randomly on disk and reading a file within a HAR requires two index accesses -- one for the NameNode to find the HAR file itself and one to find the location of the small file within the HAR.   Reading a file in a HAR may actually be slower than reading the same file stored natively on HDFS.  MapReduce jobs compound this performance issue as they will still launch one map task per file within the HAR.

 

In the end, you have a trade-off: HAR files can solve the NameNode memory issue, but may worsen processing performance.   If your small files are primarily kept for archival purposes, with infrequent access, then HAR files are a good solution.   If the small files are part of your normal processing flow, you may need to rethink your design.

 

Federated NameNodes

Federated NameNodes allow you to have multiple NameNodes in your cluster each storing a subset of the object metadata.  This eliminates the need to store all object metadata on a single machine, providing more scale for memory usage.  On the surface, solving the small file memory problem with this technique is attractive, but with a little more thought you’ll quickly realize the limitations.

 

Federated NameNodes isolate object metadata -- only one NameNode knows about any particular object.  This means to get a file you must know which NameNode to use.  If your cluster houses multiple tenants and/or siloed applications, then federated NameNodes are a natural fit -- you can isolate object metadata by tenant or application.   However, if you are sharing data across all applications within a cluster, this approach is not ideal.

 

Since federation does not actually change the number of objects or blocks within your cluster, it does not solve the MapReduce performance problem.  Conversely, federation adds significant and often unnecessary complexity to your Hadoop installation and administration.  Federation, when used to solve the small file problem, is often more of a mechanism to hide the small file problem.

Solving the MapReduce Performance Problem

As discussed in my previous blog, the MapReduce Performance problem is caused by combination of random disk IO and launching/managing too many map tasks.  The solution seems obvious -- have fewer, larger files or launch fewer map tasks; however, this is often easier said than done.  Some of the most common solutions include:

  • Change the ingestion process/interval
  • Batch file consolidation
  • Sequence files
  • HBase
  • S3DistCp (If using Amazon EMR)
  • Using a CombineFileInputFormat
  • Hive configuration settings
  • Using Hadoop’s append capabilities

We will discuss the first three of these options in this blog, and cover the remainder in the third and final blog of the series.

 

Change the Ingestion Process/Interval

The easiest way to get rid of small files is simply not to generate them in the first place.  If your source system generates thousands of small files that are copied into Hadoop, investigate changing your source system to generate a few large files instead, or possibly concatenating files when ingesting into HDFS.  If you are only ingesting 10 MB of data every hour, determine if it’s possible to only ingest once a day.  You’ll create 1x240MB file instead of 24x10MB files.

 

However, you may not have control over the source system creating the files or business needs require that you ingest data at interval frequencies such that small files are unavoidable.  If small files are truly unavoidable then other solutions should be considered.

 

Batch File Consolidation

When small files are unavoidable, file consolidation is the most common solution.  With this option, you periodically run a simple, consolidating MapReduce job to read all of the small files in a folder and rewrite them into fewer larger files.  If you have 1000 files in a folder and specify only 5 reducers for the MapReduce job, the 1000 input files will be merged into 5 output files.   Followed by some simple HDFS file/folder manipulation, you have reduced your memory footprint by 200:1 and, likely, improved the performance of future MapReduce processing on the same data.

 

This can be implemented in as little as two lines of Pig, a LOAD and a STORE statement: for example, when consolidating text files, load the folder of small files and immediately store the resulting relation into a new, consolidated folder.

Implementing this in Hive or Java MapReduce is equally easy.  These MapReduce jobs obviously require cluster resources while executing, and are often scheduled during off hours.  However, they should be run frequently enough that the performance impact of small files does not become too extreme.  Additional logic is often built into these jobs to only merge files in folders that will have a noticeable performance impact.  Merging files in a folder that only contains three files will not result in as great a performance benefit as merging files in a folder that contains 500 small files.

 

Checking which folders should be consolidated can be accomplished in a variety of ways.  For example, a Pentaho Data Integration job can be used to iterate through a group of folders in HDFS, finding those that meet a minimum set of requirements for consolidation.  There is also a pre-written application designed specifically for this task called File Crush, an open source project written by Edward Capriolo.  File Crush is not professionally supported; therefore, no guarantees exist that it will continue to work with future versions of Hadoop.

 

Batch file consolidation does not maintain the original file names.  If having the original file name is important for processing or understanding where the data originated, batch file consolidation will not work.  However, most HDFS designs embed naming semantics at the folder level and not within each file.  Adopting this practice removes file name dependencies as an issue.

 

Sequence Files

When there is a requirement to maintain the original filename, a very common approach is to use Sequence files.  In this solution, the filename is stored as the key in the sequence file and the file contents are stored as the value.  The table below gives an example of how the small files would be stored in a sequence file:
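
Key (file name)      Value (file contents)
file1.txt            contents of file1.txt
file2.txt            contents of file2.txt
file3.txt            contents of file3.txt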

 

 

If you have 10,000 small files your sequence file would contain 10,000 keys, one per file.  Sequence files support block compression, and are splittable meaning that MapReduce jobs would only launch one map task per 128MB block instead of one map task per small file. This works very well when you need to maintain the input file name, and you are ingesting hundreds or thousands of small files at the same time.
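
As a rough sketch of the pattern (paths and file names here are hypothetical), small files can be packed into a block-compressed sequence file with the original file name as the key:

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;

public class SmallFilesToSequenceFile {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        List<Path> smallFiles = Arrays.asList(new Path("/data/in/a.txt"), new Path("/data/in/b.txt"));
        Path target = new Path("/data/out/consolidated.seq");

        try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(target),
                SequenceFile.Writer.keyClass(Text.class),             // key   = original file name
                SequenceFile.Writer.valueClass(BytesWritable.class),  // value = file contents
                SequenceFile.Writer.compression(CompressionType.BLOCK))) {
            for (Path small : smallFiles) {
                byte[] contents = new byte[(int) fs.getFileStatus(small).getLen()];
                try (FSDataInputStream in = fs.open(small)) {
                    in.readFully(contents);
                }
                writer.append(new Text(small.getName()), new BytesWritable(contents));
            }
        }
    }
}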

 

However, if you are only ingesting a small number of small files at a time the sequence file does not work as well because Hadoop files are immutable and cannot be appended to.  Three 10MB files will result in a 30MB sequence file which is still, by our definition, a small file.  Another challenge is that the retrieval of a list of file names within a sequence file requires processing the entire file.

 

Additionally, Hive does not work well with sequence files in this structure.  Hive treats all of the data within a value as a single row.  It would not be easy to use Hive to query this data as the entire contents of a file would be a single row within Hive.  Finally, the Hive tables you create will not have access to the sequence file key, the filename,  and will only have access to the value, the contents of the files.  It may be possible to write a custom Hive serde to solve these challenges, but that is an advanced topic beyond the native capabilities within Hadoop.

 

More details about challenges and limitations of Sequence Files may be found in our Hadoop file formats blog post.

 

 

Conclusion

In this blog, we discussed the trade-offs of using Hadoop Archive (HAR) files to minimize NameNode memory usage.  We discussed and dismissed the use of Federated NameNodes as a panacea for the small file problem.  And, we introduced some common solutions for small file consolidation --- solutions which improve both NameNode memory usage and MapReduce performance.    In my third and final blog, I’ll discuss additional, less commonly used, but valuable techniques for file consolidation. 

This post was written by Chris Deptula and originally published on Wednesday, February 11, 2015

 

This is the first in a 3 part blog on working with small files in Hadoop. Hadoop does not work well with lots of small files and instead wants fewer large files.  This is probably a statement you have heard before.  But, why does Hadoop have a problem with large numbers of small files?  And, what exactly does “small” mean?  In the first part of this series I will answer these questions.  The subsequent parts will discuss options for solving or working around the small file problem.

 

What is a small file?

A small file can be defined as any file that is significantly smaller than the Hadoop block size.  The Hadoop block size is usually set to 64, 128, or 256 MB, trending toward increasingly larger block sizes.  Throughout the rest of this blog, when providing examples, we will use a 128MB block size.  I use the rule that if a file is not at least 75% of the block size, it is a small file.  However, the small file problem does not just affect small files.  If a large number of files in your Hadoop cluster are marginally larger than an increment of your block size, you will encounter the same challenges as small files.

 

For example, if your block size is 128MB but all of the files you load into Hadoop are 136MB, you will have a significant number of small 8MB blocks.  The good news is that solving the small block problem is as simple as choosing an appropriate (larger) block size.  Solving the small file problem is significantly more complex.  Notice I never mentioned number of rows.  Although the number of rows can impact MapReduce performance, it is much less important than file size when determining how to write files to HDFS.

 

Why do small files occur?

The small file problem is an issue we frequently see on Hadoop projects.  There are a variety of reasons why companies may have small files in Hadoop, including:

  • Companies are increasingly hungry for data to be available near real time, causing Hadoop ingestion processes to run every hour/day/week with only, say, 10MB of new data generated per period.
  • The source system generates thousands of small files which are copied directly into Hadoop without modification.
  • The configuration of MapReduce jobs using more than the necessary number of reducers, each outputting its own file.  Along the same lines, if there is a skew in the data that causes the majority of the data to go to one reducer, then the remaining reducers will process very little data and produce small output files.

 

Why does Hadoop have a small file problem?

There are two primary reasons Hadoop has a small file problem: NameNode memory management and MapReduce performance.

The NameNode memory problem

Every directory, file, and block in Hadoop is represented as an object in memory on the NameNode.  As a rule of thumb, each object requires 150 bytes of memory.  If you have 20 million files each requiring 1 block, that is roughly 40 million objects (one per file plus one per block), and your NameNode needs 6GB of memory.  This is obviously quite doable, but as you scale up you eventually reach a practical limit on how many files (blocks) your NameNode can handle.  A billion files will require 300GB of memory, and that is assuming every file is in the same folder!   Let's consider the impact of a 300GB NameNode memory requirement...

  • When a NameNode restarts, it must read the metadata of every file from a cache on local disk.  This means reading 300GB of data from disk -- likely causing quite a delay in startup time.
  • In normal operation, the NameNode must constantly track and check where every block of data is stored in the cluster.   This is done by listening for data nodes to report on all of their blocks of data.   The more blocks a data node must report, the more network bandwidth it will consume.   Even with high-speed interconnects between the nodes, simple block reporting at this scale could become disruptive.

 

The optimization is clear.   If you can reduce the number of small files on your cluster, you can reduce the NameNode memory footprint, startup time and network impact.

 

The MapReduce performance problem

Having a large number of small files will degrade the performance of MapReduce processing whether it be Hive, Pig, Cascading, Pentaho MapReduce, or Java MapReduce.    The first reason is that a large number of small files means a large amount of random disk IO.  Disk IO is often one of the biggest limiting factors in MapReduce performance.  One large sequential read will always outperform reading the same amount of data via several random reads.  If you can store your data in fewer, larger blocks, the performance impact of disk IO is mitigated.

 

The second reason for performance degradation is a bit more complicated, requiring an understanding of how MapReduce processes files and schedules resources.  I will use MapReduce version 1 terminology in this explanation as it is easier to explain than with Yarn, but the same concepts apply for Yarn.  When a MapReduce job launches, it schedules one map task per block of data being processed.  Each file stored in Hadoop is at least one block.  If you have 10,000 files each containing 10 MB of data, a MapReduce job will schedule 10,000 map tasks.  Usually Hadoop is configured so that each map task runs in its own JVM. Continuing our example, you will have the overhead of spinning up and tearing down 10,000 JVMs!

 

Your Hadoop cluster only has so many resources.  In MapReduce v1, to avoid overloading your nodes, you specify the maximum number of concurrent mappers a node can process.  Often the maximum number of concurrent mappers is in the 5 to 20 range.  Therefore, to run 10,000 mappers concurrently you would have to have 500 to 2000 nodes.  Most Hadoop clusters are much smaller than this, causing the JobTracker to queue map tasks as they wait for open slots.   If you have a 20 node cluster with a total of 100 slots, your queue will become quite large and your process will take a long time.  And don’t forget, your job is likely not the only job competing for cluster resources.

 

If instead of 10,000 10MB files you had 800 128 MB files you would only need 800 map tasks. This would require an order of magnitude less JVM maintenance time and will result in better disk IO.  Even though an individual map task processing 128 MB will take longer than a map task processing 10 MB, the sum of all of the processing time will almost always be orders of magnitude faster when processing the 800 larger files.

 

What can you do if you have small files?

Now that we have discussed what constitutes a small file and why Hadoop prefers larger files, how do you avoid the small file problem?  In my next post, I will discuss solutions to the NameNode memory problem as well as some initial options for solving the MapReduce performance problem.  In my third and final blog in this series, I will discuss additional solutions for the performance problem and how to choose the best solution for your situation.

This post was written by Chris Deptula and originally published on Wednesday, November 12, 2014

 

When I proposed this blog to my Pentaho Consulting colleagues they stared blankly at me and asked: “Seriously?  A blog about optimizing file formats in Hadoop?”. To which I replied that choosing the optimal file format in Hadoop is one of the most essential drivers of functionality and performance for big data processing and query. It is an interesting and important topic that is often overlooked by Hadoop architects and engineers.

 

In my experience, there are stark performance differences resulting from (im)proper format choices. Pick the wrong format for your Impala data and you could see your query times increase by 5x or more. Choose a format that does not nicely compress and you may consume multiple GB of disk instead of 100’s of MB. Choose a format that does not support flexible schema evolution and you may have to pay with massive re-processing just to add a new field. Unfortunately, there is no single file format that optimizes for all of these concerns. You need to understand the trade-offs and then make educated decisions.

 

Before we dive in, perhaps you are like my incredulous colleagues asking: "Isn't Hadoop supposed to be able to process unstructured data? Why do file formats matter so much?". It is true that Hadoop can store and process unstructured data like video, text, etc. This is a significant advantage for Hadoop. However, there are many, perhaps more, uses of Hadoop with structured or "flexibly structured" data. In fact, we often encounter clients using the term "unstructured" when referring to flexibly structured data. Data with flexible structure can have fields added, changed or removed over time and even vary amongst concurrently ingested records. Many of the file format choices focus on managing structured and flexibly structured data. Our commentary will revolve around this assumption.

 

This blog will start by identifying the key questions that drive a file format choice. We’ll then contrast the current file format options and close with a discussion on how to leverage a portfolio of formats optimized for your use cases.

When evaluating file formats, what must be considered?

 

  1. What processing and query tools will you be using?

This is the most obvious but overlooked question. If you want to use Avro, does the data processing language you’ve chosen include Avro readers and writers?     Likewise, suppose you’ve picked the Cloudera distribution because you like Impala. You should probably know that Impala currently does not support ORC format. Conversely, if you’ve chosen Hortonworks and Hive-Stinger, you probably want to avoid Parquet. Yes, it is expected that most of the tools will end up supporting most of the popular formats, but doublecheck before you make any final decisions.

 

  2. Will your data structure change over time?

Do you want to add and delete fields from a file and still be able to read old files with the same code?   Certainly, if you have a large enough cluster you can rewrite all of your historical data to add a field, but this is often not ideal. Being able to add a field and still read historical data may be preferred. If so, know which file formats enable flexible and evolving schema.
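
As a quick illustration of what schema evolution looks like in practice, here is a minimal Hive sketch; the table and column names (web_events, referrer, event_id) are hypothetical, and whether the old files remain readable after the change depends entirely on the file format’s schema evolution support, which we compare below.

-- Add a new field to an existing table (hypothetical names).
ALTER TABLE web_events ADD COLUMNS (referrer STRING);

-- Historical files written before the change are still queried with the same code;
-- formats with good schema evolution support simply surface NULL for the missing column.
SELECT event_id, referrer FROM web_events;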

 

  3. How important is file format “splittability”?

Since Hadoop stores and processes data in blocks you must be able to begin reading data at any point within a file in order to take fullest advantage of Hadoop’s distributed processing. For example, CSV files are splittable since you can start reading at any line in the file and the data will still make sense; however, an XML file is not splittable since XML has an opening tag at the beginning and a closing tag at the end. You cannot start processing at any point in the middle of those tags.

 

 

  4. Does block compression matter?

Since Hadoop stores large files by splitting them into blocks, it’s best if the blocks can be independently compressed. Snappy and LZO are commonly used compression technologies that enable efficient block storage and processing. If a file format does not support block compression then, if compressed, the file is rendered non-splittable. When processed, the decompressor must begin reading the file at its beginning in order to obtain any block within the file. For a large file with many blocks, this could generate a substantial performance penalty.
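
As a rough illustration, the following Hive sketch writes Snappy block-compressed data into a format that supports it (a Sequence file); the table names are hypothetical and the exact property names can vary slightly between Hadoop versions.

SET hive.exec.compress.output=true;
SET mapreduce.output.fileoutputformat.compress.type=BLOCK;
SET mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;

-- Rewrite the raw data into a block-compressed Sequence file table (hypothetical names).
CREATE TABLE events_seq STORED AS SEQUENCEFILE AS SELECT * FROM events_raw;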

 

  5. How big are your files?

If your files are smaller than the size of an HDFS block, then splittability and block compression don’t matter. You may be able to store the data uncompressed or with a simple file compression algorithm. Of course, small files are the exception in Hadoop and processing too many small files can cause performance issues. Hadoop wants large, splittable files so that its massively distributed engine can leverage data locality and parallel processing.

 

Large files in Hadoop consume a lot of disk -- especially when considering standard 3x replication. So, there is an economic incentive to compress data, i.e. store more data per byte of disk. There is also a performance incentive, as disk IO is expensive. If you can reduce the disk footprint through compression, you can relieve IO bottlenecks. As an example, I converted an uncompressed, 1.8 GB CSV file into the following formats, achieving much smaller disk footprints.

 

Uncompressed CSV: 1.8 GB
Avro: 1.5 GB
Avro w/ Snappy Compression: 750 MB
Parquet w/ Snappy Compression: 300 MB

 

I then ran Impala and Hive queries against each of the file formats. As the files became smaller, the query performance improved. The queries against Parquet were a couple orders of magnitude faster than uncompressed CSV.
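
If you want to run a similar comparison yourself, the conversion can be as simple as a Hive CTAS statement. The following is a sketch with hypothetical table names, assuming the CSV data is already exposed as a Hive table; the compression setting shown is the commonly used one, though exact settings vary by Hive version.

-- Convert a CSV-backed table into Snappy-compressed Parquet (hypothetical names).
SET parquet.compression=SNAPPY;
CREATE TABLE events_parquet STORED AS PARQUET AS SELECT * FROM events_csv;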

 

  6. Are you more concerned about processing or query performance?

There are three types of performance to consider:  

  • Write performance -- how fast can the data be written.
  • Partial read performance -- how fast can you read individual columns within a file.
  • Full read performance -- how fast can you read every data element in a file.

 

Columnar, compressed file formats like Parquet or ORC may optimize partial and full read performance, but they do so at the expense of write performance. Conversely, uncompressed CSV files are fast to write but, due to the lack of compression and column-orientation, are slow for reads. You may end up with multiple copies of your data, each formatted for a different performance profile.

 

How do the most common Hadoop file formats stack up?

 

1. Formats to Avoid

Any format that is not splittable should generally be avoided. The use of XML File and JSON File formats is a common mistake. Each of these formats contains a single document per file, with an opening tag at the beginning and a closing tag at the end. Note that files containing JSON records are OK and will be discussed below.

 

2. Text/CSV Files

CSV files are still quite common and often used for exchanging data between Hadoop and external systems. They are readable and ubiquitously parsable. They come in handy when doing a dump from a database or bulk loading data from Hadoop into an analytic database. However, CSV files do not support block compression, thus compressing a CSV file in Hadoop often comes at a significant read performance cost.

 

When working with Text/CSV files in Hadoop, never include header or footer lines. Each line of the file should contain a record. This, of course, means that there is no metadata stored with the CSV file. You must know how the file was written in order to make use of it. Also, since the file structure is dependent on field order, new fields can only be appended at the end of records while existing fields can never be deleted. As such, CSV files have limited support for schema evolution.

 

3. JSON Records

JSON records are different from JSON Files in that each line is its own JSON datum -- making the files splittable. Unlike CSV files, JSON stores metadata with the data, fully enabling schema evolution. However, like CSV files, JSON files do not support block compression. Additionally, JSON support was a relative latecomer to the Hadoop toolset and many of the native serdes contain significant bugs. Fortunately, third party serdes are frequently available and often solve these challenges. You may have to do a little experimentation and research for your use cases.

 

4. Avro Files

Avro files are quickly becoming the best multi-purpose storage format within Hadoop. Avro files store metadata with the data but also allow specification of an independent schema for reading the file. This makes Avro the epitome of schema evolution support since you can rename, add, delete and change the data types of fields by defining new independent schema. Additionally, Avro files are splittable, support block compression and enjoy broad, relatively mature, tool support within the Hadoop ecosystem.

 

5. Sequence Files

Sequence files store data in a binary format with a similar structure to CSV. Like CSV, sequence files do not store metadata with the data so the only schema evolution option is appending new fields. However, unlike CSV, sequence files do support block compression. Due to the complexity of reading sequence files, they are often only used for “in flight” data such as intermediate data storage used within a sequence of MapReduce jobs.

 

6. RC Files

RC Files or Record Columnar Files were the first columnar file format adopted in Hadoop. Like columnar databases, the RC file enjoys significant compression and query performance benefits. However, the current serdes for RC files in Hive and other tools do not support schema evolution. In order to add a column to your data you must rewrite every pre-existing RC file. Also, although RC files are good for query, writing an RC file requires more memory and computation than non-columnar file formats. They are generally slower to write.

 

7. ORC Files

ORC Files or Optimized RC Files were invented to optimize performance in Hive and are primarily backed by Hortonworks. ORC files enjoy the same benefits and limitations as RC files, just done better for Hadoop. This means ORC files compress better than RC files, enabling faster queries. However, they still don’t support schema evolution. Some benchmarks indicate that ORC files compress to be the smallest of all file formats in Hadoop. It is worthwhile to note that, at the time of this writing, Cloudera Impala does not support ORC files.

 

8. Parquet Files

Parquet Files are yet another columnar file format that originated from Hadoop creator Doug Cutting’s Trevni project. Like RC and ORC, Parquet enjoys compression and query performance benefits, and is generally slower to write than non-columnar file formats. However, unlike RC and ORC files, Parquet serdes support limited schema evolution: new columns can be added at the end of the structure. At present, Hive and Impala are able to query newly added columns, but other tools in the ecosystem such as Hadoop Pig may face challenges. Parquet is supported by Cloudera and optimized for Cloudera Impala. Native Parquet support is rapidly being added for the rest of the Hadoop ecosystem.

 

One note on Parquet file support with Hive: it is very important that Parquet column names are lowercase. If your Parquet file contains mixed case column names, Hive will not be able to read the columns -- queries against them will return null values without logging any errors. Unlike Hive, Impala handles mixed case column names. A truly perplexing problem when you encounter it!

How to choose a file format?

As discussed, each file format is optimized by purpose. Your choice of format is driven by your use case and environment. Here are the key factors to consider:

  • Hadoop Distribution -- Cloudera and Hortonworks support/favor different formats
  • Schema Evolution -- Will the structure of your data evolve?  In what way?
  • Processing Requirements -- Will you be crunching the data and with what tools?
  • Read/Query Requirements -- Will you be using SQL on Hadoop?  Which engine?
  • Extract Requirements -- Will you be extracting the data from Hadoop for import into an external database engine or other platform?
  • Storage Requirements -- Is data volume a significant factor?  Will you get significantly more bang for your storage buck through compression?

 

So, with all the options and considerations, are there any obvious choices?  If you are storing intermediate data between MapReduce jobs, then Sequence files are preferred. If query performance against the data is most important, ORC (Hortonworks/Hive) or Parquet (Cloudera/Impala) are optimal -- but these files will take longer to write. (We’ve also seen order of magnitude query performance improvements when using Parquet with Spark SQL.) Avro is great if your schema is going to change over time, but query performance will be slower than ORC or Parquet. CSV files are excellent if you are going to extract data from Hadoop to bulk load into a database.

 

Because you will likely have multiple use cases for your Hadoop data, you will use multiple file formats. It is quite common to store the same (or very similar) data in multiple file formats to support varied processing, query and extract requirements. Embrace diversity!

This post was written by Dave Reinke and originally published on Wednesday, June 22, 2016

 

In a previous blog, we discussed the importance of tuning data lookups within Pentaho Data Integration (PDI) transformations.  We suggested that there were identifiable design patterns which can be exploited to improve the performance and stability of our code.  In this blog, we will investigate the most frequently used lookup pattern:  Key-based, Single Record Lookup.

 

This pattern’s classic use case involves an inbound stream of fact records queried from a source containing dimensional attributes in the form of business keys.  In order to load this data into the target fact table, the dimensional business keys must be converted into a set of technical keys referencing target dimension tables. The conversion of dimensional business to technical keys is done through a series of lookups.

 

The Database Lookup and Stream Lookup steps are used to implement this pattern.  Database Join could be used, but would be much less efficient since it does not support caching.  Additionally, we are not considering PDI’s purpose-built Dimension Lookup/Update and Combination Lookup/Update steps, as this pattern assumes no versioning of dimension data (i.e. no slowly changing dimensions) and that the dimension tables have already been loaded prior to the fact table load.  (I can already hear some of my colleagues’ arguments.  Yes, I know that Dimension Lookup/Update can be used for Type 1 dimensions and that its use enables the code to more easily migrate to Type 2, versioned, dimensions.  It is a candidate for this pattern, but requires specific database design changes for its use.  We’ll discuss Dimension Lookup/Update in a future blog.  I promise.)  So with that caveat, let’s examine the use of Database Lookup.

 

Using Database Lookup for Key-based, Single Record Lookups

Database Lookup is the prime choice for this pattern because it is easy to set up and provides rich functionality and tuning options.  However, there are two vital prerequisites for its use.  First, the lookup source must be a single database table.  Second, the lookup table must contain a unique key column, often a business key, and a technical key column, almost always the primary key. The following diagram shows an example of this step’s configuration.  It depicts a lookup of the technical product_key from the dim_product table in my_datamart database using a unique product_code business key field.

 

 

As configured, the step will generate and issue the following SQL statement to the my_datamart database:

 

select product_key from dim_product where product_code = ?

 

The ? is replaced by the value in the inbound stream’s product_code field.  

 

Since the step is configured to enable caching of lookup data, the SQL statement will be run only once for each distinct product_code value found within the inbound stream.  Additionally, we provide a default value of 0 should the lookup fail to find a match.  No-Match behavior and caching are both configurable.  Let’s discuss...

Handling No-Match Lookup Failure in the Database Lookup Step

In our example, a no-match would occur if an inbound stream record’s product_code did not find a matching product_code in the dim_product database table.  The step is configured to “not fail” and instead place a default value of 0 in the outbound stream’s product_key field.  The underlying assumption is that there exists a “null” or “unknown” dim_product database record with a product_key of 0 for null or invalid product_codes.  The following table describes the two ways a “no-match” can be handled by this step.

 

Action: Provide a Default Value
Step Configuration: Depicted in the example.
Processing Implications: A subsequent step in the transformation can be used to filter and/or operate on the defaulted value, perhaps writing an error record and sending an email to an administrator.

Action: Filter the inbound record
Step Configuration: Check the "Do not pass the row if the lookup fails" box.
Processing Implications: No error is raised and the row is not passed to the outbound stream. This prevents you from taking corrective action on the errant data but ensures that only valid matches are processed.

 

In practice, we rarely use the filter option as most data programs attempt to remediate and/or alert if invalid data appears.

 

Optimizing the Lookup Cache for the Database Lookup Step

Not implementing or optimizing the lookup cache is a frequent mistake made by new PDI developers. Fortunately, it’s simple to explain and set up!  The following table details the configuration options:

 

Cache Option: Cache All
Step Configuration: Check “Enable Cache”, then check “Load all data from table”.
Processing Implications: A single SQL select statement is executed, retrieving all key and lookup column data from the database Lookup Table. The entire Lookup Table dataset must fit in PDI’s memory. No inbound stream records will be processed until this entire dataset is retrieved and cached.

Cache Option: Cache Some
Step Configuration: Check “Enable Cache”, uncheck “Load all data from table”, then specify how big (in # of lookup records) the cache will be allowed to get via the “Cache Size in rows” field.
Processing Implications: One SQL select statement will be executed per distinct inbound record key that is not already found in the cache. PDI memory must be able to hold the number of lookup rows specified.

Cache Option: No Cache
Step Configuration: Uncheck “Enable Cache”.
Processing Implications: Every inbound record will cause a SQL select statement to be executed against the Lookup Table -- even if keys are repeated!

 

So, when to use which option?   Here are some insights into our best practice decision-making...

 

Cache Option: Cache All
When to Use:
  • The lookup table is not very big

Cache Option: Cache Some
When to Use:
  • The lookup table is very big, the inbound stream is not small and contains repeated lookup keys
  • The lookup table is big and the inbound stream is sorted on lookup keys (i.e. removing previously cached records from the active cache has no downstream penalty)

Cache Option: No Cache
When to Use:
  • The lookup table is very big and the inbound stream is very small
  • The lookup table is very big and the inbound stream contains very few (or no) repeated lookup keys

 

The most common configuration is Cache All. In general, if Cache All works efficiently, then use it. Most lookup tables are small (< 100,000 records) and thus can be retrieved quickly and will easily fit in memory. If full lookup dataset retrieval is inordinately slow or memory is limited, then consider Cache Some or No Cache depending on the size and lookup key distribution in your inbound stream.

 

Using Stream Lookup for Key-based, Single Record Lookups

Stream Lookup differs from Database Lookup in the following manner:

  • Its lookup dataset is sourced from a second inbound stream of records and not tied directly to a single database table.

  • It always caches all lookup data -- there is no option for partial or no caching. 

 

Thus, Stream Lookup should be used in the following circumstances:

  • The lookup data is not sourced from the entirety of a single lookup table

    • You may want to use a subset of rows in a lookup table. In this case, you’d use a Table Input step to gather the subset and then stream that subset as the lookup data into the Stream Lookup step, or

    • the lookup data is sourced by joining two or more tables or streams.

  • You have multiple lookups against the same “not big” lookup table

    • You have role-playing dimension keys that relate a single fact record to the same dimension table multiple times. In this case, you’d use a Table Input step to query all of the lookup data and then use “copy” hops to stream that data to multiple Stream Lookup steps -- one per role.

  • You have a source of data that does not reside in a database table

    • E.g. a flat file, or a spreadsheet

 

Since the lookup data stream for Stream Lookup can be generated by any valid sequence of PDI steps, the alternatives and examples for its use are somewhat boundless.

 

Also note that Stream Lookup does not provide a filter record option for “no-match” conditions.   If you want to rid your stream of “no-match” records you have to use a Filter step after the Stream Lookup.

 

As you can see, there is tremendous opportunity to tune (or mistune!) lookups when doing a simple key-based, single record lookup.   Next in our series, I’ll examine a similar pattern but with a subtle twist:  Looking up the Most Recent Record.

This post was written by Dave Reinke and originally published on Wednesday, July 6, 2016

 

As we continue our series of Pentaho Data Integration (PDI) Lookup Patterns, we next discuss best practice options for looking up the “most recent record”. Common use cases for this pattern include finding the most recent order for a customer, the last interaction of a web site visitor, and the last claim record for a policy. As you’d expect there are multiple ways in which this can be done within PDI.

 

We’ll explore two of the most frequently used options, but first let’s define an example use case.

Example Use Case

Suppose we have a stream of customers each identified with a customer_key and there is an orders table which contains the following columns:

 

COLUMN: order_key
DESCRIPTION: Primary key of the order. The column we want to look up.

COLUMN: customer_key
DESCRIPTION: Foreign key to customers. The column we need to match with our input stream records.

COLUMN: order_timestamp
DESCRIPTION: The timestamp of the order. We’ll use this to determine the most recent record for a given customer_key.

COLUMN: Other columns
DESCRIPTION: There may be many other columns which we could include in our lookup. We will ignore these in order to de-clutter our example.

 

If you had customer_key, 100, and wanted to get the most recent order_key, you would issue a query like the following:

 

select order_key
from orders
where customer_key = 100
order by order_timestamp desc
limit 1

 

The database would return exactly one order_key (limit 1) for the given customer_key (100) chosen as the first of a list sorted by the order_timestamp descending. In PDI, we can use the Database Lookup and Database Join steps to accomplish the same result.

 

Approach 1: Database Lookup

As previously discussed, the Database Lookup step is most often used for key-based, single record lookups, however it is also apt for the Most Recent Record lookup pattern. The following picture demonstrates how this step would be configured for our example.

 

The step looks up the order_key of the most recent record in the order_db.orders table for each input stream record’s customer_key. The Order by field tells PDI to sort the matching orders records by order_timestamp in descending order. By design, the step will only return one row -- the first row returned after the sort.   So, we have effectively configured the step to execute a query similar to the one listed above.

 

As configured, our example will execute a single lookup query for each input stream record. If customer_keys are repeated in the input stream, this will result in many redundant query executions. To eliminate these redundant queries, caching can be configured for Database Lookup. Database Lookup enables caching because it restricts the lookup query to a single database table. The performance boost can be quite significant. In our example, suppose there are 1,000,000 input stream records containing 55,000 distinct customer_keys. Without caching, the step will execute 1,000,000 lookup queries. If we check the Enable Cache? box, the step will only fire 55,000 lookup queries -- one per distinct customer_key. This can improve performance by an order of magnitude or more.

 

Approach 2: Database Join

The Database Join step can also be configured to implement the Most Recent Record lookup pattern. The following picture shows its configuration for our orders example.

 

 

When using the Database Join step, you must supply your own lookup SQL. In our simple example, this is a straightforward parameterized select statement that returns order_keys for a given customer_key in descending timestamp order -- most recent record first. We enter 1 in the Number of rows to return field to ensure that only the first record for each input stream record is passed to the output stream. We check Outer join? to allow a failed lookup record (i.e. no matching orders for a customer_key) to pass with a null order_key in its output stream record.
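
For reference, the lookup SQL entered into the Database Join step would look something like the following sketch, with the ? parameter bound to the inbound stream’s customer_key and the step itself limiting the result to one row.

select order_key
from orders
where customer_key = ?
order by order_timestamp desc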

 

Performance-wise, the Database Join step will take about as long to process as a non-cached Database Lookup step. There will be one query per input stream record. Unlike Database Lookup, there is no ability to cache data to improve performance. However, because Database Join allows you to define your own SQL Select statement, you have the ability to define arbitrarily complex lookup queries.

Which Approach to Use?

So, how do you decide which step to choose? The answer is rather straightforward. Use the Database Lookup step if you have a single lookup table with a straightforward lookup condition. Leverage caching if your input datastream contains duplicate lookup keys. Only use Database Join if your lookup query is complex. Remarkably, in the field, we see developers defaulting to Database Join without realizing that Database Lookup can be leveraged to improve performance. Don’t make that mistake!

 

One final comment: the savvy reader will note that this same pattern applies to “least recent record” or historically first record lookups as well. In fact, any arbitrary record sort that you might apply can be handled by this design pattern.

Next in our series, I’ll examine the Master-Detail explosion pattern, which brings the Database Join step to the fore.

This post was written by Chris Deptula and originally published on Wednesday, January 28, 2015

 

With an immutable file system and no update command, how do you perform updates in Hadoop?  This problem occurs in just about every Hadoop project.  Yes, the most common use case of Hadoop data ingestion is to append new sets of event-based and/or sub-transactional data.  However, for analytics, you often have to join these events with reference data (hereafter referred to as “dimension data”) -- data that can and usually does change over time.

 

Imagine you are collecting website activities through an intelligent event tracking system.  The events are generated as JSON datums.  You’ll likely have some automated collection mechanism that uses streaming or micro-batching ingestion into HDFS.  Simple enough, just append the new data.   Now your business users want to analyze these events by various web user attributes -- attributes that you maintain in your registration systems.   You often update registration attributes via third party data append services and from direct updates by the registrants.  You need a way to maintain a current (perhaps historically versioned) copy of the registrant data in HDFS.  You need updates in Hadoop.


Performing data updates in Hadoop is hard.  On a recent project, after a long day of Pig scripting, one of my colleagues said, “My head hurts.  I have to think about things I never had to think about when using databases.”  His experience maintaining dimension data using mature ETL tools and RDBMS technology was great, but now that experience had to translate to the data programming capabilities in Hadoop.  Comparatively speaking, data programming in Hadoop is, how do we put this nicely, still maturing.

 

If you want to implement “upsert” (i.e. merge) logic or (gasp) Type 2 dimension logic, you have to build it yourself.  HDFS files are immutable, meaning you cannot edit a file after it has been written.  Hive has only very recently added an update command, and Pig does not (yet) provide one, much less a merge statement.  Some day they will, but for now you must develop your own algorithms.  Fortunately, there are some patterns which can be applied.

 

In this blog, we’ll discuss four common approaches:  Hybrid Update, HBase Dimension Store, Merge and Compact Update, and “Good Enough” Update.  There is a fifth, less common approach involving the creation of a custom access layer that resolves updates at read time.  This is often reserved for extremely high volume data where the batch processing cost to transform all of the data does not offset the cost of transforming the data on demand.  We won’t cover this last approach.

 

Hybrid Update

The Hybrid Update approach uses mature ETL and SQL programming to maintain dimensional data within an RDBMS then periodically copies the data, likely using Sqoop, into HDFS replacing the previous version.  All of the hard work to handle special update rules, slowly changing dimensions (i.e. versioning), surrogate key assignment, etc. can be maintained using highly productive ETL programming languages.   The Hadoop “update” becomes a file replacement.   Given that most companies have already invested in a data warehouse to maintain dimensional data, this approach is easiest to implement and often the first adopted.


However, there are downsides.  First, the approach is not scalable.  Suppose you have a billion record dimension table.  With this approach you’d extract a billion records from your RDBMS each time you need to update the dimension data in Hadoop.  There would clearly be performance and capacity implications!  Even if you currently have manageably sized dimensions, as data volume grows, your load performance will degrade.


Second, this approach requires orchestration between processes running in different environments.   You have to make sure that your RDBMS ETL completes before your extract process starts.   The latency requirements for your Hadoop-based analysis are wholly dependent upon the timeliness and success of your RDBMS load processing.  You could use an ETL technology like Pentaho Data Integration to orchestrate, but your load windows may shrink and exception handling could be complex.

 

In the end, if all of your dimensions are relatively small with slow growth, then this approach may be a good fit. Otherwise, we’d only recommend it as a transitional step before embarking on one of the following Hadoop-centric algorithms.

 

HBase Dimension Store

HBase is a NoSQL database that runs on Hadoop.  Since it provides native support for updates, it is often considered for dimension data maintenance.  In fact, many ETL tools contain pre-built steps to stream inserts, update and deletes into HBase.  You also can ingest “real-time” streams of high velocity data into HBase, minimizing update latency.  And, HBase tables can be queried via SQL on Hadoop options like Hive and Impala with properly configured serdes.   All this makes HBase seductively attractive, but, be warned, there are significant performance challenges when using HBase tables for analytical queries.
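
To illustrate the “properly configured serdes” point, here is a hedged sketch of exposing an HBase dimension table to Hive; the table name, column family and fields below are hypothetical.

-- Map an existing HBase table (row key = business key) to a Hive table for SQL access.
CREATE EXTERNAL TABLE dim_registrant_hbase (
  business_key STRING,
  first_name   STRING,
  last_update  STRING
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,d:first_name,d:last_update')
TBLPROPERTIES ('hbase.table.name' = 'dim_registrant');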


Querying HBase data works best when a single row or a range of rows is selected.  If the query must scan the entire table (as is often the case for analytical queries), HBase is quite inefficient.  In our labs, we set up a small experiment using Impala to contrast performance between a file-based dimension table and an HBase version.  We created several aggregating “group by” queries that joined our dimension tables to a large, file-based fact table.  In all cases, the file-based dimension table outperformed the HBase version.


If you are 1) already using HBase, 2) comfortable maintaining and balancing its resource needs with the other services running on your Hadoop cluster and 3) able to ensure that your HBase queries always return a single row or a range of rows, then an HBase dimension store is a good solution to the update problem.  Otherwise, we’d recommend you consider one of the next two alternatives.

 

Merge and Compact Update

Unlike the Hybrid and HBase approaches, this update strategy leverages an algorithm rather than a specific technology. The algorithm can be implemented with Java MapReduce, Pig, Hive, Spark, Cascading, Pentaho Visual MapReduce or any other Hadoop data programming language.  It consists of the following steps:

  1. Maintain a Current Copy of the Master Data:  A complete copy of your dimension data must reside in HDFS.  For clarity’s sake we’ll call this the “master” data.  This requires a one-time, initial load of the data.
  2. Load the Delta Data: Load the newly updated data into HDFS.  We’ll call this your “delta” data.
  3. Merge the data:  Join the master and delta data together on the business key field(s).
  4. Compact the data: After the merge you will have one or more records for each business key.  In the compaction, you apply logic that emits only the surviving record(s) per business key.  In the simplest case, you will emit one record having the maximum update timestamp.  However, if you need to maintain record versions (e.g. type 2 dimensions) then your logic will emit a set of versioned records with non-overlapping effectivity dates.  There may be additional complexity in your compaction process if you have to apply special business logic.  For example, you may never want to update some fields or only update a field when some condition occurs.
  5. Write the Data to a Temporary Output: Since most Hadoop jobs (e.g. MapReduce) cannot overwrite an existing directory, you must write the compaction results to a temporary output.  The temporary output is your new dimension table containing a merge of new, updated and unchanged records.
  6. Overwrite the Original Master Data: Move the temporary output to the location of the original master data, overwriting the previous version.


This Hortonworks blog illustrates a simple example of this approach using Hive.  It works when there are no complex business rules for the compaction.  If you have business rules that cannot be expressed using SQL clauses, you may be limited in Hive.
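
To make steps 3 and 4 concrete, here is a minimal Hive sketch of the merge and compaction that keeps only the most recent record per business key; all table and column names are hypothetical and no special business rules are applied.

-- Steps 3 and 4: union master and delta data, then keep the latest record per key.
-- (dim_registrant_tmp is a pre-created staging table with the same layout.)
INSERT OVERWRITE TABLE dim_registrant_tmp
SELECT business_key, first_name, last_update
FROM (
  SELECT business_key, first_name, last_update,
         ROW_NUMBER() OVER (PARTITION BY business_key ORDER BY last_update DESC) AS rn
  FROM (
    SELECT business_key, first_name, last_update FROM dim_registrant        -- master copy
    UNION ALL
    SELECT business_key, first_name, last_update FROM dim_registrant_delta  -- new deltas
  ) merged
) ranked
WHERE rn = 1;
-- Steps 5 and 6: the temporary table's files are then moved over the original master data.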


The primary downside of this approach is that you must read and process the entire dimension table every time you perform an update -- even if only one record is changed.  If your dimension table is 2TB on disk, then you are reading and writing 2TB to perform the update.  Fortunately, there is a way around this issue...

 

“Good Enough” Update

If you have extremely large dimensions that cannot be processed within your batch window, then you either have to buy a bigger cluster or come up with another approach.  This is where the “good enough” update is applied.

 

At its heart is an intelligent partitioning strategy whereby you organize your dimension data so that you isolate records which are most likely to be updated into a small subset of the partitions.  This is often done using a record creation or last update date and assuming that recently inserted records are more likely to be updated than older records.  (For type 2 dimensions, you would also isolate the “current” version of each record since older versions are considered immutable.)  Within HDFS, you’d store each partition in a sub-folder of a parent dimension folder.   A Hive table can be defined to recognize these partitions, which might also improve query performance.
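
A hedged sketch of such a layout in Hive might look like the following, partitioning the dimension by the month each record was created; the names are hypothetical.

-- Recently created records land in the newest partitions, which are the only ones
-- that need to be re-merged and compacted on a typical run.
CREATE TABLE dim_registrant (
  business_key STRING,
  first_name   STRING,
  last_update  STRING
)
PARTITIONED BY (created_month STRING)
STORED AS ORC;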

 

With this partitioning strategy in place, you simply perform the Merge and Compact Update on the subset of data considered most likely to change.  You avoid processing the entire dataset, but at the risk of inserting redundant dimension records.  Redundant dimension records are created when an updated delta record cannot find its previous version within the partition subset.

 

Of course, the potential to have more than one dimension record for the same dimension key means that query results which include that key might be wrong.   This may persist and potentially compound unless you augment this approach with a periodic merge and compact update on the complete dataset.   We have seen this strategy applied where good enough updates are executed throughout the week and then a full update is run over the weekend.  We have also seen organizations run data quality queries looking for duplicate keys and only running merge and compact jobs if an issue is detected.   In each case, there is a period of potential data inaccuracy which eventually is resolved.

 

The Good Enough Update approach should only be used when you cannot feasibly execute the full Merge and Compact Update and when there is a tolerance for small data errors.    Given the vagaries of event/sub-transactional data collection, most Hadoop-based analytical use cases already have such a tolerance.

 

Picking an Update Strategy

As you’d expect, one size does not fit all.   You will likely utilize more than one Hadoop update approach.   So, how do you decide which approach is right for which circumstance?    There are four major (and a whole host of minor) areas for consideration.   First, what is the size of your dimension? Are you dealing with 100’s, 1000’s, … 1,000,000’s of records?   Second, what is your latency requirement?  Do you have a batch window or do you need to see updates occur as changes “stream” in?   Third, how much data volatility exists?   Is this dimension fairly stable with new/changed records trickling in or do you see massive “delta” files with nearly every record being touched?   Fourth, how complex are your update rules?  Is it as simple as overriding the previous record with a delta record or are you implementing type 2 dimension logic with additional update rule dependencies?   The following table considers these factors for each algorithm.

 



In general, Hybrid is useful for small dimensions that have complex update rules.  HBase is best when latency is a major factor and update rules are simple.   Merge and Compact is the most versatile and often used but has scalability and latency limits.  Good Enough helps solve the Merge and Compact issues but at the cost of potentially introducing data quality issues.


The Future

Hortonworks has added batch insert, update, and delete capabilities in Hive as part of the Stinger.next initiative.  These capabilities are designed for large-scale inserts, updates, and deletes as opposed to single record transactions; too many transactions over a period of time can lead to a drop in performance.  Additionally, ACID transactions in Hive currently work only with the ORC file format and are still a very new, immature feature.  As this capability matures, we expect it to become an attractive and widely used option.
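
For reference, here is a hedged sketch of what a Hive ACID table looks like at the time of this writing: the table must be bucketed, stored as ORC and flagged as transactional. The names below are hypothetical.

CREATE TABLE dim_registrant_acid (
  business_key STRING,
  first_name   STRING,
  last_update  TIMESTAMP
)
CLUSTERED BY (business_key) INTO 8 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');

-- Update syntax; in practice these statements are aimed at large batch changes
-- rather than single-row transactions.
UPDATE dim_registrant_acid
SET first_name = 'Updated Name'
WHERE business_key = 'ABC123';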