About Vlad Mihalcea

Vlad Mihalcea is a software architect passionate about software integration, high scalability and concurrency challenges.

MongoDB time series: Introducing the aggregation framework

In my previous posts I talked about batch importing and the out-of-the-box MongoDB performance. Meanwhile, MongoDB was awarded DBMS of the year, so I therefore decided to offer a more thorough analyze of its real-life usage.

Because theory is better understood in a pragmatic context, I will first present you our virtual project requirements.

Introduction

 
 
Our virtual project has the following requirements:

  1. it must store valued time events represented as v=f(t)
  2. it must aggregate the minimum, maximum, average and count records by:
    • seconds in a minute
    • minutes in an hour
    • hours in a day
    • days in a year
  3. the seconds in a minute aggregation is calculated in real-time (so it must be really fast)
  4. all other aggregations are calculated by a batch processor (so they must be relatively fast)

Data model

I will offer two data modelling variants, each one having pros and cons.

  1. The first version uses the default auto-assigned MongoDB “_id”, and this simplifies inserts, since we can do it in batches without fearing of any timestamp clashing.
    If there are 10 values recorded each millisecond, then we will end up having 10 distinct documents. This post will discuss this data model option.

    {
    	"_id" : ObjectId("52cb898bed4bd6c24ae06a9e"),
    	"created_on" : ISODate("2012-11-02T01:23:54.010Z")
    	"value" : 0.19186609564349055
    }
  2. The second version uses the number of milliseconds since epoch as the “_id” field and the values are stored inside a “values” array.
    If there are 10 values recorded each millisecond, then we will end up having one distinct document with 10 entries in the “values” array. A future post will be dedicated to this compacted data model.

    {
            "_id" : 1348436178673,
            "values" : [
                    0.7518879524432123,
                    0.0017396819312125444
            ]
    }

Inserting data

Like in my previous post I will use 50M documents for testing the aggregation logic. I chose this number because I am testing on my commodity PC. In the aforementioned post I managed to insert over 80000 documents per second. This time I will take a more real-life approach and start by creating the collection and the indexes prior to inserting the data.

MongoDB shell version: 2.4.6
connecting to: random
> db.dropDatabase()
{ "dropped" : "random", "ok" : 1 }
> db.createCollection("randomData");
{ "ok" : 1 }
> db.randomData.ensureIndex({"created_on" : 1});
> db.randomData.getIndexes()
[
        {
                "v" : 1,
                "key" : {
                        "_id" : 1
                },
                "ns" : "random.randomData",
                "name" : "_id_"
        },
        {
                "v" : 1,
                "key" : {
                        "created_on" : 1
                },
                "ns" : "random.randomData",
                "name" : "created_on_1"
        }
]

Now it’s time to insert the 50M documents.

mongo random --eval "var arg1=50000000;arg2=1" create_random.js
...
Job#1 inserted 49900000 documents.
Job#1 inserted 50000000 in 2852.56s

This time we managed to import 17500 documents per second. At such rate we would require 550B entries a year, which is more than enough for our use case.

Compacting data

First, we need to analyze our collection statistics and for this we need to use the stats command:

db.randomData.stats()
{
        "ns" : "random.randomData",
        "count" : 50000000,
        "size" : 3200000096,
        "avgObjSize" : 64.00000192,
        "storageSize" : 5297451008,
        "numExtents" : 23,
        "nindexes" : 2,
        "lastExtentSize" : 1378918400,
        "paddingFactor" : 1,
        "systemFlags" : 1,
        "userFlags" : 0,
        "totalIndexSize" : 3497651920,
        "indexSizes" : {
                "_id_" : 1623442912,
                "created_on_1" : 1874209008
        },
        "ok" : 1
}

The current index size is almost 3.5GB and this is almost half of my available RAM. Luckily, MongoDB comes with a compact command, which we can use to defragment our data. This takes a lot of time, especially because we have a large total index size.

db.randomData.runCommand("compact");
Compacting took 1523.085s

Let’s see how much space we saved through compacting:

