From 74c4c641f059a0a75b396082062fb0e7f2127c14 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 13 May 2026 16:21:10 -0500 Subject: [PATCH 01/18] Propagate field metadata through NTH_VALUE, FIRST_VALUE, and LAST_VALUE window functions (#22112) ## Which issue does this PR close? - Closes #22108 ## Rationale for this change lead and lag both preserve metadata from the input field but nth_value, first_value, and last_value do not. ## What changes are included in this PR? The mechanism to calcluate the return field for nth_value was changed to match lead/lag (which had already been fixed). ## Are these changes tested? Yes, I added tests to metadata.slt ## Are there any user-facing changes? No --- datafusion/functions-window/src/nth_value.rs | 24 ++++++---- .../sqllogictest/test_files/metadata.slt | 46 +++++++++++++++++++ 2 files changed, 62 insertions(+), 8 deletions(-) diff --git a/datafusion/functions-window/src/nth_value.rs b/datafusion/functions-window/src/nth_value.rs index 6c6139405cbe8..82e1081f75318 100644 --- a/datafusion/functions-window/src/nth_value.rs +++ b/datafusion/functions-window/src/nth_value.rs @@ -308,14 +308,22 @@ impl WindowUDFImpl for NthValue { } fn field(&self, field_args: WindowUDFFieldArgs) -> Result { - let return_type = field_args - .input_fields() - .first() - .map(|f| f.data_type()) - .cloned() - .unwrap_or(DataType::Null); - - Ok(Field::new(field_args.name(), return_type, true).into()) + let input_field = + field_args + .input_fields() + .first() + .cloned() + .unwrap_or_else(|| { + Arc::new(Field::new(field_args.name(), DataType::Null, true)) + }); + + // Clone the input field to preserve metadata, update name and nullability + Ok(input_field + .as_ref() + .clone() + .with_name(field_args.name()) + .with_nullable(true) + .into()) } fn reverse_expr(&self) -> ReversedUDWF { diff --git a/datafusion/sqllogictest/test_files/metadata.slt b/datafusion/sqllogictest/test_files/metadata.slt index 3fea8df260f05..3e2a503e6b3fc 100644 --- a/datafusion/sqllogictest/test_files/metadata.slt +++ b/datafusion/sqllogictest/test_files/metadata.slt @@ -472,5 +472,51 @@ select arrow_metadata(with_metadata(id, 'unit', ''), 'unit') from table_with_met ---- (empty) +# Regression test: window functions should preserve field metadata +# Test FIRST_VALUE window function preserves metadata +query IT +select + first_value(id) over (order by id asc nulls last) as fv, + arrow_metadata(first_value(id) over (order by id asc nulls last), 'metadata_key') as meta +from table_with_metadata limit 1; +---- +1 the id field + +# Test LAST_VALUE window function preserves metadata +query IT +select + last_value(id) over (order by id asc nulls last rows between unbounded preceding and unbounded following) as lv, + arrow_metadata(last_value(id) over (order by id asc nulls last rows between unbounded preceding and unbounded following), 'metadata_key') as meta +from table_with_metadata limit 1; +---- +NULL the id field + +# Test NTH_VALUE window function preserves metadata +query IT +select + nth_value(id, 2) over (order by id asc nulls last) as nv, + arrow_metadata(nth_value(id, 2) over (order by id asc nulls last), 'metadata_key') as meta +from table_with_metadata limit 1; +---- +NULL the id field + +# Test LEAD window function preserves metadata +query IT +select + lead(id) over (order by id asc nulls last) as ld, + arrow_metadata(lead(id) over (order by id asc nulls last), 'metadata_key') as meta +from table_with_metadata limit 1; +---- +3 the id field + +# Test LAG window function preserves metadata +query IT +select + lag(id) over (order by id asc nulls last) as lg, + arrow_metadata(lag(id) over (order by id asc nulls last), 'metadata_key') as meta +from table_with_metadata limit 1; +---- +NULL the id field + statement ok drop table table_with_metadata; From ccc67e9a65967c19f0c1523d714da132559b1a9e Mon Sep 17 00:00:00 2001 From: Oleks V Date: Wed, 13 May 2026 15:28:15 -0700 Subject: [PATCH 02/18] feat: fix windows frame positive/neg overflows (#22140) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Which issue does this PR close? - Closes #22137 . ## Rationale for this change `RANGE` window frames with a value offset (e.g. `RANGE BETWEEN 4 PRECEDING AND 4 FOLLOWING`) panicked with `attempt to add/subtract with overflow` whenever the boundary target (`value ± delta`) wrapped past the type's representable range. Affected inputs include values close to `i64::MAX`/`i64::MIN`, `u64::MAX`, and any analogous boundary for other integer/decimal/timestamp types. Two `// TODO: Handle ... overflows.` markers in `WindowFrameStateRange::calculate_index_of_row` had been left for this case; the unchecked `ScalarValue::add` / `sub` silently wrapped the target, after which `search_in_slice` was handed a nonsensical (wrapped) value and downstream code tripped a debug-assert subtraction in `functions-window/src/nth_value.rs`. Semantically, an overflowed boundary is *unbounded* with respect to the data in the partition — every real value lies strictly inside the wrapped sentinel — so the correct behavior is to collapse the search to the appropriate partition edge rather than to search with a wrapped target. ## What changes are included in this PR? `datafusion/expr/src/window_state.rs` - Replace `ScalarValue::add` / `sub` with their `*_checked` counterparts in the boundary computation. - On overflow, short-circuit to the correct partition edge: `search_start` for `PRECEDING`-direction searches, `length` for `FOLLOWING`-direction searches. The collapse direction depends only on the const-generic `SEARCH_SIDE` (the add branch and sub branch both reduce to `!SEARCH_SIDE` once you expand the `SEARCH_SIDE == is_descending` invariant that selects each arithmetic branch). - The pre-existing `value.is_unsigned() && value < delta` clamp-to-zero path for unsigned subtraction is preserved — it produces a valid polymorphic zero, not an overflow sentinel. - No behavior change on the non-overflow path. `datafusion/sqllogictest/test_files/window.slt` Regression coverage for positive and negative overflow, across: - `ASC` + `FOLLOWING` / `ASC` + `PRECEDING` / `DESC` + `PRECEDING` / `DESC` + `FOLLOWING` (each overflow direction occurs on both sort orders depending on which arithmetic branch is taken) - Symmetric `N PRECEDING AND N FOLLOWING` frames where only one side overflows - Signed (`i64`) and unsigned (`u64`) ordering columns - `first_value` and `last_value` both exercised to verify both frame edges - `ROWS` frame regression guard to document that the pre-existing `saturating_sub` / `min(length)` saturation behavior is unchanged. --- datafusion/expr/src/window_state.rs | 57 +++-- datafusion/sqllogictest/test_files/window.slt | 220 ++++++++++++++++++ 2 files changed, 254 insertions(+), 23 deletions(-) diff --git a/datafusion/expr/src/window_state.rs b/datafusion/expr/src/window_state.rs index d7da7a778b011..f8d4609d3690c 100644 --- a/datafusion/expr/src/window_state.rs +++ b/datafusion/expr/src/window_state.rs @@ -396,6 +396,11 @@ impl WindowFrameStateRange { length: usize, ) -> Result { let current_row_values = get_row_at_idx(range_columns, idx)?; + let search_start = if SIDE { + last_range.start + } else { + last_range.end + }; let end_range = if let Some(delta) = delta { let is_descending: bool = self .sort_options @@ -407,34 +412,40 @@ impl WindowFrameStateRange { })? .descending; - current_row_values - .iter() - .map(|value| { - if value.is_null() { - return Ok(value.clone()); + // On overflow the boundary exceeds the type's range and is + // effectively unbounded within the partition. Collapse to the + // partition edge rather than feeding `search_in_slice` a + // wrapped-around target: PRECEDING searches reach `search_start`, + // FOLLOWING searches reach `length`. + let unbounded_edge = if SEARCH_SIDE { search_start } else { length }; + let mut targets = Vec::with_capacity(current_row_values.len()); + for value in ¤t_row_values { + if value.is_null() { + targets.push(value.clone()); + continue; + } + let target = if SEARCH_SIDE == is_descending { + match value.add_checked(delta) { + Ok(v) => v, + Err(_) => return Ok(unbounded_edge), } - if SEARCH_SIDE == is_descending { - // TODO: Handle positive overflows. - value.add(delta) - } else if value.is_unsigned() && value < delta { - // NOTE: This gets a polymorphic zero without having long coercion code for ScalarValue. - // If we decide to implement a "default" construction mechanism for ScalarValue, - // change the following statement to use that. - value.sub(value) - } else { - // TODO: Handle negative overflows. - value.sub(delta) + } else if value.is_unsigned() && value < delta { + // NOTE: This gets a polymorphic zero without having long coercion code for ScalarValue. + // If we decide to implement a "default" construction mechanism for ScalarValue, + // change the following statement to use that. + value.sub(value)? + } else { + match value.sub_checked(delta) { + Ok(v) => v, + Err(_) => return Ok(unbounded_edge), } - }) - .collect::>>()? + }; + targets.push(target); + } + targets } else { current_row_values }; - let search_start = if SIDE { - last_range.start - } else { - last_range.end - }; let compare_fn = |current: &[ScalarValue], target: &[ScalarValue]| { let cmp = compare_rows(current, target, &self.sort_options)?; Ok(if SIDE { cmp.is_lt() } else { cmp.is_le() }) diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 74c2e38baaad5..2a74660fe9fec 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -6236,6 +6236,226 @@ INNER JOIN issue_20194_t2 t2 ---- 6774502793 10040029 1 +# Regression tests for RANGE window frames whose value-offset boundary +# computation overflows the type's representable range. Previously these +# queries panicked in functions-window/src/nth_value.rs with +# "attempt to subtract with overflow" because the wrapped-around target +# produced a frame range where `end < start`. Both positive overflows +# (target above type MAX) and negative overflows (target below type MIN) +# must be treated as unbounded within the partition. + +############################################################################ +# Positive overflow: value + delta exceeds type MAX +############################################################################ + +# ASC + FOLLOWING: end bound wraps past i64::MAX. +query II +SELECT a, last_value(a) OVER (ORDER BY a RANGE BETWEEN CURRENT ROW AND 4 FOLLOWING) +FROM ( + SELECT 9223372036854775804 AS a + UNION ALL SELECT 9223372036854775805 + UNION ALL SELECT 9223372036854775806 +); +---- +9223372036854775804 9223372036854775806 +9223372036854775805 9223372036854775806 +9223372036854775806 9223372036854775806 + +query II +SELECT a, first_value(a) OVER (ORDER BY a RANGE BETWEEN CURRENT ROW AND 4 FOLLOWING) +FROM ( + SELECT 9223372036854775804 AS a + UNION ALL SELECT 9223372036854775805 + UNION ALL SELECT 9223372036854775806 +); +---- +9223372036854775804 9223372036854775804 +9223372036854775805 9223372036854775805 +9223372036854775806 9223372036854775806 + +# Symmetric PRECEDING/FOLLOWING where the FOLLOWING side overflows past MAX. +query II +SELECT a, last_value(a) OVER (ORDER BY a RANGE BETWEEN 4 PRECEDING AND 4 FOLLOWING) +FROM ( + SELECT 9223372036854775804 AS a + UNION ALL SELECT 9223372036854775805 + UNION ALL SELECT 9223372036854775806 +); +---- +9223372036854775804 9223372036854775806 +9223372036854775805 9223372036854775806 +9223372036854775806 9223372036854775806 + +# DESC + PRECEDING: "PRECEDING" walks toward larger values in DESC order, +# so offsetting past i64::MAX exercises the ADD-overflow path. +query II +SELECT a, first_value(a) OVER (ORDER BY a DESC RANGE BETWEEN 4 PRECEDING AND CURRENT ROW) +FROM ( + SELECT 9223372036854775804 AS a + UNION ALL SELECT 9223372036854775805 + UNION ALL SELECT 9223372036854775806 +); +---- +9223372036854775806 9223372036854775806 +9223372036854775805 9223372036854775806 +9223372036854775804 9223372036854775806 + +query II +SELECT a, last_value(a) OVER (ORDER BY a DESC RANGE BETWEEN 4 PRECEDING AND CURRENT ROW) +FROM ( + SELECT 9223372036854775804 AS a + UNION ALL SELECT 9223372036854775805 + UNION ALL SELECT 9223372036854775806 +); +---- +9223372036854775806 9223372036854775806 +9223372036854775805 9223372036854775805 +9223372036854775804 9223372036854775804 + +# Unsigned ordering column: add past u64::MAX must not wrap. +query II +SELECT a, last_value(a) OVER (ORDER BY a RANGE BETWEEN CURRENT ROW AND 4 FOLLOWING) +FROM ( + SELECT arrow_cast(18446744073709551612, 'UInt64') AS a + UNION ALL SELECT arrow_cast(18446744073709551613, 'UInt64') + UNION ALL SELECT arrow_cast(18446744073709551614, 'UInt64') +); +---- +18446744073709551612 18446744073709551614 +18446744073709551613 18446744073709551614 +18446744073709551614 18446744073709551614 + +query II +SELECT a, first_value(a) OVER (ORDER BY a RANGE BETWEEN CURRENT ROW AND 4 FOLLOWING) +FROM ( + SELECT arrow_cast(18446744073709551612, 'UInt64') AS a + UNION ALL SELECT arrow_cast(18446744073709551613, 'UInt64') + UNION ALL SELECT arrow_cast(18446744073709551614, 'UInt64') +); +---- +18446744073709551612 18446744073709551612 +18446744073709551613 18446744073709551613 +18446744073709551614 18446744073709551614 + +############################################################################ +# Negative overflow: value - delta falls below type MIN +############################################################################ + +# ASC + PRECEDING: start bound wraps below i64::MIN. +query II +SELECT a, first_value(a) OVER (ORDER BY a RANGE BETWEEN 4 PRECEDING AND CURRENT ROW) +FROM ( + SELECT -9223372036854775807 AS a + UNION ALL SELECT -9223372036854775806 + UNION ALL SELECT -9223372036854775805 +); +---- +-9223372036854775807 -9223372036854775807 +-9223372036854775806 -9223372036854775807 +-9223372036854775805 -9223372036854775807 + +query II +SELECT a, last_value(a) OVER (ORDER BY a RANGE BETWEEN 4 PRECEDING AND CURRENT ROW) +FROM ( + SELECT -9223372036854775807 AS a + UNION ALL SELECT -9223372036854775806 + UNION ALL SELECT -9223372036854775805 +); +---- +-9223372036854775807 -9223372036854775807 +-9223372036854775806 -9223372036854775806 +-9223372036854775805 -9223372036854775805 + +# Symmetric PRECEDING/FOLLOWING where the PRECEDING side underflows past MIN. +query II +SELECT a, first_value(a) OVER (ORDER BY a RANGE BETWEEN 4 PRECEDING AND 4 FOLLOWING) +FROM ( + SELECT -9223372036854775807 AS a + UNION ALL SELECT -9223372036854775806 + UNION ALL SELECT -9223372036854775805 +); +---- +-9223372036854775807 -9223372036854775807 +-9223372036854775806 -9223372036854775807 +-9223372036854775805 -9223372036854775807 + +# DESC + FOLLOWING: "FOLLOWING" walks toward smaller values in DESC order, +# so offsetting past i64::MIN exercises the SUB-underflow path. +query II +SELECT a, last_value(a) OVER (ORDER BY a DESC RANGE BETWEEN CURRENT ROW AND 4 FOLLOWING) +FROM ( + SELECT -9223372036854775805 AS a + UNION ALL SELECT -9223372036854775806 + UNION ALL SELECT -9223372036854775807 +); +---- +-9223372036854775805 -9223372036854775807 +-9223372036854775806 -9223372036854775807 +-9223372036854775807 -9223372036854775807 + +query II +SELECT a, first_value(a) OVER (ORDER BY a DESC RANGE BETWEEN CURRENT ROW AND 4 FOLLOWING) +FROM ( + SELECT -9223372036854775805 AS a + UNION ALL SELECT -9223372036854775806 + UNION ALL SELECT -9223372036854775807 +); +---- +-9223372036854775805 -9223372036854775805 +-9223372036854775806 -9223372036854775806 +-9223372036854775807 -9223372036854775807 + +# Unsigned ordering column: subtracting an offset that would go below 0 +# must saturate to 0, not wrap to u64::MAX. +query II +SELECT a, first_value(a) OVER (ORDER BY a RANGE BETWEEN 4 PRECEDING AND CURRENT ROW) +FROM ( + SELECT arrow_cast(1, 'UInt64') AS a + UNION ALL SELECT arrow_cast(2, 'UInt64') + UNION ALL SELECT arrow_cast(3, 'UInt64') +); +---- +1 1 +2 1 +3 1 + +query II +SELECT a, last_value(a) OVER (ORDER BY a RANGE BETWEEN 4 PRECEDING AND CURRENT ROW) +FROM ( + SELECT arrow_cast(1, 'UInt64') AS a + UNION ALL SELECT arrow_cast(2, 'UInt64') + UNION ALL SELECT arrow_cast(3, 'UInt64') +); +---- +1 1 +2 2 +3 3 + +############################################################################ +# ROWS frame regression guard: huge offsets already saturate via +# saturating_sub / min(length), verify we keep that behavior. +############################################################################ + +query II +SELECT a, last_value(a) OVER (ORDER BY a ROWS BETWEEN CURRENT ROW AND 9223372036854775807 FOLLOWING) +FROM ( + SELECT 1 AS a UNION ALL SELECT 2 UNION ALL SELECT 3 +); +---- +1 3 +2 3 +3 3 + +query II +SELECT a, first_value(a) OVER (ORDER BY a ROWS BETWEEN 9223372036854775807 PRECEDING AND CURRENT ROW) +FROM ( + SELECT 1 AS a UNION ALL SELECT 2 UNION ALL SELECT 3 +); +---- +1 1 +2 1 +3 1 + # Config reset statement ok reset datafusion.execution.batch_size; From 7f2f78d48b6d3d6aee2ce2fd29910bb4c11b1012 Mon Sep 17 00:00:00 2001 From: Oleks V Date: Wed, 13 May 2026 16:46:07 -0700 Subject: [PATCH 03/18] feat: fix AVG sliding windows wrong results with NULLs (#22139) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Which issue does this PR close? - Closes #22138 . ## Rationale for this change `AVG` used as a window aggregate can return `NaN` (and, for `Decimal` / `Duration`, panic on integer division by zero) when every value in the window frame is NULL. ```sql SELECT i, AVG(v) OVER (ORDER BY i ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) FROM (VALUES (1,1), (2,2), (3,NULL), (4,NULL)) t(i,v); ``` | i | current output | expected (DuckDB/PgSQL) | |---|----------------|-------------------| | 1 | 1.5 | 1.5 | | 2 | 2.0 | 2.0 | | 3 | **NaN** | **NULL** | | 4 | **NaN** | **NULL** | Root cause: sliding-window execution calls `Accumulator::retract_batch` as rows leave the frame. Once every contributing value has been retracted, `self.count` drops back to `0` but `self.sum` stays `Some(0.0)` (or a tiny floating-point residual). `evaluate()` then computes `sum / 0`, which yields `NaN` on `Float64`, and would panic with integer division by zero on `DecimalAvgAccumulator` and `DurationAvgAccumulator`. The non-sliding aggregation path is unaffected because there `sum` becomes `Some(_)` only after at least one non-NULL value has been added, so `count == 0` implies `sum == None`. ## What changes are included in this PR? `datafusion/functions-aggregate/src/average.rs` — guard all three affected `evaluate()` implementations with an explicit `count == 0 → None` short-circuit: - `AvgAccumulator::evaluate` (Float64) - `DecimalAvgAccumulator::evaluate` (Decimal32/64/128/256) - `DurationAvgAccumulator::evaluate` (Duration*) This matches the idiom already used by sibling retractable accumulators (`variance.rs` uses an explicit `match self.count` before division; `sum.rs` uses a `(self.count != 0).then_some(..)` guard). --- datafusion/functions-aggregate/src/average.rs | 50 +++++++++++++------ datafusion/sqllogictest/test_files/window.slt | 48 ++++++++++++++++++ 2 files changed, 83 insertions(+), 15 deletions(-) diff --git a/datafusion/functions-aggregate/src/average.rs b/datafusion/functions-aggregate/src/average.rs index bcccea381324e..24f2777797b93 100644 --- a/datafusion/functions-aggregate/src/average.rs +++ b/datafusion/functions-aggregate/src/average.rs @@ -519,9 +519,16 @@ impl Accumulator for AvgAccumulator { } fn evaluate(&mut self) -> Result { - Ok(ScalarValue::Float64( - self.sum.map(|f| f / self.count as f64), - )) + // In sliding-window mode `retract_batch` can bring `count` back to 0 + // while `sum` remains `Some(..)` (possibly zero or a floating-point + // residual). Guard against that so the frame with no non-NULL values + // yields NULL rather than NaN / ±Inf. + let avg = if self.count == 0 { + None + } else { + self.sum.map(|f| f / self.count as f64) + }; + Ok(ScalarValue::Float64(avg)) } fn size(&self) -> usize { @@ -584,17 +591,23 @@ impl Accumulator for DecimalAvgAccumu } fn evaluate(&mut self) -> Result { - let v = self - .sum - .map(|v| { - DecimalAverager::::try_new( - self.sum_scale, - self.target_precision, - self.target_scale, - )? - .avg(v, T::Native::from_usize(self.count as usize).unwrap()) - }) - .transpose()?; + // `count == 0` can occur in sliding-window mode after `retract_batch` + // removes every contributing value. Return NULL rather than dividing + // by zero (which would panic for integer decimal types). + let v = if self.count == 0 { + None + } else { + self.sum + .map(|v| { + DecimalAverager::::try_new( + self.sum_scale, + self.target_precision, + self.target_scale, + )? + .avg(v, T::Native::from_usize(self.count as usize).unwrap()) + }) + .transpose()? + }; ScalarValue::new_primitive::( v, @@ -670,7 +683,14 @@ impl Accumulator for DurationAvgAccumulator { } fn evaluate(&mut self) -> Result { - let avg = self.sum.map(|sum| sum / self.count as i64); + // Guard against `count == 0` which can happen in sliding-window mode + // after every contributing value has been retracted. Without this + // check we would integer-divide by zero. + let avg = if self.count == 0 { + None + } else { + self.sum.map(|sum| sum / self.count as i64) + }; match self.result_unit { TimeUnit::Second => Ok(ScalarValue::DurationSecond(avg)), diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 2a74660fe9fec..96b811093ecb2 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -6456,6 +6456,54 @@ FROM ( 2 1 3 1 +# AVG over a sliding window must yield NULL when the frame has no non-NULL +# values — including frames that became empty via `retract_batch`. Covers +# Float64, Decimal, and the narrow-frame retract-to-empty case. +query IR +SELECT i, AVG(v) OVER (ORDER BY i ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) +FROM (VALUES(1,1),(2,2),(3,CAST(NULL AS INT)),(4,CAST(NULL AS INT))) t(i,v) +ORDER BY i; +---- +1 1.5 +2 2 +3 NULL +4 NULL + +# All-NULL input — every frame is empty. +query IR +SELECT i, AVG(v) OVER (ORDER BY i ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) +FROM (VALUES(1,CAST(NULL AS INT)),(2,CAST(NULL AS INT))) t(i,v) +ORDER BY i; +---- +1 NULL +2 NULL + +# Narrow sliding frame that drains to empty each row. +query IR +SELECT i, AVG(v) OVER (ORDER BY i ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) +FROM (VALUES(1,CAST(NULL AS INT)),(2,1),(3,CAST(NULL AS INT)),(4,CAST(NULL AS INT))) t(i,v) +ORDER BY i; +---- +1 NULL +2 NULL +3 1 +4 NULL + +# Decimal variant — the integer-division path would otherwise panic on an +# empty frame. +query IR +SELECT i, AVG(v) OVER (ORDER BY i ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) +FROM (VALUES(1,CAST(1.5 AS DECIMAL(10,2))), + (2,CAST(2.5 AS DECIMAL(10,2))), + (3,CAST(NULL AS DECIMAL(10,2))), + (4,CAST(NULL AS DECIMAL(10,2)))) t(i,v) +ORDER BY i; +---- +1 2 +2 2.5 +3 NULL +4 NULL + # Config reset statement ok reset datafusion.execution.batch_size; From 4fac70d9b274138b89fef8fce80dcafa378f5891 Mon Sep 17 00:00:00 2001 From: gstvg <28798827+gstvg@users.noreply.github.com> Date: Thu, 14 May 2026 05:04:02 -0300 Subject: [PATCH 04/18] Minor: Disallow async function in lambdas (#22097) ## Which issue does this PR close? Part of #22091. ## Rationale for this change Current async udfs in lambdas fail with generic errors ## What changes are included in this PR? Report an explicit error when trying to create a lambda with async functions ## Are these changes tested? One sqllogictest added to assert the friendly error ## Are there any user-facing changes? What failed before still fail but with a better error --- .../physical-expr/src/expressions/lambda.rs | 30 ++++++++++++++++--- .../sqllogictest/test_files/async_udf.slt | 10 +++++++ 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/lambda.rs b/datafusion/physical-expr/src/expressions/lambda.rs index 5e6dca1a62667..9275821ae9150 100644 --- a/datafusion/physical-expr/src/expressions/lambda.rs +++ b/datafusion/physical-expr/src/expressions/lambda.rs @@ -21,6 +21,7 @@ use std::hash::Hash; use std::sync::Arc; use crate::{ + ScalarFunctionExpr, expressions::{Column, LambdaVariable}, physical_expr::PhysicalExpr, }; @@ -61,11 +62,16 @@ impl Hash for LambdaExpr { impl LambdaExpr { /// Create a new lambda expression with the given parameters and body pub fn try_new(params: Vec, body: Arc) -> Result { - if all_unique(¶ms) { - Ok(Self::new(params, body)) - } else { - plan_err!("lambda params must be unique, got ({})", params.join(", ")) + if !all_unique(¶ms) { + return plan_err!( + "lambda params must be unique, got ({})", + params.join(", ") + ); } + + check_async_udf(&body)?; + + Ok(Self::new(params, body)) } fn new(params: Vec, body: Arc) -> Self { @@ -179,6 +185,8 @@ impl PhysicalExpr for LambdaExpr { ); }; + check_async_udf(body)?; + Ok(Arc::new(Self::new(self.params.clone(), Arc::clone(body)))) } @@ -210,6 +218,20 @@ fn all_unique(params: &[String]) -> bool { } } +fn check_async_udf(body: &Arc) -> Result<()> { + if body.exists(|expr| { + Ok(expr + .downcast_ref::() + .is_some_and(|udf| udf.fun().as_async().is_some())) + })? { + return plan_err!( + "Async functions in lambdas aren't supported, see https://github.com/apache/datafusion/issues/22091" + ); + } + + Ok(()) +} + #[cfg(test)] mod tests { use crate::expressions::{NoOp, lambda::lambda}; diff --git a/datafusion/sqllogictest/test_files/async_udf.slt b/datafusion/sqllogictest/test_files/async_udf.slt index 0708b59e519a0..678b2f7d8b8d3 100644 --- a/datafusion/sqllogictest/test_files/async_udf.slt +++ b/datafusion/sqllogictest/test_files/async_udf.slt @@ -99,3 +99,13 @@ physical_plan 01)ProjectionExec: expr=[__async_fn_0@1 as async_abs(data.x)] 02)--AsyncFuncExec: async_expr=[async_expr(name=__async_fn_0, expr=async_abs(x@0))] 03)----DataSourceExec: partitions=1, partition_sizes=[1] + +statement ok +set datafusion.sql_parser.dialect = databricks; + +# Async udf can't be used in lambdas +query error DataFusion error: Error during planning: Async functions in lambdas aren't supported, see https://github\.com/apache/datafusion/issues/22091 +select array_transform([1], v -> async_abs(v)); + +statement ok +set datafusion.sql_parser.dialect = generic; From 1af9bd797650b2dae77a83bdb9b95daff2d394d2 Mon Sep 17 00:00:00 2001 From: Liam Feehery Date: Thu, 14 May 2026 07:42:40 -0400 Subject: [PATCH 05/18] Add metrics to `FFI_ExecutionPlan` (#22136) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Which issue does this PR close? - Closes #22135 ## Rationale for this change `FFI_ExecutionPlan` exposes most of the `ExecutionPlan` trait but does not expose `metrics()`. As a result, `ForeignExecutionPlan::metrics()` falls through to the trait default (`None`), so anything downstream of an FFI boundary loses metrics. The most visible breakage is `EXPLAIN ANALYZE`, which renders empty metric blocks for foreign plans; anything calling `DisplayableExecutionPlan::with_metrics(...)` on a plan tree containing foreign nodes is similarly affected. This PR makes foreign plans behave the same as local plans for metric reporting. Metrics are passed as a snapshot, and all atomic-backed counters/gauges/timers are read into plain integer fields at marshal time. Correct because none of the in-tree consumers (`AnalyzeExec`, `DisplayableExecutionPlan`) poll metrics during streaming. ## What changes are included in this PR? - New module `datafusion/ffi/src/metrics.rs` with FFI-stable mirrors of `MetricsSet`, `Metric`, `MetricValue` (all 16 variants), `Label`, `MetricType`, `MetricCategory`, `PruningMetrics`, `RatioMetrics`, and `RatioMergeStrategy`, plus bidirectional `From` conversions. - `MetricValue::Custom { value: Arc }` is marshalled as `(name, Display output, as_usize())`. On the consumer side it is reconstructed as a small `FfiCustomMetricValue` shim that preserves `Display` and `as_usize()`. `aggregate` becomes a no-op (snapshots are not mergeable) and `as_any` only downcasts to the shim — this is the documented compromise. - `FFI_ExecutionPlan` gains a new `metrics` function pointer (appended after `repartitioned`). `ForeignExecutionPlan::metrics()` is implemented to call through it. - Two trivial accessors added to `RatioMetrics`: `merge_strategy()` and `display_raw_values()` — needed to marshal these otherwise-private fields. - `chrono` added as a direct dependency of `datafusion-ffi` (used for `Timestamp` ↔ unix-nanos conversion). ## Are these changes tested? Yes. New tests, all passing: - 7 unit tests in `datafusion/ffi/src/metrics.rs` round-trip every `MetricValue` variant individually, plus a full `Metric` (value + labels + partition + type + category) and a `MetricsSet`. - `test_ffi_execution_plan_metrics_round_trip` in `datafusion/ffi/src/execution_plan.rs` exercises the full FFI path: builds an `ExecutionPlan` with a `MetricsSet`, wraps it in `FFI_ExecutionPlan`, retrieves metrics via `ForeignExecutionPlan::metrics()` through `mock_foreign_marker_id`, and asserts the aggregated value matches. - `EmptyExec` test helper extended with `with_metrics(MetricsSet)`. Existing test suites still pass: `cargo test -p datafusion-ffi --all-features` and `cargo test -p datafusion-ffi --features integration-tests`. ## Are there any user-facing changes? Yes — this PR adds public API and makes a binary-incompatible change to `FFI_ExecutionPlan`. Please add the `api change` label. - **New public types** in `datafusion_ffi::metrics`: `FFI_MetricsSet`, `FFI_Metric`, `FFI_MetricValue`, `FFI_Label`, `FFI_MetricType`, `FFI_MetricCategory`, `FFI_PruningMetrics`, `FFI_RatioMetrics`, `FFI_RatioMergeStrategy`, and `FfiCustomMetricValue`. - **ABI break for `FFI_ExecutionPlan`**: a new `metrics` function pointer field is appended. Producers and consumers must be rebuilt together, as is already enforced by the major-version check via `datafusion_ffi::version()`. - **New public accessors** on `RatioMetrics`: `merge_strategy()` and `display_raw_values()`. Non-breaking additions. - **`MetricValue::Custom` across FFI is lossy by design**: the underlying `dyn CustomMetricValue` is not preserved; only its `Display` output and `as_usize()` snapshot survive. Documented on `FfiCustomMetricValue`. --- Cargo.lock | 1 + datafusion/ffi/Cargo.toml | 1 + datafusion/ffi/src/execution_plan.rs | 72 ++ datafusion/ffi/src/physical_expr/metrics.rs | 803 ++++++++++++++++++ datafusion/ffi/src/physical_expr/mod.rs | 1 + .../physical-expr-common/src/metrics/value.rs | 11 + 6 files changed, 889 insertions(+) create mode 100644 datafusion/ffi/src/physical_expr/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index 40d920459719c..d2ce889675b1d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2136,6 +2136,7 @@ dependencies = [ "arrow-schema", "async-ffi", "async-trait", + "chrono", "datafusion", "datafusion-catalog", "datafusion-common", diff --git a/datafusion/ffi/Cargo.toml b/datafusion/ffi/Cargo.toml index ea9a12665ad4c..7eed11c0c69e8 100644 --- a/datafusion/ffi/Cargo.toml +++ b/datafusion/ffi/Cargo.toml @@ -48,6 +48,7 @@ arrow = { workspace = true, features = ["ffi"] } arrow-schema = { workspace = true } async-ffi = { version = "0.5.0" } async-trait = { workspace = true } +chrono = { workspace = true } datafusion-catalog = { workspace = true } datafusion-common = { workspace = true } datafusion-datasource = { workspace = true } diff --git a/datafusion/ffi/src/execution_plan.rs b/datafusion/ffi/src/execution_plan.rs index 7541880a233c8..1a8c9767fbed3 100644 --- a/datafusion/ffi/src/execution_plan.rs +++ b/datafusion/ffi/src/execution_plan.rs @@ -23,6 +23,7 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{DataFusionError, Result}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; +use datafusion_physical_expr_common::metrics::MetricsSet; use datafusion_physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, }; @@ -32,6 +33,7 @@ use tokio::runtime::Handle; use crate::config::FFI_ConfigOptions; use crate::execution::FFI_TaskContext; +use crate::physical_expr::metrics::FFI_MetricsSet; use crate::plan_properties::FFI_PlanProperties; use crate::record_batch_stream::FFI_RecordBatchStream; use crate::util::{FFI_Option, FFI_Result}; @@ -68,6 +70,10 @@ pub struct FFI_ExecutionPlan { ) -> FFI_Result>, + /// Snapshot the plan's execution metrics. Returns `None` when the + /// underlying [`ExecutionPlan::metrics`] returned `None`. + pub metrics: unsafe extern "C" fn(plan: &Self) -> FFI_Option, + /// Used to create a clone on the provider of the execution plan. This should /// only need to be called by the receiver of the plan. pub clone: unsafe extern "C" fn(plan: &Self) -> Self, @@ -179,6 +185,16 @@ unsafe extern "C" fn name_fn_wrapper(plan: &FFI_ExecutionPlan) -> SString { plan.inner().name().into() } +unsafe extern "C" fn metrics_fn_wrapper( + plan: &FFI_ExecutionPlan, +) -> FFI_Option { + plan.inner() + .metrics() + .as_ref() + .map(FFI_MetricsSet::from) + .into() +} + unsafe extern "C" fn release_fn_wrapper(plan: &mut FFI_ExecutionPlan) { unsafe { debug_assert!(!plan.private_data.is_null()); @@ -270,6 +286,7 @@ impl FFI_ExecutionPlan { name: name_fn_wrapper, execute: execute_fn_wrapper, repartitioned: repartitioned_fn_wrapper, + metrics: metrics_fn_wrapper, clone: clone_fn_wrapper, release: release_fn_wrapper, private_data: Box::into_raw(private_data) as *mut c_void, @@ -431,6 +448,12 @@ impl ExecutionPlan for ForeignExecutionPlan { .map(|plan| >::try_from(&plan)) .transpose() } + + fn metrics(&self) -> Option { + let ffi: Option = + unsafe { (self.plan.metrics)(&self.plan) }.into(); + ffi.map(MetricsSet::from) + } } #[cfg(any(test, feature = "integration-tests"))] @@ -444,6 +467,7 @@ pub mod tests { pub struct EmptyExec { props: Arc, children: Vec>, + metrics: Option, } impl EmptyExec { @@ -456,8 +480,14 @@ pub mod tests { Boundedness::Bounded, )), children: Vec::default(), + metrics: None, } } + + pub fn with_metrics(mut self, metrics: MetricsSet) -> Self { + self.metrics = Some(metrics); + self + } } impl DisplayAs for EmptyExec { @@ -490,6 +520,7 @@ pub mod tests { Ok(Arc::new(EmptyExec { props: Arc::clone(&self.props), children, + metrics: self.metrics.clone(), })) } @@ -501,6 +532,10 @@ pub mod tests { unimplemented!() } + fn metrics(&self) -> Option { + self.metrics.clone() + } + fn apply_expressions( &self, f: &mut dyn FnMut( @@ -587,6 +622,43 @@ pub mod tests { Ok(()) } + #[test] + fn test_ffi_execution_plan_metrics_round_trip() -> Result<()> { + use datafusion_physical_expr_common::metrics::{Count, Metric, MetricValue}; + + let schema = Arc::new(arrow::datatypes::Schema::new(vec![ + arrow::datatypes::Field::new("a", arrow::datatypes::DataType::Float32, false), + ])); + + // Plans without metrics still return None across the boundary. + let bare_plan = Arc::new(EmptyExec::new(Arc::clone(&schema))); + let mut bare_local = FFI_ExecutionPlan::new(bare_plan, None); + bare_local.library_marker_id = crate::mock_foreign_marker_id; + let bare_foreign: Arc = (&bare_local).try_into()?; + assert!(bare_foreign.metrics().is_none()); + + // Plans with metrics produce equivalent MetricsSets after a round trip. + let mut original_metrics = MetricsSet::new(); + let c0 = Count::new(); + c0.add(11); + original_metrics + .push(Arc::new(Metric::new(MetricValue::OutputRows(c0), Some(0)))); + let c1 = Count::new(); + c1.add(31); + original_metrics + .push(Arc::new(Metric::new(MetricValue::OutputRows(c1), Some(1)))); + + let metric_plan = Arc::new(EmptyExec::new(schema).with_metrics(original_metrics)); + let mut metric_local = FFI_ExecutionPlan::new(metric_plan, None); + metric_local.library_marker_id = crate::mock_foreign_marker_id; + let metric_foreign: Arc = (&metric_local).try_into()?; + + let observed = metric_foreign.metrics().expect("metrics should be present"); + assert_eq!(observed.output_rows(), Some(42)); + + Ok(()) + } + #[test] fn test_ffi_execution_plan_local_bypass() { let schema = Arc::new(arrow::datatypes::Schema::new(vec![ diff --git a/datafusion/ffi/src/physical_expr/metrics.rs b/datafusion/ffi/src/physical_expr/metrics.rs new file mode 100644 index 0000000000000..6c29bd0ea6095 --- /dev/null +++ b/datafusion/ffi/src/physical_expr/metrics.rs @@ -0,0 +1,803 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! FFI-stable mirrors of [`MetricsSet`] and related metric types. +//! +//! Metrics are passed across the FFI boundary as a **snapshot**: all +//! atomic-backed counters/gauges/timers are read into plain integer fields +//! at conversion time. Callers re-invoke [`ExecutionPlan::metrics()`] across +//! the boundary to observe newer values. This matches the documented contract +//! ("Once `self.execute()` has returned... metrics should be complete") and +//! all in-tree consumers (`AnalyzeExec`, `DisplayableExecutionPlan`). +//! +//! The variant *order* of [`FFI_MetricValue`] is part of the stable ABI and +//! must not be reordered. New variants must be appended at the end. +//! +//! [`ExecutionPlan::metrics()`]: datafusion_physical_plan::ExecutionPlan::metrics + +use std::any::Any; +use std::borrow::Cow; +use std::fmt::{self, Debug, Display}; +use std::sync::Arc; + +use chrono::{DateTime, Utc}; +use datafusion_common::format::{MetricCategory, MetricType}; +use datafusion_physical_expr_common::metrics::{ + Count, CustomMetricValue, Gauge, MetricValue, MetricsSet, PruningMetrics, + RatioMergeStrategy, RatioMetrics, Time, Timestamp, +}; +use datafusion_physical_expr_common::metrics::{Label, Metric}; +use stabby::string::String as SString; +use stabby::vec::Vec as SVec; + +use crate::ffi_option::FFI_Option; + +/// FFI-stable mirror of [`MetricsSet`]. +#[repr(C)] +#[derive(Debug, Clone)] +pub struct FFI_MetricsSet { + pub metrics: SVec, +} + +/// FFI-stable mirror of [`Metric`]. +#[repr(C)] +#[derive(Debug, Clone)] +pub struct FFI_Metric { + pub value: FFI_MetricValue, + pub labels: SVec, + pub partition: FFI_Option, + pub metric_type: FFI_MetricType, + pub metric_category: FFI_Option, +} + +/// FFI-stable mirror of [`Label`]. +#[repr(C)] +#[derive(Debug, Clone)] +pub struct FFI_Label { + pub name: SString, + pub value: SString, +} + +/// FFI-stable mirror of [`MetricType`]. +#[expect(non_camel_case_types)] +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FFI_MetricType { + Summary, + Dev, +} + +/// FFI-stable mirror of [`MetricCategory`]. +#[expect(non_camel_case_types)] +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FFI_MetricCategory { + Rows, + Bytes, + Timing, + Uncategorized, +} + +/// FFI-stable mirror of [`PruningMetrics`]. All counts are snapshotted at +/// conversion time. +#[repr(C)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct FFI_PruningMetrics { + pub pruned: u64, + pub matched: u64, + pub fully_matched: u64, +} + +/// FFI-stable mirror of [`RatioMergeStrategy`]. +#[expect(non_camel_case_types)] +#[expect( + clippy::enum_variant_names, + reason = "match RatioMergeStrategy variants" +)] +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FFI_RatioMergeStrategy { + AddPartAddTotal, + AddPartSetTotal, + SetPartAddTotal, +} + +/// FFI-stable mirror of [`RatioMetrics`]. Numerator/denominator are +/// snapshotted at conversion time. +#[repr(C)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct FFI_RatioMetrics { + pub part: u64, + pub total: u64, + pub merge_strategy: FFI_RatioMergeStrategy, + pub display_raw_values: bool, +} + +/// FFI-stable mirror of [`MetricValue`]. +#[repr(C, u8)] +#[derive(Debug, Clone)] +pub enum FFI_MetricValue { + OutputRows(u64), + ElapsedComputeNs(u64), + SpillCount(u64), + SpilledBytes(u64), + OutputBytes(u64), + OutputBatches(u64), + SpilledRows(u64), + CurrentMemoryUsage(u64), + Count { + name: SString, + count: u64, + }, + Gauge { + name: SString, + gauge: u64, + }, + Time { + name: SString, + time_ns: u64, + }, + StartTimestampNsUTC(FFI_Option), + EndTimestampNsUTC(FFI_Option), + PruningMetrics { + name: SString, + pruning_metrics: FFI_PruningMetrics, + }, + Ratio { + name: SString, + ratio_metrics: FFI_RatioMetrics, + }, + /// Custom metrics are marshalled as their `Display` output plus the + /// `as_usize()` fallback. The underlying `dyn CustomMetricValue` type is + /// not preserved across the boundary, so `aggregate`/`as_any` downcasting + /// are lost; the reconstructed value uses [`FfiCustomMetricValue`]. + Custom { + name: SString, + display: SString, + as_usize_value: u64, + }, +} + +// ----------------------------------------------------------------------------- +// MetricsSet <-> FFI_MetricsSet +// ----------------------------------------------------------------------------- + +impl From<&MetricsSet> for FFI_MetricsSet { + fn from(set: &MetricsSet) -> Self { + Self { + metrics: set.iter().map(|m| FFI_Metric::from(m.as_ref())).collect(), + } + } +} + +impl From for MetricsSet { + fn from(set: FFI_MetricsSet) -> Self { + let mut out = MetricsSet::new(); + for ffi_metric in set.metrics { + out.push(Arc::new(Metric::from(ffi_metric))); + } + out + } +} + +// ----------------------------------------------------------------------------- +// Metric <-> FFI_Metric +// ----------------------------------------------------------------------------- + +impl From<&Metric> for FFI_Metric { + fn from(m: &Metric) -> Self { + Self { + value: FFI_MetricValue::from(m.value()), + labels: m.labels().iter().map(FFI_Label::from).collect(), + partition: m.partition().map(|p| p as u64).into(), + metric_type: m.metric_type().into(), + metric_category: m.metric_category().map(FFI_MetricCategory::from).into(), + } + } +} + +impl From for Metric { + fn from(m: FFI_Metric) -> Self { + let labels: Vec