Why Should You Care About MapReduce?

By Eugene Ciurana

01 Feb 2008 | TheServerSide.com

What is MapReduce?

MapReduce is a distributed programming model for processing massive amounts of data on large clusters, developed by Jeffrey Dean and Sanjay Ghemawat at Google. MapReduce is implemented as two functions: Map, which applies a function to every member of a collection and returns a list of results based on that processing, and Reduce, which collates and resolves the results from two or more Maps executed in parallel by multiple threads, processors, or stand-alone systems. Both Map() and Reduce() may run in parallel, though not necessarily on the same system at the same time.

Google uses MapReduce to index every Web page it crawls. It replaced the original indexing algorithms and heuristics in 2004, given its proven efficiency in processing very large, unstructured datasets. Several implementations of MapReduce are available in a variety of programming languages, including Java, C++, Python, Perl, Ruby, and C, among others. In some languages, such as Lisp or Python, Map() and Reduce() are built into the fabric of the language itself. In general, these functions could be defined as:

List2 map(Functor1, List1);
Object reduce(Functor2, List2);
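In Java, these two signatures can be sketched with the generic functional interfaces from java.util.function. This is a minimal illustration assuming Java 8 or later; the class name MapReduceSignatures is hypothetical, and reduce() here also takes an initial value, which the two-line signature leaves implicit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical helper class: generic Java equivalents of
//   List2  map(Functor1, List1)
//   Object reduce(Functor2, List2)
final class MapReduceSignatures {

    // map: apply functor1 to every element of list1, producing list2.
    static <A, B> List<B> map(Function<A, B> functor1, List<A> list1) {
        List<B> list2 = new ArrayList<>(list1.size());
        for (A element : list1) {
            list2.add(functor1.apply(element));
        }
        return list2;
    }

    // reduce: fold list2 into a single value with functor2, starting from init.
    static <B, R> R reduce(BiFunction<R, B, R> functor2, List<B> list2, R init) {
        R result = init;
        for (B value : list2) {
            result = functor2.apply(result, value);
        }
        return result;
    }
}
```

For example, mapping String::length over the list ("map", "reduce") yields [3, 6], and reducing that list with Integer::sum from 0 yields 9.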

The map() function operates on large datasets split into two or more smaller buckets. Each bucket contains a collection of loosely delimited logical records or lines of text. Each thread, processor, or system executes the map() function on a separate bucket to calculate a set of intermediate values based on the processing of each logical record. The combined results from all of these should be identical to those produced by a single map() function processing the entire dataset as a single bucket. The general form of the map() function is:

map(function, list) {
  intermediateResult = empty list
  foreach element in list {
    v = function(element)
    intermediateResult.add(v)
  }
  return intermediateResult
} // map
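The map() pseudocode can be sketched in Java as follows, assuming Java 8 or later, buckets represented as in-memory lists, and a thread pool standing in for the threads, processors, or systems. The class ParallelMap and its method names are hypothetical. Because the per-bucket results are concatenated in bucket order, the combined output matches what a single map() over the whole dataset would produce.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

final class ParallelMap {

    // map(): apply the function to every element of one bucket and
    // return the list of intermediate results.
    static <A, B> List<B> map(Function<A, B> function, List<A> bucket) {
        List<B> intermediateResult = new ArrayList<>();
        for (A element : bucket) {
            intermediateResult.add(function.apply(element));
        }
        return intermediateResult;
    }

    // Run map() on each bucket as a separate task, then concatenate the
    // per-bucket results in bucket order.
    static <A, B> List<B> mapBuckets(Function<A, B> function, List<List<A>> buckets, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<List<B>>> futures = new ArrayList<>();
            for (List<A> bucket : buckets) {
                futures.add(pool.submit(() -> map(function, bucket)));
            }
            List<B> combined = new ArrayList<>();
            for (Future<List<B>> future : futures) {
                combined.addAll(future.get()); // blocks until that bucket is done
            }
            return combined;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for map tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a map task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```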

The reduce() function operates on one or more lists of intermediate results, fetching each of them from memory, from disk, or over the network, and applying a function to each element of each list. The final result of the complete operation is produced by collating and interpreting the results from all processes running the reduce() operations. The general form of the reduce() function is:

reduce(function, list, init) {
  result = init
  foreach value in list {
    result = function(result, value)
  }
  outputResult.add(result)
}
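The reduce() pseudocode, plus the final collation step across several reduce() processes, might look like this in Java (Java 8 or later; ReduceSketch and reduceAll are hypothetical names). The sketch reduces each intermediate list separately and then collates the partial results with the same function. That two-level scheme only matches a single reduction over all values when the function is associative and init is its identity, as with addition and 0.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

final class ReduceSketch {

    // reduce(function, list, init): fold one list of intermediate results.
    static <T> T reduce(BinaryOperator<T> function, List<T> list, T init) {
        T result = init;
        for (T value : list) {
            result = function.apply(result, value);
        }
        return result;
    }

    // Reduce each intermediate list on its own (as independent reduce()
    // processes would), then collate the partial results with the same
    // function. Correct only for an associative function whose identity is init.
    static <T> T reduceAll(BinaryOperator<T> function, List<List<T>> intermediateLists, T init) {
        List<T> outputResult = new ArrayList<>();
        for (List<T> list : intermediateLists) {
            outputResult.add(reduce(function, list, init));
        }
        return reduce(function, outputResult, init);
    }
}
```

With Integer::sum and 0, reducing [1, 2, 3], [4, 5], and [6] separately and collating gives 21, the same as summing all six values at once; a non-associative function such as subtraction would not, in general, give the same answer.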

The implementation of MapReduce separates the business logic from the multi-processing logic. The map() and reduce() functions run across multiple systems, synchronized over shared pools and communicating with one another over some form of RPC. The business logic is implemented in user-definable functors that deal only with logical record processing and aren’t concerned with multiprocessing issues. Once the MapReduce framework is in place, this enables quick turnaround of parallelized processing applications across massive numbers of processors, because developers focus their effort on writing the functors. MapReduce clusters are reused by replacing the functors and providing new data sources, without having to build, test, and deploy a complete application every time.
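To illustrate the separation between business logic and multi-processing logic, here is a minimal single-JVM sketch assuming Java 8 or later. MapFunctor, ReduceFunctor, MiniFramework, and WordCount are hypothetical names, not part of any real framework, and the "framework" side runs in one thread where a real cluster would distribute the work.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical functor contracts: business logic sees one logical record
// (map) or one key with its values (reduce), never threads or RPC.
interface MapFunctor<K, V> {
    List<Map.Entry<K, V>> map(String record);
}

interface ReduceFunctor<K, V, R> {
    R reduce(K key, List<V> values);
}

// Stand-in for the framework side: groups mapper output by key and hands
// each group to the reducer. It knows nothing about the business logic.
final class MiniFramework {
    static <K, V, R> Map<K, R> run(List<String> records,
                                   MapFunctor<K, V> mapper,
                                   ReduceFunctor<K, V, R> reducer) {
        Map<K, List<V>> grouped = new HashMap<>();
        for (String record : records) {
            for (Map.Entry<K, V> pair : mapper.map(record)) {
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
            }
        }
        Map<K, R> output = new HashMap<>();
        for (Map.Entry<K, List<V>> entry : grouped.entrySet()) {
            output.put(entry.getKey(), reducer.reduce(entry.getKey(), entry.getValue()));
        }
        return output;
    }
}

// Business logic only: a word-count pair of functors.
final class WordCount {
    static final MapFunctor<String, Integer> MAPPER = record -> {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : record.toLowerCase().split("\\s+")) {
            if (!word.isEmpty()) {
                pairs.add(new SimpleEntry<>(word, 1)); // emit (word, 1)
            }
        }
        return pairs;
    };

    static final ReduceFunctor<String, Integer, Integer> REDUCER = (word, counts) -> {
        int total = 0;
        for (int c : counts) {
            total += c;
        }
        return total;
    };
}
```

Replacing WordCount.MAPPER and WordCount.REDUCER with a different pair of functors reuses MiniFramework.run unchanged, which mirrors how a MapReduce cluster is reused by swapping functors and data sources.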

Implementation of MapReduce

MapReduce is intended for large clusters of systems that can work in parallel on a large dataset. Figure 1 shows a main program, running on a master system, that coordinates other instances of itself, each executing either map() or reduce(), and collates the results from each reduce operation.

The main application is responsible for splitting the basic dataset into buckets. The optimal bucket size depends on the application, the number of nodes, and the I/O bandwidth available. These buckets are normally saved to disk, but could be split in main memory if appropriate, depending on the application. The buckets will become the input into the map() functions.
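A minimal in-memory sketch of the splitting step, assuming Java 8 or later and a dataset already parsed into logical records. BucketSplitter is a hypothetical name, and bucket size is measured here in records; a disk-based implementation would more likely split by byte ranges aligned to record boundaries.

```java
import java.util.ArrayList;
import java.util.List;

final class BucketSplitter {

    // Split records into contiguous buckets of at most bucketSize records.
    // The right bucketSize is application-dependent (node count, I/O bandwidth).
    static <T> List<List<T>> split(List<T> records, int bucketSize) {
        if (bucketSize <= 0) {
            throw new IllegalArgumentException("bucketSize must be positive");
        }
        List<List<T>> buckets = new ArrayList<>();
        for (int start = 0; start < records.size(); start += bucketSize) {
            int end = Math.min(start + bucketSize, records.size());
            // Copy the slice so each bucket can be handed off independently.
            buckets.add(new ArrayList<>(records.subList(start, end)));
        }
        return buckets;
    }
}
```

Splitting seven records with a bucket size of 3 produces three buckets, the last holding a single record.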

The main application is also responsible for launching or forking several copies of the MapReduce core, all of which are identical except for a controller that assigns map() or reduce() tasks to idle processes or threads and coordinates them. The controller keeps track of the state of each map() and reduce() task (waiting, running, complete) and may act as a conduit for routing the intermediate results between map() and reduce() tasks.
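The controller's bookkeeping can be sketched as a small synchronized registry of task states, assuming Java 8 or later. TaskController, its enums, and the task IDs are hypothetical; the sketch covers only state tracking and assignment, not the routing of intermediate results.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

final class TaskController {

    enum Kind { MAP, REDUCE }

    enum State { WAITING, RUNNING, COMPLETE }

    // Insertion order doubles as assignment order: first registered, first assigned.
    private final Map<String, Kind> kinds = new LinkedHashMap<>();
    private final Map<String, State> states = new LinkedHashMap<>();

    // Every method is synchronized so concurrent workers see a consistent view
    // and two idle workers can never claim the same task.
    synchronized void register(String taskId, Kind kind) {
        kinds.put(taskId, kind);
        states.put(taskId, State.WAITING);
    }

    // Hand the next waiting task of the requested kind to an idle worker.
    synchronized Optional<String> assign(Kind kind) {
        for (Map.Entry<String, State> entry : states.entrySet()) {
            if (entry.getValue() == State.WAITING && kinds.get(entry.getKey()) == kind) {
                entry.setValue(State.RUNNING);
                return Optional.of(entry.getKey());
            }
        }
        return Optional.empty(); // nothing of this kind is waiting
    }

    synchronized void complete(String taskId) {
        states.put(taskId, State.COMPLETE);
    }

    synchronized State stateOf(String taskId) {
        return states.get(taskId);
    }

    // True once every registered task has reported completion.
    synchronized boolean allComplete() {
        return states.values().stream().allMatch(s -> s == State.COMPLETE);
    }
}
```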

Each map() task processes its assigned bucket and produces a list of intermediate results that are stored in a shared memory area. The shared memory can take the form of a distributed cache, disk, or another shared store. The task notifies the controller when a new intermediate result has been written and provides a handle to its shared memory location.
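The write-then-notify handshake can be sketched inside a single JVM, assuming Java 8 or later. SharedIntermediateStore is a hypothetical stand-in for the shared memory area: a concurrent map from handle to result, plus a blocking queue of handles that the controller reads. A cluster would back the same idea with a distributed cache or disk.

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

final class SharedIntermediateStore<T> {
    // handle -> intermediate result written by one map() task
    private final Map<String, List<T>> results = new ConcurrentHashMap<>();
    // handles announced to the controller, in the order they were published
    private final BlockingQueue<String> notifications = new LinkedBlockingQueue<>();

    // Called by a map() task: store the result first, then notify with its handle.
    // Handing the handle over through the queue makes the stored list visible
    // to whichever thread later takes that handle.
    String publish(List<T> intermediateResult) {
        String handle = UUID.randomUUID().toString();
        results.put(handle, intermediateResult);
        notifications.add(handle);
        return handle;
    }

    // Called by the controller: block until some map() task reports a result.
    String awaitNextHandle() {
        try {
            return notifications.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for map results", e);
        }
    }

    // Called by a reduce() task with a handle passed on by the controller.
    List<T> fetch(String handle) {
        return results.get(handle);
    }
}
```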

The controller assigns reduce() tasks as new intermediate results become available. Each reduce() task sorts the results by application-dependent intermediate keys, using comparators, so that values with identical keys are grouped together for faster retrieval. Very large results may need to be sorted externally. The task iterates over the sorted data and passes each unique key and its collated values to the user’s reduce() functor for processing.
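The sort-group-reduce loop inside a reduce() task might look like the following sketch, assuming Java 8 or later and intermediate results that fit in memory. SortingReduceTask is a hypothetical name, and the in-memory sort stands in for the external sort that very large results would need.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

final class SortingReduceTask {

    // Sort intermediate (key, value) pairs with an application-supplied
    // comparator, then call the reduce functor once per unique key with
    // all of that key's values.
    static <K, V, R> Map<K, R> run(List<Map.Entry<K, V>> intermediate,
                                   Comparator<? super K> keyOrder,
                                   BiFunction<K, List<V>, R> reduceFunctor) {
        List<Map.Entry<K, V>> sorted = new ArrayList<>(intermediate);
        // List.sort is stable, so values for one key keep their arrival order.
        sorted.sort((a, b) -> keyOrder.compare(a.getKey(), b.getKey()));

        Map<K, R> output = new LinkedHashMap<>(); // preserves sorted key order
        int i = 0;
        while (i < sorted.size()) {
            K key = sorted.get(i).getKey();
            List<V> values = new ArrayList<>();
            // Equal keys are now adjacent, so collect the run of matching entries.
            while (i < sorted.size() && keyOrder.compare(sorted.get(i).getKey(), key) == 0) {
                values.add(sorted.get(i).getValue());
                i++;
            }
            output.put(key, reduceFunctor.apply(key, values));
        }
        return output;
    }
}
```

For instance, the pairs (b, 1), (a, 2), (b, 3) with natural key order and a summing functor produce a = 2 and b = 4, with keys emitted in the order a, b.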

Processing by map() and reduce() instances ends when all buckets are exhausted and all reduce() tasks notify the controller that their output has been generated. The controller signals the main application to retrieve its results. The main application may then operate on these results directly, or re-assign them to a different MapReduce controller and tasks for further processing.
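Feeding one pass's results into another MapReduce pass can be sketched in a single JVM with Java 8 streams standing in for the cluster. TwoPassPipeline is a hypothetical name: the first pass counts words, and its (word, count) output becomes the input records of a second pass that groups words by frequency.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

final class TwoPassPipeline {

    // Pass 1: map each line to its words, then reduce by counting each word.
    static Map<String, Long> wordCounts(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.toLowerCase().split("\\s+")))
                .filter(word -> !word.isEmpty())
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    // Pass 2: the output of pass 1 is re-assigned as input to a second
    // map/reduce step that groups words by how often they occurred.
    static Map<Long, List<String>> wordsByFrequency(Map<String, Long> counts) {
        return counts.entrySet().stream()
                .collect(Collectors.groupingBy(
                        (Map.Entry<String, Long> e) -> e.getValue(),
                        TreeMap::new,
                        Collectors.mapping((Map.Entry<String, Long> e) -> e.getKey(), Collectors.toList())));
    }
}
```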

Real-world implementations of MapReduce would normally assign each controller, map(), or reduce() task to a single system. The Google operational model is based on deploying MapReduce applications across large clusters of commodity systems, or white boxes. Each white box has its own local storage required for processing its bucket, a reasonable amount of primary memory (2 to 4 GB of RAM), and at least two processing cores. White boxes are interchangeable: the main application may designate any machine in the cluster as the controller, which in turn assigns map() or reduce() tasks to other connected white boxes.

Java-based MapReduce Implementation

The Google environment is customized for its needs and to fit its operational model. For example, Google stores files in a proprietary file system optimized for the types of operations that its MapReduce implementations are likely to perform. Enterprise applications, on the other hand, are built on top of Java or similar technologies, and rely on existing file systems, communication protocols, and application stacks.

A Java-based implementation of MapReduce should take into account existing data storage facilities, which protocols are supported by the organization where it will be deployed, the internal APIs at hand, and the availability of third-party products (open-source or commercial) that will support deployment. Figure 2 shows how the general architecture could be mapped to robust, existing Java open-source infrastructure for this implementation.

This architecture assumes the presence of tools such as Terracotta or Mule that are common in many enterprise setups, and the availability of white boxes in the form of physical or virtual systems that can be designated as part of the MapReduce cluster through simple configuration and deployment. A large system may be partitioned into several virtual machines for efficiency, with more or fewer nodes assigned as needed. Capacity issues and processor availability can help determine whether to use physical white boxes, virtual machines, or a combination of both in the cluster.

The Terracotta clustering technology is a great choice for sharing data between the map() and reduce() tasks because it abstracts away all the communications overhead involved in sharing files or using RPC calls to initiate processing of the results. The map() and reduce() tasks are implemented in the same application core, as described in the previous section. The intermediate result sets can be kept in memory, in data structures that Terracotta in turn shares transparently across the cluster. Interprocess communication issues disappear for the MapReduce implementers, since the Terracotta runtime is in charge of sharing those data structures across every node in the cluster that uses it. Instead of implementing a complex signaling system, all the map() tasks need to do is flag intermediate result sets in memory, and the reduce() tasks will fetch them directly.

Both the controller and the main application are likely to spend some time in a wait state, even with the massive parallelization available through the MapReduce cluster. Signaling between these two components, and between the reduce() tasks and the controller when reduction is complete, is done over the Mule ESB. In this manner, output results could be queued up for processing by other applications, or a Mule service object (or UMO) can take these output results and split them into buckets for another MapReduce pass, as described in the previous section. Mule supports synchronous and asynchronous data transfers in memory, across all major enterprise protocols, or over raw TCP/IP sockets. Mule can move output results between applications executing on the same machine, across the data center, or in a different location entirely, with little programmer participation beyond identifying a local endpoint and letting Mule move and transform the data toward its destination.

Another Java-based implementation could be through Hadoop, a Lucene-derived framework for deploying distributed applications running on large clusters of commodity computers. Hadoop is an open-source, end-to-end, general-purpose implementation of MapReduce.

Conclusion

Indexing large amounts of unstructured data is a difficult task regardless of the technologies involved. Traditional methods of applying dedicated algorithms and heuristics result in unwieldy, hard-to-maintain systems whose performance degrades over time. RDBMSs are optimized for indexing and searching large structured datasets, but they are inadequate for unstructured data. MapReduce provides a simple, elegant solution for data processing in parallelized systems, with these advantages:

  • Reduced cost
  • High programmer productivity since the business logic is implemented independently of the parallelization code
  • Overall better performance and results than using traditional RDBMS techniques or custom algorithms/heuristics
  • Ease of deployment using known techniques and existing tools that are familiar to enterprise Java architects and developers

Google has an impressive track record with MapReduce, and more tools appear every day that can ease its adoption for mainstream, enterprise-class applications. If you’re ready to give it a shot, start with a simple task, such as analyzing the traffic patterns into your Web cluster based on the IP addresses of the requesters. An exercise like this is a great way to familiarize yourself with the issues and opportunities of MapReduce before using it for your mission-critical applications.
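As a starting point for that exercise, here is a single-machine sketch that counts requests per IP address, assuming Java 8 or later and access-log lines whose first space-separated field is the client address, as in the Common Log Format. IpTrafficCount is a hypothetical name, and parallelStream() only spreads the work across local cores rather than across a cluster.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class IpTrafficCount {

    // map(): extract the requester's IP address from one access-log line.
    // Assumes the address is the first space-separated field; other log
    // layouts need a different parser.
    static String requesterIp(String logLine) {
        String trimmed = logLine.trim();
        int space = trimmed.indexOf(' ');
        return space < 0 ? trimmed : trimmed.substring(0, space);
    }

    // Map every non-blank line to its IP, then reduce by counting lines per IP.
    static Map<String, Long> requestsPerIp(List<String> logLines) {
        return logLines.parallelStream()
                .filter(line -> !line.trim().isEmpty())
                .map(IpTrafficCount::requesterIp)
                .collect(Collectors.groupingBy(ip -> ip, TreeMap::new, Collectors.counting()));
    }
}
```

Moving this to a real cluster would keep requesterIp as the map functor and the per-IP count as the reduce functor, while the framework takes over splitting the logs into buckets.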

About the Author

Eugene Ciurana is the Director of Systems Infrastructure at LeapFrog Enterprises, an open-source software evangelist, and a contributing editor to TheServerSide.com. He can be found in several places across the IRC universe (##java, #awk, #esb, #iphone) under the /nick pr3d4t0r.