The DIY guide for local-global aggregation

The DIY guide for local-global aggregation

If you have read the title, you will not be surprised when I tell you that this blog is about a thing called “local-global aggregation”, and on how to do that yourself. So let’s start with the two obvious questions: what the heck is local-global aggregation anyway, and why the heck would you ever want to do it yourself?

What is local-global aggregation

The term local-global aggregation may sound scary, but it’s not that hard to understand. You might even already know the concept if you have ever heard the arguably most used explanation of parallelism. You have a jar of beans and need to count the beans. Instead of doing it all by yourself, you trick a few of your gullible friends into helping you, with the promise of a nice beer after an “easy task”. Each of them gets a few scoops of beans, counts them, and writes down the number they have. You collect the papers, sum up the numbers, and the result is the amount of beans that was in the jar before you started scooping. Congratulations! You have just done a local-global aggregation.

A more technical explanation that wraps this example back to SQL Server (or, in fact, any data processing application) is that a set of data is divided into subgroups (your friends each getting their share of beans). Each of these subgroups is then aggregated on its own (your friends counting their beans); this is the “local” part of the aggregation, also called “partial” aggregation (a term that in my opinion actually better describes what is going on, but unfortunately “local” is the more commonly used term). The last step is to combine the separate local aggregation results into a single final result (you tallying up the numbers counted by your friends); the “global” aggregation step. Note that the overall amount of work is not less when using this pattern. However, because the work can now be spread over multiple workers, the task still finishes faster. On a single-threaded platform, it would not make sense to work like this.

The basic concept sounds simple, but it can get more complex depending on what type of aggregation you need done. What if you need to know the minimum and maximum weight of the beans in the jar? You could buy a few precision scales, gather your friends again, and have each of them start weighing beans to find the minimum and maximum in their share; you would then just select the lowest and highest weight from all their results to get your final result. But how about the average weight? You could once more get out the scales and gather your friends (assuming you still have any after making them weigh thousands of beans for a single beer for the previous exercise) and have each of them compute the average weight for their share of beans … but then what? How do you get the global average from the local averages? Problem is: you can’t. To understand why, here is an extremely simple example, using just two friends and three beans. Friend 1 gets two beans weighing 1.1 and 1.3 gram; friend 2 gets the third bean which weighs 1.5 gram. You get two results from them: an average weight of 1.2 and an average weight of 1.5. Now how do you combine those two numbers to get the correct average of all beans, which is 1.3? Again, you can’t.

This does not mean that you can’t use local-global aggregation to get an average, it just means that you have to be smarter. You should have asked your friends to tell you the number of beans and their total weight. From those local aggregates, you could then compute the global aggregates, total weight and total number of beans (3.9 gram and 3 beans), and then computed the average by dividing those numbers (3.9 / 3 = 1.3 gram). So to do a correct local-global aggregation for an average aggregate, you have to get different aggregates at the local level, which can then be combined in the correct formula to get the aggregate you need. For an average, this is fairly simple. But once we start looking at statistical aggregate functions such as standard deviation and variance, the trick becomes more complicated. The basic trick remains the same (compute some other aggregate at the local level, then combine with the appropriate formula), but the actual formula itself becomes more complex. More on that later.

Local-global aggregation done by the optimizer

You usually don’t need to worry about local-global aggregation. Minds far greater than ours have done all the hard work and implemented this logic in the query optimizer, so everything works smoothly without us having to do anything special for it.

The most obvious place where you expect this to happen is in parallel plans. Here is a very simple example, based on the Microsoft Contoso BI Demo database which can be found here. (I normally prefer to base my examples on AdventureWorks, but the tables in that database are too small to get parallelism on simple example queries).

SELECT AVG(TotalCost)

FROM dbo.FactOnlineSales;

The execution plan for this extremely simple query looks like this:

image

