diff --git a/.gitignore b/.gitignore index 6a6f22848..4c37298e0 100644 --- a/.gitignore +++ b/.gitignore @@ -40,3 +40,4 @@ gradle-app.setting # direnv config file - https://direnv.net/ /.envrc +logs diff --git a/build.gradle b/build.gradle index a96d69dde..a7db1cf95 100644 --- a/build.gradle +++ b/build.gradle @@ -165,7 +165,7 @@ allprojects { dependency 'io.netty:netty-tcnative-boringssl-static:2.0.28.Final' - dependencySet(group: 'io.grpc', version: '1.16.1') { + dependencySet(group: 'io.grpc', version: '1.35.0') { entry 'grpc-auth' entry 'grpc-core' entry 'grpc-netty' @@ -200,7 +200,7 @@ allprojects { entry 'google-cloud-core' entry 'google-cloud-core-grpc' } - dependency 'com.google.cloud.bigtable:bigtable-client-core:1.12.1' + dependency 'com.google.cloud.bigtable:bigtable-client-core:1.19.0' dependency 'com.addthis:stream-lib:3.0.0' dependency 'org.xerial.snappy:snappy-java:1.1.7.2' @@ -313,6 +313,7 @@ subprojects { } test { + testLogging.showStandardStreams = true testLogging { events "passed", "skipped", "failed", "standardOut", "standardError" outputs.upToDateWhen { false } diff --git a/docs/content/_docs/config.md b/docs/content/_docs/config.md index 106fc2b75..cd9d6998a 100644 --- a/docs/content/_docs/config.md +++ b/docs/content/_docs/config.md @@ -132,6 +132,7 @@ Precedence for each flag is defined as the following: The following features are available: #### com.spotify.heroic.deterministic_aggregations + {:.no_toc} Enable feature to only perform aggregations that can be performed with limited resources. Disabled by default. @@ -139,6 +140,7 @@ Enable feature to only perform aggregations that can be performed with limited r Aggregations are commonly performed per-shard, and the result concatenated. This enabled experimental support for distributed aggregations which behave transparently across shards. #### com.spotify.heroic.distributed_aggregations + {:.no_toc} Enable feature to perform distributed aggregations. Disabled by default. @@ -146,6 +148,7 @@ Enable feature to perform distributed aggregations. Disabled by default. Aggregations are commonly performed per-shard, and the result concatenated. This enables experimental support for distributed aggregations which behave transparently across shards. Typically this will cause more data to be transported across shards for each request. #### com.spotify.heroic.shift_range + {:.no_toc} Enable feature to cause range to be rounded on the current cadence. Enabled by default. @@ -153,6 +156,7 @@ Enable feature to cause range to be rounded on the current cadence. Enabled by d This will assert that there are data outside of the range queried for and that the range is aligned to the queried cadence. Which is a useful feature when using a dashboarding system. #### com.spotify.heroic.sliced_data_fetch + {:.no_toc} Enable feature to cause data to be fetched in slices. Enabled by default. @@ -160,6 +164,7 @@ Enable feature to cause data to be fetched in slices. Enabled by default. This will cause data to be fetched and consumed by the aggregation framework in pieces avoiding having to load all data into memory before starting to consume it. #### com.spotify.heroic.end_bucket_stategy + {:.no_toc} Enabled by default. @@ -167,6 +172,7 @@ Enabled by default. Use the legacy bucket strategy by default where the resulting value is at the end of the timestamp of the bucket. #### com.spotify.heroic.cache_query + {:.no_toc} Disabled by default. 
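The feature flags above can also be toggled per query from code; what the config reference calls `com.spotify.heroic.distributed_aggregations` corresponds to `Feature.DISTRIBUTED_AGGREGATIONS` in the test code further down in this diff. A minimal sketch mirroring how `AbstractClusterQueryIT` enables it; the package locations of `Feature`, `FeatureSet` and `QueryBuilder` are assumptions, and the `QueryBuilder` is presumed to come from `QueryManager#newQueryFromString(...)`:

```java
import com.spotify.heroic.QueryBuilder;
import com.spotify.heroic.common.Feature;
import com.spotify.heroic.common.FeatureSet;
import java.util.Optional;

class FeatureFlagSketch {
    // Turn on distributed aggregations for a single query only, as the
    // integration test below does; other queries keep the configured defaults.
    static QueryBuilder withDistributedAggregations(final QueryBuilder builder) {
        builder.features(Optional.of(FeatureSet.of(Feature.DISTRIBUTED_AGGREGATIONS)));
        return builder;
    }
}
```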
@@ -492,6 +498,39 @@ batchSize: # If set, the Bigtable client will be configured to use this address as a Bigtable emulator. # Default CBT emulator runs at: "localhost:8086" emulatorEndpoint: + +# Reference: https://cloud.google.com/bigtable/docs/hbase-client/javadoc/com/google/cloud/bigtable/config/CallOptionsConfig.Builder +# The amount of milliseconds to wait before issuing a client side timeout for mutation remote +# procedure calls. +# In other words, if timeouts are set, how many milliseconds should pass before a +# DEADLINE_EXCEEDED is thrown. The Google default is 600_000 ms (10 minutes). +# Currently, this feature is experimental. +mutateRpcTimeoutMs: int + +# ReadRowsRpcTimeoutMs +# The amount of milliseconds to wait before issuing a client side timeout for readRows streaming remote procedure calls. +# In other words, from https://github.com/hegemonic/cloud-bigtable-client/blob/master/bigtable-client-core-parent/bigtable-client-core/src/main/java/com/google/cloud/bigtable/config/CallOptionsConfig.java : +# The default duration to wait before timing out read stream RPC (default value: 12 hours). + +readRowsRpcTimeoutMs: int + +# ShortRpcTimeoutMs - The amount of milliseconds to wait before issuing a client side timeout for short remote procedure calls. +# In other words, the default duration to wait before timing out RPCs (default +# value: 60 seconds) +# from https://cloud.google.com/bigtable/docs/hbase-client/javadoc/com/google/cloud/bigtable/config/CallOptionsConfig#SHORT_TIMEOUT_MS_DEFAULT +shortRpcTimeoutMs: int + +# MaxScanTimeoutRetries +# The maximum number of times to retry after a scan timeout. +# https://cloud.google.com/bigtable/docs/hbase-client/javadoc/com/google/cloud/bigtable/config/RetryOptions.html#getmaxscantimeoutretries +# Default is 3. +maxScanTimeoutRetries: int + +# maxElapsedBackoffMs +# Maximum amount of time we will keep retrying an operation that is failing. +# So if this is 5,000ms and we retry every 2,000ms, we would do 2 retries. +# Default is 60 seconds. +maxElapsedBackoffMs: int ``` ##### `` @@ -542,18 +581,16 @@ Maximum number of distinct groups a single result group may contain. ##### seriesLimit -Maximum amount of time series a single request is allowed to fetch, per cluster (if federated). +Maximum amount of time series a single request is allowed to fetch, per cluster (if federated). Note: when using resource identifiers, this limit only applies to the number of series found in the metadata backend, *not* the total series returned. It is therefore possible for a low limit *not* to be exceeded by the number of series found in metadata while far more series are returned from the metrics backend once resource identifiers are taken into account (which may trigger additional limits). -##### failOnLimits +##### failOnLimits When true, any limits applied will be reported as a failure. - - ### [``](#metadata_backend) Metadata acts as the index to time series data, it is the driving force behind our [Query Language](docs/query_language). @@ -717,7 +754,6 @@ sniff: default = false nodeSamplerInterval: default = 30s ``` - #### [Memory](#memory) An in-memory datastore. This is intended only for testing and is definitely not something you should run in production. @@ -970,6 +1006,7 @@ level: default = TRACE ``` #### Query log output + {:.no_toc} Each successful query will result in several output entries in the query log. Entries from different stages of the query.
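Returning briefly to the Bigtable settings documented above: the same knobs can also be set programmatically when the metric module is built. A minimal sketch mirroring the builder calls used by `HeroicMetricsConfigurationTest` later in this diff; the project name and the concrete values here are illustrative only:

```java
import com.spotify.heroic.metric.bigtable.BigtableMetricModule;

class BigtableTimeoutSketch {
    static BigtableMetricModule module() {
        return new BigtableMetricModule.Builder()
            .project("example-project")       // illustrative project id
            .batchSize(1000)
            .maxWriteBatchSize(250)
            .mutateRpcTimeoutMs(600_000)      // 10 minutes for mutations
            .readRowsRpcTimeoutMs(43_200_000) // 12 hours for readRows streams
            .shortRpcTimeoutMs(60_000)        // 60 seconds for short RPCs
            .maxScanTimeoutRetries(3)
            .maxElapsedBackoffMs(60_000)      // stop retrying after 60 seconds
            .build();
    }
}
```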
Example output: @@ -996,6 +1033,7 @@ Each successful query will result in several output entries in the query log. En | `data` | Contains data relevant to this query stage. This might for example be the original query, a partial response or the final response. #### Contextual information + {:.no_toc} It's possible to supply contextual information in the query. This information will then be included in the query log, to ease mapping of performed query to the query log output. @@ -1033,7 +1071,6 @@ Enable distributed tracing output of Heroic's operations. Tracing is instrumente A few tags are added to incoming requests such as the java version. If running on GCP, zone and region tags are added as well. - ```yaml # Probability, between 0.0 and 1.0, of sampling each trace. probability: default = 0.01 diff --git a/heroic-component/src/main/java/com/spotify/heroic/metric/MetricsConnectionSettings.kt b/heroic-component/src/main/java/com/spotify/heroic/metric/MetricsConnectionSettings.kt new file mode 100644 index 000000000..49c5850b1 --- /dev/null +++ b/heroic-component/src/main/java/com/spotify/heroic/metric/MetricsConnectionSettings.kt @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2015 Spotify AB. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.spotify.heroic.metric + +import com.spotify.heroic.metric.consts.ApiQueryConsts +import org.apache.commons.lang3.builder.ToStringBuilder +import org.apache.commons.lang3.builder.ToStringStyle +import java.util.* + +open class MetricsConnectionSettings( + maxWriteBatchSize: Optional<Int>, + mutateRpcTimeoutMs: Optional<Int>, + readRowsRpcTimeoutMs: Optional<Int>, + shortRpcTimeoutMs: Optional<Int>, + maxScanTimeoutRetries: Optional<Int>, + maxElapsedBackoffMs: Optional<Int> +) { + /** + * See [ApiQueryConsts.DEFAULT_MUTATE_RPC_TIMEOUT_MS] + */ + @JvmField + var mutateRpcTimeoutMs: Int + + /** + * See [ApiQueryConsts.DEFAULT_READ_ROWS_RPC_TIMEOUT_MS] + */ + @JvmField + var readRowsRpcTimeoutMs: Int + + /** + * See [ApiQueryConsts.DEFAULT_SHORT_RPC_TIMEOUT_MS] + */ + @JvmField + var shortRpcTimeoutMs: Int + + /** + * See [ApiQueryConsts.DEFAULT_MAX_SCAN_TIMEOUT_RETRIES] + */ + @JvmField + var maxScanTimeoutRetries: Int + + /** + * See [ApiQueryConsts.DEFAULT_MAX_ELAPSED_BACKOFF_MILLIS] + */ + @JvmField + var maxElapsedBackoffMs: Int + + /** + * See [MetricsConnectionSettings.DEFAULT_MUTATION_BATCH_SIZE] + */ + @JvmField + var maxWriteBatchSize: Int + + protected constructor() : this( + Optional.of(MAX_MUTATION_BATCH_SIZE), + Optional.of(ApiQueryConsts.DEFAULT_MUTATE_RPC_TIMEOUT_MS), + Optional.of(ApiQueryConsts.DEFAULT_READ_ROWS_RPC_TIMEOUT_MS), + Optional.of(ApiQueryConsts.DEFAULT_SHORT_RPC_TIMEOUT_MS), + Optional.of(ApiQueryConsts.DEFAULT_MAX_SCAN_TIMEOUT_RETRIES), + Optional.of(ApiQueryConsts.DEFAULT_MAX_ELAPSED_BACKOFF_MILLIS) + ) { + } + + override fun toString(): String { + return ToStringBuilder(this, ToStringStyle.MULTI_LINE_STYLE) + .append("maxWriteBatchSize", maxWriteBatchSize) + .append("mutateRpcTimeoutMs", mutateRpcTimeoutMs) + .append("readRowsRpcTimeoutMs", readRowsRpcTimeoutMs) + .append("shortRpcTimeoutMs", shortRpcTimeoutMs) + .append("maxScanTimeoutRetries", maxScanTimeoutRetries) + .append("maxElapsedBackoffMs", maxElapsedBackoffMs) + .toString() + } + + companion object { + /** + * default number of Cells for each batch mutation + */ + const val DEFAULT_MUTATION_BATCH_SIZE = 1000 + + /** + * maximum possible number of Cells for each batch mutation + */ + const val MAX_MUTATION_BATCH_SIZE = 100000 + + /** + * minimum possible number of Cells supported for each batch mutation + */ + const val MIN_MUTATION_BATCH_SIZE = 10 + @JvmStatic + fun createDefault(): MetricsConnectionSettings { + return MetricsConnectionSettings() + } + } + + init { + // Basically make sure that maxWriteBatchSize, if set, is sane + var maxWriteBatch = maxWriteBatchSize.orElse(DEFAULT_MUTATION_BATCH_SIZE) + maxWriteBatch = maxWriteBatch.coerceAtLeast(MIN_MUTATION_BATCH_SIZE) + maxWriteBatch = maxWriteBatch.coerceAtMost(MAX_MUTATION_BATCH_SIZE) + this.maxWriteBatchSize = maxWriteBatch + + this.mutateRpcTimeoutMs = + mutateRpcTimeoutMs.orElse(ApiQueryConsts.DEFAULT_MUTATE_RPC_TIMEOUT_MS) + this.readRowsRpcTimeoutMs = + readRowsRpcTimeoutMs.orElse(ApiQueryConsts.DEFAULT_READ_ROWS_RPC_TIMEOUT_MS) + this.shortRpcTimeoutMs = + shortRpcTimeoutMs.orElse(ApiQueryConsts.DEFAULT_SHORT_RPC_TIMEOUT_MS) + this.maxScanTimeoutRetries = + maxScanTimeoutRetries.orElse(ApiQueryConsts.DEFAULT_MAX_SCAN_TIMEOUT_RETRIES) + this.maxElapsedBackoffMs = + maxElapsedBackoffMs.orElse(ApiQueryConsts.DEFAULT_MAX_ELAPSED_BACKOFF_MILLIS) + } +} \ No newline at end of file diff --git a/heroic-component/src/main/java/com/spotify/heroic/metric/consts/ApiQueryConsts.java
b/heroic-component/src/main/java/com/spotify/heroic/metric/consts/ApiQueryConsts.java new file mode 100644 index 000000000..58e802129 --- /dev/null +++ b/heroic-component/src/main/java/com/spotify/heroic/metric/consts/ApiQueryConsts.java @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2015 Spotify AB. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.heroic.metric.consts; + +import java.util.concurrent.TimeUnit; + +@SuppressWarnings({"LineLength"}) +public class ApiQueryConsts { + /** + * MutateRpcTimeoutMs + * + * The amount of milliseconds to wait before issuing a client side timeout + * for mutation remote procedure calls. + * + * Google default is 10 minutes (!!!) + * + *
Also defined as (source to be rediscovered...): + * If timeouts are set, how many milliseconds should pass before a + * DEADLINE_EXCEEDED is thrown for a long mutation. Currently, this feature is experimental. + * + * @see CallOptionsConfig.Builder#MutateRpcTimeoutMs + */ + public static final int DEFAULT_MUTATE_RPC_TIMEOUT_MS = 600_000; + + /** + * ReadRowsRpcTimeoutMs + * + * The amount of milliseconds to wait before issuing a client side + * timeout for readRows streaming remote procedure calls. + * + *
AKA + * The default duration to wait before timing out read stream RPC + * (default value: 12 hours). + * @see ReadRowsRpcTimeoutMs + */ + public static final int DEFAULT_READ_ROWS_RPC_TIMEOUT_MS = (int) TimeUnit.HOURS.toMillis(12); + + /** + * ShortRpcTimeoutMs + * The amount of milliseconds to wait before issuing a client side timeout for + * short remote procedure calls. + * + *
AKA + * The default duration to wait before timing out RPCs (default Google value: 60 + * seconds) @see CallOptionsConfig.SHORT_TIMEOUT_MS_DEFAULT + * + * According to our Google rep, this is the value that's used for all read + * operations that target exactly 1 row. Since Heroic never does this (it + * calls readRows()), this setting will have no effect. Note that + * MutateRpcTimeoutMs governs all write timeouts. + */ + public static final int DEFAULT_SHORT_RPC_TIMEOUT_MS = 60_000; + + /** + * Maximum number of times to retry after a scan timeout (Google default value: 10 retries). + * Note that we're going with 3 retries, since that's what the common-config BT repo has; that + * repo specifies "max-attempts", so we want 3 - 1 = 2. + * @see RetryOptions.DEFAULT_MAX_SCAN_TIMEOUT_RETRIES + */ + public static final int DEFAULT_MAX_SCAN_TIMEOUT_RETRIES = 10; + + /** + * Copy of RetryOptions#DEFAULT_INITIAL_BACKOFF_MILLIS + * so that we don't have to link/depend on the Google jar. + * We go with 10 since that's what common-config repo has. + *
+ * Initial amount of time to wait before retrying failed operations (default value: 5ms). + **/ + public static final int DEFAULT_INITIAL_BACKOFF_MILLIS = 10; + + /** + * Copy of com.google.cloud.bigtable.config.RetryOptions#DEFAULT_BACKOFF_MULTIPLIER + * So that we don't have to link/depend on the Google jar + *
+ * Multiplier to apply to wait times after failed retries (default value: 2.0). + * */ + public static final double DEFAULT_BACKOFF_MULTIPLIER = 2.0; + + /** + * A little "safety buffer" to err on the side of caution (against ceasing + * retrying prematurely). + */ + private static final int SAFETY_BUFFER_MILLIS = 25; + + /** + * Maximum amount of time to retry before failing the operation (Google default value: 600 + * seconds). + *
+ * From Adam Steele [adamsteele@google.com]: + * The operation will be retried until you hit either maxElapsedBackoffMs or (for scan + * operations) maxScanTimeoutRetries. + *
+ * So, we use com.google.cloud.bigtable.config.RetryOptions#DEFAULT_BACKOFF_MULTIPLIER + * and com.google.cloud.bigtable.config.RetryOptions#DEFAULT_INITIAL_BACKOFF_MILLIS + * to come up with a number of millis, assuming + * DEFAULT_READ_ROWS_RPC_TIMEOUT_MS is set to 4,000 : + *
+ * 4000 + (4000 * 1.5) + (4000 * 1.5 * 1.5) + 5 + 25 ms + * = 19_030 ms total potential wait for an operation which is pretty + * reasonable. + */ + public static final int DEFAULT_MAX_ELAPSED_BACKOFF_MILLIS = (int) + ((1 + DEFAULT_MAX_SCAN_TIMEOUT_RETRIES) * + DEFAULT_READ_ROWS_RPC_TIMEOUT_MS * + DEFAULT_BACKOFF_MULTIPLIER) + + DEFAULT_INITIAL_BACKOFF_MILLIS + + SAFETY_BUFFER_MILLIS; +} diff --git a/heroic-core/src/main/java/com/spotify/heroic/CoreQueryManager.java b/heroic-core/src/main/java/com/spotify/heroic/CoreQueryManager.java index 854fd9c8d..0a1abd489 100644 --- a/heroic-core/src/main/java/com/spotify/heroic/CoreQueryManager.java +++ b/heroic-core/src/main/java/com/spotify/heroic/CoreQueryManager.java @@ -127,6 +127,7 @@ public class CoreQueryManager implements QueryManager { private final long smallQueryThreshold; + @SuppressWarnings({"LineLength"}) @Inject public CoreQueryManager( @Named("features") final Features features, @@ -485,8 +486,7 @@ private BiFunction, FullQuery> getStoreTracesTransfo return (fullQuery, queryTraces) -> { /* We want to store the current QT + queryTraces list of QT's. * Create new QT with queryTraces + current QT as a children */ - final List traces = new ArrayList<>(); - traces.addAll(queryTraces); + final List traces = new ArrayList<>(queryTraces); traces.add(fullQuery.trace()); final QueryTrace newTrace = shardLocalWatch.end(traces); return fullQuery.withTrace(newTrace); @@ -549,7 +549,7 @@ private static T retryTraceHandlerNoop(T result, List traces) { public static final long INTERVAL_GOAL = 240; - private Duration cadenceFromRange(final DateRange range) { + private static Duration cadenceFromRange(final DateRange range) { final long diff = range.diff(); final long nominal = diff / INTERVAL_GOAL; diff --git a/heroic-core/src/main/java/com/spotify/heroic/http/tracing/OpenCensusUtils.java b/heroic-core/src/main/java/com/spotify/heroic/http/tracing/OpenCensusUtils.java index 027a3372f..a7ec5cc1f 100644 --- a/heroic-core/src/main/java/com/spotify/heroic/http/tracing/OpenCensusUtils.java +++ b/heroic-core/src/main/java/com/spotify/heroic/http/tracing/OpenCensusUtils.java @@ -39,14 +39,10 @@ static String formatProviders(Iterable providers) { .collect(Collectors.joining(", ")); } - @SuppressWarnings("checkstyle:LineLength") + @SuppressWarnings("LineLength") static Status mapStatusCode(int status) { - // @formatter:off - // // Mapping from: // https://github.com/census-instrumentation/opencensus-specs/blob/master/trace/HTTP.md#mapping-from-http-status-codes-to-trace-status-codes - // - // @formatter:on final Status traceStatus; if (status < 200) { traceStatus = Status.UNKNOWN; diff --git a/heroic-core/src/main/java/com/spotify/heroic/metric/MetricManagerModule.java b/heroic-core/src/main/java/com/spotify/heroic/metric/MetricManagerModule.java index 833a34932..ba26fa5c7 100644 --- a/heroic-core/src/main/java/com/spotify/heroic/metric/MetricManagerModule.java +++ b/heroic-core/src/main/java/com/spotify/heroic/metric/MetricManagerModule.java @@ -51,6 +51,7 @@ import org.slf4j.LoggerFactory; @Module +@SuppressWarnings({"LineLength"}) public class MetricManagerModule { private static final Logger log = LoggerFactory.getLogger(MetricManagerModule.class); @@ -101,6 +102,7 @@ public class MetricManagerModule { * Threshold for defining a "small" query, measured in pre-aggregation sample size */ private final long smallQueryThreshold; + private final MetricsConnectionSettings connectionSettings; private MetricManagerModule( List backends, @@ -112,7 +114,8 @@ 
private MetricManagerModule( OptionalLimit concurrentQueriesBackoff, int fetchParallelism, boolean failOnLimits, - long smallQueryThreshold + long smallQueryThreshold, + MetricsConnectionSettings connectionSettings ) { this.backends = backends; this.defaultBackends = defaultBackends; @@ -124,13 +127,14 @@ private MetricManagerModule( this.fetchParallelism = fetchParallelism; this.failOnLimits = failOnLimits; this.smallQueryThreshold = smallQueryThreshold; + this.connectionSettings = connectionSettings; log.info("Metric Manager Module: \n{}", toString()); } @Provides @MetricScope - public MetricBackendReporter reporter(HeroicReporter reporter) { + public static MetricBackendReporter reporter(HeroicReporter reporter) { return reporter.newMetricBackend(); } @@ -165,8 +169,8 @@ public List components( @Provides @MetricScope - public Set backends( - List components, MetricBackendReporter reporter + public static Set backends( + List components, MetricBackendReporter reporter ) { return ImmutableSet.copyOf(components .stream() @@ -178,7 +182,7 @@ public Set backends( @Provides @MetricScope @Named("metric") - public LifeCycle metricLife(List components) { + public static LifeCycle metricLife(List components) { return LifeCycle.combined(components.stream().map(MetricModule.Exposed::life)); } @@ -238,6 +242,13 @@ public long smallQueryThreshold() { return smallQueryThreshold; } + @Provides + @MetricScope + @Named("connectionSettings") + public MetricsConnectionSettings connectionSettings() { + return connectionSettings; + } + @Override public String toString() { return new ToStringBuilder(this, ToStringStyle.MULTI_LINE_STYLE) @@ -251,6 +262,7 @@ public String toString() { .append("fetchParallelism", fetchParallelism) .append("failOnLimits", failOnLimits) .append("smallQueryThreshold", smallQueryThreshold) + .append("connectionSettings", connectionSettings) .toString(); } @@ -259,6 +271,7 @@ public static Builder builder() { } public static class Builder { + private MetricsConnectionSettings connectionSettings; private Optional> backends = empty(); private Optional> defaultBackends = empty(); private OptionalLimit groupLimit = OptionalLimit.empty(); @@ -284,7 +297,12 @@ public Builder( @JsonProperty("concurrentQueriesBackoff") OptionalLimit concurrentQueriesBackoff, @JsonProperty("fetchParallelism") Optional fetchParallelism, @JsonProperty("failOnLimits") Optional failOnLimits, - @JsonProperty("smallQueryThreshold") Optional smallQueryThreshold + @JsonProperty("smallQueryThreshold") Optional smallQueryThreshold, + @JsonProperty("mutateRpcTimeoutMs") Optional mutateRpcTimeoutMs, + @JsonProperty("readRowsRpcTimeoutMs") Optional readRowsRpcTimeoutMs, + @JsonProperty("shortRpcTimeoutMs") Optional shortRpcTimeoutMs, + @JsonProperty("maxScanTimeoutRetries") Optional maxScanTimeoutRetries, + @JsonProperty("maxElapsedBackoffMs") Optional maxElapsedBackoffMs ) { this.backends = backends; this.defaultBackends = defaultBackends; @@ -296,6 +314,12 @@ public Builder( this.fetchParallelism = fetchParallelism; this.failOnLimits = failOnLimits; this.smallQueryThreshold = smallQueryThreshold; + + this.connectionSettings = new MetricsConnectionSettings( + Optional.ofNullable(null), + mutateRpcTimeoutMs, readRowsRpcTimeoutMs, shortRpcTimeoutMs, + maxScanTimeoutRetries, + maxElapsedBackoffMs); } public Builder backends(List backends) { @@ -349,20 +373,23 @@ public Builder smallQueryThreshold(long smallQueryThreshold) { } public Builder merge(final Builder o) { - // @formatter:off return new Builder( - 
mergeOptionalList(o.backends, backends), - mergeOptionalList(o.defaultBackends, defaultBackends), - groupLimit.orElse(o.groupLimit), - seriesLimit.orElse(o.seriesLimit), - aggregationLimit.orElse(o.aggregationLimit), - dataLimit.orElse(o.dataLimit), - concurrentQueriesBackoff.orElse(o.concurrentQueriesBackoff), - pickOptional(fetchParallelism, o.fetchParallelism), - pickOptional(failOnLimits, o.failOnLimits), - pickOptional(smallQueryThreshold, o.smallQueryThreshold) + mergeOptionalList(o.backends, backends), + mergeOptionalList(o.defaultBackends, defaultBackends), + groupLimit.orElse(o.groupLimit), + seriesLimit.orElse(o.seriesLimit), + aggregationLimit.orElse(o.aggregationLimit), + dataLimit.orElse(o.dataLimit), + concurrentQueriesBackoff.orElse(o.concurrentQueriesBackoff), + pickOptional(fetchParallelism, o.fetchParallelism), + pickOptional(failOnLimits, o.failOnLimits), + pickOptional(smallQueryThreshold, o.smallQueryThreshold), + Optional.of(connectionSettings.mutateRpcTimeoutMs), + Optional.of(connectionSettings.readRowsRpcTimeoutMs), + Optional.of(connectionSettings.shortRpcTimeoutMs), + Optional.of(connectionSettings.maxScanTimeoutRetries), + Optional.of(connectionSettings.maxElapsedBackoffMs) ); - // @formatter:on } public MetricManagerModule build() { @@ -377,7 +404,8 @@ public MetricManagerModule build() { concurrentQueriesBackoff, fetchParallelism.orElse(DEFAULT_FETCH_PARALLELISM), failOnLimits.orElse(DEFAULT_FAIL_ON_LIMITS), - smallQueryThreshold.orElse(DEFAULT_SMALL_QUERY_THRESHOLD) + smallQueryThreshold.orElse(DEFAULT_SMALL_QUERY_THRESHOLD), + connectionSettings ); // @formatter:on } diff --git a/heroic-core/src/test/java/com/spotify/heroic/CoreQueryManagerTest.java b/heroic-core/src/test/java/com/spotify/heroic/CoreQueryManagerTest.java index d474334af..1d70c918f 100644 --- a/heroic-core/src/test/java/com/spotify/heroic/CoreQueryManagerTest.java +++ b/heroic-core/src/test/java/com/spotify/heroic/CoreQueryManagerTest.java @@ -53,8 +53,8 @@ public void setup() { when(queryLoggerFactory.create(any())).thenReturn(queryLogger); manager = new CoreQueryManager(Features.empty(), async, Clock.system(), cluster, parser, - queryCache, aggregations, OptionalLimit.empty(), smallQueryThreshold, queryReporter, - Optional.empty(), queryLoggerFactory); + queryCache, aggregations, OptionalLimit.empty(), smallQueryThreshold, + queryReporter, Optional.empty(), queryLoggerFactory); } @Test diff --git a/heroic-dist/build.gradle b/heroic-dist/build.gradle index aa9748901..61901aa22 100644 --- a/heroic-dist/build.gradle +++ b/heroic-dist/build.gradle @@ -25,6 +25,9 @@ shadowJar { append 'META-INF/services/org.apache.lucene.analysis.util.CharFilterFactory' append 'META-INF/services/org.apache.lucene.analysis.util.TokenFilterFactory' append 'META-INF/services/org.apache.lucene.analysis.util.TokenizerFactory' + + // required for gRPC v1.35.0 + append 'META-INF/services/io.grpc.LoadBalancerProvider' } dependencies { diff --git a/heroic-dist/src/test/java/com/spotify/heroic/AbstractClusterQueryIT.java b/heroic-dist/src/test/java/com/spotify/heroic/AbstractClusterQueryIT.java index 6e8d09922..a7f29cc54 100644 --- a/heroic-dist/src/test/java/com/spotify/heroic/AbstractClusterQueryIT.java +++ b/heroic-dist/src/test/java/com/spotify/heroic/AbstractClusterQueryIT.java @@ -59,32 +59,27 @@ import org.junit.Test; - public abstract class AbstractClusterQueryIT extends AbstractLocalClusterIT { + private final static int RECORD_COUNT = 100_000; // Number of datapoint recorded in each tdigest + private static 
double EXPECTED_ERROR_RATE = 0.01d; private final Series s1 = new Series("key1", ImmutableSortedMap.of("shared", "a", "diff", "a"), - ImmutableSortedMap.of("resource", "a")); + ImmutableSortedMap.of("resource", "a")); private final Series s2 = new Series("key1", ImmutableSortedMap.of("shared", "a", "diff", "b"), - ImmutableSortedMap.of("resource", "b")); + ImmutableSortedMap.of("resource", "b")); private final Series s3 = new Series("key1", ImmutableSortedMap.of("shared", "a", "diff", "c"), - ImmutableSortedMap.of("resource", "c")); - - private final static int RECORD_COUNT = 100_000; // Number of datapoint recorded in each tdigest - - private final RandomData randDataset1 = HeroicDistributionGenerator.generateRandomDataset(RECORD_COUNT); - private final RandomData randDataset2 = HeroicDistributionGenerator.generateRandomDataset(RECORD_COUNT); - private final RandomData randDataset3 = HeroicDistributionGenerator.generateRandomDataset(RECORD_COUNT); - - private static double EXPECTED_ERROR_RATE = 0.01d; - + ImmutableSortedMap.of("resource", "c")); + private final RandomData randDataset1 = + HeroicDistributionGenerator.generateRandomDataset(RECORD_COUNT); + private final RandomData randDataset2 = + HeroicDistributionGenerator.generateRandomDataset(RECORD_COUNT); + private final RandomData randDataset3 = + HeroicDistributionGenerator.generateRandomDataset(RECORD_COUNT); + protected boolean cardinalitySupport = true; /* the number of queries run */ private int queryCount = 0; - private QueryManager query; - private QueryContext queryContext; - protected boolean cardinalitySupport = true; - protected void setupSupport() { } @@ -100,11 +95,11 @@ public final void setupAbstract() { @After public final void verifyLoggers() { final QueryLogger coreQueryManagerLogger = getQueryLogger("CoreQueryManager").orElseThrow( - () -> new AssertionError("Should have logger for CoreQueryManager")); + () -> new AssertionError("Should have logger for CoreQueryManager")); final QueryLogger localMetricManagerLogger = - getQueryLogger("LocalMetricManager").orElseThrow( - () -> new AssertionError("Should have logger for LocalMetricManager")); + getQueryLogger("LocalMetricManager").orElseThrow( + () -> new AssertionError("Should have logger for LocalMetricManager")); /* number of expected log-calls is related to the number of queries performed during the * test */ @@ -112,15 +107,15 @@ public final void verifyLoggers() { final int dataNodeCount = queryCount * 2; verify(coreQueryManagerLogger, times(apiNodeCount)).logQuery(any(QueryContext.class), - any(Query.class)); + any(Query.class)); verify(coreQueryManagerLogger, times(apiNodeCount)).logOutgoingRequestToShards( - any(QueryContext.class), any(FullQuery.Request.class)); + any(QueryContext.class), any(FullQuery.Request.class)); verify(localMetricManagerLogger, times(dataNodeCount)).logIncomingRequestAtNode( - any(QueryContext.class), any(FullQuery.Request.class)); + any(QueryContext.class), any(FullQuery.Request.class)); verify(localMetricManagerLogger, times(dataNodeCount)).logOutgoingResponseAtNode( - any(QueryContext.class), any(FullQuery.class)); + any(QueryContext.class), any(FullQuery.class)); verify(coreQueryManagerLogger, times(dataNodeCount)).logIncomingResponseFromShard( - any(QueryContext.class), any(FullQuery.class)); + any(QueryContext.class), any(FullQuery.class)); verifyNoMoreInteractions(coreQueryManagerLogger, localMetricManagerLogger); } @@ -128,9 +123,9 @@ public final void verifyLoggers() { @Override protected AsyncFuture prepareEnvironment() { 
final List ingestion = instances - .stream() - .map(i -> i.inject(IngestionComponent::ingestionManager)) - .collect(Collectors.toList()); + .stream() + .map(i -> i.inject(IngestionComponent::ingestionManager)) + .collect(Collectors.toList()); final List> writes = new ArrayList<>(); @@ -138,58 +133,58 @@ protected AsyncFuture prepareEnvironment() { final IngestionManager m2 = ingestion.get(1); writes.add(m1 - .useDefaultGroup() - .write(new Request(s1, Data.points().p(10, 1D).p(30, 2D).build()))); + .useDefaultGroup() + .write(new Request(s1, Data.points().p(10, 1D).p(30, 2D).build()))); writes.add(m2 - .useDefaultGroup() - .write(new Request(s2, Data.points().p(10, 1D).p(20, 4D).build()))); + .useDefaultGroup() + .write(new Request(s2, Data.points().p(10, 1D).p(20, 4D).build()))); writes.add(m1 - .useDefaultGroup() - .write(new Request(s2, Data.distributionPoints() - .p(10, randDataset1.getRandomData()) - .p(30, randDataset3.getRandomData()) - .build()))); + .useDefaultGroup() + .write(new Request(s2, Data.distributionPoints() + .p(10, randDataset1.getRandomData()) + .p(30, randDataset3.getRandomData()) + .build()))); writes.add(m2 - .useDefaultGroup() - .write(new Request(s1, Data.distributionPoints() - .p(10, randDataset1.getRandomData()) - .p(20, randDataset2.getRandomData()) - .build()))); + .useDefaultGroup() + .write(new Request(s1, Data.distributionPoints() + .p(10, randDataset1.getRandomData()) + .p(20, randDataset2.getRandomData()) + .build()))); return async.collectAndDiscard(writes); } public QueryResult query(final String queryString) throws Exception { return query(query.newQueryFromString(queryString), builder -> { - }, - MetricType.POINT, - true); + }, + MetricType.POINT, + true); } public QueryResult query(final String queryString, final Consumer modifier) - throws Exception { + throws Exception { return query(query.newQueryFromString(queryString), - modifier, - MetricType.POINT, - true); + modifier, + MetricType.POINT, + true); } public QueryResult query(final QueryBuilder builder, final Consumer modifier, final MetricType source, final boolean isDistributed) - throws Exception { + throws Exception { queryCount += 1; builder .source(Optional.of(source)) - .rangeIfAbsent(Optional.of(new QueryDateRange.Absolute(0, 40))); + .rangeIfAbsent(Optional.of(new QueryDateRange.Absolute(0, 40))); if (isDistributed) { builder - .features(Optional.of(FeatureSet.of(Feature.DISTRIBUTED_AGGREGATIONS))); + .features(Optional.of(FeatureSet.of(Feature.DISTRIBUTED_AGGREGATIONS))); } modifier.accept(builder); @@ -200,21 +195,22 @@ public QueryResult query(final QueryBuilder builder, * Aggregation that's not distributed usually returns one group. * But in case of tdigest, the number of group return is equal * to the number of stat that was computed. 
+ * * @throws Exception */ @Test public void testSimpleTdigestAggregation() throws Exception { final QueryResult result = - query(query.newQueryFromString("tdigest(10ms)"), - builder -> { - }, - MetricType.DISTRIBUTION_POINTS, - true); + query(query.newQueryFromString("tdigest(10ms)"), + builder -> { + }, + MetricType.DISTRIBUTION_POINTS, + true); final int numberSeries = 2; // s1 and s2 final int datapointCount = 3; final long expectedCadence = 10L; - assertEquals(3,result.getGroups().size()); + assertEquals(3, result.getGroups().size()); for (ShardedResultGroup shardedResultGroup : result.getGroups()) { assertEquals(numberSeries, shardedResultGroup.getSeries().size()); @@ -225,8 +221,8 @@ public void testSimpleTdigestAggregation() throws Exception { final List cadences = getCadences(result); assertEquals(ImmutableList.of(expectedCadence, expectedCadence, expectedCadence), cadences); - Map mapRes = this.extractResult(result, - new ComputeDistributionStat.Percentile("P99", 0.99)); + Map mapRes = AbstractClusterQueryIT.extractResult(result, + new ComputeDistributionStat.Percentile("P99", 0.99)); // ensure that result is within the error margin validateStatAccuracy(mapRes); @@ -236,17 +232,17 @@ public void testSimpleTdigestAggregation() throws Exception { @Test public void testDistributedTdigestAggregation() throws Exception { final QueryResult result = - query(query.newQueryFromString("tdigest(10ms) by diff"), - builder -> { - }, - MetricType.DISTRIBUTION_POINTS, - true); + query(query.newQueryFromString("tdigest(10ms) by diff"), + builder -> { + }, + MetricType.DISTRIBUTION_POINTS, + true); final int expectedGroupCount = 6; final int numberSeries = 1; final int datapointCount = 2; final long expectedCadence = 10L; - assertEquals(expectedGroupCount,result.getGroups().size()); + assertEquals(expectedGroupCount, result.getGroups().size()); for (ShardedResultGroup shardedResultGroup : result.getGroups()) { assertEquals(numberSeries, shardedResultGroup.getSeries().size()); @@ -257,9 +253,9 @@ public void testDistributedTdigestAggregation() throws Exception { final List cadences = getCadences(result); assertEquals(ImmutableList.of(expectedCadence, expectedCadence, - expectedCadence, expectedCadence, expectedCadence, expectedCadence), cadences); - Map mapRes = this.extractResult(result, - new ComputeDistributionStat.Percentile("P99", 0.99)); + expectedCadence, expectedCadence, expectedCadence, expectedCadence), cadences); + Map mapRes = AbstractClusterQueryIT.extractResult(result, + new ComputeDistributionStat.Percentile("P99", 0.99)); validateStatAccuracy(mapRes); } @@ -268,18 +264,18 @@ public void testDistributedTdigestAggregation() throws Exception { public void testDistributionWithNoAggregation() throws Exception { final int expectedGroupCount = 2; // m1 and m2 final QueryResult result = - query(query.newQueryFromString("empty"), - builder -> { - }, - MetricType.DISTRIBUTION_POINTS, - false); + query(query.newQueryFromString("empty"), + builder -> { + }, + MetricType.DISTRIBUTION_POINTS, + false); - assertEquals(expectedGroupCount,result.getGroups().size()); + assertEquals(expectedGroupCount, result.getGroups().size()); //Validate record count - for(ShardedResultGroup group : result.getGroups()) { + for (ShardedResultGroup group : result.getGroups()) { List metrics = group.getMetrics().getDataAs(Point.class); - metrics.forEach(p-> assertEquals(RECORD_COUNT, p.getValue(),0)); + metrics.forEach(p -> assertEquals(RECORD_COUNT, p.getValue(), 0)); } } @@ -287,23 +283,22 @@ public void 
testDistributionWithNoAggregation() throws Exception { public void testbasicWithNoDistribution() throws Exception { final int expectedGroupCount = 2; // m1 and m2 final QueryResult result = - query(query.newQueryFromString("empty"), - builder -> { - }, - MetricType.POINT, - false); + query(query.newQueryFromString("empty"), + builder -> { + }, + MetricType.POINT, + false); - assertEquals(expectedGroupCount,result.getGroups().size()); + assertEquals(expectedGroupCount, result.getGroups().size()); } - @Test public void basicQueryTest() throws Exception { final QueryResult result = query("sum(10ms)"); // check the number of ShardedResultGroup - assertEquals(1,result.getGroups().size()); + assertEquals(1, result.getGroups().size()); assertEquals(2, result.getGroups().get(0).getSeries().size()); assertEquals(3, result.getGroups().get(0).getMetrics().size()); assertEquals(0, result.getGroups().get(0).getShard().size()); @@ -337,16 +332,16 @@ public void distributedQueryTraceTest() throws Exception { assertThat(result.getTrace(), hasIdentifier(equalTo(CoreQueryManager.QUERY))); // Verify that second level is of type QUERY_SHARD assertThat(result.getTrace(), containsChild( - hasIdentifier(identifierContains(CoreQueryManager.QUERY_SHARD.toString())))); + hasIdentifier(identifierContains(CoreQueryManager.QUERY_SHARD.toString())))); /* Verify that the third level (under QUERY_SHARD) contains at least one entry for the * local node and at least one for the remote node */ assertThat(result.getTrace(), containsChild( - allOf(hasIdentifier(identifierContains(CoreQueryManager.QUERY_SHARD.toString())), - containsChild(hasIdentifier(identifierContains("[local]")))))); + allOf(hasIdentifier(identifierContains(CoreQueryManager.QUERY_SHARD.toString())), + containsChild(hasIdentifier(identifierContains("[local]")))))); assertThat(result.getTrace(), containsChild( - allOf(hasIdentifier(identifierContains(CoreQueryManager.QUERY_SHARD.toString())), - containsChild(hasIdentifier(not(identifierContains("[local]"))))))); + allOf(hasIdentifier(identifierContains(CoreQueryManager.QUERY_SHARD.toString())), + containsChild(hasIdentifier(not(identifierContains("[local]"))))))); } @Test @@ -354,7 +349,7 @@ public void distributedDifferentQueryTest() throws Exception { final QueryResult result = query("sum(10ms) by diff"); // check the number of ShardedResultGroup - assertEquals(2,result.getGroups().size()); + assertEquals(2, result.getGroups().size()); assertEquals(1, result.getGroups().get(0).getSeries().size()); assertEquals(2, result.getGroups().get(0).getMetrics().size()); assertEquals(0, result.getGroups().get(0).getShard().size()); @@ -365,10 +360,9 @@ public void distributedDifferentQueryTest() throws Exception { assertEquals(ImmutableList.of(10L, 10L), cadences); assertEquals(ImmutableSet.of(Data.points().p(10, 1D).p(30, 2D).build(), - Data.points().p(10, 1D).p(20, 4D).build()), m); + Data.points().p(10, 1D).p(20, 4D).build()), m); } - @Test public void distributedFilterQueryTest() throws Exception { final QueryResult result = query("average(10ms) by * | topk(2) | bottomk(1) | sum(10ms)"); @@ -383,16 +377,16 @@ public void distributedFilterQueryTest() throws Exception { @Test public void filterQueryTest() throws Exception { final QueryResult result = - query("average(10ms) by * | topk(2) | bottomk(1) | sum(10ms)", builder -> { - builder.features(Optional.empty()); - }); + query("average(10ms) by * | topk(2) | bottomk(1) | sum(10ms)", builder -> { + builder.features(Optional.empty()); + }); final Set m = 
getResults(result); final List cadences = getCadences(result); assertEquals(ImmutableList.of(10L, 10L), cadences); assertEquals(ImmutableSet.of(Data.points().p(10, 1D).p(20, 4D).build(), - Data.points().p(10, 1D).p(30, 2D).build()), m); + Data.points().p(10, 1D).p(30, 2D).build()), m); } @Test @@ -419,7 +413,8 @@ public void pointsBelowTest() throws Exception { assertEquals(ImmutableList.of(0L, 0L), cadences); assertEquals( - ImmutableSet.of(Data.points().p(10, 1D).build(), Data.points().p(10, 1D).p(30, 2.0D).build()), m); + ImmutableSet.of(Data.points().p(10, 1D).build(), + Data.points().p(10, 1D).p(30, 2.0D).build()), m); } @Test @@ -432,7 +427,9 @@ public void deltaQueryTest() throws Exception { final List cadences = getCadences(result); assertEquals(ImmutableList.of(-1L, -1L), cadences); - assertEquals(ImmutableSet.of(Data.points().p(30, 1D).build(), Data.points().p(20, 3D).build()), m); + assertEquals( + ImmutableSet.of(Data.points().p(30, 1D).build(), Data.points().p(20, 3D).build()), + m); } @Test @@ -456,7 +453,8 @@ public void deltaPerSecondQueryTest() throws Exception { final List cadences = getCadences(result); assertEquals(ImmutableList.of(-1L, -1L), cadences); - assertEquals(ImmutableSet.of(Data.points().p(30, 50D).build(), Data.points().p(20, 300D).build()), m); + assertEquals(ImmutableSet + .of(Data.points().p(30, 50D).build(), Data.points().p(20, 300D).build()), m); } @Test @@ -503,7 +501,8 @@ public void cardinalityTest() throws Exception { final List cadences = getCadences(result); assertEquals(ImmutableList.of(10L), cadences); - assertEquals(ImmutableSet.of(Data.points().p(10, 1D).p(20, 1D).p(30, 1D).p(40, 0D).build()), m); + assertEquals(ImmutableSet.of(Data.points().p(10, 1D).p(20, 1D).p(30, 1D).p(40, 0D).build()), + m); } @Test @@ -517,7 +516,8 @@ public void cardinalityWithKeyTest() throws Exception { final List cadences = getCadences(result); assertEquals(ImmutableList.of(10L), cadences); - assertEquals(ImmutableSet.of(Data.points().p(10, 2D).p(20, 1D).p(30, 1D).p(40, 0D).build()), m); + assertEquals(ImmutableSet.of(Data.points().p(10, 2D).p(20, 1D).p(30, 1D).p(40, 0D).build()), + m); } @Test @@ -527,7 +527,7 @@ public void dataLimit() throws Exception { } @Test - public void testGroupLimit() throws Exception { + public void testGroupLimit() throws Exception { final QueryResult result = query("*", builder -> { builder.options(Optional.of(QueryOptions.builder().groupLimit(1L).build())); }); @@ -563,16 +563,16 @@ private void testDataLimit(final MetricType metricType) throws Exception { assertTrue((e instanceof QueryError)); final QueryError q = (QueryError) e; assertThat(q.getError(), - containsString("Some fetches failed (1) or were cancelled (0)")); + containsString("Some fetches failed (1) or were cancelled (0)")); } assertEquals(ResultLimits.of(ResultLimit.QUOTA), result.getLimits()); } - private void testSeriesLimitFailure(final MetricType metricType ) throws Exception { + private void testSeriesLimitFailure(final MetricType metricType) throws Exception { final QueryResult result = query("*", builder -> { builder.options( - Optional.of(QueryOptions.builder().seriesLimit(0L).failOnLimits(true).build())); + Optional.of(QueryOptions.builder().seriesLimit(0L).failOnLimits(true).build())); }); assertEquals(2, result.getErrors().size()); @@ -581,7 +581,7 @@ private void testSeriesLimitFailure(final MetricType metricType ) throws Excepti assertTrue((e instanceof QueryError)); final QueryError q = (QueryError) e; assertThat(q.getError(), containsString( - "The number of 
series requested is more than the allowed limit of [0]")); + "The number of series requested is more than the allowed limit of [0]")); } assertEquals(ResultLimits.of(ResultLimit.SERIES), result.getLimits()); @@ -590,7 +590,7 @@ private void testSeriesLimitFailure(final MetricType metricType ) throws Excepti private void testGroupLimitFailure(MetricType metricType) throws Exception { final QueryResult result = query("*", builder -> { builder.options( - Optional.of(QueryOptions.builder().groupLimit(0L).failOnLimits(true).build())); + Optional.of(QueryOptions.builder().groupLimit(0L).failOnLimits(true).build())); }); assertEquals(2, result.getErrors().size()); @@ -599,14 +599,15 @@ private void testGroupLimitFailure(MetricType metricType) throws Exception { assertTrue((e instanceof QueryError)); final QueryError q = (QueryError) e; assertThat(q.getError(), containsString( - "The number of result groups is more than the allowed limit of [0]")); + "The number of result groups is more than the allowed limit of [0]")); } assertEquals(ResultLimits.of(ResultLimit.GROUP), result.getLimits()); assertEquals(0, result.getGroups().size()); } - private void testAggregationLimit(final MetricType metricType, final String query) throws Exception { + private void testAggregationLimit(final MetricType metricType, final String query) + throws Exception { final QueryResult result = query(query, builder -> { builder.options(Optional.of(QueryOptions.builder().aggregationLimit(1L).build())); }); @@ -618,34 +619,36 @@ private void testAggregationLimit(final MetricType metricType, final String quer assertTrue((e instanceof QueryError)); final QueryError q = (QueryError) e; assertThat(q.getError(), - containsString("Some fetches failed (1) or were cancelled (0)")); + containsString("Some fetches failed (1) or were cancelled (0)")); } assertEquals(ResultLimits.of(ResultLimit.AGGREGATION), result.getLimits()); } - private Set getResults(final QueryResult result) { + private static Set getResults(final QueryResult result) { return result - .getGroups() - .stream() - .map(ShardedResultGroup::getMetrics) - .collect(Collectors.toSet()); + .getGroups() + .stream() + .map(ShardedResultGroup::getMetrics) + .collect(Collectors.toSet()); } // Percentile name. 
- private Map extractResult(final QueryResult res, - final ComputeDistributionStat.Percentile percentile) { - final Map resMap = new HashMap<>(); + private static Map extractResult(final QueryResult res, + final ComputeDistributionStat.Percentile percentile) { + final Map resMap = new HashMap<>(); List shardedResultGroups = res.getGroups(); - for(ShardedResultGroup shardedGrpRes : shardedResultGroups){ + for (ShardedResultGroup shardedGrpRes : shardedResultGroups) { Set pTags = new HashSet<>(); - shardedGrpRes.getSeries().forEach(s->s.getTags().entrySet().stream() - .filter(t->t.getKey().contentEquals("tdigeststat")) - .forEach(ss->pTags.add(ss.getValue()))); + shardedGrpRes.getSeries().forEach(s -> s.getTags().entrySet().stream() + .filter(t -> t.getKey().contentEquals("tdigeststat")) + .forEach(ss -> pTags.add(ss.getValue()))); assertEquals(1, pTags.size()); // Each group represents one stat - if ( !pTags.contains(percentile.getName())) continue; + if (!pTags.contains(percentile.getName())) { + continue; + } MetricCollection metricCollection = shardedGrpRes.getMetrics(); - for(Point p : metricCollection.getDataAs(Point.class)){ + for (Point p : metricCollection.getDataAs(Point.class)) { resMap.put(p.getTimestamp(), p.getValue()); } } @@ -653,38 +656,36 @@ private Map extractResult(final QueryResult res, } - private void validateStatAccuracy(final Map resMap){ + private void validateStatAccuracy(final Map resMap) { double expected = randDataset1.getDataStat().get(99); double actual = resMap.get(10L); - assertTrue( errorRate(expected, actual) <= EXPECTED_ERROR_RATE); + assertTrue(errorRate(expected, actual) <= EXPECTED_ERROR_RATE); expected = randDataset2.getDataStat().get(99); - actual = resMap.get(20L); - assertTrue( errorRate(expected, actual) <= EXPECTED_ERROR_RATE); + actual = resMap.get(20L); + assertTrue(errorRate(expected, actual) <= EXPECTED_ERROR_RATE); expected = randDataset3.getDataStat().get(99); - actual = resMap.get(30L); - assertTrue( errorRate(expected, actual) <= EXPECTED_ERROR_RATE); + actual = resMap.get(30L); + assertTrue(errorRate(expected, actual) <= EXPECTED_ERROR_RATE); } - - - private double errorRate(final double num1, final double num2) { + private static double errorRate(final double num1, final double num2) { BigDecimal expected = BigDecimal.valueOf(num1); BigDecimal actual = BigDecimal.valueOf(num2); return expected.add(actual.negate()) - .divide(expected, RoundingMode.CEILING).doubleValue(); + .divide(expected, RoundingMode.CEILING).doubleValue(); } - private List getCadences(final QueryResult result) { + private static List getCadences(final QueryResult result) { final List cadences = result - .getGroups() - .stream() - .map(ShardedResultGroup::getCadence) - .collect(Collectors.toList()); + .getGroups() + .stream() + .map(ShardedResultGroup::getCadence) + .collect(Collectors.toList()); Collections.sort(cadences); return cadences; diff --git a/heroic-dist/src/test/java/com/spotify/heroic/AbstractConsumerIT.java b/heroic-dist/src/test/java/com/spotify/heroic/AbstractConsumerIT.java index e89a57311..69c933a13 100644 --- a/heroic-dist/src/test/java/com/spotify/heroic/AbstractConsumerIT.java +++ b/heroic-dist/src/test/java/com/spotify/heroic/AbstractConsumerIT.java @@ -247,7 +247,7 @@ static List createTestMetricCollection(final WriteMetric.Request reques } - Object createJsonMetric(final TMetric metric, final Series series){ + static Object createJsonMetric(final TMetric metric, final Series series){ Object jsonMetric; if (metric.getValue() instanceof 
Value.DoubleValue){ jsonMetric = @@ -263,7 +263,7 @@ Object createJsonMetric(final TMetric metric, final Series series){ return jsonMetric; } - private void tryUntil(Callable callable) throws Exception { + private static void tryUntil(Callable callable) throws Exception { RetryPolicy.Instance instance = RETRY_POLICY.apply(ClockSource.SYSTEM); List supressed = new ArrayList<>(); diff --git a/heroic-dist/src/test/java/com/spotify/heroic/AbstractKafkaConsumerIT.java b/heroic-dist/src/test/java/com/spotify/heroic/AbstractKafkaConsumerIT.java index 9cc1cd7aa..468283173 100644 --- a/heroic-dist/src/test/java/com/spotify/heroic/AbstractKafkaConsumerIT.java +++ b/heroic-dist/src/test/java/com/spotify/heroic/AbstractKafkaConsumerIT.java @@ -11,18 +11,14 @@ import com.spotify.heroic.consumer.kafka.KafkaConsumerModule; import com.spotify.heroic.consumer.schemas.Spotify100; import com.spotify.heroic.consumer.schemas.spotify100.Version; -import com.spotify.heroic.consumer.schemas.spotify100.v2.Value; import com.spotify.heroic.ingestion.IngestionModule; import com.spotify.heroic.instrumentation.OperationsLogImpl; -import com.spotify.heroic.metric.DistributionPoint; import com.spotify.heroic.metric.MetricCollection; import com.spotify.heroic.metric.MetricManagerModule; import com.spotify.heroic.metric.MetricModule; import com.spotify.heroic.metric.MetricType; -import com.spotify.heroic.metric.Point; import com.spotify.heroic.metric.WriteMetric; import com.spotify.heroic.metric.memory.MemoryMetricModule; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; diff --git a/heroic-dist/src/test/java/com/spotify/heroic/analytics/bigtable/HeroicMetricsConfigurationTest.java b/heroic-dist/src/test/java/com/spotify/heroic/analytics/bigtable/HeroicMetricsConfigurationTest.java index bc591ff85..4de2bb042 100644 --- a/heroic-dist/src/test/java/com/spotify/heroic/analytics/bigtable/HeroicMetricsConfigurationTest.java +++ b/heroic-dist/src/test/java/com/spotify/heroic/analytics/bigtable/HeroicMetricsConfigurationTest.java @@ -1,11 +1,20 @@ package com.spotify.heroic.analytics.bigtable; -import static com.spotify.heroic.HeroicConfigurationTestUtils.testConfiguration; import static org.junit.Assert.assertEquals; +import com.spotify.heroic.HeroicConfigurationTestUtils; +import com.spotify.heroic.dagger.DaggerCoreComponent; import com.spotify.heroic.metric.LocalMetricManager; +import com.spotify.heroic.metric.MetricsConnectionSettings; import com.spotify.heroic.metric.bigtable.BigtableBackend; import com.spotify.heroic.metric.bigtable.BigtableMetricModule; +import com.spotify.heroic.metric.bigtable.MetricsRowKeySerializer; +import eu.toolchain.async.TinyAsync; +import eu.toolchain.serializer.TinySerializer; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.jetbrains.annotations.NotNull; import org.junit.Test; /** @@ -18,62 +27,135 @@ * maxWriteBatchSize: 250 * */ +@SuppressWarnings({"LineLength"}) public class HeroicMetricsConfigurationTest { public static final int EXPECTED_MAX_WRITE_BATCH_SIZE = 250; + public static final int DEFAULT_TIMEOUT = 250; + public static final int DEFAULT_RETRIES = 2; + public static final int EXPECTED_MUTATE_RPC_TIMEOUT_MS = 135; + public static final int EXPECTED_READ_ROWS_RPC_TIMEOUT_MS = 136; + public static final int EXPECTED_SHORT_RPC_TIMEOUT_MS = 137; + public static final int EXPECTED_MAX_ELAPSED_BACKOFF_MS = 138; + public static final int 
EXPECTED_MAX_SCAN_RETRIES = 7; + @NotNull + private static BigtableBackend getBigtableBackend(int maxWriteBatchSize) { + return getBigtableBackend(maxWriteBatchSize, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT, + DEFAULT_TIMEOUT, DEFAULT_RETRIES, DEFAULT_TIMEOUT); + } + + @NotNull + private static BigtableBackend getBigtableBackend(int maxWriteBatchSize, int mutateRpcTimeoutMs, + int readRowsRpcTimeoutMs, + int shortRpcTimeoutMs, + int maxScanTimeoutRetries, + int maxElapsedBackoffMs) { + final ExecutorService executor = Executors.newSingleThreadExecutor(); + var serializer = TinySerializer.builder().build(); + + var connectionSettings = new MetricsConnectionSettings(Optional.of(maxWriteBatchSize), + Optional.of(mutateRpcTimeoutMs), Optional.of(readRowsRpcTimeoutMs), + Optional.of(shortRpcTimeoutMs), Optional.of(maxScanTimeoutRetries), + Optional.of(maxElapsedBackoffMs)); + + var bigtableBackend = new BigtableBackend(null, + new MetricsRowKeySerializer(), + null, + null, + "bananas", + false, + connectionSettings, + null); + return bigtableBackend; + } + + /** + * Use this convenience overload if you don't care about *timeoutMs. + * + * @param maxWriteBatchSize max size of each written batch of metrics + * @return a bigtable module object + */ private static BigtableMetricModule getBigtableMetricModule(int maxWriteBatchSize) { + return getBigtableMetricModule(maxWriteBatchSize, DEFAULT_TIMEOUT, + DEFAULT_TIMEOUT, DEFAULT_TIMEOUT, DEFAULT_RETRIES, DEFAULT_TIMEOUT); + } + + private static BigtableMetricModule getBigtableMetricModule( + int maxWriteBatchSize, + int mutateRpcTimeoutMs, + int readRowsRpcTimeoutMs, + int shortRpcTimeoutMs, + int maxScanTimeoutRetries, + int maxElapsedBackoffMs) { + return new BigtableMetricModule.Builder() - .maxWriteBatchSize(maxWriteBatchSize) - .batchSize(1000) - .project("banana_count") - .build(); + .maxWriteBatchSize(maxWriteBatchSize) + .mutateRpcTimeoutMs(mutateRpcTimeoutMs) + .readRowsRpcTimeoutMs(readRowsRpcTimeoutMs) + .shortRpcTimeoutMs(shortRpcTimeoutMs) + .maxScanTimeoutRetries(maxScanTimeoutRetries) + .maxElapsedBackoffMs(maxElapsedBackoffMs) + .batchSize(1000) + .project("banana_count") + .build(); } @Test - public void testMaxWriteBatchSizeConfig() throws Exception { + public void testBigtableLimitsConfig() throws Exception { - final var instance = testConfiguration("heroic-all.yml"); + final var instance = HeroicConfigurationTestUtils.testConfiguration("heroic-all.yml"); - // Check that the BigTableBackend's maxWriteBatchSize was picked up + // Check that the BigTableBackend's maxWriteBatchSize et al were picked up // from the heroic-all.yml config file - instance.inject( - coreComponent -> { - var metricManager = + // @formatter:off + instance.inject(coreComponent -> { + var metricManager = (LocalMetricManager) coreComponent.metricManager(); - var analyticsBackend = + var analyticsBackend = metricManager - .groupSet() - .useGroup("bigtable") - .getMembers() - .toArray(new BigtableAnalyticsMetricBackend[0])[0]; - var bigtableBackend = (BigtableBackend) analyticsBackend.getBackend(); + .groupSet() + .useGroup("bigtable") + .getMembers() + .toArray(new BigtableAnalyticsMetricBackend[0])[0]; + var bigtableBackend = (BigtableBackend) analyticsBackend.getBackend(); - assertEquals(EXPECTED_MAX_WRITE_BATCH_SIZE, bigtableBackend.getMaxWriteBatchSize()); + // These (int) casts are needed to guide the compiler to pick the correct method + // call. 
+ var mcs = bigtableBackend.metricsConnectionSettings(); + assertEquals(EXPECTED_MAX_WRITE_BATCH_SIZE, (int) mcs.maxWriteBatchSize); + assertEquals(EXPECTED_MUTATE_RPC_TIMEOUT_MS, (int) mcs.mutateRpcTimeoutMs); + assertEquals(EXPECTED_READ_ROWS_RPC_TIMEOUT_MS, (int) mcs.readRowsRpcTimeoutMs); + assertEquals(EXPECTED_SHORT_RPC_TIMEOUT_MS, (int) mcs.shortRpcTimeoutMs); + assertEquals(EXPECTED_MAX_SCAN_RETRIES, (int) mcs.maxScanTimeoutRetries); + assertEquals(EXPECTED_MAX_ELAPSED_BACKOFF_MS, (int) mcs.maxElapsedBackoffMs); - return null; - }); + return null; + }); + // @formatter:on } @Test public void testMaxWriteBatchSizeLimitsAreEnforced() { { final int tooBigBatchSize = 5_000_000; - var bigtableBackend = getBigtableMetricModule(tooBigBatchSize); - assertEquals(BigtableMetricModule.MAX_MUTATION_BATCH_SIZE, bigtableBackend.getMaxWriteBatchSize()); + var backend = getBigtableBackend(tooBigBatchSize); + assertEquals(BigtableMetricModule.MAX_MUTATION_BATCH_SIZE, + backend.metricsConnectionSettings().maxWriteBatchSize); } { final int tooSmallBatchSize = 1; - var bigtableBackend = getBigtableMetricModule(tooSmallBatchSize); - assertEquals(BigtableMetricModule.MIN_MUTATION_BATCH_SIZE, bigtableBackend.getMaxWriteBatchSize()); + assertEquals(BigtableMetricModule.MIN_MUTATION_BATCH_SIZE, + getBigtableBackend(tooSmallBatchSize) + .metricsConnectionSettings().maxWriteBatchSize); } { - final int validSize = 500_000; - var bigtableBackend = getBigtableMetricModule(validSize); + final int validSize = 100_000; - assertEquals(validSize, bigtableBackend.getMaxWriteBatchSize()); + assertEquals(validSize, + getBigtableBackend(validSize).metricsConnectionSettings().maxWriteBatchSize); } } } diff --git a/heroic-dist/src/test/resources/heroic-all.yml b/heroic-dist/src/test/resources/heroic-all.yml index e8706cea7..15481894f 100644 --- a/heroic-dist/src/test/resources/heroic-all.yml +++ b/heroic-dist/src/test/resources/heroic-all.yml @@ -21,6 +21,11 @@ metrics: - type: bigtable project: project maxWriteBatchSize: 250 + mutateRpcTimeoutMs: 135 + readRowsRpcTimeoutMs: 136 + shortRpcTimeoutMs: 137 + maxScanTimeoutRetries: 7 + maxElapsedBackoffMs: 138 - type: datastax - type: memory diff --git a/heroic-elasticsearch-utils/build.gradle b/heroic-elasticsearch-utils/build.gradle index bb86c43e8..0a4f214c8 100644 --- a/heroic-elasticsearch-utils/build.gradle +++ b/heroic-elasticsearch-utils/build.gradle @@ -5,6 +5,7 @@ dependencies { api 'org.elasticsearch.client:elasticsearch-rest-high-level-client' implementation 'org.elasticsearch.client:elasticsearch-rest-client-sniffer' implementation project(path: ':heroic-component', configuration: 'testRuntime') + implementation 'org.apache.logging.log4j:log4j-core' } description = 'Heroic: Elasticsearch Utilities' diff --git a/heroic-test/build.gradle b/heroic-test/build.gradle index e912252d1..1e6d6964d 100644 --- a/heroic-test/build.gradle +++ b/heroic-test/build.gradle @@ -1,8 +1,10 @@ dependencies { implementation project(':heroic-component') implementation project(':heroic-core') - + api group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.25' + implementation 'junit:junit' + implementation 'org.apache.logging.log4j:log4j-core' implementation 'org.mockito:mockito-core' implementation 'org.testcontainers:testcontainers' implementation "org.testcontainers:elasticsearch" diff --git a/heroic-test/src/main/java/com/spotify/heroic/test/AbstractMetadataBackendIndexResourceIT.java b/heroic-test/src/main/java/com/spotify/heroic/test/AbstractMetadataBackendIndexResourceIT.java 
index 188450c0b..42f25d82b 100644 --- a/heroic-test/src/main/java/com/spotify/heroic/test/AbstractMetadataBackendIndexResourceIT.java +++ b/heroic-test/src/main/java/com/spotify/heroic/test/AbstractMetadataBackendIndexResourceIT.java @@ -403,7 +403,7 @@ public void filterTest() throws Exception { private void retrySome(final ThrowingRunnable action) throws Exception { AssertionError error = null; - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 20; i++) { try { action.run(); } catch (final AssertionError e) { diff --git a/heroic-test/src/main/java/com/spotify/heroic/test/AbstractMetricBackendIT.java b/heroic-test/src/main/java/com/spotify/heroic/test/AbstractMetricBackendIT.java index 1cb1e9e1e..f002477f5 100644 --- a/heroic-test/src/main/java/com/spotify/heroic/test/AbstractMetricBackendIT.java +++ b/heroic-test/src/main/java/com/spotify/heroic/test/AbstractMetricBackendIT.java @@ -77,9 +77,21 @@ public abstract class AbstractMetricBackendIT { public static final Map EVENT = ImmutableMap.of(); + /** + * Switch that determines how the metrics backend (typically the system under test) + * behaves. + */ + protected enum BackendModuleMode { + NORMAL, // behave "normally" i.e. as per default configuration + VERY_SHORT_TIMEOUTS // impose VERY_SHORT_WAIT_TIME_MS read* timeouts + } + protected MetricBackend backend; - /* For backends which currently does not implement period edge cases correctly, + // https://en.wikipedia.org/wiki/Phrases_from_The_Hitchhiker%27s_Guide_to_the_Galaxy + public static final double MEANING_OF_LIFE_HHGTTG = 42D; + + /** For backends which currently does not implement period edge cases correctly, * See: https://github.com/spotify/heroic/pull/208 */ protected boolean brokenSegmentsPr208 = false; protected boolean eventSupport = false; @@ -90,13 +102,23 @@ public abstract class AbstractMetricBackendIT { public TestRule setupBackend = (base, description) -> new Statement() { @Override public void evaluate() throws Throwable { - MetricModule module = setupModule(); - final MetricManagerModule.Builder metric = + final HeroicCoreInstance core = getHeroicCoreInstance( + BackendModuleMode.NORMAL); + backend = createBackend(core); + base.evaluate(); + core.shutdown().get(); + } + }; + + protected HeroicCoreInstance getHeroicCoreInstance(BackendModuleMode mode) throws Exception { + MetricModule module = setupModule(mode); + + final MetricManagerModule.Builder metric = MetricManagerModule.builder().backends(ImmutableList.of(module)); - final HeroicConfig.Builder fragment = HeroicConfig.builder().metrics(metric); + final HeroicConfig.Builder fragment = HeroicConfig.builder().metrics(metric); - final HeroicCoreInstance core = HeroicCore + final HeroicCoreInstance core = HeroicCore .builder() .setupShellServer(false) .setupService(false) @@ -104,28 +126,29 @@ public void evaluate() throws Throwable { .build() .newInstance(); - core.start().get(); + core.start().get(); - backend = core + return core; + } + + protected static MetricBackend createBackend(HeroicCoreInstance core) { + return core .inject(c -> c - .metricManager() - .groupSet() - .inspectAll() - .stream() - .map(GroupMember::getMember) - .findFirst()) + .metricManager() + .groupSet() + .inspectAll() + .stream() + .map(GroupMember::getMember) + .findFirst()) .orElseThrow(() -> new IllegalStateException("Failed to find backend")); - base.evaluate(); - core.shutdown().get(); - } - }; + } @Before public void setup() { setupSupport(); } - protected abstract MetricModule setupModule(); + protected abstract MetricModule 
setupModule(BackendModuleMode mode); protected Optional period() { return Optional.empty(); @@ -167,11 +190,12 @@ public void testOne() throws Exception { @Test public void testMaxBatchSize() throws Exception { assumeNotNull("max batch size", maxBatchSize); - DateRange range = new DateRange(99L, 100L + (maxBatchSize * 4)); + final int maxBatchMultiplier = 4; + DateRange range = new DateRange(99L, 100L + (maxBatchSize * maxBatchMultiplier)); new TestCase() .denseStart(100) - .dense(maxBatchSize * 4) + .dense(maxBatchSize * maxBatchMultiplier) .forEach((input, expected) -> verifyReadWrite(input, expected, range)); } @@ -230,8 +254,8 @@ private void verifyReadWrite( // Compare the metrics contained in the MetricCollections, ignoring if the data was split into // multiple MCs or not. - private void assertSortedMetricsEqual( - final Set expected, final Set actual + private static void assertSortedMetricsEqual( + final Set expected, final Set actual ) { final Comparator comparator = Comparator.comparingLong(Metric::getTimestamp); @@ -246,6 +270,7 @@ private void assertSortedMetricsEqual( } private class TestCase { + private Optional denseStart = Optional.empty(); private Optional dense = Optional.empty(); private final List input = new ArrayList<>(); @@ -284,8 +309,8 @@ void forEach(final ThrowingBiConsumer consum final Points input = new Points(); final Points expected = new Points(); - inputStream().forEach(t -> input.p(t, 42D)); - expectedStream().forEach(t -> expected.p(t, 42D)); + inputStream().forEach(t -> input.p(t, MEANING_OF_LIFE_HHGTTG)); + expectedStream().forEach(t -> expected.p(t, MEANING_OF_LIFE_HHGTTG)); consumer.accept(input.build(), expected.build()); } @@ -382,9 +407,9 @@ public void testWriteAndFetchLongSeries() throws Exception { } @Test - public void testWriteHugeMetric() throws Exception { + public void testWriteThenFetchSeriesWithManyTags() throws Exception { assumeTrue("Test huge row key write", hugeRowKey); - final MetricCollection points = new Points().p(100000L, 42D).build(); + var points = new Points().p(100000L, 42D, 1).build(); Map tags = new HashMap<>(); for (int i = 0; i < 110; i++) { tags.put("VeryLongTagName" + i, "VeryLongValueName" + i); @@ -394,14 +419,15 @@ public void testWriteHugeMetric() throws Exception { ImmutableSortedMap.of("resource", "a")); backend.write(new WriteMetric.Request(hugeSeries, points)).get(); - FetchData.Request request = - new FetchData.Request(MetricType.POINT, hugeSeries, new DateRange(10000L, 200000L), - QueryOptions.builder().build()); + var request = + new FetchData.Request(MetricType.POINT, hugeSeries, new DateRange(100000L, 200000L), + QueryOptions.builder().build()); - assertEquals(Collections.emptyList(), fetchMetrics(request, true)); + var metrics = fetchMetrics(request, true); + assertEquals(Collections.emptyList(), metrics); } - private List fetchMetrics(FetchData.Request request, boolean slicedFetch) + protected List fetchMetrics(FetchData.Request request, boolean slicedFetch) throws Exception { if (slicedFetch) { List fetchedMetrics = Collections.synchronizedList(new ArrayList<>()); @@ -418,7 +444,7 @@ private List fetchMetrics(FetchData.Request request, boolean s } } - private static void assertEqualMetrics( + protected static void assertEqualMetrics( MetricCollection expected, List actual ) { Stream types = actual.stream().map(MetricCollection::getType); diff --git a/heroic-test/src/main/java/com/spotify/heroic/test/AbstractSuggestBackendIT.java 
b/heroic-test/src/main/java/com/spotify/heroic/test/AbstractSuggestBackendIT.java index 335330ec1..3c5ee9d28 100644 --- a/heroic-test/src/main/java/com/spotify/heroic/test/AbstractSuggestBackendIT.java +++ b/heroic-test/src/main/java/com/spotify/heroic/test/AbstractSuggestBackendIT.java @@ -35,6 +35,8 @@ import com.spotify.heroic.common.OptionalLimit; import com.spotify.heroic.common.Series; import com.spotify.heroic.dagger.LoadingComponent; +import com.spotify.heroic.filter.Filter; +import com.spotify.heroic.filter.MatchKeyFilter; import com.spotify.heroic.filter.TrueFilter; import com.spotify.heroic.suggest.KeySuggest; import com.spotify.heroic.suggest.KeySuggest.Suggestion; @@ -53,9 +55,11 @@ import eu.toolchain.async.AsyncFramework; import eu.toolchain.async.AsyncFuture; import eu.toolchain.async.RetryPolicy; +import eu.toolchain.async.RetryResult; import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -68,6 +72,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.jetbrains.annotations.NotNull; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -80,47 +85,187 @@ public abstract class AbstractSuggestBackendIT { public static final int REQ_SUGGESTION_ENTITY_LIMIT = 15; public static final String STARTS_WITH_RO = "ro"; // e.g. role public static final int EFFECTIVELY_NO_LIMIT = 100_000; - - private static final int SMALL_SERIES_SIZE = 3; - private static final int LARGE_NUM_ENTITIES = 20; - private static final int VERY_LARGE_NUM_ENTITIES = 500; public static final String BAR = "bar"; public static final String BAZ = "baz"; public static final String FOO = "foo"; - public static final String AA_2 = "aa2"; public static final String AA = "aa"; public static final String ROLE = "role"; public static final String AA_1 = "aa1"; + public static final String AA_2 = "aa2"; public static final String BB_3 = "bb3"; - protected final String testName = "heroic-it-" + UUID.randomUUID().toString(); - // MetaData and Suggest have no concept of datetime ranges so just set // the same for all. protected static final DateRange UNIVERSAL_RANGE = new DateRange(0L, 0L); + private static final int SMALL_SERIES_SIZE = 3; + private static final int LARGE_NUM_ENTITIES = 20; + private static final int VERY_LARGE_NUM_ENTITIES = 500; + + // longest that we will wait for a write to ES to complete + public static final int ES_WRITE_MAX_ELAPSED_MS = 20_000; + // We check to see if the writes are done every 200ms + public static final int ES_WRITE_CHECK_DONE_WAIT_MS = 200; + public static final String NO_SUGGESTIONS = "No tag suggestion available for the given " + + "series"; + + protected final String testName = "heroic-it-" + UUID.randomUUID().toString(); protected AsyncFramework async; protected SuggestBackend backend; private HeroicCoreInstance core; + private static final int EXPECTED_TAG_VALUE_COUNT = 20; + private static final int EXPECTED_KEY_COUNT = 1; + + private static final int EXPECTED_SMALL_SERIES_COUNT = 1; // foo, bar, baz + + protected static List> createSmallSerieses(long timestamp, + EntityType et) { + + var p = new TimestampPrepender(et, timestamp); + + // Create 3 series of the form e.g. 
+ // { key: aa1, tags: { role, foo } } + return new ArrayList<>() { + { + add(createSeriesPair(AA_1, FOO, p)); + add(createSeriesPair(AA_2, BAR, p)); + add(createSeriesPair(BB_3, BAZ, p)); + } + + @NotNull + private ImmutablePair createSeriesPair(String key, String tagValue, + TimestampPrepender p) { + return new ImmutablePair<>(Series.of( + p.prepend(key, EntityType.KEY), + ImmutableMap.of(p.prepend(ROLE, EntityType.TAG), + p.prepend(tagValue, EntityType.TAG_VALUE))), UNIVERSAL_RANGE); + } + }; + } + + private static TagKeyCount.Request createTagCountRequest(long timestamp) { + return new TagKeyCount.Request(TrueFilter.get(), + new DateRange(timestamp, timestamp), OptionalLimit.empty(), OptionalLimit.empty()); + } + + @NotNull + private static TagValuesSuggest.Request buildTagValuesRequest( + OptionalLimit numSuggestionsLimit) { + return new TagValuesSuggest.Request(TrueFilter.get(), + UNIVERSAL_RANGE, numSuggestionsLimit, + OptionalLimit.of(EFFECTIVELY_NO_LIMIT), ImmutableList.of()); + } + + @NotNull + private static TagValuesSuggest.Request buildTagValuesRequest( + Filter filter, + OptionalLimit numSuggestionsLimit) { + return new TagValuesSuggest.Request(filter, + UNIVERSAL_RANGE, numSuggestionsLimit, + OptionalLimit.of(EFFECTIVELY_NO_LIMIT), ImmutableList.of()); + } + + private static TagValueSuggest.Request buildTagValueSuggestReq( + String tagValue, long timestamp, OptionalLimit numSuggestionsLimit) { + + return new TagValueSuggest.Request(TrueFilter.get(), + UNIVERSAL_RANGE, numSuggestionsLimit, + Optional.of(TimestampPrepender.prepend(timestamp, tagValue))); + } + + @NotNull + private static Request buildTagSuggestRequest(String tagValue, long timestamp) { + return new Request( + TrueFilter.get(), UNIVERSAL_RANGE, OptionalLimit.empty(), + MatchOptions.builder().build(), + Optional.of(TimestampPrepender.prepend(timestamp, tagValue)), + Optional.empty()); + } + + @NotNull + private static Request buildTagSuggestRequest( + String tagValue, long timestamp, int numSuggestionsLimit) { + return new Request(TrueFilter.get(), UNIVERSAL_RANGE, + OptionalLimit.of(numSuggestionsLimit), + MatchOptions.builder().build(), + Optional.of(TimestampPrepender.prepend(timestamp, tagValue)), Optional.empty()); + } + + private static KeySuggest.Request keySuggestStartsWithReq(String startsWith, long timestamp) { + return keySuggestStartsWithReq(startsWith, timestamp, OptionalLimit.empty()); + } + + private static KeySuggest.Request keySuggestStartsWithReq(String startsWith, long timestamp, + OptionalLimit numSuggestionsLimit) { + + return new KeySuggest.Request(TrueFilter.get(), UNIVERSAL_RANGE, numSuggestionsLimit, + MatchOptions.builder().build(), + Optional.of(TimestampPrepender.prepend(timestamp, startsWith))); + } + + private static long getUniqueTimestamp() { + final long t = Instant.now().toEpochMilli() + (long) Math.random(); + return t; + } + + private static List> createTestSeriesData(int numKeys, + int tagsAndTagValuesPerKey, + long timestamp, + Set et) { + var p = new TimestampPrepender(et, timestamp); + return createSeriesTestDataImpl(numKeys, tagsAndTagValuesPerKey, p); + + } + + private static List> createTestSeriesData(int numKeys, + int tagsAndTagValuesPerKey, + long timestamp, + EntityType et) { + var p = new TimestampPrepender(et, timestamp); + return createSeriesTestDataImpl(numKeys, tagsAndTagValuesPerKey, p); + } + + @NotNull + private static ArrayList> createSeriesTestDataImpl( + int numKeys, + int tagsAndTagValuesPerKey, + TimestampPrepender p) { + var series = new 
ArrayList>(numKeys); + + for (int i = 0; i < numKeys; i++) { + final var key = p.prepend(String.format(AA + "-%d", i + 1), EntityType.KEY); + for (int j = 0; j < tagsAndTagValuesPerKey; j++) { + + final var tags = + ImmutableMap.of( + p.prepend(ROLE, EntityType.TAG), + p.prepend(FOO + "-" + (j + 1), EntityType.TAG_VALUE)); + + series.add(new ImmutablePair<>(Series.of(key, tags), UNIVERSAL_RANGE)); + } + } + + return series; + } protected abstract SuggestModule setupModule() throws Exception; @Before public final void abstractSetup() throws Exception { final HeroicConfig.Builder fragment = HeroicConfig.builder() - .suggest(SuggestManagerModule.builder().backends(ImmutableList.of(setupModule()))); + .suggest(SuggestManagerModule.builder().backends(ImmutableList.of(setupModule()))); core = HeroicCore.builder().setupService(false).setupShellServer(false) - .configFragment(fragment).build() - .newInstance(); + .configFragment(fragment).build() + .newInstance(); core.start().get(); async = core.inject(LoadingComponent::async); backend = core.inject( - c -> c.suggestManager().groupSet().inspectAll().stream() - .map(GroupMember::getMember).findFirst()).orElseThrow( - () -> new IllegalStateException("Failed to find backend")); + c -> c.suggestManager().groupSet().inspectAll().stream() + .map(GroupMember::getMember).findFirst()).orElseThrow( + () -> new IllegalStateException("Failed to find backend")); } @After @@ -133,44 +278,64 @@ public void tagValuesSuggestSmall() throws Exception { // Check a single suggestion with values final long timestamp = getUniqueTimestamp(); - writeSeries(backend, createSmallSeries(timestamp, EntityType.TAG)); + writeSeries(backend, + createSmallSerieses(timestamp, EntityType.TAG), EXPECTED_SMALL_SERIES_COUNT); var result = getTagValuesSuggest( - buildTagValuesRequest(OptionalLimit.empty())); + buildTagValuesRequest(OptionalLimit.empty())); var suggestion = result.getSuggestions().get(0); var expected = new TreeSet(Arrays.asList(BAR, BAZ, FOO)); assertEquals(new TagValuesSuggest.Suggestion(TimestampPrepender.prepend(timestamp, - ROLE), expected, false), suggestion); + ROLE), expected, false), suggestion); } @Test public void tagValuesTruncatedSuggest() throws Exception { + tagValuesTruncatedSuggestImpl(0); + } + /** + * This is to investigate the different behaviour we're seeing between running + * the IT's locally and in CI. + */ + @Test + public void tagValuesTruncatedSuggestMany() throws Exception { + for (int i = 0; i < 15; i++) { + tagValuesTruncatedSuggestImpl(i + 1); + } + } + + private void tagValuesTruncatedSuggestImpl(int count) throws Exception { // Check that a number of tag values larger than the supplied limit is // correctly truncated. 
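// As a concrete illustration of the numbers involved: the single key written below gets
// LARGE_NUM_ENTITIES (20) distinct tag values, while the request only asks for
// REQ_SUGGESTION_ENTITY_LIMIT (15) suggestions, so the returned value list is expected
// to be truncated from 20 entries down to 15.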
final long timestamp = getUniqueTimestamp(); + var entitiesToTimestamp = Set.of(EntityType.KEY, EntityType.TAG); + var p = new TimestampPrepender(entitiesToTimestamp, timestamp); - var largeNumTagsSeries = - createTestSeriesData(1, LARGE_NUM_ENTITIES, timestamp, EntityType.TAG); - writeSeries(backend, largeNumTagsSeries); + var seriesData = + createTestSeriesData(1, LARGE_NUM_ENTITIES, timestamp, entitiesToTimestamp); + writeSeries(backend, seriesData, EXPECTED_TAG_VALUE_COUNT); + final var key = p.prepend(AA + "-1", EntityType.KEY); var result = - getTagValuesSuggest( - buildTagValuesRequest(OptionalLimit.of(REQ_SUGGESTION_ENTITY_LIMIT))); + getTagValuesSuggest( + buildTagValuesRequest(MatchKeyFilter.create(key), + OptionalLimit.of(REQ_SUGGESTION_ENTITY_LIMIT))); final var suggestions = result.getSuggestions(); assertEquals(1, suggestions.size()); - assertEquals(REQ_SUGGESTION_ENTITY_LIMIT, suggestions.get(0).getValues().size()); + Assert.assertEquals("Iteration " + count + " not equal", REQ_SUGGESTION_ENTITY_LIMIT, + suggestions.get(0).getValues().size()); } @Test public void tagKeyCount() throws Exception { final long timestamp = getUniqueTimestamp(); - var smallTestSeries = createSmallSeries(timestamp, EntityType.TAG); - writeSeries(backend, smallTestSeries); + var smallTestSeries = createSmallSerieses(timestamp, EntityType.TAG); + writeSeries(backend, smallTestSeries, EXPECTED_SMALL_SERIES_COUNT); final TagKeyCount result = getTagKeyCount(createTagCountRequest(timestamp)); final TagKeyCount.Suggestion s = result.getSuggestions().get(0); @@ -187,18 +352,17 @@ public void tagSuggestSmall() throws Exception { final long timestamp = getUniqueTimestamp(); var smallTestSeries = - createSmallSeries(timestamp, EntityType.TAG); - writeSeries(backend, smallTestSeries); // adds 3 tags + createSmallSerieses(timestamp, EntityType.TAG); + writeSeries(backend, smallTestSeries, EXPECTED_SMALL_SERIES_COUNT); // adds 3 tags var result = getTagSuggest( - buildTagSuggestRequest(STARTS_WITH_RO, timestamp)); + buildTagSuggestRequest(STARTS_WITH_RO, timestamp)); assertEquals(SMALL_SERIES_SIZE, result.size()); assertEquals(TimestampPrepender.prepend(timestamp, ROLE), - result.stream().findFirst().get().getKey()); + result.stream().findFirst().get().getKey()); } - /** * Check that a request limit is respected and one without gets the whole lot. */ @@ -209,12 +373,12 @@ public void tagSuggestLimit() throws Exception { // add LARGE_NUM_ENTITIES tags. 
Total is now 23 var largeNumTagsSeries = - createTestSeriesData(1, LARGE_NUM_ENTITIES, timestamp, EntityType.TAG); - writeSeries(backend, largeNumTagsSeries); + createTestSeriesData(1, LARGE_NUM_ENTITIES, timestamp, EntityType.TAG); + writeSeries(backend, largeNumTagsSeries, EXPECTED_TAG_VALUE_COUNT); var result = - getTagSuggest(buildTagSuggestRequest(STARTS_WITH_RO, timestamp, - REQ_SUGGESTION_ENTITY_LIMIT)); + getTagSuggest(buildTagSuggestRequest(STARTS_WITH_RO, timestamp, + REQ_SUGGESTION_ENTITY_LIMIT)); assertEquals(REQ_SUGGESTION_ENTITY_LIMIT, result.size()); @@ -235,11 +399,11 @@ public void tagSuggestCeiling() throws Exception { long timestamp = getUniqueTimestamp(); var veryLargeNumTagsSeries = - createTestSeriesData(1, VERY_LARGE_NUM_ENTITIES, timestamp, EntityType.TAG); - writeSeries(backend, veryLargeNumTagsSeries); + createTestSeriesData(1, VERY_LARGE_NUM_ENTITIES, timestamp, EntityType.TAG); + writeSeries(backend, veryLargeNumTagsSeries, NumSuggestionsLimit.DEFAULT_LIMIT); var reqStartsWithRo = buildTagSuggestRequest(STARTS_WITH_RO, timestamp, - AbstractSuggestBackendIT.EFFECTIVELY_NO_LIMIT); + AbstractSuggestBackendIT.EFFECTIVELY_NO_LIMIT); var result = getTagSuggest(reqStartsWithRo); assertEquals(NumSuggestionsLimit.LIMIT_CEILING, result.size()); } @@ -248,10 +412,11 @@ public void tagSuggestCeiling() throws Exception { public void tagValueSuggestSmall() throws Exception { final long timestamp = getUniqueTimestamp(); - writeSeries(backend, createSmallSeries(timestamp, EntityType.TAG)); + writeSeries(backend, + createSmallSerieses(timestamp, EntityType.TAG), EXPECTED_SMALL_SERIES_COUNT); var result = getTagValueSuggest( - buildTagValueSuggestReq(ROLE, timestamp, OptionalLimit.empty())); + buildTagValueSuggestReq(ROLE, timestamp, OptionalLimit.empty())); var expected = new TreeSet(Arrays.asList(BAR, BAZ, FOO)); assertEquals(ImmutableSet.copyOf(expected), ImmutableSet.copyOf(result.getValues())); @@ -262,13 +427,13 @@ public void tagValueSuggestLimited() throws Exception { final long timestamp = getUniqueTimestamp(); var largeNumTagsSeries = - createTestSeriesData(1, LARGE_NUM_ENTITIES, timestamp, EntityType.TAG); + createTestSeriesData(1, LARGE_NUM_ENTITIES, timestamp, EntityType.TAG); - writeSeries(backend, largeNumTagsSeries); + writeSeries(backend, largeNumTagsSeries, EXPECTED_TAG_VALUE_COUNT); var result = getTagValueSuggest( - buildTagValueSuggestReq(ROLE, timestamp, - OptionalLimit.of(REQ_SUGGESTION_ENTITY_LIMIT))); + buildTagValueSuggestReq(ROLE, timestamp, + OptionalLimit.of(REQ_SUGGESTION_ENTITY_LIMIT))); assertEquals(REQ_SUGGESTION_ENTITY_LIMIT, result.getValues().size()); } @@ -278,29 +443,29 @@ public void keySuggest() throws Exception { var et = EntityType.KEY; { final long timestamp = getUniqueTimestamp(); - var smallTestSeries = createSmallSeries(timestamp, et); + var smallTestSeries = createSmallSerieses(timestamp, et); - writeSeries(backend, smallTestSeries); + writeSeries(backend, smallTestSeries, EXPECTED_KEY_COUNT); var result = getKeySuggest(keySuggestStartsWithReq(AA, timestamp)); assertEquals(ImmutableSet.of(TimestampPrepender.prepend(timestamp, AA_1), - TimestampPrepender.prepend(timestamp, - AA_2)), - result); + TimestampPrepender.prepend(timestamp, + AA_2)), + result); } { final long timestamp = getUniqueTimestamp(); var largeNumKeysSeries = - createTestSeriesData(LARGE_NUM_ENTITIES, 1, timestamp, EntityType.KEY); + createTestSeriesData(LARGE_NUM_ENTITIES, 1, timestamp, EntityType.KEY); - writeSeries(backend, largeNumKeysSeries); + 
writeSeries(backend, largeNumKeysSeries, EXPECTED_KEY_COUNT); var result = - getKeySuggest( - keySuggestStartsWithReq( - AA, timestamp, OptionalLimit.of(REQ_SUGGESTION_ENTITY_LIMIT))); + getKeySuggest( + keySuggestStartsWithReq( + AA, timestamp, OptionalLimit.of(REQ_SUGGESTION_ENTITY_LIMIT))); assertEquals(REQ_SUGGESTION_ENTITY_LIMIT, result.size()); } } @@ -308,7 +473,7 @@ public void keySuggest() throws Exception { @Test public void tagValueSuggestNoIdx() throws Exception { final TagValueSuggest result = getTagValueSuggest( - buildTagValueSuggestReq(ROLE, 0L, OptionalLimit.empty())); + buildTagValueSuggestReq(ROLE, 0L, OptionalLimit.empty())); assertEquals(Collections.emptyList(), result.getValues()); } @@ -316,7 +481,7 @@ public void tagValueSuggestNoIdx() throws Exception { @Test public void tagValuesSuggestNoIdx() throws Exception { final TagValuesSuggest result = getTagValuesSuggest( - buildTagValuesRequest(OptionalLimit.empty())); + buildTagValuesRequest(OptionalLimit.empty())); assertEquals(Collections.emptyList(), result.getSuggestions()); } @@ -334,7 +499,7 @@ public void tagKeyCountNoIdx() throws Exception { @Test public void tagSuggestNoIdx() throws Exception { final Set> result = - getTagSuggest(buildTagSuggestRequest("ba", getUniqueTimestamp())); + getTagSuggest(buildTagSuggestRequest("ba", getUniqueTimestamp())); assertEquals(Collections.emptySet(), result); } @@ -342,187 +507,121 @@ public void tagSuggestNoIdx() throws Exception { @Test public void keySuggestNoIdx() throws Exception { final Set result = - getKeySuggest(keySuggestStartsWithReq(AA, getUniqueTimestamp())); + getKeySuggest(keySuggestStartsWithReq(AA, getUniqueTimestamp())); assertEquals(Collections.emptySet(), result); } - private AsyncFuture writeSeries( - final SuggestBackend suggest, final Series s, final DateRange range) { - return suggest.write(new WriteSuggest.Request(s, range)).lazyTransform(r -> async - .retryUntilResolved(() -> checks(s, range), RetryPolicy.timed( - 10000, RetryPolicy.exponential(100, 200)))) - .directTransform(retry -> null); - } - - private AsyncFuture checks(final Series s, DateRange range) { - final List> checks = new ArrayList<>(); - - checks.add(backend.tagSuggest(new TagSuggest.Request( - matchKey(s.getKey()), range, - OptionalLimit.empty(), MatchOptions.builder().build(), - Optional.empty(), Optional.empty())) - .directTransform(result -> { - if (result.getSuggestions().isEmpty()) { - throw new IllegalStateException("No tag suggestion available for the given " - + "series"); - } - - return null; - })); - - checks.add(backend.keySuggest(new KeySuggest.Request( - matchKey(s.getKey()), range, - OptionalLimit.empty(), MatchOptions.builder().build(), - Optional.empty())).directTransform(result -> { - if (result.getSuggestions().isEmpty()) { - throw new IllegalStateException( - "No key suggestion available for the given series"); - } - - return null; - } - )); - - return async.collectAndDiscard(checks); + private AsyncFuture>> writeSeries( + final SuggestBackend suggest, final Series s, final DateRange range, + int expectedTagValueCount) { + + return suggest + .write(new WriteSuggest.Request(s, range)) + .lazyTransform( + r -> async + // we wait up to ES_WRITE_MAX_ELAPSED_MS millis for the write to + // succeed. We test to see if it's done every + // ES_WRITE_RETRY_WAIT_MS millis. 
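// Rough schedule implied by the constants above, assuming RetryPolicy.linear keeps the
// 200 ms interval constant (as the comment describes): ES_WRITE_MAX_ELAPSED_MS /
// ES_WRITE_CHECK_DONE_WAIT_MS = 20_000 / 200, i.e. on the order of 100 checks before the
// write is treated as failed.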
+ .retryUntilResolved( + () -> + checks(s, range, expectedTagValueCount), + RetryPolicy.timed( + ES_WRITE_MAX_ELAPSED_MS, + RetryPolicy.linear(ES_WRITE_CHECK_DONE_WAIT_MS) + ) + ) + ); + } + + private AsyncFuture> checks(final Series s, DateRange range, + int expectedTagValueCount) { + final List> checks = new ArrayList<>(); + + checks.add( + backend.tagSuggest( + new TagSuggest.Request( + matchKey( + s.getKey() + ), + range, + OptionalLimit.empty(), + MatchOptions.builder().build(), + Optional.empty(), + Optional.empty())) + .directTransform( + result -> { + var suggestions = result.getSuggestions(); + int count = suggestions == null ? 0 : suggestions.size(); + if (suggestions.size() != expectedTagValueCount) { + throw new IllegalStateException( + suggestions.size() + "/" + expectedTagValueCount + + " tag suggestions returned. Will wait for" + + " the rest."); + } + + return suggestions.size(); + })); + + checks.add( + backend.keySuggest( + new KeySuggest.Request( + matchKey( + s.getKey() + ), + range, + OptionalLimit.empty(), + MatchOptions.builder().build(), + Optional.empty())) + .directTransform( + result -> { + if (result.getSuggestions().isEmpty()) { + throw new IllegalStateException( + "No key suggestion available for the given series"); + } + + return result.getSuggestions().size(); + })); + + return async.collect(checks); } private void writeSeries(final SuggestBackend backend, - final List> data) throws Exception { + final List> data, + int expectedTagValueCount) + throws Exception { - final List> writes = new ArrayList<>(); + final List>>> writes = new ArrayList<>(); for (Pair p : data) { - writes.add(writeSeries(backend, p.getKey(), p.getValue())); + writes.add(writeSeries(backend, p.getKey(), p.getValue(), expectedTagValueCount)); } async.collectAndDiscard(writes).get(); } private TagValuesSuggest getTagValuesSuggest(final TagValuesSuggest.Request req) - throws ExecutionException, InterruptedException { + throws ExecutionException, InterruptedException { return backend.tagValuesSuggest(req).get(); } private TagValueSuggest getTagValueSuggest(final TagValueSuggest.Request req) - throws ExecutionException, InterruptedException { + throws ExecutionException, InterruptedException { return backend.tagValueSuggest(req).get(); } private TagKeyCount getTagKeyCount(final TagKeyCount.Request req) - throws ExecutionException, InterruptedException { + throws ExecutionException, InterruptedException { return backend.tagKeyCount(req).get(); } private Set> getTagSuggest(final TagSuggest.Request req) - throws ExecutionException, InterruptedException { + throws ExecutionException, InterruptedException { return backend.tagSuggest(req).get().getSuggestions().stream() - .map(s -> Pair.of(s.getKey(), s.getValue())).collect(Collectors.toSet()); + .map(s -> Pair.of(s.getKey(), s.getValue())).collect(Collectors.toSet()); } private Set getKeySuggest(final KeySuggest.Request req) - throws ExecutionException, InterruptedException { + throws ExecutionException, InterruptedException { return backend.keySuggest(req).get().getSuggestions().stream() - .map(Suggestion::getKey).collect(Collectors.toSet()); - } - - protected static List> createSmallSeries(long timestamp, - EntityType et) { - - var p = new TimestampPrepender(et, timestamp); - - return new ArrayList<>() { - { - add(createSeriesPair(AA_1, FOO, p)); - add(createSeriesPair(AA_2, BAR, p)); - add(createSeriesPair(BB_3, BAZ, p)); - } - - @NotNull - private ImmutablePair createSeriesPair(String key, String foo, - TimestampPrepender p) { - return new 
ImmutablePair<>(Series.of( - p.prepend(key, EntityType.KEY), - ImmutableMap.of(p.prepend(ROLE, EntityType.TAG), - p.prepend(foo, EntityType.TAG_VALUE))), UNIVERSAL_RANGE); - } - }; - } - - private static TagKeyCount.Request createTagCountRequest(long timestamp) { - return new TagKeyCount.Request(TrueFilter.get(), - new DateRange(timestamp, timestamp), OptionalLimit.empty(), OptionalLimit.empty()); - } - - @NotNull - private static TagValuesSuggest.Request buildTagValuesRequest( - OptionalLimit numSuggestionsLimit) { - return new TagValuesSuggest.Request(TrueFilter.get(), - UNIVERSAL_RANGE, numSuggestionsLimit, - OptionalLimit.of(EFFECTIVELY_NO_LIMIT), ImmutableList.of()); - } - - private static TagValueSuggest.Request buildTagValueSuggestReq( - String tagValue, long timestamp, OptionalLimit numSuggestionsLimit) { - - return new TagValueSuggest.Request(TrueFilter.get(), - UNIVERSAL_RANGE, numSuggestionsLimit, - Optional.of(TimestampPrepender.prepend(timestamp, tagValue))); - } - - @NotNull - private static Request buildTagSuggestRequest(String tagValue, long timestamp) { - return new Request( - TrueFilter.get(), UNIVERSAL_RANGE, OptionalLimit.empty(), - MatchOptions.builder().build(), - Optional.of(TimestampPrepender.prepend(timestamp, tagValue)), - Optional.empty()); - } - - @NotNull - private static Request buildTagSuggestRequest( - String tagValue, long timestamp, int numSuggestionsLimit) { - return new Request(TrueFilter.get(), UNIVERSAL_RANGE, - OptionalLimit.of(numSuggestionsLimit), - MatchOptions.builder().build(), - Optional.of(TimestampPrepender.prepend(timestamp, tagValue)), Optional.empty()); - } - - private static KeySuggest.Request keySuggestStartsWithReq(String startsWith, long timestamp) { - return keySuggestStartsWithReq(startsWith, timestamp, OptionalLimit.empty()); - } - - private static KeySuggest.Request keySuggestStartsWithReq(String startsWith, long timestamp, - OptionalLimit numSuggestionsLimit) { - - return new KeySuggest.Request(TrueFilter.get(), UNIVERSAL_RANGE, numSuggestionsLimit, - MatchOptions.builder().build(), - Optional.of(TimestampPrepender.prepend(timestamp, startsWith))); - } - - private static long getUniqueTimestamp() { - final long t = Instant.now().toEpochMilli() + (long) Math.random(); - return t; - } - - private static List> createTestSeriesData(int numKeys, - int tagsAndTagValuesPerKey, long timestamp, EntityType et) { - - var p = new TimestampPrepender(et, timestamp); - - var series = new ArrayList>(numKeys); - - for (int i = 0; i < numKeys; i++) { - final var key = p.prepend(String.format(AA + "-%d", i + 1), EntityType.KEY); - for (int j = 0; j < tagsAndTagValuesPerKey; j++) { - - final var tags = - ImmutableMap.of( - p.prepend(ROLE, EntityType.TAG), - p.prepend(FOO + "-" + (j + 1), EntityType.TAG_VALUE)); - - series.add(new ImmutablePair<>(Series.of(key, tags), UNIVERSAL_RANGE)); - } - } - - return series; + .map(Suggestion::getKey).collect(Collectors.toSet()); } } diff --git a/heroic-test/src/main/java/com/spotify/heroic/test/Points.java b/heroic-test/src/main/java/com/spotify/heroic/test/Points.java index ee39c3fbc..4df513f12 100644 --- a/heroic-test/src/main/java/com/spotify/heroic/test/Points.java +++ b/heroic-test/src/main/java/com/spotify/heroic/test/Points.java @@ -33,6 +33,16 @@ public Points p(final long t, final double v) { return this; } + // We suppress this warning as modifying a primitive parameter is safe + @SuppressWarnings("AssignmentToMethodParameter") + public Points p(long startingTimestamp, double startingMetricValue, int 
numPoints) { + for (int i = 0; i < numPoints; i++) { + points.add(new Point(startingTimestamp++, startingMetricValue++)); + } + + return this; + } + public MetricCollection build() { return MetricCollection.points(points.build()); } diff --git a/heroic-test/src/main/java/com/spotify/heroic/test/TimestampPrepender.java b/heroic-test/src/main/java/com/spotify/heroic/test/TimestampPrepender.java index d2a1fbaa1..863635956 100644 --- a/heroic-test/src/main/java/com/spotify/heroic/test/TimestampPrepender.java +++ b/heroic-test/src/main/java/com/spotify/heroic/test/TimestampPrepender.java @@ -21,33 +21,41 @@ package com.spotify.heroic.test; +import java.util.HashSet; +import java.util.Set; + /** * Simple class to encapsulate conditional logic on whether a key, tag or tag value should be * prepended with a [uniquely-identifying] timestamp. */ public class TimestampPrepender { - public enum EntityType { - KEY, - TAG, - TAG_VALUE - } - + private Set ets; + private long timestamp; public TimestampPrepender(EntityType et, long timestamp) { - this.et = et; + this.ets = new HashSet(); + this.ets.add(et); this.timestamp = timestamp; } - private EntityType et; - private long timestamp; + public TimestampPrepender(Set et, long timestamp) { + this.ets = et; + this.timestamp = timestamp; + } public static String prepend(long timestamp, String input) { return Long.toString(timestamp) + "-" + input; } public String prepend(String input, EntityType et) { - return et == this.et - ? prepend(timestamp, input) - : input; + return this.ets.contains(et) + ? prepend(timestamp, input) + : input; + } + + public enum EntityType { + KEY, + TAG, + TAG_VALUE } } diff --git a/heroic-test/src/test/java/com/spotify/heroic/suggest/elasticsearch/AbstractSuggestBackendKVIT.java b/heroic-test/src/test/java/com/spotify/heroic/suggest/elasticsearch/AbstractSuggestBackendKVIT.java index c56c8056e..e354341cb 100644 --- a/heroic-test/src/test/java/com/spotify/heroic/suggest/elasticsearch/AbstractSuggestBackendKVIT.java +++ b/heroic-test/src/test/java/com/spotify/heroic/suggest/elasticsearch/AbstractSuggestBackendKVIT.java @@ -23,7 +23,6 @@ import static org.junit.Assert.assertEquals; -import com.spotify.heroic.common.DateRange; import com.spotify.heroic.elasticsearch.ClientWrapper; import com.spotify.heroic.elasticsearch.ConnectionModule; import com.spotify.heroic.elasticsearch.index.RotatingIndexMapping; @@ -66,7 +65,7 @@ protected SuggestModule setupModule() { @Test public void writeDuplicatesReturnErrorInResponse() throws Exception { var smallTestSeries = - createSmallSeries(0L, EntityType.KEY); + createSmallSerieses(0L, EntityType.KEY); final WriteSuggest firstWrite = backend.write(new WriteSuggest.Request(smallTestSeries.get(0).getKey(), diff --git a/metric/bigtable/build.gradle b/metric/bigtable/build.gradle index 6c91ebe25..70f39a2f7 100644 --- a/metric/bigtable/build.gradle +++ b/metric/bigtable/build.gradle @@ -12,6 +12,7 @@ dependencies { implementation 'com.google.guava:guava' implementation project(':heroic-component') implementation 'eu.toolchain.serializer:tiny-serializer-core' + implementation 'org.apache.logging.log4j:log4j-core' implementation 'com.tdunning:t-digest' diff --git a/metric/bigtable/src/main/java/com/spotify/heroic/analytics/bigtable/BigtableAnalyticsModule.java b/metric/bigtable/src/main/java/com/spotify/heroic/analytics/bigtable/BigtableAnalyticsModule.java index 310f23c27..132ad66e1 100644 --- a/metric/bigtable/src/main/java/com/spotify/heroic/analytics/bigtable/BigtableAnalyticsModule.java +++ 
b/metric/bigtable/src/main/java/com/spotify/heroic/analytics/bigtable/BigtableAnalyticsModule.java @@ -28,6 +28,7 @@ import com.spotify.heroic.dagger.PrimaryComponent; import com.spotify.heroic.lifecycle.LifeCycle; import com.spotify.heroic.lifecycle.LifeCycleManager; +import com.spotify.heroic.metric.MetricsConnectionSettings; import com.spotify.heroic.metric.bigtable.BigtableConnection; import com.spotify.heroic.metric.bigtable.BigtableConnectionBuilder; import com.spotify.heroic.metric.bigtable.CredentialsBuilder; @@ -100,9 +101,16 @@ public Managed connection(final AsyncFramework async) { @Override public AsyncFuture construct() { return async.call( - new BigtableConnectionBuilder(project, instance, profile, credentials, - emulatorEndpoint, async, DEFAULT_DISABLE_BULK_MUTATIONS, - DEFAULT_FLUSH_INTERVAL_SECONDS, Optional.empty())); + new BigtableConnectionBuilder( + project, + instance, + profile, + credentials, + emulatorEndpoint, + async, + DEFAULT_DISABLE_BULK_MUTATIONS, + DEFAULT_FLUSH_INTERVAL_SECONDS, + MetricsConnectionSettings.createDefault())); } @Override diff --git a/metric/bigtable/src/main/java/com/spotify/heroic/metric/bigtable/BigtableBackend.java b/metric/bigtable/src/main/java/com/spotify/heroic/metric/bigtable/BigtableBackend.java index 84d07453d..738e10862 100644 --- a/metric/bigtable/src/main/java/com/spotify/heroic/metric/bigtable/BigtableBackend.java +++ b/metric/bigtable/src/main/java/com/spotify/heroic/metric/bigtable/BigtableBackend.java @@ -48,6 +48,7 @@ import com.spotify.heroic.metric.MetricCollection; import com.spotify.heroic.metric.MetricReadResult; import com.spotify.heroic.metric.MetricType; +import com.spotify.heroic.metric.MetricsConnectionSettings; import com.spotify.heroic.metric.Point; import com.spotify.heroic.metric.QueryError; import com.spotify.heroic.metric.QueryTrace; @@ -119,7 +120,7 @@ public class BigtableBackend extends AbstractMetricBackend implements LifeCycles private final Tracer tracer = Tracing.getTracer(); private final Meter written = new Meter(); - private final int maxWriteBatchSize; + private final MetricsConnectionSettings metricsConnectionSettings; @Inject @@ -130,14 +131,14 @@ public BigtableBackend( final Groups groups, @Named("table") final String table, @Named("configure") final boolean configure, - @Named("maxWriteBatchSize") final int maxWriteBatchSize, + final MetricsConnectionSettings metricsConnectionSettings, MetricBackendReporter reporter ) { super(async); this.async = async; this.rowKeySerializer = rowKeySerializer; this.connection = connection; - this.maxWriteBatchSize = maxWriteBatchSize; + this.metricsConnectionSettings = metricsConnectionSettings; this.groups = groups; this.table = table; this.configure = configure; @@ -267,7 +268,7 @@ private List distributionPointsRanges(final FetchData.Request req public AsyncFuture fetch( final FetchData.Request request, final FetchQuotaWatcher watcher, - final Consumer consumer, + final Consumer metricsConsumer, final Span parentSpan ) { return connection.doto(c -> { @@ -280,10 +281,10 @@ public AsyncFuture fetch( switch (type) { case POINT: return fetchBatch( - watcher, type, pointsRanges(request), c, consumer, parentSpan); + watcher, type, pointsRanges(request), c, metricsConsumer, parentSpan); case DISTRIBUTION_POINTS: return fetchBatch(watcher, type, distributionPointsRanges(request), c, - consumer, parentSpan); + metricsConsumer, parentSpan); default: return async.resolved(new FetchData.Result(QueryTrace.of(FETCH), new QueryError("unsupported source: " + 
request.getType()))); @@ -392,7 +393,7 @@ private AsyncFuture writeBatch( builder.setCell(columnFamily, offsetBytes, valueBytes); - if (builder.size() >= maxWriteBatchSize) { + if (builder.size() >= metricsConnectionSettings.maxWriteBatchSize) { saved.add(Pair.of(rowKey, builder.build())); building.put(rowKey, Mutations.builder()); } @@ -414,8 +415,8 @@ private AsyncFuture writeBatch( if (rowKeyBytes.size() >= MAX_KEY_ROW_SIZE) { reporter.reportWritesDroppedBySize(); - log.error("Row key length greater than 4096 bytes (2): " + rowKeyBytes.size() - + " " + rowKeyBytes); + log.error("Row key length greater than 4096 bytes (2): {} {}", rowKeyBytes.size(), + rowKeyBytes); continue; } @@ -457,8 +458,8 @@ private AsyncFuture writeOne( if (rowKeyBytes.size() >= MAX_KEY_ROW_SIZE) { reporter.reportWritesDroppedBySize(); - log.error("Row key length greater than 4096 bytes (1): " + - rowKeyBytes.size() + " " + rowKey); + log.error("Row key length greater than 4096 bytes (1): {} {}", rowKeyBytes.size(), + rowKey); return async.resolved().directTransform(result -> timer.end()); } @@ -603,7 +604,7 @@ List ranges( return bases; } - ByteString serializeValue(double value) { + static ByteString serializeValue(double value) { final ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES).putLong(Double.doubleToLongBits(value)); return ByteString.copyFrom(buffer.array()); @@ -654,8 +655,11 @@ public String toString() { return "BigtableBackend(connection=" + this.connection + ")"; } - public int getMaxWriteBatchSize() { - return maxWriteBatchSize; + /** + * Do not use - public purely to enable unit testing + */ + public MetricsConnectionSettings metricsConnectionSettings() { + return metricsConnectionSettings; } private static final class PreparedQuery { diff --git a/metric/bigtable/src/main/java/com/spotify/heroic/metric/bigtable/BigtableConnectionBuilder.java b/metric/bigtable/src/main/java/com/spotify/heroic/metric/bigtable/BigtableConnectionBuilder.java index a35a96925..a63fa0d6f 100644 --- a/metric/bigtable/src/main/java/com/spotify/heroic/metric/bigtable/BigtableConnectionBuilder.java +++ b/metric/bigtable/src/main/java/com/spotify/heroic/metric/bigtable/BigtableConnectionBuilder.java @@ -23,18 +23,15 @@ import com.google.cloud.bigtable.config.BigtableOptions; import com.google.cloud.bigtable.config.BulkOptions; -import com.google.cloud.bigtable.config.CredentialOptions; +import com.google.cloud.bigtable.config.CallOptionsConfig; import com.google.cloud.bigtable.config.RetryOptions; import com.google.cloud.bigtable.grpc.BigtableSession; -import com.spotify.heroic.metric.bigtable.api.BigtableDataClient; +import com.spotify.heroic.metric.MetricsConnectionSettings; import com.spotify.heroic.metric.bigtable.api.BigtableDataClientImpl; -import com.spotify.heroic.metric.bigtable.api.BigtableMutator; import com.spotify.heroic.metric.bigtable.api.BigtableMutatorImpl; -import com.spotify.heroic.metric.bigtable.api.BigtableTableAdminClient; import com.spotify.heroic.metric.bigtable.api.BigtableTableTableAdminClientImpl; import eu.toolchain.async.AsyncFramework; import io.grpc.Status; -import java.util.Optional; import java.util.concurrent.Callable; import javax.annotation.Nullable; import org.apache.commons.lang3.builder.ToStringBuilder; @@ -57,9 +54,11 @@ public class BigtableConnectionBuilder implements Callable { private final boolean disableBulkMutations; private final int flushIntervalSeconds; - private final Optional batchSize; + private final String emulatorEndpoint; + private final MetricsConnectionSettings 
metricsConnectionSettings; + public BigtableConnectionBuilder( final String project, final String instance, @@ -69,7 +68,7 @@ public BigtableConnectionBuilder( final AsyncFramework async, final boolean disableBulkMutations, final int flushIntervalSeconds, - final Optional batchSize + final MetricsConnectionSettings metricsConnectionSettings ) { this.project = project; this.instance = instance; @@ -79,31 +78,41 @@ public BigtableConnectionBuilder( this.async = async; this.disableBulkMutations = disableBulkMutations; this.flushIntervalSeconds = flushIntervalSeconds; - this.batchSize = batchSize; + this.metricsConnectionSettings = metricsConnectionSettings; } @Override public BigtableConnection call() throws Exception { - final CredentialOptions credentials = this.credentials.build(); + final var credentials = this.credentials.build(); - final RetryOptions retryOptions = RetryOptions.builder() + final var retryOptions = RetryOptions.builder() .addStatusToRetryOn(Status.Code.UNKNOWN) .addStatusToRetryOn(Status.Code.UNAVAILABLE) .setAllowRetriesWithoutTimestamp(true) + .setEnableRetries(true) + .setMaxScanTimeoutRetries(metricsConnectionSettings.maxScanTimeoutRetries) + .setMaxElapsedBackoffMillis(metricsConnectionSettings.maxElapsedBackoffMs) .build(); - final BulkOptions bulkOptions = batchSize - .map(integer -> BulkOptions.builder().setBulkMaxRowKeyCount(integer).build()) - .orElseGet(() -> BulkOptions.builder().build()); + final var bulkOptions = + BulkOptions.builder() + .setBulkMaxRowKeyCount(metricsConnectionSettings.maxWriteBatchSize).build(); + + var callOptionsConfig = CallOptionsConfig.builder() + .setReadRowsRpcTimeoutMs(metricsConnectionSettings.readRowsRpcTimeoutMs) + .setMutateRpcTimeoutMs(metricsConnectionSettings.mutateRpcTimeoutMs) + .setShortRpcTimeoutMs(metricsConnectionSettings.shortRpcTimeoutMs) + .setUseTimeout(true).build(); - BigtableOptions.Builder builder = BigtableOptions.builder() + var builder = BigtableOptions.builder() .setProjectId(project) .setInstanceId(instance) .setUserAgent(USER_AGENT) .setDataChannelCount(64) .setCredentialOptions(credentials) .setRetryOptions(retryOptions) - .setBulkOptions(bulkOptions); + .setBulkOptions(bulkOptions) + .setCallOptionsConfig(callOptionsConfig); if (profile != null) { builder.setAppProfileId(profile); @@ -113,22 +122,23 @@ public BigtableConnection call() throws Exception { builder.enableEmulator(emulatorEndpoint); } - final BigtableOptions bigtableOptions = builder.build(); + final var bigtableOptions = builder.build(); log.info("Retry Options: {}", retryOptions.toString()); log.info("Bulk Options: {}", bulkOptions.toString()); log.info("Bigtable Options: {}", bigtableOptions.toString()); + log.info("Call Options: {}", callOptionsConfig.toString()); log.info("BigTable Connection Builder: \n{}", toString()); - final BigtableSession session = new BigtableSession(bigtableOptions); + final var session = new BigtableSession(bigtableOptions); - final BigtableTableAdminClient adminClient = + final var adminClient = new BigtableTableTableAdminClientImpl(session.getTableAdminClient(), project, instance); - final BigtableMutator mutator = + final var mutator = new BigtableMutatorImpl(async, session, disableBulkMutations, flushIntervalSeconds); - final BigtableDataClient client = + final var client = new BigtableDataClientImpl(async, session, mutator, project, instance); return new BigtableConnection(async, project, instance, session, mutator, adminClient, @@ -144,8 +154,8 @@ public String toString() { .append("credentials", 
credentials) .append("disableBulkMutations", disableBulkMutations) .append("flushIntervalSeconds", flushIntervalSeconds) - .append("batchSize", batchSize.orElse(-1)) .append("emulatorEndpoint", emulatorEndpoint) + .append("settings", metricsConnectionSettings) .toString(); } } diff --git a/metric/bigtable/src/main/java/com/spotify/heroic/metric/bigtable/BigtableMetricModule.java b/metric/bigtable/src/main/java/com/spotify/heroic/metric/bigtable/BigtableMetricModule.java index 5986661be..9ee6f8019 100644 --- a/metric/bigtable/src/main/java/com/spotify/heroic/metric/bigtable/BigtableMetricModule.java +++ b/metric/bigtable/src/main/java/com/spotify/heroic/metric/bigtable/BigtableMetricModule.java @@ -35,6 +35,7 @@ import com.spotify.heroic.lifecycle.LifeCycle; import com.spotify.heroic.lifecycle.LifeCycleManager; import com.spotify.heroic.metric.MetricModule; +import com.spotify.heroic.metric.MetricsConnectionSettings; import com.spotify.heroic.metric.bigtable.credentials.DefaultCredentialsBuilder; import dagger.Component; import dagger.Module; @@ -49,6 +50,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@SuppressWarnings({"LineLength"}) @ModuleId("bigtable") public final class BigtableMetricModule implements MetricModule, DynamicModuleId { @@ -56,10 +58,10 @@ public final class BigtableMetricModule implements MetricModule, DynamicModuleId private static final String BIGTABLE_CONFIGURE_PARAM = "bigtable.configure"; /* default number of Cells for each batch mutation */ - public static final int DEFAULT_MUTATION_BATCH_SIZE = 10_000; + public static final int DEFAULT_MUTATION_BATCH_SIZE = 1_000; /* maximum possible number of Cells for each batch mutation */ - public static final int MAX_MUTATION_BATCH_SIZE = 1_000_000; + public static final int MAX_MUTATION_BATCH_SIZE = 100_000; /* minimum possible number of Cells supported for each batch mutation */ public static final int MIN_MUTATION_BATCH_SIZE = 10; @@ -81,13 +83,14 @@ public final class BigtableMetricModule implements MetricModule, DynamicModuleId private final CredentialsBuilder credentials; private final boolean configure; private final boolean disableBulkMutations; - - /** This sets the max number of Rows in each batch that's written to the - * BigTable Client.
Note that that does not mean that the client will send - * that many in one request. That is seemingly controlled by: - * @see BigtableMetricModule#batchSize - */ private final int maxWriteBatchSize; + private final Optional mutateRpcTimeoutMs; + private final Optional readRowsRpcTimeoutMs; + private final Optional shortRpcTimeoutMs; + private final Optional maxScanTimeoutRetries; + private final Optional maxElapsedBackoffMs; + private final MetricsConnectionSettings metricsConnectionSettings; + private final int flushIntervalSeconds; /** @@ -113,6 +116,11 @@ public BigtableMetricModule( @JsonProperty("configure") Optional configure, @JsonProperty("disableBulkMutations") Optional disableBulkMutations, @JsonProperty("maxWriteBatchSize") Optional maxWriteBatchSize, + @JsonProperty("mutateRpcTimeoutMs") Optional mutateRpcTimeoutMs, + @JsonProperty("readRowsRpcTimeoutMs") Optional readRowsRpcTimeoutMs, + @JsonProperty("shortRpcTimeoutMs") Optional shortRpcTimeoutMs, + @JsonProperty("maxScanTimeoutRetries") Optional maxScanTimeoutRetries, + @JsonProperty("maxElapsedBackoffMs") Optional maxElapsedBackoffMs, @JsonProperty("flushIntervalSeconds") Optional flushIntervalSeconds, @JsonProperty("batchSize") Optional batchSize, @JsonProperty("emulatorEndpoint") Optional emulatorEndpoint @@ -127,35 +135,43 @@ public BigtableMetricModule( this.configure = configure.orElse(DEFAULT_CONFIGURE); this.disableBulkMutations = disableBulkMutations.orElse(DEFAULT_DISABLE_BULK_MUTATIONS); + // This batch of fields ends up in a BigtableMetricsConnectionSettings object + this.mutateRpcTimeoutMs = mutateRpcTimeoutMs; + this.readRowsRpcTimeoutMs = readRowsRpcTimeoutMs; + this.shortRpcTimeoutMs = shortRpcTimeoutMs; + this.maxScanTimeoutRetries = maxScanTimeoutRetries; + this.maxElapsedBackoffMs = maxElapsedBackoffMs; + // Basically make sure that maxWriteBatchSize, if set, is sane - int maxWriteBatch = maxWriteBatchSize.orElse(DEFAULT_MUTATION_BATCH_SIZE); - maxWriteBatch = Math.max(MIN_MUTATION_BATCH_SIZE, maxWriteBatch); - maxWriteBatch = Math.min(MAX_MUTATION_BATCH_SIZE, maxWriteBatch); + int maxWriteBatchBounded = maxWriteBatchSize.orElse(DEFAULT_MUTATION_BATCH_SIZE); + maxWriteBatchBounded = Math.max(MIN_MUTATION_BATCH_SIZE, maxWriteBatchBounded); + maxWriteBatchBounded = Math.min(MAX_MUTATION_BATCH_SIZE, maxWriteBatchBounded); + + this.maxWriteBatchSize = maxWriteBatchBounded; - this.maxWriteBatchSize = maxWriteBatch; this.flushIntervalSeconds = flushIntervalSeconds.orElse(DEFAULT_FLUSH_INTERVAL_SECONDS); this.batchSize = batchSize; this.emulatorEndpoint = emulatorEndpoint.orElse(null); + this.metricsConnectionSettings = new MetricsConnectionSettings(maxWriteBatchSize, + mutateRpcTimeoutMs, readRowsRpcTimeoutMs, shortRpcTimeoutMs, + maxScanTimeoutRetries, maxElapsedBackoffMs); + log.info("BigTable Metric Module: \n{}", toString()); } @Override public Exposed module(PrimaryComponent primary, Depends backend, String id) { return DaggerBigtableMetricModule_C - .builder() - .primaryComponent(primary) - .depends(backend) - .m(new M()) - .build(); - } - - public int getMaxWriteBatchSize() { - return maxWriteBatchSize; + .builder() + .primaryComponent(primary) + .depends(backend) + .m(new M()) + .build(); } @BigtableScope - @Component(modules = M.class, dependencies = {PrimaryComponent.class, Depends.class}) + @Component(modules = {M.class}, dependencies = {PrimaryComponent.class, Depends.class}) interface C extends Exposed { @Override BigtableBackend backend(); @@ -173,9 +189,11 @@ public Managed connection(final 
AsyncFramework async) { @Override public AsyncFuture construct() { return async.call( - new BigtableConnectionBuilder( - project, instance, profile, credentials, emulatorEndpoint, - async, disableBulkMutations, flushIntervalSeconds, batchSize)); + new BigtableConnectionBuilder( + project, instance, profile, credentials, emulatorEndpoint, + async, disableBulkMutations, flushIntervalSeconds, + metricsConnectionSettings + )); } @Override @@ -195,13 +213,6 @@ public String table() { return table; } - @Provides - @BigtableScope - @Named("maxWriteBatchSize") - public Integer maxWriteBatchSize() { - return maxWriteBatchSize; - } - @Provides @BigtableScope @Named("configure") @@ -216,6 +227,12 @@ public RowKeySerializer rowKeySerializer() { return new MetricsRowKeySerializer(); } + @Provides + @BigtableScope + public MetricsConnectionSettings metricsConnectionSettings() { + return metricsConnectionSettings; + } + @Provides @BigtableScope public Groups groups() { @@ -249,6 +266,11 @@ public static class Builder { private Optional configure = empty(); private Optional disableBulkMutations = empty(); private Optional maxWriteBatchSize = empty(); + private Optional mutateRpcTimeoutMs = empty(); + private Optional readRowsRpcTimeoutMs = empty(); + private Optional shortRpcTimeoutMs = empty(); + private Optional maxScanTimeoutRetries = empty(); + private Optional maxElapsedBackoffMs = empty(); private Optional flushIntervalSeconds = empty(); private Optional batchSize = empty(); private Optional emulatorEndpoint = empty(); @@ -303,6 +325,31 @@ public Builder maxWriteBatchSize(int maxWriteBatchSize) { return this; } + public Builder mutateRpcTimeoutMs(int mutateRpcTimeoutMs) { + this.mutateRpcTimeoutMs = of(mutateRpcTimeoutMs); + return this; + } + + public Builder readRowsRpcTimeoutMs(int readRowsRpcTimeoutMs) { + this.readRowsRpcTimeoutMs = of(readRowsRpcTimeoutMs); + return this; + } + + public Builder shortRpcTimeoutMs(int shortRpcTimeoutMs) { + this.shortRpcTimeoutMs = of(shortRpcTimeoutMs); + return this; + } + + public Builder maxScanTimeoutRetries(int maxScanTimeoutRetries) { + this.maxScanTimeoutRetries = of(maxScanTimeoutRetries); + return this; + } + + public Builder maxElapsedBackoffMs(int maxElapsedBackoffMs) { + this.maxElapsedBackoffMs = of(maxElapsedBackoffMs); + return this; + } + public Builder batchSize(int batchSize) { this.batchSize = of(batchSize); return this; @@ -330,6 +377,11 @@ public BigtableMetricModule build() { configure, disableBulkMutations, maxWriteBatchSize, + mutateRpcTimeoutMs, + readRowsRpcTimeoutMs, + shortRpcTimeoutMs, + maxScanTimeoutRetries, + maxElapsedBackoffMs, flushIntervalSeconds, batchSize, emulatorEndpoint); @@ -348,10 +400,10 @@ public String toString() { .append("credentials", credentials) .append("configure", configure) .append("disableBulkMutations", disableBulkMutations) - .append("maxWriteBatchSize", maxWriteBatchSize) .append("flushIntervalSeconds", flushIntervalSeconds) .append("batchSize", batchSize) .append("emulatorEndpoint", emulatorEndpoint) + .append("connectionSettings", metricsConnectionSettings) .toString(); } } diff --git a/metric/bigtable/src/test/java/com/spotify/heroic/metric/bigtable/BigtableBackendIT.java b/metric/bigtable/src/test/java/com/spotify/heroic/metric/bigtable/BigtableBackendIT.java index 654768126..af038083b 100644 --- a/metric/bigtable/src/test/java/com/spotify/heroic/metric/bigtable/BigtableBackendIT.java +++ b/metric/bigtable/src/test/java/com/spotify/heroic/metric/bigtable/BigtableBackendIT.java @@ -1,25 +1,100 @@ 
package com.spotify.heroic.metric.bigtable; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.spotify.heroic.HeroicCoreInstance; +import com.spotify.heroic.QueryOptions; +import com.spotify.heroic.common.DateRange; +import com.spotify.heroic.metric.FetchData; import com.spotify.heroic.metric.MetricModule; +import com.spotify.heroic.metric.MetricType; +import com.spotify.heroic.metric.WriteMetric; import com.spotify.heroic.test.AbstractMetricBackendIT; +import com.spotify.heroic.test.Points; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.ExecutionException; import org.junit.ClassRule; +import org.junit.Test; import org.testcontainers.containers.GenericContainer; public class BigtableBackendIT extends AbstractMetricBackendIT { - // TODO reverse emulator image to bigtruedata/gcloud-bigtable-emulator when they fix it + public static final int VERY_SHORT_WAIT_TIME_MS = 10; + // TODO revert emulator image to bigtruedata/gcloud-bigtable-emulator when they fix it @ClassRule - public static GenericContainer container = + public static GenericContainer emulatorContainer = new GenericContainer("malish8632/bigtable-emulator:latest") .withExposedPorts(8086) .withCommand("start", "--host-port", "0.0.0.0:8086"); + /** + * This is an image of adamsteele@google.com's personal fork of the Bigtable + * emulator that respects request timeout configuration limits. When it is + * merged, we can get rid of this and just use malish8632/bigtable-emulator:latest + */ + @ClassRule + public static GenericContainer slowEmulatorContainer = + new GenericContainer("us.gcr.io/bigtable-playground-272916/btemu:latest") + .withExposedPorts(9000) + .withCommand("--host=0.0.0.0 --port=9000 --inject-latency=ReadRows:p00:5m"); + @Override protected Optional period() { return Optional.of(BigtableBackend.PERIOD); } + /** + * This test is marginally useful because it is testing the Bigtable emulator + * and not the real thing. It also doesn't test that the user experiences timeouts + * correctly. + * + * It instead just tests that we will get the exceptions that our metrics code + * expects (see SemanticMetricBackendReporter for timeout counting methods) + * and more importantly that the Bigtable client doesn't change what it throws + * upon a timeout. + */ + @Test + public void testBackendTimesOutCorrectly() throws Exception { + + // We write out our data outside the try..catch because writes aren't the + // system under test. + var numPoints = 100; + var startTimestamp = 100000L; + var mc = new Points().p(startTimestamp, MEANING_OF_LIFE_HHGTTG, numPoints).build(); + backend.write(new WriteMetric.Request(s3, mc)).get(); + + // Now we basically emulate AbstractMetricBackendIT.setupBackend except we create + // a special core with very short Bigtable read timeouts, which will be hit when + // we do our large fetch operation + final HeroicCoreInstance core = + getHeroicCoreInstance(BackendModuleMode.VERY_SHORT_TIMEOUTS); + + // This is what gives us the special VERY_SHORT_TIMEOUTS behaviour. 
+ backend = createBackend(core); + + try { + var request = + new FetchData.Request(MetricType.POINT, s3, + new DateRange(0, startTimestamp + numPoints), + QueryOptions.builder().build()); + fetchMetrics(request, true); + } catch (ExecutionException tertiaryCause) { + var secondaryCause = tertiaryCause.getCause(); + assertEquals(secondaryCause.getClass(), + com.google.cloud.bigtable.grpc.scanner.BigtableRetriesExhaustedException.class); + assertTrue(secondaryCause.getMessage().startsWith("Exhausted retries after ")); + + var primaryCause = secondaryCause.getCause(); + assertEquals(primaryCause.getClass(), io.grpc.StatusRuntimeException.class); + + assertTrue("Actual primary cause: " + primaryCause.getMessage(), + primaryCause.getMessage().contains("DEADLINE_EXCEEDED")); + } finally { + core.shutdown().get(); + } + } + @Override protected void setupSupport() { super.setupSupport(); @@ -30,17 +105,30 @@ protected void setupSupport() { } @Override - public MetricModule setupModule() { + public MetricModule setupModule(BackendModuleMode mode) { String table = "heroic_it_" + UUID.randomUUID(); - String endpoint = container.getContainerIpAddress() + ":" + container.getFirstMappedPort(); - - return BigtableMetricModule - .builder() - .configure(true) - .project("fake") - .instance("fake") - .table(table) - .emulatorEndpoint(endpoint) - .build(); + + var container = mode == BackendModuleMode.NORMAL + ? emulatorContainer + : slowEmulatorContainer; + + String endpoint = + container.getContainerIpAddress() + ":" + container.getFirstMappedPort(); + + var moduleBuilder = BigtableMetricModule + .builder() + .configure(true) + .project("fake") + .instance("fake") + .table(table) + .emulatorEndpoint(endpoint); + + if (mode == BackendModuleMode.VERY_SHORT_TIMEOUTS) { + moduleBuilder.maxElapsedBackoffMs(VERY_SHORT_WAIT_TIME_MS) + .readRowsRpcTimeoutMs(VERY_SHORT_WAIT_TIME_MS) + .shortRpcTimeoutMs(VERY_SHORT_WAIT_TIME_MS); + } + + return moduleBuilder.build(); } } diff --git a/metric/datastax/src/test/java/com/spotify/heroic/metric/datastax/AbstractDatastaxBackendIT.java b/metric/datastax/src/test/java/com/spotify/heroic/metric/datastax/AbstractDatastaxBackendIT.java index b0bfbb658..b5a363c23 100644 --- a/metric/datastax/src/test/java/com/spotify/heroic/metric/datastax/AbstractDatastaxBackendIT.java +++ b/metric/datastax/src/test/java/com/spotify/heroic/metric/datastax/AbstractDatastaxBackendIT.java @@ -13,7 +13,7 @@ public abstract class AbstractDatastaxBackendIT extends AbstractMetricBackendIT public static CassandraContainer container = new CassandraContainer(); @Override - protected MetricModule setupModule() { + protected MetricModule setupModule(BackendModuleMode mode) { final String keyspace = "heroic_it_" + UUID.randomUUID().toString().replace('-', '_'); final String seed = container.getContainerIpAddress() + ":" + container.getFirstMappedPort(); diff --git a/metric/memory/build.gradle b/metric/memory/build.gradle index 4eb50d0af..67f685b3c 100644 --- a/metric/memory/build.gradle +++ b/metric/memory/build.gradle @@ -4,6 +4,7 @@ dependencies { implementation project(':heroic-component') testImplementation project(':heroic-test') kapt 'com.google.dagger:dagger-compiler' + implementation 'org.apache.logging.log4j:log4j-core' } group = 'com.spotify.heroic.metric' diff --git a/metric/memory/src/test/java/com/spotify/heroic/metric/memory/MemoryBackendIT.java b/metric/memory/src/test/java/com/spotify/heroic/metric/memory/MemoryBackendIT.java index 691453509..0e0bfe42f 100644 --- 
a/metric/memory/src/test/java/com/spotify/heroic/metric/memory/MemoryBackendIT.java +++ b/metric/memory/src/test/java/com/spotify/heroic/metric/memory/MemoryBackendIT.java @@ -3,8 +3,6 @@ import com.spotify.heroic.metric.MetricModule; import com.spotify.heroic.test.AbstractMetricBackendIT; -import java.util.Optional; - public class MemoryBackendIT extends AbstractMetricBackendIT { @Override protected void setupSupport() { @@ -15,7 +13,7 @@ protected void setupSupport() { } @Override - protected MetricModule setupModule() { + protected MetricModule setupModule(BackendModuleMode mode) { return MemoryMetricModule.builder().build(); } } diff --git a/rpc/grpc/src/main/java/com/spotify/heroic/rpc/grpc/GrpcRpcProtocol.java b/rpc/grpc/src/main/java/com/spotify/heroic/rpc/grpc/GrpcRpcProtocol.java index c6472ba71..18ef7d333 100644 --- a/rpc/grpc/src/main/java/com/spotify/heroic/rpc/grpc/GrpcRpcProtocol.java +++ b/rpc/grpc/src/main/java/com/spotify/heroic/rpc/grpc/GrpcRpcProtocol.java @@ -52,6 +52,7 @@ import io.grpc.MethodDescriptor; import io.grpc.netty.NettyChannelBuilder; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; import io.opencensus.common.Scope; import io.opencensus.trace.Span; import io.opencensus.trace.Tracer; @@ -115,6 +116,7 @@ public AsyncFuture construct() throws Exception { .usePlaintext() .executor(workerGroup) .eventLoopGroup(workerGroup) + .channelType(NioSocketChannel.class) .maxInboundMessageSize(maxFrameSize) .build(); diff --git a/rpc/grpc/src/main/java/com/spotify/heroic/rpc/grpc/GrpcRpcProtocolServer.java b/rpc/grpc/src/main/java/com/spotify/heroic/rpc/grpc/GrpcRpcProtocolServer.java index 49fb192c8..e5b5300f2 100644 --- a/rpc/grpc/src/main/java/com/spotify/heroic/rpc/grpc/GrpcRpcProtocolServer.java +++ b/rpc/grpc/src/main/java/com/spotify/heroic/rpc/grpc/GrpcRpcProtocolServer.java @@ -48,6 +48,7 @@ import io.grpc.netty.NettyServerBuilder; import io.netty.channel.Channel; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; import java.io.IOException; import java.lang.reflect.Field; import java.net.InetSocketAddress; @@ -175,6 +176,7 @@ private AsyncFuture start() throws IOException { .maxInboundMessageSize(maxFrameSize) .bossEventLoopGroup(bossGroup) .workerEventLoopGroup(workerGroup) + .channelType(NioServerSocketChannel.class) .build(); return async.call(() -> { @@ -182,33 +184,24 @@ private AsyncFuture start() throws IOException { this.server.set(server); return null; }).directTransform(v -> { - final InetSocketAddress localAddress = extractInetSocketAddress(server); - bindFuture.resolve(localAddress); + bindFuture.resolve(extractInetSocketAddress(server)); return null; }); } - /** - * Extract the local address from the current server. - *

- * Because no api is available to accomplish this, it currently uses a very ugly reflexive - * approach. - * - * @param server Server to extract local address from. - * @return an InetSocketAddress - * @throws Exception if something goes wrong (which it should). - */ - private InetSocketAddress extractInetSocketAddress(final Server server) throws Exception { - final ServerImpl impl = (ServerImpl) server; - - final Field transportServerField = ServerImpl.class.getDeclaredField("transportServer"); + private static InetSocketAddress extractInetSocketAddress(Server server) throws Exception { + ServerImpl impl = (ServerImpl) server; + + Field transportServerField = ServerImpl.class.getDeclaredField("transportServers"); transportServerField.setAccessible(true); - final Object transportServer = transportServerField.get(impl); - final Field channelField = transportServer.getClass().getDeclaredField("channel"); + var transportServerArr = (ArrayList) transportServerField.get(impl); + Object transportServer = transportServerArr.get(0); + + Field channelField = transportServer.getClass().getDeclaredField("channel"); channelField.setAccessible(true); - final Channel channel = (Channel) channelField.get(transportServer); + Channel channel = (Channel) channelField.get(transportServer); return (InetSocketAddress) channel.localAddress(); } diff --git a/suggest/elasticsearch/src/main/java/com/spotify/heroic/suggest/elasticsearch/ElasticsearchSuggestModule.java b/suggest/elasticsearch/src/main/java/com/spotify/heroic/suggest/elasticsearch/ElasticsearchSuggestModule.java index 022c68608..286674079 100644 --- a/suggest/elasticsearch/src/main/java/com/spotify/heroic/suggest/elasticsearch/ElasticsearchSuggestModule.java +++ b/suggest/elasticsearch/src/main/java/com/spotify/heroic/suggest/elasticsearch/ElasticsearchSuggestModule.java @@ -143,13 +143,14 @@ public ElasticsearchSuggestModule( this.distributedCacheSrvRecord = distributedCacheSrvRecord.orElse(""); this.templateName = templateName.orElse(DEFAULT_TEMPLATE_NAME); - this.type = backendType.map(this::lookupBackendType).orElse(defaultSetup); + this.type = backendType + .map(ElasticsearchSuggestModule::lookupBackendType).orElse(defaultSetup); this.configure = configure.orElse(DEFAULT_CONFIGURE); this.numSuggestionsLimit = NumSuggestionsLimit.of(numSuggestionsLimit); } - private Supplier lookupBackendType(final String bt) { + private static Supplier lookupBackendType(final String bt) { final Supplier type = backendTypes.get(bt); if (type == null) { diff --git a/suggest/elasticsearch/src/main/java/com/spotify/heroic/suggest/elasticsearch/SuggestBackendKV.java b/suggest/elasticsearch/src/main/java/com/spotify/heroic/suggest/elasticsearch/SuggestBackendKV.java index b56e75f1c..a1d9d1d0a 100644 --- a/suggest/elasticsearch/src/main/java/com/spotify/heroic/suggest/elasticsearch/SuggestBackendKV.java +++ b/suggest/elasticsearch/src/main/java/com/spotify/heroic/suggest/elasticsearch/SuggestBackendKV.java @@ -242,7 +242,7 @@ public AsyncFuture tagValuesSuggest(final TagValuesSuggest.Req c.execute(searchRequest, bind(future)); return future.directTransform( - (SearchResponse response) -> { + response -> { return createTagValuesSuggest(numSuggestionsLimit, groupLimit, response); }); diff --git a/system-tests/test_heroic.py b/system-tests/test_heroic.py index 4dff82d38..ac7394da0 100644 --- a/system-tests/test_heroic.py +++ b/system-tests/test_heroic.py @@ -22,13 +22,14 @@ def request(self, method, url, *args, **kwargs): @pytest.fixture(scope='session') def 
api_session(session_scoped_container_getter): - """Wait for the Heroic container to become available and return a requests session.""" + """Wait for the Heroic container to become available and return a requests + session.""" container = session_scoped_container_getter.get('heroic') assert container.is_running - # Wait for Heroic to startup within 2 minutes - timeout = 120 - max_time = time.time() + 120 + # Wait for Heroic to start up within the configured timeout + timeout = 150 + max_time = time.time() + timeout while time.time() < max_time: if 'Startup finished, hello' in container.logs().decode(): @@ -36,7 +37,8 @@ def api_session(session_scoped_container_getter): else: time.sleep(2) else: - raise TimeoutError('Heroic did not start within {} seconds'.format(timeout)) + raise TimeoutError('Heroic did not start within {} seconds' + .format(timeout)) api_url = 'http://127.0.0.1:{}/'.format(container.network_info[0].host_port) request_session = HeroicSession(api_url)
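A note on the GrpcRpcProtocol and GrpcRpcProtocolServer hunks above: recent grpc-netty releases expect a channel type to be supplied whenever a custom event loop group is, which is presumably what the added `channelType(...)` calls address. A minimal, self-contained sketch of the client-side pattern, not the module's actual wiring; the host and port are placeholders, and the server side pairs `workerEventLoopGroup` with `NioServerSocketChannel` in the same way:

```java
import io.grpc.ManagedChannel;
import io.grpc.netty.NettyChannelBuilder;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

final class GrpcChannelSketch {
    static ManagedChannel connect(String host, int port) {
        // Supplying our own event loop group, so the channel type is set explicitly too.
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        return NettyChannelBuilder
            .forAddress(host, port)
            .usePlaintext()
            .eventLoopGroup(workerGroup)
            .channelType(NioSocketChannel.class)
            .build();
    }
}
```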