Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use TypedDicts rather than nested BaseModels #48

Merged
merged 2 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 57 additions & 56 deletions hv4gha/gh.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,16 @@

import json
from datetime import datetime
from typing import Annotated, Final, Literal, TypedDict
from typing import Annotated, Final, Literal

import requests
from pydantic import BaseModel, Field, TypeAdapter, ValidationError
from typing_extensions import NotRequired, TypedDict

PermARW = None | Literal["admin", "read", "write"]
PermRW = None | Literal["read", "write"]
PermR = None | Literal["read"]
PermW = None | Literal["write"]


class TokenResponse(TypedDict, total=False):
"""Typing for customized Access Token response"""

access_token: str
expires_at: datetime
permissions: dict[str, str]
repositories: list[str]
PermARW = Literal["admin", "read", "write"]
PermRW = Literal["read", "write"]
PermR = Literal["read"]
PermW = Literal["write"]


class GitHubAPIError(Exception):
Expand All @@ -46,7 +38,7 @@ class GitHubErrors(BaseModel):
message: str


class AccountInfo(BaseModel):
class AccountInfo(TypedDict):
"""Part of Installation"""

login: Annotated[
Expand All @@ -63,48 +55,48 @@ class Installation(BaseModel):
account: AccountInfo


class TokenPermissions(BaseModel):
class TokenPermissions(TypedDict, total=False):
"""Part of AccessToken"""

# Repository permissions
actions: PermRW = None
administration: PermRW = None
checks: PermRW = None
contents: PermRW = None
deployments: PermRW = None
environments: PermRW = None
issues: PermRW = None
metadata: PermRW = None
packages: PermRW = None
pages: PermRW = None
pull_requests: PermRW = None
repository_hooks: PermRW = None
repository_projects: PermARW = None
secret_scanning_alerts: PermRW = None
secrets: PermRW = None
security_events: PermRW = None
single_file: PermRW = None
statuses: PermRW = None
vulnerability_alerts: PermRW = None
workflows: PermW = None
actions: PermRW
administration: PermRW
checks: PermRW
contents: PermRW
deployments: PermRW
environments: PermRW
issues: PermRW
metadata: PermRW
packages: PermRW
pages: PermRW
pull_requests: PermRW
repository_hooks: PermRW
repository_projects: PermARW
secret_scanning_alerts: PermRW
secrets: PermRW
security_events: PermRW
single_file: PermRW
statuses: PermRW
vulnerability_alerts: PermRW
workflows: PermW
# Organizational permissions
members: PermRW = None
organization_administration: PermRW = None
organization_custom_roles: PermRW = None
organization_announcement_banners: PermRW = None
organization_hooks: PermRW = None
organization_personal_access_tokens: PermRW = None
organization_personal_access_token_requests: PermRW = None
organization_plan: PermR = None
organization_projects: PermARW = None
organization_packages: PermRW = None
organization_secrets: PermRW = None
organization_self_hosted_runners: PermRW = None
organization_user_blocking: PermRW = None
team_discussions: PermRW = None


class Repository(BaseModel):
members: PermRW
organization_administration: PermRW
organization_custom_roles: PermRW
organization_announcement_banners: PermRW
organization_hooks: PermRW
organization_personal_access_tokens: PermRW
organization_personal_access_token_requests: PermRW
organization_plan: PermR
organization_projects: PermARW
organization_packages: PermRW
organization_secrets: PermRW
organization_self_hosted_runners: PermRW
organization_user_blocking: PermRW
team_discussions: PermRW


class Repository(TypedDict):
"""Part of AccessToken"""

name: Annotated[str, Field(max_length=100, pattern=r"^[a-zA-Z0-9_\-\.]+$")]
Expand All @@ -121,6 +113,15 @@ class AccessToken(BaseModel):
repositories: None | list[Repository] = None


class TokenResponse(TypedDict):
"""Typing for customized Access Token response"""

access_token: str
expires_at: datetime
permissions: TokenPermissions
repositories: NotRequired[list[str]]


class GitHubApp:
"""GitHub App Access Tokens, etc"""

Expand Down Expand Up @@ -174,7 +175,7 @@ def __find_installation(self) -> str:
raise InstallationLookupError(error_message) from validation_error

for installation in installations:
if installation.account.login.lower() == self.account.lower():
if installation.account["login"].lower() == self.account.lower():
return str(installation.id)

if "next" in response.links:
Expand Down Expand Up @@ -242,12 +243,12 @@ def issue_token(
access_token: TokenResponse = {
"access_token": access_token_bm.token,
"expires_at": access_token_bm.expires_at,
"permissions": access_token_bm.permissions.model_dump(exclude_unset=True),
"permissions": access_token_bm.permissions,
}

if access_token_bm.repositories is not None:
access_token["repositories"] = sorted(
[repo.name for repo in access_token_bm.repositories]
[repo["name"] for repo in access_token_bm.repositories]
)

return access_token
9 changes: 5 additions & 4 deletions hv4gha/vault.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from pydantic import BaseModel, ValidationError
from typing_extensions import TypedDict

from .helpers import b64str, prepare_gh_app_jwt, private_pem_to_der, vault_wrap_key

Expand Down Expand Up @@ -41,7 +42,7 @@ class VaultErrors(BaseModel):
errors: list[str]


class JWTData(BaseModel):
class JWTData(TypedDict):
"""Part of SignedJWT"""

signature: str
Expand All @@ -55,7 +56,7 @@ class SignedJWT(BaseModel):
data: JWTData


class KeyData(BaseModel):
class KeyData(TypedDict):
"""Part of WrappingKey"""

public_key: str
Expand Down Expand Up @@ -108,7 +109,7 @@ def __download_wrapping_key(self) -> rsa.RSAPublicKey:
error_message = "<Failed to parse Wrapping Key API response>"
raise WrappingKeyDownloadError(error_message) from validation_error

wrapping_pem_key = wrapping_key_bm.data.public_key.encode()
wrapping_pem_key = wrapping_key_bm.data["public_key"].encode()
wrapping_key = serialization.load_pem_public_key(wrapping_pem_key)

if not isinstance(wrapping_key, rsa.RSAPublicKey):
Expand Down Expand Up @@ -200,7 +201,7 @@ def sign_jwt(self, *, key_name: str, app_id: str) -> str:
error_message = "<Failed to parse Sign JWT API response>"
raise JWTSigningError(error_message) from validation_error

signature = signature_bm.data.signature.removeprefix("vault:v1:")
signature = signature_bm.data["signature"].removeprefix("vault:v1:")
signature = b64str(base64.b64decode(signature), urlsafe=True)

jwt_token = header_and_claims + "." + signature
Expand Down
3 changes: 2 additions & 1 deletion integration/testrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
from base64 import b64decode

from hv4gha import TokenResponse, import_app_key, issue_access_token
from hv4gha.gh import TokenPermissions


def _check_perms(requested: dict[str, str], result: dict[str, str]) -> None:
def _check_perms(requested: dict[str, str], result: TokenPermissions) -> None:
result.pop("metadata") # Clear default permission
complaint = "Returned permissions does not match requested permissions"
assert requested == result, complaint
Expand Down
Loading