Parallelism (Repartition Streams)

Introduction

The Parallelism operator, also known as the Exchange operator, is used to exchange rows between parallel and serial zones of an execution plan, or between parallel zones with different requirements as to how the rows are distributed. The Parallelism operator has three different Logical Operations, each represented with a different icon in the graphical execution plan. For ease of understanding, each of the three Logical Operations is described on its own page here.

The Repartition Streams operation of the Parallelism operator makes the operator read rows from a parallel zone, so from multiple threads. It outputs those rows into another parallel zone, so once more distributed across multiple threads, but with a different distribution. The two other logical operations are Distribute Streams (that reads from a serial zone and outputs to a parallel zone) and Gather Streams (that reads from a parallel zone and outputs to a serial zone).

For a good primer on the basics of parallelism, please read this article by Paul White.

Visual appearance in execution plans

Depending on the tool being used, a Parallelism operator with its Logical Operation set to Repartition Streams is displayed in a graphical execution plan as shown below:

(Icons for Parallelism (Repartition Streams) as displayed in current versions of SSMS and ADS, in legacy SSMS, in Plan Explorer, and in Paste The Plan.)

Algorithm

The Parallelism operator always runs multiple processes on multiple threads at the same time. When the Logical Operation is set to Repartition Streams, then Parallelism has one process on each thread on the side that is shown on the right in graphical execution plans, connecting to the child operator of the Parallelism operator on that thread. This side is called the “producer side”. On the other side, called the “consumer side”, there is also one process on each thread, this time connecting to the parent operator of the Parallelism operator on that thread.

Producer side

On the producer side, the same process runs on each thread. This process reads rows from the child operator, stores them in holding areas called “packets” (of which there is one for each consumer side thread), and initiates a transfer of the packet to the consumer side process when a packet is full.

The basic algorithm for the producer side process of a Parallelism operator with its Logical Operation set to Repartition Streams is as shown below:

Flowchart for producer side process of Parallelism (Repartition Streams)

Note that this flowchart is a simplification. It does not show the details of what happens when the producer side process tries to push a packet of rows to the consumer side (this is shown in more detail below). It also doesn’t show the details of how a parallel plan starts up its execution. (Paul White has written an excellent series of articles to describe specifically the startup phase: Part 1, Part 2, Part 3, Part 4, Part 5.)
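The producer side loop can be sketched as follows. This is a hypothetical simplification in Python, not SQL Server internals: all names (`producer`, `choose_packet`, the packet size) are made up for illustration, waits are ignored, and the startup phase is not modeled.

```python
# Simplified sketch of the producer side process on one thread.
# PACKET_SIZE is illustrative only; real packet sizes are internal to SQL Server.
PACKET_SIZE = 4

def choose_packet(row, num_consumers):
    # Placeholder for the partitioning logic; hash partitioning shown as an example.
    return hash(row) % num_consumers

def producer(child_rows, num_consumers, predicate=None):
    """Distribute rows read from the child operator over per-consumer packets."""
    packets = [[] for _ in range(num_consumers)]
    pushed = []  # stands in for the transfer of full packets to the consumer side
    for row in child_rows:
        # Optional Predicate property: discard rows that do not evaluate to True.
        if predicate is not None and not predicate(row):
            continue
        target = choose_packet(row, num_consumers)
        packets[target].append(row)
        if len(packets[target]) == PACKET_SIZE:
            pushed.append((target, packets[target]))
            packets[target] = []
    # End of data: push any partially filled packets as well.
    for target, packet in enumerate(packets):
        if packet:
            pushed.append((target, packet))
    return pushed
```

The `predicate` argument corresponds to the optional Predicate property described below; the final loop corresponds to pushing partially filled packets at end of data.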

Predicate?

The Parallelism operator supports an optional Predicate property. When present, the supplied predicate is applied to every row read in the producer side process. Only rows where the predicate evaluates to True are processed; if the predicate is False or Unknown, the row is discarded.
So far, I have only ever observed Predicate expressions with a PROBE function in their expression.

Determine target thread(s)

Perhaps the most important step in the producer side flowchart is the determination of the correct thread (or packet) that a row needs to be sent to. The Parallelism operator supports five different ways to distribute the rows; which one is used depends on the Partitioning Type property.

These are the five supported partitioning types:

  • Hash: One or more columns from the input, as indicated by the Partition Columns property, are hashed to find a number between 1 and the degree of parallelism. This number is the number of the packet that the row is stored in.
    This partitioning type is typically used when a parallel zone includes operators that require rows with the same value in certain columns to be on the same thread (such as join or aggregation operators).
  • Round Robin: The first row read is stored in packet 1, the second in packet 2, and so on. Once all packets have received their first row, it wraps back to packet 1.
This partitioning type is typically used when there are no functional reasons to have rows on specific threads. A benefit of round robin partitioning is that each thread will receive the same number of rows. This does not automatically prevent skew, though, as some rows might require more work than others, and some threads may receive fewer CPU cycles than others due to other concurrent processes running on the same core.
  • Broadcast: Identical copies of each row read are stored in all packets.
    This partitioning type is typically used when the input is estimated to be very small in size. By making a copy of this small input available on all threads, the rest of the execution plan can often be simplified. When the cardinality of the input is severely misestimated and its actual size is much larger, then this can cause performance issues due to doing the same work on all those rows on each thread in the parallel zone.
  • Range: One column from the input, as indicated by the Partition Columns property, is used to determine the number of the packet that the row is stored in, based on consecutive and non-overlapping ranges. Values in the first range go to packet 1, values in the second range to packet 2, etc.
    This partitioning type is rare; it is, as far as currently known, only used in certain parallel index build execution plans and in parallel update statistics plans.
    The boundaries for the defined ranges are not exposed in either the graphical execution plan, or the execution plan xml. I assume that they are stored in the internal representation of the execution plan, but not represented in the execution plan xml.
  • Demand: Each row is sent to the first consumer thread that asks for rows. That makes this partitioning type the only one that effectively creates a pull-based distribution. Because the distribution is demand based, skew is almost always avoided.
    This partitioning type is rare; it is only used in execution plans with partitioned tables.
    I have so far never been able to reproduce an execution plan that uses this partitioning type, so I am not aware of any specific properties that might occur in this case. Please let me know if you have seen this partitioning type and can share an execution plan or some repro code with me.
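Three of these partitioning types can be illustrated with small sketches. These are hypothetical Python illustrations, not SQL Server internals; `hash_partition`, `round_robin`, and `broadcast` are made-up names, and packets are numbered from 0 here rather than 1:

```python
# dop = degree of parallelism = number of consumer side packets.

def hash_partition(row, partition_columns, dop):
    # Rows with equal values in the Partition Columns always hash to the same
    # packet, which is what join and aggregation operators rely on.
    key = tuple(row[col] for col in partition_columns)
    return hash(key) % dop  # packet number 0 .. dop-1

def round_robin(rows, dop):
    # Row 1 to packet 0, row 2 to packet 1, ..., then wrap back around.
    packets = [[] for _ in range(dop)]
    for i, row in enumerate(rows):
        packets[i % dop].append(row)
    return packets

def broadcast(rows, dop):
    # Every packet receives an identical copy of every row.
    return [list(rows) for _ in range(dop)]
```

Note how `hash_partition` sends two rows with the same value in the partition columns to the same packet regardless of their other columns, while `round_robin` guarantees equal row counts per packet and `broadcast` multiplies the row count by the degree of parallelism.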

Push packet

When one of the processes on the producer side has completely filled the packet for one of the consumer side processes, or when it reaches the end of its data while packets are still partially filled, the producer pushes the packet to the consumer. This process of pushing a packet to the consumer has its own logic, as follows:

Flowchart for the “push packet” action in the producer side algorithm of any Parallelism operator

Here, the “Empty packet?” test checks whether the packet corresponding to this thread on the consumer process is empty. If it is not, then the consumer side process is still working on the rows sent from this thread in a previous packet, and the producer side on this thread will have to wait until an empty packet is once more available. The producer side processes on the other threads do not have to wait for this; they can continue to work.

Once the consumer side packet is empty, the “Push packet” process moves the data in the packet on the producer side to the packet on the consumer side, by first copying the data to the packet on the consumer side, and then removing it from the packet on the producer side.
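This handshake can be sketched with a condition variable. This is a hypothetical Python analogy, not SQL Server internals; `PacketSlot` and its methods are invented for illustration. The producer blocks in `push` until the consumer side slot is empty again, which corresponds to the CXPACKET wait described in the next section:

```python
import threading

class PacketSlot:
    """One consumer side packet for one producer side thread (illustrative)."""
    def __init__(self):
        self._data = None
        self._cv = threading.Condition()

    def push(self, packet):
        # Producer side: wait until the consumer side slot is empty, then copy
        # the data in and clear the producer side packet.
        with self._cv:
            while self._data is not None:
                self._cv.wait()          # analogous to the CXPACKET wait
            self._data = list(packet)    # copy to the consumer side
            packet.clear()               # remove from the producer side
            self._cv.notify_all()

    def take(self):
        # Consumer side: wait until data is available, then empty the slot.
        with self._cv:
            while self._data is None:
                self._cv.wait()
            data, self._data = self._data, None
            self._cv.notify_all()
            return data
```

In the real operator the two sides run on different threads; `push` mirrors the two-step move described above, first copying the data to the consumer side and then removing it from the producer side.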

Wait (until packet available)

When the next packet for a thread is already full while the consumer side process has not yet finished processing all rows from the previous packet, the producer side process on this thread cannot continue until the consumer side has finished those rows. It enters a wait state: the process is taken off the processor (so that other SQL Server tasks can use it) and waits until the consumer side has an empty packet available for receiving new rows.

These are the infamous CXPACKET waits. Ideally, we would hope that the consumer side processes are able to finish work on all rows in a packet before the next packet from the same input thread is ready, so there should be little to no CXPACKET waits. But that is not always the case, for instance when the operators in the parallel section on the consumer side are more expensive than those on the producer side (in other words, when it takes more effort to process a row than to produce one), or when the data distribution on either side is skewed. When the Parallelism operator is set to be merging (see below), CXPACKET waits can be expected to be even higher.

Consumer side

On the consumer side, the same process runs on each thread. This process reads data from all of the packets (of which there is one for each producer side thread). It combines them in one of two possible ways (“merging” or “non-merging”) and returns them to its parent operator.

The basic algorithm for the consumer side process of a Parallelism operator with its Logical Operation set to Repartition Streams is as shown below:

Flowchart for consumer side process of Parallelism (Repartition Streams)

Note that this flowchart is a simplification. It doesn’t show that execution stops whenever a row is returned, and resumes where it was upon next being called.
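The consumer side loop can be sketched as follows. This is hypothetical Python, not SQL Server internals, and a simplification in which all packets are already filled, so the waits discussed below never occur:

```python
# Simplified sketch of the consumer side process on one thread.
# packets holds one list of rows per producer side thread.

def consumer(packets, merging, order_key=lambda row: row):
    """Yield rows from all producer packets, merged or in arrival order."""
    while any(packets):
        candidates = [p for p in packets if p]
        if merging:
            # Only safe when every non-exhausted input has a row available:
            # pick the packet whose first row sorts lowest on the Order By.
            source = min(candidates, key=lambda p: order_key(p[0]))
        else:
            # Any packet with data will do; take the first one found.
            source = candidates[0]
        yield source.pop(0)
```

With `merging=True` the output preserves the sort order of the inputs; with `merging=False` rows come out in whatever order they happen to be taken.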

Merging?

The Parallelism operator with its Logical Operation set to Repartition Streams can operate in two different ways:

  • Merging: A merging Parallelism operator is used when the input streams on all threads are guaranteed to be in a specific order, and the optimizer needs that order to be retained. This introduces some overhead for the Parallelism operator, but it saves the cost of (re)sorting the data after it has been redistributed across the output threads.
    A Parallelism operator is merging when its Order By property is present. The optimizer ensures that the input streams are all already in the correct order; the Parallelism operator is not able to actively reorder data on any of its input streams, it merely combines the data in such a way that the existing order as specified in the Order By property is preserved.
    All descriptions in this text are based on an ascending sort order, but a descending sort order is supported as well. Or even a mix of ascending and descending if the Order By property specifies more than one column.
  • Non-merging: A non-merging Parallelism operator is free to return the data to its parent operator in any order. It will not actively reorder rows, but it will simply return the first row available as soon as it is available. This reduces the amount of CXPACKET and CXCONSUMER waits as compared to a merging Parallelism (Repartition Streams) operator.
A Parallelism operator is non-merging when it does not have an Order By property.
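The merging behavior is the classic multi-way merge of already-sorted inputs. Python’s standard `heapq.merge` performs the same operation and illustrates why no sort is needed, as long as every input stream is already ordered:

```python
import heapq

# Each input stream is already sorted, as the optimizer guarantees for the
# inputs of a merging exchange.
thread1 = [1, 4, 7]
thread2 = [2, 5, 8]
thread3 = [3, 6, 9]

# heapq.merge repeatedly picks the smallest head element among the streams,
# so the combined output is sorted without ever sorting the union.
merged = list(heapq.merge(thread1, thread2, thread3))
```

The price of this guarantee is the same as for the operator: the merge can only emit a row once it can see the head row of every input, which is exactly why a merging exchange has to wait for data in all packets.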

Data in ALL/ANY packets?

When a Parallelism operator with its Logical Operation set to Repartition Streams is set to merge its sorted inputs, then any of the consumer side processes can only safely return a row when it can see the next row from each of its input threads. This means that there has to be data in the packets for all input threads. After all, if just one packet is empty, there is no way to tell whether the next row that will be received in that packet has a lower value in its Order By columns than the first available row in all of the other packets.

When a Parallelism operator with its Logical Operation set to Repartition Streams does not have to merge its inputs, then each consumer side process can safely return rows to its parent from any packet that has data, so there is no need to wait until all packets have data.

Note that when a producer side process sends its end of data signal, this signal is also included in the packet, and never removed. This packet is then considered to hold data for the purpose of deciding if a consumer side process has to wait.

Wait (until data in ALL/ANY packets)

When a consumer side process has to wait for data in its packets, then it either waits until there is data in all packets (for a merging Parallelism operator), or until there is data in at least one packet (for a non-merging Parallelism operator).

These waits used to be marked as CXPACKET waits, but since Service Pack 2 for SQL Server 2016 and Cumulative Update 3 for SQL Server 2017, they have been relabeled as CXCONSUMER waits. These waits will typically be low when the Parallelism operator is non-merging, unless the child operators on the producer side have to put more work into producing a row than it takes the parent operators on the consumer side to process a row. In that case, these waits are considered benign, because the input processes on all of the threads in the parallel section on the producer side can continue to run without waiting.

For a merging Parallelism operator, CXCONSUMER waits will be higher, because the process can stall, in the worst case even on all consumer side processes, when just one of the input threads is stalling. When this does not affect the processes on any of the threads on the child side of the Parallelism operator, then this is considered a benign wait. However, it is also possible that, while one or more of the consumer side processes are waiting for more data from a producer side thread, another producer side thread already has a new packet of data available to be pushed. That producer side thread is then halted, with a CXPACKET wait (see above).

In all cases, the CXCONSUMER waits are not a reason to worry. (Before these waits were relabeled as CXCONSUMER, it was a lot harder to troubleshoot parallelism, because benign and actionable waits were all registered under the same wait type).

Read row from packet

Each consumer side process of a Parallelism operator with its Logical Operation set to Repartition Streams has multiple packets, one for each process on the producer side. So when it reads data, it can choose from which packet to read. How this choice is made depends on the behavior of the operator:

  • Merging: A merging Parallelism operator compares the values in the Order By columns of the first row in each of the packets, to find which packet currently holds the lowest Order By values. It then reads the row from that packet.
    In case of a tie between two or more packets (same values in all Order By columns), it will choose one of the packets with this lowest value. Which of the candidates is selected is not documented.
  • Non-merging: A non-merging Parallelism operator can select any of the packets that still hold at least one row of data. Which of the candidates is selected is not documented.

Operator properties

The properties below are specific to the Parallelism operator with its Logical Operation set to Repartition Streams, or have a specific meaning when appearing on it. For all other properties, see Common properties. Properties that are included on the Common properties page but are also included below for their specific meaning for the Parallelism (Repartition Streams) operator are marked with a *.

  • Order By: This property can be present or absent. When it is absent, the Parallelism operator will use its non-merging operation.
    When the Order By property is included, it will list one or more columns, each with a “Descending” or “Ascending” specification. The data streams on all input threads will be ordered according to that specification, and the Parallelism operator will use its merging operation to ensure that this order is retained in all output streams.
    See the main text for more details.
  • Partition Columns: When the Partitioning Type property is set to Hash or Range, the Partition Columns property lists the column or columns that are used to determine what thread each row gets sent to. All rows with the same value in each of the columns in this property are guaranteed to go to the same thread.
  • Partitioning Type: The Partitioning Type property determines how rows will be distributed across the threads on the consumer side. The five available values for Partitioning Type and their meaning are described in the main text.
  • Predicate: When present, this property defines a logical test to be applied to all rows that the producer side processes read. Only rows for which the predicate evaluates to True are returned.

Implicit properties

The table below lists the behavior of the implicit properties for the Parallelism operator with its Logical Operation set to Repartition Streams.

  • Batch Mode enabled: The Parallelism operator with its Logical Operation set to Repartition Streams only appears in row mode execution plans, or in row mode sections of batch mode execution plans. Batch mode (sections of) execution plans use a different model for parallelism, where no Parallelism operators are needed.
    In SQL Server 2012 only, batch mode sections of execution plans did include Parallelism operators with their Logical Operation set to Repartition Streams, and their Estimated Execution Mode set to Row. They never executed, unless at run time a batch mode Hash Match operator spilled to tempdb, causing execution to fall back to row mode. In an execution plan plus run-time statistics, these would have a Number of Executions of 0, and no value for their Actual Execution Mode or any other Actual property, unless a hash spill, and therefore a fallback to row mode, actually happened.
    Since SQL Server 2014, a spill of a batch mode Hash Match operator no longer causes a fallback to row mode, so these extra Parallelism operators are no longer included in batch mode execution plans.
  • Blocking: The Parallelism operator is non-blocking.
  • Memory requirement: The Parallelism operator with its Logical Operation set to Repartition Streams requires enough memory to store a number of packets equal to the degree of parallelism for each process on the producer side, and a number of packets equal to the degree of parallelism for each process on the consumer side.
    Unlike most other operators with high memory consumption, the Parallelism operator does not expose the Memory Fractions and Memory Usage properties.
  • Order-preserving: The Parallelism operator with its Logical Operation set to Repartition Streams is only order-preserving when it is merging. When it is non-merging, the output of this operator is always considered unordered.
    See the main text for more details on merging versus non-merging behavior.
  • Parallelism aware: The Parallelism operator is only used in parallel execution plans, and is fully parallelism aware.
  • Segment aware: The Parallelism operator is not segment aware.
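The memory requirement described above implies a total packet count that grows quadratically with the degree of parallelism. A small sketch of that arithmetic (an illustration of the stated requirement, not a measurement of actual memory use):

```python
def packet_count(dop):
    # Each of the dop producer side processes holds one packet per consumer
    # side thread, and each of the dop consumer side processes holds one
    # packet per producer side thread.
    producer_side = dop * dop
    consumer_side = dop * dop
    return producer_side + consumer_side

# At DOP 8, a single Repartition Streams operator thus needs room for
# 2 * 8 * 8 = 128 packets.
```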
