Skip to content

Commit

Permalink
fixup some TODOs
Browse files Browse the repository at this point in the history
  • Loading branch information
albertlockett committed Jul 8, 2024
1 parent afeef76 commit b57a121
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 55 deletions.
4 changes: 2 additions & 2 deletions rust/lance-encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ pub trait FieldEncoder: Send {
///
/// This is called only once, after all encode tasks have completed
///
/// By default, returns an empty Vec (no column metadata buffers)
/// This returns a Vec because a single field may have created multiple columns
fn finish(&mut self) -> BoxFuture<'_, Result<Vec<EncodedColumn>>>;

/// The number of output columns this encoding will create
Expand Down Expand Up @@ -376,7 +376,7 @@ impl CoreBufferEncodingStrategy {
return None;
}

Some(BitpackingBufferEncoder::default())
Some(BitpackingBufferEncoder::new(num_bits))
}
}

Expand Down
70 changes: 17 additions & 53 deletions rust/lance-encoding/src/encodings/physical/bitpack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub fn num_compressed_bits(arr: ArrayRef) -> Option<u64> {
DataType::UInt16 => num_bits_for_type::<UInt16Type>(arr.as_primitive()),
DataType::UInt32 => num_bits_for_type::<UInt32Type>(arr.as_primitive()),
DataType::UInt64 => num_bits_for_type::<UInt64Type>(arr.as_primitive()),
// TODO -- eventually we could support signed types as well
_ => None,
}
}
Expand All @@ -49,28 +50,26 @@ where
num_bits.map(|x| x.max(1))
}

#[derive(Debug, Default)]
pub struct BitpackingBufferEncoder {}
#[derive(Debug)]
pub struct BitpackingBufferEncoder {
num_bits: u64,
}

impl BitpackingBufferEncoder {
pub fn new(num_bits: u64) -> Self {
Self { num_bits }
}
}

impl BufferEncoder for BitpackingBufferEncoder {
fn encode(&self, arrays: &[ArrayRef]) -> Result<(EncodedBuffer, EncodedBufferMeta)> {
// TODO -- num bits can be a struct field now that we have the strategy
let mut num_bits = 0;
for arr in arrays {
let arr_max = num_compressed_bits(arr.clone()).ok_or(Error::InvalidInput {
source: format!("Cannot compute num bits for array: {:?}", arr).into(),
location: location!(),
})?;
num_bits = num_bits.max(arr_max);
}

// calculate the total number of bytes we need to allocate for the destination.
// this will be the number of items in the source array times the number of bits.
let count_items = count_items_to_pack(arrays);
// TODO: make function for count & round up
let mut dst_bytes_total = count_items * num_bits as usize / 8;
let mut dst_bytes_total = count_items * self.num_bits as usize / 8;
// if if there's a partial byte at the end, we need to allocate one more byte
if (count_items * num_bits as usize) % 8 != 0 {
if (count_items * self.num_bits as usize) % 8 != 0 {
dst_bytes_total += 1;
}

Expand All @@ -80,25 +79,21 @@ impl BufferEncoder for BitpackingBufferEncoder {
for arr in arrays {
pack_array(
arr.clone(),
num_bits,
self.num_bits,
&mut dst_buffer,
&mut dst_idx,
&mut dst_offset,
)?;
// packed_arrays.push(packed.into());
}

let data_type = arrays[0].data_type();
Ok((
EncodedBuffer {
parts: vec![dst_buffer.into()],
// bits_per_value: (data_type.byte_width() * 8) as u64,
// bitpacked_bits_per_value: Some(num_bits),
// compression_scheme: None,
},
EncodedBufferMeta {
bits_per_value: (data_type.byte_width() * 8) as u64,
bitpacked_bits_per_value: Some(num_bits),
bitpacked_bits_per_value: Some(self.num_bits),
compression_scheme: None,
},
))
Expand Down Expand Up @@ -538,46 +533,15 @@ fn rows_in_buffer(
#[cfg(test)]
pub mod test {
use super::*;
use std::{io::Read, sync::Arc};
use std::sync::Arc;

use arrow_array::{
types::{UInt16Type, UInt8Type},
Float64Array, UInt64Array,
Float64Array,
};

use lance_datagen::{array::fill, gen, ArrayGenerator, ArrayGeneratorExt, RowCount};

#[test]
fn test_round_trip() {
let arrays = vec![Arc::new(UInt64Array::from_iter_values(vec![
1, 2, 1, 2, 1, 2, 1, 2, 1, 2,
])) as ArrayRef];
let encoder = BitpackingBufferEncoder::default();
let (result, _) = encoder.encode(&arrays).unwrap();

let parts = result.parts.clone();
let part_0 = parts[0].clone();
let byte_slice = part_0.bytes();
let bytes_raw: Vec<u8> = byte_slice.into_iter().map(|e| e.unwrap()).collect();

let bytes = Bytes::copy_from_slice(&bytes_raw);

let decoder = BitpackedPageDecoder {
buffer_bit_start_offsets: vec![0],
buffer_bit_end_offsets: vec![None],
bits_per_value: 2,
uncompressed_bits_per_value: 64,
data: vec![bytes],
};

// let dest = BytesMut::new();
// let mut dests = vec![dest];
let mut all_nulls = false;
let dests = decoder.decode(0, 8, &mut all_nulls).unwrap();

println!("{:?}", dests[0])
}

#[test]
fn test_num_compressed_bits() {
fn gen_array(generator: Box<dyn ArrayGenerator>) -> ArrayRef {
Expand Down

0 comments on commit b57a121

Please sign in to comment.