
Using Map/Reduce for Sorting

In my previous post – Demystifying Map/Reduce – I talked about what Map/Reduce is and covered a couple of its applications: word counting and PageRank. In this post I will go over a couple of sorting applications of Map/Reduce.

Let us imagine that we have a huge dataset of integers to sort: hundreds of files, each of which is itself quite big. One can use any number of sorting algorithms from the literature, including external sort (see the previous post), to sort these files without assuming anything about the data. But if the data is not widely distributed, i.e. the integers lie within a certain range and that range is quite small compared to the size of the data, then we can use Map/Reduce. Let us see why with the help of an example.

Let us assume that our integers are constrained to lie between 100 and 200, and that we have 5 files, each containing 1000 random integers in that range (so 5000 integers in total). We read each file into a Map in the Map phase, and then in the Reduce phase we produce a final Map that contains the count of every integer. If we now sort the keys of the final Map and output them into a list data structure in the form of <Integer, Count> pairs, we have sorted all the data (see figure below). Aside: in Java you don't even have to come up with the data structure I am talking about; if you simply use a TreeMap in the final Reduce phase, then all the keys (i.e. the data) are already sorted, as long as the key type (e.g. String, Integer, etc.) implements the Comparable interface (Hadoop has something similar called WritableComparable, and I am using a TreeMap with String keys in Reducer.java).

Sorting using Map/Reduce
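
To make the final Reduce step concrete, here is a minimal sketch. The class and method names are my own, and unlike the project's Reducer.java (which uses String keys) this sketch keys directly on Integers: it merges the per-file count maps into a TreeMap, so that iterating over the result yields the <Integer, Count> pairs in sorted key order.

import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the final Reduce step: merge the per-file
// <Integer, Count> maps into a TreeMap, whose keys are kept in sorted order.
public class SortingReduceSketch
{
	public static TreeMap<Integer, Integer> reduce(List<Map<Integer, Integer>> perFileCounts)
	{
		TreeMap<Integer, Integer> sortedCounts = new TreeMap<Integer, Integer>();

		for (Map<Integer, Integer> counts : perFileCounts)
		{
			for (Map.Entry<Integer, Integer> entry : counts.entrySet())
			{
				Integer existing = sortedCounts.get(entry.getKey());
				sortedCounts.put(entry.getKey(),
						existing == null ? entry.getValue() : existing + entry.getValue());
			}
		}

		// Iterating over sortedCounts now yields the integers in ascending
		// order together with how many times each one occurred.
		return sortedCounts;
	}
}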

What is the complexity of the above sorting algorithm? The Map phase is an order "n" operation (where n is the size of the data). The Reduce phase is an order "m" operation, where "m" is the number of unique integers in the data set. The sort after the Reduce phase is an order "m log m" operation (if we use a sorting algorithm like heap sort). Now if "m" is small compared to "n" (e.g. the data set has 100,000 integers but only 100 unique values), then the cost of the Reduce phase and the final sort is quite small compared to the Map phase, so the total complexity of the Map/Reduce approach is of order "n". However, if the number of unique integers is comparable to the size of the data, then the Reduce phase and the final sort are no longer cheap compared to the Map phase, and it is better to use a traditional sort algorithm (and avoid the overhead of the additional order "n" Map phase).

The Map/Reduce project has an example that reads integers from five files (each containing 5000 integers) and sorts them. The total number of unique integers is 20, and the figure below shows the output in "Integer (Count)" format. As one can see, the output is sorted and the counts add up to 25,000 (the size of the data set: 5 files of 5000 integers each). It is a small and trivial example, but I hope it helps illustrate the application of Map/Reduce to sorting.

Sort result using Map/Reduce

Until next time. Cheers !!

The Map/Reduce design pattern demystified

Over the last 5-6 years there has been quite a lot of buzz about the Map/Reduce design pattern (yes, it is a design pattern), especially after the famous paper from Google. There is now an open-source project, Hadoop, that helps you implement Map/Reduce on a cluster; Amazon offers Map/Reduce on EC2; Cloudera offers commercial support for Hadoop; and so on.

But what really is Map/Reduce? That is what I will try to answer in this post.

Basic Concept

Let us start with an example of external sorting. Say you have a large file (about 100 GB) of integers and you have been given the job of sorting those integers, but you only have access to 1 GB of memory. The general solution is that you read 1 GB of data into memory, do an in-place sort (using any sorting algorithm of your choice), write the sorted data out into a file, then read the next 1 GB of data from the master file, sort it, write it out, and so on and so forth. After this process is finished, you end up with 100 files (1 GB each), and the data in each file is sorted. Now you do a merge: read an integer from each of the 100 files, find the minimum of those integers, write it to a new file, read the next integer from the file that produced the minimum, and keep doing this until all the data from the 100 files has been read. The new file will contain all the integers in sorted order. The image below provides an overview of the above algorithm (for more details take a look at Knuth's fascinating discussion of external sorting in his classic The Art of Computer Programming, Volume 3: Sorting and Searching).

External Sorting
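
To illustrate the merge step, here is a minimal sketch, under the assumption that each chunk file already contains sorted integers, one per line (the class name and the file handling are my own simplifications): a priority queue repeatedly picks the smallest value across all chunk files and advances only the file it came from.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of the merge step of an external sort. Each input file
// is assumed to already contain sorted integers, one per line.
public class ExternalMergeSketch
{
	public static void merge(List<String> sortedChunkFiles, String outputFile) throws IOException
	{
		// Each heap entry is {value, fileIndex}, ordered by value.
		PriorityQueue<int[]> heap = new PriorityQueue<int[]>(sortedChunkFiles.size(),
				new Comparator<int[]>()
				{
					public int compare(int[] a, int[] b)
					{
						return a[0] < b[0] ? -1 : (a[0] == b[0] ? 0 : 1);
					}
				});

		// Open every chunk file and push its first value onto the heap.
		BufferedReader[] readers = new BufferedReader[sortedChunkFiles.size()];
		for (int i = 0; i < readers.length; i++)
		{
			readers[i] = new BufferedReader(new FileReader(sortedChunkFiles.get(i)));
			String line = readers[i].readLine();
			if (line != null)
				heap.add(new int[] { Integer.parseInt(line), i });
		}

		PrintWriter out = new PrintWriter(new FileWriter(outputFile));
		while (!heap.isEmpty())
		{
			// Write the smallest value and advance only the file it came from.
			int[] smallest = heap.poll();
			out.println(smallest[0]);
			String next = readers[smallest[1]].readLine();
			if (next != null)
				heap.add(new int[] { Integer.parseInt(next), smallest[1] });
		}

		out.close();
		for (BufferedReader reader : readers)
			reader.close();
	}
}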

So what was the purpose of the above example? The key pattern is that a huge task is broken down into smaller tasks, each small task produces an intermediate result when it finishes, and these intermediate results are combined to produce the final result. That is the core of the Map/Reduce design pattern. The processing of the small tasks to produce intermediate results is referred to as the Map phase, and the processing of the intermediate results to produce the final result is referred to as the Reduce phase. The key difference is that the Map/Reduce design pattern handles data as key-value pairs, and even the intermediate results are produced as key-value pairs. Let us go over a couple of examples to see what I mean by key-value pairs.

Examples

Word Counting:
You have been tasked with finding out how many times each word occurs in a document. Very simple: you create a hashtable, read each word from the document, and check whether the word exists in the hashtable. If it does not, you insert it (using the word as the key) along with a counter initialized to 1 (the value). If it does, you get its counter, increment it by one, and put the word back into the hashtable with the new counter value. After you have finished reading the document, you iterate over the keys of the hashtable, and for every key (i.e. each word) you look up its value (the number of times it occurred); that gives you the word count for each word in the document. Now, let us say you have to find out how many times each word occurs in a set of 100 books. The above process would be extremely slow and time consuming. A more efficient solution is to go through the above process for each book, producing a word count per book (the Map phase), and then process the results from all 100 hashtables, one per book, to produce the overall word count for all 100 books (the Reduce phase; for details look at the video in the Implementation section below).
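
For reference, a minimal single-document version of that hashtable loop might look like the following (the class name and the very naive tokenization are my own; a real implementation would handle punctuation and line breaks more carefully):

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: count how many times each word occurs in one document.
public class WordCountSketch
{
	public static Map<String, Integer> countWords(String documentText)
	{
		Map<String, Integer> counts = new HashMap<String, Integer>();

		// Naive tokenization: lower-case the text and split on anything
		// that is not a letter or a digit.
		for (String word : documentText.toLowerCase().split("[^a-z0-9]+"))
		{
			if (word.length() == 0)
				continue;

			Integer count = counts.get(word);
			counts.put(word, count == null ? 1 : count + 1);
		}

		return counts;
	}
}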

PageRank:
Imagine that we have to parse about a million web pages (quite a small number considering the size of the World Wide Web) and calculate how many times every URL occurs. For example, an article on Hibernate might contain a link to Java 6, a link to the Hibernate home page, and a link to the MySQL website. For every such link, our job is to find out how many times that link appears across the million web pages (I will call this the URL-Count, analogous to the word count). The higher the URL-Count of a URL, the more popular that URL is (this is, in a very simplified form, the idea behind Google's PageRank algorithm). Once again, we divide the task into 100 Map phases, where every Map phase looks at 10,000 web pages. Every time a Map phase sees a URL in a web page, it inserts it into its hashtable and increments the counter associated with that URL (just like the word count example above). After all the Map phases are finished, each hashtable contains the URL-Counts of all the URLs that occurred in its 10,000-page set. The Reduce phase iterates over the hashtables and combines the counters of URLs that occur in multiple hashtables to produce the final URL-Count of each URL that occurred in our million web pages.
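
A single Map phase of the URL-Count example could look roughly like this (the class name is mine, and the regular expression only catches simple href="..." attributes; a real crawler would use a proper HTML parser):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of one Map phase of the URL-Count example: count the
// links that appear in one chunk of (already downloaded) web pages.
public class UrlCountMapSketch
{
	// Very rough pattern for href="..." attributes.
	private static final Pattern HREF = Pattern.compile("href=\"([^\"]+)\"");

	public static Map<String, Integer> countUrls(List<String> pagesInThisChunk)
	{
		Map<String, Integer> urlCounts = new HashMap<String, Integer>();

		for (String pageHtml : pagesInThisChunk)
		{
			Matcher matcher = HREF.matcher(pageHtml);
			while (matcher.find())
			{
				String url = matcher.group(1);
				Integer count = urlCounts.get(url);
				urlCounts.put(url, count == null ? 1 : count + 1);
			}
		}

		return urlCounts;
	}
}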

Implementation

A simple implementation of the Map-Reduce design pattern consists of a Mapper interface that takes an InputStream as an input and returns a Map as an output. The actual implementation of this interface will know how to read and handle the contents of the stream – e.g. extracting words, removing punctuation, parsing URLs, etc. The Reducer class just aggregates the results from all the Map phases and produces the final result Map.

The Mapper interface –

public interface Mapper
{
	/**
	 * Parses the contents of the stream and updates the contents of the <code>Map</code>
	 * with the relevant information. For example, an implementation to count
	 * words will extract words from the stream (will have to handle punctuation,
	 * line breaks, etc.), or an implementation to mine web-server log files
	 * will have to parse URL patterns, etc. The resulting <code>Map</code> will contain
	 * the relevant information (words, URLs, etc.) and their counts.
	 * 
	 * @param is An <code>InputStream</code> that contains the content that needs to be parsed
	 * @return A <code>Map</code> that contains relevant patterns (words, URLs, etc.) and their counts
	 */
	public Map<String, Integer> doMap(InputStream is);
}
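
As an illustration, a word-counting implementation of this interface might look roughly like the following; this is my own sketch reusing the counting loop from the earlier word-count example, not the exact class from the project:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

// Hypothetical word-counting implementation of the Mapper interface; the
// project's own implementation may differ.
public class WordCountMapper implements Mapper
{
	public Map<String, Integer> doMap(InputStream is)
	{
		Map<String, Integer> counts = new HashMap<String, Integer>();

		try
		{
			BufferedReader reader = new BufferedReader(new InputStreamReader(is));
			String line;
			while ((line = reader.readLine()) != null)
			{
				// Crude tokenization: lower-case and split on non-alphanumerics.
				for (String word : line.toLowerCase().split("[^a-z0-9]+"))
				{
					if (word.length() == 0)
						continue;
					Integer count = counts.get(word);
					counts.put(word, count == null ? 1 : count + 1);
				}
			}
		}
		catch (IOException e)
		{
			throw new RuntimeException("Failed to read the input stream", e);
		}

		return counts;
	}
}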

The Reducer class –

public class Reducer
{
	/**
	 * Executes the Reduce phase of the Map-Reduce pattern by iterating over
	 * all the input Maps to find common keys and aggregating their results.
	 * Stores and returns the final results in the output Map.
	 *
	 * @param inputMaps A list of results from the Map phase
	 * @return A Map that contains the final result
	 */
	public Map<String, Integer> doReduce(List<Map<String, Integer>> inputMaps)
	{
		Map<String, Integer> outputMap = new Hashtable<String, Integer>();

		int mapIndex = 0;

		// outer loop - iterate over all maps
		for (Map<String, Integer> map : inputMaps)
		{
			mapIndex++;

			Iterator<String> it = map.keySet().iterator();

			while (it.hasNext())
			{
				String key = it.next();

				// Get the value from the current map
				Integer value = map.get(key);

				// Now iterate over the rest of the maps. The mapIndex variable starts
				// at 1 and keeps increasing because once we are done with all the
				// keys in the first map, we don't need to inspect it any more, and
				// the same holds for the second map, the third map, and so on.
				for (int j = mapIndex; j < inputMaps.size(); j++)
				{
					Integer v = inputMaps.get(j).get(key);

					// If we find a value for the key, add it to the current value
					// and then remove that key from that map.
					if (v != null)
					{
						value += v;
						inputMaps.get(j).remove(key);
					}
				}

				// Finished aggregating all the values for this key, now store it
				// in the output map.
				outputMap.put(key, value);
			}
		}

		return outputMap;
	}
}

The following video (best viewed in HD mode) shows a simple word counting application using Map-Reduce that walks through the steps of implementing the Mapper interface, passing the results of the Map phase to the Reduce phase, and finally validating that the results are correct.

[wpvideo 9w4ElCW9]

The above code and all the code discussed in the video can be found in the MapReduce sub-project of the DalalStreet open source project on Google Code. Here are the links to the files in case you want to take a detailed look at the code.
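
Tying the two pieces together, a minimal driver might look like this; the WordCountMapper class and the file names are assumptions of mine for illustration, not code from the project:

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical driver: run the Map phase over a few files and feed the
// per-file results into the Reducer.
public class WordCountDriver
{
	public static void main(String[] args) throws FileNotFoundException
	{
		List<Map<String, Integer>> mapResults = new ArrayList<Map<String, Integer>>();

		// Map phase: one intermediate Map per input file (assumed file names).
		Mapper mapper = new WordCountMapper();
		for (String fileName : new String[] { "book1.txt", "book2.txt", "book3.txt" })
		{
			mapResults.add(mapper.doMap(new FileInputStream(fileName)));
		}

		// Reduce phase: aggregate the intermediate Maps into the final word counts.
		Map<String, Integer> wordCounts = new Reducer().doReduce(mapResults);

		for (Map.Entry<String, Integer> entry : wordCounts.entrySet())
		{
			System.out.println(entry.getKey() + " (" + entry.getValue() + ")");
		}
	}
}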

Complexity

So if Map/Reduce is really that simple, then why does Google consider its implementation intellectual property (IP), why is there an open-source project (Hadoop) around it, and why is there even a company (Cloudera) trying to commercialize this pattern?

The answer lies in applying Map/Reduce to huge data sets (the URL-Count of the entire World Wide Web, log analysis, etc.) over a cluster of machines. When one runs Map/Reduce over a cluster, one has to worry about getting notified when each Map phase or job finishes (either successfully or with an error), transferring the results of the Map phase, which can be huge, over the network to the Reduce phase, and all the other problems typically associated with a distributed application. Map/Reduce by itself is not complex, but the set of supporting services that allow Map/Reduce to be distributed is what makes a Map/Reduce "framework" (such as Hadoop) complex (and valuable). Google takes it a step further by running redundant Map phases (to account for common error conditions like disk failures, network failures, etc.), and its IP lies in how it manages these failures, the results from redundant jobs, and so on.

Conclusion

Map/Reduce has definitely opened up new possibilities for companies that want to analyze their huge data sets, and if you want to give it a test drive, you might want to check out Amazon's Map/Reduce offering on EC2 (running Hadoop). You can try out the word count example by downloading a few books from Project Gutenberg.

Happy crunching !!
