Skip to content

Commit

Permalink
more fixes
Browse files Browse the repository at this point in the history
Signed-off-by: see-quick <[email protected]>
  • Loading branch information
see-quick committed Dec 20, 2024
1 parent f8377c8 commit 10ec646
Showing 1 changed file with 26 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,15 @@ class TopicControllerIT implements TestSeparator {
private static final Logger LOGGER = LogManager.getLogger(TopicControllerIT.class);

private static final String NAMESPACE = TopicOperatorTestUtil.namespaceName(TopicControllerIT.class);
public static final Map<String, String> SELECTOR = Map.of("foo", "FOO", "bar", "BAR");
private static final Map<String, String> SELECTOR = Map.of("foo", "FOO", "bar", "BAR");
private static final Map<String, Object> TEST_TOPIC_CONFIG = Map.of(
TopicConfig.CLEANUP_POLICY_CONFIG, List.of("compact"),
TopicConfig.COMPRESSION_TYPE_CONFIG, "producer",
TopicConfig.FLUSH_MS_CONFIG, 1234L,
TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 1234,
TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, 0.6,
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, true
);

static KubernetesClient kubernetesClient;
TopicOperatorMain operator;
Expand Down Expand Up @@ -437,21 +445,14 @@ static List<KafkaTopic> managedKafkaTopics() {

static List<KafkaTopic> managedKafkaTopicsWithConfigs() {
var topicName = "topic" + System.nanoTime();
var configs = Map.of(
TopicConfig.CLEANUP_POLICY_CONFIG, List.of("compact"), // list typed
TopicConfig.COMPRESSION_TYPE_CONFIG, "producer", // string typed
TopicConfig.FLUSH_MS_CONFIG, 1234L, // long typed
TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 1234, // int typed
TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, 0.6, // double typed
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, true // boolean typed
);

return List.of(
kafkaTopic(NAMESPACE, topicName + "a", SELECTOR, null, true, topicName + "a", 2, 1, configs),
kafkaTopic(NAMESPACE, topicName + "b", SELECTOR, null, true, null, 2, 1, configs),
kafkaTopic(NAMESPACE, topicName + "c", SELECTOR, null, true, topicName + "c".toUpperCase(Locale.ROOT), 2, 1, configs),
kafkaTopic(NAMESPACE, topicName + "d", SELECTOR, null, null, topicName + "d", 2, 1, configs),
kafkaTopic(NAMESPACE, topicName + "e", SELECTOR, null, null, null, 2, 1, configs),
kafkaTopic(NAMESPACE, topicName + "f", SELECTOR, null, null, topicName + "f".toUpperCase(Locale.ROOT), 2, 1, configs)
kafkaTopic(NAMESPACE, topicName + "a", SELECTOR, null, true, topicName + "a", 2, 1, TEST_TOPIC_CONFIG),
kafkaTopic(NAMESPACE, topicName + "b", SELECTOR, null, true, null, 2, 1, TEST_TOPIC_CONFIG),
kafkaTopic(NAMESPACE, topicName + "c", SELECTOR, null, true, topicName + "c".toUpperCase(Locale.ROOT), 2, 1, TEST_TOPIC_CONFIG),
kafkaTopic(NAMESPACE, topicName + "d", SELECTOR, null, null, topicName + "d", 2, 1, TEST_TOPIC_CONFIG),
kafkaTopic(NAMESPACE, topicName + "e", SELECTOR, null, null, null, 2, 1, TEST_TOPIC_CONFIG),
kafkaTopic(NAMESPACE, topicName + "f", SELECTOR, null, null, topicName + "f".toUpperCase(Locale.ROOT), 2, 1, TEST_TOPIC_CONFIG)
);
}

Expand Down Expand Up @@ -851,14 +852,9 @@ private void shouldUpdateTopicInKafkaWhenConfigChangedInKube(StrimziKafkaCluster
UnaryOperator<Map<String, String>> expectedChangedConfigs) throws ExecutionException, InterruptedException, TimeoutException {
// given
var expectedTopicName = TopicOperatorUtil.topicName(kt);
var expectedCreateConfigs = Map.of(
TopicConfig.CLEANUP_POLICY_CONFIG, "compact", // list typed
TopicConfig.COMPRESSION_TYPE_CONFIG, "producer", // string typed
TopicConfig.FLUSH_MS_CONFIG, "1234", // long typed
TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1234", // int typed
TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.6", // double typed
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true" // boolean typed
);
var expectedCreateConfigs = TEST_TOPIC_CONFIG.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() instanceof List
? String.join(",", (List) e.getValue()) : String.valueOf(e.getValue())));
Map<String, String> expectedConfigs = expectedChangedConfigs.apply(expectedCreateConfigs);
assertNotEquals(expectedCreateConfigs, expectedConfigs);

Expand All @@ -881,7 +877,7 @@ public void shouldUpdateTopicInKafkaWhenStringConfigChangedInKube() throws Execu
shouldUpdateTopicInKafkaWhenConfigChangedInKube(kafkaCluster, kt,
TopicControllerIT::setSnappyCompression,
expectedCreateConfigs -> {
Map<String, String> expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs);
Map<String, String> expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs);
expectedUpdatedConfigs.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy");
return expectedUpdatedConfigs;
});
Expand All @@ -900,7 +896,7 @@ public void shouldUpdateTopicInKafkaWhenIntConfigChangedInKube() throws Executio
return theKt;
},
expectedCreateConfigs -> {
Map<String, String> expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs);
Map<String, String> expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs);
expectedUpdatedConfigs.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "5678");
return expectedUpdatedConfigs;
});
Expand All @@ -919,7 +915,7 @@ public void shouldUpdateTopicInKafkaWhenLongConfigChangedInKube() throws Executi
return theKt;
},
expectedCreateConfigs -> {
Map<String, String> expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs);
Map<String, String> expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs);
expectedUpdatedConfigs.put(TopicConfig.FLUSH_MS_CONFIG, "9876");
return expectedUpdatedConfigs;
});
Expand All @@ -938,7 +934,7 @@ public void shouldUpdateTopicInKafkaWhenDoubleConfigChangedInKube() throws Execu
return theKt;
},
expectedCreateConfigs -> {
Map<String, String> expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs);
Map<String, String> expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs);
expectedUpdatedConfigs.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.1");
return expectedUpdatedConfigs;
});
Expand All @@ -957,7 +953,7 @@ public void shouldUpdateTopicInKafkaWhenBooleanConfigChangedInKube() throws Exec
return theKt;
},
expectedCreateConfigs -> {
Map<String, String> expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs);
Map<String, String> expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs);
expectedUpdatedConfigs.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false");
return expectedUpdatedConfigs;
});
Expand All @@ -976,7 +972,7 @@ public void shouldUpdateTopicInKafkaWhenListConfigChangedInKube() throws Executi
return theKt;
},
expectedCreateConfigs -> {
Map<String, String> expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs);
Map<String, String> expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs);
expectedUpdatedConfigs.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete");
return expectedUpdatedConfigs;
});
Expand All @@ -995,7 +991,7 @@ public void shouldUpdateTopicInKafkaWhenConfigRemovedInKube() throws ExecutionEx
return theKt;
},
expectedCreateConfigs -> {
var expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs);
var expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs);
expectedUpdatedConfigs.remove(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG);
return expectedUpdatedConfigs;
});
Expand Down

0 comments on commit 10ec646

Please sign in to comment.