Skip to content

Commit

Permalink
Add ignore columns for MySQL
Browse files Browse the repository at this point in the history
  • Loading branch information
bobbyiliev committed Sep 4, 2024
1 parent db24a67 commit 37c17c8
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 7 deletions.
2 changes: 1 addition & 1 deletion docs/resources/source_load_generator.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ resource "materialize_source_load_generator" "example_source_load_generator" {

### Optional

- `all_tables` (Boolean) Whether to include all tables in the source. Compatible with `auction_options`, `marketing_options`, and `tpch_options`. IF not specified, use the `materialize_source_table` resource to specify tables to include.
- `all_tables` (Boolean) Whether to include all tables in the source. Compatible with `auction_options`, `marketing_options`, and `tpch_options`. If not specified, use the `materialize_source_table` resource to specify tables to include.
- `auction_options` (Block List, Max: 1) Auction Options. (see [below for nested schema](#nestedblock--auction_options))
- `cluster_name` (String) The cluster to maintain this source.
- `comment` (String) **Public Preview** Comment on an object in the database.
Expand Down
2 changes: 1 addition & 1 deletion docs/resources/source_mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ resource "materialize_source_mysql" "test" {
- `comment` (String) **Public Preview** Comment on an object in the database.
- `database_name` (String) The identifier for the source database in Materialize. Defaults to `MZ_DATABASE` environment variable if set or `materialize` if environment variable is not set.
- `expose_progress` (Block List, Max: 1) The name of the progress collection for the source. If this is not specified, the collection will be named `<src_name>_progress`. (see [below for nested schema](#nestedblock--expose_progress))
- `ignore_columns` (List of String) Ignore specific columns when reading data from MySQL. Can only be updated in place when also updating a corresponding `table` attribute.
- `ignore_columns` (List of String, Deprecated) Ignore specific columns when reading data from MySQL. Can only be updated in place when also updating a corresponding `table` attribute.
- `ownership_role` (String) The owernship role of the object.
- `region` (String) The region to use for the resource connection. If not set, the default region is used.
- `schema_name` (String) The identifier for the source schema in Materialize. Defaults to `public`.
Expand Down
1 change: 1 addition & 0 deletions docs/resources/source_table.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ description: |-

- `comment` (String) **Public Preview** Comment on an object in the database.
- `database_name` (String) The identifier for the table database in Materialize. Defaults to `MZ_DATABASE` environment variable if set or `materialize` if environment variable is not set.
- `ignore_columns` (List of String) Ignore specific columns when reading data from MySQL. Only compatible with MySQL sources, if the source is not MySQL, the attribute will be ignored.
- `ownership_role` (String) The owernship role of the object.
- `region` (String) The region to use for the resource connection. If not set, the default region is used.
- `schema_name` (String) The identifier for the table schema in Materialize. Defaults to `public`.
Expand Down
24 changes: 22 additions & 2 deletions pkg/materialize/source_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ type SourceTableBuilder struct {
upstreamName string
upstreamSchemaName string
textColumns []string
ignoreColumns []string
}

func NewSourceTableBuilder(conn *sqlx.DB, obj MaterializeObject) *SourceTableBuilder {
Expand Down Expand Up @@ -122,6 +123,11 @@ func (b *SourceTableBuilder) TextColumns(c []string) *SourceTableBuilder {
return b
}

func (b *SourceTableBuilder) IgnoreColumns(c []string) *SourceTableBuilder {
b.ignoreColumns = c
return b
}

func (b *SourceTableBuilder) Create() error {
q := strings.Builder{}
q.WriteString(fmt.Sprintf(`CREATE TABLE %s`, b.QualifiedName()))
Expand All @@ -135,9 +141,23 @@ func (b *SourceTableBuilder) Create() error {

q.WriteString(")")

var options []string

if len(b.textColumns) > 0 {
c := strings.Join(b.textColumns, ", ")
q.WriteString(fmt.Sprintf(` WITH (TEXT COLUMNS [%s])`, c))
s := strings.Join(b.textColumns, ", ")
options = append(options, fmt.Sprintf(`TEXT COLUMNS (%s)`, s))
}

// TODO: Implement logic to only use IGNORE COLUMNS if the source is a MySQL source
if len(b.ignoreColumns) > 0 {
s := strings.Join(b.ignoreColumns, ", ")
options = append(options, fmt.Sprintf(`IGNORE COLUMNS (%s)`, s))
}

if len(options) > 0 {
q.WriteString(" WITH (")
q.WriteString(strings.Join(options, ", "))
q.WriteString(")")
}

q.WriteString(`;`)
Expand Down
7 changes: 6 additions & 1 deletion pkg/provider/acceptance_source_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func TestAccSourceTableMySQL_basic(t *testing.T) {
resource.TestCheckResourceAttr("materialize_source_table.test_mysql", "schema_name", "public"),
resource.TestCheckResourceAttr("materialize_source_table.test_mysql", "upstream_name", "mysql_table1"),
resource.TestCheckResourceAttr("materialize_source_table.test_mysql", "upstream_schema_name", "shop"),
resource.TestCheckResourceAttr("materialize_source_table.test_mysql", "ignore_columns.#", "1"),
resource.TestCheckResourceAttr("materialize_source_table.test_mysql", "ignore_columns.0", "banned"),
// resource.TestCheckResourceAttr("materialize_source_table.test_mysql", "text_columns.#", "1"),
// resource.TestCheckResourceAttr("materialize_source_table.test_mysql", "text_columns.0", "about"),
),
},
},
Expand Down Expand Up @@ -221,7 +225,7 @@ func testAccSourceTableMySQLBasicResource(nameSpace string) string {
mysql_connection {
name = materialize_connection_mysql.mysql_connection.name
}
table {
upstream_name = "mysql_table1"
upstream_schema_name = "shop"
Expand All @@ -240,6 +244,7 @@ func testAccSourceTableMySQLBasicResource(nameSpace string) string {
upstream_name = "mysql_table1"
upstream_schema_name = "shop"
ignore_columns = ["banned"]
}
`, nameSpace)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/resources/resource_source_load_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ var sourceLoadgenSchema = map[string]*schema.Schema{
ConflictsWith: []string{"counter_options", "auction_options", "marketing_options", "tpch_options"},
},
"all_tables": {
Description: "Whether to include all tables in the source. Compatible with `auction_options`, `marketing_options`, and `tpch_options`. IF not specified, use the `materialize_source_table` resource to specify tables to include.",
Description: "Whether to include all tables in the source. Compatible with `auction_options`, `marketing_options`, and `tpch_options`. If not specified, use the `materialize_source_table` resource to specify tables to include.",
Type: schema.TypeBool,
Optional: true,
Default: false,
Expand Down
1 change: 1 addition & 0 deletions pkg/resources/resource_source_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var sourceMySQLSchema = map[string]*schema.Schema{
}),
"ignore_columns": {
Description: "Ignore specific columns when reading data from MySQL. Can only be updated in place when also updating a corresponding `table` attribute.",
Deprecated: "Use the new materialize_source_table resource instead.",
Type: schema.TypeList,
Elem: &schema.Schema{Type: schema.TypeString},
Optional: true,
Expand Down
14 changes: 14 additions & 0 deletions pkg/resources/resource_source_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ var sourceTableSchema = map[string]*schema.Schema{
Optional: true,
ForceNew: true,
},
"ignore_columns": {
Description: "Ignore specific columns when reading data from MySQL. Only compatible with MySQL sources, if the source is not MySQL, the attribute will be ignored.",
Type: schema.TypeList,
Elem: &schema.Schema{Type: schema.TypeString},
Optional: true,
},
"comment": CommentSchema(false),
"ownership_role": OwnershipRoleSchema(),
"region": RegionSchema(),
Expand Down Expand Up @@ -92,6 +98,14 @@ func sourceTableCreate(ctx context.Context, d *schema.ResourceData, meta any) di
b.TextColumns(textColumns)
}

if v, ok := d.GetOk("ignore_columns"); ok && len(v.([]interface{})) > 0 {
columns, err := materialize.GetSliceValueString("ignore_columns", v.([]interface{}))
if err != nil {
return diag.FromErr(err)
}
b.IgnoreColumns(columns)
}

if err := b.Create(); err != nil {
return diag.FromErr(err)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/resources/resource_source_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var inSourceTable = map[string]interface{}{
"upstream_name": "upstream_table",
"upstream_schema_name": "upstream_schema",
"text_columns": []interface{}{"column1", "column2"},
"ignore_columns": []interface{}{"column3", "column4"},
}

func TestResourceSourceTableCreate(t *testing.T) {
Expand All @@ -39,7 +40,7 @@ func TestResourceSourceTableCreate(t *testing.T) {
`CREATE TABLE "database"."schema"."table"
FROM SOURCE "materialize"."public"."source"
\(REFERENCE "upstream_schema"."upstream_table"\)
WITH \(TEXT COLUMNS \(column1, column2\)\)`,
WITH \(TEXT COLUMNS \(column1, column2\), IGNORE COLUMNS \(column3, column4\)\);`,
).WillReturnResult(sqlmock.NewResult(1, 1))

// Query Id
Expand Down

0 comments on commit 37c17c8

Please sign in to comment.