Skip to content

Commit

Permalink
stricter type annotations
Browse files Browse the repository at this point in the history
  • Loading branch information
kdp-cloud committed Jun 6, 2024
1 parent 30e698b commit 2c3199b
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 43 deletions.
11 changes: 6 additions & 5 deletions mars-cli/mars_lib/authentication.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from typing import Optional
import requests
import json


def get_webin_auth_token(
credentials_dict,
header={"Content-Type": "application/json"},
auth_base_url="https://wwwdev.ebi.ac.uk/ena/dev/submit/webin/auth/token",
token_expiration_time=1,
):
credentials_dict: dict[str, str],
header: dict[str, str] = {"Content-Type": "application/json"},
auth_base_url: str = "https://wwwdev.ebi.ac.uk/ena/dev/submit/webin/auth/token",
token_expiration_time: int = 1,
) -> Optional[str]:
"""
Obtain Webin authentication token.
Expand Down
49 changes: 31 additions & 18 deletions mars-cli/mars_lib/biosamples_external_references.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import os
from jsonschema import validate
from jsonschema.exceptions import ValidationError, SchemaError
from typing import Union
from typing import Union, Any, Optional, List

# -- #
# Hardcoded values
Expand All @@ -21,7 +21,7 @@
# -- #
# Code blocks
# -- #
def load_json_file(file):
def load_json_file(file: str) -> Any:
"""
Function to load a JSON file as a dictionary.
Args:
Expand All @@ -46,7 +46,7 @@ def load_json_file(file):
)


def handle_input_dict(input):
def handle_input_dict(input: dict[str, str]) -> Optional[dict[str, str]]:
"""
Function to handle the input: assert that it's either a dictionary or
the filepath to an existing file containing the dictionary
Expand All @@ -73,7 +73,7 @@ def handle_input_dict(input):
raise ValueError(f"The file '{input}' is not a valid JSON file.")


def get_header(token):
def get_header(token: str) -> dict[str, str]:
"""
Obtain the header using a token.
Expand All @@ -90,7 +90,7 @@ def get_header(token):
}


def validate_bs_accession(accession_str):
def validate_bs_accession(accession_str: str) -> None:
"""
Validates that the given accession string conforms to the specified regex format.
See: https://registry.identifiers.org/registry/biosample
Expand All @@ -108,8 +108,8 @@ def validate_bs_accession(accession_str):


def validate_json_against_schema(
json_doc: Union[dict, str], json_schema: Union[dict, str]
):
json_doc: Union[dict[str, List[str]], str], json_schema: Union[dict[str, str], str]
) -> Optional[bool]:
"""
Validates a JSON document against a given JSON Schema.
Expand Down Expand Up @@ -150,7 +150,7 @@ class BiosamplesRecord:
production: boolean indicating environment mode
"""

def __init__(self, bs_accession):
def __init__(self, bs_accession: str) -> None:
"""
Initialize the BiosamplesRecord with provided arguments.
Expand All @@ -159,16 +159,19 @@ def __init__(self, bs_accession):
"""
validate_bs_accession(bs_accession)
self.bs_accession = bs_accession
self.biosamples_credentials: Optional[dict[str, str]] = None
self.biosamples_externalReferences: List[str] = []
self.production: bool = False

def display(self):
def display(self) -> None:
"""
Display the attributes for demonstration purposes.
"""
print("Biosamples Credentials:", self.biosamples_credentials)
print("Biosamples External References:", self.biosamples_externalReferences)
print("Production Mode:", self.production)

def fetch_bs_json(self, biosamples_endpoint):
def fetch_bs_json(self, biosamples_endpoint: str) -> Optional[dict[str, str]]:
"""
Fetches the BioSample's record (JSON) of the accession.
Expand Down Expand Up @@ -206,7 +209,9 @@ def fetch_bs_json(self, biosamples_endpoint):
self.bs_json = response_json
return self.bs_json

