
Memory Management in Hadoop MapReduce

If you ever have to write MapReduce jobs or custom UDF or SerDe classes for Hive in Java, you will want to re-use memory as much as possible, meaning as few object and array allocations as possible, while also taking care not to inadvertently use or re-use data that is invalid or corrupted. This is an important practice in Java (and most other languages) generally, but it is particularly important for MapReduce jobs because the amount of data processed is generally large (i.e. big data). Any operation on the data is therefore called many, many times, so small inefficiencies add up and can increase processing time from minutes to hours in extreme cases.

The most important general rule to help you optimize memory re-use is that a single instance of just about any Java class you create for the MapReduce framework is only ever used by a single thread (unless you explicitly opt into a multithreaded runner such as MultithreadedMapper, whose documentation requires thread-safe Mappers). This goes for Mappers, Reducers, Hive UDFs, and Hive SerDes, to name a few I’ve worked with. What this means is that you can instantiate an instance variable at construction time, and each time your map, evaluate, or other method is called, you can reset the instance (and all internal variables) to re-use it and its allocated memory.
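To make that concrete in the Hive case, here is a minimal sketch of a UDF that re-uses a single Text instance across calls to evaluate. The class name and the upper-casing logic are just illustrative, and it assumes the classic org.apache.hadoop.hive.ql.exec.UDF base class:

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

// Illustrative UDF: upper-cases its input, re-using one Text instance.
public final class UpperCaseUDF extends UDF {
  // Created once per UDF instance; Hive calls evaluate() from a
  // single thread, so re-using this buffer is safe.
  private final Text result = new Text();

  public Text evaluate( Text input ) {
    if ( input == null ) {
      return null;
    }
    // set() overwrites the previous contents, re-using the backing
    // byte array when it is already large enough.
    result.set( input.toString().toUpperCase() );
    return result;
  }
}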

The same goes on the MapReduce side. If you use an org.apache.hadoop.io.Text as a map output key, you can create a single non-static final instance of a Text object in your Mapper class. Then, each time the map method is called, you can clear or simply set that one Text instance and write it to the mapper’s context. The context uses or copies the data before your map method is called again, so you don’t have to worry about overwriting data still in use by the framework.

Here is an example word count Mapper class to illustrate object re-use:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public static class MemoryTestMapper extends
      Mapper<LongWritable, Text, Text, IntWritable> {
  // Static because this IntWritable is a constant that is never modified.
  private static final IntWritable ONE = new IntWritable(1);

  // Non-static because its contents change on every call to map().
  private final Text outKey = new Text();

  @Override
  public void map( LongWritable key, Text value, Context context )
        throws IOException, InterruptedException {
    // Re-use the same Text instance for every record; set() overwrites
    // the previous contents instead of allocating a new object.
    outKey.set( value.toString().toUpperCase() );
    context.write( outKey, ONE );
  }
}

Note how ONE is static because it is never modified, while outKey is non-static because its contents change on every call to map. Also note that both use the final keyword, so the references themselves can never be reassigned.

Here are a few things to watch out for:

  • Do not re-use static instances of mutable types whose contents will be modified; reserve static for true constants like ONE above.
  • When iterating over the values in a reduce/combine phase, do not use the iterator’s value instances outside of the iteration. The MapReduce framework may re-use a single value instance across subsequent iterations, overwriting its contents each time. If you need data from a value after the iteration has moved on, you must copy the data out of the instance (see the first sketch after this list).
  • When accessing bytes in an org.apache.hadoop.io.Text, always obey the length returned by the Text’s getLength() method. The array returned by getBytes() is a re-used backing buffer that may be longer than the valid data, so never read it to its full length (the second sketch below shows this).
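
To make the second bullet concrete, here is a minimal sketch of a Reducer that needs values after the iteration has moved on. The class name and the idea of counting distinct values are hypothetical; the copy-before-keeping rule is the point:

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public static class DistinctCountReducer extends
      Reducer<Text, Text, Text, IntWritable> {
  private final IntWritable outValue = new IntWritable();

  @Override
  public void reduce( Text key, Iterable<Text> values, Context context )
        throws IOException, InterruptedException {
    Set<String> distinct = new HashSet<String>();
    for ( Text value : values ) {
      // WRONG: distinct.add( value ) would store the framework's
      // re-used Text instance, leaving the set with one object whose
      // contents are whatever the last value happened to be.
      // Copy the data out of the instance instead:
      distinct.add( value.toString() );
    }
    outValue.set( distinct.size() );
    context.write( key, outValue );
  }
}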
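
And for the third bullet, a small sketch of reading a Text’s bytes directly. The countSpaces helper is hypothetical, but the bounds rule is general:

import org.apache.hadoop.io.Text;

// Hypothetical helper: count ASCII space bytes without building a String.
public static int countSpaces( Text text ) {
  // getBytes() returns the re-used backing array, which may contain
  // stale bytes past the end of the current data.
  byte[] bytes = text.getBytes();
  // getLength() is the number of valid bytes; always bound loops by
  // this, never by bytes.length.
  int length = text.getLength();
  int count = 0;
  for ( int i = 0; i < length; i++ ) {
    if ( bytes[i] == ' ' ) {
      count++;
    }
  }
  return count;
}

Since Text is UTF-8, scanning for a single ASCII byte like this is safe: every byte of a multi-byte character is 0x80 or above, so none can be mistaken for a space.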

I’ve been bitten by a few of these subtle details, either because I wasn’t paying attention or because I didn’t understand how the instances were used. Hopefully this post will help you write faster, more efficient Java for MapReduce while steering clear of those pitfalls in how the framework re-uses instances.
