20 October 2015

INotify for HDFS: from ideation to open source

TL;DR: Trumpet, a scalable INotify-like system for HDFS, is out: https://github.com/verisign/trumpet.
----

Polling a filesystem to detect file creation can be cumbersome and inefficient. Linux answered this with inotify: you register a listener on a folder and get notified asynchronously when a change occurs in that folder.
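
For readers more at home in Java than in the C inotify API, the JDK exposes the same idea through java.nio.file.WatchService (backed by inotify on Linux). A minimal sketch, with the watched folder as a placeholder:

import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;

public class WatchFolder {
    public static void main(String[] args) throws Exception {
        Path dir = Paths.get("/tmp/incoming");  // placeholder folder to watch
        try (WatchService watcher = FileSystems.getDefault().newWatchService()) {
            dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE);
            while (true) {
                WatchKey key = watcher.take();  // blocks until an event arrives
                for (WatchEvent<?> event : key.pollEvents()) {
                    System.out.println("created: " + dir.resolve((Path) event.context()));
                }
                if (!key.reset()) {
                    break;  // the folder is no longer accessible
                }
            }
        }
    }
}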

I started asking myself this question back in 2013: what about HDFS? How do you get notified when a new file is created in HDFS?

There are multiple use cases for such a feature:

  • The most obvious one is a processing workflow: when a file is created, the next stage of processing can start immediately. Instead of polling HDFS to detect, say, a _SUCCESS file in a particular folder, the next stage starts directly upon being notified that the file was created.
  • The most interesting one to replace is file monitoring: instead of recursively polling an entire directory tree, which can consume a significant amount of HDFS resources, the monitoring system is notified of every new file created in a given folder.

Convinced of the value of such a system, I started working on it with my colleagues. After several iterations and multiple failures, the current version has been running in production for several months, handling thousands of events per second on average, and an order of magnitude more at peak.

Key features of Trumpet are:

  • Non-intrusive to the NameNode: as a Hadoop admin, my first priority is to protect the NameNode. The Trumpet server therefore runs in a different JVM than the NameNode or the JournalNode, extracting the transactions from the edit log files, HDFS's redo logs. This turned out to be pretty straightforward, and (almost :)) required no changes to the NameNode code.
  • Scalability: Trumpet leverages Kafka -- its pub-sub messaging model fits perfectly what I'm trying to achieve with Trumpet: reliably broadcasting HDFS events to multiple consumers. The current implementation can serve hundreds of subscribers without problem -- actually, Trumpet scales as well as your Kafka cluster (see the consumer sketch after this list).
  • Highly available: Trumpet was designed from day one to run in HA mode, with multiple processes electing a leader and recovering from a previously saved state.
  • Ops friendly: I'm a Hadoop op, and I like making my life easier. Trumpet provides all the tools needed to validate, inspect, watch and monitor a running installation. This also includes support for rolling upgrades!
  • Simple: in total, Trumpet is about 3K lines of code, most of which is boilerplate for exception handling. The main components probably fit in a few hundred lines. Trumpet does only one thing, but it does it rather well.
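
Because the events are broadcast over Kafka, consuming them boils down to running a plain Kafka consumer subscribed to the topic Trumpet publishes to. Below is a minimal sketch using the standard Kafka consumer API; the broker address, the topic name hdfs.inotify.events and the string payload are assumptions for illustration, not Trumpet's actual contract -- check the project documentation for the real client API:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class HdfsEventListener {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka1:9092");  // placeholder broker
        props.put("group.id", "hdfs-event-listener");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Hypothetical topic name -- Trumpet's topic is configurable.
            consumer.subscribe(Collections.singletonList("hdfs.inotify.events"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // Each record is assumed to describe one HDFS transaction, e.g. a file creation.
                    System.out.println("HDFS event: " + record.value());
                }
            }
        }
    }
}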

And that's it. Install it, use it, and build awesome reactive pipelines on top of it!

Oh yeah, and if you don't have Kafka running alongside your Hadoop cluster, well, you definitely should -- Kafka is awesome!

13 October 2015

Spark is an Optimistic Implementation of MapReduce

Apache Spark is really trendy these days, to the point that people start speaking about a unified data processing platform or, in analogy to Lord of the Rings, one stack to rule them all. Benchmarks are flourishing all around, comparing Spark with MapReduce, Tez, Flink and your grandmother's mixer. I'm not here to debate the relevance of these benchmarks, and I personally always smile when I see a benchmark claiming X to be 100 times faster than Y -- to quote Taylor Goetz, "any system can be hobbled to look bad in a benchmark". No, the goal of this small article is to understand what's so cool about Spark, or, said differently, what could be done better than in MapReduce.

Common belief

If you ever ask in a job interview about the difference between Spark and MapReduce, most candidates will answer "Spark does in-memory processing, so it's faster". Well, I'm probably biased toward MapReduce, but before spilling to disk, MapReduce does a significant part of its processing "in memory". Said differently, if you compare a word count on a 1 PB dataset between MapReduce and Spark, you'll see that MapReduce is not slower than Spark.

But attentive readers of the Apache Spark page on Wikipedia will notice that the article adds the qualifier multi-stage alongside in-memory:
... Spark's multi-stage in-memory primitives provides ...
The interesting addition to the common belief here is multi-stage. Let's dig into that.

Multi-stage in-memory

MapReduce is a two-stage process: the data is read from stable storage by the map tasks (first stage), then shuffled and sent to the reduce tasks (second stage), which pretty much looks like a group-by-key transformation. The data is then stored back into stable storage.
[Figure: MapReduce two-stage processing]
The output of the first MapReduce round may be the input of the next one, and so on and so forth until the end of the processing. If the data is stored in HDFS, it is actually written to disk (unless a special storage policy is used) and replicated three times by default.
Now the question: what about sending the output of a reducer directly to the next mapper, bypassing the stable storage step?
[Figure: Bypassing stable storage]
That's exactly what Spark does when it speaks about multi-stage in-memory processing: from any stage of your processing you can forward the data directly to the next stage without relying on (disk-based) stable storage.
This multi-stage processing allows you to represent all the stages of your processing as a DAG (Directed Acyclic Graph), which in turn allows more aggressive optimizations (like predicate pushdown, projections, partitioning, merge joins, ...) along the processing pipeline.
In a nutshell, Spark uses the terms RDD (Resilient Distributed Dataset) and transformation to refer to the vertices and edges of the DAG, respectively.
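
To make the multi-stage idea concrete, here is a minimal word count sketch using the Spark Java API (Spark 2.x signatures; the HDFS paths are placeholders): every transformation chains directly into the next one, and only the final result is written back to stable storage.

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class MultiStageWordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("multi-stage-wordcount");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Each transformation creates a new RDD, i.e. a new vertex of the DAG;
            // no intermediate result is written back to HDFS.
            JavaRDD<String> lines = sc.textFile("hdfs:///tmp/input");
            JavaPairRDD<String, Integer> counts = lines
                .flatMap(line -> Arrays.asList(line.split("\\s+")).iterator())
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey(Integer::sum)                // shuffle boundary between two stages
                .filter(t -> t._2() > 10);                // the next stage is chained directly
            counts.saveAsTextFile("hdfs:///tmp/output");  // only the final result hits stable storage
        }
    }
}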

Level of Persistence

One question I always ask when I read or hear the words "in-memory" is: what about datasets too big to fit in RAM? Here Spark goes one step further in terms of configuration: it lets you choose the level of persistence of the RDDs requiring data transfer. Here is a subset of the persistence levels available at the time of writing:

  • NONE : RDD is not cached
  • MEMORY_ONLY : RDD is cached in RAM, without replication
  • MEMORY_ONLY_SER : RDD is cached in RAM in a serialized form
  • MEMORY_AND_DISK : RDD is stored in RAM first, spilled to disk if too big
  • DISK_ONLY : RDD is stored on disk only
  • OFF_HEAP : RDD is cached in RAM, but out of the JVM heap

Most of the persistence levels also have a _2 variant, like MEMORY_ONLY_2, meaning that the data is replicated twice.
Choosing a persistence level is a typical time/memory tradeoff that you need to resolve yourself. In short, know what you're doing.
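
As an illustration, here is a minimal sketch (Spark Java API, placeholder paths) where an RDD that may not fit in RAM is persisted with MEMORY_AND_DISK, so partitions that don't fit in memory are spilled to local disk rather than recomputed:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

public class PersistenceExample {
    public static void cacheErrors(JavaSparkContext sc) {
        JavaRDD<String> logs = sc.textFile("hdfs:///tmp/big-dataset");

        // Keep what fits in RAM, spill the remaining partitions to local disk.
        JavaRDD<String> errors = logs.filter(line -> line.contains("ERROR"))
                                     .persist(StorageLevel.MEMORY_AND_DISK());

        // Both actions reuse the persisted partitions instead of re-reading from HDFS.
        long total = errors.count();
        long distinct = errors.distinct().count();
        System.out.println(total + " errors, " + distinct + " distinct");
    }
}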

Fault Tolerance

How Spark handles fault tolerance is probably the part I like most. It reminds me of my old days in the RDBMS world, when I first heard about optimistic locking. MapReduce can be seen as taking a pessimistic approach, replicating the output of every unit of work three times. Spark, on the other hand, takes a more optimistic approach and tries not to replicate anything unless you ask for it (by changing the persistence level, for instance).
When a failure occurs, Spark refers to the so-called RDD lineage, i.e. the list of transformations that have been applied since the last stable storage, and recomputes the minimum set of partitions needed to recover the lost data. The last stable storage could be the first RDD, or a checkpoint in between.
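
A minimal sketch of what this looks like from the Java API (placeholder paths): checkpointing an RDD writes it to stable storage and truncates its lineage, which is essentially what MapReduce does implicitly between two chained jobs.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class LineageExample {
    public static void truncateLineage(JavaSparkContext sc) {
        sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints");

        // Lost partitions of 'derived' can be recomputed from the text file via the lineage.
        JavaRDD<String> raw = sc.textFile("hdfs:///tmp/events");
        JavaRDD<String> derived = raw.filter(line -> !line.isEmpty())
                                     .map(String::toLowerCase);

        // checkpoint() materializes the RDD to stable storage on the next action
        // and cuts the lineage, so recovery no longer goes back to the original file.
        derived.checkpoint();
        derived.count();
    }
}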

Conclusion

As we saw here, Spark does NOT necessarily do everything in memory. Spark takes the optimistic approach that everything will go well, which, when it is the case, carries no performance penalty. It keeps track of the dependencies between RDDs and is able to recompute lost data with a minimum amount of work. Spark also tries to keep as much as it can in RAM, transferring data directly from one RDD to the next, hence the multi-stage processing. During that data transfer, it gives you a lot of power and flexibility over where you place the cursor of your time/memory tradeoff.

So if you're asked to compare MapReduce with Spark in your next job interview, an answer that will probably earn the interviewer's admiration is: MapReduce is like creating a checkpoint after every transformation in Spark.

But again, everything is a tradeoff in a distributed system, and you'd better know what taking one tradeoff over another implies -- blowing up a cluster has never been so easy!

04 May 2015

Hive♥ on Tez♥ on CDH♥ -- Love everywhere! -- Really?

Disclaimer: The question of the utility of doing that is out of the scope of this article.
It's Friday. It's rainy. All the monitoring is green (including me ;)). Time to test something cool. Hey, has anyone ever tried to run Tez on CDH? That sounds awesome. Let's give it a try!

At the time of writing, the latest branch is 0.6.0. So go for it. The install guide is pretty straightforward: compile, upload, configure and run. Do not forget to check out the right branch :)

I installed the libs in /opt/tez-0.6.0 and used the following commands to run it.

Mind the magic option that switches the processing framework from plain YARN MapReduce to Tez: mapreduce.framework.name=yarn-tez

# Setting up configuration environment variables
export TEZ_HOME=/opt/tez-0.6.0 
export TEZ_CONF_DIR=${TEZ_HOME}/tez-conf 
export HADOOP_CLASSPATH="${HADOOP_CLASSPATH}:${TEZ_CONF_DIR}:$(find ${TEZ_HOME} -name "*.jar" | paste -sd ":")"  

# Run an example on YARN 
yarn jar /opt/cloudera/parcels/CDH/jars/hadoop-mapreduce-examples-2.5.0-cdh5.2.5.jar pi -Dmapreduce.framework.name=yarn-tez 16 100000

BAM. This didn't work. I got the following stacktrace:

java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCrc32.nativeComputeChunkedSumsByteArray(II[BI[BIILjava/lang/String;JZ)V
        at org.apache.hadoop.util.NativeCrc32.nativeComputeChunkedSumsByteArray(Native Method)
        at org.apache.hadoop.util.NativeCrc32.calculateChunkedSumsByteArray(NativeCrc32.java:86)
        at org.apache.hadoop.util.DataChecksum.calculateChunkedSums(DataChecksum.java:430)
        at org.apache.hadoop.fs.FSOutputSummer.writeChecksumChunks(FSOutputSummer.java:202)
        at org.apache.hadoop.fs.FSOutputSummer.flushBuffer(FSOutputSummer.java:163)
        at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1993)
        at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1946)
        at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
        at org.apache.tez.dag.history.recovery.RecoveryService.handle(RecoveryService.java:265)
        at org.apache.tez.dag.history.HistoryEventHandler.handleCriticalEvent(HistoryEventHandler.java:102)
        at org.apache.tez.dag.app.DAGAppMaster.startDAG(DAGAppMaster.java:1954)
        at org.apache.tez.dag.app.DAGAppMaster.startDAG(DAGAppMaster.java:1911)
        at org.apache.tez.dag.app.DAGAppMaster.serviceStart(DAGAppMaster.java:1628)
        at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
        at org.apache.tez.dag.app.DAGAppMaster$6.run(DAGAppMaster.java:2036)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
        at org.apache.tez.dag.app.DAGAppMaster.initAndStartAppMaster(DAGAppMaster.java:2032)
        at org.apache.tez.dag.app.DAGAppMaster.main(DAGAppMaster.java:1846)


There's already a JIRA for that: HADOOP-11064. In a nutshell: I'm running CDH 5.2, which is based on Hadoop 2.5, while Tez 0.6 is built against Hadoop 2.6 by default.

Let's do it properly and build it against the exact CDH version I'm running. I created a profile cdh5.2 in the pom file, which looks like this:

Tez on CDH 5.2.5 -- Pom profile -- See Gist 64ba10edb64d744f05a0

And tried to rebuild with the profile cdh5.2 enabled:

mvn -Pcdh5.2 clean package -DskipTests=true -Dmaven.javadoc.skip=true

BAM. I got the following error:

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project tez-mapreduce: Compilation failure 
[ERROR] /home/mil2048/Projects/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/JobContextImpl.java:[57,8] org.apache.tez.mapreduce.hadoop.mapreduce.JobContextImpl is not abstract and does not override abstract method userClassesTakesPrecedence() in org.apache.hadoop.mapreduce.JobContext 

Abstract method org.apache.hadoop.mapred.JobContext.userClassesTakesPrecedence? WTF?

Well, Cloudera applies custom patches to the source code, from time to time breaking downstream projects. That's the case here. See the commit details: https://github.com/cloudera/hadoop-common/commit/fb3fd746f4340142978596c1997a7bfb19007e69

So I applied the following patch to the Tez sources and successfully compiled the project:

Tez on CDH 5.2.5 -- CDH-4212 -- See Gist 23225004a78949d4c849


The good news is that the patch does not seem to be in CDH 5.4 anymore, so Tez on CDH 5.4 might be more straightforward.

Now that it's built, let's re-run it.

Woot, the PI example works now! First Tez on CDH job run ever!



Ok, first step done. And now, what about Hive on Tez on CDH?

hive
set hive.execution.engine=tez;
SELECT * FROM myTable;

BAM.

Well, CDH 5.2/5.3 ships with Hive 0.13, which is in turn built against Tez 0.4.1-incubating (see https://github.com/cloudera/hive/blob/cdh5-0.13.1_5.2.5/pom.xml for more details), so let's try Hive with Tez 0.4.0 and my custom patch.

Check out branch 0.4.0, apply the patch, rebuild, upload, run hive, and ...
Woot!! Hive on Tez on CDH. Time to go and celebrate this great achievement! ;)





14 January 2015

Rename a host in Ambari (1.7.0)

Ambari can be tricky for some real-life operations, like moving a component from a dead node to another one (for instance, the App Timeline Server can't be moved to another host if the current host is dead). We ended up in a particular corner case where it was easier to rename the node in the database than to keep failing to move the component.

But updating the hostname is not as straightforward as update hosts set host_name = 'new-hostname' where host_name = 'old-hostname', because the hostname is used as a key in several tables.

Updating the hostname in all the required tables within the same transaction does not work either, because the foreign key constraints will fail during the update. We need to defer (or disable) the constraint checks for the transaction to run successfully.
MySQL

If you chose MySQL as the backend DB, lucky you, it's fairly trivial:
SET FOREIGN_KEY_CHECKS=0; 

BEGIN;
UPDATE ambari.clusterhostmapping set host_name = '${new-name}' WHERE host_name = '${old-name}';
UPDATE ambari.configgrouphostmapping set host_name = '${new-name}' WHERE host_name = '${old-name}';
UPDATE ambari.host_role_command set host_name = '${new-name}' WHERE host_name = '${old-name}';
UPDATE ambari.hostcomponentdesiredstate set host_name = '${new-name}' WHERE host_name = '${old-name}';
UPDATE ambari.hostcomponentstate set host_name = '${new-name}' WHERE host_name = '${old-name}';
UPDATE ambari.hostconfigmapping set host_name = '${new-name}' WHERE host_name = '${old-name}';
UPDATE ambari.hoststate set host_name = '${new-name}' WHERE host_name = '${old-name}';
UPDATE ambari.hosts set host_name = '${new-name}' WHERE host_name = '${old-name}';
COMMIT;

SET FOREIGN_KEY_CHECKS=1;
PostgreSQL

If you have PostgreSQL as the backend DB, well, there's a bit more work to do:
  1. Delete the foreign key constraints referencing hosts.host_name (yes, really)
  2. Update the hostname
  3. Re-create the foreign key constraints
Automatically find the constraints

Connect to postgres with the admin user

sudo su - postgres
psql -d ambari

Run a query that will generate the ALTER commands to drop all the constraints on hosts.host_name:

SELECT 'ALTER TABLE '||nspname||'.'||relname||' DROP CONSTRAINT '||conname||';'
FROM pg_constraint
INNER JOIN pg_class ON conrelid=pg_class.oid
INNER JOIN pg_namespace ON pg_namespace.oid=pg_class.relnamespace
where conname in ('hstcmponentdesiredstatehstname', 'hostcomponentstate_host_name', 'fk_hoststate_host_name', 'fk_hostconfmapping_host_name', 'fk_host_role_command_host_name', 'fk_cghm_hname', 'clusterhostmapping_cluster_id') ORDER BY CASE WHEN contype='f' THEN 0 ELSE 1 END,contype,nspname,relname,conname;

Run a query that will generate the ALTER commands to re-create all the constraints on hosts.host_name:

SELECT 'ALTER TABLE '||nspname||'.'||relname||' ADD CONSTRAINT '||conname||' '|| pg_get_constraintdef(pg_constraint.oid)||';'
FROM pg_constraint
INNER JOIN pg_class ON conrelid=pg_class.oid
INNER JOIN pg_namespace ON pg_namespace.oid=pg_class.relnamespace
where conname in ('hstcmponentdesiredstatehstname', 'hostcomponentstate_host_name', 'fk_hoststate_host_name', 'fk_hostconfmapping_host_name', 'fk_host_role_command_host_name', 'fk_cghm_hname', 'clusterhostmapping_cluster_id')
ORDER BY CASE WHEN contype='f' THEN 0 ELSE 1 END DESC,contype DESC,nspname DESC,relname DESC,conname DESC;

Save the generated ALTER commands in your favorite text editor for later, or use the ones provided below.
Delete the constraints

 ALTER TABLE ambari.clusterhostmapping DROP CONSTRAINT clusterhostmapping_cluster_id;
 ALTER TABLE ambari.configgrouphostmapping DROP CONSTRAINT fk_cghm_hname;
 ALTER TABLE ambari.host_role_command DROP CONSTRAINT fk_host_role_command_host_name;
 ALTER TABLE ambari.hostcomponentdesiredstate DROP CONSTRAINT hstcmponentdesiredstatehstname;
 ALTER TABLE ambari.hostcomponentstate DROP CONSTRAINT hostcomponentstate_host_name;
 ALTER TABLE ambari.hostconfigmapping DROP CONSTRAINT fk_hostconfmapping_host_name;
 ALTER TABLE ambari.hoststate DROP CONSTRAINT fk_hoststate_host_name;
Update the hostname

Replace ${old-name} and ${new-name} with the appropriate values.
BEGIN;
UPDATE ambari.clusterhostmapping set host_name = '${new-name}' WHERE host_name = '${old-name}';
UPDATE ambari.configgrouphostmapping set host_name = '${new-name}' WHERE host_name = '${old-name}';
UPDATE ambari.host_role_command set host_name = '${new-name}' WHERE host_name = '${old-name}';
UPDATE ambari.hostcomponentdesiredstate set host_name = '${new-name}' WHERE host_name = '${old-name}';
UPDATE ambari.hostcomponentstate set host_name = '${new-name}' WHERE host_name = '${old-name}';
UPDATE ambari.hostconfigmapping set host_name = '${new-name}' WHERE host_name = '${old-name}';
UPDATE ambari.hoststate set host_name = '${new-name}' WHERE host_name = '${old-name}';
UPDATE ambari.hosts set host_name = '${new-name}' WHERE host_name = '${old-name}';
COMMIT;

Recreate the constraints
 ALTER TABLE ambari.hoststate ADD CONSTRAINT fk_hoststate_host_name FOREIGN KEY (host_name) REFERENCES ambari.hosts(host_name);
 ALTER TABLE ambari.hostconfigmapping ADD CONSTRAINT fk_hostconfmapping_host_name FOREIGN KEY (host_name) REFERENCES ambari.hosts(host_name);
 ALTER TABLE ambari.hostcomponentstate ADD CONSTRAINT hostcomponentstate_host_name FOREIGN KEY (host_name) REFERENCES ambari.hosts(host_name);
 ALTER TABLE ambari.hostcomponentdesiredstate ADD CONSTRAINT hstcmponentdesiredstatehstname FOREIGN KEY (host_name) REFERENCES ambari.hosts(host_name);
 ALTER TABLE ambari.host_role_command ADD CONSTRAINT fk_host_role_command_host_name FOREIGN KEY (host_name) REFERENCES ambari.hosts(host_name);
 ALTER TABLE ambari.configgrouphostmapping ADD CONSTRAINT fk_cghm_hname FOREIGN KEY (host_name) REFERENCES ambari.hosts(host_name);
 ALTER TABLE ambari.clusterhostmapping ADD CONSTRAINT clusterhostmapping_cluster_id FOREIGN KEY (host_name) REFERENCES ambari.hosts(host_name);

Done.

Restart ambari-server

Once you have updated the hostname, you need to restart ambari-server.
Actually, you might want to shut it down before running the queries, to ensure a consistent state of the database.

Resources

If you're interested in more general reading about deferred foreign key checks, I can recommend the article that was a great inspiration to me:



06 January 2015

Hadoop hftp:// returns an error HTTP_OK expected, received 500

It's fairly common these days to migrate from one version of Hadoop to another. Hopefully you have two clusters, and you plan to upgrade one cluster, sync some data, and then migrate the second one. This means that for a certain period of time, you'll have two different versions of Hadoop running (CDH4 and CDH5 for instance, or HDP 1.3 and HDP 2.2, or a mix of both).
To transfer data between two clusters running different major versions like these, the best way is to use distcp, reading from the source via HFTP and writing to the destination cluster via HDFS (HFTP being read-only).

Now things get trickier when your distcp jobs fail with the error java.io.IOException: HTTP_OK expected, received 500:

FAIL randomFile.txt : java.io.IOException: HTTP_OK expected, received 500
 at org.apache.hadoop.hdfs.HftpFileSystem$RangeHeaderUrlOpener.connect(HftpFileSystem.java:380)
 at org.apache.hadoop.hdfs.ByteRangeInputStream.openInputStream(ByteRangeInputStream.java:119)
 at org.apache.hadoop.hdfs.ByteRangeInputStream.getInputStream(ByteRangeInputStream.java:103)
 at org.apache.hadoop.hdfs.ByteRangeInputStream.read(ByteRangeInputStream.java:187)
 at java.io.DataInputStream.read(DataInputStream.java:83)
 at org.apache.hadoop.tools.DistCp$CopyFilesMapper.copy(DistCp.java:426)
 at org.apache.hadoop.tools.DistCp$CopyFilesMapper.map(DistCp.java:549)
 at org.apache.hadoop.tools.DistCp$CopyFilesMapper.map(DistCp.java:316)
 at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
 at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:417)
 at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)
 at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:396)
 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
 at org.apache.hadoop.mapred.Child.main(Child.java:262)

I will first give a quick solution to the problem, then explain the root cause of the issue, dig into the HFTP protocol, get my hands into the NameNode source code and, more generally, give some tricks for debugging the Hadoop stack.

Short version

Disable "Wildcard binding" in the NameNode. HFTP does not work with the namenode binding on all the interfaces (0.0.0.0). Seriously!

In Cloudera Manager, go to the HDFS configuration and search for Wildcard.

Ensure the property dfs.namenode.rpc-address in hdfs-site.xml is not set to 0.0.0.0, i.e. does not look like this:

<property>
    <name>dfs.namenode.rpc-address</name>
    <value>0.0.0.0:8020</value>
</property>

And restart the NameNode.

I reported the issue in Apache Jira: HDFS-7586

If you're eager for more details, keep reading!

HFTP

HFTP is another data transfer protocol built into Hadoop core.
From the server's perspective, it exposes two HTTP endpoints (servlets) in the NameNode and another one in the DataNode:

  • NameNode
    • http://namenode:50070/listPaths/${path_to_file_in_hdfs} is handled by org.apache.hadoop.hdfs.server.namenode.ListPathServlet
    • http://namenode:50070/data/${path_to_file_in_hdfs} is handled by org.apache.hadoop.hdfs.server.namenode.FileDataServlet and redirects the client (via the Location header) to a DataNode storing the first block
  • DataNode
    • http://datanode:50075/streamFile/${path_to_file_in_hdfs} is handled by org.apache.hadoop.hdfs.server.namenode.StreamFile and streams the file to the client.
On the client side, the org.apache.hadoop.fs.FileSystem implementation is org.apache.hadoop.hdfs.HftpFileSystem (see the programmatic sketch after the notes below), but since these are plain HTTP requests, curl can be used to emulate the calls:

  1. Calling the namenode to find out where to download from
    $ curl -vv http://namenode1:50070/data/tmp/randomFile.txt?ugi=hdfs | grep "Location:"
     Location: http://datanode003:50075/streamFile/tmp/randomFile.txt?ugi=hdfs&nnaddr=10.0.0.4:8020
  2. Downloading the file
    $ curl -vv http://datanode003:50075/streamFile/tmp/randomFile.txt?ugi=hdfs&nnaddr=10.0.0.4:8020
Easy.
Note: we could call any http://datanode:50075 to download the file, but calling the one returned by the NameNode guarantees you're reading from the DataNode that stores the most blocks locally (see the org.apache.hadoop.hdfs.server.common.JspHelper.getBestNode() function for more details).

Note 2: it seems the DataNode receives the NameNode IP as a parameter. Not sure why, but it's required.
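
The same read can also be done programmatically through the generic FileSystem API; here is a minimal sketch reusing the host and path from the curl examples above (the hftp:// scheme resolves to HftpFileSystem and issues the same /data and /streamFile HTTP calls under the hood):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HftpRead {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // The hftp:// URI selects org.apache.hadoop.hdfs.HftpFileSystem as the implementation.
        FileSystem fs = FileSystem.get(URI.create("hftp://namenode1:50070/"), conf);
        try (FSDataInputStream in = fs.open(new Path("/tmp/randomFile.txt"));
             BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}
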
Debugging without using Curl

Not everybody is familiar with curl, but there's another, easier (and thus more limited) way to debug the Hadoop client: turning on verbose logging in the hadoop or hdfs command line tools.
In CDH4:
export HADOOP_ROOT_LOGGER=TRACE,console
In CDH5:
export HADOOP_OPTS="-Droot.logger=TRACE,console"

After setting this environment variable, hdfs dfs -cat hftp://namenode1/tmp/randomFile.txt will become much noisier:

$ hdfs dfs -cat hftp://namenode1/tmp/randomFile.txt
15/01/06 07:39:29 DEBUG lib.MutableMetricsFactory: field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess with annotation @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=, value=[Rate of successful kerberos logins and latency (milliseconds)], always=false, type=DEFAULT, sampleName=Ops)
15/01/06 07:39:29 DEBUG lib.MutableMetricsFactory: field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure with annotation @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=, value=[Rate of failed kerberos logins and latency (milliseconds)], always=false, type=DEFAULT, sampleName=Ops)
...
15/01/06 07:39:30 TRACE fs.FileSystem: url=http://namenode1:50070/listPaths/tmp/randomFile.txt?ugi=hdfs
15/01/06 07:39:30 DEBUG security.Groups: Returning cached groups for 'hdfs'
15/01/06 07:39:30 TRACE fs.FileSystem: url=http://namenode1:50070/data/tmp/randomFile.txt?ugi=hdfs
cat: HTTP_OK expected, received 500
As you can see, we can gather more information. You still need curl to dig further here, but in many other scenarios, turning on DEBUG or TRACE logging on the client is a vital first step.

HTTP_OK expected, received 500

We now understand HFTP and are more familiar with its servlet endpoints.

The trace above is still missing some details, mostly which HTTP request is made to the DataNode. For this, we need curl.

$ curl -vv http://namenode1:50070/data/tmp/randomFile.txt?ugi=hdfs | grep "Location:"
 Location: http://datanode003:50075/streamFile/tmp/randomFile.txt?ugi=hdfs&nnaddr=0.0.0.0:8020

Ouch, 0.0.0.0:8020. We saw earlier that the DataNode tries to contact the NameNode address received as a parameter. Opening a connection to 0.0.0.0 will always fail...

And here is the stacktrace for the corresponding failed request on the DataNode:

java.net.ConnectException: Connection refused
 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
 at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:567)
 at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:207)
 at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:528)
 at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:492)
 at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:510)
 at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:604)
 at org.apache.hadoop.ipc.Client$Connection.access$2100(Client.java:252)
 at org.apache.hadoop.ipc.Client.getConnection(Client.java:1291)
 at org.apache.hadoop.ipc.Client.call(Client.java:1209)
 at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
 at $Proxy15.getBlockLocations(Unknown Source)
 at sun.reflect.GeneratedMethodAccessor147.invoke(Unknown Source)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)
 at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
 at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
 at $Proxy15.getBlockLocations(Unknown Source)
 at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:155)
 at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:970)
 at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:960)
 at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:239)
 at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:206)
 at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:199)
 at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1117)
 at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1103)
 at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1089)
 at org.apache.hadoop.hdfs.server.namenode.StreamFile.doGet(StreamFile.java:94)
Obviously, the DataNode can't connect to the NameNode. We've got the root cause.


But where does this 0.0.0.0 come from?

That's the question. The FileDataServlet is responsible for redirecting the client, so it should be the guilty party. Digging into the org.apache.hadoop.hdfs.server.namenode.FileDataServlet source code shows that the redirect URI is generated using the dfs.namenode.rpc-address parameter (DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY) to get the IP address. But if you turned on wildcard binding, this value is 0.0.0.0:8020 in hdfs-site.xml:

<property>
    <name>dfs.namenode.rpc-address</name>
    <value>0.0.0.0:8020</value>
</property>

This concludes our investigation: HFTP is broken when the wildcard binding feature is turned on.

Now, how to fix that?

The easiest way is to disable wildcard binding if it's not really needed. But if it is really needed, then a code change is required:

Since the servlet knows which of its IP addresses the request came in on, it could use that IP instead of 0.0.0.0. This might not work in every scenario, e.g. when HFTP listens on one interface and HDFS RPC on another, but it would already do a better job than the currently broken version.

--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.util.ServletUtil;
 public class FileDataServlet extends DfsServlet {
   /** For java.io.Serializable */
   private static final long serialVersionUID = 1L;
+  private static final String WILDCARD_INTERFACE = "0.0.0.0";
 
   /** Create a redirection URL */
   private URL createRedirectURL(String path, String encodedPath, HdfsFileStatus status, 
@@ -80,6 +81,19 @@ public class FileDataServlet extends DfsServlet {
     NameNode nn = NameNodeHttpServer.getNameNodeFromContext(
         getServletContext());
     String addr = NetUtils.getHostPortString(nn.getNameNodeAddress());
+    // Wildcard binding special case: returning 0.0.0.0 to the DN is always wrong.
+    if (addr != null && addr.startsWith(WILDCARD_INTERFACE + ":")) {
+        LOG.warn("Wildcard binding seems to be turned on. Returning 0.0.0.0 is always wrong. " +
+            "The best we can do is to use the IP address the request is connected to " +
+            "(HttpServletRequest.getLocalAddr()) instead (" + request.getLocalAddr() + "), but this might " +
+            "not always be correct (multiple interfaces, HFTP listening on one interface, HDFS RPC on another). " +
+            "Please turn off wildcard binding if you're using HFTP, i.e. " +
+            "dfs.namenode.rpc-address (DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY) and " +
+            "dfs.namenode.http-address (DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY) " +
+            "must point to an IP address different from 0.0.0.0.");
+
+        addr = addr.replace(WILDCARD_INTERFACE, request.getLocalAddr());
+    }
     String addrParam = JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, addr);
     
     return new URL(scheme, hostname, port,

Happy Hadoop debugging and patching!

Final note: Hadoop core has really good code coverage, but MiniDFSCluster forces listening on 127.0.0.1, i.e. it does not test anything with regard to wildcard binding. That's probably why this issue wasn't caught by the tests.