diff --git a/docs/changelog/104830.yaml b/docs/changelog/104830.yaml new file mode 100644 index 0000000000000..c056f3d618b75 --- /dev/null +++ b/docs/changelog/104830.yaml @@ -0,0 +1,5 @@ +pr: 104830 +summary: Add new `shard_seed` parameter for `random_sampler` agg +area: Aggregations +type: enhancement +issues: [] diff --git a/docs/reference/modules/cluster/remote-clusters-api-key.asciidoc b/docs/reference/modules/cluster/remote-clusters-api-key.asciidoc index b95ebdf143a57..0cac52deaae4b 100644 --- a/docs/reference/modules/cluster/remote-clusters-api-key.asciidoc +++ b/docs/reference/modules/cluster/remote-clusters-api-key.asciidoc @@ -62,6 +62,9 @@ information, refer to https://www.elastic.co/subscriptions. [[remote-clusters-security-api-key]] ==== Establish trust with a remote cluster +NOTE: If a remote cluster is part of an {ess} deployment, it has a valid certificate by default. +You can therefore skip steps related to certificates in these instructions. + ===== On the remote cluster // tag::remote-cluster-steps[] diff --git a/docs/reference/watcher/actions/email.asciidoc b/docs/reference/watcher/actions/email.asciidoc index 71fdd95148d24..16b9cc4be0628 100644 --- a/docs/reference/watcher/actions/email.asciidoc +++ b/docs/reference/watcher/actions/email.asciidoc @@ -149,8 +149,10 @@ killed by firewalls or load balancers in-between. means, by default watcher tries to download a dashboard for 10 minutes, forty times fifteen seconds). The setting `xpack.notification.reporting.interval` can be configured globally to change the default. -| `request.auth` | Additional auth configuration for the request -| `request.proxy` | Additional proxy configuration for the request +| `auth` | Additional auth configuration for the request, see + {kibana-ref}/automating-report-generation.html#use-watcher[use watcher] for details +| `proxy` | Additional proxy configuration for the request. See <> + on how to configure the values. 
|====== diff --git a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java index 8802ffd41571d..6e70e9263df47 100644 --- a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java +++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java @@ -11,11 +11,13 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.Maps; +import org.elasticsearch.core.Releasables; import org.elasticsearch.search.aggregations.AggregationReduceContext; import org.elasticsearch.search.aggregations.AggregatorReducer; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; +import org.elasticsearch.search.aggregations.bucket.MultiBucketAggregatorsReducer; import org.elasticsearch.search.aggregations.support.SamplingContext; import org.elasticsearch.xcontent.XContentBuilder; @@ -177,30 +179,38 @@ public InternalBucket getBucketByKey(String key) { @Override protected AggregatorReducer getLeaderReducer(AggregationReduceContext reduceContext, int size) { - Map<String, List<InternalBucket>> bucketsMap = new HashMap<>(); return new AggregatorReducer() { + final Map<String, MultiBucketAggregatorsReducer> bucketsReducer = new HashMap<>(getBuckets().size()); + @Override public void accept(InternalAggregation aggregation) { - InternalAdjacencyMatrix filters = (InternalAdjacencyMatrix) aggregation; + final InternalAdjacencyMatrix filters = (InternalAdjacencyMatrix) aggregation; for (InternalBucket bucket : filters.buckets) { - List<InternalBucket> sameRangeList = bucketsMap.computeIfAbsent(bucket.key, k -> new ArrayList<>(size)); - sameRangeList.add(bucket); + MultiBucketAggregatorsReducer reducer = bucketsReducer.computeIfAbsent( + bucket.key, + k -> new MultiBucketAggregatorsReducer(reduceContext, size) + ); + reducer.accept(bucket); } } @Override public InternalAggregation get() { - List<InternalBucket> reducedBuckets = new ArrayList<>(bucketsMap.size()); - for (List<InternalBucket> sameRangeList : bucketsMap.values()) { - InternalBucket reducedBucket = reduceBucket(sameRangeList, reduceContext); - if (reducedBucket.docCount >= 1) { - reducedBuckets.add(reducedBucket); + List<InternalBucket> reducedBuckets = new ArrayList<>(bucketsReducer.size()); + for (Map.Entry<String, MultiBucketAggregatorsReducer> entry : bucketsReducer.entrySet()) { + if (entry.getValue().getDocCount() >= 1) { + reducedBuckets.add(new InternalBucket(entry.getKey(), entry.getValue().getDocCount(), entry.getValue().get())); } } reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size()); reducedBuckets.sort(Comparator.comparing(InternalBucket::getKey)); return new InternalAdjacencyMatrix(name, reducedBuckets, getMetadata()); } + + @Override + public void close() { + Releasables.close(bucketsReducer.values()); + } }; } @@ -209,21 +219,6 @@ public InternalAggregation finalizeSampling(SamplingContext samplingContext) { return new InternalAdjacencyMatrix(name, buckets.stream().map(b -> b.finalizeSampling(samplingContext)).toList(), getMetadata()); } - private InternalBucket reduceBucket(List<InternalBucket> buckets, AggregationReduceContext context) { - assert buckets.isEmpty() == false; - InternalBucket reduced = null; - for (InternalBucket bucket : buckets) { - if (reduced == null) { - reduced = new InternalBucket(bucket.key, bucket.docCount, bucket.aggregations); - } else { - reduced.docCount += bucket.docCount; - } - } - final List<InternalAggregations> aggregations = new BucketAggregationList<>(buckets); - reduced.aggregations = InternalAggregations.reduce(aggregations, context); - return reduced; - } - @Override public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { builder.startArray(CommonFields.BUCKETS.getPreferredName()); diff --git a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java index f58611cb0567a..388474acc75ea 100644 --- a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java +++ b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java @@ -21,7 +21,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.indices.recovery.RecoverySettings; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository; import org.elasticsearch.xcontent.NamedXContentRegistry; @@ -108,8 +107,7 @@ public AzureRepository( bigArrays, recoverySettings, buildBasePath(metadata), - buildLocation(metadata), - RepositoriesMetrics.NOOP + buildLocation(metadata) ); this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings()); this.storageService = storageService; diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java index 94d0abe17909f..e2338371cf837 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java @@ -19,7 +19,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.indices.recovery.RecoverySettings; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository; import org.elasticsearch.xcontent.NamedXContentRegistry; @@ -77,8 +76,7 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository { bigArrays, recoverySettings, buildBasePath(metadata), - buildLocation(metadata), - RepositoriesMetrics.NOOP + buildLocation(metadata) ); this.storageService = storageService; diff --git a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index 248ccc119794e..4080a47c7dabe 100644 --- a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -39,7 +39,6 @@ import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; -import org.elasticsearch.repositories.RepositoriesMetrics; import 
org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; @@ -460,9 +459,9 @@ protected S3Repository createRepository( ClusterService clusterService, BigArrays bigArrays, RecoverySettings recoverySettings, - RepositoriesMetrics repositoriesMetrics + S3RepositoriesMetrics s3RepositoriesMetrics ) { - return new S3Repository(metadata, registry, getService(), clusterService, bigArrays, recoverySettings, repositoriesMetrics) { + return new S3Repository(metadata, registry, getService(), clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics) { @Override public BlobStore blobStore() { diff --git a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java index f182b54b0c696..b8fea485c6276 100644 --- a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java +++ b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java @@ -30,7 +30,6 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.fixtures.minio.MinioTestContainer; @@ -145,7 +144,7 @@ public long absoluteTimeInMillis() { ClusterServiceUtils.createClusterService(threadpool), BigArrays.NON_RECYCLING_INSTANCE, new RecoverySettings(node().settings(), node().injector().getInstance(ClusterService.class).getClusterSettings()), - RepositoriesMetrics.NOOP + S3RepositoriesMetrics.NOOP ) ) { repository.start(); diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java index 78b1e2dba98b3..6b9937b01a433 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java @@ -33,7 +33,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; @@ -84,7 +83,7 @@ class S3BlobStore implements BlobStore { private final ThreadPool threadPool; private final Executor snapshotExecutor; - private final RepositoriesMetrics repositoriesMetrics; + private final S3RepositoriesMetrics s3RepositoriesMetrics; private final StatsCollectors statsCollectors = new StatsCollectors(); @@ -98,7 +97,7 @@ class S3BlobStore implements BlobStore { RepositoryMetadata repositoryMetadata, BigArrays bigArrays, ThreadPool threadPool, - RepositoriesMetrics repositoriesMetrics + S3RepositoriesMetrics s3RepositoriesMetrics ) { this.service = service; this.bigArrays = bigArrays; @@ -110,7 +109,7 @@ class S3BlobStore implements BlobStore { this.repositoryMetadata = repositoryMetadata; this.threadPool = threadPool; this.snapshotExecutor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - 
this.repositoriesMetrics = repositoriesMetrics; + this.s3RepositoriesMetrics = s3RepositoriesMetrics; } RequestMetricCollector getMetricCollector(Operation operation, OperationPurpose purpose) { @@ -174,19 +173,19 @@ public final void collectMetrics(Request request, Response response) { .map(List::size) .orElse(0); - repositoriesMetrics.operationCounter().incrementBy(1, attributes); + s3RepositoriesMetrics.common().operationCounter().incrementBy(1, attributes); if (numberOfAwsErrors == requestCount) { - repositoriesMetrics.unsuccessfulOperationCounter().incrementBy(1, attributes); + s3RepositoriesMetrics.common().unsuccessfulOperationCounter().incrementBy(1, attributes); } - repositoriesMetrics.requestCounter().incrementBy(requestCount, attributes); + s3RepositoriesMetrics.common().requestCounter().incrementBy(requestCount, attributes); if (exceptionCount > 0) { - repositoriesMetrics.exceptionCounter().incrementBy(exceptionCount, attributes); - repositoriesMetrics.exceptionHistogram().record(exceptionCount, attributes); + s3RepositoriesMetrics.common().exceptionCounter().incrementBy(exceptionCount, attributes); + s3RepositoriesMetrics.common().exceptionHistogram().record(exceptionCount, attributes); } if (throttleCount > 0) { - repositoriesMetrics.throttleCounter().incrementBy(throttleCount, attributes); - repositoriesMetrics.throttleHistogram().record(throttleCount, attributes); + s3RepositoriesMetrics.common().throttleCounter().incrementBy(throttleCount, attributes); + s3RepositoriesMetrics.common().throttleHistogram().record(throttleCount, attributes); } maybeRecordHttpRequestTime(request); } @@ -207,7 +206,7 @@ private void maybeRecordHttpRequestTime(Request request) { if (totalTimeInMicros == 0) { logger.warn("Expected HttpRequestTime to be tracked for request [{}] but found no count.", request); } else { - repositoriesMetrics.httpRequestTimeInMicroHistogram().record(totalTimeInMicros, attributes); + s3RepositoriesMetrics.common().httpRequestTimeInMicroHistogram().record(totalTimeInMicros, attributes); } } @@ -293,6 +292,14 @@ public long bufferSizeInBytes() { return bufferSize.getBytes(); } + public RepositoryMetadata getRepositoryMetadata() { + return repositoryMetadata; + } + + public S3RepositoriesMetrics getS3RepositoriesMetrics() { + return s3RepositoriesMetrics; + } + @Override public BlobContainer blobContainer(BlobPath path) { return new S3BlobContainer(path, this); diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoriesMetrics.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoriesMetrics.java new file mode 100644 index 0000000000000..e025214998d5b --- /dev/null +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoriesMetrics.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.repositories.s3; + +import org.elasticsearch.repositories.RepositoriesMetrics; +import org.elasticsearch.telemetry.metric.LongCounter; +import org.elasticsearch.telemetry.metric.LongHistogram; + +public record S3RepositoriesMetrics( + RepositoriesMetrics common, + LongCounter retryStartedCounter, + LongCounter retryCompletedCounter, + LongHistogram retryHistogram +) { + + public static S3RepositoriesMetrics NOOP = new S3RepositoriesMetrics(RepositoriesMetrics.NOOP); + + public static final String METRIC_RETRY_EVENT_TOTAL = "es.repositories.s3.input_stream.retry.event.total"; + public static final String METRIC_RETRY_SUCCESS_TOTAL = "es.repositories.s3.input_stream.retry.success.total"; + public static final String METRIC_RETRY_ATTEMPTS_HISTOGRAM = "es.repositories.s3.input_stream.retry.attempts.histogram"; + + public S3RepositoriesMetrics(RepositoriesMetrics common) { + this( + common, + common.meterRegistry().registerLongCounter(METRIC_RETRY_EVENT_TOTAL, "s3 input stream retry event count", "unit"), + common.meterRegistry().registerLongCounter(METRIC_RETRY_SUCCESS_TOTAL, "s3 input stream retry success count", "unit"), + common.meterRegistry() + .registerLongHistogram(METRIC_RETRY_ATTEMPTS_HISTOGRAM, "s3 input stream retry attempts histogram", "unit") + ); + } +} diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index 624867a2f0c41..26b1b1158dea0 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -31,7 +31,6 @@ import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.repositories.FinalizeSnapshotContext; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository; @@ -195,6 +194,8 @@ class S3Repository extends MeteredBlobStoreRepository { private final Executor snapshotExecutor; + private final S3RepositoriesMetrics s3RepositoriesMetrics; + /** * Constructs an s3 backed repository */ @@ -205,7 +206,7 @@ class S3Repository extends MeteredBlobStoreRepository { final ClusterService clusterService, final BigArrays bigArrays, final RecoverySettings recoverySettings, - final RepositoriesMetrics repositoriesMetrics + final S3RepositoriesMetrics s3RepositoriesMetrics ) { super( metadata, @@ -214,10 +215,10 @@ class S3Repository extends MeteredBlobStoreRepository { bigArrays, recoverySettings, buildBasePath(metadata), - buildLocation(metadata), - repositoriesMetrics + buildLocation(metadata) ); this.service = service; + this.s3RepositoriesMetrics = s3RepositoriesMetrics; this.snapshotExecutor = threadPool().executor(ThreadPool.Names.SNAPSHOT); // Parse and validate the user's S3 Storage Class setting @@ -408,7 +409,7 @@ protected S3BlobStore createBlobStore() { metadata, bigArrays, threadPool, - repositoriesMetrics + s3RepositoriesMetrics ); } diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java index 83668cc271922..26047c3b416a7 100644 --- 
a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java @@ -78,9 +78,9 @@ protected S3Repository createRepository( final ClusterService clusterService, final BigArrays bigArrays, final RecoverySettings recoverySettings, - final RepositoriesMetrics repositoriesMetrics + final S3RepositoriesMetrics s3RepositoriesMetrics ) { - return new S3Repository(metadata, registry, service.get(), clusterService, bigArrays, recoverySettings, repositoriesMetrics); + return new S3Repository(metadata, registry, service.get(), clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics); } @Override @@ -101,11 +101,12 @@ public Map getRepositories( final ClusterService clusterService, final BigArrays bigArrays, final RecoverySettings recoverySettings, - RepositoriesMetrics repositoriesMetrics + final RepositoriesMetrics repositoriesMetrics ) { + final S3RepositoriesMetrics s3RepositoriesMetrics = new S3RepositoriesMetrics(repositoriesMetrics); return Collections.singletonMap( S3Repository.TYPE, - metadata -> createRepository(metadata, registry, clusterService, bigArrays, recoverySettings, repositoriesMetrics) + metadata -> createRepository(metadata, registry, clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics) ); } diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java index c457b9d51e8b9..f7a99a399f59f 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java @@ -27,6 +27,7 @@ import java.nio.file.NoSuchFileException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import static org.elasticsearch.core.Strings.format; import static org.elasticsearch.repositories.s3.S3BlobStore.configureRequestForMetrics; @@ -80,7 +81,7 @@ class S3RetryingInputStream extends InputStream { this.end = end; final int initialAttempt = attempt; openStreamWithRetry(); - maybeLogForSuccessAfterRetries(initialAttempt, "opened"); + maybeLogAndRecordMetricsForSuccess(initialAttempt, "open"); } private void openStreamWithRetry() throws IOException { @@ -105,6 +106,9 @@ private void openStreamWithRetry() throws IOException { ); } + if (attempt == 1) { + blobStore.getS3RepositoriesMetrics().retryStartedCounter().incrementBy(1, metricAttributes("open")); + } final long delayInMillis = maybeLogAndComputeRetryDelay("opening", e); delayBeforeRetry(delayInMillis); } @@ -142,9 +146,12 @@ public int read() throws IOException { } else { currentOffset += 1; } - maybeLogForSuccessAfterRetries(initialAttempt, "read"); + maybeLogAndRecordMetricsForSuccess(initialAttempt, "read"); return result; } catch (IOException e) { + if (attempt == initialAttempt) { + blobStore.getS3RepositoriesMetrics().retryStartedCounter().incrementBy(1, metricAttributes("read")); + } reopenStreamOrFail(e); } } @@ -162,9 +169,12 @@ public int read(byte[] b, int off, int len) throws IOException { } else { currentOffset += bytesRead; } - maybeLogForSuccessAfterRetries(initialAttempt, "read"); + maybeLogAndRecordMetricsForSuccess(initialAttempt, "read"); return bytesRead; } catch (IOException e) { + if (attempt == initialAttempt) { + 
blobStore.getS3RepositoriesMetrics().retryStartedCounter().incrementBy(1, metricAttributes("read")); + } reopenStreamOrFail(e); } } @@ -246,16 +256,20 @@ private void logForRetry(Level level, String action, Exception e) { ); } - private void maybeLogForSuccessAfterRetries(int initialAttempt, String action) { + private void maybeLogAndRecordMetricsForSuccess(int initialAttempt, String action) { if (attempt > initialAttempt) { + final int numberOfRetries = attempt - initialAttempt; logger.info( "successfully {} input stream for [{}/{}] with purpose [{}] after [{}] retries", action, blobStore.bucket(), blobKey, purpose.getKey(), - attempt - initialAttempt + numberOfRetries ); + final Map attributes = metricAttributes(action); + blobStore.getS3RepositoriesMetrics().retryCompletedCounter().incrementBy(1, attributes); + blobStore.getS3RepositoriesMetrics().retryHistogram().record(numberOfRetries, attributes); } } @@ -294,6 +308,21 @@ protected long getRetryDelayInMillis() { return 10L << (Math.min(attempt - 1, 10)); } + private Map metricAttributes(String action) { + return Map.of( + "repo_type", + S3Repository.TYPE, + "repo_name", + blobStore.getRepositoryMetadata().name(), + "operation", + Operation.GET_OBJECT.getKey(), + "purpose", + purpose.getKey(), + "action", + action + ); + } + @Override public void close() throws IOException { maybeAbort(currentStream); diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java index 28a48c2968f59..cf3bc21526bf6 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java @@ -26,7 +26,6 @@ import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.rest.AbstractRestChannel; import org.elasticsearch.rest.RestRequest; @@ -264,9 +263,9 @@ protected S3Repository createRepository( ClusterService clusterService, BigArrays bigArrays, RecoverySettings recoverySettings, - RepositoriesMetrics repositoriesMetrics + S3RepositoriesMetrics s3RepositoriesMetrics ) { - return new S3Repository(metadata, registry, getService(), clusterService, bigArrays, recoverySettings, repositoriesMetrics) { + return new S3Repository(metadata, registry, getService(), clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually on test/main threads diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index 0ddd29171b3bd..05268d750637c 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -43,6 +43,9 @@ import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase; import 
org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; +import org.elasticsearch.telemetry.InstrumentType; +import org.elasticsearch.telemetry.Measurement; +import org.elasticsearch.telemetry.RecordingMeterRegistry; import org.elasticsearch.watcher.ResourceWatcherService; import org.hamcrest.Matcher; import org.junit.After; @@ -59,7 +62,9 @@ import java.nio.charset.StandardCharsets; import java.nio.file.NoSuchFileException; import java.util.Arrays; +import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Objects; import java.util.OptionalInt; import java.util.concurrent.atomic.AtomicBoolean; @@ -74,10 +79,13 @@ import static org.elasticsearch.repositories.s3.S3ClientSettings.READ_TIMEOUT_SETTING; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; @@ -91,6 +99,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes private S3Service service; private AtomicBoolean shouldErrorOnDns; + private RecordingMeterRegistry recordingMeterRegistry; @Before public void setUp() throws Exception { @@ -109,6 +118,7 @@ protected AmazonS3ClientBuilder buildClientBuilder(S3ClientSettings clientSettin return builder; } }; + recordingMeterRegistry = new RecordingMeterRegistry(); super.setUp(); } @@ -185,7 +195,7 @@ protected BlobContainer createBlobContainer( repositoryMetadata, BigArrays.NON_RECYCLING_INSTANCE, new DeterministicTaskQueue().getThreadPool(), - RepositoriesMetrics.NOOP + new S3RepositoriesMetrics(new RepositoriesMetrics(recordingMeterRegistry)) ); return new S3BlobContainer(randomBoolean() ? BlobPath.EMPTY : BlobPath.EMPTY.add("foo"), s3BlobStore) { @Override @@ -669,8 +679,8 @@ public void handle(HttpExchange exchange) throws IOException { } exchange.getResponseBody().write(bytes, rangeStart, length); } else { - failures.incrementAndGet(); if (randomBoolean()) { + failures.incrementAndGet(); exchange.sendResponseHeaders( randomFrom( HttpStatus.SC_INTERNAL_SERVER_ERROR, @@ -686,6 +696,8 @@ public void handle(HttpExchange exchange) throws IOException { if (bytesSent >= meaningfulProgressBytes) { exchange.getResponseBody().flush(); } + } else { + failures.incrementAndGet(); } } } @@ -700,16 +712,28 @@ public void handle(HttpExchange exchange) throws IOException { final int length = between(0, randomBoolean() ? 
bytes.length : Integer.MAX_VALUE); logger.info("--> position={}, length={}", position, length); try (InputStream inputStream = blobContainer.readBlob(OperationPurpose.INDICES, "read_blob_retries_forever", position, length)) { + assertMetricsForOpeningStream(); + recordingMeterRegistry.getRecorder().resetCalls(); + failures.set(0); + final byte[] bytesRead = BytesReference.toBytes(Streams.readFully(inputStream)); assertArrayEquals(Arrays.copyOfRange(bytes, position, Math.min(bytes.length, position + length)), bytesRead); + assertMetricsForReadingStream(); } assertThat(failures.get(), greaterThan(totalFailures)); // Read the whole blob failures.set(0); + recordingMeterRegistry.getRecorder().resetCalls(); try (InputStream inputStream = blobContainer.readBlob(OperationPurpose.INDICES, "read_blob_retries_forever")) { + assertMetricsForOpeningStream(); + recordingMeterRegistry.getRecorder().resetCalls(); + failures.set(0); + final byte[] bytesRead = BytesReference.toBytes(Streams.readFully(inputStream)); assertArrayEquals(bytes, bytesRead); + + assertMetricsForReadingStream(); } assertThat(failures.get(), greaterThan(totalFailures)); } @@ -737,9 +761,13 @@ public void handle(HttpExchange exchange) throws IOException { : blobContainer.readBlob(randomRetryingPurpose(), "read_blob_not_found", between(0, 100), between(1, 100)) ) { Streams.readFully(inputStream); + } }); assertThat(numberOfReads.get(), equalTo(1)); + assertThat(getRetryStartedMeasurements(), empty()); + assertThat(getRetryCompletedMeasurements(), empty()); + assertThat(getRetryHistogramMeasurements(), empty()); } @Override @@ -761,6 +789,77 @@ protected OperationPurpose randomFiniteRetryingPurpose() { ); } + private void assertMetricsForOpeningStream() { + final long numberOfOperations = getOperationMeasurements(); + // The S3 client SDK internally also retries within the configured maxRetries for retryable errors. + // The retries in S3RetryingInputStream are triggered when the client's internal retries are unsuccessful. + if (numberOfOperations > 1) { + // For opening the stream, there should be exactly one pair of started and completed records. + // There should be one histogram record, and the number of retries must be greater than 0. + final Map<String, Object> attributes = metricAttributes("open"); + assertThat(getRetryStartedMeasurements(), contains(new Measurement(1L, attributes, false))); + assertThat(getRetryCompletedMeasurements(), contains(new Measurement(1L, attributes, false))); + final List<Measurement> retryHistogramMeasurements = getRetryHistogramMeasurements(); + assertThat(retryHistogramMeasurements, hasSize(1)); + assertThat(retryHistogramMeasurements.get(0).getLong(), equalTo(numberOfOperations - 1)); + assertThat(retryHistogramMeasurements.get(0).attributes(), equalTo(attributes)); + } else { + assertThat(getRetryStartedMeasurements(), empty()); + assertThat(getRetryCompletedMeasurements(), empty()); + assertThat(getRetryHistogramMeasurements(), empty()); + } + } + + private void assertMetricsForReadingStream() { + // For reading the stream, there could be multiple pairs of started and completed records. + // It is important that they always come in pairs and the number of pairs matches the number + // of histogram records. 
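+ // (For example, three read() calls that each hit a retryable failure and then succeeded would record + // three started and three completed increments plus three histogram entries; Measurement.combine collapses + // counter increments with identical attributes, hence the single combined measurement asserted below.)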
+ final Map<String, Object> attributes = metricAttributes("read"); + final List<Measurement> retryHistogramMeasurements = getRetryHistogramMeasurements(); + final int numberOfReads = retryHistogramMeasurements.size(); + retryHistogramMeasurements.forEach(measurement -> { + assertThat(measurement.getLong(), greaterThan(0L)); + assertThat(measurement.attributes(), equalTo(attributes)); + }); + + final List<Measurement> retryStartedMeasurements = getRetryStartedMeasurements(); + assertThat(retryStartedMeasurements, hasSize(1)); + assertThat(retryStartedMeasurements.get(0).getLong(), equalTo((long) numberOfReads)); + assertThat(retryStartedMeasurements.get(0).attributes(), equalTo(attributes)); + assertThat(retryStartedMeasurements, equalTo(getRetryCompletedMeasurements())); + } + + private long getOperationMeasurements() { + final List<Measurement> operationMeasurements = Measurement.combine( + recordingMeterRegistry.getRecorder().getMeasurements(InstrumentType.LONG_COUNTER, RepositoriesMetrics.METRIC_OPERATIONS_TOTAL) + ); + assertThat(operationMeasurements, hasSize(1)); + return operationMeasurements.get(0).getLong(); + } + + private List<Measurement> getRetryStartedMeasurements() { + return Measurement.combine( + recordingMeterRegistry.getRecorder() + .getMeasurements(InstrumentType.LONG_COUNTER, S3RepositoriesMetrics.METRIC_RETRY_EVENT_TOTAL) + ); + } + + private List<Measurement> getRetryCompletedMeasurements() { + return Measurement.combine( + recordingMeterRegistry.getRecorder() + .getMeasurements(InstrumentType.LONG_COUNTER, S3RepositoriesMetrics.METRIC_RETRY_SUCCESS_TOTAL) + ); + } + + private List<Measurement> getRetryHistogramMeasurements() { + return recordingMeterRegistry.getRecorder() + .getMeasurements(InstrumentType.LONG_HISTOGRAM, S3RepositoriesMetrics.METRIC_RETRY_ATTEMPTS_HISTOGRAM); + } + + private Map<String, Object> metricAttributes(String action) { + return Map.of("repo_type", "s3", "repo_name", "repository", "operation", "GetObject", "purpose", "Indices", "action", action); + } + /** * Asserts that an InputStream is fully consumed, or aborted, when it is closed */ diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java index 0a92ed0a28973..50470ec499ef6 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java @@ -18,7 +18,6 @@ import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.env.Environment; import org.elasticsearch.indices.recovery.RecoverySettings; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.test.ESTestCase; @@ -130,7 +129,7 @@ private S3Repository createS3Repo(RepositoryMetadata metadata) { BlobStoreTestUtil.mockClusterService(), MockBigArrays.NON_RECYCLING_INSTANCE, new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), - RepositoriesMetrics.NOOP + S3RepositoriesMetrics.NOOP ) { @Override protected void assertSnapshotOrGenericThread() { diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/RandomSamplerIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/RandomSamplerIT.java index 28c186c559dff..53075e31cd6f9 100644 --- 
a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/RandomSamplerIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/RandomSamplerIT.java @@ -24,6 +24,7 @@ import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThan; @ESIntegTestCase.SuiteScopeTestCase @@ -84,6 +85,48 @@ public void setupSuiteScopeCluster() throws Exception { ensureSearchable(); } + public void testRandomSamplerConsistentSeed() { + double[] sampleMonotonicValue = new double[1]; + double[] sampleNumericValue = new double[1]; + long[] sampledDocCount = new long[1]; + // initialize the values + assertResponse( + prepareSearch("idx").setPreference("shard:0") + .addAggregation( + new RandomSamplerAggregationBuilder("sampler").setProbability(PROBABILITY) + .setSeed(0) + .subAggregation(avg("mean_monotonic").field(MONOTONIC_VALUE)) + .subAggregation(avg("mean_numeric").field(NUMERIC_VALUE)) + .setShardSeed(42) + ), + response -> { + InternalRandomSampler sampler = response.getAggregations().get("sampler"); + sampleMonotonicValue[0] = ((Avg) sampler.getAggregations().get("mean_monotonic")).getValue(); + sampleNumericValue[0] = ((Avg) sampler.getAggregations().get("mean_numeric")).getValue(); + sampledDocCount[0] = sampler.getDocCount(); + } + ); + + for (int i = 0; i < NUM_SAMPLE_RUNS; i++) { + assertResponse( + prepareSearch("idx").setPreference("shard:0") + .addAggregation( + new RandomSamplerAggregationBuilder("sampler").setProbability(PROBABILITY) + .setSeed(0) + .subAggregation(avg("mean_monotonic").field(MONOTONIC_VALUE)) + .subAggregation(avg("mean_numeric").field(NUMERIC_VALUE)) + .setShardSeed(42) + ), + response -> { + InternalRandomSampler sampler = response.getAggregations().get("sampler"); + assertThat(((Avg) sampler.getAggregations().get("mean_monotonic")).getValue(), equalTo(sampleMonotonicValue[0])); + assertThat(((Avg) sampler.getAggregations().get("mean_numeric")).getValue(), equalTo(sampleNumericValue[0])); + assertThat(sampler.getDocCount(), equalTo(sampledDocCount[0])); + } + ); + } + } + public void testRandomSampler() { double[] sampleMonotonicValue = new double[1]; double[] sampleNumericValue = new double[1]; diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index a6fa7a9ea8e99..c88b56ba25022 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -133,6 +133,7 @@ static TransportVersion def(int id) { public static final TransportVersion INDEX_REQUEST_NORMALIZED_BYTES_PARSED = def(8_593_00_0); public static final TransportVersion INGEST_GRAPH_STRUCTURE_EXCEPTION = def(8_594_00_0); public static final TransportVersion ML_MODEL_IN_SERVICE_SETTINGS = def(8_595_00_0); + public static final TransportVersion RANDOM_AGG_SHARD_SEED = def(8_596_00_0); /* * STOP! READ THIS FIRST! 
No, really, diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java index ca910a8d94078..4996096492354 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java @@ -109,598 +109,514 @@ protected void masterOperation( ) { assert task instanceof CancellableTask : task + " not cancellable"; - getMultipleReposSnapshotInfo( - request.isSingleRepositoryRequest() == false, - SnapshotsInProgress.get(state), + new GetSnapshotsOperation( + (CancellableTask) task, TransportGetRepositoriesAction.getRepositories(state, request.repositories()), + request.isSingleRepositoryRequest() == false, request.snapshots(), request.ignoreUnavailable(), - request.verbose(), - (CancellableTask) task, + SnapshotPredicates.fromRequest(request), request.sort(), - request.after(), - request.offset(), - request.size(), request.order(), request.fromSortValue(), - SnapshotPredicates.fromRequest(request), - request.includeIndexNames(), - listener - ); + request.offset(), + request.after(), + request.size(), + SnapshotsInProgress.get(state), + request.verbose(), + request.includeIndexNames() + ).getMultipleReposSnapshotInfo(listener); } /** - * Filters the list of repositories that a request will fetch snapshots from in the special case of sorting by repository - * name and having a non-null value for {@link GetSnapshotsRequest#fromSortValue()} on the request to exclude repositories outside - * the sort value range if possible. + * A single invocation of the get-snapshots API. + *

+ * Decides which repositories to query, picks a collection of candidate {@link SnapshotId} values from each {@link RepositoryData}, + * chosen according to the request parameters, loads the relevant {@link SnapshotInfo} blobs, and finally sorts and filters the + * results. */ - private static List maybeFilterRepositories( - List repositories, - GetSnapshotsRequest.SortBy sortBy, - SortOrder order, - @Nullable String fromSortValue - ) { - if (sortBy != GetSnapshotsRequest.SortBy.REPOSITORY || fromSortValue == null) { - return repositories; - } - final Predicate predicate = order == SortOrder.ASC - ? repositoryMetadata -> fromSortValue.compareTo(repositoryMetadata.name()) <= 0 - : repositoryMetadata -> fromSortValue.compareTo(repositoryMetadata.name()) >= 0; - return repositories.stream().filter(predicate).toList(); - } - - private void getMultipleReposSnapshotInfo( - boolean isMultiRepoRequest, - SnapshotsInProgress snapshotsInProgress, - TransportGetRepositoriesAction.RepositoriesResult repositoriesResult, - String[] snapshots, - boolean ignoreUnavailable, - boolean verbose, - CancellableTask cancellableTask, - GetSnapshotsRequest.SortBy sortBy, - @Nullable GetSnapshotsRequest.After after, - int offset, - int size, - SortOrder order, - String fromSortValue, - SnapshotPredicates predicates, - boolean indices, - ActionListener listener - ) { - // Process the missing repositories - final Map failures = ConcurrentCollections.newConcurrentMap(); - for (String missingRepo : repositoriesResult.missing()) { - failures.put(missingRepo, new RepositoryMissingException(missingRepo)); + private class GetSnapshotsOperation { + private final CancellableTask cancellableTask; + + // repositories + private final List repositories; + private final boolean isMultiRepoRequest; + + // snapshots selection + private final String[] snapshots; + private final boolean ignoreUnavailable; + private final SnapshotPredicates predicates; + + // snapshot ordering/pagination + private final GetSnapshotsRequest.SortBy sortBy; + private final SortOrder order; + @Nullable + private final String fromSortValue; + private final int offset; + @Nullable + private final GetSnapshotsRequest.After after; + private final int size; + + // current state + private final SnapshotsInProgress snapshotsInProgress; + + // output detail + private final boolean verbose; + private final boolean indices; + + // results + private final Map failuresByRepository = ConcurrentCollections.newConcurrentMap(); + private final Queue> allSnapshotInfos = ConcurrentCollections.newQueue(); + private final AtomicInteger remaining = new AtomicInteger(); + private final AtomicInteger totalCount = new AtomicInteger(); + + GetSnapshotsOperation( + CancellableTask cancellableTask, + TransportGetRepositoriesAction.RepositoriesResult repositoriesResult, + boolean isMultiRepoRequest, + String[] snapshots, + boolean ignoreUnavailable, + SnapshotPredicates predicates, + GetSnapshotsRequest.SortBy sortBy, + SortOrder order, + String fromSortValue, + int offset, + GetSnapshotsRequest.After after, + int size, + SnapshotsInProgress snapshotsInProgress, + boolean verbose, + boolean indices + ) { + this.cancellableTask = cancellableTask; + this.repositories = repositoriesResult.metadata(); + this.isMultiRepoRequest = isMultiRepoRequest; + this.snapshots = snapshots; + this.ignoreUnavailable = ignoreUnavailable; + this.predicates = predicates; + this.sortBy = sortBy; + this.order = order; + this.fromSortValue = fromSortValue; + this.offset = offset; + this.after = after; + 
this.size = size; + this.snapshotsInProgress = snapshotsInProgress; + this.verbose = verbose; + this.indices = indices; + + for (final var missingRepo : repositoriesResult.missing()) { + failuresByRepository.put(missingRepo, new RepositoryMissingException(missingRepo)); + } } - final Queue> allSnapshotInfos = ConcurrentCollections.newQueue(); - final var remaining = new AtomicInteger(); - final var totalCount = new AtomicInteger(); - - List repositories = maybeFilterRepositories(repositoriesResult.metadata(), sortBy, order, fromSortValue); - try (var listeners = new RefCountingListener(listener.map(ignored -> { - cancellableTask.ensureNotCancelled(); - final var sortedSnapshotsInRepos = sortSnapshots( - allSnapshotInfos.stream().flatMap(Collection::stream), - totalCount.get(), - sortBy, - after, - offset, - size, - order - ); - final var snapshotInfos = sortedSnapshotsInRepos.snapshotInfos(); - assert indices || snapshotInfos.stream().allMatch(snapshotInfo -> snapshotInfo.indices().isEmpty()); - final int finalRemaining = sortedSnapshotsInRepos.remaining() + remaining.get(); - return new GetSnapshotsResponse( - snapshotInfos, - failures, - finalRemaining > 0 - ? GetSnapshotsRequest.After.from(snapshotInfos.get(snapshotInfos.size() - 1), sortBy).asQueryParam() - : null, - totalCount.get(), - finalRemaining - ); - }))) { - for (final RepositoryMetadata repository : repositories) { - final String repoName = repository.name(); - getSingleRepoSnapshotInfo( - snapshotsInProgress, - repoName, - snapshots, - predicates, - ignoreUnavailable, - verbose, - cancellableTask, - sortBy, - after, - order, - indices, - listeners.acquire((SnapshotsInRepo snapshotsInRepo) -> { + /** + * Filters the list of repositories that a request will fetch snapshots from in the special case of sorting by repository + * name and having a non-null value for {@link GetSnapshotsRequest#fromSortValue()} on the request to exclude repositories outside + * the sort value range if possible. + */ + private List maybeFilterRepositories() { + if (sortBy != GetSnapshotsRequest.SortBy.REPOSITORY || fromSortValue == null) { + return repositories; + } + final Predicate predicate = order == SortOrder.ASC + ? repositoryMetadata -> fromSortValue.compareTo(repositoryMetadata.name()) <= 0 + : repositoryMetadata -> fromSortValue.compareTo(repositoryMetadata.name()) >= 0; + return repositories.stream().filter(predicate).toList(); + } + + void getMultipleReposSnapshotInfo(ActionListener listener) { + List filteredRepositories = maybeFilterRepositories(); + try (var listeners = new RefCountingListener(listener.map(ignored -> { + cancellableTask.ensureNotCancelled(); + final var sortedSnapshotsInRepos = sortSnapshots( + allSnapshotInfos.stream().flatMap(Collection::stream), + totalCount.get(), + offset, + size + ); + final var snapshotInfos = sortedSnapshotsInRepos.snapshotInfos(); + assert indices || snapshotInfos.stream().allMatch(snapshotInfo -> snapshotInfo.indices().isEmpty()); + final int finalRemaining = sortedSnapshotsInRepos.remaining() + remaining.get(); + return new GetSnapshotsResponse( + snapshotInfos, + failuresByRepository, + finalRemaining > 0 + ? 
GetSnapshotsRequest.After.from(snapshotInfos.get(snapshotInfos.size() - 1), sortBy).asQueryParam() + : null, + totalCount.get(), + finalRemaining + ); + }))) { + for (final RepositoryMetadata repository : filteredRepositories) { + final String repoName = repository.name(); + getSingleRepoSnapshotInfo(repoName, listeners.acquire((SnapshotsInRepo snapshotsInRepo) -> { allSnapshotInfos.add(snapshotsInRepo.snapshotInfos()); remaining.addAndGet(snapshotsInRepo.remaining()); totalCount.addAndGet(snapshotsInRepo.totalCount()); }).delegateResponse((l, e) -> { if (isMultiRepoRequest && e instanceof ElasticsearchException elasticsearchException) { - failures.put(repoName, elasticsearchException); + failuresByRepository.put(repoName, elasticsearchException); l.onResponse(SnapshotsInRepo.EMPTY); } else { l.onFailure(e); } - }) - ); + })); + } } } - } - private void getSingleRepoSnapshotInfo( - SnapshotsInProgress snapshotsInProgress, - String repo, - String[] snapshots, - SnapshotPredicates predicates, - boolean ignoreUnavailable, - boolean verbose, - CancellableTask task, - GetSnapshotsRequest.SortBy sortBy, - @Nullable final GetSnapshotsRequest.After after, - SortOrder order, - boolean indices, - ActionListener listener - ) { - final Map allSnapshotIds = new HashMap<>(); - final List currentSnapshots = new ArrayList<>(); - for (SnapshotInfo snapshotInfo : currentSnapshots(snapshotsInProgress, repo)) { - Snapshot snapshot = snapshotInfo.snapshot(); - allSnapshotIds.put(snapshot.getSnapshotId().getName(), snapshot); - currentSnapshots.add(snapshotInfo.maybeWithoutIndices(indices)); - } + private void getSingleRepoSnapshotInfo(String repo, ActionListener listener) { + final Map allSnapshotIds = new HashMap<>(); + final List currentSnapshots = new ArrayList<>(); + for (final SnapshotInfo snapshotInfo : currentSnapshots(repo)) { + Snapshot snapshot = snapshotInfo.snapshot(); + allSnapshotIds.put(snapshot.getSnapshotId().getName(), snapshot); + currentSnapshots.add(snapshotInfo.maybeWithoutIndices(indices)); + } - final ListenableFuture repositoryDataListener = new ListenableFuture<>(); - if (isCurrentSnapshotsOnly(snapshots)) { - repositoryDataListener.onResponse(null); - } else { - repositoriesService.getRepositoryData(repo, repositoryDataListener); - } + final ListenableFuture repositoryDataListener = new ListenableFuture<>(); + if (isCurrentSnapshotsOnly()) { + repositoryDataListener.onResponse(null); + } else { + repositoriesService.getRepositoryData(repo, repositoryDataListener); + } - repositoryDataListener.addListener( - listener.delegateFailureAndWrap( - (l, repositoryData) -> loadSnapshotInfos( - snapshotsInProgress, - repo, - snapshots, - ignoreUnavailable, - verbose, - allSnapshotIds, - currentSnapshots, - repositoryData, - task, - sortBy, - after, - order, - predicates, - indices, - l + repositoryDataListener.addListener( + listener.delegateFailureAndWrap( + (l, repositoryData) -> loadSnapshotInfos(repo, allSnapshotIds, currentSnapshots, repositoryData, l) ) - ) - ); - } - - /** - * Returns a list of currently running snapshots from repository sorted by snapshot creation date - * - * @param snapshotsInProgress snapshots in progress in the cluster state - * @param repositoryName repository name - * @return list of snapshots - */ - private static List currentSnapshots(SnapshotsInProgress snapshotsInProgress, String repositoryName) { - List snapshotList = new ArrayList<>(); - List entries = SnapshotsService.currentSnapshots( - snapshotsInProgress, - repositoryName, - Collections.emptyList() - ); 
- for (SnapshotsInProgress.Entry entry : entries) { - snapshotList.add(SnapshotInfo.inProgress(entry)); + ); } - return snapshotList; - } - private void loadSnapshotInfos( - SnapshotsInProgress snapshotsInProgress, - String repo, - String[] snapshots, - boolean ignoreUnavailable, - boolean verbose, - Map allSnapshotIds, - List currentSnapshots, - @Nullable RepositoryData repositoryData, - CancellableTask task, - GetSnapshotsRequest.SortBy sortBy, - @Nullable final GetSnapshotsRequest.After after, - SortOrder order, - SnapshotPredicates predicates, - boolean indices, - ActionListener listener - ) { - if (task.notifyIfCancelled(listener)) { - return; + /** + * Returns a list of currently running snapshots from repository sorted by snapshot creation date + * + * @param repositoryName repository name + * @return list of snapshots + */ + private List currentSnapshots(String repositoryName) { + List snapshotList = new ArrayList<>(); + List entries = SnapshotsService.currentSnapshots( + snapshotsInProgress, + repositoryName, + Collections.emptyList() + ); + for (SnapshotsInProgress.Entry entry : entries) { + snapshotList.add(SnapshotInfo.inProgress(entry)); + } + return snapshotList; } - if (repositoryData != null) { - for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) { - if (predicates.test(snapshotId, repositoryData)) { - allSnapshotIds.put(snapshotId.getName(), new Snapshot(repo, snapshotId)); + private void loadSnapshotInfos( + String repo, + Map allSnapshotIds, + List currentSnapshots, + @Nullable RepositoryData repositoryData, + ActionListener listener + ) { + if (cancellableTask.notifyIfCancelled(listener)) { + return; + } + + if (repositoryData != null) { + for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) { + if (predicates.test(snapshotId, repositoryData)) { + allSnapshotIds.put(snapshotId.getName(), new Snapshot(repo, snapshotId)); + } } } - } - final Set toResolve = new HashSet<>(); - if (TransportGetRepositoriesAction.isMatchAll(snapshots)) { - toResolve.addAll(allSnapshotIds.values()); - } else { - final List includePatterns = new ArrayList<>(); - final List excludePatterns = new ArrayList<>(); - boolean hasCurrent = false; - boolean seenWildcard = false; - for (String snapshotOrPattern : snapshots) { - if (seenWildcard && snapshotOrPattern.length() > 1 && snapshotOrPattern.startsWith("-")) { - excludePatterns.add(snapshotOrPattern.substring(1)); - } else { - if (Regex.isSimpleMatchPattern(snapshotOrPattern)) { - seenWildcard = true; - includePatterns.add(snapshotOrPattern); - } else if (GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshotOrPattern)) { - hasCurrent = true; - seenWildcard = true; + final Set toResolve = new HashSet<>(); + if (TransportGetRepositoriesAction.isMatchAll(snapshots)) { + toResolve.addAll(allSnapshotIds.values()); + } else { + final List includePatterns = new ArrayList<>(); + final List excludePatterns = new ArrayList<>(); + boolean hasCurrent = false; + boolean seenWildcard = false; + for (String snapshotOrPattern : snapshots) { + if (seenWildcard && snapshotOrPattern.length() > 1 && snapshotOrPattern.startsWith("-")) { + excludePatterns.add(snapshotOrPattern.substring(1)); } else { - if (ignoreUnavailable == false && allSnapshotIds.containsKey(snapshotOrPattern) == false) { - throw new SnapshotMissingException(repo, snapshotOrPattern); + if (Regex.isSimpleMatchPattern(snapshotOrPattern)) { + seenWildcard = true; + includePatterns.add(snapshotOrPattern); + } else if 
(GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshotOrPattern)) { + hasCurrent = true; + seenWildcard = true; + } else { + if (ignoreUnavailable == false && allSnapshotIds.containsKey(snapshotOrPattern) == false) { + throw new SnapshotMissingException(repo, snapshotOrPattern); + } + includePatterns.add(snapshotOrPattern); } - includePatterns.add(snapshotOrPattern); } } - } - final String[] includes = includePatterns.toArray(Strings.EMPTY_ARRAY); - final String[] excludes = excludePatterns.toArray(Strings.EMPTY_ARRAY); - for (Map.Entry entry : allSnapshotIds.entrySet()) { - final Snapshot snapshot = entry.getValue(); - if (toResolve.contains(snapshot) == false - && Regex.simpleMatch(includes, entry.getKey()) - && Regex.simpleMatch(excludes, entry.getKey()) == false) { - toResolve.add(snapshot); - } - } - if (hasCurrent) { - for (SnapshotInfo snapshotInfo : currentSnapshots) { - final Snapshot snapshot = snapshotInfo.snapshot(); - if (Regex.simpleMatch(excludes, snapshot.getSnapshotId().getName()) == false) { + final String[] includes = includePatterns.toArray(Strings.EMPTY_ARRAY); + final String[] excludes = excludePatterns.toArray(Strings.EMPTY_ARRAY); + for (Map.Entry entry : allSnapshotIds.entrySet()) { + final Snapshot snapshot = entry.getValue(); + if (toResolve.contains(snapshot) == false + && Regex.simpleMatch(includes, entry.getKey()) + && Regex.simpleMatch(excludes, entry.getKey()) == false) { toResolve.add(snapshot); } } + if (hasCurrent) { + for (SnapshotInfo snapshotInfo : currentSnapshots) { + final Snapshot snapshot = snapshotInfo.snapshot(); + if (Regex.simpleMatch(excludes, snapshot.getSnapshotId().getName()) == false) { + toResolve.add(snapshot); + } + } + } + if (toResolve.isEmpty() && ignoreUnavailable == false && isCurrentSnapshotsOnly() == false) { + throw new SnapshotMissingException(repo, snapshots[0]); + } } - if (toResolve.isEmpty() && ignoreUnavailable == false && isCurrentSnapshotsOnly(snapshots) == false) { - throw new SnapshotMissingException(repo, snapshots[0]); + + if (verbose) { + snapshots(repo, toResolve.stream().map(Snapshot::getSnapshotId).toList(), listener); + } else { + assert predicates.isMatchAll() : "filtering is not supported in non-verbose mode"; + final SnapshotsInRepo snapshotInfos; + if (repositoryData != null) { + // want non-current snapshots as well, which are found in the repository data + snapshotInfos = buildSimpleSnapshotInfos(toResolve, repo, repositoryData, currentSnapshots); + } else { + // only want current snapshots + snapshotInfos = sortSnapshotsWithNoOffsetOrLimit(currentSnapshots.stream().map(SnapshotInfo::basic).toList()); + } + listener.onResponse(snapshotInfos); } } - if (verbose) { - snapshots( + /** + * Returns a list of snapshots from repository sorted by snapshot creation date + * + * @param repositoryName repository name + * @param snapshotIds snapshots for which to fetch snapshot information + */ + private void snapshots(String repositoryName, Collection snapshotIds, ActionListener listener) { + if (cancellableTask.notifyIfCancelled(listener)) { + return; + } + final Set snapshotSet = new HashSet<>(); + final Set snapshotIdsToIterate = new HashSet<>(snapshotIds); + // first, look at the snapshots in progress + final List entries = SnapshotsService.currentSnapshots( snapshotsInProgress, - repo, - toResolve.stream().map(Snapshot::getSnapshotId).toList(), - ignoreUnavailable, - task, - sortBy, - after, - order, - predicates, - indices, - listener + repositoryName, + 
snapshotIdsToIterate.stream().map(SnapshotId::getName).toList() ); - } else { - assert predicates.isMatchAll() : "filtering is not supported in non-verbose mode"; - final SnapshotsInRepo snapshotInfos; - if (repositoryData != null) { - // want non-current snapshots as well, which are found in the repository data - snapshotInfos = buildSimpleSnapshotInfos(toResolve, repo, repositoryData, currentSnapshots, sortBy, after, order, indices); + for (SnapshotsInProgress.Entry entry : entries) { + if (snapshotIdsToIterate.remove(entry.snapshot().getSnapshotId())) { + final SnapshotInfo snapshotInfo = SnapshotInfo.inProgress(entry); + if (predicates.test(snapshotInfo)) { + snapshotSet.add(snapshotInfo.maybeWithoutIndices(indices)); + } + } + } + // then, look in the repository if there's any matching snapshots left + final List snapshotInfos; + if (snapshotIdsToIterate.isEmpty()) { + snapshotInfos = Collections.emptyList(); } else { - // only want current snapshots - snapshotInfos = sortSnapshots( - currentSnapshots.stream().map(SnapshotInfo::basic).toList(), - sortBy, - after, - 0, - GetSnapshotsRequest.NO_LIMIT, - order - ); + snapshotInfos = Collections.synchronizedList(new ArrayList<>()); } - listener.onResponse(snapshotInfos); - } - } - - /** - * Returns a list of snapshots from repository sorted by snapshot creation date - * - * @param snapshotsInProgress snapshots in progress in the cluster state - * @param repositoryName repository name - * @param snapshotIds snapshots for which to fetch snapshot information - * @param ignoreUnavailable if true, snapshots that could not be read will only be logged with a warning, - * @param indices if false, drop the list of indices from each result - */ - private void snapshots( - SnapshotsInProgress snapshotsInProgress, - String repositoryName, - Collection snapshotIds, - boolean ignoreUnavailable, - CancellableTask task, - GetSnapshotsRequest.SortBy sortBy, - @Nullable GetSnapshotsRequest.After after, - SortOrder order, - SnapshotPredicates predicate, - boolean indices, - ActionListener listener - ) { - if (task.notifyIfCancelled(listener)) { - return; - } - final Set snapshotSet = new HashSet<>(); - final Set snapshotIdsToIterate = new HashSet<>(snapshotIds); - // first, look at the snapshots in progress - final List entries = SnapshotsService.currentSnapshots( - snapshotsInProgress, - repositoryName, - snapshotIdsToIterate.stream().map(SnapshotId::getName).toList() - ); - for (SnapshotsInProgress.Entry entry : entries) { - if (snapshotIdsToIterate.remove(entry.snapshot().getSnapshotId())) { - final SnapshotInfo snapshotInfo = SnapshotInfo.inProgress(entry); - if (predicate.test(snapshotInfo)) { - snapshotSet.add(snapshotInfo.maybeWithoutIndices(indices)); - } + final ActionListener allDoneListener = listener.safeMap(v -> { + final ArrayList snapshotList = new ArrayList<>(snapshotInfos); + snapshotList.addAll(snapshotSet); + return sortSnapshotsWithNoOffsetOrLimit(snapshotList); + }); + if (snapshotIdsToIterate.isEmpty()) { + allDoneListener.onResponse(null); + return; } + final Repository repository; + try { + repository = repositoriesService.repository(repositoryName); + } catch (RepositoryMissingException e) { + listener.onFailure(e); + return; + } + repository.getSnapshotInfo( + new GetSnapshotInfoContext( + snapshotIdsToIterate, + ignoreUnavailable == false, + cancellableTask::isCancelled, + (context, snapshotInfo) -> { + if (predicates.test(snapshotInfo)) { + snapshotInfos.add(snapshotInfo.maybeWithoutIndices(indices)); + } + }, + allDoneListener + 
) + ); } - // then, look in the repository if there's any matching snapshots left - final List snapshotInfos; - if (snapshotIdsToIterate.isEmpty()) { - snapshotInfos = Collections.emptyList(); - } else { - snapshotInfos = Collections.synchronizedList(new ArrayList<>()); - } - final ActionListener allDoneListener = listener.safeMap(v -> { - final ArrayList snapshotList = new ArrayList<>(snapshotInfos); - snapshotList.addAll(snapshotSet); - return sortSnapshots(snapshotList, sortBy, after, 0, GetSnapshotsRequest.NO_LIMIT, order); - }); - if (snapshotIdsToIterate.isEmpty()) { - allDoneListener.onResponse(null); - return; - } - final Repository repository; - try { - repository = repositoriesService.repository(repositoryName); - } catch (RepositoryMissingException e) { - listener.onFailure(e); - return; - } - repository.getSnapshotInfo( - new GetSnapshotInfoContext(snapshotIdsToIterate, ignoreUnavailable == false, task::isCancelled, (context, snapshotInfo) -> { - if (predicate.test(snapshotInfo)) { - snapshotInfos.add(snapshotInfo.maybeWithoutIndices(indices)); - } - }, allDoneListener) - ); - } - private static boolean isCurrentSnapshotsOnly(String[] snapshots) { - return (snapshots.length == 1 && GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshots[0])); - } + private boolean isCurrentSnapshotsOnly() { + return snapshots.length == 1 && GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshots[0]); + } - private static SnapshotsInRepo buildSimpleSnapshotInfos( - final Set toResolve, - final String repoName, - final RepositoryData repositoryData, - final List currentSnapshots, - final GetSnapshotsRequest.SortBy sortBy, - @Nullable final GetSnapshotsRequest.After after, - final SortOrder order, - boolean indices - ) { - List snapshotInfos = new ArrayList<>(); - for (SnapshotInfo snapshotInfo : currentSnapshots) { - if (toResolve.remove(snapshotInfo.snapshot())) { - snapshotInfos.add(snapshotInfo.basic()); + private SnapshotsInRepo buildSimpleSnapshotInfos( + final Set toResolve, + final String repoName, + final RepositoryData repositoryData, + final List currentSnapshots + ) { + List snapshotInfos = new ArrayList<>(); + for (SnapshotInfo snapshotInfo : currentSnapshots) { + if (toResolve.remove(snapshotInfo.snapshot())) { + snapshotInfos.add(snapshotInfo.basic()); + } } - } - Map> snapshotsToIndices = new HashMap<>(); - if (indices) { - for (IndexId indexId : repositoryData.getIndices().values()) { - for (SnapshotId snapshotId : repositoryData.getSnapshots(indexId)) { - if (toResolve.contains(new Snapshot(repoName, snapshotId))) { - snapshotsToIndices.computeIfAbsent(snapshotId, (k) -> new ArrayList<>()).add(indexId.getName()); + Map> snapshotsToIndices = new HashMap<>(); + if (indices) { + for (IndexId indexId : repositoryData.getIndices().values()) { + for (SnapshotId snapshotId : repositoryData.getSnapshots(indexId)) { + if (toResolve.contains(new Snapshot(repoName, snapshotId))) { + snapshotsToIndices.computeIfAbsent(snapshotId, (k) -> new ArrayList<>()).add(indexId.getName()); + } } } } + for (Snapshot snapshot : toResolve) { + snapshotInfos.add( + new SnapshotInfo( + snapshot, + snapshotsToIndices.getOrDefault(snapshot.getSnapshotId(), Collections.emptyList()), + Collections.emptyList(), + Collections.emptyList(), + repositoryData.getSnapshotState(snapshot.getSnapshotId()) + ) + ); + } + return sortSnapshotsWithNoOffsetOrLimit(snapshotInfos); } - for (Snapshot snapshot : toResolve) { - snapshotInfos.add( - new SnapshotInfo( - snapshot, - 
snapshotsToIndices.getOrDefault(snapshot.getSnapshotId(), Collections.emptyList()), - Collections.emptyList(), - Collections.emptyList(), - repositoryData.getSnapshotState(snapshot.getSnapshotId()) - ) - ); - } - return sortSnapshots(snapshotInfos, sortBy, after, 0, GetSnapshotsRequest.NO_LIMIT, order); - } - private static final Comparator BY_START_TIME = Comparator.comparingLong(SnapshotInfo::startTime) - .thenComparing(SnapshotInfo::snapshotId); + private static final Comparator BY_START_TIME = Comparator.comparingLong(SnapshotInfo::startTime) + .thenComparing(SnapshotInfo::snapshotId); - private static final Comparator BY_DURATION = Comparator.comparingLong( - sni -> sni.endTime() - sni.startTime() - ).thenComparing(SnapshotInfo::snapshotId); + private static final Comparator BY_DURATION = Comparator.comparingLong( + sni -> sni.endTime() - sni.startTime() + ).thenComparing(SnapshotInfo::snapshotId); - private static final Comparator BY_INDICES_COUNT = Comparator.comparingInt(sni -> sni.indices().size()) - .thenComparing(SnapshotInfo::snapshotId); + private static final Comparator BY_INDICES_COUNT = Comparator.comparingInt(sni -> sni.indices().size()) + .thenComparing(SnapshotInfo::snapshotId); - private static final Comparator BY_SHARDS_COUNT = Comparator.comparingInt(SnapshotInfo::totalShards) - .thenComparing(SnapshotInfo::snapshotId); + private static final Comparator BY_SHARDS_COUNT = Comparator.comparingInt(SnapshotInfo::totalShards) + .thenComparing(SnapshotInfo::snapshotId); - private static final Comparator BY_FAILED_SHARDS_COUNT = Comparator.comparingInt(SnapshotInfo::failedShards) - .thenComparing(SnapshotInfo::snapshotId); + private static final Comparator BY_FAILED_SHARDS_COUNT = Comparator.comparingInt(SnapshotInfo::failedShards) + .thenComparing(SnapshotInfo::snapshotId); - private static final Comparator BY_NAME = Comparator.comparing(sni -> sni.snapshotId().getName()); + private static final Comparator BY_NAME = Comparator.comparing(sni -> sni.snapshotId().getName()); - private static final Comparator BY_REPOSITORY = Comparator.comparing(SnapshotInfo::repository) - .thenComparing(SnapshotInfo::snapshotId); + private static final Comparator BY_REPOSITORY = Comparator.comparing(SnapshotInfo::repository) + .thenComparing(SnapshotInfo::snapshotId); - private static long getDuration(SnapshotId snapshotId, RepositoryData repositoryData) { - final RepositoryData.SnapshotDetails details = repositoryData.getSnapshotDetails(snapshotId); - if (details == null) { - return -1; - } - final long startTime = details.getStartTimeMillis(); - if (startTime == -1) { - return -1; + private SnapshotsInRepo sortSnapshotsWithNoOffsetOrLimit(List snapshotInfos) { + return sortSnapshots(snapshotInfos.stream(), snapshotInfos.size(), 0, GetSnapshotsRequest.NO_LIMIT); } - final long endTime = details.getEndTimeMillis(); - if (endTime == -1) { - return -1; - } - return endTime - startTime; - } - private static long getStartTime(SnapshotId snapshotId, RepositoryData repositoryData) { - final RepositoryData.SnapshotDetails details = repositoryData.getSnapshotDetails(snapshotId); - return details == null ? 
-1 : details.getStartTimeMillis(); - } + private SnapshotsInRepo sortSnapshots(Stream infos, int totalCount, int offset, int size) { + final Comparator comparator = switch (sortBy) { + case START_TIME -> BY_START_TIME; + case NAME -> BY_NAME; + case DURATION -> BY_DURATION; + case INDICES -> BY_INDICES_COUNT; + case SHARDS -> BY_SHARDS_COUNT; + case FAILED_SHARDS -> BY_FAILED_SHARDS_COUNT; + case REPOSITORY -> BY_REPOSITORY; + }; - private static int indexCount(SnapshotId snapshotId, RepositoryData repositoryData) { - // TODO: this could be made more efficient by caching this number in RepositoryData - int indexCount = 0; - for (IndexId idx : repositoryData.getIndices().values()) { - if (repositoryData.getSnapshots(idx).contains(snapshotId)) { - indexCount++; + if (after != null) { + assert offset == 0 : "can't combine after and offset but saw [" + after + "] and offset [" + offset + "]"; + infos = infos.filter(buildAfterPredicate()); } + infos = infos.sorted(order == SortOrder.DESC ? comparator.reversed() : comparator).skip(offset); + final List allSnapshots = infos.toList(); + final List snapshots; + if (size != GetSnapshotsRequest.NO_LIMIT) { + snapshots = allSnapshots.stream().limit(size + 1).toList(); + } else { + snapshots = allSnapshots; + } + final List resultSet = size != GetSnapshotsRequest.NO_LIMIT && size < snapshots.size() + ? snapshots.subList(0, size) + : snapshots; + return new SnapshotsInRepo(resultSet, totalCount, allSnapshots.size() - resultSet.size()); + } + + private Predicate buildAfterPredicate() { + final String snapshotName = after.snapshotName(); + final String repoName = after.repoName(); + final String value = after.value(); + return switch (sortBy) { + case START_TIME -> filterByLongOffset(SnapshotInfo::startTime, Long.parseLong(value), snapshotName, repoName, order); + case NAME -> + // TODO: cover via pre-flight predicate + order == SortOrder.ASC + ? (info -> compareName(snapshotName, repoName, info) < 0) + : (info -> compareName(snapshotName, repoName, info) > 0); + case DURATION -> filterByLongOffset( + info -> info.endTime() - info.startTime(), + Long.parseLong(value), + snapshotName, + repoName, + order + ); + case INDICES -> + // TODO: cover via pre-flight predicate + filterByLongOffset(info -> info.indices().size(), Integer.parseInt(value), snapshotName, repoName, order); + case SHARDS -> filterByLongOffset(SnapshotInfo::totalShards, Integer.parseInt(value), snapshotName, repoName, order); + case FAILED_SHARDS -> filterByLongOffset( + SnapshotInfo::failedShards, + Integer.parseInt(value), + snapshotName, + repoName, + order + ); + case REPOSITORY -> + // TODO: cover via pre-flight predicate + order == SortOrder.ASC + ? (info -> compareRepositoryName(snapshotName, repoName, info) < 0) + : (info -> compareRepositoryName(snapshotName, repoName, info) > 0); + }; + } + + private static Predicate filterByLongOffset( + ToLongFunction extractor, + long after, + String snapshotName, + String repoName, + SortOrder order + ) { + return order == SortOrder.ASC ? 
info -> { + final long val = extractor.applyAsLong(info); + return after < val || (after == val && compareName(snapshotName, repoName, info) < 0); + } : info -> { + final long val = extractor.applyAsLong(info); + return after > val || (after == val && compareName(snapshotName, repoName, info) > 0); + }; + } + + private static int compareRepositoryName(String name, String repoName, SnapshotInfo info) { + final int res = repoName.compareTo(info.repository()); + if (res != 0) { + return res; + } + return name.compareTo(info.snapshotId().getName()); } - return indexCount; - } - - private static SnapshotsInRepo sortSnapshots( - List snapshotInfos, - GetSnapshotsRequest.SortBy sortBy, - @Nullable GetSnapshotsRequest.After after, - int offset, - int size, - SortOrder order - ) { - return sortSnapshots(snapshotInfos.stream(), snapshotInfos.size(), sortBy, after, offset, size, order); - } - - private static SnapshotsInRepo sortSnapshots( - Stream infos, - int totalCount, - GetSnapshotsRequest.SortBy sortBy, - @Nullable GetSnapshotsRequest.After after, - int offset, - int size, - SortOrder order - ) { - final Comparator comparator = switch (sortBy) { - case START_TIME -> BY_START_TIME; - case NAME -> BY_NAME; - case DURATION -> BY_DURATION; - case INDICES -> BY_INDICES_COUNT; - case SHARDS -> BY_SHARDS_COUNT; - case FAILED_SHARDS -> BY_FAILED_SHARDS_COUNT; - case REPOSITORY -> BY_REPOSITORY; - }; - - if (after != null) { - assert offset == 0 : "can't combine after and offset but saw [" + after + "] and offset [" + offset + "]"; - infos = infos.filter(buildAfterPredicate(sortBy, after, order)); - } - infos = infos.sorted(order == SortOrder.DESC ? comparator.reversed() : comparator).skip(offset); - final List allSnapshots = infos.toList(); - final List snapshots; - if (size != GetSnapshotsRequest.NO_LIMIT) { - snapshots = allSnapshots.stream().limit(size + 1).toList(); - } else { - snapshots = allSnapshots; - } - final List resultSet = size != GetSnapshotsRequest.NO_LIMIT && size < snapshots.size() - ? snapshots.subList(0, size) - : snapshots; - return new SnapshotsInRepo(resultSet, totalCount, allSnapshots.size() - resultSet.size()); - } - private static Predicate buildAfterPredicate( - GetSnapshotsRequest.SortBy sortBy, - GetSnapshotsRequest.After after, - SortOrder order - ) { - final String snapshotName = after.snapshotName(); - final String repoName = after.repoName(); - final String value = after.value(); - return switch (sortBy) { - case START_TIME -> filterByLongOffset(SnapshotInfo::startTime, Long.parseLong(value), snapshotName, repoName, order); - case NAME -> - // TODO: cover via pre-flight predicate - order == SortOrder.ASC - ? (info -> compareName(snapshotName, repoName, info) < 0) - : (info -> compareName(snapshotName, repoName, info) > 0); - case DURATION -> filterByLongOffset( - info -> info.endTime() - info.startTime(), - Long.parseLong(value), - snapshotName, - repoName, - order - ); - case INDICES -> - // TODO: cover via pre-flight predicate - filterByLongOffset(info -> info.indices().size(), Integer.parseInt(value), snapshotName, repoName, order); - case SHARDS -> filterByLongOffset(SnapshotInfo::totalShards, Integer.parseInt(value), snapshotName, repoName, order); - case FAILED_SHARDS -> filterByLongOffset(SnapshotInfo::failedShards, Integer.parseInt(value), snapshotName, repoName, order); - case REPOSITORY -> - // TODO: cover via pre-flight predicate - order == SortOrder.ASC - ? 
(info -> compareRepositoryName(snapshotName, repoName, info) < 0) - : (info -> compareRepositoryName(snapshotName, repoName, info) > 0); - }; - } - - private static Predicate filterByLongOffset( - ToLongFunction extractor, - long after, - String snapshotName, - String repoName, - SortOrder order - ) { - return order == SortOrder.ASC ? info -> { - final long val = extractor.applyAsLong(info); - return after < val || (after == val && compareName(snapshotName, repoName, info) < 0); - } : info -> { - final long val = extractor.applyAsLong(info); - return after > val || (after == val && compareName(snapshotName, repoName, info) > 0); - }; - } - - private static int compareRepositoryName(String name, String repoName, SnapshotInfo info) { - final int res = repoName.compareTo(info.repository()); - if (res != 0) { - return res; + private static int compareName(String name, String repoName, SnapshotInfo info) { + final int res = name.compareTo(info.snapshotId().getName()); + if (res != 0) { + return res; + } + return repoName.compareTo(info.repository()); } - return name.compareTo(info.snapshotId().getName()); - } - private static int compareName(String name, String repoName, SnapshotInfo info) { - final int res = name.compareTo(info.snapshotId().getName()); - if (res != 0) { - return res; - } - return repoName.compareTo(info.repository()); } /** @@ -881,6 +797,37 @@ private static Predicate filterByLongOffset(ToLongFunction after <= extractor.applyAsLong(info) : info -> after >= extractor.applyAsLong(info); } + private static long getDuration(SnapshotId snapshotId, RepositoryData repositoryData) { + final RepositoryData.SnapshotDetails details = repositoryData.getSnapshotDetails(snapshotId); + if (details == null) { + return -1; + } + final long startTime = details.getStartTimeMillis(); + if (startTime == -1) { + return -1; + } + final long endTime = details.getEndTimeMillis(); + if (endTime == -1) { + return -1; + } + return endTime - startTime; + } + + private static long getStartTime(SnapshotId snapshotId, RepositoryData repositoryData) { + final RepositoryData.SnapshotDetails details = repositoryData.getSnapshotDetails(snapshotId); + return details == null ? 
-1 : details.getStartTimeMillis(); + } + + private static int indexCount(SnapshotId snapshotId, RepositoryData repositoryData) { + // TODO: this could be made more efficient by caching this number in RepositoryData + int indexCount = 0; + for (IndexId idx : repositoryData.getIndices().values()) { + if (repositoryData.getSnapshots(idx).contains(snapshotId)) { + indexCount++; + } + } + return indexCount; + } } private record SnapshotsInRepo(List snapshotInfos, int totalCount, int remaining) { diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesMetrics.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesMetrics.java index b4d79d89ec4c6..50aa7881cd2b6 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesMetrics.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesMetrics.java @@ -13,6 +13,7 @@ import org.elasticsearch.telemetry.metric.MeterRegistry; public record RepositoriesMetrics( + MeterRegistry meterRegistry, LongCounter requestCounter, LongCounter exceptionCounter, LongCounter throttleCounter, @@ -36,6 +37,7 @@ public record RepositoriesMetrics( public RepositoriesMetrics(MeterRegistry meterRegistry) { this( + meterRegistry, meterRegistry.registerLongCounter(METRIC_REQUESTS_TOTAL, "repository request counter", "unit"), meterRegistry.registerLongCounter(METRIC_EXCEPTIONS_TOTAL, "repository request exception counter", "unit"), meterRegistry.registerLongCounter(METRIC_THROTTLES_TOTAL, "repository request throttle counter", "unit"), diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/MeteredBlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/MeteredBlobStoreRepository.java index 6ecab2f8c77f2..c5ea99b0e5c14 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/MeteredBlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/MeteredBlobStoreRepository.java @@ -14,7 +14,6 @@ import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.indices.recovery.RecoverySettings; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.RepositoryInfo; import org.elasticsearch.repositories.RepositoryStatsSnapshot; import org.elasticsearch.threadpool.ThreadPool; @@ -24,7 +23,6 @@ public abstract class MeteredBlobStoreRepository extends BlobStoreRepository { private final RepositoryInfo repositoryInfo; - protected final RepositoriesMetrics repositoriesMetrics; public MeteredBlobStoreRepository( RepositoryMetadata metadata, @@ -33,11 +31,9 @@ public MeteredBlobStoreRepository( BigArrays bigArrays, RecoverySettings recoverySettings, BlobPath basePath, - Map location, - RepositoriesMetrics repositoriesMetrics + Map location ) { super(metadata, namedXContentRegistry, clusterService, bigArrays, recoverySettings, basePath); - this.repositoriesMetrics = repositoriesMetrics; ThreadPool threadPool = clusterService.getClusterApplierService().threadPool(); this.repositoryInfo = new RepositoryInfo( UUIDs.randomBase64UUID(), diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/InternalRandomSampler.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/InternalRandomSampler.java index 4dde9cc67b975..68a1a22369d2a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/InternalRandomSampler.java +++ 
b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/InternalRandomSampler.java @@ -8,6 +8,7 @@ package org.elasticsearch.search.aggregations.bucket.sampler.random; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.Releasables; @@ -29,18 +30,21 @@ public class InternalRandomSampler extends InternalSingleBucketAggregation imple public static final String PARSER_NAME = "random_sampler"; private final int seed; + private final Integer shardSeed; private final double probability; InternalRandomSampler( String name, long docCount, int seed, + Integer shardSeed, double probability, InternalAggregations subAggregations, Map metadata ) { super(name, docCount, subAggregations, metadata); this.seed = seed; + this.shardSeed = shardSeed; this.probability = probability; } @@ -51,6 +55,11 @@ public InternalRandomSampler(StreamInput in) throws IOException { super(in); this.seed = in.readInt(); this.probability = in.readDouble(); + if (in.getTransportVersion().onOrAfter(TransportVersions.RANDOM_AGG_SHARD_SEED)) { + this.shardSeed = in.readOptionalInt(); + } else { + this.shardSeed = null; + } } @Override @@ -58,6 +67,9 @@ protected void doWriteTo(StreamOutput out) throws IOException { super.doWriteTo(out); out.writeInt(seed); out.writeDouble(probability); + if (out.getTransportVersion().onOrAfter(TransportVersions.RANDOM_AGG_SHARD_SEED)) { + out.writeOptionalInt(shardSeed); + } } @Override @@ -72,7 +84,7 @@ public String getType() { @Override protected InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations) { - return new InternalRandomSampler(name, docCount, seed, probability, subAggregations, metadata); + return new InternalRandomSampler(name, docCount, seed, shardSeed, probability, subAggregations, metadata); } @Override @@ -105,12 +117,15 @@ public void close() { } public SamplingContext buildContext() { - return new SamplingContext(probability, seed); + return new SamplingContext(probability, seed, shardSeed); } @Override public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { builder.field(RandomSamplerAggregationBuilder.SEED.getPreferredName(), seed); + if (shardSeed != null) { + builder.field(RandomSamplerAggregationBuilder.SHARD_SEED.getPreferredName(), shardSeed); + } builder.field(RandomSamplerAggregationBuilder.PROBABILITY.getPreferredName(), probability); builder.field(CommonFields.DOC_COUNT.getPreferredName(), getDocCount()); getAggregations().toXContentInternal(builder, params); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregationBuilder.java index 240f016c66954..9bd9ab45b633a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregationBuilder.java @@ -34,6 +34,7 @@ public class RandomSamplerAggregationBuilder extends AbstractAggregationBuilder< static final ParseField PROBABILITY = new ParseField("probability"); static final ParseField SEED = new ParseField("seed"); + static final ParseField SHARD_SEED = new ParseField("shard_seed"); public static final ObjectParser 
PARSER = ObjectParser.fromBuilder( RandomSamplerAggregationBuilder.NAME, @@ -41,10 +42,12 @@ public class RandomSamplerAggregationBuilder extends AbstractAggregationBuilder< ); static { PARSER.declareInt(RandomSamplerAggregationBuilder::setSeed, SEED); + PARSER.declareInt(RandomSamplerAggregationBuilder::setShardSeed, SHARD_SEED); PARSER.declareDouble(RandomSamplerAggregationBuilder::setProbability, PROBABILITY); } private int seed = Randomness.get().nextInt(); + private Integer shardSeed; private double p; public RandomSamplerAggregationBuilder(String name) { @@ -67,10 +70,18 @@ public RandomSamplerAggregationBuilder setSeed(int seed) { return this; } + public RandomSamplerAggregationBuilder setShardSeed(int shardSeed) { + this.shardSeed = shardSeed; + return this; + } + public RandomSamplerAggregationBuilder(StreamInput in) throws IOException { super(in); this.p = in.readDouble(); this.seed = in.readInt(); + if (in.getTransportVersion().onOrAfter(TransportVersions.RANDOM_AGG_SHARD_SEED)) { + this.shardSeed = in.readOptionalInt(); + } } protected RandomSamplerAggregationBuilder( @@ -81,12 +92,16 @@ protected RandomSamplerAggregationBuilder( super(clone, factoriesBuilder, metadata); this.p = clone.p; this.seed = clone.seed; + this.shardSeed = clone.shardSeed; } @Override protected void doWriteTo(StreamOutput out) throws IOException { out.writeDouble(p); out.writeInt(seed); + if (out.getTransportVersion().onOrAfter(TransportVersions.RANDOM_AGG_SHARD_SEED)) { + out.writeOptionalInt(shardSeed); + } } static void recursivelyCheckSubAggs(Collection builders, Consumer aggregationCheck) { @@ -128,7 +143,7 @@ protected AggregatorFactory doBuild( ); } }); - return new RandomSamplerAggregatorFactory(name, seed, p, context, parent, subfactoriesBuilder, metadata); + return new RandomSamplerAggregatorFactory(name, seed, shardSeed, p, context, parent, subfactoriesBuilder, metadata); } @Override @@ -136,6 +151,9 @@ protected XContentBuilder internalXContent(XContentBuilder builder, Params param builder.startObject(); builder.field(PROBABILITY.getPreferredName(), p); builder.field(SEED.getPreferredName(), seed); + if (shardSeed != null) { + builder.field(SHARD_SEED.getPreferredName(), shardSeed); + } builder.endObject(); return null; } @@ -162,7 +180,7 @@ public TransportVersion getMinimalSupportedVersion() { @Override public int hashCode() { - return Objects.hash(super.hashCode(), p, seed); + return Objects.hash(super.hashCode(), p, seed, shardSeed); } @Override @@ -171,6 +189,6 @@ public boolean equals(Object obj) { if (obj == null || getClass() != obj.getClass()) return false; if (super.equals(obj) == false) return false; RandomSamplerAggregationBuilder other = (RandomSamplerAggregationBuilder) obj; - return Objects.equals(p, other.p) && Objects.equals(seed, other.seed); + return Objects.equals(p, other.p) && Objects.equals(seed, other.seed) && Objects.equals(shardSeed, other.shardSeed); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregator.java index 8853733b9a158..a279b8270cd57 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregator.java @@ -30,12 +30,14 @@ public class RandomSamplerAggregator extends BucketsAggregator implements 
SingleBucketAggregator { private final int seed; + private final Integer shardSeed; private final double probability; private final CheckedSupplier weightSupplier; RandomSamplerAggregator( String name, int seed, + Integer shardSeed, double probability, CheckedSupplier weightSupplier, AggregatorFactories factories, @@ -53,6 +55,7 @@ public class RandomSamplerAggregator extends BucketsAggregator implements Single ); } this.weightSupplier = weightSupplier; + this.shardSeed = shardSeed; } @Override @@ -63,6 +66,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I name, bucketDocCount(owningBucketOrd), seed, + shardSeed, probability, subAggregationResults, metadata() @@ -72,7 +76,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I @Override public InternalAggregation buildEmptyAggregation() { - return new InternalRandomSampler(name, 0, seed, probability, buildEmptySubAggregations(), metadata()); + return new InternalRandomSampler(name, 0, seed, shardSeed, probability, buildEmptySubAggregations(), metadata()); } /** diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregatorFactory.java index d63f574b4d8bd..4be2e932179fe 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregatorFactory.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregatorFactory.java @@ -26,6 +26,7 @@ public class RandomSamplerAggregatorFactory extends AggregatorFactory { private final int seed; + private final Integer shardSeed; private final double probability; private final SamplingContext samplingContext; private Weight weight; @@ -33,6 +34,7 @@ public class RandomSamplerAggregatorFactory extends AggregatorFactory { RandomSamplerAggregatorFactory( String name, int seed, + Integer shardSeed, double probability, AggregationContext context, AggregatorFactory parent, @@ -42,7 +44,8 @@ public class RandomSamplerAggregatorFactory extends AggregatorFactory { super(name, context, parent, subFactories, metadata); this.probability = probability; this.seed = seed; - this.samplingContext = new SamplingContext(probability, seed); + this.samplingContext = new SamplingContext(probability, seed, shardSeed); + this.shardSeed = shardSeed; } @Override @@ -53,7 +56,18 @@ public Optional getSamplingContext() { @Override public Aggregator createInternal(Aggregator parent, CardinalityUpperBound cardinality, Map metadata) throws IOException { - return new RandomSamplerAggregator(name, seed, probability, this::getWeight, factories, context, parent, cardinality, metadata); + return new RandomSamplerAggregator( + name, + seed, + shardSeed, + probability, + this::getWeight, + factories, + context, + parent, + cardinality, + metadata + ); } /** @@ -66,7 +80,11 @@ public Aggregator createInternal(Aggregator parent, CardinalityUpperBound cardin */ private Weight getWeight() throws IOException { if (weight == null) { - RandomSamplingQuery query = new RandomSamplingQuery(probability, seed, context.shardRandomSeed()); + RandomSamplingQuery query = new RandomSamplingQuery( + probability, + seed, + shardSeed == null ? 
context.shardRandomSeed() : shardSeed + ); BooleanQuery booleanQuery = new BooleanQuery.Builder().add(query, BooleanClause.Occur.FILTER) .add(context.query(), BooleanClause.Occur.FILTER) .build(); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/support/SamplingContext.java b/server/src/main/java/org/elasticsearch/search/aggregations/support/SamplingContext.java index 57ea138f63268..d8f34bfcf9973 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/support/SamplingContext.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/support/SamplingContext.java @@ -20,8 +20,9 @@ /** * This provides information around the current sampling context for aggregations */ -public record SamplingContext(double probability, int seed) { - public static final SamplingContext NONE = new SamplingContext(1.0, 0); +public record SamplingContext(double probability, int seed, Integer shardSeed) { + + public static final SamplingContext NONE = new SamplingContext(1.0, 0, null); public boolean isSampled() { return probability < 1.0; @@ -97,20 +98,22 @@ public Query buildQueryWithSampler(QueryBuilder builder, AggregationContext cont } BooleanQuery.Builder queryBuilder = new BooleanQuery.Builder(); queryBuilder.add(rewritten, BooleanClause.Occur.FILTER); - queryBuilder.add(new RandomSamplingQuery(probability(), seed(), context.shardRandomSeed()), BooleanClause.Occur.FILTER); + queryBuilder.add( + new RandomSamplingQuery(probability(), seed(), shardSeed == null ? context.shardRandomSeed() : shardSeed), + BooleanClause.Occur.FILTER + ); return queryBuilder.build(); } /** * @param context The current aggregation context * @return the sampling query if the sampling context indicates that sampling is required - * @throws IOException thrown on query build failure */ - public Optional buildSamplingQueryIfNecessary(AggregationContext context) throws IOException { + public Optional buildSamplingQueryIfNecessary(AggregationContext context) { if (isSampled() == false) { return Optional.empty(); } - return Optional.of(new RandomSamplingQuery(probability(), seed(), context.shardRandomSeed())); + return Optional.of(new RandomSamplingQuery(probability(), seed(), shardSeed == null ? context.shardRandomSeed() : shardSeed)); } } diff --git a/server/src/main/java/org/elasticsearch/transport/LeakTracker.java b/server/src/main/java/org/elasticsearch/transport/LeakTracker.java index 3be22f6fae53a..77a41cff15fd7 100644 --- a/server/src/main/java/org/elasticsearch/transport/LeakTracker.java +++ b/server/src/main/java/org/elasticsearch/transport/LeakTracker.java @@ -41,6 +41,8 @@ public final class LeakTracker { public static final LeakTracker INSTANCE = new LeakTracker(); + private static volatile String contextHint = ""; + private LeakTracker() {} /** @@ -72,6 +74,15 @@ public void reportLeak() { } } + /** + * Sets a hint string that is recorded with every leak the tracker reports. Unit tests set the test name here so that a reported + * leak can be attributed to the exact test that caused it. 
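(An aside to tie together the `shard_seed` changes running through the sampler diffs above: builder, aggregator, factory, and `SamplingContext` all fall back to `context.shardRandomSeed()` only when no explicit shard seed was supplied. A minimal usage sketch under that reading; `setSeed`/`setShardSeed` returning `this`, `setProbability`, and `subAggregation` are taken from this diff, while the `"price"`/`"max_price"` names and the literal values are illustrative only:)

    // Pin both seeds so the sampled document set stays reproducible even when
    // shards are relocated or the request is served by a different replica.
    RandomSamplerAggregationBuilder sampler = new RandomSamplerAggregationBuilder("sample");
    sampler.setProbability(0.01);        // sample roughly 1% of matching documents
    sampler.setSeed(42).setShardSeed(7); // shard_seed overrides context.shardRandomSeed()
    sampler.subAggregation(AggregationBuilders.max("max_price").field("price"));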
+ * @param hint hint value + */ + public static void setContextHint(String hint) { + contextHint = hint; + } + public static Releasable wrap(Releasable releasable) { if (Assertions.ENABLED == false) { return releasable; @@ -299,19 +310,25 @@ private static final class Record extends Throwable { private final Record next; private final int pos; + private final String threadName; + + private final String contextHint = LeakTracker.contextHint; + Record(Record next) { this.next = next; this.pos = next.pos + 1; + threadName = Thread.currentThread().getName(); } private Record() { next = null; pos = -1; + threadName = Thread.currentThread().getName(); } @Override public String toString() { - StringBuilder buf = new StringBuilder(); + StringBuilder buf = new StringBuilder("\tin [").append(threadName).append("][").append(contextHint).append("]\n"); StackTraceElement[] array = getStackTrace(); // Skip the first three elements since those are just related to the leak tracker. for (int i = 3; i < array.length; i++) { diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index 4f7001f00e6a7..45e4bb09c1616 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -482,8 +482,7 @@ private MeteredRepositoryTypeA(RepositoryMetadata metadata, ClusterService clust MockBigArrays.NON_RECYCLING_INSTANCE, mock(RecoverySettings.class), BlobPath.EMPTY, - Map.of("bucket", "bucket-a"), - RepositoriesMetrics.NOOP + Map.of("bucket", "bucket-a") ); } @@ -510,8 +509,7 @@ private MeteredRepositoryTypeB(RepositoryMetadata metadata, ClusterService clust MockBigArrays.NON_RECYCLING_INSTANCE, mock(RecoverySettings.class), BlobPath.EMPTY, - Map.of("bucket", "bucket-b"), - RepositoriesMetrics.NOOP + Map.of("bucket", "bucket-b") ); } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregationBuilderTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregationBuilderTests.java index 5514cb441b54c..18808f9b2aa87 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregationBuilderTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregationBuilderTests.java @@ -19,6 +19,9 @@ protected RandomSamplerAggregationBuilder createTestAggregatorBuilder() { if (randomBoolean()) { builder.setSeed(randomInt()); } + if (randomBoolean()) { + builder.setShardSeed(randomInt()); + } builder.setProbability(randomFrom(1.0, randomDoubleBetween(0.0, 0.5, false))); builder.subAggregation(AggregationBuilders.max(randomAlphaOfLength(10)).field(randomAlphaOfLength(10))); return builder; diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/support/SamplingContextTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/support/SamplingContextTests.java index d9e19cf60e481..ffb56f17c7f8f 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/support/SamplingContextTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/support/SamplingContextTests.java @@ -14,10 +14,9 @@ import static org.hamcrest.Matchers.equalTo; public class SamplingContextTests extends ESTestCase { - protected static final int 
NUMBER_OF_TEST_RUNS = 20; private static SamplingContext randomContext() { - return new SamplingContext(randomDoubleBetween(1e-6, 0.1, false), randomInt()); + return new SamplingContext(randomDoubleBetween(1e-6, 0.1, false), randomInt(), randomBoolean() ? null : randomInt()); } public void testScaling() { @@ -41,7 +40,7 @@ public void testScaling() { } public void testNoScaling() { - SamplingContext samplingContext = new SamplingContext(1.0, randomInt()); + SamplingContext samplingContext = new SamplingContext(1.0, randomInt(), randomBoolean() ? null : randomInt()); long randomLong = randomLong(); double randomDouble = randomDouble(); assertThat(randomLong, equalTo(samplingContext.scaleDown(randomLong))); diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index 99734e5e224aa..1787638f9fdf3 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -1114,7 +1114,11 @@ public void testSupportedFieldTypes() throws IOException { // We should make sure if the builder says it supports sampling, that the internal aggregations returned override // finalizeSampling if (aggregationBuilder.supportsSampling()) { - SamplingContext randomSamplingContext = new SamplingContext(randomDoubleBetween(1e-8, 0.1, false), randomInt()); + SamplingContext randomSamplingContext = new SamplingContext( + randomDoubleBetween(1e-8, 0.1, false), + randomInt(), + randomBoolean() ? null : randomInt() + ); InternalAggregation sampledResult = internalAggregation.finalizeSampling(randomSamplingContext); assertThat(sampledResult.getClass(), equalTo(internalAggregation.getClass())); } diff --git a/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingMeterRegistry.java b/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingMeterRegistry.java index 86bfd9bf38c26..33693c297f166 100644 --- a/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingMeterRegistry.java +++ b/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingMeterRegistry.java @@ -33,7 +33,7 @@ public class RecordingMeterRegistry implements MeterRegistry { protected final MetricRecorder recorder = new MetricRecorder<>(); - MetricRecorder getRecorder() { + public MetricRecorder getRecorder() { return recorder; } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index 7b4032cc56cef..67919756e16a9 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -499,6 +499,7 @@ public void removeHeaderWarningAppender() { @Before public final void before() { + LeakTracker.setContextHint(getTestName()); logger.info("{}before test", getTestParamsForLogging()); assertNull("Thread context initialized twice", threadContext); if (enableWarningsCheck()) { @@ -530,6 +531,7 @@ public final void after() throws Exception { ensureAllSearchContextsReleased(); ensureCheckIndexPassed(); logger.info("{}after test", getTestParamsForLogging()); + LeakTracker.setContextHint(""); } private String getTestParamsForLogging() { diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java 
b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java index f1b147eefe723..12c5085cbcd73 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java @@ -283,7 +283,11 @@ public void testReduceRandom() throws IOException { doAssertReducedMultiBucketConsumer(reduced, bucketConsumer); assertReduced(reduced, inputs.toReduce()); if (supportsSampling()) { - SamplingContext randomContext = new SamplingContext(randomDoubleBetween(1e-8, 0.1, false), randomInt()); + SamplingContext randomContext = new SamplingContext( + randomDoubleBetween(1e-8, 0.1, false), + randomInt(), + randomBoolean() ? null : randomInt() + ); @SuppressWarnings("unchecked") T sampled = (T) reduced.finalizeSampling(randomContext); assertSampled(sampled, reduced, randomContext); diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/ConnectorScheduling.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/ConnectorScheduling.java index 233bea5d4a842..637957b8ce66e 100644 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/ConnectorScheduling.java +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/ConnectorScheduling.java @@ -30,14 +30,15 @@ public class ConnectorScheduling implements Writeable, ToXContentObject { - private final ScheduleConfig accessControl; - private final ScheduleConfig full; - private final ScheduleConfig incremental; - + private static final String EVERYDAY_AT_MIDNIGHT = "0 0 0 * * ?"; private static final ParseField ACCESS_CONTROL_FIELD = new ParseField("access_control"); private static final ParseField FULL_FIELD = new ParseField("full"); private static final ParseField INCREMENTAL_FIELD = new ParseField("incremental"); + private final ScheduleConfig accessControl; + private final ScheduleConfig full; + private final ScheduleConfig incremental; + /** * @param accessControl connector access control sync schedule represented as {@link ScheduleConfig} * @param full connector full sync schedule represented as {@link ScheduleConfig} @@ -238,12 +239,19 @@ public ScheduleConfig build() { } } + /** + * Default scheduling is set to every day at midnight (00:00:00). + * + * @return default scheduling for full, incremental and access control syncs. 
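(For readers not fluent in Quartz-style cron: `0 0 0 * * ?` is second 0, minute 0, hour 0, any day of any month, no specific day of week, i.e. daily at midnight. A sketch of a non-default schedule built with the same builder API shown in this diff; the six-hourly expression is illustrative:)

    // Keep access control and incremental syncs at the disabled daily default,
    // but run a full sync every six hours.
    ConnectorScheduling custom = new ConnectorScheduling.Builder().setAccessControl(
        new ConnectorScheduling.ScheduleConfig.Builder().setEnabled(false).setInterval(new Cron("0 0 0 * * ?")).build()
    )
        .setFull(new ConnectorScheduling.ScheduleConfig.Builder().setEnabled(true).setInterval(new Cron("0 0 */6 * * ?")).build())
        .setIncremental(new ConnectorScheduling.ScheduleConfig.Builder().setEnabled(false).setInterval(new Cron("0 0 0 * * ?")).build())
        .build();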
+ */ public static ConnectorScheduling getDefaultConnectorScheduling() { return new ConnectorScheduling.Builder().setAccessControl( - new ConnectorScheduling.ScheduleConfig.Builder().setEnabled(false).setInterval(new Cron("0 0 0 * * ?")).build() + new ConnectorScheduling.ScheduleConfig.Builder().setEnabled(false).setInterval(new Cron(EVERYDAY_AT_MIDNIGHT)).build() ) - .setFull(new ConnectorScheduling.ScheduleConfig.Builder().setEnabled(false).setInterval(new Cron("0 0 0 * * ?")).build()) - .setIncremental(new ConnectorScheduling.ScheduleConfig.Builder().setEnabled(false).setInterval(new Cron("0 0 0 * * ?")).build()) + .setFull(new ConnectorScheduling.ScheduleConfig.Builder().setEnabled(false).setInterval(new Cron(EVERYDAY_AT_MIDNIGHT)).build()) + .setIncremental( + new ConnectorScheduling.ScheduleConfig.Builder().setEnabled(false).setInterval(new Cron(EVERYDAY_AT_MIDNIGHT)).build() + ) .build(); } } diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/ConnectorStateMachine.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/ConnectorStateMachine.java index 39a12ba334c30..f722955cc0f9e 100644 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/ConnectorStateMachine.java +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/ConnectorStateMachine.java @@ -42,6 +42,13 @@ public static boolean isValidTransition(ConnectorStatus current, ConnectorStatus return validNextStates(current).contains(next); } + /** + * Throws {@link ConnectorInvalidStatusTransitionException} if a + * transition from one {@link ConnectorStatus} to another is invalid. + * + * @param current The current {@link ConnectorStatus} of the {@link Connector}. + * @param next The proposed next {@link ConnectorStatus} of the {@link Connector}. + */ public static void assertValidStateTransition(ConnectorStatus current, ConnectorStatus next) throws ConnectorInvalidStatusTransitionException { if (isValidTransition(current, next)) return; diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexService.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexService.java index b1c08d8b7fbb1..910f0605ef7aa 100644 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexService.java +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexService.java @@ -24,7 +24,6 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.internal.Client; -import org.elasticsearch.common.UUIDs; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.index.query.BoolQueryBuilder; @@ -416,14 +415,6 @@ public void updateConnectorSyncJobIngestionStats( } - private String generateId() { - /* Workaround: only needed for generating an id upfront, autoGenerateId() has a side effect generating a timestamp, - * which would raise an error on the response layer later ("autoGeneratedTimestamp should not be set externally"). - * TODO: do we even need to copy the "_id" and set it as "id"? 
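(A caller-side sketch for the `assertValidStateTransition` javadoc above, mirroring the new tests further down: `CREATED -> ERROR` is treated as valid, `CREATED -> CONFIGURED` is not. The wrapping method name is hypothetical; note the exception is checked:)

    // Guard a status update: no-op when the transition is allowed, throws
    // ConnectorInvalidStatusTransitionException before any state is mutated.
    void moveToError() throws ConnectorInvalidStatusTransitionException {
        ConnectorStateMachine.assertValidStateTransition(ConnectorStatus.CREATED, ConnectorStatus.ERROR);
        // ... persist the new status here ...
    }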
- */ - return UUIDs.base64UUID(); - } - private void getSyncJobConnectorInfo(String connectorId, ActionListener listener) { try { diff --git a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/ConnectorStateMachineTests.java b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/ConnectorStateMachineTests.java index 372c874310162..d1f08f80d02f2 100644 --- a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/ConnectorStateMachineTests.java +++ b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/ConnectorStateMachineTests.java @@ -65,4 +65,31 @@ public void testTransitionToSameState() { assertFalse("Transition from " + state + " to itself should be invalid", ConnectorStateMachine.isValidTransition(state, state)); } } + + public void testAssertValidStateTransition_ExpectExceptionOnInvalidTransition() { + assertThrows( + ConnectorInvalidStatusTransitionException.class, + () -> ConnectorStateMachine.assertValidStateTransition(ConnectorStatus.CREATED, ConnectorStatus.CONFIGURED) + ); + } + + public void testAssertValidStateTransition_ExpectNoExceptionOnValidTransition() { + ConnectorStatus prevStatus = ConnectorStatus.CREATED; + ConnectorStatus nextStatus = ConnectorStatus.ERROR; + + try { + ConnectorStateMachine.assertValidStateTransition(prevStatus, nextStatus); + } catch (ConnectorInvalidStatusTransitionException e) { + fail( + "Did not expect " + + ConnectorInvalidStatusTransitionException.class.getSimpleName() + + " to be thrown for valid state transition [" + + prevStatus + + "] -> " + + "[" + + nextStatus + + "]." + ); + } + } } diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformInsufficientPermissionsIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformInsufficientPermissionsIT.java index dc48ceb7b309b..59a673790723e 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformInsufficientPermissionsIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformInsufficientPermissionsIT.java @@ -418,16 +418,14 @@ public void testTransformPermissionsDeferUnattendedNoDest() throws Exception { ); assertRed(transformId, authIssue); - startTransform(config.getId(), RequestOptions.DEFAULT); - - // Give the transform indexer enough time to try creating destination index - Thread.sleep(5_000); + startTransform(transformId, RequestOptions.DEFAULT); String destIndexIssue = Strings.format("Could not create destination index [%s] for transform [%s]", destIndexName, transformId); // transform's auth state status is still RED due to: // - lacking permissions // - and the inability to create destination index in the indexer (which is also a consequence of lacking permissions) - assertRed(transformId, authIssue, destIndexIssue); + // wait for 10 seconds to give the transform indexer enough time to try creating destination index + assertBusy(() -> { assertRed(transformId, authIssue, destIndexIssue); }); // update transform's credentials so that the transform has permission to access source/dest indices updateConfig(transformId, "{}", RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, Users.SENIOR.header).build()); @@ -593,5 +591,7 @@ private void assertRed(String 
transformId, String... expectedHealthIssueDetails) .map(issue -> (String) extractValue((Map) issue, "details")) .collect(toSet()); assertThat("Stats were: " + stats, actualHealthIssueDetailsSet, containsInAnyOrder(expectedHealthIssueDetails)); + // We should not progress beyond the 0th checkpoint until we correctly configure the Transform. + assertThat("Stats were: " + stats, getCheckpoint(stats), equalTo(0L)); } } diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java index eed849d35ea44..897de6c120a8b 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java @@ -294,14 +294,15 @@ protected void waitUntilCheckpoint(String id, long checkpoint) throws Exception } protected void waitUntilCheckpoint(String id, long checkpoint, TimeValue waitTime) throws Exception { - assertBusy( - () -> assertEquals( - checkpoint, - ((Integer) XContentMapValues.extractValue("checkpointing.last.checkpoint", getBasicTransformStats(id))).longValue() - ), - waitTime.getMillis(), - TimeUnit.MILLISECONDS - ); + assertBusy(() -> assertEquals(checkpoint, getCheckpoint(id)), waitTime.getMillis(), TimeUnit.MILLISECONDS); + } + + protected long getCheckpoint(String id) throws IOException { + return getCheckpoint(getBasicTransformStats(id)); + } + + protected long getCheckpoint(Map stats) { + return ((Integer) XContentMapValues.extractValue("checkpointing.last.checkpoint", stats)).longValue(); } protected DateHistogramGroupSource createDateHistogramGroupSourceWithFixedInterval( diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 4b2da731351d7..ff52f5e267655 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -333,6 +333,9 @@ protected void onStart(long now, ActionListener listener) { } }, listener::onFailure); + var shouldMaybeCreateDestIndexForUnattended = context.getCheckpoint() == 0 + && Boolean.TRUE.equals(transformConfig.getSettings().getUnattended()); + ActionListener> fieldMappingsListener = ActionListener.wrap(destIndexMappings -> { if (destIndexMappings.isEmpty() == false) { // If we managed to fetch destination index mappings, we use them from now on ... @@ -344,9 +347,7 @@ protected void onStart(long now, ActionListener listener) { // Since the unattended transform could not have created the destination index yet, we do it here. // This is important to create the destination index explicitly before indexing first documents. Otherwise, the destination // index aliases may be missing. 
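(Recap before the hunk resumes: the checkpoint-zero/unattended check used to be evaluated inline in the mappings listener; it is now computed once in `onStart` so the second hunk at the end of this diff can reuse it. Copied here for reference, with a note on the null handling:)

    // Evaluated once per onStart(): only an unattended transform that has not
    // yet completed its first checkpoint may still need to create the
    // destination index itself. getUnattended() may be null, hence the
    // Boolean.TRUE.equals form rather than a direct boolean test.
    var shouldMaybeCreateDestIndexForUnattended = context.getCheckpoint() == 0
        && Boolean.TRUE.equals(transformConfig.getSettings().getUnattended());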
- if (destIndexMappings.isEmpty() - && context.getCheckpoint() == 0 - && Boolean.TRUE.equals(transformConfig.getSettings().getUnattended())) { + if (destIndexMappings.isEmpty() && shouldMaybeCreateDestIndexForUnattended) { doMaybeCreateDestIndex(deducedDestIndexMappings.get(), configurationReadyListener); } else { configurationReadyListener.onResponse(null); @@ -364,7 +365,7 @@ protected void onStart(long now, ActionListener listener) { deducedDestIndexMappings.set(validationResponse.getDestIndexMappings()); if (isContinuous()) { transformsConfigManager.getTransformConfiguration(getJobId(), ActionListener.wrap(config -> { - if (transformConfig.equals(config) && fieldMappings != null) { + if (transformConfig.equals(config) && fieldMappings != null && shouldMaybeCreateDestIndexForUnattended == false) { logger.trace("[{}] transform config has not changed.", getJobId()); configurationReadyListener.onResponse(null); } else {
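(A condensed sketch of that final hunk, with the listener plumbing omitted: a continuous transform's "config unchanged" fast path must no longer be taken on an unattended first run, so that startup falls through to the branch that can still set up the destination index:)

    if (transformConfig.equals(config) && fieldMappings != null && shouldMaybeCreateDestIndexForUnattended == false) {
        // Fast path: configuration unchanged and no first-run index work pending.
        configurationReadyListener.onResponse(null);
    } else {
        // Reload and revalidate; an unattended first run always lands here.
    }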