20 October 2015

INotify for HDFS: from ideation to open source

TL;DR: Trumpet, a scalable INotify-like system for HDFS, is out: https://github.com/verisign/trumpet.

Polling a filesystem to detect file creation is cumbersome and inefficient. Linux brought an answer to this called inotify: you register a listener on a folder and get notified asynchronously when a change occurs in that folder.

I started asking myself this question back in 2013: what about HDFS? How do you get notified when a new file is created in HDFS?

There are multiple use-cases for such a feature.

  • The most obvious one is a processing workflow: when a file is created, the next stage of processing can start immediately. Instead of polling HDFS to detect, say, a _SUCCESS file in a particular folder, the next stage starts directly upon being notified that the file was created.
  • The most interesting polling to replace is file monitoring: instead of recursively polling an entire directory tree, which can consume a significant amount of resources in HDFS, the monitoring system is notified of new file creations in a given folder.
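To make the first use-case concrete, here is a tiny Python sketch of the pattern (the event format and names are hypothetical, not Trumpet's actual schema): register a callback for a path suffix, and let the event stream trigger the next stage instead of polling.

```python
# Toy sketch of event-driven triggering (hypothetical event format,
# not Trumpet's actual API): a callback fires when the matching
# file-creation event arrives, instead of a polling loop.

def on_event(event, callbacks):
    """Dispatch a file-creation event to every matching callback."""
    for suffix, callback in callbacks:
        if event["type"] == "CREATE" and event["path"].endswith(suffix):
            callback(event["path"])

triggered = []
# "When _SUCCESS appears, start the next stage" -- here we just record it.
callbacks = [("_SUCCESS", lambda path: triggered.append(path))]

# Simulated stream of HDFS events.
for ev in [{"type": "CREATE", "path": "/data/run1/part-00000"},
           {"type": "CREATE", "path": "/data/run1/_SUCCESS"}]:
    on_event(ev, callbacks)

print(triggered)  # ['/data/run1/_SUCCESS']
```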

Convinced of the value of such a system, I started working on it with my colleagues. After several iterations and multiple failures, the current version has been running in production for several months, handling on average thousands of events per second, with peaks an order of magnitude higher.

Key features of Trumpet are:

  • Non-intrusive to the NameNode: as a Hadoop admin, my first priority is to protect the NameNode. The Trumpet server therefore runs in a different JVM than the NameNode or the JournalNode, extracting the transactions from the edit log files, HDFS's redo logs. This actually turned out to be pretty straightforward, and (almost :)) required no changes to the NameNode code.
  • Scalability: Trumpet leverages Kafka -- its pub-sub messaging model fits perfectly what I'm trying to achieve with Trumpet: reliably broadcasting HDFS events to multiple consumers. The current implementation can serve hundreds of subscribers without problems -- actually, Trumpet scales as well as your Kafka cluster does.
  • Highly Available: Trumpet was designed from day one to run in HA mode, with multiple processes electing a leader and recovering from the previous state.
  • Ops friendly: I'm a Hadoop op, and I like making my life easier. Trumpet provides all the tools to validate, inspect, watch and monitor a running installation. This also includes support for rolling upgrades!
  • Simple: in total, Trumpet is about 3K lines of code, most of which are boilerplate for exception handling. The main components of Trumpet probably fit in a few hundred lines of code. Trumpet does only one thing, but does it rather well.

And that's it. Install it, use it, and build awesome reactive pipelines on top of it!

Oh yeah, and if you don't have Kafka running alongside your Hadoop cluster, well, you definitely should -- Kafka is awesome!

13 October 2015

Spark is an Optimistic Implementation of MapReduce

Apache Spark is really trendy these days, to the point that people start speaking about a unified data processing platform, or, in an analogy to The Lord of the Rings, one stack to rule them all. Benchmarks are flourishing all around, comparing Spark with MapReduce, Tez, Flink and your grandmother's mixer. I'm not here to debate the relevance of these benchmarks, and I personally always smile when I see a benchmark claiming X to be 100 times faster than Y -- to quote Taylor Goetz, "any system can be hobbled to look bad in a benchmark". No, the goal of this small article is to understand what's so cool about Spark, or, said differently, what could be done better in MapReduce.

Common belief

If you ever ask in a job interview about the difference between Spark and MapReduce, most candidates will answer "Spark does in-memory processing, so it's faster". Well, I'm probably biased by my MapReduce background, but before spilling to disk, MapReduce does a significant part of its processing "in-memory". Said differently, if you compare a word count on a 1PB dataset between MapReduce and Spark, you'll see that MapReduce is not slower than Spark.
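To see what "in-memory" means on the MapReduce side, here is a minimal sketch (plain Python, not Hadoop code) of a word-count map task: counts are aggregated in an in-memory buffer, and only spilled once the buffer is full -- exactly the part of the work that happens before any disk I/O.

```python
from collections import Counter

def map_task(lines, buffer_limit=100_000):
    """Word-count map phase: aggregate counts in memory, spill when the
    buffer exceeds its limit. Returns the list of spills (which would be
    sorted runs on local disk in real MapReduce)."""
    buffer, spills = Counter(), []
    for line in lines:
        for word in line.split():
            buffer[word] += 1
        if len(buffer) > buffer_limit:
            spills.append(dict(buffer))  # would be written to local disk
            buffer.clear()
    if buffer:
        spills.append(dict(buffer))
    return spills

spills = map_task(["to be or not to be"])
print(spills)  # [{'to': 2, 'be': 2, 'or': 1, 'not': 1}]
```

With a small input, all the aggregation happened in RAM and a single spill is emitted at the end -- the "in-memory" part of MapReduce the common belief forgets.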

But attentive readers of the Apache Spark page on Wikipedia will notice that the article adds the words multi-stage alongside in-memory:
... Spark's multi-stage in-memory primitives provides ...
The interesting addition to the common belief here is multi-stage. Let's dig into that.

Multi-stage in-memory

MapReduce is a two-stage process: the data is read from stable storage by a map task (first stage), then shuffled and sent to a reduce task (second stage), which pretty much looks like a group-by-key transformation. The data is then stored back into stable storage.
MapReduce two-stages processing
The output of the first MapReduce round might be the input of the next MapReduce round, and so on and so forth until the end of the processing. If the data is stored in HDFS, it is actually written to disk (unless a special storage policy is used) and replicated three times by default.
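That round-by-round pattern can be sketched as follows (a toy simulation: a dict stands in for HDFS, and each write is "replicated" three times, as a real HDFS write would be by default):

```python
def run_round(stage_fn, input_key, output_key, hdfs, replication=3):
    """One MapReduce round: read from stable storage, process, and write
    the result back with replication (simulated by storing it 3 times)."""
    data = stage_fn(hdfs[input_key][0])        # read one replica, process
    hdfs[output_key] = [data] * replication    # replicated write
    return data

# The input dataset, already "replicated" in our toy HDFS.
hdfs = {"/input": [[1, 2, 3, 4]] * 3}

# Round 1 writes its full output to stable storage; round 2 reads it back.
run_round(lambda xs: [x * x for x in xs], "/input", "/squares", hdfs)
run_round(lambda xs: sum(xs), "/squares", "/total", hdfs)
print(hdfs["/total"][0])  # 30
```

Every intermediate result (`/squares` here) pays the full cost of a replicated write and a read, even though only the next round ever needs it.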
Now comes the question: what about sending the output of a reducer directly to the next mapper, bypassing the stable storage step?
Bypass stable storage
That's exactly what Spark does when it speaks about multi-stage in-memory processing: from any stage within your processing, you can forward the data directly into the next stage without relying on (disk-based) stable storage.
And this multi-stage processing allows you to represent all the stages of your processing as a DAG (Directed Acyclic Graph), which in turn allows more aggressive optimizations (like predicate pushdown, projections, partitioning, merge-joins, ...) along the processing pipeline.
In a nutshell, Spark uses the terms RDD (Resilient Distributed Dataset) and transformation to refer to the vertices and edges of the DAG, respectively.
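The bypass can be sketched with Python generators (a conceptual model, not Spark code): each stage pulls records directly from the previous one, and nothing is materialized on "disk" in between.

```python
def read_input():
    """Source stage: in Spark terms, the RDD read from stable storage."""
    yield from [1, 2, 3, 4]

def squares(records):
    """First transformation: records flow through, never hitting disk."""
    for x in records:
        yield x * x

def running_total(records):
    """Second stage, chained directly onto the first: no stable-storage
    write between the two, unlike chained MapReduce rounds."""
    total = 0
    for x in records:
        total += x
    yield total

# Stages chained in memory -- the generator pipeline is a tiny DAG.
result = list(running_total(squares(read_input())))
print(result)  # [30]
```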

Levels of Persistence

One question I always ask when I read or hear the words "in-memory" is: what about datasets too big to fit in RAM? Here Spark goes one step further in terms of configuration: it lets you choose your level of persistence for the RDDs requiring data transfer. Here is a subset of the available persistence levels at the time of writing:

  • NONE : RDD is not cached
  • MEMORY_ONLY : RDD is cached in RAM, without replication
  • MEMORY_ONLY_SER : RDD is cached in RAM in a serialized form
  • MEMORY_AND_DISK : RDD is stored in RAM first, spilled to disk if too big
  • DISK_ONLY : RDD is stored on disk only
  • OFF_HEAP : RDD is cached in RAM, but out of the JVM heap

Most of the persistence levels also have a _2 version, like MEMORY_ONLY_2, meaning that the data is replicated twice.
Choosing a persistence level is a typical time/memory tradeoff that you need to solve yourself. In short, know what you're doing.
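A MEMORY_AND_DISK-style policy can be sketched as a cache with a memory budget: partitions are kept in RAM up to the budget and spilled to a "disk" store beyond it. This is a toy model of the idea, not Spark's actual block manager.

```python
class ToyCache:
    """Toy MEMORY_AND_DISK policy: keep partitions in RAM up to a budget
    (counted in records here), spill the rest to a disk store."""
    def __init__(self, memory_budget):
        self.memory_budget = memory_budget
        self.ram, self.disk = {}, {}
        self.used = 0

    def put(self, partition_id, records):
        if self.used + len(records) <= self.memory_budget:
            self.ram[partition_id] = records
            self.used += len(records)
        else:
            self.disk[partition_id] = records  # would be serialized to disk

    def get(self, partition_id):
        if partition_id in self.ram:           # fast path: RAM hit
            return self.ram[partition_id]
        return self.disk[partition_id]         # slow path: read from disk

cache = ToyCache(memory_budget=5)
cache.put("p0", [1, 2, 3])   # fits in RAM
cache.put("p1", [4, 5, 6])   # over budget: spilled to disk
print(sorted(cache.ram), sorted(cache.disk))  # ['p0'] ['p1']
```

The tradeoff is visible in `get`: the spilled partition stays available, just behind a slower path -- that is the time/memory cursor the persistence levels let you place.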

Fault Tolerance

How Spark handles fault tolerance is probably the part I prefer. It reminds me of my old days in the RDBMS world, when I first heard about optimistic locking. MapReduce can be seen as taking a pessimistic approach, replicating every unit of work three times. Spark, on the other hand, takes a more optimistic approach and tries not to replicate anything unless you ask for it (by changing the persistence level, for instance).
When a failure occurs, Spark refers to the so-called RDD lineage, i.e. the list of transformations that have been applied since the last stable storage, and re-computes the minimum number of partitions needed to recover the lost data. The last stable storage could be the first RDD, or a checkpoint in between.
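The lineage idea can be sketched like this (hypothetical names, not Spark's API): each RDD remembers its parent and the transformation that produced it, so a lost partition can be recomputed on demand instead of being read from a replica.

```python
class ToyRDD:
    """Each RDD remembers its parent and its transformation (its lineage),
    so lost partitions can be recomputed rather than replicated."""
    def __init__(self, partitions=None, parent=None, fn=None):
        self.cached = partitions       # None means "lost" / never cached
        self.parent, self.fn = parent, fn

    def map(self, fn):
        # No work is done here: only the lineage edge is recorded.
        return ToyRDD(parent=self, fn=fn)

    def compute(self, i):
        if self.cached is not None:
            return self.cached[i]
        # Walk the lineage: recompute only partition i of the parent.
        return [self.fn(x) for x in self.parent.compute(i)]

source = ToyRDD(partitions=[[1, 2], [3, 4]])   # the "stable storage" RDD
rdd = source.map(lambda x: x * 10).map(lambda x: x + 1)

# Partition 1 was "lost": rebuild it from the lineage, touching nothing else.
print(rdd.compute(1))  # [31, 41]
```

Note that only partition 1 is recomputed; partition 0 is never touched -- the "minimum amount of work" the optimistic approach banks on.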


As we saw here, Spark does NOT necessarily do everything in memory. Spark takes the optimistic approach that everything will go well, which, when it is the case, carries no performance penalty. It keeps track of the dependencies between RDDs and is able to re-compute lost data with the minimum amount of work. Spark also tries to keep as much as it can in RAM, transferring the data directly from one RDD to another, hence the multi-stage processing. For the data transfers, it gives you lots of power and flexibility in where you place the cursor of your time/memory tradeoff.

So if you're asked to compare MapReduce with Spark in your next job interview, an answer that will probably earn the interviewer's admiration would be: MapReduce is like creating a checkpoint after every transformation in Spark.

But again, everything is a tradeoff in a distributed system, and you'd better know what taking one tradeoff versus another implies -- blowing up a cluster was never so easy!