Skip to content

Commit

Permalink
Merge pull request #639 from splitgraph/SergeiPatiakin/truncate
Browse files Browse the repository at this point in the history
Add support for TRUNCATE command
  • Loading branch information
SergeiPatiakin authored Aug 29, 2024
2 parents b890a25 + 28efc33 commit 0b07557
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 2 deletions.
11 changes: 10 additions & 1 deletion src/context/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::catalog::DEFAULT_SCHEMA;
use crate::context::SeafowlContext;
use crate::datafusion::parser::{DFParser, Statement as DFStatement, CONVERT_TO_DELTA};
use crate::datafusion::utils::build_schema;
use crate::nodes::Truncate;
use crate::wasm_udf::data_types::CreateFunctionDetails;
use crate::{
nodes::{
Expand Down Expand Up @@ -245,7 +246,7 @@ impl SeafowlContext {
})),
}))
},
Statement::Truncate { table_name, partitions, .. } => {
Statement::Truncate { table: false, table_name, partitions, .. } => {
let table_name = if partitions.is_none() {
Some(table_name.to_string())
} else {
Expand All @@ -267,6 +268,14 @@ impl SeafowlContext {
})),
}))
}
Statement::Truncate { table: true, table_name, .. } => {
    // `TRUNCATE TABLE <name>` is planned as a Seafowl extension node that
    // carries the stringified table name and an empty (dummy) output schema.
    let truncate = Truncate {
        table_name: table_name.to_string(),
        output_schema: Arc::new(DFSchema::empty()),
    };
    Ok(LogicalPlan::Extension(Extension {
        node: Arc::new(SeafowlExtensionNode::Truncate(truncate)),
    }))
}
Statement::DropFunction{
if_exists,
func_desc,
Expand Down
39 changes: 38 additions & 1 deletion src/context/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::context::delta::plan_to_object_store;
use crate::context::SeafowlContext;
use crate::nodes::{
ConvertTable, CreateFunction, CreateTable, DropFunction, RenameTable,
SeafowlExtensionNode, Vacuum,
SeafowlExtensionNode, Truncate, Vacuum,
};
use crate::object_store::factory::build_object_store;
use crate::object_store::http::try_prepare_http_url;
Expand Down Expand Up @@ -651,6 +651,43 @@ impl SeafowlContext {
.await?;
Ok(make_dummy_exec())
}
SeafowlExtensionNode::Truncate(Truncate { table_name, .. }) => {
    // Fetch the Delta table and load it before taking the snapshot whose
    // file actions will be turned into removals.
    let mut table = self.try_get_delta_table(table_name).await?;
    table.load().await?;
    let state = table.snapshot()?;

    // A single timestamp (milliseconds since the Unix epoch) is shared by
    // every removal in this commit.
    // NOTE: the `unwrap` panics if the system clock reads earlier than the epoch.
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    let removed_at_ms = since_epoch.as_millis() as i64;

    // Emit one `Remove` action per file action in the snapshot, carrying over
    // the file's path, partition values and size.
    let mut actions: Vec<Action> = Vec::new();
    for file in state.file_actions()? {
        actions.push(Action::Remove(Remove {
            path: file.path,
            deletion_timestamp: Some(removed_at_ms),
            data_change: true,
            extended_file_metadata: Some(true),
            partition_values: Some(file.partition_values),
            size: Some(file.size),
            tags: None,
            deletion_vector: None,
            base_row_id: None,
            default_row_commit_version: None,
        }));
    }

    // Record the removals as a DELETE operation without a predicate.
    self.commit(actions, &table, DeltaOperation::Delete { predicate: None })
        .await?;

    Ok(make_dummy_exec())
}
SeafowlExtensionNode::Vacuum(Vacuum {
database,
table_name,
Expand Down
18 changes: 18 additions & 0 deletions src/datafusion/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ impl<'a> DFParser<'a> {
self.parser.next_token();
self.parse_vacuum()
}
Keyword::TRUNCATE => {
self.parser.next_token();
self.parse_truncate()
}
_ => {
// use the native parser
Ok(Statement::Statement(Box::from(
Expand Down Expand Up @@ -219,6 +223,20 @@ impl<'a> DFParser<'a> {
})))
}

/// Parse a SQL `TRUNCATE TABLE <name>` statement
///
/// The `TABLE` keyword is mandatory; if it is missing, a parser error naming the
/// next token is returned. On success the statement is wrapped as a native
/// `SQLStatement::Truncate` with `table: true` and no partitions.
pub fn parse_truncate(&mut self) -> Result<Statement, ParserError> {
    if self.parser.parse_keyword(Keyword::TABLE) {
        let table_name = self.parser.parse_object_name(true)?;
        let truncate = SQLStatement::Truncate {
            table_name,
            partitions: None,
            table: true,
        };
        Ok(Statement::Statement(Box::new(truncate)))
    } else {
        self.expected("TABLE as a TRUNCATE target", self.parser.peek_token())
    }
}

/// Parse a SQL `COPY TO` statement
pub fn parse_copy(&mut self) -> Result<Statement, ParserError> {
// parse as a query
Expand Down
14 changes: 14 additions & 0 deletions src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,21 @@ pub struct Vacuum {
pub output_schema: DFSchemaRef,
}

/// Logical plan node for the `TRUNCATE TABLE` statement.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct Truncate {
/// Name of the table to truncate (the stringified object name from the statement)
pub table_name: String,
/// Dummy result schema for the plan (empty)
pub output_schema: DFSchemaRef,
}

/// Seafowl-specific logical plan nodes; each variant wraps the struct holding
/// that statement's parameters. The planner places these inside
/// `LogicalPlan::Extension`.
#[derive(AsRefStr, Debug, Clone, Hash, PartialEq, Eq)]
pub enum SeafowlExtensionNode {
ConvertTable(ConvertTable),
CreateTable(CreateTable),
CreateFunction(CreateFunction),
DropFunction(DropFunction),
RenameTable(RenameTable),
// Produced for `TRUNCATE TABLE <name>`
Truncate(Truncate),
Vacuum(Vacuum),
}

Expand Down Expand Up @@ -120,6 +128,9 @@ impl UserDefinedLogicalNode for SeafowlExtensionNode {
SeafowlExtensionNode::RenameTable(RenameTable { output_schema, .. }) => {
output_schema
}
SeafowlExtensionNode::Truncate(Truncate { output_schema, .. }) => {
output_schema
}
SeafowlExtensionNode::Vacuum(Vacuum { output_schema, .. }) => output_schema,
}
}
Expand Down Expand Up @@ -153,6 +164,9 @@ impl UserDefinedLogicalNode for SeafowlExtensionNode {
}) => {
write!(f, "RenameTable: {} to {}", old_name, new_name)
}
SeafowlExtensionNode::Truncate(Truncate { table_name, .. }) => {
write!(f, "Truncate: {table_name}")
}
SeafowlExtensionNode::Vacuum(Vacuum { database, .. }) => {
write!(
f,
Expand Down
1 change: 1 addition & 0 deletions tests/statements/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ mod convert;
#[path = "../../src/testutils.rs"]
mod testutils;
mod time_travel;
mod truncate;
mod vacuum;

enum ObjectStoreType {
Expand Down
31 changes: 31 additions & 0 deletions tests/statements/truncate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use crate::statements::*;

/// End-to-end check of `TRUNCATE TABLE`: populate a table, verify its rows are
/// visible, truncate it, then verify that a subsequent `SELECT` yields no batches.
#[tokio::test]
async fn test_truncate_table() -> Result<()> {
    let (context, _) = make_context_with_pg(ObjectStoreType::Local).await;

    // Create table_1 and check that it contains data
    create_table_and_insert(&context, "table_1").await;
    let plan = context.plan_query("SELECT * FROM table_1").await.unwrap();
    let results = context.collect(plan).await.unwrap();
    // Every cell is padded to its column width (set by the border rows), because
    // `assert_batches_eq!` compares the pretty-printed table line by line.
    let expected = [
        "+---------------------+------------+------------------+-----------------+----------------+",
        "| some_time           | some_value | some_other_value | some_bool_value | some_int_value |",
        "+---------------------+------------+------------------+-----------------+----------------+",
        "| 2022-01-01T20:01:01 | 42.0       | 1.0000000000     |                 | 1111           |",
        "| 2022-01-01T20:02:02 | 43.0       | 1.0000000000     |                 | 2222           |",
        "| 2022-01-01T20:03:03 | 44.0       | 1.0000000000     |                 | 3333           |",
        "+---------------------+------------+------------------+-----------------+----------------+",
    ];
    assert_batches_eq!(expected, &results);

    // Execute TRUNCATE command; the plan returned by `plan_query` is discarded
    // here and only the planning call itself is awaited.
    context.plan_query("TRUNCATE TABLE table_1").await.unwrap();

    // Check that table_1 no longer contains data
    let plan = context.plan_query("SELECT * FROM table_1").await.unwrap();
    let results = context.collect(plan).await.unwrap();
    assert!(results.is_empty());

    Ok(())
}

0 comments on commit 0b07557

Please sign in to comment.