The query has just a single aggregation, but the plan has two Stream Aggregate operators. The rightmost Stream Aggregate, running in the parallel section, does the local aggregation. If you open the properties and look at the Defined Values aggregate, you will see that it computes two aggregates: “partialagg1004=Count(*)” and “partialagg1006=SUM(TotalCost)” (I have simplified these slightly from the actual expressions in the plan). To the left, in the serial section of the plan, a second Stream Aggregate operator computes the global aggregates: “globalagg1005=SUM(partialagg1004)” and “globalagg1007=SUM(partialagg1006)”. So globalagg1005 is the total number of rows, and globalagg1007 is the grand total of all TotalCost values. The Compute Scalar operator then uses those two global aggregates to compute the average that the query actually requested: “Expr1003=CASE WHEN globalagg=0 THEN NULL ELSE globalagg1007 / globalagg1005” – in other words, total cost divided by number of rows, or NULL if the number of rows was zero.

Parallel plans are not the only place where the optimizer will use local-global aggregation. In some other queries it can use this as a trick to save work. See the query below for an example (this time I did base the query on AdventureWorks, because I want to have a serial plan).

SELECT     p.Color, SUM(sod.OrderQty)

FROM       Sales.SalesOrderDetail AS sod

INNER JOIN Production.Product AS p

      ON   p.ProductID = sod.ProductID

GROUP BY   p.Color;

The query requests an aggregation by colour, but because colour isn’t stored in the SalesOrderDetail table we need to join to the Product table. The most straightforward execution plan for this would look something like this (which I made up by using copy and paste from various other plans – we can’t all know all undocumented trace flags by head):

image

This made-up plan is a very straightforward implementation of the query: join every row from SalesOrderHeader to every row from DimProduct, then aggregate by colour and present the results. In this plan, the join has to operate on all 121,317 rows from SalesOrderDetail, which is estimated to contribute over a quarter of the total cost of the plan.

The execution plan that you actually get for the query above (when running on SQL Server 2012) looks like this:

image

You will see two key differences: far less input to the join operator (just 266 rows), and two instead of just one operators for a local-global aggregation pattern. The Hash Match (Aggregate) operator to the right and in the lower branch does the local aggregation, and the Stream Aggregate operator to the left does the global aggregation. I will not dive into the exact Defined Values properties for each of them, as they are pretty basic (since the query merely requests a SUM). In this case, it is far more interesting to look at the grouping level.

The global aggregate obviously has to group by colour, as requested in the query, and this is confirmed if you look at the Group By property of the Stream Aggregate. But what happens in the local aggregation in the Hash Match (Aggregate) operator? The Product table is not joined yet so we do not have the colour available. To see how the groups are defined here, we’ll have to look at the Hash Keys Build property of this operator, which tells us that the local aggregation groups by ProductID.

What happens here is that the global aggregation, which has a group for each colour, is subdivided into a group for each product. Some colours might have be used for just a single product, but most are used in multiple products. You can see this by looking at the number of row property of various operators – after grouping by ProductID the 121,317 rows from SalesOrderDetails are reduced to just 266 rows for 266 distinct products, and after the final aggregation we are left with just 9 rows, for nine colours.

As before, the local-global aggregation pattern does not save work by itself; it actually introduces a bit of extra work. And in this case the plan is serial so we never have more than one worker. And yet this pattern makes sense here – because the extra work introduced  by the local-global aggregation is far less than the work saved by having to join 266 instead of 121,317 rows.

Local-global aggregation done by you

So far I have shown you what local-global aggregation is, and how the optimizer will use this when using this pattern can improve the overall query performance. It is now time to address the “DIY” part of the title.

There may be situations where a query might benefit from local-global aggregation, but the optimizer can’t or doesn’t use it by itself. This happens very often when you rewrite queries on a columnstore table to work around the limitations of batch mode execution; I cover this extensively in levels 10 and 11 of the Stairway to Columnstore Indexes (not yet published at the time of writing this blog post). It can also, less frequently, happen in other situations.

To see an example, let’s return to the AdventureWorks sample database. Two of its tables are Production.TransactionHistory and Production.TransactionHistoryArchive. The second one is used for older data that will not change anymore and is less frequently queried. There are lots of ways to implement such a business need; this one was chosen for AdventureWorks (and that I have also seen in a lot of real companies).

The management dashboard includes a view on these tables that shows aggregated data from these tables, as follows:

