class ProvElement(ABC):
    """Abstract base class for PROV elements.

    Concrete subclasses must provide a ``uid``; ``attributes`` is optional
    and may be overridden by subclasses that carry extra information.
    """

    # NOTE: ``uid`` is a plain abstract property. Stacking ``@property`` on
    # top of ``@classmethod`` makes instance access fail, because the property
    # getter would then be a classmethod object, which is not callable.
    @property
    @abstractmethod
    def uid(self):
        """id is required for ProvElements"""
        raise NotImplementedError

    @property
    def attributes(self):
        """attributes are optional"""
        raise NotImplementedError

    @classmethod
    def get_type(cls):
        """Return the name of the element class (e.g. ``"ProvEntity"``)."""
        # ``type(cls)`` would be the metaclass (ABCMeta), not the element class.
        return cls.__name__
@define
class ProvRecord:
    """
    A provenance document containing a PROV context and a list of elements
    """

    context: ProvContext = field()
    # A factory gives every record its own list; a literal ``[]`` default
    # would be shared by all records created without explicit elements.
    elements: List[ProvElement] = field(factory=list)

    def __getitem__(self, idx):
        """Return the first element whose ``uid`` equals ``idx``, else None."""
        for element in self.elements:
            if element.uid == idx:
                return element
        return None

    def json(self):
        """Return the record as a dictionary ready to be dumped as JSON.

        The result holds the namespaces under ``"@context"`` (namespace id
        mapped to its uri) and the unstructured agents, activities and
        entities under ``"prov:Agent"``, ``"prov:Activity"`` and
        ``"prov:Entity"`` respectively.
        """

        def unstruct_all(kind):
            # Keep the elements' original order within each PROV type.
            return [x.unstruct() for x in self.elements if isinstance(x, kind)]

        return {
            "@context": {ns.id: ns.uri for ns in self.context._namespaces},
            "prov:Agent": unstruct_all(ProvAgent),
            "prov:Activity": unstruct_all(ProvActivity),
            "prov:Entity": unstruct_all(ProvEntity),
        }
def get_last_activity(path_entity: Path) -> Optional[str]:
    """
    return the uid (as a string) of the last activity found in the provenance
    record of the file, or None when there is no record or no activity in it
    """

    prov_record = read_prov_jsonld(get_path_prov(path_entity))
    if not prov_record or not prov_record.elements:
        return None

    # TODO: filter activities by date
    activities = [x for x in prov_record.elements if isinstance(x, ProvActivity)]
    if not activities:
        # A record may hold only agents/entities; indexing [-1] would fail.
        return None
    return str(activities[-1].uid)
def create_prov_file(prov_command, prov_path):
    """
    Write the provenance of a command to a new file

    params:
        prov_command: object whose json() method returns the data to store
        prov_path: location of the file to write (overwritten if present)
    """
    import json

    with open(prov_path, "w") as prov_file:
        # json() is evaluated only once the file has been opened
        json.dump(prov_command.json(), prov_file, indent=4)
def clean_arguments(pipeline_args, file_func):
    """
    Drop from pipeline_args the keys that file_func cannot receive

    params:
        pipeline_args: dict of candidate keyword arguments (modified in place)
        file_func: callable whose signature decides which keys are kept

    return:
        the same pipeline_args dict; it is left untouched when file_func
        accepts arbitrary keyword arguments (**kwargs)
    """
    import inspect

    # getargspec was removed in Python 3.11; getfullargspec also copes with
    # keyword-only parameters and annotations.
    spec = inspect.getfullargspec(file_func)
    if spec.varkw:
        return pipeline_args

    accepted = set(spec.args) | set(spec.kwonlyargs)
    # Iterate over a snapshot of the keys since the dict is mutated.
    for key in list(pipeline_args):
        if key not in accepted:
            del pipeline_args[key]
    return pipeline_args
def create_node_log(self):
    """
    Attach to the pipeline the node that logs provenance

    The node wraps log_prov, is named "LogProv" and is stored in
    self.prov_log_node; its out_dir input is set to self.caps_directory.
    """
    import nipype.interfaces.utility as nutil
    import nipype.pipeline.engine as npe

    log_interface = nutil.Function(
        input_names=["prov_log_record", "out_file", "out_dir"],
        output_names=["output_record"],
        function=log_prov,
    )
    self.prov_log_node = npe.Node(log_interface, name="LogProv")

    # The output directory is known when the node is created, so it is set
    # here instead of being connected from another node.
    self.prov_log_node.inputs.out_dir = self.caps_directory
