This article illustrates the fundamental operations in the MapReduce programming paradigm. Modern big data frameworks like Spark, Flink, Pig, and Hive, and most functional programming languages, all provide interfaces for these common operations in some form or another, so understanding them will allow you to learn these tools more easily. The basic operations are map, partition, group, sort, reduce, and combine.
MapReduce is derived from functional programming concepts. Each of these operations is a higher-order function, which, in layman’s terms, means that each operator’s behavior can be customized by passing it a function you define. I will explain all of these operations using an example of counting different types of fruit in a basket (which, in hindsight, is a very unoriginal variation of the classic word-count problem).
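To make the idea of a higher-order function concrete, here is a minimal sketch using Python's built-in map() and functools.reduce(). It is plain Python rather than Hadoop code, and the list of counts is simply the fruit counts from the example used throughout this article.

```python
from functools import reduce

counts = [8, 6, 4, 3, 2]

# map() is a higher-order function: its behavior is set by the function we pass in.
doubled = list(map(lambda n: n * 2, counts))

# reduce() is customized the same way; here the passed function adds two numbers.
total = reduce(lambda acc, n: acc + n, counts, 0)

print(doubled)  # [16, 12, 8, 6, 4]
print(total)    # 23
```

Swapping in a different lambda changes what the operator does without changing the operator itself, which is the property the rest of the article relies on.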
The map operation does exactly what the name implies: it maps each input item to an output item. Each map() function call takes in one item and outputs 0 or 1 items. If we want to count how many of each type of fruit are in a basket, map() would output a tuple of the form (fruit, count) for each item:
map((grape, green, 8)) -> (grape, 8)
map((grape, green, 6)) -> (grape, 6)
map((apple, red, 4)) -> (apple, 4)
map((banana, yellow, 3)) -> (banana, 3)
map((dog, bark)) -> Ignore
map((apple, red, 2)) -> (apple, 2)
Note that in the above example we can ignore an item and exclude it from the output, so map() can also be used to implement a filter.
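The mapping step above can be sketched in Python as follows. The function name map_fruit and the rule for spotting a malformed record (anything that is not a three-field record with an integer count) are assumptions made for this illustration, not part of any framework.

```python
from typing import Optional, Tuple

def map_fruit(item: tuple) -> Optional[Tuple[str, int]]:
    """Emit (fruit, count) for a (fruit, color, count) record, or None to drop it."""
    if len(item) != 3 or not isinstance(item[2], int):
        return None  # records such as ("dog", "bark") are filtered out
    fruit, _color, count = item
    return (fruit, count)

basket = [
    ("grape", "green", 8),
    ("grape", "green", 6),
    ("apple", "red", 4),
    ("banana", "yellow", 3),
    ("dog", "bark"),
    ("apple", "red", 2),
]

# Keep only the items for which the mapper produced an output.
mapped = [out for out in (map_fruit(item) for item in basket) if out is not None]
print(mapped)
# [('grape', 8), ('grape', 6), ('apple', 4), ('banana', 3), ('apple', 2)]
```

Returning None for an unwanted item is how this sketch models a mapper that outputs 0 items.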
After the mappers run, the framework needs to decide where to send each item to be processed by the reducers. This is called partitioning. It happens in the “shuffling” phase of a MapReduce job, which is the step where reducer nodes copy data from mapper nodes. The partitioner tells Hadoop which reducer should receive each item. The default partitioner computes a hash of the item’s key and takes it modulo the number of reducers, which determines the receiving reducer. This behavior can be customized. In our case, we will send the tuples for “apple” to the reducer with an ID of 1 and the tuples for “banana” and “grape” to the reducer with an ID of 2.
partition((apple, 4)) -> reducer_1
partition((apple, 2)) -> reducer_1
partition((banana, 3)) -> reducer_2
partition((grape, 8)) -> reducer_2
partition((grape, 6)) -> reducer_2
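Both partitioning styles can be sketched in a few lines of Python. This is an illustration only: hash_partition imitates the hash-modulo idea using zlib.crc32 (chosen here because Python's built-in hash() for strings varies between runs), and custom_partition hard-codes the apple/banana/grape assignment used in this article. Both function names are made up for the example.

```python
import zlib

NUM_REDUCERS = 2

def hash_partition(key: str, num_reducers: int = NUM_REDUCERS) -> int:
    """Hash-style partitioner: stable hash of the key modulo the reducer count."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers

def custom_partition(key: str) -> int:
    """Custom partitioner from the article: apple -> reducer 1, everything else -> reducer 2."""
    return 1 if key == "apple" else 2

mapped_items = [("apple", 4), ("apple", 2), ("banana", 3), ("grape", 8), ("grape", 6)]

# Bucket every item under the reducer ID chosen by the custom partitioner.
partitions = {}
for item in mapped_items:
    partitions.setdefault(custom_partition(item[0]), []).append(item)

print(partitions)
# {1: [('apple', 4), ('apple', 2)], 2: [('banana', 3), ('grape', 8), ('grape', 6)]}
```

Either way, the partitioner looks only at the key, so every item with the same fruit name lands on the same reducer.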
Grouping also happens during the shuffling phase. Note that a partitioner only guarantees that a single reducer task will receive all the items for a given fruit; items for multiple types of fruit can still be sent to the same reducer.
Here, reducer_2 receives the items for both banana and grape, but we do not want those items to be treated as the same fruit and have their counts summed together. We can use a group operator to ensure that the banana items are processed in a different group than the grape items.
By now, you might be wondering how I’m distinguishing between a reducer task and a reduce() method call. Inside a reducer task, reduce() is called multiple times in a for loop. Each reducer task processes one partition, and each reduce() call processes one group in a partition. (Refer to Reducer.java.) To determine which reduce() call an item should be sent to, we can define a grouping comparator in Hadoop MapReduce. Conceptually, it works like an equals() method: it takes two items and outputs true if their keys are equal and false if they are not. (In Hadoop itself, the comparator expresses this through a compare() method that returns 0 for equal keys.) If the two items’ keys are determined to be equal, they are sent to the same reduce() call. If they are not equal, they are sent to different reduce() calls. In our example, we compare the fruit type.
group((apple, 4)) -> reducer_1, group_a
group((apple, 2)) -> reducer_1, group_a
group((banana, 3)) -> reducer_2, group_b
group((grape, 8)) -> reducer_2, group_c
group((grape, 6)) -> reducer_2, group_c
This results in the following inputs to each reduce() call:
reducer_1, group_a:
reduce(apple, [4,2])
reducer_2, group_b:
reduce(banana, [3])
reducer_2, group_c:
reduce(grape, [8,6])
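As a rough sketch of grouping inside one reducer task, the Python below splits the items received by reducer_2 into one group per fruit. It uses itertools.groupby, which only merges adjacent items, so the input is assumed to be already sorted by key. The helper name group_by_key is hypothetical.

```python
from itertools import groupby
from operator import itemgetter

# Items received by reducer_2, already sorted by fruit name.
reducer_2_input = [("banana", 3), ("grape", 8), ("grape", 6)]

def group_by_key(items):
    """Collect the counts of adjacent items that share the same key."""
    return [
        (key, [count for _key, count in group])
        for key, group in groupby(items, key=itemgetter(0))
    ]

groups = group_by_key(reducer_2_input)
print(groups)  # [('banana', [3]), ('grape', [8, 6])]
```

Each element of groups corresponds to the input of one reduce() call in that reducer task.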
In each partition, data is sorted before being assigned to reducer tasks and reduce() calls. Sorting starts right after the map() methods are called and ends right before the reduce() methods are called. Sorting can be customized by implementing a compare() method, which takes two keys as input and outputs one of three things: 1 (key 1 > key 2), -1 (key 1 < key 2), or 0 (key 1 = key 2). If we want the output to be sorted in ascending alphabetical order of the fruit’s name, then the output would be the following:
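sort((grape, 8), (grape, 6), (apple, 4), (banana, 3), (apple, 2)) -> (apple, 4), (apple, 2), (banana, 3), (grape, 8), (grape, 6)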
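A three-way compare() function of this kind can be tried out in Python with functools.cmp_to_key. This is a sketch of the idea, not Hadoop's comparator API; the input order is assumed to be the order in which the mappers emitted the items.

```python
from functools import cmp_to_key

def compare(key1: str, key2: str) -> int:
    """Return 1 if key1 > key2, -1 if key1 < key2, and 0 if they are equal."""
    if key1 > key2:
        return 1
    if key1 < key2:
        return -1
    return 0

items = [("grape", 8), ("grape", 6), ("apple", 4), ("banana", 3), ("apple", 2)]

# Compare items by their fruit name only; sorted() is stable, so items with
# equal keys keep their original relative order.
sorted_items = sorted(items, key=cmp_to_key(lambda a, b: compare(a[0], b[0])))
print(sorted_items)
# [('apple', 4), ('apple', 2), ('banana', 3), ('grape', 8), ('grape', 6)]
```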
A technique called “Secondary Sort” can be used to sort on multiple keys. For example, if we want the data that the reducers receive to be sorted on fruit name in ascending order, then fruit color in descending order, we’d send the sort comparator a composite key of [fruit, color], and our compare() function would allow us to compare two items’ fruit and color.
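Here is a minimal Python sketch of a comparator over a composite [fruit, color] key, sorting by fruit ascending and then by color descending. The records are made up for illustration: the green apple and purple grape entries do not appear in the article's basket and are added only so that the color ordering is visible.

```python
from functools import cmp_to_key

def compare_composite(key1, key2) -> int:
    """Order by fruit ascending, then by color descending."""
    fruit1, color1 = key1
    fruit2, color2 = key2
    if fruit1 != fruit2:
        return 1 if fruit1 > fruit2 else -1
    if color1 != color2:
        # Reversed result: the "larger" color sorts first.
        return -1 if color1 > color2 else 1
    return 0

# (composite key, count) pairs; some colors are hypothetical.
records = [
    (("grape", "green"), 8),
    (("apple", "green"), 5),
    (("grape", "purple"), 6),
    (("apple", "red"), 4),
    (("banana", "yellow"), 3),
]

sorted_records = sorted(
    records, key=cmp_to_key(lambda a, b: compare_composite(a[0], b[0]))
)
for key, count in sorted_records:
    print(key, count)
```

Within each fruit, "red" comes before "green" and "purple" before "green" because colors are compared in descending alphabetical order.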
A reduce() call compresses multiple values for a key into one output value. Remember, a reducer task can receive multiple groups of items, and in each task, reduce() is called multiple times, once for each group. We can override the reduce() method to customize the reducing logic. In our fruit-counting example, we want each reduce() call to sum up the counts of the group it is assigned. We grouped by fruit, so the input and output of each reduce() call will look like the following:
reducer_1, group_a:
reduce(apple, [4,2]) -> (apple, 6)
reducer_2, group_b:
reduce(banana, [3]) -> (banana, 3)
reducer_2, group_c:
reduce(grape, [8,6]) -> (grape, 14)
Each reducer task outputs to a file, so the output of the application will be two files since we have two reducers. The number of reducers can be configured.
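The reduce step for the fruit example can be sketched in Python like this. The dictionary keyed by reducer name stands in for the two reducer tasks, and each inner list element stands in for one group; these structures are illustrative and not how Hadoop represents them.

```python
def reduce_fruit(fruit, counts):
    """Sum all counts in one group into a single (fruit, total) output."""
    return (fruit, sum(counts))

# Groups received by each reducer task after partitioning and grouping.
reducer_inputs = {
    "reducer_1": [("apple", [4, 2])],
    "reducer_2": [("banana", [3]), ("grape", [8, 6])],
}

# Each reducer task loops over its groups and calls reduce once per group.
outputs = {
    task: [reduce_fruit(fruit, counts) for fruit, counts in groups]
    for task, groups in reducer_inputs.items()
}

print(outputs)
# {'reducer_1': [('apple', 6)], 'reducer_2': [('banana', 3), ('grape', 14)]}
```

The two entries in outputs correspond to the two output files, one per reducer task.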
Combiners are a special kind of reducer that runs on the mapper nodes, after the mappers and before the data is sent to the reducers. The purpose of a combiner is to reduce I/O across the network: it locally reduces the number of items on the mapper nodes before they are sent across the wire to the reducer nodes. Unlike a reducer, a combiner’s input and output key-value schemas must be the same.
For example, the reducer can output just the counts:
reduce((apple, [4,2])) -> (6)
reduce((banana, [3])) -> (3)
reduce((grape, [8,6])) -> (14)
However, the combiner’s input and output must both have the same format, keyed by fruit:
combine((apple, [4,2])) -> (apple, [6])
combine((banana, [3])) -> (banana, [3])
combine((grape, [8,6])) -> (grape, [14])
The input and output formats must be the same because multiple combine operations may run in a chain, so the output of one combiner can be the input of the next. In Hadoop MapReduce, a combiner can be implemented by overriding the reduce() method in the Reducer class.
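The chaining property can be illustrated with a small Python sketch. The split of the mapper output across two mapper nodes is hypothetical, and combine is a made-up helper whose input and output are both lists of (fruit, count) pairs.

```python
from collections import defaultdict

def combine(pairs):
    """Sum counts per fruit; input and output are both lists of (fruit, count)."""
    totals = defaultdict(int)
    for fruit, count in pairs:
        totals[fruit] += count
    return sorted(totals.items())

# Hypothetical split of the mapper output across two mapper nodes.
mapper_a_output = [("grape", 8), ("grape", 6), ("apple", 4)]
mapper_b_output = [("banana", 3), ("apple", 2)]

# Local combining: 5 items shrink to 4 before crossing the network.
combined_a = combine(mapper_a_output)
combined_b = combine(mapper_b_output)

# Because the output format matches the input format, the same function
# can be applied again to data that was already combined.
final_counts = combine(combined_a + combined_b)
print(final_counts)  # [('apple', 6), ('banana', 3), ('grape', 14)]
```

Running combine on already-combined data gives the same totals as running it once on the raw items, which is why summing counts is safe to use as a combiner.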
In a future post, I will implement the fruit-counting example using code.