Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MinioAdmin: add IDP/LDAP attach/detach/list APIs #1470

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions minio/minioadmin.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ class _COMMAND(Enum):
SERVICE_ACCOUNT_ADD = "add-service-account"
SERVICE_ACCOUNT_UPDATE = "update-service-account"
SERVICE_ACCOUNT_DELETE = "delete-service-account"
IDP_LDAP_POLICY_ATTACH = "idp/ldap/policy/attach"
IDP_LDAP_POLICY_DETACH = "idp/ldap/policy/detach"
IDP_LDAP_LIST_ACCESS_KEYS = "idp/ldap/list-access-keys"
IDP_LDAP_LIST_ACCESS_KEYS_BULK = "idp/ldap/list-access-keys-bulk"


def _safe_str(value: Any) -> str:
Expand Down Expand Up @@ -842,3 +846,84 @@ def delete_service_account(self, access_key: str) -> str:
query_params={"accessKey": access_key},
)
return response.data.decode()

def _attach_detach_policy_ldap(
self,
command: _COMMAND,
policies: list[str],
user: str | None = None,
group: str | None = None,
) -> str:
"""Attach or detach policies for LDAP."""
if (user is not None) ^ (group is not None):
key = "user" if user else "group"
body = json.dumps(
{"policies": policies,
key: cast(str, user or group)},
).encode()
response = self._url_open(
"POST",
command,
body=encrypt(body, self._provider.retrieve().secret_key),
)
return response.data.decode()
raise ValueError("either user or group must be set")

def attach_policy_ldap(
self,
policies: list[str],
user: str | None = None,
group: str | None = None,
) -> str:
"""Attach policies for LDAP."""
return self._attach_detach_policy_ldap(
_COMMAND.IDP_LDAP_POLICY_ATTACH, policies, user, group,
)

def detach_policy_ldap(
self,
policies: list[str],
user: str | None = None,
group: str | None = None,
) -> str:
"""Detach policies for LDAP."""
return self._attach_detach_policy_ldap(
_COMMAND.IDP_LDAP_POLICY_DETACH, policies, user, group,
)

def list_access_keys_ldap(
self,
user_dn: str,
list_type: str,
) -> str:
"""List service accounts belonging to the specified user."""
response = self._url_open(
"GET", _COMMAND.IDP_LDAP_LIST_ACCESS_KEYS,
query_params={"userDN": user_dn, "listType": list_type},
preload_content=False,
)
plain_data = decrypt(
response, self._provider.retrieve().secret_key,
)
return plain_data.decode()

def list_access_keys_ldap_bulk(
self,
users: list[str],
list_type: str,
all_users: bool,
) -> str:
"""List access keys belonging to the given users or all users."""
if len(users) != 0 and all_users:
raise ValueError("both users and all_users are not permitted")

key, value = ("all", "true") if all_users else ("userDNs", users)
response = self._url_open(
"GET", _COMMAND.IDP_LDAP_LIST_ACCESS_KEYS_BULK,
query_params={"listType": list_type, key: value},
preload_content=False,
)
plain_data = decrypt(
response, self._provider.retrieve().secret_key,
)
return plain_data.decode()
Loading