From b57a12132820dbf3b9c3a547658f984e74baf571 Mon Sep 17 00:00:00 2001 From: albertlockett Date: Mon, 8 Jul 2024 13:01:12 -0300 Subject: [PATCH] fixup some TODOs --- rust/lance-encoding/src/encoder.rs | 4 +- .../src/encodings/physical/bitpack.rs | 70 +++++-------------- 2 files changed, 19 insertions(+), 55 deletions(-) diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index 32ac36a903..36ca5058b4 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -226,7 +226,7 @@ pub trait FieldEncoder: Send { /// /// This is called only once, after all encode tasks have completed /// - /// By default, returns an empty Vec (no column metadata buffers) + /// This returns a Vec because a single field may have created multiple columns fn finish(&mut self) -> BoxFuture<'_, Result>>; /// The number of output columns this encoding will create @@ -376,7 +376,7 @@ impl CoreBufferEncodingStrategy { return None; } - Some(BitpackingBufferEncoder::default()) + Some(BitpackingBufferEncoder::new(num_bits)) } } diff --git a/rust/lance-encoding/src/encodings/physical/bitpack.rs b/rust/lance-encoding/src/encodings/physical/bitpack.rs index ff4bf8f026..7e66757ffc 100644 --- a/rust/lance-encoding/src/encodings/physical/bitpack.rs +++ b/rust/lance-encoding/src/encodings/physical/bitpack.rs @@ -30,6 +30,7 @@ pub fn num_compressed_bits(arr: ArrayRef) -> Option { DataType::UInt16 => num_bits_for_type::(arr.as_primitive()), DataType::UInt32 => num_bits_for_type::(arr.as_primitive()), DataType::UInt64 => num_bits_for_type::(arr.as_primitive()), + // TODO -- eventually we could support signed types as well _ => None, } } @@ -49,28 +50,26 @@ where num_bits.map(|x| x.max(1)) } -#[derive(Debug, Default)] -pub struct BitpackingBufferEncoder {} +#[derive(Debug)] +pub struct BitpackingBufferEncoder { + num_bits: u64, +} + +impl BitpackingBufferEncoder { + pub fn new(num_bits: u64) -> Self { + Self { num_bits } + } +} impl BufferEncoder for BitpackingBufferEncoder { fn encode(&self, arrays: &[ArrayRef]) -> Result<(EncodedBuffer, EncodedBufferMeta)> { - // TODO -- num bits can be a struct field now that we have the strategy - let mut num_bits = 0; - for arr in arrays { - let arr_max = num_compressed_bits(arr.clone()).ok_or(Error::InvalidInput { - source: format!("Cannot compute num bits for array: {:?}", arr).into(), - location: location!(), - })?; - num_bits = num_bits.max(arr_max); - } - // calculate the total number of bytes we need to allocate for the destination. // this will be the number of items in the source array times the number of bits. let count_items = count_items_to_pack(arrays); // TODO: make function for count & round up - let mut dst_bytes_total = count_items * num_bits as usize / 8; + let mut dst_bytes_total = count_items * self.num_bits as usize / 8; // if if there's a partial byte at the end, we need to allocate one more byte - if (count_items * num_bits as usize) % 8 != 0 { + if (count_items * self.num_bits as usize) % 8 != 0 { dst_bytes_total += 1; } @@ -80,25 +79,21 @@ impl BufferEncoder for BitpackingBufferEncoder { for arr in arrays { pack_array( arr.clone(), - num_bits, + self.num_bits, &mut dst_buffer, &mut dst_idx, &mut dst_offset, )?; - // packed_arrays.push(packed.into()); } let data_type = arrays[0].data_type(); Ok(( EncodedBuffer { parts: vec![dst_buffer.into()], - // bits_per_value: (data_type.byte_width() * 8) as u64, - // bitpacked_bits_per_value: Some(num_bits), - // compression_scheme: None, }, EncodedBufferMeta { bits_per_value: (data_type.byte_width() * 8) as u64, - bitpacked_bits_per_value: Some(num_bits), + bitpacked_bits_per_value: Some(self.num_bits), compression_scheme: None, }, )) @@ -538,46 +533,15 @@ fn rows_in_buffer( #[cfg(test)] pub mod test { use super::*; - use std::{io::Read, sync::Arc}; + use std::sync::Arc; use arrow_array::{ types::{UInt16Type, UInt8Type}, - Float64Array, UInt64Array, + Float64Array, }; use lance_datagen::{array::fill, gen, ArrayGenerator, ArrayGeneratorExt, RowCount}; - #[test] - fn test_round_trip() { - let arrays = vec![Arc::new(UInt64Array::from_iter_values(vec![ - 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, - ])) as ArrayRef]; - let encoder = BitpackingBufferEncoder::default(); - let (result, _) = encoder.encode(&arrays).unwrap(); - - let parts = result.parts.clone(); - let part_0 = parts[0].clone(); - let byte_slice = part_0.bytes(); - let bytes_raw: Vec = byte_slice.into_iter().map(|e| e.unwrap()).collect(); - - let bytes = Bytes::copy_from_slice(&bytes_raw); - - let decoder = BitpackedPageDecoder { - buffer_bit_start_offsets: vec![0], - buffer_bit_end_offsets: vec![None], - bits_per_value: 2, - uncompressed_bits_per_value: 64, - data: vec![bytes], - }; - - // let dest = BytesMut::new(); - // let mut dests = vec![dest]; - let mut all_nulls = false; - let dests = decoder.decode(0, 8, &mut all_nulls).unwrap(); - - println!("{:?}", dests[0]) - } - #[test] fn test_num_compressed_bits() { fn gen_array(generator: Box) -> ArrayRef {