Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[copy_from] AWS source #31144

Merged
merged 2 commits into from
Jan 29, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Start implementation of an S3 oneshot source
* add a new OneshotSource implementation
* support the FILES and PATTERN options for COPY FROM
  • Loading branch information
ParkMyCar committed Jan 29, 2025
commit 3a8c2ec6b1a47040bd2af3e9329690e0c7f80ce9
11 changes: 9 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/adapter/src/coord/sequencer.rs
Original file line number Diff line number Diff line change
@@ -373,7 +373,7 @@ impl Coordinator {
session,
);
}
CopyFromSource::Url(_) => {
CopyFromSource::Url(_) | CopyFromSource::AwsS3 { .. } => {
self.sequence_copy_from(ctx, plan, target_cluster).await;
}
},
81 changes: 68 additions & 13 deletions src/adapter/src/coord/sequencer/inner/copy_from.rs
Original file line number Diff line number Diff line change
@@ -7,12 +7,14 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::str::FromStr;

use mz_adapter_types::connection::ConnectionId;
use mz_ore::cast::CastInto;
use mz_persist_client::batch::ProtoBatch;
use mz_pgcopy::CopyFormatParams;
use mz_repr::{CatalogItemId, Datum, RowArena};
use mz_sql::plan::{self, CopyFromSource, HirScalarExpr};
use mz_sql::plan::{self, CopyFromFilter, CopyFromSource, HirScalarExpr};
use mz_sql::session::metadata::SessionMetadata;
use mz_storage_client::client::TableData;
use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
@@ -38,16 +40,10 @@ impl Coordinator {
source,
columns: _,
params,
filter,
} = plan;

let from_expr = match source {
CopyFromSource::Url(from_expr) => from_expr,
CopyFromSource::Stdin => {
unreachable!("COPY FROM STDIN should be handled elsewhere")
}
};

let eval_url = |from: HirScalarExpr| -> Result<Url, AdapterError> {
let eval_uri = |from: HirScalarExpr| -> Result<String, AdapterError> {
let style = ExprPrepStyle::OneShot {
logical_time: EvalTime::NotAvailable,
session: ctx.session(),
@@ -66,10 +62,8 @@ impl Coordinator {
other => coord_bail!("programming error! COPY FROM target cannot be {other}"),
};

Url::parse(eval_string)
.map_err(|err| AdapterError::Unstructured(anyhow::anyhow!("{err}")))
Ok(eval_string.to_string())
};
let url = return_if_err!(eval_url(from_expr), ctx);

// We check in planning that we're copying into a Table, but be defensive.
let Some(dest_table) = self.catalog().get_entry(&id).table() else {
@@ -93,9 +87,70 @@ impl Coordinator {
}
};

let source = match source {
CopyFromSource::Url(from_expr) => {
let url = return_if_err!(eval_uri(from_expr), ctx);
// TODO(cf2): Structured errors.
let result = Url::parse(&url)
.map_err(|err| AdapterError::Unstructured(anyhow::anyhow!("{err}")));
let url = return_if_err!(result, ctx);

mz_storage_types::oneshot_sources::ContentSource::Http { url }
}
CopyFromSource::AwsS3 {
uri,
connection,
connection_id,
} => {
let uri = return_if_err!(eval_uri(uri), ctx);

// Validate the URI is an S3 URI, with a bucket name. We rely on validating here
// and expect it in clusterd.
//
// TODO(cf2): Structured errors.
let result = http::Uri::from_str(&uri)
.map_err(|err| {
AdapterError::Unstructured(anyhow::anyhow!("expected S3 uri: {err}"))
})
.and_then(|uri| {
if uri.scheme_str() != Some("s3") {
coord_bail!("only 's3://...' urls are supported as COPY FROM target");
}
Ok(uri)
})
.and_then(|uri| {
if uri.host().is_none() {
coord_bail!("missing bucket name from 's3://...' url");
}
Ok(uri)
});
let uri = return_if_err!(result, ctx);

mz_storage_types::oneshot_sources::ContentSource::AwsS3 {
connection,
connection_id,
uri: uri.to_string(),
}
}
CopyFromSource::Stdin => {
unreachable!("COPY FROM STDIN should be handled elsewhere")
}
};

let filter = match filter {
None => mz_storage_types::oneshot_sources::ContentFilter::None,
Some(CopyFromFilter::Files(files)) => {
mz_storage_types::oneshot_sources::ContentFilter::Files(files)
}
Some(CopyFromFilter::Pattern(pattern)) => {
mz_storage_types::oneshot_sources::ContentFilter::Pattern(pattern)
}
};

let request = OneshotIngestionRequest {
source: mz_storage_types::oneshot_sources::ContentSource::Http { url },
source,
format,
filter,
};

let target_cluster = match self
9 changes: 7 additions & 2 deletions src/aws-util/Cargo.toml
Original file line number Diff line number Diff line change
@@ -12,15 +12,20 @@ workspace = true
[dependencies]
anyhow = "1.0.66"
aws-config = { version = "1.2.0", default-features = false }
aws-sdk-s3 = { version = "1.23.0", default-features = false, features = ["rt-tokio"], optional = true }
aws-sdk-s3 = { version = "1.23.0", default-features = false, features = [
"rt-tokio",
], optional = true }
aws-smithy-runtime-api = "1.1.1"
aws-smithy-runtime = { version = "1.1.1", features = ["connector-hyper-0-14-x"] }
aws-smithy-types = { version = "1.1.8", features = ["byte-stream-poll-next"] }
aws-types = "1.1.1"
bytes = "1.3.0"
bytesize = "1.1.0"
futures = "0.3.25"
http = "1.1.0"
hyper-tls = "0.5.0"
mz-ore = { path = "../ore", default-features = false }
mz-ore = { path = "../ore", features = ["async"], default-features = false }
pin-project = "1.0.12"
thiserror = "1.0.37"
tokio = { version = "1.38.0", default-features = false, features = ["macros"] }
uuid = { version = "1.7.0", features = ["v4"] }
31 changes: 30 additions & 1 deletion src/aws-util/src/s3.rs
Original file line number Diff line number Diff line change
@@ -8,9 +8,11 @@
// by the Apache License, Version 2.0.

use aws_sdk_s3::config::Builder;
use aws_sdk_s3::Client;
use aws_types::sdk_config::SdkConfig;

pub use aws_sdk_s3::Client;
use bytes::Bytes;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is meant to go in the other import block?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahh yeah I should move this up, thanks!


/// Creates a new client from an [SDK config](aws_types::sdk_config::SdkConfig)
/// with Materialize-specific customizations.
///
@@ -46,3 +48,30 @@ pub async fn list_bucket_path(
})
.transpose()
}

