I recently read with interest this post over at iTnews Labs. In it, they demonstrate how to set up a test instance of Hadoop, everyone’s favorite elephantine data cruncher, in no more than an hour. Bringing up a working Hadoop system from scratch is no small feat, as evidenced by the Linux-esque distribution ecosystem that’s grown up around it (see Cloudera, Hortonworks, et al.). So kudos to the guys for consolidating what has been, until now, a lot of disparate information into a single detailed walkthrough.
Having a local instance of Hadoop is great to get tinkering with, and see what’s under the hood. But what happens once you’ve learned your chops and actually want to start crunching some of those terabytes? After all, if you’re attracted to Hadoop, you must be sitting on a pile of data. (And if you’re not, my advice is: use a regular database. Herding elephants is too much like hard work.)
Inevitably, your poor old laptop’s going to blow a sand shoe, you’ll realise you need to build something bigger, and then you’re back where you started, tailing logs and reaching for the Vi cheatsheet. What would be ideal is something you can get up and going in minutes, tinker with until you have a concrete plan, then scale out to meet the demands of your full corpus. And ideally you don’t want to do all the heavy lifting twice. You can’t have it all, you say?
You can when you cheat.
Elastic MapReduce (EMR) is the Hadoop-as-a-service offering from Amazon Web Services (AWS). It offers all the capabilities of vanilla Hadoop, but with almost instant setup, arbitrary scaling and minimal commitment.
Like many products in the AWS suite, it’s most cost-effective when used as an on-demand service, and scaled back to minimal or no footprint when not in use. This makes it ideal for rapid prototyping of map / reduce solutions, as you pay only for the compute time you use, with no further commitment to the infrastructure. As an example, a cluster of 5 nodes, each with 8 virtual cores and 7 GB of RAM, will cost around $3.50 per hour to run in the US East region (and much cheaper if you use the spot market, but more on that later).
Wait a minute… where did everybody go? Yes, you’ll need to put your hand in your pocket, and in that respect I concede I’m stretching the brief. But I’d ask you to consider for a moment what your time is worth (for me, time is 100% pure unobtainium). What I hope to demonstrate in this article are a few ways to productively use a metered service and still minimise your outlay.
And as you’ve been so good as to lend me your lunch break, I’ll set a little challenge: how much data can we process in an hour, without spending more than you just did on your sandwich?
The first prerequisite for running the following examples is a working AWS account. There’s no charge for holding an account; you only pay for the compute time you use.
Additionally, we’re going to use the Amazon EMR Command Line Interface (CLI), which requires Ruby 1.8. You can find instructions for installing the CLI in the EMR Developer Guide. The examples we’ll use are all written for a Bash shell, with apologies to Windows users. If you’re on Windows, please check out the developer guide for the correct incantation, which is usually as easy as prefixing the command with an explicit call to the Ruby interpreter.
In map / reduce land, the equivalent of the canonical “Hello World” example is Word Count. Word Count takes a body of text, and counts the frequency with which words appear in that text. To get you started there are various pre-canned bodies of text available to throw yourself against. The Wikipedia database dump is a well-known example, but unfortunately the dump is serialized to XML, and we aren’t masochists, so we’ll steer clear of that.
Far more appealing is the Common Crawl, an open repository of 6 billion-odd web pages. The Common Crawl is available straight from Amazon S3, and is stored as (amongst other things) Sequence Files, which are a native Hadoop key / value serialization. Those Sequence Files are then grouped into segments, which is the output bucketing used by Nutch, the open source web crawler used to gather the data. Segment sizes vary, but most are around 100 GB each in compressed form, or about 3 times that size as text.
Running a cluster
First up, let’s launch a simple “cluster” and measure how long it takes to become available. In rough terms this is comparable to the example demonstrated by iTnews Labs, in the sense that we’re starting a bare, single node pseudo-cluster, which would then be available for submitting jobs.
Happily, the EMR CLI sets a lot of defaults for us, so it’s as simple as:
And that’s it. The CLI returns us a job flow ID, like so:
Created job flow j-30Y4E7T52UPJT
Which we can then use to see how long the cluster takes to come up (specifically, to transition to WAITING state):
real 3m26.872s
user 0m4.939s
sys 0m1.052s
So in three and a half minutes, we have a working system. Not bad. You can SSH to it if you want, at which point you have the regular Hadoop CLI at your disposal, which allows you to do I/O to HDFS, and launch your own map / reduce jobs.
We used an instance type of m1.large which has 2 virtual cores and 7.5 GB of RAM, which I’m guessing is as much as most people could afford to hand over to a VM on their laptop. And we have the remaining 56 minutes and 30 seconds in our bakeoff to take it for a test drive, which in the US East region will cost you the princely sum of 30 cents. Is an hour of your time worth 30 cents? Mine’s cheap at ten times the price.
Now before we move along, let’s take care of business:
Running a job
I mentioned earlier that the Common Crawl is available as a public data set on AWS S3. Residence on S3 is important, as it allows us to demonstrate one of the extensions to Hadoop that’s used commonly with Elastic MapReduce: the ability to process data from S3, either directly, or by first bulk copying the data to HDFS.
This is valuable as it provides a separation of concerns between data storage and processing, and we can then better understand the different cost models that apply to each service. S3 is optimised for long term, high availability data storage, with costs as low as $1k per terabyte-year for Reduced Redundancy Storage. On the other hand, EMR (at least in my opinion) is better suited to on-demand analytics, and it’s the tight integration with S3 that makes this approach feasible. By storing data cheaply, and paying for cluster time only when we need it, we can make the utility computing model work to our advantage.
So in the next example we’re going to demonstrate how to process data directly from S3. Specifically, we’ll count the occurrences of the words “Hello” and “World” in a tiny subset of the Common Crawl. We’ll be using Hadoop Streaming, which lets you run your mapper and reducer implementations out-of-process, and therefore write them in any language you like. In this case I’ve used a Ruby mapper that emits the count of the relevant words in each line of the input, and an accompanying reducer to aggregate those counts.
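To give a feel for the shape of such a job, here’s a minimal sketch of a streaming mapper and reducer. This is not the code from the repo: the method names `map_line` and `reduce_pairs`, the case-insensitive whole-word matching and the tab-separated `word<TAB>count` output are all my assumptions for illustration.

```ruby
# Hypothetical sketch of a Hadoop Streaming word counter; names are illustrative.
TARGET_WORDS = %w[hello world].freeze

# Mapper: emit "word<TAB>count" for each target word found in one input line.
# Assumes case-insensitive, whole-word matching.
def map_line(line)
  words = line.scrub.downcase.scan(/[a-z]+/)
  TARGET_WORDS.each_with_object([]) do |target, out|
    count = words.count(target)
    out << "#{target}\t#{count}" if count > 0
  end
end

# Reducer: sum the counts for each key across all mapper output lines.
def reduce_pairs(lines)
  totals = Hash.new(0)
  lines.each do |line|
    key, value = line.chomp.split("\t", 2)
    totals[key] += value.to_i
  end
  totals
end

# When run as a script, act as a mapper or a reducer over standard input,
# depending on the first argument ("map" or "reduce").
if __FILE__ == $PROGRAM_NAME
  case ARGV.first
  when "map"
    STDIN.each_line { |line| map_line(line).each { |pair| puts pair } }
  when "reduce"
    reduce_pairs(STDIN.each_line).each { |key, total| puts "#{key}\t#{total}" }
  end
end
```

Keeping the logic in plain methods, with the standard input wiring at the bottom, means the same file can be exercised from a test or from a shell pipe before it ever goes near a cluster.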
As with the last example, we’ll time it from a clean start.
Created job flow j-31L35MBT0LEBJ
This command launches a single node pseudo-cluster, specifies we want to run a streaming job, and identifies the input as being one part of one segment of the Common Crawl, chosen at random. The input file is about 45 MB in uncompressed form.
Not long after, we find a bunch of files in our output bucket, one of which contains the results:
hello 312
world 3645
Using the same timing loop as we did earlier (except this time waiting for COMPLETED state), we got:
$ time while …
real 4m45.939s
…for a total cost of 30 cents with 55 minutes to spare.
Again, pretty sprightly even for a small pachyderm, but of course we only processed a trivial amount of data. It would’ve been easier and quicker just to grep through it. The point of this example, though, is that by working directly from S3 storage it’s possible to prototype your map / reduce implementation against small subsets of your data, at low cost, and with minimal ramp-up time.
Processing data straight off S3 is great for prototyping and experimentation on small data sets, but what about larger chunks? You might rightly be concerned about having Hadoop stream its data over the network, rather than working (as it was designed to do) from HDFS on locally attached storage.
Still, you might be surprised. The parallel nature of map / reduce spreads the transfer out over many processes, and the bandwidth between S3 and EC2, while not officially specified, has improved over time.
So let’s have a look at how this goes with a slightly bigger data set; this time we’ll process an entire crawl segment, about 120 GB in compressed form. I think we’re going to need a slightly bigger boat…
Created job flow j-2ZLAG8QEM0BK9
$ time while …
real 22m25.155s
So that time we were done in 22 minutes for a total cost of $8.03, including cluster creation, data transfer and processing. Uncompressed our corpus was about 350 GB, and we gave it 10 nodes of 8 virtual cores each. Not entirely shabby, but at this rate we’d barely crack a terabyte of text in an hour. I think our streaming prototype may have had its day.
Part of the problem with the previous example is that the individual sequence files that make up the crawl segment are much smaller than an HDFS block, causing Hadoop to create many more mappers than it would ideally like. That’s better handled in later versions, with new flavours of SequenceFileInputFormat, but that’s a story for another day.
What’s more we’re still piping all the data out to a Ruby interpreter, which can hardly be helping. Assuming that by now we have a concrete approach to our map / reduce problem, we can consider moving our implementation into pure Java (or for that matter using a declarative approach like Pig). You might recoil at the thought of implementing the solution twice, but never underestimate how much time you can burn testing exploratory code on a real cluster. Being able to work out the initial approach in a rapid development environment can be a life-saver.
Here’s a Java based implementation, run on an equivalent cluster, and against the same crawl segment.
Created job flow j-30UW9BCRZQIBD
$ time while …
real 17m42.939s
Somewhat better. Keep in mind that the cluster startup and network transfer times are more-or-less constant between the two examples, which somewhat obscures the performance advantage of the native implementation. For persistent clusters running multiple, chained jobs from HDFS, the payback is more obvious (see also Persistent Clusters below).
Pacing your optimisations is always a tricky line to walk, and it’s no different here. Working with the streaming API gets us out of the blocks quickly, but knowing when to commit to a more performant, albeit fussier, implementation is key to balancing cost.
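To put those two timings side by side, here’s a quick back-of-envelope calculation of throughput. The 350 GB figure and both timings come from the runs above; the helper names `elapsed_seconds` and `gb_per_hour` are mine. Bear in mind the timings include cluster startup, so this understates what the cluster sustains once it’s running.

```ruby
# Convert a `time`-style "MMmSS.sss" reading into seconds.
def elapsed_seconds(reading)
  m = reading.match(/\A(\d+)m(\d+(?:\.\d+)?)s\z/) or
    raise ArgumentError, "unexpected format: #{reading}"
  m[1].to_i * 60 + m[2].to_f
end

# Gigabytes of uncompressed text processed per hour of wall-clock time.
def gb_per_hour(corpus_gb, reading)
  corpus_gb * 3600.0 / elapsed_seconds(reading)
end

CORPUS_GB = 350.0 # uncompressed size of the crawl segment, from the text

streaming = gb_per_hour(CORPUS_GB, "22m25.155s")
java      = gb_per_hour(CORPUS_GB, "17m42.939s")

puts format("Streaming: %.0f GB/hour", streaming)
puts format("Java:      %.0f GB/hour", java)
puts format("Speed-up:  %.2fx", java / streaming)
```

On these numbers the streaming run lands a little under a terabyte of text per hour and the Java run a little over, a speed-up of roughly a quarter end to end.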
So with a good 40 minutes left on the clock we could be reasonably confident about chewing through a terabyte, but not with a lot of change. We can still do better.
This is the really fun part.
As you saw from the preceding examples, costs can start to rise pretty sharply once you start to run clusters of more powerful computers. For production analytics with a demonstrated business case this can be acceptable, but for exploratory or prototyping work it’s a little hard to swallow.
So far we’ve been buying our compute time in the On Demand market. What would the picture look like if we were to bid for some unused compute time in the Spot market? As I write this, spot prices for select instance types in the US East region look like this:
|Instance Type||Price ($/hr)|
Straight away you see the vagaries of the spot instance market. Some instance types are significantly cheaper than their less powerful brethren, and others are even trading way above their own On Demand price!
But if we go with the flow we can turn this to our advantage. I’m looking at cc2.8xlarge, a snap at 27 cents an hour for 32 cores and 60 GB of RAM.
Let’s re-run the previous Java-based example with a stack of those bad boys. This time I’ll separate the launching of the cluster from the job, as the spot instance bids can take some time to work their way through the market, and as that’s not something we pay for I won’t bother measuring it.
Created job flow j-2XP9O9IRLHHBU
Our spot market bids were successful, and the cluster’s now up and running, so let’s launch the word count.
Added jobflow steps
$ time while …
real 05m02.003s
Well we creamed that last one, so let’s scale it up to 15 crawl segments and see if we can smash through that in the hour.
Added jobflow steps
$ time while …
real 48m45.812s
This time our corpus clocked in at 1.7 TB in compressed form, or about 5 TB as uncompressed text. Elapsed time was 49 minutes for a total cost of $8.73.
Disappointingly we didn’t manage to bring the cost down in absolute terms, as the EMR instance premium (50 cents on cc2.8xlarge) wiped out the money we saved in the spot market. The good news though is that we got much more iron for our money, getting us up into terabyte scale processing and still from a standing start.
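To see why the premium bites so hard, it helps to sketch the per-node arithmetic. The 27 cent spot price and 50 cent EMR premium are the figures quoted above; the node count is a made-up number purely for illustration, and I’m assuming each node is billed per started hour.

```ruby
# Estimated charge for a group of identical nodes, assuming each node is
# billed per started hour at (instance price + EMR premium).
def group_cost(nodes:, instance_price:, emr_premium:, minutes:)
  billed_hours = (minutes / 60.0).ceil
  (nodes * (instance_price + emr_premium) * billed_hours).round(2)
end

# Fraction of each node-hour's cost that goes to the EMR premium.
def premium_share(instance_price:, emr_premium:)
  emr_premium / (instance_price + emr_premium)
end

SPOT_PRICE  = 0.27 # cc2.8xlarge spot price quoted in the text ($/hr)
EMR_PREMIUM = 0.50 # EMR premium on cc2.8xlarge quoted in the text ($/hr)

# Hypothetical: 10 spot nodes running for 49 minutes (one billed hour each).
cost  = group_cost(nodes: 10, instance_price: SPOT_PRICE,
                   emr_premium: EMR_PREMIUM, minutes: 49)
share = premium_share(instance_price: SPOT_PRICE, emr_premium: EMR_PREMIUM)

puts format("Estimated spot group cost: $%.2f", cost)
puts format("EMR premium share of node cost: %.0f%%", share * 100)
```

At those prices the premium accounts for nearly two thirds of what each spot node costs, which is why the absolute saving is smaller than the spot price alone suggests.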
The obvious drawback of the spot market (I’d call it the elephant in the room, but I think I’ve sufficiently flogged that metaphor) is that the market price may exceed your bid while you have jobs in flight. That may (although won’t necessarily) mean that you lose instances, which could interrupt your job.
Hadoop’s retry mechanism will do its best to recover from that, and due to the replication in HDFS you’ll be unlikely to lose data unless a bunch of your nodes disappear at once. In any case, in my experience it’s easy to bid conservatively enough that instance loss is rare, while the cost benefit can still be dramatic.
The master node, however, is a single point of failure, so it’s game over if that one goes away. As a result we’ve kept the master in the On Demand market. As it represents less than 5% of our compute power, it doesn’t make much difference to the overall price anyway.
Minding the pennies
I’ll wind up with a few more tips on balancing cost.
Throughout this article I’ve been using Hadoop Streaming, a standard I/O API for implementing mappers and reducers as standalone executables. Running out-of-process like this certainly won’t be as fast as a pure Java implementation, but it has one very important advantage that can also help to minimise your metered compute time.
When running streaming jobs, Hadoop uses the standard input and output streams of your map and reduce executables to supply and collect the data. This means that you can get started prototyping your implementation in the shell using ordinary pipes and console utilities.
Working with Ruby, the simplest example might look like this:
$ cat input_file.txt | mapper.rb | reducer.rb
However your reducer will normally expect the input keys to have been sorted prior to being called, which we could approximate with something like:
$ cat input_file.txt | mapper.rb | sort -t$'\t' -k1 | reducer.rb
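Why does the sort matter? A streaming reducer typically walks its input line by line and treats a change of key as the signal to emit a total, so it only works if equal keys arrive together. Here’s a minimal sketch of that pattern; the `reduce_sorted` name and the sample data are hypothetical, not taken from the repo.

```ruby
# Hypothetical streaming-style reducer: relies on equal keys arriving together,
# so it only ever tracks one running total at a time.
def reduce_sorted(lines)
  results = []
  current_key = nil
  total = 0
  lines.each do |line|
    key, value = line.chomp.split("\t", 2)
    if key != current_key
      # Key boundary: flush the total for the previous key, if there was one.
      results << [current_key, total] unless current_key.nil?
      current_key = key
      total = 0
    end
    total += value.to_i
  end
  results << [current_key, total] unless current_key.nil?
  results
end

mapper_output = ["world\t1", "hello\t2", "world\t4", "hello\t1"]

# Unsorted input splits each key into several partial totals...
unsorted = reduce_sorted(mapper_output)
# ...whereas sorting first (the job of `sort` in the pipe) groups them.
sorted = reduce_sorted(mapper_output.sort)

puts "unsorted: #{unsorted.inspect}"
puts "sorted:   #{sorted.inspect}"
```

Without the sort, the reducer reports each key more than once with partial counts; with it, you get one total per key, which is what you’d see on a real cluster.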
Custom input formats are a little problematic, as currently they can only be implemented in Java. While I was preparing this article I wrote a small JRuby wrapper around SequenceFileInputFormat so that I could deserialise individual sequence files into my mapper on the command line. The toolchain I ended up using looked like this.
hello 634
world 7580
You can find the implementations of all these pieces in our GitHub repo.
The previous examples have all been one-shot jobs, in order to demonstrate the use of EMR as an on-demand service, and to underline the separation of storage from processing in the AWS landscape.
However, the nature of big data is that you often don’t come with a well-defined idea of what intelligence can be gained from your data, only that there’s sure to be some diamonds hidden in there. It’s not so much about finding answers, then, but discovering questions, and that’s a much more iterative process. For example, you might be using Hive to run a series of exploratory queries against your data to get a feel for what gems may lie hidden therein.
When running a series of tasks on a persistent cluster in this way, we don’t want to be paying the cost of that network transfer for every job or query, as we do when processing data directly from S3. Furthermore, the S3 direct route negates one important feature of Hadoop that allows it to scale to very large volumes. That’s Data Locality, which refers to Hadoop’s ability to have each mapper process data that’s (mostly) local to the node the mapper is running on. Take the code to the data, as it were.
DistCp is a tool built into the standard Hadoop toolset, which leverages the distributed nature of map / reduce to get data onto the cluster. Think of it as a parallelising upload manager for Hadoop. EMR takes it one step further with S3DistCp, which, as the name suggests, enhances plain DistCp to use S3 as a data source.
Re-using the last cluster, we can bulk copy data from S3 like this:
Added jobflow steps
$ time while …
real 09m49.278s
The amount we copied was around 120 GB in compressed form. As stated previously, bandwidth between S3 and EC2 isn’t something AWS publish, but a network connection is a network connection, so we’d expect that to take a while. As you can see, by using S3DistCp we were able to chop that down to just under 10 minutes.
Now let’s run the word count against that crawl segment:
Added jobflow steps
$ time while …
real 03m57.661s
As you can see, performing the data copy and the map / reduce in separate steps isn’t as efficient as doing it in one. However we’ve now got our data on HDFS and ready for repeated processing with the full benefits of directly attached instance storage and HDFS locality.
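So how many runs does it take before the up-front copy pays for itself? Using the three timings measured on the spot cluster (5m02 reading straight from S3, 9m49 for the copy, 3m57 reading from HDFS), a rough break-even calculation looks like this. The helper names are mine, and it assumes each repeat run takes about as long as the one we measured.

```ruby
def seconds(minutes, secs)
  minutes * 60 + secs
end

DIRECT_FROM_S3 = seconds(5, 2.003)  # word count reading straight from S3
COPY_TO_HDFS   = seconds(9, 49.278) # one-off S3DistCp copy
FROM_HDFS      = seconds(3, 57.661) # word count reading from HDFS

# Smallest number of runs for which copy-once-then-process is strictly faster
# than re-reading from S3 every time. Returns nil if HDFS is never faster.
def break_even_runs(direct, copy, local)
  saving_per_run = direct - local
  return nil unless saving_per_run > 0
  (copy / saving_per_run).floor + 1
end

runs = break_even_runs(DIRECT_FROM_S3, COPY_TO_HDFS, FROM_HDFS)
puts "Copying to HDFS pays off from run #{runs} onwards"
```

On these figures each HDFS run saves about a minute, so the copy earns its keep after ten or so passes over the same segment. That’s a low bar for the kind of iterative, exploratory querying described above.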
This last one might seem like a no-brainer, but I think it bears articulating: shut the cluster down when you’re finished.
This is really an extension of the discipline that becomes second nature once you get used to using cloud computing services. Forgotten instance hours can really add up, and that’s only magnified when you’re dealing with clusters of highly spec’d computers.
In the last few examples above I’ve used the --alive argument to keep the cluster running when the last job finishes. However, if you know you’re doing one-shot processing, consider dropping that flag. The cluster will then terminate itself once it becomes idle, which is a nice safety net.
If you do run with keep-alive, make this step a part of your muscle memory:
If you’re unsure about the state of your cluster(s), the --list argument is your friend:
# We’re looking for COMPLETED in the second column to verify the cluster’s been shut down.
j-2XP9O9IRLHHBU COMPLETED ec2-54-224-192-110.compute-1.amazonaws.com Common Crawl word count keepalive
COMPLETED Example Jar Step
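If eyeballing that second column sounds like something you’ll forget to do, the check is easy to script. The sketch below scans listing text in the layout shown above and reports any job flow that doesn’t look finished. It assumes job flow lines start with a `j-` ID followed by the state, and that COMPLETED, TERMINATED and FAILED are the states that mean the cluster is gone; the second job flow ID in the sample is invented.

```ruby
# States in which a job flow should no longer be accruing instance hours
# (an assumption; check against the states your own listing reports).
FINISHED_STATES = %w[COMPLETED TERMINATED FAILED].freeze

# Return [job_flow_id, state] pairs for job flows that still look alive.
# Step lines, which do not start with a "j-" ID, are ignored.
def still_running(listing)
  listing.each_line
         .map(&:split)
         .select { |cols| cols.first.to_s.start_with?("j-") }
         .reject { |cols| FINISHED_STATES.include?(cols[1]) }
         .map { |cols| cols.first(2) }
end

# First job flow is from the listing above; the second is a made-up example.
sample = <<~LISTING
  j-2XP9O9IRLHHBU COMPLETED ec2-54-224-192-110.compute-1.amazonaws.com Common Crawl word count keepalive
  COMPLETED Example Jar Step
  j-EXAMPLE0000000 WAITING ec2-0-0-0-0.compute-1.amazonaws.com Forgotten cluster
LISTING

still_running(sample).each do |id, state|
  puts "Still up: #{id} (#{state})"
end
```

Run against saved listing output at the end of the day, a check like this flags the idle keep-alive cluster before it quietly clocks up another night of instance hours.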
The trick to getting somewhere with Hadoop map / reduce is to start small, and increase the size of your workload as your confidence in your implementation grows. Hadoop is a cumbersome beast, so any fast track to a working proof of concept is welcome.
For that reason it’s great to see people out there sharing their quick start recipes, as iTnews Labs did recently with their post on getting a Hadoop test instance running in under an hour.
What I hoped to demonstrate in this article is how Elastic MapReduce gives you not only a very fast start, but also a path to scaling your solution as you start to pile on the data. The disposability of the infrastructure means you’re never committed to a single size solution, and you can iteratively determine the scale that matches your business case.
The trade-off of course is that EMR is a metered service, and so time is quite literally money. Some of the techniques I’ve used here to keep the cost under control are:
- Build your early prototypes against the streaming API, and start your work on the command line using standard console tools to simulate the behaviour of the cluster.
- Run your next experiments from small, compressed subsets of your corpus hosted on S3. If your primary data store is elsewhere, also take advantage of the lower cost of Reduced Redundancy Storage.
- Stick with Hadoop Streaming while you solidify your approach, as you can return to the console any time to iron the bugs out.
- As you scale up, the performance penalty of the streaming API will at some point start to outweigh the benefit. Step up to a more performant implementation (such as Java, Hive or Pig) once you have the confidence to run your solution at a bigger scale.
- For any non-production, or non-time-critical work, always use the spot market. You may lose a few nodes here and there, but it’s well worth it.
- When running multiple, or chained jobs against the same data set, first bulk copy the data onto HDFS using S3DistCp.
- Plan the cluster’s lifecycle in advance; use auto-termination when running one-shot jobs, or otherwise set a reminder to shut it down manually before you go home.
I hope this has been helpful. Now go saddle up that beast!
- The source code for the examples used in this article: https://github.com/DiUS/emr-examples
- The original iTnews Labs article on setting up a Hadoop testbed: http://www.itnews.com.au/Lab/359466,the-worlds-tiniest-hadoop-testbed.aspx
- The Common Crawl’s own guide on processing the corpus with EMR: http://commoncrawl.org/mapreduce-for-the-masses/
- More detail on optimising use of S3 with Hadoop: http://blog.mortardata.com/post/58920122308/s3-hadoop-performance
- Another comparison of the different deployment models of Hadoop: http://www.accenture.com/us-en/Pages/insight-hadoop-deployment-comparison.aspx
- AWS white paper on EMR best practices: http://media.amazonwebservices.com/AWS_Amazon_EMR_Best_Practices.pdf