def load_bs_json(self, bs_json: Union[str, dict]):
def load_bs_json(
self, bs_json: Union[str, dict[str, str]]
) -> Optional[dict[str, str]]:
"""
Loads a given JSON, or the file containing it, as the BioSample's record (JSON) for this instance.
It is an alternative to fetching it directly from BioSample.
Expand All @@ -226,21 +231,27 @@ def load_bs_json(self, bs_json: Union[str, dict]):
"Neither the file containing the Biosamples JSON nor the Biosamples JSON itself were given to load it into the instance."
)

def pop_links(self):
def pop_links(self) -> dict[str, str]:
"""
Removes "_links" array (which is added automatically after updating the biosamples on the BioSample's side).
"""

if "_links" not in self.bs_json:
return self.bs_json
if "_links" in self.bs_json:
self.bs_json.pop("_links")

self.bs_json.pop("_links")
return self.bs_json

def extend_externalReferences(self, new_ext_refs_list):
def extend_externalReferences(
self, new_ext_refs_list: List[dict[str, str]]
) -> dict[str, str]:
"""Extends the JSON of the BioSample's record with new externalReferences"""
if not self.bs_json:
self.fetch_bs_json()
endpoint = (
biosamples_endpoints["prod"]
if self.production
else biosamples_endpoints["dev"]
)
self.fetch_bs_json(endpoint)
self.pop_links()

if "externalReferences" not in self.bs_json:
Expand All @@ -259,7 +270,9 @@ def extend_externalReferences(self, new_ext_refs_list):
self.bs_json["externalReferences"] = ext_refs_list
return self.bs_json

def update_remote_record(self, header, webin_auth="?authProvider=WEBIN"):
def update_remote_record(
self, header: dict[str, str], webin_auth: str = "?authProvider=WEBIN"
) -> Optional[str]:
"""
Updates the remote record of the BioSample's accession with the current sample JSON.
Expand Down
23 changes: 15 additions & 8 deletions mars-cli/mars_lib/credential.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,27 +52,31 @@


class CredentialManager:
def __init__(self, service_name):
def __init__(self, service_name: str) -> None:
self.service_name = service_name

def get_credential_env(self, username):
def get_credential_env(self, username: str) -> str:
"""
Retrieves a credential from environment variables.
:param username: The environment variable username.
:return: The value of the environment variable or None if not found.
"""
return os.getenv(username)
result = os.getenv(username)
if result is None:
raise ValueError(f"Environment variable '{username}' not found.")

def prompt_for_password(self):
return result

def prompt_for_password(self) -> str:
"""
Securely prompts the user to enter a password in the console.
:return: The password entered by the user.
"""
return getpass.getpass(prompt="Enter your password: ")

def set_password_keyring(self, username, password):
def set_password_keyring(self, username: str, password: str) -> None:
"""
Stores a password in the keyring under the given username.
Expand All @@ -81,16 +85,19 @@ def set_password_keyring(self, username, password):
"""
keyring.set_password(self.service_name, username, password)

def get_password_keyring(self, username):
def get_password_keyring(self, username: str) -> str:
"""
Retrieves a password from the keyring for the given username.
:param username: The username whose password to retrieve.
:return: The password or None if not found.
"""
return keyring.get_password(self.service_name, username)
pwd = keyring.get_password(self.service_name, username)
if pwd is None:
raise ValueError(f"Password not found for username '{username}'.")
return pwd

def delete_password_keyring(self, username):
def delete_password_keyring(self, username: str) -> None:
"""
Deletes a password from the keyring for the given username.
Expand Down
6 changes: 3 additions & 3 deletions mars-cli/mars_lib/models/isa_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class Data(IsaBase):
type: Optional[DataTypeEnum] = None

@field_validator("type")
def apply_enum(cls, v):
def apply_enum(cls, v: str) -> str:
if v not in [item.value for item in DataTypeEnum]:
raise ValueError("Invalid material type")
return v
Expand Down Expand Up @@ -151,7 +151,7 @@ class Material(IsaBase):
derivesFrom: List[Material] = []

@field_validator("type")
def apply_enum(cls, v):
def apply_enum(cls, v: str) -> str:
if v not in [item.value for item in MaterialTypeEnum]:
raise ValueError("Invalid material type")
return v
Expand Down Expand Up @@ -193,7 +193,7 @@ class Assay(IsaBase):
unitCategories: List[OntologyAnnotation] = []

@field_validator("comments")
def detect_target_repo_comments(cls, v):
def detect_target_repo_comments(cls, v: List[Comment]) -> Optional[List[Comment]]:
target_repo_comments = [
comment for comment in v if comment.name == TARGET_REPO_KEY
]
Expand Down
10 changes: 5 additions & 5 deletions mars-cli/mars_lib/models/repository_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class Accession(BaseModel):
value: str

@field_validator("path")
def validate_path(cls, path):
def validate_path(cls, path: List[Path]) -> List[Path]:
keys = [p.key for p in path]
if len(keys) != len(set(keys)):
raise ValueError("Duplicate keys found in path list")
Expand Down Expand Up @@ -49,20 +49,20 @@ class RepositoryResponse(BaseModel):
info: List[Info] = []

@field_validator("target_repository")
def validate_target_repository(cls, v):
def validate_target_repository(cls, v: str) -> str:
if v not in [item.value for item in TargetRepository]:
raise ValueError(f"Invalid 'target repository' value: '{v}'")
return v

@classmethod
def from_json_file(cls, json_file):
with open(json_file, "r") as file:
def from_json_file(cls, json_file_path: str) -> "RepositoryResponse":
with open(json_file_path, "r") as file:
data = json.load(file)

return cls.model_validate(data)

@classmethod
def from_json(cls, json_string: str):
def from_json(cls, json_string: str) -> "RepositoryResponse":
data = json.loads(json_string)

return cls.model_validate(data)
13 changes: 9 additions & 4 deletions mars-cli/mars_lib/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
from mars_lib.isa_json import reduce_isa_json_for_target_repo
from mars_lib.target_repo import TargetRepository
import requests
from typing import Any


def submit_to_biosamples(
investiagation: Investigation,
biosamples_credentials,
url,
biosamples_credentials: dict[str, str],
url: str,
) -> requests.Response:
bs_input_investiagation = reduce_isa_json_for_target_repo(
investiagation, TargetRepository.BIOSAMPLES
Expand All @@ -32,8 +33,10 @@ def submit_to_biosamples(


def create_external_references(
biosamples_credentials, biosamples_externalReferences, production
):
biosamples_credentials: dict[str, str],
biosamples_externalReferences: dict[str, Any],
production: bool,
) -> None:
"""
Main function to be executed when script is run.
Expand All @@ -51,6 +54,8 @@ def create_external_references(
json_doc=biosamples_externalReferences, json_schema=input_json_schema_filepath
)
token = get_webin_auth_token(biosamples_credentials)
if not token:
raise ValueError("The token could not be generated.")
header = get_header(token)

for biosample_r in biosamples_externalReferences["biosampleExternalReferences"]:
Expand Down

0 comments on commit 2c3199b

Please sign in to comment.