diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaRoller.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaRoller.java
index 977127c3ad..865c1908ac 100644
--- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaRoller.java
+++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaRoller.java
@@ -20,7 +20,6 @@
 import io.strimzi.operator.cluster.operator.resource.events.KubernetesRestartEventPublisher;
 import io.strimzi.operator.cluster.operator.resource.kubernetes.PodOperator;
 import io.strimzi.operator.common.AdminClientProvider;
-import io.strimzi.operator.common.Annotations;
 import io.strimzi.operator.common.BackOff;
 import io.strimzi.operator.common.Reconciliation;
 import io.strimzi.operator.common.ReconciliationLogger;
@@ -209,19 +208,10 @@ private boolean maybeInitBrokerAdminClient() {
      * Initializes controllerAdminClient if it has not been initialized yet
      * @return true if the creation of AC succeeded, false otherwise
      */
-    private boolean maybeInitControllerAdminClient(String currentVersion) {
+    private boolean maybeInitControllerAdminClient() {
         if (this.controllerAdminClient == null) {
-            // Prior to 3.9.0, Kafka did not support directly connecting to controllers nodes
-            // via Kafka Admin API when running in KRaft mode.
-            // Therefore, use brokers to initialise adminClient for quorum health check
-            // when the version is older than 3.9.0.
             try {
-                if (KafkaVersion.compareDottedVersions(currentVersion, "3.9.0") >= 0) {
-                    this.controllerAdminClient = controllerAdminClient(nodes);
-                } else {
-                    this.controllerAdminClient = brokerAdminClient(Set.of());
-
-                }
+                this.controllerAdminClient = controllerAdminClient(nodes);
             } catch (ForceableProblem | FatalProblem e) {
                 LOGGER.warnCr(reconciliation, "Failed to create controllerAdminClient.", e);
                 return false;
@@ -455,11 +445,9 @@ private void restartIfNecessary(NodeRef nodeRef, RestartContext restartContext)
         // change and the desired roles still apply.
         boolean isBroker = Labels.booleanLabel(pod, Labels.STRIMZI_BROKER_ROLE_LABEL, nodeRef.broker());
         boolean isController = Labels.booleanLabel(pod, Labels.STRIMZI_CONTROLLER_ROLE_LABEL, nodeRef.controller());
-        // This is relevant when creating admin client for controllers
-        String currentVersion = Annotations.stringAnnotation(pod, KafkaCluster.ANNO_STRIMZI_IO_KAFKA_VERSION, "0.0.0", null);
 
         try {
-            checkIfRestartOrReconfigureRequired(nodeRef, isController, isBroker, restartContext, currentVersion);
+            checkIfRestartOrReconfigureRequired(nodeRef, isController, isBroker, restartContext);
             if (restartContext.forceRestart) {
                 LOGGER.debugCr(reconciliation, "Pod {} can be rolled now", nodeRef);
                 restartAndAwaitReadiness(pod, operationTimeoutMs, TimeUnit.MILLISECONDS, restartContext);
@@ -589,7 +577,7 @@ private void markRestartContextWithForceRestart(RestartContext restartContext) {
      * Determine whether the pod should be restarted, or the broker reconfigured.
      */
     @SuppressWarnings("checkstyle:CyclomaticComplexity")
-    private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isController, boolean isBroker, RestartContext restartContext, String currentVersion) throws ForceableProblem, InterruptedException, FatalProblem, UnforceableProblem {
+    private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isController, boolean isBroker, RestartContext restartContext) throws ForceableProblem, InterruptedException, FatalProblem {
         RestartReasons reasonToRestartPod = restartContext.restartReasons;
         if (restartContext.podStuck && !reasonToRestartPod.contains(RestartReason.POD_HAS_OLD_REVISION)) {
             // If the pod is unschedulable then deleting it, or trying to open an Admin client to it will make no difference
@@ -612,10 +600,13 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont
 
         // if it is a pure controller, initialise the admin client specifically for controllers
         if (isController && !isBroker) {
-            if (!maybeInitControllerAdminClient(currentVersion)) {
-                handleFailedAdminClientForController(nodeRef, restartContext, reasonToRestartPod, currentVersion);
+            if (!maybeInitControllerAdminClient()) {
+                LOGGER.infoCr(reconciliation, "Pod {} needs to be restarted, because it does not seem to be responding to connection attempts", nodeRef);
+                reasonToRestartPod.add(RestartReason.POD_UNRESPONSIVE);
+                markRestartContextWithForceRestart(restartContext);
                 return;
             }
 
+            LOGGER.debugCr(reconciliation, "Initialising KafkaQuorumCheck for controller pod {}", nodeRef);
             restartContext.quorumCheck = quorumCheck(controllerAdminClient, nodeRef);
         }
@@ -629,6 +620,7 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont
 
             // If it is a mixed node, initialise quorum check with the broker admin client
             if (isController) {
+                LOGGER.debugCr(reconciliation, "Initialising KafkaQuorumCheck for mixed roles pod {}", nodeRef);
                 restartContext.quorumCheck = quorumCheck(brokerAdminClient, nodeRef);
             }
         }
@@ -637,7 +629,6 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont
         // connect to the node and that it's capable of responding.
         Config nodeConfig;
         try {
-            System.out.println("TINA Getting node config for " + nodeRef.nodeId());
             nodeConfig = nodeConfig(nodeRef, isController && !isBroker);
         } catch (ForceableProblem e) {
             if (restartContext.backOff.done()) {
@@ -678,21 +669,6 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont
         restartContext.nodeLoggingDiff = nodeLoggingDiff;
     }
 
-    private void handleFailedAdminClientForController(NodeRef nodeRef, RestartContext restartContext, RestartReasons reasonToRestartPod, String currentVersion) throws UnforceableProblem {
-        if (KafkaVersion.compareDottedVersions(currentVersion, "3.9.0") >= 0) {
-            // If the version supports talking to controllers, force restart this pod when the admin client cannot be initialised.
-            // There is no reason to continue as we won't be able to connect an admin client to this pod for other checks later.
-            LOGGER.infoCr(reconciliation, "KafkaQuorumCheck cannot be initialised for {} because none of the controllers do not seem to responding to connection attempts.", nodeRef);
-            reasonToRestartPod.add(RestartReason.POD_UNRESPONSIVE);
-            markRestartContextWithForceRestart(restartContext);
-        } else {
-            // If the version does not support talking to controllers, the admin client should be connecting to the broker nodes.
-            // Since connection to the brokers failed, throw an UnforceableProblem so that broker nodes can be checked later
-            // which may potentially resolve the connection issue.
-            throw new UnforceableProblem("KafkaQuorumCheck cannot be initialised for " + nodeRef + " because none of the brokers do not seem to responding to connection attempts");
-        }
-    }
-
     /**
      * Returns a config of the given broker.
      * @param nodeRef The reference of the broker.
@@ -701,13 +677,11 @@ private void handleFailedAdminClientForController(NodeRef nodeRef, RestartContex
     /* test */ Config nodeConfig(NodeRef nodeRef, boolean isPureController) throws ForceableProblem, InterruptedException {
         ConfigResource resource = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(nodeRef.nodeId()));
         if (isPureController) {
-            System.out.println("Getting controller config");
             return await(VertxUtil.kafkaFutureToVertxFuture(reconciliation, vertx, controllerAdminClient.describeConfigs(singletonList(resource)).values().get(resource)),
                     30, TimeUnit.SECONDS,
                 error -> new ForceableProblem("Error getting controller config: " + error, error)
             );
         } else {
-            System.out.println("Getting broker config");
             return await(VertxUtil.kafkaFutureToVertxFuture(reconciliation, vertx, brokerAdminClient.describeConfigs(singletonList(resource)).values().get(resource)),
                     30, TimeUnit.SECONDS,
                 error -> new ForceableProblem("Error getting broker config: " + error, error)
@@ -926,15 +900,7 @@ protected Future<Void> restart(Pod pod, RestartContext restartContext) {
      * empty set, use the brokers service to bootstrap the client.
      */
     /* test */ Admin brokerAdminClient(Set<NodeRef> nodes) throws ForceableProblem, FatalProblem {
-        // If no nodes are passed, initialize the admin client using the bootstrap service
-        // This is still needed for versions older than 3.9.0, so that when only controller nodes being rolled,
-        // it can use brokers to get quorum information via AdminClient.
-        String bootstrapHostnames;
-        if (nodes.isEmpty()) {
-            bootstrapHostnames = String.format("%s:%s", DnsNameGenerator.of(namespace, KafkaResources.bootstrapServiceName(cluster)).serviceDnsName(), KafkaCluster.REPLICATION_PORT);
-        } else {
-            bootstrapHostnames = nodes.stream().filter(NodeRef::broker).map(node -> DnsNameGenerator.podDnsName(namespace, KafkaResources.brokersServiceName(cluster), node.podName()) + ":" + KafkaCluster.REPLICATION_PORT).collect(Collectors.joining(","));
-        }
+        String bootstrapHostnames = nodes.stream().filter(NodeRef::broker).map(node -> DnsNameGenerator.podDnsName(namespace, KafkaResources.brokersServiceName(cluster), node.podName()) + ":" + KafkaCluster.REPLICATION_PORT).collect(Collectors.joining(","));
 
         try {
             LOGGER.debugCr(reconciliation, "Creating AdminClient for {}", bootstrapHostnames);
diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorKRaftMockTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorKRaftMockTest.java
index c1e6957985..565d940538 100644
--- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorKRaftMockTest.java
+++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorKRaftMockTest.java
@@ -28,7 +28,6 @@
 import io.strimzi.operator.cluster.PlatformFeaturesAvailability;
 import io.strimzi.operator.cluster.ResourceUtils;
 import io.strimzi.operator.cluster.model.CertUtils;
-import io.strimzi.operator.cluster.model.KafkaCluster;
 import io.strimzi.operator.cluster.model.KafkaVersion;
 import io.strimzi.operator.cluster.model.PodSetUtils;
 import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier;
@@ -62,7 +61,6 @@
 
 import static org.hamcrest.CoreMatchers.hasItems;
 import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.CoreMatchers.startsWith;
diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaRollerTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaRollerTest.java
index ec8ff849e5..69c6ddfbef 100644
--- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaRollerTest.java
+++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaRollerTest.java
@@ -9,7 +9,6 @@
 import io.fabric8.kubernetes.client.KubernetesClientException;
 import io.strimzi.api.kafka.model.kafka.KafkaResources;
 import io.strimzi.operator.cluster.KafkaVersionTestUtils;
-import io.strimzi.operator.cluster.model.KafkaCluster;
 import io.strimzi.operator.cluster.model.NodeRef;
 import io.strimzi.operator.cluster.model.RestartReason;
 import io.strimzi.operator.cluster.model.RestartReasons;
@@ -162,7 +161,7 @@ private static AdminClientProvider givenControllerFutureFailsWithTimeout() {
 
     @Test
     public void testTalkingToControllersLatestVersion(VertxTestContext testContext) {
-        PodOperator podOps = mockPodOpsWithVersion(podId -> succeededFuture(), KafkaVersionTestUtils.getLatestVersion().version());
+        PodOperator podOps = mockPodOps(podId -> succeededFuture());
 
         AdminClientProvider mock = mock(AdminClientProvider.class);
         when(mock.createControllerAdminClient(anyString(), any(), any())).thenThrow(new RuntimeException("An error while try to create an admin client with bootstrap controllers"));
@@ -177,26 +176,6 @@ public void testTalkingToControllersLatestVersion(VertxTestContext testContext)
             asList(0));
     }
 
-    @Test
-    public void testTalkingToControllersWithOldVersion(VertxTestContext testContext) throws InterruptedException {
-        PodOperator podOps = mockPodOpsWithVersion(podId -> succeededFuture(), "3.8.0");
-
-        AdminClientProvider mock = mock(AdminClientProvider.class);
-        when(mock.createAdminClient(anyString(), any(), any())).thenThrow(new RuntimeException("An error while try to create an admin client with bootstrap brokers"));
-
-        TestingKafkaRoller kafkaRoller = new TestingKafkaRoller(addKraftPodNames(0, 0, 1), podOps,
-                noException(), null, noException(), noException(), noException(),
-                brokerId -> succeededFuture(true),
-                true, mock, mockKafkaAgentClientProvider(), true, null, -1);
-
-        // If the controller has older version (< 3.9.0), we should only be creating admin client for brokers
-        // and when the operator cannot connect to brokers, we expect to fail initialising KafkaQuorumCheck
-        doFailingRollingRestart(testContext, kafkaRoller,
-                asList(0),
-                KafkaRoller.UnforceableProblem.class, "KafkaQuorumCheck cannot be initialised for c-kafka-0/0 because none of the brokers do not seem to responding to connection attempts",
-                emptyList());
-    }
-
     private static KafkaAgentClientProvider mockKafkaAgentClientProvider() {
         return mock(KafkaAgentClientProvider.class);
     }
@@ -835,17 +814,12 @@ public void clearRestarted() {
     }
 
     private PodOperator mockPodOps(Function<Integer, Future<Void>> readiness) {
-        return mockPodOpsWithVersion(readiness, KafkaVersionTestUtils.getLatestVersion().version());
-    }
-
-    private PodOperator mockPodOpsWithVersion(Function<Integer, Future<Void>> readiness, String version) {
         PodOperator podOps = mock(PodOperator.class);
         when(podOps.get(any(), any())).thenAnswer(
                 invocation -> new PodBuilder()
                         .withNewMetadata()
                             .withNamespace(invocation.getArgument(0))
                             .withName(invocation.getArgument(1))
-                            .addToAnnotations(KafkaCluster.ANNO_STRIMZI_IO_KAFKA_VERSION, version)
                         .endMetadata()
                         .build()
         );