From 316fc14d784752e1df28cba708bb604ea8a787b8 Mon Sep 17 00:00:00 2001
From: Gantigmaa Selenge
Date: Fri, 29 Nov 2024 10:48:21 +0000
Subject: [PATCH 1/2] Check and apply configurations for controllers via Admin
 API

Signed-off-by: Gantigmaa Selenge
---
 .../cluster/model/KafkaConfiguration.java     | 160 -------------
 .../operator/assembly/KafkaReconciler.java    |  14 +-
 ...f.java => KafkaNodeConfigurationDiff.java} | 215 +++++++++++++++---
 ...=> KafkaNodeLoggingConfigurationDiff.java} |   6 +-
 .../operator/resource/KafkaRoller.java        | 141 +++++++-----
 .../KafkaAssemblyOperatorKRaftMockTest.java   | 158 -------------
 ...va => KafkaNodeConfigurationDiffTest.java} | 124 ++++++----
 ...afkaNodeLoggingConfigurationDiffTest.java} |   8 +-
 .../operator/resource/KafkaRollerTest.java    |   6 +-
 9 files changed, 351 insertions(+), 481 deletions(-)
 rename cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/{KafkaBrokerConfigurationDiff.java => KafkaNodeConfigurationDiff.java} (56%)
 rename cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/{KafkaBrokerLoggingConfigurationDiff.java => KafkaNodeLoggingConfigurationDiff.java} (97%)
 rename cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/{KafkaBrokerConfigurationDiffTest.java => KafkaNodeConfigurationDiffTest.java} (65%)
 rename cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/{KafkaBrokerLoggingConfigurationDiffTest.java => KafkaNodeLoggingConfigurationDiffTest.java} (93%)

diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaConfiguration.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaConfiguration.java
index 1adb760d67b..dac96fc5f72 100644
--- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaConfiguration.java
+++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaConfiguration.java
@@ -52,148 +52,6 @@ public class KafkaConfiguration extends AbstractConfiguration {
         FORBIDDEN_PREFIX_EXCEPTIONS = AbstractConfiguration.splitPrefixesOrOptionsToList(KafkaClusterSpec.FORBIDDEN_PREFIX_EXCEPTIONS);
     }
 
-    /**
-     * List of configuration options that are relevant to controllers and should be considered when deciding whether
-     * a controller-only node needs to be rolled or not.
-     */
-    private static final Set<String> CONTROLLER_RELEVANT_CONFIGS = Set.of(
-            "alter.config.policy.class.name",
-            "authorizer.class.name",
-            "auto.create.topics.enable",
-            "background.threads",
-            "broker.heartbeat.interval.ms",
-            "broker.rack",
-            "broker.session.timeout.ms",
-            "connection.failed.authentication.delay.ms",
-            "connections.max.idle.ms",
-            "connections.max.reauth.ms",
-            "controlled.shutdown.enable",
-            "controlled.shutdown.max.retries",
-            "controlled.shutdown.retry.backoff.ms",
-            "controller.listener.names",
-            "controller.quorum.append.linger.ms",
-            "controller.quorum.election.backoff.max.ms",
-            "controller.quorum.election.timeout.ms",
-            "controller.quorum.fetch.timeout.ms",
-            "controller.quorum.request.timeout.ms",
-            "controller.quorum.retry.backoff.ms",
-            "controller.quorum.voters",
-            "controller.quota.window.num",
-            "controller.quota.window.size.seconds",
-            "controller.socket.timeout.ms",
-            "create.topic.policy.class.name",
-            "default.replication.factor",
-            "delete.topic.enable",
-            "early.start.listeners",
-            "kafka.metrics.polling.interval.secs",
-            "kafka.metrics.reporters",
-            "leader.imbalance.check.interval.seconds",
-            "leader.imbalance.per.broker.percentage",
-            "listener.name.controlplane-9090.ssl.keystore.location",
-            "listener.name.controlplane-9090.ssl.keystore.password",
-            "listener.name.controlplane-9090.ssl.keystore.type",
-            "listener.name.controlplane-9090.ssl.truststore.location",
-            "listener.name.controlplane-9090.ssl.truststore.password",
-            "listener.name.controlplane-9090.ssl.truststore.type",
-            "listener.name.controlplane-9090.ssl.client.auth",
-            "listener.security.protocol.map",
-            "listeners",
-            "log.dir",
-            "log.dirs",
-            "min.insync.replicas",
-            "max.connection.creation.rate",
-            "max.connections.per.ip.overrides",
-            "max.connections.per.ip",
-            "max.connections",
-            "metadata.log.dir",
-            "metadata.log.max.record.bytes.between.snapshots",
-            "metadata.log.max.snapshot.interval.ms",
-            "metadata.log.segment.bytes",
-            "metadata.log.segment.min.bytes",
-            "metadata.log.segment.ms",
-            "metadata.max.idle.interval.ms",
-            "metadata.max.retention.bytes",
-            "metadata.max.retention.ms",
-            "metric.reporters",
-            "metrics.num.samples",
-            "metrics.recording.level",
-            "metrics.sample.window.ms",
-            "node.id",
-            "num.io.threads",
-            "num.network.threads",
-            "num.partitions",
-            "offsets.topic.replication.factor",
-            "principal.builder.class",
-            "process.roles",
-            "replica.selector.class",
-            "reserved.broker.max.id",
-            "sasl.enabled.mechanisms",
-            "sasl.kerberos.kinit.cmd",
-            "sasl.kerberos.min.time.before.relogin",
-            "sasl.kerberos.principal.to.local.rules",
-            "sasl.kerberos.service.name",
-            "sasl.kerberos.ticket.renew.jitter",
-            "sasl.kerberos.ticket.renew.window.factor",
-            "sasl.login.callback.handler.class",
-            "sasl.login.class",
-            "sasl.login.connect.timeout.ms",
-            "sasl.login.read.timeout.ms",
-            "sasl.login.refresh.buffer.seconds",
-            "sasl.login.refresh.min.period.seconds",
-            "sasl.login.refresh.window.factor",
-            "sasl.login.refresh.window.jitter",
-            "sasl.login.retry.backoff.max.ms",
-            "sasl.login.retry.backoff.ms",
-            "sasl.mechanism.controller.protocol",
-            "sasl.oauthbearer.clock.skew.seconds",
-            "sasl.oauthbearer.expected.audience",
-            "sasl.oauthbearer.expected.issuer",
-            "sasl.oauthbearer.jwks.endpoint.refresh.ms",
-            "sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms",
-            "sasl.oauthbearer.jwks.endpoint.retry.backoff.ms",
-            "sasl.oauthbearer.jwks.endpoint.url",
-            "sasl.oauthbearer.scope.claim.name",
-            "sasl.oauthbearer.sub.claim.name",
-            "sasl.oauthbearer.token.endpoint.url",
-            "sasl.server.callback.handler.class",
-            "sasl.server.max.receive.size",
-            "security.providers",
-            "server.max.startup.time.ms",
-            "socket.connection.setup.timeout.max.ms",
-            "socket.connection.setup.timeout.ms",
-            "socket.listen.backlog.size",
-            "socket.receive.buffer.bytes",
-            "socket.request.max.bytes",
-            "socket.send.buffer.bytes",
-            "ssl.cipher.suites",
-            "ssl.client.auth",
-            "ssl.enabled.protocols",
-            "ssl.endpoint.identification.algorithm",
-            "ssl.engine.factory.class",
-            "ssl.key.password",
-            "ssl.keymanager.algorithm",
-            "ssl.keystore.certificate.chain",
-            "ssl.keystore.key",
-            "ssl.keystore.location",
-            "ssl.keystore.password",
-            "ssl.keystore.type",
-            "ssl.principal.mapping.rules",
-            "ssl.protocol",
-            "ssl.provider",
-            "ssl.secure.random.implementation",
-            "ssl.trustmanager.algorithm",
-            "ssl.truststore.certificates",
-            "ssl.truststore.location",
-            "ssl.truststore.password",
-            "ssl.truststore.type",
-            "super.users",
-            "transaction.state.log.min.isr",
-            "transaction.state.log.replication.factor",
-            "queued.max.requests",
-            "queued.max.requests.bytes",
-            "unclean.leader.election.enable"
-    );
-
     /**
      * Copy constructor which creates new instance of the Kafka Configuration from existing configuration. It is
      * useful when you need to modify an instance of the configuration without permanently changing the original.
@@ -294,24 +152,6 @@ public Set<String> unknownConfigsWithValues(KafkaVersion kafkaVersion) {
         return result;
     }
 
-    /**
-     * Return the config properties with their values in this KafkaConfiguration which are known to be relevant for the
-     * Kafka controller nodes.
-     *
-     * @return The configuration options relevant for controllers
-     */
-    public Set<String> controllerConfigsWithValues() {
-        Set<String> result = new HashSet<>();
-
-        for (Map.Entry<String, String> e :this.asOrderedProperties().asMap().entrySet()) {
-            if (CONTROLLER_RELEVANT_CONFIGS.contains(e.getKey())) {
-                result.add(e.getKey() + "=" + e.getValue());
-            }
-        }
-
-        return result;
-    }
-
     /**
      * @return True if the configuration is empty. False otherwise.
      */
diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java
index 7c365f758b7..6826759a3a8 100644
--- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java
+++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java
@@ -161,7 +161,7 @@ public class KafkaReconciler {
     private final boolean continueOnManualRUFailure;
 
     private String logging = "";
-    private final Map<Integer, String> brokerLoggingHash = new HashMap<>();
+    private final Map<Integer, String> nodeLoggingHash = new HashMap<>();
     private final Map<Integer, String> brokerConfigurationHash = new HashMap<>();
     private final Map<Integer, String> kafkaServerCertificateHash = new HashMap<>();
     /* test */ TlsPemIdentity coTlsPemIdentity;
@@ -709,15 +709,7 @@ protected Future<Void> perBrokerKafkaConfiguration(MetricsAndLogging metricsAndL
                     // We collect the configuration options related to various plugins
                     nodeConfiguration += kc.unknownConfigsWithValues(kafka.getKafkaVersion()).toString();
 
-                    // We collect the information relevant to controller-only nodes
-                    if (pool.isController() && !pool.isBroker()) {
-                        // For controllers only, we extract the controller-relevant configurations and use it in the configuration annotations
-                        nodeConfiguration = kc.controllerConfigsWithValues().toString();
-                        // For controllers only, we use the full logging configuration in the logging annotation
-                        this.brokerLoggingHash.put(nodeId, Util.hashStub(logging));
-                    } else {
-                        this.brokerLoggingHash.put(nodeId, Util.hashStub(Util.getLoggingDynamicallyUnmodifiableEntries(logging)));
-                    }
+                    this.nodeLoggingHash.put(nodeId, Util.hashStub(Util.getLoggingDynamicallyUnmodifiableEntries(logging)));
 
                     // We store hash of the broker configurations for later use in Pod and in rolling updates
                     this.brokerConfigurationHash.put(nodeId, Util.hashStub(nodeConfiguration));
@@ -810,7 +802,7 @@ private Map<String, String> podSetPodAnnotations(NodeRef node) {
         podAnnotations.put(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION, String.valueOf(this.clusterCa.caCertGeneration()));
         podAnnotations.put(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION, String.valueOf(this.clusterCa.caKeyGeneration()));
         podAnnotations.put(Ca.ANNO_STRIMZI_IO_CLIENTS_CA_CERT_GENERATION, String.valueOf(this.clientsCa.caCertGeneration()));
-        podAnnotations.put(Annotations.ANNO_STRIMZI_LOGGING_HASH, brokerLoggingHash.get(node.nodeId()));
+        podAnnotations.put(Annotations.ANNO_STRIMZI_LOGGING_HASH, nodeLoggingHash.get(node.nodeId()));
         podAnnotations.put(KafkaCluster.ANNO_STRIMZI_BROKER_CONFIGURATION_HASH, brokerConfigurationHash.get(node.nodeId()));
         podAnnotations.put(ANNO_STRIMZI_IO_KAFKA_VERSION, kafka.getKafkaVersion().version());
 
diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaBrokerConfigurationDiff.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaNodeConfigurationDiff.java
similarity index 56%
rename from cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaBrokerConfigurationDiff.java
rename to cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaNodeConfigurationDiff.java
index 211d68f4a41..d8ccffb063b 100644
--- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaBrokerConfigurationDiff.java
+++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaNodeConfigurationDiff.java
@@ -24,6 +24,7 @@
 import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -36,12 +37,8 @@
  * 3b. If entry was removed from desired, add it to the diff with null value.
  * 3c. If custom entry was removed, delete property
  */
-public class KafkaBrokerConfigurationDiff extends AbstractJsonDiff {
-    private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(KafkaBrokerConfigurationDiff.class);
-
-    private final Reconciliation reconciliation;
-    private final Collection<AlterConfigOp> brokerConfigDiff;
-    private final Map<String, ConfigModel> configModel;
+public class KafkaNodeConfigurationDiff extends AbstractJsonDiff {
+    private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(KafkaNodeConfigurationDiff.class);
 
     /**
      * These options are skipped because they contain placeholders
@@ -68,19 +65,163 @@
      * KRaft controller configuration options are skipped if it is not combined node
      */
     private static final Pattern IGNORABLE_CONTROLLER_PROPERTIES = Pattern.compile("controller\\.quorum\\..*");
+
+    /**
+     * List of configuration options that are relevant to controllers and should be considered when deciding whether
+     * a controller-only node needs to be rolled or not.
+     */
+    private static final Set<String> CONTROLLER_RELEVANT_CONFIGS = Set.of(
+            "alter.config.policy.class.name",
+            "authorizer.class.name",
+            "auto.create.topics.enable",
+            "background.threads",
+            "broker.heartbeat.interval.ms",
+            "broker.session.timeout.ms",
+            "connection.failed.authentication.delay.ms",
+            "connections.max.idle.ms",
+            "connections.max.reauth.ms",
+            "controlled.shutdown.enable",
+            "controlled.shutdown.max.retries",
+            "controlled.shutdown.retry.backoff.ms",
+            "controller.listener.names",
+            "controller.quorum.append.linger.ms",
+            "controller.quorum.election.backoff.max.ms",
+            "controller.quorum.election.timeout.ms",
+            "controller.quorum.fetch.timeout.ms",
+            "controller.quorum.request.timeout.ms",
+            "controller.quorum.retry.backoff.ms",
+            "controller.quorum.voters",
+            "controller.quota.window.num",
+            "controller.quota.window.size.seconds",
+            "controller.socket.timeout.ms",
+            "create.topic.policy.class.name",
+            "default.replication.factor",
+            "delete.topic.enable",
+            "early.start.listeners",
+            "kafka.metrics.polling.interval.secs",
+            "kafka.metrics.reporters",
+            "leader.imbalance.check.interval.seconds",
+            "leader.imbalance.per.broker.percentage",
+            "listener.security.protocol.map",
+            "listeners",
+            "log.dir",
+            "log.dirs",
+            "min.insync.replicas",
+            "max.connection.creation.rate",
+            "max.connections.per.ip.overrides",
+            "max.connections.per.ip",
+            "max.connections",
+            "metadata.log.dir",
+            "metadata.log.max.record.bytes.between.snapshots",
+            "metadata.log.max.snapshot.interval.ms",
+            "metadata.log.segment.bytes",
+            "metadata.log.segment.min.bytes",
+            "metadata.log.segment.ms",
+            "metadata.max.idle.interval.ms",
+            "metadata.max.retention.bytes",
+            "metadata.max.retention.ms",
+            "metric.reporters",
+            "metrics.num.samples",
+            "metrics.recording.level",
+            "metrics.sample.window.ms",
+            "num.io.threads",
+            "num.network.threads",
+            "num.partitions",
+            "offsets.topic.replication.factor",
+            "principal.builder.class",
+            "process.roles",
+            "replica.selector.class",
+            "reserved.broker.max.id",
+            "sasl.kerberos.kinit.cmd",
+            "sasl.kerberos.min.time.before.relogin",
+            "sasl.kerberos.principal.to.local.rules",
+            "sasl.kerberos.service.name",
+            "sasl.kerberos.ticket.renew.jitter",
+            "sasl.kerberos.ticket.renew.window.factor",
+            "sasl.login.callback.handler.class",
+            "sasl.login.class",
+            "sasl.login.connect.timeout.ms",
+            "sasl.login.read.timeout.ms",
+            "sasl.login.refresh.buffer.seconds",
+            "sasl.login.refresh.min.period.seconds",
+            "sasl.login.refresh.window.factor",
+            "sasl.login.refresh.window.jitter",
+            "sasl.login.retry.backoff.max.ms",
+            "sasl.login.retry.backoff.ms",
+            "sasl.mechanism.controller.protocol",
+            "sasl.oauthbearer.clock.skew.seconds",
+            "sasl.oauthbearer.expected.audience",
+            "sasl.oauthbearer.expected.issuer",
+            "sasl.oauthbearer.jwks.endpoint.refresh.ms",
+            "sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms",
+            "sasl.oauthbearer.jwks.endpoint.retry.backoff.ms",
+            "sasl.oauthbearer.jwks.endpoint.url",
+            "sasl.oauthbearer.scope.claim.name",
+            "sasl.oauthbearer.sub.claim.name",
+            "sasl.oauthbearer.token.endpoint.url",
+            "sasl.server.callback.handler.class",
+            "sasl.server.max.receive.size",
+            "security.providers",
+            "server.max.startup.time.ms",
+            "socket.connection.setup.timeout.max.ms",
+            "socket.connection.setup.timeout.ms",
+            "socket.listen.backlog.size",
+            "socket.receive.buffer.bytes",
+            "socket.request.max.bytes",
+            "socket.send.buffer.bytes",
+            "ssl.cipher.suites",
+            "ssl.client.auth",
+            "ssl.enabled.protocols",
+            "ssl.endpoint.identification.algorithm",
+            "ssl.engine.factory.class",
+            "ssl.key.password",
+            "ssl.keymanager.algorithm",
+            "ssl.keystore.certificate.chain",
+            "ssl.keystore.key",
+            "ssl.keystore.location",
+            "ssl.keystore.password",
+            "ssl.keystore.type",
+            "ssl.principal.mapping.rules",
+            "ssl.protocol",
+            "ssl.provider",
+            "ssl.secure.random.implementation",
+            "ssl.trustmanager.algorithm",
+            "ssl.truststore.certificates",
+            "ssl.truststore.location",
+            "ssl.truststore.password",
+            "ssl.truststore.type",
+            "super.users",
+            "transaction.state.log.min.isr",
+            "transaction.state.log.replication.factor",
+            "queued.max.requests",
+            "queued.max.requests.bytes",
+            "unclean.leader.election.enable"
+    );
+
+    private final Reconciliation reconciliation;
+    private final Collection<AlterConfigOp> nodeConfigDiff;
+    private final Map<String, ConfigModel> configModel;
+    private final boolean isController;
+    private final boolean isBroker;
+
     /**
      * Constructor
      *
      * @param reconciliation Reconciliation marker
-     * @param brokerConfigs  Broker configuration from Kafka Admin API
+     * @param nodeConfigs    Kafka node configuration from Kafka Admin API
      * @param desired        Desired configuration
      * @param kafkaVersion   Kafka version
-     * @param brokerNodeRef  Broker node reference
+     * @param nodeRef        Node reference
+     * @param isController   Whether the node is a KRaft controller
+     * @param isBroker       Whether the node is a broker
      */
-    protected KafkaBrokerConfigurationDiff(Reconciliation reconciliation, Config brokerConfigs, String desired, KafkaVersion kafkaVersion, NodeRef brokerNodeRef) {
+    protected KafkaNodeConfigurationDiff(Reconciliation reconciliation, Config nodeConfigs, String desired, KafkaVersion kafkaVersion, NodeRef nodeRef, boolean isController, boolean isBroker) {
         this.reconciliation = reconciliation;
         this.configModel = KafkaConfiguration.readConfigModel(kafkaVersion);
-        this.brokerConfigDiff = diff(brokerNodeRef, desired, brokerConfigs, configModel);
+        this.isController = isController;
+        this.isBroker = isBroker;
+        this.nodeConfigDiff = diff(nodeRef, desired, nodeConfigs, configModel);
     }
 
     /**
@@ -88,7 +229,7 @@ protected KafkaBrokerConfigurationDiff(Reconciliation reconciliation, Config bro
      */
     protected boolean canBeUpdatedDynamically() {
         boolean result = true;
-        for (AlterConfigOp entry : brokerConfigDiff) {
+        for (AlterConfigOp entry : nodeConfigDiff) {
             if (isEntryReadOnly(entry.configEntry())) {
                 result = false;
                 LOGGER.infoCr(reconciliation, "Configuration can't be updated dynamically due to: {}", entry);
@@ -111,44 +252,48 @@ private boolean isEntryReadOnly(ConfigEntry entry) {
      * @return Collection of AlterConfigOp containing difference between current and desired configuration
      */
     protected Collection<AlterConfigOp> getConfigDiff() {
-        return brokerConfigDiff;
+        return nodeConfigDiff;
     }
 
     /**
-     * @return The number of broker configs which are different.
+     * @return The number of node configs which are different.
      */
     protected int getDiffSize() {
-        return brokerConfigDiff.size();
+        return nodeConfigDiff.size();
     }
 
-    private static boolean isIgnorableProperty(final String key, final boolean nodeIsController) {
-        // If node is not a KRaft controller, ignore KRaft controller config properties.
-        if (!nodeIsController) {
-            return IGNORABLE_PROPERTIES.matcher(key).matches() || IGNORABLE_CONTROLLER_PROPERTIES.matcher(key).matches();
-        } else {
+    private boolean isIgnorableProperty(final String key) {
+        if (isController && !isBroker) {
+            // If this config is not relevant to controllers, ignore it for a pure controller.
+            // Ignorable properties are not included in the controller-relevant configs, so there is no need to check them here as well.
+            return !CONTROLLER_RELEVANT_CONFIGS.contains(key);
+        } else if (isController) {
             return IGNORABLE_PROPERTIES.matcher(key).matches();
+        } else {
+            // If node is not a KRaft controller, ignore KRaft controller config properties.
+            return IGNORABLE_PROPERTIES.matcher(key).matches() || IGNORABLE_CONTROLLER_PROPERTIES.matcher(key).matches();
         }
     }
 
     /**
      * Computes diff between two maps. Entries in IGNORABLE_PROPERTIES are skipped
-     * @param brokerNodeRef broker node reference of compared broker
+     * @param nodeRef node reference of compared node
      * @param desired desired configuration, may be null if the related ConfigMap does not exist yet or no changes are required
-     * @param brokerConfigs current configuration
+     * @param nodeConfigs current configuration
      * @param configModel default configuration for {@code kafkaVersion} of broker
      * @return Collection of AlterConfigOp containing all entries which were changed from current in desired configuration
      */
-    private Collection<AlterConfigOp> diff(NodeRef brokerNodeRef, String desired,
-                                           Config brokerConfigs,
+    private Collection<AlterConfigOp> diff(NodeRef nodeRef, String desired,
+                                           Config nodeConfigs,
                                            Map<String, ConfigModel> configModel) {
-        if (brokerConfigs == null || desired == null) {
+        if (nodeConfigs == null || desired == null) {
             return Collections.emptyList();
         }
         Map<String, String> currentMap;
 
         Collection<AlterConfigOp> updatedCE = new ArrayList<>();
 
-        currentMap = brokerConfigs.entries().stream().collect(
+        currentMap = nodeConfigs.entries().stream().collect(
                 Collectors.toMap(
                         ConfigEntry::name,
                         configEntry -> configEntry.value() == null ? "null" : configEntry.value()));
@@ -165,7 +310,7 @@ private Collection<AlterConfigOp> diff(NodeRef brokerNodeRef, String desired,
             String pathValue = d.get("path").asText();
             String pathValueWithoutSlash = pathValue.substring(1);
 
-            Optional<ConfigEntry> optEntry = brokerConfigs.entries().stream()
+            Optional<ConfigEntry> optEntry = nodeConfigs.entries().stream()
                     .filter(configEntry -> configEntry.name().equals(pathValueWithoutSlash))
                     .findFirst();
 
@@ -173,25 +318,25 @@ private Collection<AlterConfigOp> diff(NodeRef brokerNodeRef, String desired,
             if (optEntry.isPresent()) {
                 ConfigEntry entry = optEntry.get();
                 if ("remove".equals(op)) {
-                    removeProperty(configModel, updatedCE, pathValueWithoutSlash, entry, brokerNodeRef.controller());
+                    removeProperty(configModel, updatedCE, pathValueWithoutSlash, entry);
                 } else if ("replace".equals(op)) {
                     // entry is in the current, desired is updated value
-                    updateOrAdd(entry.name(), configModel, desiredMap, updatedCE, brokerNodeRef.controller());
+                    updateOrAdd(entry.name(), configModel, desiredMap, updatedCE);
                 }
             } else {
                 if ("add".equals(op)) {
                     // entry is not in the current, it is added
-                    updateOrAdd(pathValueWithoutSlash, configModel, desiredMap, updatedCE, brokerNodeRef.controller());
+                    updateOrAdd(pathValueWithoutSlash, configModel, desiredMap, updatedCE);
                 }
             }
 
             if ("remove".equals(op)) {
                 // there is a lot of properties set by default - not having them in desired causes very noisy log output
-                LOGGER.traceCr(reconciliation, "Kafka Broker {} Config Differs : {}", brokerNodeRef.nodeId(), d);
+                LOGGER.traceCr(reconciliation, "Kafka Broker {} Config Differs : {}", nodeRef.nodeId(), d);
                 LOGGER.traceCr(reconciliation, "Current Kafka Broker Config path {} has value {}", pathValueWithoutSlash, lookupPath(source, pathValue));
                 LOGGER.traceCr(reconciliation, "Desired Kafka Broker Config path {} has value {}", pathValueWithoutSlash, lookupPath(target, pathValue));
             } else {
-                LOGGER.debugCr(reconciliation, "Kafka Broker {} Config Differs : {}", brokerNodeRef.nodeId(), d);
+                LOGGER.debugCr(reconciliation, "Kafka Broker {} Config Differs : {}", nodeRef.nodeId(), d);
                 LOGGER.debugCr(reconciliation, "Current Kafka Broker Config path {} has value {}", pathValueWithoutSlash, lookupPath(source, pathValue));
                 LOGGER.debugCr(reconciliation, "Desired Kafka Broker Config path {} has value {}", pathValueWithoutSlash, lookupPath(target, pathValue));
             }
@@ -200,8 +345,8 @@ private Collection<AlterConfigOp> diff(NodeRef brokerNodeRef, String desired,
         return updatedCE;
     }
 
-    private void updateOrAdd(String propertyName, Map<String, ConfigModel> configModel, Map<String, String> desiredMap, Collection<AlterConfigOp> updatedCE, boolean nodeIsController) {
-        if (!isIgnorableProperty(propertyName, nodeIsController)) {
+    private void updateOrAdd(String propertyName, Map<String, ConfigModel> configModel, Map<String, String> desiredMap, Collection<AlterConfigOp> updatedCE) {
+        if (!isIgnorableProperty(propertyName)) {
             if (KafkaConfiguration.isCustomConfigurationOption(propertyName, configModel)) {
                 LOGGER.traceCr(reconciliation, "custom property {} has been updated/added {}", propertyName, desiredMap.get(propertyName));
             } else {
@@ -213,7 +358,7 @@ private void updateOrAdd(String propertyName, Map<String, ConfigModel> configMod
         }
     }
 
-    private void removeProperty(Map<String, ConfigModel> configModel, Collection<AlterConfigOp> updatedCE, String pathValueWithoutSlash, ConfigEntry entry, boolean nodeIsController) {
+    private void removeProperty(Map<String, ConfigModel> configModel, Collection<AlterConfigOp> updatedCE, String pathValueWithoutSlash, ConfigEntry entry) {
         if (KafkaConfiguration.isCustomConfigurationOption(entry.name(), configModel)) {
             // we are deleting custom option
             LOGGER.traceCr(reconciliation, "removing custom property {}", entry.name());
@@ -226,7 +371,7 @@ private void removeProperty(Map<String, ConfigModel> configModel, Collection<Alt
             // entry is in default state -> it was using non-default value and was removed
             // if the entry was custom, it should be deleted
-            if (!isIgnorableProperty(pathValueWithoutSlash, nodeIsController)) {
+            if (!isIgnorableProperty(pathValueWithoutSlash)) {
                 updatedCE.add(new AlterConfigOp(new ConfigEntry(pathValueWithoutSlash, null), AlterConfigOp.OpType.DELETE));
                 LOGGER.infoCr(reconciliation, "{} not set in desired, unsetting back to default {}", entry.name(), "deleted entry");
             } else {
@@ -240,6 +385,6 @@ private void removeProperty(Map<String, ConfigModel> configModel, Collection<Alt
      */
     @Override
     public boolean isEmpty() {
-        return brokerConfigDiff.size() == 0;
+        return nodeConfigDiff.size() == 0;
     }
 }
diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaBrokerLoggingConfigurationDiff.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaNodeLoggingConfigurationDiff.java
similarity index 97%
rename from cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaBrokerLoggingConfigurationDiff.java
rename to cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaNodeLoggingConfigurationDiff.java
--- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaBrokerLoggingConfigurationDiff.java
+++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaNodeLoggingConfigurationDiff.java
@@ -26,6 +26,6 @@
  */
-public class KafkaBrokerLoggingConfigurationDiff extends AbstractJsonDiff {
-    private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(KafkaBrokerLoggingConfigurationDiff.class);
+public class KafkaNodeLoggingConfigurationDiff extends AbstractJsonDiff {
+    private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(KafkaNodeLoggingConfigurationDiff.class);
     private final Collection<AlterConfigOp> diff;
     private final Reconciliation reconciliation;
@@ -38,7 +38,7 @@ public class KafkaBrokerLoggingConfigurationDiff extends AbstractJsonDiff {
      * @param brokerConfigs  Current broker configuration from Kafka Admin API
      * @param desired        Desired logging configuration
      */
-    protected KafkaBrokerLoggingConfigurationDiff(Reconciliation reconciliation, Config brokerConfigs, String desired) {
+    protected KafkaNodeLoggingConfigurationDiff(Reconciliation reconciliation, Config brokerConfigs, String desired) {
         this.reconciliation = reconciliation;
         this.diff = diff(desired, brokerConfigs);
     }
diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaRoller.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaRoller.java
index 8356e07384e..977127c3add 100644
--- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaRoller.java
+++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaRoller.java
@@ -315,8 +315,8 @@ protected static class RestartContext {
         boolean needsReconfig;
         boolean forceRestart;
         boolean podStuck;
-        KafkaBrokerConfigurationDiff brokerConfigDiff;
-        KafkaBrokerLoggingConfigurationDiff brokerLoggingDiff;
+        KafkaNodeConfigurationDiff nodeConfigDiff;
+        KafkaNodeLoggingConfigurationDiff nodeLoggingDiff;
         KafkaQuorumCheck quorumCheck;
 
         RestartContext(Supplier<BackOff> backOffSupplier) {
@@ -558,7 +558,7 @@ private boolean maybeDynamicUpdateBrokerConfig(NodeRef nodeRef, RestartContext r
 
         if (restartContext.needsReconfig) {
             try {
-                dynamicUpdateBrokerConfig(nodeRef, brokerAdminClient, restartContext.brokerConfigDiff, restartContext.brokerLoggingDiff);
+                dynamicUpdateBrokerConfig(nodeRef, restartContext.nodeConfigDiff, restartContext.nodeLoggingDiff);
                 updatedDynamically = true;
             } catch (ForceableProblem e) {
                 LOGGER.debugCr(reconciliation, "Pod {} could not be updated dynamically ({}), will restart", nodeRef, e);
@@ -581,8 +581,8 @@ private void markRestartContextWithForceRestart(RestartContext restartContext) {
         restartContext.needsRestart = false;
         restartContext.needsReconfig = false;
         restartContext.forceRestart = true;
-        restartContext.brokerConfigDiff = null;
-        restartContext.brokerLoggingDiff = null;
+        restartContext.nodeConfigDiff = null;
+        restartContext.nodeLoggingDiff = null;
     }
 
     /**
@@ -606,8 +606,8 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont
         }
         boolean needsRestart = reasonToRestartPod.shouldRestart();
-        KafkaBrokerConfigurationDiff brokerConfigDiff = null;
-        KafkaBrokerLoggingConfigurationDiff brokerLoggingDiff = null;
+        KafkaNodeConfigurationDiff nodeConfigDiff = null;
+        KafkaNodeLoggingConfigurationDiff nodeLoggingDiff = null;
         boolean needsReconfig = false;
 
         // if it is a pure controller, initialise the admin client specifically for controllers
@@ -631,50 +631,51 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont
             if (isController) {
                 restartContext.quorumCheck = quorumCheck(brokerAdminClient, nodeRef);
             }
+        }
 
-            // Always get the broker config. This request gets sent to that specific broker, so it's a proof that we can
-            // connect to the broker and that it's capable of responding.
-            Config brokerConfig;
-            try {
-                brokerConfig = brokerConfig(nodeRef);
-            } catch (ForceableProblem e) {
-                if (restartContext.backOff.done()) {
-                    needsRestart = true;
-                    brokerConfig = null;
-                } else {
-                    throw e;
-                }
+        // Always get the node config. This request gets sent to that specific node, so it's a proof that we can
+        // connect to the node and that it's capable of responding.
+        Config nodeConfig;
+        try {
+            nodeConfig = nodeConfig(nodeRef, isController && !isBroker);
+        } catch (ForceableProblem e) {
+            if (restartContext.backOff.done()) {
+                needsRestart = true;
+                nodeConfig = null;
+            } else {
+                throw e;
             }
+        }
 
-            if (!needsRestart && allowReconfiguration) {
-                LOGGER.traceCr(reconciliation, "Pod {}: description {}", nodeRef, brokerConfig);
-                brokerConfigDiff = new KafkaBrokerConfigurationDiff(reconciliation, brokerConfig, kafkaConfigProvider.apply(nodeRef.nodeId()), kafkaVersion, nodeRef);
-                brokerLoggingDiff = logging(nodeRef);
+        if (!needsRestart && allowReconfiguration) {
+            LOGGER.traceCr(reconciliation, "Pod {}: description {}", nodeRef, nodeConfig);
+            nodeConfigDiff = new KafkaNodeConfigurationDiff(reconciliation, nodeConfig, kafkaConfigProvider.apply(nodeRef.nodeId()), kafkaVersion, nodeRef, isController, isBroker);
+            nodeLoggingDiff = logging(nodeRef, isController && !isBroker);
 
-                if (brokerConfigDiff.getDiffSize() > 0) {
-                    if (brokerConfigDiff.canBeUpdatedDynamically()) {
-                        LOGGER.debugCr(reconciliation, "Pod {} needs to be reconfigured.", nodeRef);
-                        needsReconfig = true;
-                    } else {
-                        LOGGER.infoCr(reconciliation, "Pod {} needs to be restarted, dynamic update cannot be done.", nodeRef);
-                        restartContext.restartReasons.add(RestartReason.CONFIG_CHANGE_REQUIRES_RESTART);
-                        needsRestart = true;
-                    }
-                }
-
-                // needsRestart value might have changed from the check in the parent if. So we need to check it again.
-                if (!needsRestart && brokerLoggingDiff.getDiffSize() > 0) {
-                    LOGGER.debugCr(reconciliation, "Pod {} logging needs to be reconfigured.", nodeRef);
+            if (nodeConfigDiff.getDiffSize() > 0) {
+                if (nodeConfigDiff.canBeUpdatedDynamically()) {
+                    LOGGER.debugCr(reconciliation, "Pod {} needs to be reconfigured.", nodeRef);
                     needsReconfig = true;
+                } else {
+                    LOGGER.infoCr(reconciliation, "Pod {} needs to be restarted, dynamic update cannot be done.", nodeRef);
+                    restartContext.restartReasons.add(RestartReason.CONFIG_CHANGE_REQUIRES_RESTART);
+                    needsRestart = true;
                 }
             }
+
+            // needsRestart value might have changed from the check in the parent if. So we need to check it again.
+            if (!needsRestart && nodeLoggingDiff.getDiffSize() > 0) {
+                LOGGER.debugCr(reconciliation, "Pod {} logging needs to be reconfigured.", nodeRef);
+                needsReconfig = true;
+            }
         }
 
         restartContext.needsRestart = needsRestart;
         restartContext.needsReconfig = needsReconfig;
         restartContext.forceRestart = false;
-        restartContext.brokerConfigDiff = brokerConfigDiff;
-        restartContext.brokerLoggingDiff = brokerLoggingDiff;
+        restartContext.nodeConfigDiff = nodeConfigDiff;
+        restartContext.nodeLoggingDiff = nodeLoggingDiff;
     }
 
     private void handleFailedAdminClientForController(NodeRef nodeRef, RestartContext restartContext, RestartReasons reasonToRestartPod, String currentVersion) throws UnforceableProblem {
@@ -697,59 +698,77 @@ private void handleFailedAdminClientForController(NodeRef nodeRef, RestartContex
-     * @param nodeRef The reference of the broker.
-     * @return a Future which completes with the config of the given broker.
+     * @param nodeRef The reference of the node.
+     * @param isPureController Whether the node is a controller-only node.
+     * @return The config of the given node.
      */
-    /* test */ Config brokerConfig(NodeRef nodeRef) throws ForceableProblem, InterruptedException {
+    /* test */ Config nodeConfig(NodeRef nodeRef, boolean isPureController) throws ForceableProblem, InterruptedException {
         ConfigResource resource = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(nodeRef.nodeId()));
-        return await(VertxUtil.kafkaFutureToVertxFuture(reconciliation, vertx, brokerAdminClient.describeConfigs(singletonList(resource)).values().get(resource)),
-                30, TimeUnit.SECONDS,
-                error -> new ForceableProblem("Error getting broker config", error)
-        );
+        if (isPureController) {
+            return await(VertxUtil.kafkaFutureToVertxFuture(reconciliation, vertx, controllerAdminClient.describeConfigs(singletonList(resource)).values().get(resource)),
+                    30, TimeUnit.SECONDS,
+                    error -> new ForceableProblem("Error getting controller config: " + error, error)
+            );
+        } else {
+            return await(VertxUtil.kafkaFutureToVertxFuture(reconciliation, vertx, brokerAdminClient.describeConfigs(singletonList(resource)).values().get(resource)),
+                    30, TimeUnit.SECONDS,
+                    error -> new ForceableProblem("Error getting broker config: " + error, error)
+            );
+        }
     }
 
     /**
-     * Returns logging of the given broker.
-     * @param brokerId The id of the broker.
-     * @return a Future which completes with the logging of the given broker.
+     * Returns logging of the given node.
+     * @param nodeId The id of the node.
+     * @param isPureController Whether the node is a controller-only node.
+     * @return The logging configuration of the given node.
      */
-    /* test */ Config brokerLogging(int brokerId) throws ForceableProblem, InterruptedException {
-        ConfigResource resource = Util.getBrokersLogging(brokerId);
-        return await(VertxUtil.kafkaFutureToVertxFuture(reconciliation, vertx, brokerAdminClient.describeConfigs(singletonList(resource)).values().get(resource)),
-                30, TimeUnit.SECONDS,
-                error -> new ForceableProblem("Error getting broker logging", error)
-        );
+    /* test */ Config nodeLogging(int nodeId, boolean isPureController) throws ForceableProblem, InterruptedException {
+        ConfigResource resource = Util.getBrokersLogging(nodeId);
+        if (isPureController) {
+            return await(VertxUtil.kafkaFutureToVertxFuture(reconciliation, vertx, controllerAdminClient.describeConfigs(singletonList(resource)).values().get(resource)),
+                    30, TimeUnit.SECONDS,
+                    error -> new ForceableProblem("Error getting controller logging", error)
+            );
+        } else {
+            return await(VertxUtil.kafkaFutureToVertxFuture(reconciliation, vertx, brokerAdminClient.describeConfigs(singletonList(resource)).values().get(resource)),
+                    30, TimeUnit.SECONDS,
+                    error -> new ForceableProblem("Error getting broker logging", error)
+            );
+        }
     }
 
-    /* test */ void dynamicUpdateBrokerConfig(NodeRef nodeRef, Admin ac, KafkaBrokerConfigurationDiff configurationDiff, KafkaBrokerLoggingConfigurationDiff logDiff)
+    /* test */ void dynamicUpdateBrokerConfig(NodeRef nodeRef, KafkaNodeConfigurationDiff configurationDiff, KafkaNodeLoggingConfigurationDiff logDiff)
             throws ForceableProblem, InterruptedException {
         Map<ConfigResource, Collection<AlterConfigOp>> updatedConfig = new HashMap<>(2);
         var podId = nodeRef.nodeId();
         updatedConfig.put(Util.getBrokersConfig(podId), configurationDiff.getConfigDiff());
         updatedConfig.put(Util.getBrokersLogging(podId), logDiff.getLoggingDiff());
 
-        LOGGER.debugCr(reconciliation, "Updating broker configuration {}", nodeRef);
-        LOGGER.traceCr(reconciliation, "Updating broker configuration {} with {}", nodeRef, updatedConfig);
+        LOGGER.debugCr(reconciliation, "Updating node configuration {}", nodeRef);
+        LOGGER.traceCr(reconciliation, "Updating node configuration {} with {}", nodeRef, updatedConfig);
+        Admin ac = nodeRef.controller() && !nodeRef.broker() ? controllerAdminClient : brokerAdminClient;
         AlterConfigsResult alterConfigResult = ac.incrementalAlterConfigs(updatedConfig);
         KafkaFuture<Void> brokerConfigFuture = alterConfigResult.values().get(Util.getBrokersConfig(podId));
         KafkaFuture<Void> brokerLoggingConfigFuture = alterConfigResult.values().get(Util.getBrokersLogging(podId));
         await(VertxUtil.kafkaFutureToVertxFuture(reconciliation, vertx, brokerConfigFuture), 30, TimeUnit.SECONDS,
             error -> {
-                LOGGER.errorCr(reconciliation, "Error updating broker configuration for pod {}", nodeRef, error);
-                return new ForceableProblem("Error updating broker configuration for pod " + nodeRef, error);
+                LOGGER.errorCr(reconciliation, "Error updating node configuration for pod {}", nodeRef, error);
+                return new ForceableProblem("Error updating node configuration for pod " + nodeRef, error);
             });
         await(VertxUtil.kafkaFutureToVertxFuture(reconciliation, vertx, brokerLoggingConfigFuture), 30, TimeUnit.SECONDS,
             error -> {
-                LOGGER.errorCr(reconciliation, "Error updating broker logging configuration pod {}", nodeRef, error);
-                return new ForceableProblem("Error updating broker logging configuration pod " + nodeRef, error);
+                LOGGER.errorCr(reconciliation, "Error updating node logging configuration for pod {}", nodeRef, error);
+                return new ForceableProblem("Error updating node logging configuration for pod " + nodeRef, error);
             });
         LOGGER.infoCr(reconciliation, "Dynamic update of pod {} was successful.", nodeRef);
     }
 
-    private KafkaBrokerLoggingConfigurationDiff logging(NodeRef nodeRef)
+    private KafkaNodeLoggingConfigurationDiff logging(NodeRef nodeRef, boolean isPureController)
             throws ForceableProblem, InterruptedException {
-        Config brokerLogging = brokerLogging(nodeRef.nodeId());
+        Config brokerLogging = nodeLogging(nodeRef.nodeId(), isPureController);
         LOGGER.traceCr(reconciliation, "Pod {}: logging description {}", nodeRef, brokerLogging);
-        return new KafkaBrokerLoggingConfigurationDiff(reconciliation, brokerLogging, kafkaLogging);
+        return new KafkaNodeLoggingConfigurationDiff(reconciliation, brokerLogging, kafkaLogging);
     }
 
     /** Exceptions which we're prepared to ignore (thus forcing a restart) in some circumstances. */
diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorKRaftMockTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorKRaftMockTest.java
index 62f25ad8ec8..c1e69579858 100644
--- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorKRaftMockTest.java
+++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorKRaftMockTest.java
@@ -711,162 +711,4 @@ public void testReconcileAddAndRemovePool(VertxTestContext context) {
                     async.flag();
                 })));
     }
-
-    /**
-     * Tests how the KRaft controller-only nodes have their configuration changes tracked using a Pod annotations. The
-     * annotation on controller-only pods should change when the controller-relevant config is changed. On broker pods
-     * it should never change. To test this, the test does 3 reconciliations:
-     *     - First initial one to establish the pods and collects the annotations
-     *     - Second with change that is not relevant to controllers => annotations should be the same for all nodes as
-     *       before
-     *     - Third with change to a controller-relevant option => annotations for controller nodes should change, for
-     *       broker nodes should be the same
-     *
-     * @param context   Test context
-     */
-    @Test
-    public void testReconcileWithControllerRelevantConfigChange(VertxTestContext context) {
-        Checkpoint async = context.checkpoint();
-
-        Map<String, String> brokerConfigurationAnnotations = new HashMap<>();
-
-        operator.reconcile(new Reconciliation("initial-trigger", Kafka.RESOURCE_KIND, namespace, CLUSTER_NAME))
-                .onComplete(context.succeeding(v -> context.verify(() -> {
-                    // Collect the configuration annotations
-                    StrimziPodSet spsControllers = supplier.strimziPodSetOperator.client().inNamespace(namespace).withName(CLUSTER_NAME + "-controllers").get();
-                    assertThat(spsControllers, is(notNullValue()));
-
-                    spsControllers.getSpec().getPods().stream().map(PodSetUtils::mapToPod).forEach(pod -> brokerConfigurationAnnotations.put(pod.getMetadata().getName(), pod.getMetadata().getAnnotations().get(KafkaCluster.ANNO_STRIMZI_BROKER_CONFIGURATION_HASH)));
-
-                    StrimziPodSet spsBrokers = supplier.strimziPodSetOperator.client().inNamespace(namespace).withName(CLUSTER_NAME + "-brokers").get();
-                    assertThat(spsBrokers, is(notNullValue()));
-
-                    spsBrokers.getSpec().getPods().stream().map(PodSetUtils::mapToPod).forEach(pod -> brokerConfigurationAnnotations.put(pod.getMetadata().getName(), pod.getMetadata().getAnnotations().get(KafkaCluster.ANNO_STRIMZI_BROKER_CONFIGURATION_HASH)));
-
-                    // Update Kafka with dynamically changeable option that is not controller relevant => controller pod annotations should not change
-                    Crds.kafkaOperation(client).inNamespace(namespace).withName(CLUSTER_NAME)
-                            .edit(k -> new KafkaBuilder(k).editSpec().editKafka().addToConfig(Map.of("compression.type", "gzip")).endKafka().endSpec().build());
-                })))
-                .compose(v -> operator.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, namespace, CLUSTER_NAME)))
-                .onComplete(context.succeeding(v -> context.verify(() -> {
-                    StrimziPodSet spsControllers = supplier.strimziPodSetOperator.client().inNamespace(namespace).withName(CLUSTER_NAME + "-controllers").get();
-                    assertThat(spsControllers, is(notNullValue()));
-
-                    spsControllers.getSpec().getPods().stream().map(PodSetUtils::mapToPod).forEach(pod -> {
-                        // Controller annotations be the same
-                        assertThat(pod.getMetadata().getAnnotations().get(KafkaCluster.ANNO_STRIMZI_BROKER_CONFIGURATION_HASH), is(brokerConfigurationAnnotations.get(pod.getMetadata().getName())));
-                    });
-
-                    StrimziPodSet spsBrokers = supplier.strimziPodSetOperator.client().inNamespace(namespace).withName(CLUSTER_NAME + "-brokers").get();
-                    assertThat(spsBrokers, is(notNullValue()));
-
-                    spsBrokers.getSpec().getPods().stream().map(PodSetUtils::mapToPod).forEach(pod -> {
-                        // Broker annotations should be the same
-                        assertThat(pod.getMetadata().getAnnotations().get(KafkaCluster.ANNO_STRIMZI_BROKER_CONFIGURATION_HASH), is(brokerConfigurationAnnotations.get(pod.getMetadata().getName())));
-                    });
-
-                    // Update Kafka with dynamically changeable controller relevant option => controller pod annotations should change
-                    Crds.kafkaOperation(client).inNamespace(namespace).withName(CLUSTER_NAME)
-                            .edit(k -> new KafkaBuilder(k).editSpec().editKafka().addToConfig(Map.of("max.connections", "1000")).endKafka().endSpec().build());
-                })))
-                .compose(v -> operator.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, namespace, CLUSTER_NAME)))
-                .onComplete(context.succeeding(v -> context.verify(() -> {
-                    StrimziPodSet spsControllers = supplier.strimziPodSetOperator.client().inNamespace(namespace).withName(CLUSTER_NAME + "-controllers").get();
-                    assertThat(spsControllers, is(notNullValue()));
-
-                    spsControllers.getSpec().getPods().stream().map(PodSetUtils::mapToPod).forEach(pod -> {
-                        // Controller annotations should differ
-                        assertThat(pod.getMetadata().getAnnotations().get(KafkaCluster.ANNO_STRIMZI_BROKER_CONFIGURATION_HASH), is(not(brokerConfigurationAnnotations.get(pod.getMetadata().getName()))));
-                    });
-
-                    StrimziPodSet spsBrokers = supplier.strimziPodSetOperator.client().inNamespace(namespace).withName(CLUSTER_NAME + "-brokers").get();
-                    assertThat(spsBrokers, is(notNullValue()));
-
-                    spsBrokers.getSpec().getPods().stream().map(PodSetUtils::mapToPod).forEach(pod -> {
-                        // Broker annotations should be the same
-                        assertThat(pod.getMetadata().getAnnotations().get(KafkaCluster.ANNO_STRIMZI_BROKER_CONFIGURATION_HASH), is(brokerConfigurationAnnotations.get(pod.getMetadata().getName())));
-                    });
-
-                    async.flag();
-                })));
-    }
-
-    /**
-     * Tests how the KRaft controller-only nodes have their logging config changes tracked using a Pod annotations. The
-     * annotation on controller-only pods should have a hash based on a complete logging configuration. On broker pods
-     * it should have only the options not dynamically configurable. To test this, the test does 3 reconciliations:
-     *     - First initial one to establish the pods and collects the annotations
-     *     - Second with change that is not relevant to brokers => annotations should change for controllers but not for
-     *       broker
-     *     - Third with change to a logging appender => annotations for controller nodes should change, and so should
-     *       the annotation for brokers as appenders are not dynamically configurable
-     *
-     * @param context   Test context
-     */
-    @Test
-    public void testReconcileWithControllerRelevantLoggingChange(VertxTestContext context) {
-        Checkpoint async = context.checkpoint();
-
-        Map<String, String> loggingConfigurationAnnotations = new HashMap<>();
-
-        operator.reconcile(new Reconciliation("initial-trigger", Kafka.RESOURCE_KIND, namespace, CLUSTER_NAME))
-                .onComplete(context.succeeding(v -> context.verify(() -> {
-                    // Collect the configuration annotations
-                    StrimziPodSet spsControllers = supplier.strimziPodSetOperator.client().inNamespace(namespace).withName(CLUSTER_NAME + "-controllers").get();
-                    assertThat(spsControllers, is(notNullValue()));
-
-                    spsControllers.getSpec().getPods().stream().map(PodSetUtils::mapToPod).forEach(pod -> loggingConfigurationAnnotations.put(pod.getMetadata().getName(), pod.getMetadata().getAnnotations().get(Annotations.ANNO_STRIMZI_LOGGING_HASH)));
-
-                    StrimziPodSet spsBrokers = supplier.strimziPodSetOperator.client().inNamespace(namespace).withName(CLUSTER_NAME + "-brokers").get();
-                    assertThat(spsBrokers, is(notNullValue()));
-
-                    spsBrokers.getSpec().getPods().stream().map(PodSetUtils::mapToPod).forEach(pod -> loggingConfigurationAnnotations.put(pod.getMetadata().getName(), pod.getMetadata().getAnnotations().get(Annotations.ANNO_STRIMZI_LOGGING_HASH)));
-
-                    // Update Kafka and change the log level => only controller pod annotations should change
-                    Crds.kafkaOperation(client).inNamespace(namespace).withName(CLUSTER_NAME)
-                            .edit(k -> new KafkaBuilder(k).editSpec().editKafka().withNewInlineLogging().withLoggers(Map.of("kafka.root.logger.level", "DEBUG")).endInlineLogging().endKafka().endSpec().build());
-                })))
-                .compose(v -> operator.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, namespace, CLUSTER_NAME)))
-                .onComplete(context.succeeding(v -> context.verify(() -> {
-                    StrimziPodSet spsControllers = supplier.strimziPodSetOperator.client().inNamespace(namespace).withName(CLUSTER_NAME + "-controllers").get();
-                    assertThat(spsControllers, is(notNullValue()));
-
-                    spsControllers.getSpec().getPods().stream().map(PodSetUtils::mapToPod).forEach(pod -> {
-                        // Controller annotations should differ
-                        assertThat(pod.getMetadata().getAnnotations().get(Annotations.ANNO_STRIMZI_LOGGING_HASH), is(not(loggingConfigurationAnnotations.get(pod.getMetadata().getName()))));
-                    });
-
-                    StrimziPodSet spsBrokers = supplier.strimziPodSetOperator.client().inNamespace(namespace).withName(CLUSTER_NAME + "-brokers").get();
-                    assertThat(spsBrokers, is(notNullValue()));
-
-                    spsBrokers.getSpec().getPods().stream().map(PodSetUtils::mapToPod).forEach(pod -> {
-                        // Broker annotations should be the same
-                        assertThat(pod.getMetadata().getAnnotations().get(Annotations.ANNO_STRIMZI_LOGGING_HASH), is(loggingConfigurationAnnotations.get(pod.getMetadata().getName())));
-                    });
-
-                    // Update Kafka and change appender => both controller and broker pod annotations should change
-                    Crds.kafkaOperation(client).inNamespace(namespace).withName(CLUSTER_NAME)
-                            .edit(k -> new KafkaBuilder(k).editSpec().editKafka().withNewInlineLogging().withLoggers(Map.of("log4j.appender.CONSOLE", "my.tls.MyAppender")).endInlineLogging().endKafka().endSpec().build());
-                })))
-                .compose(v -> operator.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, namespace, CLUSTER_NAME)))
-                .onComplete(context.succeeding(v -> context.verify(() -> {
-                    StrimziPodSet spsControllers = supplier.strimziPodSetOperator.client().inNamespace(namespace).withName(CLUSTER_NAME + "-controllers").get();
-                    assertThat(spsControllers, is(notNullValue()));
-
-                    spsControllers.getSpec().getPods().stream().map(PodSetUtils::mapToPod).forEach(pod -> {
-                        // Controller annotations should differ
-                        assertThat(pod.getMetadata().getAnnotations().get(Annotations.ANNO_STRIMZI_LOGGING_HASH), is(not(loggingConfigurationAnnotations.get(pod.getMetadata().getName()))));
-                    });
-
-                    StrimziPodSet spsBrokers = supplier.strimziPodSetOperator.client().inNamespace(namespace).withName(CLUSTER_NAME + "-brokers").get();
-                    assertThat(spsBrokers, is(notNullValue()));
-
-                    spsBrokers.getSpec().getPods().stream().map(PodSetUtils::mapToPod).forEach(pod -> {
-                        // Broker annotations should differ
-                        assertThat(pod.getMetadata().getAnnotations().get(Annotations.ANNO_STRIMZI_LOGGING_HASH), is(not(loggingConfigurationAnnotations.get(pod.getMetadata().getName()))));
-                    });
-
-                    async.flag();
-                })));
-    }
 }
diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaBrokerConfigurationDiffTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaNodeConfigurationDiffTest.java
similarity index 65%
rename from cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaBrokerConfigurationDiffTest.java
rename to cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaNodeConfigurationDiffTest.java
index 6024885a6c9..98b12e99728 100644
--- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaBrokerConfigurationDiffTest.java
+++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaNodeConfigurationDiffTest.java
@@ -31,7 +31,7 @@
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.fail;
 
-public class KafkaBrokerConfigurationDiffTest {
+public class KafkaNodeConfigurationDiffTest {
     KafkaVersion kafkaVersion = KafkaVersionTestUtils.getKafkaVersionLookup().defaultVersion();
     private final NodeRef nodeRef = new NodeRef("broker-0", 0, "broker", false, true);
 
@@ -85,7 +85,7 @@ private Config getCurrentConfiguration(List<ConfigEntry> additional) {
         return new Config(entryList);
     }
 
-    private void assertConfig(KafkaBrokerConfigurationDiff kcd, ConfigEntry ce) {
+    private void assertConfig(KafkaNodeConfigurationDiff kcd, ConfigEntry ce) {
         Collection<AlterConfigOp> brokerDiffConf = kcd.getConfigDiff();
         long appearances = brokerDiffConf.stream().filter(entry -> entry.configEntry().name().equals(ce.name())).count();
         Optional<AlterConfigOp> en = brokerDiffConf.stream().filter(entry -> entry.configEntry().name().equals(ce.name())).findFirst();
@@ -99,7 +99,8 @@ private void assertConfig(KafkaBrokerConfigurationDiff kcd, ConfigEntry ce) {
     public void testCustomPropertyAdded() {
         ArrayList<ConfigEntry> ces = new ArrayList<>();
         ces.add(new ConfigEntry("custom.property", "42"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(new ArrayList<>()), getDesiredConfiguration(ces), kafkaVersion, nodeRef);
+        KafkaNodeConfigurationDiff kcd = new KafkaNodeConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(new ArrayList<>()),
+                getDesiredConfiguration(ces), kafkaVersion, nodeRef, nodeRef.controller(), nodeRef.broker());
 
         assertThat(kcd.getDiffSize(), is(0));
         assertThat(kcd.canBeUpdatedDynamically(), is(true));
@@ -108,8 +109,8 @@ public void testCustomPropertyAdded() {
     @Test
     public void testCustomPropertyRemoved() {
         List<ConfigEntry> ces = singletonList(new ConfigEntry("custom.property", "42"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces),
-                getDesiredConfiguration(emptyList()), kafkaVersion, nodeRef);
+        KafkaNodeConfigurationDiff kcd = new KafkaNodeConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces),
+                getDesiredConfiguration(emptyList()), kafkaVersion, nodeRef, nodeRef.controller(), nodeRef.broker());
 
         assertThat(kcd.getDiffSize(), is(0));
         assertThat(kcd.canBeUpdatedDynamically(), is(true)); // custom changes are applied by changing STS
@@ -118,8 +119,8 @@ public void testCustomPropertyRemoved() {
     @Test
     public void testCustomPropertyKept() {
         List<ConfigEntry> ces = singletonList(new ConfigEntry("custom.property", "42"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces),
-                getDesiredConfiguration(ces), kafkaVersion, nodeRef);
+        KafkaNodeConfigurationDiff kcd = new KafkaNodeConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces),
+                getDesiredConfiguration(ces), kafkaVersion, nodeRef, nodeRef.controller(), nodeRef.broker());
         assertThat(kcd.getDiffSize(), is(0));
         assertThat(kcd.canBeUpdatedDynamically(), is(true));
     }
@@ -128,8 +129,8 @@ public void testCustomPropertyKept() {
     public void testCustomPropertyChanged() {
         List<ConfigEntry> ces = singletonList(new ConfigEntry("custom.property", "42"));
         List<ConfigEntry> ces2 = singletonList(new ConfigEntry("custom.property", "43"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces),
-                getDesiredConfiguration(ces2), kafkaVersion, nodeRef);
+        KafkaNodeConfigurationDiff kcd = new KafkaNodeConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces),
+                getDesiredConfiguration(ces2), kafkaVersion, nodeRef, nodeRef.controller(), nodeRef.broker());
         assertThat(kcd.getDiffSize(), is(0));
         assertThat(kcd.canBeUpdatedDynamically(), is(true));
     }
@@ -137,8 +138,8 @@ public void testCustomPropertyChanged() {
     @Test
     public void testChangedPresentValue() {
         List<ConfigEntry> ces = singletonList(new ConfigEntry("min.insync.replicas", "1"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
-                getDesiredConfiguration(ces), kafkaVersion, nodeRef);
+        KafkaNodeConfigurationDiff kcd = new KafkaNodeConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
+                getDesiredConfiguration(ces), kafkaVersion, nodeRef, nodeRef.controller(), nodeRef.broker());
         assertThat(kcd.getDiffSize(), is(1));
         assertThat(kcd.canBeUpdatedDynamically(), is(true));
         assertConfig(kcd, new ConfigEntry("min.insync.replicas", "1"));
@@ -147,36 +148,67 @@ public void testChangedPresentValue() {
     @Test
     public void testChangedPresentValueToDefault() {
         List<ConfigEntry> ces = singletonList(new ConfigEntry("min.insync.replicas", "2"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
-                getDesiredConfiguration(ces), kafkaVersion, nodeRef);
+        KafkaNodeConfigurationDiff kcd = new KafkaNodeConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
+                getDesiredConfiguration(ces), kafkaVersion, nodeRef, nodeRef.controller(), nodeRef.broker());
         assertThat(kcd.getDiffSize(), is(0));
         assertThat(kcd.canBeUpdatedDynamically(), is(true));
     }
 
     @Test
-    public void testChangedKRaftControllerConfig() {
+    public void testChangedControllerConfigForBrokerNode() {
         List<ConfigEntry> desiredControllerConfig = singletonList(new ConfigEntry("controller.quorum.election.timeout.ms", "5000"));
         List<ConfigEntry> currentControllerConfig = singletonList(new ConfigEntry("controller.quorum.election.timeout.ms", "1000"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(currentControllerConfig),
-                getDesiredConfiguration(desiredControllerConfig), kafkaVersion, nodeRef);
+        KafkaNodeConfigurationDiff kcd = new KafkaNodeConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(currentControllerConfig),
+                getDesiredConfiguration(desiredControllerConfig), kafkaVersion, nodeRef, nodeRef.controller(), nodeRef.broker());
         assertThat(kcd.getDiffSize(), is(0));
     }
 
     @Test
-    public void testChangedKRaftControllerConfigForCombinedNode() {
-        NodeRef combinedNodeId = new NodeRef("broker-0", 0, "broker", true, true);
+    public void testChangedControllerConfigForCombinedNode() {
+        NodeRef combinedNode = new NodeRef("combined-0", 0, "combined", true, true);
         List<ConfigEntry> desiredControllerConfig = singletonList(new ConfigEntry("controller.quorum.election.timeout.ms", "5000"));
         List<ConfigEntry> currentControllerConfig = singletonList(new ConfigEntry("controller.quorum.election.timeout.ms", "1000"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(currentControllerConfig),
-                getDesiredConfiguration(desiredControllerConfig), kafkaVersion, combinedNodeId);
+        KafkaNodeConfigurationDiff kcd = new KafkaNodeConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(currentControllerConfig),
+                getDesiredConfiguration(desiredControllerConfig), kafkaVersion, combinedNode, true, true);
         assertThat(kcd.getDiffSize(), is(1));
     }
 
+    @Test
+    public void testChangedBrokerConfigForCombinedNode() {
+        NodeRef combinedNode = new NodeRef("combined-0", 0, "combined", true, true);
+        List<ConfigEntry> desiredBrokerConfig = singletonList(new ConfigEntry("log.retention.hours", "72"));
+        List<ConfigEntry> currentBrokerConfig = singletonList(new ConfigEntry("log.retention.hours", "168"));
+        KafkaNodeConfigurationDiff kcd = new KafkaNodeConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(currentBrokerConfig),
+                getDesiredConfiguration(desiredBrokerConfig), kafkaVersion, combinedNode, true, true);
+        assertThat(kcd.getDiffSize(), is(1));
+    }
+
+    @Test
+    public void testChangedControllerConfigForControllerNode() {
+        NodeRef controllerNode = new NodeRef("controller-0", 0, "controller", true, false);
+        List<ConfigEntry> desiredControllerConfig = singletonList(new ConfigEntry("controller.quorum.election.timeout.ms", "5000"));
+        List<ConfigEntry> currentControllerConfig = singletonList(new ConfigEntry("controller.quorum.election.timeout.ms", "1000"));
+        KafkaNodeConfigurationDiff kcd = new KafkaNodeConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(currentControllerConfig),
+                getDesiredConfiguration(desiredControllerConfig), kafkaVersion, controllerNode, true, false);
+        assertThat(kcd.getDiffSize(), is(1));
+    }
+
+    @Test
+    public void testChangedBrokerConfigForControllerNode() {
+        NodeRef controllerNode = new NodeRef("controller-0", 0, "controller", true, false);
+        List<ConfigEntry> desiredBrokerConfig = singletonList(new ConfigEntry("log.retention.hours", "72"));
+        List<ConfigEntry> currentBrokerConfig = singletonList(new ConfigEntry("log.retention.hours", "168"));
+        KafkaNodeConfigurationDiff kcd = new KafkaNodeConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(currentBrokerConfig),
+                getDesiredConfiguration(desiredBrokerConfig), kafkaVersion, controllerNode, true, false);
+        assertThat(kcd.getDiffSize(), is(0));
+    }
+
     @Test
     public void testChangedAdvertisedListener() {
         List<ConfigEntry> ces = singletonList(new ConfigEntry("advertised.listeners", "karel"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
-                getDesiredConfiguration(ces), kafkaVersion, nodeRef);
+        KafkaNodeConfigurationDiff kcd = new KafkaNodeConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
+                getDesiredConfiguration(ces), kafkaVersion, nodeRef, nodeRef.controller(), nodeRef.broker());
         assertThat(kcd.getDiffSize(), is(0));
         assertThat(kcd.canBeUpdatedDynamically(), is(true));
     }
@@ -184,8 +216,8 @@ public void testChangedAdvertisedListener() {
     @Test
     public void testChangedAdvertisedListenerFromNothingToDefault() {
         List<ConfigEntry> ces = singletonList(new ConfigEntry("advertised.listeners", "null"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces),
-                getDesiredConfiguration(ces), kafkaVersion, nodeRef);
+        KafkaNodeConfigurationDiff kcd = new KafkaNodeConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces),
+                getDesiredConfiguration(ces), kafkaVersion, nodeRef, nodeRef.controller(), nodeRef.broker());
         assertThat(kcd.getDiffSize(), is(0));
         assertThat(kcd.canBeUpdatedDynamically(), is(true));
     }
@@ -194,8 +226,8 @@ public void testChangedAdvertisedListenerFromNothingToDefault() {
     public void testChangedAdvertisedListenerFromNonDefaultToDefault() {
         // advertised listeners are filled after the pod started
         List<ConfigEntry> ces = singletonList(new ConfigEntry("advertised.listeners", "null"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
-                getDesiredConfiguration(ces), kafkaVersion, nodeRef);
+        KafkaNodeConfigurationDiff kcd = new KafkaNodeConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
+                getDesiredConfiguration(ces), kafkaVersion, nodeRef, nodeRef.controller(), nodeRef.broker());
         assertThat(kcd.getDiffSize(), is(0));
         assertThat(kcd.canBeUpdatedDynamically(), is(true));
     }
@@ -203,8 +235,8 @@ public void testChangedAdvertisedListenerFromNonDefaultToDefault() {
     @Test
     public void testChangedZookeeperConnect() {
         List<ConfigEntry> ces = singletonList(new ConfigEntry("zookeeper.connect", "karel"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
-                getDesiredConfiguration(ces), kafkaVersion, nodeRef);
+        KafkaNodeConfigurationDiff kcd = new KafkaNodeConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
+                getDesiredConfiguration(ces), kafkaVersion, nodeRef, nodeRef.controller(), nodeRef.broker());
         assertThat(kcd.getDiffSize(), is(0));
         assertThat(kcd.canBeUpdatedDynamically(), is(true));
     }
@@ -212,8 +244,8 @@ public void testChangedZookeeperConnect() {
     @Test
    public void testChangedLogDirs() {
         List<ConfigEntry> ces = singletonList(new ConfigEntry("log.dirs", "/var/lib/kafka/data/karel"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
-                getDesiredConfiguration(ces), kafkaVersion, nodeRef);
+        KafkaNodeConfigurationDiff kcd = new KafkaNodeConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
+                getDesiredConfiguration(ces), kafkaVersion, nodeRef, nodeRef.controller(), nodeRef.broker());
         assertThat(kcd.getDiffSize(), is(1));
         assertThat(kcd.canBeUpdatedDynamically(), is(false));
         assertConfig(kcd, new ConfigEntry("log.dirs", "/var/lib/kafka/data/karel"));
@@ -222,8 +254,8 @@ public void testChangedLogDirs() {
     @Test
     public void testLogDirsNonDefaultToDefault() {
         List<ConfigEntry> ces = singletonList(new ConfigEntry("log.dirs", "null"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
-                getDesiredConfiguration(ces), kafkaVersion, nodeRef);
+        KafkaNodeConfigurationDiff kcd = new KafkaNodeConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
+                getDesiredConfiguration(ces), kafkaVersion, nodeRef, nodeRef.controller(), nodeRef.broker());
         assertThat(kcd.getDiffSize(), is(1));
         assertThat(kcd.canBeUpdatedDynamically(), is(false));
         assertConfig(kcd, new ConfigEntry("log.dirs", "null"));
@@ -232,8 +264,8 @@ public void testLogDirsNonDefaultToDefault() {
     @Test
     public void testLogDirsDefaultToDefault() {
         List<ConfigEntry> ces = singletonList(new ConfigEntry("log.dirs", "null"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces),
-                getDesiredConfiguration(ces), kafkaVersion, nodeRef);
+        KafkaNodeConfigurationDiff kcd = new KafkaNodeConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces),
+                getDesiredConfiguration(ces), kafkaVersion, nodeRef, nodeRef.controller(), nodeRef.broker());
         assertThat(kcd.getDiffSize(), is(0));
         assertThat(kcd.canBeUpdatedDynamically(), is(true));
     }
@@ -241,8 +273,8 @@ public void testLogDirsDefaultToDefault() {
     @Test
     public void testUnchangedLogDirs() {
         List<ConfigEntry> ces = singletonList(new ConfigEntry("log.dirs", "null"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces),
-                getDesiredConfiguration(ces), kafkaVersion, nodeRef);
+        KafkaNodeConfigurationDiff kcd = new KafkaNodeConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces),
+                getDesiredConfiguration(ces), kafkaVersion, nodeRef, nodeRef.controller(), nodeRef.broker());
         assertThat(kcd.getDiffSize(), is(0));
         assertThat(kcd.canBeUpdatedDynamically(), is(true));
 
@@ -251,8 +283,8 @@ public void testUnchangedLogDirs() {
     @Test
     public void testChangedInterBrokerListenerName() {
         List<ConfigEntry> ces = singletonList(new ConfigEntry("inter.broker.listener.name", "david"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
-                getDesiredConfiguration(ces), kafkaVersion, nodeRef);
+        KafkaNodeConfigurationDiff kcd = new KafkaNodeConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
+                getDesiredConfiguration(ces), kafkaVersion, nodeRef, nodeRef.controller(), nodeRef.broker());
         assertThat(kcd.getDiffSize(), is(1));
         assertThat(kcd.canBeUpdatedDynamically(), is(false));
     }
@@ -260,8 +292,8 @@ public void testChangedInterBrokerListenerName() {
     @Test
     public void testChangedListenerSecurityProtocolMap() {
         List<ConfigEntry> ces = singletonList(new ConfigEntry("listener.security.protocol.map", "david"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
-                getDesiredConfiguration(ces), kafkaVersion, nodeRef);
+        KafkaNodeConfigurationDiff kcd = new KafkaNodeConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()),
+                getDesiredConfiguration(ces), kafkaVersion, nodeRef, nodeRef.controller(), nodeRef.broker());
         assertThat(kcd.getDiffSize(), is(1));
         assertThat(kcd.canBeUpdatedDynamically(), is(true));
     }
@@ -270,8 +302,8 @@ public void testChangedListenerSecurityProtocolMap() {
     public void testChangedListenerSecurityProtocolMapFromNonDefault() {
         List<ConfigEntry> ces = singletonList(new ConfigEntry("listener.security.protocol.map", "REPLICATION-9091:SSL,PLAIN-9092:SASL_PLAINTEXT,TLS-9093:SSL,EXTERNAL-9094:SSL"));
         List<ConfigEntry> ces2 = singletonList(new ConfigEntry("listener.security.protocol.map", "REPLICATION-9091:SSL,PLAIN-9092:SASL_PLAINTEXT,TLS-9093:SSL"));
-        KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces),
-                getDesiredConfiguration(ces2), kafkaVersion, nodeRef);
+        KafkaNodeConfigurationDiff kcd = new KafkaNodeConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces),
+                getDesiredConfiguration(ces2), kafkaVersion, nodeRef,
nodeRef.controller(), nodeRef.broker()); assertThat(kcd.getDiffSize(), is(1)); assertThat(kcd.canBeUpdatedDynamically(), is(true)); assertConfig(kcd, new ConfigEntry("listener.security.protocol.map", "REPLICATION-9091:SSL,PLAIN-9092:SASL_PLAINTEXT,TLS-9093:SSL")); @@ -284,8 +316,8 @@ public void testChangedMoreProperties() { ces.add(new ConfigEntry("inter.broker.listener.name", "david")); ces.add(new ConfigEntry("group.min.session.timeout.ms", "42")); ces.add(new ConfigEntry("auto.create.topics.enable", "false")); - KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()), - getDesiredConfiguration(ces), kafkaVersion, nodeRef); + KafkaNodeConfigurationDiff kcd = new KafkaNodeConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()), + getDesiredConfiguration(ces), kafkaVersion, nodeRef, nodeRef.controller(), nodeRef.broker()); assertThat(kcd.getDiffSize(), is(3)); assertThat(kcd.canBeUpdatedDynamically(), is(false)); } @@ -294,8 +326,8 @@ public void testChangedMoreProperties() { public void testRemoveDefaultPropertyWhichIsNotDefault() { // it is not seen as default because the ConfigEntry.ConfigSource.DEFAULT_CONFIG is not set List ces = singletonList(new ConfigEntry("log.retention.hours", "168")); - KafkaBrokerConfigurationDiff kcd = new KafkaBrokerConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces), - getDesiredConfiguration(emptyList()), kafkaVersion, nodeRef); + KafkaNodeConfigurationDiff kcd = new KafkaNodeConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(ces), + getDesiredConfiguration(emptyList()), kafkaVersion, nodeRef, nodeRef.controller(), nodeRef.broker()); assertThat(kcd.getDiffSize(), is(1)); assertThat(kcd.canBeUpdatedDynamically(), is(false)); } diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaBrokerLoggingConfigurationDiffTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaNodeLoggingConfigurationDiffTest.java similarity index 93% rename from cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaBrokerLoggingConfigurationDiffTest.java rename to cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaNodeLoggingConfigurationDiffTest.java index dda991d117a..38ed37056fb 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaBrokerLoggingConfigurationDiffTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaNodeLoggingConfigurationDiffTest.java @@ -20,7 +20,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -public class KafkaBrokerLoggingConfigurationDiffTest { +public class KafkaNodeLoggingConfigurationDiffTest { private String getDesiredConfiguration(List additional) { StringBuilder desiredConfigString = new StringBuilder(); @@ -51,7 +51,7 @@ private Config getCurrentConfiguration(List additional) { @Test public void testReplaceRootLogger() { - KafkaBrokerLoggingConfigurationDiff klcd = new KafkaBrokerLoggingConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()), getDesiredConfiguration(emptyList())); + KafkaNodeLoggingConfigurationDiff klcd = new KafkaNodeLoggingConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, getCurrentConfiguration(emptyList()), 
getDesiredConfiguration(emptyList()));
         assertThat(klcd.getDiffSize(), is(0));
     }
 
@@ -63,7 +63,7 @@ public void testDiffUsingLoggerInheritance() {
 
         // Prepare currentConfig
         Config currentConfig = getRealisticConfig();
-        KafkaBrokerLoggingConfigurationDiff diff = new KafkaBrokerLoggingConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, currentConfig, desiredConfig);
+        KafkaNodeLoggingConfigurationDiff diff = new KafkaNodeLoggingConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, currentConfig, desiredConfig);
 
         assertThat(diff.getLoggingDiff(), is(getRealisticConfigDiff()));
     }
@@ -162,7 +162,7 @@ public void testExpansion() {
             "log4j.logger.kafka.authorizer.logger=INFO\n" +
             "monitorInterval=30\n";
 
-        KafkaBrokerLoggingConfigurationDiff kdiff = new KafkaBrokerLoggingConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, null, null);
+        KafkaNodeLoggingConfigurationDiff kdiff = new KafkaNodeLoggingConfigurationDiff(Reconciliation.DUMMY_RECONCILIATION, null, null);
 
         Map res = kdiff.readLog4jConfig(input);
         assertThat(res.get("root"), is("INFO"));
diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaRollerTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaRollerTest.java
index 4dee8386c23..ec8ff849e58 100644
--- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaRollerTest.java
+++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaRollerTest.java
@@ -1047,7 +1047,7 @@ int controller(NodeRef nodeRef, long timeout, TimeUnit unit, RestartContext rest
         }
 
         @Override
-        protected Config brokerConfig(NodeRef nodeRef) throws ForceableProblem {
+        protected Config nodeConfig(NodeRef nodeRef, boolean isPureController) throws ForceableProblem {
             ForceableProblem problem = getConfigsException.apply(nodeRef.nodeId());
             if (problem != null) {
                 throw problem;
@@ -1055,12 +1055,12 @@ protected Config brokerConfig(NodeRef nodeRef) throws ForceableProblem {
         }
 
         @Override
-        protected Config brokerLogging(int brokerId) {
+        protected Config nodeLogging(int nodeId, boolean isPureController) {
             return new Config(emptyList());
         }
 
         @Override
-        protected void dynamicUpdateBrokerConfig(NodeRef nodeRef, Admin ac, KafkaBrokerConfigurationDiff configurationDiff, KafkaBrokerLoggingConfigurationDiff logDiff) throws ForceableProblem {
+        protected void dynamicUpdateBrokerConfig(NodeRef nodeRef, KafkaNodeConfigurationDiff configurationDiff, KafkaNodeLoggingConfigurationDiff logDiff) throws ForceableProblem {
             ForceableProblem problem = alterConfigsException.apply(nodeRef.nodeId());
             if (problem != null) {
                 throw problem;

From 36dc1a8586917a13a24e5b9ee904e14a33cd79a1 Mon Sep 17 00:00:00 2001
From: Gantigmaa Selenge 
Date: Thu, 5 Dec 2024 10:33:08 +0000
Subject: [PATCH 2/2] Remove version check when talking to controllers

This change is targeted at a release that supports Kafka 3.9.0 or later.

Signed-off-by: Gantigmaa Selenge 
---
 .../operator/resource/KafkaRoller.java        | 56 ++++---------------
 .../KafkaAssemblyOperatorKRaftMockTest.java   |  2 -
 .../operator/resource/KafkaRollerTest.java    | 28 +---------
 3 files changed, 12 insertions(+), 74 deletions(-)

diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaRoller.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaRoller.java
index 977127c3add..865c1908ac5 100644
--- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaRoller.java
+++ 
b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaRoller.java
@@ -20,7 +20,6 @@
 import io.strimzi.operator.cluster.operator.resource.events.KubernetesRestartEventPublisher;
 import io.strimzi.operator.cluster.operator.resource.kubernetes.PodOperator;
 import io.strimzi.operator.common.AdminClientProvider;
-import io.strimzi.operator.common.Annotations;
 import io.strimzi.operator.common.BackOff;
 import io.strimzi.operator.common.Reconciliation;
 import io.strimzi.operator.common.ReconciliationLogger;
@@ -209,19 +208,12 @@ private boolean maybeInitBrokerAdminClient() {
      * Initializes controllerAdminClient if it has not been initialized yet
      * @return true if the creation of AC succeeded, false otherwise
      */
-    private boolean maybeInitControllerAdminClient(String currentVersion) {
+    private boolean maybeInitControllerAdminClient() {
         if (this.controllerAdminClient == null) {
-            // Prior to 3.9.0, Kafka did not support directly connecting to controllers nodes
-            // via Kafka Admin API when running in KRaft mode.
-            // Therefore, use brokers to initialise adminClient for quorum health check
-            // when the version is older than 3.9.0.
             try {
+                // Kafka 3.9.0 and newer supports connecting directly to controller nodes
+                // via the Kafka Admin API, so no broker-based fallback is needed anymore
                 this.controllerAdminClient = controllerAdminClient(nodes);
             } catch (ForceableProblem | FatalProblem e) {
                 LOGGER.warnCr(reconciliation, "Failed to create controllerAdminClient.", e);
                 return false;
@@ -455,11 +445,9 @@ private void restartIfNecessary(NodeRef nodeRef, RestartContext restartContext)
         // change and the desired roles still apply.
         boolean isBroker = Labels.booleanLabel(pod, Labels.STRIMZI_BROKER_ROLE_LABEL, nodeRef.broker());
         boolean isController = Labels.booleanLabel(pod, Labels.STRIMZI_CONTROLLER_ROLE_LABEL, nodeRef.controller());
-        // This is relevant when creating admin client for controllers
-        String currentVersion = Annotations.stringAnnotation(pod, KafkaCluster.ANNO_STRIMZI_IO_KAFKA_VERSION, "0.0.0", null);
 
         try {
-            checkIfRestartOrReconfigureRequired(nodeRef, isController, isBroker, restartContext, currentVersion);
+            checkIfRestartOrReconfigureRequired(nodeRef, isController, isBroker, restartContext);
             if (restartContext.forceRestart) {
                 LOGGER.debugCr(reconciliation, "Pod {} can be rolled now", nodeRef);
                 restartAndAwaitReadiness(pod, operationTimeoutMs, TimeUnit.MILLISECONDS, restartContext);
@@ -589,7 +577,7 @@ private void markRestartContextWithForceRestart(RestartContext restartContext) {
      * Determine whether the pod should be restarted, or the broker reconfigured. 
*/
     @SuppressWarnings("checkstyle:CyclomaticComplexity")
-    private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isController, boolean isBroker, RestartContext restartContext, String currentVersion) throws ForceableProblem, InterruptedException, FatalProblem, UnforceableProblem {
+    private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isController, boolean isBroker, RestartContext restartContext) throws ForceableProblem, InterruptedException, FatalProblem {
         RestartReasons reasonToRestartPod = restartContext.restartReasons;
         if (restartContext.podStuck && !reasonToRestartPod.contains(RestartReason.POD_HAS_OLD_REVISION)) {
             // If the pod is unschedulable then deleting it, or trying to open an Admin client to it will make no difference
@@ -612,10 +600,13 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont
 
         // if it is a pure controller, initialise the admin client specifically for controllers
         if (isController && !isBroker) {
-            if (!maybeInitControllerAdminClient(currentVersion)) {
-                handleFailedAdminClientForController(nodeRef, restartContext, reasonToRestartPod, currentVersion);
+            if (!maybeInitControllerAdminClient()) {
+                LOGGER.infoCr(reconciliation, "Pod {} needs to be restarted because it does not seem to be responding to connection attempts", nodeRef);
+                reasonToRestartPod.add(RestartReason.POD_UNRESPONSIVE);
+                markRestartContextWithForceRestart(restartContext);
                 return;
             }
+
+            LOGGER.debugCr(reconciliation, "Initialising KafkaQuorumCheck for controller pod {}", nodeRef);
             restartContext.quorumCheck = quorumCheck(controllerAdminClient, nodeRef);
         }
@@ -629,6 +620,7 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont
 
             // If it is a mixed node, initialise quorum check with the broker admin client
             if (isController) {
+                LOGGER.debugCr(reconciliation, "Initialising KafkaQuorumCheck for mixed roles pod {}", nodeRef);
                 restartContext.quorumCheck = quorumCheck(brokerAdminClient, nodeRef);
             }
         }
@@ -637,7 +629,6 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont
         // connect to the node and that it's capable of responding.
         Config nodeConfig;
         try {
-            System.out.println("TINA Getting node config for " + nodeRef.nodeId());
             nodeConfig = nodeConfig(nodeRef, isController && !isBroker);
         } catch (ForceableProblem e) {
             if (restartContext.backOff.done()) {
@@ -678,21 +669,6 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont
         restartContext.nodeLoggingDiff = nodeLoggingDiff;
     }
 
-    private void handleFailedAdminClientForController(NodeRef nodeRef, RestartContext restartContext, RestartReasons reasonToRestartPod, String currentVersion) throws UnforceableProblem {
-        if (KafkaVersion.compareDottedVersions(currentVersion, "3.9.0") >= 0) {
-            // If the version supports talking to controllers, force restart this pod when the admin client cannot be initialised.
-            // There is no reason to continue as we won't be able to connect an admin client to this pod for other checks later.
-            LOGGER.infoCr(reconciliation, "KafkaQuorumCheck cannot be initialised for {} because none of the controllers do not seem to responding to connection attempts.", nodeRef);
-            reasonToRestartPod.add(RestartReason.POD_UNRESPONSIVE);
-            markRestartContextWithForceRestart(restartContext);
-        } else {
-            // If the version does not support talking to controllers, the admin client should be connecting to the broker nodes. 
- // Since connection to the brokers failed, throw an UnforceableProblem so that broker nodes can be checked later - // which may potentially resolve the connection issue. - throw new UnforceableProblem("KafkaQuorumCheck cannot be initialised for " + nodeRef + " because none of the brokers do not seem to responding to connection attempts"); - } - } - /** * Returns a config of the given broker. * @param nodeRef The reference of the broker. @@ -701,13 +677,11 @@ private void handleFailedAdminClientForController(NodeRef nodeRef, RestartContex /* test */ Config nodeConfig(NodeRef nodeRef, boolean isPureController) throws ForceableProblem, InterruptedException { ConfigResource resource = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(nodeRef.nodeId())); if (isPureController) { - System.out.println("Getting controller config"); return await(VertxUtil.kafkaFutureToVertxFuture(reconciliation, vertx, controllerAdminClient.describeConfigs(singletonList(resource)).values().get(resource)), 30, TimeUnit.SECONDS, error -> new ForceableProblem("Error getting controller config: " + error, error) ); } else { - System.out.println("Getting broker config"); return await(VertxUtil.kafkaFutureToVertxFuture(reconciliation, vertx, brokerAdminClient.describeConfigs(singletonList(resource)).values().get(resource)), 30, TimeUnit.SECONDS, error -> new ForceableProblem("Error getting broker config: " + error, error) @@ -926,15 +900,7 @@ protected Future restart(Pod pod, RestartContext restartContext) { * empty set, use the brokers service to bootstrap the client. */ /* test */ Admin brokerAdminClient(Set nodes) throws ForceableProblem, FatalProblem { - // If no nodes are passed, initialize the admin client using the bootstrap service - // This is still needed for versions older than 3.9.0, so that when only controller nodes being rolled, - // it can use brokers to get quorum information via AdminClient. 
- String bootstrapHostnames; - if (nodes.isEmpty()) { - bootstrapHostnames = String.format("%s:%s", DnsNameGenerator.of(namespace, KafkaResources.bootstrapServiceName(cluster)).serviceDnsName(), KafkaCluster.REPLICATION_PORT); - } else { - bootstrapHostnames = nodes.stream().filter(NodeRef::broker).map(node -> DnsNameGenerator.podDnsName(namespace, KafkaResources.brokersServiceName(cluster), node.podName()) + ":" + KafkaCluster.REPLICATION_PORT).collect(Collectors.joining(",")); - } + String bootstrapHostnames = nodes.stream().filter(NodeRef::broker).map(node -> DnsNameGenerator.podDnsName(namespace, KafkaResources.brokersServiceName(cluster), node.podName()) + ":" + KafkaCluster.REPLICATION_PORT).collect(Collectors.joining(",")); try { LOGGER.debugCr(reconciliation, "Creating AdminClient for {}", bootstrapHostnames); diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorKRaftMockTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorKRaftMockTest.java index c1e69579858..565d940538a 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorKRaftMockTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorKRaftMockTest.java @@ -28,7 +28,6 @@ import io.strimzi.operator.cluster.PlatformFeaturesAvailability; import io.strimzi.operator.cluster.ResourceUtils; import io.strimzi.operator.cluster.model.CertUtils; -import io.strimzi.operator.cluster.model.KafkaCluster; import io.strimzi.operator.cluster.model.KafkaVersion; import io.strimzi.operator.cluster.model.PodSetUtils; import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier; @@ -62,7 +61,6 @@ import static org.hamcrest.CoreMatchers.hasItems; import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.CoreMatchers.startsWith; diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaRollerTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaRollerTest.java index ec8ff849e58..69c6ddfbef9 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaRollerTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaRollerTest.java @@ -9,7 +9,6 @@ import io.fabric8.kubernetes.client.KubernetesClientException; import io.strimzi.api.kafka.model.kafka.KafkaResources; import io.strimzi.operator.cluster.KafkaVersionTestUtils; -import io.strimzi.operator.cluster.model.KafkaCluster; import io.strimzi.operator.cluster.model.NodeRef; import io.strimzi.operator.cluster.model.RestartReason; import io.strimzi.operator.cluster.model.RestartReasons; @@ -162,7 +161,7 @@ private static AdminClientProvider givenControllerFutureFailsWithTimeout() { @Test public void testTalkingToControllersLatestVersion(VertxTestContext testContext) { - PodOperator podOps = mockPodOpsWithVersion(podId -> succeededFuture(), KafkaVersionTestUtils.getLatestVersion().version()); + PodOperator podOps = mockPodOps(podId -> succeededFuture()); AdminClientProvider mock = mock(AdminClientProvider.class); when(mock.createControllerAdminClient(anyString(), any(), any())).thenThrow(new RuntimeException("An error while try to create an admin client 
with bootstrap controllers")); @@ -177,26 +176,6 @@ public void testTalkingToControllersLatestVersion(VertxTestContext testContext) asList(0)); } - @Test - public void testTalkingToControllersWithOldVersion(VertxTestContext testContext) throws InterruptedException { - PodOperator podOps = mockPodOpsWithVersion(podId -> succeededFuture(), "3.8.0"); - - AdminClientProvider mock = mock(AdminClientProvider.class); - when(mock.createAdminClient(anyString(), any(), any())).thenThrow(new RuntimeException("An error while try to create an admin client with bootstrap brokers")); - - TestingKafkaRoller kafkaRoller = new TestingKafkaRoller(addKraftPodNames(0, 0, 1), podOps, - noException(), null, noException(), noException(), noException(), - brokerId -> succeededFuture(true), - true, mock, mockKafkaAgentClientProvider(), true, null, -1); - - // If the controller has older version (< 3.9.0), we should only be creating admin client for brokers - // and when the operator cannot connect to brokers, we expect to fail initialising KafkaQuorumCheck - doFailingRollingRestart(testContext, kafkaRoller, - asList(0), - KafkaRoller.UnforceableProblem.class, "KafkaQuorumCheck cannot be initialised for c-kafka-0/0 because none of the brokers do not seem to responding to connection attempts", - emptyList()); - } - private static KafkaAgentClientProvider mockKafkaAgentClientProvider() { return mock(KafkaAgentClientProvider.class); } @@ -835,17 +814,12 @@ public void clearRestarted() { } private PodOperator mockPodOps(Function> readiness) { - return mockPodOpsWithVersion(readiness, KafkaVersionTestUtils.getLatestVersion().version()); - } - - private PodOperator mockPodOpsWithVersion(Function> readiness, String version) { PodOperator podOps = mock(PodOperator.class); when(podOps.get(any(), any())).thenAnswer( invocation -> new PodBuilder() .withNewMetadata() .withNamespace(invocation.getArgument(0)) .withName(invocation.getArgument(1)) - .addToAnnotations(KafkaCluster.ANNO_STRIMZI_IO_KAFKA_VERSION, version) .endMetadata() .build() );
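
The (isController, isBroker) flags added to the KafkaNodeConfigurationDiff constructor, pinned
down by the test matrix in KafkaNodeConfigurationDiffTest above, amount to this contract:
broker-only nodes ignore controller-relevant changes, pure controllers ignore broker-only
changes, and combined nodes see both. A minimal sketch of the contract for a pure controller,
reusing the test fixtures (getCurrentConfiguration, getDesiredConfiguration, and kafkaVersion
are test helpers from that class, not production API):

    // Pure controller node: isController = true, isBroker = false
    NodeRef controllerNode = new NodeRef("controller-0", 0, "controller", true, false);

    // A broker-only option such as log.retention.hours changes...
    KafkaNodeConfigurationDiff diff = new KafkaNodeConfigurationDiff(
            Reconciliation.DUMMY_RECONCILIATION,
            getCurrentConfiguration(singletonList(new ConfigEntry("log.retention.hours", "168"))),
            getDesiredConfiguration(singletonList(new ConfigEntry("log.retention.hours", "72"))),
            kafkaVersion,
            controllerNode,
            controllerNode.controller(),   // isController = true
            controllerNode.broker());      // isBroker = false

    // ...but the diff is empty, so the controller is neither reconfigured nor rolled for it
    assertThat(diff.getDiffSize(), is(0));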
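
The controller path that replaces the removed version check assumes a Kafka 3.9.0+ Admin
client can bootstrap directly against KRaft controllers and read a node's configuration as
a BROKER config resource keyed by node id, which is what nodeConfig(nodeRef, true) does via
controllerAdminClient. A standalone sketch of that interaction, assuming the upstream
"bootstrap.controllers" client property and a placeholder controller endpoint (neither is
defined by this patch):

    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.common.config.ConfigResource;

    public class ControllerDescribeSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Bootstrap against a controller listener instead of the brokers:
            // "bootstrap.controllers" takes the place of "bootstrap.servers".
            // The endpoint below is a placeholder for a real controller address.
            props.put("bootstrap.controllers", "my-cluster-controller-0:9090");

            try (Admin admin = Admin.create(props)) {
                // Controller configs are addressed as BROKER resources keyed by node id,
                // mirroring what KafkaRoller#nodeConfig does for pure controllers
                ConfigResource node0 = new ConfigResource(ConfigResource.Type.BROKER, "0");
                Config config = admin.describeConfigs(List.of(node0)).values().get(node0).get();
                config.entries().forEach(e -> System.out.println(e.name() + "=" + e.value()));
            }
        }
    }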