Optimize memory usage of DataStore#553

Open
wudidapaopao wants to merge 8 commits into chdb-io:main from wudidapaopao:optimize_memory_usage
Conversation

@wudidapaopao
Contributor

@wudidapaopao wudidapaopao commented Mar 23, 2026

Resolved #552

Changelog category (leave one):

  • Improvement

Changelog entry

Optimize DataStore memory usage by pushing more operations to SQL and releasing intermediate data earlier

Summary

This PR reduces DataStore peak memory consumption through five strategies:

  1. SQL JOIN pushdown for merge() — avoids materializing both sides into Python DataFrames
  2. SQL UNION ALL pushdown for concat() / union() — avoids loading all concatenation operands into memory simultaneously
  3. Column pruning in expression evaluation — only fetches columns actually referenced instead of SELECT *
  4. Single-SQL GROUP BY aggregation path — pushes groupby+agg into one SQL query for SQL-backed sources
  5. Execution result checkpointing — releases old source DataFrames after execution so GC can reclaim memory

Additionally adds 43 merge/join pandas-compatibility tests and fixes several edge cases in the groupby SQL path.

Detailed Changes

1. SQL JOIN pushdown for merge() (pandas_compat.py)

Before: ds1.merge(ds2, on='key') always materialized both DataStores into Python DataFrames, then called pd.merge().

After: When both sides are SQL-backed DataStores and no pandas-only feature (index join, suffixes, sort, validate) is needed, merge() delegates to DataStore.join() which generates a SQL JOIN — neither side needs to be loaded into Python memory.

# Scenario: two Parquet files, 10M rows each, joining on user_id
ds_users = DataStore('users.parquet')      # 10M rows, 500MB
ds_orders = DataStore('orders.parquet')    # 10M rows, 800MB

# Before: peak ~1.3GB (both sides loaded into pandas)
# After:  near-zero Python-side peak (SQL JOIN pushdown; only the result is transferred)
result = ds_users.merge(ds_orders, on='user_id', how='inner')

Falls back to pandas merge when overlapping non-key columns require suffixes (_x/_y), or when sort=True, validate, left_index/right_index, or indicator is set.
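The eligibility check can be sketched as follows. This is a hypothetical helper, not the actual `pandas_compat.py` code; the parameter names mirror the pandas `merge()` options listed above:

```python
def can_push_join(left_cols, right_cols, on, *, sort=False, validate=None,
                  left_index=False, right_index=False, indicator=False):
    """Return True when merge() can be delegated to a SQL JOIN."""
    # Overlapping non-key columns would need pandas _x/_y suffixes,
    # which SQL JOIN cannot reproduce -> fall back to pandas.
    overlap = (set(left_cols) & set(right_cols)) - set(on)
    # Any pandas-only feature also forces the fallback path.
    pandas_only = (sort or validate is not None
                   or left_index or right_index or indicator)
    return not overlap and not pandas_only
```

For example, joining `['user_id', 'name']` against `['user_id', 'amount']` on `user_id` qualifies for pushdown, while two frames that both carry a `ts` column do not.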

2. SQL UNION ALL pushdown for concat() / union() (core.py, pandas_api.py, pandas_compat.py)

Before: concat([ds1, ds2, ds3]) materialized all DataStores into DataFrames then called pd.concat().

After: When all inputs are SQL-backed DataStores with ignore_index=True and join='outer', generates a (SELECT ...) UNION ALL (SELECT ...) query — data stays in the SQL engine.

# Scenario: concatenate 12 monthly Parquet files
monthly_files = [DataStore(f'sales_2025_{m:02d}.parquet') for m in range(1, 13)]

# Before: peak = sum of all 12 files in memory simultaneously
# After:  single SQL UNION ALL query, only final result transferred
result = concat(monthly_files, ignore_index=True)

A new SubqueryTableFunction class wraps composite SQL expressions (UNION, subqueries) as table sources. Each SELECT is parenthesized to safely handle ORDER BY/LIMIT/SETTINGS clauses.
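The parenthesization rule amounts to wrapping each operand before joining them, so a trailing `ORDER BY`, `LIMIT`, or `SETTINGS` clause binds to its own SELECT rather than the whole union. A minimal sketch (hypothetical helper, not the `SubqueryTableFunction` implementation itself):

```python
def union_all_sql(selects):
    """Combine SELECT statements into one UNION ALL query.

    Each operand is parenthesized so trailing ORDER BY / LIMIT /
    SETTINGS clauses stay scoped to their own SELECT.
    """
    if not selects:
        raise ValueError("need at least one SELECT")
    return " UNION ALL ".join(f"({s})" for s in selects)
```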

3. Column pruning in expression evaluation (column_expr.py)

Before: Any expression like ds['price'] * ds['quantity'] called ds._execute() which does SELECT *, loading all columns.

After: Expression evaluation inspects the AST to find referenced Field nodes, then generates SELECT price, quantity FROM ... — only the needed columns are transferred.

# Scenario: 100-column wide table, only need 2 columns for computation
ds = DataStore('wide_table.parquet')  # 100 columns, 5M rows

# Before: SELECT * → loads all 100 columns (~4GB)
# After:  SELECT price, quantity → loads 2 columns (~80MB)
result = ds['price'] * ds['quantity']

Same optimization applied to value_counts():

# Before: SELECT * then count in pandas
# After:  SELECT category_col then count
ds['category'].value_counts()
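The pruning pass is essentially a walk over the expression AST collecting `Field` references. A self-contained sketch with simplified node types (the real `column_expr.py` AST is richer than this):

```python
from dataclasses import dataclass

@dataclass
class Field:
    """A reference to a source column inside an expression tree."""
    name: str

@dataclass
class BinOp:
    """A binary operation node, e.g. price * quantity."""
    op: str
    left: object
    right: object

def referenced_fields(node):
    """Collect the column names an expression actually reads."""
    if isinstance(node, Field):
        return {node.name}
    if isinstance(node, BinOp):
        return referenced_fields(node.left) | referenced_fields(node.right)
    return set()  # literals reference no columns

expr = BinOp("*", Field("price"), Field("quantity"))
cols = sorted(referenced_fields(expr))
pruned_sql = f"SELECT {', '.join(cols)} FROM wide_table"  # instead of SELECT *
```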

4. Single-SQL GROUP BY path (column_expr.py)

Before: ds.groupby('dept')['salary'].mean() first executed ds._execute() (full table download), then performed groupby+agg via chDB on the local DataFrame.

After: For SQL-backed sources, generates a single SELECT dept, AVG(salary) FROM ... GROUP BY dept query — no intermediate full-table materialization.

# Scenario: aggregate a large remote table
ds = DataStore('employee_records.parquet')  # 5M rows, 50 columns

# Before: downloads entire 5M×50 table, then groupby locally
# After:  single SQL query, returns only ~100 department-level rows
result = ds.groupby('department')['salary'].mean()

Handles edge cases:

  • Skips when execution_engine='pandas' to avoid floating-point precision differences
  • Skips when WHERE clause references the aggregation column (avoids chDB ILLEGAL_AGGREGATION)
  • Filters NULL/empty groups after SQL GROUP BY to match pandas dropna=True behavior
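The shape of the generated query can be sketched as below. This is an illustrative helper, not the actual code path; the function-name mapping is an assumption based on common ClickHouse aggregate names:

```python
def groupby_agg_sql(table, key, col, agg):
    """Build the single-query form of ds.groupby(key)[col].agg()."""
    # Assumed mapping from pandas agg names to ClickHouse aggregates
    sql_funcs = {"mean": "avg", "sum": "sum", "count": "count",
                 "min": "min", "max": "max"}
    func = sql_funcs[agg]
    return f"SELECT {key}, {func}({col}) AS {col} FROM {table} GROUP BY {key}"
```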

5. SQL fillna() optimization (column_expr.py)

Before: ds['col'].fillna(0) always added a lazy pandas operation, requiring full materialization.

After: For numeric scalar fills on SQL-backed sources, uses SQL ifNull(col, 0) which stays in the expression tree — no materialization needed.

ds = DataStore('data.parquet')

# Before: materialize full column, then fillna in pandas
# After:  ifNull() expression pushed into SQL, zero materialization
result = ds['revenue'].fillna(0).sum()
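The rewrite itself is a small expression substitution. A hypothetical sketch of the guard and the generated fragment (the numeric-only restriction matches the description above; the helper name is made up):

```python
from numbers import Number

def fillna_expr(col, value):
    """Rewrite a numeric scalar fillna as a ClickHouse ifNull() expression."""
    if not isinstance(value, Number):
        # Non-numeric fills keep the lazy pandas path
        raise TypeError("only numeric scalar fills are pushed to SQL")
    return f"ifNull({col}, {value})"
```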

6. Execution result checkpointing (core.py)

Before: After executing a mixed SQL+pandas pipeline, the original source DataFrame was retained in memory alongside the result.

After: After successful execution, _source_df is updated to point to the result DataFrame, allowing the old source to be garbage collected.

ds = DataStore.from_dataframe(large_df)  # large_df held in _source_df
ds = ds[ds['status'] == 'active']        # filter added as lazy op
result = repr(ds)                         # triggers execution

# Before: large_df still in memory (referenced by _source_df)
# After:  _source_df now points to the filtered result, large_df is GC-eligible
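The GC effect of the checkpoint can be observed directly with a weak reference. A minimal stand-alone demonstration (a `dict` plays the role of the DataStore and a plain object the role of `large_df`; relies on CPython reference counting):

```python
import gc
import weakref

class FrameLike:
    """Stand-in for a large source DataFrame."""

source = FrameLike()
probe = weakref.ref(source)        # lets us observe collection

store = {"_source_df": source}     # the DataStore holds the source
del source                         # caller drops its own reference

store["_source_df"] = "result"     # checkpoint: repoint at the result
gc.collect()
assert probe() is None             # old source is now reclaimed
```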

7. .columns metadata optimization (core.py)

Before: Accessing ds.columns after assign() or join() triggered full execution.

After: Derives column names from schema + computed columns metadata, or probes with LIMIT 0 for JOINs — avoids materializing full results.

ds = DataStore('large.parquet')
ds = ds.assign(profit=ds['revenue'] - ds['cost'])

# Before: ds.columns → full execution to discover column names
# After:  ds.columns → derived from schema metadata, zero data transfer
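The schema-metadata derivation reduces to merging the source schema with the `assign()` outputs, preserving positions for overwritten columns. A hypothetical sketch of that merge (not the actual `core.py` logic):

```python
def derive_columns(schema_cols, assigned_cols):
    """Derive ds.columns from schema metadata plus assign() outputs."""
    out = list(schema_cols)
    for col in assigned_cols:
        if col not in out:     # overwriting an existing column keeps its position
            out.append(col)
    return out
```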

8. Test coverage (test_merge_join_compat.py)

43 new pandas-compatibility tests covering:

  • Overlapping non-key columns (suffixes handling, fallback to pandas)
  • left_on / right_on with different column names
  • Different column orders between left and right
  • sort parameter in merge (fallback detection)
  • All join types: inner, left, right, outer
  • from_df() DataStore merge (in-memory path)
  • File-backed DataStore merge (SQL JOIN path)
  • SQL UNION ALL column ordering in concat()/union()

Documentation entry for user-facing changes

  • Documentation is written (mandatory for new features)

Information about CI checks: https://clickhouse.com/docs/en/development/continuous-integration/

CI Settings

NOTE: If you merge the PR with modified CI settings, you MUST KNOW what you are doing
NOTE: Checked options will be applied if set before CI RunConfig/PrepareRunConfig step

Run these jobs only (required builds will be added automatically):

  • Integration Tests
  • Stateless tests
  • Stateful tests
  • Unit tests
  • Performance tests
  • All with aarch64
  • All with ASAN
  • All with TSAN
  • All with Analyzer
  • All with Azure
  • Add your option here

Deny these jobs:

  • Fast test
  • Integration Tests
  • Stateless tests
  • Stateful tests
  • Performance tests
  • All with ASAN
  • All with TSAN
  • All with MSAN
  • All with UBSAN
  • All with Coverage
  • All with Aarch64

Extra options:

  • do not test (only style check)
  • disable merge-commit (no merge from master before tests)
  • disable CI cache (job reuse)

Only specified batches in multi-batch jobs:

  • 1
  • 2
  • 3
  • 4

Commits

- Column pruning, SQL groupby/fillna/union pushdown to reduce memory
- Detect overlapping non-key columns in merge() to fall back for suffixes
- Fall back to pandas merge when sort=True or validate is set
- Add 43 merge/join compatibility tests

Prevent ClickHouse ILLEGAL_AGGREGATION when a WHERE column name conflicts with an aggregate alias (e.g. a filter→assign→groupby chain).

UNION ALL resets the index, which is incompatible with the pandas concat default (ignore_index=False) that preserves original indices.

Filter NULL/empty groups after SQL GROUP BY to match pandas dropna=True; let exceptions propagate instead of silently falling back.

Skip the single-SQL GROUP BY optimization when execution_engine is explicitly set to 'pandas', avoiding floating-point precision differences between chDB and pandas aggregation.

Replace the broad has_existing_lazy_ops guard with targeted detection of WHERE clauses that reference the aggregation column. This allows more queries (e.g., a filter on column A plus a groupby aggregation on column B) to use the memory-saving single-SQL path while still avoiding chDB's ILLEGAL_AGGREGATION when WHERE and the aggregation alias collide.

Copilot AI left a comment


Pull request overview

This PR targets lower peak memory usage in DataStore by increasing SQL pushdown opportunities (JOIN/UNION/GROUP BY/value_counts) and by checkpointing execution results to release now-unused intermediate sources, while adding an extensive pandas-compatibility test suite for merge/join behaviors.

Changes:

  • Add lazy SQL paths for merge() (JOIN) and concat() (UNION ALL) when inputs are SQL-backed and features are compatible.
  • Introduce column-pruning and single-SQL execution paths in ColumnExpr/groupby to avoid materializing wide intermediates.
  • Add new merge/join pandas-compatibility test coverage, plus SQL helper support (SubqueryTableFunction) and row-order SQL clause handling updates.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
datastore/tests/test_merge_join_compat.py New pandas-compat coverage for merge/join edge cases, file-backed joins, and concat/union column-ordering.
datastore/table_functions.py Adds SubqueryTableFunction to treat composite SQL (e.g., UNION) as a table source.
datastore/pandas_compat.py Adds lazy SQL fast-paths for merge() and concat(), and checkpoints loc.__setitem__ mutations into a DataFrame source.
datastore/pandas_api.py Adds a module-level concat() UNION ALL fast-path for DataStore inputs.
datastore/core.py Improves SQL-source “pristine” detection, reduces retained memory after execution, improves columns metadata for SQL/join cases, and adds SQL UNION implementation in union().
datastore/connection.py Updates row-order preservation SQL rewrite to insert before SETTINGS as well as LIMIT/OFFSET.
datastore/column_expr.py Adds column pruning for expression/value_counts/groupby and introduces a single-SQL groupby aggregation path for SQL-backed sources.


@CLAassistant

CLAassistant commented Apr 16, 2026

CLA assistant check
All committers have signed the CLA.



Development

Successfully merging this pull request may close these issues.

DataStore memory usage exceeds pandas for analytics operations on large datasets

3 participants