Skip to content

Commit

Permalink
feat(auto_balancer): use uniformed interface for reconfiguration (#2062)
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored Oct 14, 2024
1 parent 2d9c523 commit 4ec7fda
Show file tree
Hide file tree
Showing 13 changed files with 272 additions and 276 deletions.
26 changes: 13 additions & 13 deletions core/src/main/java/kafka/autobalancer/AutoBalancerListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
package kafka.autobalancer;

import kafka.autobalancer.common.AutoBalancerConstants;
import kafka.autobalancer.detector.AnomalyDetector;
import kafka.autobalancer.listeners.BrokerStatusListener;
import kafka.autobalancer.listeners.ClusterStatusListenerRegistry;
import kafka.autobalancer.listeners.LeaderChangeListener;
import kafka.autobalancer.listeners.TopicPartitionStatusListener;

import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
Expand Down Expand Up @@ -46,17 +46,12 @@ public class AutoBalancerListener implements RaftClient.Listener<ApiMessageAndVe
private final Logger logger;
private final KafkaEventQueue queue;
private final ClusterStatusListenerRegistry registry;
private final LoadRetriever loadRetriever;
private final AnomalyDetector anomalyDetector;

public AutoBalancerListener(int nodeId, Time time, ClusterStatusListenerRegistry registry,
LoadRetriever loadRetriever, AnomalyDetector anomalyDetector) {
public AutoBalancerListener(int nodeId, Time time, ClusterStatusListenerRegistry registry) {
this.nodeId = nodeId;
this.logger = new LogContext(String.format("[AutoBalancerListener id=%d] ", nodeId)).logger(AutoBalancerConstants.AUTO_BALANCER_LOGGER_CLAZZ);
this.queue = new KafkaEventQueue(time, new org.apache.kafka.common.utils.LogContext(), "auto-balancer-listener-");
this.registry = registry;
this.loadRetriever = loadRetriever;
this.anomalyDetector = anomalyDetector;
}

private void handleMessageBatch(Batch<ApiMessageAndVersion> messageBatch) {
Expand Down Expand Up @@ -138,15 +133,20 @@ public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
@Override
public void handleLeaderChange(LeaderAndEpoch leader) {
queue.append(() -> {
if (leader.leaderId().isEmpty()) {
return;
}
boolean isLeader = leader.isLeader(nodeId);
this.anomalyDetector.onLeaderChanged(isLeader);
this.loadRetriever.onLeaderChanged(isLeader);
handleLeaderChange0(leader);
});
}

void handleLeaderChange0(LeaderAndEpoch leader) {
if (leader.leaderId().isEmpty()) {
return;
}
boolean isLeader = leader.isLeader(nodeId);
for (LeaderChangeListener listener : this.registry.leaderChangeListeners()) {
listener.onLeaderChanged(isLeader);
}
}

@Override
public void beginShutdown() {
this.queue.beginShutdown("AutoBalancerListenerShutdown");
Expand Down
51 changes: 26 additions & 25 deletions core/src/main/java/kafka/autobalancer/AutoBalancerManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,18 @@
package kafka.autobalancer;

import kafka.autobalancer.config.AutoBalancerControllerConfig;
import kafka.autobalancer.detector.AnomalyDetector;
import kafka.autobalancer.detector.AnomalyDetectorBuilder;
import kafka.autobalancer.detector.AbstractAnomalyDetector;
import kafka.autobalancer.detector.AnomalyDetectorImpl;
import kafka.autobalancer.executor.ActionExecutorService;
import kafka.autobalancer.executor.ControllerActionExecutorService;
import kafka.autobalancer.goals.Goal;
import kafka.autobalancer.listeners.BrokerStatusListener;
import kafka.autobalancer.listeners.ClusterStatusListenerRegistry;
import kafka.autobalancer.listeners.LeaderChangeListener;
import kafka.autobalancer.listeners.TopicPartitionStatusListener;
import kafka.autobalancer.model.RecordClusterModel;
import kafka.autobalancer.services.AutoBalancerService;

import org.apache.kafka.common.Reconfigurable;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.ConfigUtils;
import org.apache.kafka.common.utils.Time;
Expand All @@ -32,18 +33,20 @@

import com.automq.stream.utils.LogContext;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class AutoBalancerManager extends AutoBalancerService {
protected final Time time;
protected final Map<?, ?> configs;
protected final QuorumController quorumController;
protected final KafkaRaftClient<ApiMessageAndVersion> raftClient;
protected final List<Reconfigurable> reconfigurables = new ArrayList<>();
protected LoadRetriever loadRetriever;
protected AnomalyDetector anomalyDetector;
protected AbstractAnomalyDetector anomalyDetector;
protected ActionExecutorService actionExecutorService;
protected volatile boolean enabled;

Expand All @@ -62,33 +65,27 @@ public AutoBalancerManager(Time time, Map<?, ?> configs, QuorumController quorum
protected void init() {
AutoBalancerControllerConfig config = new AutoBalancerControllerConfig(configs, false);
this.enabled = config.getBoolean(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ENABLE);
RecordClusterModel clusterModel = new RecordClusterModel(new LogContext(String.format("[ClusterModel id=%d] ", quorumController.nodeId())));
int nodeId = quorumController.nodeId();
RecordClusterModel clusterModel = new RecordClusterModel(new LogContext(String.format("[ClusterModel id=%d] ", nodeId)));
this.loadRetriever = new LoadRetriever(config, quorumController, clusterModel,
new LogContext(String.format("[LoadRetriever id=%d] ", quorumController.nodeId())));
new LogContext(String.format("[LoadRetriever id=%d] ", nodeId)));
this.actionExecutorService = new ControllerActionExecutorService(quorumController,
new LogContext(String.format("[ExecutionManager id=%d] ", quorumController.nodeId())));
new LogContext(String.format("[ExecutionManager id=%d] ", nodeId)));
this.actionExecutorService.start();

this.anomalyDetector = new AnomalyDetectorBuilder()
.logContext(new LogContext(String.format("[AnomalyDetector id=%d] ", quorumController.nodeId())))
.detectIntervalMs(config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ANOMALY_DETECT_INTERVAL_MS))
.maxTolerateMetricsDelayMs(config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ACCEPTED_METRICS_DELAY_MS))
.executionConcurrency(config.getInt(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_CONCURRENCY))
.executionIntervalMs(config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS))
.clusterModel(clusterModel)
.executor(this.actionExecutorService)
.addGoals(config.getConfiguredInstances(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_GOALS, Goal.class))
.excludedBrokers(config.getList(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXCLUDE_BROKER_IDS)
.stream().map(Integer::parseInt).collect(Collectors.toSet()))
.excludedTopics(config.getList(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXCLUDE_TOPICS))
.build();
this.anomalyDetector = new AnomalyDetectorImpl(config.originals(),
new LogContext(String.format("[AnomalyDetector id=%d] ", nodeId)), clusterModel, actionExecutorService);

this.reconfigurables.add(anomalyDetector);

ClusterStatusListenerRegistry registry = new ClusterStatusListenerRegistry();
registry.register((BrokerStatusListener) clusterModel);
registry.register((TopicPartitionStatusListener) clusterModel);
registry.register((BrokerStatusListener) this.actionExecutorService);
registry.register(this.loadRetriever);
raftClient.register(new AutoBalancerListener(quorumController.nodeId(), time, registry, this.loadRetriever, this.anomalyDetector));
registry.register((LeaderChangeListener) this.anomalyDetector);
registry.register((BrokerStatusListener) this.loadRetriever);
registry.register((LeaderChangeListener) this.loadRetriever);
raftClient.register(new AutoBalancerListener(nodeId, time, registry));
}

@Override
Expand Down Expand Up @@ -126,7 +123,9 @@ public void validateReconfiguration(Map<String, ?> configs) throws ConfigExcepti
if (objectConfigs.containsKey(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ENABLE)) {
ConfigUtils.getBoolean(objectConfigs, AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ENABLE);
}
this.anomalyDetector.validateReconfiguration(objectConfigs);
for (Reconfigurable reconfigurable : reconfigurables) {
reconfigurable.validateReconfiguration(objectConfigs);
}
} catch (ConfigException e) {
throw e;
} catch (Exception e) {
Expand All @@ -149,7 +148,9 @@ public void reconfigure(Map<String, ?> configs) {
logger.info("AutoBalancerManager paused.");
}
}
this.anomalyDetector.reconfigure(objectConfigs);
for (Reconfigurable reconfigurable : reconfigurables) {
reconfigurable.reconfigure(objectConfigs);
}
}

