Skip to content

Commit

Permalink
DPE-2112 - fix: don't raise if user deletion not exist (#57)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcoppenheimer authored Jul 7, 2023
1 parent f8f3b7d commit f84783d
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 62 deletions.
3 changes: 2 additions & 1 deletion charmcraft.yaml
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
# Copyright 2022 Canonical Ltd.
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.

type: charm
parts:
charm:
charm-binary-python-packages:
- setuptools
- pydantic
build-packages:
- libffi-dev
- libssl-dev
Expand Down
25 changes: 14 additions & 11 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 21 additions & 20 deletions src/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from dataclasses import asdict, dataclass
from typing import Optional, Set

from ops.pebble import ExecError

from utils import run_bin_command

logger = logging.getLogger(__name__)
Expand All @@ -32,9 +34,14 @@ def __init__(self, charm):
self.opts = self.charm.kafka_config.auth_args
self.zookeeper = self.charm.kafka_config.zookeeper_config.get("connect", "")
self.container = self.charm.container
self.current_acls: Set[Acl] = set()
self.new_user_acls: Set[Acl] = set()

@property
def current_acls(self) -> Set[Acl]:
"""Sets the current cluster ACLs."""
acls = self._get_acls_from_cluster()
return self._parse_acls(acls=acls)

def _get_acls_from_cluster(self) -> str:
"""Loads the currently active ACLs from the Kafka cluster."""
command = [
Expand Down Expand Up @@ -88,18 +95,6 @@ def _parse_acls(acls: str) -> Set[Acl]:

return current_acls

def load_current_acls(self) -> None:
"""Sets the current cluster ACLs to the instance state.
State is set to `KafkaAuth.current_acls`.
Raises:
`subprocess.CalledProcessError`: if the error returned a non-zero exit code
"""
acls = self._get_acls_from_cluster()

self.current_acls = self._parse_acls(acls=acls)

@staticmethod
def _generate_producer_acls(topic: str, username: str, **_) -> Set[Acl]:
"""Generates expected set of `Acl`s for a producer client application."""
Expand Down Expand Up @@ -185,13 +180,19 @@ def delete_user(self, username: str) -> None:
f"--entity-name={username}",
"--delete-config=SCRAM-SHA-512",
]
run_bin_command(
container=self.container,
bin_keyword="configs",
bin_args=command,
extra_args=self.opts,
zk_tls_config_filepath=self.charm.kafka_config.server_properties_filepath,
)
try:
run_bin_command(
container=self.container,
bin_keyword="configs",
bin_args=command,
extra_args=self.opts,
zk_tls_config_filepath=self.charm.kafka_config.server_properties_filepath,
)
except ExecError as e:
if "delete a user credential that does not exist" in str(e.stderr):
logger.warning(f"User: {username} can't be deleted, it does not exist")
return
raise

