This post was originally published by Chris Deptula on Tuesday, February 24, 2015.
This is the third in a three-part blog series on working with small files in Hadoop.
In my previous blogs, we defined what constitutes a small file and why Hadoop prefers fewer, larger files. We then elaborated on the specific issues that small files cause, namely inordinate NameNode memory usage and MapReduce performance degradation, and began exploring common solutions to these problems. In this blog, I’ll close the series by examining some less commonly used alternatives for solving the MapReduce performance problem and the factors to consider when choosing a solution.
Solving the MapReduce Performance Problem
In my previous blog, I listed the following solutions for mitigating the MapReduce performance problem:
• Change the ingestion process/interval
• Batch file consolidation
• Sequence files
• HBase
• S3DistCp (If using Amazon EMR)
• Using a CombineFileInputFormat
• Hive configuration settings
• Using Hadoop’s append capabilities
My second blog already discussed changing the ingestion process, batch file consolidation, and sequence files. I will cover the remaining options here.
HBase
If you are producing numerous small files, storing your data as files in HDFS may not be the best solution. Instead, you might consider using HBase as a column store. Using HBase changes your ingestion process from producing many small HDFS files to writing individual records into HBase tables. If your data access pattern is based on well-defined, random-access lookups, HBase may be your best alternative. It is architecturally tuned for high-velocity record inserts, high-volume individual record lookups, and streaming-based analytics. However, if your data access pattern tends toward full file or table scans, HBase may not be optimal.
It is possible to create a Hive table that maps to HBase data; however, query performance in this design will vary. Hive on HBase shines when a single row or a range of rows is selected, but HBase is very inefficient for queries that tend toward full table scans. Most analytical queries, especially those using GROUP BY, require full table scans.
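To make the difference between point lookups, range scans and full table scans concrete, here is a toy Python model of a table whose rows are kept sorted by row key, as HBase stores them. It is not the HBase client API; the class name and the "<date>#<sensor>" row-key layout are hypothetical, chosen only to show how many rows each kind of read touches.

```python
import bisect


class ToyRowStore:
    """A toy, in-memory stand-in for an HBase-style table.

    Rows are kept sorted by row key. This is not the HBase client API; it
    only shows why point and range reads touch few rows while an unbounded
    scan touches every row.
    """

    def __init__(self):
        self._keys = []  # row keys in sorted (lexicographic) order
        self._rows = {}  # row key -> value

    def put(self, key, value):
        if key not in self._rows:
            bisect.insort(self._keys, key)  # keep keys sorted on insert
        self._rows[key] = value

    def get(self, key):
        # Point lookup: exactly one row is read.
        return self._rows.get(key)

    def scan(self, start=None, stop=None):
        # Range scan over [start, stop); with no bounds it is a full table scan.
        lo = 0 if start is None else bisect.bisect_left(self._keys, start)
        hi = len(self._keys) if stop is None else bisect.bisect_left(self._keys, stop)
        return [(k, self._rows[k]) for k in self._keys[lo:hi]]


# Hypothetical row keys of the form "<date>#<sensor>".
store = ToyRowStore()
for day in range(1, 31):
    for sensor in ("a", "b", "c"):
        store.put(f"2015-01-{day:02d}#{sensor}", day)

print(store.get("2015-01-15#b"))                    # 15 (one row read)
print(len(store.scan("2015-01-10", "2015-01-13")))  # 9 (days 10 to 12)
print(len(store.scan()))                            # 90 (every row)
```

An aggregation such as a GROUP BY over all sensors has no useful key range, so it falls into the last case and must read every row.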
HBase provides the best ability to stream data into Hadoop and make it available for processing in real time. However, balancing the needs of HBase with other cluster processes can be challenging and requires advanced system administration. Additionally, HBase performance depends largely on data access patterns, which should be carefully considered before choosing HBase to solve the small file problem.
S3DistCp
This solution is only available to users of Amazon EMR. Amazon EMR clusters are designed to be short-lived, persisting their data in Amazon S3. Even with Amazon S3, processing a large number of small files still launches more map tasks than necessary, which decreases performance. Enter S3DistCp.
S3DistCp is a utility provided by Amazon for distributed copying of data from S3 to ephemeral HDFS or even other S3 buckets. The utility provides the capability to concatenate files together through the use of groupBy and targetSize options. This is useful when you have thousands of small files stored in S3 that you want to process using Amazon EMR. S3DistCp kills two birds with one stone by concatenating many small files and making them appear in faster, ephemeral HDFS storage. There have been reports of as much as 15x performance improvement using this mechanism.
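The grouping idea behind the groupBy and targetSize options can be sketched as a small Python planner: files whose names yield the same regular-expression capture text are concatenated, and each concatenated output is capped at roughly the target size (S3DistCp expresses targetSize in MiB). This is an illustrative model, not Amazon's implementation; the function name, the whole-name matching, skipping non-matching files, and never splitting an input file are all assumptions of the sketch.

```python
import re


def plan_group_by(files, group_by, target_size_mib):
    """Plan output files the way an S3DistCp-style groupBy/targetSize copy might.

    files: list of (name, size_in_bytes) pairs.
    group_by: regex with at least one capture group; files whose captured
        text is identical are concatenated into the same output.
    target_size_mib: approximate size of each output file, in MiB.

    Returns {group_key: [{"files": [...], "bytes": n}, ...]}. A planning
    sketch only: it never splits an input file and skips non-matching files.
    """
    pattern = re.compile(group_by)
    if pattern.groups == 0:
        raise ValueError("group_by needs at least one capture group")
    target = target_size_mib * 1024 * 1024
    plan = {}
    for name, size in files:
        match = pattern.fullmatch(name)
        if match is None:
            continue  # assumption: non-matching files are left out of this plan
        key = "".join(g or "" for g in match.groups())
        parts = plan.setdefault(key, [])
        # Start a new output part if this file would push the current one past the target.
        if not parts or parts[-1]["bytes"] + size > target:
            parts.append({"files": [], "bytes": 0})
        parts[-1]["files"].append(name)
        parts[-1]["bytes"] += size
    return plan


# Ten hypothetical 40 MiB hourly log files, grouped by hour into ~128 MiB outputs.
MIB = 1024 * 1024
logs = [(f"logs/2015-02-24-{hour:02d}-{i}.log", 40 * MIB)
        for hour in (9, 10) for i in range(5)]
plan = plan_group_by(logs, r"logs/(\d{4}-\d{2}-\d{2}-\d{2})-\d+\.log", 128)
for key, parts in plan.items():
    print(key, [len(p["files"]) for p in parts])
# 2015-02-24-09 [3, 2]
# 2015-02-24-10 [3, 2]
```

Ten input files become four outputs, so a downstream MapReduce job would launch correspondingly fewer map tasks.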
For all practical purposes, S3DistCp performs the same task as the batch file consolidation approach I described in my previous blog. If you use Amazon EMR, note that you already have a prebuilt tool that accomplishes this task.
Using a CombineFileInputFormat
The CombineFileInputFormat is an abstract class provided by Hadoop that merges small files at MapReduce read time. The merged files are not persisted to disk. Instead, the process reads multiple files and merges them “on the fly” for consumption by a single map task. You gain the benefits of not launching one map task per file and not requiring that multiple files be merged into a single persisted file as a preparatory step. This solves the problem of a MapReduce job launching too many map tasks; however, since the job is still reading many small files, random disk IO remains a problem. Additionally, most implementations of the CombineFileInputFormat do not account for data locality and will often pull data from a variety of data nodes over the network.
To implement this, the CombineFileInputFormat must be extended in Java for different file types, which requires significant development expertise to write your custom input format classes. However, once they are written, you can configure a maximum split size, and the input format will merge files until this size is met.
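The split-building behavior described above (add files to a split until the maximum split size is met, then start a new one) can be modeled in a few lines of Python. This is a simplified sketch, not Hadoop's implementation: the function name is hypothetical, it ignores data locality entirely, and it never divides a file.

```python
def combine_splits(files, max_split_size):
    """Pack whole small files into combined input splits.

    Files are added to the current split until its size reaches the limit,
    then a new split is started. This sketch ignores data locality and never
    divides a file; it is not Hadoop's implementation.
    """
    splits, current, current_size = [], [], 0
    for name, size in files:
        current.append(name)
        current_size += size
        if current_size >= max_split_size:  # limit met: close this split
            splits.append(current)
            current, current_size = [], 0
    if current:  # leftover files form a final, smaller split
        splits.append(current)
    return splits


MIB = 1024 * 1024
small_files = [(f"part-{i:05d}", 1 * MIB) for i in range(1000)]  # 1,000 files of 1 MiB
splits = combine_splits(small_files, 128 * MIB)
print(len(small_files), "map tasks with one task per file")  # 1000
print(len(splits), "map tasks with combined splits")         # 8
```

Each combined split still consists of many separate files, which is why random disk IO remains a cost even though the number of map tasks drops from 1,000 to 8.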
Note that since the merged data is not persisted in HDFS, the CombineFileInputFormat does not alleviate the NameNode memory problem. A good example of an implementation of a CombineFileInputFormat may be found here.
Hive Configuration Settings
If you notice that Hive creates small files in your Hadoop cluster through “create table as” or “insert overwrite” statements, you can adjust a few Hive-specific configuration settings to mitigate the problem. When used, these settings tell Hive to merge any small files it created into larger files. However, there is a penalty: Hive launches an additional MapReduce job after the query to perform the merge. Further, the merge completes before Hive reports to the user that the query has finished, rather than running asynchronously.
Note that these settings only work for files created by Hive. If, for example, you create files outside of Hive with another tool such as Sqoop and then move them into the Hive table’s directory with an hdfs dfs -mv command, Hive will not merge them. Therefore, this solution does not work when the files ingested into Hadoop are themselves small. It is only recommended in Hive-centric architectures where a small performance penalty on insert overwrite and create table as statements is acceptable.
The settings to be used are:
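Hive’s merge behavior is controlled by its hive.merge.* properties, which can be set for a session with statements such as SET hive.merge.mapredfiles=true;. The Python sketch below is a simplified model of how these properties interact, assuming the defaults documented for Hive (verify them for your version). It is not Hive’s code, and the function name is hypothetical.

```python
import math

# Defaults documented for Hive's merge properties; verify them for your version.
HIVE_MERGE_DEFAULTS = {
    "hive.merge.mapfiles": True,                  # merge after map-only jobs
    "hive.merge.mapredfiles": False,              # merge after map-reduce jobs
    "hive.merge.smallfiles.avgsize": 16_000_000,  # bytes; merge if average is below this
    "hive.merge.size.per.task": 256_000_000,      # bytes; target size of merged files
}


def files_after_merge(output_sizes, map_only, settings=None):
    """Approximate how many output files remain after Hive's optional merge job.

    Simplified model: when merging is enabled for this kind of job and the
    average output file size is below hive.merge.smallfiles.avgsize, Hive runs
    an extra job that rewrites the output into files of roughly
    hive.merge.size.per.task bytes.
    """
    settings = settings or HIVE_MERGE_DEFAULTS
    if not output_sizes:
        return 0
    key = "hive.merge.mapfiles" if map_only else "hive.merge.mapredfiles"
    total = sum(output_sizes)
    average = total / len(output_sizes)
    if not settings[key] or average >= settings["hive.merge.smallfiles.avgsize"]:
        return len(output_sizes)  # no merge job is launched
    return math.ceil(total / settings["hive.merge.size.per.task"])


outputs = [2_000_000] * 200  # 200 hypothetical 2 MB files from one query
print(files_after_merge(outputs, map_only=True))   # 2
print(files_after_merge(outputs, map_only=False))  # 200: mapredfiles is off by default
tuned = {**HIVE_MERGE_DEFAULTS, "hive.merge.mapredfiles": True}
print(files_after_merge(outputs, map_only=False, settings=tuned))  # 2
```

The extra merge job is the performance penalty mentioned above: the query only reports completion after the 200 small files have been rewritten into two larger ones.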
Using Hadoop’s Append Capabilities
I would be remiss if I did not mention this final option. Invariably, when I discuss how to handle small files, I hear the question “I was reading a blog and saw that Hadoop has the ability to append to files. Why can’t we just append to existing files?”
The story of appends in Hadoop is rather rocky. Append was added in July 2008 as part of Hadoop 0.19. However, soon after implementation (as early as October 2008), many issues were found, and append was disabled in 0.19.1. Later, to support HBase without risk of data loss, append capabilities were added back to Hadoop in 0.20.2. So, after 0.20.2, it was finally technically possible to perform appends in Hadoop.
Append may be available, but none of the major tools in the Hadoop ecosystem support it: Flume, Sqoop, Pig, Hive, Spark, and Java MapReduce. MapReduce enforces a rule that the output location of a job must not exist before the job runs, so MapReduce cannot append its output to pre-existing files. Since Sqoop, Pig, and Hive all use MapReduce under the covers, these tools cannot support append either. Flume does not support append largely because it assumes that once a file reaches a configured threshold (elapsed seconds, bytes written, number of events, or seconds of inactivity), Flume will close the file and never open it again. The Flume community has deemed this sufficient and has not demanded append support.
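The close-and-never-reopen behavior described for Flume can be illustrated with a small Python simulation. The thresholds are modeled on the Flume HDFS sink properties hdfs.rollInterval, hdfs.rollSize and hdfs.rollCount and their documented defaults; hdfs.idleTimeout is left out. The class and function names are hypothetical, and the loop is a simplification rather than Flume’s own logic. It also shows how a Flume deployment left at its defaults can itself produce small files.

```python
from dataclasses import dataclass


@dataclass
class RollPolicy:
    """Roll thresholds modeled on the Flume HDFS sink settings hdfs.rollInterval,
    hdfs.rollSize and hdfs.rollCount (defaults 30 s, 1024 bytes, 10 events;
    0 disables a trigger). hdfs.idleTimeout is left out of this sketch."""
    roll_interval_s: float = 30
    roll_size_bytes: int = 1024
    roll_count: int = 10


def should_close(policy, seconds_open, bytes_written, events_written):
    """True once any enabled threshold is reached."""
    return bool(
        (policy.roll_interval_s and seconds_open >= policy.roll_interval_s)
        or (policy.roll_size_bytes and bytes_written >= policy.roll_size_bytes)
        or (policy.roll_count and events_written >= policy.roll_count)
    )


def count_files(policy, n_events, event_bytes, seconds_between_events):
    """Simulate a steady event stream and count the HDFS files it would create.

    A closed file is never reopened; the next event always starts a new file.
    Thresholds are only checked when an event is written (a simplification).
    """
    files, opened_at, size, count = 0, None, 0, 0
    for i in range(n_events):
        now = i * seconds_between_events
        if opened_at is None:  # previous file was closed: open a new one
            files, opened_at, size, count = files + 1, now, 0, 0
        size, count = size + event_bytes, count + 1
        if should_close(policy, now - opened_at, size, count):
            opened_at = None
    return files


# 100 hypothetical 100-byte events arriving once per second.
print(count_files(RollPolicy(), 100, 100, 1))  # 10: rolled every 10 events
print(count_files(RollPolicy(0, 128 * 1024 * 1024, 0), 100, 100, 1))  # 1
```

Because a rolled file is never reopened, the only way to get fewer, larger files from Flume is to raise these thresholds, not to append.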
If you truly must use appends in Hadoop, you have to write your own system to perform the ingestion and append to existing files. Additionally, if any of your in-cluster processing requires appending to existing files, you will not be able to use Spark or MapReduce. Therefore, using HDFS append capabilities is very complex and should only be attempted by the most technologically savvy organizations. Without a significant engineering team and support commitment, this option is not recommended.
Choosing a Solution
Choosing the best solution for working with small files depends on a variety of questions. It may be necessary to use a combination of these solutions based on access patterns and data requirements. The questions that should be considered include:
• At what point in the data flow are the small files being generated? Are the small files being created at ingestion, or via in-cluster processing?
• What tool is generating the small files? Can changing tool configuration reduce the number of small files?
• What level of technical skill exists within your organization? Do you have the capability to maintain input formats or write your own ingestion engine?
• How often are the small files being generated? How often can small files be merged in order to create large files?
• What sort of data access is required to these small files? Do the files need to be accessible through Hive?
• What administrative windows exist during which processes can be run inside the cluster to alleviate small files?
• What level of latency is acceptable from MapReduce processes?
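As one way to make some of these questions concrete, the caveats this post attaches to each option can be encoded as a simple filter. The sketch below is a hypothetical Python helper; its parameter names are illustrative, it covers only the options discussed in this post, and it does not replace the judgment the questions call for.

```python
def candidate_solutions(*, on_emr, files_created_by_hive, mostly_random_lookups,
                        can_maintain_java_input_formats, has_dedicated_engineering_team):
    """Return the options from this post whose stated constraints are met.

    Hypothetical helper: it encodes only the caveats given in this post and
    is a starting point for discussion, not a recommendation engine.
    """
    options = []
    if mostly_random_lookups:  # HBase suits random-access lookups, not full scans
        options.append("HBase")
    if on_emr:  # S3DistCp is only available on Amazon EMR
        options.append("S3DistCp")
    if can_maintain_java_input_formats:  # requires custom Java input format classes
        options.append("CombineFileInputFormat")
    if files_created_by_hive:  # the merge settings only affect Hive-written files
        options.append("Hive merge settings")
    if has_dedicated_engineering_team:  # append requires a custom ingestion system
        options.append("HDFS append")
    return options


print(candidate_solutions(
    on_emr=True, files_created_by_hive=True, mostly_random_lookups=False,
    can_maintain_java_input_formats=False, has_dedicated_engineering_team=False,
))  # ['S3DistCp', 'Hive merge settings']
```

The other questions (when and how often small files appear, available maintenance windows, acceptable latency) determine how and when a chosen option is applied, which a simple filter like this cannot capture.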
Once you have answered these questions, you can evaluate and choose the best options.