Address review comments from scholzj and tinaselenge
Signed-off-by: Katherine Stanley <[email protected]>
katheris committed Jan 13, 2025
1 parent 858b23d commit 65aadd4
Showing 5 changed files with 44 additions and 28 deletions.
@@ -1268,6 +1268,7 @@ public List<Secret> generateCertificatesSecrets(ClusterCa clusterCa, ClientsCa c
LOGGER.warnCr(reconciliation, "Error while generating certificates", e);
throw new RuntimeException("Failed to prepare Kafka certificates", e);
}
+
return updatedCerts.entrySet()
.stream()
.map(entry -> ModelUtils.createSecret(entry.getKey(), namespace, labels, ownerReference,
@@ -162,6 +162,7 @@ public class KafkaReconciler {
private final Map<Integer, String> brokerLoggingHash = new HashMap<>();
private final Map<Integer, String> brokerConfigurationHash = new HashMap<>();
private final Map<Integer, String> kafkaServerCertificateHash = new HashMap<>();
+private final List<String> secretsToDelete = new ArrayList<>();
/* test */ TlsPemIdentity coTlsPemIdentity;
/* test */ KafkaListenersReconciler.ReconciliationResult listenerReconciliationResults; // Result of the listener reconciliation with the listener details

@@ -273,6 +274,7 @@ public Future<Void> reconcile(KafkaStatus kafkaStatus, Clock clock) {
.compose(i -> metadataVersion(kafkaStatus))
.compose(i -> deletePersistentClaims())
.compose(i -> sharedKafkaConfigurationCleanup())
+.compose(i -> deleteOldCertificateSecrets())
// This has to run after all possible rolling updates which might move the pods to different nodes
.compose(i -> nodePortExternalListenerStatus())
.compose(i -> updateKafkaStatus(kafkaStatus));
@@ -746,26 +748,25 @@ protected Future<Void> certificateSecrets(Clock clock) {
listenerReconciliationResults.bootstrapDnsNames, listenerReconciliationResults.brokerDnsNames,
Util.isMaintenanceTimeWindowsSatisfied(reconciliation, maintenanceWindows, clock.instant()));

-List<String> secretsToDelete = new ArrayList<>(existingSecrets.stream().map(secret -> secret.getMetadata().getName()).toList());
-secretsToDelete.removeAll(desiredCertSecrets.stream().map(secret -> secret.getMetadata().getName()).toList());
-// Don't delete jmx secrets
-secretsToDelete.remove(KafkaResources.kafkaJmxSecretName(reconciliation.name()));
-
-Future<Void> deleteSecrets = deleteOldCertificateSecrets(secretsToDelete);
-Future<Void> updateSecrets = updateCertificateSecrets(desiredCertSecrets);
-return Future.join(deleteSecrets, updateSecrets);
+List<String> desiredCertSecretNames = desiredCertSecrets.stream().map(secret -> secret.getMetadata().getName()).toList();
+existingSecrets.forEach(secret -> {
+String secretName = secret.getMetadata().getName();
+// Don't delete desired secrets or jmx secrets
+if (!desiredCertSecretNames.contains(secretName) && !KafkaResources.kafkaJmxSecretName(reconciliation.name()).equals(secretName)) {
+secretsToDelete.add(secretName);
+}
+});
+return updateCertificateSecrets(desiredCertSecrets);
}).mapEmpty();
}

/**
* Delete old certificate Secrets that are no longer needed.
*
-* @param secrets List of Secrets to delete.
-*
* @return Future that completes when the Secrets have been deleted.
*/
-protected Future<Void> deleteOldCertificateSecrets(List<String> secrets) {
-List<Future<Void>> deleteFutures = secrets.stream()
+protected Future<Void> deleteOldCertificateSecrets() {
+List<Future<Void>> deleteFutures = secretsToDelete.stream()
.map(secretName -> {
LOGGER.debugCr(reconciliation, "Deleting old Secret {}/{} that is no longer used.", reconciliation.namespace(), secretName);
return secretOperator.deleteAsync(reconciliation, reconciliation.namespace(), secretName, false);
@@ -777,7 +778,7 @@ protected Future<Void> deleteOldCertificateSecrets(List<String> secrets) {
return secretOperator.getAsync(reconciliation.namespace(), oldSecretName)
.compose(oldSecret -> {
if (oldSecret != null) {
LOGGER.debugCr(reconciliation, "Deleting old Secret {}/{} that is no longer needed.", reconciliation.namespace(), oldSecretName);
LOGGER.debugCr(reconciliation, "Deleting legacy Secret {}/{} that is replaced by pod specific Secret.", reconciliation.namespace(), oldSecretName);
deleteFutures.add(secretOperator.deleteAsync(reconciliation, reconciliation.namespace(), oldSecretName, false));
}

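A minimal, self-contained Java sketch of the pattern introduced above: stale Secret names are recorded during certificate reconciliation and consumed by a later deletion step. Apart from the secretsToDelete field and the deleteOldCertificateSecrets name, which come from the diff, the class and method names below are invented for illustration.

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical condensed version of the pattern in the diff above
class CertificateSecretCleanup {
    // Populated while reconciling certificates, consumed later in the pipeline
    private final List<String> secretsToDelete = new ArrayList<>();

    // Mirrors the filtering added in certificateSecrets(): keep every desired
    // Secret and never delete the JMX Secret
    void recordStaleSecrets(Set<String> existingNames, Set<String> desiredNames, String jmxSecretName) {
        for (String name : existingNames) {
            if (!desiredNames.contains(name) && !jmxSecretName.equals(name)) {
                secretsToDelete.add(name);
            }
        }
    }

    // Mirrors deleteOldCertificateSecrets(): delete whatever was recorded
    void deleteOldCertificateSecrets(Consumer<String> deleteAction) {
        secretsToDelete.forEach(deleteAction);
    }
}

Keeping the names in a field lets the reconcile() chain above invoke deleteOldCertificateSecrets() as its own step instead of threading the list between methods.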
@@ -632,7 +632,13 @@ public void testReconcileAddAndRemovePool(VertxTestContext context) {
operator.reconcile(new Reconciliation("initial-trigger", Kafka.RESOURCE_KIND, namespace, CLUSTER_NAME))
.onComplete(context.succeeding(v -> context.verify(() -> {
assertThat(client.secrets().inNamespace(namespace).withLabels(Labels.fromMap(kafkaLabels).withStrimziComponentType("kafka").toMap()).list().getItems().size(), is(6));
-assertThat(client.pods().inNamespace(namespace).withLabels(kafkaLabels).list().getItems().size(), is(6));
+List<Pod> initialPods = client.pods().inNamespace(namespace).withLabels(kafkaLabels).list().getItems();
+assertThat(initialPods.size(), is(6));
+for (Pod pod : initialPods) {
+Secret certSecret = client.secrets().inNamespace(namespace).withName(pod.getMetadata().getName()).get();
+assertThat(certSecret, is(notNullValue()));
+assertThat(certSecret.getData().keySet(), hasItems(pod.getMetadata().getName() + ".crt", pod.getMetadata().getName() + ".key"));
+}

KafkaNodePool newPool = new KafkaNodePoolBuilder()
.withNewMetadata()
@@ -658,9 +664,17 @@
// Assert that the new pool is added
assertThat(client.secrets().inNamespace(namespace).withLabels(Labels.fromMap(kafkaLabels).withStrimziComponentType("kafka").toMap()).list().getItems().size(), is(9));
assertThat(client.pods().inNamespace(namespace).withLabels(kafkaLabels).list().getItems().size(), is(9));
-assertThat(client.pods().inNamespace(namespace).withName(CLUSTER_NAME + "-new-pool-13").get(), is(notNullValue()));
-assertThat(client.pods().inNamespace(namespace).withName(CLUSTER_NAME + "-new-pool-14").get(), is(notNullValue()));
-assertThat(client.pods().inNamespace(namespace).withName(CLUSTER_NAME + "-new-pool-15").get(), is(notNullValue()));
+List<String> expectedNewResources = List.of(
+CLUSTER_NAME + "-new-pool-13",
+CLUSTER_NAME + "-new-pool-14",
+CLUSTER_NAME + "-new-pool-15"
+);
+for (String resourceName : expectedNewResources) {
+assertThat(client.pods().inNamespace(namespace).withName(resourceName).get(), is(notNullValue()));
+Secret certSecret = client.secrets().inNamespace(namespace).withName(resourceName).get();
+assertThat(certSecret, is(notNullValue()));
+assertThat(certSecret.getData().keySet(), hasItems(resourceName + ".crt", resourceName + ".key"));
+}

Kafka kafka = Crds.kafkaOperation(client).inNamespace(namespace).withName(CLUSTER_NAME).get();
assertThat(kafka.getStatus().getKafkaNodePools().size(), is(3));
@@ -694,9 +708,15 @@ public void testReconcileAddAndRemovePool(VertxTestContext context) {
// Assert that the new pool is deleted
assertThat(client.secrets().inNamespace(namespace).withLabels(Labels.fromMap(kafkaLabels).withStrimziComponentType("kafka").toMap()).list().getItems().size(), is(6));
assertThat(client.pods().inNamespace(namespace).withLabels(kafkaLabels).list().getItems().size(), is(6));
-assertThat(client.pods().inNamespace(namespace).withName(CLUSTER_NAME + "-new-pool-13").get(), is(nullValue()));
-assertThat(client.pods().inNamespace(namespace).withName(CLUSTER_NAME + "-new-pool-14").get(), is(nullValue()));
-assertThat(client.pods().inNamespace(namespace).withName(CLUSTER_NAME + "-new-pool-15").get(), is(nullValue()));
+List<String> expectedDeletedResources = List.of(
+CLUSTER_NAME + "-new-pool-13",
+CLUSTER_NAME + "-new-pool-14",
+CLUSTER_NAME + "-new-pool-15"
+);
+for (String resourceName : expectedDeletedResources) {
+assertThat(client.pods().inNamespace(namespace).withName(resourceName).get(), is(nullValue()));
+assertThat(client.secrets().inNamespace(namespace).withName(resourceName).get(), is(nullValue()));
+}

Kafka kafka = Crds.kafkaOperation(client).inNamespace(namespace).withName(CLUSTER_NAME).get();
assertThat(kafka.getStatus().getKafkaNodePools().size(), is(2));
@@ -39,7 +39,7 @@ The following resources are created by the Cluster Operator in the Kubernetes cl
`<kafka_cluster_name>-kafka-<listener_name>-bootstrap`:: Bootstrap route for clients connecting from outside the Kubernetes cluster. This resource is created only when an external listener is enabled and set to type `route`. The new route name will be used for all other external listeners.
`<kafka_cluster_name>-kafka-<listener_name>-<pod_id>`:: Route for traffic from outside the Kubernetes cluster to individual pods. This resource is created only when an external listener is enabled and set to type `route`. The new route name will be used for all other external listeners.
`<kafka_cluster_name>-kafka-config`:: ConfigMap containing the Kafka ancillary configuration, which is mounted as a volume by the broker pods when the `UseStrimziPodSets` feature gate is disabled.
-`<kafka_cluster_name>-<pool_name>-<pod_id>_`:: Secret with Kafka pod public and private keys.
+`<kafka_cluster_name>-<pool_name>-<pod_id>_`:: Secret with Kafka node public and private keys.
`<kafka_cluster_name>-network-policy-kafka`:: Network policy managing access to the Kafka services.
`strimzi-_namespace-name_-<kafka_cluster_name>-kafka-init`:: Cluster role binding used by the Kafka brokers.
`<kafka_cluster_name>-jmx`:: Secret with JMX username and password used to secure the Kafka broker port. This resource is created only when JMX is enabled in Kafka.
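As an illustration of the per-node Secret naming described above, such a Secret can be inspected with the fabric8 KubernetesClient, the same client API used in the operator tests earlier in this commit. This is a sketch only: the namespace and the my-cluster-brokers-0 name are placeholder values.

import io.fabric8.kubernetes.api.model.Secret;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

public class NodeCertificateSecretCheck {
    public static void main(String[] args) {
        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            // <kafka_cluster_name>-<pool_name>-<pod_id>, e.g. my-cluster-brokers-0 (placeholder)
            String secretName = "my-cluster-brokers-0";
            Secret certSecret = client.secrets()
                    .inNamespace("my-kafka-namespace")  // placeholder namespace
                    .withName(secretName)
                    .get();
            if (certSecret != null) {
                // Expected entries: <pod_name>.crt and <pod_name>.key
                System.out.println(certSecret.getData().keySet());
            }
        }
    }
}

Each such Secret is expected to contain <pod_name>.crt and <pod_name>.key entries, matching the assertions in the test changes above.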
@@ -98,7 +98,7 @@ Contains the public key of the clients CA. Kafka brokers use the public key to v
Secrets for communication between Strimzi components contain a private key and a public key certificate signed by the cluster CA.

`_<cluster_name>_-_<pool_name>_-_<pod_id>_`::
-Contains the private and public keys for a Kafka pod. Each pod has its own secret.
+Contains the private and public keys for a Kafka node. Each pod has its own secret.
`_<cluster_name>_-cluster-operator-certs`:: Contains the private and public keys for encrypting communication between the Cluster Operator and Kafka.
`_<cluster_name>_-entity-topic-operator-certs`::
Contains the private and public keys for encrypting communication between the Topic Operator and Kafka.
@@ -156,12 +156,6 @@ m¦ca.crt
¦Field
¦Description

-m¦_<cluster_name>_-kafka-_<pod_id>_.p12
-¦PKCS #12 store for storing certificates and keys.
-
-m¦_<cluster_name>_-kafka-_<pod_id>_.password
-¦Password for protecting the PKCS #12 store.
-
m¦_<cluster_name>_-kafka-_<pod_id>_.crt
¦Certificate for a Kafka pod _<num>_. Signed by a current or former cluster CA private key in `_<cluster_name>_-cluster-ca`.

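Since Kubernetes stores Secret data base64-encoded, the certificate field in the table above can be decoded as follows. This is a minimal sketch, assuming the Secret data map has already been fetched; the pod name is a placeholder.

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;

public class DecodeNodeCertificate {
    // Decodes the <pod_name>.crt entry from a per-node certificate Secret's data map
    static String decodeCertificate(Map<String, String> secretData, String podName) {
        String encoded = secretData.get(podName + ".crt");
        return new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Placeholder data; a real map would come from Secret#getData()
        String pem = "-----BEGIN CERTIFICATE-----...";
        Map<String, String> data = Map.of("my-cluster-brokers-0.crt",
                Base64.getEncoder().encodeToString(pem.getBytes(StandardCharsets.UTF_8)));
        System.out.println(decodeCertificate(data, "my-cluster-brokers-0"));
    }
}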