PR Review Updates
- Address init-then-modify comments
  - also remove dependency-reduced-pom from version tracking
- Modify datasource capabilities after considering splittable limitations
  - For connectors with > 1 split, we can't do partial pushdown of limits
    or top-N
  - So, remove them from the capabilities, but keep them in the record
    handlers so they can still be evaluated at the source; they are just
    no longer removed from the query plans (see the sketch after this list)
  - Also make FILTER pushdown more granular to account for DDB limitations
- Add default column for constant expression
- Add default static var for no limit
- Remove unnecessary override now that we are not pushing limits for ddb
- Delete unnecessary comments and add attribution to StandardFunctions
- Move FederationExpressionParser from SDK to athena-jdbc
- Additionally, delete stubs from ElasticSearch for now.
- Rename LIMIT pushdown subtype to INTEGER_CONSTANT
- Reduce scope of catch block for serde incompatibility
- Make all predicate pushdown enhancement changes java8 compliant
- Update supported filter pushdown subtypes for jdbc connectors
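
A note on the splittable-source point above: the sketch below is not part of this commit; it illustrates how a hypothetical single-split connector could still advertise limit pushdown after the subtype rename, using the same capability APIs this commit removes from the DynamoDB handler further down. Multi-split sources omit the SUPPORTS_LIMIT_PUSHDOWN entry entirely and enforce the limit in their record handlers instead.

```java
// Minimal sketch for a hypothetical single-split connector. Because every row
// flows through a single split, a pushed-down LIMIT is exact rather than
// partial, so the engine may safely drop the limit node from the query plan.
@Override
public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAllocator allocator, GetDataSourceCapabilitiesRequest request)
{
    Map<String, List<OptimizationSubType>> capabilities = new HashMap<>();
    capabilities.putAll(DataSourceOptimizations.SUPPORTS_LIMIT_PUSHDOWN.withSupportedSubTypes(
            LimitPushdownSubType.INTEGER_CONSTANT));
    return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities);
}
```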
Michael Hackett committed Apr 17, 2023
1 parent 3c6a623 commit 8265091
Showing 71 changed files with 266 additions and 427 deletions.
@@ -56,6 +56,7 @@
import java.util.List;
import java.util.Map;

+import static com.amazonaws.athena.connector.lambda.domain.predicate.Constraints.DEFAULT_NO_LIMIT;
import static com.amazonaws.athena.connector.lambda.metadata.ListTablesRequest.UNLIMITED_PAGE_SIZE_VALUE;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
@@ -199,7 +200,7 @@ public void doGetSplits()
new TableName("schema1", "table1"),
mockBlock,
Collections.emptyList(),
-                new Constraints(new HashMap<>(), Collections.emptyList(), Collections.emptyList(), -1),
+                new Constraints(new HashMap<>(), Collections.emptyList(), Collections.emptyList(), DEFAULT_NO_LIMIT),
null);

GetSplitsResponse response = handler.doGetSplits(blockAllocator, request);
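
The edit repeated across the test files below is mechanical: the magic number -1 passed as the limit argument of Constraints becomes the named constant this commit introduces (the "default static var for no limit" from the commit message). A sketch of the assumed declaration on Constraints, with the type and value inferred from the -1 literals it replaces:

```java
// Assumed declaration on Constraints; the value -1 is inferred from the
// literals it replaces in these tests, meaning "no LIMIT was specified".
public static final long DEFAULT_NO_LIMIT = -1;
```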
@@ -45,6 +45,7 @@
import java.util.Collections;
import java.util.UUID;

+import static com.amazonaws.athena.connector.lambda.domain.predicate.Constraints.DEFAULT_NO_LIMIT;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
@@ -114,7 +115,7 @@ public void readWithConstraint()
.withQueryId(UUID.randomUUID().toString())
.withIsDirectory(true)
.build(), keyFactory.create()).build(),
-            new Constraints(Collections.EMPTY_MAP, Collections.emptyList(), Collections.emptyList(), -1),
+            new Constraints(Collections.EMPTY_MAP, Collections.emptyList(), Collections.emptyList(), DEFAULT_NO_LIMIT),
100_000,
100_000);

@@ -71,6 +71,7 @@
import java.util.Map;
import java.util.UUID;

+import static com.amazonaws.athena.connector.lambda.domain.predicate.Constraints.DEFAULT_NO_LIMIT;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.nullable;
@@ -187,7 +188,7 @@ public void readTableTest()
EquatableValueSet.newBuilder(allocator, Types.MinorType.VARCHAR.getType(), true, false)
.add(idValue).build());

-        Constraints constraints = new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), -1);
+        Constraints constraints = new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), DEFAULT_NO_LIMIT);

ConstraintEvaluator evaluator = new ConstraintEvaluator(allocator, response.getSchema(), constraints);

@@ -59,6 +59,7 @@
import static com.amazonaws.athena.connectors.cloudwatch.metrics.tables.Table.PERIOD_FIELD;
import static com.amazonaws.athena.connectors.cloudwatch.metrics.tables.Table.STATISTIC_FIELD;
import static com.amazonaws.athena.connectors.cloudwatch.metrics.tables.Table.TIMESTAMP_FIELD;
+import static com.amazonaws.athena.connector.lambda.domain.predicate.Constraints.DEFAULT_NO_LIMIT;
import static org.junit.Assert.*;

public class MetricUtilsTest
@@ -97,7 +98,7 @@ public void applyMetricConstraints()
constraintsMap.put(DIMENSION_NAME_FIELD, makeStringEquals(allocator, "match4"));
constraintsMap.put(DIMENSION_VALUE_FIELD, makeStringEquals(allocator, "match5"));

-        ConstraintEvaluator constraintEvaluator = new ConstraintEvaluator(allocator, schema, new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), -1));
+        ConstraintEvaluator constraintEvaluator = new ConstraintEvaluator(allocator, schema, new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), DEFAULT_NO_LIMIT));

Metric metric = new Metric()
.withNamespace("match1")
@@ -139,7 +140,7 @@ public void pushDownPredicate()
constraintsMap.put(DIMENSION_VALUE_FIELD, makeStringEquals(allocator, "match5"));

ListMetricsRequest request = new ListMetricsRequest();
-        MetricUtils.pushDownPredicate(new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), -1), request);
+        MetricUtils.pushDownPredicate(new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), DEFAULT_NO_LIMIT), request);

assertEquals("match1", request.getNamespace());
assertEquals("match2", request.getMetricName());
@@ -191,7 +192,7 @@ public void makeGetMetricDataRequest()
new TableName(schema, table),
schemaForRead,
split,
-                new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), -1),
+                new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), DEFAULT_NO_LIMIT),
100_000_000_000L, //100GB don't expect this to spill
100_000_000_000L
);
@@ -67,6 +67,7 @@
import java.util.List;
import java.util.Map;

+import static com.amazonaws.athena.connector.lambda.domain.predicate.Constraints.DEFAULT_NO_LIMIT;
import static com.amazonaws.athena.connector.lambda.metadata.ListTablesRequest.UNLIMITED_PAGE_SIZE_VALUE;
import static com.amazonaws.athena.connectors.cloudwatch.metrics.MetricStatSerDe.SERIALIZED_METRIC_STATS_FIELD_NAME;
import static com.amazonaws.athena.connectors.cloudwatch.metrics.tables.Table.METRIC_NAME_FIELD;
@@ -197,7 +198,7 @@ public void doGetTableLayout()
"queryId",
"default",
new TableName(defaultSchema, "metrics"),
-                new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), -1),
+                new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), DEFAULT_NO_LIMIT),
SchemaBuilder.newBuilder().build(),
Collections.EMPTY_SET);

@@ -230,7 +231,7 @@ public void doGetMetricsSplits()
new TableName(defaultSchema, "metrics"),
partitions,
Collections.singletonList("partitionId"),
-                new Constraints(new HashMap<>(), Collections.emptyList(), Collections.emptyList(), -1),
+                new Constraints(new HashMap<>(), Collections.emptyList(), Collections.emptyList(), DEFAULT_NO_LIMIT),
continuationToken);
int numContinuations = 0;
do {
@@ -304,7 +305,7 @@ public void doGetMetricSamplesSplits()
new TableName(defaultSchema, "metric_samples"),
partitions,
Collections.singletonList("partitionId"),
-                new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), -1),
+                new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), DEFAULT_NO_LIMIT),
continuationToken);

int numContinuations = 0;
@@ -371,7 +372,7 @@ public void doGetMetricSamplesSplitsEmptyMetrics()
new TableName(defaultSchema, "metric_samples"),
partitions,
Collections.singletonList("partitionId"),
-                new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), -1),
+                new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), DEFAULT_NO_LIMIT),
null);

GetSplitsRequest req = new GetSplitsRequest(originalReq, null);
@@ -84,6 +84,7 @@
import static com.amazonaws.athena.connectors.cloudwatch.metrics.tables.Table.NAMESPACE_FIELD;
import static com.amazonaws.athena.connectors.cloudwatch.metrics.tables.Table.PERIOD_FIELD;
import static com.amazonaws.athena.connectors.cloudwatch.metrics.tables.Table.STATISTIC_FIELD;
+import static com.amazonaws.athena.connector.lambda.domain.predicate.Constraints.DEFAULT_NO_LIMIT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -215,7 +216,7 @@ public void readMetricsWithConstraint()
METRICS_TABLE_NAME,
METRIC_TABLE.getSchema(),
split,
-                new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), -1),
+                new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), DEFAULT_NO_LIMIT),
100_000_000_000L,
100_000_000_000L//100GB don't expect this to spill
);
@@ -291,7 +292,7 @@ public void readMetricSamplesWithConstraint()
METRIC_SAMPLES_TABLE_NAME,
METRIC_DATA_TABLE.getSchema(),
split,
-                new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), -1),
+                new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), DEFAULT_NO_LIMIT),
100_000_000_000L,
100_000_000_000L//100GB don't expect this to spill
);
@@ -73,6 +73,7 @@
import java.util.Map;
import java.util.concurrent.TimeoutException;

+import static com.amazonaws.athena.connector.lambda.domain.predicate.Constraints.DEFAULT_NO_LIMIT;
import static com.amazonaws.athena.connector.lambda.metadata.ListTablesRequest.UNLIMITED_PAGE_SIZE_VALUE;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.nullable;
@@ -333,7 +334,7 @@ else if (Integer.valueOf(request.getNextToken()) < 3) {
"queryId",
"default",
new TableName("schema-1", "all_log_streams"),
-                new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), -1),
+                new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), DEFAULT_NO_LIMIT),
schema,
Collections.singleton("log_stream"));

@@ -378,7 +379,7 @@ public void doGetSplits()
new TableName("schema", "all_log_streams"),
partitions,
Collections.singletonList(CloudwatchMetadataHandler.LOG_STREAM_FIELD),
-                new Constraints(new HashMap<>(), Collections.emptyList(), Collections.emptyList(), -1),
+                new Constraints(new HashMap<>(), Collections.emptyList(), Collections.emptyList(), DEFAULT_NO_LIMIT),
continuationToken);
int numContinuations = 0;
do {
@@ -73,6 +73,7 @@
import java.util.Map;
import java.util.UUID;

+import static com.amazonaws.athena.connector.lambda.domain.predicate.Constraints.DEFAULT_NO_LIMIT;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.nullable;
@@ -211,7 +212,7 @@ public void doReadRecordsNoSpill()
.withIsDirectory(true)
.build(),
keyFactory.create()).add(CloudwatchMetadataHandler.LOG_STREAM_FIELD, "table").build(),
-                new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), -1),
+                new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), DEFAULT_NO_LIMIT),
100_000_000_000L,
100_000_000_000L//100GB don't expect this to spill
);
@@ -251,7 +252,7 @@ public void doReadRecordsSpill()
.withIsDirectory(true)
.build(),
keyFactory.create()).add(CloudwatchMetadataHandler.LOG_STREAM_FIELD, "table").build(),
-                new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), -1),
+                new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), DEFAULT_NO_LIMIT),
1_500_000L, //~1.5MB so we should see some spill
0
);
@@ -67,6 +67,7 @@
import java.util.HashMap;
import java.util.List;

+import static com.amazonaws.athena.connector.lambda.domain.predicate.Constraints.DEFAULT_NO_LIMIT;
import static com.amazonaws.athena.connector.lambda.metadata.ListTablesRequest.UNLIMITED_PAGE_SIZE_VALUE;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
@@ -249,7 +250,7 @@ public void doGetTableLayout()
QUERY_ID,
DEFAULT_CATALOG,
TABLE_NAME,
-                new Constraints(new HashMap<>(), Collections.emptyList(), Collections.emptyList(), -1),
+                new Constraints(new HashMap<>(), Collections.emptyList(), Collections.emptyList(), DEFAULT_NO_LIMIT),
schema,
Collections.EMPTY_SET);

@@ -278,7 +279,7 @@ public void doGetSplits()
TABLE_NAME,
partitions,
partitionCols,
-                new Constraints(new HashMap<>(), Collections.emptyList(), Collections.emptyList(), -1),
+                new Constraints(new HashMap<>(), Collections.emptyList(), Collections.emptyList(), DEFAULT_NO_LIMIT),
null);

GetSplitsRequest req = new GetSplitsRequest(originalReq, continuationToken);
@@ -83,6 +83,7 @@
import java.util.UUID;

import static com.amazonaws.athena.connectors.docdb.DocDBMetadataHandler.DOCDB_CONN_STR;
+import static com.amazonaws.athena.connector.lambda.domain.predicate.Constraints.DEFAULT_NO_LIMIT;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
@@ -266,7 +267,7 @@ public void doReadRecordsNoSpill()
TABLE_NAME,
schemaForRead,
Split.newBuilder(splitLoc, keyFactory.create()).add(DOCDB_CONN_STR, CONNECTION_STRING).build(),
-                new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), -1),
+                new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), DEFAULT_NO_LIMIT),
100_000_000_000L, //100GB don't expect this to spill
100_000_000_000L
);
@@ -320,7 +321,7 @@ public void doReadRecordsSpill()
TABLE_NAME,
schemaForRead,
Split.newBuilder(splitLoc, keyFactory.create()).add(DOCDB_CONN_STR, CONNECTION_STRING).build(),
-                new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), -1),
+                new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), DEFAULT_NO_LIMIT),
1_500_000L, //~1.5MB so we should see some spill
0L
);
@@ -420,7 +421,7 @@ public void nestedStructTest()
TABLE_NAME,
res.getSchema(),
Split.newBuilder(splitLoc, keyFactory.create()).add(DOCDB_CONN_STR, CONNECTION_STRING).build(),
-                new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), -1),
+                new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), DEFAULT_NO_LIMIT),
100_000_000_000L, //100GB don't expect this to spill
100_000_000_000L
);
@@ -31,8 +31,6 @@
import com.amazonaws.athena.connector.lambda.domain.predicate.ValueSet;
import com.amazonaws.athena.connector.lambda.domain.spill.SpillLocation;
import com.amazonaws.athena.connector.lambda.handlers.GlueMetadataHandler;
-import com.amazonaws.athena.connector.lambda.metadata.GetDataSourceCapabilitiesRequest;
-import com.amazonaws.athena.connector.lambda.metadata.GetDataSourceCapabilitiesResponse;
import com.amazonaws.athena.connector.lambda.metadata.GetSplitsRequest;
import com.amazonaws.athena.connector.lambda.metadata.GetSplitsResponse;
import com.amazonaws.athena.connector.lambda.metadata.GetTableLayoutRequest;
@@ -43,10 +41,6 @@
import com.amazonaws.athena.connector.lambda.metadata.ListTablesRequest;
import com.amazonaws.athena.connector.lambda.metadata.ListTablesResponse;
import com.amazonaws.athena.connector.lambda.metadata.glue.GlueFieldLexer;
-import com.amazonaws.athena.connector.lambda.metadata.optimizations.DataSourceOptimizations;
-import com.amazonaws.athena.connector.lambda.metadata.optimizations.OptimizationSubType;
-import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.FilterPushdownSubType;
-import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.LimitPushdownSubType;
import com.amazonaws.athena.connector.lambda.security.EncryptionKeyFactory;
import com.amazonaws.athena.connectors.dynamodb.constants.DynamoDBConstants;
import com.amazonaws.athena.connectors.dynamodb.model.DynamoDBIndex;
@@ -168,19 +162,6 @@ public DynamoDBMetadataHandler(java.util.Map<String, String> configOptions)
this.tableResolver = new DynamoDBTableResolver(invoker, ddbClient);
}

-    @Override
-    public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAllocator allocator, GetDataSourceCapabilitiesRequest request)
-    {
-        Map<String, List<OptimizationSubType>> capabilities = new HashMap<>();
-        capabilities.putAll(DataSourceOptimizations.SUPPORTS_LIMIT_PUSHDOWN.withSupportedSubTypes(
-            LimitPushdownSubType.ALL
-        ));
-        capabilities.putAll(DataSourceOptimizations.SUPPORTS_FILTER_PUSHDOWN.withSupportedSubTypes(
-            FilterPushdownSubType.ALL
-        ));
-        return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities);
-    }

/**
* Since DynamoDB does not have "schemas" or "databases", this lists all the Glue databases (if not
* disabled) that contain {@value #DYNAMO_DB_FLAG} in their URIs . Otherwise returns just a "default" schema.
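
With the override deleted, the DynamoDB connector no longer advertises blanket limit and filter pushdown and falls back to the SDK default. Per the commit message, filter pushdown is to be re-declared with more granular subtypes matching DDB's limitations; a hedged sketch of what such a declaration could look like follows — the subtype names here are assumptions for illustration, not taken from this diff:

```java
// Hypothetical granular capability set for a DDB-like source: advertise only
// the filter shapes the source can evaluate completely, so the engine removes
// just those predicates from the plan and leaves the rest to the connector.
Map<String, List<OptimizationSubType>> capabilities = new HashMap<>();
capabilities.putAll(DataSourceOptimizations.SUPPORTS_FILTER_PUSHDOWN.withSupportedSubTypes(
        FilterPushdownSubType.SORTED_RANGE_SET,    // assumed subtype name
        FilterPushdownSubType.NULLABLE_COMPARISON  // assumed subtype name
));
```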