db.randomData.stats()
{
        "ns" : "random.randomData",
        "count" : 50000000,
        "size" : 3200000032,
        "avgObjSize" : 64.00000064,
        "storageSize" : 4415811584,
        "numExtents" : 24,
        "nindexes" : 2,
        "lastExtentSize" : 1149206528,
        "paddingFactor" : 1,
        "systemFlags" : 1,
        "userFlags" : 0,
        "totalIndexSize" : 2717890448,
        "indexSizes" : {
                "_id_" : 1460021024,
                "created_on_1" : 1257869424
        },
        "ok" : 1
}

We freed almost 800MB of data and that’s going to be handy for our RAM-intensive aggregation operations.

Explaining the aggregation logic

All four aggregation reports are similar, as they only differ by:

  1. the selecting time interval
  2. the group by time granularity

We can therefore start with the first report, which aggregates values by second. We will use the explain method to get a glance of our aggregation’s inner-workings.

load(pwd() + "/../../util/date_util.js");
var minDate = new Date(Date.UTC(2012, 1, 10, 11, 25, 30));
var maxDate = new Date(Date.UTC(2012, 1, 10, 11, 25, 35));
var result = db.randomData.runCommand('aggregate', { pipeline: 
[
	{
		$match: {
			"created_on" : {
				$gte: minDate, 
				$lt : maxDate	
			}
		}
	},
	{
		$project: {
			_id : 0,
			created_on : 1,
			value : 1
		}
	},
	{
		$group: {
				"_id": { 
					"year" : {
						$year : "$created_on"
					}, 
					"dayOfYear" : {
						$dayOfYear : "$created_on"
					},
					"hour" : {
						$hour : "$created_on"
					},
					"minute" : {
						$minute : "$created_on"
					},
					"second" : {
						$second : "$created_on"
					},
				}, 
				"count": { 
					$sum: 1 
				}, 
				"avg": { 
					$avg: "$value" 
				}, 
				"min": { 
					$min: "$value" 
				}, 
				"max": { 
					$max: "$value" 
				}		
			}
	},
	{
		$sort: { 
			"_id.year" : 1, 
			"_id.dayOfYear" : 1,
			"_id.hour" : 1,
			"_id.minute" : 1,
			"_id.second" : 1
		} 	
	}
], explain: true});
printjson(result);

Which outputs the following result

{
        "serverPipeline" : [
                {
                        "query" : {
                                "created_on" : {
                                        "$gte" : ISODate("2012-02-10T11:25:30Z"),
                                        "$lt" : ISODate("2012-02-10T11:25:35Z")
                                }
                        },
                        "projection" : {
                                "created_on" : 1,
                                "value" : 1,
                                "_id" : 0
                        },
                        "cursor" : {
                                "cursor" : "BtreeCursor created_on_1",
                                "isMultiKey" : false,
                                "n" : 5,
                                "nscannedObjects" : 5,
                                "nscanned" : 5,
                                "nscannedObjectsAllPlans" : 5,
                                "nscannedAllPlans" : 5,
                                "scanAndOrder" : false,
                                "indexOnly" : false,
                                "nYields" : 0,
                                "nChunkSkips" : 0,
                                "millis" : 0,
                                "indexBounds" : {
                                        "created_on" : [
                                                [
                                                        ISODate("2012-02-10T11:25:30Z"),
                                                        ISODate("2012-02-10T11:25:35Z")
                                                ]
                                        ]
                                },
                                "allPlans" : [
                                        {
                                                "cursor" : "BtreeCursor created_on_1",
                                                "n" : 5,
                                                "nscannedObjects" : 5,
                                                "nscanned" : 5,
                                                "indexBounds" : {
                                                        "created_on" : [
                                                                [
                                                                        ISODate("2012-02-10T11:25:30Z"),
                                                                        ISODate("2012-02-10T11:25:35Z")
                                                                ]
                                                        ]
                                                }
                                        }
                                ],
                                "oldPlan" : {
                                        "cursor" : "BtreeCursor created_on_1",
                                        "indexBounds" : {
                                                "created_on" : [
                                                        [
                                                                ISODate("2012-02-10T11:25:30Z"),
                                                                ISODate("2012-02-10T11:25:35Z")
                                                        ]
                                                ]
                                        }
                                },
                                "server" : "VLAD:27017"
                        }
                },
                {
                        "$project" : {
                                "_id" : false,
                                "created_on" : true,
                                "value" : true
                        }
                },
                {
                        "$group" : {
                                "_id" : {
                                        "year" : {
                                                "$year" : [
                                                        "$created_on"
                                                ]
                                        },
                                        "dayOfYear" : {
                                                "$dayOfYear" : [
                                                        "$created_on"
                                                ]
                                        },
                                        "hour" : {
                                                "$hour" : [
                                                        "$created_on"
                                                ]
                                        },
                                        "minute" : {
                                                "$minute" : [
                                                        "$created_on"
                                                ]
                                        },
                                        "second" : {
                                                "$second" : [
                                                        "$created_on"
                                                ]
                                        }
                                },
                                "count" : {
                                        "$sum" : {
                                                "$const" : 1
                                        }
                                },
                                "avg" : {
                                        "$avg" : "$value"
                                },
                                "min" : {
                                        "$min" : "$value"
                                },
                                "max" : {
                                        "$max" : "$value"
                                }
                        }
                },
                {
                        "$sort" : {
                                "sortKey" : {
                                        "_id.year" : 1,
                                        "_id.dayOfYear" : 1,
                                        "_id.hour" : 1,
                                        "_id.minute" : 1,
                                        "_id.second" : 1
                                }
                        }
                }
        ],
        "ok" : 1
}