def update_prov(input_files, prov_in_record):
    """
    Build the provenance record of the pipeline being launched

    params:
        input_files: path (str or Path) or list/tuple of paths of the inputs
        prov_in_record: ProvRecord previously read for these inputs; its
            context is reused and it is checked against the new record
    return:
        ProvRecord associated with the launched pipeline, holding the agent,
        one entity per input file and the activity that uses them
    raises:
        TypeError: if input_files is neither a path nor a list/tuple of paths
        ValueError: if the command is not valid given prov_in_record
    """
    # Imports stay inside the function: it is passed to a nipype Function
    # node (see create_node_update), which runs the function from its source,
    # so the body has to be self-contained.
    from pathlib import Path

    from clinica.engine.prov_model import ProvRecord
    from clinica.engine.prov_utils import (
        mint_activity,
        mint_agent,
        mint_entity,
        validate_command,
    )

    if isinstance(input_files, (str, Path)):
        paths_files = [Path(input_files)]
    elif isinstance(input_files, (list, tuple)):
        paths_files = [Path(x) for x in input_files]
    else:
        # Previously this case failed later with an unbound local variable.
        raise TypeError(
            f"input_files must be a path or a list of paths, got {type(input_files).__name__}"
        )

    new_agent = mint_agent()
    new_entities = [mint_entity(path) for path in paths_files]
    new_activity = mint_activity(new_agent, new_entities)

    # Order matters for readers of the record: agent, entities, then activity.
    elements = [new_agent, *new_entities, new_activity]

    prov_current = ProvRecord(prov_in_record.context, elements=elements)

    if not validate_command(prov_in_record, prov_current):
        # Raising a bare string is itself a TypeError; use a real exception.
        raise ValueError("Invalid command")
    return prov_current
clinica.engine.provenance as prov + def postset(attribute, value): """Sets the attribute of an object after the execution. @@ -211,6 +213,7 @@ def has_output_connections(self): return False @postset("is_built", True) + @prov.provenance def build(self): """Builds the core, input and output nodes of the Pipeline. @@ -228,10 +231,10 @@ def build(self): self.check_dependencies() self.check_pipeline_parameters() if not self.has_input_connections(): - self.build_input_node() + self.input_files = self.build_input_node() self.build_core_nodes() if not self.has_output_connections(): - self.build_output_node() + self.output_files = self.build_output_node() return self def run(self, plugin=None, plugin_args=None, update_hash=False, bypass_check=False): diff --git a/clinica/pipelines/pet_linear/pet_linear_pipeline.py b/clinica/pipelines/pet_linear/pet_linear_pipeline.py index dd828dda2..448f08d32 100644 --- a/clinica/pipelines/pet_linear/pet_linear_pipeline.py +++ b/clinica/pipelines/pet_linear/pet_linear_pipeline.py @@ -43,9 +43,9 @@ def get_output_fields(self): A list of (string) output fields name. 
""" return [ - "registered_pet", - "transform_mat", - "registered_pet_in_t1w", + "suvr_pet", + "affine_mat", + "PETinT1w", ] # Fill here the list def build_input_node(self): diff --git a/clinica/pipelines/pet_surface/pet_surface_pipeline.py b/clinica/pipelines/pet_surface/pet_surface_pipeline.py index 9fe61d2a3..da77b98b4 100644 --- a/clinica/pipelines/pet_surface/pet_surface_pipeline.py +++ b/clinica/pipelines/pet_surface/pet_surface_pipeline.py @@ -41,7 +41,7 @@ def get_input_fields(self): def get_output_fields(self): """Specify the list of possible outputs of this pipeline.""" - return [] + return [""] def build_input_node(self): """Build and connect an input node to the pipeline.""" diff --git a/clinica/pipelines/statistics_surface/statistics_surface_pipeline.py b/clinica/pipelines/statistics_surface/statistics_surface_pipeline.py index 8a79cb9da..790b12020 100644 --- a/clinica/pipelines/statistics_surface/statistics_surface_pipeline.py +++ b/clinica/pipelines/statistics_surface/statistics_surface_pipeline.py @@ -74,7 +74,7 @@ def get_input_fields(self): Returns: A list of (string) input fields name. """ - return [] + return [""] def get_output_fields(self): """Specify the list of possible outputs of this pipeline. diff --git a/clinica/pipelines/statistics_volume_correction/statistics_volume_correction_pipeline.py b/clinica/pipelines/statistics_volume_correction/statistics_volume_correction_pipeline.py index b9afd5e63..3ad08980e 100644 --- a/clinica/pipelines/statistics_volume_correction/statistics_volume_correction_pipeline.py +++ b/clinica/pipelines/statistics_volume_correction/statistics_volume_correction_pipeline.py @@ -25,7 +25,7 @@ def get_output_fields(self): Returns: A list of (string) output fields name. 
""" - return [] + return [""] def build_input_node(self): """Build and connect an input node to the pipeline.""" diff --git a/clinica/pipelines/t1_volume_parcellation/t1_volume_parcellation_pipeline.py b/clinica/pipelines/t1_volume_parcellation/t1_volume_parcellation_pipeline.py index 0ff7193f6..32f847b31 100644 --- a/clinica/pipelines/t1_volume_parcellation/t1_volume_parcellation_pipeline.py +++ b/clinica/pipelines/t1_volume_parcellation/t1_volume_parcellation_pipeline.py @@ -36,6 +36,7 @@ def get_output_fields(self): Returns: A list of (string) output fields name. """ + return [""] def build_input_node(self): """Build and connect an input node to the pipeline."""