diff --git a/src/context/logical.rs b/src/context/logical.rs index 091c678f..38ddaad0 100644 --- a/src/context/logical.rs +++ b/src/context/logical.rs @@ -2,6 +2,7 @@ use crate::catalog::DEFAULT_SCHEMA; use crate::context::SeafowlContext; use crate::datafusion::parser::{DFParser, Statement as DFStatement, CONVERT_TO_DELTA}; use crate::datafusion::utils::build_schema; +use crate::nodes::Truncate; use crate::wasm_udf::data_types::CreateFunctionDetails; use crate::{ nodes::{ @@ -245,7 +246,7 @@ impl SeafowlContext { })), })) }, - Statement::Truncate { table_name, partitions, .. } => { + Statement::Truncate { table: false, table_name, partitions, .. } => { let table_name = if partitions.is_none() { Some(table_name.to_string()) } else { @@ -267,6 +268,14 @@ impl SeafowlContext { })), })) } + Statement::Truncate { table: true, table_name, .. } => { + Ok(LogicalPlan::Extension(Extension { + node: Arc::new(SeafowlExtensionNode::Truncate(Truncate { + table_name: table_name.to_string(), + output_schema: Arc::new(DFSchema::empty()) + })), + })) + } Statement::DropFunction{ if_exists, func_desc, diff --git a/src/context/physical.rs b/src/context/physical.rs index 57bfe954..3c5b8e6d 100644 --- a/src/context/physical.rs +++ b/src/context/physical.rs @@ -4,7 +4,7 @@ use crate::context::delta::plan_to_object_store; use crate::context::SeafowlContext; use crate::nodes::{ ConvertTable, CreateFunction, CreateTable, DropFunction, RenameTable, - SeafowlExtensionNode, Vacuum, + SeafowlExtensionNode, Truncate, Vacuum, }; use crate::object_store::factory::build_object_store; use crate::object_store::http::try_prepare_http_url; @@ -651,6 +651,43 @@ impl SeafowlContext { .await?; Ok(make_dummy_exec()) } + SeafowlExtensionNode::Truncate(Truncate { + table_name, .. + }) => { + let mut table = self.try_get_delta_table(table_name).await?; + table.load().await?; + let snapshot = table.snapshot()?; + let deletion_timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() + as i64; + let actions: Vec = snapshot + .file_actions()? + .into_iter() + .map(|add_action| { + Action::Remove(Remove { + path: add_action.path, + deletion_timestamp: Some(deletion_timestamp), + data_change: true, + extended_file_metadata: Some(true), + partition_values: Some( + add_action.partition_values, + ), + size: Some(add_action.size), + tags: None, + deletion_vector: None, + base_row_id: None, + default_row_commit_version: None, + }) + }) + .collect(); + + let op = DeltaOperation::Delete { predicate: None }; + self.commit(actions, &table, op).await?; + + Ok(make_dummy_exec()) + } SeafowlExtensionNode::Vacuum(Vacuum { database, table_name, diff --git a/src/datafusion/parser.rs b/src/datafusion/parser.rs index 28f9d3ec..d0d259d3 100644 --- a/src/datafusion/parser.rs +++ b/src/datafusion/parser.rs @@ -159,6 +159,10 @@ impl<'a> DFParser<'a> { self.parser.next_token(); self.parse_vacuum() } + Keyword::TRUNCATE => { + self.parser.next_token(); + self.parse_truncate() + } _ => { // use the native parser Ok(Statement::Statement(Box::from( @@ -219,6 +223,20 @@ impl<'a> DFParser<'a> { }))) } + pub fn parse_truncate(&mut self) -> Result { + if !self.parser.parse_keyword(Keyword::TABLE) { + return self.expected("TABLE as a TRUNCATE target", self.parser.peek_token()); + } + + let table_name = self.parser.parse_object_name(true)?; + + Ok(Statement::Statement(Box::new(SQLStatement::Truncate { + table_name, + partitions: None, + table: true, + }))) + } + /// Parse a SQL `COPY TO` statement pub fn parse_copy(&mut self) -> Result { // parse as a query diff --git a/src/nodes.rs b/src/nodes.rs index 45c6c732..528bd979 100644 --- a/src/nodes.rs +++ b/src/nodes.rs @@ -69,6 +69,13 @@ pub struct Vacuum { pub output_schema: DFSchemaRef, } +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +pub struct Truncate { + pub table_name: String, + /// Dummy result schema for the plan (empty) + pub output_schema: DFSchemaRef, +} + #[derive(AsRefStr, Debug, Clone, Hash, PartialEq, Eq)] pub enum SeafowlExtensionNode { ConvertTable(ConvertTable), @@ -76,6 +83,7 @@ pub enum SeafowlExtensionNode { CreateFunction(CreateFunction), DropFunction(DropFunction), RenameTable(RenameTable), + Truncate(Truncate), Vacuum(Vacuum), } @@ -120,6 +128,9 @@ impl UserDefinedLogicalNode for SeafowlExtensionNode { SeafowlExtensionNode::RenameTable(RenameTable { output_schema, .. }) => { output_schema } + SeafowlExtensionNode::Truncate(Truncate { output_schema, .. }) => { + output_schema + } SeafowlExtensionNode::Vacuum(Vacuum { output_schema, .. }) => output_schema, } } @@ -153,6 +164,9 @@ impl UserDefinedLogicalNode for SeafowlExtensionNode { }) => { write!(f, "RenameTable: {} to {}", old_name, new_name) } + SeafowlExtensionNode::Truncate(Truncate { table_name, .. }) => { + write!(f, "Truncate: {table_name}") + } SeafowlExtensionNode::Vacuum(Vacuum { database, .. }) => { write!( f, diff --git a/tests/statements/mod.rs b/tests/statements/mod.rs index c49258d1..ff8ee37c 100644 --- a/tests/statements/mod.rs +++ b/tests/statements/mod.rs @@ -44,6 +44,7 @@ mod convert; #[path = "../../src/testutils.rs"] mod testutils; mod time_travel; +mod truncate; mod vacuum; enum ObjectStoreType { diff --git a/tests/statements/truncate.rs b/tests/statements/truncate.rs new file mode 100644 index 00000000..1640d4ca --- /dev/null +++ b/tests/statements/truncate.rs @@ -0,0 +1,31 @@ +use crate::statements::*; + +#[tokio::test] +async fn test_truncate_table() -> Result<()> { + let (context, _) = make_context_with_pg(ObjectStoreType::Local).await; + + // Create table_1 and check that it contains data + create_table_and_insert(&context, "table_1").await; + let plan = context.plan_query("SELECT * FROM table_1").await.unwrap(); + let results = context.collect(plan).await.unwrap(); + let expected = [ + "+---------------------+------------+------------------+-----------------+----------------+", + "| some_time | some_value | some_other_value | some_bool_value | some_int_value |", + "+---------------------+------------+------------------+-----------------+----------------+", + "| 2022-01-01T20:01:01 | 42.0 | 1.0000000000 | | 1111 |", + "| 2022-01-01T20:02:02 | 43.0 | 1.0000000000 | | 2222 |", + "| 2022-01-01T20:03:03 | 44.0 | 1.0000000000 | | 3333 |", + "+---------------------+------------+------------------+-----------------+----------------+", + ]; + assert_batches_eq!(expected, &results); + + // Execute TRUNCATE command + context.plan_query("TRUNCATE TABLE table_1").await.unwrap(); + + // Check that table_1 no longer contains data + let plan = context.plan_query("SELECT * FROM table_1").await.unwrap(); + let results = context.collect(plan).await.unwrap(); + assert!(results.is_empty()); + + Ok(()) +}