Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store Kafka node certificates in separate Secrets #10967

Merged
merged 7 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* Support for ZooKeeper-based Apache Kafka clusters and for KRaft migration has been removed
* Support for MirrorMaker 1 has been removed
* Added support to configure `dnsPolicy` and `dnsConfig` using the `template` sections.
* Store Kafka node certificates in separate Secrets, one Secret per pod.

### Major changes, deprecations and removals

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public static String initContainerClusterRoleBindingName(String cluster, String
*
* @return The name of the corresponding Kafka Secret.
*/
@Deprecated // Kafka server certificates are now kept in per-node Secrets
public static String kafkaSecretName(String clusterName) {
return clusterName + "-kafka-brokers";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,21 @@ public static Map<String, String> buildSecretData(Map<String, CertAndKey> certif
return data;
}

/**
* Constructs a Map containing the provided certificates to be stored in a Kubernetes Secret.
*
* @param certificateName Name to use to create identifier for storing the data in the Secret.
* @param certAndKey Private key and public cert pair to store in the Secret.
*
* @return Map of certificate identifier to base64 encoded certificate or key
*/
public static Map<String, String> buildSecretData(String certificateName, CertAndKey certAndKey) {
return Map.of(
Ca.SecretEntry.KEY.asKey(certificateName), certAndKey.keyAsBase64String(),
Ca.SecretEntry.CRT.asKey(certificateName), certAndKey.certAsBase64String()
);
}

private static byte[] decodeFromSecret(Secret secret, String key) {
if (secret.getData().get(key) != null && !secret.getData().get(key).isEmpty()) {
return Util.decodeBytesFromBase64(secret.getData().get(key));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ protected String caName() {
* @param existingCertificates Existing certificates (or null if they do not exist yet)
* @param nodes Nodes that are part of the Cruise Control cluster
* @param isMaintenanceTimeWindowsSatisfied Flag indicating whether we can do maintenance tasks or not
* @param caCertGenerationChanged Flag indicating whether the CA cert generation has changed since the existing certificates were issued
*
* @return Map with CertAndKey object containing the public and private key
*
Expand All @@ -118,8 +117,7 @@ protected Map<String, CertAndKey> generateCcCerts(
String clusterName,
Map<String, CertAndKey> existingCertificates,
Set<NodeRef> nodes,
boolean isMaintenanceTimeWindowsSatisfied,
boolean caCertGenerationChanged
scholzj marked this conversation as resolved.
Show resolved Hide resolved
boolean isMaintenanceTimeWindowsSatisfied
) throws IOException {
DnsNameGenerator ccDnsGenerator = DnsNameGenerator.of(namespace, CruiseControlResources.serviceName(clusterName));

Expand All @@ -143,8 +141,8 @@ protected Map<String, CertAndKey> generateCcCerts(
nodes,
subjectFn,
existingCertificates,
isMaintenanceTimeWindowsSatisfied,
caCertGenerationChanged);
isMaintenanceTimeWindowsSatisfied
);
}

/**
Expand All @@ -158,7 +156,6 @@ protected Map<String, CertAndKey> generateCcCerts(
* @param externalBootstrapAddresses List of external bootstrap addresses (used for certificate SANs)
* @param externalAddresses Map with external listener addresses for the different nodes (used for certificate SANs)
* @param isMaintenanceTimeWindowsSatisfied Flag indicating whether we can do maintenance tasks or not
* @param caCertGenerationChanged Flag indicating whether the CA cert generation has changed since the existing certificates were issued
*
* @return Map with CertAndKey objects containing the public and private keys for the different brokers
*
Expand All @@ -171,8 +168,7 @@ protected Map<String, CertAndKey> generateBrokerCerts(
Set<NodeRef> nodes,
Set<String> externalBootstrapAddresses,
Map<Integer, Set<String>> externalAddresses,
boolean isMaintenanceTimeWindowsSatisfied,
boolean caCertGenerationChanged
boolean isMaintenanceTimeWindowsSatisfied
) throws IOException {
Function<NodeRef, Subject> subjectFn = node -> {
Subject.Builder subject = new Subject.Builder()
Expand Down Expand Up @@ -219,8 +215,8 @@ protected Map<String, CertAndKey> generateBrokerCerts(
nodes,
subjectFn,
existingCertificates,
isMaintenanceTimeWindowsSatisfied,
caCertGenerationChanged);
isMaintenanceTimeWindowsSatisfied
);
}

@Override
Expand All @@ -237,7 +233,6 @@ protected String caCertGenerationAnnotation() {
* @param subjectFn Function to generate certificate subject for given node / pod
* @param existingCertificates Existing certificates (or null if they do not exist yet)
* @param isMaintenanceTimeWindowsSatisfied Flag indicating if we are inside a maintenance window or not
* @param caCertGenerationChanged Flag indicating whether the CA cert generation has changed since the existing certificates were issued
*
* @return Returns map with node certificates which can be used to create or update the stored certificates
*
Expand All @@ -248,8 +243,7 @@ protected String caCertGenerationAnnotation() {
Set<NodeRef> nodes,
Function<NodeRef, Subject> subjectFn,
Map<String, CertAndKey> existingCertificates,
boolean isMaintenanceTimeWindowsSatisfied,
boolean caCertGenerationChanged
boolean isMaintenanceTimeWindowsSatisfied
) throws IOException {
// Maps for storing the certificates => will be used in the new or updated certificate store. This map is filled in this method and returned at the end.
Map<String, CertAndKey> certs = new HashMap<>();
Expand All @@ -269,7 +263,6 @@ protected String caCertGenerationAnnotation() {

if (!this.certRenewed() // No CA renewal is happening
&& certAndKey != null // There is a public cert and private key for this pod
&& !caCertGenerationChanged // The CA cert generation has not changed since the existing certificates were issued
) {
// A certificate for this node already exists, so we will try to reuse it
LOGGER.debugCr(reconciliation, "Certificate for node {} already exists", node);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,10 @@ public Secret generateCertificatesSecret(String namespace, String clusterName, C
LOGGER.debugCr(reconciliation, "Generating certificates");
try {
Set<NodeRef> nodes = Set.of(new NodeRef(CruiseControl.COMPONENT_TYPE, 0, null, false, false));
ccCerts = clusterCa.generateCcCerts(namespace, clusterName, CertUtils.extractCertsAndKeysFromSecret(existingSecret, nodes),
nodes, isMaintenanceTimeWindowsSatisfied, clusterCa.hasCaCertGenerationChanged(existingSecret));
ccCerts = clusterCa.generateCcCerts(namespace, clusterName,
// Only pass existing certificates if the CA cert generation hasn't changed since they were generated
clusterCa.hasCaCertGenerationChanged(existingSecret) ? Map.of() : CertUtils.extractCertsAndKeysFromSecret(existingSecret, nodes),
nodes, isMaintenanceTimeWindowsSatisfied);
} catch (IOException e) {
LOGGER.warnCr(reconciliation, "Error while generating certificates", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,10 @@
*/
@SuppressWarnings({"checkstyle:ClassDataAbstractionCoupling", "checkstyle:ClassFanOutComplexity"})
public class KafkaCluster extends AbstractModel implements SupportsMetrics, SupportsLogging, SupportsJmx {
protected static final String COMPONENT_TYPE = "kafka";
/**
* Component type used by Kubernetes labels
*/
public static final String COMPONENT_TYPE = "kafka";

protected static final String ENV_VAR_KAFKA_INIT_EXTERNAL_ADDRESS = "EXTERNAL_ADDRESS";
private static final String ENV_VAR_KAFKA_METRICS_ENABLED = "KAFKA_METRICS_ENABLED";
Expand Down Expand Up @@ -1230,37 +1233,55 @@ public List<StrimziPodSet> generatePodSets(boolean isOpenShift,
}

/**
* Generates the private keys for the Kafka brokers (if needed) and the secret with them which contains both the
* Generates the private keys for the Kafka nodes (if needed) and the Secrets with them which contain both the
* public and private keys.
*
* @param clusterCa The CA for cluster certificates
* @param clientsCa The CA for clients certificates
* @param existingSecret The existing secret with Kafka certificates
* @param existingSecrets The existing secrets containing Kafka certificates
* @param externalBootstrapDnsName Map with bootstrap DNS names which should be added to the certificate
* @param externalDnsNames Map with broker DNS names which should be added to the certificate
* @param isMaintenanceTimeWindowsSatisfied Indicates whether we are in a maintenance window or not
*
* @return The generated Secret with broker certificates
* @return The generated Secrets containing Kafka node certificates
*/
public Secret generateCertificatesSecret(ClusterCa clusterCa, ClientsCa clientsCa, Secret existingSecret, Set<String> externalBootstrapDnsName, Map<Integer, Set<String>> externalDnsNames, boolean isMaintenanceTimeWindowsSatisfied) {
public List<Secret> generateCertificatesSecrets(ClusterCa clusterCa, ClientsCa clientsCa, List<Secret> existingSecrets, Set<String> externalBootstrapDnsName, Map<Integer, Set<String>> externalDnsNames, boolean isMaintenanceTimeWindowsSatisfied) {
Map<String, Secret> existingSecretWithName = existingSecrets.stream().collect(Collectors.toMap(secret -> secret.getMetadata().getName(), secret -> secret));
Set<NodeRef> nodes = nodes();
Map<String, CertAndKey> brokerCerts;
Map<String, CertAndKey> existingCerts = new HashMap<>();
for (NodeRef node : nodes) {
String podName = node.podName();
// Reuse existing certificate if it exists and the CA cert generation hasn't changed since they were generated
if (existingSecretWithName.get(podName) != null) {
if (clusterCa.hasCaCertGenerationChanged(existingSecretWithName.get(podName))) {
LOGGER.debugCr(reconciliation, "Certificate for pod {}/{} has old cert generation", namespace, podName);
} else {
existingCerts.put(podName, CertUtils.keyStoreCertAndKey(existingSecretWithName.get(podName), podName));
}
} else {
LOGGER.debugCr(reconciliation, "No existing certificate found for pod {}/{}", namespace, podName);
}
}

Map<String, CertAndKey> updatedCerts;
try {
brokerCerts = clusterCa.generateBrokerCerts(namespace, cluster, CertUtils.extractCertsAndKeysFromSecret(existingSecret, nodes),
nodes, externalBootstrapDnsName, externalDnsNames, isMaintenanceTimeWindowsSatisfied, clusterCa.hasCaCertGenerationChanged(existingSecret));
updatedCerts = clusterCa.generateBrokerCerts(namespace, cluster, existingCerts,
nodes, externalBootstrapDnsName, externalDnsNames, isMaintenanceTimeWindowsSatisfied);
} catch (IOException e) {
LOGGER.warnCr(reconciliation, "Error while generating certificates", e);
throw new RuntimeException("Failed to prepare Kafka certificates", e);
}

return ModelUtils.createSecret(KafkaResources.kafkaSecretName(cluster), namespace, labels, ownerReference,
CertUtils.buildSecretData(brokerCerts),
Map.ofEntries(
clusterCa.caCertGenerationFullAnnotation(),
clientsCa.caCertGenerationFullAnnotation()
),
emptyMap());
return updatedCerts.entrySet()
katheris marked this conversation as resolved.
Show resolved Hide resolved
.stream()
.map(entry -> ModelUtils.createSecret(entry.getKey(), namespace, labels, ownerReference,
CertUtils.buildSecretData(entry.getKey(), entry.getValue()),
Map.ofEntries(
clusterCa.caCertGenerationFullAnnotation(),
clientsCa.caCertGenerationFullAnnotation()
),
emptyMap()))
.toList();
}

/**
Expand Down Expand Up @@ -1354,7 +1375,7 @@ private List<Volume> getNonDataVolumes(boolean isOpenShift, NodeRef node, PodTem

volumeList.add(VolumeUtils.createTempDirVolume(templatePod));
volumeList.add(VolumeUtils.createSecretVolume(CLUSTER_CA_CERTS_VOLUME, AbstractModel.clusterCaCertSecretName(cluster), isOpenShift));
volumeList.add(VolumeUtils.createSecretVolume(BROKER_CERTS_VOLUME, KafkaResources.kafkaSecretName(cluster), isOpenShift));
volumeList.add(VolumeUtils.createSecretVolume(BROKER_CERTS_VOLUME, node.podName(), isOpenShift));
volumeList.add(VolumeUtils.createSecretVolume(CLIENT_CA_CERTS_VOLUME, KafkaResources.clientsCaCertificateSecretName(cluster), isOpenShift));
volumeList.add(VolumeUtils.createConfigMapVolume(LOG_AND_METRICS_CONFIG_VOLUME_NAME, node.podName()));
volumeList.add(VolumeUtils.createEmptyDirVolume("ready-files", "1Ki", "Memory"));
Expand Down
Loading
Loading