CREATE VIEW DashBoard_ProductsSold

WITH SCHEMABINDING

AS

SELECT   ProductID, SUM(Quantity) AS TotalSold

FROM    (SELECT ProductID, Quantity

         FROM   Production.TransactionHistory

         UNION ALL

         SELECT ProductID, Quantity

         FROM   Production.TransactionHistoryArchive) AS c

GROUP BY ProductID;

Lately, the DBA team has noticed that more and more managers get access to this dashboard, and some tend to refresh quite often. And every time the dashboard is refreshed, all 200K rows in the two tables have to be read. This is starting to affect overall performance of the system, so we want to index this view. The benefit of an indexed view is that the results are stored, so now refreshing the dashboard is just a simple index scan. The downside is that the stored results need to be updated whenever data changes so there will be some overhead on modifications. In this case, we expect this overhead to be outweighed by the saved IO and processing whenever the dashboard is loaded or refreshed.

However, when we try to index the view we get an error:

CREATE UNIQUE CLUSTERED INDEX cix_DashBoard_ProductsSold

ON dbo.DashBoard_ProductsSold (ProductID);

Msg 10109, Level 16, State 1, Line 1

Cannot create index on view “AdventureWorks2012.dbo.DashBoard_ProductsSold” because it references derived table “c” (defined by SELECT statement in FROM clause). Consider removing the reference to the derived table or not indexing the view.

There are lots of limitations for indexed views, and this is one of them. Because the data is in two different tables, we have to combine it and then aggregate the combined result. That isn’t possible without using a derived table or another similar (and also forbidden) construction, so we will have to find another way to optimize the dashboard. And that’s where DIY local-global aggregation comes in. In this case the data already is divided in two smaller groups (the two tables); instead of combining the data and then aggregating, we can aggregate each tables individually and then combine the results. Let’s show this in two steps.

The first step is for the local aggregation. For this, we need to create two new views:

CREATE VIEW DashBoard_ProductsSold_History

WITH SCHEMABINDING

AS

SELECT   ProductID, SUM(Quantity) AS TotalSold

       , COUNT_BIG(*) AS NumRows            — Added to satisfy indexed view req’ment

FROM     Production.TransactionHistory

GROUP BY ProductID;

GO

CREATE VIEW DashBoard_ProductsSold_History_Archive

WITH SCHEMABINDING

AS

SELECT   ProductID, SUM(Quantity) AS TotalSold

       , COUNT_BIG(*) AS NumRows            — Added to satisfy indexed view req’ment

FROM     Production.TransactionHistoryArchive

GROUP BY ProductID;

These two views can each be indexed without problems:

CREATE UNIQUE CLUSTERED INDEX cix_DashBoard_ProductsSold_History

ON dbo.DashBoard_ProductsSold_History (ProductID);

CREATE UNIQUE CLUSTERED INDEX cix_DashBoard_ProductsSold_History_Archive

ON dbo.DashBoard_ProductsSold_History_Archive (ProductID);

And then, as the final step, we change the original view to return the same results by performing global aggregation on the locally aggregated data from the new views:

ALTER VIEW DashBoard_ProductsSold

WITH SCHEMABINDING

AS

SELECT   ProductID, SUM(TotalSold) AS TotalSold

FROM    (SELECT ProductID, TotalSold

         FROM   dbo.DashBoard_ProductsSold_History WITH (NOEXPAND)

         UNION ALL

         SELECT ProductID, TotalSold

         FROM   dbo.DashBoard_ProductsSold_History_Archive WITH (NOEXPAND)) AS c

GROUP BY ProductID;

(The NOEXPAND hints are added to ensure that this works on any edition of SQL Server; on the Enterprise and Developer editions the hints are not required but they do not hurt either).

The last view cannot be indexed because of the derived table. But this view does not need to be indexed, because the local aggregation views already are indexed. If you query this view and check the execution plan, you’ll see less than 1000 rows read from the two views, as opposed to the 200K rows we were reading before.

With these same local-aggregated indexed views as input, I can easily extend the dashboard to also show an average, or a count of rows:

ALTER VIEW DashBoard_ProductsSold

