Skip to content

Commit

Permalink
Add support for the PLACEMENT KEY column constraint in Spanner import…
Browse files Browse the repository at this point in the history
…/export templates (GoogleCloudPlatform#1792)

* Support PLACEMENT schema objects in Spanner's import/export templates

* Support import/export of the PLACEMENT KEY column constraint

* Support PLACEMENT schema objects in Spanner's import/export templates
for GoogleSQL

* Added tests

* Ran maven spotless command

* Reverted SpannerServerResource overrides

* Updated InformationSchemaScannerTest

* Fixing PG ITs

---------

Co-authored-by: Shirdon Gorse <[email protected]>
  • Loading branch information
sgorse123 and sgorse12 authored Aug 22, 2024
1 parent b3c32a7 commit 9dabee7
Show file tree
Hide file tree
Showing 9 changed files with 365 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_ON_DELETE_ACTION;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_OPTION;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_PARENT;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_PLACEMENT_KEY;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_PRIMARY_KEY;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_REMOTE;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_SEQUENCE_COUNTER_START;
Expand Down Expand Up @@ -305,6 +306,10 @@ public Table toTable(String tableName, Schema schema) {
String defaultExpression = f.getProp(DEFAULT_EXPRESSION);
column.parseType(sqlType).notNull(!nullable).defaultExpression(defaultExpression);
}
String placementKey = f.getProp(SPANNER_PLACEMENT_KEY);
if (placementKey != null) {
column.isPlacementKey(Boolean.parseBoolean(placementKey));
}
ImmutableList.Builder<String> columnOptions = ImmutableList.builder();
for (int i = 0; ; i++) {
String spannerOption = f.getProp(SPANNER_OPTION + i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ private AvroUtil() {}
public static final String SPANNER_NAMED_SCHEMA = "spannerNamedSchema";
public static final String SPANNER_NAME = "spannerName";
public static final String STORED = "stored";
public static final String SPANNER_PLACEMENT_KEY = "spannerPlacementKey";
public static final String HIDDEN = "hidden";

public static Schema unpackNullable(Schema schema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_ON_DELETE_ACTION;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_OPTION;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_PARENT;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_PLACEMENT_KEY;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_PRIMARY_KEY;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_REMOTE;
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_SEQUENCE_COUNTER_START;
Expand Down Expand Up @@ -154,6 +155,9 @@ public Collection<Schema> convert(Ddl ddl) {
for (int i = 0; i < cm.columnOptions().size(); i++) {
fieldBuilder.prop(SPANNER_OPTION + i, cm.columnOptions().get(i));
}
if (cm.isPlacementKey()) {
fieldBuilder.prop(SPANNER_PLACEMENT_KEY, Boolean.toString(cm.isPlacementKey()));
}
if (cm.isGenerated()) {
fieldBuilder.prop(NOT_NULL, Boolean.toString(cm.notNull()));
fieldBuilder.prop(GENERATION_EXPRESSION, cm.generationExpression());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public abstract class Column implements Serializable {

public abstract boolean isStored();

public abstract boolean isPlacementKey();

public abstract Dialect dialect();

public abstract boolean isHidden();
Expand All @@ -68,7 +70,8 @@ public static Builder builder(Dialect dialect) {
.isGenerated(false)
.isHidden(false)
.generationExpression("")
.isStored(false);
.isStored(false)
.isPlacementKey(false);
}

public static Builder builder() {
Expand Down Expand Up @@ -102,6 +105,9 @@ public void prettyPrint(Appendable appendable) throws IOException {
appendable.append(" STORED");
}
}
if (isPlacementKey()) {
appendable.append(" PLACEMENT KEY");
}
if (isHidden()) {
if (dialect() == Dialect.GOOGLE_STANDARD_SQL) {
appendable.append(" HIDDEN");
Expand Down Expand Up @@ -195,6 +201,12 @@ public Builder stored() {
return isStored(true);
}

public abstract Builder isPlacementKey(boolean isPlacementKey);

public Builder placementKey() {
return isPlacementKey(true);
}

public abstract Column autoBuild();

public Builder int64() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,11 @@ private void listColumns(Ddl.Builder builder) {
boolean isStored = !resultSet.isNull(8) && resultSet.getString(8).equalsIgnoreCase("YES");
String defaultExpression = resultSet.isNull(9) ? null : resultSet.getString(9);
boolean isHidden = dialect == Dialect.GOOGLE_STANDARD_SQL ? resultSet.getBoolean(10) : false;
boolean isPlacementKey =
dialect == Dialect.GOOGLE_STANDARD_SQL
? resultSet.getBoolean(11)
: resultSet.getBoolean(10);

builder
.createTable(tableName)
.column(columnName)
Expand All @@ -320,38 +325,55 @@ private void listColumns(Ddl.Builder builder) {
.generationExpression(generationExpression)
.isStored(isStored)
.defaultExpression(defaultExpression)
.isPlacementKey(isPlacementKey)
.endColumn()
.endTable();
}
}

@VisibleForTesting
Statement listColumnsSQL() {
StringBuilder sb = new StringBuilder();
sb.append(
"WITH placementkeycolumns AS ("
+ " SELECT c.table_name, c.column_name, c.constraint_name"
+ " FROM information_schema.constraint_column_usage AS c"
+ " WHERE c.constraint_name = CONCAT('PLACEMENT_KEY_', c.table_name)"
+ ") ");
switch (dialect) {
case GOOGLE_STANDARD_SQL:
return Statement.of(
sb.append(
"SELECT c.table_schema, c.table_name, c.column_name,"
+ " c.ordinal_position, c.spanner_type, c.is_nullable,"
+ " c.is_generated, c.generation_expression, c.is_stored,"
+ " c.column_default, c.is_hidden"
+ " c.column_default, c.is_hidden,"
+ " pkc.constraint_name IS NOT NULL AS is_placement_key"
+ " FROM information_schema.columns as c"
+ " LEFT JOIN placementkeycolumns AS pkc"
+ " ON c.table_name = pkc.table_name AND c.column_name = pkc.column_name"
+ " WHERE c.table_schema NOT IN"
+ " ('INFORMATION_SCHEMA', 'SPANNER_SYS')"
+ " AND c.spanner_state = 'COMMITTED' "
+ " ORDER BY c.table_name, c.ordinal_position");
break;
case POSTGRESQL:
return Statement.of(
sb.append(
"SELECT c.table_schema, c.table_name, c.column_name,"
+ " c.ordinal_position, c.spanner_type, c.is_nullable,"
+ " c.is_generated, c.generation_expression, c.is_stored, c.column_default"
+ " c.is_generated, c.generation_expression, c.is_stored, c.column_default,"
+ " pkc.constraint_name IS NOT NULL AS is_placement_key"
+ " FROM information_schema.columns as c"
+ " LEFT JOIN placementkeycolumns AS pkc"
+ " ON c.table_name = pkc.table_name AND c.column_name = pkc.column_name"
+ " WHERE c.table_schema NOT IN "
+ " ('information_schema', 'spanner_sys', 'pg_catalog') "
+ " AND c.spanner_state = 'COMMITTED' "
+ " ORDER BY c.table_name, c.ordinal_position");
break;
default:
throw new IllegalArgumentException("Unrecognized dialect: " + dialect);
}
return Statement.of(sb.toString());
}

private void listIndexes(Map<String, NavigableMap<String, Index.Builder>> indexes) {
Expand Down Expand Up @@ -1447,10 +1469,18 @@ private void listPlacements(Ddl.Builder builder) {
Map<String, ImmutableList.Builder<String>> placementNameToOptions = Maps.newHashMap();
while (resultSet.next()) {
String name = resultSet.getString(0);
boolean isDefault = resultSet.getBoolean(1);
if (isDefault) {
// Skip `default` placement as this is not created by user DDL.
continue;
if (dialect == Dialect.GOOGLE_STANDARD_SQL) {
boolean isDefault = resultSet.getBoolean(1);
if (isDefault) {
// Skip `default` placement as this is not created by user DDL.
continue;
}
} else {
String isDefault = resultSet.getString(1);
if (isDefault.equals("YES")) {
// Skip `default` placement as this is not created by user DDL.
continue;
}
}
String optionName = resultSet.getString(2);
String optionType = resultSet.getString(3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1008,4 +1008,138 @@ public void placements() {
"\nCREATE PLACEMENT `Placement1`\n\t"
+ " OPTIONS (instance_partition='mr-partition', default_leader='us-east1')"));
}

@Test
public void placementTable() {
String placementKeyAsPrimaryKeyAvroString =
"{"
+ " \"type\" : \"record\","
+ " \"name\" : \"PlacementKeyAsPrimaryKey\","
+ " \"namespace\" : \"spannertest\","
+ " \"fields\" : [ {"
+ " \"name\" : \"location\","
+ " \"type\" : \"string\","
+ " \"sqlType\" : \"STRING(MAX)\","
+ " \"notNull\" : \"true\","
+ " \"spannerPlacementKey\" : \"true\""
+ " }, {"
+ " \"name\" : \"val\","
+ " \"type\" : \"string\","
+ " \"sqlType\" : \"STRING(MAX)\","
+ " \"notNull\" : \"true\""
+ " }],"
+ " \"googleStorage\" : \"CloudSpanner\","
+ " \"spannerParent\" : \"\","
+ " \"googleFormatVersion\" : \"booleans\","
+ " \"spannerPrimaryKey_0\" : \"`location` ASC\""
+ "}";

String placementKeyAsNonPrimaryKeyAvroString =
"{"
+ " \"type\" : \"record\","
+ " \"name\" : \"UsersWithPlacement\","
+ " \"namespace\" : \"spannertest\","
+ " \"fields\" : [ {"
+ " \"name\" : \"id\","
+ " \"type\" : \"long\","
+ " \"sqlType\" : \"INT64\""
+ " }, {"
+ " \"name\" : \"location\","
+ " \"type\" : \"string\","
+ " \"sqlType\" : \"STRING(MAX)\","
+ " \"spannerPlacementKey\" : \"true\""
+ " }],"
+ " \"googleStorage\" : \"CloudSpanner\","
+ " \"spannerParent\" : \"\","
+ " \"googleFormatVersion\" : \"booleans\","
+ " \"spannerPrimaryKey_0\" : \"`id` ASC\""
+ "}";

Collection<Schema> schemas = new ArrayList<>();
Schema.Parser parser = new Schema.Parser();
schemas.add(parser.parse(placementKeyAsPrimaryKeyAvroString));
schemas.add(parser.parse(placementKeyAsNonPrimaryKeyAvroString));

AvroSchemaToDdlConverter converter = new AvroSchemaToDdlConverter();
Ddl ddl = converter.toDdl(schemas);
assertThat(ddl.allTables(), hasSize(2));
assertThat(
ddl.prettyPrint(),
equalToCompressingWhiteSpace(
"CREATE TABLE `PlacementKeyAsPrimaryKey` (\n\t"
+ "`location` STRING(MAX) NOT NULL PLACEMENT KEY,\n\t"
+ "`val` STRING(MAX) NOT NULL,\n"
+ ") PRIMARY KEY (`location` ASC)\n\n\n"
+ "CREATE TABLE `UsersWithPlacement` (\n\t"
+ "`id` INT64 NOT NULL,\n\t"
+ "`location` STRING(MAX) NOT NULL PLACEMENT KEY,\n"
+ ") PRIMARY KEY (`id` ASC)\n\n"));
}

@Test
public void pgPlacementTable() {
String placementKeyAsPrimaryKeyAvroString =
"{"
+ " \"type\" : \"record\","
+ " \"name\" : \"PlacementKeyAsPrimaryKey\","
+ " \"namespace\" : \"spannertest\","
+ " \"fields\" : [ {"
+ " \"name\" : \"location\","
+ " \"type\" : \"string\","
+ " \"sqlType\" : \"character varying\","
+ " \"spannerPlacementKey\" : \"true\""
+ " }, {"
+ " \"name\" : \"val\","
+ " \"type\" : \"string\","
+ " \"sqlType\" : \"character varying\","
+ " \"notNull\" : \"true\""
+ " }],"
+ " \"googleStorage\" : \"CloudSpanner\","
+ " \"spannerParent\" : \"\","
+ " \"googleFormatVersion\" : \"booleans\","
+ " \"spannerPrimaryKey_0\" : \"location ASC\""
+ "}";

String placementKeyAsNonPrimaryKeyAvroString =
"{"
+ " \"type\" : \"record\","
+ " \"name\" : \"UsersWithPlacement\","
+ " \"namespace\" : \"spannertest\","
+ " \"fields\" : [ {"
+ " \"name\" : \"id\","
+ " \"type\" : \"long\","
+ " \"sqlType\" : \"bigint\""
+ " }, {"
+ " \"name\" : \"location\","
+ " \"type\" : \"string\","
+ " \"sqlType\" : \"character varying\","
+ " \"notNull\" : \"true\","
+ " \"spannerPlacementKey\" : \"true\""
+ " }],"
+ " \"googleStorage\" : \"CloudSpanner\","
+ " \"spannerParent\" : \"\","
+ " \"googleFormatVersion\" : \"booleans\","
+ " \"spannerPrimaryKey_0\" : \"id ASC\""
+ "}";

Collection<Schema> schemas = new ArrayList<>();
Schema.Parser parser = new Schema.Parser();
schemas.add(parser.parse(placementKeyAsPrimaryKeyAvroString));
schemas.add(parser.parse(placementKeyAsNonPrimaryKeyAvroString));

AvroSchemaToDdlConverter converter = new AvroSchemaToDdlConverter(Dialect.POSTGRESQL);
Ddl ddl = converter.toDdl(schemas);
assertThat(ddl.allTables(), hasSize(2));
assertThat(
ddl.prettyPrint(),
equalToCompressingWhiteSpace(
"CREATE TABLE \"PlacementKeyAsPrimaryKey\" (\n\t"
+ "\"location\" character varying NOT NULL PLACEMENT KEY,\n\t"
+ "\"val\" character varying NOT NULL,\n\t"
+ "PRIMARY KEY (\"location\")\n)\n\n\n"
+ "CREATE TABLE \"UsersWithPlacement\" (\n\t"
+ "\"id\" bigint NOT NULL,\n\t"
+ "\"location\" character varying NOT NULL PLACEMENT KEY,\n\t"
+ "PRIMARY KEY (\"id\")\n)\n\n"));
}
}
Loading

0 comments on commit 9dabee7

Please sign in to comment.