Remove version check when talking to controllers
This change is targeted at a release that supports Kafka 3.9.0 or later.

Signed-off-by: Gantigmaa Selenge <[email protected]>
tinaselenge committed Dec 5, 2024
1 parent 316fc14 commit 36dc1a8
Showing 3 changed files with 12 additions and 74 deletions.
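
For context, the gate being removed existed because the Kafka Admin API can only bootstrap directly against KRaft controller nodes on newer Kafka releases, via the bootstrap.controllers option. Below is a minimal, hypothetical sketch (not Strimzi code) of such a direct controller connection for a quorum health check; the controller address is a placeholder, and a real deployment would also need TLS and authentication settings:

import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.QuorumInfo;

// Hypothetical sketch: connect an Admin client directly to a KRaft controller
// and read the metadata quorum state, roughly what a quorum health check relies on.
public class ControllerQuorumProbe {
    public static void main(String[] args) throws Exception {
        Map<String, Object> config = Map.of(
                // bootstrap.controllers points the client at controller nodes instead of
                // brokers; only recent Kafka versions accept this kind of connection.
                AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, "my-cluster-controller-0:9090");
        try (Admin admin = Admin.create(config)) {
            QuorumInfo quorum = admin.describeMetadataQuorum().quorumInfo().get();
            System.out.println("Quorum leader: " + quorum.leaderId());
        }
    }
}

With the operator's minimum supported Kafka version at or above 3.9.0, this path can be taken unconditionally, which is what the diff below does.
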
@@ -20,7 +20,6 @@
import io.strimzi.operator.cluster.operator.resource.events.KubernetesRestartEventPublisher;
import io.strimzi.operator.cluster.operator.resource.kubernetes.PodOperator;
import io.strimzi.operator.common.AdminClientProvider;
import io.strimzi.operator.common.Annotations;
import io.strimzi.operator.common.BackOff;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.ReconciliationLogger;
@@ -209,19 +208,10 @@ private boolean maybeInitBrokerAdminClient() {
* Initializes controllerAdminClient if it has not been initialized yet
* @return true if the creation of AC succeeded, false otherwise
*/
private boolean maybeInitControllerAdminClient(String currentVersion) {
private boolean maybeInitControllerAdminClient() {
if (this.controllerAdminClient == null) {
// Prior to 3.9.0, Kafka did not support directly connecting to controllers nodes
// via Kafka Admin API when running in KRaft mode.
// Therefore, use brokers to initialise adminClient for quorum health check
// when the version is older than 3.9.0.
try {
if (KafkaVersion.compareDottedVersions(currentVersion, "3.9.0") >= 0) {
this.controllerAdminClient = controllerAdminClient(nodes);
} else {
this.controllerAdminClient = brokerAdminClient(Set.of());

}
this.controllerAdminClient = controllerAdminClient(nodes);
} catch (ForceableProblem | FatalProblem e) {
LOGGER.warnCr(reconciliation, "Failed to create controllerAdminClient.", e);
return false;
@@ -455,11 +445,9 @@ private void restartIfNecessary(NodeRef nodeRef, RestartContext restartContext)
// change and the desired roles still apply.
boolean isBroker = Labels.booleanLabel(pod, Labels.STRIMZI_BROKER_ROLE_LABEL, nodeRef.broker());
boolean isController = Labels.booleanLabel(pod, Labels.STRIMZI_CONTROLLER_ROLE_LABEL, nodeRef.controller());
// This is relevant when creating admin client for controllers
String currentVersion = Annotations.stringAnnotation(pod, KafkaCluster.ANNO_STRIMZI_IO_KAFKA_VERSION, "0.0.0", null);

try {
checkIfRestartOrReconfigureRequired(nodeRef, isController, isBroker, restartContext, currentVersion);
checkIfRestartOrReconfigureRequired(nodeRef, isController, isBroker, restartContext);
if (restartContext.forceRestart) {
LOGGER.debugCr(reconciliation, "Pod {} can be rolled now", nodeRef);
restartAndAwaitReadiness(pod, operationTimeoutMs, TimeUnit.MILLISECONDS, restartContext);
@@ -589,7 +577,7 @@ private void markRestartContextWithForceRestart(RestartContext restartContext) {
* Determine whether the pod should be restarted, or the broker reconfigured.
*/
@SuppressWarnings("checkstyle:CyclomaticComplexity")
private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isController, boolean isBroker, RestartContext restartContext, String currentVersion) throws ForceableProblem, InterruptedException, FatalProblem, UnforceableProblem {
private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isController, boolean isBroker, RestartContext restartContext) throws ForceableProblem, InterruptedException, FatalProblem {
RestartReasons reasonToRestartPod = restartContext.restartReasons;
if (restartContext.podStuck && !reasonToRestartPod.contains(RestartReason.POD_HAS_OLD_REVISION)) {
// If the pod is unschedulable then deleting it, or trying to open an Admin client to it will make no difference
@@ -612,10 +600,13 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont

// if it is a pure controller, initialise the admin client specifically for controllers
if (isController && !isBroker) {
if (!maybeInitControllerAdminClient(currentVersion)) {
handleFailedAdminClientForController(nodeRef, restartContext, reasonToRestartPod, currentVersion);
if (!maybeInitControllerAdminClient()) {
LOGGER.infoCr(reconciliation, "Pod {} needs to be restarted, because it does not seem to responding to connection attempts", nodeRef);
reasonToRestartPod.add(RestartReason.POD_UNRESPONSIVE);
markRestartContextWithForceRestart(restartContext);
return;
}
LOGGER.debugCr(reconciliation, "Initialising KafkaQuorumCheck for controller pod {}", nodeRef);
restartContext.quorumCheck = quorumCheck(controllerAdminClient, nodeRef);
}

@@ -629,6 +620,7 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont

// If it is a mixed node, initialise quorum check with the broker admin client
if (isController) {
LOGGER.debugCr(reconciliation, "Initialising KafkaQuorumCheck for mixed roles pod {}", nodeRef);
restartContext.quorumCheck = quorumCheck(brokerAdminClient, nodeRef);
}
}
@@ -637,7 +629,6 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont
// connect to the node and that it's capable of responding.
Config nodeConfig;
try {
System.out.println("TINA Getting node config for " + nodeRef.nodeId());
nodeConfig = nodeConfig(nodeRef, isController && !isBroker);
} catch (ForceableProblem e) {
if (restartContext.backOff.done()) {
@@ -678,21 +669,6 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont
restartContext.nodeLoggingDiff = nodeLoggingDiff;
}

private void handleFailedAdminClientForController(NodeRef nodeRef, RestartContext restartContext, RestartReasons reasonToRestartPod, String currentVersion) throws UnforceableProblem {
if (KafkaVersion.compareDottedVersions(currentVersion, "3.9.0") >= 0) {
// If the version supports talking to controllers, force restart this pod when the admin client cannot be initialised.
// There is no reason to continue as we won't be able to connect an admin client to this pod for other checks later.
LOGGER.infoCr(reconciliation, "KafkaQuorumCheck cannot be initialised for {} because none of the controllers do not seem to responding to connection attempts.", nodeRef);
reasonToRestartPod.add(RestartReason.POD_UNRESPONSIVE);
markRestartContextWithForceRestart(restartContext);
} else {
// If the version does not support talking to controllers, the admin client should be connecting to the broker nodes.
// Since connection to the brokers failed, throw an UnforceableProblem so that broker nodes can be checked later
// which may potentially resolve the connection issue.
throw new UnforceableProblem("KafkaQuorumCheck cannot be initialised for " + nodeRef + " because none of the brokers do not seem to responding to connection attempts");
}
}

/**
* Returns a config of the given broker.
* @param nodeRef The reference of the broker.
@@ -701,13 +677,11 @@ private void handleFailedAdminClientForController(NodeRef nodeRef, RestartContex
/* test */ Config nodeConfig(NodeRef nodeRef, boolean isPureController) throws ForceableProblem, InterruptedException {
ConfigResource resource = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(nodeRef.nodeId()));
if (isPureController) {
System.out.println("Getting controller config");
return await(VertxUtil.kafkaFutureToVertxFuture(reconciliation, vertx, controllerAdminClient.describeConfigs(singletonList(resource)).values().get(resource)),
30, TimeUnit.SECONDS,
error -> new ForceableProblem("Error getting controller config: " + error, error)
);
} else {
System.out.println("Getting broker config");
return await(VertxUtil.kafkaFutureToVertxFuture(reconciliation, vertx, brokerAdminClient.describeConfigs(singletonList(resource)).values().get(resource)),
30, TimeUnit.SECONDS,
error -> new ForceableProblem("Error getting broker config: " + error, error)
@@ -926,15 +900,7 @@ protected Future<Void> restart(Pod pod, RestartContext restartContext) {
* empty set, use the brokers service to bootstrap the client.
*/
/* test */ Admin brokerAdminClient(Set<NodeRef> nodes) throws ForceableProblem, FatalProblem {
// If no nodes are passed, initialize the admin client using the bootstrap service
// This is still needed for versions older than 3.9.0, so that when only controller nodes being rolled,
// it can use brokers to get quorum information via AdminClient.
String bootstrapHostnames;
if (nodes.isEmpty()) {
bootstrapHostnames = String.format("%s:%s", DnsNameGenerator.of(namespace, KafkaResources.bootstrapServiceName(cluster)).serviceDnsName(), KafkaCluster.REPLICATION_PORT);
} else {
bootstrapHostnames = nodes.stream().filter(NodeRef::broker).map(node -> DnsNameGenerator.podDnsName(namespace, KafkaResources.brokersServiceName(cluster), node.podName()) + ":" + KafkaCluster.REPLICATION_PORT).collect(Collectors.joining(","));
}
String bootstrapHostnames = nodes.stream().filter(NodeRef::broker).map(node -> DnsNameGenerator.podDnsName(namespace, KafkaResources.brokersServiceName(cluster), node.podName()) + ":" + KafkaCluster.REPLICATION_PORT).collect(Collectors.joining(","));

try {
LOGGER.debugCr(reconciliation, "Creating AdminClient for {}", bootstrapHostnames);
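
For reference, the dropped branch chose between controllers and brokers by comparing dotted version strings against 3.9.0. A standalone sketch of that kind of comparison, assuming purely numeric segments (the real KafkaVersion.compareDottedVersions may handle more formats):

// Standalone sketch in the spirit of the removed
// KafkaVersion.compareDottedVersions(currentVersion, "3.9.0") >= 0 check.
// Assumes purely numeric dotted segments such as "3.8.0".
static int compareDotted(String a, String b) {
    String[] as = a.split("\\.");
    String[] bs = b.split("\\.");
    for (int i = 0; i < Math.max(as.length, bs.length); i++) {
        int ai = i < as.length ? Integer.parseInt(as[i]) : 0;
        int bi = i < bs.length ? Integer.parseInt(bs[i]) : 0;
        if (ai != bi) {
            return Integer.compare(ai, bi);
        }
    }
    return 0;
}

// Example: compareDotted("3.8.0", "3.9.0") < 0, so a 3.8.0 cluster previously fell
// back to the brokers bootstrap service instead of connecting to controllers.
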
@@ -28,7 +28,6 @@
import io.strimzi.operator.cluster.PlatformFeaturesAvailability;
import io.strimzi.operator.cluster.ResourceUtils;
import io.strimzi.operator.cluster.model.CertUtils;
import io.strimzi.operator.cluster.model.KafkaCluster;
import io.strimzi.operator.cluster.model.KafkaVersion;
import io.strimzi.operator.cluster.model.PodSetUtils;
import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier;
@@ -62,7 +61,6 @@

import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.CoreMatchers.startsWith;
@@ -9,7 +9,6 @@
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.strimzi.api.kafka.model.kafka.KafkaResources;
import io.strimzi.operator.cluster.KafkaVersionTestUtils;
import io.strimzi.operator.cluster.model.KafkaCluster;
import io.strimzi.operator.cluster.model.NodeRef;
import io.strimzi.operator.cluster.model.RestartReason;
import io.strimzi.operator.cluster.model.RestartReasons;
@@ -162,7 +161,7 @@ private static AdminClientProvider givenControllerFutureFailsWithTimeout() {

@Test
public void testTalkingToControllersLatestVersion(VertxTestContext testContext) {
PodOperator podOps = mockPodOpsWithVersion(podId -> succeededFuture(), KafkaVersionTestUtils.getLatestVersion().version());
PodOperator podOps = mockPodOps(podId -> succeededFuture());
AdminClientProvider mock = mock(AdminClientProvider.class);
when(mock.createControllerAdminClient(anyString(), any(), any())).thenThrow(new RuntimeException("An error while try to create an admin client with bootstrap controllers"));

@@ -177,26 +176,6 @@ public void testTalkingToControllersLatestVersion(VertxTestContext testContext)
asList(0));
}

@Test
public void testTalkingToControllersWithOldVersion(VertxTestContext testContext) throws InterruptedException {
PodOperator podOps = mockPodOpsWithVersion(podId -> succeededFuture(), "3.8.0");

AdminClientProvider mock = mock(AdminClientProvider.class);
when(mock.createAdminClient(anyString(), any(), any())).thenThrow(new RuntimeException("An error while try to create an admin client with bootstrap brokers"));

TestingKafkaRoller kafkaRoller = new TestingKafkaRoller(addKraftPodNames(0, 0, 1), podOps,
noException(), null, noException(), noException(), noException(),
brokerId -> succeededFuture(true),
true, mock, mockKafkaAgentClientProvider(), true, null, -1);

// If the controller has older version (< 3.9.0), we should only be creating admin client for brokers
// and when the operator cannot connect to brokers, we expect to fail initialising KafkaQuorumCheck
doFailingRollingRestart(testContext, kafkaRoller,
asList(0),
KafkaRoller.UnforceableProblem.class, "KafkaQuorumCheck cannot be initialised for c-kafka-0/0 because none of the brokers do not seem to responding to connection attempts",
emptyList());
}

private static KafkaAgentClientProvider mockKafkaAgentClientProvider() {
return mock(KafkaAgentClientProvider.class);
}
@@ -835,17 +814,12 @@ public void clearRestarted() {
}

private PodOperator mockPodOps(Function<Integer, Future<Void>> readiness) {
return mockPodOpsWithVersion(readiness, KafkaVersionTestUtils.getLatestVersion().version());
}

private PodOperator mockPodOpsWithVersion(Function<Integer, Future<Void>> readiness, String version) {
PodOperator podOps = mock(PodOperator.class);
when(podOps.get(any(), any())).thenAnswer(
invocation -> new PodBuilder()
.withNewMetadata()
.withNamespace(invocation.getArgument(0))
.withName(invocation.getArgument(1))
.addToAnnotations(KafkaCluster.ANNO_STRIMZI_IO_KAFKA_VERSION, version)
.endMetadata()
.build()
);
