Skip to content

Commit

Permalink
PMR: Create or Delete RoleBindings in create_or_update_profiles() (#36
Browse files Browse the repository at this point in the history
)

* classes: Add dict for Contributors in Profile

* Extend create_or_update for creating/deleting RoleBindings

* Unit tests

* Integration tests

* review(nit): Update log message

* review: Remove redundant KFPResource class

* Update src/profiles_management/helpers/kfam.py

Co-authored-by: Daniela Plascencia <[email protected]>

* review: Use regex in rfc1123 helper

* review: Refactor delete_rolebindings_not_matching_profile_contributors

* review: Refactor resource_matches_profile_contributor

* review: Different handling for empty contributors

* review: Rename to has_valid_kfam_annotations

* Update src/profiles_management/helpers/kfam.py

Co-authored-by: Manos Vlassis <[email protected]>

* Update src/profiles_management/helpers/kfam.py

Co-authored-by: Manos Vlassis <[email protected]>

* review: Extend docstring

* review: Custom Exception for invalid KFAM Annotations

* review: Document raised ApiError exceptions

* review: Handling of empty name RFC 1123

* review: Parametrize rfc1123 unit tests

* review: Parametrise annotation unit tests

* review: Parametrise kfam unit tests

* review: Refactor profiles contributor check

* review: Omit the extra annotations check

---------

Co-authored-by: Daniela Plascencia <[email protected]>
Co-authored-by: Manos Vlassis <[email protected]>
  • Loading branch information
3 people authored Jan 27, 2025
1 parent 3b6e7e7 commit e779929
Show file tree
Hide file tree
Showing 8 changed files with 528 additions and 50 deletions.
19 changes: 17 additions & 2 deletions src/profiles_management/create_or_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from lightkube import Client
from lightkube.generic_resource import GenericGlobalResource

