diff --git a/src/context/delta.rs b/src/context/delta.rs index 02c88e81..401d49eb 100644 --- a/src/context/delta.rs +++ b/src/context/delta.rs @@ -7,6 +7,7 @@ use bytes::BytesMut; use datafusion::error::Result; use datafusion::execution::context::SessionState; use datafusion::parquet::basic::{Compression, ZstdLevel}; +use datafusion::parquet::format::FileMetaData; use datafusion::{ arrow::datatypes::{Schema, SchemaRef}, datasource::TableProvider, @@ -28,6 +29,7 @@ use futures::{StreamExt, TryStreamExt}; use object_store::path::Path; use object_store::ObjectStore; use std::fs::File; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use tempfile::{NamedTempFile, TempPath}; @@ -48,11 +50,8 @@ const PARTITION_FILE_BUFFER_SIZE: usize = 128 * 1024; // This denotes the threshold size for an individual multipart request payload prior to upload. // It dictates the memory usage, as we'll need to to keep each part in memory until sent. const PARTITION_FILE_MIN_PART_SIZE: usize = 5 * 1024 * 1024; -// Controls how many multipart upload tasks we let run in parallel; this is in part dictated by the -// fact that object store concurrently uploads parts for each of our tasks. That concurrency in -// turn is hard coded to 8 (https://github.com/apache/arrow-rs/blob/master/object_store/src/aws/mod.rs#L145) -// meaning that with 2 partition upload tasks x 8 part upload tasks x 5MB we have 80MB of memory usage -const PARTITION_FILE_UPLOAD_MAX_CONCURRENCY: usize = 2; +// Controls how many partition write/multipart upload tasks we let run in parallel at any one time. +const PARTITION_FILE_WRITE_MAX_CONCURRENCY: usize = 3; #[cfg(test)] fn get_uuid() -> Uuid { @@ -92,75 +91,116 @@ fn temp_partition_file_writer( } /// Execute a plan and upload the results to object storage as Parquet files, indexing them. -/// Partially taken from DataFusion's plan_to_parquet with some additions (file stats, using a DiskManager) +/// We try to adhere to a couple of invariants here, namely: +/// - no partition will have more than `max_partition_size` rows +/// - there will be at least `min_file_count_hint` partition files and possibly more (but not less) pub async fn plan_to_object_store( state: &SessionState, plan: &Arc<dyn ExecutionPlan>, store: Arc<dyn ObjectStore>, local_data_dir: Option<String>, max_partition_size: u32, + min_file_count_hint: Option<usize>, ) -> Result<Vec<Add>> { - let mut current_partition_size = 0; - let (mut current_partition_file_path, mut writer) = - temp_partition_file_writer(plan.schema())?; - let mut partition_file_paths = vec![current_partition_file_path]; - let mut partition_metadata = vec![]; - let mut tasks = vec![]; + let max_concurrency = match min_file_count_hint { + // No/zero file hint passed, go with sequential writing + None | Some(0) => 1, + // Some file hint provided; use that or `PARTITION_FILE_WRITE_MAX_CONCURRENCY` tasks to + // re-chunk DataFusion partitions if the anticipated file count is larger + Some(min_file_count) => min_file_count.min(PARTITION_FILE_WRITE_MAX_CONCURRENCY), + }; + + // Pointer to the next partition to stream from + let partition = Arc::new(AtomicUsize::new(0)); - // Iterate over Datafusion partitions and re-chunk them, since we want to enforce a pre-defined + let input_partitions = plan.output_partitioning().partition_count(); + + // Iterate over DataFusion partitions and re-chunk them, since we want to enforce a pre-defined // partition size limit, which is not guaranteed by DF. - info!("Persisting data into temporary partition objects on disk"); - for i in 0..plan.output_partitioning().partition_count() { + info!("Persisting data into temporary partition objects on disk in {max_concurrency} tasks from {input_partitions} input partitions"); + let mut tasks = vec![]; + for _ in 0..max_concurrency { + let task_partition = Arc::clone(&partition); + + let input = plan.clone(); let task_ctx = Arc::new(TaskContext::from(state)); - let mut stream = plan.execute(i, task_ctx)?; - - while let Some(batch) = stream.next().await { - let mut batch = batch?; - - let mut leftover_partition_capacity = - (max_partition_size - current_partition_size) as usize; - - while batch.num_rows() > leftover_partition_capacity { - if leftover_partition_capacity > 0 { - // Fill up the remaining capacity in the slice - writer - .write(&batch.slice(0, leftover_partition_capacity)) - .map_err(DataFusionError::from)?; - // Trim away the part that made it to the current partition - batch = batch.slice( - leftover_partition_capacity, - batch.num_rows() - leftover_partition_capacity, - ); - } + let handle: tokio::task::JoinHandle<Result<Vec<(TempPath, FileMetaData)>>> = + tokio::task::spawn(async move { + let mut current_partition_size = 0; + let (mut current_partition_file_path, mut writer) = + temp_partition_file_writer(input.schema())?; + let mut partition_file_paths = vec![current_partition_file_path]; + let mut partition_metadata = vec![]; + + let mut next_partition = task_partition.fetch_add(1, Ordering::SeqCst); + while next_partition < input_partitions { + let mut stream = input.execute(next_partition, task_ctx.clone())?; + while let Some(batch) = stream.next().await { + let mut batch = batch?; + + let mut leftover_partition_capacity = + (max_partition_size - current_partition_size) as usize; + + while batch.num_rows() > leftover_partition_capacity { + if leftover_partition_capacity > 0 { + // Fill up the remaining capacity in the slice + writer + .write(&batch.slice(0, leftover_partition_capacity)) + .map_err(DataFusionError::from)?; + // Trim away the part that made it to the current partition + batch = batch.slice( + leftover_partition_capacity, + batch.num_rows() - leftover_partition_capacity, + ); + } - // Roll-over into the next partition: close partition writer, reset partition size - // counter and open new temp file + writer. - let file_metadata = writer.close().map_err(DataFusionError::from)?; - partition_metadata.push(file_metadata); + // Roll-over into the next partition: close partition writer, reset partition size + // counter and open new temp file + writer. + let file_metadata = + writer.close().map_err(DataFusionError::from)?; + partition_metadata.push(file_metadata); - current_partition_size = 0; - leftover_partition_capacity = max_partition_size as usize; + current_partition_size = 0; + leftover_partition_capacity = max_partition_size as usize; - (current_partition_file_path, writer) = - temp_partition_file_writer(plan.schema())?; - partition_file_paths.push(current_partition_file_path); - } + (current_partition_file_path, writer) = + temp_partition_file_writer(input.schema())?; + partition_file_paths.push(current_partition_file_path); + } - current_partition_size += batch.num_rows() as u32; - writer.write(&batch).map_err(DataFusionError::from)?; - } + current_partition_size += batch.num_rows() as u32; + writer.write(&batch).map_err(DataFusionError::from)?; + } + + // Roll-over into the next partition + next_partition = task_partition.fetch_add(1, Ordering::SeqCst); + } + let file_metadata = writer.close().map_err(DataFusionError::from)?; + partition_metadata.push(file_metadata); + + Ok(partition_file_paths + .into_iter() + .zip(partition_metadata) + .collect()) + }); + tasks.push(handle); } - let file_metadata = writer.close().map_err(DataFusionError::from)?; - partition_metadata.push(file_metadata); + + // Merge all the partition handles/metadata + let partition_data: Vec<(TempPath, FileMetaData)> = futures::future::join_all(tasks) + .await + .into_iter() + .map(|x| x.unwrap_or_else(|e| Err(DataFusionError::External(Box::new(e))))) + .collect::<Result<Vec<Vec<(TempPath, FileMetaData)>>>>()? + .into_iter() + .flatten() + .collect(); info!("Starting upload of partition objects"); let partitions_uuid = get_uuid(); - - let sem = Arc::new(Semaphore::new(PARTITION_FILE_UPLOAD_MAX_CONCURRENCY)); - for (part, (partition_file_path, metadata)) in partition_file_paths - .into_iter() - .zip(partition_metadata) - .enumerate() + let mut tasks = vec![]; + let sem = Arc::new(Semaphore::new(PARTITION_FILE_WRITE_MAX_CONCURRENCY)); + for (part, (partition_file_path, metadata)) in partition_data.into_iter().enumerate() { let permit = Arc::clone(&sem).acquire_owned().await.ok(); @@ -412,6 +452,7 @@ impl SeafowlContext { table_log_store.object_store(), local_table_dir, self.config.misc.max_partition_size, + None, ) .await?; @@ -554,6 +595,7 @@ mod tests { object_store.clone(), local_table_dir, 2, + None, ) .await .unwrap(); @@ -707,6 +749,7 @@ mod tests { object_store, None, max_partition_size, + None, ) .await .unwrap(); diff --git a/src/context/physical.rs b/src/context/physical.rs index 57bfe954..f24788f3 100644 --- a/src/context/physical.rs +++ b/src/context/physical.rs @@ -315,6 +315,7 @@ impl SeafowlContext { object_store, local_table_dir, self.config.misc.max_partition_size, + None, ) .await?; @@ -427,6 +428,7 @@ impl SeafowlContext { object_store, local_table_dir, self.config.misc.max_partition_size, + None, ) .await?; diff --git a/src/frontend/flight/sync/writer.rs b/src/frontend/flight/sync/writer.rs index 7f6b8e2c..206ac08c 100644 --- a/src/frontend/flight/sync/writer.rs +++ b/src/frontend/flight/sync/writer.rs @@ -454,6 +454,10 @@ impl SeafowlDataSyncWriter { }) .collect::<Vec<_>>(); + // Guesstimate the number of new partition files that will be produced + let max_size = self.context.config.misc.max_partition_size as usize; + let min_file_count_hint = files.len() + (entry.rows + max_size - 1) / max_size; + // Create a special Delta table provider that will only hit the above partition files let base_scan = Arc::new( DeltaTableProvider::try_new( @@ -505,6 +509,7 @@ impl SeafowlDataSyncWriter { log_store.object_store(), local_data_dir, self.context.config.misc.max_partition_size, + Some(min_file_count_hint), ) .await?;