Skip to content
Open
37 changes: 36 additions & 1 deletion crates/core/src/expr/grouping_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
// specific language governing permissions and limitations
// under the License.

use datafusion::logical_expr::GroupingSet;
use datafusion::logical_expr::{Expr, GroupingSet};
use pyo3::prelude::*;

use crate::expr::PyExpr;

#[pyclass(
from_py_object,
frozen,
Expand All @@ -30,6 +32,39 @@ pub struct PyGroupingSet {
grouping_set: GroupingSet,
}

#[pymethods]
impl PyGroupingSet {
#[staticmethod]
#[pyo3(signature = (*exprs))]
fn rollup(exprs: Vec<PyExpr>) -> PyExpr {
Expr::GroupingSet(GroupingSet::Rollup(
exprs.into_iter().map(|e| e.expr).collect(),
))
.into()
}

#[staticmethod]
#[pyo3(signature = (*exprs))]
fn cube(exprs: Vec<PyExpr>) -> PyExpr {
Expr::GroupingSet(GroupingSet::Cube(
exprs.into_iter().map(|e| e.expr).collect(),
))
.into()
}

#[staticmethod]
#[pyo3(signature = (*expr_lists))]
fn grouping_sets(expr_lists: Vec<Vec<PyExpr>>) -> PyExpr {
Expr::GroupingSet(GroupingSet::GroupingSets(
expr_lists
.into_iter()
.map(|list| list.into_iter().map(|e| e.expr).collect())
.collect(),
))
.into()
}
}

impl From<PyGroupingSet> for GroupingSet {
fn from(grouping_set: PyGroupingSet) -> Self {
grouping_set.grouping_set
Expand Down
23 changes: 19 additions & 4 deletions crates/core/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -791,9 +791,10 @@ aggregate_function!(var_pop);
aggregate_function!(approx_distinct);
aggregate_function!(approx_median);

// Code is commented out since grouping is not yet implemented
// https://github.com/apache/datafusion-python/issues/861
// aggregate_function!(grouping);
// The grouping function's physical plan is not implemented, but the
// ResolveGroupingFunction analyzer rule rewrites it before the physical
// planner sees it, so it works correctly at runtime.
aggregate_function!(grouping);

#[pyfunction]
#[pyo3(signature = (sort_expression, percentile, num_centroids=None, filter=None))]
Expand Down Expand Up @@ -831,6 +832,19 @@ pub fn approx_percentile_cont_with_weight(
add_builder_fns_to_aggregate(agg_fn, None, filter, None, None)
}

/// Expose DataFusion's `percentile_cont` aggregate to Python.
///
/// `sort_expression` supplies both the input column and the ordering,
/// `percentile` is passed through as a literal, and `filter` optionally
/// restricts which rows participate in the aggregate.
#[pyfunction]
#[pyo3(signature = (sort_expression, percentile, filter=None))]
pub fn percentile_cont(
    sort_expression: PySortExpr,
    percentile: f64,
    filter: Option<PyExpr>,
) -> PyDataFusionResult<PyExpr> {
    // DataFusion expects the percentile as an expression, so wrap the
    // f64 in a literal before building the aggregate.
    let percentile_expr = lit(percentile);
    let agg_fn =
        functions_aggregate::expr_fn::percentile_cont(sort_expression.sort, percentile_expr);
    // Only the filter builder applies here; distinct / order_by /
    // null-treatment are not supported for this aggregate.
    add_builder_fns_to_aggregate(agg_fn, None, filter, None, None)
}

// We handle last_value explicitly because the signature expects an order_by
// https://github.com/apache/datafusion/issues/12376
#[pyfunction]
Expand Down Expand Up @@ -1031,6 +1045,7 @@ pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(approx_median))?;
m.add_wrapped(wrap_pyfunction!(approx_percentile_cont))?;
m.add_wrapped(wrap_pyfunction!(approx_percentile_cont_with_weight))?;
m.add_wrapped(wrap_pyfunction!(percentile_cont))?;
m.add_wrapped(wrap_pyfunction!(range))?;
m.add_wrapped(wrap_pyfunction!(array_agg))?;
m.add_wrapped(wrap_pyfunction!(arrow_typeof))?;
Expand Down Expand Up @@ -1080,7 +1095,7 @@ pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(from_unixtime))?;
m.add_wrapped(wrap_pyfunction!(gcd))?;
m.add_wrapped(wrap_pyfunction!(greatest))?;
// m.add_wrapped(wrap_pyfunction!(grouping))?;
m.add_wrapped(wrap_pyfunction!(grouping))?;
m.add_wrapped(wrap_pyfunction!(in_list))?;
m.add_wrapped(wrap_pyfunction!(initcap))?;
m.add_wrapped(wrap_pyfunction!(isnan))?;
Expand Down
167 changes: 167 additions & 0 deletions docs/source/user-guide/common-operations/aggregations.rst
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,168 @@ Suppose we want to find the speed values for only Pokemon that have low Attack v
f.avg(col_speed, filter=col_attack < lit(50)).alias("Avg Speed Low Attack")])