WITH SCHEMABINDING

AS

SELECT   ProductID,

         SUM(TotalSold) AS TotalSold,

         SUM(NumRows) AS NumRows,

         SUM(TotalSold) * 1.0 / SUM(NumRows) AS AvgSold

FROM    (SELECT ProductID, TotalSold, NumRows

         FROM   dbo.DashBoard_ProductsSold_History WITH (NOEXPAND)

         UNION ALL

         SELECT ProductID, TotalSold, NumRows

         FROM   dbo.DashBoard_ProductsSold_History_Archive WITH (NOEXPAND)) AS c

GROUP BY ProductID;

Adding a minimum or a maximum is slightly more work because I would need to add the MIN and MAX functions to the local-aggregated views, but even this is hardly rocket science. However, things gets complicated when management asks to include more advanced statistical information.

Those pesky statistical functions

SQL Server offers out of the box four statistical functions that are more advanced then AVG: VAR, VARP, STDEV, and STDEVP. These, too, can be used in a local-global aggregation pattern. But that is more complex than for the aggregate functions we have seen so far.

It is possible that you, like me, are not a daily user of these functions. And it is even possible that your high school days are so far back that you don’t really remember what they are about. But if you work with SQL Server, then I believe that you should at least know that they exist and (roughly) know what they are and what they are used for. A very short explanation is that both variance and standard deviation are a measure of how spread out the values are. The set of numbers {0, 50, 100} has the same average as {49, 50, 51}. But the distribution is very different, and the standard deviation and variance indicate that difference.

The variance is defined in terms of the difference between each value and the average. The typical algorithm is to first calculate the average, then return to the data set and find the difference between each value and that average. The standard deviation is simply the square root of the variance. A small added complexity is that there is a subtle change in the calculation depending on whether you compute it based on all data or on a sample – that is why SQL Server has VAR and STDEV for the variance and standard deviation of a sample and VARP and STDEVP for the variance and standard deviation of the full population (the extra P stands for population). See here for a slightly deeper (but still fairly accessible) explanation.

The problem with the standard algorithm for variance and standard deviation is that it requires two passes over the data: first the average must be found, and then we need a second pass to subtract each value from that average. For computers, where I/O typically costs more than computation, we don’t want to use two passes. Luckily, smart mathematicians have found many other algorithms to compute the same result, and some of them can be implemented with a single pass over the data. The formula that SQL Server uses internally when executing a query that uses a statistical aggregate function is the so-called “Naïve algorithm”, and this is also the formulas that we will use for our DIY local-global aggregation.

Without going too deep into the formulas, I will now show the code that you can use for the local-global aggregation pattern on statistical functions. Note that in this case I have to drop and recreate all views because an indexed view does not support ALTER. Also note that the views needs to use an ISNULL function because SQL Server misinterprets the SQUARE function as being nullable even when the input is not nullable; this nullability would trigger yet another limitation on indexed views.

DROP VIEW DashBoard_ProductsSold;

DROP VIEW DashBoard_ProductsSold_History;

DROP VIEW dbo.DashBoard_ProductsSold_History_Archive;

GO

CREATE VIEW DashBoard_ProductsSold_History

WITH SCHEMABINDING

AS

SELECT   ProductID,

         SUM(Quantity) AS TotalSold,

         SUM(ISNULL(SQUARE(Quantity),0)) AS TotalSquared,

         COUNT_BIG(*) AS NumRows

FROM     Production.TransactionHistory

GROUP BY ProductID;

GO

CREATE VIEW DashBoard_ProductsSold_History_Archive

WITH SCHEMABINDING

AS

SELECT   ProductID,

         SUM(Quantity) AS TotalSold,

         SUM(ISNULL(SQUARE(Quantity),0)) AS TotalSquared,

         COUNT_BIG(*) AS NumRows

FROM     Production.TransactionHistoryArchive

GROUP BY ProductID;

GO

CREATE UNIQUE CLUSTERED INDEX cix_DashBoard_ProductsSold_History

ON dbo.DashBoard_ProductsSold_History (ProductID);

