This post was originally published by Chris Deptula on Wednesday, February 18, 2015.
This is the second in a three-part blog series on working with small files in Hadoop. In my first blog, I discussed what constitutes a small file and why Hadoop has problems with small files. I defined a small file as any file smaller than 75% of the Hadoop block size, and explained that Hadoop prefers fewer, larger files due to NameNode memory usage and MapReduce performance. In this blog, I will discuss solutions to these challenges when small files are truly unavoidable.
Solving the NameNode Memory Problem
As discussed in my previous blog, the metadata for every block in Hadoop must be stored in the NameNode's memory. This leads to a practical limit on how many objects can be stored in Hadoop and also has implications for startup time and network bandwidth. There are two solutions to this: decrease the number of objects in your Hadoop cluster, or somehow enable greater memory use by the NameNode -- but without causing excessive startup times. The most common approaches to solving this memory problem involve Hadoop Archive (HAR) files and Federated NameNodes.
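To make the memory pressure concrete, here is a minimal back-of-the-envelope sketch. It assumes one NameNode object per file plus one per block, the commonly quoted rule of thumb of roughly 150 bytes per object, and a 128 MB block size. These figures are assumptions for illustration, not measurements, and directories are ignored.

```python
import math

# Assumptions for illustration only: ~150 bytes of NameNode heap per object
# (a commonly quoted rule of thumb) and a 128 MB HDFS block size.
BYTES_PER_OBJECT = 150
BLOCK_SIZE = 128 * 1024 * 1024


def namenode_objects(file_count, file_size_bytes):
    """Count tracked objects: one per file plus one per block of each file."""
    blocks_per_file = max(1, math.ceil(file_size_bytes / BLOCK_SIZE))
    return file_count * (1 + blocks_per_file)


def estimated_heap_bytes(file_count, file_size_bytes):
    """Rough NameNode heap estimate under the assumptions above."""
    return namenode_objects(file_count, file_size_bytes) * BYTES_PER_OBJECT


if __name__ == "__main__":
    one_mb = 1024 * 1024
    print(namenode_objects(10_000_000, one_mb))   # many small files
    print(namenode_objects(78_125, BLOCK_SIZE))   # same data, full-block files
```

Under these assumptions, 10 million 1 MB files cost 20 million objects, while the same volume of data stored as 78,125 full-block files costs 156,250.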
Hadoop Archive Files
Hadoop archive files alleviate the NameNode memory problem by packing many small files into a larger HAR file, similar to TAR files on Linux. This causes the NameNode to retain knowledge of a single HAR file instead of dozens or hundreds of small files. The files within a HAR file can be accessed using the har:// prefix instead of hdfs://. HAR files are created from files that exist in HDFS. Therefore, a HAR file can consolidate both ingested data and data created through normal MapReduce processing. HAR files can be used independently of the technology used to create the small files; there is no common dependency other than HDFS.
Although HAR files reduce the NameNode memory footprint for many small files, accessing and processing HAR file content will likely be less efficient. HAR files are still stored randomly on disk and reading a file within a HAR requires two index accesses -- one for the NameNode to find the HAR file itself and one to find the location of the small file within the HAR. Reading a file in a HAR may actually be slower than reading the same file stored natively on HDFS. MapReduce jobs compound this performance issue as they will still launch one map task per file within the HAR.
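The two-step lookup described above can be illustrated with a toy model. This is not the real HAR on-disk format. The `namespace` dictionary stands in for the NameNode, the per-archive `index` stands in for the archive's internal index, and every path and file name is hypothetical.

```python
import io


def build_archive(files):
    """Pack {name: bytes} into one blob plus an index of name -> (offset, length)."""
    blob = io.BytesIO()
    index = {}
    for name, data in files.items():
        index[name] = (blob.tell(), len(data))
        blob.write(data)
    return blob.getvalue(), index


def read_archived_file(namespace, archive_path, name):
    """Read one small file out of an archive using two lookups."""
    blob, index = namespace[archive_path]  # lookup 1: find the archive itself
    offset, length = index[name]           # lookup 2: find the file inside it
    return blob[offset:offset + length]


if __name__ == "__main__":
    # Hypothetical archive path and contents, for illustration only.
    small_files = {"a.log": b"alpha\n", "b.log": b"bravo\n", "c.log": b"charlie\n"}
    namespace = {"/archive/logs.har": build_archive(small_files)}
    print(len(namespace))  # the namespace holds one entry, not three
    print(read_archived_file(namespace, "/archive/logs.har", "b.log"))
```

The namespace shrinks to a single entry, but every read pays for the extra index lookup, which is the trade-off the paragraph above describes.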
In the end, you have a trade-off: HAR files can solve the NameNode memory issue, but may worsen processing performance. If your small files are primarily kept for archival purposes, with infrequent access, then HAR files are a good solution. If the small files are part of your normal processing flow, you may need to rethink your design.
Federated NameNodes
Federated NameNodes allow you to have multiple NameNodes in your cluster each storing a subset of the object metadata. This eliminates the need to store all object metadata on a single machine, providing more scale for memory usage. On the surface, solving the small file memory problem with this technique is attractive, but with a little more thought you'll quickly realize the limitations.
Federated NameNodes isolate object metadata -- only one NameNode knows about any particular object. This means to get a file you must know which NameNode to use. If your cluster houses multiple tenants and/or siloed applications, then federated NameNodes are a natural fit -- you can isolate object metadata by tenant or application. However, if you are sharing data across all applications within a cluster, this approach is not ideal.
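The "you must know which NameNode to use" point can be sketched as a client-side routing table that maps path prefixes to NameNodes. This is a simplified illustration only. The prefixes and NameNode names are hypothetical, and real deployments configure this through Hadoop itself rather than application code.

```python
# Hypothetical mount table: each path prefix is owned by exactly one NameNode.
MOUNT_TABLE = {
    "/tenants/sales": "namenode-a",
    "/tenants/finance": "namenode-b",
}


def namenode_for(path, mount_table=MOUNT_TABLE):
    """Return the NameNode that owns `path`, using the longest matching prefix."""
    matches = [
        prefix
        for prefix in mount_table
        if path == prefix or path.startswith(prefix + "/")
    ]
    if not matches:
        raise KeyError(f"no NameNode is configured for {path}")
    return mount_table[max(matches, key=len)]


if __name__ == "__main__":
    print(namenode_for("/tenants/sales/2015/02/part-00000"))
```

Data that is naturally partitioned by tenant routes cleanly. A path outside every prefix has no owner, which hints at why data shared across all applications fits this model poorly.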
Since federation does not actually change the number of objects or blocks within your cluster, it does not solve the MapReduce performance problem. On the contrary, federation adds significant and often unnecessary complexity to your Hadoop installation and administration. Federation, when used to solve the small file problem, often does more to hide the problem than to solve it.
Solving the MapReduce Performance Problem
As discussed in my previous blog, the MapReduce performance problem is caused by a combination of random disk I/O and launching/managing too many map tasks. The solution seems obvious -- have fewer, larger files or launch fewer map tasks; however, this is often easier said than done. Some of the most common solutions include:
- Change the ingestion process/interval
- Batch file consolidation
- Sequence files
- S3DistCp (If using Amazon EMR)
- Using a CombineFileInputFormat
- Hive configuration settings
- Using Hadoop's append capabilities
We will discuss the first three of these options in this blog, and cover the remainder in the third and final blog of the series.
Change the Ingestion Process/Interval
The easiest way to get rid of small files is simply not to generate them in the first place. If your source system generates thousands of small files that are copied into Hadoop, investigate changing your source system to generate a few large files instead, or possibly concatenating files when ingesting into HDFS. If you are only ingesting 10 MB of data every hour, determine whether it's possible to ingest only once a day. You would create one 240 MB file instead of twenty-four 10 MB files. However, you may not have control over the source system creating the files, or business needs may require that you ingest data so frequently that small files are unavoidable. If small files are truly unavoidable, then other solutions should be considered.
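The hourly-versus-daily arithmetic can be checked with a short sketch. It assumes a 128 MB block size and uses this series' definition of a small file (smaller than 75% of the block size). The function name and return shape are illustrative.

```python
import math

BLOCK_SIZE_MB = 128           # assumed HDFS block size
SMALL_FILE_FRACTION = 0.75    # this series' definition of a small file


def daily_footprint(mb_per_hour, hours_per_file):
    """Summarize one day of ingestion when a file is written every `hours_per_file` hours."""
    files = math.ceil(24 / hours_per_file)
    file_mb = mb_per_hour * hours_per_file
    blocks_per_file = max(1, math.ceil(file_mb / BLOCK_SIZE_MB))
    return {
        "files": files,
        "file_mb": file_mb,
        "blocks": files * blocks_per_file,
        "is_small": file_mb < SMALL_FILE_FRACTION * BLOCK_SIZE_MB,
    }


if __name__ == "__main__":
    print(daily_footprint(10, 1))    # ingest every hour
    print(daily_footprint(10, 24))   # ingest once a day
```

Hourly ingestion yields 24 small files and 24 blocks per day. Daily ingestion yields a single 240 MB file spanning 2 blocks, which no longer counts as small.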
Batch File Consolidation
When small files are unavoidable, file consolidation is the most common solution. With this option, you periodically run a simple consolidating MapReduce job that reads all of the small files in a folder and rewrites them into fewer, larger files. If you have 1000 files in a folder and specify only 5 reducers for the MapReduce job, the 1000 input files will be merged into 5 output files. After some simple HDFS file/folder manipulation to swap the merged files in for the originals, you have reduced your memory footprint by 200:1 and, likely, improved the performance of future MapReduce processing on the same data.
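The effect of consolidation can be sketched on a local filesystem. This is a stand-in for illustration, not a MapReduce job. It assigns whole input files to outputs round-robin, whereas real reducers partition individual records by key. The function name and the `part-NNNNN` output naming are assumptions.

```python
from pathlib import Path


def consolidate(input_dir, output_dir, num_outputs):
    """Merge every file in input_dir into num_outputs larger files in output_dir.

    Assumes newline-terminated text inputs, so records stay on separate lines,
    and an output_dir that is not inside input_dir.
    """
    inputs = sorted(p for p in Path(input_dir).iterdir() if p.is_file())
    out = Path(output_dir)
    out.mkdir(parents=True, exist_ok=True)
    outputs = [out / f"part-{i:05d}" for i in range(num_outputs)]
    handles = [path.open("wb") for path in outputs]
    try:
        for i, src in enumerate(inputs):
            # Round-robin assignment keeps the output files roughly even in size.
            handles[i % num_outputs].write(src.read_bytes())
    finally:
        for handle in handles:
            handle.close()
    return outputs
```

Calling `consolidate("in", "out", 5)` on a folder of 1000 small files leaves 5 output files holding the same records, mirroring the 200:1 reduction described above.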
The consolidation job can be implemented in as little as two lines of Pig: a load statement that reads the small text files and a store statement that writes them back out.
Implementing this in Hive or Java MapReduce is equally easy. These MapReduce jobs obviously require cluster resources while executing, and are often scheduled during off hours. However, they should be run frequently enough that the performance impact of small files does not become too extreme. Additional logic is often built into these jobs to merge only the files in folders where consolidation will have a noticeable performance impact. Merging files in a folder that contains only three files will not result in as great a performance benefit as merging files in a folder that contains 500 small files.
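The "only merge where it matters" logic might look like the following sketch. It assumes you have already collected the file sizes per folder. The 128 MB block size and the minimum count of 100 small files are hypothetical settings, while the 75% threshold follows this series' definition of a small file.

```python
BLOCK_SIZE = 128 * 1024 * 1024          # assumed HDFS block size in bytes
SMALL_FILE_LIMIT = 0.75 * BLOCK_SIZE    # this series' definition of a small file
MIN_SMALL_FILES = 100                   # hypothetical threshold; tune for your cluster


def folders_to_consolidate(folder_sizes, min_small_files=MIN_SMALL_FILES):
    """Return folders holding at least `min_small_files` small files.

    `folder_sizes` maps a folder path to a list of file sizes in bytes.
    """
    selected = []
    for folder, sizes in folder_sizes.items():
        small_count = sum(1 for size in sizes if size < SMALL_FILE_LIMIT)
        if small_count >= min_small_files:
            selected.append(folder)
    return sorted(selected)


if __name__ == "__main__":
    mb = 1024 * 1024
    listing = {
        "/data/clicks": [1 * mb] * 500,     # many small files: worth merging
        "/data/lookup": [1 * mb] * 3,       # too few files to matter
        "/data/history": [256 * mb] * 500,  # already large files
    }
    print(folders_to_consolidate(listing))
```

Only the folder with 500 small files is selected. The three-file folder and the folder of large files are left alone.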
Checking folders to determine which ones should be consolidated can be accomplished in a variety of ways. For example, a Pentaho Data Integration job can be used to iterate through a group of folders in HDFS, finding those that meet a minimum set of requirements for consolidation. There is also a pre-written application designed specifically for this task called File Crush, an open source project written by Edward Capriolo. File Crush is not professionally supported, so there is no guarantee that it will continue to work with future versions of Hadoop.
Batch file consolidation does not maintain the original file names. If having the original file name is important for processing or understanding where the data originated, batch file consolidation will not work. However, most HDFS designs embed naming semantics at the folder level and not within each file. Adopting this practice removes file name dependencies as an issue.
Sequence Files
When there is a requirement to maintain the original filename, a very common approach is to use Sequence files. In this solution, the filename is stored as the key in the sequence file and the file contents are stored as the value. The table below gives an example of how the small files would be stored in a sequence file (the file names are placeholders):
Key          Value
file1.txt    contents of file1.txt
file2.txt    contents of file2.txt
fileN.txt    contents of fileN.txt
If you have 10,000 small files, your sequence file would contain 10,000 keys, one per file. Sequence files support block compression and are splittable, meaning that MapReduce jobs would launch only one map task per 128 MB block instead of one map task per small file. This works very well when you need to maintain the input file name and you are ingesting hundreds or thousands of small files at the same time.
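The map task savings can be estimated with a little arithmetic. This sketch assumes each of the 10,000 files is 1 MB, a size picked purely for illustration, and ignores compression, which would shrink the sequence file further.

```python
import math


def map_task_counts(num_files, file_mb, block_mb=128):
    """Return (tasks with one map per small file, tasks with one map per block)."""
    per_file = num_files
    per_block = max(1, math.ceil(num_files * file_mb / block_mb))
    return per_file, per_block


if __name__ == "__main__":
    print(map_task_counts(10_000, 1))
```

With these assumed sizes, 10,000 map tasks drop to 79, since 10,000 MB spans 78.125 blocks of 128 MB.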
However, if you are only ingesting a small number of small files at a time, the sequence file does not work as well because Hadoop files are immutable and cannot be appended to. Three 10 MB files will result in a 30 MB sequence file, which is still, by our definition, a small file. Another challenge is that retrieving the list of file names within a sequence file requires processing the entire file.
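A toy key/value record stream shows why listing the file names means reading everything. This is not the real SequenceFile format. It is a minimal length-prefixed layout invented for illustration, with the filename as the key and the file contents as the value.

```python
import io
import struct

# Toy record layout (an assumption, not Hadoop's format):
# 4-byte key length, 4-byte value length, key bytes, value bytes.
HEADER = struct.Struct(">II")


def write_records(files):
    """Serialize {filename: bytes} as consecutive key/value records."""
    buf = io.BytesIO()
    for name, data in files.items():
        key = name.encode("utf-8")
        buf.write(HEADER.pack(len(key), len(data)))
        buf.write(key)
        buf.write(data)
    return buf.getvalue()


def iter_records(blob):
    """Yield (filename, contents) pairs by scanning the stream from the start."""
    pos = 0
    while pos < len(blob):
        key_len, value_len = HEADER.unpack_from(blob, pos)
        pos += HEADER.size
        key = blob[pos:pos + key_len].decode("utf-8")
        pos += key_len
        value = blob[pos:pos + value_len]
        pos += value_len
        yield key, value


def list_keys(blob):
    """Listing the keys walks every record, because there is no separate index."""
    return [key for key, _ in iter_records(blob)]
```

Because the keys are interleaved with the values and no index exists, `list_keys` has to step through the whole stream, which mirrors the limitation described above.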
Additionally, Hive does not work well with sequence files in this structure. Hive treats all of the data within a value as a single row, so the entire contents of each file would appear as one row, which makes the data hard to query. Finally, the Hive tables you create will not have access to the sequence file key (the filename); they will only have access to the value (the contents of the file). It may be possible to write a custom Hive SerDe to solve these challenges, but that is an advanced topic beyond the native capabilities within Hadoop.
More details about challenges and limitations of Sequence Files may be found in our Hadoop file formats blog post.
In this blog, we discussed the trade-offs of using Hadoop Archive (HAR) files to minimize NameNode memory usage. We discussed and dismissed the use of Federated NameNodes as a panacea for the small file problem. And we introduced some common solutions for small file consolidation -- solutions which improve both NameNode memory usage and MapReduce performance. In my third and final blog, I'll discuss additional, less commonly used, but valuable techniques for file consolidation.