Grouping Sets
-------------

The default style of aggregation produces one row per group. Sometimes you want a single query to
produce rows at multiple levels of detail — for example, totals per type *and* an overall grand
total, or subtotals for every combination of two columns plus the individual column totals. Writing
separate queries and concatenating them is tedious and runs the data multiple times. Grouping sets
solve this by letting you specify several grouping levels in one pass.

DataFusion supports three grouping set styles through the
:py:class:`~datafusion.expr.GroupingSet` class:

- :py:meth:`~datafusion.expr.GroupingSet.rollup` — hierarchical subtotals, like a drill-down report
- :py:meth:`~datafusion.expr.GroupingSet.cube` — every possible subtotal combination, like a pivot table
- :py:meth:`~datafusion.expr.GroupingSet.grouping_sets` — explicitly list exactly which grouping levels you want

Because result rows come from different grouping levels, a column that is *not* part of a
particular level will be ``null`` in that row. Use :py:func:`~datafusion.functions.grouping` to
distinguish a real ``null`` in the data from one that means "this column was aggregated across."
It returns ``0`` when the column is a grouping key for that row, and ``1`` when it is not.

Rollup
^^^^^^

:py:meth:`~datafusion.expr.GroupingSet.rollup` creates a hierarchy. ``rollup(a, b)`` produces
grouping sets ``(a, b)``, ``(a)``, and ``()`` — like nested subtotals in a report. This is useful
when your columns have a natural hierarchy, such as region → city or type → subtype.

Suppose we want to summarize Pokemon stats by ``Type 1`` with subtotals and a grand total. With
the default aggregation style we would need two separate queries. With ``rollup`` we get it all at
once:

.. ipython:: python

from datafusion.expr import GroupingSet

df.aggregate(
[GroupingSet.rollup(col_type_1)],
[f.count(col_speed).alias("Count"),
f.avg(col_speed).alias("Avg Speed"),
f.max(col_speed).alias("Max Speed")]
).sort(col_type_1.sort(ascending=True, nulls_first=True))

The first row — where ``Type 1`` is ``null`` — is the grand total across all types. But how do you
tell a grand-total ``null`` apart from a Pokemon that genuinely has no type? The
:py:func:`~datafusion.functions.grouping` function returns ``0`` when the column is a grouping key
for that row and ``1`` when it is aggregated across.

.. note::

Due to an upstream DataFusion limitation
(`apache/datafusion#21411 <https://github.com/apache/datafusion/issues/21411>`_),
``.alias()`` cannot be applied directly to a ``grouping()`` expression — it will raise an
error at execution time. Instead, use
:py:meth:`~datafusion.dataframe.DataFrame.with_column_renamed` on the result DataFrame to
give the column a readable name. Once the upstream issue is resolved, you will be able to
use ``.alias()`` directly and the workaround below will no longer be necessary.

The raw column name generated by ``grouping()`` contains internal identifiers, so we use
:py:meth:`~datafusion.dataframe.DataFrame.with_column_renamed` to clean it up:

.. ipython:: python

result = df.aggregate(
[GroupingSet.rollup(col_type_1)],
[f.count(col_speed).alias("Count"),
f.avg(col_speed).alias("Avg Speed"),
f.grouping(col_type_1)]
)
for field in result.schema():
if field.name.startswith("grouping("):
result = result.with_column_renamed(field.name, "Is Total")
result.sort(col_type_1.sort(ascending=True, nulls_first=True))

With two columns the hierarchy becomes more apparent. ``rollup(Type 1, Type 2)`` produces:

- one row per ``(Type 1, Type 2)`` pair — the most detailed level
- one row per ``Type 1`` — subtotals
- one grand total row

.. ipython:: python

df.aggregate(
[GroupingSet.rollup(col_type_1, col_type_2)],
[f.count(col_speed).alias("Count"),
f.avg(col_speed).alias("Avg Speed")]
).sort(
col_type_1.sort(ascending=True, nulls_first=True),
col_type_2.sort(ascending=True, nulls_first=True)
)

Cube
^^^^