CREATE UNIQUE CLUSTERED INDEX cix_DashBoard_ProductsSold_History_Archive

ON dbo.DashBoard_ProductsSold_History_Archive (ProductID);

GO

CREATE VIEW DashBoard_ProductsSold

WITH SCHEMABINDING

AS

SELECT   ProductID,

         SUM(TotalSold) AS TotalSold,

         SUM(NumRows) AS NumRows,

         SUM(TotalSold) * 1.0 / SUM(NumRows) AS AvgSold,

         CASE WHEN SUM(NumRows) > 0

              THEN (SUM(TotalSquared) – (SQUARE(SUM(TotalSold)) / SUM(NumRows))) / SUM(NumRows)

         END                            AS [VarP],

         CASE WHEN SUM(NumRows) > 0

              THEN SQRT((SUM(TotalSquared) – (SQUARE(SUM(TotalSold)) / SUM(NumRows))) / SUM(NumRows))

         END                            AS [StDevP],

         CASE WHEN SUM(NumRows) > 1

              THEN (SUM(TotalSquared) – (SQUARE(SUM(TotalSold)) / SUM(NumRows))) / (SUM(NumRows) – 1)

         END                            AS [Var],

         CASE WHEN SUM(NumRows) > 1

              THEN SQRT((SUM(TotalSquared) – (SQUARE(SUM(TotalSold)) / SUM(NumRows))) / (SUM(NumRows) – 1))

         END                            AS [StDev]

FROM    (SELECT ProductID, TotalSold, TotalSquared, NumRows

         FROM   dbo.DashBoard_ProductsSold_History WITH (NOEXPAND)

         UNION ALL

         SELECT ProductID, TotalSold, TotalSquared, NumRows

         FROM   dbo.DashBoard_ProductsSold_History_Archive WITH (NOEXPAND)) AS c

GROUP BY ProductID;

GO

A small footnote – the square root expression in the calculation for standard deviation can, by mathematical standards, never be negative. But rounding errors, especially with floating point data, introduce a theoretical possibility that the computer runs into a small negative number, which could cause a run-time error. The chance of this happening is incredibly small, but if you want to make sure that this never happens, use this longer but more defensive version instead:

ALTER VIEW DashBoard_ProductsSold

WITH SCHEMABINDING

AS

SELECT   ProductID,

         SUM(TotalSold) AS TotalSold,

         SUM(NumRows) AS NumRows,

         SUM(TotalSold) * 1.0 / SUM(NumRows) AS AvgSold,

         CASE WHEN SUM(NumRows) = 0

              THEN NULL

              WHEN (SUM(TotalSquared) – (SQUARE(SUM(TotalSold)) / SUM(NumRows))) / SUM(NumRows) < 0

              THEN 0

              ELSE (SUM(TotalSquared) – (SQUARE(SUM(TotalSold)) / SUM(NumRows))) / SUM(NumRows)

         END                            AS [VarP],

         CASE WHEN SUM(NumRows) = 0

              THEN NULL

              WHEN (SUM(TotalSquared) – (SQUARE(SUM(TotalSold)) / SUM(NumRows))) / SUM(NumRows) < 0

              THEN 0

              ELSE SQRT((SUM(TotalSquared) – (SQUARE(SUM(TotalSold)) / SUM(NumRows))) / SUM(NumRows))

         END                            AS [StDevP],

         CASE WHEN SUM(NumRows) <= 1

              THEN NULL

              WHEN (SUM(TotalSquared) – (SQUARE(SUM(TotalSold)) / SUM(NumRows))) / (SUM(NumRows) – 1) < 0

              THEN 0

              ELSE (SUM(TotalSquared) – (SQUARE(SUM(TotalSold)) / SUM(NumRows))) / (SUM(NumRows) – 1)

         END                            AS [Var],

         CASE WHEN SUM(NumRows) <= 1

              THEN NULL

              WHEN (SUM(TotalSquared) – (SQUARE(SUM(TotalSold)) / SUM(NumRows))) / (SUM(NumRows) – 1) < 0

              THEN 0

              ELSE SQRT((SUM(TotalSquared) – (SQUARE(SUM(TotalSold)) / SUM(NumRows))) / (SUM(NumRows) – 1))

         END                            AS [StDev]