def add_acl(
self, username: str, operation: str, resource_type: str, resource_name: str
Expand Down
50 changes: 20 additions & 30 deletions src/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,15 @@
"""KafkaProvider class and methods."""

import logging
from typing import Optional

from charms.data_platform_libs.v0.data_interfaces import KafkaProvides, TopicRequestedEvent
from ops.charm import RelationBrokenEvent, RelationCreatedEvent
from ops.framework import Object
from ops.model import Relation
from ops.pebble import ExecError

from auth import KafkaAuth
from config import KafkaConfig
from literals import PEER, REL_NAME
from literals import REL_NAME
from utils import generate_password

logger = logging.getLogger(__name__)
Expand All @@ -38,17 +36,9 @@ def __init__(self, charm) -> None:

self.framework.observe(self.charm.on[REL_NAME].relation_broken, self._on_relation_broken)

self.framework.observe(self.kafka_provider.on.topic_requested, self.on_topic_requested)

@property
def peer_relation(self) -> Optional[Relation]:
"""The Kafka cluster's peer relation."""
return self.charm.model.get_relation(PEER)

def _on_relation_created(self, event: RelationCreatedEvent) -> None:
"""Handler for `kafka-client-relation-created` event."""
# this will trigger kafka restart (if needed) before granting credentials
self.charm._on_config_changed(event)
self.framework.observe(
getattr(self.kafka_provider.on, "topic_requested"), self.on_topic_requested
)

def on_topic_requested(self, event: TopicRequestedEvent):
"""Handle the on topic requested event."""
Expand All @@ -59,14 +49,14 @@ def on_topic_requested(self, event: TopicRequestedEvent):
# on all unit update the server properties to enable client listener if needed
self.charm._on_config_changed(event)

if not self.charm.unit.is_leader() or not self.peer_relation:
if not self.charm.unit.is_leader() or not self.charm.peer_relation:
return

extra_user_roles = event.extra_user_roles or ""
topic = event.topic or ""
relation = event.relation
username = f"relation-{relation.id}"
password = self.peer_relation.data[self.charm.app].get(username) or generate_password()
password = self.charm.app_peer_data.get(username) or generate_password()
bootstrap_server = self.charm.kafka_config.bootstrap_server
zookeeper_uris = self.charm.kafka_config.zookeeper_config.get("connect", "")
tls = "enabled" if self.charm.tls.enabled else "disabled"
Expand All @@ -86,10 +76,7 @@ def on_topic_requested(self, event: TopicRequestedEvent):
event.defer()
return

# non-leader units need cluster_config_changed event to update their super.users
self.peer_relation.data[self.charm.app].update({username: password})

self.kafka_auth.load_current_acls()
self.charm.app_peer_data.update({username: password})

self.kafka_auth.update_user_acls(
username=username,
Expand All @@ -98,10 +85,7 @@ def on_topic_requested(self, event: TopicRequestedEvent):
group=consumer_group_prefix,
)

# non-leader units need cluster_config_changed event to update their super.users
self.peer_relation.data[self.charm.app].update(
{"super-users": self.kafka_config.super_users}
)
self.charm.app_peer_data.update({"super-users": self.kafka_config.super_users})

self.kafka_provider.set_bootstrap_server(relation.id, ",".join(bootstrap_server))
self.kafka_provider.set_consumer_group_prefix(relation.id, consumer_group_prefix)
Expand All @@ -110,6 +94,11 @@ def on_topic_requested(self, event: TopicRequestedEvent):
self.kafka_provider.set_zookeeper_uris(relation.id, zookeeper_uris)
self.kafka_provider.set_topic(relation.id, topic)

def _on_relation_created(self, event: RelationCreatedEvent) -> None:
"""Handler for `kafka-client-relation-created` event."""
# this will trigger kafka restart (if needed) before granting credentials
self.charm._on_config_changed(event)

def _on_relation_broken(self, event: RelationBrokenEvent) -> None:
"""Handler for `kafka-client-relation-broken` event.
Expand All @@ -118,7 +107,11 @@ def _on_relation_broken(self, event: RelationBrokenEvent) -> None:
Args:
event: the event from a related client application needing a user
"""
if not self.charm.unit.is_leader():
# don't remove anything if app is going down
if self.charm.app.planned_units == 0:
return

if not self.charm.unit.is_leader() or not self.charm.peer_relation:
return

if not self.charm.ready_to_start:
Expand All @@ -127,14 +120,11 @@ def _on_relation_broken(self, event: RelationBrokenEvent) -> None:
return

if event.relation.app != self.charm.app or not self.charm.app.planned_units() == 0:
self.kafka_auth.load_current_acls()
username = f"relation-{event.relation.id}"
self.kafka_auth.remove_all_user_acls(
username=username,
)
self.kafka_auth.remove_all_user_acls(username=username)
self.kafka_auth.delete_user(username=username)
# non-leader units need cluster_config_changed event to update their super.users
self.charm.peer_relation.data[self.charm.app].update({username: ""})
self.charm.app_peer_data.update({username: ""})

def update_connection_info(self):
"""Updates all relations with current endpoints, bootstrap-server and tls data.
Expand Down

0 comments on commit f84783d

Please sign in to comment.