from profiles_management.helpers import profiles
from profiles_management.helpers import kfam, profiles
from profiles_management.helpers.k8s import get_name
from profiles_management.helpers.kfam import (
list_contributor_authorization_policies,
Expand Down Expand Up @@ -51,7 +51,10 @@ def remove_access_to_stale_profile(client: Client, profile: GenericGlobalResourc
log.info("Deleted all KFAM AuthorizationPolicies")


def create_or_update_profiles(client: Client, pmr: ProfilesManagementRepresentation):
def create_or_update_profiles(
client: Client,
pmr: ProfilesManagementRepresentation,
):
"""Update the cluster to ensure Profiles and contributors are updated accordingly.
The function ensures that:
Expand All @@ -69,6 +72,12 @@ def create_or_update_profiles(client: Client, pmr: ProfilesManagementRepresentat
client: The lightkube client to use.
pmr: The ProfilesManagementRepresentation expressing what Profiles and contributors
should exist in the cluster.
Raises:
ApiError: From lightkube if an error occurred while trying to create or delete
Profiles, RoleBindings or AuthorizationPolicies.
InvalidKfamAnnotationsError: If a RoleBinding or AuthorizationPolicy does not have
KFAM valid annotations.
"""
log.info("Fetching all Profiles in the cluster")

Expand All @@ -95,3 +104,9 @@ def create_or_update_profiles(client: Client, pmr: ProfilesManagementRepresentat

log.info("Creating or updating the ResourceQuota for Profile %s", profile_name)
profiles.update_resource_quota(client, existing_profile, profile)

log.info("Deleting RoleBindings that don't match Profile: %s", profile_name)
kfam.delete_rolebindings_not_matching_profile_contributors(client, profile)

log.info("Creating RoleBindings for Profile: %s", profile_name)
kfam.create_rolebindings_for_profile_contributors(client, profile)
46 changes: 44 additions & 2 deletions src/profiles_management/helpers/k8s.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
"""Generic helpers for manipulating K8s objects, via lightkube."""

import logging
import re

import tenacity
from lightkube import Client
from lightkube.core.exceptions import ApiError
from lightkube.generic_resource import GenericGlobalResource, GenericNamespacedResource
from lightkube.resources.core_v1 import Namespace
from lightkube.resources.rbac_authorization_v1 import RoleBinding

log = logging.getLogger(__name__)

Expand All @@ -18,11 +20,12 @@ class ObjectStillExistsError(Exception):
pass


def get_name(res: GenericNamespacedResource | GenericGlobalResource) -> str:
def get_name(res: GenericNamespacedResource | GenericGlobalResource | RoleBinding) -> str:
"""Return the name from generic lightkube resource.
Args:
res: The resource to get it's name from metadata.name
res: The resource (Profile, AuthorizationPolicy or RoleBinding) to get it's name
from metadata.name
Raises:
ValueError: if the object doesn't have metadata or metadata.name
Expand All @@ -39,6 +42,45 @@ def get_name(res: GenericNamespacedResource | GenericGlobalResource) -> str:
return res.metadata.name


def get_annotations(res: GenericNamespacedResource | RoleBinding) -> dict[str, str]:
"""Return annotations of a RoleBinding or AuthorizationPolicy, or an empty dict.
Args:
res: The resource to return its annotations.
Returns:
A dictionary with the annotations, or empty dictionary if no annotations exist.
"""
if res.metadata and res.metadata.annotations:
return res.metadata.annotations

return {}


def to_rfc1123_compliant(name: str) -> str:
"""Transform a given string into an RFC 1123-compliant string.
The resulting string will:
1. Contain at most 63 characters.
2. Contain only lowercase alphanumeric characters or '-'.
Args:
name: The input string to transform.
Returns:
The RFC 1123-compliant string.
"""
if not name:
return ""

compliant_str = name.lower()
compliant_str = re.sub(r"[^a-z0-9-]", "-", compliant_str)

compliant_str = compliant_str.lstrip("-").rstrip("-")

return compliant_str[:63]


@tenacity.retry(stop=tenacity.stop_after_delay(300), wait=tenacity.wait_fixed(5), reraise=True)
def ensure_namespace_is_deleted(namespace: str, client: Client):
"""Check if the name doesn't exist with retries.
Expand Down
210 changes: 205 additions & 5 deletions src/profiles_management/helpers/kfam.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@
import logging
from typing import List

from charmed_kubeflow_chisme.lightkube.batch import delete_many
from lightkube import Client
from lightkube.generic_resource import GenericNamespacedResource, create_namespaced_resource
from lightkube.resources.rbac_authorization_v1 import RoleBinding

from profiles_management.helpers import k8s
from profiles_management.pmr.classes import Contributor, ContributorRole, Profile

log = logging.getLogger(__name__)

AuthorizationPolicy = create_namespaced_resource(
Expand All @@ -17,17 +21,31 @@
)


def has_kfam_annotations(resource: GenericNamespacedResource | RoleBinding) -> bool:
class InvalidKfamAnnotationsError(Exception):
"""Exception for when KFAM Annotations were expected but not found in object."""

pass


def has_valid_kfam_annotations(resource: GenericNamespacedResource | RoleBinding) -> bool:
"""Check if resource has "user" and "role" KFAM annotations.
The function will also ensure that the value for "role", in the annotations will have
one of the expected values: admin, edit, view
Args:
resource: The RoleBinding or AuthorizationPolicy to check if it has KFAM annotations.
Returns:
A boolean if the provided resources has a `role` and `user` annotation.
"""
if resource.metadata and resource.metadata.annotations:
return "role" in resource.metadata.annotations and "user" in resource.metadata.annotations
annotations = k8s.get_annotations(resource)
if annotations:
return (
"user" in annotations
and "role" in annotations
and annotations["role"].upper() in ContributorRole.__members__
)

return False

Expand All @@ -50,6 +68,98 @@ def resource_is_for_profile_owner(resource: GenericNamespacedResource | RoleBind
return False


def get_contributor_user(resource: GenericNamespacedResource | RoleBinding) -> str:
"""Return user in KFAM annotation.
Raises:
InvalidKfamAnnotationsError: If the object does not have KFAM annotations.
Returns:
The user defined in metadata.annotations.user of the resource.
"""
if not has_valid_kfam_annotations(resource):
raise InvalidKfamAnnotationsError(
"Resource doesn't have valid KFAM metadata: %s" % k8s.get_name(resource)
)

return k8s.get_annotations(resource)["user"]


def get_contributor_role(
resource: GenericNamespacedResource | RoleBinding,
) -> ContributorRole:
"""Return role in KFAM annotation.
Raises:
InvalidKfamAnnotationsError: If the object does not have valid KFAM annotations.
Returns:
The user defined in metadata.annotations.user of the resource.
"""
if not has_valid_kfam_annotations(resource):
raise InvalidKfamAnnotationsError(
"Resource doesn't have invalid KFAM metadata: %s" % k8s.get_name(resource)
)

annotations = k8s.get_annotations(resource)
return ContributorRole(annotations["role"])


def resource_matches_profile_contributor(
resource: RoleBinding | GenericNamespacedResource, profile: Profile
) -> bool:
"""Check if the user and its role in the RoleBinding match the PMR.
Args:
resource: The AuthorizationPolicy or RoleBinding to check if it matches any
Contributor in the PMR Profile.
profile: The PMR Profile to check if the resource is matching it.
Returns:
A boolean representing if the resources matches the expected contributor
"""
role = get_contributor_role(resource)
user = get_contributor_user(resource)
if role in profile._contributors_dict.get(user, []):
return True

return False


def generate_contributor_rolebinding(contributor: Contributor, namespace: str) -> RoleBinding:
"""Generate RoleBinding for a PMR Contributor.
Args:
contributor: The PMR Contributor to generate a RoleBinding for.
namespace: The namespace to use for the RoleBinding.
Returns:
The generated RoleBinding lightkube object for the contributor.
"""
name_rfc1123 = k8s.to_rfc1123_compliant(f"{contributor.name}-{contributor.role}")

return RoleBinding.from_dict(
{
"metadata": {
"name": name_rfc1123,
"namespace": namespace,
"annotations": {
"user": contributor.name,
"role": contributor.role,
},
},
"roleRef": {
"apiGroup": "rbac.authorization.k8s.io",
"kind": "ClusterRole",
"name": f"kubeflow-{contributor.role}",
},
"subjects": [
{"apiGroup": "rbac.authorization.k8s.io", "kind": "User", "name": contributor.name}
],
},
)


def list_contributor_rolebindings(client: Client, namespace="") -> List[RoleBinding]:
"""Return a list of KFAM RoleBindings.
Expand All @@ -73,7 +183,7 @@ def list_contributor_rolebindings(client: Client, namespace="") -> List[RoleBind
return [
rb
for rb in role_bindings
if has_kfam_annotations(rb) and not resource_is_for_profile_owner(rb)
if has_valid_kfam_annotations(rb) and not resource_is_for_profile_owner(rb)
]


Expand Down Expand Up @@ -102,5 +212,95 @@ def list_contributor_authorization_policies(
return [
ap
for ap in authorization_policies
if has_kfam_annotations(ap) and not resource_is_for_profile_owner(ap)
if has_valid_kfam_annotations(ap) and not resource_is_for_profile_owner(ap)
]


def kfam_resources_list_to_roles_dict(
resources: List[RoleBinding] | List[GenericNamespacedResource],
) -> dict[str, List[ContributorRole]]:
"""Convert list of KFAM RoleBindings or AuthorizationPolicies to dict.
The user of the resource will be used as a key and its role as the value.
Args:
resources: List of KFAM RoleBindings or AuthorizationPolicies.
Returns:
Dictionary with keys the user names and values the roles, derived from parsing all
the provided resources.
"""
contributor_roles_dict = {}
for resource in resources:
user = get_contributor_user(resource)
role = get_contributor_role(resource)
contributor_roles_dict[user] = contributor_roles_dict.get(user, []) + [role]

return contributor_roles_dict


def delete_rolebindings_not_matching_profile_contributors(
client: Client,
profile: Profile,
) -> None:
"""Delete RoleBindings in the cluster that doesn't match Contributors in PMR Profile.
The function will be handling 404 errors, in case the RoleBinding doesn't exist in the
cluster.
Args:
client: The lightkube client to use.
profile: The PMR Profile to create RoleBindings based on its Contributors.
Raises:
ApiError: From lightkube if something unexpected occurred while deleting the
resources.
"""
existing_rolebindings = list_contributor_rolebindings(client, profile.name)
role_bindings_to_delete = []

if not profile.contributors:
role_bindings_to_delete = existing_rolebindings
else:
for rb in existing_rolebindings:
if not resource_matches_profile_contributor(rb, profile):
log.info(
"RoleBinding '%s' doesn't belong to Profile. Will delete it.",
k8s.get_name(rb),
)
role_bindings_to_delete.append(rb)

if role_bindings_to_delete:
log.info("Deleting all resources that don't match the PMR.")
delete_many(client, role_bindings_to_delete, logger=log)


def create_rolebindings_for_profile_contributors(
client: Client,
profile: Profile,
) -> None:
"""Create RoleBindings for all contributors defined in a Profile, in the PMR.
If a RoleBinding already exists for the specific Contributor name and role, then
no API requests will happen.
Args:
client: The lightkube client to use.
profile: The PMR to iterate over its Contributors.
existing_rolebindings: List of existing RoleBindings, to avoid doing redundant
API requests
Raises:
ApiError: From lightkube if there was an error while trying to create the
RoleBindings.
"""
existing_rolebindings = list_contributor_rolebindings(client, profile.name)
existing_contributor_roles = kfam_resources_list_to_roles_dict(existing_rolebindings)

if not profile.contributors:
return

for contributor in profile.contributors:
if contributor.role not in existing_contributor_roles.get(contributor.name, []):
log.info("Will create RoleBinding for Contributor: %s", contributor)
client.apply(generate_contributor_rolebinding(contributor, profile.name))
15 changes: 15 additions & 0 deletions src/profiles_management/pmr/classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,21 @@ class Profile(BaseModel):
resources: Optional[ResourceQuotaSpecModel] = None
contributors: Optional[List[Contributor]] = []

_contributors_dict: dict[str, List[ContributorRole]]

def __init__(self, **data: Any) -> None:
super().__init__(**data)

# Group contributors based on user name for more efficient retrieval
self._contributors_dict = {}
if self.contributors is None:
return

for contributor in self.contributors:
self._contributors_dict[contributor.name] = self._contributors_dict.get(
contributor.name, []
) + [contributor.role]


class ProfilesManagementRepresentation:
"""A class representing the Profiles and Contributors.
Expand Down
Loading

0 comments on commit e779929

Please sign in to comment.