storage: remove TEST SCRIPT source #25247

Merged · 1 commit · Feb 14, 2024
33 changes: 0 additions & 33 deletions doc/developer/testdrive.md
@@ -643,39 +643,6 @@ Adjust the number of tries testdrive will perform while waiting for a SQL statement
Set `max-tries` to `1` in order to ensure that statements are executed only once. If the desired result is not achieved on the first try, the test will fail. This is
useful when testing operations that should return the right result immediately rather than eventually.

## `TEST SCRIPT` sources
`TEST SCRIPT` sources are useful for creating a source that emits data in a specific pattern,
without setting up data in a local source. They are created as follows:

```
> CREATE SOURCE unit
FROM TEST SCRIPT
'[
{"command": "emit", "key": "fish", "value": "value", "offset": 0},
{"command": "emit", "key": "fish2", "value": "hmm", "offset": 1},
{"command": "emit" ,"key": "fish", "value": "value2", "offset": 2}
]'
KEY FORMAT BYTES
VALUE FORMAT BYTES
ENVELOPE UPSERT
```

Each `"command"` can be:
- `"emit"`: emit data at a specific offset. The `"key"` is optional, but required
  for some envelopes, like `UPSERT`.
- `"terminate"`: terminate the source, ignoring all later commands. This closes the
  source's `upper`.
- If there is no `"terminate"` command, the default behavior is for the source
  to remain pending forever after it processes all other commands.
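The emit/terminate semantics above can be sketched with a small simulator (a Python sketch for illustration only — `run_script` is a hypothetical helper, not part of Materialize's implementation):

```python
import json

def run_script(script_json):
    """Emit records in order; stop at the first "terminate" command,
    which closes the source. Later commands are ignored."""
    emitted = []
    terminated = False
    for cmd in json.loads(script_json):
        if cmd["command"] == "terminate":
            terminated = True
            break  # all later commands are ignored
        elif cmd["command"] == "emit":
            # "key" is optional, so use .get() rather than indexing.
            emitted.append((cmd.get("key"), cmd["value"], cmd["offset"]))
    return emitted, terminated

script = '''[
  {"command": "emit", "key": "fish", "value": "value", "offset": 0},
  {"command": "terminate"},
  {"command": "emit", "key": "fish2", "value": "hmm", "offset": 1}
]'''
records, closed = run_script(script)
print(records)  # [('fish', 'value', 0)]
print(closed)   # True
```

A script without a `"terminate"` command returns `terminated = False`, modeling a source that stays pending after processing all commands.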

Note that this source has some limitations:
- It does not work with formats like `avro`.
- It requires the key and value formats to be specified individually, as
  it does not support CSR formats.

These limitations may be lifted in the future; additionally, more features may
be added to this source.

## Actions on local files

#### `$ file-append path=file.name [compression=gzip] [repeat=N]`
2 changes: 0 additions & 2 deletions src/sql-lexer/src/keywords.txt
@@ -331,7 +331,6 @@ Sasl
Scale
Schema
Schemas
Script
Second
Seconds
Secret
@@ -377,7 +376,6 @@ Tables
Tail
Temp
Temporary
Test
Text
Then
Tick
9 changes: 0 additions & 9 deletions src/sql-parser/src/ast/defs/ddl.rs
@@ -980,9 +980,6 @@ pub enum CreateSourceConnection<T: AstInfo> {
generator: LoadGenerator,
options: Vec<LoadGeneratorOption<T>>,
},
TestScript {
desc_json: String,
},
}

impl<T: AstInfo> AstDisplay for CreateSourceConnection<T> {
@@ -1033,12 +1030,6 @@ impl<T: AstInfo> AstDisplay for CreateSourceConnection<T> {
f.write_str(")");
}
}
CreateSourceConnection::TestScript { desc_json } => {
f.write_str("TEST SCRIPT ");
f.write_str("'");
f.write_str(&display::escape_single_quote_string(desc_json));
f.write_str("'");
}
}
}
}
8 changes: 1 addition & 7 deletions src/sql-parser/src/parser.rs
@@ -3001,7 +3001,7 @@ impl<'a> Parser<'a> {
fn parse_create_source_connection(
&mut self,
) -> Result<CreateSourceConnection<Raw>, ParserError> {
-        match self.expect_one_of_keywords(&[KAFKA, POSTGRES, MYSQL, LOAD, TEST])? {
+        match self.expect_one_of_keywords(&[KAFKA, POSTGRES, MYSQL, LOAD])? {
POSTGRES => {
self.expect_keyword(CONNECTION)?;
let connection = self.parse_raw_name()?;
@@ -3077,12 +3077,6 @@ impl<'a> Parser<'a> {
};
Ok(CreateSourceConnection::LoadGenerator { generator, options })
}
TEST => {
self.expect_keyword(SCRIPT)?;
Ok(CreateSourceConnection::TestScript {
desc_json: self.parse_literal_string()?,
})
}
_ => unreachable!(),
}
}
7 changes: 0 additions & 7 deletions src/sql-parser/tests/testdata/ddl
@@ -2361,13 +2361,6 @@ CREATE SOURCE lg FROM LOAD GENERATOR TPCH
=>
CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("lg")]), in_cluster: None, col_names: [], connection: LoadGenerator { generator: Tpch, options: [] }, include_metadata: [], format: None, envelope: None, if_not_exists: false, key_constraint: None, with_options: [], referenced_subsources: None, progress_subsource: None })

parse-statement
CREATE SOURCE ts FROM TEST SCRIPT 'foo.json'
----
CREATE SOURCE ts FROM TEST SCRIPT 'foo.json'
=>
CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("ts")]), in_cluster: None, col_names: [], connection: TestScript { desc_json: "foo.json" }, include_metadata: [], format: None, envelope: None, if_not_exists: false, key_constraint: None, with_options: [], referenced_subsources: None, progress_subsource: None })

parse-statement
CREATE SOURCE psychic FROM POSTGRES CONNECTION pgconn (PUBLICATION 'red') with (IGNORE KEYS 'true', TIMELINE 'timeline', TIMESTAMP INTERVAL 'interval')
----
11 changes: 0 additions & 11 deletions src/sql/src/plan/error.rs
@@ -40,7 +40,6 @@ use crate::plan::scope::ScopeItem;
use crate::pure::error::{
CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
TestScriptSourcePurificationError,
};
use crate::session::vars::VarError;

@@ -236,7 +235,6 @@ pub enum PlanError {
PgSourcePurification(PgSourcePurificationError),
KafkaSourcePurification(KafkaSourcePurificationError),
KafkaSinkPurification(KafkaSinkPurificationError),
TestScriptSourcePurification(TestScriptSourcePurificationError),
LoadGeneratorSourcePurification(LoadGeneratorSourcePurificationError),
CsrPurification(CsrPurificationError),
MySqlSourcePurification(MySqlSourcePurificationError),
@@ -300,7 +298,6 @@ impl PlanError {
Self::InternalFunctionCall => Some("This function is for the internal use of the database system and cannot be called directly.".into()),
Self::PgSourcePurification(e) => e.detail(),
Self::KafkaSourcePurification(e) => e.detail(),
Self::TestScriptSourcePurification(e) => e.detail(),
Self::LoadGeneratorSourcePurification(e) => e.detail(),
Self::CsrPurification(e) => e.detail(),
Self::KafkaSinkPurification(e) => e.detail(),
@@ -404,7 +401,6 @@ impl PlanError {
Self::VarError(e) => e.hint(),
Self::PgSourcePurification(e) => e.hint(),
Self::KafkaSourcePurification(e) => e.hint(),
Self::TestScriptSourcePurification(e) => e.hint(),
Self::LoadGeneratorSourcePurification(e) => e.hint(),
Self::CsrPurification(e) => e.hint(),
Self::KafkaSinkPurification(e) => e.hint(),
@@ -663,7 +659,6 @@ impl fmt::Display for PlanError {
or LIMIT INPUT GROUP SIZE"),
Self::PgSourcePurification(e) => write!(f, "POSTGRES source validation: {}", e),
Self::KafkaSourcePurification(e) => write!(f, "KAFKA source validation: {}", e),
Self::TestScriptSourcePurification(e) => write!(f, "TEST SCRIPT source validation: {}", e),
Self::LoadGeneratorSourcePurification(e) => write!(f, "LOAD GENERATOR source validation: {}", e),
Self::KafkaSinkPurification(e) => write!(f, "KAFKA sink validation: {}", e),
Self::CsrPurification(e) => write!(f, "CONFLUENT SCHEMA REGISTRY validation: {}", e),
@@ -812,12 +807,6 @@ impl From<CsrPurificationError> for PlanError {
}
}

impl From<TestScriptSourcePurificationError> for PlanError {
fn from(e: TestScriptSourcePurificationError) -> Self {
PlanError::TestScriptSourcePurification(e)
}
}

impl From<LoadGeneratorSourcePurificationError> for PlanError {
fn from(e: LoadGeneratorSourcePurificationError) -> Self {
PlanError::LoadGeneratorSourcePurification(e)
8 changes: 0 additions & 8 deletions src/sql/src/plan/statement/ddl.rs
@@ -87,7 +87,6 @@ use mz_storage_types::sources::postgres::{
PostgresSourceConnection, PostgresSourcePublicationDetails,
ProtoPostgresSourcePublicationDetails,
};
use mz_storage_types::sources::testscript::TestScriptSourceConnection;
use mz_storage_types::sources::{GenericSourceConnection, SourceConnection, SourceDesc, Timeline};
use prost::Message;

@@ -1046,13 +1045,6 @@ pub fn plan_create_source(

(connection, available_subsources)
}
CreateSourceConnection::TestScript { desc_json } => {
scx.require_feature_flag(&vars::ENABLE_CREATE_SOURCE_FROM_TESTSCRIPT)?;
let connection = GenericSourceConnection::from(TestScriptSourceConnection {
desc_json: desc_json.clone(),
});
(connection, None)
}
};

let (available_subsources, requested_subsources) = match (
18 changes: 1 addition & 17 deletions src/sql/src/pure.rs
@@ -72,7 +72,6 @@ use crate::{kafka_util, normalize};
use self::error::{
CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
TestScriptSourcePurificationError,
};

pub(crate) mod error;
@@ -531,9 +530,6 @@ async fn purify_create_source(
CreateSourceConnection::LoadGenerator { .. } => {
&mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
}
CreateSourceConnection::TestScript { .. } => {
&mz_storage_types::sources::testscript::TEST_SCRIPT_PROGRESS_DESC
}
};

match connection {
@@ -647,14 +643,6 @@ async fn purify_create_source(
}
}
}
CreateSourceConnection::TestScript { desc_json: _ } => {
if let Some(referenced_subsources) = referenced_subsources {
Err(TestScriptSourcePurificationError::ReferencedSubsources(
referenced_subsources.clone(),
))?;
}
// TODO: verify valid json and valid schema
}
CreateSourceConnection::Postgres {
connection,
options,
@@ -1520,12 +1508,8 @@ async fn purify_source_format(
storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
if matches!(format, Some(CreateSourceFormat::KeyValue { .. }))
-        && !matches!(
-            connection,
-            CreateSourceConnection::Kafka { .. } | CreateSourceConnection::TestScript { .. }
-        )
+        && !matches!(connection, CreateSourceConnection::Kafka { .. })
    {
-        // We don't mention `TestScript` to users here
sql_bail!("Kafka sources are the only source type that can provide KEY/VALUE formats")
}

17 changes: 0 additions & 17 deletions src/sql/src/pure/error.rs
@@ -151,23 +151,6 @@ impl KafkaSourcePurificationError {
}
}

/// Logical errors detectable during purification for a TEST SCRIPT SOURCE.
#[derive(Debug, Clone, thiserror::Error)]
pub enum TestScriptSourcePurificationError {
#[error("{} is only valid for multi-output sources", .0.to_ast_string())]
ReferencedSubsources(ReferencedSubsources<Aug>),
}

impl TestScriptSourcePurificationError {
pub fn detail(&self) -> Option<String> {
None
}

pub fn hint(&self) -> Option<String> {
None
}
}

/// Logical errors detectable during purification for a LOAD GENERATOR SOURCE.
#[derive(Debug, Clone, thiserror::Error)]
pub enum LoadGeneratorSourcePurificationError {
7 changes: 0 additions & 7 deletions src/sql/src/session/vars.rs
@@ -1779,13 +1779,6 @@ feature_flags!(
internal: true,
enable_for_item_parsing: true,
},
{
name: enable_create_source_from_testscript,
desc: "CREATE SOURCE ... FROM TEST SCRIPT",
default: false,
internal: true,
enable_for_item_parsing: true,
},
{
name: enable_date_bin_hopping,
desc: "the date_bin_hopping function",
1 change: 0 additions & 1 deletion src/storage-types/build.rs
@@ -66,7 +66,6 @@ fn main() {
"storage-types/src/sources/mysql.proto",
"storage-types/src/sources/postgres.proto",
"storage-types/src/sources/load_generator.proto",
"storage-types/src/sources/testscript.proto",
],
&[".."],
)
4 changes: 1 addition & 3 deletions src/storage-types/src/sources.proto
@@ -23,7 +23,6 @@ import "storage-types/src/sources/kafka.proto";
import "storage-types/src/sources/load_generator.proto";
import "storage-types/src/sources/mysql.proto";
import "storage-types/src/sources/postgres.proto";
import "storage-types/src/sources/testscript.proto";

package mz_storage_types.sources;

Expand All @@ -48,12 +47,11 @@ message ProtoSourceDesc {
}

message ProtoSourceConnection {
-  reserved 2, 3, 5;
+  reserved 2, 3, 5, 7;
oneof kind {
mz_storage_types.sources.kafka.ProtoKafkaSourceConnection kafka = 1;
mz_storage_types.sources.postgres.ProtoPostgresSourceConnection postgres = 4;
mz_storage_types.sources.load_generator.ProtoLoadGeneratorSourceConnection loadgen = 6;
mz_storage_types.sources.testscript.ProtoTestScriptSourceConnection testscript = 7;
mz_storage_types.sources.mysql.ProtoMySqlSourceConnection mysql = 8;
}
}
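A note on the `reserved` change above: reserving the removed field's tag (7) rather than simply deleting it prevents a future field from reusing the tag and misinterpreting previously serialized messages. A minimal sketch of the pattern (illustrative message, not the full `ProtoSourceConnection`):

```protobuf
message Example {
  reserved 7;            // tag of a removed field; can never be reassigned
  reserved "testscript"; // the field name can optionally be reserved too
  string kafka = 1;      // surviving fields keep their original tags
}
```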