From dde40a66997553f068d5afa7319409bf789f86ee Mon Sep 17 00:00:00 2001
From: Jorge Leitao
Date: Wed, 2 Feb 2022 07:33:05 +0100
Subject: [PATCH] Simplified API to write files (#78)

---
 integration-tests/src/lib.rs       |  21 ++++
 integration-tests/src/write/mod.rs |  57 ++++++----
 src/read/mod.rs                    |   3 +-
 src/write/file.rs                  | 139 +++++++++++++++-------
 src/write/mod.rs                   |  11 +-
 src/write/row_group.rs             |  38 +------
 src/write/stream.rs                | 177 ++++++++++++++++++++---------
 src/write/stream_stream.rs         |  96 ----------------
 8 files changed, 285 insertions(+), 257 deletions(-)
 delete mode 100644 src/write/stream_stream.rs

diff --git a/integration-tests/src/lib.rs b/integration-tests/src/lib.rs
index e74da2330..ec9d5e664 100644
--- a/integration-tests/src/lib.rs
+++ b/integration-tests/src/lib.rs
@@ -19,6 +19,27 @@ pub enum Array {
     Struct(Vec<Array>, Vec<bool>),
 }
 
+impl Array {
+    pub fn len(&self) -> usize {
+        match self {
+            Array::UInt32(a) => a.len(),
+            Array::Int32(a) => a.len(),
+            Array::Int64(a) => a.len(),
+            Array::Int96(a) => a.len(),
+            Array::Float32(a) => a.len(),
+            Array::Float64(a) => a.len(),
+            Array::Boolean(a) => a.len(),
+            Array::Binary(a) => a.len(),
+            Array::List(a) => a.len(),
+            Array::Struct(a, _) => a[0].len(),
+        }
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+}
+
 // The dynamic representation of values in native Rust. This is not exhaustive.
 // todo: maybe refactor this into serde/json?
 #[derive(Debug, PartialEq)]
diff --git a/integration-tests/src/write/mod.rs b/integration-tests/src/write/mod.rs
index d12986529..4ece748c2 100644
--- a/integration-tests/src/write/mod.rs
+++ b/integration-tests/src/write/mod.rs
@@ -31,7 +31,7 @@ mod tests {
     use parquet::error::Result;
     use parquet::metadata::SchemaDescriptor;
     use parquet::statistics::Statistics;
-    use parquet::write::{write_file, Compressor, DynIter, DynStreamingIterator, Version};
+    use parquet::write::{Compressor, DynIter, DynStreamingIterator, FileWriter, Version};
 
     use super::*;
 
@@ -65,16 +65,20 @@ mod tests {
 
         let a = schema.columns();
 
-        let row_groups = std::iter::once(Ok(DynIter::new(std::iter::once(Ok(
-            DynStreamingIterator::new(Compressor::new_from_vec(
-                DynIter::new(std::iter::once(array_to_page(&array, &options, &a[0]))),
-                options.compression,
-                vec![],
-            )),
-        )))));
+        let num_rows = array.len();
+        let pages = DynStreamingIterator::new(Compressor::new_from_vec(
+            DynIter::new(std::iter::once(array_to_page(&array, &options, &a[0]))),
+            options.compression,
+            vec![],
+        ));
+        let columns = std::iter::once(Ok(pages));
 
-        let mut writer = Cursor::new(vec![]);
-        write_file(&mut writer, row_groups, schema, options, None, None)?;
+        let writer = Cursor::new(vec![]);
+        let mut writer = FileWriter::new(writer, schema, options, None);
+
+        writer.start()?;
+        writer.write(DynIter::new(columns), num_rows)?;
+        let writer = writer.end(None)?.1;
 
         let data = writer.into_inner();
 
@@ -142,7 +146,7 @@ mod tests2 {
         error::Result,
         metadata::SchemaDescriptor,
         read::read_metadata,
-        write::{write_file, Compressor, DynIter, DynStreamingIterator, Version},
+        write::{Compressor, DynIter, DynStreamingIterator, FileWriter, Version},
     };
 
     #[test]
@@ -165,20 +169,23 @@ mod tests2 {
         let schema =
             SchemaDescriptor::try_from_message("message schema { OPTIONAL INT32 col; }")?;
 
-        let row_groups = std::iter::once(Ok(DynIter::new(std::iter::once(Ok(
-            DynStreamingIterator::new(Compressor::new_from_vec(
-                DynIter::new(std::iter::once(array_to_page_v1(
-                    &array,
-                    &options,
-                    &schema.columns()[0],
-                ))),
-                options.compression,
-                vec![],
-            )),
-        )))));
-
-        let mut writer = Cursor::new(vec![]);
-        write_file(&mut writer, row_groups, schema, options, None, None)?;
+        let pages = DynStreamingIterator::new(Compressor::new_from_vec(
+            DynIter::new(std::iter::once(array_to_page_v1(
+                &array,
+                &options,
+                &schema.columns()[0],
+            ))),
+            options.compression,
+            vec![],
+        ));
+        let columns = std::iter::once(Ok(pages));
+
+        let writer = Cursor::new(vec![]);
+        let mut writer = FileWriter::new(writer, schema, options, None);
+
+        writer.start()?;
+        writer.write(DynIter::new(columns), 7)?;
+        let writer = writer.end(None)?.1;
 
         let data = writer.into_inner();
         let mut reader = Cursor::new(data);
diff --git a/src/read/mod.rs b/src/read/mod.rs
index 175b33feb..66ecacd11 100644
--- a/src/read/mod.rs
+++ b/src/read/mod.rs
@@ -342,8 +342,7 @@ mod tests {
         let column = 0;
         let column_metadata = metadata.row_groups[row_group].column(column);
         let buffer = vec![];
-        let mut iter: Vec<_> =
-            get_page_iterator(column_metadata, &mut file, None, buffer)?.collect();
+        let iter: Vec<_> = get_page_iterator(column_metadata, &mut file, None, buffer)?.collect();
 
         let field = metadata.schema().fields()[0].clone();
         let mut iter = ReadColumnIterator::new(field, vec![(iter, column_metadata.clone())]);
diff --git a/src/write/file.rs b/src/write/file.rs
index 291d00f6b..b87c8ee3b 100644
--- a/src/write/file.rs
+++ b/src/write/file.rs
@@ -4,6 +4,7 @@ use parquet_format_async_temp::FileMetaData;
 use parquet_format_async_temp::thrift::protocol::TCompactOutputProtocol;
 use parquet_format_async_temp::thrift::protocol::TOutputProtocol;
+use parquet_format_async_temp::RowGroup;
 
 pub use crate::metadata::KeyValue;
 use crate::{
@@ -37,53 +38,103 @@ pub(super) fn end_file<W: Write>(mut writer: &mut W, metadata: FileMetaData) -> Result<u64> {
     Ok(metadata_len as u64 + FOOTER_SIZE)
 }
 
-pub fn write_file<'a, W, I, E>(
-    writer: &mut W,
-    row_groups: I,
+/// An interface to write a parquet file.
+/// Use `start` to write the header, `write` to write a row group,
+/// and `end` to write the footer.
+pub struct FileWriter<W: Write> {
+    writer: W,
     schema: SchemaDescriptor,
     options: WriteOptions,
     created_by: Option<String>,
-    key_value_metadata: Option<Vec<KeyValue>>,
-) -> Result<u64>
-where
-    W: Write,
-    I: Iterator<Item = std::result::Result<RowGroupIter<'a, E>, E>>,
-    ParquetError: From<E>,
-    E: std::error::Error,
-{
-    let mut offset = start_file(writer)? as u64;
-
-    let row_groups = row_groups
-        .map(|row_group| {
-            let (group, size) = write_row_group(
-                writer,
-                offset,
-                schema.columns(),
-                options.compression,
-                row_group?,
-            )?;
-            offset += size;
-            Ok(group)
-        })
-        .collect::<Result<Vec<_>>>()?;
-
-    // compute file stats
-    let num_rows = row_groups.iter().map(|group| group.num_rows).sum();
-
-    let metadata = FileMetaData::new(
-        options.version.into(),
-        schema.into_thrift()?,
-        num_rows,
-        row_groups,
-        key_value_metadata,
-        created_by,
-        None,
-        None,
-        None,
-    );
-
-    let len = end_file(writer, metadata)?;
-    Ok(offset + len)
+
+    offset: u64,
+    row_groups: Vec<RowGroup>,
+}
+
+// Accessors
+impl<W: Write> FileWriter<W> {
+    /// The options assigned to the file
+    pub fn options(&self) -> &WriteOptions {
+        &self.options
+    }
+
+    /// The [`SchemaDescriptor`] assigned to this file
+    pub fn schema(&self) -> &SchemaDescriptor {
+        &self.schema
+    }
+}
+
+impl<W: Write> FileWriter<W> {
+    /// Returns a new [`FileWriter`].
+    pub fn new(
+        writer: W,
+        schema: SchemaDescriptor,
+        options: WriteOptions,
+        created_by: Option<String>,
+    ) -> Self {
+        Self {
+            writer,
+            schema,
+            options,
+            created_by,
+            offset: 0,
+            row_groups: vec![],
+        }
+    }
+
+    /// Writes the header of the file
+    pub fn start(&mut self) -> Result<()> {
+        self.offset = start_file(&mut self.writer)? as u64;
+        Ok(())
+    }
+
+    /// Writes a row group to the file.
+    ///
+    /// This call is IO-bound
+    pub fn write<E>(&mut self, row_group: RowGroupIter<'_, E>, num_rows: usize) -> Result<()>
+    where
+        ParquetError: From<E>,
+        E: std::error::Error,
+    {
+        if self.offset == 0 {
+            return Err(ParquetError::General(
+                "You must call `start` before writing the first row group".to_string(),
+            ));
+        }
+        let (group, size) = write_row_group(
+            &mut self.writer,
+            self.offset,
+            self.schema.columns(),
+            self.options.compression,
+            row_group,
+            num_rows,
+        )?;
+        self.offset += size;
+        self.row_groups.push(group);
+        Ok(())
+    }
+
+    /// Writes the footer of the parquet file. Returns the total size of the file and the
+    /// underlying writer.
+    pub fn end(mut self, key_value_metadata: Option<Vec<KeyValue>>) -> Result<(u64, W)> {
+        // compute file stats
+        let num_rows = self.row_groups.iter().map(|group| group.num_rows).sum();
+
+        let metadata = FileMetaData::new(
+            self.options.version.into(),
+            self.schema.into_thrift()?,
+            num_rows,
+            self.row_groups,
+            key_value_metadata,
+            self.created_by,
+            None,
+            None,
+            None,
+        );
+
+        let len = end_file(&mut self.writer, metadata)?;
+        Ok((self.offset + len, self.writer))
+    }
 }
 
 #[cfg(test)]
diff --git a/src/write/mod.rs b/src/write/mod.rs
index 9af7bcb9d..67b76a608 100644
--- a/src/write/mod.rs
+++ b/src/write/mod.rs
@@ -6,16 +6,16 @@ mod row_group;
 pub(self) mod statistics;
 
 #[cfg(feature = "stream")]
-pub mod stream;
+mod stream;
 #[cfg(feature = "stream")]
-mod stream_stream;
+pub use stream::FileStreamer;
 
 mod dyn_iter;
 pub use dyn_iter::{DynIter, DynStreamingIterator};
 
 pub use compression::{compress, Compressor};
 
-pub use file::write_file;
+pub use file::FileWriter;
 
 use crate::compression::Compression;
 use crate::page::CompressedPage;
@@ -23,13 +23,18 @@ use crate::page::CompressedPage;
 
 pub type RowGroupIter<'a, E> = DynIter<'a, std::result::Result<DynStreamingIterator<'a, CompressedPage, E>, E>>;
 
+/// Write options for the different interfaces in this crate
 #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
 pub struct WriteOptions {
+    /// Whether to write statistics
    pub write_statistics: bool,
+    /// The compression to use
    pub compression: Compression,
+    /// Which Parquet version to use
    pub version: Version,
 }
 
+/// The parquet version to use
 #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
 pub enum Version {
     V1,
diff --git a/src/write/row_group.rs b/src/write/row_group.rs
index c165ef171..8e4746f06 100644
--- a/src/write/row_group.rs
+++ b/src/write/row_group.rs
@@ -15,18 +15,6 @@ use super::{
     DynIter, DynStreamingIterator,
 };
 
-fn same_elements<T: PartialEq + Copy>(arr: &[T]) -> Option<Option<T>> {
-    if arr.is_empty() {
-        return Some(None);
-    }
-    let first = &arr[0];
-    if arr.iter().all(|item| item == first) {
-        Some(Some(*first))
-    } else {
-        None
-    }
-}
-
 pub fn write_row_group<
     'a,
     W,
@@ -37,6 +25,7 @@ pub fn write_row_group<
     descriptors: &[ColumnDescriptor],
     compression: Compression,
     columns: DynIter<'a, std::result::Result<DynStreamingIterator<'a, CompressedPage, E>, E>>,
+    num_rows: usize,
 ) -> Result<(RowGroup, u64)>
 where
     W: Write,
@@ -57,16 +46,6 @@ where
     let bytes_written = offset - initial;
 
     // compute row group stats
-    let num_rows = columns
-        .iter()
-        .map(|c| c.meta_data.as_ref().unwrap().num_values)
-        .collect::<Vec<_>>();
-    let num_rows = match same_elements(&num_rows) {
-        None => return Err(general_err!("Every column chunk in a row group MUST have the same number of rows. The columns have rows: {:?}", num_rows)),
-        Some(None) => 0,
-        Some(Some(v)) => v
-    };
-
     let total_byte_size = columns
         .iter()
         .map(|c| c.meta_data.as_ref().unwrap().total_compressed_size)
@@ -76,7 +55,7 @@ where
     RowGroup {
         columns,
         total_byte_size,
-        num_rows,
+        num_rows: num_rows as i64,
         sorting_columns: None,
         file_offset: None,
         total_compressed_size: None,
@@ -96,6 +75,7 @@ pub async fn write_row_group_async<
     descriptors: &[ColumnDescriptor],
     compression: Compression,
     columns: DynIter<'a, std::result::Result<DynStreamingIterator<'a, CompressedPage, E>, E>>,
+    num_rows: usize,
 ) -> Result<(RowGroup, u64)>
 where
     W: AsyncWrite + Unpin + Send,
@@ -115,16 +95,6 @@ where
     let bytes_written = offset - initial;
 
     // compute row group stats
-    let num_rows = columns
-        .iter()
-        .map(|c| c.meta_data.as_ref().unwrap().num_values)
-        .collect::<Vec<_>>();
-    let num_rows = match same_elements(&num_rows) {
-        None => return Err(general_err!("Every column chunk in a row group MUST have the same number of rows. The columns have rows: {:?}", num_rows)),
-        Some(None) => 0,
-        Some(Some(v)) => v
-    };
-
     let total_byte_size = columns
         .iter()
         .map(|c| c.meta_data.as_ref().unwrap().total_compressed_size)
@@ -134,7 +104,7 @@ where
     RowGroup {
         columns,
         total_byte_size,
-        num_rows,
+        num_rows: num_rows as i64,
         sorting_columns: None,
         file_offset: None,
         total_compressed_size: None,
diff --git a/src/write/stream.rs b/src/write/stream.rs
index 29b01ccd7..a766a02ea 100644
--- a/src/write/stream.rs
+++ b/src/write/stream.rs
@@ -1,69 +1,140 @@
 use std::io::Write;
 
-use futures::pin_mut;
-use futures::stream::Stream;
-use futures::Future;
-use futures::StreamExt;
+use futures::{AsyncWrite, AsyncWriteExt};
 
-use parquet_format_async_temp::FileMetaData;
+use parquet_format_async_temp::{
+    thrift::protocol::{TCompactOutputStreamProtocol, TOutputStreamProtocol},
+    FileMetaData, RowGroup,
+};
 
-pub use crate::metadata::KeyValue;
 use crate::{
     error::{ParquetError, Result},
-    metadata::SchemaDescriptor,
+    metadata::{KeyValue, SchemaDescriptor},
+    FOOTER_SIZE, PARQUET_MAGIC,
 };
 
-use super::file::{end_file, start_file};
-use super::{row_group::write_row_group, RowGroupIter, WriteOptions};
+use super::{row_group::write_row_group_async, RowGroupIter, WriteOptions};
+
+async fn start_file<W: AsyncWrite + Unpin + Send>(writer: &mut W) -> Result<u64> {
+    writer.write_all(&PARQUET_MAGIC).await?;
+    Ok(PARQUET_MAGIC.len() as u64)
+}
+
+async fn end_file<W: AsyncWrite + Unpin + Send>(
+    mut writer: &mut W,
+    metadata: FileMetaData,
+) -> Result<u64> {
+    // Write file metadata
+    let mut protocol = TCompactOutputStreamProtocol::new(&mut writer);
+    let metadata_len = metadata.write_to_out_stream_protocol(&mut protocol).await? as i32;
+    protocol.flush().await?;
+
+    // Write footer
+    let metadata_bytes = metadata_len.to_le_bytes();
+    let mut footer_buffer = [0u8; FOOTER_SIZE as usize];
+    (0..4).for_each(|i| {
+        footer_buffer[i] = metadata_bytes[i];
+    });
+
+    (&mut footer_buffer[4..]).write_all(&PARQUET_MAGIC)?;
+    writer.write_all(&footer_buffer).await?;
+    Ok(metadata_len as u64 + FOOTER_SIZE)
+}
 
-pub async fn write_stream<'a, W, S, E, F>(
-    writer: &mut W,
-    row_groups: S,
+/// An interface to write a parquet file asynchronously.
+/// Use `start` to write the header, `write` to write a row group,
+/// and `end` to write the footer.
+pub struct FileStreamer<W: AsyncWrite + Unpin + Send> {
+    writer: W,
     schema: SchemaDescriptor,
     options: WriteOptions,
     created_by: Option<String>,
-    key_value_metadata: Option<Vec<KeyValue>>,
-) -> Result<u64>
-where
-    W: Write,
-    F: Future<Output = std::result::Result<RowGroupIter<'a, E>, E>>,
-    S: Stream<Item = F>,
-    ParquetError: From<E>,
-    E: std::error::Error,
-{
-    let mut offset = start_file(writer)? as u64;
-
-    let mut groups = vec![];
-    pin_mut!(row_groups);
-    while let Some(row_group) = row_groups.next().await {
-        let (group, size) = write_row_group(
-            writer,
-            offset,
-            schema.columns(),
-            options.compression,
-            row_group.await?,
-        )?;
-        offset += size;
-        groups.push(group);
+
+    offset: u64,
+    row_groups: Vec<RowGroup>,
+}
+
+// Accessors
+impl<W: AsyncWrite + Unpin + Send> FileStreamer<W> {
+    /// The options assigned to the file
+    pub fn options(&self) -> &WriteOptions {
+        &self.options
     }
 
-    // compute file stats
-    let num_rows = groups.iter().map(|group| group.num_rows).sum();
-
-    let metadata = FileMetaData::new(
-        options.version.into(),
-        schema.into_thrift()?,
-        num_rows,
-        groups,
-        key_value_metadata,
-        created_by,
-        None,
-        None,
-        None,
-    );
-
-    let len = end_file(writer, metadata)?;
-    Ok(offset + len)
+    /// The [`SchemaDescriptor`] assigned to this file
+    pub fn schema(&self) -> &SchemaDescriptor {
+        &self.schema
+    }
 }
 
-pub use super::stream_stream::write_stream_stream;
+impl<W: AsyncWrite + Unpin + Send> FileStreamer<W> {
+    /// Returns a new [`FileStreamer`].
+    pub fn new(
+        writer: W,
+        schema: SchemaDescriptor,
+        options: WriteOptions,
+        created_by: Option<String>,
+    ) -> Self {
+        Self {
+            writer,
+            schema,
+            options,
+            created_by,
+            offset: 0,
+            row_groups: vec![],
+        }
+    }
+
+    /// Writes the header of the file
+    pub async fn start(&mut self) -> Result<()> {
+        self.offset = start_file(&mut self.writer).await? as u64;
+        Ok(())
+    }
+
+    /// Writes a row group to the file.
+    pub async fn write<E>(&mut self, row_group: RowGroupIter<'_, E>, num_rows: usize) -> Result<()>
+    where
+        ParquetError: From<E>,
+        E: std::error::Error,
+    {
+        if self.offset == 0 {
+            return Err(ParquetError::General(
+                "You must call `start` before writing the first row group".to_string(),
+            ));
+        }
+        let (group, size) = write_row_group_async(
+            &mut self.writer,
+            self.offset,
+            self.schema.columns(),
+            self.options.compression,
+            row_group,
+            num_rows,
+        )
+        .await?;
+        self.offset += size;
+        self.row_groups.push(group);
+        Ok(())
+    }
+
+    /// Writes the footer of the parquet file. Returns the total size of the file and the
+    /// underlying writer.
+    pub async fn end(mut self, key_value_metadata: Option<Vec<KeyValue>>) -> Result<(u64, W)> {
+        // compute file stats
+        let num_rows = self.row_groups.iter().map(|group| group.num_rows).sum();
+
+        let metadata = FileMetaData::new(
+            self.options.version.into(),
+            self.schema.into_thrift()?,
+            num_rows,
+            self.row_groups,
+            key_value_metadata,
+            self.created_by,
+            None,
+            None,
+            None,
+        );
+
+        let len = end_file(&mut self.writer, metadata).await?;
+        Ok((self.offset + len, self.writer))
+    }
+}
diff --git a/src/write/stream_stream.rs b/src/write/stream_stream.rs
deleted file mode 100644
index 6a69a4fcd..000000000
--- a/src/write/stream_stream.rs
+++ /dev/null
@@ -1,96 +0,0 @@
-use std::io::Write;
-
-use futures::{pin_mut, stream::Stream, AsyncWrite, AsyncWriteExt, Future, StreamExt};
-
-use parquet_format_async_temp::{
-    thrift::protocol::{TCompactOutputStreamProtocol, TOutputStreamProtocol},
-    FileMetaData,
-};
-
-use crate::{
-    error::{ParquetError, Result},
-    metadata::{KeyValue, SchemaDescriptor},
-    FOOTER_SIZE, PARQUET_MAGIC,
-};
-
-use super::{row_group::write_row_group_async, RowGroupIter, WriteOptions};
-
-async fn start_file<W: AsyncWrite + Unpin + Send>(writer: &mut W) -> Result<u64> {
-    writer.write_all(&PARQUET_MAGIC).await?;
-    Ok(PARQUET_MAGIC.len() as u64)
-}
-
-async fn end_file<W: AsyncWrite + Unpin + Send>(
-    mut writer: &mut W,
-    metadata: FileMetaData,
-) -> Result<u64> {
-    // Write file metadata
-    let mut protocol = TCompactOutputStreamProtocol::new(&mut writer);
-    let metadata_len = metadata.write_to_out_stream_protocol(&mut protocol).await? as i32;
-    protocol.flush().await?;
-
-    // Write footer
-    let metadata_bytes = metadata_len.to_le_bytes();
-    let mut footer_buffer = [0u8; FOOTER_SIZE as usize];
-    (0..4).for_each(|i| {
-        footer_buffer[i] = metadata_bytes[i];
-    });
-
-    (&mut footer_buffer[4..]).write_all(&PARQUET_MAGIC)?;
-    writer.write_all(&footer_buffer).await?;
-    Ok(metadata_len as u64 + FOOTER_SIZE)
-}
-
-/// Given a stream of [`RowGroupIter`] and an `async` writer, returns a future
-/// of writing a parquet file to the writer.
-pub async fn write_stream_stream<'a, W, S, E, F>(
-    writer: &mut W,
-    row_groups: S,
-    schema: SchemaDescriptor,
-    options: WriteOptions,
-    created_by: Option<String>,
-    key_value_metadata: Option<Vec<KeyValue>>,
-) -> Result<u64>
-where
-    W: AsyncWrite + Unpin + Send,
-    F: Future<Output = std::result::Result<RowGroupIter<'a, E>, E>>,
-    S: Stream<Item = F>,
-    ParquetError: From<E>,
-    E: std::error::Error,
-{
-    let mut offset = start_file(writer).await?;
-
-    let mut row_groups_c = vec![];
-
-    pin_mut!(row_groups);
-    while let Some(row_group) = row_groups.next().await {
-        let (row_group, size) = write_row_group_async(
-            writer,
-            offset,
-            schema.columns(),
-            options.compression,
-            row_group.await?,
-        )
-        .await?;
-        offset += size;
-        row_groups_c.push(row_group);
-    }
-
-    // compute file stats
-    let num_rows = row_groups_c.iter().map(|group| group.num_rows).sum();
-
-    let metadata = FileMetaData::new(
-        options.version.into(),
-        schema.into_thrift()?,
-        num_rows,
-        row_groups_c,
-        key_value_metadata,
-        created_by,
-        None,
-        None,
-        None,
-    );
-
-    let len = end_file(writer, metadata).await?;
-    Ok(offset + len)
-}
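
For reference, the whole new write path in one place — a minimal sketch modeled on the tests in this patch. `Array` and `array_to_page` are the integration-test helpers used above, not public API of the crate; `Compression::Uncompressed`, the exact import paths, and the helper name `write_single_column` are illustrative assumptions, not confirmed by this diff.

    use std::io::Cursor;

    use parquet::compression::Compression;
    use parquet::error::Result;
    use parquet::metadata::SchemaDescriptor;
    use parquet::write::{
        Compressor, DynIter, DynStreamingIterator, FileWriter, Version, WriteOptions,
    };

    /// Writes `array` as a single page in a single column chunk in a single
    /// row group, and returns the bytes of the resulting parquet file.
    fn write_single_column(array: &Array) -> Result<Vec<u8>> {
        let options = WriteOptions {
            write_statistics: true,
            compression: Compression::Uncompressed, // assumed variant name
            version: Version::V1,
        };
        let schema =
            SchemaDescriptor::try_from_message("message schema { OPTIONAL INT32 col; }")?;
        let num_rows = array.len();

        // A column chunk is a streaming iterator of compressed pages.
        let pages = DynStreamingIterator::new(Compressor::new_from_vec(
            DynIter::new(std::iter::once(array_to_page(
                array,
                &options,
                &schema.columns()[0],
            ))),
            options.compression,
            vec![],
        ));
        let columns = std::iter::once(Ok(pages));

        // The new API: `start` writes the header, `write` appends one row
        // group (with an explicit row count), and `end` writes the footer,
        // returning (total file size, the recovered writer).
        let mut writer = FileWriter::new(Cursor::new(vec![]), schema, options, None);
        writer.start()?;
        writer.write(DynIter::new(columns), num_rows)?;
        let (_size, writer) = writer.end(None)?;

        Ok(writer.into_inner())
    }

Compared to `write_file`, the writer now owns `W` and `end` hands it back, so callers no longer need to materialize an iterator of all row groups up front and can append row groups incrementally between other work.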