From f1d1c3afc7761937689bb7b42511826da76adfb1 Mon Sep 17 00:00:00 2001
From: pchintar <89355405+pchintar@users.noreply.github.com>
Date: Wed, 13 May 2026 21:18:37 -0400
Subject: [PATCH] Avoid zero-filling IPC reads with typed buffer handling

---
 arrow-buffer/src/buffer/immutable.rs |  87 +++++++-
 arrow-ipc/src/reader.rs              | 314 +++++++++++++++++++++++----
 2 files changed, 355 insertions(+), 46 deletions(-)

diff --git a/arrow-buffer/src/buffer/immutable.rs b/arrow-buffer/src/buffer/immutable.rs
index a73cc55086ac..67b35c270221 100644
--- a/arrow-buffer/src/buffer/immutable.rs
+++ b/arrow-buffer/src/buffer/immutable.rs
@@ -15,13 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::alloc::Layout;
+use std::alloc::{Layout, handle_alloc_error};
 use std::fmt::Debug;
+use std::ops::{Deref, DerefMut};
 use std::ptr::NonNull;
 use std::sync::Arc;
 
 use crate::BufferBuilder;
 use crate::alloc::{Allocation, Deallocation};
+use crate::buffer::dangling_ptr;
 use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk};
 use crate::{bit_util, bytes::Bytes, native::ArrowNativeType};
 
@@ -84,6 +86,89 @@ pub struct Buffer {
     length: usize,
 }
 
+/// An aligned byte buffer that can be filled through `Read::read_exact` and
+/// converted into [`Buffer`] without copying.
+///
+/// This is useful for readers that need Arrow buffer alignment without
+/// first zero-initializing the allocation.
+pub struct AlignedVec {
+    ptr: NonNull<u8>,
+    len: usize,
+    layout: Layout,
+}
+
+impl AlignedVec {
+    /// Allocates `len` bytes with the requested alignment.
+    pub fn new(len: usize, align: usize) -> Self {
+        let layout =
+            Layout::from_size_align(len, align).expect("failed to create layout for AlignedVec");
+
+        let ptr = match layout.size() {
+            0 => dangling_ptr(),
+            _ => {
+                // Safety: `layout` has non-zero size and was constructed above.
+                let raw_ptr = unsafe { std::alloc::alloc(layout) };
+                NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout))
+            }
+        };
+
+        Self { ptr, len, layout }
+    }
+}
+
+// Allows callers such as `Read::read_exact` to view the allocated region as
+// bytes after it has been filled.
+impl Deref for AlignedVec {
+    type Target = [u8];
+
+    fn deref(&self) -> &[u8] {
+        // Safety: `ptr` points to `len` bytes owned by this AlignedVec.
+        unsafe { std::slice::from_raw_parts(self.ptr.as_ptr(), self.len) }
+    }
+}
+
+// Allows callers such as `Read::read_exact` to write directly into the aligned
+// allocation before it is converted into an Arrow buffer.
+impl DerefMut for AlignedVec {
+    fn deref_mut(&mut self) -> &mut [u8] {
+        // Safety: `ptr` points to `len` bytes owned by this AlignedVec.
+        unsafe { std::slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len) }
+    }
+}
+
+// Transfers ownership of the aligned allocation into Buffer without copying.
+impl From<AlignedVec> for Buffer {
+    fn from(value: AlignedVec) -> Self {
+        // Safety: `value.ptr` was allocated with `value.layout`, and the
+        // resulting Bytes will deallocate it with the same layout.
+        let bytes =
+            unsafe { Bytes::new(value.ptr, value.len, Deallocation::Standard(value.layout)) };
+        std::mem::forget(value);
+        Buffer::from(bytes)
+    }
+}
+
+// Converts through Buffer so the aligned allocation is still owned through the
+// normal Arrow buffer representation.
+impl From<AlignedVec> for MutableBuffer {
+    fn from(value: AlignedVec) -> Self {
+        let buffer = Buffer::from(value);
+        buffer
+            .into_mutable()
+            .expect("AlignedVec should be uniquely owned")
+    }
+}
+
+// Frees the allocation if AlignedVec is dropped before ownership is transferred
+// into Buffer.
+impl Drop for AlignedVec {
+    fn drop(&mut self) {
+        if self.layout.size() != 0 {
+            // Safety: `ptr` was allocated with this exact layout in `new`.
+            unsafe { std::alloc::dealloc(self.ptr.as_ptr(), self.layout) }
+        }
+    }
+}
+
 impl Default for Buffer {
     #[inline]
     fn default() -> Self {
diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index 1d5e06c6871c..d8df8e28125c 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -36,8 +36,9 @@ use std::io::{BufReader, Read, Seek, SeekFrom};
 use std::sync::Arc;
 
 use arrow_array::*;
+use arrow_buffer::alloc::ALIGNMENT;
 use arrow_buffer::{
-    ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, NullBuffer, ScalarBuffer,
+    AlignedVec, ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, NullBuffer, ScalarBuffer,
 };
 use arrow_data::{ArrayData, ArrayDataBuilder, UnsafeFlag};
 use arrow_schema::*;
@@ -72,6 +73,63 @@ fn read_buffer(
         }
     }
 }
+
+/// Source for IPC body buffers.
+///
+/// The inner buffer contains the IPC message body. Metadata buffers store
+/// offsets into this body buffer.
+struct IpcBufferSource<'a>(&'a Buffer);
+
+impl<'a> IpcBufferSource<'a> {
+    fn read_buffer(
+        &self,
+        // IPC metadata entry containing the offset and length of one body buffer.
+        ipc_buffer_metadata: &crate::Buffer,
+        compression: Option<CompressionCodec>,
+        decompression_context: &mut DecompressionContext,
+    ) -> Result<Buffer, ArrowError> {
+        // Full IPC message body containing all encoded physical buffers.
+        let ipc_body_buffer = self.0;
+
+        // Slice and decode the physical buffer described by the IPC metadata entry.
+        read_buffer(
+            ipc_buffer_metadata,
+            ipc_body_buffer,
+            compression,
+            decompression_context,
+        )
+    }
+
+    /// Reads a physical IPC buffer that is expected to contain `len` values of
+    /// type `T`.
+    ///
+    /// The returned value is still a [`Buffer`], not a [`ScalarBuffer`].
+    /// This keeps final array construction on the `ArrayDataBuilder`
+    /// path after trimming any padding from the physical IPC buffer.
+    fn read_typed_buffer<T>(
+        &self,
+        ipc_buffer_metadata: &crate::Buffer,
+        // Number of logical values expected in this typed physical buffer.
+        len: usize,
+        compression: Option<CompressionCodec>,
+        decompression_context: &mut DecompressionContext,
+    ) -> Result<Buffer, ArrowError> {
+        // Expected byte length for `len` native values of `T`.
+        let byte_len = len
+            .checked_mul(std::mem::size_of::<T>())
+            .ok_or_else(|| ArrowError::IpcError("Buffer length overflow".to_string()))?;
+
+        // Decoded bytes for this physical buffer, sliced from the IPC body.
+        let physical_buffer =
+            self.read_buffer(ipc_buffer_metadata, compression, decompression_context)?;
+
+        if physical_buffer.len() <= byte_len {
+            return Ok(physical_buffer);
+        }
+
+        Ok(physical_buffer.slice_with_length(0, byte_len))
+    }
+}
+
 impl RecordBatchDecoder<'_> {
     /// Coordinates reading arrays based on data types.
     ///
@@ -93,25 +151,46 @@ impl RecordBatchDecoder<'_> {
         let data_type = field.data_type();
         match data_type {
             Utf8 | Binary | LargeBinary | LargeUtf8 => {
+                // Binary and string arrays use fixed-width offset buffers
+                // followed by the variable-width value bytes buffer.
+                // Read the offsets through the typed `next_typed_buffer` helper.
                 let field_node = self.next_node(field)?;
+                let null_buffer = self.next_buffer()?;
+                let len = field_node.length() as usize + 1;
+                let offsets = match data_type {
+                    Utf8 | Binary => self.next_typed_buffer::<i32>(len)?,
+                    LargeBinary | LargeUtf8 => self.next_typed_buffer::<i64>(len)?,
+                    _ => unreachable!(),
+                };
+
                 let buffers = [
-                    self.next_buffer()?,
-                    self.next_buffer()?,
-                    self.next_buffer()?,
+                    null_buffer,
+                    offsets,
+                    self.next_buffer()?, // value bytes
                 ];
+
                 self.create_primitive_array(field_node, data_type, &buffers)
             }
+            // The first buffer after the null bitmap is the fixed-width view
+            // buffer. Any remaining buffers are variadic data buffers.
             BinaryView | Utf8View => {
                 let count = variadic_counts
                     .pop_front()
                     .ok_or(ArrowError::IpcError(format!(
                         "Missing variadic count for {data_type} column"
                     )))?;
-                let count = count + 2; // view and null buffer.
-                let buffers = (0..count)
-                    .map(|_| self.next_buffer())
-                    .collect::<Result<Vec<_>, _>>()?;
+                let field_node = self.next_node(field)?;
+                let len = field_node.length() as usize;
+
+                let mut buffers = Vec::with_capacity(count as usize + 2);
+                buffers.push(self.next_buffer()?); // null buffer
+                buffers.push(self.next_typed_buffer::<u128>(len)?); // views
+
+                for _ in 0..count {
+                    buffers.push(self.next_buffer()?); // variadic data buffers
+                }
+
                 self.create_primitive_array(field_node, data_type, &buffers)
             }
             FixedSizeBinary(_) => {
@@ -121,17 +200,37 @@
             }
             List(list_field) | LargeList(list_field) | Map(list_field, _) => {
                 let list_node = self.next_node(field)?;
-                let list_buffers = [self.next_buffer()?, self.next_buffer()?];
+                let null_buffer = self.next_buffer()?;
+
+                let offset_len = list_node.length() as usize + 1;
+                let offsets = match data_type {
+                    List(_) | Map(_, _) => self.next_typed_buffer::<i32>(offset_len)?,
+                    LargeList(_) => self.next_typed_buffer::<i64>(offset_len)?,
+                    _ => unreachable!(),
+                };
+
+                let list_buffers = [null_buffer, offsets];
                 let values = self.create_array(list_field, variadic_counts)?;
                 self.create_list_array(list_node, data_type, &list_buffers, values)
             }
             ListView(list_field) | LargeListView(list_field) => {
                 let list_node = self.next_node(field)?;
-                let list_buffers = [
-                    self.next_buffer()?, // null buffer
-                    self.next_buffer()?, // offsets
-                    self.next_buffer()?, // sizes
-                ];
+                let null_buffer = self.next_buffer()?;
+
+                let len = list_node.length() as usize;
+                let (offsets, sizes) = match data_type {
+                    ListView(_) => (
+                        self.next_typed_buffer::<i32>(len)?,
+                        self.next_typed_buffer::<i32>(len)?,
+                    ),
+                    LargeListView(_) => (
+                        self.next_typed_buffer::<i64>(len)?,
+                        self.next_typed_buffer::<i64>(len)?,
+                    ),
+                    _ => unreachable!(),
+                };
+
+                let list_buffers = [null_buffer, offsets, sizes];
                 let values = self.create_array(list_field, variadic_counts)?;
                 self.create_list_view_array(list_node, data_type, &list_buffers, values)
             }
@@ -170,10 +269,29 @@
                 self.create_array_from_builder(builder)
             }
-            // Create dictionary array from RecordBatch
             Dictionary(_, _) => {
                 let index_node = self.next_node(field)?;
-                let index_buffers = [self.next_buffer()?, self.next_buffer()?];
+                let null_buffer = self.next_buffer()?;
+
+                // Dictionary indices are fixed-width values. Read the index
+                // buffer through the `next_typed_buffer` helper so length handling
+                // is based on the physical key type.
+                let len = index_node.length() as usize;
+                let indices = match data_type {
+                    Dictionary(key_type, _) => match key_type.as_ref() {
+                        Int8 => self.next_typed_buffer::<i8>(len)?,
+                        Int16 => self.next_typed_buffer::<i16>(len)?,
+                        Int32 => self.next_typed_buffer::<i32>(len)?,
+                        Int64 => self.next_typed_buffer::<i64>(len)?,
+                        UInt8 => self.next_typed_buffer::<u8>(len)?,
+                        UInt16 => self.next_typed_buffer::<u16>(len)?,
+                        UInt32 => self.next_typed_buffer::<u32>(len)?,
+                        UInt64 => self.next_typed_buffer::<u64>(len)?,
+                        t => unreachable!("Unsupported dictionary key type {t:?}"),
+                    },
+                    _ => unreachable!(),
+                };
+                let index_buffers = [null_buffer, indices];
 
                 #[allow(deprecated)]
                 let dict_id = field.dict_id().ok_or_else(|| {
@@ -206,15 +324,12 @@
                     self.next_buffer()?;
                 }
 
-                let type_ids: ScalarBuffer<i8> =
-                    self.next_buffer()?.slice_with_length(0, len).into();
-
+                // UnionArray requires ScalarBuffer inputs. If an external
+                // producer provides unaligned union buffers, align only these
+                // buffers before constructing the ScalarBuffer.
+                let type_ids = self.next_union_type_ids(len)?;
                 let value_offsets = match mode {
-                    UnionMode::Dense => {
-                        let offsets: ScalarBuffer<i32> =
-                            self.next_buffer()?.slice_with_length(0, len * 4).into();
-                        Some(offsets)
-                    }
+                    UnionMode::Dense => Some(self.next_union_offsets(len)?),
                     UnionMode::Sparse => None,
                 };
 
@@ -253,7 +368,41 @@
             }
             _ => {
                 let field_node = self.next_node(field)?;
-                let buffers = [self.next_buffer()?, self.next_buffer()?];
+                let null_buffer = self.next_buffer()?;
+
+                // Primitive and primitive-like arrays use fixed-width physical
+                // buffers with widths that depend on the logical data type.
+                let len = field_node.length() as usize;
+                let values = match data_type {
+                    Int8 => self.next_typed_buffer::<i8>(len)?,
+                    Int16 => self.next_typed_buffer::<i16>(len)?,
+                    Int32 => self.next_typed_buffer::<i32>(len)?,
+                    Int64 => self.next_typed_buffer::<i64>(len)?,
+                    UInt8 => self.next_typed_buffer::<u8>(len)?,
+                    UInt16 => self.next_typed_buffer::<u16>(len)?,
+                    UInt32 => self.next_typed_buffer::<u32>(len)?,
+                    UInt64 => self.next_typed_buffer::<u64>(len)?,
+                    Float16 => self.next_typed_buffer::<u16>(len)?,
+                    Float32 => self.next_typed_buffer::<f32>(len)?,
+                    Float64 => self.next_typed_buffer::<f64>(len)?,
+                    Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) | Decimal32(_, _) => {
+                        self.next_typed_buffer::<i32>(len)?
+                    }
+                    Date64 | Time64(_) | Timestamp(_, _) | Duration(_) | Decimal64(_, _) => {
+                        self.next_typed_buffer::<i64>(len)?
+                    }
+                    Decimal128(_, _) => self.next_typed_buffer::<i128>(len)?,
+                    Decimal256(_, _) => self.next_typed_buffer::<arrow_buffer::i256>(len)?,
+                    Interval(IntervalUnit::DayTime) => {
+                        self.next_typed_buffer::<arrow_buffer::IntervalDayTime>(len)?
+                    }
+                    Interval(IntervalUnit::MonthDayNano) => {
+                        self.next_typed_buffer::<arrow_buffer::IntervalMonthDayNano>(len)?
+                    }
+                    Boolean | FixedSizeBinary(_) => self.next_buffer()?,
+                    t => unreachable!("Unsupported primitive data type {t:?}"),
+                };
+                let buffers = [null_buffer, values];
                 self.create_primitive_array(field_node, data_type, &buffers)
             }
         }
@@ -454,8 +603,8 @@ pub struct RecordBatchDecoder<'a> {
     decompression_context: DecompressionContext,
     /// The format version
     version: MetadataVersion,
-    /// The raw data buffer
-    data: &'a Buffer,
+    /// Source of IPC body buffers
+    data: IpcBufferSource<'a>,
     /// The fields comprising this array
     nodes: VectorIter<'a, FieldNode>,
     /// The buffers comprising this array
@@ -500,7 +649,7 @@ impl<'a> RecordBatchDecoder<'a> {
             compression,
             decompression_context: DecompressionContext::new(),
             version: *metadata,
-            data: buf,
+            data: IpcBufferSource(buf),
             nodes: field_nodes.iter(),
             buffers: buffers.iter(),
             projection: None,
@@ -611,17 +760,88 @@ impl<'a> RecordBatchDecoder<'a> {
         }
     }
 
+    /// Advances to the next IPC buffer described by the metadata and returns
+    /// the decoded physical buffer unchanged.
+    ///
+    /// This is used for buffers whose physical layout does not depend on a
+    /// native value type width.
     fn next_buffer(&mut self) -> Result<Buffer, ArrowError> {
-        let buffer = self.buffers.next().ok_or_else(|| {
+        let ipc_buffer_metadata = self.buffers.next().ok_or_else(|| {
             ArrowError::IpcError("Buffer count mismatched with metadata".to_string())
         })?;
-        read_buffer(
-            buffer,
-            self.data,
+
+        self.data.read_buffer(
+            ipc_buffer_metadata,
             self.compression,
             &mut self.decompression_context,
         )
     }
+
+    /// Advances to the next IPC buffer and trims it to the expected physical
+    /// length for `len` values of `T`.
+    ///
+    /// This keeps typed buffer length handling in one place while leaving final
+    /// array construction on the existing `ArrayDataBuilder` path.
+    fn next_typed_buffer<T>(&mut self, len: usize) -> Result<Buffer, ArrowError> {
+        let ipc_buffer_metadata = self.buffers.next().ok_or_else(|| {
+            ArrowError::IpcError("Buffer count mismatched with metadata".to_string())
+        })?;
+
+        self.data.read_typed_buffer::<T>(
+            ipc_buffer_metadata,
+            len,
+            self.compression,
+            &mut self.decompression_context,
+        )
+    }
+
+    // UnionArray requires ScalarBuffer inputs. External IPC producers may
+    // provide unaligned union type id buffers, so preserve the aligned
+    // fast path while falling back to an aligned ScalarBuffer only when
+    // required.
+    fn next_union_type_ids(&mut self, len: usize) -> Result<ScalarBuffer<i8>, ArrowError> {
+        let buffer = self.next_buffer()?;
+        if buffer.len() < len {
+            return Err(ArrowError::IpcError(format!(
+                "IPC union type id buffer length mismatch: expected at least {len}, got {}",
+                buffer.len()
+            )));
+        }
+
+        let buffer = buffer.slice_with_length(0, len);
+        if buffer.as_ptr().align_offset(std::mem::align_of::<i8>()) == 0 {
+            Ok(buffer.into())
+        } else {
+            Ok(buffer.as_slice().iter().map(|v| *v as i8).collect())
+        }
+    }
+
+    // Dense union offsets are stored as i32 buffers. Preserve the aligned
+    // zero-copy path for normal IPC data while handling unaligned external
+    // buffers before constructing the ScalarBuffer.
+    fn next_union_offsets(&mut self, len: usize) -> Result<ScalarBuffer<i32>, ArrowError> {
+        let byte_len = len
+            .checked_mul(std::mem::size_of::<i32>())
+            .ok_or_else(|| ArrowError::IpcError("Buffer length overflow".to_string()))?;
+
+        let buffer = self.next_buffer()?;
+        if buffer.len() < byte_len {
+            return Err(ArrowError::IpcError(format!(
+                "IPC union offset buffer length mismatch: expected at least {byte_len}, got {}",
+                buffer.len()
+            )));
+        }
+
+        let buffer = buffer.slice_with_length(0, byte_len);
+        if buffer.as_ptr().align_offset(std::mem::align_of::<i32>()) == 0 {
+            Ok(buffer.into())
+        } else {
+            Ok(buffer
+                .as_slice()
+                .chunks_exact(std::mem::size_of::<i32>())
+                .map(|chunk| i32::from_le_bytes(chunk.try_into().unwrap()))
+                .collect())
+        }
+    }
 
     fn skip_buffer(&mut self) {
         self.buffers.next().unwrap();
@@ -902,15 +1122,19 @@ fn get_dictionary_values(
     Ok(dictionary_values)
 }
 
-/// Read the data for a given block
+/// Read the data for a given IPC file block.
+///
+/// The block buffer is read into an aligned allocation without zero-filling
+/// before the read.
 fn read_block<R: Read + Seek>(mut reader: R, block: &Block) -> Result<Buffer, ArrowError> {
     reader.seek(SeekFrom::Start(block.offset() as u64))?;
     let body_len = block.bodyLength().to_usize().unwrap();
     let metadata_len = block.metaDataLength().to_usize().unwrap();
     let total_len = body_len.checked_add(metadata_len).unwrap();
 
-    let mut buf = MutableBuffer::from_len_zeroed(total_len);
+    let mut buf = AlignedVec::new(total_len, ALIGNMENT);
     reader.read_exact(&mut buf)?;
+
     Ok(buf.into())
 }
 
@@ -1809,16 +2033,14 @@ impl MessageReader {
         }
     }
 
-    /// Reads the entire next message from the underlying reader which includes
-    /// the metadata length, the metadata, and the body.
+    /// Reads the next IPC message, including the metadata and body.
     ///
     /// # Returns
-    /// - `Ok(None)` if the the reader signals the end of stream with EOF on
-    ///   the first read
-    /// - `Err(_)` if the reader returns an error other than on the first
-    ///   read, or if the metadata length is invalid
-    /// - `Ok(Some(_))` with the Message and buffer containiner the
-    ///   body bytes otherwise.
+    /// - `Ok(None)` if the reader signals end-of-stream before reading a
+    ///   metadata length
+    /// - `Err(_)` if the reader returns an error or the IPC message is invalid
+    /// - `Ok(Some(_))` with the decoded message metadata and body bytes
+    ///   otherwise
     fn maybe_next(&mut self) -> Result<Option<(Message<'_>, MutableBuffer)>, ArrowError> {
         let meta_len = self.read_meta_len()?;
         let Some(meta_len) = meta_len else {
@@ -1832,10 +2054,12 @@ impl MessageReader {
                 ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
             })?;
 
-        let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
-        self.reader.read_exact(&mut buf)?;
+        // Read into an aligned allocation without zero-filling before the read.
+        let body_len = message.bodyLength() as usize;
+        let mut body_buffer = AlignedVec::new(body_len, ALIGNMENT);
+        self.reader.read_exact(&mut body_buffer)?;
 
-        Ok(Some((message, buf)))
+        Ok(Some((message, body_buffer.into())))
     }
 
     /// Get a mutable reference to the underlying reader.
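
Reviewer sketch: a minimal, self-contained illustration of how the AlignedVec read path above is meant to replace the zero-filled MutableBuffer path. The `read_body` helper and the `Cursor` reader are illustrative stand-ins rather than code from this patch, and it assumes `AlignedVec` is publicly exported from `arrow_buffer` as the reader.rs import above implies.

    use std::io::{Cursor, Read};

    use arrow_buffer::alloc::ALIGNMENT;
    use arrow_buffer::{AlignedVec, Buffer};

    // Read `body_len` bytes into an Arrow-aligned allocation and hand the bytes
    // to Buffer without copying. The previous path allocated with
    // MutableBuffer::from_len_zeroed, paying for a zero-fill that read_exact
    // immediately overwrites.
    fn read_body<R: Read>(mut reader: R, body_len: usize) -> std::io::Result<Buffer> {
        let mut buf = AlignedVec::new(body_len, ALIGNMENT);
        // DerefMut exposes the allocation as &mut [u8], so read_exact fills it in place.
        reader.read_exact(&mut buf)?;
        // From<AlignedVec> for Buffer transfers ownership of the aligned allocation.
        Ok(buf.into())
    }

    fn main() -> std::io::Result<()> {
        let bytes: Vec<u8> = (0..64).collect();
        let buffer = read_body(Cursor::new(&bytes), bytes.len())?;
        assert_eq!(buffer.as_slice(), bytes.as_slice());
        Ok(())
    }

The same pattern appears in `read_block` and `MessageReader::maybe_next` above: allocate aligned storage sized to the known body length, fill it with `read_exact`, and convert into `Buffer` or `MutableBuffer` without an intermediate copy or zero-fill.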