This post was written by Chris Deptula and originally published on Wednesday, January 28, 2015
With an immutable file system and no update command, how do you perform updates in Hadoop? This problem occurs in just about every Hadoop project. Yes, the most common use case of Hadoop data ingestion is to append new sets of event-based and/or sub-transactional data. However, for analytics, you often have to join these events with reference data (hereafter referred to as “dimension data”) -- data that can and usually does change over time.
Imagine you are collecting website activities through an intelligent event tracking system. The events are generated as JSON datums. You’ll likely have some automated collection mechanism that uses streaming or micro-batching ingestion into HDFS. Simple enough, just append the new data. Now your business users want to analyze these events by various web user attributes -- attributes that you maintain in your registration systems. You often update registration attributes via third-party data append services and from direct updates by the registrants. You need a way to maintain a current (perhaps historically versioned) copy of the registrant data in HDFS. You need updates in Hadoop.
Performing data updates in Hadoop is hard. On a recent project and after a long day of Pig scripting, one of my colleagues said, “My head hurts. I have to think about things I never had to think about when using databases.” His experience maintaining dimension data using mature ETL tools and RDBMS technology was great, but now that had to translate to the data programming capabilities in Hadoop. Comparatively speaking, data programming in Hadoop is, how do we put this nicely, still maturing.
If you want to implement “upsert” (i.e. merge) logic or (gasp) Type 2 dimension logic, you have to build it yourself. HDFS files are immutable, meaning you cannot edit a file after it has been written. Hive has only very recently added an update command, and Pig does not (yet) provide one, much less a merge statement. Some day they will, but for now you must develop your own algorithms. Fortunately, there are some patterns which can be applied.
In this blog, we’ll discuss four common approaches: Hybrid Update, HBase Dimension Store, Merge and Compact Update, and “Good Enough” Update. There is a fifth, less common approach involving the creation of a custom access layer that resolves updates at read time. This is often reserved for extremely high volume data, where the cost of batch-transforming all of the data outweighs the cost of transforming it on demand. We won’t cover this last approach.
Hybrid Update
The Hybrid Update approach uses mature ETL and SQL programming to maintain dimensional data within an RDBMS then periodically copies the data, likely using Sqoop, into HDFS replacing the previous version. All of the hard work to handle special update rules, slowly changing dimensions (i.e. versioning), surrogate key assignment, etc. can be maintained using highly productive ETL programming languages. The Hadoop “update” becomes a file replacement. Given that most companies have already invested in a data warehouse to maintain dimensional data, this approach is easiest to implement and often the first adopted.
However, there are downsides. First, the approach is not scalable. Suppose you have a billion record dimension table. With this approach you’d extract a billion records from your RDBMS each time you need to update the dimension data in Hadoop. There would clearly be performance and capacity implications! Even if you currently have manageably sized dimensions, as data volume grows, your load performance will degrade.
Second, this approach requires orchestration between processes running in different environments. You have to make sure that your RDBMS ETL completes before your extract process starts. The latency requirements for your Hadoop-based analysis are wholly dependent upon the timeliness and success of your RDBMS load processing. You could use an ETL technology like Pentaho Data Integration to orchestrate, but your load windows may shrink and exception handling could be complex.
In the end, if all of your dimensions are relatively small with slow growth, then this approach may be a good fit. Otherwise, we’d only recommend it as a transitional step before embarking on one of the following Hadoop-centric algorithms.
HBase Dimension Store
HBase is a NoSQL database that runs on Hadoop. Since it provides native support for updates, it is often considered for dimension data maintenance. In fact, many ETL tools contain pre-built steps to stream inserts, updates and deletes into HBase. You can also ingest “real-time” streams of high velocity data into HBase, minimizing update latency. And HBase tables can be queried via SQL-on-Hadoop options like Hive and Impala with properly configured SerDes. All this makes HBase seductively attractive but, be warned, there are significant performance challenges when using HBase tables for analytical queries.
Querying HBase data works best when a single row or a range of rows is selected. If the query must scan the entire table (as is often the case for analytical queries), HBase is quite inefficient. In our labs, we set up a small experiment using Impala to compare the performance of a file-based dimension table with that of an HBase version. We created several aggregating “group by” queries that joined our dimension tables to a large, file-based fact table. In all cases, the file-based dimension table outperformed the HBase version.
If you are 1) already using HBase, 2) comfortable maintaining and balancing its resource needs with the other services running on your Hadoop cluster and 3) able to ensure that your HBase queries always return a single row or a range of rows, then an HBase dimension store is a good solution to the update problem. Otherwise, we’d recommend you consider one of the next two alternatives.
Merge and Compact Update
Unlike the Hybrid and HBase approaches, this update strategy relies on an algorithm rather than a specific technology. The algorithm can be implemented with Java MapReduce, Pig, Hive, Spark, Cascading, Pentaho Visual MapReduce or any other Hadoop data programming language. It consists of the following steps:
1. Maintain a Current Copy of the Master Data: A complete copy of your dimension data must reside in HDFS. For clarity’s sake we’ll call this the “master” data. This requires a one-time, initial load of the data. Check out the Inquidia Hadoop Datamart Importer (HDI) for an automated way to do this.
2. Load the Delta Data: Load the newly updated data into HDFS. We’ll call this your “delta” data.
3. Merge the data: Join the master and delta data together on the business key field(s).
4. Compact the data: After the merge you will have one or more records for each business key. In the compaction, you apply logic that decides which of those records to emit for each business key. In the simplest case, you will emit one record having the maximum update timestamp. However, if you need to maintain record versions (e.g. type 2 dimensions) then your logic will emit a set of versioned records with non-overlapping effectivity dates. There may be additional complexity in your compaction process if you have to apply special business logic. For example, you may never want to update some fields or only update a field when some condition occurs.
5. Write the Data to a Temporary Output: Since most Hadoop jobs (e.g. MapReduce) cannot overwrite an existing directory, you must write the compaction results to a temporary output. The temporary output is your new dimension table containing a merge of new, updated and unchanged records.
6. Overwrite the Original Master Data: Move the temporary output to the location of the original master data, overwriting the previous version.
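The merge and compact steps above can be sketched in plain Python for the simplest case, where the record with the latest update timestamp wins. This is a minimal illustration, not a Hadoop job: in-memory lists stand in for the master and delta files, the field names `user_id` and `updated_at` are hypothetical, and the merge is done as a union followed by grouping on the business key instead of a join. Writing to a temporary output and swapping it into place (steps 5 and 6) is not shown.

```python
from itertools import groupby
from operator import itemgetter


def merge_and_compact(master, delta, key="user_id", ts="updated_at"):
    """Return one record per business key: the one with the latest timestamp."""
    # Merge: pool master and delta records, then sort so that records sharing
    # a business key are adjacent and ordered by update timestamp. The sort is
    # stable, so on a timestamp tie the delta record (listed last) wins.
    merged = sorted(list(master) + list(delta), key=itemgetter(key, ts))
    # Compact: within each key group, the last record is the most recent.
    return [list(group)[-1] for _, group in groupby(merged, key=itemgetter(key))]


# Hypothetical sample data; ISO dates compare correctly as strings.
master = [
    {"user_id": 1, "email": "a@example.com", "updated_at": "2015-01-01"},
    {"user_id": 2, "email": "b@example.com", "updated_at": "2015-01-01"},
]
delta = [
    {"user_id": 2, "email": "b.new@example.com", "updated_at": "2015-01-15"},
    {"user_id": 3, "email": "c@example.com", "updated_at": "2015-01-20"},
]
new_master = merge_and_compact(master, delta)
```

Here `new_master` holds the unchanged record for user 1, the updated record for user 2 and the new record for user 3.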
This Hortonworks blog illustrates a simple example of this approach using Hive. It works when there are no complex business rules for the compaction. If you have business rules that cannot be designed using SQL clauses, you may be limited in Hive.
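For the versioned (type 2) case described in the compaction step, the logic has to emit several records per business key with non-overlapping effectivity dates. The sketch below shows one way this might look in plain Python. The field names `effective_from` and `effective_to`, the `OPEN_END` sentinel and the half-open date ranges are all assumptions made for illustration, and the sketch assumes no two versions of a key share the same start date.

```python
from itertools import groupby
from operator import itemgetter

OPEN_END = "9999-12-31"  # hypothetical sentinel marking the current version


def compact_type2(records, key="user_id", start="effective_from", end="effective_to"):
    """Rebuild effectivity ranges so each key's versions do not overlap."""
    ordered = sorted(records, key=itemgetter(key, start))
    out = []
    for _, group in groupby(ordered, key=itemgetter(key)):
        versions = list(group)
        for current, following in zip(versions, versions[1:]):
            # Close each older version at the date the next one takes effect.
            out.append({**current, end: following[start]})
        # The newest version of each key stays open-ended.
        out.append({**versions[-1], end: OPEN_END})
    return out


# Hypothetical sample data: one existing version plus one delta record.
master = [{"user_id": 1, "state": "OH",
           "effective_from": "2014-01-01", "effective_to": OPEN_END}]
delta = [{"user_id": 1, "state": "IL", "effective_from": "2015-01-10"}]
versions = compact_type2(master + delta)
```

After the call, the "OH" version is closed at 2015-01-10 and the "IL" version becomes the open-ended current record.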
The primary downside of this approach is that you must read and process the entire dimension table every time you perform an update -- even if only one record is changed. If your dimension table is 2TB on disk, then you are reading and writing 2TB to perform the update. Fortunately, there is a way around this issue...
“Good Enough” Update
If you have extremely large dimensions that cannot be processed within your batch window, then you either have to buy a bigger cluster or come up with another approach. This is where the “good enough” update is applied.
At its heart is an intelligent partitioning strategy whereby you organize your dimension data so that you isolate records which are most likely to be updated into a small subset of the partitions. This is often done using a record creation or last update date and assuming that recently inserted records are more likely to be updated than older records. (For type 2 dimensions, you would also isolate the “current” version of each record since older versions are considered immutable.) Within HDFS, you’d store each partition in a sub-folder of a parent dimension folder. A Hive table can be defined to recognize these partitions, which might also improve query performance.
With this partitioning strategy in place, you simply perform the Merge and Compact Update on the subset of data considered most likely to change. You avoid processing the entire dataset, at the risk of inserting redundant dimension records. A redundant dimension record is created when an updated delta record cannot find its previous version in the partition subset.
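To make the trade-off concrete, here is a small Python sketch of a partition-scoped merge and compact. A dictionary of lists stands in for the partition sub-folders, and the partition names, the `hot` parameter and the field names are hypothetical. Only the "hot" partition is rewritten, so a delta record whose previous version lives in an older partition ends up as a redundant record.

```python
def good_enough_update(partitions, delta, hot, key="user_id", ts="updated_at"):
    """Merge and compact the delta into the `hot` partition only."""
    latest = {}
    for rec in partitions[hot] + delta:
        kept = latest.get(rec[key])
        # Keep the most recent record per key; later records win ties.
        if kept is None or rec[ts] >= kept[ts]:
            latest[rec[key]] = rec
    # Cold partitions are carried over untouched; only the hot one is rewritten.
    return {**partitions, hot: list(latest.values())}


# Hypothetical sample data, partitioned by year of last update.
partitions = {
    "2014": [{"user_id": 1, "email": "old@example.com", "updated_at": "2014-03-01"}],
    "2015": [{"user_id": 2, "email": "b@example.com", "updated_at": "2015-01-05"}],
}
delta = [
    {"user_id": 2, "email": "b.new@example.com", "updated_at": "2015-01-20"},
    {"user_id": 1, "email": "new@example.com", "updated_at": "2015-01-21"},
]
updated = good_enough_update(partitions, delta, hot="2015")
```

User 2 is updated in place inside the 2015 partition. User 1 now appears in both the 2014 and 2015 partitions, because the older partition was never read.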
Of course, the potential to have more than one dimension record for the same dimension key means that query results which include that key might be wrong. This may persist and potentially compound unless you augment this approach with a periodic merge and compact update on the complete dataset. We have seen this strategy applied where good enough updates are executed throughout the week and then a full update is run over the weekend. We have also seen organizations run data quality queries looking for duplicate keys and only run merge and compact jobs if an issue is detected. In each case, there is a period of potential data inaccuracy which eventually is resolved.
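A duplicate-key check of the kind mentioned above might look like the following Python sketch. It assumes a dimension that should hold exactly one record per business key (for a type 2 dimension you would count only the current versions), and the partition layout and `user_id` field are hypothetical.

```python
from collections import Counter


def duplicate_keys(partitions, key="user_id"):
    """Return business keys that appear more than once across all partitions."""
    counts = Counter(rec[key] for records in partitions.values() for rec in records)
    return sorted(k for k, n in counts.items() if n > 1)


# Hypothetical sample data: user 1 exists in two partitions.
partitions = {
    "2014": [{"user_id": 1}, {"user_id": 3}],
    "2015": [{"user_id": 2}, {"user_id": 1}],
}
dupes = duplicate_keys(partitions)
# Trigger a full merge and compact only when duplicates are found.
needs_full_compaction = bool(dupes)
```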
The Good Enough Update approach should only be used when you cannot feasibly execute the full Merge and Compact Update and when there is a tolerance for small data errors. Given the vagaries of event/sub-transactional data collection, most Hadoop-based analytical use cases already have such a tolerance.
Picking an Update Strategy
As you’d expect, one size does not fit all. You will likely utilize more than one Hadoop update approach. So, how do you decide which approach is right for which circumstance? There are four major (and a whole host of minor) areas for consideration. First, what is the size of your dimension? Are you dealing with 100’s, 1000’s, … 1,000,000’s of records? Second, what is your latency requirement? Do you have a batch window or do you need to see updates occur as changes “stream” in? Third, how much data volatility exists? Is this dimension fairly stable with new/changed records trickling in or do you see massive “delta” files with nearly every record being touched? Fourth, how complex are your update rules? Is it as simple as overriding the previous record with a delta record or are you implementing type 2 dimension logic with additional update rule dependencies? The following table considers these factors for each algorithm.
In general, Hybrid is useful for small dimensions that have complex update rules. HBase is best when latency is a major factor and update rules are simple. Merge and Compact is the most versatile and often used but has scalability and latency limits. Good Enough helps solve the Merge and Compact issues but at the cost of potentially introducing data quality issues.
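These rules of thumb can be written down as a small Python helper. This is only one possible reading of the guidance above: the boolean inputs, the function name and the order in which the conditions are checked are assumptions, and real decisions will weigh the four factors less mechanically.

```python
def suggest_strategy(small_dimension, complex_rules, needs_low_latency,
                     fits_batch_window, tolerates_small_errors):
    """Map the rules of thumb above onto a suggested update approach."""
    if needs_low_latency and not complex_rules:
        return "HBase Dimension Store"
    if small_dimension and complex_rules:
        return "Hybrid Update"
    if fits_batch_window:
        return "Merge and Compact Update"
    if tolerates_small_errors:
        return "Good Enough Update"
    # Nothing fits: a bigger cluster or relaxed requirements may be needed.
    return "no clear fit"


# A small dimension with type 2 logic and no streaming requirement.
choice = suggest_strategy(small_dimension=True, complex_rules=True,
                          needs_low_latency=False, fits_batch_window=True,
                          tolerates_small_errors=False)
```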
Hortonworks has added batch insert, update, and delete capabilities in Hive as part of the Stinger.next initiative. These capabilities are designed for large scale inserts, updates, and deletes as opposed to single record transactions. Too many transactions over a period of time can lead to a drop in performance. Additionally, ACID transactions in Hive currently only work with the ORC file format and are a very new, immature feature. As this capability matures, we expect it to become an attractive and widely used option.