Skip to content

Commit

Permalink
Remove ZooKeeper support - Part I
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Scholz <[email protected]>
  • Loading branch information
scholzj committed Dec 18, 2024
1 parent 829a29a commit 8cc3dd8
Show file tree
Hide file tree
Showing 158 changed files with 894 additions and 28,873 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
public class KafkaAuthorizationSimple extends KafkaAuthorization {
public static final String TYPE_SIMPLE = "simple";

public static final String AUTHORIZER_CLASS_NAME = "kafka.security.authorizer.AclAuthorizer";
public static final String KRAFT_AUTHORIZER_CLASS_NAME = "org.apache.kafka.metadata.authorizer.StandardAuthorizer";

private List<String> superUsers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,31 @@ public interface PodSecurityProvider {
void configure(PlatformFeatures platformFeatures);

/**
* Provides the Pod security context for the ZooKeeper pods. The default implementation just returns the security
* context configured by the user in the template section or null (no Pod security context).
* Provides the Pod security context for the ZooKeeper pods. However, since Zookeeper is no longer supported, this
* method has been deprecated and throws an UnsupportedOperationException exception.
*
* @param context Provides the context which can be used to generate the Pod security context
*
* @return Pod security context which will be set for the ZooKeeper pods
*/
@Deprecated
@SuppressWarnings("unused")
default PodSecurityContext zooKeeperPodSecurityContext(PodSecurityProviderContext context) {
return podSecurityContextOrNull(context);
throw new UnsupportedOperationException("ZooKeeper pods are not supported anymore");
}

/**
* Provides the (container) security context for the ZooKeeper containers. The default implementation just
* returns the security context configured by the user in the template section or null (no security context).
* Provides the (container) security context for the ZooKeeper containers. However, since Zookeeper is no longer
* supported, this method has been deprecated and throws an UnsupportedOperationException exception.
*
* @param context Provides the context which can be used to generate the security context
*
* @return Security context which will be set for the ZooKeeper containers
*/
@Deprecated
@SuppressWarnings("unused")
default SecurityContext zooKeeperContainerSecurityContext(ContainerSecurityProviderContext context) {
return securityContextOrNull(context);
throw new UnsupportedOperationException("ZooKeeper container is not supported anymore");
}

/**
Expand Down Expand Up @@ -120,8 +124,8 @@ default SecurityContext entityUserOperatorContainerSecurityContext(ContainerSecu
}

/**
* Provides the (container) security context for the TLS sidecar container. The default implementation just
* returns the security context configured by the user in the template section or null (no security context).
* Provides the (container) security context for the TLS sidecar container. TLS sidecar is not used anymore and this
* method always throws an UnsupportedOperationException exception.
*
* @param context Provides the context which can be used to generate the security context
*
Expand Down Expand Up @@ -183,7 +187,8 @@ default SecurityContext cruiseControlContainerSecurityContext(ContainerSecurityP

/**
* Previously, this method was responsible for providing PodSecurityContext for the JMXTrans deployment in Strimzi.
* However, since JMXTrans is no longer supported, this method has been deprecated and always returns null.
* However, since JMXTrans is no longer supported, this method has been deprecated and throws an
* UnsupportedOperationException exception.
*
* @param context Provides the context which can be used to generate the Pod security context
*
Expand All @@ -197,7 +202,8 @@ default PodSecurityContext jmxTransPodSecurityContext(PodSecurityProviderContext

/**
* Previously, this method was responsible for providing SecurityContext for the JMXTrans container in Strimzi.
* However, since JMXTrans is no longer supported, this method has been deprecated and always returns null.
* However, since JMXTrans is no longer supported, this method has been deprecated and throws an
* UnsupportedOperationException exception.
*
* @param context Provides the context which can be used to generate the security context
*
Expand Down Expand Up @@ -272,7 +278,7 @@ default SecurityContext kafkaConnectBuildContainerSecurityContext(ContainerSecur

/**
* Previously, this method provided the Pod security context for the Kafka Mirror Maker 1 pods. As Mirror Maker 1 is
* not supported anymore, this method is deprecated and always returns null.
* not supported anymore, this method is deprecated and throws an UnsupportedOperationException exception.
*
* @param context Provides the context which can be used to generate the Pod security context
*
Expand All @@ -286,7 +292,7 @@ default PodSecurityContext kafkaMirrorMakerPodSecurityContext(PodSecurityProvide

/**
* Previously, this method provided the security context for the Kafka Mirror Maker 1 containers. As Mirror Maker 1
* is not supported anymore, this method is deprecated and always returns null.
* is not supported anymore, this method is deprecated and throws an UnsupportedOperationException exception.
*
* @param context Provides the context which can be used to generate the security context
*
Expand Down
11 changes: 0 additions & 11 deletions cluster-operator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -225,15 +225,6 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-server-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
<!-- ZooKeeper Jute is needed for the ZooKeeper client to work -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper-jute</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
Expand Down Expand Up @@ -340,8 +331,6 @@
<ignoredUnusedDeclaredDependency>io.fabric8:kubernetes-model-common</ignoredUnusedDeclaredDependency>
<!-- Needed at runtime by Fabric8 -->
<ignoredUnusedDeclaredDependency>io.fabric8:kubernetes-model-coordination</ignoredUnusedDeclaredDependency>
<!-- Jute is needed for the ZooKeeper client to work (not just in tests) - but Maven believes it is only a test dependency -->
<ignoredNonTestScopedDependency>org.apache.zookeeper:zookeeper-jute</ignoredNonTestScopedDependency>
</ignoredNonTestScopedDependencies>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,6 @@ public class ClusterOperatorConfig {
*/
public static final ConfigParameter<Integer> OPERATIONS_THREAD_POOL_SIZE = new ConfigParameter<>("STRIMZI_OPERATIONS_THREAD_POOL_SIZE", INTEGER, "10", CONFIG_VALUES);

/**
* Session timeout for the Zookeeper Admin client used in ZK scaling operations
*/
public static final ConfigParameter<Integer> ZOOKEEPER_ADMIN_SESSION_TIMEOUT_MS = new ConfigParameter<>("STRIMZI_ZOOKEEPER_ADMIN_SESSION_TIMEOUT_MS", INTEGER, "10000", CONFIG_VALUES);

/**
* Number of seconds to cache a successful DNS name lookup
*/
Expand Down Expand Up @@ -470,13 +465,6 @@ public long getOperationTimeoutMs() {
return get(OPERATION_TIMEOUT_MS);
}

/**
* @return how many milliseconds should we wait for Zookeeper Admin Sessions to timeout
*/
public int getZkAdminSessionTimeoutMs() {
return get(ZOOKEEPER_ADMIN_SESSION_TIMEOUT_MS);
}

/**
* @return How many milliseconds should we wait for Kafka Connect build to complete
*/
Expand Down Expand Up @@ -615,7 +603,6 @@ public String toString() {
"\n\toperatorNamespaceLabels='" + getOperatorNamespaceLabels() + '\'' +
"\n\tcustomResourceSelector='" + getCustomResourceSelector() + '\'' +
"\n\tfeatureGates='" + featureGates() + '\'' +
"\n\tzkAdminSessionTimeoutMs=" + getZkAdminSessionTimeoutMs() +
"\n\tdnsCacheTtlSec=" + getDnsCacheTtlSec() +
"\n\tpodSetReconciliationOnly=" + isPodSetReconciliationOnly() +
"\n\tpodSetControllerWorkQueueSize=" + getPodSetControllerWorkQueueSize() +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ static CompositeFuture deployClusterOperatorVerticles(Vertx vertx, KubernetesCli
client,
metricsProvider,
pfa,
config.getOperationTimeoutMs(),
config.getOperatorName()
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ protected AbstractModel(Reconciliation reconciliation, String cluster, String na
* @param reconciliation The reconciliation marker
* @param resource Custom resource with metadata containing the namespace and cluster name
* @param componentName Name of the Strimzi component usually consisting from the cluster name and component type
* @param componentType Type of the component that the extending class is deploying (e.g. Kafka, ZooKeeper etc. )
* @param componentType Type of the component that the extending class is deploying (e.g. Kafka etc. )
* @param sharedEnvironmentProvider Shared environment provider
*/
protected AbstractModel(Reconciliation reconciliation, HasMetadata resource, String componentName, String componentType, SharedEnvironmentProvider sharedEnvironmentProvider) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,60 +147,6 @@ protected Map<String, CertAndKey> generateCcCerts(
caCertGenerationChanged);
}

/**
* Prepares the ZooKeeper node certificates. It either reuses the existing certificates, renews them or generates new
* certificates if needed.
*
* @param namespace Namespace of the Kafka cluster
* @param clusterName Name of the Kafka cluster
* @param existingCertificates Existing certificates (or null if they do not exist yet)
* @param nodes Nodes that are part of the ZooKeeper cluster
* @param isMaintenanceTimeWindowsSatisfied Flag indicating whether we can do maintenance tasks or not
* @param caCertGenerationChanged Flag indicating whether the CA cert generation has changed since the existing certificates were issued
*
* @return Map with CertAndKey objects containing the public and private keys for the different nodes
*
* @throws IOException IOException is thrown when it is raised while working with the certificates
*/
protected Map<String, CertAndKey> generateZkCerts(
String namespace,
String clusterName,
Map<String, CertAndKey> existingCertificates,
Set<NodeRef> nodes,
boolean isMaintenanceTimeWindowsSatisfied,
boolean caCertGenerationChanged
) throws IOException {
DnsNameGenerator zkDnsGenerator = DnsNameGenerator.of(namespace, KafkaResources.zookeeperServiceName(clusterName));
DnsNameGenerator zkHeadlessDnsGenerator = DnsNameGenerator.of(namespace, KafkaResources.zookeeperHeadlessServiceName(clusterName));

Function<NodeRef, Subject> subjectFn = node -> {
Subject.Builder subject = new Subject.Builder()
.withOrganizationName("io.strimzi")
.withCommonName(KafkaResources.zookeeperComponentName(clusterName));
subject.addDnsName(KafkaResources.zookeeperServiceName(clusterName));
subject.addDnsName(String.format("%s.%s", KafkaResources.zookeeperServiceName(clusterName), namespace));
subject.addDnsName(zkDnsGenerator.serviceDnsNameWithoutClusterDomain());
subject.addDnsName(zkDnsGenerator.serviceDnsName());
subject.addDnsName(node.podName());
subject.addDnsName(DnsNameGenerator.podDnsName(namespace, KafkaResources.zookeeperHeadlessServiceName(clusterName), node.podName()));
subject.addDnsName(DnsNameGenerator.podDnsNameWithoutClusterDomain(namespace, KafkaResources.zookeeperHeadlessServiceName(clusterName), node.podName()));
subject.addDnsName(zkDnsGenerator.wildcardServiceDnsNameWithoutClusterDomain());
subject.addDnsName(zkDnsGenerator.wildcardServiceDnsName());
subject.addDnsName(zkHeadlessDnsGenerator.wildcardServiceDnsNameWithoutClusterDomain());
subject.addDnsName(zkHeadlessDnsGenerator.wildcardServiceDnsName());
return subject.build();
};

LOGGER.debugCr(reconciliation, "{}: Reconciling ZooKeeper certificates", this);
return maybeCopyOrGenerateCerts(
reconciliation,
nodes,
subjectFn,
existingCertificates,
isMaintenanceTimeWindowsSatisfied,
caCertGenerationChanged);
}

/**
* Prepares the Kafka broker certificates. It either reuses the existing certificates, renews them or generates new
* certificates if needed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,42 +57,6 @@ public static void validateMetadataVersion(String metadataVersion) {
}
}

/**
* In ZooKeeper mode, some of the fields marked as not required (because they are not used in KRaft) are in fact
* required. This method validates that the fields are present and in case they are missing, it throws an exception.
*
* @param kafkaSpec The .spec section of the Kafka CR which should be checked
* @param nodePoolsEnabled Flag indicating whether Node Pools are enabled or not
*/
public static void validateKafkaCrForZooKeeper(KafkaSpec kafkaSpec, boolean nodePoolsEnabled) {
Set<String> errors = new HashSet<>(0);

if (kafkaSpec != null) {
if (kafkaSpec.getZookeeper() == null) {
errors.add("The .spec.zookeeper section of the Kafka custom resource is missing. " +
"This section is required for a ZooKeeper-based cluster.");
}

if (!nodePoolsEnabled) {
if (kafkaSpec.getKafka().getReplicas() == null || kafkaSpec.getKafka().getReplicas() == 0) {
errors.add("The .spec.kafka.replicas property of the Kafka custom resource is missing. " +
"This property is required for a ZooKeeper-based Kafka cluster that is not using Node Pools.");
}

if (kafkaSpec.getKafka().getStorage() == null) {
errors.add("The .spec.kafka.storage section of the Kafka custom resource is missing. " +
"This section is required for a ZooKeeper-based Kafka cluster that is not using Node Pools.");
}
}
} else {
errors.add("The .spec section of the Kafka custom resource is missing");
}

if (!errors.isEmpty()) {
throw new InvalidResourceException("Kafka configuration is not valid: " + errors);
}
}

/**
* Generates Kafka CR status warnings about the fields ignored in Kraft mode if they are set - the ZooKeeper section
* and Kafka replicas and storage configuration.
Expand All @@ -117,7 +81,7 @@ public static void kraftWarnings(Kafka kafkaCr, KafkaStatus kafkaStatus) {
* @param kafkaCr The Kafka custom resource
* @param kafkaStatus The Kafka Status to add the warnings to
*/
public static void nodePoolWarnings(Kafka kafkaCr, KafkaStatus kafkaStatus) {
private static void nodePoolWarnings(Kafka kafkaCr, KafkaStatus kafkaStatus) {
if (kafkaCr.getSpec().getKafka() != null
&& kafkaCr.getSpec().getKafka().getReplicas() != null
&& kafkaCr.getSpec().getKafka().getReplicas() > 0) {
Expand All @@ -133,39 +97,4 @@ public static void nodePoolWarnings(Kafka kafkaCr, KafkaStatus kafkaStatus) {
"are used and should be removed from the custom resource."));
}
}

/**
* Validate the Kafka version set in the Kafka custom resource (in spec.kafka.version), together with the
* metadata version (in spec.kafka.metadataVersion) and the configured inter.broker.protocol.version
* and log.message.format.version. (in spec.kafka.config).
* They need to be all aligned and at least 3.7.0 to support ZooKeeper to KRaft migration, otherwise the check
* throws an {@code InvalidResourceException}.
*
* @param kafkaVersionFromCr Kafka version from the custom resource
* @param metadataVersionFromCr Metadata version from the custom resource
* @param interBrokerProtocolVersionFromCr Inter broker protocol version from the configuration of the Kafka custom resource
* @param logMessageFormatVersionFromCr Log message format version from the configuration of the Kafka custom resource
*/
public static void validateVersionsForKRaftMigration(String kafkaVersionFromCr, String metadataVersionFromCr,
String interBrokerProtocolVersionFromCr, String logMessageFormatVersionFromCr) {
// validate 3.7.0 <= kafka.version && metadataVersion/IBP/LMF == kafka.version

MetadataVersion kafkaVersion = MetadataVersion.fromVersionString(kafkaVersionFromCr);
// this should check that spec.kafka.version is >= 3.7.0
boolean isMigrationSupported = kafkaVersion.isAtLeast(MetadataVersion.IBP_3_7_IV0);

MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionFromCr);
MetadataVersion interBrokerProtocolVersion = MetadataVersion.fromVersionString(interBrokerProtocolVersionFromCr);
MetadataVersion logMessageFormatVersion = MetadataVersion.fromVersionString(logMessageFormatVersionFromCr);

if (!isMigrationSupported ||
metadataVersion.compareTo(interBrokerProtocolVersion) != 0 ||
metadataVersion.compareTo(logMessageFormatVersion) != 0) {
String message = String.format("Migration cannot be performed with Kafka version %s, metadata version %s, inter.broker.protocol.version %s, log.message.format.version %s. " +
"Please make sure the Kafka version, metadata version, inter.broker.protocol.version and log.message.format.version " +
"are all set to the same value, which must be equal to, or higher than 3.7.0",
kafkaVersion, metadataVersion, interBrokerProtocolVersion, logMessageFormatVersion);
throw new InvalidResourceException(message);
}
}
}
Loading

0 comments on commit 8cc3dd8

Please sign in to comment.