The aggregation framework uses a pipes and filter design pattern, and our pipeline consists of the following operations:

  1. Match: This operation is similar to a WHERE SQL clause, and it is the first one we use since we make use of our “created_on” index (e.g. this is confirmed by the explain results: “cursor” : “BtreeCursor created_on_1″,). We are not using a covering-index (e.g. “indexOnly” : false) because that would be over-kill for our 8GB RAM set-up.
  2. Project: This operation is similar to a SELECT SQL clause, and it’s used for removing the “_id” field from our working set (which is useless for our reporting logic).
  3. Group: This operation is similar to a GROUP BY SQL clause, and it does all the calculation in memory. This is why we filtered the working-set prior to grouping it.
  4. Sort: This operation is similar to an ORDER BY SQL clause, and we use it to sort the results chronologically.

The base aggregation script

Since our four reports are similar we can group all the logic in a single script:

function printResult(dataSet) {
	dataSet.result.forEach(function(document)  {
		printjson(document);
	});
}

function aggregateData(fromDate, toDate, groupDeltaMillis, enablePrintResult) {		

	print("Aggregating from " + fromDate + " to " + toDate);

	var start = new Date();

	var groupBy = { 
		"year" : {
			$year : "$created_on"
		}, 
		"dayOfYear" : {
			$dayOfYear : "$created_on"
		}
	};

	var sortBy = { 
			"_id.year" : 1, 
			"_id.dayOfYear" : 1
	}; 	

	var appendSeconds = false;
	var appendMinutes = false;
	var appendHours = false;

	switch(groupDeltaMillis) {
		case ONE_SECOND_MILLIS :
			appendSeconds = true;			
		case ONE_MINUTE_MILLIS :
			appendMinutes = true;			
		case ONE_HOUR_MILLIS :
			appendHours = true;		
	}	

	if(appendHours) {
		groupBy["hour"] = {
			$hour : "$created_on"	
		};
		sortBy["_id.hour"] = 1;	
	}
	if(appendMinutes) {
		groupBy["minute"] = {
			$minute : "$created_on"	
		};
		sortBy["_id.minute"] = 1;
	}
	if(appendSeconds) {
		groupBy["second"] = {
			$second : "$created_on"	
		};
		sortBy["_id.second"] = 1;
	}	

	var pipeline = [
		{
			$match: {
				"created_on" : {
					$gte: fromDate, 
					$lt : toDate	
				}
			}
		},
		{
			$project: {
				_id : 0,
				created_on : 1,
				value : 1
			}
		},
		{
			$group: {
					"_id": groupBy, 
					"count": { 
						$sum: 1 
					}, 
					"avg": { 
						$avg: "$value" 
					}, 
					"min": { 
						$min: "$value" 
					}, 
					"max": { 
						$max: "$value" 
					}		
				}
		},
		{
			$sort: sortBy
		}
	];

	var dataSet = db.randomData.aggregate(pipeline);
	var aggregationDuration = (new Date().getTime() - start.getTime())/1000;	
	print("Aggregation took:" + aggregationDuration + "s");	
	if(dataSet.result != null && dataSet.result.length > 0) {
		print("Fetched :" + dataSet.result.length + " documents.");
		if(enablePrintResult) {
			printResult(dataSet);
		}
	}
	var aggregationAndFetchDuration = (new Date().getTime() - start.getTime())/1000;
	if(enablePrintResult) {
		print("Aggregation and fetch took:" + aggregationAndFetchDuration + "s");
	}	
	return {
		aggregationDuration : aggregationDuration,
		aggregationAndFetchDuration : aggregationAndFetchDuration
	};
}

