Refactor compression pipeline: shared zarr LocalStore, hybrid MPI+threads, in-memory evaluation #4

Open: kotsaloscv wants to merge 19 commits into main from hpc_refactoring
@kotsaloscv
Contributor

Summary

End-to-end refactor of the evaluate_combos / compress_with_optimal / merge_compressed_fields pipeline. The sweep is now fully in-memory, within-node parallelism moves from MPI ranks to a ThreadPoolExecutor, and compressed fields land directly in a shared {dataset}.zarr LocalStore instead of per-field .zarr.zip files that had to be unzipped and re-merged.

What changed

Storage layout

  • .zarr.zip is gone. compress_with_optimal writes each field directly into a shared {where_to_write}/{dataset}.zarr LocalStore (component = field name). merge_compressed_fields no longer unzips/copies/rezips — it just runs zarr.consolidate_metadata.
  • Commands renamed: open_zarr_zip_file_and_inspect → open_zarr_and_inspect, from_zarr_zip_to_netcdf → from_zarr_to_netcdf. Both now take a .zarr directory path.
  • utils.open_dataset drops .zarr.zip support; .nc, .grib, .zarr remain.

Parallelism (hybrid MPI + threads)

  • evaluate_combos now requires 1 MPI rank per node; within-node parallelism is a ThreadPoolExecutor. Multi-rank-per-node launches are rejected with a clear launch-string hint.
  • New helpers: detect_node_topology (MPI-3 Split_type, hostname fallback), detect_cores_available (cgroup/Slurm-aware via sched_getaffinity), compute_default_threads_per_rank.
  • New --threads-per-rank flag (auto-detected if omitted).
  • --oversubscription-check warns/aborts if OMP_NUM_THREADS / MKL_NUM_THREADS / OPENBLAS_NUM_THREADS / BLOSC_NTHREADS / NUMBA_NUM_THREADS aren't pinned to 1. Zarr v3's internal thread pool is also pinned.
  • Thread-safe progress_bar and Timer (locks around shared counters/dict).
  • All sys.exit(1) paths that could hang siblings at the next collective are now comm.Abort(1).
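
A minimal sketch of the two checks described above, not the PR's actual helpers: the function names are hypothetical, and the fallback to os.cpu_count() on platforms without sched_getaffinity is an assumption. The list of environment variables is taken from the bullet on --oversubscription-check.

```python
import os

# Environment variables the description lists as needing to be pinned to 1.
THREAD_ENV_VARS = (
    "OMP_NUM_THREADS",
    "MKL_NUM_THREADS",
    "OPENBLAS_NUM_THREADS",
    "BLOSC_NTHREADS",
    "NUMBA_NUM_THREADS",
)


def cores_available() -> int:
    """Count the cores this process may run on (hypothetical helper)."""
    if hasattr(os, "sched_getaffinity"):
        # Linux: the affinity mask reflects Slurm binding and cpuset limits.
        return len(os.sched_getaffinity(0))
    # Assumed fallback for platforms without sched_getaffinity.
    return os.cpu_count() or 1


def unpinned_thread_vars(environ=os.environ) -> list[str]:
    """Return the variables that are missing or not set to "1"."""
    return [name for name in THREAD_ENV_VARS if environ.get(name) != "1"]


if __name__ == "__main__":
    print("cores available:", cores_available())
    print("not pinned to 1:", unpinned_thread_vars())
```

A launcher could warn or abort when unpinned_thread_vars() is non-empty, and size the ThreadPoolExecutor from cores_available().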

Evaluation pipeline

  • evaluate_codec_pipeline runs entirely against a MemoryStore — no disk I/O per combo, no zip wrapping. Error norms are accumulated chunk-wise so a full decompressed copy of the sample is never held in memory.
  • Dask is forced to the synchronous scheduler inside each thread (scoped via with dask.config.set) to prevent nested pools.
  • Per-combo failures are collected rather than fatal; counts are reduced across ranks and reported on rank 0.
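
The chunk-wise accumulation of error norms can be illustrated roughly as follows. This is a sketch under assumptions: the function name, the slab-along-the-leading-axis iteration, and the exact norm definitions (mean absolute error, root-mean-square error, maximum absolute error) are illustrative and may differ from what evaluate_codec_pipeline computes.

```python
import numpy as np


def chunked_error_norms(original, decoded, rows_per_chunk):
    """Accumulate L1, L2 and L-infinity error over slabs of the leading axis."""
    abs_sum = 0.0
    sq_sum = 0.0
    max_abs = 0.0
    count = 0
    for start in range(0, original.shape[0], rows_per_chunk):
        stop = start + rows_per_chunk
        # Only one slab of the difference exists in memory at a time.
        diff = (np.asarray(original[start:stop], dtype=np.float64)
                - np.asarray(decoded[start:stop], dtype=np.float64))
        abs_sum += float(np.abs(diff).sum())
        sq_sum += float(np.square(diff).sum())
        if diff.size:
            max_abs = max(max_abs, float(np.abs(diff).max()))
        count += diff.size
    if count == 0:
        raise ValueError("cannot compute error norms of an empty array")
    return {
        "l1": abs_sum / count,             # mean absolute error (assumed definition)
        "l2": (sq_sum / count) ** 0.5,     # root-mean-square error (assumed definition)
        "linf": max_abs,                   # maximum absolute error
    }
```

Because only running sums and a running maximum are kept, peak memory is bounded by one slab regardless of the sample size.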

Persistence path (with sharding)

  • New persist_with_codec_pipeline writes via dask.array.to_zarr with inner chunks + shards; Dask chunks are rechunked to shard shape so each write = one shard.
  • compress_with_optimal gains --inner-chunk-mib (default 16), --shard-mib (default 512), --threads, and --verify/--no-verify (on by default; disable to skip the re-read pass for trusted combos).
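
To make the sizing arithmetic concrete, here is a hedged sketch of how inner-chunk and shard shapes could be derived from the two MiB budgets. The function name and the choice to split only the leading axis are assumptions; the PR's own sizing logic may differ. The shard shape is kept a whole multiple of the inner chunk shape, which Zarr v3 sharding requires.

```python
import math


def chunk_and_shard_shapes(shape, itemsize, inner_chunk_mib=16, shard_mib=512):
    """Split only the leading axis; trailing axes stay whole (an assumption)."""
    row_bytes = itemsize * math.prod(shape[1:])
    # Rows of the leading axis that fit in one inner chunk (at least one).
    chunk_rows = max(1, (inner_chunk_mib * 2**20) // row_bytes)
    chunk_rows = min(chunk_rows, shape[0])
    # Whole inner chunks per shard, so the shard is a multiple of the chunk.
    chunks_per_shard = max(1, (shard_mib * 2**20) // (chunk_rows * row_bytes))
    # Never use more chunks per shard than the array needs in total.
    chunks_per_shard = min(chunks_per_shard, math.ceil(shape[0] / chunk_rows))
    shard_rows = chunk_rows * chunks_per_shard
    return (chunk_rows, *shape[1:]), (shard_rows, *shape[1:])


# float32 field of 1000 x 1024 x 1024: one leading-axis row is 4 MiB.
chunks, shards = chunk_and_shard_shapes((1000, 1024, 1024), itemsize=4)
print(chunks, shards)  # (4, 1024, 1024) (128, 1024, 1024)
```

With the defaults, each 512 MiB shard holds 32 inner chunks of 16 MiB, and rechunking the Dask array to the shard shape makes each task write exactly one shard.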

Representative sampling

  • New build_representative_sample: stride-samples along the leading dim via np.linspace, keeping trailing spatial dims full. Deterministic so evaluate_combos and compress_with_optimal build identical codec spaces.
  • New --eval-data-size-limit flag (default "5GB") with parse_size helper for GB/GiB/MiB/... strings. Must match between evaluate_combos and compress_with_optimal so the codec-space indices resolve to the same objects.
  • Old --field-percentage-to-compress is removed.
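
A rough sketch of the two pieces described above, a size-string parser and deterministic stride sampling with np.linspace. The names parse_size_sketch and sample_indices are hypothetical, and treating "GB" as decimal (10^9) and "GiB" as binary (2^30) is an assumption about the real parse_size helper.

```python
import re

import numpy as np

# Assumed unit table: decimal for KB/MB/GB, binary for KiB/MiB/GiB.
_UNITS = {
    "B": 1,
    "KB": 10**3, "MB": 10**6, "GB": 10**9, "TB": 10**12,
    "KIB": 2**10, "MIB": 2**20, "GIB": 2**30, "TIB": 2**40,
}


def parse_size_sketch(text):
    """Turn a string such as "5GB" or "16MiB" into a byte count."""
    match = re.fullmatch(r"\s*(\d+(?:\.\d+)?)\s*([A-Za-z]+)\s*", text)
    if match is None or match.group(2).upper() not in _UNITS:
        raise ValueError(f"unrecognised size: {text!r}")
    return int(float(match.group(1)) * _UNITS[match.group(2).upper()])


def sample_indices(n_leading, row_bytes, limit_bytes):
    """Evenly spaced leading-axis indices whose rows fit within the byte limit."""
    n_keep = max(1, min(n_leading, limit_bytes // row_bytes))
    # linspace includes both endpoints; unique drops duplicates after rounding.
    positions = np.linspace(0, n_leading - 1, n_keep).round().astype(np.int64)
    return np.unique(positions)
```

The indices depend only on the array length, the row size and the limit, so two commands given the same limit select the same rows; that determinism is what the matching --eval-data-size-limit requirement relies on.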

MPI

  • New broadcast_numpy uses Bcast (uppercase, buffer-protocol) with shape/dtype metadata piggybacked over pickle. Lifts the payload ceiling from ~2 GB (the old [buf, MPI.BYTE] path would silently cap / corrupt at the 5 GB default sample budget) to ~16 GB for float64.
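
The two ceilings quoted above follow from simple arithmetic, assuming the limit comes from the element count being a 32-bit signed int in the underlying MPI call (an assumption; the description does not state the cause explicitly).

```python
INT_MAX = 2**31 - 1  # largest count a 32-bit signed int can carry

# Count measured in 1-byte MPI.BYTE elements (the old path).
byte_ceiling = INT_MAX * 1
# Count measured in 8-byte float64 elements (typed buffer).
float64_ceiling = INT_MAX * 8

print(f"{byte_ceiling / 2**30:.2f} GiB")     # 2.00 GiB
print(f"{float64_ceiling / 2**30:.2f} GiB")  # 16.00 GiB
```

A 5 GB sample sits above the first ceiling and below the second, which is why the typed broadcast is needed at the default budget.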

Results / audit trail

  • Per-rank streaming CSV config_space_{var}_rank{N}.csv (flushed per row; survives mid-sweep crashes), consolidated into results_{var}.parquet on rank 0. Both passing and filtered-out combos are recorded, distinguished by a keep column.
  • Per-variable filenames use var (not field_to_compress or "all") so multi-field runs no longer collide.
  • analyze_clustering now takes required --where-to-write and --var flags instead of silently reading config_space.csv from cwd.
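
The per-row flush that lets a per-rank CSV survive a crash could look roughly like this. It is a sketch only: the function name and the column names other than keep are hypothetical, and the explicit os.fsync is an assumption about how strong the durability guarantee needs to be.

```python
import csv
import os


def append_result_row(path, row, fieldnames):
    """Append one result row and push it to disk immediately."""
    is_new = not os.path.exists(path) or os.path.getsize(path) == 0
    with open(path, "a", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames)
        if is_new:
            # Write the header only once, when the file is first created.
            writer.writeheader()
        writer.writerow(row)
        handle.flush()
        os.fsync(handle.fileno())  # assumed: ask the OS to persist the row


if __name__ == "__main__":
    fields = ["combo", "ratio", "keep"]  # hypothetical columns
    append_result_row("config_space_demo_rank0.csv",
                      {"combo": "example", "ratio": 3.2, "keep": True}, fields)
```

Each rank writes only its own file, so no cross-rank locking is needed; rank 0 can concatenate the files afterwards.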

Imports / deps

  • Heavyweight optional deps (matplotlib, sklearn, plotly, tqdm) are now imported lazily inside perform_clustering / analyze_clustering. evaluate_combos and compress_with_optimal no longer pay their import cost.
  • pyproject.toml: dropped strict pins on numpy, dask, zarr, numcodecs.
  • Dockerfile: removed the pip install --force-reinstall "dask[...]" "numpy==..." line that conflicted with the loosened pins.
  • .gitignore: added *.parquet.
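
The lazy-import pattern mentioned above can be sketched as follows. The helper and function names are hypothetical, and the standard-library statistics module stands in for a heavy optional dependency such as matplotlib or sklearn so the example runs anywhere.

```python
import importlib


def lazy_import(name):
    """Import a module on first use, with a readable error if it is missing."""
    try:
        return importlib.import_module(name)
    except ImportError as exc:
        raise ImportError(
            f"{name!r} is needed for this command; install it to continue"
        ) from exc


def analyze_sketch(values):
    # Imported here rather than at module top, so commands that never
    # call this function do not pay the import cost.
    statistics = lazy_import("statistics")  # stand-in for a heavy dependency
    return statistics.mean(values)


print(analyze_sketch([1, 2, 3]))  # 2
```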

Breaking changes

  • .zarr.zip output and readers are removed.
  • Command renames: open_zarr_zip_file_and_inspect → open_zarr_and_inspect, from_zarr_zip_to_netcdf → from_zarr_to_netcdf.
  • evaluate_combos: where_to_write is now the flag --where-to-write (required); --field-percentage-to-compress removed; --eval-data-size-limit added.
  • compress_with_optimal: must now be given the same --eval-data-size-limit as the sweep; new --inner-chunk-mib / --shard-mib / --threads / --verify flags.
  • analyze_clustering: --where-to-write and --var are now required.
  • evaluate_combos now requires 1 MPI rank per node; relaunch with mpirun -n <NODES> --ntasks-per-node=1 ... (or srun --nodes=<N> --ntasks-per-node=1 ...).

Migration

# Old
mpirun -n 32 dc_toolkit evaluate_combos input.nc /out --field-percentage-to-compress 10
# New
mpirun -n <NODES> --ntasks-per-node=1 \
  dc_toolkit evaluate_combos input.nc \
    --where-to-write /out \
    --eval-data-size-limit 5GB

Pass the same --eval-data-size-limit to compress_with_optimal, otherwise the (comp_idx, filt_idx, ser_idx) returned by the sweep may resolve to codec objects with slightly different parameters (symptom: worse compression ratio than the sweep reported, no error).

kotsaloscv requested a review from nfarabullini on April 23, 2026 09:16
Copilot AI left a comment

Pull request overview

This PR refactors the compression/evaluation pipeline to eliminate per-combo disk I/O, switch evaluate_combos to a hybrid MPI (1 rank/node) + threads model, and write compressed outputs directly into a shared {dataset}.zarr LocalStore (with metadata consolidation replacing the old zip/merge flow).

Changes:

  • Reworked core utilities to support representative in-memory sampling, codec-space generation, threaded evaluation, and sharded Zarr v3 persistence.
  • Updated UIs and docs/scripts to align with the new CLI interface (--where-to-write, .zarr instead of .zarr.zip, 1-rank-per-node).
  • Loosened dependency pins and ensured the L1 threshold CSV is packaged in wheels.

Reviewed changes

Copilot reviewed 11 out of 13 changed files in this pull request and generated 7 comments.

| File | Description |
| --- | --- |
| src/dc_toolkit/utils.py | New sampling, chunk/shard sizing, in-memory codec evaluation, persistence into shared LocalStore, and topology/threading helpers. |
| src/dc_toolkit/data/l1_error_thresholds.csv | Adds local fallback table for per-variable L1 thresholds. |
| src/dc_toolkit/compression_analysis_ui_web.py | Updates output location/CLI invocation to use out/ and --where-to-write. |
| src/dc_toolkit/compression_analysis_ui_vcluster.py | Same UI updates for vcluster flow. |
| src/dc_toolkit/compression_analysis_ui_local.py | Same UI updates for local PyQt flow. |
| santis.run | New production sweep driver script aligned with 1 rank/node + threads and new flags. |
| README.md | Documents the new end-to-end workflow, output files, and HPC topology requirements. |
| pyproject.toml | Drops strict pins, adds psutil, and includes CSV as package data. |
| install_dc_toolkit.sh | Updates install guidance (manual thread env pinning). |
| docs/PARALLELIZATION.md | New documentation explaining the MPI+threads model and rationale. |
| Dockerfile | Removes force-reinstall of pinned numpy/dask. |
| .gitignore | Tracks the L1 threshold CSV despite global *.csv ignore; ignores parquet/manifests, etc. |
Comments suppressed due to low confidence (1)

src/dc_toolkit/utils.py:1319

  • Same as above in the sync evaluation path: ratio = count_bytes / count_bytes_stored can divide by zero for empty arrays / zero stored bytes. Add a guard or clearer error handling.
    with Timer("eval.info_complete"):
        info = z.info_complete()
        count_bytes, count_bytes_stored = _info_bytes(info)
        ratio = count_bytes / count_bytes_stored
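
One possible shape for such a guard, sketched as a standalone helper. The function name is hypothetical, and raising ValueError (rather than returning NaN or recording the combo as failed) is an assumption about the preferred error handling.

```python
def compression_ratio(count_bytes, count_bytes_stored):
    """Return the compression ratio, rejecting empty or zero-size stores."""
    if count_bytes_stored <= 0:
        raise ValueError(
            "stored byte count is zero; cannot compute a compression ratio "
            f"(count_bytes={count_bytes}, count_bytes_stored={count_bytes_stored})"
        )
    return count_bytes / count_bytes_stored
```

Since per-combo failures are collected rather than fatal, an exception raised here would presumably be recorded against the combo instead of stopping the sweep.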


Copilot AI and others added 2 commits May 20, 2026 11:38
…sults (vcluster)

Agent-Logs-Url: https://github.com/C2SM/data-compression/sessions/00362400-8ee0-483c-9e38-2a7b53270757

Co-authored-by: nfarabullini <41536517+nfarabullini@users.noreply.github.com>

4 participants