:py:meth:`~datafusion.expr.GroupingSet.cube` produces every possible subset. ``cube(a, b)``
produces grouping sets ``(a, b)``, ``(a)``, ``(b)``, and ``()`` — one more than ``rollup`` because
it also includes ``(b)`` alone. This is useful when neither column is "above" the other in a
hierarchy and you want all cross-tabulations.

For our Pokemon data, ``cube(Type 1, Type 2)`` gives us stats broken down by the type pair,
by ``Type 1`` alone, by ``Type 2`` alone, and a grand total — all in one query:

.. ipython:: python

df.aggregate(
[GroupingSet.cube(col_type_1, col_type_2)],
[f.count(col_speed).alias("Count"),
f.avg(col_speed).alias("Avg Speed")]
).sort(
col_type_1.sort(ascending=True, nulls_first=True),
col_type_2.sort(ascending=True, nulls_first=True)
)

Compared to the ``rollup`` example above, notice the extra rows where ``Type 1`` is ``null`` but
``Type 2`` has a value — those are the per-``Type 2`` subtotals that ``rollup`` does not include.

Explicit Grouping Sets
^^^^^^^^^^^^^^^^^^^^^^

:py:meth:`~datafusion.expr.GroupingSet.grouping_sets` lets you list exactly which grouping levels
you need when ``rollup`` or ``cube`` would produce too many or too few. Each argument is a list of
columns forming one grouping set.

For example, if we want only the per-``Type 1`` totals and per-``Type 2`` totals — but *not* the
full ``(Type 1, Type 2)`` detail rows or the grand total — we can ask for exactly that:

.. ipython:: python

df.aggregate(
[GroupingSet.grouping_sets([col_type_1], [col_type_2])],
[f.count(col_speed).alias("Count"),
f.avg(col_speed).alias("Avg Speed")]
).sort(
col_type_1.sort(ascending=True, nulls_first=True),
col_type_2.sort(ascending=True, nulls_first=True)
)

Each row belongs to exactly one grouping level. The :py:func:`~datafusion.functions.grouping`
function tells you which level each row comes from:

.. ipython:: python

result = df.aggregate(
[GroupingSet.grouping_sets([col_type_1], [col_type_2])],
[f.count(col_speed).alias("Count"),
f.avg(col_speed).alias("Avg Speed"),
f.grouping(col_type_1),
f.grouping(col_type_2)]
)
for field in result.schema():
if field.name.startswith("grouping("):
clean = field.name.split(".")[-1].rstrip(")")
result = result.with_column_renamed(field.name, f"grouping({clean})")
result.sort(
col_type_1.sort(ascending=True, nulls_first=True),
col_type_2.sort(ascending=True, nulls_first=True)
)

Where ``grouping(Type 1)`` is ``0`` the row is a per-``Type 1`` total (and ``Type 2`` is ``null``).
Where ``grouping(Type 2)`` is ``0`` the row is a per-``Type 2`` total (and ``Type 1`` is ``null``).


Aggregate Functions
-------------------

Expand Down Expand Up @@ -213,4 +375,9 @@ The available aggregate functions are:
- :py:func:`datafusion.functions.approx_median`
- :py:func:`datafusion.functions.approx_percentile_cont`
- :py:func:`datafusion.functions.approx_percentile_cont_with_weight`
10. Grouping Set Functions
- :py:func:`datafusion.functions.grouping`
- :py:meth:`datafusion.expr.GroupingSet.rollup`
- :py:meth:`datafusion.expr.GroupingSet.cube`
- :py:meth:`datafusion.expr.GroupingSet.grouping_sets`

16 changes: 15 additions & 1 deletion python/datafusion/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -633,8 +633,22 @@ def aggregate(
) -> DataFrame:
"""Aggregates the rows of the current DataFrame.

By default each unique combination of the ``group_by`` columns
produces one row. To get multiple levels of subtotals in a
single pass, pass a
:py:class:`~datafusion.expr.GroupingSet` expression
(created via
:py:meth:`~datafusion.expr.GroupingSet.rollup`,
:py:meth:`~datafusion.expr.GroupingSet.cube`, or
:py:meth:`~datafusion.expr.GroupingSet.grouping_sets`)
as the ``group_by`` argument. See the
:ref:`aggregation` user guide for detailed examples.

Args:
group_by: Sequence of expressions or column names to group by.
group_by: Sequence of expressions or column names to group
by. A :py:class:`~datafusion.expr.GroupingSet`
expression may be included to produce multiple grouping
levels (rollup, cube, or explicit grouping sets).
aggs: Sequence of expressions to aggregate.

Returns:
Expand Down
Loading
Loading