The chapter contains a lot of detail about integer encoding and compression. Since these topics are not directly about MapReduce, I wrote no questions about them.
4.4 Inverted Indexing: Revised Implementation
Explain the inverted index retrieval algorithm. You may assume that each document fits into memory. Assume also that there is a huge number of documents. Which part should be optimized by integer encoding and compression?
Input: text documents
key: document id
value: text document
Output: key/value pairs where
key: word
value: list[documentId, numberOfOccurences]
list elements must be sorted by numberOfOccurences
Mapper counts number of occurrences in the document for each word. As the whole document fits into the memory, we can hold partial results in a map.
Intermediate key/values:
key: word, numberOfOccurences
value: documentId
Custom partitioner groups intermediate key/values by word. Custom sort sorts them primarily by word and secondarily by the number of occurrences.
Reducer uses initialize method to initialize list of all postings. Reduce method handles two cases:
- current word equal to previous word – add documentId and numberOfOccurences to posting list.
- current word differs from previous word – emit previous word and posting list; initialize posting list.
Posting list in reducer should be compressed.
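To make the compression step concrete, here is a minimal sketch of variable-byte encoding applied to a flattened posting list. The function names `vbyte_encode` and `vbyte_decode` are my own, and the choice of variable-byte encoding is an assumption; the chapter discusses several integer codes and any of them could be used instead.

```python
def vbyte_encode(numbers):
    """Encode non-negative integers, 7 payload bits per byte."""
    out = bytearray()
    for n in numbers:
        chunks = [n & 0x7F]            # lowest 7 bits, written last
        n >>= 7
        while n:
            chunks.append(n & 0x7F)
            n >>= 7
        for c in reversed(chunks[1:]):
            out.append(c)              # leading bytes keep the high bit clear
        out.append(chunks[0] | 0x80)   # high bit marks the last byte of a number
    return bytes(out)


def vbyte_decode(data):
    """Inverse of vbyte_encode."""
    numbers, n = [], 0
    for b in data:
        if b & 0x80:                   # last byte of the current number
            numbers.append((n << 7) | (b & 0x7F))
            n = 0
        else:
            n = (n << 7) | b
    return numbers


# A posting list [(documentId, numberOfOccurrences), ...] flattened to integers.
postings = [(7, 300), (12, 5)]
flat = [x for posting in postings for x in posting]
encoded = vbyte_encode(flat)
```

Small numbers take one byte instead of a fixed four or eight, which is where the savings on long posting lists would come from.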
class MAPPER
    method MAP(docid, doc d)
        H = new hash map
        for all term w in doc d do
            H(w) = H(w) + 1
        for all term w in H do
            emit(pair(w, H(w)), docid)
class REDUCER
    variable previous_word = 0
    variable PL = new list of postings
    method REDUCE(pair (w, #occurrences), docid)
        if w <> previous_word && previous_word <> 0 do
            emit(previous_word, PL)
            PL = new list of postings
        PL.add(pair(docid, #occurrences))
        previous_word = w
    method CLOSE
        if previous_word <> 0 do
            emit(previous_word, PL)
class PARTITIONING_COMPARATOR
    method compare(key (w1, o1), key (w2, o2))
        if w1 = w2
            return keys are equal
        return keys are different
class SORTING_COMPARATOR
    method compare(key (w1, o1), key (w2, o2))
        if w1 = w2 do
            return compare(o1, o2)
        return compare(w1, w2)
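The whole flow can be simulated on a single machine. The following is only a sketch: `map_document` and `build_index` are hypothetical names, an in-memory sort stands in for the framework's shuffle and sort, and words are split on whitespace.

```python
from collections import Counter
from itertools import groupby


def map_document(doc_id, text):
    """Mapper: emit ((word, count), doc_id) for every distinct word."""
    for word, n in Counter(text.split()).items():
        yield (word, n), doc_id


def build_index(documents):
    intermediate = []
    for doc_id, text in documents.items():
        intermediate.extend(map_document(doc_id, text))
    # Stand-in for shuffle and sort: primary key word, secondary key count.
    intermediate.sort(key=lambda kv: kv[0])
    index = {}
    # Grouping by word only plays the role of the custom partitioner.
    for word, group in groupby(intermediate, key=lambda kv: kv[0][0]):
        index[word] = [(doc_id, n) for (_, n), doc_id in group]
    return index


docs = {1: "a b a", 2: "a c", 3: "a a a b"}
index = build_index(docs)
```

Each posting list comes out already sorted by the number of occurrences, so the reducer never has to sort anything in memory.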
5 Graph Algorithms
The chapter contains two algorithms: shortest path in the graph and page ranking algorithm. The questions are straightforward.
5.2 Parallel Breadth-First Search
Find the shortest path from one origin node to all other nodes. Each edge has a weight associated with it. Input key/value pairs have already been preprocessed into a convenient form.
Input: graph
key: node id
value: distance to origin, list[adjacent node, edge length]
Output: key/value pairs where
key: node id
value: distance to origin, list[adjacent node, edge length]
The algorithm requires multiple iterations. It stops when an iteration does not change any ‘distance to origin’. At worst, there will be O(n) iterations, where n is the number of nodes in the graph.
Mapper passes the original graph to the next iteration as it is. In addition, it generates a key/value pair for each adjacent node. The value contains the minimum known distance from the origin if the route goes through the current node.
class MAPPER
    method MAP(node, pair(dist, adjacencylist))
        emit(node, pair(dist, adjacencylist))
        for all (closenode, nodedist) in adjacencylist do
            emit(closenode, pair(dist + nodedist, empty))
Reducer finds the minimum known distance from each node. It passes the distance along with the original graph to the next iteration. It also increments global counter whenever minimum known distance to any node changes.
class REDUCER
    method REDUCE(node, list(dist, adjacencylist))
        minimum = infinity
        previous_iteration_solution = infinity
        original_graph = empty
        for all (dist, adjacencylist) in list do
            if adjacencylist not empty do
                original_graph = adjacencylist
                previous_iteration_solution = dist
            if minimum > dist
                minimum = dist
        if previous_iteration_solution <> minimum
            increment global counter
        emit(node, pair(minimum, original_graph))
If the global counter is 0, the algorithm stops. Otherwise another iteration is needed.
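The iteration loop can be sketched in plain Python. This is a single-machine simulation under a few assumptions of mine: `bfs_iteration` and `shortest_paths` are hypothetical names, a dictionary stands in for the shuffle phase, every node appears as a key of the input, and `None` (rather than an empty list) marks values that carry no graph structure, so that nodes without outgoing edges are handled too.

```python
import math
from collections import defaultdict


def bfs_iteration(graph):
    """graph: {node: (dist, [(neighbour, edge_length), ...])}"""
    # Map phase: pass the graph along and propose distances to neighbours.
    intermediate = defaultdict(list)
    for node, (dist, adjacency) in graph.items():
        intermediate[node].append((dist, adjacency))
        for neighbour, length in adjacency:
            intermediate[neighbour].append((dist + length, None))
    # Reduce phase: keep the minimum and count the nodes whose distance changed.
    result, changed = {}, 0
    for node, values in intermediate.items():
        minimum, previous, adjacency = math.inf, math.inf, []
        for dist, adj in values:
            if adj is not None:        # this value carries the original graph
                adjacency, previous = adj, dist
            minimum = min(minimum, dist)
        if minimum != previous:
            changed += 1               # plays the role of the global counter
        result[node] = (minimum, adjacency)
    return result, changed


def shortest_paths(graph):
    """Run iterations until the counter stays at 0."""
    while True:
        graph, changed = bfs_iteration(graph)
        if changed == 0:
            return {node: dist for node, (dist, _) in graph.items()}


example = {
    "a": (0, [("b", 1), ("c", 5)]),
    "b": (math.inf, [("c", 1)]),
    "c": (math.inf, []),
}
distances = shortest_paths(example)
```

In this example the direct edge a-c (length 5) is found first and replaced by the cheaper route through b one iteration later, which is why a single pass is not enough.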
Explain page rank algorithm, assume alpha = 0.
Page rank P(n) of a page n is calculated from page ranks of all pages linking to it.
P(n) = sum_m (P(m)/C(m))
The sum goes through all pages m linking to the page n. C(m) is the number of outgoing links of the page m.
Page rank algorithm runs in iterations. Mapper passes page rank contribution of each page to adjacent pages. Reducer updates page rank of each node. The algorithm stops when page ranks no longer change.
class MAPPER
    method MAP(page, (page_rank, adjacency_list))
        emit(page, (0, adjacency_list))
        contribution = page_rank/adjacency_list.length
        for all node in adjacency_list do
            emit(node, (contribution, empty))
class REDUCER
    method REDUCE(page, contributions[c1, c2, ..., cn])
        rank = 0
        adjacency_list = new list
        for all c in contributions do
            adjacency_list.addAll(c.adjacency_list)
            rank = rank + c.contribution
        emit(page, (rank, adjacency_list))
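One iteration with alpha = 0 can be sketched as follows. `pagerank_iteration` is a hypothetical name, and a dictionary of sums stands in for the shuffle and the reducer. The sketch assumes every page has at least one outgoing link; the rank of a page without links would simply be lost.

```python
from collections import defaultdict


def pagerank_iteration(graph):
    """graph: {page: (rank, [outgoing links])} -> same structure, new ranks."""
    contributions = defaultdict(float)
    # Map phase: every page splits its rank evenly among its outgoing links.
    for page, (rank, links) in graph.items():
        for target in links:
            contributions[target] += rank / len(links)
    # Reduce phase: the new rank is the sum of received contributions.
    return {page: (contributions[page], links)
            for page, (_, links) in graph.items()}


web = {
    "a": (1 / 3, ["b", "c"]),
    "b": (1 / 3, ["c"]),
    "c": (1 / 3, ["a"]),
}
after_one = pagerank_iteration(web)
```

Here page c collects half of a's rank and all of b's rank, so it ends the iteration with 1/6 + 1/3 = 1/2.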
6 EM Algorithms For Text Processing
I wrote no questions for this chapter.
Exercises
This chapter contains hands-on exercises for MapReduce. Some of them require multiple iterations.
Warm Up
Count number of occurrences of every word in a text collection.
Input:
key: document id,
value: text document.
Output:
key: word,
value: number of occurrences.
Intermediate pairs:
key: word
value: integer - how many times was the word seen in the input.
class MAPPER
    method MAP(docid a, doc d)
        for all term w in doc d do
            emit(w, 1)
class COMBINER
    method COMBINE(word w, counts[c1, c2, ..., cn])
        s = 0
        for all c in counts[c1, c2, ..., cn] do
            s = s + c
        emit(word w, s)
class REDUCER
    method REDUCE(word w, counts[c1, c2, ..., cn])
        s = 0
        for all c in counts[c1, c2, ..., cn] do
            s = s + c
        emit(word w, s)
An alternative solution would use in-mapper combining.
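A rough sketch of the in-mapper combining variant, simulated in Python. The class `InMapperCombiningMapper` and the function `reduce_counts` are hypothetical names; the point is only that the mapper keeps counts across calls and emits them once in `close`.

```python
from collections import Counter


class InMapperCombiningMapper:
    """Holds partial counts in memory for the whole input split."""

    def __init__(self):
        self.counts = Counter()

    def map(self, doc_id, text):
        self.counts.update(text.split())   # nothing is emitted here

    def close(self):
        yield from self.counts.items()      # one pair per distinct word


def reduce_counts(pairs):
    """Reducer: sum partial counts per word."""
    totals = Counter()
    for word, n in pairs:
        totals[word] += n
    return dict(totals)


m1, m2 = InMapperCombiningMapper(), InMapperCombiningMapper()
m1.map(1, "a b a")
m2.map(2, "b c")
emitted = list(m1.close()) + list(m2.close())
totals = reduce_counts(emitted)
```

The first mapper emits two pairs instead of three, and the saving grows with the size of the split. The price is that the map of counts has to fit into the mapper's memory.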
Web Store
Website user log contains user ids and the length of each session. The website has a modest number of registered users. Compute the average session length for each user.
Input:
key: user id,
value: session length.
Output:
key: user id,
value: average session length.
As the number of registered users is modest, we can use in-mapper combining.
class MAPPER
    variable total_time = new hash map
    variable sessions_number = new hash map
    method MAP(user_id, session_length)
        total_time(user_id) = total_time(user_id) + session_length
        sessions_number(user_id) = sessions_number(user_id) + 1
    method CLOSE
        for all user_id in total_time
            tt = total_time(user_id)
            sn = sessions_number(user_id)
            emit(user_id, pair(tt, sn))
class REDUCER
    method REDUCE(user_id, [pairs(time, sessions_number)])
        total_time = 0
        total_sessions = 0
        for all pairs in [pairs(time, sessions_number)] do
            total_time = total_time + time
            total_sessions = total_sessions + sessions_number
        emit(user_id, total_time/total_sessions)
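The reason the mapper emits (total time, number of sessions) pairs rather than averages is easiest to see on a small example. A sketch with hypothetical names `map_sessions` and `reduce_average`, where each list plays the role of one input split:

```python
from collections import defaultdict


def map_sessions(log):
    """In-mapper combining: one (total_time, sessions) pair per user and split."""
    totals = defaultdict(lambda: [0, 0])
    for user_id, length in log:
        totals[user_id][0] += length
        totals[user_id][1] += 1
    return [(user, (t, n)) for user, (t, n) in totals.items()]


def reduce_average(user_id, pairs):
    """Reducer: sum both components first, divide only at the very end."""
    total_time = sum(t for t, _ in pairs)
    total_sessions = sum(n for _, n in pairs)
    return user_id, total_time / total_sessions


split1 = [("ann", 10), ("bob", 4), ("ann", 20)]
split2 = [("ann", 30)]
partial1 = map_sessions(split1)
partial2 = map_sessions(split2)
ann_pairs = [v for k, v in partial1 + partial2 if k == "ann"]
ann_average = reduce_average("ann", ann_pairs)
```

Averaging the two per-split averages for ann (15 and 30) would give 22.5, while the correct answer over her three sessions is 20.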
Web store log contains user id and bought item for each sale. You need to implement “buyers of item also bought” functionality. Whenever the item is shown, the store will suggest the five items most often bought by the item's buyers.
Input:
key: user id,
value: bought item.
Output:
key: item,
value: list of five most common "buyers of item also bought" items.
Our solution has two iterations. First iteration generates lists of all items bought by the same user. Grouping is done by the framework; both mapper and reducer perform an identity function.
Input:
key: user id,
value: bought item.
Output:
key: user id,
value: list of all bought items.
class MAPPER
    method MAP(user_id, item)
        emit(user_id, item)
class REDUCER
    method REDUCE(user_id, items[i1, i2, ..., in])
        emit(user_id, items)
Second iteration solves the co-occurrence problem on the item lists. It uses the stripes approach. The only difference from the standard solution is that we emit only the five most common co-occurrences.
Input:
key: user id,
value: list of all bought items.
Output:
key: item,
value: list of five most common co-occurrences.
class MAPPER
    method MAP(user_id, items[i1, i2, ..., in])
        for all item in items do
            H = new hash map
            for all item j in items do
                if j <> item do
                    H(j) = H(j) + 1
            emit(item, H)
class REDUCER
    method REDUCE(item, stripes[H1, H2, ..., Hn])
        T = new hash map
        for all H in stripes do
            for all (key/value) in H do
                T(key) = T(key) + value
        emit(item, max_five(T))
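The second iteration could be simulated like this. `map_basket` and `also_bought` are hypothetical names. One assumption differs from the pseudocode: each user's items are deduplicated first, so the counts mean "number of buyers who bought both items".

```python
from collections import Counter, defaultdict


def map_basket(items):
    """Mapper: one stripe per item, counting the other items of the same user."""
    unique = set(items)
    for item in unique:
        yield item, Counter(other for other in unique if other != item)


def also_bought(baskets, top=5):
    """Reducer: sum stripes element-wise and keep the most common entries."""
    stripes = defaultdict(Counter)
    for items in baskets.values():
        for item, stripe in map_basket(items):
            stripes[item].update(stripe)
    return {item: [other for other, _ in total.most_common(top)]
            for item, total in stripes.items()}


baskets = {
    "u1": ["tea", "cup", "milk"],
    "u2": ["tea", "cup"],
    "u3": ["tea", "milk"],
    "u4": ["tea", "cup"],
}
suggestions = also_bought(baskets, top=2)
```

`Counter.most_common` does the job of `max_five` here; with `top=5` it would return the five most common co-occurrences.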
Web store log contains user id, timestamp, item and number of bought pieces for each sale. The store is looking for items whose sales rise or decline at the same time. Find the 20 item pairs with the maximum number of such months.
Input:
key: user id,
value: timestamp, bought item, count.
Output:
key: item, item
value: number of months when both items sales rose or declined.
#: the output contains 20 key/value pairs with maximum value
Our solution requires multiple MapReduce iterations. We have to:
- calculate whether each item's sales for any given month went up or down,
- create lists of items with the same sales change during the same month,
- find number of co-occurrences in those lists,
- choose items with maximum co-occurrences.
First iteration calculates sales changes for any given month. We have to supply mapper, partitioner, custom sort and reducer. Mapper generates one intermediate key/value pair for each input key/value. Key is composed of sold item and sales month. Value contains number of sold pieces.
Partitioner sends all key/value pairs with the same item to the same reducer. Custom sort sorts them by months. Finally, reducer calculates sales changes.
Input:
key: user id,
value: timestamp, item, count.
Intermediate key/values:
key: item, month
value: count.
Output:
key: month, up/down/equal
value: item.
class MAPPER
    method MAP(user_id, (timestamp, item, count))
        month = get_month(timestamp)
        emit((item, month), count)
class PARTITIONING_COMPARATOR
    method compare(key (item1, month1), key (item2, month2))
        if item1 = item2
            return keys are equal
        return keys are different
class SORTING_COMPARATOR
    method compare(key (item1, month1), key (item2, month2))
        if item1 = item2 do
            return compare(month1, month2)
        return compare(item1, item2)
class REDUCER
    method REDUCE((item, month), counts[c1, c2, ..., cn])
        count = sum([c1, c2, ..., cn])
        if last_item = item
            if last_month + 1 = month
                //emit correct up/down/equal flags
                if last_count < count
                    emit((month, up), item)
                if last_count > count
                    emit((month, down), item)
                if last_count = count
                    emit((month, equal), item)
            else
                //no sales during some months
                emit((last_month + 1, down), item)
                emit((month, up), item)
        else
            // new item
            if last_item is set do
                emit((last_month + 1, down), last_item)
            emit((month, up), item)
        last_item = item
        last_count = count
        last_month = month
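A simplified, single-machine sketch of this first iteration follows. The names `change` and `monthly_changes` are hypothetical, months are assumed to be consecutive integer indexes, and a plain `sorted` call stands in for the custom sort. Unlike the pseudocode, the sketch does not emit a "down" for the month after an item's last recorded sale.

```python
from collections import defaultdict


def change(previous, current):
    if current > previous:
        return "up"
    if current < previous:
        return "down"
    return "equal"


def monthly_changes(sales):
    """sales: iterable of (item, month, count) -> {(month, change): [items]}"""
    totals = defaultdict(int)
    for item, month, count in sales:
        totals[(item, month)] += count        # sum of counts per (item, month)
    changes = defaultdict(list)
    last_item, last_month, last_count = None, None, 0
    for (item, month), count in sorted(totals.items()):  # by item, then month
        if item != last_item:                 # new item: start from zero sales
            last_month, last_count = None, 0
        if last_month is not None and month > last_month + 1:
            # No sales during some months: sales dropped right after last_month.
            changes[(last_month + 1, "down")].append(item)
            last_count = 0
        changes[(month, change(last_count, count))].append(item)
        last_item, last_month, last_count = item, month, count
    return dict(changes)


sales = [("tea", 1, 5), ("tea", 2, 3), ("tea", 2, 4),
         ("cup", 1, 2), ("cup", 2, 1), ("cup", 4, 1)]
result = monthly_changes(sales)
```

The returned dictionary already has the shape of the second iteration's output: the key is a month with a change flag and the value is the list of items.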
Second iteration groups first iteration results by keys. It generates lists of items with same sales changes during the same month. Framework does all the work. Both mapper and reducer perform an identity function.
Input:
key: month, up/down/equal
value: item.
Output:
key: month, up/down/equal
value: [items].
Third iteration performs standard ‘co-occurrences by pairs’ algorithm.
Input:
key: month, up/down/equal
value: [items].
Intermediate key/values:
key: item, item
value: partial number of co-occurrences.
Output:
key: item, item
value: number of months when both items sales rose or declined.
#: the output contains all item pairs
class MAPPER
    method MAP((month, change), items[i1, i2, ..., in])
        for each i in items do
            for each j in items do
                if i != j
                    emit((i, j), 1)
class COMBINER
    method COMBINE((item1, item2), co-occurrences[c1, c2, ..., cn])
        s = 0
        for all c in co-occurrences[c1, c2, ..., cn] do
            s = s + c
        emit((item1, item2), s)
class REDUCER
    method REDUCE((item1, item2), co-occurrences[c1, c2, ..., cn])
        s = 0
        for all c in co-occurrences[c1, c2, ..., cn] do
            s = s + c
        emit((item1, item2), s)
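The pairs approach of the third iteration, sketched in Python. `count_pairs` is a hypothetical name and a `Counter` stands in for the combiner and the reducer. As in the pseudocode, every pair is counted in both orders.

```python
from collections import Counter
from itertools import permutations


def count_pairs(groups):
    """groups: {(month, change): [items]} -> Counter of ordered item pairs."""
    counts = Counter()
    for items in groups.values():
        # All ordered pairs (i, j) with i != j from one month/change group.
        for pair in permutations(sorted(set(items)), 2):
            counts[pair] += 1
    return counts


groups = {
    (1, "up"): ["cup", "tea"],
    (2, "up"): ["tea", "cup", "milk"],
    (2, "down"): ["jam"],
}
pair_counts = count_pairs(groups)
```

A group with a single item, such as jam above, produces no pairs at all.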
Finally, we have to choose the 20 key/value pairs with maximum value. Each mapper selects its 20 key/value pairs with maximum value and emits them with the same key. There will be only one reducer, which selects the final 20 key/value pairs.
Input:
key: item, item
value: number of months when both items sales rose or declined.
#: the input contains all item pairs
Intermediate key/values:
key: 1
value: item, item, number of months when both items sales rose or declined.
#: the intermediate output contains 20 key/value pairs with maximum value for each mapper
Output:
key: item, item
value: number of months when both items sales rose or declined.
#: the output contains 20 key/value pairs with maximum value
The code is simple but long, so it is omitted here.
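In a single-machine simulation the selection itself is short. A sketch with hypothetical names `map_top`, `reduce_top` and `top_pairs`, where each inner list plays the role of one mapper's input split:

```python
import heapq


def map_top(pairs, n=20):
    """Mapper: keep only the n pairs with the highest value in this split."""
    return heapq.nlargest(n, pairs, key=lambda kv: kv[1])


def reduce_top(candidates, n=20):
    """Single reducer: pick the final n from all mappers' candidates."""
    return heapq.nlargest(n, candidates, key=lambda kv: kv[1])


def top_pairs(splits, n=20):
    candidates = []
    for split in splits:
        candidates.extend(map_top(split, n))   # all emitted under one key
    return reduce_top(candidates, n)


splits = [
    [(("a", "b"), 3), (("a", "c"), 9), (("b", "c"), 1)],
    [(("c", "d"), 7), (("a", "d"), 2)],
]
best = top_pairs(splits, n=2)
```

The single reducer is not a bottleneck, because it receives at most 20 pairs from each mapper.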
Criminal Agency
Inputs to all exercises in this chapter use the same data structure.
Criminal agency stole Facebook’s friendships database and wants to analyze the new data. Friendships are stored in the form of key/value pairs; each friendship corresponds to two key/value pairs:
Friends:
key: first friend name
value: second friend name
key: second friend name
value: first friend name
The agency also owns criminal records of all citizens:
Criminal record:
key: citizen name
value: when, where, accomplices, description
Find at-risk youths. A person is considered an at-risk youth if more than half of his friends have a criminal record.
Our solution has two iterations. First iteration joins two sets and flags each ‘value friend’ with has_record/law_abiding flags.
Output:
key: first friend
value: second friend, has_record/law_abiding
The mapper flags each key with data set name. Partitioner groups data according to names in keys and sorter puts criminal records before friendships. We could use local aggregation to remove multiple criminal records for the same person.
class MAPPER
    method MAP(name, value)
        if value is name do
            emit(pair(name, friendship), value)
        else
            emit(pair(name, criminal), value)
class PARTITIONING_COMPARATOR
    method compare(key (name1, dataset1), key (name2, dataset2))
        if name1 = name2
            return keys are equal
        return keys are different
class SORTING_COMPARATOR
    method compare(key (name1, dataset1), key (name2, dataset2))
        if name1 = name2 AND dataset1 is criminal
            return key1 is lower
        if name1 = name2 AND dataset2 is criminal
            return key2 is lower
        return compare(name1, name2)
class REDUCER
    variable previous_name
    method REDUCE(pair(name, flag), items[i1, i2, ..., in])
        if flag is criminal do
            previous_name = name
            has_record = criminal
            return
        if previous_name <> name do
            has_record = law_abiding
        else
            has_record = criminal
        previous_name = name
        for all i in items do
            emit(i.name, pair(name, has_record))
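The join can be simulated with one sort. In this sketch `flag_friends` is a hypothetical name, and the numeric tags 0 and 1 are my own way of making criminal records sort before friendships of the same person.

```python
def flag_friends(friendships, criminal_records):
    """friendships: [(name, friend)], criminal_records: [(name, record)]"""
    CRIMINAL, FRIENDSHIP = 0, 1                # criminal records sort first
    intermediate = [((name, CRIMINAL), record)
                    for name, record in criminal_records]
    intermediate += [((name, FRIENDSHIP), friend)
                     for name, friend in friendships]
    intermediate.sort(key=lambda kv: kv[0])    # stand-in for shuffle and sort
    output, criminal_name = [], None
    for (name, dataset), value in intermediate:
        if dataset == CRIMINAL:
            criminal_name = name               # remember who has a record
        else:
            flag = "has_record" if name == criminal_name else "law_abiding"
            output.append((value, (name, flag)))   # key: the other friend
    return output


friendships = [("ann", "bob"), ("bob", "ann"), ("ann", "carl"), ("carl", "ann")]
records = [("bob", "theft")]
flagged = flag_friends(friendships, records)
```

Because the records arrive first, the reducer only has to remember one name instead of buffering all friendships of a person.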
Second iteration counts both total number of friends and number of friends with criminal record. Reducer emits key/value pairs only for at-risk youths. This iteration could also use some kind of local aggregation.
Intermediate key/value:
key: name
value: total friends, total friend criminals
# totals are partial; they cover only a subset of the data set
Output:
key: name
value: empty
# only at risk youths
class MAPPER
    method MAP(name, pair(friend, has_record))
        if has_record is law_abiding do
            emit(name, pair(1, 0))
        else
            emit(name, pair(1, 1))
class REDUCER
    method REDUCE(name, items[pair(total, criminals)])
        total = 0
        criminals = 0
        for all i in items do
            total = total + i.total
            criminals = criminals + i.criminals
        if criminals / total > 0.5 do
            emit(name, empty)
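A sketch of the counting step, assuming the flagged pairs from the first iteration as input. `at_risk` is a hypothetical name. The comparison `2 * criminals > total` is the same test as `criminals / total > 0.5` without the division.

```python
from collections import defaultdict


def at_risk(flagged):
    """flagged: [(name, (friend, flag))] -> sorted names of at-risk youths."""
    totals = defaultdict(lambda: [0, 0])       # name -> [total, criminals]
    for name, (_, flag) in flagged:
        totals[name][0] += 1
        totals[name][1] += flag == "has_record"
    return sorted(name for name, (total, criminals) in totals.items()
                  if 2 * criminals > total)


flagged_pairs = [
    ("ann", ("bob", "has_record")),
    ("ann", ("carl", "law_abiding")),
    ("dan", ("bob", "has_record")),
    ("dan", ("eve", "has_record")),
    ("dan", ("ann", "law_abiding")),
]
youths = at_risk(flagged_pairs)
```

Ann has exactly half of her friends with a record, which is not "more than half", so only dan is reported.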
Find gangs. Gang is a group of people that:
- has exactly 5 members,
- each member is friend with all other members,
- each two members committed at least 3 crimes together.
This time, we need three iterations. The idea is to first clean up the graph of all useless edges, so that only criminal contacts remain. Then, we split the graph into smaller manageable sub-graphs. We attach all criminal contacts and edges between them to each person:
Last iteration reducers input:
key: person
values: all his criminal contacts and relationships between them.
Final reducer takes smaller graphs represented by value in each key/value pair and finds complete sub-graphs with 4 vertices in it. Add person from the key in it, and you have found a complete sub-graph with 5 vertices. The reducer may use any polynomial algorithm.
First iteration uses the pairs approach to clean the graph. We omit both local aggregation and removal of duplicates. Both would make the algorithm more efficient.
Intermediate key/values:
key: first friend, second friend, friendship/accomplice
value: 1
Output:
key: first friend, second friend
value: empty
# only friends with common criminal record
class MAPPER
    method MAP(name, value)
        if value is name do
            emit(triple(name, value, friendship), empty)
        else
            for all crime_accomplice in value.accomplices do
                emit(triple(name, crime_accomplice, accomplice), 1)
class PARTITIONING_COMPARATOR
    method compare(key (name1, accomplice1, flag1), key (name2, accomplice2, flag2))
        if name1 = name2 AND accomplice1 = accomplice2
            return keys are equal
        return keys are different
class SORTING_COMPARATOR
    method compare(key (name1, accomplice1, flag1), key (name2, accomplice2, flag2))
        if name1 = name2 AND accomplice1 = accomplice2 AND flag1 is friendship
            return key1 is lower
        if name1 = name2 AND accomplice1 = accomplice2 AND flag2 is friendship
            return key2 is lower
        return compare(pair(name1, accomplice1), pair(name2, accomplice2))
class REDUCER
    variable previous_name
    variable previous_accomplice
    method sameAsPrevious(name, accomplice)
        if previous_name <> name
            return false
        if previous_accomplice <> accomplice
            return false
        return true
    method REDUCE(triple(name, accomplice, flag), items[i1, i2, ..., in])
        if sameAsPrevious(name, accomplice) do
            if items.length > 2 do
                emit(pair(name, accomplice), empty)
            return
        if flag is friendship do
            previous_name = name
            previous_accomplice = accomplice
Second iteration attaches lists of all ‘second degree’ friends to edges:
Input
key: first friend, second friend
value: empty
Intermediate key/values:
key: first friend
value: first friend, second friend
key: second friend
value: first friend, second friend
Output:
key: first friend, second friend
value: all friends of second friend
key: second friend, first friend
value: all friends of first friend
class MAPPER
    method MAP((first friend, second friend), empty)
        emit(first friend, (first friend, second friend))
        emit(second friend, (first friend, second friend))
class REDUCER
    method REDUCE(name, edges[e1, e2, ..., en])
        friends = new Set
        friends.add(name)
        for all edge in edges do
            friends.add(edge.v1, edge.v2)
        for all edge in edges do
            emit(edge, friends)
Finally, mapper and shuffle and sort phase together generate lists of all friends of any given person and relationships between them.
Input
key: friend 1, friend 2
value: all friends of friend 2
Intermediate key/values:
key: friend 1
value: friend 2, all friends of friend 2
Reducers input (after shuffle and sort):
key: person
values: all his friends and relationships between them.
Output:
key: first friend, second friend, third friend, fourth friend, fifth friend
value: gang
class MAPPER
    method MAP((friend 1, friend 2), all friends of friend 2)
        emit(friend 1, (friend 2, all friends of friend 2))
class REDUCER
    method REDUCE(name, graph attached to it)
        any polynomial algorithm will work
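One possible polynomial algorithm for the last reducer is a brute-force check of all groups of four contacts. This is only a sketch: `find_gangs` is a hypothetical name, and the contact sets are assumed to be symmetric and already cleaned by the first iteration.

```python
from itertools import combinations


def find_gangs(person, contacts):
    """contacts: {contact of person: set of that contact's own contacts}"""
    gangs = []
    # Try every group of 4 contacts; the number of groups is O(n^4).
    for group in combinations(sorted(contacts), 4):
        # The group is a complete sub-graph if every pair is connected.
        if all(b in contacts[a] for a, b in combinations(group, 2)):
            gangs.append(tuple(sorted((person,) + group)))
    return gangs


contacts_of_a = {
    "b": {"a", "c", "d", "e", "f"},
    "c": {"a", "b", "d", "e"},
    "d": {"a", "b", "c", "e"},
    "e": {"a", "b", "c", "d"},
    "f": {"a", "b"},
}
gangs = find_gangs("a", contacts_of_a)
```

The same gang is found once for each of its five members. Sorting the member names gives every copy the same key, so duplicates are easy to drop afterwards.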
Reference: MapReduce Questions and Answers from our JCG partner Maria Jurcovicova at the This is Stuff blog.



