12 November 2013

FSImage analysis using Pig -- Scalable NameDistribution

HDFS-1110 introduces an interesting feature to reduce the memory footprint of the NameNode. In a nutshell, many files in a cluster are likely to share the same name (part-00000, for instance). This feature lets identical filenames share a single byte array storing the name in the FSDirectory structure, in order to save memory.

The presented solution is not perfect, as duplicate names are only detected at NameNode startup, when the FSImage is loaded from disk. This means that if you never stop your NameNode, you'll never take advantage of name de-duplication. OK, in the real world that's not really the case :). Well, let's move forward.

The patch also adds a processor to the OIV, the Offline Image Viewer, to compute the name distribution (in other words, to count the occurrences of each filename and sort them in descending order to display the most popular filenames).

The problem with OIV's NameDistribution processor is that it keeps all the names in memory and increments a counter for each filename. This can result in an OutOfMemoryError if your FSImage is too big.

So the idea here is to redo the NameDistribution but using Apache Pig.

Detailed Steps

The FSImage, once retrieved from the NameNode, needs to be converted into a textual format so that Pig can read it (this step is not explicitly described in the Offline Image Viewer documentation, which is somewhat misleading to new users). Once the converted file has been uploaded into HDFS, a Pig script is run against it.

1. Retrieve the latest FSImage from the (active) NameNode
curl --silent "http://${activeNamenode}:50070/getimage?getimage=1&txid=latest" -o fsimage.bin

2. Convert the fsimage to textual "delimited" representation
hdfs oiv -i fsimage.bin -o fsimage.delim -p Delimited

3. Upload the fsimage.delim to HDFS
hdfs dfs -copyFromLocal fsimage.delim /tmp

4. Run the pig script given below
pig -p fsimage=/tmp/fsimage.delim -f nameDistribution.pig

5. Retrieve the results
hdfs dfs -cat /tmp/nameDistribution/part*

--- nameDistribution.pig ---

set job.name 'FSImage NameDistribution'

files = LOAD '$fsimage' USING PigStorage('\t') AS (path:chararray, replication:int, modTime:chararray, accessTime:chararray, blockSize:long, numBlocks:int, fileSize:long, NamespaceQuota:long, DiskspaceQuota:long, perms:chararray, username:chararray, groupname:chararray);

-- Keep only the last path component (the text after the final '/')
filenames = FOREACH files GENERATE
    SUBSTRING(path, (int)LAST_INDEX_OF(path, '/') + 1,
    (int)SIZE(path)) as filename;

filenamesGroup = GROUP filenames BY filename;

filenamesCount = FOREACH filenamesGroup GENERATE 
    group as filename, 
    COUNT(filenames) as n;

filenamesCount = ORDER filenamesCount BY n DESC;

STORE filenamesCount INTO '/tmp/nameDistribution';
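
To make the logic of the script easier to follow, here is a minimal Python sketch of the same grouping and counting, meant to be run on a small sample of the delimited file only (it keeps everything in memory, so it has the very limitation the Pig version avoids). The function name and the sample lines are made up for illustration; the only assumption taken from the script is that the path is the first tab-separated column.

```python
from collections import Counter


def name_distribution(lines, delimiter="\t"):
    """Count how often each file name (last path component) occurs."""
    counts = Counter()
    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue
        path = line.split(delimiter, 1)[0]   # first column is the full path
        filename = path.rsplit("/", 1)[-1]   # text after the last '/'
        counts[filename] += 1
    # Most frequent names first, like the ORDER ... BY n DESC step
    return counts.most_common()


# Hypothetical sample rows (remaining columns shortened to "...")
sample = [
    "/data/a/part-00000\t3\t...",
    "/data/b/part-00000\t3\t...",
    "/data/b/_SUCCESS\t3\t...",
]
print(name_distribution(sample))  # [('part-00000', 2), ('_SUCCESS', 1)]
```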

27 August 2013

Hadoop HDFS Balancer Explained

HDFS's block allocation strategy tries hard to spread new blocks evenly amongst all the datanodes. The rationale behind that behavior is to prevent nodes recently added to the cluster from becoming a bottleneck, which would happen if all the new blocks were allocated on, and read from, those datanodes.
The drawback of this allocation strategy is that your cluster constantly needs to be rebalanced, even more so when multiple hardware generations live side by side: the bigger your boxes are, the more slowly their disk usage (as a percentage) grows compared to older (and smaller) generations.
Look at the following graph: we can easily distinguish 3 generations of hardware (dn20-dn60, dn61-dn70 and dn71-dn80).

Unbalanced cluster

So the Hadoop HDFS Balancer needs to be run on a regular basis.

HDFS Balancer

Help entry from the command line:

$ hdfs balancer -h
Usage: java Balancer
        [-policy <policy>]       the balancing policy: datanode or blockpool (default datanode)
        [-threshold <threshold>] Percentage of disk capacity (default 10)

The threshold parameter is a floating-point number between 0 and 100 (12.5 for instance). Starting from the average cluster utilization (about 50% in the graph below), the balancer process will try to bring every datanode's usage into the range [average - threshold, average + threshold]. In the current example:
- Higher (average + threshold): 60% if run with the default threshold (10%)
- Lower (average - threshold): 40%
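
The arithmetic above can be sketched in a few lines of Python. The function name is hypothetical, and clamping the bounds to the 0-100 range is my own assumption rather than something taken from the balancer's code.

```python
def balance_band(average, threshold=10.0):
    """Return the (lower, upper) utilization bounds, in percent."""
    lower = max(0.0, average - threshold)    # clamping is an assumption
    upper = min(100.0, average + threshold)
    return lower, upper


print(balance_band(50.0))        # (40.0, 60.0) with the default threshold
print(balance_band(50.0, 25.0))  # (25.0, 75.0) with -threshold 25
```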

Clearly, the smaller your threshold, the more balanced your datanodes will be. With a very small threshold, the cluster may never reach the balanced state if other clients concurrently write and delete data in the cluster.

The balancer will pick datanodes with current usage above the higher threshold (the source, classified as over-utilized), and try to find blocks from these datanodes that could be copied into nodes with current usage below the lower threshold (the destination, under-utilized). A second selection round will select over-utilized nodes to move blocks to nodes with utilization below average. A third pass will pick nodes with utilization above average to move data to under-utilized nodes.
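
As a rough illustration of that selection process, the sketch below sorts datanodes into the four groups and lists the three source/destination pairings in the order described. It is a simplification, not the balancer's actual code: it assumes all nodes have the same capacity (so the cluster average is just the mean of the per-node percentages), and the node names, usage figures and group labels are made up.

```python
def classify(usage, threshold=10.0):
    """Group datanodes by utilization (percent) relative to the average."""
    average = sum(usage.values()) / len(usage)  # assumes equal capacities
    groups = {"over": [], "above_avg": [], "below_avg": [], "under": []}
    for node, used in sorted(usage.items()):
        if used > average + threshold:
            groups["over"].append(node)        # source in passes 1 and 2
        elif used > average:
            groups["above_avg"].append(node)   # source in pass 3
        elif used >= average - threshold:
            groups["below_avg"].append(node)   # destination in pass 2
        else:
            groups["under"].append(node)       # destination in passes 1 and 3
    return average, groups


# Pairing order described in the text: (source group, destination group)
PASSES = [("over", "under"), ("over", "below_avg"), ("above_avg", "under")]

# Hypothetical usage figures, in percent of disk capacity
average, groups = classify({"dn20": 75.0, "dn40": 55.0, "dn61": 45.0, "dn71": 25.0})
print(average)  # 50.0
for source, destination in PASSES:
    print(groups[source], "->", groups[destination])
```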

In addition to that selection process, the balancer can also pick a proxy node if the source and the destination are not located in the same rack (i.e. a datanode storing a replica of the block and located in the same rack as the destination). Yes, the balancer is rack-aware and will generate very little rack-to-rack noise.

Iterative steps in the balancer

The balancer process is iterative. As HDFS has lots of moving state, the balancer tries at each iteration to move 10 GB * number of selected sources. Each iteration lasts no more than 20 minutes.
The progress of the runs is reported to stdout:

$ hdfs balancer -threshold 25
Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved
Aug 26, 2013 9:29:16 AM           0                 0 KB             5.82 TB              50 GB
Aug 26, 2013 9:46:40 AM           1             49.88 GB             5.76 TB              50 GB
Aug 26, 2013 10:04:27 AM          2             99.67 GB             5.72 TB              50 GB
Aug 26, 2013 10:22:36 AM          3            149.62 GB             5.69 TB              50 GB

Aug 27, 2013 3:24:54 AM          61              2.07 TB             4.08 TB              30 GB
Aug 27, 2013 3:42:32 AM          62               2.1 TB             4.05 TB              30 GB
Aug 27, 2013 4:00:19 AM          63              2.13 TB             4.02 TB              30 GB
Aug 27, 2013 4:18:15 AM          64              2.16 TB             3.95 TB              30 GB
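
The "Bytes Being Moved" column is consistent with the 10 GB per source rule: 50 GB would correspond to five selected sources and 30 GB to three. The number of sources is my own inference from the output, not something the balancer prints here. A tiny sketch of the arithmetic, with hypothetical constant and function names:

```python
GB = 1024 ** 3
MAX_BYTES_PER_SOURCE = 10 * GB  # per-iteration cap per source, as stated above


def bytes_per_iteration(num_sources):
    """Upper bound on the data scheduled in one balancer iteration."""
    return num_sources * MAX_BYTES_PER_SOURCE


print(bytes_per_iteration(5) // GB)  # 50
print(bytes_per_iteration(3) // GB)  # 30
```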

Iterative steps as administrator

The same iterative approach applies to the Hadoop Administrator: if you haven't balanced your cluster for a long time, you should start by balancing with a higher threshold (like 25), and then converge to a smaller target threshold (like 10).

Remember: the balancer needs to be run regularly to keep HDFS performance consistent.

Special note for HBase

HBase and the HDFS balancer are not best friends at all. It is recommended to run HBase on a separate HDFS cluster and not to rebalance it.

18 August 2013

Hadoop fsck -move does not only move the files

Let me rewind the story. Yesterday morning two datanodes crashed. Badly. Fortunately, the default replication factor is 3, so the blocks started to re-replicate properly. Unfortunately, we had a few data sets stored with a replication factor of 2. And, without much surprise, some blocks were stored only on the two datanodes that crashed. The hadoop fsck command reported corrupted files.
No big deal, the action plan simply looks like:
  1. Run a fsck -move to move the corrupted files out of the processing paths so that jobs do not fail
  2. Recover the blocks from the crashed datanodes, potentially manually if the datanodes do not come back to life
  3. Move the files from /lost+found back to their original locations.
fsck -move ran fine. /path/to/my/corrupted/file was moved to /lost+found/path/to/my/corrupted/file as expected. But hey, what happened? Looking closer, the files had been turned into directories, and additional files had been created:
$ hadoop fs -ls -R /lost+found
drwx------ /lost+found/path/to/my/corrupted/file
-rw-r--r-- /lost+found/path/to/my/corrupted/file/0
-rw-r--r-- /lost+found/path/to/my/corrupted/file/1

Looking in the bible (Hadoop: The Definitive Guide by Tom White), it is briefly mentioned: "Files are broken into chains of contiguous blocks to aid any salvaging efforts you may attempt." So here is the point: fsck -move does not only move the files to /lost+found, it actually splits each file into smaller files of contiguous blocks.
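
To picture what "chains of contiguous blocks" means, here is a small Python sketch of the splitting idea. It only illustrates the principle and is not taken from the fsck source; the function name and the six-block file are hypothetical.

```python
def contiguous_chains(block_ok):
    """Split a file's block list into runs of consecutive healthy blocks.

    block_ok[i] is True when block i still has a live replica.
    Returns a list of chains, each a list of block indexes.
    """
    chains, current = [], []
    for index, ok in enumerate(block_ok):
        if ok:
            current.append(index)
        elif current:
            # A missing block closes the chain being built
            chains.append(current)
            current = []
    if current:
        chains.append(current)
    return chains


# Hypothetical 6-block file whose third block (index 2) is lost
print(contiguous_chains([True, True, False, True, True, True]))
# [[0, 1], [3, 4, 5]]
```

With one missing block in the middle, the file ends up as two chains, which would correspond to the file/0 and file/1 entries in the listing above.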

As a side effect, the corrupted blocks are no longer referenced by the remaining files. Even if the blocks are recovered later, they will be deleted anyway because they no longer belong to any file. fsck -move is a non-reversible process, or at least one that is painful to recover from.

The recommendation is to use fsck -move as a last resort, only when there is no chance left of recovering the blocks. It then helps to reduce the amount of lost data to only a few portions of the file rather than its entire content.
I learnt the hidden behavior of fsck -move the hard way. If I had to rewrite the action plan for corrupted files, it would be:
  1. Get the list of corrupted files (hadoop fsck / | grep -v "^\.*$")
  2. Manually move the files listed in step 1 away from the processing path (hadoop fs -mv /path/to/my/corrupted/file /quarantine/path/to/my/corrupted/file)
  3. Try to recover the missing blocks.
  4. If the missing blocks can be recovered, move the files back to their original directory
  5. If not, run hadoop fsck -move to reduce the amount of lost data to a few blocks, and do some additional post-processing to get the remaining portions of the files into a usable format, including moving the file's splits back to the original directory.
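
Step 1 boils down to dropping the progress lines that fsck prints for healthy files (lines made only of dots) and keeping the rest. If you prefer to post-process a saved fsck output, a minimal Python sketch could look like this; the function name is hypothetical and the sample message is invented for illustration, so do not rely on it matching the exact wording of your Hadoop version.

```python
def interesting_lines(fsck_output):
    """Drop empty lines and progress lines made only of dots."""
    kept = []
    for line in fsck_output.splitlines():
        stripped = line.strip()
        if stripped and set(stripped) != {"."}:
            kept.append(line)
    return kept


# Invented sample: two progress lines around one problem report
sample = (
    "....\n"
    "/path/to/my/corrupted/file: MISSING 1 blocks of total size 1024 B\n"
    "..\n"
)
print(interesting_lines(sample))
```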

Remember: hadoop fsck -move does not only move the files to /lost+found!

03 July 2013

One Engineer’s Experience with Parcel

I wrote an article about Parcel that has been published on Cloudera's blog, but the part missing from that entry was the motivation for yet another packaging format.

Motivation for Parcels

The first question I asked myself when I read “Parcel” in the release notes was: why the h*** does Cloudera need yet another packaging format? Aren’t RPM, DEB, etc. enough? And why is the format called parcel and not YAPF (Yet Another Packaging Format), to be consistent with Hadoop naming (YARN)? Their main motivations probably lie elsewhere…
Hadoop, unfortunately, became famous over time for incompatibilities between versions and components. How many of us have spent a full night trying to run a component, eventually discovering that the embedded library is not compatible with that exact version of Hadoop? Having one package per component can lead to incompatibility problems when, for instance, someone forgets to upgrade one component at the same time as the others. And with the rise of Hadoop 2.0 and its compatibility-breaking changes, things are becoming even worse.
At the same time, companies have started to use Hadoop for mission critical processing. In such environments, downtime for component upgrades is no longer an option, so a way to achieve rolling upgrades is necessary.

And last, but not least, for commercial entities such as Cloudera, existing packaging formats (RPM, DEB…) have started to be a real nightmare to maintain amongst all the available long term versions supported by their respective vendors. Naming and dependencies between packages change over time and major versions, resulting in a huge cost in testing and maintenance.
If you were asked to design a solution that meets all the strong requirements mentioned above, what would your solution be? A custom, monolithic packaging format like parcel could be part of the answer.