Time for results

Let’s test the first three reports using the following script:

load(pwd() + "/../../util/date_util.js");
load(pwd() + "/aggregate_base_report.js");

var deltas = [ 
{
	matchDeltaMillis: ONE_MINUTE_MILLIS, 
	groupDeltaMillis: ONE_SECOND_MILLIS,
	description: "Aggregate all seconds in a minute"
},
{
	matchDeltaMillis: ONE_HOUR_MILLIS, 
	groupDeltaMillis: ONE_MINUTE_MILLIS,
	description: "Aggregate all minutes in an hour"
},
{
	matchDeltaMillis: ONE_DAY_MILLIS, 
	groupDeltaMillis: ONE_HOUR_MILLIS,
	description: "Aggregate all hours in a day"
}
];

var testFromDate = new Date(Date.UTC(2012, 5, 10, 11, 25, 59));

deltas.forEach(function(delta) {	
	print('Aggregating ' + description);
	var timeInterval = calibrateTimeInterval(testFromDate, delta.matchDeltaMillis);
	var fromDate = timeInterval.fromDate;
	var toDate = timeInterval.toDate;
	aggregateData(fromDate, toDate, delta.groupDeltaMillis, true);	
});

Giving us the following results:

MongoDB shell version: 2.4.6
connecting to: random
Aggregating Aggregate all seconds in a minute
Aggregating from Sun Jun 10 2012 14:25:00 GMT+0300 (GTB Daylight Time) to Sun Jun 10 2012 14:26:00 GMT+0300 (GTB Daylight Time)
Fetched :45 documents.
{
        "_id" : {
                "year" : 2012,
                "dayOfYear" : 162,
                "hour" : 11,
                "minute" : 25,
                "second" : 0
        },
        "count" : 1,
        "avg" : 0.4924355132970959,
        "min" : 0.4924355132970959,
        "max" : 0.4924355132970959
}
{
        "_id" : {
                "year" : 2012,
                "dayOfYear" : 162,
                "hour" : 11,
                "minute" : 25,
                "second" : 1
        },
        "count" : 1,
        "avg" : 0.10043778014369309,
        "min" : 0.10043778014369309,
        "max" : 0.10043778014369309
}
...
{
        "_id" : {
                "year" : 2012,
                "dayOfYear" : 162,
                "hour" : 11,
                "minute" : 25,
                "second" : 59
        },
        "count" : 1,
        "avg" : 0.16304525500163436,
        "min" : 0.16304525500163436,
        "max" : 0.16304525500163436
}
Aggregating from Sun Jun 10 2012 14:00:00 GMT+0300 (GTB Daylight Time) to Sun Jun 10 2012 15:00:00 GMT+0300 (GTB Daylight Time)
Fetched :60 documents.
{
        "_id" : {
                "year" : 2012,
                "dayOfYear" : 162,
                "hour" : 11,
                "minute" : 0
        },
        "count" : 98,
        "avg" : 0.4758610369979727,
        "min" : 0.004005654249340296,
        "max" : 0.9938081130385399
}
{
        "_id" : {
                "year" : 2012,
                "dayOfYear" : 162,
                "hour" : 11,
                "minute" : 1
        },
        "count" : 100,
        "avg" : 0.5217278444720432,
        "min" : 0.003654648782685399,
        "max" : 0.9981840122491121
}
...
{
        "_id" : {
                "year" : 2012,
                "dayOfYear" : 162,
                "hour" : 11,
                "minute" : 59
        },
        "count" : 92,
        "avg" : 0.5401836506308705,
        "min" : 0.01764239347539842,
        "max" : 0.9997266652062535
}
Aggregating Aggregate all hours in a day
Aggregating from Sun Jun 10 2012 03:00:00 GMT+0300 (GTB Daylight Time) to Mon Jun 11 2012 03:00:00 GMT+0300 (GTB Daylight Time)
Fetched :24 documents.
{
        "_id" : {
                "year" : 2012,
                "dayOfYear" : 162,
                "hour" : 0
        },
        "count" : 5727,
        "avg" : 0.4975644027204364,
        "min" : 0.00020139524713158607,
        "max" : 0.9997993060387671
}
{
        "_id" : {
                "year" : 2012,
                "dayOfYear" : 162,
                "hour" : 1
        },
        "count" : 5799,
        "avg" : 0.49519448930962623,
        "min" : 0.00011728447861969471,
        "max" : 0.9999530822969973
}
...
{
        "_id" : {
                "year" : 2012,
                "dayOfYear" : 162,
                "hour" : 23
        },
        "count" : 5598,
        "avg" : 0.49947314951339256,
        "min" : 0.00009276834316551685,
        "max" : 0.9999523421283811
}

