HADOOP FOR THE LONE ANALYST, WHY AND HOW

INTRO

Here at StyleFeeder, we spend a lot of time figuring out what our users are doing, and trying to figure out what they want. One of the tools we have brought to bear on these questions is Hadoop. Among the technical tools these days, Hadoop is like the prettiest girl in school, and it’s easy to think you should be bringing her to every conceivable dance. You shouldn’t: there are plenty of problems that Hadoop can’t solve, or for which there are better tools. But there are some problem spaces where it excels: web analytics and preparation for search, to name two. This post is informed by our use of it for web analytics.

This is a long piece, but I figured we might as well get this all up in one place. To skip straight past the blather and into the HOWTO, go here.

WHO

One of the great promises of Hadoop is that an analyst working alone can do what formerly required a dedicated cluster with special-purpose analytical software, plus a team of sysadmins and systems programmers who know C and possibly assembler. This analyst is not an end-user in the traditional sense, but a person who knows some unix and a scripting/declarative language or two, say, Python and SQL. Such a person, in a big company, used to have to requisition hardware in, say, January, and try out maybe one pet theory, if he was lucky, in April. Now he can try out an idea in a week, or a day, or a few hours. He can try out his theories almost as fast as he can think of them. That’s what we’re after. Within a defined but still very large problem space, Hadoop seems to deliver on that promise, and we’re pretty excited about it. We don’t have red tape like that here, but we don’t want to spend a lot of money on a team of cluster operators either.

WHY

At StyleFeeder, our richest sources of information about user behavior are MySQL tables full of events, and web server logs. We could dump these all into a MySQL database, but MySQL is nobody’s favorite choice for a data warehousing platform, in part because the joins and the aggregate functions are so slow and primitive. We could dump everything into a database that’s better suited to analytics, but that would probably involve license fees and a commitment to a finicky platform with a much more particular sweet spot in terms of problem space. We’re not really sure where we might need to go with these analytics, so we want to keep our options open. Hadoop is a sort of Swiss army knife for big data, structured or not, so it looks like it will do for now. Finally, we don’t have to lease or buy a bunch of hardware: we can run everything on Amazon Web Services EC2 or EMR (Elastic MapReduce), and just pay for what we’re using.

INITIAL INVESTIGATIONS

The rest of this post will be very specific, and include a lot of “obvious” things. You need to know a little about a lot of things to get the setup just right, and we’re conceiving of the audience for this as the analyst who doesn’t necessarily know how best to install all the software he can use competently. So bear with us on all that. This is really a catalog of the things we wish we had had somebody to explain to us, as we fumbled our way through.

We used Cloudera’s scripts to launch an EC2 cluster, and ran some “Hello, world”-type programs. We counted the incidence of all the words in Shakespeare and the Bible: it works! We subscribed to a few Apache Hadoop mailing lists. As you might expect in a very popular, pre-1.0 project, it’s a community in a state of rollicking ferment. Note to self: we may need to apply some patches, if we run into trouble down the road. Is that a problem? Short answer, no, but it’s not going to be perfectly straightforward.

These Cloudera Python scripts have been contributed to the trunk of Hadoop, but they are not yet in the 0.20 or 0.21 branches. In that form they are somewhat generalized relative to the versions we are using: for example, the configuration directory is called “.hadoop-cloud” rather than “.hadoop-ec2”, along with other substantive changes. In this discussion, we’re talking about the separately released Cloudera scripts, which have been around for a while. There are also bash EC2 scripts in the Apache distribution that seem to peter out at version 0.19, no doubt because they have been superseded by the Python scripts. The version we’ve been using is available here: http://archive.cloudera.com/docs/_getting_started.html.

There’s a lot of talk on the Hadoop mailing lists about administering what are obviously dedicated Hadoop clusters. These are people who are much more committed to running Hadoop than we are, and who have it running constantly. That’s not really what we’re after, so we’re going to pay attention to what they’re doing, but not really do it the same way.

HOW TO SET UP AND RUN HADOOP

This will probably get out of date quickly, but here goes. One of the mildly frustrating things about doing all of this has been that most of the examples out there are for version 0.18.3 and lower, and they don’t “just work” with more evolved versions of Hadoop. There’s a dearth of HOWTOs for version 0.20 and beyond, so this will be one.

  • Hadoop runs on a few platforms, but most of the action in the community is on Debian/Ubuntu or Redhat/Fedora/CentOS. Let’s stick with what we’re going to be able to get help with. If you want to do exactly what we did (and trust me, diverging in small ways from this can lead you down some rat holes), start here: have a local CentOS 5 machine, or (as in our case) a VMware-based virtual machine running CentOS 5. 32-bit will do (that’s what we have), but we will have some uses for a 64-bit machine. In our case, we run a 32-bit machine locally, and fire up an EC2 instance when we need to do some 64-bit builds.
  • Install python 2.5 or 2.6. CentOS 5 needs the system-installed rpm of /usr/bin/python 2.4, or the package manager will complain. There are a couple of rpm packages out there for python 2.5 or 2.6 on CentOS 5. Long story short, they didn’t work for us. As root, download the Python source to /usr/local/src; ./configure --prefix=/usr/local/python2.5; make; make install. In your own (non-root) .bashrc or equivalent, set PYTHONHOME to /usr/local/python2.5, and PATH to $PYTHONHOME/bin:$PATH.
  • Install boto and simplejson, which are Python libraries. easy_install simplejson seems to do the trick, at least for now, but the EC2 launcher scripts, although they say “boto 1.8 or higher”, will complain like stuck pigs if you get boto 1.9. So download the source for boto 1.8, temporarily set root’s environment (PYTHONHOME, PATH) as above, and do ‘python setup.py build’ and ‘python setup.py install’.
  • If you don’t have one, set up an AWS/EC2 account, and get your ssh keys sorted out for that, so you can shell out to your EC2 instances.
  • The README in the EC2 launcher scripts tells you in detail how to configure your local environment to launch a cluster. Use the Fedora AMI that Cloudera makes available. If you want something special for your data processing, like setting the timezone to Central European Time, or whatever, you can launch an instance off that AMI, make whatever changes you need, and bundle it as a private or public image for yourself. Then change the id of the AMI in the config file. The Cloudera AMIs have Sun Java 1.6 on them, which you will need. Hadoop is not pre-installed, but rather installed by the scripts from their yum repository. We have our own internal procedures for installing Java, involving the jpackage process, and if you’ve got something of your own, you can probably just launch an appropriate image, install a couple of things and be off to the races. YMMV. The Cloudera images work fine.
  • When you have a cluster configured in a local text file (let’s call it ‘mycluster’, for future reference), do “hadoop-ec2 launch-cluster --env REPO=testing --env HADOOP_VERSION=0.20 --user-packages ‘s3cmd lua lua-devel lzo-devel lzop’ mycluster 3”. Supply whatever user packages you want. It’s going to run “yum -y install packagename” on all the nodes. There are reasons for the ones on that list, which we’ll see later. This will launch a name node (the central controller) and 3 slaves. Since the data in Hadoop is replicated 3 times by default, this seemed like a good test cluster size. Why not run locally? We did, for a while, and we still do sometimes, but certain things just don’t work the same. If you’re new to Hadoop, you can get very frustrated by trying out things that work locally in tests and then fail in the cluster. Try out a small cluster until you get the hang of it. For tests, use an instance size of m1.small, but switch that to at least c1.medium when you start crunching real data. Why version 0.20? Version 0.18.3 is the stable one right now, but by last December version 0.20 was already close to being declared stable by Yahoo, and lots of people have it in production. You get Sqoop, you get more advanced versions of Hive and Pig, and the API is closer to what it will be going forward. Odds are that 0.20 has some features you’ll want.
  • Try out a few things. First, from your local box, run “hadoop-ec2 update-slaves-file mycluster”. This will update a file called $HADOOP_HOME/conf/slaves on the name node. Then do “hadoop-ec2 login mycluster”. This will give you a shell on the namenode. Then do “slaves.sh -o StrictHostKeyChecking=no uptime”. This will get ssh’s known-host checking out of the way once and for all (or at least until you terminate this cluster and launch another one). From this point on (on this namenode), you can do “slaves.sh uptime” to see how hard the slaves are working. Go into the /etc/passwd file and change the ‘hadoop’ user’s shell from /sbin/nologin to /bin/bash. Then do “su - hadoop”. Now try “hadoop dfsadmin -report”. You’ll see how many nodes you have, and how much disk space is available. You’ll want to keep close tabs on that when you start slinging real data around. “hadoop job” will show you a few useful commands that will let you see how far your jobs are from completion. There’s a web interface, but we find ourselves frequently having to restart Firefox and a proxy to get that to work. We’re probably just doing something stupid in that area, but the command line always works, so we’ve gotten used to relying on that.
  • Inspect things a bit more. $HADOOP_HOME (/usr/lib/hadoop, if you’re following along with these procedures) has files that will show you that what you’re running is (for now) Hadoop version 0.20.1+152, which means Apache Hadoop version 0.20.1, plus 152 patches that Cloudera has applied or backported to the Apache release. Thank you, Cloudera! Follow the trail in /etc/yum.repos.d/cloudera-testing.repo, and unpack the rpm if you’re curious. Just reading the list of what’s involved makes you think you wouldn’t want to be running this without them.
  • Try streaming, or Pig, or Hive, or java-based map-reduce. A trivial streaming example will be the easiest to get working. Take a text file, put it in hdfs (“hadoop fs -copyFromLocal filename.txt /user/test/input/filename.txt”), and use “cat” as your mapper and “wc -l” as your reducer (there’s a sketch of this just after this list).
  • Modify your environment a little, to make things tolerable. My .bashrc contains this section:

    alias hcat='hadoop fs -cat'
    alias hcp='hadoop fs -cp'
    alias hget='hadoop fs -get'
    alias hgetmerge='hadoop fs -getmerge'
    alias hls='hadoop fs -ls'
    alias hmd='hadoop fs -mkdir'
    alias hmv='hadoop fs -mv'
    alias hput='hadoop fs -copyFromLocal'
    alias hrm='hadoop fs -rm'
    alias hrmr='hadoop fs -rmr'
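
Here, as promised, is the trivial streaming example end to end. This is a minimal sketch: the /user/test paths and filename.txt are placeholders, and the streaming jar path assumes the /usr/lib/hadoop layout of the Cloudera packages described above.

# Put a text file into hdfs (paths and file name are placeholders).
hadoop fs -mkdir /user/test/input
hadoop fs -copyFromLocal filename.txt /user/test/input/filename.txt

# Run a streaming job with "cat" as the mapper and "wc -l" as the reducer.
hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-*-streaming.jar \
  -input /user/test/input \
  -output /user/test/output \
  -mapper cat \
  -reducer 'wc -l'

# Each reducer writes a line count; have a look at the result.
hadoop fs -cat /user/test/output/part-00000

If that runs, streaming is wired up correctly, and anything that reads stdin and writes stdout can be dropped in where cat and wc are.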

HOW TO RUN A MORE SUBSTANTIAL EXAMPLE

Cloudera has a great example of web server log file processing with Pig and streaming here: http://www.cloudera.com/blog/2009/06/17/analyzing-apache-logs-with-pig/. It illustrates a bunch of very useful techniques, including streaming with Perl and Perl modules, basic Pig, and Pig User-Defined Functions. There are two things about it that don’t work out of the box. We spent some time getting it to really work, to make sure we understood everything involved, and we were better off for it.

First, a typo/omission. This line:

DEFINE iplookup `ipwrapper.sh $GEO`
ship ('ipwrapper.sh')
cache('/home/dvryaboy/tmp/$GEO#$GEO');

needs to have a ‘ship’ statement for ‘geo-pack.tgz’ (available on that page for download), like so:

DEFINE iplookup `ipwrapper.sh $GEO`
ship ('ipwrapper.sh')
ship ('geo-pack.tgz')
cache('/user/tmp/$GEO#$GEO');

/user/tmp is an hdfs location, and ipwrapper.sh and geo-pack.tgz need to be in the local directory you are launching the script from. You’ll also need to take the GeoLiteCity.dat file they link to and put it into hdfs (hadoop fs -copyFromLocal). Everything the DEFINE refers to needs either to be in hdfs already (‘cache’, which then makes it available locally to every node), or to be sent from the local file system as you run the script (‘ship’). A sketch of the setup follows.
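
Concretely, the setup looked something like this for us. The file names come from the Cloudera post; the script name and the way $GEO gets passed at the end are assumptions on our part (their script may use a %default instead).

# GeoLiteCity.dat goes into hdfs so that cache() can hand it to every node.
hadoop fs -mkdir /user/tmp
hadoop fs -copyFromLocal GeoLiteCity.dat /user/tmp/GeoLiteCity.dat

# ipwrapper.sh and geo-pack.tgz stay in the local directory you run Pig from;
# the ship() clauses send them out with the job.
ls ipwrapper.sh geo-pack.tgz

# Run the script, passing the file name as $GEO (check how their script
# actually expects the parameter; the script name here is made up).
pig -param GEO=GeoLiteCity.dat apache_log_report.pig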

The second issue: this example also uses the Piggybank, which is a collection of user-defined functions for Pig. If you’re oriented towards Hive, you may not want to bother with this example. Since we’re a java-enabled group, Pig is a reasonable tool for us, and the Piggybank has some very useful functions beyond what’s built into Pig itself. The Cloudera distribution comes with Pig by default, but not the Piggybank. So we checked out branch-0.4 of Pig, built the Piggybank, and then started extending and adapting functions from it. For example, our Apache logs have an extra field at the end, so the combined Apache log parser was useful, but needed an extension to work for us. The DateExtractor is there in the Piggybank and works, but the IsBotUA function, as the author says, has not been contributed yet, so that was a good exercise for us. After detailed study of our web logs, we have our own ideas about what constitutes a bot anyway, so we would have had to modify whatever they might have done.
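
For reference, building the Piggybank went roughly like this for us. Treat the svn URL and build targets as our best recollection rather than gospel; the branch layout may well have moved by the time you read this.

# Check out Pig branch-0.4 and build the Piggybank.
svn co http://svn.apache.org/repos/asf/hadoop/pig/branches/branch-0.4 pig-0.4
cd pig-0.4
ant jar                      # builds pig.jar, which the Piggybank needs
cd contrib/piggybank/java
ant                          # produces piggybank.jar

# In a Pig script, register the jar before calling any of its UDFs:
#   register /path/to/piggybank.jar;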

While just playing around, we did our extensions directly in the Apache tree, but eventually we set up a whole java/Maven dependency thing, because we have a bunch of other java that we’ve built up over time that we want to apply to our logs. So we combined that existing code base with our new Pig examples, set up a “mvn compile assembly:single” sort of build process, and took the resulting fat jar (a jar with all the dependencies included) for the Pig ‘register’ statement. That gives us our code, with the Piggybank included, for easy reference in Hadoop/Pig scripts and MR jobs.

In either Pig or Hive, which are basically SQL-query-optimizer-like (Pig) and SQL-like (Hive) interfaces to map-reduce, you can use built-in keywords to take advantage of scripts that will be integrated into the processing through Hadoop’s streaming API. Pig’s ‘stream’ and Hive’s ‘transform’ play comparable roles, within the shells or scripts. We haven’t really gotten into Hive, just because we have such a big java code base, and it seemed more natural to hook in through either raw java MR or Pig. If we were more pythonic or perlish, we’d probably be doing MR in Python/Perl, and using Hive for joins. Hive has a ‘JOIN’ keyword, as does Pig, and Pig’s COGROUP + FLATTEN gives you much the same thing as well. A declarative join is one of the most helpful things that Pig and Hive provide, but if you don’t like theirs, you can just write a special-purpose one yourself in MR, whether streaming or not.

It might seem, because of the basic looping constructs, that Pig has more of an actual programming language built in than Hive, but that’s not really true. Even to do simple loops or list processing of anything beyond the lists that Pig can create for you, we have had to create a few Python scripts that call Pig. This is by design on the Pig side, according to mailing list chatter: they don’t want to be re-inventing the wheel. Cloudera’s example shows how you can call Perl/Python/Bash/whatever scripts from inside Pig, when you need to. We’re not trying to knock Pig here: it does a lot of useful things for us. We’re just pointing out that it won’t answer all your questions on its own. You’ll get used to cobbling together whatever pieces are comfortable for you pretty quickly, if you have a reason to. If you have been processing log files for a while with Perl, Python, C, Java, or Language X, there are typically ways to adapt what you have and plug it into Hadoop.
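
To make the fat-jar workflow described a couple of paragraphs up concrete, here is a minimal sketch. The artifact and script names are placeholders, and the output jar name assumes the stock jar-with-dependencies assembly descriptor.

# Build one jar containing our code plus the Piggybank.
mvn compile assembly:single

# Get it somewhere the Pig script can see it, e.g. onto the namenode.
scp target/analytics-1.0-jar-with-dependencies.jar user@namenode:/home/hadoop/udfs.jar

# The Pig script then starts with:
#   register /home/hadoop/udfs.jar;
pig -f process_logs.pig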

HOW TO PROCESS WEB SERVER LOGS

There are some great tutorials out there for processing Apache logs with Hadoop. We went about looking at our logs in the following way.

  • We grabbed some of our Apache logs from S3, where we keep them. These were in .bz2 format, and that requires a little work. The main configuration file for Hadoop (although it’s now deprecated because it’s been split into three separate ones) is called hadoop-site.xml. You will get a message about this deprecation constantly. Ignore it. The config is embedded in the hadoop-ec2 scripts, in the file hadoop-ec2-init-remote.sh. Search for “Gzip” and you’ll find the line that sets up the compression codecs. Add “,org.apache.hadoop.io.compress.BZip2Codec”. The next cluster you start up will be BZip2-enabled. To make configuration changes on running clusters, we have been editing the file on the name node and then doing something like “slaves.sh scp user@namenode:/usr/lib/hadoop/conf/hadoop-site.xml /usr/lib/hadoop/conf/hadoop-site.xml”, which seems to work fine. The BZip2 codec didn’t actually work for us, though, so we re-zipped the files with gzip.
  • A word on compression in Hadoop: it just works, behind the scenes, and Hadoop takes care of it for you. To prove this to yourself, try a cat/wc example, as above, with a gzipped text file. That doesn’t work with zcat, but it works just fine with cat. There are a lot of nuances, and fancy ways you can set up compression with different codecs, but your gzipped files are usable in Hadoop, and that was good enough for us, to get started.
  • A compressed daily file for a single web server is, for us, smaller than the default hdfs block size of 64MB. Processing files that small apparently leads to all sorts of suboptimal behavior in Hadoop, so when we started to get mysterious errors on 3 months’ worth of log files, errors which weren’t showing up on a single-file test, we catted a bunch of compressed files together so they would be closer to the size that Hadoop wanted (see the sketch just after this list). The failing jobs just started working! Yikes! Rollicking ferment in the community, like we said, apparently means a certain amount of trial and error for us! In the end we have an ETL job taking our Apache logs and putting the partially transformed data into tab-delimited format. We’re doing that with a regular expression wrapped inside a Pig UDF, based on the one in the Piggybank. If you prefer to work with Python and Hive, or whatever, presumably you can plug in the same regex there.
  • There are ways to make Hadoop use an ‘S3’ file system, or an EBS volume. We’ll try those when we’re feeling adventurous. For now we just transfer the files from S3 with s3cmd, and load them into hdfs. We figure we’re getting smoother and faster processing afterwards, at the cost of longer startup for our cluster. That’s just a hunch that we haven’t tested, though.
  • While we were at it, we wrote some more Pig UDFs to do things like turning Apache date formats into seconds since the UTC/GMT unix epoch, so we could compare records in the Apache logs with records from the database, and have our session/visit analysis hold up on Daylight Savings Time change days, without too many coding gymnastics. MySQL date formats are of course missing any stored notion of time zone, so the best way to deal with them in MySQL is to record everything as some sort of longish int, before or after the epoch. Apache logs typically do not have this limitation, but we’re going to the lowest common denominator.
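
Here is the roll-up step mentioned in the list above, as a sketch. The file names are made up, and we decompress and recompress rather than simply concatenating the gzip members, which side-steps any question of how the codec handles multi-member gzip files.

# Roll a month of small daily logs into one gzip file closer to the
# 64MB hdfs block size (file names are made up).
zcat access_log-2009-10-*.gz | gzip > access_log-2009-10.gz

# Load it into hdfs; Hadoop decompresses gzip input transparently.
hadoop fs -mkdir /user/logs/2009-10
hadoop fs -copyFromLocal access_log-2009-10.gz /user/logs/2009-10/

The cat/wc streaming job from earlier is a quick way to confirm that the rolled-up file is readable inside Hadoop.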

HOW TO PROCESS MYSQL DATA

There are some different ways to get data from MySQL into Hadoop. The Sqoop (sql-to-hadoop) project (in the contrib area of mapreduce) provides ways to do this with both jdbc and mysqldump, if you have a running MySQL instance. You could certainly get Sqoop to do what we did, but we actually went about this in a slightly different way. We are using Hadoop as a batch processor, and we already had our mysqldump files archived in S3. When we want to do longitudinal studies of any kind, we need a lot of that data. We didn’t spend much time thinking about standing up a MySQL that Hadoop could talk to, because that was clearly going to be a bottleneck, but rather decided to parse the stored mysqldump files (h/t Aaron Kimball of Cloudera for this suggestion). How to do that? A quick Google search led us to ‘mydp’, a little C program with configuration scripts in embedded lua, written by a fellow called Cimarron Taylor, here: http://cimarron-taylor.appspot.com/html/0901/090116-mydp.html. With the exception of this part of his Makefile:

CFLAGS = -g -I/usr/include/lua51
LIBS = -lfl -llua51 -llualib51 -lm -ldl

which we needed to change to

CFLAGS = -g -I/usr/include
LIBS = -lfl -llua -lm -ldl

this compiled and ran beautifully on CentOS as-is. He’s developing on a Mac, and the Lua library references that work there are a bit different from what CentOS needs. When I’m feeling more energetic, maybe I’ll create an rpm for this, or send him a gnutools patch or something, but for now, I can compile the binary for 32- or 64-bit Redhat (with lua-devel installed), and that’s good enough. Taking his lua lexer callback example, which outputs one row per data item, and swapping in this workhorse function:

-- Emit one tab-delimited line per row of the dump: write each value
-- followed by a tab (just the tab when the value is nil), then end the line.
function values(...)
  for i = 1, select('#', ...) do
    local v = select(i, ...)
    if v == nil then
      io.write("\t")
    else
      io.write(v, '\t')
    end
  end
  print()
end

we get tab-delimited output, which is a good starting point for Hadoop processing. This part of the pipeline just dumps the contents of the table into hdfs with Hadoop streaming. If you did the cat/wc example described earlier, this will be easy for you.
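
Schematically, that stage of our pipeline looks like the sketch below. The bucket, dump, and output paths are placeholders, the Lua script name is made up, and we are deliberately vague about mydp’s exact command line (check his page for how it wants the dump and the script); the point is just that it slots in as a streaming mapper with no reducer.

# Build mydp (with lua-devel installed and the Makefile tweak above).
make

# Pull an archived dump down from S3 and put it into hdfs as-is.
s3cmd get s3://our-bucket/dumps/events.sql.gz .
hadoop fs -copyFromLocal events.sql.gz /user/dumps/events.sql.gz

# Map-only streaming job: mydp plus the 'values' callback above does the
# parsing. The mydp invocation here is schematic; see its documentation.
hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-*-streaming.jar \
  -D mapred.reduce.tasks=0 \
  -input /user/dumps/events.sql.gz \
  -output /user/tables/events \
  -mapper './mydp values.lua' \
  -file mydp -file values.lua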

So now the data world is our oyster. We can look at web server logs and database info, and join across them. We can load in a year’s worth of data and get some quick counts of various things in a few minutes, with the right size cluster. We’ll go into more detail on customizations and techniques in future posts.

A note: as you’re working, watch the disk space. If it gets a bit short, you can add slave nodes with “hadoop-ec2 launch-slaves” (and all the same arguments you gave for the initial cluster launch). You might think you should then run “start-balancer.sh” to even things out. Maybe, but in our experience, at least as of now, and if you’re not careful, it will kill the existing nodes. Just add some slaves and let your jobs gradually balance hdfs. Watch memory on the name node, too. We have had some jobs fail, broken them into smaller pieces, run them again, and had them succeed. The script could use a patch to make the name node a bigger instance than the slaves.
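
A sketch of that grow-and-watch routine, under the assumptions from earlier in the post (cluster name ‘mycluster’, the same launch arguments); the per-node disk path at the end is a guess at where the Cloudera images keep their data.

# Add two slaves to the running cluster, reusing the launch arguments.
hadoop-ec2 launch-slaves --env REPO=testing --env HADOOP_VERSION=0.20 \
  --user-packages 's3cmd lua lua-devel lzo-devel lzop' mycluster 2

# Refresh the slaves file so slaves.sh sees the new nodes, then log in.
hadoop-ec2 update-slaves-file mycluster
hadoop-ec2 login mycluster

# On the namenode: node count and remaining hdfs capacity...
hadoop dfsadmin -report | head -20
# ...and a rough per-node view of local disk (the path is a guess).
slaves.sh df -h /mnt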

That’s all for now. There’s a lot more to go into, in future posts.

COMMENTS

  1. Alan Gates says:

    Pig has a JOIN keyword, and as of release 0.4 four different implementations of join. Users can select the join type that best fits their data. See http://hadoop.apache.org/pig/docs/r0.4.0/piglatin_reference.html#JOIN

  2. Ben Clark says:

    Alan, thanks. And right you are: corrected in the body of the post.
