diff --git a/core/src/main/java/kafka/autobalancer/AutoBalancerListener.java b/core/src/main/java/kafka/autobalancer/AutoBalancerListener.java index 0dd16bfbb5..75ff94ca98 100644 --- a/core/src/main/java/kafka/autobalancer/AutoBalancerListener.java +++ b/core/src/main/java/kafka/autobalancer/AutoBalancerListener.java @@ -12,9 +12,9 @@ package kafka.autobalancer; import kafka.autobalancer.common.AutoBalancerConstants; -import kafka.autobalancer.detector.AnomalyDetector; import kafka.autobalancer.listeners.BrokerStatusListener; import kafka.autobalancer.listeners.ClusterStatusListenerRegistry; +import kafka.autobalancer.listeners.LeaderChangeListener; import kafka.autobalancer.listeners.TopicPartitionStatusListener; import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord; @@ -46,17 +46,12 @@ public class AutoBalancerListener implements RaftClient.Listener messageBatch) { @@ -138,15 +133,20 @@ public void handleLoadSnapshot(SnapshotReader reader) { @Override public void handleLeaderChange(LeaderAndEpoch leader) { queue.append(() -> { - if (leader.leaderId().isEmpty()) { - return; - } - boolean isLeader = leader.isLeader(nodeId); - this.anomalyDetector.onLeaderChanged(isLeader); - this.loadRetriever.onLeaderChanged(isLeader); + handleLeaderChange0(leader); }); } + void handleLeaderChange0(LeaderAndEpoch leader) { + if (leader.leaderId().isEmpty()) { + return; + } + boolean isLeader = leader.isLeader(nodeId); + for (LeaderChangeListener listener : this.registry.leaderChangeListeners()) { + listener.onLeaderChanged(isLeader); + } + } + @Override public void beginShutdown() { this.queue.beginShutdown("AutoBalancerListenerShutdown"); diff --git a/core/src/main/java/kafka/autobalancer/AutoBalancerManager.java b/core/src/main/java/kafka/autobalancer/AutoBalancerManager.java index 93c34b8da2..3127a80387 100644 --- a/core/src/main/java/kafka/autobalancer/AutoBalancerManager.java +++ b/core/src/main/java/kafka/autobalancer/AutoBalancerManager.java @@ -12,17 +12,18 @@ package kafka.autobalancer; import kafka.autobalancer.config.AutoBalancerControllerConfig; -import kafka.autobalancer.detector.AnomalyDetector; -import kafka.autobalancer.detector.AnomalyDetectorBuilder; +import kafka.autobalancer.detector.AbstractAnomalyDetector; +import kafka.autobalancer.detector.AnomalyDetectorImpl; import kafka.autobalancer.executor.ActionExecutorService; import kafka.autobalancer.executor.ControllerActionExecutorService; -import kafka.autobalancer.goals.Goal; import kafka.autobalancer.listeners.BrokerStatusListener; import kafka.autobalancer.listeners.ClusterStatusListenerRegistry; +import kafka.autobalancer.listeners.LeaderChangeListener; import kafka.autobalancer.listeners.TopicPartitionStatusListener; import kafka.autobalancer.model.RecordClusterModel; import kafka.autobalancer.services.AutoBalancerService; +import org.apache.kafka.common.Reconfigurable; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.utils.ConfigUtils; import org.apache.kafka.common.utils.Time; @@ -32,18 +33,20 @@ import com.automq.stream.utils.LogContext; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; public class AutoBalancerManager extends AutoBalancerService { protected final Time time; protected final Map configs; protected final QuorumController quorumController; protected final KafkaRaftClient raftClient; + protected final List reconfigurables = new ArrayList<>(); protected LoadRetriever loadRetriever; - protected AnomalyDetector anomalyDetector; + protected AbstractAnomalyDetector anomalyDetector; protected ActionExecutorService actionExecutorService; protected volatile boolean enabled; @@ -62,33 +65,27 @@ public AutoBalancerManager(Time time, Map configs, QuorumController quorum protected void init() { AutoBalancerControllerConfig config = new AutoBalancerControllerConfig(configs, false); this.enabled = config.getBoolean(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ENABLE); - RecordClusterModel clusterModel = new RecordClusterModel(new LogContext(String.format("[ClusterModel id=%d] ", quorumController.nodeId()))); + int nodeId = quorumController.nodeId(); + RecordClusterModel clusterModel = new RecordClusterModel(new LogContext(String.format("[ClusterModel id=%d] ", nodeId))); this.loadRetriever = new LoadRetriever(config, quorumController, clusterModel, - new LogContext(String.format("[LoadRetriever id=%d] ", quorumController.nodeId()))); + new LogContext(String.format("[LoadRetriever id=%d] ", nodeId))); this.actionExecutorService = new ControllerActionExecutorService(quorumController, - new LogContext(String.format("[ExecutionManager id=%d] ", quorumController.nodeId()))); + new LogContext(String.format("[ExecutionManager id=%d] ", nodeId))); this.actionExecutorService.start(); - this.anomalyDetector = new AnomalyDetectorBuilder() - .logContext(new LogContext(String.format("[AnomalyDetector id=%d] ", quorumController.nodeId()))) - .detectIntervalMs(config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ANOMALY_DETECT_INTERVAL_MS)) - .maxTolerateMetricsDelayMs(config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ACCEPTED_METRICS_DELAY_MS)) - .executionConcurrency(config.getInt(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_CONCURRENCY)) - .executionIntervalMs(config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS)) - .clusterModel(clusterModel) - .executor(this.actionExecutorService) - .addGoals(config.getConfiguredInstances(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_GOALS, Goal.class)) - .excludedBrokers(config.getList(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXCLUDE_BROKER_IDS) - .stream().map(Integer::parseInt).collect(Collectors.toSet())) - .excludedTopics(config.getList(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXCLUDE_TOPICS)) - .build(); + this.anomalyDetector = new AnomalyDetectorImpl(config.originals(), + new LogContext(String.format("[AnomalyDetector id=%d] ", nodeId)), clusterModel, actionExecutorService); + + this.reconfigurables.add(anomalyDetector); ClusterStatusListenerRegistry registry = new ClusterStatusListenerRegistry(); registry.register((BrokerStatusListener) clusterModel); registry.register((TopicPartitionStatusListener) clusterModel); registry.register((BrokerStatusListener) this.actionExecutorService); - registry.register(this.loadRetriever); - raftClient.register(new AutoBalancerListener(quorumController.nodeId(), time, registry, this.loadRetriever, this.anomalyDetector)); + registry.register((LeaderChangeListener) this.anomalyDetector); + registry.register((BrokerStatusListener) this.loadRetriever); + registry.register((LeaderChangeListener) this.loadRetriever); + raftClient.register(new AutoBalancerListener(nodeId, time, registry)); } @Override @@ -126,7 +123,9 @@ public void validateReconfiguration(Map configs) throws ConfigExcepti if (objectConfigs.containsKey(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ENABLE)) { ConfigUtils.getBoolean(objectConfigs, AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ENABLE); } - this.anomalyDetector.validateReconfiguration(objectConfigs); + for (Reconfigurable reconfigurable : reconfigurables) { + reconfigurable.validateReconfiguration(objectConfigs); + } } catch (ConfigException e) { throw e; } catch (Exception e) { @@ -149,7 +148,9 @@ public void reconfigure(Map configs) { logger.info("AutoBalancerManager paused."); } } - this.anomalyDetector.reconfigure(objectConfigs); + for (Reconfigurable reconfigurable : reconfigurables) { + reconfigurable.reconfigure(objectConfigs); + } } @Override diff --git a/core/src/main/java/kafka/autobalancer/LoadRetriever.java b/core/src/main/java/kafka/autobalancer/LoadRetriever.java index 19db7a7137..7db48ad765 100644 --- a/core/src/main/java/kafka/autobalancer/LoadRetriever.java +++ b/core/src/main/java/kafka/autobalancer/LoadRetriever.java @@ -19,6 +19,7 @@ import kafka.autobalancer.config.StaticAutoBalancerConfig; import kafka.autobalancer.config.StaticAutoBalancerConfigUtils; import kafka.autobalancer.listeners.BrokerStatusListener; +import kafka.autobalancer.listeners.LeaderChangeListener; import kafka.autobalancer.metricsreporter.metric.AutoBalancerMetrics; import kafka.autobalancer.metricsreporter.metric.BrokerMetrics; import kafka.autobalancer.metricsreporter.metric.MetricSerde; @@ -70,7 +71,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -public class LoadRetriever extends AbstractResumableService implements BrokerStatusListener { +public class LoadRetriever extends AbstractResumableService implements BrokerStatusListener, LeaderChangeListener { public static final Random RANDOM = new Random(); private final Map bootstrapServerMap; private volatile int metricReporterTopicPartition; @@ -163,7 +164,7 @@ protected Properties buildConsumerProps(String bootstrapServer) { return consumerProps; } - static class BrokerEndpoints { + public static class BrokerEndpoints { private final int brokerId; private Set endpoints = new HashSet<>(); @@ -491,6 +492,11 @@ private TopicAction refreshAssignment() { return TopicAction.NONE; } + public boolean isLeader() { + return isLeader; + } + + @Override public void onLeaderChanged(boolean isLeader) { this.leaderEpochInitialized = true; this.isLeader = isLeader; diff --git a/core/src/main/java/kafka/autobalancer/detector/AbstractAnomalyDetector.java b/core/src/main/java/kafka/autobalancer/detector/AbstractAnomalyDetector.java new file mode 100644 index 0000000000..a8ab8797ac --- /dev/null +++ b/core/src/main/java/kafka/autobalancer/detector/AbstractAnomalyDetector.java @@ -0,0 +1,27 @@ +/* + * Copyright 2024, AutoMQ HK Limited. + * + * The use of this file is governed by the Business Source License, + * as detailed in the file "/LICENSE.S3Stream" included in this repository. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +package kafka.autobalancer.detector; + +import kafka.autobalancer.services.AbstractResumableService; + +import org.apache.kafka.common.Reconfigurable; + +import com.automq.stream.utils.LogContext; + +public abstract class AbstractAnomalyDetector extends AbstractResumableService implements Reconfigurable { + + public AbstractAnomalyDetector(LogContext logContext) { + super(logContext); + } + + +} diff --git a/core/src/main/java/kafka/autobalancer/detector/AnomalyDetectorBuilder.java b/core/src/main/java/kafka/autobalancer/detector/AnomalyDetectorBuilder.java deleted file mode 100644 index 5db24f0b95..0000000000 --- a/core/src/main/java/kafka/autobalancer/detector/AnomalyDetectorBuilder.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Copyright 2024, AutoMQ HK Limited. - * - * The use of this file is governed by the Business Source License, - * as detailed in the file "/LICENSE.S3Stream" included in this repository. - * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0 - */ - -package kafka.autobalancer.detector; - -import kafka.autobalancer.executor.ActionExecutorService; -import kafka.autobalancer.goals.Goal; -import kafka.autobalancer.model.ClusterModel; - -import com.automq.stream.utils.LogContext; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -public class AnomalyDetectorBuilder { - private final List goalsByPriority = new ArrayList<>(); - private final Set excludedBrokers = new HashSet<>(); - private final Set excludedTopics = new HashSet<>(); - private LogContext logContext = null; - private ClusterModel clusterModel = null; - private ActionExecutorService executor = null; - private long detectIntervalMs = 60000; - private long maxTolerateMetricsDelayMs = 30000; - private int executionConcurrency = 50; - private long executionIntervalMs = 5000; - - public AnomalyDetectorBuilder() { - - } - - public AnomalyDetectorBuilder logContext(LogContext logContext) { - this.logContext = logContext; - return this; - } - - public AnomalyDetectorBuilder addGoal(Goal goal) { - this.goalsByPriority.add(goal); - return this; - } - - public AnomalyDetectorBuilder addGoals(List goals) { - this.goalsByPriority.addAll(goals); - return this; - } - - public AnomalyDetectorBuilder excludedBroker(Integer excludedBroker) { - this.excludedBrokers.add(excludedBroker); - return this; - } - - public AnomalyDetectorBuilder excludedBrokers(Collection excludedBrokers) { - this.excludedBrokers.addAll(excludedBrokers); - return this; - } - - public AnomalyDetectorBuilder excludedTopic(String excludedTopic) { - this.excludedTopics.add(excludedTopic); - return this; - } - - public AnomalyDetectorBuilder excludedTopics(Collection excludedTopics) { - this.excludedTopics.addAll(excludedTopics); - return this; - } - - public AnomalyDetectorBuilder clusterModel(ClusterModel clusterModel) { - this.clusterModel = clusterModel; - return this; - } - - public AnomalyDetectorBuilder executor(ActionExecutorService executor) { - this.executor = executor; - return this; - } - - public AnomalyDetectorBuilder detectIntervalMs(long detectIntervalMs) { - this.detectIntervalMs = detectIntervalMs; - return this; - } - - public AnomalyDetectorBuilder maxTolerateMetricsDelayMs(long maxTolerateMetricsDelayMs) { - this.maxTolerateMetricsDelayMs = maxTolerateMetricsDelayMs; - return this; - } - - public AnomalyDetectorBuilder executionConcurrency(int executionConcurrency) { - this.executionConcurrency = executionConcurrency; - return this; - } - - public AnomalyDetectorBuilder executionIntervalMs(long executionIntervalMs) { - this.executionIntervalMs = executionIntervalMs; - return this; - } - - public AnomalyDetector build() { - if (logContext == null) { - logContext = new LogContext("[AnomalyDetector] "); - } - if (clusterModel == null) { - throw new IllegalArgumentException("ClusterModel must be set"); - } - if (executor == null) { - throw new IllegalArgumentException("Executor must be set"); - } - if (goalsByPriority.isEmpty()) { - throw new IllegalArgumentException("At least one goal must be set"); - } - return new AnomalyDetector(logContext, detectIntervalMs, maxTolerateMetricsDelayMs, executionConcurrency, - executionIntervalMs, clusterModel, executor, goalsByPriority, excludedBrokers, excludedTopics); - } -} diff --git a/core/src/main/java/kafka/autobalancer/detector/AnomalyDetector.java b/core/src/main/java/kafka/autobalancer/detector/AnomalyDetectorImpl.java similarity index 89% rename from core/src/main/java/kafka/autobalancer/detector/AnomalyDetector.java rename to core/src/main/java/kafka/autobalancer/detector/AnomalyDetectorImpl.java index 6164b58c7f..27f647973b 100644 --- a/core/src/main/java/kafka/autobalancer/detector/AnomalyDetector.java +++ b/core/src/main/java/kafka/autobalancer/detector/AnomalyDetectorImpl.java @@ -18,11 +18,11 @@ import kafka.autobalancer.executor.ActionExecutorService; import kafka.autobalancer.goals.Goal; import kafka.autobalancer.goals.GoalUtils; +import kafka.autobalancer.listeners.LeaderChangeListener; import kafka.autobalancer.model.BrokerUpdater; import kafka.autobalancer.model.ClusterModel; import kafka.autobalancer.model.ClusterModelSnapshot; import kafka.autobalancer.model.TopicPartitionReplicaUpdater; -import kafka.autobalancer.services.AbstractResumableService; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; @@ -46,40 +46,31 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; -public class AnomalyDetector extends AbstractResumableService { +public class AnomalyDetectorImpl extends AbstractAnomalyDetector implements LeaderChangeListener { private static final double MAX_PARTITION_REASSIGNMENT_RATIO = 0.5; private static final int MAX_REASSIGNMENT_SOURCE_NODE_COUNT = 10; private static final long MAX_REASSIGNMENT_EXECUTION_TIME_MS = 60000; private final ClusterModel clusterModel; private final ScheduledExecutorService executorService; private final ActionExecutorService actionExecutor; - private final Lock configChangeLock; - private volatile List goalsByPriority; - private volatile Set excludedBrokers; - private volatile Set excludedTopics; - private volatile long detectInterval; - private volatile long maxTolerateMetricsDelayMs; - private volatile int executionConcurrency; - private volatile long executionIntervalMs; + private final Lock configChangeLock = new ReentrantLock(); + private List goalsByPriority; + private Set excludedBrokers; + private Set excludedTopics; + private long detectInterval; + private long maxTolerateMetricsDelayMs; + private int executionConcurrency; + private long executionIntervalMs; private volatile boolean isLeader = false; private volatile Map slowBrokers = new HashMap<>(); - protected AnomalyDetector(LogContext logContext, long detectIntervalMs, long maxTolerateMetricsDelayMs, int executionConcurrency, - long executionIntervalMs, ClusterModel clusterModel, ActionExecutorService actionExecutor, - List goals, Set excludedBrokers, Set excludedTopics) { + public AnomalyDetectorImpl(Map configs, LogContext logContext, ClusterModel clusterModel, ActionExecutorService actionExecutor) { super(logContext); - this.configChangeLock = new ReentrantLock(); - this.detectInterval = detectIntervalMs; - this.maxTolerateMetricsDelayMs = maxTolerateMetricsDelayMs; - this.executionConcurrency = executionConcurrency; - this.executionIntervalMs = executionIntervalMs; + this.configure(configs); this.clusterModel = clusterModel; this.actionExecutor = actionExecutor; this.executorService = Executors.newScheduledThreadPool(2, new AutoBalancerThreadFactory("anomaly-detector")); - this.goalsByPriority = goals; Collections.sort(this.goalsByPriority); - this.excludedBrokers = excludedBrokers; - this.excludedTopics = excludedTopics; this.executorService.schedule(this::detect, detectInterval, TimeUnit.MILLISECONDS); this.executorService.scheduleAtFixedRate(() -> { if (isRunning()) { @@ -123,6 +114,11 @@ public Set getExcludedBrokers() { return this.excludedBrokers; } + public boolean isLeader() { + return this.isLeader; + } + + @Override public void onLeaderChanged(boolean isLeader) { this.isLeader = isLeader; } @@ -131,7 +127,32 @@ List goals() { return new ArrayList<>(goalsByPriority); } - public void validateReconfiguration(Map configs) throws ConfigException { + @Override + public void configure(Map rawConfigs) { + AutoBalancerControllerConfig config = new AutoBalancerControllerConfig(rawConfigs, false); + this.maxTolerateMetricsDelayMs = config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ACCEPTED_METRICS_DELAY_MS); + this.detectInterval = config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ANOMALY_DETECT_INTERVAL_MS); + this.executionConcurrency = config.getInt(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_CONCURRENCY); + this.excludedBrokers = config.getList(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXCLUDE_BROKER_IDS) + .stream().map(Integer::parseInt).collect(Collectors.toSet()); + ClusterStats.getInstance().updateExcludedBrokers(this.excludedBrokers); + this.excludedTopics = new HashSet<>(config.getList(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXCLUDE_TOPICS)); + this.executionIntervalMs = config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS); + List goals = config.getConfiguredInstances(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_GOALS, Goal.class); + Collections.sort(goals); + this.goalsByPriority = goals; + } + + @Override + public Set reconfigurableConfigs() { + // NOT USED + return Collections.emptySet(); + } + + @SuppressWarnings("NPathComplexity") + @Override + public void validateReconfiguration(Map rawConfigs) throws ConfigException { + Map configs = new HashMap<>(rawConfigs); try { if (configs.containsKey(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ACCEPTED_METRICS_DELAY_MS)) { long metricsDelay = ConfigUtils.getInteger(configs, AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ACCEPTED_METRICS_DELAY_MS); @@ -187,7 +208,9 @@ private void validateGoalsReconfiguration(Map configs) { } } - public void reconfigure(Map configs) { + @Override + public void reconfigure(Map rawConfigs) { + Map configs = new HashMap<>(rawConfigs); configChangeLock.lock(); try { if (configs.containsKey(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ACCEPTED_METRICS_DELAY_MS)) { @@ -247,6 +270,7 @@ private boolean isRunnable() { return this.running.get() && this.isLeader; } + @SuppressWarnings("NPathComplexity") long detect0() throws Exception { long detectInterval; Set excludedBrokers; diff --git a/core/src/main/java/kafka/autobalancer/goals/Goal.java b/core/src/main/java/kafka/autobalancer/goals/Goal.java index 8f7a941b48..a25c49e6ba 100644 --- a/core/src/main/java/kafka/autobalancer/goals/Goal.java +++ b/core/src/main/java/kafka/autobalancer/goals/Goal.java @@ -16,8 +16,7 @@ import kafka.autobalancer.model.BrokerUpdater; import kafka.autobalancer.model.ClusterModelSnapshot; -import org.apache.kafka.common.Configurable; -import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.Reconfigurable; import com.automq.stream.utils.LogContext; @@ -30,7 +29,7 @@ import java.util.Set; import java.util.stream.Collectors; -public interface Goal extends Configurable, Comparable { +public interface Goal extends Reconfigurable, Comparable { Logger LOGGER = new LogContext().logger(AutoBalancerConstants.AUTO_BALANCER_LOGGER_CLAZZ); List doOptimize(List eligibleBrokers, ClusterModelSnapshot cluster, @@ -90,6 +89,4 @@ default boolean isEligibleBroker(BrokerUpdater.Broker broker) { default int compareTo(Goal other) { return Boolean.compare(other.isHardGoal(), this.isHardGoal()); } - - void validateReconfiguration(Map configs) throws ConfigException; } diff --git a/core/src/main/java/kafka/autobalancer/goals/NetworkInUsageDistributionGoal.java b/core/src/main/java/kafka/autobalancer/goals/NetworkInUsageDistributionGoal.java index a979827ddc..cfdbd2eb29 100644 --- a/core/src/main/java/kafka/autobalancer/goals/NetworkInUsageDistributionGoal.java +++ b/core/src/main/java/kafka/autobalancer/goals/NetworkInUsageDistributionGoal.java @@ -18,8 +18,10 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.utils.ConfigUtils; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; public class NetworkInUsageDistributionGoal extends AbstractNetworkUsageDistributionGoal { @@ -53,6 +55,12 @@ public double weight() { return 1.0; } + @Override + public Set reconfigurableConfigs() { + // NOT USED + return Collections.emptySet(); + } + @Override public void validateReconfiguration(Map configs) throws ConfigException { Map objectConfigs = new HashMap<>(configs); @@ -70,8 +78,29 @@ public void validateReconfiguration(Map configs) throws ConfigExcepti "Value must be in between 0 and 1"); } } + if (configs.containsKey(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_IN_TRIVIAL_CHANGE_RATIO)) { + double trivialRatio = ConfigUtils.getDouble(objectConfigs, AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_IN_TRIVIAL_CHANGE_RATIO); + if (trivialRatio < 0 || trivialRatio > 1) { + throw new ConfigException(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_IN_TRIVIAL_CHANGE_RATIO, trivialRatio, + "Value must be in between 0 and 1"); + } + } } catch (Exception e) { throw new ConfigException("Reconfiguration validation error", e); } } + + @Override + public void reconfigure(Map configs) { + Map objectConfigs = new HashMap<>(configs); + if (configs.containsKey(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_IN_USAGE_DISTRIBUTION_DETECT_THRESHOLD)) { + this.usageDetectThreshold = ConfigUtils.getLong(objectConfigs, AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_IN_USAGE_DISTRIBUTION_DETECT_THRESHOLD); + } + if (configs.containsKey(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_IN_DISTRIBUTION_DETECT_AVG_DEVIATION)) { + this.usageAvgDeviationRatio = ConfigUtils.getDouble(objectConfigs, AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_IN_DISTRIBUTION_DETECT_AVG_DEVIATION); + } + if (configs.containsKey(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_IN_TRIVIAL_CHANGE_RATIO)) { + this.usageTrivialRatio = ConfigUtils.getDouble(objectConfigs, AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_IN_TRIVIAL_CHANGE_RATIO); + } + } } diff --git a/core/src/main/java/kafka/autobalancer/goals/NetworkOutUsageDistributionGoal.java b/core/src/main/java/kafka/autobalancer/goals/NetworkOutUsageDistributionGoal.java index 1a9687795c..ff30d9fb94 100644 --- a/core/src/main/java/kafka/autobalancer/goals/NetworkOutUsageDistributionGoal.java +++ b/core/src/main/java/kafka/autobalancer/goals/NetworkOutUsageDistributionGoal.java @@ -18,8 +18,10 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.utils.ConfigUtils; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; public class NetworkOutUsageDistributionGoal extends AbstractNetworkUsageDistributionGoal { @@ -53,6 +55,12 @@ public double weight() { return 1.0; } + @Override + public Set reconfigurableConfigs() { + // NOT USED + return Collections.emptySet(); + } + @Override public void validateReconfiguration(Map configs) throws ConfigException { Map objectConfigs = new HashMap<>(configs); @@ -70,8 +78,29 @@ public void validateReconfiguration(Map configs) throws ConfigExcepti "Value must be in between 0 and 1"); } } + if (configs.containsKey(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_OUT_TRIVIAL_CHANGE_RATIO)) { + double trivialChangeRatio = ConfigUtils.getDouble(objectConfigs, AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_OUT_TRIVIAL_CHANGE_RATIO); + if (trivialChangeRatio < 0 || trivialChangeRatio > 1) { + throw new ConfigException(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_OUT_TRIVIAL_CHANGE_RATIO, trivialChangeRatio, + "Value must be in between 0 and 1"); + } + } } catch (Exception e) { throw new ConfigException("Reconfiguration validation error", e); } } + + @Override + public void reconfigure(Map configs) { + Map objectConfigs = new HashMap<>(configs); + if (configs.containsKey(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_OUT_USAGE_DISTRIBUTION_DETECT_THRESHOLD)) { + this.usageDetectThreshold = ConfigUtils.getLong(objectConfigs, AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_OUT_USAGE_DISTRIBUTION_DETECT_THRESHOLD); + } + if (configs.containsKey(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_OUT_DISTRIBUTION_DETECT_AVG_DEVIATION)) { + this.usageAvgDeviationRatio = ConfigUtils.getDouble(objectConfigs, AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_OUT_DISTRIBUTION_DETECT_AVG_DEVIATION); + } + if (configs.containsKey(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_OUT_TRIVIAL_CHANGE_RATIO)) { + this.usageTrivialRatio = ConfigUtils.getDouble(objectConfigs, AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_OUT_TRIVIAL_CHANGE_RATIO); + } + } } diff --git a/core/src/main/java/kafka/autobalancer/listeners/ClusterStatusListenerRegistry.java b/core/src/main/java/kafka/autobalancer/listeners/ClusterStatusListenerRegistry.java index ea8efa8166..3ec8b302e2 100644 --- a/core/src/main/java/kafka/autobalancer/listeners/ClusterStatusListenerRegistry.java +++ b/core/src/main/java/kafka/autobalancer/listeners/ClusterStatusListenerRegistry.java @@ -15,9 +15,14 @@ import java.util.List; public class ClusterStatusListenerRegistry { + private final List leaderChangeListeners = new ArrayList<>(); private final List brokerListeners = new ArrayList<>(); private final List topicPartitionListeners = new ArrayList<>(); + public void register(LeaderChangeListener listener) { + leaderChangeListeners.add(listener); + } + public void register(BrokerStatusListener listener) { brokerListeners.add(listener); } @@ -26,6 +31,10 @@ public void register(TopicPartitionStatusListener listener) { topicPartitionListeners.add(listener); } + public List leaderChangeListeners() { + return leaderChangeListeners; + } + public List brokerListeners() { return brokerListeners; } diff --git a/core/src/main/java/kafka/autobalancer/listeners/LeaderChangeListener.java b/core/src/main/java/kafka/autobalancer/listeners/LeaderChangeListener.java index 6badd8b715..971dba2dab 100644 --- a/core/src/main/java/kafka/autobalancer/listeners/LeaderChangeListener.java +++ b/core/src/main/java/kafka/autobalancer/listeners/LeaderChangeListener.java @@ -12,7 +12,5 @@ package kafka.autobalancer.listeners; public interface LeaderChangeListener { - void onBecomeLeader(); - - void onResign(); + void onLeaderChanged(boolean isLeader); } diff --git a/core/src/test/java/kafka/autobalancer/AutoBalancerListenerTest.java b/core/src/test/java/kafka/autobalancer/AutoBalancerListenerTest.java new file mode 100644 index 0000000000..0d24631119 --- /dev/null +++ b/core/src/test/java/kafka/autobalancer/AutoBalancerListenerTest.java @@ -0,0 +1,51 @@ +/* + * Copyright 2024, AutoMQ HK Limited. + * + * The use of this file is governed by the Business Source License, + * as detailed in the file "/LICENSE.S3Stream" included in this repository. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +package kafka.autobalancer; + +import kafka.autobalancer.config.AutoBalancerControllerConfig; +import kafka.autobalancer.detector.AnomalyDetectorImpl; +import kafka.autobalancer.executor.ActionExecutorService; +import kafka.autobalancer.listeners.BrokerStatusListener; +import kafka.autobalancer.listeners.ClusterStatusListenerRegistry; +import kafka.autobalancer.listeners.LeaderChangeListener; +import kafka.autobalancer.model.ClusterModel; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.controller.Controller; +import org.apache.kafka.raft.LeaderAndEpoch; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.util.Collections; +import java.util.OptionalInt; + +public class AutoBalancerListenerTest { + @Test + public void testReconfigure() { + ClusterStatusListenerRegistry registry = new ClusterStatusListenerRegistry(); + AnomalyDetectorImpl anomalyDetector = new AnomalyDetectorImpl(Collections.emptyMap(), null, Mockito.mock(ClusterModel.class), Mockito.mock(ActionExecutorService.class)); + LoadRetriever loadRetriever = new LoadRetriever(Mockito.mock(AutoBalancerControllerConfig.class), Mockito.mock(Controller.class), Mockito.mock(ClusterModel.class)); + + Assertions.assertFalse(anomalyDetector.isLeader()); + Assertions.assertFalse(loadRetriever.isLeader()); + + registry.register(anomalyDetector); + registry.register((BrokerStatusListener) loadRetriever); + registry.register((LeaderChangeListener) loadRetriever); + AutoBalancerListener autoBalancerListener = new AutoBalancerListener(0, Time.SYSTEM, registry); + autoBalancerListener.handleLeaderChange0(new LeaderAndEpoch(OptionalInt.of(0), 0)); + Assertions.assertTrue(anomalyDetector.isLeader()); + Assertions.assertTrue(loadRetriever.isLeader()); + } +} diff --git a/core/src/test/java/kafka/autobalancer/detector/AnomalyDetectorTest.java b/core/src/test/java/kafka/autobalancer/detector/AnomalyDetectorImplTest.java similarity index 85% rename from core/src/test/java/kafka/autobalancer/detector/AnomalyDetectorTest.java rename to core/src/test/java/kafka/autobalancer/detector/AnomalyDetectorImplTest.java index 6c802ab9e0..10e43468eb 100644 --- a/core/src/test/java/kafka/autobalancer/detector/AnomalyDetectorTest.java +++ b/core/src/test/java/kafka/autobalancer/detector/AnomalyDetectorImplTest.java @@ -24,7 +24,6 @@ import kafka.autobalancer.config.AutoBalancerControllerConfig; import kafka.autobalancer.executor.ActionExecutorService; import kafka.autobalancer.goals.AbstractResourceUsageDistributionGoal; -import kafka.autobalancer.goals.Goal; import kafka.autobalancer.goals.NetworkInUsageDistributionGoal; import kafka.autobalancer.goals.NetworkOutUsageDistributionGoal; import kafka.autobalancer.model.ClusterModel; @@ -52,30 +51,11 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -public class AnomalyDetectorTest { +public class AnomalyDetectorImplTest { @Test public void testFilterAndMergeActions() { - AnomalyDetector anomalyDetector = new AnomalyDetectorBuilder() - .clusterModel(Mockito.mock(ClusterModel.class)) - .addGoal(Mockito.mock(Goal.class)) - .executor(new ActionExecutorService() { - @Override - public void start() { - - } - - @Override - public void shutdown() { - - } - - @Override - public CompletableFuture execute(List actions) { - return CompletableFuture.completedFuture(null); - } - }) - .build(); + AnomalyDetectorImpl anomalyDetector = new AnomalyDetectorImpl(Collections.emptyMap(), null, Mockito.mock(ClusterModel.class), Mockito.mock(ActionExecutorService.class)); List actions = List.of( new Action(ActionType.MOVE, new TopicPartition("topic-1", 0), 1, 11), @@ -131,26 +111,7 @@ public CompletableFuture execute(List actions) { @Test public void testGroupActions() { - AnomalyDetector anomalyDetector = new AnomalyDetectorBuilder() - .clusterModel(Mockito.mock(ClusterModel.class)) - .addGoal(Mockito.mock(Goal.class)) - .executor(new ActionExecutorService() { - @Override - public void start() { - - } - - @Override - public void shutdown() { - - } - - @Override - public CompletableFuture execute(List actions) { - return CompletableFuture.completedFuture(null); - } - }) - .build(); + AnomalyDetectorImpl anomalyDetector = new AnomalyDetectorImpl(Collections.emptyMap(), null, Mockito.mock(ClusterModel.class), Mockito.mock(ActionExecutorService.class)); List actions = List.of( new Action(ActionType.MOVE, new TopicPartition("topic-1", 0), 0, 1), @@ -274,38 +235,31 @@ public void testSchedulingTimeCost() { } } - Map configs = new AutoBalancerControllerConfig(Collections.emptyMap(), false).originals(); - Goal goal0 = new NetworkInUsageDistributionGoal(); - Goal goal1 = new NetworkOutUsageDistributionGoal(); - goal0.configure(configs); - goal1.configure(configs); - List actionList = new ArrayList<>(); - AnomalyDetector detector = new AnomalyDetectorBuilder() - .clusterModel(clusterModel) - .detectIntervalMs(Long.MAX_VALUE) - .executionIntervalMs(0) - .executionConcurrency(100) - .addGoal(goal0) - .addGoal(goal1) - .executor(new ActionExecutorService() { - @Override - public void start() { - - } - - @Override - public void shutdown() { - - } - - @Override - public CompletableFuture execute(List actions) { - actionList.addAll(actions); - return CompletableFuture.completedFuture(null); - } - }) - .build(); + AnomalyDetectorImpl detector = new AnomalyDetectorImpl(Map.of( + AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ANOMALY_DETECT_INTERVAL_MS, Long.MAX_VALUE, + AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS, 0, + AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_CONCURRENCY, 100, + AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_GOALS, new StringJoiner(",") + .add(NetworkInUsageDistributionGoal.class.getName()) + .add(NetworkOutUsageDistributionGoal.class.getName()).toString() + ), null, clusterModel, new ActionExecutorService() { + @Override + public void start() { + + } + + @Override + public void shutdown() { + + } + + @Override + public CompletableFuture execute(List actions) { + actionList.addAll(actions); + return CompletableFuture.completedFuture(null); + } + }); TimerUtil timerUtil = new TimerUtil(); detector.onLeaderChanged(true); @@ -359,15 +313,9 @@ private Map generateRandomMetrics(Random r) { @Test public void testReconfigure() { - AutoBalancerControllerConfig config = new AutoBalancerControllerConfig(Collections.emptyMap(), false); - List goals = config.getConfiguredInstances(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_GOALS, Goal.class); - - AnomalyDetector detector = new AnomalyDetectorBuilder() - .clusterModel(Mockito.mock(ClusterModel.class)) - .detectIntervalMs(Long.MAX_VALUE) - .addGoals(goals) - .executor(Mockito.mock(ActionExecutorService.class)) - .build(); + AnomalyDetectorImpl detector = new AnomalyDetectorImpl(Map.of( + AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ANOMALY_DETECT_INTERVAL_MS, Long.MAX_VALUE + ), null, Mockito.mock(ClusterModel.class), Mockito.mock(ActionExecutorService.class)); Assertions.assertEquals(2, detector.goals().size()); Assertions.assertEquals(NetworkInUsageDistributionGoal.class, detector.goals().get(0).getClass()); Assertions.assertEquals(1024 * 1024, ((AbstractResourceUsageDistributionGoal) detector.goals().get(0)).getUsageDetectThreshold());