From ab43e648f3fa08ec8a47ac40ce0857cbf20a1408 Mon Sep 17 00:00:00 2001
From: Parker Timmerman
Date: Wed, 29 Jan 2025 11:06:48 -0500
Subject: [PATCH] [copy_from] AWS source (#31144)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

_Stacked on top of_: https://github.com/MaterializeInc/materialize/pull/30956

This PR implements a new AwsS3 `OneshotSource` that allows copying in files from S3, e.g.:

```
COPY INTO my_table FROM 's3://my-test-bucket' (FORMAT CSV, FILES = ['important.csv']);
```

In addition to `FILES = []`, we also support a `PATTERN = ` option that copies every file whose name matches a glob-style pattern (see the sketch after the checklist below).

### Motivation

Fixes https://github.com/MaterializeInc/database-issues/issues/8860
Fixes https://github.com/MaterializeInc/database-issues/issues/8855

### Tips for reviewer

Review only the final commit, the one titled "start, implementation of an S3 oneshot source".

### Checklist

- [x] This PR has adequate test coverage / QA involvement has been duly considered. ([trigger-ci for additional test/nightly runs](https://trigger-ci.dev.materialize.com/))
- [x] This PR has an associated up-to-date [design doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md), is a design doc ([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)), or is sufficiently small to not require a design.
- [x] If this PR evolves [an existing `$T ⇔ Proto$T` mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md) (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label.
- [x] If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label ([example](https://github.com/MaterializeInc/cloud/pull/5021)).
- [x] If this PR includes major [user-facing behavior changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note), I have pinged the relevant PM to schedule a changelog post.
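As a usage sketch of the `PATTERN` option mentioned above (the bucket name and pattern are illustrative, not taken from this patch's tests; the pattern is a glob matched against object names):

```
COPY INTO my_table FROM 's3://my-test-bucket' (FORMAT CSV, PATTERN = '*.csv');
```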
--- Cargo.lock | 11 +- src/adapter/src/coord/sequencer.rs | 2 +- .../src/coord/sequencer/inner/copy_from.rs | 81 ++++++- src/aws-util/Cargo.toml | 9 +- src/aws-util/src/s3.rs | 31 ++- src/sql-lexer/src/keywords.txt | 2 + src/sql-parser/src/ast/defs/statement.rs | 5 + src/sql-parser/src/parser.rs | 8 +- src/sql-parser/tests/testdata/copy | 23 +- src/sql/src/plan.rs | 18 +- src/sql/src/plan/statement/dml.rs | 39 ++- src/sql/src/rbac.rs | 1 + src/storage-operators/Cargo.toml | 6 +- src/storage-operators/src/oneshot_source.rs | 138 ++++++++++- .../src/oneshot_source/aws_source.rs | 223 ++++++++++++++++++ .../src/oneshot_source/http_source.rs | 7 +- src/storage-types/src/oneshot_sources.proto | 24 ++ src/storage-types/src/oneshot_sources.rs | 89 ++++++- src/storage/src/render.rs | 5 + 19 files changed, 676 insertions(+), 46 deletions(-) create mode 100644 src/storage-operators/src/oneshot_source/aws_source.rs diff --git a/Cargo.lock b/Cargo.lock index efd8642ce77ae..db04d807601b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3068,9 +3068,9 @@ checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4" [[package]] name = "glob" -version = "0.3.1" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2" [[package]] name = "globset" @@ -4789,12 +4789,15 @@ dependencies = [ "aws-sdk-s3", "aws-smithy-runtime", "aws-smithy-runtime-api", + "aws-smithy-types", "aws-types", "bytes", "bytesize", + "futures", "http 1.1.0", "hyper-tls 0.5.0", "mz-ore", + "pin-project", "thiserror", "tokio", "uuid", @@ -7137,13 +7140,17 @@ dependencies = [ "arrow", "async-compression", "async-stream", + "aws-smithy-types", "aws-types", "bytes", "bytesize", "csv-async", + "derivative", "differential-dataflow", "futures", + "glob", "http 1.1.0", + "itertools 0.12.1", "mz-arrow-util", "mz-aws-util", "mz-dyncfg", diff --git a/src/adapter/src/coord/sequencer.rs b/src/adapter/src/coord/sequencer.rs index afa6b4d2a90b9..a8cc8b837ce81 100644 --- a/src/adapter/src/coord/sequencer.rs +++ b/src/adapter/src/coord/sequencer.rs @@ -373,7 +373,7 @@ impl Coordinator { session, ); } - CopyFromSource::Url(_) => { + CopyFromSource::Url(_) | CopyFromSource::AwsS3 { .. } => { self.sequence_copy_from(ctx, plan, target_cluster).await; } }, diff --git a/src/adapter/src/coord/sequencer/inner/copy_from.rs b/src/adapter/src/coord/sequencer/inner/copy_from.rs index 80efef8d5f1fa..112c1251b516a 100644 --- a/src/adapter/src/coord/sequencer/inner/copy_from.rs +++ b/src/adapter/src/coord/sequencer/inner/copy_from.rs @@ -7,12 +7,14 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
+use std::str::FromStr; + use mz_adapter_types::connection::ConnectionId; use mz_ore::cast::CastInto; use mz_persist_client::batch::ProtoBatch; use mz_pgcopy::CopyFormatParams; use mz_repr::{CatalogItemId, Datum, RowArena}; -use mz_sql::plan::{self, CopyFromSource, HirScalarExpr}; +use mz_sql::plan::{self, CopyFromFilter, CopyFromSource, HirScalarExpr}; use mz_sql::session::metadata::SessionMetadata; use mz_storage_client::client::TableData; use mz_storage_types::oneshot_sources::OneshotIngestionRequest; @@ -38,16 +40,10 @@ impl Coordinator { source, columns: _, params, + filter, } = plan; - let from_expr = match source { - CopyFromSource::Url(from_expr) => from_expr, - CopyFromSource::Stdin => { - unreachable!("COPY FROM STDIN should be handled elsewhere") - } - }; - - let eval_url = |from: HirScalarExpr| -> Result { + let eval_uri = |from: HirScalarExpr| -> Result { let style = ExprPrepStyle::OneShot { logical_time: EvalTime::NotAvailable, session: ctx.session(), @@ -66,10 +62,8 @@ impl Coordinator { other => coord_bail!("programming error! COPY FROM target cannot be {other}"), }; - Url::parse(eval_string) - .map_err(|err| AdapterError::Unstructured(anyhow::anyhow!("{err}"))) + Ok(eval_string.to_string()) }; - let url = return_if_err!(eval_url(from_expr), ctx); // We check in planning that we're copying into a Table, but be defensive. let Some(dest_table) = self.catalog().get_entry(&id).table() else { @@ -93,9 +87,70 @@ impl Coordinator { } }; + let source = match source { + CopyFromSource::Url(from_expr) => { + let url = return_if_err!(eval_uri(from_expr), ctx); + // TODO(cf2): Structured errors. + let result = Url::parse(&url) + .map_err(|err| AdapterError::Unstructured(anyhow::anyhow!("{err}"))); + let url = return_if_err!(result, ctx); + + mz_storage_types::oneshot_sources::ContentSource::Http { url } + } + CopyFromSource::AwsS3 { + uri, + connection, + connection_id, + } => { + let uri = return_if_err!(eval_uri(uri), ctx); + + // Validate the URI is an S3 URI, with a bucket name. We rely on validating here + // and expect it in clusterd. + // + // TODO(cf2): Structured errors. + let result = http::Uri::from_str(&uri) + .map_err(|err| { + AdapterError::Unstructured(anyhow::anyhow!("expected S3 uri: {err}")) + }) + .and_then(|uri| { + if uri.scheme_str() != Some("s3") { + coord_bail!("only 's3://...' urls are supported as COPY FROM target"); + } + Ok(uri) + }) + .and_then(|uri| { + if uri.host().is_none() { + coord_bail!("missing bucket name from 's3://...' 
url"); + } + Ok(uri) + }); + let uri = return_if_err!(result, ctx); + + mz_storage_types::oneshot_sources::ContentSource::AwsS3 { + connection, + connection_id, + uri: uri.to_string(), + } + } + CopyFromSource::Stdin => { + unreachable!("COPY FROM STDIN should be handled elsewhere") + } + }; + + let filter = match filter { + None => mz_storage_types::oneshot_sources::ContentFilter::None, + Some(CopyFromFilter::Files(files)) => { + mz_storage_types::oneshot_sources::ContentFilter::Files(files) + } + Some(CopyFromFilter::Pattern(pattern)) => { + mz_storage_types::oneshot_sources::ContentFilter::Pattern(pattern) + } + }; + let request = OneshotIngestionRequest { - source: mz_storage_types::oneshot_sources::ContentSource::Http { url }, + source, format, + filter, }; let target_cluster = match self diff --git a/src/aws-util/Cargo.toml b/src/aws-util/Cargo.toml index 002f20e087766..4fde2216e29f7 100644 --- a/src/aws-util/Cargo.toml +++ b/src/aws-util/Cargo.toml @@ -12,15 +12,20 @@ workspace = true [dependencies] anyhow = "1.0.66" aws-config = { version = "1.2.0", default-features = false } -aws-sdk-s3 = { version = "1.23.0", default-features = false, features = ["rt-tokio"], optional = true } +aws-sdk-s3 = { version = "1.23.0", default-features = false, features = [ + "rt-tokio", +], optional = true } aws-smithy-runtime-api = "1.1.1" aws-smithy-runtime = { version = "1.1.1", features = ["connector-hyper-0-14-x"] } +aws-smithy-types = { version = "1.1.8", features = ["byte-stream-poll-next"] } aws-types = "1.1.1" bytes = "1.3.0" bytesize = "1.1.0" +futures = "0.3.25" http = "1.1.0" hyper-tls = "0.5.0" -mz-ore = { path = "../ore", default-features = false } +mz-ore = { path = "../ore", features = ["async"], default-features = false } +pin-project = "1.0.12" thiserror = "1.0.37" tokio = { version = "1.38.0", default-features = false, features = ["macros"] } uuid = { version = "1.7.0", features = ["v4"] } diff --git a/src/aws-util/src/s3.rs b/src/aws-util/src/s3.rs index 8173c3439dca8..931abc068f7d5 100644 --- a/src/aws-util/src/s3.rs +++ b/src/aws-util/src/s3.rs @@ -8,8 +8,10 @@ // by the Apache License, Version 2.0. use aws_sdk_s3::config::Builder; -use aws_sdk_s3::Client; use aws_types::sdk_config::SdkConfig; +use bytes::Bytes; + +pub use aws_sdk_s3::Client; /// Creates a new client from an [SDK config](aws_types::sdk_config::SdkConfig) /// with Materialize-specific customizations. @@ -46,3 +48,30 @@ pub async fn list_bucket_path( }) .transpose() } + +/// A wrapper around [`ByteStream`] that implements the [`futures::stream::Stream`] trait. 
+/// +/// [`ByteStream`]: aws_smithy_types::byte_stream::ByteStream +#[pin_project::pin_project] +pub struct ByteStreamAdapter { + #[pin] + inner: aws_smithy_types::byte_stream::ByteStream, +} + +impl ByteStreamAdapter { + pub fn new(bytes: aws_smithy_types::byte_stream::ByteStream) -> Self { + ByteStreamAdapter { inner: bytes } + } +} + +impl futures::stream::Stream for ByteStreamAdapter { + type Item = Result; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.project(); + aws_smithy_types::byte_stream::ByteStream::poll_next(this.inner, cx) + } +} diff --git a/src/sql-lexer/src/keywords.txt b/src/sql-lexer/src/keywords.txt index d58bf1815a484..14ef39c8b4b1a 100644 --- a/src/sql-lexer/src/keywords.txt +++ b/src/sql-lexer/src/keywords.txt @@ -174,6 +174,7 @@ Features Fetch Fields File +Files Filter First Fixpoint @@ -320,6 +321,7 @@ Partition Partitions Password Path +Pattern Physical Plan Plans diff --git a/src/sql-parser/src/ast/defs/statement.rs b/src/sql-parser/src/ast/defs/statement.rs index 8891a63dd9494..c9665e39822ff 100644 --- a/src/sql-parser/src/ast/defs/statement.rs +++ b/src/sql-parser/src/ast/defs/statement.rs @@ -374,6 +374,8 @@ pub enum CopyOptionName { Header, AwsConnection, MaxFileSize, + Files, + Pattern, } impl AstDisplay for CopyOptionName { @@ -387,6 +389,8 @@ impl AstDisplay for CopyOptionName { CopyOptionName::Header => "HEADER", CopyOptionName::AwsConnection => "AWS CONNECTION", CopyOptionName::MaxFileSize => "MAX FILE SIZE", + CopyOptionName::Files => "FILES", + CopyOptionName::Pattern => "PATTERN", }) } } @@ -407,6 +411,7 @@ impl WithOptionName for CopyOptionName { | CopyOptionName::Header | CopyOptionName::AwsConnection | CopyOptionName::MaxFileSize => false, + CopyOptionName::Files | CopyOptionName::Pattern => true, } } } diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs index f55209e99e1b9..cfae801c2a2e3 100644 --- a/src/sql-parser/src/parser.rs +++ b/src/sql-parser/src/parser.rs @@ -6403,9 +6403,9 @@ impl<'a> Parser<'a> { } fn parse_copy_option(&mut self) -> Result, ParserError> { - let name = match self - .expect_one_of_keywords(&[FORMAT, DELIMITER, NULL, ESCAPE, QUOTE, HEADER, AWS, MAX])? - { + let name = match self.expect_one_of_keywords(&[ + FORMAT, DELIMITER, NULL, ESCAPE, QUOTE, HEADER, AWS, MAX, FILES, PATTERN, + ])? 
{ FORMAT => CopyOptionName::Format, DELIMITER => CopyOptionName::Delimiter, NULL => CopyOptionName::Null, @@ -6423,6 +6423,8 @@ impl<'a> Parser<'a> { self.expect_keywords(&[FILE, SIZE])?; CopyOptionName::MaxFileSize } + FILES => CopyOptionName::Files, + PATTERN => CopyOptionName::Pattern, _ => unreachable!(), }; Ok(CopyOption { diff --git a/src/sql-parser/tests/testdata/copy b/src/sql-parser/tests/testdata/copy index 6cc286f74e709..694f8f5326398 100644 --- a/src/sql-parser/tests/testdata/copy +++ b/src/sql-parser/tests/testdata/copy @@ -79,7 +79,7 @@ Copy(CopyStatement { relation: Named { name: Name(UnresolvedItemName([Ident("t") parse-statement COPY t TO STDOUT () ---- -error: Expected one of FORMAT or DELIMITER or NULL or ESCAPE or QUOTE or HEADER or AWS or MAX, found right parenthesis +error: Expected one of FORMAT or DELIMITER or NULL or ESCAPE or QUOTE or HEADER or AWS or MAX or FILES or PATTERN, found right parenthesis COPY t TO STDOUT () ^ @@ -184,3 +184,24 @@ COPY INTO t(a, b) TO '/any/path' error: Expected identifier, found INTO COPY INTO t(a, b) TO '/any/path' ^ + +parse-statement +COPY INTO t1 FROM 'http://spacemonkey.info' WITH (FILES = ['foo.csv', 'bar.csv']); +---- +COPY t1 FROM 'http://spacemonkey.info' WITH (FILES = ('foo.csv', 'bar.csv')) +=> +Copy(CopyStatement { relation: Named { name: Name(UnresolvedItemName([Ident("t1")])), columns: [] }, direction: From, target: Expr(Value(String("http://spacemonkey.info"))), options: [CopyOption { name: Files, value: Some(Sequence([Value(String("foo.csv")), Value(String("bar.csv"))])) }] }) + +parse-statement +COPY INTO t1 FROM 'http://spacemonkey.info' WITH (FILES = ['foo.csv', 'bar.csv'], FORMAT CSV); +---- +COPY t1 FROM 'http://spacemonkey.info' WITH (FILES = ('foo.csv', 'bar.csv'), FORMAT = csv) +=> +Copy(CopyStatement { relation: Named { name: Name(UnresolvedItemName([Ident("t1")])), columns: [] }, direction: From, target: Expr(Value(String("http://spacemonkey.info"))), options: [CopyOption { name: Files, value: Some(Sequence([Value(String("foo.csv")), Value(String("bar.csv"))])) }, CopyOption { name: Format, value: Some(UnresolvedItemName(UnresolvedItemName([Ident("csv")]))) }] }) + +parse-statement +COPY INTO t1 FROM 'http://spacemonkey.info' WITH (FILES = ['foo.csv']); +---- +COPY t1 FROM 'http://spacemonkey.info' WITH (FILES = ('foo.csv')) +=> +Copy(CopyStatement { relation: Named { name: Name(UnresolvedItemName([Ident("t1")])), columns: [] }, direction: From, target: Expr(Value(String("http://spacemonkey.info"))), options: [CopyOption { name: Files, value: Some(Sequence([Value(String("foo.csv"))])) }] }) diff --git a/src/sql/src/plan.rs b/src/sql/src/plan.rs index 54154a2a6883c..35f67bca9f114 100644 --- a/src/sql/src/plan.rs +++ b/src/sql/src/plan.rs @@ -936,16 +936,32 @@ pub struct CopyFromPlan { pub source: CopyFromSource, pub columns: Vec, pub params: CopyFormatParams<'static>, + pub filter: Option, } #[derive(Debug)] pub enum CopyFromSource { /// Copying from a file local to the user, transmitted via pgwire. Stdin, - /// A remote resource, e.g. S3. + /// A remote resource, e.g. HTTP file. /// /// The contained [`HirScalarExpr`] evaluates to the Url for the remote resource. Url(HirScalarExpr), + /// A file in an S3 bucket. + AwsS3 { + /// Expression that evaluates to the file we want to copy. + uri: HirScalarExpr, + /// Details for how we connect to AWS S3. + connection: AwsConnection, + /// ID of the connection object. 
+ connection_id: CatalogItemId, + }, +} + +#[derive(Debug)] +pub enum CopyFromFilter { + Files(Vec), + Pattern(String), } #[derive(Debug, Clone)] diff --git a/src/sql/src/plan/statement/dml.rs b/src/sql/src/plan/statement/dml.rs index e757bfed202b6..41034fdeef264 100644 --- a/src/sql/src/plan/statement/dml.rs +++ b/src/sql/src/plan/statement/dml.rs @@ -54,8 +54,8 @@ use crate::plan::query::{plan_expr, plan_up_to, ExprContext, QueryLifetime}; use crate::plan::scope::Scope; use crate::plan::statement::{ddl, StatementContext, StatementDesc}; use crate::plan::{ - self, side_effecting_func, transform_ast, CopyToPlan, CreateSinkPlan, ExplainPushdownPlan, - ExplainSinkSchemaPlan, ExplainTimestampPlan, + self, side_effecting_func, transform_ast, CopyFromFilter, CopyToPlan, CreateSinkPlan, + ExplainPushdownPlan, ExplainSinkSchemaPlan, ExplainTimestampPlan, }; use crate::plan::{ query, CopyFormat, CopyFromPlan, ExplainPlanPlan, InsertPlan, MutationKind, Params, Plan, @@ -1143,7 +1143,24 @@ fn plan_copy_from( }; let from = plan_expr(ecx, &from_expr)?.type_as(ecx, &ScalarType::String)?; - CopyFromSource::Url(from) + match options.aws_connection { + Some(conn_id) => { + let conn_id = CatalogItemId::from(conn_id); + + // Validate the connection type is one we expect. + let connection = match scx.get_item(&conn_id).connection()? { + mz_storage_types::connections::Connection::Aws(conn) => conn, + _ => sql_bail!("only AWS CONNECTION is supported in COPY ... FROM"), + }; + + CopyFromSource::AwsS3 { + uri: from, + connection, + connection_id: conn_id, + } + } + None => CopyFromSource::Url(from), + } } CopyTarget::Stdout => bail_never_supported!("COPY FROM {} not supported", target), }; @@ -1180,12 +1197,24 @@ fn plan_copy_from( CopyFormat::Parquet => bail_unsupported!("FORMAT PARQUET"), }; + let filter = match (options.files, options.pattern) { + (Some(_), Some(_)) => bail_unsupported!("must specify one of FILES or PATTERN"), + (Some(files), None) => Some(CopyFromFilter::Files(files)), + (None, Some(pattern)) => Some(CopyFromFilter::Pattern(pattern)), + (None, None) => None, + }; + + if filter.is_some() && matches!(source, CopyFromSource::Stdin) { + bail_unsupported!("COPY FROM ... WITH (FILES ...) 
only supported from a URL") + } + let (id, _, columns) = query::plan_copy_from(scx, table_name, columns)?; Ok(Plan::CopyFrom(CopyFromPlan { id, source, columns, params, + filter, })) } @@ -1206,7 +1235,9 @@ generate_extracted_config!( (Quote, String), (Header, bool), (AwsConnection, with_options::Object), - (MaxFileSize, ByteSize, Default(ByteSize::mb(256))) + (MaxFileSize, ByteSize, Default(ByteSize::mb(256))), + (Files, Vec), + (Pattern, String) ); pub fn plan_copy( diff --git a/src/sql/src/rbac.rs b/src/sql/src/rbac.rs index fabb8603ceb10..bba722dbcbf18 100644 --- a/src/sql/src/rbac.rs +++ b/src/sql/src/rbac.rs @@ -841,6 +841,7 @@ fn generate_rbac_requirements( source: _, columns: _, params: _, + filter: _, }) => RbacRequirements { privileges: vec![ ( diff --git a/src/storage-operators/Cargo.toml b/src/storage-operators/Cargo.toml index 9dcc08ab9c819..b442b4c7b6430 100644 --- a/src/storage-operators/Cargo.toml +++ b/src/storage-operators/Cargo.toml @@ -15,12 +15,16 @@ arrow = { version = "53.3.0", default-features = false } async-compression = { version = "0.4.5", features = ["bzip2", "gzip", "tokio", "xz", "zstd"] } async-stream = "0.3.3" aws-types = "1.1.1" +aws-smithy-types = "1.1.8" bytes = "1.3.0" bytesize = "1.1.0" -differential-dataflow = "0.13.4" csv-async = { version = "1.3.0", features = ["tokio"] } +derivative = "2.2.0" +differential-dataflow = "0.13.3" futures = "0.3.25" +glob = "0.3.2" http = "1.1.0" +itertools = "0.12.1" mz-aws-util = { path = "../aws-util" } mz-arrow-util = { path = "../arrow-util" } mz-dyncfg = { path = "../dyncfg" } diff --git a/src/storage-operators/src/oneshot_source.rs b/src/storage-operators/src/oneshot_source.rs index 17ec3f46bf76c..7755d087b3c2e 100644 --- a/src/storage-operators/src/oneshot_source.rs +++ b/src/storage-operators/src/oneshot_source.rs @@ -75,8 +75,11 @@ use mz_persist_client::cache::PersistClientCache; use mz_persist_client::Diagnostics; use mz_persist_types::codec_impls::UnitSchema; use mz_repr::{Diff, GlobalId, Row, Timestamp}; +use mz_storage_types::connections::ConnectionContext; use mz_storage_types::controller::CollectionMetadata; -use mz_storage_types::oneshot_sources::{ContentFormat, ContentSource, OneshotIngestionRequest}; +use mz_storage_types::oneshot_sources::{ + ContentFilter, ContentFormat, ContentSource, OneshotIngestionRequest, +}; use mz_storage_types::sources::SourceData; use mz_timely_util::builder_async::{ Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton, @@ -84,7 +87,7 @@ use mz_timely_util::builder_async::{ use mz_timely_util::pact::Distribute; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; -use std::collections::LinkedList; +use std::collections::{BTreeSet, LinkedList}; use std::fmt::{Debug, Display}; use std::future::Future; use timely::container::CapacityContainerBuilder; @@ -93,10 +96,13 @@ use timely::dataflow::{Scope, Stream as TimelyStream}; use timely::progress::Antichain; use tracing::info; +use crate::oneshot_source::aws_source::{AwsS3Source, S3Checksum, S3Object}; use crate::oneshot_source::csv::{CsvDecoder, CsvRecord, CsvWorkRequest}; use crate::oneshot_source::http_source::{HttpChecksum, HttpObject, HttpOneshotSource}; pub mod csv; + +pub mod aws_source; pub mod http_source; /// Render a dataflow to do a "oneshot" ingestion. 
@@ -119,6 +125,7 @@ pub mod http_source; pub fn render( scope: G, persist_clients: Arc, + connection_context: ConnectionContext, collection_id: GlobalId, collection_meta: CollectionMetadata, request: OneshotIngestionRequest, @@ -128,14 +135,28 @@ where G: Scope, F: FnOnce(Result, String>) -> () + 'static, { - let OneshotIngestionRequest { source, format } = request; + let OneshotIngestionRequest { + source, + format, + filter, + } = request; let source = match source { ContentSource::Http { url } => { let source = HttpOneshotSource::new(reqwest::Client::default(), url); SourceKind::Http(source) } + ContentSource::AwsS3 { + connection, + connection_id, + uri, + } => { + let source = AwsS3Source::new(connection, connection_id, connection_context, uri); + SourceKind::AwsS3(source) + } }; + tracing::info!(?source, "created oneshot source"); + let format = match format { ContentFormat::Csv(params) => { let format = CsvDecoder::new(params, &collection_meta.relation_desc); @@ -145,7 +166,7 @@ where // Discover what objects are available to copy. let (objects_stream, discover_token) = - render_discover_objects(scope.clone(), collection_id, source.clone()); + render_discover_objects(scope.clone(), collection_id, source.clone(), filter); // Split the objects into individual units of work. let (work_stream, split_token) = render_split_work( scope.clone(), @@ -194,6 +215,7 @@ pub fn render_discover_objects( scope: G, collection_id: GlobalId, source: S, + filter: ContentFilter, ) -> ( TimelyStream>, PressOnDropButton, @@ -219,14 +241,38 @@ where return; } - info!(%collection_id, %worker_id, "CopyFrom Leader Discover"); + let filter = match ObjectFilter::try_new(filter) { + Ok(filter) => filter, + Err(err) => { + tracing::warn!(?err, "failed to create filter"); + start_handle.give(&start_cap, Err(StorageErrorXKind::generic(err).into())); + return; + } + }; let work = source.list().await.context("list"); match work { - Ok(objects) => objects - .into_iter() - .for_each(|object| start_handle.give(&start_cap, Ok(object))), - Err(err) => start_handle.give(&start_cap, Err(err)), + Ok(objects) => { + let names = objects.iter().map(|(o, _check)| o.name()); + let found: String = itertools::intersperse(names, ", ").collect(); + tracing::info!(%worker_id, %found, "listed objects"); + + let filtered: Vec<_> = objects + .into_iter() + .filter(|(o, _check)| filter.filter::(o)) + .collect(); + let names = filtered.iter().map(|(o, _check)| o.name()); + let returning: String = itertools::intersperse(names, ", ").collect(); + tracing::info!(%worker_id, %returning, "filtered objects"); + + filtered + .into_iter() + .for_each(|object| start_handle.give(&start_cap, Ok(object))) + } + Err(err) => { + tracing::warn!(?err, "failed to list oneshot source"); + start_handle.give(&start_cap, Err(err)) + } } }); @@ -279,6 +325,7 @@ where let format_ = format.clone(); let source_ = source.clone(); let work_requests = mz_ore::task::spawn(|| "split-work", async move { + info!(%worker_id, object = %object.name(), "splitting object"); format_.split_work(source_.clone(), object, checksum).await }) .await @@ -614,7 +661,7 @@ pub trait OneshotSource: Clone + Send { &'s self, object: Self::Object, checksum: Self::Checksum, - range: Option>, + range: Option>, ) -> BoxStream<'s, Result>; } @@ -624,9 +671,10 @@ pub trait OneshotSource: Clone + Send { /// making the trait object safe and it's easier to just wrap it in an enum. 
Also, this wrapper /// provides a convenient place to add [`StorageErrorXContext::context`] for all of our source /// types. -#[derive(Clone)] +#[derive(Clone, Debug)] pub(crate) enum SourceKind { Http(HttpOneshotSource), + AwsS3(AwsS3Source), } impl OneshotSource for SourceKind { @@ -645,6 +693,16 @@ impl OneshotSource for SourceKind { .collect(); Ok(objects) } + SourceKind::AwsS3(s3) => { + let objects = s3.list().await.context("s3")?; + let objects = objects + .into_iter() + .map(|(object, checksum)| { + (ObjectKind::AwsS3(object), ChecksumKind::AwsS3(checksum)) + }) + .collect(); + Ok(objects) + } } } @@ -652,7 +710,7 @@ impl OneshotSource for SourceKind { &'s self, object: Self::Object, checksum: Self::Checksum, - range: Option>, + range: Option>, ) -> BoxStream<'s, Result> { match (self, object, checksum) { (SourceKind::Http(http), ObjectKind::Http(object), ChecksumKind::Http(checksum)) => { @@ -660,6 +718,13 @@ impl OneshotSource for SourceKind { .map(|result| result.context("http")) .boxed() } + (SourceKind::AwsS3(s3), ObjectKind::AwsS3(object), ChecksumKind::AwsS3(checksum)) => s3 + .get(object, checksum, range) + .map(|result| result.context("aws_s3")) + .boxed(), + (SourceKind::AwsS3(_) | SourceKind::Http(_), _, _) => { + unreachable!("programming error! wrong source, object, and checksum kind"); + } } } } @@ -668,18 +733,21 @@ impl OneshotSource for SourceKind { #[derive(Debug, Clone, Serialize, Deserialize)] pub(crate) enum ObjectKind { Http(HttpObject), + AwsS3(S3Object), } impl OneshotObject for ObjectKind { fn name(&self) -> &str { match self { ObjectKind::Http(object) => object.name(), + ObjectKind::AwsS3(object) => object.name(), } } fn encodings(&self) -> &[Encoding] { match self { ObjectKind::Http(object) => object.encodings(), + ObjectKind::AwsS3(object) => object.encodings(), } } } @@ -688,6 +756,7 @@ impl OneshotObject for ObjectKind { #[derive(Debug, Clone, Serialize, Deserialize)] pub(crate) enum ChecksumKind { Http(HttpChecksum), + AwsS3(S3Checksum), } /// Defines a format that we fetch for a "one time" ingestion. @@ -801,6 +870,37 @@ pub(crate) enum RecordChunkKind { Csv(CsvRecord), } +pub(crate) enum ObjectFilter { + None, + Files(BTreeSet>), + Pattern(glob::Pattern), +} + +impl ObjectFilter { + pub fn try_new(filter: ContentFilter) -> Result { + match filter { + ContentFilter::None => Ok(ObjectFilter::None), + ContentFilter::Files(files) => { + let files = files.into_iter().map(|f| f.into()).collect(); + Ok(ObjectFilter::Files(files)) + } + ContentFilter::Pattern(pattern) => { + let pattern = glob::Pattern::new(&pattern)?; + Ok(ObjectFilter::Pattern(pattern)) + } + } + } + + /// Returns if the object should be included. + pub fn filter(&self, object: &S::Object) -> bool { + match self { + ObjectFilter::None => true, + ObjectFilter::Files(files) => files.contains(object.name()), + ObjectFilter::Pattern(pattern) => pattern.matches(object.name()), + } + } +} + /// Experimental Error Type. 
/// /// The goal of this type is to combine concepts from both `thiserror` and @@ -828,12 +928,20 @@ pub enum StorageErrorXKind { CsvDecoding(Arc), #[error("reqwest error: {0}")] Reqwest(Arc), + #[error("aws s3 request error: {0}")] + AwsS3Request(String), + #[error("aws s3 bytestream error: {0}")] + AwsS3Bytes(Arc), #[error("invalid reqwest header: {0}")] InvalidHeader(Arc), #[error("failed to decode Row from a record batch: {0}")] InvalidRecordBatch(Arc), #[error("programming error: {0}")] ProgrammingError(Arc), + #[error("failed to get the size of an object")] + MissingSize, + #[error("object is missing the required '{0}' field")] + MissingField(Arc), #[error("something went wrong: {0}")] Generic(String), } @@ -856,6 +964,12 @@ impl From for StorageErrorXKind { } } +impl From for StorageErrorXKind { + fn from(err: aws_smithy_types::byte_stream::error::Error) -> Self { + StorageErrorXKind::AwsS3Request(err.to_string()) + } +} + impl StorageErrorXKind { pub fn with_context(self, context: C) -> StorageErrorX { StorageErrorX { diff --git a/src/storage-operators/src/oneshot_source/aws_source.rs b/src/storage-operators/src/oneshot_source/aws_source.rs new file mode 100644 index 0000000000000..75349fd2e71fe --- /dev/null +++ b/src/storage-operators/src/oneshot_source/aws_source.rs @@ -0,0 +1,223 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! AWS S3 [`OneshotSource`]. + +use std::path::Path; +use std::str::FromStr; +use std::sync::Arc; + +use derivative::Derivative; +use futures::stream::{BoxStream, TryStreamExt}; +use futures::StreamExt; +use mz_ore::future::InTask; +use mz_repr::CatalogItemId; +use mz_storage_types::connections::aws::AwsConnection; +use mz_storage_types::connections::ConnectionContext; +use serde::{Deserialize, Serialize}; + +use crate::oneshot_source::{ + OneshotObject, OneshotSource, StorageErrorX, StorageErrorXContext, StorageErrorXKind, +}; + +#[derive(Clone, Derivative)] +#[derivative(Debug)] +pub struct AwsS3Source { + // Only used for initialization. + #[derivative(Debug = "ignore")] + connection: Arc, + connection_id: CatalogItemId, + #[derivative(Debug = "ignore")] + context: Arc, + + /// Name of the S3 bucket we'll list from. + bucket: String, + /// Optional prefix that can be specified via an S3 URI. + prefix: Option, + /// S3 client that is lazily initialized. + #[derivative(Debug = "ignore")] + client: std::sync::OnceLock, +} + +impl AwsS3Source { + pub fn new( + connection: AwsConnection, + connection_id: CatalogItemId, + context: ConnectionContext, + uri: String, + ) -> Self { + let uri = http::Uri::from_str(&uri).expect("validated URI in sequencing"); + + let bucket = uri + .host() + .expect("validated host in sequencing") + .to_string(); + let prefix = if uri.path().is_empty() || uri.path() == "/" { + None + } else { + // The S3 client expects a trailing `/` but no leading `/`. 
+ let mut prefix = uri.path().to_string(); + + if let Some(suffix) = prefix.strip_prefix('/') { + prefix = suffix.to_string(); + } + if !prefix.ends_with('/') { + prefix = format!("{prefix}/"); + } + + Some(prefix) + }; + + AwsS3Source { + connection: Arc::new(connection), + context: Arc::new(context), + connection_id, + bucket, + prefix, + client: std::sync::OnceLock::new(), + } + } + + pub async fn initialize(&self) -> Result { + let sdk_config = self + .connection + .load_sdk_config(&self.context, self.connection_id, InTask::Yes) + .await?; + let s3_client = mz_aws_util::s3::new_client(&sdk_config); + + Ok(s3_client) + } + + pub async fn client(&self) -> Result<&mz_aws_util::s3::Client, anyhow::Error> { + if self.client.get().is_none() { + let client = self.initialize().await?; + let _ = self.client.set(client); + } + + Ok(self.client.get().expect("just initialized")) + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct S3Object { + /// Key from S3 list operation. + key: String, + /// Name of the object, generally the last component of the key. + name: String, + /// Size of the object in bytes. + size: usize, +} + +impl OneshotObject for S3Object { + fn name(&self) -> &str { + &self.name + } + + fn encodings(&self) -> &[super::Encoding] { + &[] + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct S3Checksum { + e_tag: Option, +} + +impl OneshotSource for AwsS3Source { + type Object = S3Object; + type Checksum = S3Checksum; + + async fn list<'a>( + &'a self, + ) -> Result, super::StorageErrorX> { + let client = self.client().await.map_err(StorageErrorXKind::generic)?; + let mut objects_request = client.list_objects_v2().bucket(&self.bucket); + + // Users can optionally specify a prefix via the S3 uri they originally specify. + if let Some(prefix) = &self.prefix { + objects_request = objects_request.prefix(prefix); + } + + let objects = objects_request + .send() + .await + .map_err(StorageErrorXKind::generic) + .context("list_objects_v2")?; + + // TODO(cf1): Pagination. + + let objects: Vec<_> = objects + .contents() + .iter() + .map(|o| { + let key = o + .key() + .ok_or_else(|| StorageErrorXKind::MissingField("key".into()))? + .to_owned(); + let name = Path::new(&key) + .file_name() + .and_then(|os_name| os_name.to_str()) + .ok_or_else(|| StorageErrorXKind::Generic(format!("malformed key: {key}")))? + .to_string(); + let size = o + .size() + .ok_or_else(|| StorageErrorXKind::MissingField("size".into()))?; + let size: usize = size.try_into().map_err(StorageErrorXKind::generic)?; + + let object = S3Object { key, name, size }; + let checksum = S3Checksum { + e_tag: o.e_tag().map(|x| x.to_owned()), + }; + + Ok::<_, StorageErrorXKind>((object, checksum)) + }) + .collect::>() + .context("list")?; + + Ok(objects) + } + + fn get<'s>( + &'s self, + object: Self::Object, + _checksum: Self::Checksum, + range: Option>, + ) -> BoxStream<'s, Result> { + let initial_response = async move { + tracing::info!(name = %object.name(), ?range, "fetching object"); + + // TODO(cf1): Validate our checksum. + let client = self.client().await.map_err(StorageErrorXKind::generic)?; + + let mut request = client.get_object().bucket(&self.bucket).key(&object.name); + if let Some(range) = range { + // See the below link for the specifics of this format. 
+ // + // + let range = format!("bytes={}-{}", range.start(), range.end()); + request = request.range(range); + } + + let object = request + .send() + .await + .map_err(|err| StorageErrorXKind::AwsS3Request(err.to_string()))?; + // AWS's ByteStream doesn't implement the Stream trait. + let stream = mz_aws_util::s3::ByteStreamAdapter::new(object.body) + .err_into() + .boxed(); + + Ok::<_, StorageErrorXKind>(stream) + }; + + futures::stream::once(initial_response) + .try_flatten() + .boxed() + } +} diff --git a/src/storage-operators/src/oneshot_source/http_source.rs b/src/storage-operators/src/oneshot_source/http_source.rs index b87555ba135b3..bfbc1236f70c9 100644 --- a/src/storage-operators/src/oneshot_source/http_source.rs +++ b/src/storage-operators/src/oneshot_source/http_source.rs @@ -10,6 +10,7 @@ //! Generic HTTP oneshot source that will fetch a file from the public internet. use bytes::Bytes; +use derivative::Derivative; use futures::stream::{BoxStream, StreamExt}; use futures::TryStreamExt; use reqwest::Client; @@ -21,8 +22,10 @@ use crate::oneshot_source::{ }; /// Generic oneshot source that fetches a file from a URL on the public internet. -#[derive(Clone)] +#[derive(Clone, Derivative)] +#[derivative(Debug)] pub struct HttpOneshotSource { + #[derivative(Debug = "ignore")] client: Client, origin: Url, } @@ -122,7 +125,7 @@ impl OneshotSource for HttpOneshotSource { &'s self, object: Self::Object, _checksum: Self::Checksum, - _range: Option>, + _range: Option>, ) -> BoxStream<'s, Result> { // TODO(cf1): Support the range param. // TODO(cf1): Validate our checksum. diff --git a/src/storage-types/src/oneshot_sources.proto b/src/storage-types/src/oneshot_sources.proto index af10f61f637fa..eb88a65a2ccf4 100644 --- a/src/storage-types/src/oneshot_sources.proto +++ b/src/storage-types/src/oneshot_sources.proto @@ -11,22 +11,46 @@ syntax = "proto3"; package mz_storage_types.oneshot_sources; +import "google/protobuf/empty.proto"; import "pgcopy/src/copy.proto"; +import "repr/src/catalog_item_id.proto"; +import "storage-types/src/connections/aws.proto"; message ProtoOneshotIngestionRequest { oneof source { ProtoHttpContentSource http = 1; + ProtoAwsS3Source aws_s3 = 4; } oneof format { ProtoCsvContentFormat csv = 2; } + + oneof filter { + google.protobuf.Empty none = 5; + ProtoFilterFiles files = 6; + ProtoFilterPattern pattern = 7; + } } message ProtoHttpContentSource { string url = 1; } +message ProtoAwsS3Source { + mz_storage_types.connections.aws.ProtoAwsConnection connection = 1; + mz_repr.catalog_item_id.ProtoCatalogItemId connection_id = 2; + string uri = 3; +} + message ProtoCsvContentFormat { mz_pgcopy.copy.ProtoCopyCsvFormatParams params = 1; } + +message ProtoFilterFiles { + repeated string files = 1; +} + +message ProtoFilterPattern { + string pattern = 1; +} diff --git a/src/storage-types/src/oneshot_sources.rs b/src/storage-types/src/oneshot_sources.rs index 25167220c3932..5e679081269ee 100644 --- a/src/storage-types/src/oneshot_sources.rs +++ b/src/storage-types/src/oneshot_sources.rs @@ -11,12 +11,15 @@ use mz_pgcopy::CopyCsvFormatParams; use mz_proto::{IntoRustIfSome, RustType}; +use mz_repr::CatalogItemId; use mz_timely_util::builder_async::PressOnDropButton; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc::UnboundedReceiver; use url::Url; +use crate::connections::aws::AwsConnection; + include!(concat!( env!("OUT_DIR"), "/mz_storage_types.oneshot_sources.rs" )); @@ -37,6 +40,7 @@ pub struct OneshotIngestionDescription { pub struct OneshotIngestionRequest { pub source:
ContentSource, pub format: ContentFormat, + pub filter: ContentFilter, } impl RustType for OneshotIngestionRequest { @@ -44,6 +48,7 @@ impl RustType for OneshotIngestionRequest { ProtoOneshotIngestionRequest { source: Some(self.source.into_proto()), format: Some(self.format.into_proto()), + filter: Some(self.filter.into_proto()), } } @@ -56,14 +61,28 @@ impl RustType for OneshotIngestionRequest { let format = proto .format .into_rust_if_some("ProtoOneshotIngestionRequest::format")?; - - Ok(OneshotIngestionRequest { source, format }) + let filter = proto + .filter + .into_rust_if_some("ProtoOneshotIngestionRequest::filter")?; + + Ok(OneshotIngestionRequest { + source, + format, + filter, + }) } } #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] pub enum ContentSource { - Http { url: Url }, + Http { + url: Url, + }, + AwsS3 { + connection: AwsConnection, + connection_id: CatalogItemId, + uri: String, + }, } impl RustType for ContentSource { @@ -74,6 +93,15 @@ impl RustType for ContentSource { url: url.to_string(), }) } + ContentSource::AwsS3 { + connection, + connection_id, + uri, + } => proto_oneshot_ingestion_request::Source::AwsS3(ProtoAwsS3Source { + connection: Some(connection.into_proto()), + connection_id: Some(connection_id.into_proto()), + uri: uri.to_string(), + }), } } @@ -85,6 +113,17 @@ impl RustType for ContentSource { let url = Url::parse(&source.url).expect("failed to roundtrip Url"); Ok(ContentSource::Http { url }) } + proto_oneshot_ingestion_request::Source::AwsS3(source) => { + let connection = source.connection.into_rust_if_some("AwsS3::connection")?; + let connection_id = source + .connection_id + .into_rust_if_some("AwsS3::connection_id")?; + Ok(ContentSource::AwsS3 { + connection, + connection_id, + uri: source.uri, + }) + } } } } @@ -116,3 +155,47 @@ impl RustType for ContentFormat { } } } + +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] +pub enum ContentFilter { + /// No filtering, fetch everything at a remote resource. + None, + /// Filter to only the files specified in this list. + Files(Vec), + /// Regex pattern to filter the files with. + Pattern(String), +} + +impl RustType for ContentFilter { + fn into_proto(&self) -> proto_oneshot_ingestion_request::Filter { + match self { + ContentFilter::None => { + proto_oneshot_ingestion_request::Filter::None(Default::default()) + } + ContentFilter::Files(files) => { + proto_oneshot_ingestion_request::Filter::Files(ProtoFilterFiles { + files: files.clone(), + }) + } + ContentFilter::Pattern(pattern) => { + proto_oneshot_ingestion_request::Filter::Pattern(ProtoFilterPattern { + pattern: pattern.clone(), + }) + } + } + } + + fn from_proto( + proto: proto_oneshot_ingestion_request::Filter, + ) -> Result { + match proto { + proto_oneshot_ingestion_request::Filter::None(()) => Ok(ContentFilter::None), + proto_oneshot_ingestion_request::Filter::Files(files) => { + Ok(ContentFilter::Files(files.files)) + } + proto_oneshot_ingestion_request::Filter::Pattern(pattern) => { + Ok(ContentFilter::Pattern(pattern.pattern)) + } + } + } +} diff --git a/src/storage/src/render.rs b/src/storage/src/render.rs index d9882e77f365f..98c6b1f20f0d8 100644 --- a/src/storage/src/render.rs +++ b/src/storage/src/render.rs @@ -489,11 +489,16 @@ pub(crate) fn build_oneshot_ingestion_dataflow( // here, but that might run into the infamous async-Drop problem. 
let _ = results_tx.send(result); }; + let connection_context = storage_state + .storage_configuration + .connection_context + .clone(); let tokens = timely_worker.dataflow(|scope| { mz_storage_operators::oneshot_source::render( scope.clone(), Arc::clone(&storage_state.persist_clients), + connection_context, collection_id, collection_meta, description,
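For illustration of the glob matching performed by the new `ObjectFilter::Pattern` variant in oneshot_source.rs, a standalone sketch (not part of the patch; it assumes only the `glob` crate that this PR adds to `mz-storage-operators`, and the file names are made up; filtering is applied to object names, i.e. the last component of the S3 key):

```
// Illustrative only: mirrors how ObjectFilter::Pattern uses glob::Pattern to
// decide whether an object should be included in the ingestion.
fn main() {
    let pattern = glob::Pattern::new("*.csv").expect("valid glob pattern");
    for name in ["important.csv", "notes.txt", "other.CSV"] {
        // glob matching is case-sensitive by default, so "other.CSV" is excluded.
        println!("{name}: include = {}", pattern.matches(name));
    }
}
```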