Skip to content

Commit

Permalink
Merge branch 'main' into 2024/02/22/assert-cache-file-reads-not-excee…
Browse files Browse the repository at this point in the history
…d-file-length
  • Loading branch information
tlrx committed Feb 23, 2024
2 parents 87d51b3 + f86532b commit 08ab2c4
Show file tree
Hide file tree
Showing 41 changed files with 932 additions and 652 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/104830.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 104830
summary: All new `shard_seed` parameter for `random_sampler` agg
area: Aggregations
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ information, refer to https://www.elastic.co/subscriptions.
[[remote-clusters-security-api-key]]
==== Establish trust with a remote cluster

NOTE: If a remote cluster is part of an {ess} deployment, it has a valid certificate by default.
You can therefore skip steps related to certificates in these instructions.

===== On the remote cluster

// tag::remote-cluster-steps[]
Expand Down
6 changes: 4 additions & 2 deletions docs/reference/watcher/actions/email.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,10 @@ killed by firewalls or load balancers in-between.
means, by default watcher tries to download a dashboard for 10 minutes,
forty times fifteen seconds). The setting `xpack.notification.reporting.interval`
can be configured globally to change the default.
| `request.auth` | Additional auth configuration for the request
| `request.proxy` | Additional proxy configuration for the request
| `auth` | Additional auth configuration for the request, see
{kibana-ref}/automating-report-generation.html#use-watcher[use watcher] for details
| `proxy` | Additional proxy configuration for the request. See <<http-input-attributes>>
on how to configure the values.
|======


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.AggregatorReducer;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketAggregatorsReducer;
import org.elasticsearch.search.aggregations.support.SamplingContext;
import org.elasticsearch.xcontent.XContentBuilder;

