diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerService.java index fa8c965b4d538..aa617e91aecd7 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerService.java @@ -6,24 +6,137 @@ * compatible open source license. */ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. 
+ */ + package org.opensearch.cluster.service; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.Version; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.cluster.AckedClusterStateTaskListener; +import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterManagerMetrics; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ClusterState.Builder; +import org.opensearch.cluster.ClusterStateTaskConfig; +import org.opensearch.cluster.ClusterStateTaskExecutor; +import org.opensearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult; +import org.opensearch.cluster.ClusterStateTaskListener; +import org.opensearch.cluster.coordination.ClusterStatePublisher; +import org.opensearch.cluster.coordination.FailedToCommitClusterStateException; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.common.Nullable; +import org.opensearch.common.Priority; import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.CountDown; +import org.opensearch.common.util.concurrent.FutureUtils; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; +import org.opensearch.common.util.concurrent.PrioritizedOpenSearchThreadPoolExecutor; +import org.opensearch.common.util.concurrent.ThreadContext; +import 
org.opensearch.common.util.concurrent.ThreadContextAccess; +import org.opensearch.core.Assertions; +import org.opensearch.core.common.text.Text; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.discovery.Discovery; +import org.opensearch.node.Node; +import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; +import org.opensearch.telemetry.metrics.tags.Tags; +import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import static org.opensearch.common.util.concurrent.OpenSearchExecutors.daemonThreadFactory; + /** * Main Cluster Manager Node Service * * @opensearch.api */ @PublicApi(since = "2.2.0") -public class ClusterManagerService extends MasterService { +public class ClusterManagerService extends AbstractLifecycleComponent { + private static final Logger logger = LogManager.getLogger(ClusterManagerService.class); + + public static final Setting MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING = Setting.positiveTimeSetting( + "cluster.service.slow_master_task_logging_threshold", + TimeValue.timeValueSeconds(10), + Setting.Property.Dynamic, + Setting.Property.NodeScope, + Setting.Property.Deprecated + ); + // The setting below is going to replace the above. + // To keep backwards compatibility, the old usage is remained, and it's also used as the fallback for the new usage. 
+ public static final Setting CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING = Setting.positiveTimeSetting( + "cluster.service.slow_cluster_manager_task_logging_threshold", + MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + static final String CLUSTER_MANAGER_UPDATE_THREAD_NAME = "clusterManagerService#updateTask"; + + ClusterStatePublisher clusterStatePublisher; + + private final String nodeName; + + private java.util.function.Supplier clusterStateSupplier; + + private volatile TimeValue slowTaskLoggingThreshold; + + protected final ThreadPool threadPool; + + private volatile PrioritizedOpenSearchThreadPoolExecutor threadPoolExecutor; + private volatile Batcher taskBatcher; + protected final ClusterManagerTaskThrottler clusterManagerTaskThrottler; + private final ClusterManagerThrottlingStats throttlingStats; + private final ClusterStateStats stateStats; + private final ClusterManagerMetrics clusterManagerMetrics; public ClusterManagerService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { - super(settings, clusterSettings, threadPool); + this(settings, clusterSettings, threadPool, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE)); } public ClusterManagerService( @@ -32,6 +145,885 @@ public ClusterManagerService( ThreadPool threadPool, ClusterManagerMetrics clusterManagerMetrics ) { - super(settings, clusterSettings, threadPool, clusterManagerMetrics); + this.nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings)); + + this.slowTaskLoggingThreshold = CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer( + CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, + this::setSlowTaskLoggingThreshold + ); + + this.throttlingStats = new ClusterManagerThrottlingStats(); + this.clusterManagerTaskThrottler = new ClusterManagerTaskThrottler( + settings, + clusterSettings, 
+ this::getMinNodeVersion, + throttlingStats + ); + this.stateStats = new ClusterStateStats(); + this.threadPool = threadPool; + this.clusterManagerMetrics = clusterManagerMetrics; + } + + private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) { + this.slowTaskLoggingThreshold = slowTaskLoggingThreshold; + } + + public synchronized void setClusterStatePublisher(ClusterStatePublisher publisher) { + clusterStatePublisher = publisher; + } + + public synchronized void setClusterStateSupplier(java.util.function.Supplier clusterStateSupplier) { + this.clusterStateSupplier = clusterStateSupplier; + } + + @Override + protected synchronized void doStart() { + Objects.requireNonNull(clusterStatePublisher, "please set a cluster state publisher before starting"); + Objects.requireNonNull(clusterStateSupplier, "please set a cluster state supplier before starting"); + threadPoolExecutor = createThreadPoolExecutor(); + taskBatcher = new Batcher(logger, threadPoolExecutor, clusterManagerTaskThrottler); + } + + protected PrioritizedOpenSearchThreadPoolExecutor createThreadPoolExecutor() { + return OpenSearchExecutors.newSinglePrioritizing( + nodeName + "/" + CLUSTER_MANAGER_UPDATE_THREAD_NAME, + daemonThreadFactory(nodeName, CLUSTER_MANAGER_UPDATE_THREAD_NAME), + threadPool.getThreadContext(), + threadPool.scheduler() + ); + } + + @SuppressWarnings("unchecked") + class Batcher extends TaskBatcher { + + Batcher(Logger logger, PrioritizedOpenSearchThreadPoolExecutor threadExecutor, TaskBatcherListener taskBatcherListener) { + super(logger, threadExecutor, taskBatcherListener); + } + + @Override + protected void onTimeout(List tasks, TimeValue timeout) { + threadPool.generic() + .execute( + () -> tasks.forEach( + task -> ((UpdateTask) task).listener.onFailure( + task.source, + new ProcessClusterEventTimeoutException(timeout, task.source) + ) + ) + ); + } + + @Override + protected void run(Object batchingKey, List tasks, Function taskSummaryGenerator) { + 
ClusterStateTaskExecutor taskExecutor = (ClusterStateTaskExecutor) batchingKey; + List updateTasks = (List) tasks; + runTasks(new TaskInputs(taskExecutor, updateTasks, taskSummaryGenerator)); + } + + class UpdateTask extends BatchedTask { + final ClusterStateTaskListener listener; + + UpdateTask( + Priority priority, + String source, + Object task, + ClusterStateTaskListener listener, + ClusterStateTaskExecutor executor + ) { + super(priority, source, executor, task); + this.listener = listener; + } + + @Override + public String describeTasks(List tasks) { + return ((ClusterStateTaskExecutor) batchingKey).describeTasks( + tasks.stream().map(BatchedTask::getTask).collect(Collectors.toList()) + ); + } + } + } + + @Override + protected synchronized void doStop() { + ThreadPool.terminate(threadPoolExecutor, 10, TimeUnit.SECONDS); + } + + @Override + protected synchronized void doClose() {} + + /** + * The current cluster state exposed by the discovery layer. Package-visible for tests. + */ + ClusterState state() { + return clusterStateSupplier.get(); + } + + private static boolean isClusterManagerUpdateThread() { + return Thread.currentThread().getName().contains(CLUSTER_MANAGER_UPDATE_THREAD_NAME); + } + + public static boolean assertClusterManagerUpdateThread() { + assert isClusterManagerUpdateThread() : "not called from the cluster-manager service thread"; + return true; + } + + public static boolean assertNotClusterManagerUpdateThread(String reason) { + assert isClusterManagerUpdateThread() == false : "Expected current thread [" + + Thread.currentThread() + + "] to not be the cluster-manager service thread. 
Reason: [" + + reason + + "]"; + return true; + } + + private void runTasks(TaskInputs taskInputs) { + final String summary; + if (logger.isTraceEnabled()) { + summary = taskInputs.taskSummaryGenerator.apply(true); + } else { + summary = taskInputs.taskSummaryGenerator.apply(false); + } + + if (!lifecycle.started()) { + logger.debug("processing [{}]: ignoring, cluster-manager service not started", summary); + return; + } + + if (logger.isTraceEnabled()) { + logger.trace("executing cluster state update for [{}]", summary); + } else { + logger.debug("executing cluster state update for [{}]", summary); + } + + final ClusterState previousClusterState = state(); + + if (!previousClusterState.nodes().isLocalNodeElectedClusterManager() && taskInputs.runOnlyWhenClusterManager()) { + logger.debug("failing [{}]: local node is no longer cluster-manager", summary); + taskInputs.onNoLongerClusterManager(); + return; + } + + final long computationStartTime = threadPool.preciseRelativeTimeInNanos(); + final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, summary); + taskOutputs.notifyFailedTasks(); + final TimeValue computationTime = getTimeSince(computationStartTime); + logExecutionTime(computationTime, "compute cluster state update", summary); + + clusterManagerMetrics.recordLatency( + clusterManagerMetrics.clusterStateComputeHistogram, + (double) computationTime.getMillis(), + Optional.of(Tags.create().addTag("Operation", taskInputs.executor.getClass().getSimpleName())) + ); + + if (taskOutputs.clusterStateUnchanged()) { + final long notificationStartTime = threadPool.preciseRelativeTimeInNanos(); + taskOutputs.notifySuccessfulTasksOnUnchangedClusterState(); + final TimeValue executionTime = getTimeSince(notificationStartTime); + logExecutionTime(executionTime, "notify listeners on unchanged cluster state", summary); + } else { + final ClusterState newClusterState = taskOutputs.newClusterState; + if (logger.isTraceEnabled()) { + 
logger.trace("cluster state updated, source [{}]\n{}", summary, newClusterState); + } else { + logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary); + } + final long publicationStartTime = threadPool.preciseRelativeTimeInNanos(); + try { + ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState); + // new cluster state, notify all listeners + final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta(); + if (nodesDelta.hasChanges() && logger.isInfoEnabled()) { + String nodesDeltaSummary = nodesDelta.shortSummary(); + if (nodesDeltaSummary.length() > 0) { + logger.info( + "{}, term: {}, version: {}, delta: {}", + summary, + newClusterState.term(), + newClusterState.version(), + nodesDeltaSummary + ); + } + } + + logger.debug("publishing cluster state version [{}]", newClusterState.version()); + publish(clusterChangedEvent, taskOutputs, publicationStartTime); + } catch (Exception e) { + handleException(summary, publicationStartTime, newClusterState, e); + } + } + } + + private TimeValue getTimeSince(long startTimeNanos) { + return TimeValue.timeValueMillis(TimeValue.nsecToMSec(threadPool.preciseRelativeTimeInNanos() - startTimeNanos)); + } + + protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeNanos) { + final PlainActionFuture fut = new PlainActionFuture() { + @Override + protected boolean blockingAllowed() { + return isClusterManagerUpdateThread() || super.blockingAllowed(); + } + }; + clusterStatePublisher.publish(clusterChangedEvent, fut, taskOutputs.createAckListener(threadPool, clusterChangedEvent.state())); + + // indefinitely wait for publication to complete + try { + FutureUtils.get(fut); + onPublicationSuccess(clusterChangedEvent, taskOutputs); + final long durationMillis = getTimeSince(startTimeNanos).millis(); + stateStats.stateUpdateTook(durationMillis); + stateStats.stateUpdated(); + 
clusterManagerMetrics.recordLatency(clusterManagerMetrics.clusterStatePublishHistogram, (double) durationMillis); + } catch (Exception e) { + stateStats.stateUpdateFailed(); + onPublicationFailed(clusterChangedEvent, taskOutputs, startTimeNanos, e); + } + } + + void onPublicationSuccess(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs) { + final long notificationStartTime = threadPool.preciseRelativeTimeInNanos(); + taskOutputs.processedDifferentClusterState(clusterChangedEvent.previousState(), clusterChangedEvent.state()); + + try { + taskOutputs.clusterStatePublished(clusterChangedEvent); + } catch (Exception e) { + logger.error( + () -> new ParameterizedMessage( + "exception thrown while notifying executor of new cluster state publication [{}]", + clusterChangedEvent.source() + ), + e + ); + } + final TimeValue executionTime = getTimeSince(notificationStartTime); + logExecutionTime( + executionTime, + "notify listeners on successful publication of cluster state (version: " + + clusterChangedEvent.state().version() + + ", uuid: " + + clusterChangedEvent.state().stateUUID() + + ')', + clusterChangedEvent.source() + ); + } + + void onPublicationFailed(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeMillis, Exception exception) { + if (exception instanceof FailedToCommitClusterStateException) { + final long version = clusterChangedEvent.state().version(); + logger.warn( + () -> new ParameterizedMessage( + "failing [{}]: failed to commit cluster state version [{}]", + clusterChangedEvent.source(), + version + ), + exception + ); + taskOutputs.publishingFailed((FailedToCommitClusterStateException) exception); + } else { + handleException(clusterChangedEvent.source(), startTimeMillis, clusterChangedEvent.state(), exception); + } + } + + private void handleException(String summary, long startTimeMillis, ClusterState newClusterState, Exception e) { + final TimeValue executionTime = getTimeSince(startTimeMillis); + final 
long version = newClusterState.version(); + final String stateUUID = newClusterState.stateUUID(); + final String fullState = newClusterState.toString(); + logger.warn( + new ParameterizedMessage( + "took [{}] and then failed to publish updated cluster state (version: {}, uuid: {}) for [{}]:\n{}", + executionTime, + version, + stateUUID, + summary, + fullState + ), + e + ); + // TODO: do we want to call updateTask.onFailure here? + } + + private TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState, String taskSummary) { + ClusterTasksResult clusterTasksResult = executeTasks(taskInputs, previousClusterState, taskSummary); + ClusterState newClusterState = patchVersions(previousClusterState, clusterTasksResult); + return new TaskOutputs( + taskInputs, + previousClusterState, + newClusterState, + getNonFailedTasks(taskInputs, clusterTasksResult), + clusterTasksResult.executionResults + ); + } + + private ClusterState patchVersions(ClusterState previousClusterState, ClusterTasksResult executionResult) { + ClusterState newClusterState = executionResult.resultingState; + + if (previousClusterState != newClusterState) { + // only the cluster-manager controls the version numbers + Builder builder = incrementVersion(newClusterState); + if (previousClusterState.routingTable() != newClusterState.routingTable()) { + builder.routingTable( + RoutingTable.builder(newClusterState.routingTable()).version(newClusterState.routingTable().version() + 1).build() + ); + } + if (previousClusterState.metadata() != newClusterState.metadata()) { + builder.metadata(Metadata.builder(newClusterState.metadata()).version(newClusterState.metadata().version() + 1)); + } + + newClusterState = builder.build(); + } + + return newClusterState; + } + + public Builder incrementVersion(ClusterState clusterState) { + return ClusterState.builder(clusterState).incrementVersion(); + } + + /** + * Submits a cluster state update task; unlike {@link 
#submitStateUpdateTask(String, Object, ClusterStateTaskConfig, + * ClusterStateTaskExecutor, ClusterStateTaskListener)}, submitted updates will not be batched. + * + * @param source the source of the cluster state update task + * @param updateTask the full context for the cluster state update + * task + */ + public & ClusterStateTaskListener> void submitStateUpdateTask( + String source, + T updateTask + ) { + submitStateUpdateTask(source, updateTask, updateTask, updateTask, updateTask); + } + + /** + * Submits a cluster state update task; submitted updates will be + * batched across the same instance of executor. The exact batching + * semantics depend on the underlying implementation but a rough + * guideline is that if the update task is submitted while there + * are pending update tasks for the same executor, these update + * tasks will all be executed on the executor in a single batch + * + * @param source the source of the cluster state update task + * @param task the state needed for the cluster state update task + * @param config the cluster state update task configuration + * @param executor the cluster state update task executor; tasks + * that share the same executor will be executed + * batches on this executor + * @param listener callback after the cluster state update task + * completes + * @param the type of the cluster state update task state + */ + public void submitStateUpdateTask( + String source, + T task, + ClusterStateTaskConfig config, + ClusterStateTaskExecutor executor, + ClusterStateTaskListener listener + ) { + submitStateUpdateTasks(source, Collections.singletonMap(task, listener), config, executor); + } + + /** + * Output created by executing a set of tasks provided as TaskInputs + */ + class TaskOutputs { + final TaskInputs taskInputs; + final ClusterState previousClusterState; + final ClusterState newClusterState; + final List nonFailedTasks; + final Map executionResults; + + TaskOutputs( + TaskInputs taskInputs, + ClusterState 
previousClusterState, + ClusterState newClusterState, + List nonFailedTasks, + Map executionResults + ) { + this.taskInputs = taskInputs; + this.previousClusterState = previousClusterState; + this.newClusterState = newClusterState; + this.nonFailedTasks = nonFailedTasks; + this.executionResults = executionResults; + } + + void publishingFailed(FailedToCommitClusterStateException t) { + nonFailedTasks.forEach(task -> task.listener.onFailure(task.source(), t)); + } + + void processedDifferentClusterState(ClusterState previousClusterState, ClusterState newClusterState) { + nonFailedTasks.forEach(task -> task.listener.clusterStateProcessed(task.source(), previousClusterState, newClusterState)); + } + + void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { + taskInputs.executor.clusterStatePublished(clusterChangedEvent); + } + + Discovery.AckListener createAckListener(ThreadPool threadPool, ClusterState newClusterState) { + return new DelegatingAckListener( + nonFailedTasks.stream() + .filter(task -> task.listener instanceof AckedClusterStateTaskListener) + .map( + task -> new AckCountDownListener( + (AckedClusterStateTaskListener) task.listener, + newClusterState.version(), + newClusterState.nodes(), + threadPool + ) + ) + .collect(Collectors.toList()) + ); + } + + boolean clusterStateUnchanged() { + return previousClusterState == newClusterState; + } + + void notifyFailedTasks() { + // fail all tasks that have failed + for (Batcher.UpdateTask updateTask : taskInputs.updateTasks) { + assert executionResults.containsKey(updateTask.task) : "missing " + updateTask; + final ClusterStateTaskExecutor.TaskResult taskResult = executionResults.get(updateTask.task); + if (taskResult.isSuccess() == false) { + updateTask.listener.onFailure(updateTask.source(), taskResult.getFailure()); + } + } + } + + void notifySuccessfulTasksOnUnchangedClusterState() { + nonFailedTasks.forEach(task -> { + if (task.listener instanceof AckedClusterStateTaskListener) { + // no need 
to wait for ack if nothing changed, the update can be counted as acknowledged + ((AckedClusterStateTaskListener) task.listener).onAllNodesAcked(null); + } + task.listener.clusterStateProcessed(task.source(), newClusterState, newClusterState); + }); + } + } + + /** + * Returns the tasks that are pending. + */ + public List pendingTasks() { + return Arrays.stream(threadPoolExecutor.getPending()).map(pending -> { + assert pending.task instanceof SourcePrioritizedRunnable + : "thread pool executor should only use SourcePrioritizedRunnable instances but found: " + + pending.task.getClass().getName(); + SourcePrioritizedRunnable task = (SourcePrioritizedRunnable) pending.task; + return new PendingClusterTask( + pending.insertionOrder, + pending.priority, + new Text(task.source()), + task.getAgeInMillis(), + pending.executing + ); + }).collect(Collectors.toList()); + } + + /** + * Returns the number of throttled pending tasks. + */ + public long numberOfThrottledPendingTasks() { + return throttlingStats.getTotalThrottledTaskCount(); + } + + /** + * Returns the stats of throttled pending tasks. + */ + public ClusterManagerThrottlingStats getThrottlingStats() { + return throttlingStats; } + + /** + * Returns the min version of nodes in cluster + */ + public Version getMinNodeVersion() { + return state().getNodes().getMinNodeVersion(); + } + + /** + * Returns the number of currently pending tasks. 
*/
public int numberOfPendingTasks() {
    return threadPoolExecutor.getNumberOfPendingTasks();
}

