Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
e2b5e5c
feat: enhance schema alignment and recursion handling
kosiew May 5, 2026
f191f7c
feat(datafusion): add direct tests for align_plan_to_schema and docum…
kosiew May 6, 2026
1b39805
feat: improve schema alignment checks in execution plans
kosiew May 6, 2026
c0a6066
feat: Improve performance and clarity in common and recursive query m…
kosiew May 6, 2026
7fa72c2
feat: improve test setup and simplify validation in physical plan
kosiew May 6, 2026
4959978
feat: enhance recursive query functionality with new schema handling …
kosiew May 10, 2026
0b35795
fix: optimize recursive CTE handling to prevent SLT hang
kosiew May 10, 2026
0bdf95c
feat: refactor CTE handling and logical plan simplifications
kosiew May 10, 2026
63f62a8
feat: enhance recursive query validation and testing
kosiew May 10, 2026
8b4786f
feat: enhance recursive CTE handling in DataFusion
kosiew May 11, 2026
07112de
feat(datafusion): enhance recursive query handling and error management
kosiew May 11, 2026
4c17d7a
feat: update recursive_query and physical_planner to use references
kosiew May 11, 2026
365b9ca
Revert to 63f62a829: feat: enhance recursive query validation and tes…
kosiew May 11, 2026
973f93e
feat: enhance recursive query handling by aligning schemas and preser…
kosiew May 11, 2026
ec9aafd
feat: enhance documentation for name preservation and nullability rat…
kosiew May 11, 2026
59e8d92
Revert to 739e1471b: Add reusable plan-time schema alignment helper a…
kosiew May 13, 2026
1c990d7
Merge branch 'main' into nullability-mismatch-22034
kosiew May 13, 2026
02e1abb
feat: add SLT repro for recursive CTE and update nullability handling
kosiew May 13, 2026
869e49a
feat: refactor recursive query handling and nullability management
kosiew May 13, 2026
3389785
fix: update explain_tree.slt to reflect correct type casting in proje…
kosiew May 13, 2026
6953076
fix: correct SUM(0) -> 0 as level in recursive CTE query
kosiew May 13, 2026
4b6f9fa
feat: update reconcile_recursive_query_input_nullability to handle on…
kosiew May 13, 2026
e76aa54
feat: implement central recursive CTE schema helpers
kosiew May 13, 2026
18b06b0
fix: update type casting in projection for explain_analyze test
kosiew May 13, 2026
aa7fb40
feat: update TreeNode::exists usage and optimize CTE handling
kosiew May 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ mod null_equality;
pub mod parquet_config;
pub mod parsers;
pub mod pruning;
#[doc(hidden)]
pub mod recursive_schema;
pub mod rounding;
pub mod scalar;
pub mod spans;
Expand Down
252 changes: 252 additions & 0 deletions datafusion/common/src/recursive_schema.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Internal helpers for recursive CTE schema reconciliation.
//!
//! Recursive CTE work-table references and children must expose schemas that
//! are conservative for nullability, while preserving every other schema
//! dimension exactly.

use std::sync::Arc;

use arrow::datatypes::{FieldRef, Schema, SchemaRef};

use crate::{DFSchema, DFSchemaRef, DataFusionError, Result};

/// Return an Arrow schema with all fields marked nullable, preserving field and
/// schema metadata.
#[doc(hidden)]
pub fn make_schema_nullable(schema: &Schema) -> SchemaRef {
    // Widen every field to nullable; names, data types, and per-field metadata
    // are carried over unchanged.
    let nullable_fields: Vec<_> = schema
        .fields()
        .iter()
        .map(|field| field.as_ref().clone().with_nullable(true))
        .collect();
    Arc::new(Schema::new_with_metadata(
        nullable_fields,
        schema.metadata().clone(),
    ))
}

/// Return a recursive query output schema that preserves `static_schema` except
/// for nullability widened by `recursive_schema`.
///
/// This helper assumes recursive term expressions have already been coerced to
/// the static term's schema, and only reads field nullability from
/// `recursive_schema`. All other output schema dimensions come from
/// `static_schema`.
#[doc(hidden)]
pub fn recursive_query_output_schema(
    static_schema: &DFSchema,
    recursive_schema: &DFSchema,
) -> Result<DFSchemaRef> {
    // Both terms must agree on arity before fields can be zipped pairwise.
    if static_schema.fields().len() != recursive_schema.fields().len() {
        return Err(DataFusionError::Plan(format!(
            "Non-recursive term and recursive term must have the same number of columns ({} != {})",
            static_schema.fields().len(),
            recursive_schema.fields().len()
        )));
    }

    let mut fields = Vec::with_capacity(static_schema.fields().len());
    for ((qualifier, static_field), recursive_field) in
        static_schema.iter().zip(recursive_schema.fields())
    {
        // Everything comes from the static field; only nullability may widen.
        let widened = static_field.is_nullable() || recursive_field.is_nullable();
        fields.push((
            qualifier.cloned(),
            static_field.as_ref().clone().with_nullable(widened).into(),
        ));
    }

    // Schema-level metadata and functional dependencies are also taken from
    // the static term.
    DFSchema::new_with_metadata(fields, static_schema.metadata().clone())?
        .with_functional_dependencies(static_schema.functional_dependencies().clone())
        .map(DFSchemaRef::new)
}