Expand Down Expand Up @@ -177,30 +179,38 @@ public InternalBucket getBucketByKey(String key) {

@Override
protected AggregatorReducer getLeaderReducer(AggregationReduceContext reduceContext, int size) {
Map<String, List<InternalBucket>> bucketsMap = new HashMap<>();
return new AggregatorReducer() {
final Map<String, MultiBucketAggregatorsReducer> bucketsReducer = new HashMap<>(getBuckets().size());

@Override
public void accept(InternalAggregation aggregation) {
InternalAdjacencyMatrix filters = (InternalAdjacencyMatrix) aggregation;
final InternalAdjacencyMatrix filters = (InternalAdjacencyMatrix) aggregation;
for (InternalBucket bucket : filters.buckets) {
List<InternalBucket> sameRangeList = bucketsMap.computeIfAbsent(bucket.key, k -> new ArrayList<>(size));
sameRangeList.add(bucket);
MultiBucketAggregatorsReducer reducer = bucketsReducer.computeIfAbsent(
bucket.key,
k -> new MultiBucketAggregatorsReducer(reduceContext, size)
);
reducer.accept(bucket);
}
}

@Override
public InternalAggregation get() {
List<InternalBucket> reducedBuckets = new ArrayList<>(bucketsMap.size());
for (List<InternalBucket> sameRangeList : bucketsMap.values()) {
InternalBucket reducedBucket = reduceBucket(sameRangeList, reduceContext);
if (reducedBucket.docCount >= 1) {
reducedBuckets.add(reducedBucket);
List<InternalBucket> reducedBuckets = new ArrayList<>(bucketsReducer.size());
for (Map.Entry<String, MultiBucketAggregatorsReducer> entry : bucketsReducer.entrySet()) {
if (entry.getValue().getDocCount() >= 1) {
reducedBuckets.add(new InternalBucket(entry.getKey(), entry.getValue().getDocCount(), entry.getValue().get()));
}
}
reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size());
reducedBuckets.sort(Comparator.comparing(InternalBucket::getKey));
return new InternalAdjacencyMatrix(name, reducedBuckets, getMetadata());
}

@Override
public void close() {
Releasables.close(bucketsReducer.values());
}
};
}

Expand All @@ -209,21 +219,6 @@ public InternalAggregation finalizeSampling(SamplingContext samplingContext) {
return new InternalAdjacencyMatrix(name, buckets.stream().map(b -> b.finalizeSampling(samplingContext)).toList(), getMetadata());
}

private InternalBucket reduceBucket(List<InternalBucket> buckets, AggregationReduceContext context) {
assert buckets.isEmpty() == false;
InternalBucket reduced = null;
for (InternalBucket bucket : buckets) {
if (reduced == null) {
reduced = new InternalBucket(bucket.key, bucket.docCount, bucket.aggregations);
} else {
reduced.docCount += bucket.docCount;
}
}
final List<InternalAggregations> aggregations = new BucketAggregationList<>(buckets);
reduced.aggregations = InternalAggregations.reduce(aggregations, context);
return reduced;
}

@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.startArray(CommonFields.BUCKETS.getPreferredName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.repositories.RepositoriesMetrics;
import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
import org.elasticsearch.xcontent.NamedXContentRegistry;

Expand Down Expand Up @@ -108,8 +107,7 @@ public AzureRepository(
bigArrays,
recoverySettings,
buildBasePath(metadata),
buildLocation(metadata),
RepositoriesMetrics.NOOP
buildLocation(metadata)
);
this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings());
this.storageService = storageService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.repositories.RepositoriesMetrics;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
import org.elasticsearch.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -77,8 +76,7 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository {
bigArrays,
recoverySettings,
buildBasePath(metadata),
buildLocation(metadata),
RepositoriesMetrics.NOOP
buildLocation(metadata)
);
this.storageService = storageService;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.RepositoriesMetrics;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
Expand Down Expand Up @@ -460,9 +459,9 @@ protected S3Repository createRepository(
ClusterService clusterService,
BigArrays bigArrays,
RecoverySettings recoverySettings,
RepositoriesMetrics repositoriesMetrics
S3RepositoriesMetrics s3RepositoriesMetrics
) {
return new S3Repository(metadata, registry, getService(), clusterService, bigArrays, recoverySettings, repositoriesMetrics) {
return new S3Repository(metadata, registry, getService(), clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics) {

@Override
public BlobStore blobStore() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase;
import org.elasticsearch.repositories.RepositoriesMetrics;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.fixtures.minio.MinioTestContainer;
Expand Down Expand Up @@ -145,7 +144,7 @@ public long absoluteTimeInMillis() {
ClusterServiceUtils.createClusterService(threadpool),
BigArrays.NON_RECYCLING_INSTANCE,
new RecoverySettings(node().settings(), node().injector().getInstance(ClusterService.class).getClusterSettings()),
RepositoriesMetrics.NOOP
S3RepositoriesMetrics.NOOP
)
) {
repository.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.repositories.RepositoriesMetrics;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
Expand Down Expand Up @@ -84,7 +83,7 @@ class S3BlobStore implements BlobStore {

private final ThreadPool threadPool;
private final Executor snapshotExecutor;
private final RepositoriesMetrics repositoriesMetrics;
private final S3RepositoriesMetrics s3RepositoriesMetrics;

private final StatsCollectors statsCollectors = new StatsCollectors();

Expand All @@ -98,7 +97,7 @@ class S3BlobStore implements BlobStore {
RepositoryMetadata repositoryMetadata,
BigArrays bigArrays,
ThreadPool threadPool,
RepositoriesMetrics repositoriesMetrics
S3RepositoriesMetrics s3RepositoriesMetrics
) {
this.service = service;
this.bigArrays = bigArrays;
Expand All @@ -110,7 +109,7 @@ class S3BlobStore implements BlobStore {
this.repositoryMetadata = repositoryMetadata;
this.threadPool = threadPool;
this.snapshotExecutor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
this.repositoriesMetrics = repositoriesMetrics;
this.s3RepositoriesMetrics = s3RepositoriesMetrics;
}

RequestMetricCollector getMetricCollector(Operation operation, OperationPurpose purpose) {
Expand Down Expand Up @@ -174,19 +173,19 @@ public final void collectMetrics(Request<?> request, Response<?> response) {
.map(List::size)
.orElse(0);

repositoriesMetrics.operationCounter().incrementBy(1, attributes);
s3RepositoriesMetrics.common().operationCounter().incrementBy(1, attributes);
if (numberOfAwsErrors == requestCount) {
repositoriesMetrics.unsuccessfulOperationCounter().incrementBy(1, attributes);
s3RepositoriesMetrics.common().unsuccessfulOperationCounter().incrementBy(1, attributes);
}

repositoriesMetrics.requestCounter().incrementBy(requestCount, attributes);
s3RepositoriesMetrics.common().requestCounter().incrementBy(requestCount, attributes);
if (exceptionCount > 0) {
repositoriesMetrics.exceptionCounter().incrementBy(exceptionCount, attributes);
repositoriesMetrics.exceptionHistogram().record(exceptionCount, attributes);
s3RepositoriesMetrics.common().exceptionCounter().incrementBy(exceptionCount, attributes);
s3RepositoriesMetrics.common().exceptionHistogram().record(exceptionCount, attributes);
}
if (throttleCount > 0) {
repositoriesMetrics.throttleCounter().incrementBy(throttleCount, attributes);
repositoriesMetrics.throttleHistogram().record(throttleCount, attributes);
s3RepositoriesMetrics.common().throttleCounter().incrementBy(throttleCount, attributes);
s3RepositoriesMetrics.common().throttleHistogram().record(throttleCount, attributes);
}
maybeRecordHttpRequestTime(request);
}
Expand All @@ -207,7 +206,7 @@ private void maybeRecordHttpRequestTime(Request<?> request) {
if (totalTimeInMicros == 0) {
logger.warn("Expected HttpRequestTime to be tracked for request [{}] but found no count.", request);
} else {
repositoriesMetrics.httpRequestTimeInMicroHistogram().record(totalTimeInMicros, attributes);
s3RepositoriesMetrics.common().httpRequestTimeInMicroHistogram().record(totalTimeInMicros, attributes);
}
}

Expand Down Expand Up @@ -293,6 +292,14 @@ public long bufferSizeInBytes() {
return bufferSize.getBytes();
}

public RepositoryMetadata getRepositoryMetadata() {
return repositoryMetadata;
}

public S3RepositoriesMetrics getS3RepositoriesMetrics() {
return s3RepositoriesMetrics;
}

@Override
public BlobContainer blobContainer(BlobPath path) {
return new S3BlobContainer(path, this);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.repositories.s3;

import org.elasticsearch.repositories.RepositoriesMetrics;
import org.elasticsearch.telemetry.metric.LongCounter;
import org.elasticsearch.telemetry.metric.LongHistogram;

public record S3RepositoriesMetrics(
RepositoriesMetrics common,
LongCounter retryStartedCounter,
LongCounter retryCompletedCounter,
LongHistogram retryHistogram
) {

public static S3RepositoriesMetrics NOOP = new S3RepositoriesMetrics(RepositoriesMetrics.NOOP);

public static final String METRIC_RETRY_EVENT_TOTAL = "es.repositories.s3.input_stream.retry.event.total";
public static final String METRIC_RETRY_SUCCESS_TOTAL = "es.repositories.s3.input_stream.retry.success.total";
public static final String METRIC_RETRY_ATTEMPTS_HISTOGRAM = "es.repositories.s3.input_stream.retry.attempts.histogram";

public S3RepositoriesMetrics(RepositoriesMetrics common) {
this(
common,
common.meterRegistry().registerLongCounter(METRIC_RETRY_EVENT_TOTAL, "s3 input stream retry event count", "unit"),
common.meterRegistry().registerLongCounter(METRIC_RETRY_SUCCESS_TOTAL, "s3 input stream retry success count", "unit"),
common.meterRegistry()
.registerLongHistogram(METRIC_RETRY_ATTEMPTS_HISTOGRAM, "s3 input stream retry attempts histogram", "unit")
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.repositories.FinalizeSnapshotContext;
import org.elasticsearch.repositories.RepositoriesMetrics;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
Expand Down Expand Up @@ -195,6 +194,8 @@ class S3Repository extends MeteredBlobStoreRepository {

private final Executor snapshotExecutor;

private final S3RepositoriesMetrics s3RepositoriesMetrics;

/**
* Constructs an s3 backed repository
*/
Expand All @@ -205,7 +206,7 @@ class S3Repository extends MeteredBlobStoreRepository {
final ClusterService clusterService,
final BigArrays bigArrays,
final RecoverySettings recoverySettings,
final RepositoriesMetrics repositoriesMetrics
final S3RepositoriesMetrics s3RepositoriesMetrics
) {
super(
metadata,
Expand All @@ -214,10 +215,10 @@ class S3Repository extends MeteredBlobStoreRepository {
bigArrays,
recoverySettings,
buildBasePath(metadata),
buildLocation(metadata),
repositoriesMetrics
buildLocation(metadata)
);
this.service = service;
this.s3RepositoriesMetrics = s3RepositoriesMetrics;
this.snapshotExecutor = threadPool().executor(ThreadPool.Names.SNAPSHOT);

// Parse and validate the user's S3 Storage Class setting
Expand Down Expand Up @@ -408,7 +409,7 @@ protected S3BlobStore createBlobStore() {
metadata,
bigArrays,
threadPool,
repositoriesMetrics
s3RepositoriesMetrics
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ protected S3Repository createRepository(
final ClusterService clusterService,
final BigArrays bigArrays,
final RecoverySettings recoverySettings,
final RepositoriesMetrics repositoriesMetrics
final S3RepositoriesMetrics s3RepositoriesMetrics
) {
return new S3Repository(metadata, registry, service.get(), clusterService, bigArrays, recoverySettings, repositoriesMetrics);
return new S3Repository(metadata, registry, service.get(), clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics);
}

@Override
Expand All @@ -101,11 +101,12 @@ public Map<String, Repository.Factory> getRepositories(
final ClusterService clusterService,
final BigArrays bigArrays,
final RecoverySettings recoverySettings,
RepositoriesMetrics repositoriesMetrics
final RepositoriesMetrics repositoriesMetrics
) {
final S3RepositoriesMetrics s3RepositoriesMetrics = new S3RepositoriesMetrics(repositoriesMetrics);
return Collections.singletonMap(
S3Repository.TYPE,
metadata -> createRepository(metadata, registry, clusterService, bigArrays, recoverySettings, repositoriesMetrics)
metadata -> createRepository(metadata, registry, clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics)
);
}

Expand Down
Loading

0 comments on commit 08ab2c4

Please sign in to comment.