This is Joe on Data.

Memory Management in Hadoop MapReduce

If you ever have to write MapReduce jobs or custom UDF or SerDe classes for Hive in Java, you will want to re-use memory as much as possible, meaning as few object and array allocations as possible, while also taking care not to inadvertently use or re-use data that is invalid or corrupted. This is good practice in Java (and any other language) in general, but for MapReduce jobs it is particularly important because the amount of data processed is generally large (i.e. big data). Any operation on the data is therefore called many, many times, so small inefficiencies add up and can increase processing time from minutes to hours in extreme cases.

The most important general rule to help you optimize memory re-use is that a single instance of just about any Java class you create for the MapReduce framework is, as far as I have seen, only ever used by a single thread. This goes for Mappers, Reducers, Hive UDFs, and Hive SerDes, to name a few I’ve worked with. What this means is that you can instantiate an instance variable at construction time, and each time your map, evaluate, or other method is called you can reset that instance (and all of its internal state) to reuse it and its allocated memory.

For instance, if you use an org.apache.hadoop.io.Text as a map output key, you can create a single non-static final instance of a Text object in your Mapper class. Then each time the map method is called, you can either clear or simply set that single Text instance and then write it to the mapper’s context. The context will use/copy the data before it calls your map method again, so you don’t have to worry about overwriting data being used by the framework.

Here is an example word count Mapper class to illustrate object re-use:

public static class MemoryTestMapper extends
      Mapper<LongWritable, Text, Text, IntWritable> {
  private static final IntWritable ONE = new IntWritable(1);

  private final Text outKey = new Text();

  @Override
  public void map( LongWritable key, Text value, Context context )
        throws IOException, InterruptedException {
    outKey.set( value.toString().toUpperCase() );
    context.write( outKey, ONE );
  }
}

Note how ONE is static since it is never modified, while outKey, which is modified on every call, is non-static. Also note how both use the final keyword.

Here are a few things to watch out for:

  • Do not use static instances for mutable values that will be changed; only values that are never modified (like ONE above) are safe to make static.
  • When iterating over the values in a reduce/combine phase, do not use the iterator’s value instances outside of the iteration. The MapReduce framework may re-use value instances across subsequent iterations, so if you need data from a value after the iteration, you must copy the data out of the instance.
  • When accessing bytes in an org.apache.hadoop.io.Text, always obey the length reported by the Text’s getLength() method. Never use the full byte array from getBytes() without limiting it to getLength(), because the backing array may be larger than the valid data. (The sketch after this list illustrates the last two points.)
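
To make the last two points concrete, here is a minimal, hypothetical reducer sketch of my own (it consumes Text values, so it is not paired with the word count mapper above, and it follows the same snippet conventions with imports omitted):

public static class MemoryTestReducer extends
      Reducer<Text, Text, Text, IntWritable> {
  // Re-used output value, the same pattern as the mapper above.
  private final IntWritable count = new IntWritable();

  @Override
  public void reduce( Text key, Iterable<Text> values, Context context )
        throws IOException, InterruptedException {
    Text longest = null; // must hold a copy, not the iterator's instance
    int n = 0;
    for ( Text value : values ) {
      n++;
      // The framework may hand back the same Text instance on every
      // iteration, so copy the data if it is needed after the loop.
      if ( longest == null || value.getLength() > longest.getLength() ) {
        longest = new Text( value );
      }
      // Only the first getLength() bytes of getBytes() are valid; the
      // backing array may be larger and contain stale data.
      byte[] raw = value.getBytes();
      int validLength = value.getLength();
      // ... work only with raw[0] through raw[validLength - 1] here ...
    }
    // longest now safely owns its own copy of the data.
    count.set( n );
    context.write( key, count );
  }
}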

I’ve been bitten by a few of these subtle details, either because I wasn’t paying attention or because I didn’t understand how the instances were used. Hopefully this will help you write faster and more efficient Java for MapReduce while avoiding some of the subtle pitfalls of how the MapReduce framework works.

Amazon ElasticMapReduce and the Screen Command

How many times have you SSH’d into a remote GNU/Linux server and run a long-running command, only to come back to a stuck or disconnected SSH session? GNU Screen to the rescue! GNU Screen is perhaps one of the most subtle, useful, and difficult to understand commands in the GNU/Linux command arsenal. The important thing it does for this discussion is create a terminal session from which you can detach, allowing you to log out from the server while your session persists and keeps running. Then when you log back in at a later time, or from another client, you can re-attach the previous screen session exactly as if you never left. Screen acts like a container for your session, keeping its program(s) running along with your command history and other aspects of the session.

The difficult part of learning to use GNU Screen is its key commands. Screen commands all begin with the control-a key sequence (abbreviated “C-a” from now on), followed by the screen command. The most important combination is “C-a d”: hold the control key while pressing and releasing the a key, then release the control key and press the d key. The “C-a d” command detaches the current screen session, bringing you back to where you ran the screen command. You can then attach the screen session again by running the command “screen -r”. There are many more screen commands, for which you will have to consult the GNU Screen documentation (e.g. the man page).
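
A typical detach/re-attach workflow looks something like this (the long-running command here is just a placeholder):

$ screen                   # start a new screen session
$ ./my_long_running_job    # run your work inside the session
                           # press C-a d to detach; the job keeps running
$ screen -ls               # later: list your screen sessions
$ screen -r                # re-attach the most recent detached session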

The importance of GNU Screen for Amazon ElasticMapReduce is that it lets you use the Hive interactive command line client, run a long-running query, and not lose the Hive interactive session if you log out or the connection is interrupted. I’m surprised the Amazon AMIs for EMR don’t have it installed by default. So the first thing I have to do when running a Hive interactive job is install screen via:

$ sudo apt-get install screen

I should just script the EMR cluster creation, adding a step to install Screen. But I’ll leave that as an exercise for myself and the reader.

GNU Screen has many other useful features, such as multiplexing multiple terminal windows, scrollback, and copy/paste. Check out the GNU Screen website or man page for more details.

Shell Analytics: command line tools for data analysis

Shell Analytics is when you use command line tools to analyse or manipulate data. Command line tools are invaluable for working with data, specifically files or command line programs which output useful data. Not only can you quickly and easily preview the first or last few lines of a file (head/tail), you can extract it (cat/grep/>), transform it (sed/awk), and organize and count it (sort/uniq). Specifically, I am talking about command line tools on Linux using Bash, although they are also available on Mac OS X. I am unsure of their availability or usability on Windows, so let me know if you have any insight there.

Here are a few use cases for shell analytics:

  1. Previewing the contents of a file. Often, when getting new data files and importing them into a database or using them in MapReduce jobs, I will check the first few lines of a file to verify the file contents and structure, or to see if I need to remove a header line. What makes a tool like head uniquely qualified for this job is that it only reads the first few lines of a file, which is important when you have a multi-gigabyte file that would make most text editors explode. And it is fast no matter the size of the file.
  2. Looking for specific data. Whether you’re searching through log files for lines containing a particular session id or error message, or trying to find a record in your input data that caused an error or issue in your MapReduce job because of a bad data format, grep and regular expressions are your friends (although they don’t always do what you think they should be doing).
  3. Extracting subsets of data. Sometimes it is useful to run a MapReduce job on a smaller subset of input data to test and verify that it is functioning as you need or expect. Using the head command you can extract the first 1000 lines, or however many you need, out of your larger data set. It is also sometimes useful to extract or suppress specific lines into a smaller data set using grep for further analysis or processing. Not only can you extract specific lines, but also specific parts of a line (sed) or specific columns in a delimited file (awk).
  4. Aggregation and counting. Using the uniq command (usually in combination with other commands to format and organize the data) you can aggregate unique lines and count occurrences of values or the number of unique values.

I/O Pipes

The killer feature is that multiple commands can be chained together with the pipe character (|), forming a data processing pipeline that allows complex data transformation operations in a single command. It is also fast, because intermediate stages are not written to or read from disk (there are, however, exceptions to this rule). A pipeline's performance is primarily bound by disk I/O: reading input files from disk or writing output files to disk. Some operations on the data are also time consuming and can leave a stage CPU bound, for example complex regular expression extraction or data compression/decompression.

Data pipelining (chaining multiple commands using a pipe) is possible because of two special file descriptors in Linux (and Unix-based systems): standard in and standard out. There is a third special file descriptor, standard error, but you don’t typically use it for data processing unless you are trapping and handling errors in a certain way. When a program is run, each of these special file descriptors is made available to the program as a standard file handle accessible in most programming languages. Standard in can be read from just like any file, and standard out can be written to just like any file. You can take advantage of these standard file descriptors in your own programs and use them in concert with other command line tools.
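
For example, here is a small Java filter of my own (the class name and the upper-casing behavior are made up for illustration) that reads standard in and writes standard out, so once compiled with javac it can sit in the middle of a pipeline:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class UppercaseFilter {
  public static void main( String[] args ) throws IOException {
    BufferedReader in = new BufferedReader( new InputStreamReader( System.in ) );
    String line;
    // Read standard in line by line and write each line, upper-cased,
    // to standard out.
    while ( (line = in.readLine()) != null ) {
      System.out.println( line.toUpperCase() );
    }
  }
}

 $ cat file.txt | java UppercaseFilter | sort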

The command line tools discussed here make specific use of these standard file descriptors to pass data to and receive data from other commands through I/O pipes. When you execute a command in Bash by naming one command with its arguments, appending the pipe character, and naming another command with its arguments, you implicitly connect the standard output of the first command to the standard input of the second command. Here is a simple example that reads the contents of a file and prints each line containing the characters “joe” (note: examples will show the Bash prompt $ followed by the command(s) as you would enter and execute them in Bash):

 $ cat file.txt | grep 'joe'

The first command in a pipeline will always be a command which writes some data to standard out. The most common for performing shell analytics is the cat command, which reads the contents of a named file and writes it to standard out (it can do more, but that is the basic use). Also useful are the head and tail commands, which read the first or last N lines, respectively (the default is 10), from a file (or standard in).

Useful Commands

Below are a few of the most useful commands for shell analytics which take advantage of reading/writing data from standard in/standard out. I will focus on use cases for shell analytics, but they have many other uses. To learn more about a command, a great resource is the command’s man page. Man is a command to read the manual (i.e. documentation) of another command; to read the man page for the cat command, run “man cat”. Also helpful is Google: if you search for the command name and what you are trying to use it for, you can usually find some helpful information.

Each command will show example usage. I will leave it up to the reader to experiment with real data and variations on the commands and the combinations which can be made.

Cat

The cat command is like glue. The most common use is to read one or more files and write them to standard out. Most shell analytics pipelines will start with cat, although it is not always necessary; many commands can read a file on their own without needing cat. However, I like to use it because I can start by testing a data pipeline with “head file.txt | ...”, and then it is easy to change head to cat to run it on the whole file.

Example: print the contents of a file.

 $ cat file.txt
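
For example, to illustrate the head-to-cat workflow mentioned above (the grep pattern is just an example):

 $ head file.txt | grep 'joe'
 $ cat file.txt | grep 'joe'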

Head / Tail

The head and tail commands print N lines from either the head (beginning) or tail (end) of a file or standard in. The default number of lines is 10, but that can be overridden with the -n argument. There are a number of other useful arguments you can find in the man page.

Example: print last 100 lines from a file.

 $ tail -n 100 file.txt

Grep

The grep command is like a filter. It will read a file or standard input and print lines matching (or not matching when using the -v flag) the supplied text or regular expression.

Example: print lines containing the string “joe” from the file file.txt

 $ grep "joe" file.txt

Example: print all lines from file.txt except blank lines and (possibly indented) comment lines starting with #

 $ grep -v '^ *\(#.*\)\?$' file.txt

Notes: It is usually good to wrap regular expressions for grep or sed in single quotes to avoid inadvertently referencing Bash variables. Also, the single biggest annoyance I find with regular expressions, particularly from the shell, is knowing or finding which characters to escape or not escape. It sometimes takes trial and error to get a regular expression right.

Sed

The sed command is a stream editor for filtering or transforming text. I almost always use it for extracting or replacing parts of a line. For that you will use the expression s/REGEX/REPLACEMENT/[flags]. Your regex can match a partial line, in which case the replacement will only replace the matched substring, leaving the rest of the line as it was; or it can match the whole line, and the whole line will be replaced. See the sed manual for more detailed documentation.

Example: replace all occurrences of the string “joe” with the string “bob”.

$ sed 's/joe/bob/g' file.txt

Example: extract the number of milliseconds (a number followed by “ms”) from a log file.

$ sed 's/^.*[^0-9]\([0-9]\+\)ms.*$/\1/' file.log

Awk

The awk command is actually a feature-rich programming language. However, I only use it to parse and extract fields from CSV files; for more complicated processing I would move up to Python rather than a more complicated awk script. The key to extracting fields is to specify the field separator with the -F argument. Then you can print fields by index to extract a single field or a subset of fields, perhaps in a different order. See the awk manual for more detailed documentation.

Example: print the 3rd field from a CSV.

$ cat file.csv | awk -F ',' '{print $3}'

Example: output a CSV of fields 6 then 3.

 $ cat file.csv | awk -F ',' '{print $6","$3}'

Sort

The sort command simply sorts lines from standard in or one or more files. This can be helpful for viewing or using sorted data, but I primarily use it to sort lines for the uniq command (described in the next section).

Example: sort a list of names

 $ sort names.txt

Uniq

The uniq command collapses adjacent matching lines from the input file or standard input into a single line. This is useful if you want to count the number of unique lines in a file (by piping the output to the wc command, which counts lines, words, and/or characters). Also very useful is the -c flag, which counts the number of occurrences of each unique line.

Example: count number of unique names.

 $ sort names.txt | uniq | wc -l

Example: print each unique name and the number of times it occurs.

 $ sort names.txt | uniq -c
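
A natural follow-on (using the same hypothetical names.txt) is to sort those counts numerically in reverse so the most frequent names come first, and keep the top 10:

 $ sort names.txt | uniq -c | sort -rn | head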

I/O Redirection

Normally you don’t want to just view the printed output of a series of piped commands, you want to save the data so that you can share it or view it later or so you can use it in another program. Maybe you want to import the data into a relational database like PostgreSQL or MySQL, maybe you want to use it as part of a MapReduce job, or maybe you want to create graphs from it using gnuplot or LibreOffice.

In order to capture the output of a command or series of piped commands, you use the > or >> I/O redirection symbols. The > symbol redirects the standard output of the preceding command to the file named after it, overwriting the file if it already exists. The >> symbol also redirects the standard output of the preceding command to the file named after it, but if the file already exists the data is appended to it.

Example: write sorted, unique, names to the file unique_names.txt.

 $ sort names.txt | uniq > unique_names.txt

There are a number of other useful I/O redirection uses and syntaxes, however this is most of what you need for shell analytics.

Summary

The command line tools available on Linux and other systems can be powerful, flexible, and fast tools for performing data analysis or preparing data for further analysis. Through the use of I/O pipes and I/O redirection, one can perform complex data operations more quickly and easily than through many other methods.

Playing With D3.js

D3.js is a “JavaScript library for manipulating documents based on data.” D3 works a little like jQuery, but is focused on providing a set of tools for binding data to visual components and a rich set of plugins for many visualization features beyond simple line, bar, or pie charts. Take a look at the gallery for an amazing variety of use cases. D3 is perfect for building custom and creative visualizations to expose and explore new insights into data.

As a fairly simple but interesting way to familiarize myself with D3, I decided to put together a demo which illustrates some of its capabilities. The demo draws an SVG image with a series of circles. These circles are hooked up to a data set giving position and velocity to each circle. I created an animation loop which updates the data set to move the position of the circles based on their velocity and redraws the circles at the new positions using the bindings to the data. To make the demo a little more interesting, I added a different kind of circle, in yellow, which follows the mouse. This circle adds acceleration proportional to the inverse of the square of the distance (i.e. gravity).

This demo is more like a game than a visualization, but I think what it illustrates well is the binding and manipulation of data and having that represented visually. I have an idea for my next demo of D3 which explores a novel way of representing multi-dimensional data that makes it easy to visually pick up on variations and outliers. So stay tuned for that.

You can view the demo here (And yes the name “Gravity Balls” is horrible. I ran out of creativity.). Feel free to view or copy the source for your own uses. Note: requires a modern browser such as the latest Google Chrome, Firefox, or Internet Explorer 9 (I believe).

Big Data is CRAP

I was watching an interesting “Leaders in Big Data” panel and one of the panelists, Charles Fan from VMware, had a great name for Big Data (or what you do with it):

Create Replicate Append Process

Very fitting.

If you are interested you can find the video here.

Picking the Right Database for Your Application

One of the first things you need to decide when building a new application or major feature is how you are going to store and process the data for it, which means picking the right database for the job. This is, of course, assuming you will need to store and retrieve data and your storage needs are non-trivial. In this post I will attempt to walk through the various options and decisions you must make while selecting a database.

What makes this decision difficult is that you have a lot of options and there is a lot of variety to those options. There are relational databases, NoSQL databases. Transactional, non-transactional. Row-oriented or column-oriented. Community supported or commercial. And there are many subtle differences between each of those options. Since there are really too many options to cover everything in detail, my goal here is to give you a better understanding of how to evaluate your needs and match them to the right database.

Since practically all my experience is with open source databases, I will stick to what I know and just talk about that. I apologize to all you folks out there who insist on spending money to make you feel better about your decisions (cough, Oracle. cough, Windows…). Also, I want to avoid holy wars over your personal favorite database. There isn’t one database for every application or situation. So try to keep an open mind and objectively evaluate all options.

In order to evaluate which database to use, you must first understand your application and how it uses data. Here is a short list of key usage characteristics to consider:

  • Storage size — How much data will your application use and store?
  • Read:Write ratio — Will your application be read-intensive, write-intensive, or fairly balanced?
  • Concurrent users — How many concurrent users will access your application or do you need your application to support? i.e. how much traffic or database transactions per second do you expect or need to plan for?
  • Data model — What is your data model and how is it used?
  • Security, stability, and availability — How much downtime or data loss can you tolerate? How sensitive is the data you are storing (financial or personal information)? Are there service-level agreements (SLAs) you must maintain?

With that knowledge of your application, you can better evaluate your database options and how they apply to your situation. Here are the three top factors that will help you pick the right database:

  1. What databases are you or your team experienced with?
  2. How well does your data model fit into the databases you are considering? Do you need a more flexible model?
  3. Will you need to split your database across multiple servers?

Previous Experience

There is a lot of weight in your experience. A hot new database is great, but if you already have a lot of experience with a particular database like MySQL, then you need a good reason to try something new. You have to be prepared not only to develop your application for a new database but to support and maintain it once it is in production. A hot new database also hasn’t been around long enough for as many bugs to be found and fixed. It therefore may not be as mature and stable as something more traditional like MySQL or PostgreSQL, so you may run into more bugs, crashes, and maybe even data loss. Even though there are ways to handle and prepare for failures, you have to understand and prepare for working with something unknown.

Another challenge, when deciding between relational and NoSQL databases, is that the APIs for NoSQL databases are very different from those for relational databases. Relational databases are fairly universally supported through standard APIs like JDBC (Java Database Connectivity) for Java or DB-API for Python, with standard interfaces for executing SQL and processing results. NoSQL databases, on the other hand, by definition do not use SQL and instead have database-specific APIs.
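
As a rough sketch of what that standard relational API looks like in Java, here is a minimal JDBC query; the connection URL, credentials, table, and column names are made up for the example, and it assumes a PostgreSQL JDBC driver on the classpath:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class JdbcExample {
  public static void main( String[] args ) throws Exception {
    // Hypothetical database, user, and schema.
    try ( Connection conn = DriverManager.getConnection(
              "jdbc:postgresql://localhost/mydb", "user", "password" );
          PreparedStatement stmt = conn.prepareStatement(
              "SELECT name FROM users WHERE id = ?" ) ) {
      stmt.setLong( 1, 42L );
      try ( ResultSet rs = stmt.executeQuery() ) {
        while ( rs.next() ) {
          System.out.println( rs.getString( "name" ) );
        }
      }
    }
  }
}

The same code works against MySQL or PostgreSQL by swapping the driver and URL, which is exactly the portability you give up with a database-specific NoSQL client library.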

Data Model

The data model is important when considering a database, especially when you get into the world of NoSQL databases. They typically have more flexible data models and sometimes fit better with your application. With a relational model, there is an impedance mismatch when trying to map it to classes in an object-oriented programming language. ORM (object-relational mapping) frameworks make this easier, but they tend to have a lot of performance overhead and some are difficult to set up and maintain. NoSQL databases, on the other hand, usually match the data model in the application much more closely with the data model in the database. Between the flexibility of the data model and being able to better represent it in the application, NoSQL databases can be very good choices.

There is another aspect of NoSQL databases which is important to consider: they tend not to be as feature-rich as mature relational databases. Part of the reason is that they are relatively new and the developers just haven’t had the time or resources to add certain features. Sometimes the reason is that the feature is really hard or impractical to implement at the scale, or with the distributed nature, of the database. A few features that some NoSQL databases lack are:

  • Transactions — This tends to be a feature NoSQL databases leave out.
  • Indexes on fields other than the primary key — MongoDB has great index support, just as you would expect from a mature relational database, but databases that are key-value stores like Redis or modeled after Google BigTable like HBase are only queryable by their primary key.
  • Foreign Keys — Also tends to be a feature NoSQL databases leave out. MongoDB has a convention for referencing other documents, but there is no constraint that is maintained by the database preventing broken foreign keys.
  • Joins — Along the same line as foreign keys, joins are not typically supported. One way to get around this is to denormalize the data, embedding referenced data within the referencing document. This doesn’t always work well, but in some cases can greatly improve database performance.

Distributing Across Multiple Servers

There are several ways to distribute your database across multiple servers, but they fall into one of two categories. The first is replication, where the entire database is replicated, or copied, and maintained consistently across every server in the cluster. Basic replication involves one master server capable of reading and writing data and several slave servers which are read-only. The slave servers are beneficial when your application has a high read-to-write ratio, so that the majority of read operations can query the slave servers and thus distribute the query load. Some database servers can run in a multi-master mode where more than one server can act as a master and be written to and read from at the same time. The limitation of replication is that the entire data set must fit on each server, so it is limited by the size of your smallest server. The benefit of replication is distributing the processing of queries across multiple servers.

The other main category of distributing across multiple servers is partitioning, where the data is split across the servers. This gives the benefit of distributing the data across the cluster so no one server contains all data for the application. Usually there are three copies of any given record, so if one database server fails, two copies are still available and no data is lost. With replication schemes, by contrast, the entire data set must fit within each server, which limits how big the database can scale. Most NoSQL databases primarily use a form of partitioning (some, like MongoDB, call it sharding), while some offer both partitioning and replication (MongoDB employs both to provide the highest availability and reliability). This makes NoSQL databases very well suited to huge data sets.

One important thing to note about dealing with distributed database systems is that there are always tradeoffs made due to the many challenges in providing a consistent, available, and fault tolerant system. Many NoSQL databases, when fully distributed, follow an eventually consistent model, which means data written to one server in the cluster is not always immediately visible when read from another server in the cluster. Often times there are ways to ensure more consistency if you need it, but it is usually something you need to be aware of or configure when working with a distributed database.

Conclusion

Perhaps the biggest difference between various database options is to choose a relational database or a NoSQL database. Here is a quick rundown of the pros and cons:

Relational
  • Pros: Well supported; well understood; fast and easy to set up.
  • Cons: Performance drops at very large data sets; limited or challenging scalability; cumbersome to code against.

NoSQL
  • Pros: Flexible data model; highly scalable; easy to code against.
  • Cons: Not as mature; non-standard APIs; not as feature-rich.

Beyond the relational vs NoSQL decision, the choices get more subtle. In terms of relational database options, it is basically down to MySQL vs PostgreSQL. I prefer PostgreSQL, but they are very closely matched.

For a detailed review of the features, pros, and cons of many of the NoSQL databases out there check out this blog post by Kristof Kovacs. He does a much better job than I could do here. However, my personal favorite NoSQL database is MongoDB. I have used it for a very high volume application with huge amounts of data in a fully clustered environment and have been very happy with its performance.

I hope you have enjoyed this topic and that it is able to help you pick the right database for your application.

Intel Launches Hadoop Distribution and Project Rhino

Intel apparently is launching its own distribution of Hadoop as well as Project Rhino. Project Rhino is an “open-source effort to enhance security in Hadoop,” which makes Hadoop a more viable option for highly sensitive data. The Intel Hadoop distribution aims to optimize Hadoop for Intel Xeon platforms. I’m not convinced we need another distribution of Hadoop. Hopefully the Intel Xeon optimizations will be made available upstream so other distributions can take advantage of them.

Source: TechCrunch [link]

Welcome

Welcome to my new blog, Joe on Data. Hopefully the title of the blog is explanatory enough for what it is about. In it I will be writing about data: how to manage it, how to store it, how to process it, and what you can do with it. Data is everywhere and data is crucial in so many ways to so many things. A lot of people have a lot of it, but don’t always know what to do with it or how to use it effectively. I hope you will find the posts here interesting, informative, and helpful so you can get the most out of your data.