/// Reconcile `logical_schema` with an Arrow schema, but only when the Arrow
/// schema differs by being more nullable. Returns `Ok(None)` if any other
/// schema dimension differs, so callers can report their normal schema error.
#[doc(hidden)]
pub fn reconcile_dfschema_with_schema_nullability(
    logical_schema: &DFSchema,
    physical_schema: &Schema,
) -> Result<Option<DFSchema>> {
    // Top-level shape must match exactly; only per-field nullability may vary.
    let metadata_matches = logical_schema.metadata() == physical_schema.metadata();
    let arity_matches =
        logical_schema.fields().len() == physical_schema.fields().len();
    if !metadata_matches || !arity_matches {
        return Ok(None);
    }

    widen_dfschema_nullability_with_fields(
        logical_schema,
        physical_schema.fields().iter(),
    )
}

/// Zip `base_schema` against `widening_fields` and widen nullability per field.
///
/// Returns `Ok(None)` when any field differs in name, data type, or metadata
/// (the caller should fall back to its normal error path), and also when no
/// field actually needed widening (nothing to reconcile).
fn widen_dfschema_nullability_with_fields<'a>(
    base_schema: &DFSchema,
    widening_fields: impl Iterator<Item = &'a FieldRef>,
) -> Result<Option<DFSchema>> {
    let mut did_widen = false;
    let mut merged = Vec::with_capacity(base_schema.fields().len());

    for ((qualifier, base_field), widening_field) in
        base_schema.iter().zip(widening_fields)
    {
        // Only nullability is allowed to differ between the two schemas.
        let same_shape = base_field.name() == widening_field.name()
            && base_field.data_type() == widening_field.data_type()
            && base_field.metadata() == widening_field.metadata();
        if !same_shape {
            return Ok(None);
        }

        let nullable = base_field.is_nullable() || widening_field.is_nullable();
        if nullable && !base_field.is_nullable() {
            // The widening side made a previously non-nullable field nullable.
            did_widen = true;
        }
        merged.push((
            qualifier.cloned(),
            base_field.as_ref().clone().with_nullable(nullable).into(),
        ));
    }

    if did_widen {
        DFSchema::new_with_metadata(merged, base_schema.metadata().clone())?
            .with_functional_dependencies(base_schema.functional_dependencies().clone())
            .map(Some)
    } else {
        // No field changed; signal "nothing reconciled" to the caller.
        Ok(None)
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use arrow::datatypes::{DataType, Field, Schema};

    use crate::ToDFSchema as _;

    use super::*;

    // `make_schema_nullable` should only flip nullability; field-level and
    // schema-level metadata must survive untouched.
    #[test]
    fn make_schema_nullable_preserves_metadata() {
        let schema = Schema::new_with_metadata(
            vec![
                Field::new("c1", DataType::Int32, false)
                    .with_metadata(HashMap::from([("field".into(), "value".into())])),
            ],
            HashMap::from([("schema".into(), "value".into())]),
        );

        let nullable = make_schema_nullable(&schema);

        assert!(nullable.field(0).is_nullable());
        assert_eq!(nullable.field(0).metadata(), schema.field(0).metadata());
        assert_eq!(nullable.metadata(), schema.metadata());
    }

    // The output schema takes name, type, and metadata from the static term
    // and widens nullability using the recursive term.
    #[test]
    fn recursive_output_schema_preserves_static_dimensions_and_widens_nullability() {
        let static_schema = Schema::new_with_metadata(
            vec![
                Field::new("anchor_name", DataType::Int32, false)
                    .with_metadata(HashMap::from([("field".into(), "value".into())])),
            ],
            HashMap::from([("schema".into(), "value".into())]),
        )
        .to_dfschema_ref()
        .unwrap();
        let recursive_schema = Schema::new(vec![Field::new(
            "recursive_expr_name",
            DataType::Int32,
            true,
        )])
        .to_dfschema_ref()
        .unwrap();

        let output =
            recursive_query_output_schema(&static_schema, &recursive_schema).unwrap();

        // Name, type, and metadata all come from the static term...
        assert_eq!(output.field(0).name(), "anchor_name");
        assert_eq!(output.field(0).data_type(), &DataType::Int32);
        assert_eq!(
            output.field(0).metadata(),
            static_schema.field(0).metadata()
        );
        assert_eq!(output.metadata(), static_schema.metadata());
        // ...while nullability is widened by the recursive term.
        assert!(output.field(0).is_nullable());
    }

    // A physical schema that differs only by added nullability reconciles into
    // a widened logical schema.
    #[test]
    fn reconciliation_only_widens_nullability() {
        let logical_schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)])
            .to_dfschema_ref()
            .unwrap();
        let physical_schema = Schema::new(vec![Field::new("c1", DataType::Int32, true)]);

        let reconciled =
            reconcile_dfschema_with_schema_nullability(&logical_schema, &physical_schema)
                .unwrap()
                .expect("nullability widening should reconcile");

        assert!(reconciled.field(0).is_nullable());
        assert_eq!(reconciled.field(0).name(), "c1");
        assert_eq!(reconciled.field(0).data_type(), &DataType::Int32);
    }

    // Any mismatch other than nullability (arity, name, type, field metadata,
    // schema metadata) must return `None` so callers raise their usual error.
    #[test]
    fn reconciliation_rejects_other_mismatches() {
        let logical_schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)])
            .to_dfschema_ref()
            .unwrap();

        let cases = [
            // Extra column (arity mismatch).
            Schema::new(vec![
                Field::new("c1", DataType::Int32, true),
                Field::new("c2", DataType::Int32, true),
            ]),
            // Different field name.
            Schema::new(vec![Field::new("different", DataType::Int32, true)]),
            // Different data type.
            Schema::new(vec![Field::new("c1", DataType::Int64, true)]),
            // Different field-level metadata.
            Schema::new(vec![
                Field::new("c1", DataType::Int32, true)
                    .with_metadata(HashMap::from([("key".into(), "value".into())])),
            ]),
            // Different schema-level metadata.
            Schema::new(vec![Field::new("c1", DataType::Int32, true)])
                .with_metadata(HashMap::from([("key".into(), "value".into())])),
        ];

        for physical_schema in cases {
            assert!(
                reconcile_dfschema_with_schema_nullability(
                    &logical_schema,
                    &physical_schema,
                )
                .unwrap()
                .is_none(),
                "should not reconcile unsupported mismatch: {physical_schema:?}"
            );
        }
    }
}
27 changes: 27 additions & 0 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ use datafusion_common::Column;
use datafusion_common::HashMap as DFHashMap;
use datafusion_common::display::ToStringifiedPlan;
use datafusion_common::format::ExplainAnalyzeCategories;
use datafusion_common::recursive_schema::reconcile_dfschema_with_schema_nullability;
use datafusion_common::tree_node::{
Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor,
};
Expand Down Expand Up @@ -115,6 +116,16 @@ use itertools::{Itertools, multiunzip};
use log::debug;
use tokio::sync::Mutex;

