Skip to content

Commit

Permalink
Add Validity enum (#136)
Browse files Browse the repository at this point in the history
FLUP: we should probably just turn this into an array and implement the
array trait for it?
  • Loading branch information
gatesn authored Mar 25, 2024
1 parent 87be047 commit 308fde9
Show file tree
Hide file tree
Showing 61 changed files with 702 additions and 544 deletions.
7 changes: 7 additions & 0 deletions vortex-alp/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use vortex::formatter::{ArrayDisplay, ArrayFormatter};
use vortex::impl_array;
use vortex::serde::{ArraySerde, EncodingSerde};
use vortex::stats::{Stats, StatsSet};
use vortex::validity::{ArrayValidity, Validity};
use vortex_error::{VortexError, VortexResult};
use vortex_schema::{DType, IntWidth, Signedness};

Expand Down Expand Up @@ -123,6 +124,12 @@ impl ArrayDisplay for ALPArray {
}
}

impl ArrayValidity for ALPArray {
fn validity(&self) -> Option<Validity> {
self.encoded().validity()
}
}

#[derive(Debug)]
pub struct ALPEncoding;

Expand Down
12 changes: 6 additions & 6 deletions vortex-alp/src/compress.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use itertools::Itertools;

use crate::alp::ALPFloat;
use crate::array::{ALPArray, ALPEncoding};
use crate::downcast::DowncastALP;
use crate::Exponents;
use vortex::array::downcast::DowncastArrayBuiltin;
use vortex::array::primitive::PrimitiveArray;
use vortex::array::sparse::SparseArray;
Expand All @@ -8,13 +12,9 @@ use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression};
use vortex::compute::flatten::flatten_primitive;
use vortex::compute::patch::patch;
use vortex::ptype::{NativePType, PType};
use vortex::validity::ArrayValidity;
use vortex_error::VortexResult;

use crate::alp::ALPFloat;
use crate::array::{ALPArray, ALPEncoding};
use crate::downcast::DowncastALP;
use crate::Exponents;

