diff --git a/differential-dataflow/Cargo.toml b/differential-dataflow/Cargo.toml
index 5a42ce9b2..0e6b74bfb 100644
--- a/differential-dataflow/Cargo.toml
+++ b/differential-dataflow/Cargo.toml
@@ -25,6 +25,8 @@
 itertools="^0.13"
 graph_map = "0.1"
 bytemuck = "1.18.0"
 mimalloc = "0.1.48"
+tempfile = "3"
+lz4_flex = "0.11"
 [dependencies]
 columnar = { workspace = true }
diff --git a/differential-dataflow/examples/columnar_spill.rs b/differential-dataflow/examples/columnar_spill.rs
new file mode 100644
index 000000000..96776121f
--- /dev/null
+++ b/differential-dataflow/examples/columnar_spill.rs
@@ -0,0 +1,690 @@
+//! Example: file-backed spill for the columnar `MergeBatcher`.
+//!
+//! Demonstrates `Spill` / `Fetch` / `SpillPolicy` impls modeled on TD's
+//! `communication/examples/spill_stress.rs`. Spills `UpdatesTyped` chunks
+//! to a tempfile via per-column `Stash::write_bytes`, fetches them back via
+//! `Stash::try_from_bytes` and `Updates::into_typed`.
+//!
+//! Run with: `cargo run --example columnar_spill`
+
+use std::io::{Read, Seek, SeekFrom, Write};
+use std::marker::PhantomData;
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex, OnceLock};
+
+use mimalloc::MiMalloc;
+
+#[global_allocator]
+static GLOBAL: MiMalloc = MiMalloc;
+
+// Static spill-policy config, read by `SpillBatcher::new` when `arrange_core`
+// constructs each worker's batcher (we can't pass parameters through the
+// `Batcher::new(logger, op_id)` constructor).
+static ENABLE_SPILL: AtomicBool = AtomicBool::new(true);
+static HEAD: AtomicUsize = AtomicUsize::new(10_000_000);
+static THRESH: AtomicUsize = AtomicUsize::new(50_000_000);
+
+/// Cumulative bytes serialized (pre-compression) and bytes written
+/// (post-compression) across all `FileSpill` instances. Lets us report a
+/// compression ratio at the end of a run.
+static BYTES_DECOMPRESSED: AtomicUsize = AtomicUsize::new(0);
+static BYTES_COMPRESSED: AtomicUsize = AtomicUsize::new(0);
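+
+// How a driver might configure the policy before building the dataflow
+// (a hypothetical usage sketch; each worker's `SpillBatcher::new` reads
+// these statics once, at operator construction time):
+//
+//     ENABLE_SPILL.store(true, Ordering::Relaxed);
+//     HEAD.store(10_000_000, Ordering::Relaxed);   // records kept resident
+//     THRESH.store(50_000_000, Ordering::Relaxed); // spillable surplus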
+
+/// Cross-worker registry of `Threshold` stats so we can sum them after a run.
+static SHARED_STATS: OnceLock<Mutex<Vec<Arc<ThresholdStats>>>> = OnceLock::new();
+
+fn register_stats(stats: Arc<ThresholdStats>) {
+    SHARED_STATS
+        .get_or_init(|| Mutex::new(Vec::new()))
+        .lock()
+        .unwrap()
+        .push(stats);
+}
+
+fn collect_stats() -> (usize, usize) {
+    if let Some(m) = SHARED_STATS.get() {
+        let v = m.lock().unwrap();
+        let fires: usize = v.iter().map(|s| s.fires.load(Ordering::Relaxed)).sum();
+        let chunks: usize = v.iter().map(|s| s.chunks_spilled.load(Ordering::Relaxed)).sum();
+        (fires, chunks)
+    } else {
+        (0, 0)
+    }
+}
+
+fn reset_stats() {
+    if let Some(m) = SHARED_STATS.get() {
+        m.lock().unwrap().clear();
+    }
+    BYTES_DECOMPRESSED.store(0, Ordering::Relaxed);
+    BYTES_COMPRESSED.store(0, Ordering::Relaxed);
+}
+
+use columnar::Push;
+use columnar::bytes::stash::Stash;
+
+use differential_dataflow::columnar::{RecordedUpdates, ValBuilder, ValColBuilder, ValSpine};
+use differential_dataflow::columnar::batcher::MergeBatcher;
+use differential_dataflow::columnar::layout::ColumnarUpdate as Update;
+use differential_dataflow::columnar::spill::{Entry, Fetch, Spill, SpillPolicy};
+use differential_dataflow::columnar::updates::{Updates, UpdatesTyped};
+use differential_dataflow::logging::Logger;
+use differential_dataflow::operators::arrange::arrangement::arrange_core;
+use differential_dataflow::trace::{Batcher, Builder};
+use timely::dataflow::channels::pact::Pipeline;
+use timely::dataflow::operators::probe::{Handle as ProbeHandle, Probe};
+use timely::dataflow::operators::Input;
+use timely::dataflow::InputHandle;
+use timely::progress::frontier::AntichainRef;
+use timely::progress::{frontier::Antichain, Timestamp};
+
+/// File-backed `Spill`. Serializes each chunk into a reusable `Vec<u8>` and
+/// writes it with one `write_all` per chunk. Rotates to a new tempfile every
+/// `ROTATE_AFTER_BYTES` so disk space is reclaimed as `FileFetch` handles are
+/// consumed: once a file's last handle is dropped, the `Arc` hits zero, the
+/// (already-unlinked) tempfile closes, and the OS gives the space back.
+pub struct FileSpill<U> {
+    /// Current write file. `None` until first spill, or after rotation if no
+    /// chunks have been written to a fresh file yet.
+    current: Option<Arc<Mutex<std::fs::File>>>,
+    /// Bytes written to `current` so far.
+    current_offset: u64,
+    /// Reusable serialization buffer; grows to fit the largest chunk seen,
+    /// then sticks at that capacity (no per-chunk allocation).
+    buf: Vec<u8>,
+    _marker: PhantomData<U>,
+}
+
+impl<U> FileSpill<U> {
+    /// Rotate to a new tempfile after this many bytes. Sized so each file
+    /// holds many chunks (amortizing the file-open cost) but small enough
+    /// that we don't accumulate hundreds of GB on disk before any can be
+    /// reclaimed.
+    const ROTATE_AFTER_BYTES: u64 = 1 << 30; // 1 GiB
+
+    pub fn new() -> std::io::Result<Self> {
+        Ok(Self {
+            current: None,
+            current_offset: 0,
+            buf: Vec::new(),
+            _marker: PhantomData,
+        })
+    }
+
+    fn current_file(&mut self) -> std::io::Result<Arc<Mutex<std::fs::File>>> {
+        if self.current.is_none() || self.current_offset >= Self::ROTATE_AFTER_BYTES {
+            // Drop the previous Arc — outstanding `FileFetch` handles still
+            // hold it; once they're all consumed, the file is unlinked-closed
+            // and the OS reclaims its space.
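+            // (On Unix, `tempfile::tempfile()` returns a file that is already
+            // unlinked from the filesystem, so there is never a path to clean
+            // up; disk space is held only while a descriptor remains open.)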
+            self.current = Some(Arc::new(Mutex::new(tempfile::tempfile()?)));
+            self.current_offset = 0;
+        }
+        Ok(self.current.as_ref().unwrap().clone())
+    }
+}
+
+impl<U: Update> Spill<UpdatesTyped<U>> for FileSpill<U> {
+    fn spill(
+        &mut self,
+        chunks: &mut Vec<UpdatesTyped<U>>,
+        handles: &mut Vec<Box<dyn Fetch<UpdatesTyped<U>>>>,
+    ) {
+        while let Some(chunk) = chunks.pop() {
+            let updates: Updates<Vec<u8>> = chunk.into();
+            let keys_len = updates.keys.length_in_bytes() as u64;
+            let vals_len = updates.vals.length_in_bytes() as u64;
+            let times_len = updates.times.length_in_bytes() as u64;
+            let diffs_len = updates.diffs.length_in_bytes() as u64;
+            let total = 32 + keys_len + vals_len + times_len + diffs_len;
+
+            // Serialize the whole chunk (header + four columns) into the
+            // reusable buffer.
+            self.buf.clear();
+            self.buf.extend_from_slice(&keys_len.to_le_bytes());
+            self.buf.extend_from_slice(&vals_len.to_le_bytes());
+            self.buf.extend_from_slice(&times_len.to_le_bytes());
+            self.buf.extend_from_slice(&diffs_len.to_le_bytes());
+            updates.keys.write_bytes(&mut self.buf).unwrap();
+            updates.vals.write_bytes(&mut self.buf).unwrap();
+            updates.times.write_bytes(&mut self.buf).unwrap();
+            updates.diffs.write_bytes(&mut self.buf).unwrap();
+            debug_assert_eq!(self.buf.len() as u64, total);
+
+            // Compress before writing. lz4 block format: caller is responsible
+            // for tracking the decompressed size, which we stash in the handle.
+            let compressed = lz4_flex::block::compress(&self.buf);
+            let comp_len = compressed.len() as u64;
+            BYTES_DECOMPRESSED.fetch_add(total as usize, Ordering::Relaxed);
+            BYTES_COMPRESSED.fetch_add(comp_len as usize, Ordering::Relaxed);
+
+            let file = self.current_file().expect("tempfile");
+            let start = self.current_offset;
+            let mut f = file.lock().unwrap();
+            f.seek(SeekFrom::Start(start)).unwrap();
+            f.write_all(&compressed).unwrap();
+            drop(f);
+            self.current_offset += comp_len;
+
+            handles.push(Box::new(FileFetch::<U> {
+                file: file.clone(),
+                offset: start,
+                compressed_len: comp_len,
+                decompressed_len: total,
+                _marker: PhantomData,
+            }));
+        }
+    }
+}
+
+/// Per-chunk fetch handle. On `fetch`, reads `compressed_len` bytes at
+/// `offset`, decompresses to `decompressed_len`, then parses the 32-byte
+/// header + four column payloads.
+pub struct FileFetch<U> {
+    file: Arc<Mutex<std::fs::File>>,
+    offset: u64,
+    compressed_len: u64,
+    decompressed_len: u64,
+    _marker: PhantomData<U>,
+}
+
+impl<U: Update> Fetch<UpdatesTyped<U>> for FileFetch<U> {
+    fn fetch(self: Box<Self>) -> Result<Vec<UpdatesTyped<U>>, Box<dyn Fetch<UpdatesTyped<U>>>> {
+        // Read the compressed bytes in one shot.
+        let mut compressed = vec![0u8; self.compressed_len as usize];
+        let mut file = self.file.lock().unwrap();
+        file.seek(SeekFrom::Start(self.offset)).unwrap();
+        file.read_exact(&mut compressed).unwrap();
+        drop(file);
+
+        let decompressed = lz4_flex::block::decompress(&compressed, self.decompressed_len as usize)
+            .expect("lz4 decompress");
+
+        // Parse the 32-byte header from the decompressed buffer.
+        let header = &decompressed[0..32];
+        let keys_len = u64::from_le_bytes(header[0..8].try_into().unwrap()) as usize;
+        let vals_len = u64::from_le_bytes(header[8..16].try_into().unwrap()) as usize;
+        let times_len = u64::from_le_bytes(header[16..24].try_into().unwrap()) as usize;
+        let diffs_len = u64::from_le_bytes(header[24..32].try_into().unwrap()) as usize;
+
+        // Slice the four columns out of the decompressed buffer, each into
+        // its own owned `Vec<u8>` (`Stash::try_from_bytes` requires owned).
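+        //
+        // On-disk chunk layout (pre-compression), matching the write side:
+        //
+        //     [keys_len | vals_len | times_len | diffs_len]   4 x u64, little-endian
+        //     [keys bytes][vals bytes][times bytes][diffs bytes]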
+        let mut o = 32;
+        let keys_bytes = decompressed[o..o + keys_len].to_vec();
+        o += keys_len;
+        let vals_bytes = decompressed[o..o + vals_len].to_vec();
+        o += vals_len;
+        let times_bytes = decompressed[o..o + times_len].to_vec();
+        o += times_len;
+        let diffs_bytes = decompressed[o..o + diffs_len].to_vec();
+
+        let keys = Stash::try_from_bytes(keys_bytes).unwrap();
+        let vals = Stash::try_from_bytes(vals_bytes).unwrap();
+        let times = Stash::try_from_bytes(times_bytes).unwrap();
+        let diffs = Stash::try_from_bytes(diffs_bytes).unwrap();
+        let updates: Updates<Vec<u8>> = Updates { keys, vals, times, diffs };
+        Ok(vec![updates.into_typed()])
+    }
+}
+
+/// Threshold-based spill policy adapted from timely's
+/// `communication::allocator::zero_copy::spill::threshold::Threshold`.
+///
+/// Counts records (not bytes) for the threshold check. When the queue's
+/// resident records exceed `head_reserve_records + threshold_records`, spill
+/// chunks past the head reserve. Unlike TD we don't carve out the last
+/// entry — TD's last entry is a `try_merge` target being extended in place;
+/// our chunks are all finished, so any of them can be spilled.
+pub struct Threshold<U> {
+    spill: FileSpill<U>,
+    /// Records near the head of the queue stay resident.
+    pub head_reserve_records: usize,
+    /// Spillable surplus: trigger when resident exceeds head + threshold.
+    pub threshold_records: usize,
+    /// Counters shared with the caller (chunks_spilled, fires).
+    pub stats: Arc<ThresholdStats>,
+}
+
+#[derive(Default)]
+pub struct ThresholdStats {
+    pub fires: AtomicUsize,
+    pub chunks_spilled: AtomicUsize,
+}
+
+impl<U> Threshold<U> {
+    pub fn new(spill: FileSpill<U>, head_reserve_records: usize, threshold_records: usize) -> Self {
+        Self {
+            spill,
+            head_reserve_records,
+            threshold_records,
+            stats: Arc::new(ThresholdStats::default()),
+        }
+    }
+}
+
+impl<U: Update> SpillPolicy<UpdatesTyped<U>> for Threshold<U> {
+    fn apply(&mut self, queue: &mut std::collections::VecDeque<Entry<UpdatesTyped<U>>>) {
+        let resident: usize = queue.iter().map(|e| match e {
+            Entry::Typed(c) => c.len(),
+            Entry::Paged(_) => 0,
+        }).sum();
+        if resident <= self.head_reserve_records + self.threshold_records {
+            return;
+        }
+
+        // Walk the queue, accumulating a head reserve. Past the reserve, mark
+        // every Typed entry for spill.
+        let mut cumulative = 0usize;
+        let mut target_indices: Vec<usize> = Vec::new();
+        for (i, entry) in queue.iter().enumerate() {
+            if let Entry::Typed(c) = entry {
+                if cumulative >= self.head_reserve_records {
+                    target_indices.push(i);
+                }
+                cumulative += c.len();
+            }
+        }
+        if target_indices.is_empty() { return; }
+
+        // Take the targeted chunks out, leaving empty placeholders we overwrite below.
+        let mut targets: Vec<UpdatesTyped<U>> = Vec::with_capacity(target_indices.len());
+        for &i in &target_indices {
+            if let Entry::Typed(c) = &mut queue[i] {
+                targets.push(std::mem::take(c));
+            }
+        }
+
+        let mut handles: Vec<Box<dyn Fetch<UpdatesTyped<U>>>> = Vec::new();
+        self.spill.spill(&mut targets, &mut handles);
+        // FileSpill drains via pop (LIFO); reverse so handles align with target_indices order.
+        handles.reverse();
+        assert_eq!(target_indices.len(), handles.len());
+        self.stats.fires.fetch_add(1, Ordering::Relaxed);
+        self.stats.chunks_spilled.fetch_add(handles.len(), Ordering::Relaxed);
+        for (i, handle) in target_indices.into_iter().zip(handles) {
+            queue[i] = Entry::Paged(handle);
+        }
+    }
+}
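+
+// Worked example of the policy arithmetic (illustrative numbers): with
+// head_reserve_records = 10M and threshold_records = 50M, a queue holding
+// 70M resident records fires (70M > 10M + 50M). Walking front to back, the
+// chunks covering the first 10M records stay resident (the chunk straddling
+// the boundary is kept whole, since `cumulative` is bumped after the check),
+// and every finished chunk past that point is swapped for an `Entry::Paged`
+// handle.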
+
+/// `Batcher` wrapper that installs a `Threshold` policy on a `MergeBatcher`
+/// at construction time, reading config from `HEAD` / `THRESH` /
+/// `ENABLE_SPILL` statics. Slots into `arrange_core` in place of
+/// `ValBatcher` and lets the timely operator drive a spilling merger
+/// without surgery to the `Batcher` trait signature.
+pub struct SpillBatcher<K, V, T, R>(MergeBatcher<(K, V, T, R)>)
+where
+    (K, V, T, R): Update;
+
+impl<K, V, T, R> Batcher for SpillBatcher<K, V, T, R>
+where
+    K: columnar::Columnar + 'static,
+    V: columnar::Columnar + 'static,
+    T: columnar::Columnar + Timestamp + 'static,
+    R: columnar::Columnar + 'static,
+    (K, V, T, R): Update