Skip to content

Commit

Permalink
ffi_stream.rs: Align buffers when importing arrays
Browse files Browse the repository at this point in the history
  • Loading branch information
felipecrv committed Feb 15, 2025
1 parent 2375c4f commit a21978e
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 10 deletions.
8 changes: 7 additions & 1 deletion arrow-array/src/ffi_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,13 @@ impl Iterator for ArrowArrayStreamReader {
let result = unsafe {
from_ffi_and_data_type(array, DataType::Struct(self.schema().fields().clone()))
};
Some(result.map(|data| RecordBatch::from(StructArray::from(data))))
Some(result.map(|mut data| {
// Ensure data is aligned (by potentially copying some buffers).
// This is needed because some Arrow C Data Interface sources (e.g.
// ADBC drivers) may produce unaligned buffers.
data.align_buffers();
RecordBatch::from(StructArray::from(data))
}))
} else {
let last_error = self.get_stream_last_error();
let err = ArrowError::CDataInterface(last_error.unwrap());
Expand Down
22 changes: 13 additions & 9 deletions arrow-data/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -697,17 +697,21 @@ impl ArrayData {
///
/// This also aligns buffers of children data
pub fn align_buffers(&mut self) {
let layout = layout(&self.data_type);
for (buffer, spec) in self.buffers.iter_mut().zip(&layout.buffers) {
if let BufferSpec::FixedWidth { alignment, .. } = spec {
if buffer.as_ptr().align_offset(*alignment) != 0 {
*buffer = Buffer::from_slice_ref(buffer.as_ref());
// use a dynamic stack to avoid stack overflow for very deeply nested arrays
let mut stack: Vec<&mut ArrayData> = vec![self];
while let Some(data) = stack.pop() {
let layout = layout(&data.data_type);
for (buffer, spec) in data.buffers.iter_mut().zip(&layout.buffers) {
if let BufferSpec::FixedWidth { alignment, .. } = spec {
if buffer.as_ptr().align_offset(*alignment) != 0 {
*buffer = Buffer::from_slice_ref(buffer.as_ref());
}
}
}
}
// align children data recursively
for data in self.child_data.iter_mut() {
data.align_buffers()
// push children onto the stack so their buffers get aligned in a later iteration
for child in data.child_data.iter_mut() {
stack.push(child);
}
}
}

Expand Down

0 comments on commit a21978e

Please sign in to comment.