From b95cb8c40dc701cbce18308cdcd5538fd60facb9 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 23 Feb 2024 06:51:38 +0000 Subject: [PATCH 01/11] Introduce `TransportGetSnapshotsAction#GetSnapshotsOperation` (#105609) This commit introduces a class to encapsulate each invocation of `TransportGetSnapshotsAction`, avoiding the need to pass around a huge list of parameters between all the methods that make up the implementation. --- .../get/TransportGetSnapshotsAction.java | 983 +++++++++--------- 1 file changed, 465 insertions(+), 518 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java index ca910a8d94078..4996096492354 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java @@ -109,598 +109,514 @@ protected void masterOperation( ) { assert task instanceof CancellableTask : task + " not cancellable"; - getMultipleReposSnapshotInfo( - request.isSingleRepositoryRequest() == false, - SnapshotsInProgress.get(state), + new GetSnapshotsOperation( + (CancellableTask) task, TransportGetRepositoriesAction.getRepositories(state, request.repositories()), + request.isSingleRepositoryRequest() == false, request.snapshots(), request.ignoreUnavailable(), - request.verbose(), - (CancellableTask) task, + SnapshotPredicates.fromRequest(request), request.sort(), - request.after(), - request.offset(), - request.size(), request.order(), request.fromSortValue(), - SnapshotPredicates.fromRequest(request), - request.includeIndexNames(), - listener - ); + request.offset(), + request.after(), + request.size(), + SnapshotsInProgress.get(state), + request.verbose(), + request.includeIndexNames() + ).getMultipleReposSnapshotInfo(listener); } /** - * Filters the list of repositories that a request will fetch snapshots from in the special case of sorting by repository - * name and having a non-null value for {@link GetSnapshotsRequest#fromSortValue()} on the request to exclude repositories outside - * the sort value range if possible. + * A single invocation of the get-snapshots API. + *
+     * <p>
+ * Decides which repositories to query, picks a collection of candidate {@link SnapshotId} values from each {@link RepositoryData}, + * chosen according to the request parameters, loads the relevant {@link SnapshotInfo} blobs, and finally sorts and filters the + * results. */ - private static List maybeFilterRepositories( - List repositories, - GetSnapshotsRequest.SortBy sortBy, - SortOrder order, - @Nullable String fromSortValue - ) { - if (sortBy != GetSnapshotsRequest.SortBy.REPOSITORY || fromSortValue == null) { - return repositories; - } - final Predicate predicate = order == SortOrder.ASC - ? repositoryMetadata -> fromSortValue.compareTo(repositoryMetadata.name()) <= 0 - : repositoryMetadata -> fromSortValue.compareTo(repositoryMetadata.name()) >= 0; - return repositories.stream().filter(predicate).toList(); - } - - private void getMultipleReposSnapshotInfo( - boolean isMultiRepoRequest, - SnapshotsInProgress snapshotsInProgress, - TransportGetRepositoriesAction.RepositoriesResult repositoriesResult, - String[] snapshots, - boolean ignoreUnavailable, - boolean verbose, - CancellableTask cancellableTask, - GetSnapshotsRequest.SortBy sortBy, - @Nullable GetSnapshotsRequest.After after, - int offset, - int size, - SortOrder order, - String fromSortValue, - SnapshotPredicates predicates, - boolean indices, - ActionListener listener - ) { - // Process the missing repositories - final Map failures = ConcurrentCollections.newConcurrentMap(); - for (String missingRepo : repositoriesResult.missing()) { - failures.put(missingRepo, new RepositoryMissingException(missingRepo)); + private class GetSnapshotsOperation { + private final CancellableTask cancellableTask; + + // repositories + private final List repositories; + private final boolean isMultiRepoRequest; + + // snapshots selection + private final String[] snapshots; + private final boolean ignoreUnavailable; + private final SnapshotPredicates predicates; + + // snapshot ordering/pagination + private final GetSnapshotsRequest.SortBy sortBy; + private final SortOrder order; + @Nullable + private final String fromSortValue; + private final int offset; + @Nullable + private final GetSnapshotsRequest.After after; + private final int size; + + // current state + private final SnapshotsInProgress snapshotsInProgress; + + // output detail + private final boolean verbose; + private final boolean indices; + + // results + private final Map failuresByRepository = ConcurrentCollections.newConcurrentMap(); + private final Queue> allSnapshotInfos = ConcurrentCollections.newQueue(); + private final AtomicInteger remaining = new AtomicInteger(); + private final AtomicInteger totalCount = new AtomicInteger(); + + GetSnapshotsOperation( + CancellableTask cancellableTask, + TransportGetRepositoriesAction.RepositoriesResult repositoriesResult, + boolean isMultiRepoRequest, + String[] snapshots, + boolean ignoreUnavailable, + SnapshotPredicates predicates, + GetSnapshotsRequest.SortBy sortBy, + SortOrder order, + String fromSortValue, + int offset, + GetSnapshotsRequest.After after, + int size, + SnapshotsInProgress snapshotsInProgress, + boolean verbose, + boolean indices + ) { + this.cancellableTask = cancellableTask; + this.repositories = repositoriesResult.metadata(); + this.isMultiRepoRequest = isMultiRepoRequest; + this.snapshots = snapshots; + this.ignoreUnavailable = ignoreUnavailable; + this.predicates = predicates; + this.sortBy = sortBy; + this.order = order; + this.fromSortValue = fromSortValue; + this.offset = offset; + this.after = after; + 
this.size = size; + this.snapshotsInProgress = snapshotsInProgress; + this.verbose = verbose; + this.indices = indices; + + for (final var missingRepo : repositoriesResult.missing()) { + failuresByRepository.put(missingRepo, new RepositoryMissingException(missingRepo)); + } } - final Queue> allSnapshotInfos = ConcurrentCollections.newQueue(); - final var remaining = new AtomicInteger(); - final var totalCount = new AtomicInteger(); - - List repositories = maybeFilterRepositories(repositoriesResult.metadata(), sortBy, order, fromSortValue); - try (var listeners = new RefCountingListener(listener.map(ignored -> { - cancellableTask.ensureNotCancelled(); - final var sortedSnapshotsInRepos = sortSnapshots( - allSnapshotInfos.stream().flatMap(Collection::stream), - totalCount.get(), - sortBy, - after, - offset, - size, - order - ); - final var snapshotInfos = sortedSnapshotsInRepos.snapshotInfos(); - assert indices || snapshotInfos.stream().allMatch(snapshotInfo -> snapshotInfo.indices().isEmpty()); - final int finalRemaining = sortedSnapshotsInRepos.remaining() + remaining.get(); - return new GetSnapshotsResponse( - snapshotInfos, - failures, - finalRemaining > 0 - ? GetSnapshotsRequest.After.from(snapshotInfos.get(snapshotInfos.size() - 1), sortBy).asQueryParam() - : null, - totalCount.get(), - finalRemaining - ); - }))) { - for (final RepositoryMetadata repository : repositories) { - final String repoName = repository.name(); - getSingleRepoSnapshotInfo( - snapshotsInProgress, - repoName, - snapshots, - predicates, - ignoreUnavailable, - verbose, - cancellableTask, - sortBy, - after, - order, - indices, - listeners.acquire((SnapshotsInRepo snapshotsInRepo) -> { + /** + * Filters the list of repositories that a request will fetch snapshots from in the special case of sorting by repository + * name and having a non-null value for {@link GetSnapshotsRequest#fromSortValue()} on the request to exclude repositories outside + * the sort value range if possible. + */ + private List maybeFilterRepositories() { + if (sortBy != GetSnapshotsRequest.SortBy.REPOSITORY || fromSortValue == null) { + return repositories; + } + final Predicate predicate = order == SortOrder.ASC + ? repositoryMetadata -> fromSortValue.compareTo(repositoryMetadata.name()) <= 0 + : repositoryMetadata -> fromSortValue.compareTo(repositoryMetadata.name()) >= 0; + return repositories.stream().filter(predicate).toList(); + } + + void getMultipleReposSnapshotInfo(ActionListener listener) { + List filteredRepositories = maybeFilterRepositories(); + try (var listeners = new RefCountingListener(listener.map(ignored -> { + cancellableTask.ensureNotCancelled(); + final var sortedSnapshotsInRepos = sortSnapshots( + allSnapshotInfos.stream().flatMap(Collection::stream), + totalCount.get(), + offset, + size + ); + final var snapshotInfos = sortedSnapshotsInRepos.snapshotInfos(); + assert indices || snapshotInfos.stream().allMatch(snapshotInfo -> snapshotInfo.indices().isEmpty()); + final int finalRemaining = sortedSnapshotsInRepos.remaining() + remaining.get(); + return new GetSnapshotsResponse( + snapshotInfos, + failuresByRepository, + finalRemaining > 0 + ? 
GetSnapshotsRequest.After.from(snapshotInfos.get(snapshotInfos.size() - 1), sortBy).asQueryParam() + : null, + totalCount.get(), + finalRemaining + ); + }))) { + for (final RepositoryMetadata repository : filteredRepositories) { + final String repoName = repository.name(); + getSingleRepoSnapshotInfo(repoName, listeners.acquire((SnapshotsInRepo snapshotsInRepo) -> { allSnapshotInfos.add(snapshotsInRepo.snapshotInfos()); remaining.addAndGet(snapshotsInRepo.remaining()); totalCount.addAndGet(snapshotsInRepo.totalCount()); }).delegateResponse((l, e) -> { if (isMultiRepoRequest && e instanceof ElasticsearchException elasticsearchException) { - failures.put(repoName, elasticsearchException); + failuresByRepository.put(repoName, elasticsearchException); l.onResponse(SnapshotsInRepo.EMPTY); } else { l.onFailure(e); } - }) - ); + })); + } } } - } - private void getSingleRepoSnapshotInfo( - SnapshotsInProgress snapshotsInProgress, - String repo, - String[] snapshots, - SnapshotPredicates predicates, - boolean ignoreUnavailable, - boolean verbose, - CancellableTask task, - GetSnapshotsRequest.SortBy sortBy, - @Nullable final GetSnapshotsRequest.After after, - SortOrder order, - boolean indices, - ActionListener listener - ) { - final Map allSnapshotIds = new HashMap<>(); - final List currentSnapshots = new ArrayList<>(); - for (SnapshotInfo snapshotInfo : currentSnapshots(snapshotsInProgress, repo)) { - Snapshot snapshot = snapshotInfo.snapshot(); - allSnapshotIds.put(snapshot.getSnapshotId().getName(), snapshot); - currentSnapshots.add(snapshotInfo.maybeWithoutIndices(indices)); - } + private void getSingleRepoSnapshotInfo(String repo, ActionListener listener) { + final Map allSnapshotIds = new HashMap<>(); + final List currentSnapshots = new ArrayList<>(); + for (final SnapshotInfo snapshotInfo : currentSnapshots(repo)) { + Snapshot snapshot = snapshotInfo.snapshot(); + allSnapshotIds.put(snapshot.getSnapshotId().getName(), snapshot); + currentSnapshots.add(snapshotInfo.maybeWithoutIndices(indices)); + } - final ListenableFuture repositoryDataListener = new ListenableFuture<>(); - if (isCurrentSnapshotsOnly(snapshots)) { - repositoryDataListener.onResponse(null); - } else { - repositoriesService.getRepositoryData(repo, repositoryDataListener); - } + final ListenableFuture repositoryDataListener = new ListenableFuture<>(); + if (isCurrentSnapshotsOnly()) { + repositoryDataListener.onResponse(null); + } else { + repositoriesService.getRepositoryData(repo, repositoryDataListener); + } - repositoryDataListener.addListener( - listener.delegateFailureAndWrap( - (l, repositoryData) -> loadSnapshotInfos( - snapshotsInProgress, - repo, - snapshots, - ignoreUnavailable, - verbose, - allSnapshotIds, - currentSnapshots, - repositoryData, - task, - sortBy, - after, - order, - predicates, - indices, - l + repositoryDataListener.addListener( + listener.delegateFailureAndWrap( + (l, repositoryData) -> loadSnapshotInfos(repo, allSnapshotIds, currentSnapshots, repositoryData, l) ) - ) - ); - } - - /** - * Returns a list of currently running snapshots from repository sorted by snapshot creation date - * - * @param snapshotsInProgress snapshots in progress in the cluster state - * @param repositoryName repository name - * @return list of snapshots - */ - private static List currentSnapshots(SnapshotsInProgress snapshotsInProgress, String repositoryName) { - List snapshotList = new ArrayList<>(); - List entries = SnapshotsService.currentSnapshots( - snapshotsInProgress, - repositoryName, - Collections.emptyList() - ); 
- for (SnapshotsInProgress.Entry entry : entries) { - snapshotList.add(SnapshotInfo.inProgress(entry)); + ); } - return snapshotList; - } - private void loadSnapshotInfos( - SnapshotsInProgress snapshotsInProgress, - String repo, - String[] snapshots, - boolean ignoreUnavailable, - boolean verbose, - Map allSnapshotIds, - List currentSnapshots, - @Nullable RepositoryData repositoryData, - CancellableTask task, - GetSnapshotsRequest.SortBy sortBy, - @Nullable final GetSnapshotsRequest.After after, - SortOrder order, - SnapshotPredicates predicates, - boolean indices, - ActionListener listener - ) { - if (task.notifyIfCancelled(listener)) { - return; + /** + * Returns a list of currently running snapshots from repository sorted by snapshot creation date + * + * @param repositoryName repository name + * @return list of snapshots + */ + private List currentSnapshots(String repositoryName) { + List snapshotList = new ArrayList<>(); + List entries = SnapshotsService.currentSnapshots( + snapshotsInProgress, + repositoryName, + Collections.emptyList() + ); + for (SnapshotsInProgress.Entry entry : entries) { + snapshotList.add(SnapshotInfo.inProgress(entry)); + } + return snapshotList; } - if (repositoryData != null) { - for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) { - if (predicates.test(snapshotId, repositoryData)) { - allSnapshotIds.put(snapshotId.getName(), new Snapshot(repo, snapshotId)); + private void loadSnapshotInfos( + String repo, + Map allSnapshotIds, + List currentSnapshots, + @Nullable RepositoryData repositoryData, + ActionListener listener + ) { + if (cancellableTask.notifyIfCancelled(listener)) { + return; + } + + if (repositoryData != null) { + for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) { + if (predicates.test(snapshotId, repositoryData)) { + allSnapshotIds.put(snapshotId.getName(), new Snapshot(repo, snapshotId)); + } } } - } - final Set toResolve = new HashSet<>(); - if (TransportGetRepositoriesAction.isMatchAll(snapshots)) { - toResolve.addAll(allSnapshotIds.values()); - } else { - final List includePatterns = new ArrayList<>(); - final List excludePatterns = new ArrayList<>(); - boolean hasCurrent = false; - boolean seenWildcard = false; - for (String snapshotOrPattern : snapshots) { - if (seenWildcard && snapshotOrPattern.length() > 1 && snapshotOrPattern.startsWith("-")) { - excludePatterns.add(snapshotOrPattern.substring(1)); - } else { - if (Regex.isSimpleMatchPattern(snapshotOrPattern)) { - seenWildcard = true; - includePatterns.add(snapshotOrPattern); - } else if (GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshotOrPattern)) { - hasCurrent = true; - seenWildcard = true; + final Set toResolve = new HashSet<>(); + if (TransportGetRepositoriesAction.isMatchAll(snapshots)) { + toResolve.addAll(allSnapshotIds.values()); + } else { + final List includePatterns = new ArrayList<>(); + final List excludePatterns = new ArrayList<>(); + boolean hasCurrent = false; + boolean seenWildcard = false; + for (String snapshotOrPattern : snapshots) { + if (seenWildcard && snapshotOrPattern.length() > 1 && snapshotOrPattern.startsWith("-")) { + excludePatterns.add(snapshotOrPattern.substring(1)); } else { - if (ignoreUnavailable == false && allSnapshotIds.containsKey(snapshotOrPattern) == false) { - throw new SnapshotMissingException(repo, snapshotOrPattern); + if (Regex.isSimpleMatchPattern(snapshotOrPattern)) { + seenWildcard = true; + includePatterns.add(snapshotOrPattern); + } else if 
(GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshotOrPattern)) { + hasCurrent = true; + seenWildcard = true; + } else { + if (ignoreUnavailable == false && allSnapshotIds.containsKey(snapshotOrPattern) == false) { + throw new SnapshotMissingException(repo, snapshotOrPattern); + } + includePatterns.add(snapshotOrPattern); } - includePatterns.add(snapshotOrPattern); } } - } - final String[] includes = includePatterns.toArray(Strings.EMPTY_ARRAY); - final String[] excludes = excludePatterns.toArray(Strings.EMPTY_ARRAY); - for (Map.Entry entry : allSnapshotIds.entrySet()) { - final Snapshot snapshot = entry.getValue(); - if (toResolve.contains(snapshot) == false - && Regex.simpleMatch(includes, entry.getKey()) - && Regex.simpleMatch(excludes, entry.getKey()) == false) { - toResolve.add(snapshot); - } - } - if (hasCurrent) { - for (SnapshotInfo snapshotInfo : currentSnapshots) { - final Snapshot snapshot = snapshotInfo.snapshot(); - if (Regex.simpleMatch(excludes, snapshot.getSnapshotId().getName()) == false) { + final String[] includes = includePatterns.toArray(Strings.EMPTY_ARRAY); + final String[] excludes = excludePatterns.toArray(Strings.EMPTY_ARRAY); + for (Map.Entry entry : allSnapshotIds.entrySet()) { + final Snapshot snapshot = entry.getValue(); + if (toResolve.contains(snapshot) == false + && Regex.simpleMatch(includes, entry.getKey()) + && Regex.simpleMatch(excludes, entry.getKey()) == false) { toResolve.add(snapshot); } } + if (hasCurrent) { + for (SnapshotInfo snapshotInfo : currentSnapshots) { + final Snapshot snapshot = snapshotInfo.snapshot(); + if (Regex.simpleMatch(excludes, snapshot.getSnapshotId().getName()) == false) { + toResolve.add(snapshot); + } + } + } + if (toResolve.isEmpty() && ignoreUnavailable == false && isCurrentSnapshotsOnly() == false) { + throw new SnapshotMissingException(repo, snapshots[0]); + } } - if (toResolve.isEmpty() && ignoreUnavailable == false && isCurrentSnapshotsOnly(snapshots) == false) { - throw new SnapshotMissingException(repo, snapshots[0]); + + if (verbose) { + snapshots(repo, toResolve.stream().map(Snapshot::getSnapshotId).toList(), listener); + } else { + assert predicates.isMatchAll() : "filtering is not supported in non-verbose mode"; + final SnapshotsInRepo snapshotInfos; + if (repositoryData != null) { + // want non-current snapshots as well, which are found in the repository data + snapshotInfos = buildSimpleSnapshotInfos(toResolve, repo, repositoryData, currentSnapshots); + } else { + // only want current snapshots + snapshotInfos = sortSnapshotsWithNoOffsetOrLimit(currentSnapshots.stream().map(SnapshotInfo::basic).toList()); + } + listener.onResponse(snapshotInfos); } } - if (verbose) { - snapshots( + /** + * Returns a list of snapshots from repository sorted by snapshot creation date + * + * @param repositoryName repository name + * @param snapshotIds snapshots for which to fetch snapshot information + */ + private void snapshots(String repositoryName, Collection snapshotIds, ActionListener listener) { + if (cancellableTask.notifyIfCancelled(listener)) { + return; + } + final Set snapshotSet = new HashSet<>(); + final Set snapshotIdsToIterate = new HashSet<>(snapshotIds); + // first, look at the snapshots in progress + final List entries = SnapshotsService.currentSnapshots( snapshotsInProgress, - repo, - toResolve.stream().map(Snapshot::getSnapshotId).toList(), - ignoreUnavailable, - task, - sortBy, - after, - order, - predicates, - indices, - listener + repositoryName, + 
snapshotIdsToIterate.stream().map(SnapshotId::getName).toList() ); - } else { - assert predicates.isMatchAll() : "filtering is not supported in non-verbose mode"; - final SnapshotsInRepo snapshotInfos; - if (repositoryData != null) { - // want non-current snapshots as well, which are found in the repository data - snapshotInfos = buildSimpleSnapshotInfos(toResolve, repo, repositoryData, currentSnapshots, sortBy, after, order, indices); + for (SnapshotsInProgress.Entry entry : entries) { + if (snapshotIdsToIterate.remove(entry.snapshot().getSnapshotId())) { + final SnapshotInfo snapshotInfo = SnapshotInfo.inProgress(entry); + if (predicates.test(snapshotInfo)) { + snapshotSet.add(snapshotInfo.maybeWithoutIndices(indices)); + } + } + } + // then, look in the repository if there's any matching snapshots left + final List snapshotInfos; + if (snapshotIdsToIterate.isEmpty()) { + snapshotInfos = Collections.emptyList(); } else { - // only want current snapshots - snapshotInfos = sortSnapshots( - currentSnapshots.stream().map(SnapshotInfo::basic).toList(), - sortBy, - after, - 0, - GetSnapshotsRequest.NO_LIMIT, - order - ); + snapshotInfos = Collections.synchronizedList(new ArrayList<>()); } - listener.onResponse(snapshotInfos); - } - } - - /** - * Returns a list of snapshots from repository sorted by snapshot creation date - * - * @param snapshotsInProgress snapshots in progress in the cluster state - * @param repositoryName repository name - * @param snapshotIds snapshots for which to fetch snapshot information - * @param ignoreUnavailable if true, snapshots that could not be read will only be logged with a warning, - * @param indices if false, drop the list of indices from each result - */ - private void snapshots( - SnapshotsInProgress snapshotsInProgress, - String repositoryName, - Collection snapshotIds, - boolean ignoreUnavailable, - CancellableTask task, - GetSnapshotsRequest.SortBy sortBy, - @Nullable GetSnapshotsRequest.After after, - SortOrder order, - SnapshotPredicates predicate, - boolean indices, - ActionListener listener - ) { - if (task.notifyIfCancelled(listener)) { - return; - } - final Set snapshotSet = new HashSet<>(); - final Set snapshotIdsToIterate = new HashSet<>(snapshotIds); - // first, look at the snapshots in progress - final List entries = SnapshotsService.currentSnapshots( - snapshotsInProgress, - repositoryName, - snapshotIdsToIterate.stream().map(SnapshotId::getName).toList() - ); - for (SnapshotsInProgress.Entry entry : entries) { - if (snapshotIdsToIterate.remove(entry.snapshot().getSnapshotId())) { - final SnapshotInfo snapshotInfo = SnapshotInfo.inProgress(entry); - if (predicate.test(snapshotInfo)) { - snapshotSet.add(snapshotInfo.maybeWithoutIndices(indices)); - } + final ActionListener allDoneListener = listener.safeMap(v -> { + final ArrayList snapshotList = new ArrayList<>(snapshotInfos); + snapshotList.addAll(snapshotSet); + return sortSnapshotsWithNoOffsetOrLimit(snapshotList); + }); + if (snapshotIdsToIterate.isEmpty()) { + allDoneListener.onResponse(null); + return; } + final Repository repository; + try { + repository = repositoriesService.repository(repositoryName); + } catch (RepositoryMissingException e) { + listener.onFailure(e); + return; + } + repository.getSnapshotInfo( + new GetSnapshotInfoContext( + snapshotIdsToIterate, + ignoreUnavailable == false, + cancellableTask::isCancelled, + (context, snapshotInfo) -> { + if (predicates.test(snapshotInfo)) { + snapshotInfos.add(snapshotInfo.maybeWithoutIndices(indices)); + } + }, + allDoneListener + 
) + ); } - // then, look in the repository if there's any matching snapshots left - final List snapshotInfos; - if (snapshotIdsToIterate.isEmpty()) { - snapshotInfos = Collections.emptyList(); - } else { - snapshotInfos = Collections.synchronizedList(new ArrayList<>()); - } - final ActionListener allDoneListener = listener.safeMap(v -> { - final ArrayList snapshotList = new ArrayList<>(snapshotInfos); - snapshotList.addAll(snapshotSet); - return sortSnapshots(snapshotList, sortBy, after, 0, GetSnapshotsRequest.NO_LIMIT, order); - }); - if (snapshotIdsToIterate.isEmpty()) { - allDoneListener.onResponse(null); - return; - } - final Repository repository; - try { - repository = repositoriesService.repository(repositoryName); - } catch (RepositoryMissingException e) { - listener.onFailure(e); - return; - } - repository.getSnapshotInfo( - new GetSnapshotInfoContext(snapshotIdsToIterate, ignoreUnavailable == false, task::isCancelled, (context, snapshotInfo) -> { - if (predicate.test(snapshotInfo)) { - snapshotInfos.add(snapshotInfo.maybeWithoutIndices(indices)); - } - }, allDoneListener) - ); - } - private static boolean isCurrentSnapshotsOnly(String[] snapshots) { - return (snapshots.length == 1 && GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshots[0])); - } + private boolean isCurrentSnapshotsOnly() { + return snapshots.length == 1 && GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshots[0]); + } - private static SnapshotsInRepo buildSimpleSnapshotInfos( - final Set toResolve, - final String repoName, - final RepositoryData repositoryData, - final List currentSnapshots, - final GetSnapshotsRequest.SortBy sortBy, - @Nullable final GetSnapshotsRequest.After after, - final SortOrder order, - boolean indices - ) { - List snapshotInfos = new ArrayList<>(); - for (SnapshotInfo snapshotInfo : currentSnapshots) { - if (toResolve.remove(snapshotInfo.snapshot())) { - snapshotInfos.add(snapshotInfo.basic()); + private SnapshotsInRepo buildSimpleSnapshotInfos( + final Set toResolve, + final String repoName, + final RepositoryData repositoryData, + final List currentSnapshots + ) { + List snapshotInfos = new ArrayList<>(); + for (SnapshotInfo snapshotInfo : currentSnapshots) { + if (toResolve.remove(snapshotInfo.snapshot())) { + snapshotInfos.add(snapshotInfo.basic()); + } } - } - Map> snapshotsToIndices = new HashMap<>(); - if (indices) { - for (IndexId indexId : repositoryData.getIndices().values()) { - for (SnapshotId snapshotId : repositoryData.getSnapshots(indexId)) { - if (toResolve.contains(new Snapshot(repoName, snapshotId))) { - snapshotsToIndices.computeIfAbsent(snapshotId, (k) -> new ArrayList<>()).add(indexId.getName()); + Map> snapshotsToIndices = new HashMap<>(); + if (indices) { + for (IndexId indexId : repositoryData.getIndices().values()) { + for (SnapshotId snapshotId : repositoryData.getSnapshots(indexId)) { + if (toResolve.contains(new Snapshot(repoName, snapshotId))) { + snapshotsToIndices.computeIfAbsent(snapshotId, (k) -> new ArrayList<>()).add(indexId.getName()); + } } } } + for (Snapshot snapshot : toResolve) { + snapshotInfos.add( + new SnapshotInfo( + snapshot, + snapshotsToIndices.getOrDefault(snapshot.getSnapshotId(), Collections.emptyList()), + Collections.emptyList(), + Collections.emptyList(), + repositoryData.getSnapshotState(snapshot.getSnapshotId()) + ) + ); + } + return sortSnapshotsWithNoOffsetOrLimit(snapshotInfos); } - for (Snapshot snapshot : toResolve) { - snapshotInfos.add( - new SnapshotInfo( - snapshot, - 
snapshotsToIndices.getOrDefault(snapshot.getSnapshotId(), Collections.emptyList()), - Collections.emptyList(), - Collections.emptyList(), - repositoryData.getSnapshotState(snapshot.getSnapshotId()) - ) - ); - } - return sortSnapshots(snapshotInfos, sortBy, after, 0, GetSnapshotsRequest.NO_LIMIT, order); - } - private static final Comparator BY_START_TIME = Comparator.comparingLong(SnapshotInfo::startTime) - .thenComparing(SnapshotInfo::snapshotId); + private static final Comparator BY_START_TIME = Comparator.comparingLong(SnapshotInfo::startTime) + .thenComparing(SnapshotInfo::snapshotId); - private static final Comparator BY_DURATION = Comparator.comparingLong( - sni -> sni.endTime() - sni.startTime() - ).thenComparing(SnapshotInfo::snapshotId); + private static final Comparator BY_DURATION = Comparator.comparingLong( + sni -> sni.endTime() - sni.startTime() + ).thenComparing(SnapshotInfo::snapshotId); - private static final Comparator BY_INDICES_COUNT = Comparator.comparingInt(sni -> sni.indices().size()) - .thenComparing(SnapshotInfo::snapshotId); + private static final Comparator BY_INDICES_COUNT = Comparator.comparingInt(sni -> sni.indices().size()) + .thenComparing(SnapshotInfo::snapshotId); - private static final Comparator BY_SHARDS_COUNT = Comparator.comparingInt(SnapshotInfo::totalShards) - .thenComparing(SnapshotInfo::snapshotId); + private static final Comparator BY_SHARDS_COUNT = Comparator.comparingInt(SnapshotInfo::totalShards) + .thenComparing(SnapshotInfo::snapshotId); - private static final Comparator BY_FAILED_SHARDS_COUNT = Comparator.comparingInt(SnapshotInfo::failedShards) - .thenComparing(SnapshotInfo::snapshotId); + private static final Comparator BY_FAILED_SHARDS_COUNT = Comparator.comparingInt(SnapshotInfo::failedShards) + .thenComparing(SnapshotInfo::snapshotId); - private static final Comparator BY_NAME = Comparator.comparing(sni -> sni.snapshotId().getName()); + private static final Comparator BY_NAME = Comparator.comparing(sni -> sni.snapshotId().getName()); - private static final Comparator BY_REPOSITORY = Comparator.comparing(SnapshotInfo::repository) - .thenComparing(SnapshotInfo::snapshotId); + private static final Comparator BY_REPOSITORY = Comparator.comparing(SnapshotInfo::repository) + .thenComparing(SnapshotInfo::snapshotId); - private static long getDuration(SnapshotId snapshotId, RepositoryData repositoryData) { - final RepositoryData.SnapshotDetails details = repositoryData.getSnapshotDetails(snapshotId); - if (details == null) { - return -1; - } - final long startTime = details.getStartTimeMillis(); - if (startTime == -1) { - return -1; + private SnapshotsInRepo sortSnapshotsWithNoOffsetOrLimit(List snapshotInfos) { + return sortSnapshots(snapshotInfos.stream(), snapshotInfos.size(), 0, GetSnapshotsRequest.NO_LIMIT); } - final long endTime = details.getEndTimeMillis(); - if (endTime == -1) { - return -1; - } - return endTime - startTime; - } - private static long getStartTime(SnapshotId snapshotId, RepositoryData repositoryData) { - final RepositoryData.SnapshotDetails details = repositoryData.getSnapshotDetails(snapshotId); - return details == null ? 
-1 : details.getStartTimeMillis(); - } + private SnapshotsInRepo sortSnapshots(Stream infos, int totalCount, int offset, int size) { + final Comparator comparator = switch (sortBy) { + case START_TIME -> BY_START_TIME; + case NAME -> BY_NAME; + case DURATION -> BY_DURATION; + case INDICES -> BY_INDICES_COUNT; + case SHARDS -> BY_SHARDS_COUNT; + case FAILED_SHARDS -> BY_FAILED_SHARDS_COUNT; + case REPOSITORY -> BY_REPOSITORY; + }; - private static int indexCount(SnapshotId snapshotId, RepositoryData repositoryData) { - // TODO: this could be made more efficient by caching this number in RepositoryData - int indexCount = 0; - for (IndexId idx : repositoryData.getIndices().values()) { - if (repositoryData.getSnapshots(idx).contains(snapshotId)) { - indexCount++; + if (after != null) { + assert offset == 0 : "can't combine after and offset but saw [" + after + "] and offset [" + offset + "]"; + infos = infos.filter(buildAfterPredicate()); } + infos = infos.sorted(order == SortOrder.DESC ? comparator.reversed() : comparator).skip(offset); + final List allSnapshots = infos.toList(); + final List snapshots; + if (size != GetSnapshotsRequest.NO_LIMIT) { + snapshots = allSnapshots.stream().limit(size + 1).toList(); + } else { + snapshots = allSnapshots; + } + final List resultSet = size != GetSnapshotsRequest.NO_LIMIT && size < snapshots.size() + ? snapshots.subList(0, size) + : snapshots; + return new SnapshotsInRepo(resultSet, totalCount, allSnapshots.size() - resultSet.size()); + } + + private Predicate buildAfterPredicate() { + final String snapshotName = after.snapshotName(); + final String repoName = after.repoName(); + final String value = after.value(); + return switch (sortBy) { + case START_TIME -> filterByLongOffset(SnapshotInfo::startTime, Long.parseLong(value), snapshotName, repoName, order); + case NAME -> + // TODO: cover via pre-flight predicate + order == SortOrder.ASC + ? (info -> compareName(snapshotName, repoName, info) < 0) + : (info -> compareName(snapshotName, repoName, info) > 0); + case DURATION -> filterByLongOffset( + info -> info.endTime() - info.startTime(), + Long.parseLong(value), + snapshotName, + repoName, + order + ); + case INDICES -> + // TODO: cover via pre-flight predicate + filterByLongOffset(info -> info.indices().size(), Integer.parseInt(value), snapshotName, repoName, order); + case SHARDS -> filterByLongOffset(SnapshotInfo::totalShards, Integer.parseInt(value), snapshotName, repoName, order); + case FAILED_SHARDS -> filterByLongOffset( + SnapshotInfo::failedShards, + Integer.parseInt(value), + snapshotName, + repoName, + order + ); + case REPOSITORY -> + // TODO: cover via pre-flight predicate + order == SortOrder.ASC + ? (info -> compareRepositoryName(snapshotName, repoName, info) < 0) + : (info -> compareRepositoryName(snapshotName, repoName, info) > 0); + }; + } + + private static Predicate filterByLongOffset( + ToLongFunction extractor, + long after, + String snapshotName, + String repoName, + SortOrder order + ) { + return order == SortOrder.ASC ? 
info -> { + final long val = extractor.applyAsLong(info); + return after < val || (after == val && compareName(snapshotName, repoName, info) < 0); + } : info -> { + final long val = extractor.applyAsLong(info); + return after > val || (after == val && compareName(snapshotName, repoName, info) > 0); + }; + } + + private static int compareRepositoryName(String name, String repoName, SnapshotInfo info) { + final int res = repoName.compareTo(info.repository()); + if (res != 0) { + return res; + } + return name.compareTo(info.snapshotId().getName()); } - return indexCount; - } - - private static SnapshotsInRepo sortSnapshots( - List snapshotInfos, - GetSnapshotsRequest.SortBy sortBy, - @Nullable GetSnapshotsRequest.After after, - int offset, - int size, - SortOrder order - ) { - return sortSnapshots(snapshotInfos.stream(), snapshotInfos.size(), sortBy, after, offset, size, order); - } - - private static SnapshotsInRepo sortSnapshots( - Stream infos, - int totalCount, - GetSnapshotsRequest.SortBy sortBy, - @Nullable GetSnapshotsRequest.After after, - int offset, - int size, - SortOrder order - ) { - final Comparator comparator = switch (sortBy) { - case START_TIME -> BY_START_TIME; - case NAME -> BY_NAME; - case DURATION -> BY_DURATION; - case INDICES -> BY_INDICES_COUNT; - case SHARDS -> BY_SHARDS_COUNT; - case FAILED_SHARDS -> BY_FAILED_SHARDS_COUNT; - case REPOSITORY -> BY_REPOSITORY; - }; - - if (after != null) { - assert offset == 0 : "can't combine after and offset but saw [" + after + "] and offset [" + offset + "]"; - infos = infos.filter(buildAfterPredicate(sortBy, after, order)); - } - infos = infos.sorted(order == SortOrder.DESC ? comparator.reversed() : comparator).skip(offset); - final List allSnapshots = infos.toList(); - final List snapshots; - if (size != GetSnapshotsRequest.NO_LIMIT) { - snapshots = allSnapshots.stream().limit(size + 1).toList(); - } else { - snapshots = allSnapshots; - } - final List resultSet = size != GetSnapshotsRequest.NO_LIMIT && size < snapshots.size() - ? snapshots.subList(0, size) - : snapshots; - return new SnapshotsInRepo(resultSet, totalCount, allSnapshots.size() - resultSet.size()); - } - private static Predicate buildAfterPredicate( - GetSnapshotsRequest.SortBy sortBy, - GetSnapshotsRequest.After after, - SortOrder order - ) { - final String snapshotName = after.snapshotName(); - final String repoName = after.repoName(); - final String value = after.value(); - return switch (sortBy) { - case START_TIME -> filterByLongOffset(SnapshotInfo::startTime, Long.parseLong(value), snapshotName, repoName, order); - case NAME -> - // TODO: cover via pre-flight predicate - order == SortOrder.ASC - ? (info -> compareName(snapshotName, repoName, info) < 0) - : (info -> compareName(snapshotName, repoName, info) > 0); - case DURATION -> filterByLongOffset( - info -> info.endTime() - info.startTime(), - Long.parseLong(value), - snapshotName, - repoName, - order - ); - case INDICES -> - // TODO: cover via pre-flight predicate - filterByLongOffset(info -> info.indices().size(), Integer.parseInt(value), snapshotName, repoName, order); - case SHARDS -> filterByLongOffset(SnapshotInfo::totalShards, Integer.parseInt(value), snapshotName, repoName, order); - case FAILED_SHARDS -> filterByLongOffset(SnapshotInfo::failedShards, Integer.parseInt(value), snapshotName, repoName, order); - case REPOSITORY -> - // TODO: cover via pre-flight predicate - order == SortOrder.ASC - ? 
(info -> compareRepositoryName(snapshotName, repoName, info) < 0) - : (info -> compareRepositoryName(snapshotName, repoName, info) > 0); - }; - } - - private static Predicate filterByLongOffset( - ToLongFunction extractor, - long after, - String snapshotName, - String repoName, - SortOrder order - ) { - return order == SortOrder.ASC ? info -> { - final long val = extractor.applyAsLong(info); - return after < val || (after == val && compareName(snapshotName, repoName, info) < 0); - } : info -> { - final long val = extractor.applyAsLong(info); - return after > val || (after == val && compareName(snapshotName, repoName, info) > 0); - }; - } - - private static int compareRepositoryName(String name, String repoName, SnapshotInfo info) { - final int res = repoName.compareTo(info.repository()); - if (res != 0) { - return res; + private static int compareName(String name, String repoName, SnapshotInfo info) { + final int res = name.compareTo(info.snapshotId().getName()); + if (res != 0) { + return res; + } + return repoName.compareTo(info.repository()); } - return name.compareTo(info.snapshotId().getName()); - } - private static int compareName(String name, String repoName, SnapshotInfo info) { - final int res = name.compareTo(info.snapshotId().getName()); - if (res != 0) { - return res; - } - return repoName.compareTo(info.repository()); } /** @@ -881,6 +797,37 @@ private static Predicate filterByLongOffset(ToLongFunction after <= extractor.applyAsLong(info) : info -> after >= extractor.applyAsLong(info); } + private static long getDuration(SnapshotId snapshotId, RepositoryData repositoryData) { + final RepositoryData.SnapshotDetails details = repositoryData.getSnapshotDetails(snapshotId); + if (details == null) { + return -1; + } + final long startTime = details.getStartTimeMillis(); + if (startTime == -1) { + return -1; + } + final long endTime = details.getEndTimeMillis(); + if (endTime == -1) { + return -1; + } + return endTime - startTime; + } + + private static long getStartTime(SnapshotId snapshotId, RepositoryData repositoryData) { + final RepositoryData.SnapshotDetails details = repositoryData.getSnapshotDetails(snapshotId); + return details == null ? 
-1 : details.getStartTimeMillis(); + } + + private static int indexCount(SnapshotId snapshotId, RepositoryData repositoryData) { + // TODO: this could be made more efficient by caching this number in RepositoryData + int indexCount = 0; + for (IndexId idx : repositoryData.getIndices().values()) { + if (repositoryData.getSnapshots(idx).contains(snapshotId)) { + indexCount++; + } + } + return indexCount; + } } private record SnapshotsInRepo(List snapshotInfos, int totalCount, int remaining) { From 2ba37ffc38b4964079f06a2ebdbee95017103700 Mon Sep 17 00:00:00 2001 From: Ignacio Vera Date: Fri, 23 Feb 2024 07:54:41 +0100 Subject: [PATCH 02/11] Reduce InternalAdjacencyMatrix in a streaming fashion (#105751) --- .../adjacency/InternalAdjacencyMatrix.java | 43 ++++++++----------- 1 file changed, 19 insertions(+), 24 deletions(-) diff --git a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java index 8802ffd41571d..6e70e9263df47 100644 --- a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java +++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java @@ -11,11 +11,13 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.Maps; +import org.elasticsearch.core.Releasables; import org.elasticsearch.search.aggregations.AggregationReduceContext; import org.elasticsearch.search.aggregations.AggregatorReducer; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; +import org.elasticsearch.search.aggregations.bucket.MultiBucketAggregatorsReducer; import org.elasticsearch.search.aggregations.support.SamplingContext; import org.elasticsearch.xcontent.XContentBuilder; @@ -177,30 +179,38 @@ public InternalBucket getBucketByKey(String key) { @Override protected AggregatorReducer getLeaderReducer(AggregationReduceContext reduceContext, int size) { - Map> bucketsMap = new HashMap<>(); return new AggregatorReducer() { + final Map bucketsReducer = new HashMap<>(getBuckets().size()); + @Override public void accept(InternalAggregation aggregation) { - InternalAdjacencyMatrix filters = (InternalAdjacencyMatrix) aggregation; + final InternalAdjacencyMatrix filters = (InternalAdjacencyMatrix) aggregation; for (InternalBucket bucket : filters.buckets) { - List sameRangeList = bucketsMap.computeIfAbsent(bucket.key, k -> new ArrayList<>(size)); - sameRangeList.add(bucket); + MultiBucketAggregatorsReducer reducer = bucketsReducer.computeIfAbsent( + bucket.key, + k -> new MultiBucketAggregatorsReducer(reduceContext, size) + ); + reducer.accept(bucket); } } @Override public InternalAggregation get() { - List reducedBuckets = new ArrayList<>(bucketsMap.size()); - for (List sameRangeList : bucketsMap.values()) { - InternalBucket reducedBucket = reduceBucket(sameRangeList, reduceContext); - if (reducedBucket.docCount >= 1) { - reducedBuckets.add(reducedBucket); + List reducedBuckets = new ArrayList<>(bucketsReducer.size()); + for (Map.Entry entry : bucketsReducer.entrySet()) { + if (entry.getValue().getDocCount() >= 1) { + reducedBuckets.add(new InternalBucket(entry.getKey(), 
entry.getValue().getDocCount(), entry.getValue().get())); } } reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size()); reducedBuckets.sort(Comparator.comparing(InternalBucket::getKey)); return new InternalAdjacencyMatrix(name, reducedBuckets, getMetadata()); } + + @Override + public void close() { + Releasables.close(bucketsReducer.values()); + } }; } @@ -209,21 +219,6 @@ public InternalAggregation finalizeSampling(SamplingContext samplingContext) { return new InternalAdjacencyMatrix(name, buckets.stream().map(b -> b.finalizeSampling(samplingContext)).toList(), getMetadata()); } - private InternalBucket reduceBucket(List buckets, AggregationReduceContext context) { - assert buckets.isEmpty() == false; - InternalBucket reduced = null; - for (InternalBucket bucket : buckets) { - if (reduced == null) { - reduced = new InternalBucket(bucket.key, bucket.docCount, bucket.aggregations); - } else { - reduced.docCount += bucket.docCount; - } - } - final List aggregations = new BucketAggregationList<>(buckets); - reduced.aggregations = InternalAggregations.reduce(aggregations, context); - return reduced; - } - @Override public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { builder.startArray(CommonFields.BUCKETS.getPreferredName()); From ae4e57d7461dca73d3c5c32873a3de5bdb46f75b Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Fri, 23 Feb 2024 09:46:57 +0100 Subject: [PATCH 03/11] [Connectors API] Add tests for ConnectorStateMachine (#105736) --- .../connector/ConnectorStateMachine.java | 7 +++++ .../connector/ConnectorStateMachineTests.java | 27 +++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/ConnectorStateMachine.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/ConnectorStateMachine.java index 39a12ba334c30..f722955cc0f9e 100644 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/ConnectorStateMachine.java +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/ConnectorStateMachine.java @@ -42,6 +42,13 @@ public static boolean isValidTransition(ConnectorStatus current, ConnectorStatus return validNextStates(current).contains(next); } + /** + * Throws {@link ConnectorInvalidStatusTransitionException} if a + * transition from one {@link ConnectorStatus} to another is invalid. + * + * @param current The current {@link ConnectorStatus} of the {@link Connector}. + * @param next The proposed next {@link ConnectorStatus} of the {@link Connector}. 
+ */ public static void assertValidStateTransition(ConnectorStatus current, ConnectorStatus next) throws ConnectorInvalidStatusTransitionException { if (isValidTransition(current, next)) return; diff --git a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/ConnectorStateMachineTests.java b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/ConnectorStateMachineTests.java index 372c874310162..d1f08f80d02f2 100644 --- a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/ConnectorStateMachineTests.java +++ b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/ConnectorStateMachineTests.java @@ -65,4 +65,31 @@ public void testTransitionToSameState() { assertFalse("Transition from " + state + " to itself should be invalid", ConnectorStateMachine.isValidTransition(state, state)); } } + + public void testAssertValidStateTransition_ExpectExceptionOnInvalidTransition() { + assertThrows( + ConnectorInvalidStatusTransitionException.class, + () -> ConnectorStateMachine.assertValidStateTransition(ConnectorStatus.CREATED, ConnectorStatus.CONFIGURED) + ); + } + + public void testAssertValidStateTransition_ExpectNoExceptionOnValidTransition() { + ConnectorStatus prevStatus = ConnectorStatus.CREATED; + ConnectorStatus nextStatus = ConnectorStatus.ERROR; + + try { + ConnectorStateMachine.assertValidStateTransition(prevStatus, nextStatus); + } catch (ConnectorInvalidStatusTransitionException e) { + fail( + "Did not expect " + + ConnectorInvalidStatusTransitionException.class.getSimpleName() + + " to be thrown for valid state transition [" + + prevStatus + + "] -> " + + "[" + + nextStatus + + "]." + ); + } + } } From 56716694b0977863fb2f23303b40cf967673c1ef Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Fri, 23 Feb 2024 09:47:18 +0100 Subject: [PATCH 04/11] [Connectors API] Remove unused method (#105739) --- .../connector/syncjob/ConnectorSyncJobIndexService.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexService.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexService.java index b1c08d8b7fbb1..910f0605ef7aa 100644 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexService.java +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexService.java @@ -24,7 +24,6 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.internal.Client; -import org.elasticsearch.common.UUIDs; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.index.query.BoolQueryBuilder; @@ -416,14 +415,6 @@ public void updateConnectorSyncJobIngestionStats( } - private String generateId() { - /* Workaround: only needed for generating an id upfront, autoGenerateId() has a side effect generating a timestamp, - * which would raise an error on the response layer later ("autoGeneratedTimestamp should not be set externally"). - * TODO: do we even need to copy the "_id" and set it as "id"? 
- */ - return UUIDs.base64UUID(); - } - private void getSyncJobConnectorInfo(String connectorId, ActionListener listener) { try { From 21b64ba2e492317224bef560d52a0bffb39a7cb1 Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Fri, 23 Feb 2024 09:47:40 +0100 Subject: [PATCH 05/11] [Connectors API] Make default scheduling for all sync jobs more readable (#105755) --- .../connector/ConnectorScheduling.java | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/ConnectorScheduling.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/ConnectorScheduling.java index 233bea5d4a842..637957b8ce66e 100644 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/ConnectorScheduling.java +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/ConnectorScheduling.java @@ -30,14 +30,15 @@ public class ConnectorScheduling implements Writeable, ToXContentObject { - private final ScheduleConfig accessControl; - private final ScheduleConfig full; - private final ScheduleConfig incremental; - + private static final String EVERYDAY_AT_MIDNIGHT = "0 0 0 * * ?"; private static final ParseField ACCESS_CONTROL_FIELD = new ParseField("access_control"); private static final ParseField FULL_FIELD = new ParseField("full"); private static final ParseField INCREMENTAL_FIELD = new ParseField("incremental"); + private final ScheduleConfig accessControl; + private final ScheduleConfig full; + private final ScheduleConfig incremental; + /** * @param accessControl connector access control sync schedule represented as {@link ScheduleConfig} * @param full connector full sync schedule represented as {@link ScheduleConfig} @@ -238,12 +239,19 @@ public ScheduleConfig build() { } } + /** + * Default scheduling is set to everyday at midnight (00:00:00). + * + * @return default scheduling for full, incremental and access control syncs. + */ public static ConnectorScheduling getDefaultConnectorScheduling() { return new ConnectorScheduling.Builder().setAccessControl( - new ConnectorScheduling.ScheduleConfig.Builder().setEnabled(false).setInterval(new Cron("0 0 0 * * ?")).build() + new ConnectorScheduling.ScheduleConfig.Builder().setEnabled(false).setInterval(new Cron(EVERYDAY_AT_MIDNIGHT)).build() ) - .setFull(new ConnectorScheduling.ScheduleConfig.Builder().setEnabled(false).setInterval(new Cron("0 0 0 * * ?")).build()) - .setIncremental(new ConnectorScheduling.ScheduleConfig.Builder().setEnabled(false).setInterval(new Cron("0 0 0 * * ?")).build()) + .setFull(new ConnectorScheduling.ScheduleConfig.Builder().setEnabled(false).setInterval(new Cron(EVERYDAY_AT_MIDNIGHT)).build()) + .setIncremental( + new ConnectorScheduling.ScheduleConfig.Builder().setEnabled(false).setInterval(new Cron(EVERYDAY_AT_MIDNIGHT)).build() + ) .build(); } } From 10ec23a42cc009a877f39b3176eaa273e8296368 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Fri, 23 Feb 2024 19:51:04 +1100 Subject: [PATCH 06/11] Add metrics for retries by S3RetryingInputStream (#105600) This PR exposes retries in S3RetryingInputStream as metrics for easier observability. At the class API level, retries can happen when either opening an input stream and reading from an input stream. Retry reading from an input stream internally can also retry re-opening the input stream. 
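A rough sketch of the accounting scheme in terms of the new `S3RepositoriesMetrics` record (the helper and field names below are illustrative assumptions, not the actual `S3RetryingInputStream` internals):

    import java.util.Map;

    // Illustrative sketch only: shows how the retry counters and histogram relate
    // to a retrying read loop. The real accounting lives in S3RetryingInputStream.
    class RetryAccountingSketch {
        private final S3RepositoriesMetrics metrics;
        private final Map<String, Object> attributes;
        private int attempts; // attempts in the current retry cycle; 0 = no cycle active

        RetryAccountingSketch(S3RepositoriesMetrics metrics, Map<String, Object> attributes) {
            this.metrics = metrics;
            this.attributes = attributes;
        }

        void onReadAttemptFailed() {
            if (attempts == 0) {
                // the first failure triggers a new retry cycle
                metrics.retryStartedCounter().incrementBy(1, attributes);
            }
            attempts += 1; // re-opening the stream on behalf of a read is counted here too
        }

        void onReadCompleted() {
            if (attempts > 0) {
                // the cycle completed successfully; started and completed counts should match
                metrics.retryCompletedCounter().incrementBy(1, attributes);
                metrics.retryHistogram().record(attempts, attributes);
                attempts = 0;
            }
        }
    }
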
All these retries are counted under the retries for reading since the higher API usage is a read instead of open. The list of new metrics are: * `es.repositories.s3.input_stream.retry.event.total` - Number of times a retry cycle has been triggered. * `es.repositories.s3.input_stream.retry.success.total` - Number of a times a retry cycle has been successfully completed. This should match the above metric in numbers. Otherwise it indicates there are threads stuck in infinite retries. * `es.repositories.s3.input_stream.retry.attempts.histogram` - Number of attempts to complete a retry cycle successfully. Relates: https://github.com/elastic/elasticsearch/pull/103300#discussion_r1444125047 Relates: ES-7666 --- .../repositories/azure/AzureRepository.java | 4 +- .../gcs/GoogleCloudStorageRepository.java | 4 +- .../s3/S3BlobStoreRepositoryTests.java | 5 +- .../s3/S3RepositoryThirdPartyTests.java | 3 +- .../repositories/s3/S3BlobStore.java | 31 ++++-- .../s3/S3RepositoriesMetrics.java | 37 +++++++ .../repositories/s3/S3Repository.java | 11 +- .../repositories/s3/S3RepositoryPlugin.java | 9 +- .../s3/S3RetryingInputStream.java | 39 ++++++- .../s3/RepositoryCredentialsTests.java | 5 +- .../s3/S3BlobContainerRetriesTests.java | 103 +++++++++++++++++- .../repositories/s3/S3RepositoryTests.java | 3 +- .../repositories/RepositoriesMetrics.java | 2 + .../blobstore/MeteredBlobStoreRepository.java | 6 +- .../RepositoriesServiceTests.java | 6 +- .../telemetry/RecordingMeterRegistry.java | 2 +- 16 files changed, 216 insertions(+), 54 deletions(-) create mode 100644 modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoriesMetrics.java diff --git a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java index f58611cb0567a..388474acc75ea 100644 --- a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java +++ b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java @@ -21,7 +21,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.indices.recovery.RecoverySettings; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository; import org.elasticsearch.xcontent.NamedXContentRegistry; @@ -108,8 +107,7 @@ public AzureRepository( bigArrays, recoverySettings, buildBasePath(metadata), - buildLocation(metadata), - RepositoriesMetrics.NOOP + buildLocation(metadata) ); this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings()); this.storageService = storageService; diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java index 94d0abe17909f..e2338371cf837 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java @@ -19,7 +19,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.indices.recovery.RecoverySettings; -import org.elasticsearch.repositories.RepositoriesMetrics; import 
org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository; import org.elasticsearch.xcontent.NamedXContentRegistry; @@ -77,8 +76,7 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository { bigArrays, recoverySettings, buildBasePath(metadata), - buildLocation(metadata), - RepositoriesMetrics.NOOP + buildLocation(metadata) ); this.storageService = storageService; diff --git a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index 248ccc119794e..4080a47c7dabe 100644 --- a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -39,7 +39,6 @@ import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; @@ -460,9 +459,9 @@ protected S3Repository createRepository( ClusterService clusterService, BigArrays bigArrays, RecoverySettings recoverySettings, - RepositoriesMetrics repositoriesMetrics + S3RepositoriesMetrics s3RepositoriesMetrics ) { - return new S3Repository(metadata, registry, getService(), clusterService, bigArrays, recoverySettings, repositoriesMetrics) { + return new S3Repository(metadata, registry, getService(), clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics) { @Override public BlobStore blobStore() { diff --git a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java index f182b54b0c696..b8fea485c6276 100644 --- a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java +++ b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java @@ -30,7 +30,6 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.fixtures.minio.MinioTestContainer; @@ -145,7 +144,7 @@ public long absoluteTimeInMillis() { ClusterServiceUtils.createClusterService(threadpool), BigArrays.NON_RECYCLING_INSTANCE, new RecoverySettings(node().settings(), node().injector().getInstance(ClusterService.class).getClusterSettings()), - RepositoriesMetrics.NOOP + S3RepositoriesMetrics.NOOP ) ) { repository.start(); diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java index 78b1e2dba98b3..6b9937b01a433 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java +++ 
b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java @@ -33,7 +33,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; @@ -84,7 +83,7 @@ class S3BlobStore implements BlobStore { private final ThreadPool threadPool; private final Executor snapshotExecutor; - private final RepositoriesMetrics repositoriesMetrics; + private final S3RepositoriesMetrics s3RepositoriesMetrics; private final StatsCollectors statsCollectors = new StatsCollectors(); @@ -98,7 +97,7 @@ class S3BlobStore implements BlobStore { RepositoryMetadata repositoryMetadata, BigArrays bigArrays, ThreadPool threadPool, - RepositoriesMetrics repositoriesMetrics + S3RepositoriesMetrics s3RepositoriesMetrics ) { this.service = service; this.bigArrays = bigArrays; @@ -110,7 +109,7 @@ class S3BlobStore implements BlobStore { this.repositoryMetadata = repositoryMetadata; this.threadPool = threadPool; this.snapshotExecutor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - this.repositoriesMetrics = repositoriesMetrics; + this.s3RepositoriesMetrics = s3RepositoriesMetrics; } RequestMetricCollector getMetricCollector(Operation operation, OperationPurpose purpose) { @@ -174,19 +173,19 @@ public final void collectMetrics(Request request, Response response) { .map(List::size) .orElse(0); - repositoriesMetrics.operationCounter().incrementBy(1, attributes); + s3RepositoriesMetrics.common().operationCounter().incrementBy(1, attributes); if (numberOfAwsErrors == requestCount) { - repositoriesMetrics.unsuccessfulOperationCounter().incrementBy(1, attributes); + s3RepositoriesMetrics.common().unsuccessfulOperationCounter().incrementBy(1, attributes); } - repositoriesMetrics.requestCounter().incrementBy(requestCount, attributes); + s3RepositoriesMetrics.common().requestCounter().incrementBy(requestCount, attributes); if (exceptionCount > 0) { - repositoriesMetrics.exceptionCounter().incrementBy(exceptionCount, attributes); - repositoriesMetrics.exceptionHistogram().record(exceptionCount, attributes); + s3RepositoriesMetrics.common().exceptionCounter().incrementBy(exceptionCount, attributes); + s3RepositoriesMetrics.common().exceptionHistogram().record(exceptionCount, attributes); } if (throttleCount > 0) { - repositoriesMetrics.throttleCounter().incrementBy(throttleCount, attributes); - repositoriesMetrics.throttleHistogram().record(throttleCount, attributes); + s3RepositoriesMetrics.common().throttleCounter().incrementBy(throttleCount, attributes); + s3RepositoriesMetrics.common().throttleHistogram().record(throttleCount, attributes); } maybeRecordHttpRequestTime(request); } @@ -207,7 +206,7 @@ private void maybeRecordHttpRequestTime(Request request) { if (totalTimeInMicros == 0) { logger.warn("Expected HttpRequestTime to be tracked for request [{}] but found no count.", request); } else { - repositoriesMetrics.httpRequestTimeInMicroHistogram().record(totalTimeInMicros, attributes); + s3RepositoriesMetrics.common().httpRequestTimeInMicroHistogram().record(totalTimeInMicros, attributes); } } @@ -293,6 +292,14 @@ public long bufferSizeInBytes() { return bufferSize.getBytes(); } + public RepositoryMetadata getRepositoryMetadata() { + return repositoryMetadata; + } + + public S3RepositoriesMetrics getS3RepositoriesMetrics() { + return s3RepositoriesMetrics; + } + @Override public 
BlobContainer blobContainer(BlobPath path) { return new S3BlobContainer(path, this); diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoriesMetrics.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoriesMetrics.java new file mode 100644 index 0000000000000..e025214998d5b --- /dev/null +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoriesMetrics.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.repositories.s3; + +import org.elasticsearch.repositories.RepositoriesMetrics; +import org.elasticsearch.telemetry.metric.LongCounter; +import org.elasticsearch.telemetry.metric.LongHistogram; + +public record S3RepositoriesMetrics( + RepositoriesMetrics common, + LongCounter retryStartedCounter, + LongCounter retryCompletedCounter, + LongHistogram retryHistogram +) { + + public static S3RepositoriesMetrics NOOP = new S3RepositoriesMetrics(RepositoriesMetrics.NOOP); + + public static final String METRIC_RETRY_EVENT_TOTAL = "es.repositories.s3.input_stream.retry.event.total"; + public static final String METRIC_RETRY_SUCCESS_TOTAL = "es.repositories.s3.input_stream.retry.success.total"; + public static final String METRIC_RETRY_ATTEMPTS_HISTOGRAM = "es.repositories.s3.input_stream.retry.attempts.histogram"; + + public S3RepositoriesMetrics(RepositoriesMetrics common) { + this( + common, + common.meterRegistry().registerLongCounter(METRIC_RETRY_EVENT_TOTAL, "s3 input stream retry event count", "unit"), + common.meterRegistry().registerLongCounter(METRIC_RETRY_SUCCESS_TOTAL, "s3 input stream retry success count", "unit"), + common.meterRegistry() + .registerLongHistogram(METRIC_RETRY_ATTEMPTS_HISTOGRAM, "s3 input stream retry attempts histogram", "unit") + ); + } +} diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index 624867a2f0c41..26b1b1158dea0 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -31,7 +31,6 @@ import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.repositories.FinalizeSnapshotContext; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository; @@ -195,6 +194,8 @@ class S3Repository extends MeteredBlobStoreRepository { private final Executor snapshotExecutor; + private final S3RepositoriesMetrics s3RepositoriesMetrics; + /** * Constructs an s3 backed repository */ @@ -205,7 +206,7 @@ class S3Repository extends MeteredBlobStoreRepository { final ClusterService clusterService, final BigArrays bigArrays, final RecoverySettings recoverySettings, - final RepositoriesMetrics repositoriesMetrics + final S3RepositoriesMetrics s3RepositoriesMetrics ) { super( metadata, @@ -214,10 
+215,10 @@ class S3Repository extends MeteredBlobStoreRepository { bigArrays, recoverySettings, buildBasePath(metadata), - buildLocation(metadata), - repositoriesMetrics + buildLocation(metadata) ); this.service = service; + this.s3RepositoriesMetrics = s3RepositoriesMetrics; this.snapshotExecutor = threadPool().executor(ThreadPool.Names.SNAPSHOT); // Parse and validate the user's S3 Storage Class setting @@ -408,7 +409,7 @@ protected S3BlobStore createBlobStore() { metadata, bigArrays, threadPool, - repositoriesMetrics + s3RepositoriesMetrics ); } diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java index 83668cc271922..26047c3b416a7 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java @@ -78,9 +78,9 @@ protected S3Repository createRepository( final ClusterService clusterService, final BigArrays bigArrays, final RecoverySettings recoverySettings, - final RepositoriesMetrics repositoriesMetrics + final S3RepositoriesMetrics s3RepositoriesMetrics ) { - return new S3Repository(metadata, registry, service.get(), clusterService, bigArrays, recoverySettings, repositoriesMetrics); + return new S3Repository(metadata, registry, service.get(), clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics); } @Override @@ -101,11 +101,12 @@ public Map getRepositories( final ClusterService clusterService, final BigArrays bigArrays, final RecoverySettings recoverySettings, - RepositoriesMetrics repositoriesMetrics + final RepositoriesMetrics repositoriesMetrics ) { + final S3RepositoriesMetrics s3RepositoriesMetrics = new S3RepositoriesMetrics(repositoriesMetrics); return Collections.singletonMap( S3Repository.TYPE, - metadata -> createRepository(metadata, registry, clusterService, bigArrays, recoverySettings, repositoriesMetrics) + metadata -> createRepository(metadata, registry, clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics) ); } diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java index c457b9d51e8b9..f7a99a399f59f 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java @@ -27,6 +27,7 @@ import java.nio.file.NoSuchFileException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import static org.elasticsearch.core.Strings.format; import static org.elasticsearch.repositories.s3.S3BlobStore.configureRequestForMetrics; @@ -80,7 +81,7 @@ class S3RetryingInputStream extends InputStream { this.end = end; final int initialAttempt = attempt; openStreamWithRetry(); - maybeLogForSuccessAfterRetries(initialAttempt, "opened"); + maybeLogAndRecordMetricsForSuccess(initialAttempt, "open"); } private void openStreamWithRetry() throws IOException { @@ -105,6 +106,9 @@ private void openStreamWithRetry() throws IOException { ); } + if (attempt == 1) { + blobStore.getS3RepositoriesMetrics().retryStartedCounter().incrementBy(1, metricAttributes("open")); + } final long delayInMillis = maybeLogAndComputeRetryDelay("opening", e); delayBeforeRetry(delayInMillis); } 
@@ -142,9 +146,12 @@ public int read() throws IOException { } else { currentOffset += 1; } - maybeLogForSuccessAfterRetries(initialAttempt, "read"); + maybeLogAndRecordMetricsForSuccess(initialAttempt, "read"); return result; } catch (IOException e) { + if (attempt == initialAttempt) { + blobStore.getS3RepositoriesMetrics().retryStartedCounter().incrementBy(1, metricAttributes("read")); + } reopenStreamOrFail(e); } } @@ -162,9 +169,12 @@ public int read(byte[] b, int off, int len) throws IOException { } else { currentOffset += bytesRead; } - maybeLogForSuccessAfterRetries(initialAttempt, "read"); + maybeLogAndRecordMetricsForSuccess(initialAttempt, "read"); return bytesRead; } catch (IOException e) { + if (attempt == initialAttempt) { + blobStore.getS3RepositoriesMetrics().retryStartedCounter().incrementBy(1, metricAttributes("read")); + } reopenStreamOrFail(e); } } @@ -246,16 +256,20 @@ private void logForRetry(Level level, String action, Exception e) { ); } - private void maybeLogForSuccessAfterRetries(int initialAttempt, String action) { + private void maybeLogAndRecordMetricsForSuccess(int initialAttempt, String action) { if (attempt > initialAttempt) { + final int numberOfRetries = attempt - initialAttempt; logger.info( "successfully {} input stream for [{}/{}] with purpose [{}] after [{}] retries", action, blobStore.bucket(), blobKey, purpose.getKey(), - attempt - initialAttempt + numberOfRetries ); + final Map attributes = metricAttributes(action); + blobStore.getS3RepositoriesMetrics().retryCompletedCounter().incrementBy(1, attributes); + blobStore.getS3RepositoriesMetrics().retryHistogram().record(numberOfRetries, attributes); } } @@ -294,6 +308,21 @@ protected long getRetryDelayInMillis() { return 10L << (Math.min(attempt - 1, 10)); } + private Map metricAttributes(String action) { + return Map.of( + "repo_type", + S3Repository.TYPE, + "repo_name", + blobStore.getRepositoryMetadata().name(), + "operation", + Operation.GET_OBJECT.getKey(), + "purpose", + purpose.getKey(), + "action", + action + ); + } + @Override public void close() throws IOException { maybeAbort(currentStream); diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java index 28a48c2968f59..cf3bc21526bf6 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java @@ -26,7 +26,6 @@ import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.rest.AbstractRestChannel; import org.elasticsearch.rest.RestRequest; @@ -264,9 +263,9 @@ protected S3Repository createRepository( ClusterService clusterService, BigArrays bigArrays, RecoverySettings recoverySettings, - RepositoriesMetrics repositoriesMetrics + S3RepositoriesMetrics s3RepositoriesMetrics ) { - return new S3Repository(metadata, registry, getService(), clusterService, bigArrays, recoverySettings, repositoriesMetrics) { + return new S3Repository(metadata, registry, getService(), clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics) { @Override protected void assertSnapshotOrGenericThread() { // eliminate 
thread name check as we create repo manually on test/main threads diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index 0ddd29171b3bd..05268d750637c 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -43,6 +43,9 @@ import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase; import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; +import org.elasticsearch.telemetry.InstrumentType; +import org.elasticsearch.telemetry.Measurement; +import org.elasticsearch.telemetry.RecordingMeterRegistry; import org.elasticsearch.watcher.ResourceWatcherService; import org.hamcrest.Matcher; import org.junit.After; @@ -59,7 +62,9 @@ import java.nio.charset.StandardCharsets; import java.nio.file.NoSuchFileException; import java.util.Arrays; +import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Objects; import java.util.OptionalInt; import java.util.concurrent.atomic.AtomicBoolean; @@ -74,10 +79,13 @@ import static org.elasticsearch.repositories.s3.S3ClientSettings.READ_TIMEOUT_SETTING; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; @@ -91,6 +99,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes private S3Service service; private AtomicBoolean shouldErrorOnDns; + private RecordingMeterRegistry recordingMeterRegistry; @Before public void setUp() throws Exception { @@ -109,6 +118,7 @@ protected AmazonS3ClientBuilder buildClientBuilder(S3ClientSettings clientSettin return builder; } }; + recordingMeterRegistry = new RecordingMeterRegistry(); super.setUp(); } @@ -185,7 +195,7 @@ protected BlobContainer createBlobContainer( repositoryMetadata, BigArrays.NON_RECYCLING_INSTANCE, new DeterministicTaskQueue().getThreadPool(), - RepositoriesMetrics.NOOP + new S3RepositoriesMetrics(new RepositoriesMetrics(recordingMeterRegistry)) ); return new S3BlobContainer(randomBoolean() ? BlobPath.EMPTY : BlobPath.EMPTY.add("foo"), s3BlobStore) { @Override @@ -669,8 +679,8 @@ public void handle(HttpExchange exchange) throws IOException { } exchange.getResponseBody().write(bytes, rangeStart, length); } else { - failures.incrementAndGet(); if (randomBoolean()) { + failures.incrementAndGet(); exchange.sendResponseHeaders( randomFrom( HttpStatus.SC_INTERNAL_SERVER_ERROR, @@ -686,6 +696,8 @@ public void handle(HttpExchange exchange) throws IOException { if (bytesSent >= meaningfulProgressBytes) { exchange.getResponseBody().flush(); } + } else { + failures.incrementAndGet(); } } } @@ -700,16 +712,28 @@ public void handle(HttpExchange exchange) throws IOException { final int length = between(0, randomBoolean() ? 
bytes.length : Integer.MAX_VALUE); logger.info("--> position={}, length={}", position, length); try (InputStream inputStream = blobContainer.readBlob(OperationPurpose.INDICES, "read_blob_retries_forever", position, length)) { + assertMetricsForOpeningStream(); + recordingMeterRegistry.getRecorder().resetCalls(); + failures.set(0); + final byte[] bytesRead = BytesReference.toBytes(Streams.readFully(inputStream)); assertArrayEquals(Arrays.copyOfRange(bytes, position, Math.min(bytes.length, position + length)), bytesRead); + assertMetricsForReadingStream(); } assertThat(failures.get(), greaterThan(totalFailures)); // Read the whole blob failures.set(0); + recordingMeterRegistry.getRecorder().resetCalls(); try (InputStream inputStream = blobContainer.readBlob(OperationPurpose.INDICES, "read_blob_retries_forever")) { + assertMetricsForOpeningStream(); + recordingMeterRegistry.getRecorder().resetCalls(); + failures.set(0); + final byte[] bytesRead = BytesReference.toBytes(Streams.readFully(inputStream)); assertArrayEquals(bytes, bytesRead); + + assertMetricsForReadingStream(); } assertThat(failures.get(), greaterThan(totalFailures)); } @@ -737,9 +761,13 @@ public void handle(HttpExchange exchange) throws IOException { : blobContainer.readBlob(randomRetryingPurpose(), "read_blob_not_found", between(0, 100), between(1, 100)) ) { Streams.readFully(inputStream); + } }); assertThat(numberOfReads.get(), equalTo(1)); + assertThat(getRetryStartedMeasurements(), empty()); + assertThat(getRetryCompletedMeasurements(), empty()); + assertThat(getRetryHistogramMeasurements(), empty()); } @Override @@ -761,6 +789,77 @@ protected OperationPurpose randomFiniteRetryingPurpose() { ); } + private void assertMetricsForOpeningStream() { + final long numberOfOperations = getOperationMeasurements(); + // S3 client sdk internally also retries within the configured maxRetries for retryable errors. + // The retries in S3RetryingInputStream are triggered when the client internal retries are unsuccessful + if (numberOfOperations > 1) { + // For opening the stream, there should be exactly one pair of started and completed records. + // There should be one histogram record, the number of retries must be greater than 0 + final Map attributes = metricAttributes("open"); + assertThat(getRetryStartedMeasurements(), contains(new Measurement(1L, attributes, false))); + assertThat(getRetryCompletedMeasurements(), contains(new Measurement(1L, attributes, false))); + final List retryHistogramMeasurements = getRetryHistogramMeasurements(); + assertThat(retryHistogramMeasurements, hasSize(1)); + assertThat(retryHistogramMeasurements.get(0).getLong(), equalTo(numberOfOperations - 1)); + assertThat(retryHistogramMeasurements.get(0).attributes(), equalTo(attributes)); + } else { + assertThat(getRetryStartedMeasurements(), empty()); + assertThat(getRetryCompletedMeasurements(), empty()); + assertThat(getRetryHistogramMeasurements(), empty()); + } + } + + private void assertMetricsForReadingStream() { + // For reading the stream, there could be multiple pairs of started and completed records. + // It is important that they always come in pairs and the number of pairs match the number + // of histogram records. 
+ final Map attributes = metricAttributes("read"); + final List retryHistogramMeasurements = getRetryHistogramMeasurements(); + final int numberOfReads = retryHistogramMeasurements.size(); + retryHistogramMeasurements.forEach(measurement -> { + assertThat(measurement.getLong(), greaterThan(0L)); + assertThat(measurement.attributes(), equalTo(attributes)); + }); + + final List retryStartedMeasurements = getRetryStartedMeasurements(); + assertThat(retryStartedMeasurements, hasSize(1)); + assertThat(retryStartedMeasurements.get(0).getLong(), equalTo((long) numberOfReads)); + assertThat(retryStartedMeasurements.get(0).attributes(), equalTo(attributes)); + assertThat(retryStartedMeasurements, equalTo(getRetryCompletedMeasurements())); + } + + private long getOperationMeasurements() { + final List operationMeasurements = Measurement.combine( + recordingMeterRegistry.getRecorder().getMeasurements(InstrumentType.LONG_COUNTER, RepositoriesMetrics.METRIC_OPERATIONS_TOTAL) + ); + assertThat(operationMeasurements, hasSize(1)); + return operationMeasurements.get(0).getLong(); + } + + private List getRetryStartedMeasurements() { + return Measurement.combine( + recordingMeterRegistry.getRecorder() + .getMeasurements(InstrumentType.LONG_COUNTER, S3RepositoriesMetrics.METRIC_RETRY_EVENT_TOTAL) + ); + } + + private List getRetryCompletedMeasurements() { + return Measurement.combine( + recordingMeterRegistry.getRecorder() + .getMeasurements(InstrumentType.LONG_COUNTER, S3RepositoriesMetrics.METRIC_RETRY_SUCCESS_TOTAL) + ); + } + + private List getRetryHistogramMeasurements() { + return recordingMeterRegistry.getRecorder() + .getMeasurements(InstrumentType.LONG_HISTOGRAM, S3RepositoriesMetrics.METRIC_RETRY_ATTEMPTS_HISTOGRAM); + } + + private Map metricAttributes(String action) { + return Map.of("repo_type", "s3", "repo_name", "repository", "operation", "GetObject", "purpose", "Indices", "action", action); + } + /** * Asserts that an InputStream is fully consumed, or aborted, when it is closed */ diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java index 0a92ed0a28973..50470ec499ef6 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java @@ -18,7 +18,6 @@ import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.env.Environment; import org.elasticsearch.indices.recovery.RecoverySettings; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.test.ESTestCase; @@ -130,7 +129,7 @@ private S3Repository createS3Repo(RepositoryMetadata metadata) { BlobStoreTestUtil.mockClusterService(), MockBigArrays.NON_RECYCLING_INSTANCE, new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), - RepositoriesMetrics.NOOP + S3RepositoriesMetrics.NOOP ) { @Override protected void assertSnapshotOrGenericThread() { diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesMetrics.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesMetrics.java index b4d79d89ec4c6..50aa7881cd2b6 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesMetrics.java +++ 
b/server/src/main/java/org/elasticsearch/repositories/RepositoriesMetrics.java @@ -13,6 +13,7 @@ import org.elasticsearch.telemetry.metric.MeterRegistry; public record RepositoriesMetrics( + MeterRegistry meterRegistry, LongCounter requestCounter, LongCounter exceptionCounter, LongCounter throttleCounter, @@ -36,6 +37,7 @@ public record RepositoriesMetrics( public RepositoriesMetrics(MeterRegistry meterRegistry) { this( + meterRegistry, meterRegistry.registerLongCounter(METRIC_REQUESTS_TOTAL, "repository request counter", "unit"), meterRegistry.registerLongCounter(METRIC_EXCEPTIONS_TOTAL, "repository request exception counter", "unit"), meterRegistry.registerLongCounter(METRIC_THROTTLES_TOTAL, "repository request throttle counter", "unit"), diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/MeteredBlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/MeteredBlobStoreRepository.java index 6ecab2f8c77f2..c5ea99b0e5c14 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/MeteredBlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/MeteredBlobStoreRepository.java @@ -14,7 +14,6 @@ import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.indices.recovery.RecoverySettings; -import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.RepositoryInfo; import org.elasticsearch.repositories.RepositoryStatsSnapshot; import org.elasticsearch.threadpool.ThreadPool; @@ -24,7 +23,6 @@ public abstract class MeteredBlobStoreRepository extends BlobStoreRepository { private final RepositoryInfo repositoryInfo; - protected final RepositoriesMetrics repositoriesMetrics; public MeteredBlobStoreRepository( RepositoryMetadata metadata, @@ -33,11 +31,9 @@ public MeteredBlobStoreRepository( BigArrays bigArrays, RecoverySettings recoverySettings, BlobPath basePath, - Map location, - RepositoriesMetrics repositoriesMetrics + Map location ) { super(metadata, namedXContentRegistry, clusterService, bigArrays, recoverySettings, basePath); - this.repositoriesMetrics = repositoriesMetrics; ThreadPool threadPool = clusterService.getClusterApplierService().threadPool(); this.repositoryInfo = new RepositoryInfo( UUIDs.randomBase64UUID(), diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index 4f7001f00e6a7..45e4bb09c1616 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -482,8 +482,7 @@ private MeteredRepositoryTypeA(RepositoryMetadata metadata, ClusterService clust MockBigArrays.NON_RECYCLING_INSTANCE, mock(RecoverySettings.class), BlobPath.EMPTY, - Map.of("bucket", "bucket-a"), - RepositoriesMetrics.NOOP + Map.of("bucket", "bucket-a") ); } @@ -510,8 +509,7 @@ private MeteredRepositoryTypeB(RepositoryMetadata metadata, ClusterService clust MockBigArrays.NON_RECYCLING_INSTANCE, mock(RecoverySettings.class), BlobPath.EMPTY, - Map.of("bucket", "bucket-b"), - RepositoriesMetrics.NOOP + Map.of("bucket", "bucket-b") ); } diff --git a/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingMeterRegistry.java b/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingMeterRegistry.java index 86bfd9bf38c26..33693c297f166 100644 --- 
a/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingMeterRegistry.java +++ b/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingMeterRegistry.java @@ -33,7 +33,7 @@ public class RecordingMeterRegistry implements MeterRegistry { protected final MetricRecorder recorder = new MetricRecorder<>(); - MetricRecorder getRecorder() { + public MetricRecorder getRecorder() { return recorder; } From d67af191c0565d44e4e5571d7ea6b575bebf1812 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 23 Feb 2024 12:53:03 +0100 Subject: [PATCH 07/11] Enhance LeakTracker to record thread + test that caused a leak (#105758) Add recording of the current thread name to every leak record. Also add functionality to set a global hint for the context of a leak and use it to record the current test name. This massively eases the task of correlating an observed leak with the test that caused it. --- .../elasticsearch/transport/LeakTracker.java | 19 ++++++++++++++++++- .../org/elasticsearch/test/ESTestCase.java | 2 ++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/transport/LeakTracker.java b/server/src/main/java/org/elasticsearch/transport/LeakTracker.java index 3be22f6fae53a..77a41cff15fd7 100644 --- a/server/src/main/java/org/elasticsearch/transport/LeakTracker.java +++ b/server/src/main/java/org/elasticsearch/transport/LeakTracker.java @@ -41,6 +41,8 @@ public final class LeakTracker { public static final LeakTracker INSTANCE = new LeakTracker(); + private static volatile String contextHint = ""; + private LeakTracker() {} /** @@ -72,6 +74,15 @@ public void reportLeak() { } } + /** + * Set a hint string that will be recorded with every leak that is recorded. Used by unit tests to allow identifying the exact test + * that caused a leak by setting the test name here. + * @param hint hint value + */ + public static void setContextHint(String hint) { + contextHint = hint; + } + public static Releasable wrap(Releasable releasable) { if (Assertions.ENABLED == false) { return releasable; } @@ -299,19 +310,25 @@ private static final class Record extends Throwable { private final Record next; private final int pos; + private final String threadName; + + private final String contextHint = LeakTracker.contextHint; + Record(Record next) { this.next = next; this.pos = next.pos + 1; + threadName = Thread.currentThread().getName(); } private Record() { next = null; pos = -1; + threadName = Thread.currentThread().getName(); } @Override public String toString() { - StringBuilder buf = new StringBuilder(); + StringBuilder buf = new StringBuilder("\tin [").append(threadName).append("][").append(contextHint).append("]\n"); StackTraceElement[] array = getStackTrace(); // Skip the first three elements since those are just related to the leak tracker.
for (int i = 3; i < array.length; i++) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index 7b4032cc56cef..67919756e16a9 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -499,6 +499,7 @@ public void removeHeaderWarningAppender() { @Before public final void before() { + LeakTracker.setContextHint(getTestName()); logger.info("{}before test", getTestParamsForLogging()); assertNull("Thread context initialized twice", threadContext); if (enableWarningsCheck()) { @@ -530,6 +531,7 @@ public final void after() throws Exception { ensureAllSearchContextsReleased(); ensureCheckIndexPassed(); logger.info("{}after test", getTestParamsForLogging()); + LeakTracker.setContextHint(""); } private String getTestParamsForLogging() { From 39a4ddb3f6bfcb2db98b14ed47f105003357556e Mon Sep 17 00:00:00 2001 From: Alessandro Stoltenberg Date: Fri, 23 Feb 2024 13:01:08 +0100 Subject: [PATCH 08/11] email-reporting-attachment-docs: Correct auth and proxy fields. (#105730) --- docs/reference/watcher/actions/email.asciidoc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/reference/watcher/actions/email.asciidoc b/docs/reference/watcher/actions/email.asciidoc index 71fdd95148d24..16b9cc4be0628 100644 --- a/docs/reference/watcher/actions/email.asciidoc +++ b/docs/reference/watcher/actions/email.asciidoc @@ -149,8 +149,10 @@ killed by firewalls or load balancers in-between. means, by default watcher tries to download a dashboard for 10 minutes, forty times fifteen seconds). The setting `xpack.notification.reporting.interval` can be configured globally to change the default. -| `request.auth` | Additional auth configuration for the request -| `request.proxy` | Additional proxy configuration for the request +| `auth` | Additional auth configuration for the request, see + {kibana-ref}/automating-report-generation.html#use-watcher[use watcher] for details +| `proxy` | Additional proxy configuration for the request. See <> + on how to configure the values. |====== From 1dd2712bad106084b0d706ebe170860d11862808 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Fri, 23 Feb 2024 08:22:28 -0500 Subject: [PATCH 09/11] Add new shard_seed parameter for random_sampler agg (#104830) While it is important to ensure IID sampling via shard hashes, this can be a barrier and a source of complexity when testing random_sampler. So, this commit adds a new optional parameter called `shardSeed`, which, when combined with `seed`, ensures 100% consistent sampling over shards where the data is exactly the same.
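For illustration, a minimal sketch of the new parameter as used from the Java API (mirroring the `RandomSamplerIT` test below; the index name, field name, and probability are placeholder values):

    // With both a fixed seed and a fixed shard_seed, repeated runs over identical
    // shard data sample exactly the same documents, so results are reproducible.
    var sampler = new RandomSamplerAggregationBuilder("sampler").setProbability(0.01)
        .setSeed(0)         // seed shared across the whole request
        .setShardSeed(42)   // fixed per-shard seed, used instead of the shard's hash-derived seed
        .subAggregation(AggregationBuilders.avg("mean_value").field("value"));
    prepareSearch("idx").setPreference("shard:0").addAggregation(sampler);

Note that a fixed `shard_seed` only yields identical samples while the per-shard document sets themselves are unchanged, which is why the test below also pins the search to a single shard via `setPreference("shard:0")`.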
--- docs/changelog/104830.yaml | 5 +++ .../aggregations/bucket/RandomSamplerIT.java | 43 +++++++++++++++++++ .../org/elasticsearch/TransportVersions.java | 1 + .../sampler/random/InternalRandomSampler.java | 19 +++++++- .../RandomSamplerAggregationBuilder.java | 24 +++++++++-- .../random/RandomSamplerAggregator.java | 6 ++- .../RandomSamplerAggregatorFactory.java | 24 +++++++++-- .../aggregations/support/SamplingContext.java | 15 ++++--- .../RandomSamplerAggregationBuilderTests.java | 3 ++ .../support/SamplingContextTests.java | 5 +-- .../aggregations/AggregatorTestCase.java | 6 ++- .../test/InternalAggregationTestCase.java | 6 ++- 12 files changed, 137 insertions(+), 20 deletions(-) create mode 100644 docs/changelog/104830.yaml diff --git a/docs/changelog/104830.yaml b/docs/changelog/104830.yaml new file mode 100644 index 0000000000000..c056f3d618b75 --- /dev/null +++ b/docs/changelog/104830.yaml @@ -0,0 +1,5 @@ +pr: 104830 +summary: All new `shard_seed` parameter for `random_sampler` agg +area: Aggregations +type: enhancement +issues: [] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/RandomSamplerIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/RandomSamplerIT.java index 28c186c559dff..53075e31cd6f9 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/RandomSamplerIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/RandomSamplerIT.java @@ -24,6 +24,7 @@ import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThan; @ESIntegTestCase.SuiteScopeTestCase @@ -84,6 +85,48 @@ public void setupSuiteScopeCluster() throws Exception { ensureSearchable(); } + public void testRandomSamplerConsistentSeed() { + double[] sampleMonotonicValue = new double[1]; + double[] sampleNumericValue = new double[1]; + long[] sampledDocCount = new long[1]; + // initialize the values + assertResponse( + prepareSearch("idx").setPreference("shard:0") + .addAggregation( + new RandomSamplerAggregationBuilder("sampler").setProbability(PROBABILITY) + .setSeed(0) + .subAggregation(avg("mean_monotonic").field(MONOTONIC_VALUE)) + .subAggregation(avg("mean_numeric").field(NUMERIC_VALUE)) + .setShardSeed(42) + ), + response -> { + InternalRandomSampler sampler = response.getAggregations().get("sampler"); + sampleMonotonicValue[0] = ((Avg) sampler.getAggregations().get("mean_monotonic")).getValue(); + sampleNumericValue[0] = ((Avg) sampler.getAggregations().get("mean_numeric")).getValue(); + sampledDocCount[0] = sampler.getDocCount(); + } + ); + + for (int i = 0; i < NUM_SAMPLE_RUNS; i++) { + assertResponse( + prepareSearch("idx").setPreference("shard:0") + .addAggregation( + new RandomSamplerAggregationBuilder("sampler").setProbability(PROBABILITY) + .setSeed(0) + .subAggregation(avg("mean_monotonic").field(MONOTONIC_VALUE)) + .subAggregation(avg("mean_numeric").field(NUMERIC_VALUE)) + .setShardSeed(42) + ), + response -> { + InternalRandomSampler sampler = response.getAggregations().get("sampler"); + assertThat(((Avg) sampler.getAggregations().get("mean_monotonic")).getValue(), equalTo(sampleMonotonicValue[0])); + assertThat(((Avg) sampler.getAggregations().get("mean_numeric")).getValue(), 
equalTo(sampleNumericValue[0])); + assertThat(sampler.getDocCount(), equalTo(sampledDocCount[0])); + } + ); + } + } + public void testRandomSampler() { double[] sampleMonotonicValue = new double[1]; double[] sampleNumericValue = new double[1]; diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index a6fa7a9ea8e99..c88b56ba25022 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -133,6 +133,7 @@ static TransportVersion def(int id) { public static final TransportVersion INDEX_REQUEST_NORMALIZED_BYTES_PARSED = def(8_593_00_0); public static final TransportVersion INGEST_GRAPH_STRUCTURE_EXCEPTION = def(8_594_00_0); public static final TransportVersion ML_MODEL_IN_SERVICE_SETTINGS = def(8_595_00_0); + public static final TransportVersion RANDOM_AGG_SHARD_SEED = def(8_596_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/InternalRandomSampler.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/InternalRandomSampler.java index 4dde9cc67b975..68a1a22369d2a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/InternalRandomSampler.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/InternalRandomSampler.java @@ -8,6 +8,7 @@ package org.elasticsearch.search.aggregations.bucket.sampler.random; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.Releasables; @@ -29,18 +30,21 @@ public class InternalRandomSampler extends InternalSingleBucketAggregation imple public static final String PARSER_NAME = "random_sampler"; private final int seed; + private final Integer shardSeed; private final double probability; InternalRandomSampler( String name, long docCount, int seed, + Integer shardSeed, double probability, InternalAggregations subAggregations, Map metadata ) { super(name, docCount, subAggregations, metadata); this.seed = seed; + this.shardSeed = shardSeed; this.probability = probability; } @@ -51,6 +55,11 @@ public InternalRandomSampler(StreamInput in) throws IOException { super(in); this.seed = in.readInt(); this.probability = in.readDouble(); + if (in.getTransportVersion().onOrAfter(TransportVersions.RANDOM_AGG_SHARD_SEED)) { + this.shardSeed = in.readOptionalInt(); + } else { + this.shardSeed = null; + } } @Override @@ -58,6 +67,9 @@ protected void doWriteTo(StreamOutput out) throws IOException { super.doWriteTo(out); out.writeInt(seed); out.writeDouble(probability); + if (out.getTransportVersion().onOrAfter(TransportVersions.RANDOM_AGG_SHARD_SEED)) { + out.writeOptionalInt(shardSeed); + } } @Override @@ -72,7 +84,7 @@ public String getType() { @Override protected InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations) { - return new InternalRandomSampler(name, docCount, seed, probability, subAggregations, metadata); + return new InternalRandomSampler(name, docCount, seed, shardSeed, probability, subAggregations, metadata); } @Override @@ -105,12 +117,15 @@ public void close() { } public SamplingContext buildContext() { - return new SamplingContext(probability, seed); + return new SamplingContext(probability, seed, shardSeed); 
} @Override public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { builder.field(RandomSamplerAggregationBuilder.SEED.getPreferredName(), seed); + if (shardSeed != null) { + builder.field(RandomSamplerAggregationBuilder.SHARD_SEED.getPreferredName(), shardSeed); + } builder.field(RandomSamplerAggregationBuilder.PROBABILITY.getPreferredName(), probability); builder.field(CommonFields.DOC_COUNT.getPreferredName(), getDocCount()); getAggregations().toXContentInternal(builder, params); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregationBuilder.java index 240f016c66954..9bd9ab45b633a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregationBuilder.java @@ -34,6 +34,7 @@ public class RandomSamplerAggregationBuilder extends AbstractAggregationBuilder< static final ParseField PROBABILITY = new ParseField("probability"); static final ParseField SEED = new ParseField("seed"); + static final ParseField SHARD_SEED = new ParseField("shard_seed"); public static final ObjectParser PARSER = ObjectParser.fromBuilder( RandomSamplerAggregationBuilder.NAME, @@ -41,10 +42,12 @@ public class RandomSamplerAggregationBuilder extends AbstractAggregationBuilder< ); static { PARSER.declareInt(RandomSamplerAggregationBuilder::setSeed, SEED); + PARSER.declareInt(RandomSamplerAggregationBuilder::setShardSeed, SHARD_SEED); PARSER.declareDouble(RandomSamplerAggregationBuilder::setProbability, PROBABILITY); } private int seed = Randomness.get().nextInt(); + private Integer shardSeed; private double p; public RandomSamplerAggregationBuilder(String name) { @@ -67,10 +70,18 @@ public RandomSamplerAggregationBuilder setSeed(int seed) { return this; } + public RandomSamplerAggregationBuilder setShardSeed(int shardSeed) { + this.shardSeed = shardSeed; + return this; + } + public RandomSamplerAggregationBuilder(StreamInput in) throws IOException { super(in); this.p = in.readDouble(); this.seed = in.readInt(); + if (in.getTransportVersion().onOrAfter(TransportVersions.RANDOM_AGG_SHARD_SEED)) { + this.shardSeed = in.readOptionalInt(); + } } protected RandomSamplerAggregationBuilder( @@ -81,12 +92,16 @@ protected RandomSamplerAggregationBuilder( super(clone, factoriesBuilder, metadata); this.p = clone.p; this.seed = clone.seed; + this.shardSeed = clone.shardSeed; } @Override protected void doWriteTo(StreamOutput out) throws IOException { out.writeDouble(p); out.writeInt(seed); + if (out.getTransportVersion().onOrAfter(TransportVersions.RANDOM_AGG_SHARD_SEED)) { + out.writeOptionalInt(shardSeed); + } } static void recursivelyCheckSubAggs(Collection builders, Consumer aggregationCheck) { @@ -128,7 +143,7 @@ protected AggregatorFactory doBuild( ); } }); - return new RandomSamplerAggregatorFactory(name, seed, p, context, parent, subfactoriesBuilder, metadata); + return new RandomSamplerAggregatorFactory(name, seed, shardSeed, p, context, parent, subfactoriesBuilder, metadata); } @Override @@ -136,6 +151,9 @@ protected XContentBuilder internalXContent(XContentBuilder builder, Params param builder.startObject(); builder.field(PROBABILITY.getPreferredName(), p); builder.field(SEED.getPreferredName(), seed); + if (shardSeed != 
null) { + builder.field(SHARD_SEED.getPreferredName(), shardSeed); + } builder.endObject(); return null; } @@ -162,7 +180,7 @@ public TransportVersion getMinimalSupportedVersion() { @Override public int hashCode() { - return Objects.hash(super.hashCode(), p, seed); + return Objects.hash(super.hashCode(), p, seed, shardSeed); } @Override @@ -171,6 +189,6 @@ public boolean equals(Object obj) { if (obj == null || getClass() != obj.getClass()) return false; if (super.equals(obj) == false) return false; RandomSamplerAggregationBuilder other = (RandomSamplerAggregationBuilder) obj; - return Objects.equals(p, other.p) && Objects.equals(seed, other.seed); + return Objects.equals(p, other.p) && Objects.equals(seed, other.seed) && Objects.equals(shardSeed, other.shardSeed); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregator.java index 8853733b9a158..a279b8270cd57 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregator.java @@ -30,12 +30,14 @@ public class RandomSamplerAggregator extends BucketsAggregator implements SingleBucketAggregator { private final int seed; + private final Integer shardSeed; private final double probability; private final CheckedSupplier weightSupplier; RandomSamplerAggregator( String name, int seed, + Integer shardSeed, double probability, CheckedSupplier weightSupplier, AggregatorFactories factories, @@ -53,6 +55,7 @@ public class RandomSamplerAggregator extends BucketsAggregator implements Single ); } this.weightSupplier = weightSupplier; + this.shardSeed = shardSeed; } @Override @@ -63,6 +66,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I name, bucketDocCount(owningBucketOrd), seed, + shardSeed, probability, subAggregationResults, metadata() @@ -72,7 +76,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I @Override public InternalAggregation buildEmptyAggregation() { - return new InternalRandomSampler(name, 0, seed, probability, buildEmptySubAggregations(), metadata()); + return new InternalRandomSampler(name, 0, seed, shardSeed, probability, buildEmptySubAggregations(), metadata()); } /** diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregatorFactory.java index d63f574b4d8bd..4be2e932179fe 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregatorFactory.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregatorFactory.java @@ -26,6 +26,7 @@ public class RandomSamplerAggregatorFactory extends AggregatorFactory { private final int seed; + private final Integer shardSeed; private final double probability; private final SamplingContext samplingContext; private Weight weight; @@ -33,6 +34,7 @@ public class RandomSamplerAggregatorFactory extends AggregatorFactory { RandomSamplerAggregatorFactory( String name, int seed, + Integer shardSeed, double probability, AggregationContext context, AggregatorFactory parent, @@ -42,7 +44,8 @@ public class 
RandomSamplerAggregatorFactory extends AggregatorFactory { super(name, context, parent, subFactories, metadata); this.probability = probability; this.seed = seed; - this.samplingContext = new SamplingContext(probability, seed); + this.samplingContext = new SamplingContext(probability, seed, shardSeed); + this.shardSeed = shardSeed; } @Override @@ -53,7 +56,18 @@ public Optional getSamplingContext() { @Override public Aggregator createInternal(Aggregator parent, CardinalityUpperBound cardinality, Map metadata) throws IOException { - return new RandomSamplerAggregator(name, seed, probability, this::getWeight, factories, context, parent, cardinality, metadata); + return new RandomSamplerAggregator( + name, + seed, + shardSeed, + probability, + this::getWeight, + factories, + context, + parent, + cardinality, + metadata + ); } /** @@ -66,7 +80,11 @@ public Aggregator createInternal(Aggregator parent, CardinalityUpperBound cardin */ private Weight getWeight() throws IOException { if (weight == null) { - RandomSamplingQuery query = new RandomSamplingQuery(probability, seed, context.shardRandomSeed()); + RandomSamplingQuery query = new RandomSamplingQuery( + probability, + seed, + shardSeed == null ? context.shardRandomSeed() : shardSeed + ); BooleanQuery booleanQuery = new BooleanQuery.Builder().add(query, BooleanClause.Occur.FILTER) .add(context.query(), BooleanClause.Occur.FILTER) .build(); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/support/SamplingContext.java b/server/src/main/java/org/elasticsearch/search/aggregations/support/SamplingContext.java index 57ea138f63268..d8f34bfcf9973 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/support/SamplingContext.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/support/SamplingContext.java @@ -20,8 +20,9 @@ /** * This provides information around the current sampling context for aggregations */ -public record SamplingContext(double probability, int seed) { - public static final SamplingContext NONE = new SamplingContext(1.0, 0); +public record SamplingContext(double probability, int seed, Integer shardSeed) { + + public static final SamplingContext NONE = new SamplingContext(1.0, 0, null); public boolean isSampled() { return probability < 1.0; @@ -97,20 +98,22 @@ public Query buildQueryWithSampler(QueryBuilder builder, AggregationContext cont } BooleanQuery.Builder queryBuilder = new BooleanQuery.Builder(); queryBuilder.add(rewritten, BooleanClause.Occur.FILTER); - queryBuilder.add(new RandomSamplingQuery(probability(), seed(), context.shardRandomSeed()), BooleanClause.Occur.FILTER); + queryBuilder.add( + new RandomSamplingQuery(probability(), seed(), shardSeed == null ? context.shardRandomSeed() : shardSeed), + BooleanClause.Occur.FILTER + ); return queryBuilder.build(); } /** * @param context The current aggregation context * @return the sampling query if the sampling context indicates that sampling is required - * @throws IOException thrown on query build failure */ - public Optional buildSamplingQueryIfNecessary(AggregationContext context) throws IOException { + public Optional buildSamplingQueryIfNecessary(AggregationContext context) { if (isSampled() == false) { return Optional.empty(); } - return Optional.of(new RandomSamplingQuery(probability(), seed(), context.shardRandomSeed())); + return Optional.of(new RandomSamplingQuery(probability(), seed(), shardSeed == null ? 
context.shardRandomSeed() : shardSeed)); } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregationBuilderTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregationBuilderTests.java index 5514cb441b54c..18808f9b2aa87 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregationBuilderTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregationBuilderTests.java @@ -19,6 +19,9 @@ protected RandomSamplerAggregationBuilder createTestAggregatorBuilder() { if (randomBoolean()) { builder.setSeed(randomInt()); } + if (randomBoolean()) { + builder.setShardSeed(randomInt()); + } builder.setProbability(randomFrom(1.0, randomDoubleBetween(0.0, 0.5, false))); builder.subAggregation(AggregationBuilders.max(randomAlphaOfLength(10)).field(randomAlphaOfLength(10))); return builder; diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/support/SamplingContextTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/support/SamplingContextTests.java index d9e19cf60e481..ffb56f17c7f8f 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/support/SamplingContextTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/support/SamplingContextTests.java @@ -14,10 +14,9 @@ import static org.hamcrest.Matchers.equalTo; public class SamplingContextTests extends ESTestCase { - protected static final int NUMBER_OF_TEST_RUNS = 20; private static SamplingContext randomContext() { - return new SamplingContext(randomDoubleBetween(1e-6, 0.1, false), randomInt()); + return new SamplingContext(randomDoubleBetween(1e-6, 0.1, false), randomInt(), randomBoolean() ? null : randomInt()); } public void testScaling() { @@ -41,7 +40,7 @@ public void testScaling() { } public void testNoScaling() { - SamplingContext samplingContext = new SamplingContext(1.0, randomInt()); + SamplingContext samplingContext = new SamplingContext(1.0, randomInt(), randomBoolean() ? null : randomInt()); long randomLong = randomLong(); double randomDouble = randomDouble(); assertThat(randomLong, equalTo(samplingContext.scaleDown(randomLong))); diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index 99734e5e224aa..1787638f9fdf3 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -1114,7 +1114,11 @@ public void testSupportedFieldTypes() throws IOException { // We should make sure if the builder says it supports sampling, that the internal aggregations returned override // finalizeSampling if (aggregationBuilder.supportsSampling()) { - SamplingContext randomSamplingContext = new SamplingContext(randomDoubleBetween(1e-8, 0.1, false), randomInt()); + SamplingContext randomSamplingContext = new SamplingContext( + randomDoubleBetween(1e-8, 0.1, false), + randomInt(), + randomBoolean() ? 
null : randomInt() + ); InternalAggregation sampledResult = internalAggregation.finalizeSampling(randomSamplingContext); assertThat(sampledResult.getClass(), equalTo(internalAggregation.getClass())); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java index f1b147eefe723..12c5085cbcd73 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java @@ -283,7 +283,11 @@ public void testReduceRandom() throws IOException { doAssertReducedMultiBucketConsumer(reduced, bucketConsumer); assertReduced(reduced, inputs.toReduce()); if (supportsSampling()) { - SamplingContext randomContext = new SamplingContext(randomDoubleBetween(1e-8, 0.1, false), randomInt()); + SamplingContext randomContext = new SamplingContext( + randomDoubleBetween(1e-8, 0.1, false), + randomInt(), + randomBoolean() ? null : randomInt() + ); @SuppressWarnings("unchecked") T sampled = (T) reduced.finalizeSampling(randomContext); assertSampled(sampled, reduced, randomContext); From d37d93ac36a04ca757a8818f4ed870f7e53b11f9 Mon Sep 17 00:00:00 2001 From: florent-leborgne Date: Fri, 23 Feb 2024 15:13:51 +0100 Subject: [PATCH 10/11] [Docs] [Remote Clusters] Note about certificates in ESS for Remote Cluster Security (#105771) * note about ess certificates * Update docs/reference/modules/cluster/remote-clusters-api-key.asciidoc Co-authored-by: Liam Thompson <32779855+leemthompo@users.noreply.github.com> --------- Co-authored-by: Liam Thompson <32779855+leemthompo@users.noreply.github.com> --- .../reference/modules/cluster/remote-clusters-api-key.asciidoc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/reference/modules/cluster/remote-clusters-api-key.asciidoc b/docs/reference/modules/cluster/remote-clusters-api-key.asciidoc index b95ebdf143a57..0cac52deaae4b 100644 --- a/docs/reference/modules/cluster/remote-clusters-api-key.asciidoc +++ b/docs/reference/modules/cluster/remote-clusters-api-key.asciidoc @@ -62,6 +62,9 @@ information, refer to https://www.elastic.co/subscriptions. [[remote-clusters-security-api-key]] ==== Establish trust with a remote cluster +NOTE: If a remote cluster is part of an {ess} deployment, it has a valid certificate by default. +You can therefore skip steps related to certificates in these instructions. + ===== On the remote cluster // tag::remote-cluster-steps[] From f86532b552b7c7645b562cf643199702c600f7f6 Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Fri, 23 Feb 2024 09:28:22 -0500 Subject: [PATCH 11/11] [Transform] Retry destination index creation (#105759) For Unattended Transforms, if we fail to create the destination index on the first run, we will retry the transformation iteration, but we will not retry the destination index creation on that next iteration. This change stops the Unattended Transform from progressing beyond the 0th checkpoint, so all retries will include the destination index creation. 
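As a sketch of the mechanics (the condition below appears verbatim in the `TransformIndexer` diff further down): deriving the decision from the checkpoint, rather than from whether this is the very first run, is what makes the destination index creation naturally retryable:

    // re-attempt destination index creation on every retry while still on checkpoint 0
    var shouldMaybeCreateDestIndexForUnattended = context.getCheckpoint() == 0
        && Boolean.TRUE.equals(transformConfig.getSettings().getUnattended());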
Fix #105683
Relate #104146
---
 .../TransformInsufficientPermissionsIT.java        | 10 +++++-----
 .../integration/TransformRestTestCase.java         | 17 +++++++++--------
 .../transform/transforms/TransformIndexer.java     |  9 +++++----
 3 files changed, 19 insertions(+), 17 deletions(-)

diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformInsufficientPermissionsIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformInsufficientPermissionsIT.java
index dc48ceb7b309b..59a673790723e 100644
--- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformInsufficientPermissionsIT.java
+++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformInsufficientPermissionsIT.java
@@ -418,16 +418,14 @@ public void testTransformPermissionsDeferUnattendedNoDest() throws Exception {
         );
         assertRed(transformId, authIssue);
 
-        startTransform(config.getId(), RequestOptions.DEFAULT);
-
-        // Give the transform indexer enough time to try creating destination index
-        Thread.sleep(5_000);
+        startTransform(transformId, RequestOptions.DEFAULT);
 
         String destIndexIssue = Strings.format("Could not create destination index [%s] for transform [%s]", destIndexName, transformId);
         // transform's auth state status is still RED due to:
         // - lacking permissions
         // - and the inability to create destination index in the indexer (which is also a consequence of lacking permissions)
-        assertRed(transformId, authIssue, destIndexIssue);
+        // wait for 10 seconds to give the transform indexer enough time to try creating destination index
+        assertBusy(() -> { assertRed(transformId, authIssue, destIndexIssue); });
 
         // update transform's credentials so that the transform has permission to access source/dest indices
         updateConfig(transformId, "{}", RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, Users.SENIOR.header).build());
@@ -593,5 +591,7 @@ private void assertRed(String transformId, String... expectedHealthIssueDetails)
             .map(issue -> (String) extractValue((Map<String, Object>) issue, "details"))
             .collect(toSet());
         assertThat("Stats were: " + stats, actualHealthIssueDetailsSet, containsInAnyOrder(expectedHealthIssueDetails));
+        // We should not progress beyond the 0th checkpoint until we correctly configure the Transform.
+ assertThat("Stats were: " + stats, getCheckpoint(stats), equalTo(0L)); } } diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java index eed849d35ea44..897de6c120a8b 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java @@ -294,14 +294,15 @@ protected void waitUntilCheckpoint(String id, long checkpoint) throws Exception } protected void waitUntilCheckpoint(String id, long checkpoint, TimeValue waitTime) throws Exception { - assertBusy( - () -> assertEquals( - checkpoint, - ((Integer) XContentMapValues.extractValue("checkpointing.last.checkpoint", getBasicTransformStats(id))).longValue() - ), - waitTime.getMillis(), - TimeUnit.MILLISECONDS - ); + assertBusy(() -> assertEquals(checkpoint, getCheckpoint(id)), waitTime.getMillis(), TimeUnit.MILLISECONDS); + } + + protected long getCheckpoint(String id) throws IOException { + return getCheckpoint(getBasicTransformStats(id)); + } + + protected long getCheckpoint(Map stats) { + return ((Integer) XContentMapValues.extractValue("checkpointing.last.checkpoint", stats)).longValue(); } protected DateHistogramGroupSource createDateHistogramGroupSourceWithFixedInterval( diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 4b2da731351d7..ff52f5e267655 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -333,6 +333,9 @@ protected void onStart(long now, ActionListener listener) { } }, listener::onFailure); + var shouldMaybeCreateDestIndexForUnattended = context.getCheckpoint() == 0 + && Boolean.TRUE.equals(transformConfig.getSettings().getUnattended()); + ActionListener> fieldMappingsListener = ActionListener.wrap(destIndexMappings -> { if (destIndexMappings.isEmpty() == false) { // If we managed to fetch destination index mappings, we use them from now on ... @@ -344,9 +347,7 @@ protected void onStart(long now, ActionListener listener) { // Since the unattended transform could not have created the destination index yet, we do it here. // This is important to create the destination index explicitly before indexing first documents. Otherwise, the destination // index aliases may be missing. 
-            if (destIndexMappings.isEmpty()
-                && context.getCheckpoint() == 0
-                && Boolean.TRUE.equals(transformConfig.getSettings().getUnattended())) {
+            if (destIndexMappings.isEmpty() && shouldMaybeCreateDestIndexForUnattended) {
                 doMaybeCreateDestIndex(deducedDestIndexMappings.get(), configurationReadyListener);
             } else {
                 configurationReadyListener.onResponse(null);
@@ -364,7 +365,7 @@ protected void onStart(long now, ActionListener<Boolean> listener) {
                 deducedDestIndexMappings.set(validationResponse.getDestIndexMappings());
                 if (isContinuous()) {
                     transformsConfigManager.getTransformConfiguration(getJobId(), ActionListener.wrap(config -> {
-                        if (transformConfig.equals(config) && fieldMappings != null) {
+                        if (transformConfig.equals(config) && fieldMappings != null && shouldMaybeCreateDestIndexForUnattended == false) {
                             logger.trace("[{}] transform config has not changed.", getJobId());
                             configurationReadyListener.onResponse(null);
                         } else {
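As a closing illustration: the integration-test change in this patch replaces
a fixed Thread.sleep(5_000) with assertBusy, which polls an assertion until
it passes or a timeout elapses. The helper below is a simplified,
self-contained stand-in for ESTestCase#assertBusy, sketched only to show the
polling pattern; it is not the framework method, and its names are
hypothetical.

    import java.util.concurrent.TimeUnit;

    final class PollingAssert {
        // Retry an assertion with backoff until it passes or the timeout
        // elapses, instead of sleeping a fixed interval and asserting once.
        static void assertBusy(Runnable assertion, long timeout, TimeUnit unit) throws InterruptedException {
            long deadline = System.nanoTime() + unit.toNanos(timeout);
            long sleepMillis = 10; // short initial pause, then exponential backoff
            while (true) {
                try {
                    assertion.run(); // e.g. () -> assertRed(transformId, authIssue, destIndexIssue)
                    return;          // assertion passed
                } catch (AssertionError e) {
                    if (System.nanoTime() >= deadline) {
                        throw e; // out of time: surface the last failure
                    }
                    Thread.sleep(sleepMillis);
                    sleepMillis = Math.min(sleepMillis * 2, 1_000);
                }
            }
        }
    }

Polling with backoff returns as soon as the condition holds and avoids the
flakiness of a single fixed-length sleep followed by a one-shot assertion.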