/**
 * Returns the maximum wait time for tasks in the queue
 *
 * @return A zero time value if the queue is empty, otherwise the wait time of the oldest task waiting in the queue
 */
public TimeValue getMaxTaskWaitTime() {
    return threadPoolExecutor.getMaxTaskWaitTime();
}

/**
 * Wraps a listener so that exceptions it throws are logged instead of propagated.
 * An {@link AckedClusterStateTaskListener} gets the ack-aware wrapper so its ack callbacks stay reachable.
 *
 * @param listener        the listener to protect
 * @param contextSupplier supplies the stored thread context opened around every callback
 */
private SafeClusterStateTaskListener safe(ClusterStateTaskListener listener, Supplier<ThreadContext.StoredContext> contextSupplier) {
    if (listener instanceof AckedClusterStateTaskListener) {
        return new SafeAckedClusterStateTaskListener((AckedClusterStateTaskListener) listener, contextSupplier, logger);
    } else {
        return new SafeClusterStateTaskListener(listener, contextSupplier, logger);
    }
}

/**
 * Listener decorator: each callback runs inside the supplied stored context, and any
 * exception thrown by the delegate is caught and logged at error level.
 */
private static class SafeClusterStateTaskListener implements ClusterStateTaskListener {
    private final ClusterStateTaskListener listener;
    protected final Supplier<ThreadContext.StoredContext> context;
    private final Logger logger;

    SafeClusterStateTaskListener(ClusterStateTaskListener listener, Supplier<ThreadContext.StoredContext> context, Logger logger) {
        this.listener = listener;
        this.context = context;
        this.logger = logger;
    }

    @Override
    public void onFailure(String source, Exception e) {
        try (ThreadContext.StoredContext ignore = context.get()) {
            listener.onFailure(source, e);
        } catch (Exception inner) {
            // Keep the failure being reported attached to the exception the listener threw.
            inner.addSuppressed(e);
            logger.error(() -> new ParameterizedMessage("exception thrown by listener notifying of failure from [{}]", source), inner);
        }
    }

    @Override
    public void onNoLongerClusterManager(String source) {
        try (ThreadContext.StoredContext ignore = context.get()) {
            listener.onNoLongerClusterManager(source);
        } catch (Exception e) {
            logger.error(
                () -> new ParameterizedMessage(
                    "exception thrown by listener while notifying no longer cluster-manager from [{}]",
                    source
                ),
                e
            );
        }
    }

    @Override
    public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
        try (ThreadContext.StoredContext ignore = context.get()) {
            listener.clusterStateProcessed(source, oldState, newState);
        } catch (Exception e) {
            logger.error(
                () -> new ParameterizedMessage(
                    "exception thrown by listener while notifying of cluster state processed from [{}], old cluster state:\n"
                        + "{}\nnew cluster state:\n{}",
                    source,
                    oldState,
                    newState
                ),
                e
            );
        }
    }
}

/**
 * Ack-aware variant of {@link SafeClusterStateTaskListener}: the ack callbacks are guarded the
 * same way, while {@code mustAck} and {@code ackTimeout} are passed straight to the delegate.
 */
private static class SafeAckedClusterStateTaskListener extends SafeClusterStateTaskListener implements AckedClusterStateTaskListener {
    private final AckedClusterStateTaskListener listener;
    private final Logger logger;

    SafeAckedClusterStateTaskListener(
        AckedClusterStateTaskListener listener,
        Supplier<ThreadContext.StoredContext> context,
        Logger logger
    ) {
        super(listener, context, logger);
        this.listener = listener;
        this.logger = logger;
    }

    @Override
    public boolean mustAck(DiscoveryNode discoveryNode) {
        return listener.mustAck(discoveryNode);
    }

    @Override
    public void onAllNodesAcked(@Nullable Exception e) {
        try (ThreadContext.StoredContext ignore = context.get()) {
            listener.onAllNodesAcked(e);
        } catch (Exception inner) {
            // e is null on the success path; Throwable.addSuppressed(null) would itself throw
            // a NullPointerException out of this handler, defeating the wrapper.
            if (e != null) {
                inner.addSuppressed(e);
            }
            logger.error("exception thrown by listener while notifying on all nodes acked", inner);
        }
    }

    @Override
    public void onAckTimeout() {
        try (ThreadContext.StoredContext ignore = context.get()) {
            listener.onAckTimeout();
        } catch (Exception e) {
            logger.error("exception thrown by listener while notifying on ack timeout", e);
        }
    }