/// A wrapper around [`ByteStream`] that implements the [`futures::stream::Stream`] trait.
///
/// The AWS SDK's `ByteStream` type is not itself a `futures` `Stream`; this
/// newtype delegates to `ByteStream::poll_next` so object bodies (e.g. S3
/// downloads) can be consumed with standard stream combinators.
///
/// [`ByteStream`]: aws_smithy_types::byte_stream::ByteStream
#[pin_project::pin_project]
pub struct ByteStreamAdapter {
    // Structurally pinned so the `Stream` impl can forward `poll_next`
    // without moving the inner stream.
    #[pin]
    inner: aws_smithy_types::byte_stream::ByteStream,
}

impl ByteStreamAdapter {
    /// Wraps the provided [`ByteStream`] so it can be driven through the
    /// [`futures::stream::Stream`] trait.
    ///
    /// [`ByteStream`]: aws_smithy_types::byte_stream::ByteStream
    pub fn new(bytes: aws_smithy_types::byte_stream::ByteStream) -> Self {
        Self { inner: bytes }
    }
}

impl futures::stream::Stream for ByteStreamAdapter {
    // Mirrors what the SDK stream yields: chunks of bytes, or an SDK
    // byte-stream error.
    type Item = Result<Bytes, aws_smithy_types::byte_stream::error::Error>;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // Project the pin to reach the structurally-pinned inner stream,
        // then delegate polling straight to the SDK's `poll_next`.
        let this = self.project();
        aws_smithy_types::byte_stream::ByteStream::poll_next(this.inner, cx)
    }
}
2 changes: 2 additions & 0 deletions src/sql-lexer/src/keywords.txt
Original file line number Diff line number Diff line change
@@ -174,6 +174,7 @@ Features
Fetch
Fields
File
Files
Filter
First
Fixpoint
@@ -320,6 +321,7 @@ Partition
Partitions
Password
Path
Pattern
Physical
Plan
Plans
5 changes: 5 additions & 0 deletions src/sql-parser/src/ast/defs/statement.rs
Original file line number Diff line number Diff line change
@@ -374,6 +374,8 @@ pub enum CopyOptionName {
Header,
AwsConnection,
MaxFileSize,
Files,
Pattern,
}

impl AstDisplay for CopyOptionName {
@@ -387,6 +389,8 @@ impl AstDisplay for CopyOptionName {
CopyOptionName::Header => "HEADER",
CopyOptionName::AwsConnection => "AWS CONNECTION",
CopyOptionName::MaxFileSize => "MAX FILE SIZE",
CopyOptionName::Files => "FILES",
CopyOptionName::Pattern => "PATTERN",
})
}
}
@@ -407,6 +411,7 @@ impl WithOptionName for CopyOptionName {
| CopyOptionName::Header
| CopyOptionName::AwsConnection
| CopyOptionName::MaxFileSize => false,
CopyOptionName::Files | CopyOptionName::Pattern => true,
}
}
}
8 changes: 5 additions & 3 deletions src/sql-parser/src/parser.rs
Original file line number Diff line number Diff line change
@@ -6403,9 +6403,9 @@ impl<'a> Parser<'a> {
}