#[macro_export]
macro_rules! match_each_alp_float_ptype {
($self:expr, | $_:tt $enc:ident | $($body:tt)*) => ({
Expand Down Expand Up @@ -118,7 +118,7 @@ pub fn decompress(array: &ALPArray) -> VortexResult<PrimitiveArray> {
let decoded = match_each_alp_float_ptype!(array.dtype().try_into().unwrap(), |$T| {
PrimitiveArray::from_nullable(
decompress_primitive::<$T>(encoded.typed_data(), array.exponents()),
encoded.validity().cloned(),
encoded.validity(),
)
})?;

Expand Down
57 changes: 25 additions & 32 deletions vortex-array/src/array/bool/compute.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
use std::sync::Arc;

use arrow_buffer::buffer::BooleanBuffer;
use itertools::Itertools;

use vortex_error::VortexResult;

use crate::array::bool::BoolArray;
use crate::array::downcast::DowncastArrayBuiltin;
use crate::array::{Array, ArrayRef};
use crate::compute::as_contiguous::{as_contiguous, AsContiguousFn};
use crate::compute::as_contiguous::AsContiguousFn;
use crate::compute::fill::FillForwardFn;
use crate::compute::flatten::{flatten_bool, FlattenFn, FlattenedArray};
use crate::compute::flatten::{FlattenFn, FlattenedArray};
use crate::compute::scalar_at::ScalarAtFn;
use crate::compute::ArrayCompute;
use crate::scalar::{BoolScalar, Scalar};
use crate::validity::{ArrayValidity, Validity};

impl ArrayCompute for BoolArray {
fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> {
Expand All @@ -36,20 +36,12 @@ impl ArrayCompute for BoolArray {
impl AsContiguousFn for BoolArray {
fn as_contiguous(&self, arrays: Vec<ArrayRef>) -> VortexResult<ArrayRef> {
// TODO(ngates): implement a HasValidity trait to avoid this duplicate code.
let validity = if arrays.iter().all(|a| a.as_bool().validity().is_none()) {
None
let validity: Option<Validity> = if self.dtype().is_nullable() {
Some(Validity::from_iter(arrays.iter().map(|a| a.as_bool()).map(
|a| a.validity().unwrap_or_else(|| Validity::valid(a.len())),
)))
} else {
Some(as_contiguous(
arrays
.iter()
.map(|a| {
a.as_bool()
.validity()
.cloned()
.unwrap_or_else(|| BoolArray::from(vec![true; a.len()]).into_array())
})
.collect_vec(),
)?)
None
};

Ok(BoolArray::new(
Expand Down Expand Up @@ -84,23 +76,23 @@ impl ScalarAtFn for BoolArray {
impl FillForwardFn for BoolArray {
fn fill_forward(&self) -> VortexResult<ArrayRef> {
if self.validity().is_none() {
Ok(Arc::new(self.clone()))
} else {
let validity = flatten_bool(self.validity().unwrap())?;
let bools = self.buffer();
let mut last_value = false;
let filled = bools
.iter()
.zip(validity.buffer().iter())
.map(|(v, valid)| {
if valid {
last_value = v;
}
last_value
})
.collect::<Vec<_>>();
Ok(BoolArray::from(filled).into_array())
return Ok(Arc::new(self.clone()));
}

let validity = self.validity().unwrap().to_bool_array();
let bools = self.buffer();
let mut last_value = false;
let filled = bools
.iter()
.zip(validity.buffer().iter())
.map(|(v, valid)| {
if valid {
last_value = v;
}
last_value
})
.collect::<Vec<_>>();
Ok(BoolArray::from(filled).into_array())
}
}

Expand All @@ -109,6 +101,7 @@ mod test {
use crate::array::bool::BoolArray;
use crate::array::downcast::DowncastArrayBuiltin;
use crate::compute;
use crate::validity::ArrayValidity;

#[test]
fn fill_forward() {
Expand Down
11 changes: 3 additions & 8 deletions vortex-array/src/array/bool/flatten.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
use std::sync::Arc;

use arrow_array::{ArrayRef as ArrowArrayRef, BooleanArray as ArrowBoolArray};
use arrow_buffer::NullBuffer;

use vortex_error::VortexResult;

use crate::array::bool::BoolArray;
use crate::arrow::wrappers::as_nulls;
use crate::compute::as_arrow::AsArrowArray;
use crate::compute::flatten::flatten_bool;
use crate::validity::ArrayValidity;

impl AsArrowArray for BoolArray {
fn as_arrow(&self) -> VortexResult<ArrowArrayRef> {
let validity = self
.validity()
.map(|a| flatten_bool(a.as_ref()))
.transpose()?
.map(|b| NullBuffer::new(b.buffer));
Ok(Arc::new(ArrowBoolArray::new(
self.buffer().clone(),
validity,
as_nulls(self.validity())?,
)))
}
}
54 changes: 22 additions & 32 deletions vortex-array/src/array/bool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,13 @@ use vortex_error::VortexResult;
use vortex_schema::{DType, Nullability};

use crate::array::IntoArray;
use crate::compute::scalar_at::scalar_at;
use crate::formatter::{ArrayDisplay, ArrayFormatter};
use crate::impl_array;
use crate::serde::{ArraySerde, EncodingSerde};
use crate::stats::{Stat, Stats, StatsSet};
use crate::validity::{ArrayValidity, Validity};

use super::{
check_slice_bounds, check_validity_buffer, Array, ArrayRef, Encoding, EncodingId, EncodingRef,
ENCODINGS,
};
use super::{check_slice_bounds, Array, ArrayRef, Encoding, EncodingId, EncodingRef, ENCODINGS};

mod compute;
mod flatten;
Expand All @@ -27,36 +24,30 @@ mod stats;
pub struct BoolArray {
buffer: BooleanBuffer,
stats: Arc<RwLock<StatsSet>>,
validity: Option<ArrayRef>,
validity: Option<Validity>,
}

impl BoolArray {
pub fn new(buffer: BooleanBuffer, validity: Option<ArrayRef>) -> Self {
pub fn new(buffer: BooleanBuffer, validity: Option<Validity>) -> Self {
Self::try_new(buffer, validity).unwrap()
}

pub fn try_new(buffer: BooleanBuffer, validity: Option<ArrayRef>) -> VortexResult<Self> {
let validity = validity.filter(|v| !v.is_empty());
check_validity_buffer(validity.as_ref(), buffer.len())?;

pub fn try_new(buffer: BooleanBuffer, validity: Option<Validity>) -> VortexResult<Self> {
if let Some(v) = &validity {
assert_eq!(v.len(), buffer.len());
}
Ok(Self {
buffer,
stats: Arc::new(RwLock::new(StatsSet::new())),
validity,
})
}

fn is_valid(&self, index: usize) -> bool {
self.validity
.as_deref()
.map(|v| scalar_at(v, index).unwrap().try_into().unwrap())
.unwrap_or(true)
}

/// Create an all-null boolean array.
pub fn null(n: usize) -> Self {
BoolArray::new(
BooleanBuffer::from(vec![false; n]),
Some(BoolArray::from(vec![false; n]).into_array()),
Some(Validity::invalid(n)),
)
}

Expand All @@ -65,9 +56,8 @@ impl BoolArray {
&self.buffer
}

#[inline]
pub fn validity(&self) -> Option<&ArrayRef> {
self.validity.as_ref()
pub fn into_buffer(self) -> BooleanBuffer {
self.buffer
}
}

Expand Down Expand Up @@ -104,11 +94,7 @@ impl Array for BoolArray {
Ok(Self {
buffer: self.buffer.slice(start, stop - start),
stats: Arc::new(RwLock::new(StatsSet::new())),
validity: self
.validity
.as_ref()
.map(|v| v.slice(start, stop))
.transpose()?,
validity: self.validity.as_ref().map(|v| v.slice(start, stop)),
}
.into_array())
}
Expand All @@ -128,6 +114,12 @@ impl Array for BoolArray {
}
}

impl ArrayValidity for BoolArray {
fn validity(&self) -> Option<Validity> {
self.validity.clone()
}
}

#[derive(Debug)]
pub struct BoolEncoding;

Expand All @@ -154,7 +146,7 @@ impl ArrayDisplay for BoolArray {
let false_count = self.len() - true_count;
f.property("n_true", true_count)?;
f.property("n_false", false_count)?;
f.maybe_child("validity", self.validity())
f.validity(self.validity())
}
}

Expand Down Expand Up @@ -191,17 +183,15 @@ impl FromIterator<Option<bool>> for BoolArray {
if validity.is_empty() {
BoolArray::from(values)
} else {
BoolArray::new(
BooleanBuffer::from(values),
Some(BoolArray::from(validity).into_array()),
)
BoolArray::new(BooleanBuffer::from(values), Some(Validity::from(validity)))
}
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::compute::scalar_at::scalar_at;

#[test]
fn slice() {
Expand Down
24 changes: 5 additions & 19 deletions vortex-array/src/array/bool/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,18 @@ use vortex_error::VortexResult;
use crate::array::bool::{BoolArray, BoolEncoding};
use crate::array::{Array, ArrayRef};
use crate::serde::{ArraySerde, EncodingSerde, ReadCtx, WriteCtx};
use crate::validity::ArrayValidity;

impl ArraySerde for BoolArray {
fn write(&self, ctx: &mut WriteCtx) -> VortexResult<()> {
if let Some(v) = self.validity() {
ctx.write(v.as_ref())?;
}
ctx.write_validity(self.validity())?;
ctx.write_buffer(self.len(), &self.buffer().sliced())
}
}

impl EncodingSerde for BoolEncoding {
fn read(&self, ctx: &mut ReadCtx) -> VortexResult<ArrayRef> {
let validity = if ctx.schema().is_nullable() {
Some(ctx.validity().read()?)
} else {
None
};

let validity = ctx.read_validity()?;
let (logical_len, buf) = ctx.read_buffer(|len| (len + 7) / 8)?;
Ok(BoolArray::new(BooleanBuffer::new(buf, 0, logical_len), validity).into_array())
}
Expand All @@ -33,22 +27,14 @@ mod test {
use crate::array::bool::BoolArray;
use crate::array::downcast::DowncastArrayBuiltin;
use crate::serde::test::roundtrip_array;
use crate::validity::ArrayValidity;

#[test]
fn roundtrip() {
let arr = BoolArray::from_iter(vec![Some(false), None, Some(true), Some(false)]);
let read_arr = roundtrip_array(&arr).unwrap();

assert_eq!(arr.buffer().values(), read_arr.as_bool().buffer().values());
assert_eq!(
arr.validity().unwrap().as_bool().buffer().values(),
read_arr
.as_bool()
.validity()
.unwrap()
.as_bool()
.buffer()
.values()
);
assert_eq!(arr.validity(), read_arr.validity());
}
}
15 changes: 15 additions & 0 deletions vortex-array/src/array/chunked/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::formatter::{ArrayDisplay, ArrayFormatter};
use crate::impl_array;
use crate::serde::{ArraySerde, EncodingSerde};
use crate::stats::{Stats, StatsSet};
use crate::validity::{ArrayValidity, Validity};

mod compute;
mod serde;
Expand Down Expand Up @@ -153,6 +154,20 @@ impl Array for ChunkedArray {
}
}

impl ArrayValidity for ChunkedArray {
fn validity(&self) -> Option<Validity> {
if !self.dtype.is_nullable() {
return None;
}

Some(Validity::from_iter(self.chunks.iter().map(|chunk| {
chunk
.validity()
.unwrap_or_else(|| Validity::valid(chunk.len()))
})))
}
}

impl FromIterator<ArrayRef> for ChunkedArray {
fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
let chunks: Vec<ArrayRef> = iter.into_iter().collect();
Expand Down
Loading

0 comments on commit 308fde9

Please sign in to comment.