The Aggregation framework

Here are the course notes for Week 6:

The aggregation framework is a set of analytics tools within MongoDB that allows us to run various types of reports or analysis on documents in one or more collections. It is based on the idea of a pipeline. We take input from a MongoDB collection and pass the documents from that collection through one or more stages, each of which performs a different operation on its inputs. Each stage takes as input whatever the stage before it produced as output, and the inputs and outputs of all stages are streams of documents. Each stage has a specific job: it expects a specific form of document and produces a specific output, which is itself a stream of documents. At the end of the pipeline, we get access to the output.

[Figure: aggregation framework stage]

An individual stage is a data processing unit. Each stage takes as input a stream of documents, processes them one at a time, and produces an output stream of documents, again one at a time. Each stage provides a set of knobs, or tunables, that we can control to parameterize the stage to perform whatever task we’re interested in. So a stage performs a generic, general-purpose task of some kind, and we parameterize it for the particular set of documents we’re working with and for exactly what we would like that stage to do with those documents. These tunables typically take the form of operators that modify fields, perform arithmetic operations, reshape documents, or do some sort of accumulation task, among a variety of other things. Often, we’ll want to include the same type of stage multiple times within a single pipeline.

[Figure: same type of stage multiple times within a single pipeline]

For example, we may wish to perform an initial filter so that we don’t have to pass the entire collection into our pipeline, and then later, following some additional processing, filter once again using a different set of criteria. So, to recap: a pipeline works with a MongoDB collection. Pipelines are composed of stages, each of which does a different data processing task on its input and produces documents as output to be passed to the next stage. Finally, at the end of the pipeline, output is produced that we can then do something with in our application. In many cases, it’s necessary to include the same type of stage multiple times within an individual pipeline.
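
The recap above can be sketched in plain JavaScript. This is only a toy model, not how MongoDB executes pipelines: it treats each stage as a function from an array of documents to an array of documents, whereas the server streams documents. The helper names (match, project, runPipeline) and the sample data are made up for illustration.

```javascript
// Toy model: a stage is a function from an array of documents to a new array.
const match = (predicate) => (docs) => docs.filter(predicate);
const project = (shape) => (docs) => docs.map(shape);

// Run the stages in order, feeding each stage's output to the next one.
const runPipeline = (docs, stages) =>
  stages.reduce((current, stage) => stage(current), docs);

// Hypothetical sample data, not taken from the course data set.
const sampleCompanies = [
  { name: "Alpha", founded_year: 2004, number_of_employees: 40 },
  { name: "Beta", founded_year: 2004, number_of_employees: 900 },
  { name: "Gamma", founded_year: 2010, number_of_employees: 15 },
];

const pipelineResult = runPipeline(sampleCompanies, [
  match((doc) => doc.founded_year === 2004), // initial filter
  project((doc) => ({ name: doc.name, size: doc.number_of_employees })),
  match((doc) => doc.size > 100), // same type of stage, used a second time
]);

console.log(pipelineResult);
```

Note that the second filter works on the reshaped documents, so it refers to the size field produced by the projection rather than to the original field name.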

Familiar Aggregation Operations

As our first step in developing aggregation pipelines, what we’d like to do is take a look at building some pipelines that involve operations that are already familiar to us. So, we’re going to look at the following stages:

  • match - this is a filtering stage, similar to find.
  • project
  • sort
  • skip
  • limit

We might ask ourselves why these stages are necessary, given that this functionality is already provided in the MongoDB query language. The reason is that we need these stages to support the more complex, analytics-oriented functionality that’s included with the aggregation framework. The query below is equivalent to a simple find with the same filter:


db.companies.aggregate([{
  $match: {
    founded_year: 2004
  }
}, ])

Let’s introduce a project stage in this aggregation pipeline:


db.companies.aggregate([{
  $match: {
    founded_year: 2004
  }
}, {
  $project: {
    _id: 0,
    name: 1,
    founded_year: 1
  }
}])

We use the aggregate method to run aggregation pipelines. A pipeline is simply an array of documents, and each document must specify a particular stage operator. So, in the above case, we have an aggregation pipeline with two stages. The $match stage passes its matching documents, one at a time, to the $project stage.


db.companies.aggregate([{
  $match: {
    "funding_rounds.investments.financial_org.permalink": "greylock"
  }
}, {
  $project: {
    _id: 0,
    name: 1,
    ipo: "$ipo.pub_year",
    valuation: "$ipo.valuation_amount",
    funders: "$funding_rounds.investments.financial_org.permalink"
  }
}, ])

In the above example, we’re promoting deeply nested fields to the top level of the output documents that this aggregation pipeline produces. When a quoted value starts with $, MongoDB interprets it as a field path, that is, as “give me the value identified by this key”. In the version of MongoDB used in the course, a project stage could not change the data type of a value; MongoDB 4.0 and later add type conversion operators such as $convert.


db.companies.aggregate([{
  $match: {
    "funding_rounds.investments.financial_org.permalink": "greylock"
  }
}, {
  $project: {
    _id: 0,
    name: 1,
    founded: {
      year: "$founded_year",
      month: "$founded_month",
      day: "$founded_day"
    }
  }
}, ])

In this case, we’re taking the top-level founded_year, founded_month and founded_day fields and showing them as part of the nested document founded. Now, let’s extend the pipeline with a limit stage:


db.companies.aggregate([{
  $match: {
    founded_year: 2004
  }
}, {
  $limit: 5
}, {
  $project: {
    _id: 0,
    name: 1
  }
}])

This gets the matching documents and limits them to five before projecting out the fields, so the projection works on only 5 documents. Suppose we were to do something like this instead:


db.companies.aggregate([{
  $match: {
    founded_year: 2004
  }
}, {
  $project: {
    _id: 0,
    name: 1
  }
}, {
  $limit: 5
}])

This gets the matching documents, projects all of them, and only then limits the result to five, so the projection as written works on a large number of documents. The lesson is that we should limit the documents passed to the next stage to those that are absolutely necessary. Now, let’s look at the sort stage:


db.companies.aggregate([{
  $match: {
    founded_year: 2004
  }
}, {
  $sort: {
    name: 1
  }
}, {
  $limit: 5
}, {
  $project: {
    _id: 0,
    name: 1
  }
}])

This will sort all the matching documents by name and return only the first 5 of them. Suppose we were to do something like this instead:


db.companies.aggregate([{
  $match: {
    founded_year: 2004
  }
}, {
  $limit: 5
}, {
  $sort: {
    name: 1
  }
}, {
  $project: {
    _id: 0,
    name: 1
  }
}])

This will take the first 5 matching documents and sort only those, so the result can differ from that of the previous pipeline.
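
To see why the order of the sort and limit stages matters, here is a small plain-JavaScript sketch. It does not talk to MongoDB; sortStage and limitStage are hypothetical stand-ins for $sort and $limit, and the four company names are invented.

```javascript
// Stand-ins for { $sort: { name: 1 } } and { $limit: n } on plain arrays.
const byName = (a, b) => (a.name < b.name ? -1 : a.name > b.name ? 1 : 0);
const sortStage = (docs) => [...docs].sort(byName);
const limitStage = (n) => (docs) => docs.slice(0, n);

// Hypothetical documents, in the order the match stage happened to emit them.
const matched = [
  { name: "Zeta" },
  { name: "Alpha" },
  { name: "Mu" },
  { name: "Beta" },
];

// Sort everything, then keep two: the two alphabetically smallest names.
const sortThenLimit = limitStage(2)(sortStage(matched));

// Keep the first two as they arrive, then sort only those two.
const limitThenSort = sortStage(limitStage(2)(matched));

console.log(sortThenLimit.map((doc) => doc.name)); // [ 'Alpha', 'Beta' ]
console.log(limitThenSort.map((doc) => doc.name)); // [ 'Alpha', 'Zeta' ]
```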

Let’s add the skip stage:


db.companies.aggregate([{
  $match: {
    founded_year: 2004
  }
}, {
  $sort: {
    name: 1
  }
}, {
  $skip: 10
}, {
  $limit: 5
}, {
  $project: {
    _id: 0,
    name: 1
  }
}, ])

This will sort all the matching documents, skip the first 10, and return the next 5 to us. We should try to include $match stages as early as possible in the pipeline. To filter documents using a $match stage, we use the same syntax for constructing query documents (filters) as we do for find().
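
Because a pipeline is just an array of stage documents, the sort, skip and limit pattern above can be built programmatically. The sketch below is one possible way to do that for paging; buildPagePipeline and its parameters are hypothetical names, not part of MongoDB.

```javascript
// Hypothetical helper: builds the stage documents for one page of results.
// pageNumber is 1-based, so page 1 skips nothing.
function buildPagePipeline(filter, sortSpec, pageNumber, pageSize) {
  return [
    { $match: filter }, // filter as early as possible
    { $sort: sortSpec }, // sort before skipping so pages are stable
    { $skip: (pageNumber - 1) * pageSize },
    { $limit: pageSize },
    { $project: { _id: 0, name: 1 } },
  ];
}

// Third page of five companies founded in 2004, ordered by name.
const thirdPage = buildPagePipeline({ founded_year: 2004 }, { name: 1 }, 3, 5);
console.log(JSON.stringify(thirdPage, null, 2));
```

In the mongo shell, an array like thirdPage could then be passed to db.companies.aggregate(); it contains the same skip of 10 and limit of 5 as the query above.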

The $unwind stage

The $unwind stage allows us to take as input documents that have an array-valued field and produce output documents such that there’s one output document for each element in the array.

So let’s go back to our companies examples, and take a look at the use of unwind stages. This query:


db.companies.aggregate([
    { $match: {"funding_rounds.investments.financial_org.permalink": "greylock" } },
    { $project: {
        _id: 0,
        name: 1,
        amount: "$funding_rounds.raised_amount",
        year: "$funding_rounds.funded_year"
    } }
])

produces documents that have arrays for both amount and year, because we’re accessing the raised amount and the funded year for every element within the funding rounds array.

[Figure: project output]

To fix this, we can include an unwind stage before our project stage in this aggregation pipeline, and parameterize it by saying that we want to unwind the funding rounds array:


db.companies.aggregate([
    { $match: {"funding_rounds.investments.financial_org.permalink": "greylock" } },
    { $unwind: "$funding_rounds" },
    { $project: {
        _id: 0,
        name: 1,
        amount: "$funding_rounds.raised_amount",
        year: "$funding_rounds.funded_year"
    } }
])

$unwind has the effect of outputting more documents to the next stage than it receives as input.

This is what the original document looks like:

[Figure: original document]

If we look at the funding_rounds array, we see that each funding round has a raised_amount and a funded_year field. For each of the documents that are elements of the funding_rounds array, unwind produces an output document. In the illustrated example the array values are strings, but regardless of the type of the elements in an array, unwind produces an output document for each of them, such that the field in question holds just that element. In the case of funding_rounds, that element is one of these embedded documents, and it becomes the value of funding_rounds in every document that gets passed on to our project stage. The result of running this is that we now get a single amount and year, one for each funding round of every matching company. In other words, our match produces many company documents, and each of those company documents results in many output documents, one for each funding round within it. unwind performs this operation on the documents handed to it from the match stage, and all of the resulting documents are then passed to the project stage.

[Figure: unwind output]

So, every document where the funder was Greylock (as in the query example) is split into a number of documents equal to the number of funding rounds of that company, for every company that matches the filter $match: {"funding_rounds.investments.financial_org.permalink": "greylock" }. Each of those resulting documents is then passed along to our project stage. unwind produces an exact copy of the input document for every element: all fields have the same key and value, with one exception. The funding_rounds field, rather than being an array of funding round documents, has as its value a single document, which is an individual funding round. So, a company that has 4 funding rounds results in unwind creating 4 documents, each an exact copy of the company document except that funding_rounds holds one element of the original array. This is how unwind outputs more documents to the next stage than it receives as input. It also means that our project stage now gets a funding_rounds field that is not an array but a nested document with a raised_amount and a funded_year field. So, project receives multiple documents for each company matching the filter, and can therefore process each of them individually and identify an individual amount and year for each funding round of each company.
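
The copying behaviour described above can be modelled in a few lines of plain JavaScript. This is a simplified sketch rather than MongoDB’s implementation: the unwind function below is hypothetical, only handles a top-level field name, and assumes the default behaviour in which documents with a missing, null or empty array produce no output.

```javascript
// Simplified model of { $unwind: "$<field>" } for a top-level field.
function unwind(docs, field) {
  const output = [];
  for (const doc of docs) {
    const value = doc[field];
    if (value === undefined || value === null) continue; // no output document
    const elements = Array.isArray(value) ? value : [value];
    for (const element of elements) {
      // Copy every field, then replace the array with a single element.
      output.push({ ...doc, [field]: element });
    }
  }
  return output;
}

// Hypothetical company documents shaped like the ones in the examples.
const companyDocs = [
  {
    name: "Acme",
    funding_rounds: [
      { raised_amount: 1000000, funded_year: 2005 },
      { raised_amount: 5000000, funded_year: 2007 },
    ],
  },
  { name: "NoRounds", funding_rounds: [] },
];

const unwound = unwind(companyDocs, "funding_rounds");
console.log(unwound);
```

Here the company with two funding rounds yields two output documents, each with the same name and a single funding round document, while the company with an empty array yields none.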

We’ll add an additional field to understand this better, and in doing so we’ll identify a bit of a problem with this aggregation query as currently written. We’re going to add a funder field that accesses the investments field of the funding_rounds embedded document that it gets from unwind and, for the financial_org, gets the permalink (compare the $match filter in the snippet below).


// Add funder to output documents.
db.companies.aggregate([
    { $match: {"funding_rounds.investments.financial_org.permalink": "greylock" } },
    { $unwind: "$funding_rounds" },
    { $project: {
        _id: 0,
        name: 1,
        funder: "$funding_rounds.investments.financial_org.permalink",
        amount: "$funding_rounds.raised_amount",
        year: "$funding_rounds.funded_year"
    } }
])

Notice that the funder field path and the $match filter are very similar. We want to be sure that the funder is what we stipulated in the filter. In general, when building aggregation pipelines, it’s a good idea to put in checks as we construct them, to make sure they’re doing what we think they’re doing. The output looks like this:

[Figure: full output with funder details]

If we look at the original document and at its investments field, we find that it’s an array:

[Figure: investments array]

Multiple funders can participate in a single funding round.

[Figure: multiple funders]

So, investments lists every one of those funders. As we originally saw with raised_amount and funded_year, we’re now seeing an array for funder, because investments is an array-valued field and, as we know, the semantics of a project on an array-valued field is to produce all of the values for whatever field we’ve stipulated.
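
A rough plain-JavaScript model may help explain why funder comes out as an array. The resolvePath function below is a hypothetical helper, not a MongoDB API: it walks a dotted field path and, whenever it meets an array, resolves the rest of the path for every element. Leaving out elements where the path is missing is an assumption of this sketch.

```javascript
// Rough model of resolving a field path such as "$a.b.c" against a document.
function resolvePath(value, path) {
  if (path.length === 0) return value;
  if (Array.isArray(value)) {
    // An array on the way: resolve the remaining path for each element.
    return value
      .map((element) => resolvePath(element, path))
      .filter((resolved) => resolved !== undefined);
  }
  if (value === null || typeof value !== "object") return undefined;
  const [head, ...rest] = path;
  return resolvePath(value[head], rest);
}

// Hypothetical document as it might look after unwinding funding_rounds.
const unwoundDoc = {
  name: "Acme",
  funding_rounds: {
    raised_amount: 5000000,
    investments: [
      { financial_org: { permalink: "greylock" } },
      { financial_org: { permalink: "other-fund" } },
    ],
  },
};

const funder = resolvePath(
  unwoundDoc,
  "funding_rounds.investments.financial_org.permalink".split(".")
);
const amount = resolvePath(unwoundDoc, ["funding_rounds", "raised_amount"]);

console.log(funder); // [ 'greylock', 'other-fund' ]
console.log(amount); // 5000000
```

The amount path crosses no array, so it yields a single value, while the funder path crosses the investments array and yields one value per investment.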

The above query returns every funding round of the companies for which Greylock participated in at least one funding round. What we’d like to do is constrain our results so that we only see the funding rounds that Greylock actually participated in, not all the rounds of every company in which Greylock participated at least once. To do that, we have to figure out a way to filter further. One possibility is to reverse the order of our $unwind and $match stages:


// Reverse the order of $match and $unwind.
db.companies.aggregate([
    { $unwind: "$funding_rounds" },
    { $match: {"funding_rounds.investments.financial_org.permalink": "greylock" } },
    { $project: {
        _id: 0,
        name: 1,
        funder: "$funding_rounds.investments.financial_org.permalink",
        amount: "$funding_rounds.raised_amount",
        year: "$funding_rounds.funded_year"
    } },
])

This guarantees that we only match documents coming out of $unwind that represent funding rounds Greylock actually participated in. If we run this, we see a slight delay, but then we see that Greylock is one of the funders in every result. To make it clearer, we can include a second $unwind:


// Add second unwind stage.
db.companies.aggregate([
    { $unwind: "$funding_rounds" },
    { $unwind: "$funding_rounds.investments" },
    { $match: {"funding_rounds.investments.financial_org.permalink": "greylock" } },
    { $project: {
        _id: 0,
        name: 1,
        funder: "$funding_rounds.investments.financial_org.permalink",
        amount: "$funding_rounds.raised_amount",
        year: "$funding_rounds.funded_year"
    } },
])

Which outputs this:

[Figure: double unwind output]

So, what are these two $unwind stages doing? Both of them run through the entire collection, because no filter precedes them. However, the $match operation should occur as early as possible, so that each stage has the smallest possible number of documents to work with.

So, what we can do is leave the $match filter as the first stage in our pipeline and simply include a second $match:


// Instead, use a second match stage.
db.companies.aggregate([
    { $match: {"funding_rounds.investments.financial_org.permalink": "greylock" } },
    { $unwind: "$funding_rounds" },
    { $unwind: "$funding_rounds.investments" },
    { $match: {"funding_rounds.investments.financial_org.permalink": "greylock" } },
    { $project: {
        _id: 0,
        name: 1,
        individualFunder: "$funding_rounds.investments.person.permalink",
        fundingOrganization: "$funding_rounds.investments.financial_org.permalink",
        amount: "$funding_rounds.raised_amount",
        year: "$funding_rounds.funded_year"
    } },
])

The first $match returns the company documents for which Greylock participated in at least one funding round. We then unwind the funding_rounds array and the nested investments array. Finally, we filter again, so that any documents representing investments that Greylock did not make are removed from what’s passed on to project. This is another case where we need to include multiple stages of the same type. This query is a bit faster because the $unwind stages process fewer documents.

Array Expressions

These expressions are used to work with arrays and can be used in project stages. There are several array expressions, such as:

  • $filter, for selecting a subset of the elements in an array, based on filter criteria, to include in the documents passed to the next stage in the aggregation pipeline:
 
 db.companies.aggregate([{
   $match: {
     "funding_rounds.investments.financial_org.permalink": "greylock"
   }
 }, {
   $project: {
     _id: 0,
     name: 1,
     founded_year: 1,
     rounds: {
       $filter: {
         input: "$funding_rounds",
         as: "round",
         cond: {
           $gte: ["$$round.raised_amount", 100000000]
         }
       }
     }
   }
 }, {
   $match: {
     "rounds.investments.financial_org.permalink": "greylock"
   }
 }, ]).pretty()
 
 

Here is where things get interesting. We’re using a $filter expression. It is designed to work with array fields and takes 3 parameters, which we supply as fields of the document that we set as the value of the $filter operator. The first is input, which is the array to filter. The second, as, specifies the name of the variable that stands for each element of the input array throughout the rest of the filter expression. The third, cond, provides the condition used to select a subset of the input array. In the above case, we’re selecting the elements whose raised_amount is greater than or equal to 100 million. In $$round, the $$ prefix says that we want to dereference a variable, here the variable named round defined by as. This disambiguates a reference to a variable from a reference to, say, a field in the input document. $filter returns an array with only those elements that match the condition, in their original order.
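
The roles of input, as and cond can be mimicked in plain JavaScript. The filterArray function below is a hypothetical stand-in for the $filter expression, with cond written as an ordinary function that receives the variables by name; the sample rounds are invented.

```javascript
// Hypothetical stand-in for { $filter: { input, as, cond } }.
// cond receives an object of variables, so cond(vars) can read vars.round,
// much as the real expression reads "$$round".
function filterArray({ input, as, cond }) {
  return input.filter((element) => cond({ [as]: element }));
}

// Hypothetical funding rounds of one company.
const fundingRounds = [
  { round_code: "a", raised_amount: 5000000 },
  { round_code: "b", raised_amount: 150000000 },
  { round_code: "c", raised_amount: 100000000 },
];

const bigRounds = filterArray({
  input: fundingRounds,
  as: "round",
  cond: (vars) => vars.round.raised_amount >= 100000000,
});

console.log(bigRounds.map((round) => round.round_code)); // [ 'b', 'c' ]
```

The matching elements keep their original relative order, and the input array itself is left unchanged.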

Let’s look at $arrayElemAt for returning the element at the specified array index. Let’s pull out the first round and the last round (maybe for comparison).


db.companies.aggregate([{
  $match: {
    "founded_year": 2010
  }
}, {
  $project: {
    _id: 0,
    name: 1,
    founded_year: 1,
    first_round: {
      $arrayElemAt: ["$funding_rounds", 0]
    },
    last_round: {
      $arrayElemAt: ["$funding_rounds", -1]
    }
  }
}]).pretty()

$arrayElemAt requires the array and the index of the element to return. 0 means the first element and -1 refers to the last one; -2 would give the second to last, or penultimate, element in the array. The output will look like this:

[Figure: arrayElemAt output]

Something similar can be done with $slice, which returns a one-element array in each case rather than the element itself:


db.companies.aggregate([{
  $match: {
    "founded_year": 2010
  }
}, {
  $project: {
    _id: 0,
    name: 1,
    founded_year: 1,
    first_round: {
      $slice: ["$funding_rounds", 1]
    },
    last_round: {
      $slice: ["$funding_rounds", -1]
    }
  }
}]).pretty()

Related to $arrayElemAt, the $slice expression allows us to return multiple items from an array, in sequence, beginning at a particular index.


db.companies.aggregate([{
  $match: {
    "founded_year": 2010
  }
}, {
  $project: {
    _id: 0,
    name: 1,
    founded_year: 1,
    early_rounds: {
      $slice: ["$funding_rounds", 1, 3]
    }
  }
}]).pretty()

Notice that the starting index is 1, which skips the first funding round (index 0); the 3 is the number of elements to return.

[Figure: slice output]
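
The index rules for $arrayElemAt and $slice can be summarized with two small plain-JavaScript functions. These are hypothetical models written for illustration; they cover only the forms used in the queries above and do no argument validation.

```javascript
// Model of { $arrayElemAt: [array, index] }: negative indexes count from the end.
function arrayElemAt(array, index) {
  const position = index < 0 ? array.length + index : index;
  return array[position]; // undefined when the index is out of range
}

// Model of { $slice: [array, n] } and { $slice: [array, position, n] }.
function slice(array, ...args) {
  if (args.length === 1) {
    const [n] = args;
    // Positive n: first n elements. Negative n: last |n| elements.
    return n >= 0 ? array.slice(0, n) : array.slice(n);
  }
  const [position, n] = args;
  const start = position < 0 ? Math.max(array.length + position, 0) : position;
  return array.slice(start, start + n);
}

// Hypothetical round labels standing in for funding round documents.
const roundLabels = ["seed", "a", "b", "c", "d"];

console.log(arrayElemAt(roundLabels, 0)); // seed
console.log(arrayElemAt(roundLabels, -1)); // d
console.log(slice(roundLabels, 1)); // [ 'seed' ]
console.log(slice(roundLabels, -1)); // [ 'd' ]
console.log(slice(roundLabels, 1, 3)); // [ 'a', 'b', 'c' ]
```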

The last one we need to see is $size, which simply returns the number of elements in the array:


db.companies.aggregate([{
  $match: {
    "founded_year": 2004
  }
}, {
  $project: {
    _id: 0,
    name: 1,
    founded_year: 1,
    total_rounds: {
      $size: "$funding_rounds"
    }
  }
}]).pretty()

[Figure: $size output]

Accumulators

Accumulators are another type of expression. They involve calculating values from fields in multiple documents. Prior to MongoDB 3.2, accumulators were available only in the group stage. Since then, we have been able to access a subset of the accumulators within the project stage as well. The primary difference is that in the project stage, accumulators such as $sum and $avg must operate on arrays within a single document, whereas accumulators in the group stage let us perform calculations on values across multiple documents.
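
The difference between the two uses can be illustrated in plain JavaScript. This sketch is only an analogy with invented data: the first computation sums an array inside each document (project-style), the second keeps a running total across documents that share a key (group-style).

```javascript
const sumOf = (values) => values.reduce((total, value) => total + value, 0);

// Hypothetical company documents.
const fundedCompanies = [
  {
    name: "Alpha",
    founded_year: 2004,
    number_of_employees: 10,
    funding_rounds: [{ raised_amount: 1000000 }, { raised_amount: 2000000 }],
  },
  {
    name: "Beta",
    founded_year: 2004,
    number_of_employees: 30,
    funding_rounds: [{ raised_amount: 5000000 }],
  },
];

// Project-style: one document in, one document out; the sum ranges over
// an array that lives inside that single document.
const perDocument = fundedCompanies.map((company) => ({
  name: company.name,
  total_funding: sumOf(company.funding_rounds.map((round) => round.raised_amount)),
}));

// Group-style: the total is accumulated across many documents.
const employeesByYear = {};
for (const company of fundedCompanies) {
  const year = company.founded_year;
  employeesByYear[year] = (employeesByYear[year] || 0) + company.number_of_employees;
}

console.log(perDocument);
console.log(employeesByYear); // { '2004': 40 }
```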

Using Accumulators in $project Stages

There are multiple accumulators. The $max accumulator gives the largest value in the array passed to it.


db.companies.aggregate([{
  $match: {
    "funding_rounds": {
      $exists: true,
      $ne: []
    }
  }
}, {
  $project: {
    _id: 0,
    name: 1,
    largest_round: {
      $max: "$funding_rounds.raised_amount"
    }
  }
}]).pretty()

[Figure: $max output]

The examples below show the use of $sum in a project stage and of $avg in a group stage:


db.companies.aggregate([{
  $match: {
    "funding_rounds": {
      $exists: true,
      $ne: []
    }
  }
}, {
  $project: {
    _id: 0,
    name: 1,
    total_funding: {
      $sum: "$funding_rounds.raised_amount"
    }
  }
}])

db.companies.aggregate([{
    $group: {
      _id: {
        founded_year: "$founded_year"
      },
      average_number_of_employees: {
        $avg: "$number_of_employees"
      }
    }
  }, {
    $sort: {
      average_number_of_employees: -1
    }
  }
])

Introduction to $group

$group is similar to the SQL GROUP BY clause. In the example below, we’re going to group companies by the year in which they were founded and calculate the average number of employees for each year.


db.companies.aggregate([{
    $group: {
      _id: {
        founded_year: "$founded_year"
      },
      average_number_of_employees: {
        $avg: "$number_of_employees"
      }
    }
  }, {
    $sort: {
      average_number_of_employees: -1
    }
  }
])

[Figure: $avg output]

This aggregation pipeline has 2 stages:

  1. $group
  2. $sort

Fundamental to the $group stage is the _id field that we specify as part of the document that is the value of the $group operator itself, using a very strict interpretation of the aggregation framework syntax. _id is how we define, control, and tune what the group stage uses to organize the documents that it sees.
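
As a rough illustration of how _id organizes documents, here is a plain-JavaScript sketch of grouping with an average. The function groupAverage and the sample documents are hypothetical; skipping non-numeric values and returning null for an empty group are intended to mirror $avg, but treat the details as assumptions of this sketch.

```javascript
// Group documents by the value returned from idOf and average valueOf(doc).
function groupAverage(docs, idOf, valueOf) {
  const groups = new Map();
  for (const doc of docs) {
    const id = idOf(doc);
    const key = JSON.stringify(id); // documents with equal _id share a group
    const group = groups.get(key) || { _id: id, sum: 0, count: 0 };
    const value = valueOf(doc);
    if (typeof value === "number") {
      group.sum += value;
      group.count += 1;
    }
    groups.set(key, group);
  }
  return [...groups.values()].map((group) => ({
    _id: group._id,
    average_number_of_employees: group.count ? group.sum / group.count : null,
  }));
}

// Hypothetical company documents.
const staffedCompanies = [
  { name: "Alpha", founded_year: 2004, number_of_employees: 10 },
  { name: "Beta", founded_year: 2004, number_of_employees: 30 },
  { name: "Gamma", founded_year: 2010, number_of_employees: 100 },
];

const averages = groupAverage(
  staffedCompanies,
  (doc) => ({ founded_year: doc.founded_year }), // the _id document
  (doc) => doc.number_of_employees
).sort((a, b) => b.average_number_of_employees - a.average_number_of_employees);

console.log(averages);
```

Changing only the idOf function, for example to return both founded_year and another field, changes how the documents are organized without touching the accumulation logic.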

The query below uses the $sum operator to count, for each person, how many company relationships they have:


db.companies.aggregate([{
  $match: {
    "relationships.person": {
      $ne: null
    }
  }
}, {
  $project: {
    relationships: 1,
    _id: 0
  }
}, {
  $unwind: "$relationships"
}, {
  $group: {
    _id: "$relationships.person",
    count: {
      $sum: 1
    }
  }
}, {
  $sort: {
    count: -1
  }
}])

[Figure: $sum output]

_id in $group Stages

We’re going to look more closely at the _id field within the $group stage and at some best practices for constructing _ids in group aggregation stages. Let’s look at this query:


db.companies.aggregate([{
  $match: {
    founded_year: {
      $gte: 2010
    }
  }
}, {
  $group: {
    _id: {
      founded_year: "$founded_year"
    },
    companies: {
      $push: "$name"
    }
  }
}, {
  $sort: {
    "_id.founded_year": 1
  }
}]).pretty()

[Figure: $group with a document _id]

One thing that might not be clear is why the _id field is constructed as a document. We could have done it this way as well:


db.companies.aggregate([{
  $match: {
    founded_year: {
      $gte: 2010
    }
  }
}, {
  $group: {
    _id: "$founded_year",
    companies: {
      $push: "$name"
    }
  }
}, {
  $sort: {
    "_id": 1
  }
}]).pretty()

[Figure: $group without a document _id]

We don’t do it this way because, in these output documents, it’s not explicit what exactly this number means, and in some cases that may cause confusion when interpreting the documents. Another option is to group on an _id document with multiple fields:


db.companies.aggregate([{
  $match: {
    founded_year: {
      $gte: 2010
    }
  }
}, {
  $group: {
    _id: {
      founded_year: "$founded_year",
      category_code: "$category_code"
    },
    companies: {
      $push: "$name"
    }
  }
}, {
  $sort: {
    "_id.founded_year": 1
  }
}]).pretty()

[Figure: grouping on an _id document with multiple fields]

$push simply appends each value to an array that it builds up for the group. Often, we may need to group on nested fields promoted to the top level:


db.companies.aggregate([{
  $group: {
    _id: {
      ipo_year: "$ipo.pub_year"
    },
    companies: {
      $push: "$name"
    }
  }
}, {
  $sort: {
    "_id.ipo_year": 1
  }
}]).pretty()

[Figure: grouping on promoted fields]

It’s also perfectly fine to have an expression that resolves to a document as the _id key:

db.companies.aggregate([{
  $match: {
    "relationships.person": {
      $ne: null
    }
  }
}, {
  $project: {
    relationships: 1,
    _id: 0
  }
}, {
  $unwind: "$relationships"
}, {
  $group: {
    _id: "$relationships.person",
    count: {
      $sum: 1
    }
  }
}, {
  $sort: {
    count: -1
  }
}])

[Figure: expression that resolves to a document]

$group vs. $project

Not all accumulators are available in $project stage. We need to consider what we can do in project with respect to accumulators and what we can do in group. Let’s take a look at this:


db.companies.aggregate([{
  $match: {
    funding_rounds: {
      $ne: []
    }
  }
}, {
  $unwind: "$funding_rounds"
}, {
  $sort: {
    "funding_rounds.funded_year": 1,
    "funding_rounds.funded_month": 1,
    "funding_rounds.funded_day": 1
  }
}, {
  $group: {
    _id: {
      company: "$name"
    },
    funding: {
      $push: {
        amount: "$funding_rounds.raised_amount",
        year: "$funding_rounds.funded_year"
      }
    }
  }
}, ]).pretty()

[Figure: $group output]

Here, we first check that funding_rounds is not empty. The array is then unwound, and the resulting documents go to $sort and the later stages. We’ll see one document for each element of the funding_rounds array for every company. So, the first thing we do with them is $sort based on:

  1. funding_rounds.funded_year
  2. funding_rounds.funded_month
  3. funding_rounds.funded_day

In the group stage, we group by company name and build the array using $push. $push is supposed to be part of a document specified as the value for a field we name in a group stage. We can push any valid expression. In this case, we’re pushing documents built from the raised_amount and funded_year, and every document that we push is added to the end of the array that we’re accumulating. So, the output of the $group stage is a stream of documents that each have an _id specifying the company name.

Notice that $push is available in $group stages but not in $project stages. This is because $group stages are designed to take a sequence of documents and accumulate values based on that stream of documents.

$project, on the other hand, works with one document at a time. So, we can calculate an average over an array within an individual document inside a project stage. But seeing documents one at a time and, for every document that passes through the stage, pushing a new value onto an accumulated array is something that the $project stage is just not designed to do. For that type of operation we want to use $group.

Let’s take a look at another example:


db.companies.aggregate([{
  $match: {
    funding_rounds: {
      $exists: true,
      $ne: []
    }
  }
}, {
  $unwind: "$funding_rounds"
}, {
  $sort: {
    "funding_rounds.funded_year": 1,
    "funding_rounds.funded_month": 1,
    "funding_rounds.funded_day": 1
  }
}, {
  $group: {
    _id: {
      company: "$name"
    },
    first_round: {
      $first: "$funding_rounds"
    },
    last_round: {
      $last: "$funding_rounds"
    },
    num_rounds: {
      $sum: 1
    },
    total_raised: {
      $sum: "$funding_rounds.raised_amount"
    }
  }
}, {
  $project: {
    _id: 0,
    company: "$_id.company",
    first_round: {
      amount: "$first_round.raised_amount",
      article: "$first_round.source_url",
      year: "$first_round.funded_year"
    },
    last_round: {
      amount: "$last_round.raised_amount",
      article: "$last_round.source_url",
      year: "$last_round.funded_year"
    },
    num_rounds: 1,
    total_raised: 1,
  }
}, {
  $sort: {
    total_raised: -1
  }
}]).pretty()

[Figure: group vs project output]

In the $group stage, we’re using the $first and $last accumulators. As with $push, we can’t use $first and $last as accumulators over multiple documents in project stages, because project stages are not designed to accumulate values based on multiple documents. Rather, they’re designed to reshape documents one at a time. The total number of rounds is calculated using the $sum operator: the value 1 simply counts the documents that are grouped under a given _id value. The project stage may seem complex, but it’s just making the output pretty; besides reshaping first_round and last_round, it includes num_rounds and total_raised from the documents produced by the group stage.
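
To make the accumulation concrete, here is a plain-JavaScript sketch of what the group stage above computes for an already unwound and sorted stream of documents. The function summarizeRounds and the sample data are hypothetical, and the sketch assumes every raised_amount is a number.

```javascript
// Models $first, $last, { $sum: 1 } and { $sum: "$funding_rounds.raised_amount" }
// for documents grouped by company name. Input must already be sorted by date.
function summarizeRounds(sortedDocs) {
  const byCompany = new Map();
  for (const doc of sortedDocs) {
    const round = doc.funding_rounds;
    const summary = byCompany.get(doc.name);
    if (!summary) {
      // First document seen for this company: it is both first and last so far.
      byCompany.set(doc.name, {
        company: doc.name,
        first_round: round,
        last_round: round,
        num_rounds: 1,
        total_raised: round.raised_amount,
      });
    } else {
      summary.last_round = round; // later documents overwrite the last round
      summary.num_rounds += 1; // the "$sum: 1" counter
      summary.total_raised += round.raised_amount;
    }
  }
  return [...byCompany.values()];
}

// Hypothetical output of the unwind and sort stages.
const sortedRoundDocs = [
  { name: "Acme", funding_rounds: { funded_year: 2005, raised_amount: 1000000 } },
  { name: "Bolt", funding_rounds: { funded_year: 2006, raised_amount: 2000000 } },
  { name: "Acme", funding_rounds: { funded_year: 2008, raised_amount: 4000000 } },
];

const roundSummaries = summarizeRounds(sortedRoundDocs);
console.log(roundSummaries);
```

Each value depends on having seen every document in the group, which is why this kind of computation belongs in a group stage rather than a project stage.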
