diff --git a/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java b/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java index fa13fb0fcc..c9f8463e9e 100644 --- a/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java +++ b/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java @@ -106,7 +106,15 @@ class TopicControllerIT implements TestSeparator { private static final Logger LOGGER = LogManager.getLogger(TopicControllerIT.class); private static final String NAMESPACE = TopicOperatorTestUtil.namespaceName(TopicControllerIT.class); - public static final Map SELECTOR = Map.of("foo", "FOO", "bar", "BAR"); + private static final Map SELECTOR = Map.of("foo", "FOO", "bar", "BAR"); + private static final Map TEST_TOPIC_CONFIG = Map.of( + TopicConfig.CLEANUP_POLICY_CONFIG, List.of("compact"), + TopicConfig.COMPRESSION_TYPE_CONFIG, "producer", + TopicConfig.FLUSH_MS_CONFIG, 1234L, + TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 1234, + TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, 0.6, + TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, true + ); static KubernetesClient kubernetesClient; TopicOperatorMain operator; @@ -437,21 +445,14 @@ static List managedKafkaTopics() { static List managedKafkaTopicsWithConfigs() { var topicName = "topic" + System.nanoTime(); - var configs = Map.of( - TopicConfig.CLEANUP_POLICY_CONFIG, List.of("compact"), // list typed - TopicConfig.COMPRESSION_TYPE_CONFIG, "producer", // string typed - TopicConfig.FLUSH_MS_CONFIG, 1234L, // long typed - TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 1234, // int typed - TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, 0.6, // double typed - TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, true // boolean typed - ); + return List.of( - kafkaTopic(NAMESPACE, topicName + "a", SELECTOR, null, true, topicName + "a", 2, 1, configs), - kafkaTopic(NAMESPACE, topicName + "b", SELECTOR, null, true, null, 2, 1, configs), - kafkaTopic(NAMESPACE, topicName + "c", SELECTOR, null, true, topicName + "c".toUpperCase(Locale.ROOT), 2, 1, configs), - kafkaTopic(NAMESPACE, topicName + "d", SELECTOR, null, null, topicName + "d", 2, 1, configs), - kafkaTopic(NAMESPACE, topicName + "e", SELECTOR, null, null, null, 2, 1, configs), - kafkaTopic(NAMESPACE, topicName + "f", SELECTOR, null, null, topicName + "f".toUpperCase(Locale.ROOT), 2, 1, configs) + kafkaTopic(NAMESPACE, topicName + "a", SELECTOR, null, true, topicName + "a", 2, 1, TEST_TOPIC_CONFIG), + kafkaTopic(NAMESPACE, topicName + "b", SELECTOR, null, true, null, 2, 1, TEST_TOPIC_CONFIG), + kafkaTopic(NAMESPACE, topicName + "c", SELECTOR, null, true, topicName + "c".toUpperCase(Locale.ROOT), 2, 1, TEST_TOPIC_CONFIG), + kafkaTopic(NAMESPACE, topicName + "d", SELECTOR, null, null, topicName + "d", 2, 1, TEST_TOPIC_CONFIG), + kafkaTopic(NAMESPACE, topicName + "e", SELECTOR, null, null, null, 2, 1, TEST_TOPIC_CONFIG), + kafkaTopic(NAMESPACE, topicName + "f", SELECTOR, null, null, topicName + "f".toUpperCase(Locale.ROOT), 2, 1, TEST_TOPIC_CONFIG) ); } @@ -851,14 +852,9 @@ private void shouldUpdateTopicInKafkaWhenConfigChangedInKube(StrimziKafkaCluster UnaryOperator> expectedChangedConfigs) throws ExecutionException, InterruptedException, TimeoutException { // given var expectedTopicName = TopicOperatorUtil.topicName(kt); - var expectedCreateConfigs = Map.of( - TopicConfig.CLEANUP_POLICY_CONFIG, "compact", // list typed - TopicConfig.COMPRESSION_TYPE_CONFIG, "producer", // string typed - TopicConfig.FLUSH_MS_CONFIG, "1234", // long typed - TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1234", // int typed - TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.6", // double typed - TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true" // boolean typed - ); + var expectedCreateConfigs = TEST_TOPIC_CONFIG.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() instanceof List + ? String.join(",", (List) e.getValue()) : String.valueOf(e.getValue()))); Map expectedConfigs = expectedChangedConfigs.apply(expectedCreateConfigs); assertNotEquals(expectedCreateConfigs, expectedConfigs); @@ -881,7 +877,7 @@ public void shouldUpdateTopicInKafkaWhenStringConfigChangedInKube() throws Execu shouldUpdateTopicInKafkaWhenConfigChangedInKube(kafkaCluster, kt, TopicControllerIT::setSnappyCompression, expectedCreateConfigs -> { - Map expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs); + Map expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs); expectedUpdatedConfigs.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy"); return expectedUpdatedConfigs; }); @@ -900,7 +896,7 @@ public void shouldUpdateTopicInKafkaWhenIntConfigChangedInKube() throws Executio return theKt; }, expectedCreateConfigs -> { - Map expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs); + Map expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs); expectedUpdatedConfigs.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "5678"); return expectedUpdatedConfigs; }); @@ -919,7 +915,7 @@ public void shouldUpdateTopicInKafkaWhenLongConfigChangedInKube() throws Executi return theKt; }, expectedCreateConfigs -> { - Map expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs); + Map expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs); expectedUpdatedConfigs.put(TopicConfig.FLUSH_MS_CONFIG, "9876"); return expectedUpdatedConfigs; }); @@ -938,7 +934,7 @@ public void shouldUpdateTopicInKafkaWhenDoubleConfigChangedInKube() throws Execu return theKt; }, expectedCreateConfigs -> { - Map expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs); + Map expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs); expectedUpdatedConfigs.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.1"); return expectedUpdatedConfigs; }); @@ -957,7 +953,7 @@ public void shouldUpdateTopicInKafkaWhenBooleanConfigChangedInKube() throws Exec return theKt; }, expectedCreateConfigs -> { - Map expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs); + Map expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs); expectedUpdatedConfigs.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false"); return expectedUpdatedConfigs; }); @@ -976,7 +972,7 @@ public void shouldUpdateTopicInKafkaWhenListConfigChangedInKube() throws Executi return theKt; }, expectedCreateConfigs -> { - Map expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs); + Map expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs); expectedUpdatedConfigs.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete"); return expectedUpdatedConfigs; }); @@ -995,7 +991,7 @@ public void shouldUpdateTopicInKafkaWhenConfigRemovedInKube() throws ExecutionEx return theKt; }, expectedCreateConfigs -> { - var expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs); + var expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs); expectedUpdatedConfigs.remove(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG); return expectedUpdatedConfigs; });