From 9e62addc17eeb2eb626fb0825e84fa52cae2f08b Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Tue, 31 Mar 2026 12:35:59 -0400 Subject: [PATCH 1/8] Add missing aggregate functions: grouping, percentile_cont, var_population Expose upstream DataFusion aggregate functions that were not yet available in the Python API. Closes #1454. - grouping: returns grouping set membership indicator (rewritten by the ResolveGroupingFunction analyzer rule before physical planning) - percentile_cont: computes exact percentile using continuous interpolation (unlike approx_percentile_cont which uses t-digest) - var_population: alias for var_pop Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/core/src/functions.rs | 23 ++++++++-- python/datafusion/functions.py | 77 ++++++++++++++++++++++++++++++++++ python/tests/test_functions.py | 46 ++++++++++++++++++++ 3 files changed, 142 insertions(+), 4 deletions(-) diff --git a/crates/core/src/functions.rs b/crates/core/src/functions.rs index 3f07da95b..63f6d5bae 100644 --- a/crates/core/src/functions.rs +++ b/crates/core/src/functions.rs @@ -709,9 +709,10 @@ aggregate_function!(var_pop); aggregate_function!(approx_distinct); aggregate_function!(approx_median); -// Code is commented out since grouping is not yet implemented -// https://github.com/apache/datafusion-python/issues/861 -// aggregate_function!(grouping); +// The grouping function's physical plan is not implemented, but the +// ResolveGroupingFunction analyzer rule rewrites it before the physical +// planner sees it, so it works correctly at runtime. 
+aggregate_function!(grouping); #[pyfunction] #[pyo3(signature = (sort_expression, percentile, num_centroids=None, filter=None))] @@ -749,6 +750,19 @@ pub fn approx_percentile_cont_with_weight( add_builder_fns_to_aggregate(agg_fn, None, filter, None, None) } +#[pyfunction] +#[pyo3(signature = (sort_expression, percentile, filter=None))] +pub fn percentile_cont( + sort_expression: PySortExpr, + percentile: f64, + filter: Option, +) -> PyDataFusionResult { + let agg_fn = + functions_aggregate::expr_fn::percentile_cont(sort_expression.sort, lit(percentile)); + + add_builder_fns_to_aggregate(agg_fn, None, filter, None, None) +} + // We handle last_value explicitly because the signature expects an order_by // https://github.com/apache/datafusion/issues/12376 #[pyfunction] @@ -949,6 +963,7 @@ pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(approx_median))?; m.add_wrapped(wrap_pyfunction!(approx_percentile_cont))?; m.add_wrapped(wrap_pyfunction!(approx_percentile_cont_with_weight))?; + m.add_wrapped(wrap_pyfunction!(percentile_cont))?; m.add_wrapped(wrap_pyfunction!(range))?; m.add_wrapped(wrap_pyfunction!(array_agg))?; m.add_wrapped(wrap_pyfunction!(arrow_typeof))?; @@ -997,7 +1012,7 @@ pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(from_unixtime))?; m.add_wrapped(wrap_pyfunction!(gcd))?; m.add_wrapped(wrap_pyfunction!(greatest))?; - // m.add_wrapped(wrap_pyfunction!(grouping))?; + m.add_wrapped(wrap_pyfunction!(grouping))?; m.add_wrapped(wrap_pyfunction!(in_list))?; m.add_wrapped(wrap_pyfunction!(initcap))?; m.add_wrapped(wrap_pyfunction!(isnan))?; diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index f1ea3d256..4abb02f8f 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -153,6 +153,7 @@ "from_unixtime", "gcd", "greatest", + "grouping", "ifnull", "in_list", "initcap", @@ -224,6 +225,7 @@ "order_by", 
"overlay", "percent_rank", + "percentile_cont", "pi", "pow", "power", @@ -294,6 +296,7 @@ "uuid", "var", "var_pop", + "var_population", "var_samp", "var_sample", "when", @@ -3643,6 +3646,47 @@ def approx_percentile_cont_with_weight( ) +def percentile_cont( + sort_expression: Expr | SortExpr, + percentile: float, + filter: Expr | None = None, +) -> Expr: + """Computes the exact percentile of input values using continuous interpolation. + + Unlike :py:func:`approx_percentile_cont`, this function computes the exact + percentile value rather than an approximation. + + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by``, ``null_treatment``, and ``distinct``. + + Args: + sort_expression: Values for which to find the percentile + percentile: This must be between 0.0 and 1.0, inclusive + filter: If provided, only compute against rows for which the filter is True + + Examples: + >>> ctx = dfn.SessionContext() + >>> df = ctx.from_pydict({"a": [1.0, 2.0, 3.0, 4.0, 5.0]}) + >>> result = df.aggregate( + ... [], [dfn.functions.percentile_cont( + ... dfn.col("a"), 0.5 + ... ).alias("v")]) + >>> result.collect_column("v")[0].as_py() + 3.0 + + >>> result = df.aggregate( + ... [], [dfn.functions.percentile_cont( + ... dfn.col("a"), 0.5, + ... filter=dfn.col("a") > dfn.lit(1.0), + ... ).alias("v")]) + >>> result.collect_column("v")[0].as_py() + 3.5 + """ + sort_expr_raw = sort_or_default(sort_expression) + filter_raw = filter.expr if filter is not None else None + return Expr(f.percentile_cont(sort_expr_raw, percentile, filter=filter_raw)) + + def array_agg( expression: Expr, distinct: bool = False, @@ -3701,6 +3745,30 @@ def array_agg( ) +def grouping( + expression: Expr, + distinct: bool | None = None, + filter: Expr | None = None, +) -> Expr: + """Returns 1 if the data is aggregated across the specified column, or 0 otherwise. 
+ + This function is used with ``GROUPING SETS``, ``CUBE``, or ``ROLLUP`` to + distinguish between aggregated and non-aggregated rows. In a regular + ``GROUP BY`` without grouping sets, it always returns 0. + + Note: The ``grouping`` aggregate function is rewritten by the query + optimizer before execution, so it works correctly even though its + physical plan is not directly implemented. + + Args: + expression: The column to check grouping status for + distinct: If True, compute on distinct values only + filter: If provided, only compute against rows for which the filter is True + """ + filter_raw = filter.expr if filter is not None else None + return Expr(f.grouping(expression.expr, distinct=distinct, filter=filter_raw)) + + def avg( expression: Expr, filter: Expr | None = None, @@ -4172,6 +4240,15 @@ def var_pop(expression: Expr, filter: Expr | None = None) -> Expr: return Expr(f.var_pop(expression.expr, filter=filter_raw)) +def var_population(expression: Expr, filter: Expr | None = None) -> Expr: + """Computes the population variance of the argument. + + See Also: + This is an alias for :py:func:`var_pop`. + """ + return var_pop(expression, filter) + + def var_samp(expression: Expr, filter: Expr | None = None) -> Expr: """Computes the sample variance of the argument. 
diff --git a/python/tests/test_functions.py b/python/tests/test_functions.py index 74fcbffb4..0a490a6a3 100644 --- a/python/tests/test_functions.py +++ b/python/tests/test_functions.py @@ -1660,3 +1660,49 @@ def df_with_nulls(): def test_conditional_functions(df_with_nulls, expr, expected): result = df_with_nulls.select(expr.alias("result")).collect()[0] assert result.column(0) == expected + + +def test_percentile_cont(): + ctx = SessionContext() + df = ctx.from_pydict({"a": [1.0, 2.0, 3.0, 4.0, 5.0]}) + result = df.aggregate( + [], [f.percentile_cont(column("a"), 0.5).alias("v")] + ).collect()[0] + assert result.column(0)[0].as_py() == 3.0 + + +def test_percentile_cont_with_filter(): + ctx = SessionContext() + df = ctx.from_pydict({"a": [1.0, 2.0, 3.0, 4.0, 5.0]}) + result = df.aggregate( + [], + [ + f.percentile_cont( + column("a"), 0.5, filter=column("a") > literal(1.0) + ).alias("v") + ], + ).collect()[0] + assert result.column(0)[0].as_py() == 3.5 + + +def test_grouping(): + ctx = SessionContext() + df = ctx.from_pydict({"a": [1, 1, 2], "b": [10, 20, 30]}) + # In a simple GROUP BY (no grouping sets), grouping() returns 0 for all rows. + # Note: grouping() must not be aliased directly in the aggregate expression list + # due to an upstream DataFusion analyzer limitation (the ResolveGroupingFunction + # rule doesn't unwrap Alias nodes). Apply aliases via a follow-up select instead. 
+ result = df.aggregate( + [column("a")], [f.grouping(column("a")), f.sum(column("b")).alias("s")] + ).collect() + grouping_col = pa.concat_arrays([batch.column(1) for batch in result]).to_pylist() + assert all(v == 0 for v in grouping_col) + + +def test_var_population(): + ctx = SessionContext() + df = ctx.from_pydict({"a": [-1.0, 0.0, 2.0]}) + result = df.aggregate([], [f.var_population(column("a")).alias("v")]).collect()[0] + # var_population is an alias for var_pop + expected = df.aggregate([], [f.var_pop(column("a")).alias("v")]).collect()[0] + assert abs(result.column(0)[0].as_py() - expected.column(0)[0].as_py()) < 1e-10 From d16cff16384edf7a0140a1af9249f907e5e7b5ce Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Fri, 3 Apr 2026 16:21:55 -0400 Subject: [PATCH 2/8] Fix grouping() distinct parameter type for API consistency Co-Authored-By: Claude Opus 4.6 (1M context) --- python/datafusion/functions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 4abb02f8f..10c311524 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -3747,7 +3747,7 @@ def array_agg( def grouping( expression: Expr, - distinct: bool | None = None, + distinct: bool = False, filter: Expr | None = None, ) -> Expr: """Returns 1 if the data is aggregated across the specified column, or 0 otherwise. From 03037160b202f96fb4e16feb57cfcf2261d33176 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Mon, 6 Apr 2026 09:18:56 -0400 Subject: [PATCH 3/8] Improve aggregate function tests and docstrings per review feedback Add docstring example to grouping(), parametrize percentile_cont tests, and add multi-column grouping test case. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- python/datafusion/functions.py | 17 ++++++++++++ python/tests/test_functions.py | 49 ++++++++++++++++++++-------------- 2 files changed, 46 insertions(+), 20 deletions(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index f199abc31..6cc049518 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -4427,6 +4427,23 @@ def grouping( expression: The column to check grouping status for distinct: If True, compute on distinct values only filter: If provided, only compute against rows for which the filter is True + + Examples: + In a simple ``GROUP BY`` (no grouping sets), ``grouping()`` always + returns 0, indicating the column is part of the grouping key: + + >>> import pyarrow as pa + >>> ctx = dfn.SessionContext() + >>> df = ctx.from_pydict({"a": [1, 1, 2], "b": [10, 20, 30]}) + >>> result = df.aggregate( + ... [dfn.col("a")], + ... [dfn.functions.grouping(dfn.col("a")), + ... dfn.functions.sum(dfn.col("b")).alias("s")]) + >>> batches = result.collect() + >>> grouping_vals = pa.concat_arrays( + ... 
[batch.column(1) for batch in batches]).to_pylist() + >>> all(v == 0 for v in grouping_vals) + True """ filter_raw = filter.expr if filter is not None else None return Expr(f.grouping(expression.expr, distinct=distinct, filter=filter_raw)) diff --git a/python/tests/test_functions.py b/python/tests/test_functions.py index c123456a8..7735b8bf7 100644 --- a/python/tests/test_functions.py +++ b/python/tests/test_functions.py @@ -1820,36 +1820,27 @@ def test_conditional_functions(df_with_nulls, expr, expected): assert result.column(0) == expected -def test_percentile_cont(): +@pytest.mark.parametrize( + ("filter_expr", "expected"), + [ + (None, 3.0), + (column("a") > literal(1.0), 3.5), + ], + ids=["no_filter", "with_filter"], +) +def test_percentile_cont(filter_expr, expected): ctx = SessionContext() df = ctx.from_pydict({"a": [1.0, 2.0, 3.0, 4.0, 5.0]}) result = df.aggregate( - [], [f.percentile_cont(column("a"), 0.5).alias("v")] + [], [f.percentile_cont(column("a"), 0.5, filter=filter_expr).alias("v")] ).collect()[0] - assert result.column(0)[0].as_py() == 3.0 - - -def test_percentile_cont_with_filter(): - ctx = SessionContext() - df = ctx.from_pydict({"a": [1.0, 2.0, 3.0, 4.0, 5.0]}) - result = df.aggregate( - [], - [ - f.percentile_cont( - column("a"), 0.5, filter=column("a") > literal(1.0) - ).alias("v") - ], - ).collect()[0] - assert result.column(0)[0].as_py() == 3.5 + assert result.column(0)[0].as_py() == expected def test_grouping(): ctx = SessionContext() df = ctx.from_pydict({"a": [1, 1, 2], "b": [10, 20, 30]}) # In a simple GROUP BY (no grouping sets), grouping() returns 0 for all rows. - # Note: grouping() must not be aliased directly in the aggregate expression list - # due to an upstream DataFusion analyzer limitation (the ResolveGroupingFunction - # rule doesn't unwrap Alias nodes). Apply aliases via a follow-up select instead. 
result = df.aggregate( [column("a")], [f.grouping(column("a")), f.sum(column("b")).alias("s")] ).collect() @@ -1857,6 +1848,24 @@ def test_grouping(): assert all(v == 0 for v in grouping_col) +def test_grouping_multiple_columns(): + # Verify grouping() works when multiple columns are in the GROUP BY clause. + ctx = SessionContext() + df = ctx.from_pydict({"a": [1, 1, 2], "b": [10, 10, 30], "c": [100, 200, 300]}) + result = df.aggregate( + [column("a"), column("b")], + [ + f.grouping(column("a")), + f.grouping(column("b")), + f.sum(column("c")).alias("s"), + ], + ).collect() + grouping_a = pa.concat_arrays([batch.column(2) for batch in result]).to_pylist() + grouping_b = pa.concat_arrays([batch.column(3) for batch in result]).to_pylist() + assert all(v == 0 for v in grouping_a) + assert all(v == 0 for v in grouping_b) + + def test_var_population(): ctx = SessionContext() df = ctx.from_pydict({"a": [-1.0, 0.0, 2.0]}) From 124349806b9be3ae420a9ab06d7541d7b0b850ca Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Mon, 6 Apr 2026 09:55:03 -0400 Subject: [PATCH 4/8] Add GroupingSet.rollup, .cube, and .grouping_sets factory methods Expose ROLLUP, CUBE, and GROUPING SETS via the DataFrame API by adding static methods on GroupingSet that construct the corresponding Expr variants. Update grouping() docstring and tests to use the new API. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/core/src/expr/grouping_set.rs | 37 +++++++- python/datafusion/expr.py | 134 ++++++++++++++++++++++++++- python/datafusion/functions.py | 42 +++++---- python/tests/test_functions.py | 72 ++++++++++---- 4 files changed, 251 insertions(+), 34 deletions(-) diff --git a/crates/core/src/expr/grouping_set.rs b/crates/core/src/expr/grouping_set.rs index 549a866ed..11d8f4fcd 100644 --- a/crates/core/src/expr/grouping_set.rs +++ b/crates/core/src/expr/grouping_set.rs @@ -15,9 +15,11 @@ // specific language governing permissions and limitations // under the License. 
-use datafusion::logical_expr::GroupingSet; +use datafusion::logical_expr::{Expr, GroupingSet}; use pyo3::prelude::*; +use crate::expr::PyExpr; + #[pyclass( from_py_object, frozen, @@ -30,6 +32,39 @@ pub struct PyGroupingSet { grouping_set: GroupingSet, } +#[pymethods] +impl PyGroupingSet { + #[staticmethod] + #[pyo3(signature = (*exprs))] + fn rollup(exprs: Vec) -> PyExpr { + Expr::GroupingSet(GroupingSet::Rollup( + exprs.into_iter().map(|e| e.expr).collect(), + )) + .into() + } + + #[staticmethod] + #[pyo3(signature = (*exprs))] + fn cube(exprs: Vec) -> PyExpr { + Expr::GroupingSet(GroupingSet::Cube( + exprs.into_iter().map(|e| e.expr).collect(), + )) + .into() + } + + #[staticmethod] + #[pyo3(signature = (*expr_lists))] + fn grouping_sets(expr_lists: Vec>) -> PyExpr { + Expr::GroupingSet(GroupingSet::GroupingSets( + expr_lists + .into_iter() + .map(|list| list.into_iter().map(|e| e.expr).collect()) + .collect(), + )) + .into() + } +} + impl From for GroupingSet { fn from(grouping_set: PyGroupingSet) -> Self { grouping_set.grouping_set diff --git a/python/datafusion/expr.py b/python/datafusion/expr.py index 14753a4f5..0bbbe606e 100644 --- a/python/datafusion/expr.py +++ b/python/datafusion/expr.py @@ -91,7 +91,7 @@ Extension = expr_internal.Extension FileType = expr_internal.FileType Filter = expr_internal.Filter -GroupingSet = expr_internal.GroupingSet +_GroupingSetInternal = expr_internal.GroupingSet Join = expr_internal.Join ILike = expr_internal.ILike InList = expr_internal.InList @@ -1430,3 +1430,135 @@ def __repr__(self) -> str: SortKey = Expr | SortExpr | str + + +class GroupingSet: + """Factory for creating grouping set expressions. + + Grouping sets control how + :py:meth:`~datafusion.dataframe.DataFrame.aggregate` groups rows. + Instead of a single ``GROUP BY``, they produce multiple grouping + levels in one pass — subtotals, cross-tabulations, or arbitrary + column subsets. 
+ + Use :py:func:`~datafusion.functions.grouping` in the aggregate list + to tell which columns are aggregated across in each result row. + """ + + @staticmethod + def rollup(*exprs: Expr) -> Expr: + """Create a ``ROLLUP`` grouping set for use with ``aggregate()``. + + ``ROLLUP`` generates all prefixes of the given column list as + grouping sets. For example, ``rollup(a, b)`` produces grouping + sets ``(a, b)``, ``(a)``, and ``()`` (grand total). + + This is equivalent to ``GROUP BY ROLLUP(a, b)`` in SQL. + + Args: + *exprs: Column expressions to include in the rollup. + + Examples: + >>> import pyarrow as pa + >>> import datafusion as dfn + >>> from datafusion.expr import GroupingSet + >>> ctx = dfn.SessionContext() + >>> df = ctx.from_pydict({"a": [1, 1, 2], "b": [10, 20, 30]}) + >>> result = df.aggregate( + ... [GroupingSet.rollup(dfn.col("a"))], + ... [dfn.functions.sum(dfn.col("b")).alias("s"), + ... dfn.functions.grouping(dfn.col("a"))], + ... ).sort(dfn.col("a").sort(nulls_first=False)) + >>> batches = result.collect() + >>> pa.concat_arrays([b.column("s") for b in batches]).to_pylist() + [30, 30, 60] + + See Also: + :py:meth:`cube`, :py:meth:`grouping_sets`, + :py:func:`~datafusion.functions.grouping` + """ + args = [e.expr for e in exprs] + return Expr(_GroupingSetInternal.rollup(*args)) + + @staticmethod + def cube(*exprs: Expr) -> Expr: + """Create a ``CUBE`` grouping set for use with ``aggregate()``. + + ``CUBE`` generates all possible subsets of the given column list + as grouping sets. For example, ``cube(a, b)`` produces grouping + sets ``(a, b)``, ``(a)``, ``(b)``, and ``()`` (grand total). + + This is equivalent to ``GROUP BY CUBE(a, b)`` in SQL. + + Args: + *exprs: Column expressions to include in the cube. 
+ + Examples: + With a single column, ``cube`` behaves identically to + :py:meth:`rollup`: + + >>> import pyarrow as pa + >>> import datafusion as dfn + >>> from datafusion.expr import GroupingSet + >>> ctx = dfn.SessionContext() + >>> df = ctx.from_pydict({"a": [1, 1, 2], "b": [10, 20, 30]}) + >>> result = df.aggregate( + ... [GroupingSet.cube(dfn.col("a"))], + ... [dfn.functions.sum(dfn.col("b")).alias("s"), + ... dfn.functions.grouping(dfn.col("a"))], + ... ).sort(dfn.col("a").sort(nulls_first=False)) + >>> batches = result.collect() + >>> pa.concat_arrays([b.column(2) for b in batches]).to_pylist() + [0, 0, 1] + + See Also: + :py:meth:`rollup`, :py:meth:`grouping_sets`, + :py:func:`~datafusion.functions.grouping` + """ + args = [e.expr for e in exprs] + return Expr(_GroupingSetInternal.cube(*args)) + + @staticmethod + def grouping_sets(*expr_lists: list[Expr]) -> Expr: + """Create explicit grouping sets for use with ``aggregate()``. + + Each argument is a list of column expressions representing one + grouping set. For example, ``grouping_sets([a], [b])`` groups + by ``a`` alone and by ``b`` alone in a single query. + + This is equivalent to ``GROUP BY GROUPING SETS ((a), (b))`` in + SQL. + + Args: + *expr_lists: Each positional argument is a list of + expressions forming one grouping set. + + Examples: + >>> import pyarrow as pa + >>> import datafusion as dfn + >>> from datafusion.expr import GroupingSet + >>> ctx = dfn.SessionContext() + >>> df = ctx.from_pydict( + ... {"a": ["x", "x", "y"], "b": ["m", "n", "m"], + ... "c": [1, 2, 3]}) + >>> result = df.aggregate( + ... [GroupingSet.grouping_sets( + ... [dfn.col("a")], [dfn.col("b")])], + ... [dfn.functions.sum(dfn.col("c")).alias("s"), + ... dfn.functions.grouping(dfn.col("a")), + ... dfn.functions.grouping(dfn.col("b"))], + ... ).sort( + ... dfn.col("a").sort(nulls_first=False), + ... dfn.col("b").sort(nulls_first=False), + ... ) + >>> batches = result.collect() + >>> pa.concat_arrays( + ... 
[b.column("s") for b in batches]).to_pylist() + [3, 3, 4, 2] + + See Also: + :py:meth:`rollup`, :py:meth:`cube`, + :py:func:`~datafusion.functions.grouping` + """ + raw_lists = [[e.expr for e in lst] for lst in expr_lists] + return Expr(_GroupingSetInternal.grouping_sets(*raw_lists)) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 6cc049518..956ba13ad 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -4413,15 +4413,20 @@ def grouping( distinct: bool = False, filter: Expr | None = None, ) -> Expr: - """Returns 1 if the data is aggregated across the specified column, or 0 otherwise. + """Indicates whether a column is aggregated across in the current row. - This function is used with ``GROUPING SETS``, ``CUBE``, or ``ROLLUP`` to - distinguish between aggregated and non-aggregated rows. In a regular - ``GROUP BY`` without grouping sets, it always returns 0. + Returns 0 when the column is part of the grouping key for that row + (i.e., the row contains per-group results for that column). Returns 1 + when the column is *not* part of the grouping key (i.e., the row's + aggregate spans all values of that column). - Note: The ``grouping`` aggregate function is rewritten by the query - optimizer before execution, so it works correctly even though its - physical plan is not directly implemented. + This function is meaningful with + :py:meth:`GroupingSet.rollup `, + :py:meth:`GroupingSet.cube `, or + :py:meth:`GroupingSet.grouping_sets `, + where different rows are grouped by different subsets of columns. In a + regular ``GROUP BY`` without grouping sets every column is always part + of the key, so ``grouping()`` always returns 0. 
Args: expression: The column to check grouping status for @@ -4429,21 +4434,26 @@ def grouping( filter: If provided, only compute against rows for which the filter is True Examples: - In a simple ``GROUP BY`` (no grouping sets), ``grouping()`` always - returns 0, indicating the column is part of the grouping key: + With :py:meth:`~datafusion.expr.GroupingSet.rollup`, the result + includes both per-group rows (``grouping(a) = 0``) and a + grand-total row where ``a`` is aggregated across + (``grouping(a) = 1``): >>> import pyarrow as pa + >>> from datafusion.expr import GroupingSet >>> ctx = dfn.SessionContext() >>> df = ctx.from_pydict({"a": [1, 1, 2], "b": [10, 20, 30]}) >>> result = df.aggregate( - ... [dfn.col("a")], - ... [dfn.functions.grouping(dfn.col("a")), - ... dfn.functions.sum(dfn.col("b")).alias("s")]) + ... [GroupingSet.rollup(dfn.col("a"))], + ... [dfn.functions.sum(dfn.col("b")).alias("s"), + ... dfn.functions.grouping(dfn.col("a"))], + ... ).sort(dfn.col("a").sort(nulls_first=False)) >>> batches = result.collect() - >>> grouping_vals = pa.concat_arrays( - ... 
[batch.column(1) for batch in batches]).to_pylist() - >>> all(v == 0 for v in grouping_vals) - True + >>> pa.concat_arrays([b.column(2) for b in batches]).to_pylist() + [0, 0, 1] + + See Also: + :py:class:`~datafusion.expr.GroupingSet` """ filter_raw = filter.expr if filter is not None else None return Expr(f.grouping(expression.expr, distinct=distinct, filter=filter_raw)) diff --git a/python/tests/test_functions.py b/python/tests/test_functions.py index 7735b8bf7..335d7b187 100644 --- a/python/tests/test_functions.py +++ b/python/tests/test_functions.py @@ -22,6 +22,7 @@ import pytest from datafusion import SessionContext, column, literal from datafusion import functions as f +from datafusion.expr import GroupingSet np.seterr(invalid="ignore") @@ -1837,33 +1838,72 @@ def test_percentile_cont(filter_expr, expected): assert result.column(0)[0].as_py() == expected -def test_grouping(): +def test_rollup(): + # With ROLLUP, per-group rows have grouping()=0 and the grand-total row + # (where the column is aggregated across) has grouping()=1. ctx = SessionContext() df = ctx.from_pydict({"a": [1, 1, 2], "b": [10, 20, 30]}) - # In a simple GROUP BY (no grouping sets), grouping() returns 0 for all rows. result = df.aggregate( - [column("a")], [f.grouping(column("a")), f.sum(column("b")).alias("s")] - ).collect() - grouping_col = pa.concat_arrays([batch.column(1) for batch in result]).to_pylist() - assert all(v == 0 for v in grouping_col) + [GroupingSet.rollup(column("a"))], + [f.sum(column("b")).alias("s"), f.grouping(column("a"))], + ).sort(column("a").sort(ascending=True, nulls_first=False)) + batches = result.collect() + g = pa.concat_arrays([b.column(2) for b in batches]).to_pylist() + s = pa.concat_arrays([b.column("s") for b in batches]).to_pylist() + # Two per-group rows (g=0) plus one grand-total row (g=1) + assert g == [0, 0, 1] + assert s == [30, 30, 60] + + +def test_rollup_multi_column(): + # rollup(a, b) produces grouping sets (a, b), (a), (). 
+ ctx = SessionContext() + df = ctx.from_pydict({"a": [1, 1, 2], "b": ["x", "y", "x"], "c": [10, 20, 30]}) + result = df.aggregate( + [GroupingSet.rollup(column("a"), column("b"))], + [f.sum(column("c")).alias("s")], + ) + total_rows = sum(b.num_rows for b in result.collect()) + # 3 detail (a,b) + 2 subtotal (a) + 1 grand total = 6 + assert total_rows == 6 -def test_grouping_multiple_columns(): - # Verify grouping() works when multiple columns are in the GROUP BY clause. +def test_cube(): + # cube(a, b) produces all subsets: (a,b), (a), (b), (). ctx = SessionContext() - df = ctx.from_pydict({"a": [1, 1, 2], "b": [10, 10, 30], "c": [100, 200, 300]}) + df = ctx.from_pydict({"a": [1, 1, 2], "b": ["x", "y", "x"], "c": [10, 20, 30]}) result = df.aggregate( - [column("a"), column("b")], + [GroupingSet.cube(column("a"), column("b"))], + [f.sum(column("c")).alias("s")], + ) + total_rows = sum(b.num_rows for b in result.collect()) + # 3 (a,b) + 2 (a) + 2 (b) + 1 () = 8 + assert total_rows == 8 + + +def test_grouping_sets(): + # GROUPING SETS lets you choose exactly which column subsets to group by. + # Each row's grouping() value tells you which columns are aggregated across. 
+ ctx = SessionContext() + df = ctx.from_pydict({"a": ["x", "x", "y"], "b": ["m", "n", "m"], "c": [1, 2, 3]}) + result = df.aggregate( + [GroupingSet.grouping_sets([column("a")], [column("b")])], [ + f.sum(column("c")).alias("s"), f.grouping(column("a")), f.grouping(column("b")), - f.sum(column("c")).alias("s"), ], - ).collect() - grouping_a = pa.concat_arrays([batch.column(2) for batch in result]).to_pylist() - grouping_b = pa.concat_arrays([batch.column(3) for batch in result]).to_pylist() - assert all(v == 0 for v in grouping_a) - assert all(v == 0 for v in grouping_b) + ).sort( + column("a").sort(ascending=True, nulls_first=False), + column("b").sort(ascending=True, nulls_first=False), + ) + batches = result.collect() + ga = pa.concat_arrays([b.column(3) for b in batches]).to_pylist() + gb = pa.concat_arrays([b.column(4) for b in batches]).to_pylist() + # Rows grouped by (a): ga=0 (a is a key), gb=1 (b is aggregated across) + # Rows grouped by (b): ga=1 (a is aggregated across), gb=0 (b is a key) + assert ga == [0, 0, 1, 1] + assert gb == [1, 1, 0, 0] def test_var_population(): From c52b49c4cf26ddb45c96f5ac2787a28aa739a561 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Mon, 6 Apr 2026 10:22:25 -0400 Subject: [PATCH 5/8] Remove _GroupingSetInternal alias, use expr_internal.GroupingSet directly Co-Authored-By: Claude Opus 4.6 (1M context) --- python/datafusion/expr.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/python/datafusion/expr.py b/python/datafusion/expr.py index 0bbbe606e..ef997685f 100644 --- a/python/datafusion/expr.py +++ b/python/datafusion/expr.py @@ -91,7 +91,6 @@ Extension = expr_internal.Extension FileType = expr_internal.FileType Filter = expr_internal.Filter -_GroupingSetInternal = expr_internal.GroupingSet Join = expr_internal.Join ILike = expr_internal.ILike InList = expr_internal.InList @@ -1478,7 +1477,7 @@ def rollup(*exprs: Expr) -> Expr: :py:func:`~datafusion.functions.grouping` """ args = [e.expr for e in 
exprs] - return Expr(_GroupingSetInternal.rollup(*args)) + return Expr(expr_internal.GroupingSet.rollup(*args)) @staticmethod def cube(*exprs: Expr) -> Expr: @@ -1516,7 +1515,7 @@ def cube(*exprs: Expr) -> Expr: :py:func:`~datafusion.functions.grouping` """ args = [e.expr for e in exprs] - return Expr(_GroupingSetInternal.cube(*args)) + return Expr(expr_internal.GroupingSet.cube(*args)) @staticmethod def grouping_sets(*expr_lists: list[Expr]) -> Expr: @@ -1561,4 +1560,4 @@ def grouping_sets(*expr_lists: list[Expr]) -> Expr: :py:func:`~datafusion.functions.grouping` """ raw_lists = [[e.expr for e in lst] for lst in expr_lists] - return Expr(_GroupingSetInternal.grouping_sets(*raw_lists)) + return Expr(expr_internal.GroupingSet.grouping_sets(*raw_lists)) From c9183dd4cb9c25a9122aceccbb3d419bce8c94d0 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Mon, 6 Apr 2026 10:26:19 -0400 Subject: [PATCH 6/8] Parametrize grouping set tests for rollup and cube Co-Authored-By: Claude Opus 4.6 (1M context) --- python/tests/test_functions.py | 55 +++++++++++++++++----------------- 1 file changed, 28 insertions(+), 27 deletions(-) diff --git a/python/tests/test_functions.py b/python/tests/test_functions.py index 335d7b187..419c752fd 100644 --- a/python/tests/test_functions.py +++ b/python/tests/test_functions.py @@ -1838,51 +1838,52 @@ def test_percentile_cont(filter_expr, expected): assert result.column(0)[0].as_py() == expected -def test_rollup(): - # With ROLLUP, per-group rows have grouping()=0 and the grand-total row - # (where the column is aggregated across) has grouping()=1. 
+@pytest.mark.parametrize( + ("grouping_set_expr", "expected_grouping", "expected_sums"), + [ + (GroupingSet.rollup(column("a")), [0, 0, 1], [30, 30, 60]), + (GroupingSet.cube(column("a")), [0, 0, 1], [30, 30, 60]), + ], + ids=["rollup", "cube"], +) +def test_grouping_set_single_column( + grouping_set_expr, expected_grouping, expected_sums +): ctx = SessionContext() df = ctx.from_pydict({"a": [1, 1, 2], "b": [10, 20, 30]}) result = df.aggregate( - [GroupingSet.rollup(column("a"))], + [grouping_set_expr], [f.sum(column("b")).alias("s"), f.grouping(column("a"))], ).sort(column("a").sort(ascending=True, nulls_first=False)) batches = result.collect() g = pa.concat_arrays([b.column(2) for b in batches]).to_pylist() s = pa.concat_arrays([b.column("s") for b in batches]).to_pylist() - # Two per-group rows (g=0) plus one grand-total row (g=1) - assert g == [0, 0, 1] - assert s == [30, 30, 60] - - -def test_rollup_multi_column(): - # rollup(a, b) produces grouping sets (a, b), (a), (). - ctx = SessionContext() - df = ctx.from_pydict({"a": [1, 1, 2], "b": ["x", "y", "x"], "c": [10, 20, 30]}) - result = df.aggregate( - [GroupingSet.rollup(column("a"), column("b"))], - [f.sum(column("c")).alias("s")], - ) - total_rows = sum(b.num_rows for b in result.collect()) - # 3 detail (a,b) + 2 subtotal (a) + 1 grand total = 6 - assert total_rows == 6 + assert g == expected_grouping + assert s == expected_sums -def test_cube(): - # cube(a, b) produces all subsets: (a,b), (a), (b), (). 
+@pytest.mark.parametrize( + ("grouping_set_expr", "expected_rows"), + [ + # rollup(a, b) => (a,b), (a), () => 3 + 2 + 1 = 6 + (GroupingSet.rollup(column("a"), column("b")), 6), + # cube(a, b) => (a,b), (a), (b), () => 3 + 2 + 2 + 1 = 8 + (GroupingSet.cube(column("a"), column("b")), 8), + ], + ids=["rollup", "cube"], +) +def test_grouping_set_multi_column(grouping_set_expr, expected_rows): ctx = SessionContext() df = ctx.from_pydict({"a": [1, 1, 2], "b": ["x", "y", "x"], "c": [10, 20, 30]}) result = df.aggregate( - [GroupingSet.cube(column("a"), column("b"))], + [grouping_set_expr], [f.sum(column("c")).alias("s")], ) total_rows = sum(b.num_rows for b in result.collect()) - # 3 (a,b) + 2 (a) + 2 (b) + 1 () = 8 - assert total_rows == 8 + assert total_rows == expected_rows -def test_grouping_sets(): - # GROUPING SETS lets you choose exactly which column subsets to group by. +def test_grouping_sets_explicit(): # Each row's grouping() value tells you which columns are aggregated across. ctx = SessionContext() df = ctx.from_pydict({"a": ["x", "x", "y"], "b": ["m", "n", "m"], "c": [1, 2, 3]}) From 4a1efcb6e2f61245dc56a3708fb6584984d10888 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Mon, 6 Apr 2026 12:24:19 -0400 Subject: [PATCH 7/8] Add grouping sets documentation and note grouping() alias limitation Add user documentation for GroupingSet.rollup, .cube, and .grouping_sets with Pokemon dataset examples. Document the upstream alias limitation (apache/datafusion#21411) in both the grouping() docstring and the aggregation user guide. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- .../common-operations/aggregations.rst | 167 ++++++++++++++++++ python/datafusion/functions.py | 12 +- 2 files changed, 178 insertions(+), 1 deletion(-) diff --git a/docs/source/user-guide/common-operations/aggregations.rst b/docs/source/user-guide/common-operations/aggregations.rst index e458e5fcb..9f2b702ac 100644 --- a/docs/source/user-guide/common-operations/aggregations.rst +++ b/docs/source/user-guide/common-operations/aggregations.rst @@ -163,6 +163,168 @@ Suppose we want to find the speed values for only Pokemon that have low Attack v f.avg(col_speed, filter=col_attack < lit(50)).alias("Avg Speed Low Attack")]) +Grouping Sets +------------- + +The default style of aggregation produces one row per group. Sometimes you want a single query to +produce rows at multiple levels of detail — for example, totals per type *and* an overall grand +total, or subtotals for every combination of two columns plus the individual column totals. Writing +separate queries and concatenating them is tedious and runs the data multiple times. Grouping sets +solve this by letting you specify several grouping levels in one pass. + +DataFusion supports three grouping set styles through the +:py:class:`~datafusion.expr.GroupingSet` class: + +- :py:meth:`~datafusion.expr.GroupingSet.rollup` — hierarchical subtotals, like a drill-down report +- :py:meth:`~datafusion.expr.GroupingSet.cube` — every possible subtotal combination, like a pivot table +- :py:meth:`~datafusion.expr.GroupingSet.grouping_sets` — explicitly list exactly which grouping levels you want + +Because result rows come from different grouping levels, a column that is *not* part of a +particular level will be ``null`` in that row. Use :py:func:`~datafusion.functions.grouping` to +distinguish a real ``null`` in the data from one that means "this column was aggregated across." +It returns ``0`` when the column is a grouping key for that row, and ``1`` when it is not. 
+
+Rollup
+^^^^^^
+
+:py:meth:`~datafusion.expr.GroupingSet.rollup` creates a hierarchy. ``rollup(a, b)`` produces
+grouping sets ``(a, b)``, ``(a)``, and ``()`` — like nested subtotals in a report. This is useful
+when your columns have a natural hierarchy, such as region → city or type → subtype.
+
+Suppose we want to summarize Pokemon stats by ``Type 1`` with subtotals and a grand total. With
+the default aggregation style we would need two separate queries. With ``rollup`` we get it all at
+once:
+
+.. ipython:: python
+
+    from datafusion.expr import GroupingSet
+
+    df.aggregate(
+        [GroupingSet.rollup(col_type_1)],
+        [f.count(col_speed).alias("Count"),
+         f.avg(col_speed).alias("Avg Speed"),
+         f.max(col_speed).alias("Max Speed")]
+    ).sort(col_type_1.sort(ascending=True, nulls_first=True))
+
+The first row — where ``Type 1`` is ``null`` — is the grand total across all types. But how do you
+tell a grand-total ``null`` apart from a Pokemon that genuinely has no type? The
+:py:func:`~datafusion.functions.grouping` function returns ``0`` when the column is a grouping key
+for that row and ``1`` when it is aggregated across.
+
+.. note::
+
+    Due to an upstream DataFusion limitation
+    (`apache/datafusion#21411 <https://github.com/apache/datafusion/issues/21411>`_),
+    ``.alias()`` cannot be applied directly to a ``grouping()`` expression — it will raise an
+    error at execution time. Instead, use
+    :py:meth:`~datafusion.dataframe.DataFrame.with_column_renamed` on the result DataFrame to
+    give the column a readable name. Once the upstream issue is resolved, you will be able to
+    use ``.alias()`` directly and the workaround below will no longer be necessary.
+
+The raw column name generated by ``grouping()`` contains internal identifiers, so we use
+:py:meth:`~datafusion.dataframe.DataFrame.with_column_renamed` to clean it up:
+
+..
ipython:: python + + result = df.aggregate( + [GroupingSet.rollup(col_type_1)], + [f.count(col_speed).alias("Count"), + f.avg(col_speed).alias("Avg Speed"), + f.grouping(col_type_1)] + ) + for field in result.schema(): + if field.name.startswith("grouping("): + result = result.with_column_renamed(field.name, "Is Total") + result.sort(col_type_1.sort(ascending=True, nulls_first=True)) + +With two columns the hierarchy becomes more apparent. ``rollup(Type 1, Type 2)`` produces: + +- one row per ``(Type 1, Type 2)`` pair — the most detailed level +- one row per ``Type 1`` — subtotals +- one grand total row + +.. ipython:: python + + df.aggregate( + [GroupingSet.rollup(col_type_1, col_type_2)], + [f.count(col_speed).alias("Count"), + f.avg(col_speed).alias("Avg Speed")] + ).sort( + col_type_1.sort(ascending=True, nulls_first=True), + col_type_2.sort(ascending=True, nulls_first=True) + ) + +Cube +^^^^ + +:py:meth:`~datafusion.expr.GroupingSet.cube` produces every possible subset. ``cube(a, b)`` +produces grouping sets ``(a, b)``, ``(a)``, ``(b)``, and ``()`` — one more than ``rollup`` because +it also includes ``(b)`` alone. This is useful when neither column is "above" the other in a +hierarchy and you want all cross-tabulations. + +For our Pokemon data, ``cube(Type 1, Type 2)`` gives us stats broken down by the type pair, +by ``Type 1`` alone, by ``Type 2`` alone, and a grand total — all in one query: + +.. ipython:: python + + df.aggregate( + [GroupingSet.cube(col_type_1, col_type_2)], + [f.count(col_speed).alias("Count"), + f.avg(col_speed).alias("Avg Speed")] + ).sort( + col_type_1.sort(ascending=True, nulls_first=True), + col_type_2.sort(ascending=True, nulls_first=True) + ) + +Compared to the ``rollup`` example above, notice the extra rows where ``Type 1`` is ``null`` but +``Type 2`` has a value — those are the per-``Type 2`` subtotals that ``rollup`` does not include. 
+ +Explicit Grouping Sets +^^^^^^^^^^^^^^^^^^^^^^ + +:py:meth:`~datafusion.expr.GroupingSet.grouping_sets` lets you list exactly which grouping levels +you need when ``rollup`` or ``cube`` would produce too many or too few. Each argument is a list of +columns forming one grouping set. + +For example, if we want only the per-``Type 1`` totals and per-``Type 2`` totals — but *not* the +full ``(Type 1, Type 2)`` detail rows or the grand total — we can ask for exactly that: + +.. ipython:: python + + df.aggregate( + [GroupingSet.grouping_sets([col_type_1], [col_type_2])], + [f.count(col_speed).alias("Count"), + f.avg(col_speed).alias("Avg Speed")] + ).sort( + col_type_1.sort(ascending=True, nulls_first=True), + col_type_2.sort(ascending=True, nulls_first=True) + ) + +Each row belongs to exactly one grouping level. The :py:func:`~datafusion.functions.grouping` +function tells you which level each row comes from: + +.. ipython:: python + + result = df.aggregate( + [GroupingSet.grouping_sets([col_type_1], [col_type_2])], + [f.count(col_speed).alias("Count"), + f.avg(col_speed).alias("Avg Speed"), + f.grouping(col_type_1), + f.grouping(col_type_2)] + ) + for field in result.schema(): + if field.name.startswith("grouping("): + clean = field.name.split(".")[-1].rstrip(")") + result = result.with_column_renamed(field.name, f"grouping({clean})") + result.sort( + col_type_1.sort(ascending=True, nulls_first=True), + col_type_2.sort(ascending=True, nulls_first=True) + ) + +Where ``grouping(Type 1)`` is ``0`` the row is a per-``Type 1`` total (and ``Type 2`` is ``null``). +Where ``grouping(Type 2)`` is ``0`` the row is a per-``Type 2`` total (and ``Type 1`` is ``null``). + + Aggregate Functions ------------------- @@ -213,4 +375,9 @@ The available aggregate functions are: - :py:func:`datafusion.functions.approx_median` - :py:func:`datafusion.functions.approx_percentile_cont` - :py:func:`datafusion.functions.approx_percentile_cont_with_weight` +10. 
Grouping Set Functions
+    - :py:func:`datafusion.functions.grouping`
+    - :py:meth:`datafusion.expr.GroupingSet.rollup`
+    - :py:meth:`datafusion.expr.GroupingSet.cube`
+    - :py:meth:`datafusion.expr.GroupingSet.grouping_sets`
diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py
index 956ba13ad..410c064f6 100644
--- a/python/datafusion/functions.py
+++ b/python/datafusion/functions.py
@@ -4425,9 +4425,19 @@ def grouping(
     :py:meth:`GroupingSet.cube <datafusion.expr.GroupingSet.cube>`, or
     :py:meth:`GroupingSet.grouping_sets <datafusion.expr.GroupingSet.grouping_sets>`,
     where different rows are grouped by different subsets of columns. In a
-    regular ``GROUP BY`` without grouping sets every column is always part
+    default aggregation without grouping sets every column is always part
     of the key, so ``grouping()`` always returns 0.
+
+    .. warning::
+
+        Due to an upstream DataFusion limitation
+        (`#21411 <https://github.com/apache/datafusion/issues/21411>`_),
+        ``.alias()`` cannot be applied directly to a ``grouping()``
+        expression. Doing so will raise an error at execution time. To
+        rename the column, use
+        :py:meth:`~datafusion.dataframe.DataFrame.with_column_renamed`
+        on the result DataFrame instead.
+
     Args:
         expression: The column to check grouping status for
         distinct: If True, compute on distinct values only

From 18ab457bab30a01771bcd4dc53b51ae2a66d230e Mon Sep 17 00:00:00 2001
From: Tim Saucer
Date: Mon, 6 Apr 2026 12:30:12 -0400
Subject: [PATCH 8/8] Add grouping sets note to DataFrame.aggregate() docstring

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 python/datafusion/dataframe.py | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py
index 10e2a913f..9907eae8b 100644
--- a/python/datafusion/dataframe.py
+++ b/python/datafusion/dataframe.py
@@ -633,8 +633,22 @@ def aggregate(
     ) -> DataFrame:
         """Aggregates the rows of the current DataFrame.

+        By default each unique combination of the ``group_by`` columns
+        produces one row.
To get multiple levels of subtotals in a + single pass, pass a + :py:class:`~datafusion.expr.GroupingSet` expression + (created via + :py:meth:`~datafusion.expr.GroupingSet.rollup`, + :py:meth:`~datafusion.expr.GroupingSet.cube`, or + :py:meth:`~datafusion.expr.GroupingSet.grouping_sets`) + as the ``group_by`` argument. See the + :ref:`aggregation` user guide for detailed examples. + Args: - group_by: Sequence of expressions or column names to group by. + group_by: Sequence of expressions or column names to group + by. A :py:class:`~datafusion.expr.GroupingSet` + expression may be included to produce multiple grouping + levels (rollup, cube, or explicit grouping sets). aggs: Sequence of expressions to aggregate. Returns: