From a21978e073dced0860862aec80d322f0564ed3e3 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Sat, 15 Feb 2025 02:43:30 +0000 Subject: [PATCH] ffi_stream.rs: Align buffers when importing arrays --- arrow-array/src/ffi_stream.rs | 8 +++++++- arrow-data/src/data.rs | 22 +++++++++++++--------- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/arrow-array/src/ffi_stream.rs b/arrow-array/src/ffi_stream.rs index 3d4e89e80b89..6c7bc8152484 100644 --- a/arrow-array/src/ffi_stream.rs +++ b/arrow-array/src/ffi_stream.rs @@ -364,7 +364,13 @@ impl Iterator for ArrowArrayStreamReader { let result = unsafe { from_ffi_and_data_type(array, DataType::Struct(self.schema().fields().clone())) }; - Some(result.map(|data| RecordBatch::from(StructArray::from(data)))) + Some(result.map(|mut data| { + // Ensure data is aligned (by potentially copying some buffers). + // This is needed because some Arrow C Data Interface sources (e.g. + // ADBC drivers) may produce unaligned buffers. + data.align_buffers(); + RecordBatch::from(StructArray::from(data)) + })) } else { let last_error = self.get_stream_last_error(); let err = ArrowError::CDataInterface(last_error.unwrap()); diff --git a/arrow-data/src/data.rs b/arrow-data/src/data.rs index e123d0cef902..e919e24fed60 100644 --- a/arrow-data/src/data.rs +++ b/arrow-data/src/data.rs @@ -697,17 +697,21 @@ impl ArrayData { /// /// This also aligns buffers of children data pub fn align_buffers(&mut self) { - let layout = layout(&self.data_type); - for (buffer, spec) in self.buffers.iter_mut().zip(&layout.buffers) { - if let BufferSpec::FixedWidth { alignment, .. 
} = spec { - if buffer.as_ptr().align_offset(*alignment) != 0 { - *buffer = Buffer::from_slice_ref(buffer.as_ref()); + // use a dynamic stack to avoid stack overflow for very deeply nested arrays + let mut stack: Vec<&mut ArrayData> = vec![self]; + while let Some(data) = stack.pop() { + let layout = layout(&data.data_type); + for (buffer, spec) in data.buffers.iter_mut().zip(&layout.buffers) { + if let BufferSpec::FixedWidth { alignment, .. } = spec { + if buffer.as_ptr().align_offset(*alignment) != 0 { + *buffer = Buffer::from_slice_ref(buffer.as_ref()); + } } } - } - // align children data recursively - for data in self.child_data.iter_mut() { - data.align_buffers() + // push children onto the stack so their buffers get aligned too + for child in data.child_data.iter_mut() { + stack.push(child); + } } }