FROM    (SELECT ProductID, TotalSold, TotalSquared, NumRows

         FROM   dbo.DashBoard_ProductsSold_History WITH (NOEXPAND)

         UNION ALL

         SELECT ProductID, TotalSold, TotalSquared, NumRows

         FROM   dbo.DashBoard_ProductsSold_History_Archive WITH (NOEXPAND)) AS c

GROUP BY ProductID;

Note that in real cases, the source of the local aggregation can be diverse. It can be two or more views as in the example above, but it can also be a subquery with a different grouping level than the final query, or some other subquery, view or CTE. In all cases, the steps to build a DIY aggregate for variance and standard deviation are the same: ensure that the local aggregation computes the three ingredients (count of rows, sum of values, and sum of squares), then use the appropriate formula as shown above for the final result.

Conclusion

Instead of computing an aggregate function of a full data set, it is sometimes a better idea to divide the data in smaller portions, aggregate each individually, and then compute the final aggregates by combining the aggregations of the smaller sets (the partial aggregates). This is called local-global aggregation.

There are cases where the optimizer will do this automatically for you. Parallelism is a prime example of this, but the optimizer can also use local-global aggregation for other optimizations.

In some cases you need to explicitly force local-global aggregation in your queries. The batch execution mode, one of the reasons for the huge performance gain that columnstore indexes can achieve, has lots of limitations that require query rewrites for optimal performance; these often necessitate local-global aggregation patterns in the query. And though many of the batch mode limitations that existed in SQL Server 2012 were fixed in later versions, there are even in SQL Server 2016 still a few queries that need help to get batch mode execution.

Even when not using columnstore indexes, there are still situations that require us to write a manual local-global aggregation pattern. An example of this based on a current and an archive table is used in this blog post to demonstrate the methods to do this.

For the aggregate functions that are most commonly known, the local-global aggregation pattern is fairly easy to implement and understand. For more advanced statistical aggregate functions, such as standard deviation and variance, the formulas to use can get quite complex. In this blog post I showed a query that includes all these formulas.

I hope this blog post can help you when you ever need to write a query with a local-global aggregation pattern – regardless of whether you know that term or not!

How TOP wrecks performance (part 2)
Upcoming presentations. And an offer.

Related Posts

No results found.

5 Comments. Leave new

  • The links aren’t links, just styles…

    Reply
  • Small correction: MIN and MAX can’t be indexes because it’s not possible to efficiently remove rows from these aggregates. Funnily, you said it’s not rocket science when in fact it is impossible 🙂

    Reply
  • Hugo Kornelis
    September 15, 2016 23:48

    @Paul: Thanks for pointing that out. Somehow, the publishing software I use for this blog sometimes decides that links arer highly overrated and changes them to styles. I’ll change them back to be proper links.

    @Tobi: You arer right, I messed this up. The blog post is intended to be a generic guide on DIY local-global aggregation, and there are definitely cases (for instance, when trying to force batch mode for queries on columnstore tables) where you can apply this method on MIN/MAX.

    When I decided to use an example that is realistic, and not on columnstore (because I already have lots of those examples in the columnstore stairway at SQL Server Central), I did not realize that by adding view indexability into the equation, I introduced an unintended contradiction.

    So I guess that the accurate text would have been that MIN and MAX arre impossible to do with indexed views, but that in other cases of local-global aggegation you can use the same technique (and that is still not rocket science).

    Thanks for keeping me sharp!

    Reply
  • […] Local Global Aggregation […]

    Reply
  • […] If you run this query in ContosoRetailDW, you will see a rather large execution plan. In this post I will focus on the right-hand side, where the three tables are read and joined. (The left-hand side is where the aggregation happens, using a local-global aggregation pattern that I already blogged about in 2016). […]

    Reply

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.

By continuing to use the site, you agree to the use of cookies. more information

The cookie settings on this website are set to "allow cookies" to give you the best browsing experience possible. If you continue to use this website without changing your cookie settings or you click "Accept" below then you are consenting to this.

Close