MapReduce Design Patterns Implemented in Apache Spark

This post is the first in a series that discusses design patterns from the book MapReduce Design Patterns and shows how these patterns can be implemented in Apache Spark.

When writing MapReduce or Spark programs, it is useful to think about the data flows needed to perform a job. Even though Pig, Hive, Apache Drill and Spark DataFrames make it easier to analyze your data, there is value in understanding the flow at a lower level, just as there is value in using EXPLAIN to understand a query plan. One way to think about this is to group patterns by type, where each pattern is a template for solving a common, general data manipulation problem. Below is the list of pattern types from the MapReduce Design Patterns book:

  • Summarization Patterns
  • Filtering Patterns
  • Data Organization Patterns
  • Join Patterns
  • Metapatterns
  • Input and Output Patterns

In this post we will go over one of the summarization patterns, namely numerical summarizations.

Numerical Summarizations

Numerical summarizations are a pattern for calculating aggregate statistical values over data. The intent is to group records by a key field and calculate aggregates per group, such as min, max, and median. The figure below from the MapReduce Design Patterns book shows the general execution of this pattern in MapReduce.


This aggregation pattern corresponds to using GROUP BY in SQL, for example:

SELECT MIN(numericalcol1), MAX(numericalcol1),
        COUNT(*) FROM table GROUP BY groupcol2;

In Pig this corresponds to:

b = GROUP a BY groupcol2;
c = FOREACH b GENERATE group, MIN(a.numericalcol1),
        MAX(a.numericalcol1), COUNT_STAR(a);
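
To make the grouping concrete before moving to Spark, here is a minimal local sketch of the same GROUP BY logic on a plain Scala collection. The `rows` data and the `summary` name are made up for illustration and are not part of the original post.

```scala
// Hypothetical records: (groupcol2, numericalcol1)
val rows = Seq(("a", 3.0), ("b", 10.0), ("a", 7.0), ("b", 4.0), ("a", 5.0))

// Group by key, then compute MIN, MAX and COUNT(*) for each group
val summary: Map[String, (Double, Double, Int)] =
  rows.groupBy(_._1).map { case (key, group) =>
    val values = group.map(_._2)
    (key, (values.min, values.max, values.size))
  }

// Print one line per group, sorted by key
summary.toSeq.sortBy(_._1).foreach { case (key, (mn, mx, n)) =>
  println(s"$key min=$mn max=$mx count=$n")
}
// a min=3.0 max=7.0 count=3
// b min=4.0 max=10.0 count=2
```

This runs on a single machine only. It is meant to show the shape of the result, not how Spark distributes the work.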

In Spark, key-value pair RDDs are commonly used to group by a key in order to perform aggregations, as shown in the MapReduce diagram. However, Spark pair RDDs offer many more functions than just map and reduce.

We will go through some aggregation examples using the dataset from a previous blog on Spark DataFrames. The dataset is a .csv file that consists of online auction data. Each auction has an auction id associated with it and can have multiple bids. Each row represents a bid. For each bid, we have the following information:


(In the code boxes, comments are in Green and output is in Blue)

Below we load the data from the ebay.csv file, then we use a Scala case class to define the Auction schema corresponding to the ebay.csv file. Then map() transformations are applied to each element to create the auctionRDD of Auction objects.

//  SQLContext entry point for working with structured data
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Import Spark SQL data types and Row.
import org.apache.spark.sql._
//define the schema using a case class
case class Auction(auctionid: String, bid: Double, bidtime: Double, bidder: String, bidderrate: Integer, openbid: Double, price: Double, item: String, daystolive: Integer)
// create an RDD of Auction objects
val auctionRDD = sc.textFile("ebay.csv").map(_.split(",")).map(p =>
  Auction(p(0), p(1).toDouble, p(2).toDouble, p(3), p(4).toInt,
    p(5).toDouble, p(6).toDouble, p(7), p(8).toInt))
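
The parsing step above throws an exception on any line with a missing field or a non-numeric value. As a hedged sketch of one way to guard against that, the hypothetical helper `parseAuction` below returns an `Option` instead. The case class repeats the fields from this post but uses `Int` in place of `Integer`, and the sample lines are made up rather than taken from ebay.csv.

```scala
import scala.util.Try

// Same fields as the Auction case class in this post, with Int instead of Integer
case class Auction(auctionid: String, bid: Double, bidtime: Double, bidder: String,
                   bidderrate: Int, openbid: Double, price: Double, item: String,
                   daystolive: Int)

// Hypothetical helper: returns None when the line has too few fields
// or when a numeric field does not parse
def parseAuction(line: String): Option[Auction] = {
  val p = line.split(",").map(_.trim)
  if (p.length < 9) None
  else Try(Auction(p(0), p(1).toDouble, p(2).toDouble, p(3), p(4).toInt,
                   p(5).toDouble, p(6).toDouble, p(7), p(8).toInt)).toOption
}

// Made-up sample lines, not taken from ebay.csv
val good = "1234567890,175.0,2.5,bidderA,10,99.0,200.0,xbox,3"
val bad  = "1234567890,not-a-number,2.5,bidderA,10,99.0,200.0,xbox,3"

println(parseAuction(good).map(_.item)) // Some(xbox)
println(parseAuction(bad))              // None
```

With a helper like this, the RDD could presumably be built with `flatMap(line => parseAuction(line))` instead of the two `map` calls, so that malformed lines are dropped rather than failing the job.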

The figure below shows the general execution in Spark to calculate the average bid per auction for an item.


The corresponding code is shown below. First, a key-value pair is created with the auction id and item as the key and the bid amount and a 1 as the value, e.g. ((id, item), (bid amount, 1)). Next, reduceByKey sums the bid amounts and sums the ones to get the total bid amount and the bid count for each key. Finally, mapValues calculates the average, which is the total bid amount divided by the count of bids.

// create key value pairs of ((auctionid, item), (bid, 1))
val apair = auctionRDD.map(auction => ((auction.auctionid, auction.item), (auction.bid, 1)))
// reduceByKey to get the sum of the bids and the count of bids per key
val atotalcount = apair.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
// get a couple of results
atotalcount.take(2)
// Array(((1641062012,cartier),(4723.99,3)), ((2920322392,palm),(3677.96,32)))
// avg = total / count
val avgs = atotalcount.mapValues { case (total, count) => total.toDouble / count }
// get a couple of results
avgs.take(2)
// Array(((1641062012,cartier),1574.6633333333332), ((2920322392,palm),114.93625))

// This could also be written like this
val avgs =auctionRDD.map(auction=>((auction.auctionid,auction.item), (auction.bid, 1))).reduceByKey((x,y) => (x._1 + y._1, x._2 + y._2)).mapValues{ case (total, count) => total.toDouble / count }
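
The reason for carrying a (sum, count) pair is that an average of partial averages is generally not the overall average, whereas sums and counts can be combined in any order. The sketch below checks this on a local collection. `reduceByKeyLocal` is a hypothetical stand-in for Spark's reduceByKey, and the bids are made up.

```scala
// Hypothetical local stand-in for reduceByKey:
// groups pairs by key and combines the values of each key with f
def reduceByKeyLocal[K, V](pairs: Seq[(K, V)])(f: (V, V) => V): Map[K, V] =
  pairs.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2).reduce(f)) }

// Made-up bids in the same shape as apair: ((auctionid, item), (bid, 1))
val pairs = Seq(
  (("a1", "xbox"), (10.0, 1)),
  (("a1", "xbox"), (20.0, 1)),
  (("a1", "xbox"), (60.0, 1)),
  (("a2", "palm"), (5.0, 1))
)

// Sum the bids and the ones per key, then divide total by count
val totals = reduceByKeyLocal(pairs)((x, y) => (x._1 + y._1, x._2 + y._2))
val localAvgs = totals.map { case (k, (total, count)) => (k, total / count) }

println(localAvgs(("a1", "xbox"))) // 30.0
println(localAvgs(("a2", "palm"))) // 5.0

// Averaging a partial average with the remaining bid gives a different answer
val wrong = ((10.0 + 20.0) / 2 + 60.0) / 2
println(wrong) // 37.5
```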

It is also possible to use the Java Math class or the Spark StatCounter class to calculate statistics, as shown below:

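import java.lang.Math
// create key value pairs of ((auctionid, item), bid);
// Math.min and StatCounter need plain Double values, not (bid, 1) tuples
val abids = auctionRDD.map(auction => ((auction.auctionid, auction.item), auction.bid))
// Calculate the minimum bid per auction
val amin = abids.reduceByKey((x, y) => Math.min(x, y))
// get a couple of results
amin.take(2)
// Array(((1641062012,cartier),1524.99), ((2920322392,palm),1.0))

import org.apache.spark.util.StatCounter
// Calculate statistics on the bid amount per auction
val astats = abids.groupByKey().mapValues(list => StatCounter(list))
// get a result
astats.take(1)
// Array(((1641062012,cartier),(count: 3, mean: 1574.663333, stdev: 35.126723, max: 1600.000000, min: 1524.990000)))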
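
To make the StatCounter fields easier to read, here is a small local sketch that computes the same kinds of statistics by hand for one group. The bids are made up. The sketch uses the population standard deviation (dividing by the count), which is what StatCounter reports as stdev.

```scala
// Made-up bids for a single auction
val bids = Seq(100.0, 110.0, 120.0)

val count = bids.size
val mean = bids.sum / count
// Population variance: mean squared deviation, divided by count (not count - 1)
val variance = bids.map(b => math.pow(b - mean, 2)).sum / count
val stdev = math.sqrt(variance)

println(f"count: $count, mean: $mean%.6f, stdev: $stdev%.6f, " +
  f"max: ${bids.max}%.6f, min: ${bids.min}%.6f")
```

This version makes several passes over the values, while StatCounter keeps running totals in a single pass. The sketch is only meant to clarify what each number means.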

Spark DataFrames provide a domain-specific language for distributed data manipulation, which makes aggregations easier to write. DataFrame queries can also perform better than hand-coded pair RDD operations because their execution is automatically optimized by a query optimizer. Here is an example of using DataFrames to get the min, max, and avg bid by auctionid and item:

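// max, min and avg are defined in org.apache.spark.sql.functions
import org.apache.spark.sql.functions._
val auctionDF = auctionRDD.toDF()
// get the max, min, average bid by auctionid and item
auctionDF.groupBy("auctionid", "item").agg($"auctionid", $"item", max("bid"), min("bid"), avg("bid")).show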

auctionid  item    MAX(bid) MIN(bid) AVG(bid)          
3016429446 palm    193.0    120.0    167.54900054931642
8211851222 xbox    161.0    51.0     95.98892879486084

You can also use SQL while working with DataFrames, using Spark SQL. This example gets the max, min, and average bid by auctionid and item.

// register as a temp table in order to use SQL
auctionDF.registerTempTable("auction")
// get the max, min, average bid by auctionid and item
val aStatDF = sqlContext.sql("SELECT auctionid, item, MAX(bid) as maxbid, min(bid) as minbid, avg(bid) as avgbid FROM auction GROUP BY auctionid, item")
// show some results
aStatDF.show
auctionid  item    maxbid minbid avgbid            
3016429446 palm    193.0  120.0  167.549           
8211851222 xbox    161.0  51.0   95.98892857142857


This concludes the first post in a series that will discuss some MapReduce design patterns implemented with Spark. This discussion was very condensed. For more information on the patterns, refer to the MapReduce Design Patterns book. For more information on Spark pair RDDs, refer to the key-value pairs chapter of Learning Spark.

References and More Information

Reference: MapReduce Design Patterns Implemented in Apache Spark from our JCG partner Carol McDonald at the Mapr blog.