fn parse_copy_option(&mut self) -> Result<CopyOption<Raw>, ParserError> {
let name = match self
.expect_one_of_keywords(&[FORMAT, DELIMITER, NULL, ESCAPE, QUOTE, HEADER, AWS, MAX])?
{
let name = match self.expect_one_of_keywords(&[
FORMAT, DELIMITER, NULL, ESCAPE, QUOTE, HEADER, AWS, MAX, FILES, PATTERN,
])? {
FORMAT => CopyOptionName::Format,
DELIMITER => CopyOptionName::Delimiter,
NULL => CopyOptionName::Null,
@@ -6423,6 +6423,8 @@ impl<'a> Parser<'a> {
self.expect_keywords(&[FILE, SIZE])?;
CopyOptionName::MaxFileSize
}
FILES => CopyOptionName::Files,
PATTERN => CopyOptionName::Pattern,
_ => unreachable!(),
};
Ok(CopyOption {
23 changes: 22 additions & 1 deletion src/sql-parser/tests/testdata/copy
Original file line number Diff line number Diff line change
@@ -79,7 +79,7 @@ Copy(CopyStatement { relation: Named { name: Name(UnresolvedItemName([Ident("t")
parse-statement
COPY t TO STDOUT ()
----
error: Expected one of FORMAT or DELIMITER or NULL or ESCAPE or QUOTE or HEADER or AWS or MAX, found right parenthesis
error: Expected one of FORMAT or DELIMITER or NULL or ESCAPE or QUOTE or HEADER or AWS or MAX or FILES or PATTERN, found right parenthesis
COPY t TO STDOUT ()
^

@@ -184,3 +184,24 @@ COPY INTO t(a, b) TO '/any/path'
error: Expected identifier, found INTO
COPY INTO t(a, b) TO '/any/path'
^

parse-statement
COPY INTO t1 FROM 'http://spacemonkey.info' WITH (FILES = ['foo.csv', 'bar.csv']);
----
COPY t1 FROM 'http://spacemonkey.info' WITH (FILES = ('foo.csv', 'bar.csv'))
=>
Copy(CopyStatement { relation: Named { name: Name(UnresolvedItemName([Ident("t1")])), columns: [] }, direction: From, target: Expr(Value(String("http://spacemonkey.info"))), options: [CopyOption { name: Files, value: Some(Sequence([Value(String("foo.csv")), Value(String("bar.csv"))])) }] })

parse-statement
COPY INTO t1 FROM 'http://spacemonkey.info' WITH (FILES = ['foo.csv', 'bar.csv'], FORMAT CSV);
----
COPY t1 FROM 'http://spacemonkey.info' WITH (FILES = ('foo.csv', 'bar.csv'), FORMAT = csv)
=>
Copy(CopyStatement { relation: Named { name: Name(UnresolvedItemName([Ident("t1")])), columns: [] }, direction: From, target: Expr(Value(String("http://spacemonkey.info"))), options: [CopyOption { name: Files, value: Some(Sequence([Value(String("foo.csv")), Value(String("bar.csv"))])) }, CopyOption { name: Format, value: Some(UnresolvedItemName(UnresolvedItemName([Ident("csv")]))) }] })

parse-statement
COPY INTO t1 FROM 'http://spacemonkey.info' WITH (FILES = ['foo.csv']);
----
COPY t1 FROM 'http://spacemonkey.info' WITH (FILES = ('foo.csv'))
=>
Copy(CopyStatement { relation: Named { name: Name(UnresolvedItemName([Ident("t1")])), columns: [] }, direction: From, target: Expr(Value(String("http://spacemonkey.info"))), options: [CopyOption { name: Files, value: Some(Sequence([Value(String("foo.csv"))])) }] })
18 changes: 17 additions & 1 deletion src/sql/src/plan.rs
Original file line number Diff line number Diff line change
@@ -936,16 +936,32 @@ pub struct CopyFromPlan {
pub source: CopyFromSource,
pub columns: Vec<usize>,
pub params: CopyFormatParams<'static>,
pub filter: Option<CopyFromFilter>,
}

/// Where the data for a `COPY ... FROM` statement originates.
#[derive(Debug)]
pub enum CopyFromSource {
    /// Copying from a file local to the user, transmitted via pgwire.
    Stdin,
    /// A remote resource, e.g. HTTP file.
    ///
    /// The contained [`HirScalarExpr`] evaluates to the Url for the remote resource.
    Url(HirScalarExpr),
    /// A file in an S3 bucket.
    AwsS3 {
        /// Expression that evaluates to the file we want to copy.
        uri: HirScalarExpr,
        /// Details for how we connect to AWS S3.
        connection: AwsConnection,
        /// ID of the connection object.
        connection_id: CatalogItemId,
    },
}

/// Restricts which remote objects a `COPY ... FROM` statement ingests,
/// corresponding to the `FILES` and `PATTERN` options of `COPY FROM`.
#[derive(Debug)]
pub enum CopyFromFilter {
    /// Ingest only the explicitly listed files (the `FILES` option).
    Files(Vec<String>),
    /// Ingest files matching the given pattern (the `PATTERN` option);
    /// pattern semantics are enforced downstream — NOTE(review): confirm
    /// whether this is glob or regex matching at the ingestion site.
    Pattern(String),
}

#[derive(Debug, Clone)]
Loading