diff --git a/topic-operator/src/main/java/io/strimzi/operator/topic/TopicOperatorConfig.java b/topic-operator/src/main/java/io/strimzi/operator/topic/TopicOperatorConfig.java index 14c2dd49b6..c7c7a7f4bd 100644 --- a/topic-operator/src/main/java/io/strimzi/operator/topic/TopicOperatorConfig.java +++ b/topic-operator/src/main/java/io/strimzi/operator/topic/TopicOperatorConfig.java @@ -27,82 +27,12 @@ /** * Topic Operator configuration. - * - * @param namespace The namespace that the operator will watch for KafkaTopics. - * @param labelSelector The label selector that KafkaTopics must match. - * @param bootstrapServers The Kafka bootstrap servers. - * @param clientId The client Id to use for the Admin client. - * @param fullReconciliationIntervalMs The periodic reconciliation interval in milliseconds. - * @param tlsEnabled Whether the Admin client should be configured to use TLS. - * @param truststoreLocation The location (path) of the Admin client's truststore. - * @param truststorePassword The password for the truststore at {@code truststoreLocation}. - * @param keystoreLocation The location (path) of the Admin client's keystore. - * @param keystorePassword The password for the keystore at {@code keystoreLocation}. - * @param sslEndpointIdentificationAlgorithm The SSL endpoint identification algorithm. - * @param saslEnabled Whether the Admin client should be configured to use SASL. - * @param saslMechanism The SASL mechanism for the Admin client. - * @param saslCustomConfigJson The SASL custom values for the Admin client when using alternate auth mechanisms. - * @param saslUsername The SASL username for the Admin client. - * @param saslPassword The SASL password for the Admin client. - * @param securityProtocol The security protocol for the Admin client. - * @param useFinalizer Whether to use finalizers. - * @param maxQueueSize The capacity of the queue. - * @param maxBatchSize The maximum size of a reconciliation batch. - * @param maxBatchLingerMs The maximum time to wait for a reconciliation batch to contain {@code maxBatchSize} items. - * @param enableAdditionalMetrics Whether to enable additional metrics. - * @param featureGates Configured feature gates. - * @param cruiseControlEnabled Whether Cruise Control integration is enabled. - * @param cruiseControlRackEnabled Whether the target Kafka cluster has rack awareness. - * @param cruiseControlHostname Cruise Control hostname. - * @param cruiseControlPort Cruise Control port. - * @param cruiseControlSslEnabled Whether Cruise Control SSL encryption is enabled. - * @param cruiseControlAuthEnabled Whether Cruise Control Basic authentication is enabled. - * @param cruiseControlCrtFilePath Certificate chain to be trusted. - * @param cruiseControlApiUserPath Api admin username file path. - * @param cruiseControlApiPassPath Api admin password file path. - * @param alterableTopicConfig Comma separated list of the alterable Kafka topic properties. - * @param skipClusterConfigReview For some managed Kafka services the Cluster config is not callable, so this skips those calls. */ -public record TopicOperatorConfig( - String namespace, - Labels labelSelector, - String bootstrapServers, - String clientId, - long fullReconciliationIntervalMs, - boolean tlsEnabled, - String truststoreLocation, - String truststorePassword, - String keystoreLocation, - String keystorePassword, - String sslEndpointIdentificationAlgorithm, - boolean saslEnabled, - String saslMechanism, - String saslCustomConfigJson, - String saslUsername, - String saslPassword, - String securityProtocol, - boolean useFinalizer, - int maxQueueSize, - int maxBatchSize, - long maxBatchLingerMs, - boolean enableAdditionalMetrics, - FeatureGates featureGates, - boolean cruiseControlEnabled, - boolean cruiseControlRackEnabled, - String cruiseControlHostname, - int cruiseControlPort, - boolean cruiseControlSslEnabled, - boolean cruiseControlAuthEnabled, - String cruiseControlCrtFilePath, - String cruiseControlApiUserPath, - String cruiseControlApiPassPath, - String alterableTopicConfig, - boolean skipClusterConfigReview -) { +public class TopicOperatorConfig { private final static ReconciliationLogger LOGGER = ReconciliationLogger.create(TopicOperatorConfig.class); private static final Map> CONFIG_VALUES = new HashMap<>(); private static final TypeReference> STRING_HASH_MAP_TYPE_REFERENCE = new TypeReference<>() { }; - + /** Namespace in which the operator will run and create resources. */ public static final ConfigParameter NAMESPACE = new ConfigParameter<>("STRIMZI_NAMESPACE", ConfigParameterParser.NON_EMPTY_STRING, CONFIG_VALUES); /** Labels used to filter the custom resources seen by the cluster operator. */ @@ -158,7 +88,7 @@ public record TopicOperatorConfig( /** Cruise Control: whether rack awareness is enabled. */ public static final ConfigParameter CRUISE_CONTROL_RACK_ENABLED = new ConfigParameter<>("STRIMZI_CRUISE_CONTROL_RACK_ENABLED", ConfigParameterParser.BOOLEAN, "false", CONFIG_VALUES); /** Cruise Control: server hostname. */ - public static final ConfigParameter CRUISE_CONTROL_HOSTNAME = new ConfigParameter<>("STRIMZI_CRUISE_CONTROL_HOSTNAME", ConfigParameterParser.STRING, "127.0.0.1", CONFIG_VALUES); + public static final ConfigParameter CRUISE_CONTROL_HOSTNAME = new ConfigParameter<>("STRIMZI_CRUISE_CONTROL_HOSTNAME", ConfigParameterParser.STRING, "localhost", CONFIG_VALUES); /** Cruise Control: server port. */ public static final ConfigParameter CRUISE_CONTROL_PORT = new ConfigParameter<>("STRIMZI_CRUISE_CONTROL_PORT", ConfigParameterParser.strictlyPositive(ConfigParameterParser.INTEGER), "9090", CONFIG_VALUES); /** Cruise Control: whether rack awareness is enabled. */ @@ -172,116 +102,251 @@ public record TopicOperatorConfig( /** Cruise Control: password file location. */ public static final ConfigParameter CRUISE_CONTROL_API_PASS_PATH = new ConfigParameter<>("STRIMZI_CRUISE_CONTROL_API_PASS_PATH", ConfigParameterParser.STRING, "/etc/eto-cc-api/" + CruiseControlApiProperties.TOPIC_OPERATOR_PASSWORD_KEY, CONFIG_VALUES); - @SuppressWarnings("unchecked") - private static T get(Map map, ConfigParameter value) { - return (T) map.get(value.key()); - } + private final Map map; - static Set keyNames() { - return Collections.unmodifiableSet(CONFIG_VALUES.keySet()); + /** + * Constructor. + * + * @param map Map containing configurations and their respective values. + */ + private TopicOperatorConfig(Map map) { + this.map = map; } /** - * Creates the TopicOperator configuration from a map. - * + * Creates the Topic Operator configuration from a map. + * * @param map Configuration map. - * @return TopicOperator config. + * @return Topic Operator configuration. */ public static TopicOperatorConfig buildFromMap(Map map) { Map envMap = new HashMap<>(map); envMap.keySet().retainAll(TopicOperatorConfig.keyNames()); - Map generatedMap = ConfigParameter.define(envMap, CONFIG_VALUES); - TopicOperatorConfig topicOperatorConfig = new TopicOperatorConfig(generatedMap); LOGGER.infoOp("TopicOperator configuration is {}", topicOperatorConfig); return topicOperatorConfig; } - /** - * Creates the TopicOperator configuration. - * - * @param map Configuration map. - */ - public TopicOperatorConfig(Map map) { - this( - get(map, NAMESPACE), - get(map, RESOURCE_LABELS), - get(map, BOOTSTRAP_SERVERS), - get(map, CLIENT_ID), - get(map, FULL_RECONCILIATION_INTERVAL_MS), - get(map, TLS_ENABLED), - get(map, TRUSTSTORE_LOCATION), - get(map, TRUSTSTORE_PASSWORD), - get(map, KEYSTORE_LOCATION), - get(map, KEYSTORE_PASSWORD), - get(map, SSL_ENDPOINT_IDENTIFICATION_ALGORITHM), - get(map, SASL_ENABLED), - get(map, SASL_MECHANISM), - get(map, SASL_CUSTOM_CONFIG_JSON), - get(map, SASL_USERNAME), - get(map, SASL_PASSWORD), - get(map, SECURITY_PROTOCOL), - get(map, USE_FINALIZERS), - get(map, MAX_QUEUE_SIZE), - get(map, MAX_BATCH_SIZE), - get(map, MAX_BATCH_LINGER_MS), - get(map, ENABLE_ADDITIONAL_METRICS), - get(map, FEATURE_GATES), - get(map, CRUISE_CONTROL_ENABLED), - get(map, CRUISE_CONTROL_RACK_ENABLED), - get(map, CRUISE_CONTROL_HOSTNAME), - get(map, CRUISE_CONTROL_PORT), - get(map, CRUISE_CONTROL_SSL_ENABLED), - get(map, CRUISE_CONTROL_AUTH_ENABLED), - get(map, CRUISE_CONTROL_CRT_FILE_PATH), - get(map, CRUISE_CONTROL_API_USER_PATH), - get(map, CRUISE_CONTROL_API_PASS_PATH), - get(map, ALTERABLE_TOPIC_CONFIG), - get(map, SKIP_CLUSTER_CONFIG_REVIEW) - ); + private static Set keyNames() { + return Collections.unmodifiableSet(CONFIG_VALUES.keySet()); + } + + @SuppressWarnings("unchecked") + private T get(ConfigParameter value) { + return (T) map.get(value.key()); + } + + /** @return Value of {@link #NAMESPACE} configuration. */ + public String namespace() { + return get(NAMESPACE); + } + + /** @return Value of {@link #RESOURCE_LABELS} configuration. */ + public Labels resourceLabels() { + return get(RESOURCE_LABELS); + } + + /** @return Value of {@link #BOOTSTRAP_SERVERS} configuration. */ + public String bootstrapServers() { + return get(BOOTSTRAP_SERVERS); + } + + /** @return Value of {@link #CLIENT_ID} configuration. */ + public String clientId() { + return get(CLIENT_ID); + } + + /** @return Value of {@link #FULL_RECONCILIATION_INTERVAL_MS} configuration. */ + public long fullReconciliationIntervalMs() { + return get(FULL_RECONCILIATION_INTERVAL_MS); + } + + /** @return Value of {@link #TLS_ENABLED} configuration. */ + public boolean tlsEnabled() { + return get(TLS_ENABLED); + } + + /** @return Value of {@link #TRUSTSTORE_LOCATION} configuration. */ + public String truststoreLocation() { + return get(TRUSTSTORE_LOCATION); + } + + /** @return Value of {@link #TRUSTSTORE_PASSWORD} configuration. */ + public String truststorePassword() { + return get(TRUSTSTORE_PASSWORD); + } + + /** @return Value of {@link #KEYSTORE_LOCATION} configuration. */ + public String keystoreLocation() { + return get(KEYSTORE_LOCATION); + } + + /** @return Value of {@link #KEYSTORE_PASSWORD} configuration. */ + public String keystorePassword() { + return get(KEYSTORE_PASSWORD); + } + + /** @return Value of {@link #SSL_ENDPOINT_IDENTIFICATION_ALGORITHM} configuration. */ + public String sslEndpointIdentificationAlgorithm() { + return get(SSL_ENDPOINT_IDENTIFICATION_ALGORITHM); + } + + /** @return Value of {@link #SASL_ENABLED} configuration. */ + public boolean saslEnabled() { + return get(SASL_ENABLED); + } + + /** @return Value of {@link #SASL_MECHANISM} configuration. */ + public String saslMechanism() { + return get(SASL_MECHANISM); + } + + /** @return Value of {@link #SASL_CUSTOM_CONFIG_JSON} configuration. */ + public String saslCustomConfigJson() { + return get(SASL_CUSTOM_CONFIG_JSON); + } + + /** @return Value of {@link #SASL_USERNAME} configuration. */ + public String saslUsername() { + return get(SASL_USERNAME); + } + + /** @return Value of {@link #SASL_PASSWORD} configuration. */ + public String saslPassword() { + return get(SASL_PASSWORD); + } + + /** @return Value of {@link #SECURITY_PROTOCOL} configuration. */ + public String securityProtocol() { + return get(SECURITY_PROTOCOL); + } + + /** @return Value of {@link #USE_FINALIZERS} configuration. */ + public boolean useFinalizer() { + return get(USE_FINALIZERS); + } + + /** @return Value of {@link #MAX_QUEUE_SIZE} configuration. */ + public int maxQueueSize() { + return get(MAX_QUEUE_SIZE); + } + + /** @return Value of {@link #MAX_BATCH_SIZE} configuration. */ + public int maxBatchSize() { + return get(MAX_BATCH_SIZE); + } + + /** @return Value of {@link #MAX_BATCH_LINGER_MS} configuration. */ + public long maxBatchLingerMs() { + return get(MAX_BATCH_LINGER_MS); + } + + /** @return Value of {@link #ENABLE_ADDITIONAL_METRICS} configuration. */ + public boolean enableAdditionalMetrics() { + return get(ENABLE_ADDITIONAL_METRICS); + } + + /** @return Value of {@link #ALTERABLE_TOPIC_CONFIG} configuration. */ + public String alterableTopicConfig() { + return get(ALTERABLE_TOPIC_CONFIG); + } + + /** @return Value of {@link #SKIP_CLUSTER_CONFIG_REVIEW} configuration. */ + public boolean skipClusterConfigReview() { + return get(SKIP_CLUSTER_CONFIG_REVIEW); + } + + /** @return Value of {@link #FEATURE_GATES} configuration. */ + public FeatureGates featureGates() { + return get(FEATURE_GATES); + } + + /** @return Value of {@link #CRUISE_CONTROL_ENABLED} configuration. */ + public boolean cruiseControlEnabled() { + return get(CRUISE_CONTROL_ENABLED); + } + + /** @return Value of {@link #CRUISE_CONTROL_RACK_ENABLED} configuration. */ + public boolean cruiseControlRackEnabled() { + return get(CRUISE_CONTROL_RACK_ENABLED); + } + + /** @return Value of {@link #CRUISE_CONTROL_HOSTNAME} configuration. */ + public String cruiseControlHostname() { + return get(CRUISE_CONTROL_HOSTNAME); + } + + /** @return Value of {@link #CRUISE_CONTROL_PORT} configuration. */ + public int cruiseControlPort() { + return get(CRUISE_CONTROL_PORT); + } + + /** @return Value of {@link #CRUISE_CONTROL_SSL_ENABLED} configuration. */ + public boolean cruiseControlSslEnabled() { + return get(CRUISE_CONTROL_SSL_ENABLED); + } + + /** @return Value of {@link #CRUISE_CONTROL_AUTH_ENABLED} configuration. */ + public boolean cruiseControlAuthEnabled() { + return get(CRUISE_CONTROL_AUTH_ENABLED); + } + + /** @return Value of {@link #CRUISE_CONTROL_CRT_FILE_PATH} configuration. */ + public String cruiseControlCrtFilePath() { + return get(CRUISE_CONTROL_CRT_FILE_PATH); + } + + /** @return Value of {@link #CRUISE_CONTROL_API_USER_PATH} configuration. */ + public String cruiseControlApiUserPath() { + return get(CRUISE_CONTROL_API_USER_PATH); + } + + /** @return Value of {@link #CRUISE_CONTROL_API_PASS_PATH} configuration. */ + public String cruiseControlApiPassPath() { + return get(CRUISE_CONTROL_API_PASS_PATH); } Map adminClientConfig() { var kafkaClientProps = new HashMap(); - kafkaClientProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers()); - kafkaClientProps.put(AdminClientConfig.CLIENT_ID_CONFIG, this.clientId()); + kafkaClientProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()); + kafkaClientProps.put(AdminClientConfig.CLIENT_ID_CONFIG, clientId()); - if (this.tlsEnabled() && !this.securityProtocol().isEmpty()) { - if (!this.securityProtocol().equals("SSL") && !this.securityProtocol().equals("SASL_SSL")) { + if (tlsEnabled() && !securityProtocol().isEmpty()) { + if (!securityProtocol().equals("SSL") && !securityProtocol().equals("SASL_SSL")) { throw new InvalidConfigurationException("TLS is enabled but the security protocol does not match SSL or SASL_SSL"); } } - if (!this.securityProtocol().isEmpty()) { - kafkaClientProps.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, this.securityProtocol()); - } else if (this.tlsEnabled()) { + if (!securityProtocol().isEmpty()) { + kafkaClientProps.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, securityProtocol()); + } else if (tlsEnabled()) { kafkaClientProps.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SSL"); } else { kafkaClientProps.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT"); } - if (this.securityProtocol().equals("SASL_SSL") || this.securityProtocol().equals("SSL") || this.tlsEnabled()) { - kafkaClientProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, this.sslEndpointIdentificationAlgorithm()); + if (securityProtocol().equals("SASL_SSL") || securityProtocol().equals("SSL") || tlsEnabled()) { + kafkaClientProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, sslEndpointIdentificationAlgorithm()); - if (!this.truststoreLocation().isEmpty()) { - kafkaClientProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, this.truststoreLocation()); + if (!truststoreLocation().isEmpty()) { + kafkaClientProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreLocation()); } - if (!this.truststorePassword().isEmpty()) { - if (this.truststoreLocation().isEmpty()) { + if (!truststorePassword().isEmpty()) { + if (truststoreLocation().isEmpty()) { throw new InvalidConfigurationException("TLS_TRUSTSTORE_PASSWORD was supplied but TLS_TRUSTSTORE_LOCATION was not supplied"); } - kafkaClientProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, this.truststorePassword()); + kafkaClientProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword()); } - if (!this.keystoreLocation().isEmpty() && !this.keystorePassword().isEmpty()) { - kafkaClientProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, this.keystoreLocation()); - kafkaClientProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, this.keystorePassword()); + if (!keystoreLocation().isEmpty() && !keystorePassword().isEmpty()) { + kafkaClientProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystoreLocation()); + kafkaClientProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePassword()); } } - if (this.saslEnabled()) { + if (saslEnabled()) { putSaslConfigs(kafkaClientProps); } @@ -289,10 +354,7 @@ Map adminClientConfig() { } private void putSaslConfigs(Map kafkaClientProps) { - TopicOperatorConfig config = this; - String customSaslConfigJson = config.saslCustomConfigJson(); - - if (customSaslConfigJson.isBlank()) { + if (saslCustomConfigJson().isBlank()) { setStandardSaslConfigs(kafkaClientProps); } else { setCustomSaslConfigs(kafkaClientProps); @@ -300,10 +362,7 @@ private void putSaslConfigs(Map kafkaClientProps) { } private void setCustomSaslConfigs(Map kafkaClientProps) { - TopicOperatorConfig config = this; - String customPropsString = config.saslCustomConfigJson(); - - if (customPropsString.isEmpty()) { + if (saslCustomConfigJson().isEmpty()) { throw new InvalidConfigurationException("Custom SASL config properties are not set"); } @@ -311,7 +370,7 @@ private void setCustomSaslConfigs(Map kafkaClientProps) { objectMapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true); try { - Map customProperties = objectMapper.readValue(customPropsString, STRING_HASH_MAP_TYPE_REFERENCE); + Map customProperties = objectMapper.readValue(saslCustomConfigJson(), STRING_HASH_MAP_TYPE_REFERENCE); if (customProperties.isEmpty()) { throw new InvalidConfigurationException("SASL custom config properties empty"); @@ -328,35 +387,31 @@ private void setCustomSaslConfigs(Map kafkaClientProps) { kafkaClientProps.put(key, value); } } catch (JsonProcessingException e) { - throw new InvalidConfigurationException("SASL custom config properties deserialize failed. customProperties: '" + customPropsString + "'"); + throw new InvalidConfigurationException("SASL custom config properties deserialize failed. customProperties: '" + saslCustomConfigJson() + "'"); } } private void setStandardSaslConfigs(Map kafkaClientProps) { - TopicOperatorConfig config = this; String saslMechanism; String jaasConfig; - String username = config.saslUsername(); - String password = config.saslPassword(); - String configSaslMechanism = config.saslMechanism(); - if (username.isEmpty() || password.isEmpty()) { + if (saslUsername().isEmpty() || saslPassword().isEmpty()) { throw new InvalidConfigurationException("SASL credentials are not set"); } - if ("plain".equals(configSaslMechanism)) { + if ("plain".equals(saslMechanism())) { saslMechanism = "PLAIN"; - jaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";"; - } else if ("scram-sha-256".equals(configSaslMechanism) || "scram-sha-512".equals(configSaslMechanism)) { - jaasConfig = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + username + "\" password=\"" + password + "\";"; + jaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + saslUsername() + "\" password=\"" + saslPassword() + "\";"; + } else if ("scram-sha-256".equals(saslMechanism()) || "scram-sha-512".equals(saslMechanism())) { + jaasConfig = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + saslUsername() + "\" password=\"" + saslPassword() + "\";"; - if ("scram-sha-256".equals(configSaslMechanism)) { + if ("scram-sha-256".equals(saslMechanism())) { saslMechanism = "SCRAM-SHA-256"; } else { saslMechanism = "SCRAM-SHA-512"; } } else { - throw new IllegalArgumentException("Invalid SASL_MECHANISM type: " + configSaslMechanism); + throw new IllegalArgumentException("Invalid SASL_MECHANISM type: " + saslMechanism()); } kafkaClientProps.put(SaslConfigs.SASL_MECHANISM, saslMechanism); @@ -367,40 +422,40 @@ private void setStandardSaslConfigs(Map kafkaClientProps) { public String toString() { String mask = "********"; return "TopicOperatorConfig{" + - "\n\tnamespace='" + namespace + '\'' + - "\n\tlabelSelector=" + labelSelector + - "\n\tbootstrapServers='" + bootstrapServers + '\'' + - "\n\tclientId='" + clientId + '\'' + - "\n\tfullReconciliationIntervalMs=" + fullReconciliationIntervalMs + - "\n\ttlsEnabled=" + tlsEnabled + - "\n\ttruststoreLocation='" + truststoreLocation + '\'' + - "\n\ttruststorePassword='" + mask + '\'' + - "\n\tkeystoreLocation='" + keystoreLocation + '\'' + - "\n\tkeystorePassword='" + mask + '\'' + - "\n\tsslEndpointIdentificationAlgorithm='" + sslEndpointIdentificationAlgorithm + '\'' + - "\n\tsaslEnabled=" + saslEnabled + - "\n\tsaslMechanism='" + saslMechanism + '\'' + - "\n\tsaslCustomConfigJson='" + (saslCustomConfigJson == null ? null : mask) + '\'' + - "\n\talterableTopicConfig='" + alterableTopicConfig + '\'' + - "\n\tskipClusterConfigReview='" + skipClusterConfigReview + '\'' + - "\n\tsaslUsername='" + saslUsername + '\'' + - "\n\tsaslPassword='" + mask + '\'' + - "\n\tsecurityProtocol='" + securityProtocol + '\'' + - "\n\tuseFinalizer=" + useFinalizer + - "\n\tmaxQueueSize=" + maxQueueSize + - "\n\tmaxBatchSize=" + maxBatchSize + - "\n\tmaxBatchLingerMs=" + maxBatchLingerMs + - "\n\tenableAdditionalMetrics=" + enableAdditionalMetrics + - "\n\tfeatureGates='" + featureGates + "'" + - "\n\tcruiseControlEnabled=" + cruiseControlEnabled + - "\n\tcruiseControlRackEnabled=" + cruiseControlRackEnabled + - "\n\tcruiseControlHostname=" + cruiseControlHostname + - "\n\tcruiseControlPort=" + cruiseControlPort + - "\n\tcruiseControlSslEnabled=" + cruiseControlSslEnabled + - "\n\tcruiseControlAuthEnabled=" + cruiseControlAuthEnabled + - "\n\tcruiseControlCrtFilePath=" + cruiseControlCrtFilePath + - "\n\tcruiseControlApiUserPath=" + cruiseControlApiUserPath + - "\n\tcruiseControlApiPassPath=" + cruiseControlApiPassPath + - '}'; + "\n\tnamespace='" + namespace() + '\'' + + "\n\tresourceLabels=" + resourceLabels() + + "\n\tbootstrapServers='" + bootstrapServers() + '\'' + + "\n\tclientId='" + clientId() + '\'' + + "\n\tfullReconciliationIntervalMs=" + fullReconciliationIntervalMs() + + "\n\ttlsEnabled=" + tlsEnabled() + + "\n\ttruststoreLocation='" + truststoreLocation() + '\'' + + "\n\ttruststorePassword='" + mask + '\'' + + "\n\tkeystoreLocation='" + keystoreLocation() + '\'' + + "\n\tkeystorePassword='" + mask + '\'' + + "\n\tsslEndpointIdentificationAlgorithm='" + sslEndpointIdentificationAlgorithm() + '\'' + + "\n\tsaslEnabled=" + saslEnabled() + + "\n\tsaslMechanism='" + saslMechanism() + '\'' + + "\n\tsaslCustomConfigJson='" + (saslCustomConfigJson() == null ? null : mask) + '\'' + + "\n\talterableTopicConfig='" + alterableTopicConfig() + '\'' + + "\n\tskipClusterConfigReview='" + skipClusterConfigReview() + '\'' + + "\n\tsaslUsername='" + saslUsername() + '\'' + + "\n\tsaslPassword='" + mask + '\'' + + "\n\tsecurityProtocol='" + securityProtocol() + '\'' + + "\n\tuseFinalizer=" + useFinalizer() + + "\n\tmaxQueueSize=" + maxQueueSize() + + "\n\tmaxBatchSize=" + maxBatchSize() + + "\n\tmaxBatchLingerMs=" + maxBatchLingerMs() + + "\n\tenableAdditionalMetrics=" + enableAdditionalMetrics() + + "\n\tfeatureGates='" + featureGates() + "'" + + "\n\tcruiseControlEnabled=" + cruiseControlEnabled() + + "\n\tcruiseControlRackEnabled=" + cruiseControlRackEnabled() + + "\n\tcruiseControlHostname=" + cruiseControlHostname() + + "\n\tcruiseControlPort=" + cruiseControlPort() + + "\n\tcruiseControlSslEnabled=" + cruiseControlSslEnabled() + + "\n\tcruiseControlAuthEnabled=" + cruiseControlAuthEnabled() + + "\n\tcruiseControlCrtFilePath=" + cruiseControlCrtFilePath() + + "\n\tcruiseControlApiUserPath=" + cruiseControlApiUserPath() + + "\n\tcruiseControlApiPassPath=" + cruiseControlApiPassPath() + + '}'; } } diff --git a/topic-operator/src/main/java/io/strimzi/operator/topic/TopicOperatorMain.java b/topic-operator/src/main/java/io/strimzi/operator/topic/TopicOperatorMain.java index ebe0d4bc6c..640f00edcf 100644 --- a/topic-operator/src/main/java/io/strimzi/operator/topic/TopicOperatorMain.java +++ b/topic-operator/src/main/java/io/strimzi/operator/topic/TopicOperatorMain.java @@ -58,9 +58,9 @@ public class TopicOperatorMain implements Liveness, Readiness { TopicOperatorMain(TopicOperatorConfig config, Admin kafkaAdminClient) { Objects.requireNonNull(config.namespace()); - Objects.requireNonNull(config.labelSelector()); + Objects.requireNonNull(config.resourceLabels()); this.config = config; - var selector = config.labelSelector().toMap(); + var selector = config.resourceLabels().toMap(); this.kubernetesClient = new OperatorKubernetesClientBuilder("strimzi-topic-operator", TopicOperatorMain.class.getPackage().getImplementationVersion()).build(); this.kafkaAdminClient = kafkaAdminClient; this.cruiseControlClient = TopicOperatorUtil.createCruiseControlClient(config); diff --git a/topic-operator/src/test/java/io/strimzi/operator/topic/BatchingTopicControllerIT.java b/topic-operator/src/test/java/io/strimzi/operator/topic/BatchingTopicControllerIT.java index c9e99106ae..1878f69c4e 100644 --- a/topic-operator/src/test/java/io/strimzi/operator/topic/BatchingTopicControllerIT.java +++ b/topic-operator/src/test/java/io/strimzi/operator/topic/BatchingTopicControllerIT.java @@ -72,8 +72,7 @@ import static org.mockito.Mockito.verifyNoInteractions; /** - * This test is not intended to provide lots of coverage of the {@link BatchingTopicController}, - * rather it aims to cover some parts that a difficult to test via {@link TopicControllerIT}. + * This test covers some parts that a difficult to test via {@link TopicControllerIT}. */ @SuppressWarnings("checkstyle:ClassDataAbstractionCoupling") class BatchingTopicControllerIT implements TestSeparator { diff --git a/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java b/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java index c9f8463e9e..0aff79230b 100644 --- a/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java +++ b/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java @@ -13,7 +13,6 @@ import io.strimzi.api.kafka.model.topic.KafkaTopic; import io.strimzi.api.kafka.model.topic.KafkaTopicBuilder; import io.strimzi.api.kafka.model.topic.KafkaTopicStatus; -import io.strimzi.operator.common.featuregates.FeatureGates; import io.strimzi.operator.common.model.Labels; import io.strimzi.operator.topic.model.KubeRef; import io.strimzi.operator.topic.model.TopicOperatorException; @@ -98,8 +97,8 @@ import static org.mockito.Mockito.mock; /** - * This integration test suite provides coverage of the {@link BatchingTopicController}. - * If you need to test individual units of code, use the the {@link BatchingTopicController}. + * This integration test covers the {@link BatchingTopicController}. + * This test requires a connection to a Kubernetes cluster. */ @SuppressWarnings("checkstyle:ClassFanOutComplexity") class TopicControllerIT implements TestSeparator { @@ -1273,17 +1272,17 @@ private static TopicOperatorConfig topicOperatorConfig(String ns, StrimziKafkaCl } private static TopicOperatorConfig topicOperatorConfig(String ns, StrimziKafkaCluster kafkaCluster, boolean useFinalizer, long fullReconciliationIntervalMs) { - return new TopicOperatorConfig(ns, - Labels.fromMap(SELECTOR), - kafkaCluster.getBootstrapServers(), - TopicControllerIT.class.getSimpleName(), - fullReconciliationIntervalMs, - false, "", "", "", "", "", - false, "", "", "", "", "", - useFinalizer, - 100, 100, 10, false, new FeatureGates(""), - false, false, "", 9090, false, false, "", "", "", - "all", false); + return TopicOperatorConfig.buildFromMap(Map.of( + TopicOperatorConfig.NAMESPACE.key(), ns, + TopicOperatorConfig.RESOURCE_LABELS.key(), Labels.fromMap(SELECTOR).toSelectorString(), + TopicOperatorConfig.BOOTSTRAP_SERVERS.key(), kafkaCluster.getBootstrapServers(), + TopicOperatorConfig.CLIENT_ID.key(), TopicControllerIT.class.getSimpleName(), + TopicOperatorConfig.FULL_RECONCILIATION_INTERVAL_MS.key(), String.valueOf(fullReconciliationIntervalMs), + TopicOperatorConfig.USE_FINALIZERS.key(), String.valueOf(useFinalizer), + TopicOperatorConfig.MAX_QUEUE_SIZE.key(), "100", + TopicOperatorConfig.MAX_BATCH_SIZE.key(), "100", + TopicOperatorConfig.MAX_BATCH_LINGER_MS.key(), "10" + )); } @Test @@ -2038,14 +2037,17 @@ public void shouldTerminateIfQueueFull() throws ExecutionException, InterruptedE // given String ns = createNamespace(NAMESPACE); - var config = new TopicOperatorConfig(ns, Labels.fromMap(SELECTOR), - kafkaCluster.getBootstrapServers(), TopicControllerIT.class.getSimpleName(), 10_000, - false, "", "", "", "", "", - false, "", "", "", "", "", - true, - 1, 100, 5_0000, false, new FeatureGates(""), - false, false, "", 9090, false, false, "", "", "", - "all", false); + var config = TopicOperatorConfig.buildFromMap(Map.of( + TopicOperatorConfig.NAMESPACE.key(), ns, + TopicOperatorConfig.RESOURCE_LABELS.key(), Labels.fromMap(SELECTOR).toSelectorString(), + TopicOperatorConfig.BOOTSTRAP_SERVERS.key(), kafkaCluster.getBootstrapServers(), + TopicOperatorConfig.CLIENT_ID.key(), TopicControllerIT.class.getSimpleName(), + TopicOperatorConfig.FULL_RECONCILIATION_INTERVAL_MS.key(), "10000", + TopicOperatorConfig.USE_FINALIZERS.key(), "true", + TopicOperatorConfig.MAX_QUEUE_SIZE.key(), "1", + TopicOperatorConfig.MAX_BATCH_SIZE.key(), "100", + TopicOperatorConfig.MAX_BATCH_LINGER_MS.key(), "5000" + )); maybeStartOperator(config);