MongoDB time series: Introducing the aggregation framework

In my previous posts I talked about batch importing and the out-of-the-box MongoDB performance. Meanwhile, MongoDB was awarded DBMS of the year, so I decided to offer a more thorough analysis of its real-life usage.

Because theory is better understood in a pragmatic context, I will first present our virtual project requirements.

Introduction

 
 
Our virtual project has the following requirements:

  1. it must store valued time events represented as v=f(t)
  2. it must aggregate the minimum, maximum, average and count records by:
    • seconds in a minute
    • minutes in an hour
    • hours in a day
    • days in a year
  3. the seconds in a minute aggregation is calculated in real-time (so it must be really fast)
  4. all other aggregations are calculated by a batch processor (so they must be relatively fast)

Data model

I will offer two data modelling variants, each one having pros and cons.

  1. The first version uses the default auto-assigned MongoDB “_id”, which simplifies inserts, since we can do them in batches without worrying about timestamp clashes.
    If there are 10 values recorded each millisecond, then we will end up having 10 distinct documents. This post will discuss this data model option.

    {
    	"_id" : ObjectId("52cb898bed4bd6c24ae06a9e"),
    	"created_on" : ISODate("2012-11-02T01:23:54.010Z"),
    	"value" : 0.19186609564349055
    }
  2. The second version uses the number of milliseconds since epoch as the “_id” field and the values are stored inside a “values” array.
    If there are 10 values recorded each millisecond, then we will end up having one distinct document with 10 entries in the “values” array. A future post will be dedicated to this compacted data model.

    {
            "_id" : 1348436178673,
            "values" : [
                    0.7518879524432123,
                    0.0017396819312125444
            ]
    }
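
To make the difference between the two variants more concrete, here is a minimal sketch in plain JavaScript (no database involved) that folds documents of the first shape into documents of the second shape. The helper name compactByMillisecond and the third sample value are assumptions made up for illustration; they are not part of the scripts used in this post.

```javascript
// Hypothetical helper: folds first-variant documents ({created_on, value})
// into second-variant documents ({_id: <millis since epoch>, values: [...]}).
function compactByMillisecond(documents) {
    var byMillis = {};
    documents.forEach(function (doc) {
        var millis = doc.created_on.getTime();
        if (!byMillis[millis]) {
            byMillis[millis] = { _id: millis, values: [] };
        }
        byMillis[millis].values.push(doc.value);
    });
    // Return the compacted documents ordered by timestamp.
    return Object.keys(byMillis)
        .map(Number)
        .sort(function (a, b) { return a - b; })
        .map(function (millis) { return byMillis[millis]; });
}

// Two values share the same millisecond, the third one (made up) does not.
var sample = [
    { created_on: new Date(1348436178673), value: 0.7518879524432123 },
    { created_on: new Date(1348436178673), value: 0.0017396819312125444 },
    { created_on: new Date(1348436178674), value: 0.5 }
];
var compacted = compactByMillisecond(sample);
```

Three first-variant documents end up as two compacted ones, which is where the storage saving of the second model would come from.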

Inserting data

As in my previous post, I will use 50M documents for testing the aggregation logic. I chose this number because I am testing on my commodity PC. In the aforementioned post I managed to insert over 80,000 documents per second. This time I will take a more real-life approach and start by creating the collection and the indexes prior to inserting the data.

MongoDB shell version: 2.4.6
connecting to: random
> db.dropDatabase()
{ "dropped" : "random", "ok" : 1 }
> db.createCollection("randomData");
{ "ok" : 1 }
> db.randomData.ensureIndex({"created_on" : 1});
> db.randomData.getIndexes()
[
        {
                "v" : 1,
                "key" : {
                        "_id" : 1
                },
                "ns" : "random.randomData",
                "name" : "_id_"
        },
        {
                "v" : 1,
                "key" : {
                        "created_on" : 1
                },
                "ns" : "random.randomData",
                "name" : "created_on_1"
        }
]

Now it’s time to insert the 50M documents.

mongo random --eval "var arg1=50000000;arg2=1" create_random.js
...
Job#1 inserted 49900000 documents.
Job#1 inserted 50000000 in 2852.56s

This time we managed to import roughly 17,500 documents per second. At that rate we could insert about 550 billion entries a year, which is more than enough for our use case.
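
The throughput figures follow directly from the job output. A small sketch of the arithmetic, assuming a 365-day year and a constant insert rate (a simplification, since the rate is unlikely to stay flat as the indexes grow):

```javascript
// Numbers taken from the import job output above.
var insertedDocuments = 50000000;
var elapsedSeconds = 2852.56;

// Assumption: a non-leap year and a constant insert rate.
var SECONDS_PER_YEAR = 365 * 24 * 60 * 60;

var docsPerSecond = insertedDocuments / elapsedSeconds;
var docsPerYear = docsPerSecond * SECONDS_PER_YEAR;
var billionsPerYear = docsPerYear / 1e9;
```

This gives about 17,528 documents per second and roughly 553 billion documents per year, in line with the rounded values quoted above.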

Compacting data

First, we need to analyze our collection statistics and for this we need to use the stats command:

db.randomData.stats()
{
        "ns" : "random.randomData",
        "count" : 50000000,
        "size" : 3200000096,
        "avgObjSize" : 64.00000192,
        "storageSize" : 5297451008,
        "numExtents" : 23,
        "nindexes" : 2,
        "lastExtentSize" : 1378918400,
        "paddingFactor" : 1,
        "systemFlags" : 1,
        "userFlags" : 0,
        "totalIndexSize" : 3497651920,
        "indexSizes" : {
                "_id_" : 1623442912,
                "created_on_1" : 1874209008
        },
        "ok" : 1
}

The current index size is almost 3.5GB and this is almost half of my available RAM. Luckily, MongoDB comes with a compact command, which we can use to defragment our data. This takes a lot of time, especially because we have a large total index size.

db.randomData.runCommand("compact");
Compacting took 1523.085s

Let’s see how much space we saved through compacting:

db.randomData.stats()
{
        "ns" : "random.randomData",
        "count" : 50000000,
        "size" : 3200000032,
        "avgObjSize" : 64.00000064,
        "storageSize" : 4415811584,
        "numExtents" : 24,
        "nindexes" : 2,
        "lastExtentSize" : 1149206528,
        "paddingFactor" : 1,
        "systemFlags" : 1,
        "userFlags" : 0,
        "totalIndexSize" : 2717890448,
        "indexSizes" : {
                "_id_" : 1460021024,
                "created_on_1" : 1257869424
        },
        "ok" : 1
}

We freed almost 800MB of index space (and roughly 880MB of storage), which will come in handy for our RAM-intensive aggregation operations.
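
The savings can be read off the two stats() outputs. A minimal sketch of the subtraction, assuming 1MB means 10^6 bytes (with 2^20 bytes per MB the figures come out at roughly 744 and 841 instead):

```javascript
// Values copied from the stats() output before and after compacting.
var before = { storageSize: 5297451008, totalIndexSize: 3497651920 };
var after = { storageSize: 4415811584, totalIndexSize: 2717890448 };

// Assumption: decimal megabytes (10^6 bytes).
function toMegabytes(bytes) {
    return bytes / 1e6;
}

var indexSavedMB = toMegabytes(before.totalIndexSize - after.totalIndexSize);
var storageSavedMB = toMegabytes(before.storageSize - after.storageSize);
```

Under that assumption the index shrinks by about 780MB and the storage by about 882MB.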

Explaining the aggregation logic

All four aggregation reports are similar, as they only differ by:

  1. the selecting time interval
  2. the group by time granularity

We can therefore start with the first report, which aggregates values by second. We will use the explain option to get a glimpse of our aggregation’s inner workings.

load(pwd() + "/../../util/date_util.js");
var minDate = new Date(Date.UTC(2012, 1, 10, 11, 25, 30));
var maxDate = new Date(Date.UTC(2012, 1, 10, 11, 25, 35));
var result = db.randomData.runCommand('aggregate', { pipeline: 
[
	{
		$match: {
			"created_on" : {
				$gte: minDate, 
				$lt : maxDate	
			}
		}
	},
	{
		$project: {
			_id : 0,
			created_on : 1,
			value : 1
		}
	},
	{
		$group: {
				"_id": { 
					"year" : {
						$year : "$created_on"
					}, 
					"dayOfYear" : {
						$dayOfYear : "$created_on"
					},
					"hour" : {
						$hour : "$created_on"
					},
					"minute" : {
						$minute : "$created_on"
					},
					"second" : {
						$second : "$created_on"
					},
				}, 
				"count": { 
					$sum: 1 
				}, 
				"avg": { 
					$avg: "$value" 
				}, 
				"min": { 
					$min: "$value" 
				}, 
				"max": { 
					$max: "$value" 
				}		
			}
	},
	{
		$sort: { 
			"_id.year" : 1, 
			"_id.dayOfYear" : 1,
			"_id.hour" : 1,
			"_id.minute" : 1,
			"_id.second" : 1
		} 	
	}
], explain: true});
printjson(result);

This outputs the following result:

{
        "serverPipeline" : [
                {
                        "query" : {
                                "created_on" : {
                                        "$gte" : ISODate("2012-02-10T11:25:30Z"),
                                        "$lt" : ISODate("2012-02-10T11:25:35Z")
                                }
                        },
                        "projection" : {
                                "created_on" : 1,
                                "value" : 1,
                                "_id" : 0
                        },
                        "cursor" : {
                                "cursor" : "BtreeCursor created_on_1",
                                "isMultiKey" : false,
                                "n" : 5,
                                "nscannedObjects" : 5,
                                "nscanned" : 5,
                                "nscannedObjectsAllPlans" : 5,
                                "nscannedAllPlans" : 5,
                                "scanAndOrder" : false,
                                "indexOnly" : false,
                                "nYields" : 0,
                                "nChunkSkips" : 0,
                                "millis" : 0,
                                "indexBounds" : {
                                        "created_on" : [
                                                [
                                                        ISODate("2012-02-10T11:25:30Z"),
                                                        ISODate("2012-02-10T11:25:35Z")
                                                ]
                                        ]
                                },
                                "allPlans" : [
                                        {
                                                "cursor" : "BtreeCursor created_on_1",
                                                "n" : 5,
                                                "nscannedObjects" : 5,
                                                "nscanned" : 5,
                                                "indexBounds" : {
                                                        "created_on" : [
                                                                [
                                                                        ISODate("2012-02-10T11:25:30Z"),
                                                                        ISODate("2012-02-10T11:25:35Z")
                                                                ]
                                                        ]
                                                }
                                        }
                                ],
                                "oldPlan" : {
                                        "cursor" : "BtreeCursor created_on_1",
                                        "indexBounds" : {
                                                "created_on" : [
                                                        [
                                                                ISODate("2012-02-10T11:25:30Z"),
                                                                ISODate("2012-02-10T11:25:35Z")
                                                        ]
                                                ]
                                        }
                                },
                                "server" : "VLAD:27017"
                        }
                },
                {
                        "$project" : {
                                "_id" : false,
                                "created_on" : true,
                                "value" : true
                        }
                },
                {
                        "$group" : {
                                "_id" : {
                                        "year" : {
                                                "$year" : [
                                                        "$created_on"
                                                ]
                                        },
                                        "dayOfYear" : {
                                                "$dayOfYear" : [
                                                        "$created_on"
                                                ]
                                        },
                                        "hour" : {
                                                "$hour" : [
                                                        "$created_on"
                                                ]
                                        },
                                        "minute" : {
                                                "$minute" : [
                                                        "$created_on"
                                                ]
                                        },
                                        "second" : {
                                                "$second" : [
                                                        "$created_on"
                                                ]
                                        }
                                },
                                "count" : {
                                        "$sum" : {
                                                "$const" : 1
                                        }
                                },
                                "avg" : {
                                        "$avg" : "$value"
                                },
                                "min" : {
                                        "$min" : "$value"
                                },
                                "max" : {
                                        "$max" : "$value"
                                }
                        }
                },
                {
                        "$sort" : {
                                "sortKey" : {
                                        "_id.year" : 1,
                                        "_id.dayOfYear" : 1,
                                        "_id.hour" : 1,
                                        "_id.minute" : 1,
                                        "_id.second" : 1
                                }
                        }
                }
        ],
        "ok" : 1
}
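
The explain document is long, and only a few fields of its first stage tell us whether the index was used. The following sketch pulls them out. The helper name describeMatchCursor is hypothetical, and the field names are simply the ones visible in the 2.4.6 output above; other server versions may report a different structure.

```javascript
// Hypothetical helper: summarizes the cursor of the first pipeline stage,
// or returns null when the explain document has a different shape.
function describeMatchCursor(explainResult) {
    var stages = explainResult.serverPipeline || [];
    var first = stages[0];
    if (!first || !first.cursor) {
        return null;
    }
    return {
        cursor: first.cursor.cursor,
        // A "BtreeCursor <index name>" cursor means an index scan.
        usesIndex: first.cursor.cursor.indexOf("BtreeCursor") === 0,
        indexOnly: first.cursor.indexOnly,
        scanned: first.cursor.nscanned,
        returned: first.cursor.n
    };
}

// Trimmed copy of the explain output shown above.
var sampleExplain = {
    serverPipeline: [
        {
            cursor: {
                cursor: "BtreeCursor created_on_1",
                n: 5,
                nscanned: 5,
                indexOnly: false
            }
        }
    ],
    ok: 1
};
var summary = describeMatchCursor(sampleExplain);
```

For the output above, the summary reports an index scan on created_on_1 that scanned and returned 5 entries, without being index-only.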

The aggregation framework uses a pipes-and-filters design pattern, and our pipeline consists of the following operations:

  1. Match: This operation is similar to a WHERE SQL clause, and it comes first so that it can use our “created_on” index (this is confirmed by the explain results: “cursor” : “BtreeCursor created_on_1”). We are not using a covering index (“indexOnly” : false) because that would be overkill for our 8GB RAM set-up.
  2. Project: This operation is similar to a SELECT SQL clause, and it’s used for removing the “_id” field from our working set (which is useless for our reporting logic).
  3. Group: This operation is similar to a GROUP BY SQL clause, and it does all the calculation in memory. This is why we filtered the working-set prior to grouping it.
  4. Sort: This operation is similar to an ORDER BY SQL clause, and we use it to sort the results chronologically.
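
To illustrate what the four stages do, here is a rough plain-JavaScript emulation that runs on an in-memory array instead of a collection. It is only a sketch of the semantics, not how MongoDB implements the pipeline; the function names and the sample documents are made up for this example.

```javascript
// Day of year (1-based) in UTC, mirroring the values $dayOfYear produces.
function dayOfYearUTC(date) {
    var startOfYear = Date.UTC(date.getUTCFullYear(), 0, 1);
    return Math.floor((date.getTime() - startOfYear) / 86400000) + 1;
}

function aggregateBySecond(documents, fromDate, toDate) {
    var groups = {};
    documents
        // Match: keep only documents inside [fromDate, toDate).
        .filter(function (doc) {
            return doc.created_on >= fromDate && doc.created_on < toDate;
        })
        // Project: drop "_id", keep "created_on" and "value".
        .map(function (doc) {
            return { created_on: doc.created_on, value: doc.value };
        })
        // Group: accumulate count, sum, min and max per second.
        .forEach(function (doc) {
            var d = doc.created_on;
            var id = {
                year: d.getUTCFullYear(),
                dayOfYear: dayOfYearUTC(d),
                hour: d.getUTCHours(),
                minute: d.getUTCMinutes(),
                second: d.getUTCSeconds()
            };
            var key = JSON.stringify(id);
            if (!groups[key]) {
                groups[key] = { _id: id, count: 0, sum: 0, min: Infinity, max: -Infinity };
            }
            var group = groups[key];
            group.count += 1;
            group.sum += doc.value;
            group.min = Math.min(group.min, doc.value);
            group.max = Math.max(group.max, doc.value);
        });
    return Object.keys(groups)
        .map(function (key) {
            var g = groups[key];
            return { _id: g._id, count: g.count, avg: g.sum / g.count, min: g.min, max: g.max };
        })
        // Sort: chronological order of the group keys.
        .sort(function (a, b) {
            return (a._id.year - b._id.year) ||
                (a._id.dayOfYear - b._id.dayOfYear) ||
                (a._id.hour - b._id.hour) ||
                (a._id.minute - b._id.minute) ||
                (a._id.second - b._id.second);
        });
}

// Made-up sample: two values in second 30, one in second 31, one out of range.
var sampleDocuments = [
    { _id: 1, created_on: new Date(Date.UTC(2012, 1, 10, 11, 25, 30, 100)), value: 0.25 },
    { _id: 2, created_on: new Date(Date.UTC(2012, 1, 10, 11, 25, 30, 900)), value: 0.75 },
    { _id: 3, created_on: new Date(Date.UTC(2012, 1, 10, 11, 25, 31, 0)), value: 0.5 },
    { _id: 4, created_on: new Date(Date.UTC(2012, 1, 10, 11, 25, 35, 0)), value: 0.9 }
];
var report = aggregateBySecond(
    sampleDocuments,
    new Date(Date.UTC(2012, 1, 10, 11, 25, 30)),
    new Date(Date.UTC(2012, 1, 10, 11, 25, 35))
);
```

The last sample document falls on the exclusive upper bound, so it is filtered out before grouping, just as the $lt condition would do.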

The base aggregation script

Since our four reports are similar we can group all the logic in a single script:

function printResult(dataSet) {
	dataSet.result.forEach(function(document)  {
		printjson(document);
	});
}

function aggregateData(fromDate, toDate, groupDeltaMillis, enablePrintResult) {		

	print("Aggregating from " + fromDate + " to " + toDate);

	var start = new Date();

	var groupBy = { 
		"year" : {
			$year : "$created_on"
		}, 
		"dayOfYear" : {
			$dayOfYear : "$created_on"
		}
	};

	var sortBy = { 
			"_id.year" : 1, 
			"_id.dayOfYear" : 1
	}; 	

	var appendSeconds = false;
	var appendMinutes = false;
	var appendHours = false;

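	switch(groupDeltaMillis) {
		// Intentional fall-through: a finer granularity also needs every coarser field.
		case ONE_SECOND_MILLIS :
			appendSeconds = true;
			// falls through
		case ONE_MINUTE_MILLIS :
			appendMinutes = true;
			// falls through
		case ONE_HOUR_MILLIS :
			appendHours = true;
	}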

	if(appendHours) {
		groupBy["hour"] = {
			$hour : "$created_on"	
		};
		sortBy["_id.hour"] = 1;	
	}
	if(appendMinutes) {
		groupBy["minute"] = {
			$minute : "$created_on"	
		};
		sortBy["_id.minute"] = 1;
	}
	if(appendSeconds) {
		groupBy["second"] = {
			$second : "$created_on"	
		};
		sortBy["_id.second"] = 1;
	}	

	var pipeline = [
		{
			$match: {
				"created_on" : {
					$gte: fromDate, 
					$lt : toDate	
				}
			}
		},
		{
			$project: {
				_id : 0,
				created_on : 1,
				value : 1
			}
		},
		{
			$group: {
					"_id": groupBy, 
					"count": { 
						$sum: 1 
					}, 
					"avg": { 
						$avg: "$value" 
					}, 
					"min": { 
						$min: "$value" 
					}, 
					"max": { 
						$max: "$value" 
					}		
				}
		},
		{
			$sort: sortBy
		}
	];

	var dataSet = db.randomData.aggregate(pipeline);
	var aggregationDuration = (new Date().getTime() - start.getTime())/1000;	
	print("Aggregation took:" + aggregationDuration + "s");	
	if(dataSet.result != null && dataSet.result.length > 0) {
		print("Fetched :" + dataSet.result.length + " documents.");
		if(enablePrintResult) {
			printResult(dataSet);
		}
	}
	var aggregationAndFetchDuration = (new Date().getTime() - start.getTime())/1000;
	if(enablePrintResult) {
		print("Aggregation and fetch took:" + aggregationAndFetchDuration + "s");
	}	
	return {
		aggregationDuration : aggregationDuration,
		aggregationAndFetchDuration : aggregationAndFetchDuration
	};
}
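
The switch in the script relies on case fall-through to add the coarser fields to the finer granularities. The same mapping can be written in a table-like way, as sketched below. The constant values are my assumption of what date_util.js (not listed in this post) defines, and the helper names are hypothetical. Unlike the switch, the comparisons also accept in-between deltas (a 30-second delta would get the minute fields), so the two versions only agree for the four granularities used here.

```javascript
// Assumed values; the real constants live in date_util.js.
var ONE_SECOND_MILLIS = 1000;
var ONE_MINUTE_MILLIS = 60 * ONE_SECOND_MILLIS;
var ONE_HOUR_MILLIS = 60 * ONE_MINUTE_MILLIS;
var ONE_DAY_MILLIS = 24 * ONE_HOUR_MILLIS;

// Hypothetical helper: lists the date parts needed for a given granularity.
function groupFieldsFor(groupDeltaMillis) {
    var fields = ["year", "dayOfYear"];
    if (groupDeltaMillis <= ONE_HOUR_MILLIS) fields.push("hour");
    if (groupDeltaMillis <= ONE_MINUTE_MILLIS) fields.push("minute");
    if (groupDeltaMillis <= ONE_SECOND_MILLIS) fields.push("second");
    return fields;
}

// Builds the $group key and the $sort specification from the field list.
// Each field name matches its date operator ($year, $dayOfYear, $hour, ...).
function buildGroupAndSort(groupDeltaMillis) {
    var groupBy = {};
    var sortBy = {};
    groupFieldsFor(groupDeltaMillis).forEach(function (field) {
        var operator = {};
        operator["$" + field] = "$created_on";
        groupBy[field] = operator;
        sortBy["_id." + field] = 1;
    });
    return { groupBy: groupBy, sortBy: sortBy };
}

var perMinute = buildGroupAndSort(ONE_MINUTE_MILLIS);
var perDay = buildGroupAndSort(ONE_DAY_MILLIS);
```

The per-day case keeps only the year and the day of the year, which is what the fourth report (days in a year) needs.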

Time for results

Let’s test the first three reports using the following script:

load(pwd() + "/../../util/date_util.js");
load(pwd() + "/aggregate_base_report.js");

var deltas = [ 
{
	matchDeltaMillis: ONE_MINUTE_MILLIS, 
	groupDeltaMillis: ONE_SECOND_MILLIS,
	description: "Aggregate all seconds in a minute"
},
{
	matchDeltaMillis: ONE_HOUR_MILLIS, 
	groupDeltaMillis: ONE_MINUTE_MILLIS,
	description: "Aggregate all minutes in an hour"
},
{
	matchDeltaMillis: ONE_DAY_MILLIS, 
	groupDeltaMillis: ONE_HOUR_MILLIS,
	description: "Aggregate all hours in a day"
}
];

var testFromDate = new Date(Date.UTC(2012, 5, 10, 11, 25, 59));

deltas.forEach(function(delta) {	
	print('Aggregating ' + delta.description);
	var timeInterval = calibrateTimeInterval(testFromDate, delta.matchDeltaMillis);
	var fromDate = timeInterval.fromDate;
	var toDate = timeInterval.toDate;
	aggregateData(fromDate, toDate, delta.groupDeltaMillis, true);	
});

Giving us the following results:

MongoDB shell version: 2.4.6
connecting to: random
Aggregating Aggregate all seconds in a minute
Aggregating from Sun Jun 10 2012 14:25:00 GMT+0300 (GTB Daylight Time) to Sun Jun 10 2012 14:26:00 GMT+0300 (GTB Daylight Time)
Fetched :45 documents.
{
        "_id" : {
                "year" : 2012,
                "dayOfYear" : 162,
                "hour" : 11,
                "minute" : 25,
                "second" : 0
        },
        "count" : 1,
        "avg" : 0.4924355132970959,
        "min" : 0.4924355132970959,
        "max" : 0.4924355132970959
}
{
        "_id" : {
                "year" : 2012,
                "dayOfYear" : 162,
                "hour" : 11,
                "minute" : 25,
                "second" : 1
        },
        "count" : 1,
        "avg" : 0.10043778014369309,
        "min" : 0.10043778014369309,
        "max" : 0.10043778014369309
}
...
{
        "_id" : {
                "year" : 2012,
                "dayOfYear" : 162,
                "hour" : 11,
                "minute" : 25,
                "second" : 59
        },
        "count" : 1,
        "avg" : 0.16304525500163436,
        "min" : 0.16304525500163436,
        "max" : 0.16304525500163436
}
Aggregating from Sun Jun 10 2012 14:00:00 GMT+0300 (GTB Daylight Time) to Sun Jun 10 2012 15:00:00 GMT+0300 (GTB Daylight Time)
Fetched :60 documents.
{
        "_id" : {
                "year" : 2012,
                "dayOfYear" : 162,
                "hour" : 11,
                "minute" : 0
        },
        "count" : 98,
        "avg" : 0.4758610369979727,
        "min" : 0.004005654249340296,
        "max" : 0.9938081130385399
}
{
        "_id" : {
                "year" : 2012,
                "dayOfYear" : 162,
                "hour" : 11,
                "minute" : 1
        },
        "count" : 100,
        "avg" : 0.5217278444720432,
        "min" : 0.003654648782685399,
        "max" : 0.9981840122491121
}
...
{
        "_id" : {
                "year" : 2012,
                "dayOfYear" : 162,
                "hour" : 11,
                "minute" : 59
        },
        "count" : 92,
        "avg" : 0.5401836506308705,
        "min" : 0.01764239347539842,
        "max" : 0.9997266652062535
}
Aggregating Aggregate all hours in a day
Aggregating from Sun Jun 10 2012 03:00:00 GMT+0300 (GTB Daylight Time) to Mon Jun 11 2012 03:00:00 GMT+0300 (GTB Daylight Time)
Fetched :24 documents.
{
        "_id" : {
                "year" : 2012,
                "dayOfYear" : 162,
                "hour" : 0
        },
        "count" : 5727,
        "avg" : 0.4975644027204364,
        "min" : 0.00020139524713158607,
        "max" : 0.9997993060387671
}
{
        "_id" : {
                "year" : 2012,
                "dayOfYear" : 162,
                "hour" : 1
        },
        "count" : 5799,
        "avg" : 0.49519448930962623,
        "min" : 0.00011728447861969471,
        "max" : 0.9999530822969973
}
...
{
        "_id" : {
                "year" : 2012,
                "dayOfYear" : 162,
                "hour" : 23
        },
        "count" : 5598,
        "avg" : 0.49947314951339256,
        "min" : 0.00009276834316551685,
        "max" : 0.9999523421283811
}
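
The test script calls calibrateTimeInterval from date_util.js, which is not listed in this post. Judging by the intervals printed above (11:25:59 UTC becomes 11:25:00 to 11:26:00 UTC for the minute report, and midnight to midnight UTC for the day report), it appears to truncate the date to the start of the enclosing interval. Here is a minimal reconstruction under that assumption; the real implementation and the constant values may differ.

```javascript
// Assumed values; the real constants live in date_util.js.
var ONE_SECOND_MILLIS = 1000;
var ONE_MINUTE_MILLIS = 60 * ONE_SECOND_MILLIS;
var ONE_HOUR_MILLIS = 60 * ONE_MINUTE_MILLIS;
var ONE_DAY_MILLIS = 24 * ONE_HOUR_MILLIS;

// Hypothetical reconstruction: rounds the date down to a multiple of
// deltaMillis (counted from the epoch, so in UTC) and returns the
// half-open interval [fromDate, toDate) of that length.
function calibrateTimeInterval(date, deltaMillis) {
    var fromMillis = Math.floor(date.getTime() / deltaMillis) * deltaMillis;
    return {
        fromDate: new Date(fromMillis),
        toDate: new Date(fromMillis + deltaMillis)
    };
}

var testFromDate = new Date(Date.UTC(2012, 5, 10, 11, 25, 59));
var minuteInterval = calibrateTimeInterval(testFromDate, ONE_MINUTE_MILLIS);
var hourInterval = calibrateTimeInterval(testFromDate, ONE_HOUR_MILLIS);
var dayInterval = calibrateTimeInterval(testFromDate, ONE_DAY_MILLIS);
```

Because the rounding is done on epoch milliseconds, the day interval starts at midnight UTC, which shows up as 03:00 in the GMT+0300 shell output above.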

Stay tuned, my next post will show you how to optimize these aggregation queries.

 

Vlad Mihalcea

Vlad Mihalcea is a software architect passionate about software integration, high scalability and concurrency challenges.

2 Comments
Ankita
9 years ago

What can be done in the aggregation to group by hour (the hour in local time, not in UTC)?

For example

{_id: {year: 2014, month: 5, day: 2, hour: 2}, # this year, month, day, hour should be in IST, not in the ISODate (UTC) format.
count : 10}

Vlad Mihalcea
9 years ago

ISO 8601 supports time offsets from UTC:

http://en.wikipedia.org/wiki/ISO_8601#Time_offsets_from_UTC

I haven’t tried it in the aggregation framework, so if it doesn’t support it maybe you should open an issue.
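
As a hedged follow-up: MongoDB 3.6 and later (much newer than the 2.4.6 shell used in this post) accept a timezone option on the date operators, so a grouping stage along the following lines should produce local-time keys. The stage is only built as an object here, not run against a server. The localHour helper is a made-up plain-JavaScript illustration of the fixed-offset shift such a grouping amounts to for IST, which has no daylight saving time.

```javascript
// Grouping stage using the timezone option of the date operators
// (assumes MongoDB 3.6 or later; "Asia/Kolkata" is the Olson name for IST).
var groupByLocalHour = {
    $group: {
        _id: {
            year: { $year: { date: "$created_on", timezone: "Asia/Kolkata" } },
            month: { $month: { date: "$created_on", timezone: "Asia/Kolkata" } },
            day: { $dayOfMonth: { date: "$created_on", timezone: "Asia/Kolkata" } },
            hour: { $hour: { date: "$created_on", timezone: "Asia/Kolkata" } }
        },
        count: { $sum: 1 }
    }
};

// IST is UTC+05:30.
var IST_OFFSET_MILLIS = (5 * 60 + 30) * 60 * 1000;

// Hypothetical helper: the local hour for a fixed offset from UTC.
function localHour(date, offsetMillis) {
    return new Date(date.getTime() + offsetMillis).getUTCHours();
}

// 20:45 UTC on May 1st is 02:15 IST on May 2nd.
var istHour = localHour(new Date(Date.UTC(2014, 4, 1, 20, 45)), IST_OFFSET_MILLIS);
```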