Stay tuned, my next post will show you how to optimize these aggregation queries.

 

Do you want to know how to develop your skillset to become a Java Rockstar?

Subscribe to our newsletter to start Rocking right now!

To get you started we give you two of our best selling eBooks for FREE!

JPA Mini Book

Learn how to leverage the power of JPA in order to create robust and flexible Java applications. With this Mini Book, you will get introduced to JPA and smoothly transition to more advanced concepts.

JVM Troubleshooting Guide

The Java virtual machine is really the foundation of any Java EE platform. Learn how to master it with this advanced guide!

Given email address is already subscribed, thank you!
Oops. Something went wrong. Please try again later.
Please provide a valid email address.
Thank you, your sign-up request was successful! Please check your e-mail inbox.
Please complete the CAPTCHA.
Please fill in the required fields.

2 Responses to "MongoDB time series: Introducing the aggregation framework"

  1. Ankita says:

    What can be done in aggregation to group by hour (hour as per locale time not in UTC)

    For example

    {_id: {year: 2014, month: 5, day: 2, hour 2}, # this year, month, day, hour should be from ISTTime not ISODate Format.
    count : 10}

  2. UTC has support for time zones:

    http://en.wikipedia.org/wiki/ISO_8601#Time_offsets_from_UTC

    I haven’t tried it in the aggregation framework, so if it doesn’t support it maybe you should open an issue.

Leave a Reply


five + = 6



Java Code Geeks and all content copyright © 2010-2014, Exelixis Media Ltd | Terms of Use | Privacy Policy | Contact
All trademarks and registered trademarks appearing on Java Code Geeks are the property of their respective owners.
Java is a trademark or registered trademark of Oracle Corporation in the United States and other countries.
Java Code Geeks is not connected to Oracle Corporation and is not sponsored by Oracle Corporation.
Do you want to know how to develop your skillset and become a ...
Java Rockstar?

Subscribe to our newsletter to start Rocking right now!

To get you started we give you two of our best selling eBooks for FREE!

Get ready to Rock!
You can download the complementary eBooks using the links below:
Close