From 37e408b01e47faf51f856c0ff8c46832d10430d6 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 8 Feb 2025 09:24:07 -0500 Subject: [PATCH] Refactor arrow-ipc: Move `create_*_array` methods into `RecordBatchDecoder` (#7029) * Move `create_primitive_array` into RecordBatchReader * Move `create_list-array` into RecordBatchReader * Move `create_dictionay_array` into RecordBatchReader --- arrow-ipc/src/reader.rs | 228 ++++++++++++++++++---------------------- 1 file changed, 104 insertions(+), 124 deletions(-) diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index e79ab232114..2ab61817775 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -85,16 +85,15 @@ impl RecordBatchDecoder<'_> { ) -> Result { let data_type = field.data_type(); match data_type { - Utf8 | Binary | LargeBinary | LargeUtf8 => create_primitive_array( - self.next_node(field)?, - data_type, - &[ + Utf8 | Binary | LargeBinary | LargeUtf8 => { + let field_node = self.next_node(field)?; + let buffers = [ self.next_buffer()?, self.next_buffer()?, self.next_buffer()?, - ], - self.require_alignment, - ), + ]; + self.create_primitive_array(field_node, data_type, &buffers) + } BinaryView | Utf8View => { let count = variadic_counts .pop_front() @@ -105,42 +104,25 @@ impl RecordBatchDecoder<'_> { let buffers = (0..count) .map(|_| self.next_buffer()) .collect::, _>>()?; - create_primitive_array( - self.next_node(field)?, - data_type, - &buffers, - self.require_alignment, - ) + let field_node = self.next_node(field)?; + self.create_primitive_array(field_node, data_type, &buffers) + } + FixedSizeBinary(_) => { + let field_node = self.next_node(field)?; + let buffers = [self.next_buffer()?, self.next_buffer()?]; + self.create_primitive_array(field_node, data_type, &buffers) } - FixedSizeBinary(_) => create_primitive_array( - self.next_node(field)?, - data_type, - &[self.next_buffer()?, self.next_buffer()?], - self.require_alignment, - ), List(ref list_field) | LargeList(ref list_field) | Map(ref list_field, _) => { let list_node = self.next_node(field)?; let list_buffers = [self.next_buffer()?, self.next_buffer()?]; let values = self.create_array(list_field, variadic_counts)?; - create_list_array( - list_node, - data_type, - &list_buffers, - values, - self.require_alignment, - ) + self.create_list_array(list_node, data_type, &list_buffers, values) } FixedSizeList(ref list_field, _) => { let list_node = self.next_node(field)?; let list_buffers = [self.next_buffer()?]; let values = self.create_array(list_field, variadic_counts)?; - create_list_array( - list_node, - data_type, - &list_buffers, - values, - self.require_alignment, - ) + self.create_list_array(list_node, data_type, &list_buffers, values) } Struct(struct_fields) => { let struct_node = self.next_node(field)?; @@ -205,12 +187,11 @@ impl RecordBatchDecoder<'_> { )) })?; - create_dictionary_array( + self.create_dictionary_array( index_node, data_type, &index_buffers, value_array.clone(), - self.require_alignment, ) } Union(fields, mode) => { @@ -265,107 +246,106 @@ impl RecordBatchDecoder<'_> { // no buffer increases Ok(Arc::new(NullArray::from(array_data))) } - _ => create_primitive_array( - self.next_node(field)?, - data_type, - &[self.next_buffer()?, self.next_buffer()?], - self.require_alignment, - ), + _ => { + let field_node = self.next_node(field)?; + let buffers = [self.next_buffer()?, self.next_buffer()?]; + self.create_primitive_array(field_node, data_type, &buffers) + } } } -} -/// Reads the correct number of buffers based on data type and null_count, and creates a -/// primitive array ref -fn create_primitive_array( - field_node: &FieldNode, - data_type: &DataType, - buffers: &[Buffer], - require_alignment: bool, -) -> Result { - let length = field_node.length() as usize; - let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone()); - let builder = match data_type { - Utf8 | Binary | LargeBinary | LargeUtf8 => { - // read 3 buffers: null buffer (optional), offsets buffer and data buffer - ArrayData::builder(data_type.clone()) - .len(length) - .buffers(buffers[1..3].to_vec()) - .null_bit_buffer(null_buffer) - } - BinaryView | Utf8View => ArrayData::builder(data_type.clone()) - .len(length) - .buffers(buffers[1..].to_vec()) - .null_bit_buffer(null_buffer), - _ if data_type.is_primitive() || matches!(data_type, Boolean | FixedSizeBinary(_)) => { - // read 2 buffers: null buffer (optional) and data buffer - ArrayData::builder(data_type.clone()) + /// Reads the correct number of buffers based on data type and null_count, and creates a + /// primitive array ref + fn create_primitive_array( + &self, + field_node: &FieldNode, + data_type: &DataType, + buffers: &[Buffer], + ) -> Result { + let length = field_node.length() as usize; + let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone()); + let builder = match data_type { + Utf8 | Binary | LargeBinary | LargeUtf8 => { + // read 3 buffers: null buffer (optional), offsets buffer and data buffer + ArrayData::builder(data_type.clone()) + .len(length) + .buffers(buffers[1..3].to_vec()) + .null_bit_buffer(null_buffer) + } + BinaryView | Utf8View => ArrayData::builder(data_type.clone()) .len(length) - .add_buffer(buffers[1].clone()) - .null_bit_buffer(null_buffer) - } - t => unreachable!("Data type {:?} either unsupported or not primitive", t), - }; + .buffers(buffers[1..].to_vec()) + .null_bit_buffer(null_buffer), + _ if data_type.is_primitive() || matches!(data_type, Boolean | FixedSizeBinary(_)) => { + // read 2 buffers: null buffer (optional) and data buffer + ArrayData::builder(data_type.clone()) + .len(length) + .add_buffer(buffers[1].clone()) + .null_bit_buffer(null_buffer) + } + t => unreachable!("Data type {:?} either unsupported or not primitive", t), + }; - let array_data = builder.align_buffers(!require_alignment).build()?; + let array_data = builder.align_buffers(!self.require_alignment).build()?; - Ok(make_array(array_data)) -} + Ok(make_array(array_data)) + } -/// Reads the correct number of buffers based on list type and null_count, and creates a -/// list array ref -fn create_list_array( - field_node: &FieldNode, - data_type: &DataType, - buffers: &[Buffer], - child_array: ArrayRef, - require_alignment: bool, -) -> Result { - let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone()); - let length = field_node.length() as usize; - let child_data = child_array.into_data(); - let builder = match data_type { - List(_) | LargeList(_) | Map(_, _) => ArrayData::builder(data_type.clone()) - .len(length) - .add_buffer(buffers[1].clone()) - .add_child_data(child_data) - .null_bit_buffer(null_buffer), - - FixedSizeList(_, _) => ArrayData::builder(data_type.clone()) - .len(length) - .add_child_data(child_data) - .null_bit_buffer(null_buffer), - - _ => unreachable!("Cannot create list or map array from {:?}", data_type), - }; + /// Reads the correct number of buffers based on list type and null_count, and creates a + /// list array ref + fn create_list_array( + &self, + field_node: &FieldNode, + data_type: &DataType, + buffers: &[Buffer], + child_array: ArrayRef, + ) -> Result { + let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone()); + let length = field_node.length() as usize; + let child_data = child_array.into_data(); + let builder = match data_type { + List(_) | LargeList(_) | Map(_, _) => ArrayData::builder(data_type.clone()) + .len(length) + .add_buffer(buffers[1].clone()) + .add_child_data(child_data) + .null_bit_buffer(null_buffer), - let array_data = builder.align_buffers(!require_alignment).build()?; + FixedSizeList(_, _) => ArrayData::builder(data_type.clone()) + .len(length) + .add_child_data(child_data) + .null_bit_buffer(null_buffer), - Ok(make_array(array_data)) -} + _ => unreachable!("Cannot create list or map array from {:?}", data_type), + }; -/// Reads the correct number of buffers based on list type and null_count, and creates a -/// list array ref -fn create_dictionary_array( - field_node: &FieldNode, - data_type: &DataType, - buffers: &[Buffer], - value_array: ArrayRef, - require_alignment: bool, -) -> Result { - if let Dictionary(_, _) = *data_type { - let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone()); - let array_data = ArrayData::builder(data_type.clone()) - .len(field_node.length() as usize) - .add_buffer(buffers[1].clone()) - .add_child_data(value_array.into_data()) - .null_bit_buffer(null_buffer) - .align_buffers(!require_alignment) - .build()?; + let array_data = builder.align_buffers(!self.require_alignment).build()?; Ok(make_array(array_data)) - } else { - unreachable!("Cannot create dictionary array from {:?}", data_type) + } + + /// Reads the correct number of buffers based on list type and null_count, and creates a + /// list array ref + fn create_dictionary_array( + &self, + field_node: &FieldNode, + data_type: &DataType, + buffers: &[Buffer], + value_array: ArrayRef, + ) -> Result { + if let Dictionary(_, _) = *data_type { + let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone()); + let array_data = ArrayData::builder(data_type.clone()) + .len(field_node.length() as usize) + .add_buffer(buffers[1].clone()) + .add_child_data(value_array.into_data()) + .null_bit_buffer(null_buffer) + .align_buffers(!self.require_alignment) + .build()?; + + Ok(make_array(array_data)) + } else { + unreachable!("Cannot create dictionary array from {:?}", data_type) + } } }