-
Notifications
You must be signed in to change notification settings - Fork 82
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
d187bbe
commit 3d25540
Showing
7 changed files
with
392 additions
and
56 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,215 @@ | ||
import json | ||
import functools | ||
from os import read | ||
|
||
from pathlib import Path | ||
from typing import Optional | ||
|
||
|
||
def provenance(func): | ||
from .provenance_utils import get_files_list | ||
|
||
@functools.wraps(func) | ||
def run_wrapper(self, **kwargs): | ||
ret = [] | ||
pipeline_fullname = self.fullname | ||
in_files_paths = get_files_list(self, pipeline_fullname, dict_field="input_to") | ||
|
||
prov_context = get_context(files_paths=in_files_paths) | ||
prov_command = get_command(self, in_files_paths) | ||
|
||
if validate_command(prov_context, prov_command): | ||
ret = func(self) | ||
else: | ||
raise Exception( | ||
"The pipeline selected is incompatible with the input files provenance" | ||
) | ||
out_files_paths = get_files_list( | ||
self, pipeline_fullname, dict_field="output_from" | ||
) | ||
register_prov(prov_command, out_files_paths) | ||
|
||
return ret | ||
|
||
return run_wrapper | ||
|
||
|
||
def register_prov(prov_command: dict, out_files: list) -> bool: | ||
|
||
# TODO: iterate over out_files and create a provenance file for each | ||
for file in out_files: | ||
write_prov_file(prov_command, file) | ||
print("Provenance registered succesfully") | ||
return True | ||
|
||
|
||
def get_context(files_paths: str) -> dict: | ||
""" | ||
Return a dictionary with the provenance info related to the files in the files_paths | ||
""" | ||
from clinica.engine.provenance_utils import read_prov, get_associated_prov | ||
|
||
prov_data = {"Entity": [], "Agent": [], "Activity": []} | ||
for path in files_paths: | ||
prov_record = read_prov(get_associated_prov(path)) | ||
if prov_record: | ||
prov_data = append_prov_dict(prov_data, prov_record) | ||
|
||
return prov_data | ||
|
||
|
||
def get_command(self, input_files_paths: list) -> dict: | ||
""" | ||
Read the user command and save information in a dict | ||
""" | ||
import sys | ||
|
||
new_entities = [] | ||
new_agent = get_agent() | ||
for path in input_files_paths: | ||
new_entities.append(get_entity(path)) | ||
new_activity = get_activity(self, new_agent["@id"], new_entities) | ||
|
||
return { | ||
"Agent": [new_agent], | ||
"Activity": [new_activity], | ||
"Entity": new_entities, | ||
} | ||
|
||
|
||
def write_prov_file(prov_command, files_paths): | ||
""" | ||
Write the dictionary data to the file_path | ||
""" | ||
from clinica.engine.provenance_utils import read_prov, get_associated_prov | ||
|
||
for file_path in files_paths: | ||
prov_path = get_associated_prov(file_path) | ||
|
||
if prov_path.exists(): | ||
# append the pipeline provenance information to the old provenance file | ||
prov_main = read_prov(prov_path) | ||
prov_main = append_prov_dict(prov_main, prov_command) | ||
else: | ||
print("help") | ||
# create new provenance file with pipeline information | ||
return "" | ||
|
||
|
||
def append_prov_dict(prov_main: dict, prov_new: dict) -> dict: | ||
""" | ||
Append a specific prov data to the global prov dict | ||
""" | ||
|
||
for k in prov_new.keys(): | ||
for el in prov_new[k]: | ||
if prov_main[k] and el not in prov_main[k]: | ||
prov_main[k].append(el) | ||
return prov_main | ||
|
||
|
||
def get_agent() -> dict: | ||
import clinica | ||
from .provenance_utils import get_agent_id | ||
|
||
agent_version = clinica.__version__ | ||
agent_label = clinica.__name__ | ||
agent_id = get_agent_id(agent_label + agent_version) | ||
|
||
new_agent = {"@id": agent_id, "label": agent_label, "version": agent_version} | ||
|
||
return new_agent | ||
|
||
|
||
def get_activity(self, agent_id: str, entities: list) -> dict: | ||
""" | ||
Add the current command to the list of activities | ||
""" | ||
import sys | ||
from .provenance_utils import get_activity_id | ||
|
||
activity_parameters = self.parameters | ||
activity_label = self.fullname | ||
activity_id = get_activity_id(self.fullname) | ||
activity_command = (sys.argv[1:],) | ||
activity_agent = agent_id | ||
activity_used_files = [e["@id"] for e in entities] | ||
|
||
new_activity = { | ||
"@id": activity_id, | ||
"label": activity_label, | ||
"command": activity_command, | ||
"parameters": activity_parameters, | ||
"wasAssociatedWith": activity_agent, | ||
"used": activity_used_files, | ||
} | ||
|
||
return new_activity | ||
|
||
|
||
def get_entity(img_path: str) -> dict: | ||
""" | ||
Add the current file to the list of entities | ||
""" | ||
from clinica.engine.provenance_utils import get_entity_id | ||
from clinica.engine.provenance_utils import get_last_activity | ||
from pathlib import Path | ||
|
||
entity_id = get_entity_id(img_path) | ||
entity_label = Path(img_path).name | ||
entity_path = img_path | ||
entity_source = get_last_activity(img_path) | ||
|
||
new_entity = { | ||
"@id": entity_id, | ||
"label": entity_label, | ||
"atLocation": entity_path, | ||
"wasGeneratedBy": entity_source, | ||
} | ||
|
||
return new_entity | ||
|
||
|
||
def create_prov_file(command, path): | ||
""" | ||
Create new provenance file based on command | ||
""" | ||
# TODO: create a json-ld object next to the file and add it to the active prov object | ||
return | ||
|
||
|
||
def validate_command(prov_context: dict, prov_command: dict) -> bool: | ||
""" | ||
Check the command is valid on the data being run | ||
""" | ||
flag = True | ||
new_activity_id = prov_command["Activity"][0]["@id"] | ||
new_agent_id = prov_command["Agent"][0]["@id"] | ||
|
||
for entity in prov_context["Entity"]: | ||
old_activity_id = entity["wasGeneratedBy"] | ||
if old_activity_id: | ||
ptr_activity = next( | ||
item | ||
for item in prov_context["Activity"] | ||
if item["@id"] == old_activity_id | ||
) | ||
old_agent_id = ptr_activity["wasAssociatedWith"] | ||
flag and is_valid( | ||
{(old_agent_id, old_activity_id): (new_agent_id, new_activity_id)} | ||
) | ||
return flag | ||
|
||
|
||
def is_valid(command: dict) -> bool: | ||
valid_list = [ | ||
{ | ||
("clin:clinica0.5.0", "clin:adni2Bids"): ( | ||
"clin:clinica0.5.0", | ||
"clin:t1-linear", | ||
) | ||
} | ||
] | ||
if command in valid_list: | ||
return True | ||
return False |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
from typing import Union, Optional | ||
from pathlib import Path | ||
|
||
|
||
def get_files_list(self, pipeline_fullname: str, dict_field="input_to") -> list: | ||
""" | ||
Calls clinica_file_reader with the appropriate extentions | ||
""" | ||
from clinica.utils.inputs import clinica_file_reader | ||
import clinica.utils.input_files as cif | ||
|
||
dict_field_options = ["input_to", "output_from"] | ||
if dict_field not in dict_field_options: | ||
raise (f"dict_field must be one of {dict_field_options}") | ||
|
||
# retrieve all the data dictionaries from the input_files module | ||
files_dicts = { | ||
k: v | ||
for k, v in vars(cif).items() | ||
if isinstance(v, dict) | ||
and dict_field in v.keys() | ||
and pipeline_fullname in v[dict_field] | ||
} | ||
# TODO: check if bids or caps as output | ||
ret_files = [] | ||
for elem in files_dicts: | ||
ref_dir = ( | ||
self.bids_directory if dict_field == "input_to" else self.caps_directory | ||
) | ||
current_file = clinica_file_reader( | ||
self.subjects, | ||
self.sessions, | ||
ref_dir, | ||
files_dicts[elem], | ||
raise_exception=False, | ||
) | ||
if current_file: | ||
ret_files.extend(current_file) | ||
|
||
return ret_files | ||
|
||
|
||
def is_entity_tracked(prov_context: dict, entity_id: str) -> bool: | ||
flag_exists = next( | ||
(True for item in prov_context["Entity"] if item["@id"] == entity_id), | ||
False, | ||
) | ||
return flag_exists | ||
|
||
|
||
def is_agent_tracked(prov_context: dict, agent_id: str) -> bool: | ||
flag_exists = next( | ||
(True for item in prov_context["Agent"] if item["@id"] == agent_id), | ||
False, | ||
) | ||
return flag_exists | ||
|
||
|
||
def is_activity_tracked(prov_context: dict, activity_id: str) -> bool: | ||
flag_exists = next( | ||
(True for item in prov_context["Activity"] if item["@id"] == activity_id), | ||
False, | ||
) | ||
return flag_exists | ||
|
||
|
||
def get_entity_id(file_path: str) -> str: | ||
from pathlib import Path | ||
|
||
entity_id = Path(file_path).with_suffix("").name | ||
return entity_id | ||
|
||
|
||
def get_activity_id(pipeline_name: str) -> str: | ||
return "clin:" + pipeline_name | ||
|
||
|
||
def get_agent_id(agent_name: str) -> str: | ||
return "clin:" + agent_name | ||
|
||
|
||
def get_last_activity(file_path: str) -> Optional[list]: | ||
|
||
""" | ||
Return the last activity executed on the file | ||
""" | ||
|
||
prov_record = read_prov(get_associated_prov(file_path)) | ||
if prov_record and prov_record["Activity"]: | ||
last_activity = prov_record["Activity"][-1]["@id"] | ||
return last_activity | ||
return None | ||
|
||
|
||
def get_associated_prov(file_path: str) -> Path: | ||
|
||
file_path = Path(file_path) | ||
while file_path.suffix != "": | ||
file_path = file_path.with_suffix("") | ||
|
||
associated_jsonld = file_path.with_suffix(".jsonld") | ||
return associated_jsonld | ||
|
||
|
||
def read_prov(prov_path: Path) -> Optional[dict]: | ||
""" | ||
Check if the given file is a valid provenance json-ld | ||
""" | ||
import json | ||
|
||
# TODO: check that the provenance file associations and uses exists | ||
if prov_path.exists(): | ||
with open(prov_path, "r") as fp: | ||
json_ld_data = json.load(fp) | ||
return json_ld_data | ||
return None |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.