@Override
Expand Down
10 changes: 8 additions & 2 deletions core/src/main/java/kafka/autobalancer/LoadRetriever.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import kafka.autobalancer.config.StaticAutoBalancerConfig;
import kafka.autobalancer.config.StaticAutoBalancerConfigUtils;
import kafka.autobalancer.listeners.BrokerStatusListener;
import kafka.autobalancer.listeners.LeaderChangeListener;
import kafka.autobalancer.metricsreporter.metric.AutoBalancerMetrics;
import kafka.autobalancer.metricsreporter.metric.BrokerMetrics;
import kafka.autobalancer.metricsreporter.metric.MetricSerde;
Expand Down Expand Up @@ -70,7 +71,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LoadRetriever extends AbstractResumableService implements BrokerStatusListener {
public class LoadRetriever extends AbstractResumableService implements BrokerStatusListener, LeaderChangeListener {
public static final Random RANDOM = new Random();
private final Map<Integer, BrokerEndpoints> bootstrapServerMap;
private volatile int metricReporterTopicPartition;
Expand Down Expand Up @@ -163,7 +164,7 @@ protected Properties buildConsumerProps(String bootstrapServer) {
return consumerProps;
}

static class BrokerEndpoints {
public static class BrokerEndpoints {
private final int brokerId;
private Set<String> endpoints = new HashSet<>();

Expand Down Expand Up @@ -491,6 +492,11 @@ private TopicAction refreshAssignment() {
return TopicAction.NONE;
}

public boolean isLeader() {
return isLeader;
}

@Override
public void onLeaderChanged(boolean isLeader) {
this.leaderEpochInitialized = true;
this.isLeader = isLeader;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.autobalancer.detector;

import kafka.autobalancer.services.AbstractResumableService;

import org.apache.kafka.common.Reconfigurable;

import com.automq.stream.utils.LogContext;

public abstract class AbstractAnomalyDetector extends AbstractResumableService implements Reconfigurable {

public AbstractAnomalyDetector(LogContext logContext) {
super(logContext);
}


}

This file was deleted.

Loading

0 comments on commit 4ec7fda

Please sign in to comment.