    @Override
    public TimeValue ackTimeout() {
        return listener.ackTimeout();
    }
}

/**
 * Logs how long an activity took: at warn level when it exceeds the slow-task logging
 * threshold, at debug level otherwise.
 *
 * @param executionTime the measured duration
 * @param activity      description of what was timed, inserted into the log message
 * @param summary       summary of the tasks the activity was performed for
 */
private void logExecutionTime(TimeValue executionTime, String activity, String summary) {
    if (executionTime.getMillis() > slowTaskLoggingThreshold.getMillis()) {
        logger.warn("took [{}], which is over [{}], to {} for [{}]", executionTime, slowTaskLoggingThreshold, activity, summary);
    } else {
        logger.debug("took [{}] to {} for [{}]", executionTime, activity, summary);
    }
}

+    /**
+     * Ack listener that forwards every commit and node-ack notification to each listener in a list, in order.
+     */
+    private static class DelegatingAckListener implements Discovery.AckListener {
+
+        private final List<Discovery.AckListener> listeners;
+
+        private DelegatingAckListener(List<Discovery.AckListener> listeners) {
+            this.listeners = listeners;
+        }
+
+        @Override
+        public void onCommit(TimeValue commitTime) {
+            for (Discovery.AckListener listener : listeners) {
+                listener.onCommit(commitTime);
+            }
+        }
+
+        @Override
+        public void onNodeAck(DiscoveryNode node, @Nullable Exception e) {
+            for (Discovery.AckListener listener : listeners) {
+                listener.onNodeAck(node, e);
+            }
+        }
+    }
+
+    /**
+     * Counts down the acknowledgements expected for one cluster state update and notifies the acked task
+     * listener either once all of them (plus the commit) have arrived or once the ack timeout fires.
+     */
+    private static class AckCountDownListener implements Discovery.AckListener {
+
+        private static final Logger logger = LogManager.getLogger(AckCountDownListener.class);
+
+        private final AckedClusterStateTaskListener ackedTaskListener;
+        private final CountDown countDown;
+        private final DiscoveryNode clusterManagerNode;
+        private final ThreadPool threadPool;
+        private final long clusterStateVersion;
+        private volatile Scheduler.Cancellable ackTimeoutCallback;
+        // most recent failure reported through onNodeAck; handed to onAllNodesAcked when the count reaches zero
+        private Exception lastFailure;
+
+        AckCountDownListener(
+            AckedClusterStateTaskListener ackedTaskListener,
+            long clusterStateVersion,
+            DiscoveryNodes nodes,
+            ThreadPool threadPool
+        ) {
+            this.ackedTaskListener = ackedTaskListener;
+            this.clusterStateVersion = clusterStateVersion;
+            this.threadPool = threadPool;
+            this.clusterManagerNode = nodes.getClusterManagerNode();
+            int countDown = 0;
+            for (DiscoveryNode node : nodes) {
+                // we always wait for at least the cluster-manager node
+                if (node.equals(clusterManagerNode) || ackedTaskListener.mustAck(node)) {
+                    countDown++;
+                }
+            }
+            logger.trace("expecting {} acknowledgements for cluster_state update (version: {})", countDown, clusterStateVersion);
+            this.countDown = new CountDown(countDown + 1); // we also wait for onCommit to be called
+        }
+
+        @Override
+        public void onCommit(TimeValue commitTime) {
+            TimeValue ackTimeout = ackedTaskListener.ackTimeout();
+            if (ackTimeout == null) {
+                ackTimeout = TimeValue.ZERO;
+            }
+            // the time spent committing is deducted from the ack timeout, never going below zero
+            final TimeValue timeLeft = TimeValue.timeValueNanos(Math.max(0, ackTimeout.nanos() - commitTime.nanos()));
+            if (timeLeft.nanos() == 0L) {
+                onTimeout();
+            } else if (countDown.countDown()) {
+                finish();
+            } else {
+                this.ackTimeoutCallback = threadPool.schedule(this::onTimeout, timeLeft, ThreadPool.Names.GENERIC);
+                // re-check if onNodeAck has not completed while we were scheduling the timeout
+                if (countDown.isCountedDown()) {
+                    ackTimeoutCallback.cancel();
+                }
+            }
+        }
+
+        @Override
+        public void onNodeAck(DiscoveryNode node, @Nullable Exception e) {
+            // acks from nodes that were not counted in the constructor are ignored
+            if (node.equals(clusterManagerNode) == false && ackedTaskListener.mustAck(node) == false) {
+                return;
+            }
+            if (e == null) {
+                logger.trace("ack received from node [{}], cluster_state update (version: {})", node, clusterStateVersion);
+            } else {
+                this.lastFailure = e;
+                logger.debug(
+                    () -> new ParameterizedMessage(
+                        "ack received from node [{}], cluster_state update (version: {})",
+                        node,
+                        clusterStateVersion
+                    ),
+                    e
+                );
+            }
+
+            if (countDown.countDown()) {
+                finish();
+            }
+        }
+
+        /** Cancels a scheduled timeout, if any, and reports completion together with the last recorded failure. */
+        private void finish() {
+            logger.trace("all expected nodes acknowledged cluster_state update (version: {})", clusterStateVersion);
+            if (ackTimeoutCallback != null) {
+                ackTimeoutCallback.cancel();
+            }
+            ackedTaskListener.onAllNodesAcked(lastFailure);
+        }
+
+        /** Notifies the listener of an ack timeout, but only when {@code countDown.fastForward()} returns true. */
+        public void onTimeout() {
+            if (countDown.fastForward()) {
+                logger.trace("timeout waiting for acknowledgement for cluster_state update (version: {})", clusterStateVersion);
+                ackedTaskListener.onAckTimeout();
+            }
+        }
+    }
+
+    /**
+     * Runs the batch's executor against the previous cluster state. If the executor throws an
+     * {@link Exception}, the failure is logged at trace level and a result is built that marks every task of
+     * the batch as failed with that exception, keeping the previous cluster state.
+     *
+     * @param taskInputs           the tasks of the batch and their executor
+     * @param previousClusterState the state the tasks are applied to
+     * @param taskSummary          summary of the batch, used in log messages
+     * @return the executor's result, or an all-failed result if the executor threw
+     * @throws AssertionError if the resulting state no longer has the local node as elected cluster-manager
+     *                        although the previous state did
+     */
+    private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, ClusterState previousClusterState, String taskSummary) {
+        ClusterTasksResult<Object> clusterTasksResult;
+        try {
+            List<Object> inputs = taskInputs.updateTasks.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList());
+            clusterTasksResult = taskInputs.executor.execute(previousClusterState, inputs);
+            if (previousClusterState != clusterTasksResult.resultingState
+                && previousClusterState.nodes().isLocalNodeElectedClusterManager()
+                && (clusterTasksResult.resultingState.nodes().isLocalNodeElectedClusterManager() == false)) {
+                throw new AssertionError("update task submitted to ClusterManagerService cannot remove cluster-manager");
+            }
+        } catch (Exception e) {
+            logger.trace(
+                () -> new ParameterizedMessage(
+                    "failed to execute cluster state update (on version: [{}], uuid: [{}]) for [{}]\n{}{}{}",
+                    previousClusterState.version(),
+                    previousClusterState.stateUUID(),
+                    taskSummary,
+                    previousClusterState.nodes(),
+                    previousClusterState.routingTable(),
+                    previousClusterState.getRoutingNodes()
+                ), // may be expensive => construct message lazily
+                e
+            );
+            clusterTasksResult = ClusterTasksResult.builder()
+                .failures(taskInputs.updateTasks.stream().map(updateTask -> updateTask.task)::iterator, e)
+                .build(previousClusterState);
+        }
+
+        assert clusterTasksResult.executionResults != null;
+        assert clusterTasksResult.executionResults.size() == taskInputs.updateTasks.size() : String.format(
+            Locale.ROOT,
+            "expected [%d] task result%s but was [%d]",
+            taskInputs.updateTasks.size(),
+            taskInputs.updateTasks.size() == 1 ? "" : "s",
+            clusterTasksResult.executionResults.size()
+        );
+        if (Assertions.ENABLED) {
+            // lambdas need an effectively final reference
+            ClusterTasksResult<Object> finalClusterTasksResult = clusterTasksResult;
+            taskInputs.updateTasks.forEach(updateTask -> {
+                assert finalClusterTasksResult.executionResults.containsKey(updateTask.task) : "missing task result for " + updateTask;
+            });
+        }
+
+        return clusterTasksResult;
+    }
+
+    /**
+     * Returns the tasks of the batch whose execution result reports success.
+     */
+    private List<Batcher.UpdateTask> getNonFailedTasks(TaskInputs taskInputs, ClusterTasksResult<Object> clusterTasksResult) {
+        return taskInputs.updateTasks.stream().filter(updateTask -> {
+            assert clusterTasksResult.executionResults.containsKey(updateTask.task) : "missing " + updateTask;
+            final ClusterStateTaskExecutor.TaskResult taskResult = clusterTasksResult.executionResults.get(updateTask.task);
+            return taskResult.isSuccess();
+        }).collect(Collectors.toList());
+    }
+
+    /**
+     * Represents a set of tasks to be processed together with their executor
+     */
+    private class TaskInputs {
+
+        final List<Batcher.UpdateTask> updateTasks;
+        final ClusterStateTaskExecutor<Object> executor;
+        final Function<Boolean, String> taskSummaryGenerator;
+
+        TaskInputs(
+            ClusterStateTaskExecutor<Object> executor,
+            List<Batcher.UpdateTask> updateTasks,
+            final Function<Boolean, String> taskSummaryGenerator
+        ) {
+            this.executor = executor;
+            this.updateTasks = updateTasks;
+            this.taskSummaryGenerator = taskSummaryGenerator;
+        }
+
+        boolean runOnlyWhenClusterManager() {
+            return executor.runOnlyOnClusterManager();
+        }
+
+        void onNoLongerClusterManager() {
+            updateTasks.forEach(task -> task.listener.onNoLongerClusterManager(task.source()));
+        }
+    }
+
+    /**
+     * Registers a task key with the cluster manager task throttler.
+     *
+     * @param taskKey           the key identifying the task type
+     * @param throttlingEnabled whether throttling is enabled for the task, i.e. whether the data node retries it
+     * @return the throttling key that must be passed when submitting the task to the cluster manager
+     */
+    public ClusterManagerTaskThrottler.ThrottlingKey registerClusterManagerTask(String taskKey, boolean throttlingEnabled) {
+        return clusterManagerTaskThrottler.registerClusterManagerTask(taskKey, throttlingEnabled);
+    }
+
+    /**
+     * Submits a batch of cluster state update tasks; submitted updates are guaranteed to be processed together,
+     * potentially with more tasks of the same executor. Nothing is submitted when this service has not been
+     * started. A rejected submission is rethrown unless the service is already stopped or closed.
+     *
+     * @param source   the source of the cluster state update task
+     * @param tasks    a map of update tasks and their corresponding listeners
+     * @param config   the cluster state update task configuration
+     * @param executor the cluster state update task executor; tasks
+     *                 that share the same executor will be executed
+     *                 in batches on this executor
+     * @param <T>      the type of the cluster state update task state
+     */
+    public <T> void submitStateUpdateTasks(
+        final String source,
+        final Map<T, ClusterStateTaskListener> tasks,
+        final ClusterStateTaskConfig config,
+        final ClusterStateTaskExecutor<T> executor
+    ) {
+        if (!lifecycle.started()) {
+            return;
+        }
+        final ThreadContext threadContext = threadPool.getThreadContext();
+        // captured before stashing so that the listeners can later run under the submitter's context
+        final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(true);
+        try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
+            ThreadContextAccess.doPrivilegedVoid(threadContext::markAsSystemContext);
+
+            List<Batcher.UpdateTask> safeTasks = tasks.entrySet()
+                .stream()
+                .map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue(), supplier), executor))
+                .collect(Collectors.toList());
+            taskBatcher.submitTasks(safeTasks, config.timeout());
+        } catch (OpenSearchRejectedExecutionException e) {
+            // ignore cases where we are shutting down..., there is really nothing interesting
+            // to be done here...
+            if (!lifecycle.stoppedOrClosed()) {
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * Returns the cluster state statistics object maintained by this service.
+     */
+    public ClusterStateStats getClusterStateStats() {
+        return stateStats;
+    }
+}
diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java index b4f2250f6dec9..72dd8bba54dbf 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java @@ -262,7 +262,7 @@ public ClusterManagerService getClusterManagerService() { /** @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #getClusterManagerService()} */ @Deprecated - public MasterService getMasterService() { + public ClusterManagerService getMasterService() { return clusterManagerService; } diff --git a/server/src/main/java/org/opensearch/cluster/service/MasterService.java b/server/src/main/java/org/opensearch/cluster/service/MasterService.java deleted file mode 100644 index 455e7301a490d..0000000000000 --- a/server/src/main/java/org/opensearch/cluster/service/MasterService.java +++ /dev/null @@ -1,1050 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.cluster.service; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.opensearch.Version; -import org.opensearch.action.support.PlainActionFuture; -import org.opensearch.cluster.AckedClusterStateTaskListener; -import org.opensearch.cluster.ClusterChangedEvent; -import org.opensearch.cluster.ClusterManagerMetrics; -import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.ClusterState.Builder; -import org.opensearch.cluster.ClusterStateTaskConfig; -import org.opensearch.cluster.ClusterStateTaskExecutor; -import org.opensearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult; -import org.opensearch.cluster.ClusterStateTaskListener; -import org.opensearch.cluster.coordination.ClusterStatePublisher; -import org.opensearch.cluster.coordination.FailedToCommitClusterStateException; -import org.opensearch.cluster.metadata.Metadata; -import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.node.DiscoveryNodes; -import org.opensearch.cluster.routing.RoutingTable; -import org.opensearch.common.Nullable; -import org.opensearch.common.Priority; -import org.opensearch.common.annotation.DeprecatedApi; -import org.opensearch.common.lifecycle.AbstractLifecycleComponent; -import 
org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.settings.Setting; -import org.opensearch.common.settings.Settings; -import org.opensearch.common.unit.TimeValue; -import org.opensearch.common.util.concurrent.CountDown; -import org.opensearch.common.util.concurrent.FutureUtils; -import org.opensearch.common.util.concurrent.OpenSearchExecutors; -import org.opensearch.common.util.concurrent.PrioritizedOpenSearchThreadPoolExecutor; -import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.common.util.concurrent.ThreadContextAccess; -import org.opensearch.core.Assertions; -import org.opensearch.core.common.text.Text; -import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; -import org.opensearch.discovery.Discovery; -import org.opensearch.node.Node; -import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; -import org.opensearch.telemetry.metrics.tags.Tags; -import org.opensearch.threadpool.Scheduler; -import org.opensearch.threadpool.ThreadPool; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; - -import static org.opensearch.common.util.concurrent.OpenSearchExecutors.daemonThreadFactory; - -/** - * Main Master Node Service - * - * @opensearch.api - * @deprecated As of 2.2, because supporting inclusive language, replaced by {@link ClusterManagerService}. 
- */ -@Deprecated -@DeprecatedApi(since = "2.2.0") -public class MasterService extends AbstractLifecycleComponent { - private static final Logger logger = LogManager.getLogger(MasterService.class); - - public static final Setting MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING = Setting.positiveTimeSetting( - "cluster.service.slow_master_task_logging_threshold", - TimeValue.timeValueSeconds(10), - Setting.Property.Dynamic, - Setting.Property.NodeScope, - Setting.Property.Deprecated - ); - // The setting below is going to replace the above. - // To keep backwards compatibility, the old usage is remained, and it's also used as the fallback for the new usage. - public static final Setting CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING = Setting.positiveTimeSetting( - "cluster.service.slow_cluster_manager_task_logging_threshold", - MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - static final String CLUSTER_MANAGER_UPDATE_THREAD_NAME = "clusterManagerService#updateTask"; - - /** - * @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #CLUSTER_MANAGER_UPDATE_THREAD_NAME} - */ - @Deprecated - static final String MASTER_UPDATE_THREAD_NAME = "masterService#updateTask"; - - ClusterStatePublisher clusterStatePublisher; - - private final String nodeName; - - private java.util.function.Supplier clusterStateSupplier; - - private volatile TimeValue slowTaskLoggingThreshold; - - protected final ThreadPool threadPool; - - private volatile PrioritizedOpenSearchThreadPoolExecutor threadPoolExecutor; - private volatile Batcher taskBatcher; - protected final ClusterManagerTaskThrottler clusterManagerTaskThrottler; - private final ClusterManagerThrottlingStats throttlingStats; - private final ClusterStateStats stateStats; - private final ClusterManagerMetrics clusterManagerMetrics; - - public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadPool 
threadPool) { - this(settings, clusterSettings, threadPool, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE)); - } - - public MasterService( - Settings settings, - ClusterSettings clusterSettings, - ThreadPool threadPool, - ClusterManagerMetrics clusterManagerMetrics - ) { - this.nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings)); - - this.slowTaskLoggingThreshold = CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings); - clusterSettings.addSettingsUpdateConsumer( - CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, - this::setSlowTaskLoggingThreshold - ); - - this.throttlingStats = new ClusterManagerThrottlingStats(); - this.clusterManagerTaskThrottler = new ClusterManagerTaskThrottler( - settings, - clusterSettings, - this::getMinNodeVersion, - throttlingStats - ); - this.stateStats = new ClusterStateStats(); - this.threadPool = threadPool; - this.clusterManagerMetrics = clusterManagerMetrics; - } - - private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) { - this.slowTaskLoggingThreshold = slowTaskLoggingThreshold; - } - - public synchronized void setClusterStatePublisher(ClusterStatePublisher publisher) { - clusterStatePublisher = publisher; - } - - public synchronized void setClusterStateSupplier(java.util.function.Supplier clusterStateSupplier) { - this.clusterStateSupplier = clusterStateSupplier; - } - - @Override - protected synchronized void doStart() { - Objects.requireNonNull(clusterStatePublisher, "please set a cluster state publisher before starting"); - Objects.requireNonNull(clusterStateSupplier, "please set a cluster state supplier before starting"); - threadPoolExecutor = createThreadPoolExecutor(); - taskBatcher = new Batcher(logger, threadPoolExecutor, clusterManagerTaskThrottler); - } - - protected PrioritizedOpenSearchThreadPoolExecutor createThreadPoolExecutor() { - return OpenSearchExecutors.newSinglePrioritizing( - nodeName + "/" + 
CLUSTER_MANAGER_UPDATE_THREAD_NAME, - daemonThreadFactory(nodeName, CLUSTER_MANAGER_UPDATE_THREAD_NAME), - threadPool.getThreadContext(), - threadPool.scheduler() - ); - } - - @SuppressWarnings("unchecked") - class Batcher extends TaskBatcher { - - Batcher(Logger logger, PrioritizedOpenSearchThreadPoolExecutor threadExecutor, TaskBatcherListener taskBatcherListener) { - super(logger, threadExecutor, taskBatcherListener); - } - - @Override - protected void onTimeout(List tasks, TimeValue timeout) { - threadPool.generic() - .execute( - () -> tasks.forEach( - task -> ((UpdateTask) task).listener.onFailure( - task.source, - new ProcessClusterEventTimeoutException(timeout, task.source) - ) - ) - ); - } - - @Override - protected void run(Object batchingKey, List tasks, Function taskSummaryGenerator) { - ClusterStateTaskExecutor taskExecutor = (ClusterStateTaskExecutor) batchingKey; - List updateTasks = (List) tasks; - runTasks(new TaskInputs(taskExecutor, updateTasks, taskSummaryGenerator)); - } - - class UpdateTask extends BatchedTask { - final ClusterStateTaskListener listener; - - UpdateTask( - Priority priority, - String source, - Object task, - ClusterStateTaskListener listener, - ClusterStateTaskExecutor executor - ) { - super(priority, source, executor, task); - this.listener = listener; - } - - @Override - public String describeTasks(List tasks) { - return ((ClusterStateTaskExecutor) batchingKey).describeTasks( - tasks.stream().map(BatchedTask::getTask).collect(Collectors.toList()) - ); - } - } - } - - @Override - protected synchronized void doStop() { - ThreadPool.terminate(threadPoolExecutor, 10, TimeUnit.SECONDS); - } - - @Override - protected synchronized void doClose() {} - - /** - * The current cluster state exposed by the discovery layer. Package-visible for tests. 
- */ - ClusterState state() { - return clusterStateSupplier.get(); - } - - private static boolean isClusterManagerUpdateThread() { - return Thread.currentThread().getName().contains(CLUSTER_MANAGER_UPDATE_THREAD_NAME) - || Thread.currentThread().getName().contains(MASTER_UPDATE_THREAD_NAME); - } - - public static boolean assertClusterManagerUpdateThread() { - assert isClusterManagerUpdateThread() : "not called from the cluster-manager service thread"; - return true; - } - - public static boolean assertNotClusterManagerUpdateThread(String reason) { - assert isClusterManagerUpdateThread() == false : "Expected current thread [" - + Thread.currentThread() - + "] to not be the cluster-manager service thread. Reason: [" - + reason - + "]"; - return true; - } - - /** @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #assertClusterManagerUpdateThread()} */ - @Deprecated - public static boolean assertMasterUpdateThread() { - return assertClusterManagerUpdateThread(); - } - - /** @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #assertNotClusterManagerUpdateThread(String)} */ - @Deprecated - public static boolean assertNotMasterUpdateThread(String reason) { - return assertNotClusterManagerUpdateThread(reason); - } - - private void runTasks(TaskInputs taskInputs) { - final String summary; - if (logger.isTraceEnabled()) { - summary = taskInputs.taskSummaryGenerator.apply(true); - } else { - summary = taskInputs.taskSummaryGenerator.apply(false); - } - - if (!lifecycle.started()) { - logger.debug("processing [{}]: ignoring, cluster-manager service not started", summary); - return; - } - - if (logger.isTraceEnabled()) { - logger.trace("executing cluster state update for [{}]", summary); - } else { - logger.debug("executing cluster state update for [{}]", summary); - } - - final ClusterState previousClusterState = state(); - - if (!previousClusterState.nodes().isLocalNodeElectedClusterManager() && 
taskInputs.runOnlyWhenClusterManager()) { - logger.debug("failing [{}]: local node is no longer cluster-manager", summary); - taskInputs.onNoLongerClusterManager(); - return; - } - - final long computationStartTime = threadPool.preciseRelativeTimeInNanos(); - final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, summary); - taskOutputs.notifyFailedTasks(); - final TimeValue computationTime = getTimeSince(computationStartTime); - logExecutionTime(computationTime, "compute cluster state update", summary); - - clusterManagerMetrics.recordLatency( - clusterManagerMetrics.clusterStateComputeHistogram, - (double) computationTime.getMillis(), - Optional.of(Tags.create().addTag("Operation", taskInputs.executor.getClass().getSimpleName())) - ); - - if (taskOutputs.clusterStateUnchanged()) { - final long notificationStartTime = threadPool.preciseRelativeTimeInNanos(); - taskOutputs.notifySuccessfulTasksOnUnchangedClusterState(); - final TimeValue executionTime = getTimeSince(notificationStartTime); - logExecutionTime(executionTime, "notify listeners on unchanged cluster state", summary); - } else { - final ClusterState newClusterState = taskOutputs.newClusterState; - if (logger.isTraceEnabled()) { - logger.trace("cluster state updated, source [{}]\n{}", summary, newClusterState); - } else { - logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary); - } - final long publicationStartTime = threadPool.preciseRelativeTimeInNanos(); - try { - ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState); - // new cluster state, notify all listeners - final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta(); - if (nodesDelta.hasChanges() && logger.isInfoEnabled()) { - String nodesDeltaSummary = nodesDelta.shortSummary(); - if (nodesDeltaSummary.length() > 0) { - logger.info( - "{}, term: {}, version: {}, delta: {}", - summary, - 
newClusterState.term(), - newClusterState.version(), - nodesDeltaSummary - ); - } - } - - logger.debug("publishing cluster state version [{}]", newClusterState.version()); - publish(clusterChangedEvent, taskOutputs, publicationStartTime); - } catch (Exception e) { - handleException(summary, publicationStartTime, newClusterState, e); - } - } - } - - private TimeValue getTimeSince(long startTimeNanos) { - return TimeValue.timeValueMillis(TimeValue.nsecToMSec(threadPool.preciseRelativeTimeInNanos() - startTimeNanos)); - } - - protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeNanos) { - final PlainActionFuture fut = new PlainActionFuture() { - @Override - protected boolean blockingAllowed() { - return isClusterManagerUpdateThread() || super.blockingAllowed(); - } - }; - clusterStatePublisher.publish(clusterChangedEvent, fut, taskOutputs.createAckListener(threadPool, clusterChangedEvent.state())); - - // indefinitely wait for publication to complete - try { - FutureUtils.get(fut); - onPublicationSuccess(clusterChangedEvent, taskOutputs); - final long durationMillis = getTimeSince(startTimeNanos).millis(); - stateStats.stateUpdateTook(durationMillis); - stateStats.stateUpdated(); - clusterManagerMetrics.recordLatency(clusterManagerMetrics.clusterStatePublishHistogram, (double) durationMillis); - } catch (Exception e) { - stateStats.stateUpdateFailed(); - onPublicationFailed(clusterChangedEvent, taskOutputs, startTimeNanos, e); - } - } - - void onPublicationSuccess(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs) { - final long notificationStartTime = threadPool.preciseRelativeTimeInNanos(); - taskOutputs.processedDifferentClusterState(clusterChangedEvent.previousState(), clusterChangedEvent.state()); - - try { - taskOutputs.clusterStatePublished(clusterChangedEvent); - } catch (Exception e) { - logger.error( - () -> new ParameterizedMessage( - "exception thrown while notifying executor of new cluster 
state publication [{}]", - clusterChangedEvent.source() - ), - e - ); - } - final TimeValue executionTime = getTimeSince(notificationStartTime); - logExecutionTime( - executionTime, - "notify listeners on successful publication of cluster state (version: " - + clusterChangedEvent.state().version() - + ", uuid: " - + clusterChangedEvent.state().stateUUID() - + ')', - clusterChangedEvent.source() - ); - } - - void onPublicationFailed(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeMillis, Exception exception) { - if (exception instanceof FailedToCommitClusterStateException) { - final long version = clusterChangedEvent.state().version(); - logger.warn( - () -> new ParameterizedMessage( - "failing [{}]: failed to commit cluster state version [{}]", - clusterChangedEvent.source(), - version - ), - exception - ); - taskOutputs.publishingFailed((FailedToCommitClusterStateException) exception); - } else { - handleException(clusterChangedEvent.source(), startTimeMillis, clusterChangedEvent.state(), exception); - } - } - - private void handleException(String summary, long startTimeMillis, ClusterState newClusterState, Exception e) { - final TimeValue executionTime = getTimeSince(startTimeMillis); - final long version = newClusterState.version(); - final String stateUUID = newClusterState.stateUUID(); - final String fullState = newClusterState.toString(); - logger.warn( - new ParameterizedMessage( - "took [{}] and then failed to publish updated cluster state (version: {}, uuid: {}) for [{}]:\n{}", - executionTime, - version, - stateUUID, - summary, - fullState - ), - e - ); - // TODO: do we want to call updateTask.onFailure here? 
- } - - private TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState, String taskSummary) { - ClusterTasksResult clusterTasksResult = executeTasks(taskInputs, previousClusterState, taskSummary); - ClusterState newClusterState = patchVersions(previousClusterState, clusterTasksResult); - return new TaskOutputs( - taskInputs, - previousClusterState, - newClusterState, - getNonFailedTasks(taskInputs, clusterTasksResult), - clusterTasksResult.executionResults - ); - } - - private ClusterState patchVersions(ClusterState previousClusterState, ClusterTasksResult executionResult) { - ClusterState newClusterState = executionResult.resultingState; - - if (previousClusterState != newClusterState) { - // only the cluster-manager controls the version numbers - Builder builder = incrementVersion(newClusterState); - if (previousClusterState.routingTable() != newClusterState.routingTable()) { - builder.routingTable( - RoutingTable.builder(newClusterState.routingTable()).version(newClusterState.routingTable().version() + 1).build() - ); - } - if (previousClusterState.metadata() != newClusterState.metadata()) { - builder.metadata(Metadata.builder(newClusterState.metadata()).version(newClusterState.metadata().version() + 1)); - } - - newClusterState = builder.build(); - } - - return newClusterState; - } - - public Builder incrementVersion(ClusterState clusterState) { - return ClusterState.builder(clusterState).incrementVersion(); - } - - /** - * Submits a cluster state update task; unlike {@link #submitStateUpdateTask(String, Object, ClusterStateTaskConfig, - * ClusterStateTaskExecutor, ClusterStateTaskListener)}, submitted updates will not be batched. 
- * - * @param source the source of the cluster state update task - * @param updateTask the full context for the cluster state update - * task - */ - public & ClusterStateTaskListener> void submitStateUpdateTask( - String source, - T updateTask - ) { - submitStateUpdateTask(source, updateTask, updateTask, updateTask, updateTask); - } - - /** - * Submits a cluster state update task; submitted updates will be - * batched across the same instance of executor. The exact batching - * semantics depend on the underlying implementation but a rough - * guideline is that if the update task is submitted while there - * are pending update tasks for the same executor, these update - * tasks will all be executed on the executor in a single batch - * - * @param source the source of the cluster state update task - * @param task the state needed for the cluster state update task - * @param config the cluster state update task configuration - * @param executor the cluster state update task executor; tasks - * that share the same executor will be executed - * batches on this executor - * @param listener callback after the cluster state update task - * completes - * @param the type of the cluster state update task state - */ - public void submitStateUpdateTask( - String source, - T task, - ClusterStateTaskConfig config, - ClusterStateTaskExecutor executor, - ClusterStateTaskListener listener - ) { - submitStateUpdateTasks(source, Collections.singletonMap(task, listener), config, executor); - } - - /** - * Output created by executing a set of tasks provided as TaskInputs - */ - class TaskOutputs { - final TaskInputs taskInputs; - final ClusterState previousClusterState; - final ClusterState newClusterState; - final List nonFailedTasks; - final Map executionResults; - - TaskOutputs( - TaskInputs taskInputs, - ClusterState previousClusterState, - ClusterState newClusterState, - List nonFailedTasks, - Map executionResults - ) { - this.taskInputs = taskInputs; - this.previousClusterState = 
previousClusterState; - this.newClusterState = newClusterState; - this.nonFailedTasks = nonFailedTasks; - this.executionResults = executionResults; - } - - void publishingFailed(FailedToCommitClusterStateException t) { - nonFailedTasks.forEach(task -> task.listener.onFailure(task.source(), t)); - } - - void processedDifferentClusterState(ClusterState previousClusterState, ClusterState newClusterState) { - nonFailedTasks.forEach(task -> task.listener.clusterStateProcessed(task.source(), previousClusterState, newClusterState)); - } - - void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { - taskInputs.executor.clusterStatePublished(clusterChangedEvent); - } - - Discovery.AckListener createAckListener(ThreadPool threadPool, ClusterState newClusterState) { - return new DelegatingAckListener( - nonFailedTasks.stream() - .filter(task -> task.listener instanceof AckedClusterStateTaskListener) - .map( - task -> new AckCountDownListener( - (AckedClusterStateTaskListener) task.listener, - newClusterState.version(), - newClusterState.nodes(), - threadPool - ) - ) - .collect(Collectors.toList()) - ); - } - - boolean clusterStateUnchanged() { - return previousClusterState == newClusterState; - } - - void notifyFailedTasks() { - // fail all tasks that have failed - for (Batcher.UpdateTask updateTask : taskInputs.updateTasks) { - assert executionResults.containsKey(updateTask.task) : "missing " + updateTask; - final ClusterStateTaskExecutor.TaskResult taskResult = executionResults.get(updateTask.task); - if (taskResult.isSuccess() == false) { - updateTask.listener.onFailure(updateTask.source(), taskResult.getFailure()); - } - } - } - - void notifySuccessfulTasksOnUnchangedClusterState() { - nonFailedTasks.forEach(task -> { - if (task.listener instanceof AckedClusterStateTaskListener) { - // no need to wait for ack if nothing changed, the update can be counted as acknowledged - ((AckedClusterStateTaskListener) task.listener).onAllNodesAcked(null); - } - 
task.listener.clusterStateProcessed(task.source(), newClusterState, newClusterState); - }); - } - } - - /** - * Returns the tasks that are pending. - */ - public List pendingTasks() { - return Arrays.stream(threadPoolExecutor.getPending()).map(pending -> { - assert pending.task instanceof SourcePrioritizedRunnable - : "thread pool executor should only use SourcePrioritizedRunnable instances but found: " - + pending.task.getClass().getName(); - SourcePrioritizedRunnable task = (SourcePrioritizedRunnable) pending.task; - return new PendingClusterTask( - pending.insertionOrder, - pending.priority, - new Text(task.source()), - task.getAgeInMillis(), - pending.executing - ); - }).collect(Collectors.toList()); - } - - /** - * Returns the number of throttled pending tasks. - */ - public long numberOfThrottledPendingTasks() { - return throttlingStats.getTotalThrottledTaskCount(); - } - - /** - * Returns the stats of throttled pending tasks. - */ - public ClusterManagerThrottlingStats getThrottlingStats() { - return throttlingStats; - } - - /** - * Returns the min version of nodes in cluster - */ - public Version getMinNodeVersion() { - return state().getNodes().getMinNodeVersion(); - } - - /** - * Returns the number of currently pending tasks. 
- */ - public int numberOfPendingTasks() { - return threadPoolExecutor.getNumberOfPendingTasks(); - } - - /** - * Returns the maximum wait time for tasks in the queue - * - * @return A zero time value if the queue is empty, otherwise the time value oldest task waiting in the queue - */ - public TimeValue getMaxTaskWaitTime() { - return threadPoolExecutor.getMaxTaskWaitTime(); - } - - private SafeClusterStateTaskListener safe(ClusterStateTaskListener listener, Supplier contextSupplier) { - if (listener instanceof AckedClusterStateTaskListener) { - return new SafeAckedClusterStateTaskListener((AckedClusterStateTaskListener) listener, contextSupplier, logger); - } else { - return new SafeClusterStateTaskListener(listener, contextSupplier, logger); - } - } - - private static class SafeClusterStateTaskListener implements ClusterStateTaskListener { - private final ClusterStateTaskListener listener; - protected final Supplier context; - private final Logger logger; - - SafeClusterStateTaskListener(ClusterStateTaskListener listener, Supplier context, Logger logger) { - this.listener = listener; - this.context = context; - this.logger = logger; - } - - @Override - public void onFailure(String source, Exception e) { - try (ThreadContext.StoredContext ignore = context.get()) { - listener.onFailure(source, e); - } catch (Exception inner) { - inner.addSuppressed(e); - logger.error(() -> new ParameterizedMessage("exception thrown by listener notifying of failure from [{}]", source), inner); - } - } - - @Override - public void onNoLongerClusterManager(String source) { - try (ThreadContext.StoredContext ignore = context.get()) { - listener.onNoLongerClusterManager(source); - } catch (Exception e) { - logger.error( - () -> new ParameterizedMessage( - "exception thrown by listener while notifying no longer cluster-manager from [{}]", - source - ), - e - ); - } - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - try 
(ThreadContext.StoredContext ignore = context.get()) { - listener.clusterStateProcessed(source, oldState, newState); - } catch (Exception e) { - logger.error( - () -> new ParameterizedMessage( - "exception thrown by listener while notifying of cluster state processed from [{}], old cluster state:\n" - + "{}\nnew cluster state:\n{}", - source, - oldState, - newState - ), - e - ); - } - } - } - - private static class SafeAckedClusterStateTaskListener extends SafeClusterStateTaskListener implements AckedClusterStateTaskListener { - private final AckedClusterStateTaskListener listener; - private final Logger logger; - - SafeAckedClusterStateTaskListener( - AckedClusterStateTaskListener listener, - Supplier context, - Logger logger - ) { - super(listener, context, logger); - this.listener = listener; - this.logger = logger; - } - - @Override - public boolean mustAck(DiscoveryNode discoveryNode) { - return listener.mustAck(discoveryNode); - } - - @Override - public void onAllNodesAcked(@Nullable Exception e) { - try (ThreadContext.StoredContext ignore = context.get()) { - listener.onAllNodesAcked(e); - } catch (Exception inner) { - inner.addSuppressed(e); - logger.error("exception thrown by listener while notifying on all nodes acked", inner); - } - } - - @Override - public void onAckTimeout() { - try (ThreadContext.StoredContext ignore = context.get()) { - listener.onAckTimeout(); - } catch (Exception e) { - logger.error("exception thrown by listener while notifying on ack timeout", e); - } - } - - @Override - public TimeValue ackTimeout() { - return listener.ackTimeout(); - } - } - - private void logExecutionTime(TimeValue executionTime, String activity, String summary) { - if (executionTime.getMillis() > slowTaskLoggingThreshold.getMillis()) { - logger.warn("took [{}], which is over [{}], to {} for [{}]", executionTime, slowTaskLoggingThreshold, activity, summary); - } else { - logger.debug("took [{}] to {} for [{}]", executionTime, activity, summary); - } - } - - 
private static class DelegatingAckListener implements Discovery.AckListener { - - private final List listeners; - - private DelegatingAckListener(List listeners) { - this.listeners = listeners; - } - - @Override - public void onCommit(TimeValue commitTime) { - for (Discovery.AckListener listener : listeners) { - listener.onCommit(commitTime); - } - } - - @Override - public void onNodeAck(DiscoveryNode node, @Nullable Exception e) { - for (Discovery.AckListener listener : listeners) { - listener.onNodeAck(node, e); - } - } - } - - private static class AckCountDownListener implements Discovery.AckListener { - - private static final Logger logger = LogManager.getLogger(AckCountDownListener.class); - - private final AckedClusterStateTaskListener ackedTaskListener; - private final CountDown countDown; - private final DiscoveryNode clusterManagerNode; - private final ThreadPool threadPool; - private final long clusterStateVersion; - private volatile Scheduler.Cancellable ackTimeoutCallback; - private Exception lastFailure; - - AckCountDownListener( - AckedClusterStateTaskListener ackedTaskListener, - long clusterStateVersion, - DiscoveryNodes nodes, - ThreadPool threadPool - ) { - this.ackedTaskListener = ackedTaskListener; - this.clusterStateVersion = clusterStateVersion; - this.threadPool = threadPool; - this.clusterManagerNode = nodes.getClusterManagerNode(); - int countDown = 0; - for (DiscoveryNode node : nodes) { - // we always wait for at least the cluster-manager node - if (node.equals(clusterManagerNode) || ackedTaskListener.mustAck(node)) { - countDown++; - } - } - logger.trace("expecting {} acknowledgements for cluster_state update (version: {})", countDown, clusterStateVersion); - this.countDown = new CountDown(countDown + 1); // we also wait for onCommit to be called - } - - @Override - public void onCommit(TimeValue commitTime) { - TimeValue ackTimeout = ackedTaskListener.ackTimeout(); - if (ackTimeout == null) { - ackTimeout = TimeValue.ZERO; - } - final 
TimeValue timeLeft = TimeValue.timeValueNanos(Math.max(0, ackTimeout.nanos() - commitTime.nanos())); - if (timeLeft.nanos() == 0L) { - onTimeout(); - } else if (countDown.countDown()) { - finish(); - } else { - this.ackTimeoutCallback = threadPool.schedule(this::onTimeout, timeLeft, ThreadPool.Names.GENERIC); - // re-check if onNodeAck has not completed while we were scheduling the timeout - if (countDown.isCountedDown()) { - ackTimeoutCallback.cancel(); - } - } - } - - @Override - public void onNodeAck(DiscoveryNode node, @Nullable Exception e) { - if (node.equals(clusterManagerNode) == false && ackedTaskListener.mustAck(node) == false) { - return; - } - if (e == null) { - logger.trace("ack received from node [{}], cluster_state update (version: {})", node, clusterStateVersion); - } else { - this.lastFailure = e; - logger.debug( - () -> new ParameterizedMessage( - "ack received from node [{}], cluster_state update (version: {})", - node, - clusterStateVersion - ), - e - ); - } - - if (countDown.countDown()) { - finish(); - } - } - - private void finish() { - logger.trace("all expected nodes acknowledged cluster_state update (version: {})", clusterStateVersion); - if (ackTimeoutCallback != null) { - ackTimeoutCallback.cancel(); - } - ackedTaskListener.onAllNodesAcked(lastFailure); - } - - public void onTimeout() { - if (countDown.fastForward()) { - logger.trace("timeout waiting for acknowledgement for cluster_state update (version: {})", clusterStateVersion); - ackedTaskListener.onAckTimeout(); - } - } - } - - private ClusterTasksResult executeTasks(TaskInputs taskInputs, ClusterState previousClusterState, String taskSummary) { - ClusterTasksResult clusterTasksResult; - try { - List inputs = taskInputs.updateTasks.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList()); - clusterTasksResult = taskInputs.executor.execute(previousClusterState, inputs); - if (previousClusterState != clusterTasksResult.resultingState - && 
previousClusterState.nodes().isLocalNodeElectedClusterManager() - && (clusterTasksResult.resultingState.nodes().isLocalNodeElectedClusterManager() == false)) { - throw new AssertionError("update task submitted to ClusterManagerService cannot remove cluster-manager"); - } - } catch (Exception e) { - logger.trace( - () -> new ParameterizedMessage( - "failed to execute cluster state update (on version: [{}], uuid: [{}]) for [{}]\n{}{}{}", - previousClusterState.version(), - previousClusterState.stateUUID(), - taskSummary, - previousClusterState.nodes(), - previousClusterState.routingTable(), - previousClusterState.getRoutingNodes() - ), // may be expensive => construct message lazily - e - ); - clusterTasksResult = ClusterTasksResult.builder() - .failures(taskInputs.updateTasks.stream().map(updateTask -> updateTask.task)::iterator, e) - .build(previousClusterState); - } - - assert clusterTasksResult.executionResults != null; - assert clusterTasksResult.executionResults.size() == taskInputs.updateTasks.size() : String.format( - Locale.ROOT, - "expected [%d] task result%s but was [%d]", - taskInputs.updateTasks.size(), - taskInputs.updateTasks.size() == 1 ? 
"" : "s", - clusterTasksResult.executionResults.size() - ); - if (Assertions.ENABLED) { - ClusterTasksResult finalClusterTasksResult = clusterTasksResult; - taskInputs.updateTasks.forEach(updateTask -> { - assert finalClusterTasksResult.executionResults.containsKey(updateTask.task) : "missing task result for " + updateTask; - }); - } - - return clusterTasksResult; - } - - private List getNonFailedTasks(TaskInputs taskInputs, ClusterTasksResult clusterTasksResult) { - return taskInputs.updateTasks.stream().filter(updateTask -> { - assert clusterTasksResult.executionResults.containsKey(updateTask.task) : "missing " + updateTask; - final ClusterStateTaskExecutor.TaskResult taskResult = clusterTasksResult.executionResults.get(updateTask.task); - return taskResult.isSuccess(); - }).collect(Collectors.toList()); - } - - /** - * Represents a set of tasks to be processed together with their executor - */ - private class TaskInputs { - - final List updateTasks; - final ClusterStateTaskExecutor executor; - final Function taskSummaryGenerator; - - TaskInputs( - ClusterStateTaskExecutor executor, - List updateTasks, - final Function taskSummaryGenerator - ) { - this.executor = executor; - this.updateTasks = updateTasks; - this.taskSummaryGenerator = taskSummaryGenerator; - } - - boolean runOnlyWhenClusterManager() { - return executor.runOnlyOnClusterManager(); - } - - void onNoLongerClusterManager() { - updateTasks.forEach(task -> task.listener.onNoLongerClusterManager(task.source())); - } - } - - /** - * Functionality for register task key to cluster manager node. 
- * - * @param taskKey - task key of task - * @param throttlingEnabled - throttling is enabled for task or not i.e does data node perform retries on it or not - * @return throttling task key which needs to be passed while submitting task to cluster manager - */ - public ClusterManagerTaskThrottler.ThrottlingKey registerClusterManagerTask(String taskKey, boolean throttlingEnabled) { - return clusterManagerTaskThrottler.registerClusterManagerTask(taskKey, throttlingEnabled); - } - - /** - * Submits a batch of cluster state update tasks; submitted updates are guaranteed to be processed together, - * potentially with more tasks of the same executor. - * - * @param source the source of the cluster state update task - * @param tasks a map of update tasks and their corresponding listeners - * @param config the cluster state update task configuration - * @param executor the cluster state update task executor; tasks - * that share the same executor will be executed - * batches on this executor - * @param the type of the cluster state update task state - */ - public void submitStateUpdateTasks( - final String source, - final Map tasks, - final ClusterStateTaskConfig config, - final ClusterStateTaskExecutor executor - ) { - if (!lifecycle.started()) { - return; - } - final ThreadContext threadContext = threadPool.getThreadContext(); - final Supplier supplier = threadContext.newRestorableContext(true); - try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { - ThreadContextAccess.doPrivilegedVoid(threadContext::markAsSystemContext); - - List safeTasks = tasks.entrySet() - .stream() - .map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue(), supplier), executor)) - .collect(Collectors.toList()); - taskBatcher.submitTasks(safeTasks, config.timeout()); - } catch (OpenSearchRejectedExecutionException e) { - // ignore cases where we are shutting down..., there is really nothing interesting - // to be done here... 
- if (!lifecycle.stoppedOrClosed()) { - throw e; - } - } - } - - public ClusterStateStats getClusterStateStats() { - return stateStats; - } - -} diff --git a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java index 3a7988bcd2bda..a1c914c69ce21 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java @@ -49,8 +49,8 @@ import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterManagerService; +import org.opensearch.cluster.service.ClusterManagerServiceTests; import org.opensearch.cluster.service.FakeThreadPoolClusterManagerService; -import org.opensearch.cluster.service.MasterServiceTests; import org.opensearch.common.Randomness; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -549,9 +549,11 @@ public void testJoinUpdateVotingConfigExclusion() throws Exception { ) ); - assertTrue(MasterServiceTests.discoveryState(clusterManagerService).getVotingConfigExclusions().stream().anyMatch(exclusion -> { - return "knownNodeName".equals(exclusion.getNodeName()) && "newNodeId".equals(exclusion.getNodeId()); - })); + assertTrue( + ClusterManagerServiceTests.discoveryState(clusterManagerService).getVotingConfigExclusions().stream().anyMatch(exclusion -> { + return "knownNodeName".equals(exclusion.getNodeName()) && "newNodeId".equals(exclusion.getNodeId()); + }) + ); } private ClusterState buildStateWithVotingConfigExclusion( @@ -777,7 +779,7 @@ public void testConcurrentJoining() { throw new RuntimeException(e); } - assertTrue(MasterServiceTests.discoveryState(clusterManagerService).nodes().isLocalNodeElectedMaster()); + 
assertTrue(ClusterManagerServiceTests.discoveryState(clusterManagerService).nodes().isLocalNodeElectedMaster()); for (DiscoveryNode successfulNode : successfulNodes) { assertTrue(successfulNode + " joined cluster", clusterStateHasNode(successfulNode)); assertFalse(successfulNode + " voted for cluster-manager", coordinator.missingJoinVoteFrom(successfulNode)); @@ -861,11 +863,11 @@ public void testJoinFailsWhenDecommissioned() { } private boolean isLocalNodeElectedMaster() { - return MasterServiceTests.discoveryState(clusterManagerService).nodes().isLocalNodeElectedMaster(); + return ClusterManagerServiceTests.discoveryState(clusterManagerService).nodes().isLocalNodeElectedMaster(); } private boolean clusterStateHasNode(DiscoveryNode node) { - return node.equals(MasterServiceTests.discoveryState(clusterManagerService).nodes().get(node.getId())); + return node.equals(ClusterManagerServiceTests.discoveryState(clusterManagerService).nodes().get(node.getId())); } private static ClusterState initialStateWithDecommissionedAttribute( diff --git a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerServiceTests.java similarity index 92% rename from server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java rename to server/src/test/java/org/opensearch/cluster/service/ClusterManagerServiceTests.java index bb9e34d93431f..d1b06d24cc797 100644 --- a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerServiceTests.java @@ -108,14 +108,14 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class MasterServiceTests extends OpenSearchTestCase { +public class ClusterManagerServiceTests extends OpenSearchTestCase { private static ThreadPool threadPool; private static long timeDiffInMillis; @BeforeClass public static void createThreadPool() { 
- threadPool = new TestThreadPool(MasterServiceTests.class.getName()) { + threadPool = new TestThreadPool(ClusterManagerServiceTests.class.getName()) { @Override public long preciseRelativeTimeInNanos() { return timeDiffInMillis * TimeValue.NSEC_PER_MSEC; @@ -149,7 +149,7 @@ private ClusterManagerService createClusterManagerService( if (metricsRegistryOptional != null && metricsRegistryOptional.isPresent()) { clusterManagerService = new ClusterManagerService( Settings.builder() - .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName()) + .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), ClusterManagerServiceTests.class.getSimpleName()) .put(Node.NODE_NAME_SETTING.getKey(), "test_node") .build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), @@ -159,7 +159,7 @@ private ClusterManagerService createClusterManagerService( } else { clusterManagerService = new ClusterManagerService( Settings.builder() - .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName()) + .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), ClusterManagerServiceTests.class.getSimpleName()) .put(Node.NODE_NAME_SETTING.getKey(), "test_node") .build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), @@ -167,7 +167,7 @@ private ClusterManagerService createClusterManagerService( ); } - final ClusterState initialClusterState = ClusterState.builder(new ClusterName(MasterServiceTests.class.getSimpleName())) + final ClusterState initialClusterState = ClusterState.builder(new ClusterName(ClusterManagerServiceTests.class.getSimpleName())) .nodes( DiscoveryNodes.builder() .add(localNode) @@ -377,11 +377,11 @@ public void onFailure(String source, Exception e) {} @TestLogging(value = "org.opensearch.cluster.service:TRACE", reason = "to ensure that we log cluster state events on TRACE level") public void testClusterStateUpdateLoggingWithTraceEnabled() throws Exception { - try 
(MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(MasterService.class))) { + try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(ClusterManagerService.class))) { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test1 start", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.TRACE, "executing cluster state update for [test1]" ) @@ -389,7 +389,7 @@ public void testClusterStateUpdateLoggingWithTraceEnabled() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test1 computation", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.DEBUG, "took [1s] to compute cluster state update for [test1]" ) @@ -397,7 +397,7 @@ public void testClusterStateUpdateLoggingWithTraceEnabled() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test1 notification", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.DEBUG, "took [0s] to notify listeners on unchanged cluster state for [test1]" ) @@ -406,7 +406,7 @@ public void testClusterStateUpdateLoggingWithTraceEnabled() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test2 start", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.TRACE, "executing cluster state update for [test2]" ) @@ -414,7 +414,7 @@ public void testClusterStateUpdateLoggingWithTraceEnabled() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test2 failure", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.TRACE, "failed to execute cluster state update (on version: [*], uuid: [*]) for [test2]*" ) @@ -422,7 +422,7 @@ public void testClusterStateUpdateLoggingWithTraceEnabled() throws 
Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test2 computation", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.DEBUG, "took [2s] to compute cluster state update for [test2]" ) @@ -430,7 +430,7 @@ public void testClusterStateUpdateLoggingWithTraceEnabled() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test2 notification", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.DEBUG, "took [0s] to notify listeners on unchanged cluster state for [test2]" ) @@ -439,7 +439,7 @@ public void testClusterStateUpdateLoggingWithTraceEnabled() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test3 start", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.TRACE, "executing cluster state update for [test3]" ) @@ -447,7 +447,7 @@ public void testClusterStateUpdateLoggingWithTraceEnabled() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test3 computation", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.DEBUG, "took [3s] to compute cluster state update for [test3]" ) @@ -455,7 +455,7 @@ public void testClusterStateUpdateLoggingWithTraceEnabled() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test3 notification", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.DEBUG, "took [4s] to notify listeners on successful publication of cluster state (version: *, uuid: *) for [test3]" ) @@ -464,7 +464,7 @@ public void testClusterStateUpdateLoggingWithTraceEnabled() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test4", - MasterService.class.getCanonicalName(), + 
ClusterManagerService.class.getCanonicalName(), Level.TRACE, "executing cluster state update for [test4]" ) @@ -542,96 +542,96 @@ public void onFailure(String source, Exception e) { @TestLogging(value = "org.opensearch.cluster.service:DEBUG", reason = "to ensure that we log cluster state events on DEBUG level") public void testClusterStateUpdateLoggingWithDebugEnabled() throws Exception { - try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(MasterService.class))) { + try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(ClusterManagerService.class))) { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test1 start", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.DEBUG, - "executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test1]" + "executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.ClusterManagerServiceTests, count:1 and sample tasks: test1]" ) ); mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test1 computation", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.DEBUG, - "took [1s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test1]" + "took [1s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.ClusterManagerServiceTests, count:1 and sample tasks: test1]" ) ); mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test1 notification", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.DEBUG, - "took [0s] to notify listeners on unchanged cluster state for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, 
count:1 and sample tasks: test1]" + "took [0s] to notify listeners on unchanged cluster state for [Tasks batched with key: org.opensearch.cluster.service.ClusterManagerServiceTests, count:1 and sample tasks: test1]" ) ); mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test2 start", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.DEBUG, - "executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test2]" + "executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.ClusterManagerServiceTests, count:1 and sample tasks: test2]" ) ); mockAppender.addExpectation( new MockLogAppender.UnseenEventExpectation( "test2 failure", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.DEBUG, - "failed to execute cluster state update (on version: [*], uuid: [*]) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test2]*" + "failed to execute cluster state update (on version: [*], uuid: [*]) for [Tasks batched with key: org.opensearch.cluster.service.ClusterManagerServiceTests, count:1 and sample tasks: test2]*" ) ); mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test2 computation", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.DEBUG, - "took [2s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test2]" + "took [2s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.ClusterManagerServiceTests, count:1 and sample tasks: test2]" ) ); mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test2 notification", - MasterService.class.getCanonicalName(), + 
ClusterManagerService.class.getCanonicalName(), Level.DEBUG, - "took [0s] to notify listeners on unchanged cluster state for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test2]" + "took [0s] to notify listeners on unchanged cluster state for [Tasks batched with key: org.opensearch.cluster.service.ClusterManagerServiceTests, count:1 and sample tasks: test2]" ) ); mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test3 start", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.DEBUG, - "executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test3]" + "executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.ClusterManagerServiceTests, count:1 and sample tasks: test3]" ) ); mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test3 computation", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.DEBUG, - "took [3s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test3]" + "took [3s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.ClusterManagerServiceTests, count:1 and sample tasks: test3]" ) ); mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test3 notification", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.DEBUG, - "took [4s] to notify listeners on successful publication of cluster state (version: *, uuid: *) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test3]" + "took [4s] to notify listeners on successful publication of cluster state (version: *, uuid: *) for [Tasks batched with key: 
org.opensearch.cluster.service.ClusterManagerServiceTests, count:1 and sample tasks: test3]" ) ); mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test4", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.DEBUG, - "executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test4]" + "executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.ClusterManagerServiceTests, count:1 and sample tasks: test4]" ) ); @@ -909,13 +909,13 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } public void testThrottlingForTaskSubmission() throws InterruptedException { - MasterService masterService = createClusterManagerService(true); + ClusterManagerService clusterManagerService = createClusterManagerService(true); int throttlingLimit = randomIntBetween(1, 10); int taskId = 1; final CyclicBarrier barrier = new CyclicBarrier(2); final CountDownLatch latch = new CountDownLatch(1); final String taskName = "test"; - ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = masterService.registerClusterManagerTask(taskName, true); + ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = clusterManagerService.registerClusterManagerTask(taskName, true); class Task { private final int id; @@ -945,7 +945,7 @@ public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { } } - masterService.clusterManagerTaskThrottler.updateLimit(taskName, throttlingLimit); + clusterManagerService.clusterManagerTaskThrottler.updateLimit(taskName, throttlingLimit); final ClusterStateTaskListener listener = new ClusterStateTaskListener() { @Override @@ -958,7 +958,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS TaskExecutor executor = new TaskExecutor(); // submit one task which will be execution, post that will submit throttlingLimit tasks. 
try { - masterService.submitStateUpdateTask( + clusterManagerService.submitStateUpdateTask( taskName, new Task(taskId++), ClusterStateTaskConfig.build(randomFrom(Priority.values())), @@ -973,7 +973,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS for (int i = 0; i < throttlingLimit; i++) { try { - masterService.submitStateUpdateTask( + clusterManagerService.submitStateUpdateTask( taskName, new Task(taskId++), ClusterStateTaskConfig.build(randomFrom(Priority.values())), @@ -988,7 +988,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS // we have one task in execution and tasks in queue so next task should throttled. final AtomicReference assertionRef = new AtomicReference<>(); try { - masterService.submitStateUpdateTask( + clusterManagerService.submitStateUpdateTask( taskName, new Task(taskId++), ClusterStateTaskConfig.build(randomFrom(Priority.values())), @@ -999,11 +999,11 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS assertionRef.set(e); } assertNotNull(assertionRef.get()); - masterService.close(); + clusterManagerService.close(); } public void testThrottlingForMultipleTaskTypes() throws InterruptedException { - MasterService masterService = createClusterManagerService(true); + ClusterManagerService clusterManagerService = createClusterManagerService(true); int throttlingLimitForTask1 = randomIntBetween(1, 5); int throttlingLimitForTask2 = randomIntBetween(1, 5); int throttlingLimitForTask3 = randomIntBetween(1, 5); @@ -1014,9 +1014,9 @@ public void testThrottlingForMultipleTaskTypes() throws InterruptedException { String task2 = "Task2"; String task3 = "Task3"; - ClusterManagerTaskThrottler.ThrottlingKey throttlingKey1 = masterService.registerClusterManagerTask(task1, true); - ClusterManagerTaskThrottler.ThrottlingKey throttlingKey2 = masterService.registerClusterManagerTask(task2, true); - ClusterManagerTaskThrottler.ThrottlingKey throttlingKey3 = 
masterService.registerClusterManagerTask(task3, true); + ClusterManagerTaskThrottler.ThrottlingKey throttlingKey1 = clusterManagerService.registerClusterManagerTask(task1, true); + ClusterManagerTaskThrottler.ThrottlingKey throttlingKey2 = clusterManagerService.registerClusterManagerTask(task2, true); + ClusterManagerTaskThrottler.ThrottlingKey throttlingKey3 = clusterManagerService.registerClusterManagerTask(task3, true); class Task {} class Task1 extends Task {} class Task2 extends Task {} @@ -1071,8 +1071,8 @@ public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey( } // configuring limits for Task1 and Task3. All task submission of Task2 should pass. - masterService.clusterManagerTaskThrottler.updateLimit(task1, throttlingLimitForTask1); - masterService.clusterManagerTaskThrottler.updateLimit(task3, throttlingLimitForTask3); + clusterManagerService.clusterManagerTaskThrottler.updateLimit(task1, throttlingLimitForTask1); + clusterManagerService.clusterManagerTaskThrottler.updateLimit(task3, throttlingLimitForTask3); final CountDownLatch latch = new CountDownLatch(numberOfTask1 + numberOfTask2 + numberOfTask3); AtomicInteger throttledTask1 = new AtomicInteger(); AtomicInteger throttledTask2 = new AtomicInteger(); @@ -1112,7 +1112,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS @Override public void run() { try { - masterService.submitStateUpdateTask( + clusterManagerService.submitStateUpdateTask( task1, new Task1(), ClusterStateTaskConfig.build(randomFrom(Priority.values())), @@ -1132,7 +1132,7 @@ public void run() { @Override public void run() { try { - masterService.submitStateUpdateTask( + clusterManagerService.submitStateUpdateTask( task2, new Task2(), ClusterStateTaskConfig.build(randomFrom(Priority.values())), @@ -1151,7 +1151,7 @@ public void run() { @Override public void run() { try { - masterService.submitStateUpdateTask( + clusterManagerService.submitStateUpdateTask( task3, new Task3(), 
ClusterStateTaskConfig.build(randomFrom(Priority.values()), new TimeValue(0)), @@ -1175,7 +1175,7 @@ public void run() { assertEquals(numberOfTask2, succeededTask2.get()); assertEquals(0, throttledTask2.get()); assertEquals(numberOfTask3, throttledTask3.get() + timedOutTask3.get() + succeededTask3.get()); - masterService.close(); + clusterManagerService.close(); } public void testBlockingCallInClusterStateTaskListenerFails() throws InterruptedException { @@ -1224,11 +1224,11 @@ public void onFailure(String source, Exception e) {} @TestLogging(value = "org.opensearch.cluster.service:WARN", reason = "to ensure that we log cluster state events on WARN level") public void testLongClusterStateUpdateLogging() throws Exception { - try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(MasterService.class))) { + try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(ClusterManagerService.class))) { mockAppender.addExpectation( new MockLogAppender.UnseenEventExpectation( "test1 shouldn't log because it was fast enough", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.WARN, "*took*test1*" ) @@ -1236,31 +1236,31 @@ public void testLongClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test2", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.WARN, - "*took [*], which is over [10s], to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test2]" + "*took [*], which is over [10s], to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.ClusterManagerServiceTests, count:1 and sample tasks: test2]" ) ); mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test3", - MasterService.class.getCanonicalName(), + 
ClusterManagerService.class.getCanonicalName(), Level.WARN, - "*took [*], which is over [10s], to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test3]" + "*took [*], which is over [10s], to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.ClusterManagerServiceTests, count:1 and sample tasks: test3]" ) ); mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test4", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.WARN, - "*took [*], which is over [10s], to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test4]" + "*took [*], which is over [10s], to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.ClusterManagerServiceTests, count:1 and sample tasks: test4]" ) ); mockAppender.addExpectation( new MockLogAppender.UnseenEventExpectation( "test5 should not log despite publishing slowly", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.WARN, "*took*test5*" ) @@ -1269,7 +1269,7 @@ public void testLongClusterStateUpdateLogging() throws Exception { try ( ClusterManagerService clusterManagerService = new ClusterManagerService( Settings.builder() - .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName()) + .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), ClusterManagerServiceTests.class.getSimpleName()) .put(Node.NODE_NAME_SETTING.getKey(), "test_node") .build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), @@ -1285,7 +1285,9 @@ public void testLongClusterStateUpdateLogging() throws Exception { emptySet(), Version.CURRENT ); - final ClusterState initialClusterState = ClusterState.builder(new 
ClusterName(MasterServiceTests.class.getSimpleName())) + final ClusterState initialClusterState = ClusterState.builder( + new ClusterName(ClusterManagerServiceTests.class.getSimpleName()) + ) .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId())) .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK) .build(); @@ -1426,20 +1428,20 @@ public void onFailure(String source, Exception e) { @TestLogging(value = "org.opensearch.cluster.service:WARN", reason = "to ensure that we log failed cluster state events on WARN level") public void testLongClusterStateUpdateLoggingForFailedPublication() throws Exception { - try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(MasterService.class))) { + try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(ClusterManagerService.class))) { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test1 should log due to slow and failing publication", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.WARN, - "took [*] and then failed to publish updated cluster state (version: *, uuid: *) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test1]:*" + "took [*] and then failed to publish updated cluster state (version: *, uuid: *) for [Tasks batched with key: org.opensearch.cluster.service.ClusterManagerServiceTests, count:1 and sample tasks: test1]:*" ) ); try ( ClusterManagerService clusterManagerService = new ClusterManagerService( Settings.builder() - .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName()) + .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), ClusterManagerServiceTests.class.getSimpleName()) .put(Node.NODE_NAME_SETTING.getKey(), "test_node") .build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), @@ -1455,7 +1457,9 @@ public 
void testLongClusterStateUpdateLoggingForFailedPublication() throws Excep emptySet(), Version.CURRENT ); - final ClusterState initialClusterState = ClusterState.builder(new ClusterName(MasterServiceTests.class.getSimpleName())) + final ClusterState initialClusterState = ClusterState.builder( + new ClusterName(ClusterManagerServiceTests.class.getSimpleName()) + ) .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId())) .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK) .build(); @@ -1529,7 +1533,7 @@ public void testAcking() throws InterruptedException { try ( ClusterManagerService clusterManagerService = new ClusterManagerService( Settings.builder() - .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName()) + .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), ClusterManagerServiceTests.class.getSimpleName()) .put(Node.NODE_NAME_SETTING.getKey(), "test_node") .build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), @@ -1538,7 +1542,7 @@ public void testAcking() throws InterruptedException { ) ) { - final ClusterState initialClusterState = ClusterState.builder(new ClusterName(MasterServiceTests.class.getSimpleName())) + final ClusterState initialClusterState = ClusterState.builder(new ClusterName(ClusterManagerServiceTests.class.getSimpleName())) .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3).localNodeId(node1.getId()).masterNodeId(node1.getId())) .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK) .build(); @@ -1658,16 +1662,13 @@ public void onAckTimeout() { verify(clusterStatePublishHistogram, times(1)).record(anyDouble()); } - public void testDeprecatedMasterServiceUpdateTaskThreadName() { - Thread.currentThread().setName(MasterService.MASTER_UPDATE_THREAD_NAME); - assertThat(MasterService.assertClusterManagerUpdateThread(), is(Boolean.TRUE)); - assertThrows(AssertionError.class, () -> MasterService.assertNotClusterManagerUpdateThread("test")); - 
Thread.currentThread().setName(MasterService.CLUSTER_MANAGER_UPDATE_THREAD_NAME); - assertThat(MasterService.assertClusterManagerUpdateThread(), is(Boolean.TRUE)); - assertThrows(AssertionError.class, () -> MasterService.assertNotClusterManagerUpdateThread("test")); + public void testUpdateTaskThreadName() { + Thread.currentThread().setName(ClusterManagerService.CLUSTER_MANAGER_UPDATE_THREAD_NAME); + assertThat(ClusterManagerService.assertClusterManagerUpdateThread(), is(Boolean.TRUE)); + assertThrows(AssertionError.class, () -> ClusterManagerService.assertNotClusterManagerUpdateThread("test")); Thread.currentThread().setName("test not cluster manager update thread"); - assertThat(MasterService.assertNotClusterManagerUpdateThread("test"), is(Boolean.TRUE)); - assertThrows(AssertionError.class, () -> MasterService.assertClusterManagerUpdateThread()); + assertThat(ClusterManagerService.assertNotClusterManagerUpdateThread("test"), is(Boolean.TRUE)); + assertThrows(AssertionError.class, () -> ClusterManagerService.assertClusterManagerUpdateThread()); } @Timeout(millis = 5_000) diff --git a/server/src/test/java/org/opensearch/cluster/service/ClusterServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/ClusterServiceTests.java index 4d88683826af7..b371bbdf1dd2f 100644 --- a/server/src/test/java/org/opensearch/cluster/service/ClusterServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/ClusterServiceTests.java @@ -32,7 +32,7 @@ public void testDeprecatedGetMasterServiceBWC() { threadPool ) ) { - MasterService masterService = clusterService.getMasterService(); + ClusterManagerService masterService = clusterService.getMasterService(); ClusterManagerService clusterManagerService = clusterService.getClusterManagerService(); assertThat(masterService, equalTo(clusterManagerService)); } diff --git a/server/src/test/java/org/opensearch/cluster/service/MasterServiceRenamedSettingTests.java 
b/server/src/test/java/org/opensearch/cluster/service/MasterServiceRenamedSettingTests.java index acf089dc43b56..020dafa1559ce 100644 --- a/server/src/test/java/org/opensearch/cluster/service/MasterServiceRenamedSettingTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/MasterServiceRenamedSettingTests.java @@ -33,8 +33,8 @@ public void testClusterManagerServiceSettingsExist() { "Both 'cluster.service.slow_cluster_manager_task_logging_threshold' and its predecessor should be supported built-in settings", settings.containsAll( Arrays.asList( - MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, - MasterService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING + ClusterManagerService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, + ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING ) ) ); @@ -45,8 +45,8 @@ public void testClusterManagerServiceSettingsExist() { */ public void testSettingFallback() { assertEquals( - MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY), - MasterService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY) + ClusterManagerService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY), + ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY) ); } @@ -57,11 +57,11 @@ public void testSettingGetValue() { Settings settings = Settings.builder().put("cluster.service.slow_cluster_manager_task_logging_threshold", "9s").build(); assertEquals( TimeValue.timeValueSeconds(9), - MasterService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings) + ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings) ); assertEquals( - MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.getDefault(Settings.EMPTY), - MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings) + 
ClusterManagerService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.getDefault(Settings.EMPTY), + ClusterManagerService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings) ); } @@ -73,10 +73,10 @@ public void testSettingGetValueWithFallback() { Settings settings = Settings.builder().put("cluster.service.slow_master_task_logging_threshold", "8s").build(); assertEquals( TimeValue.timeValueSeconds(8), - MasterService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings) + ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings) ); - assertSettingDeprecationsAndWarnings(new Setting[] { MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING }); + assertSettingDeprecationsAndWarnings(new Setting[] { ClusterManagerService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING }); } /** @@ -89,11 +89,11 @@ public void testSettingGetValueWhenBothAreConfigured() { .build(); assertEquals( TimeValue.timeValueSeconds(9), - MasterService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings) + ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings) ); - assertEquals(TimeValue.timeValueSeconds(8), MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings)); - assertSettingDeprecationsAndWarnings(new Setting[] { MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING }); + assertEquals(TimeValue.timeValueSeconds(8), ClusterManagerService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings)); + assertSettingDeprecationsAndWarnings(new Setting[] { ClusterManagerService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING }); } } diff --git a/test/framework/src/main/java/org/opensearch/test/ClusterServiceUtils.java b/test/framework/src/main/java/org/opensearch/test/ClusterServiceUtils.java index f0c0e9bc2d589..147f186c74d25 100644 --- 
a/test/framework/src/main/java/org/opensearch/test/ClusterServiceUtils.java +++ b/test/framework/src/main/java/org/opensearch/test/ClusterServiceUtils.java @@ -49,7 +49,6 @@ import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.cluster.service.ClusterManagerService; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.cluster.service.MasterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.node.Node; @@ -92,13 +91,13 @@ public static ClusterManagerService createClusterManagerService(ThreadPool threa /** @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #createClusterManagerService(ThreadPool, ClusterState)} */ @Deprecated - public static MasterService createMasterService(ThreadPool threadPool, ClusterState initialClusterState) { + public static ClusterManagerService createMasterService(ThreadPool threadPool, ClusterState initialClusterState) { return createClusterManagerService(threadPool, initialClusterState); } /** @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #createClusterManagerService(ThreadPool, DiscoveryNode)} */ @Deprecated - public static MasterService createMasterService(ThreadPool threadPool, DiscoveryNode localNode) { + public static ClusterManagerService createMasterService(ThreadPool threadPool, DiscoveryNode localNode) { return createClusterManagerService(threadPool, localNode); }