/// Return `true` when `plan` or any node beneath it is a
/// `LogicalPlan::RecursiveQuery`.
///
/// Aggregate planning normally verifies that the physical input schema satisfies
/// the logical input schema exactly. Recursive CTEs are an exception only for
/// nullability widening: logical planning may conservatively expose nullable
/// recursive output after the aggregate's logical input schema was derived.
fn contains_recursive_query_input(plan: &LogicalPlan) -> bool {
    let is_recursive_query =
        |node: &LogicalPlan| Ok(matches!(node, LogicalPlan::RecursiveQuery(_)));
    // The closure never fails, so `exists` cannot return `Err`.
    plan.exists(is_recursive_query).unwrap()
}

/// Physical query planner that converts a `LogicalPlan` to an
/// `ExecutionPlan` suitable for execution.
#[async_trait]
Expand Down Expand Up @@ -987,6 +998,22 @@ impl DefaultPhysicalPlanner {
let input_exec = children.one()?;
let physical_input_schema = input_exec.schema();
let logical_input_schema = input.as_ref().schema();
let reconciled_logical_schema;
let logical_input_schema = if schema_satisfied_by(
logical_input_schema.inner(),
&physical_input_schema,
) || !contains_recursive_query_input(input)
{
logical_input_schema
} else if let Some(schema) = reconcile_dfschema_with_schema_nullability(
logical_input_schema,
&physical_input_schema,
)? {
reconciled_logical_schema = schema;
&reconciled_logical_schema
} else {
logical_input_schema
};
let physical_input_schema_from_logical = logical_input_schema.inner();

if !options.execution.skip_physical_aggregate_schema_check
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1014,7 +1014,7 @@ async fn parquet_recursive_projection_pushdown() -> Result<()> {
SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
RecursiveQueryExec: name=number_series, is_distinct=false
CoalescePartitionsExec
ProjectionExec: expr=[id@0 as id, 1 as level]
ProjectionExec: expr=[CAST(id@0 AS Int64) as id, CAST(1 AS Int64) as level]
FilterExec: id@0 = 1
RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1
DataSourceExec: file_groups={1 group: [[TMP_DIR/hierarchy.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 = 1, pruning_predicate=id_null_count@2 != row_count@3 AND id_min@0 <= 1 AND 1 <= id_max@1, required_guarantees=[id in (1)]
Expand Down
28 changes: 26 additions & 2 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use arrow::datatypes::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};
use datafusion_common::display::ToStringifiedPlan;
use datafusion_common::file_options::file_type::FileType;
use datafusion_common::metadata::FieldMetadata;
use datafusion_common::recursive_schema::recursive_query_output_schema;
use datafusion_common::{
Column, Constraints, DFSchema, DFSchemaRef, NullEquality, Result, ScalarValue,
TableReference, ToDFSchema, UnnestOptions, exec_err,
Expand All @@ -66,6 +67,20 @@ use indexmap::IndexSet;
/// Default table name for unnamed table
pub const UNNAMED_TABLE: &str = "?table?";

/// Wrap `plan` so that it exposes exactly `schema`.
///
/// An existing `Projection` is rebuilt in place with the target schema; any
/// other plan is wrapped in a new pass-through `Projection` (one column
/// expression per output field) carrying the target schema.
fn plan_with_schema(plan: LogicalPlan, schema: DFSchemaRef) -> Result<LogicalPlan> {
    let projection = match plan {
        LogicalPlan::Projection(Projection { expr, input, .. }) => {
            // Reuse the projection's expressions; only the schema changes.
            Projection::try_new_with_schema(expr, input, schema)?
        }
        other => {
            // Build an identity projection over the plan's own columns.
            let column_exprs: Vec<_> = other.schema().iter().map(Expr::from).collect();
            Projection::try_new_with_schema(column_exprs, Arc::new(other), schema)?
        }
    };
    Ok(LogicalPlan::Projection(projection))
}

/// Options for [`LogicalPlanBuilder`]
#[derive(Default, Debug, Clone)]
pub struct LogicalPlanBuilderOptions {
Expand Down Expand Up @@ -192,10 +207,19 @@ impl LogicalPlanBuilder {
// Ensure that the recursive term has the same field types as the static term
let coerced_recursive_term =
coerce_plan_expr_for_schema(recursive_term, self.plan.schema())?;
let output_schema = recursive_query_output_schema(
self.plan.schema(),
coerced_recursive_term.schema(),
)?;
let static_term = plan_with_schema(
Arc::unwrap_or_clone(self.plan),
Arc::clone(&output_schema),
)?;
let recursive_term = plan_with_schema(coerced_recursive_term, output_schema)?;
Ok(Self::from(LogicalPlan::RecursiveQuery(RecursiveQuery {
name,
static_term: self.plan,
recursive_term: Arc::new(coerced_recursive_term),
static_term: Arc::new(static_term),
recursive_term: Arc::new(recursive_term),
is_distinct,
})))
}
Expand Down
12 changes: 5 additions & 7 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,8 @@ impl LogicalPlan {
LogicalPlan::Ddl(ddl) => ddl.schema(),
LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
// we take the schema of the static term as the schema of the entire recursive query
// The static term is coerced to the recursive output schema when
// building a RecursiveQuery.
static_term.schema()
}
}
Expand Down Expand Up @@ -1080,12 +1081,9 @@ impl LogicalPlan {
}) => {
self.assert_no_expressions(expr)?;
let (static_term, recursive_term) = self.only_two_inputs(inputs)?;
Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
name: name.clone(),
static_term: Arc::new(static_term),
recursive_term: Arc::new(recursive_term),
is_distinct: *is_distinct,
}))
LogicalPlanBuilder::from(static_term)
.to_recursive_query(name.clone(), recursive_term, *is_distinct)?
.build()
}
LogicalPlan::Analyze(a) => {
self.assert_no_expressions(expr)?;
Expand Down
Loading
Loading