Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initialize Provenance Tracking #423

Draft
wants to merge 24 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
5dc0d02
Init traceability feature
omar-rifai Sep 23, 2021
b1bc38e
Add function to create new provenance files
omar-rifai Sep 23, 2021
9c0ee0d
Update clinica_file_reader call
omar-rifai Jan 3, 2022
aa0cb6f
Add data model
omar-rifai Jan 4, 2022
6b28410
rename files
omar-rifai Jan 4, 2022
8fdf9e7
Update prov with Data Model
omar-rifai Jan 11, 2022
47f9ce5
Fix typing with list
omar-rifai Jan 18, 2022
a3ef086
Fix various issues
omar-rifai Jan 21, 2022
bd4636b
Rename prov extraction functions
omar-rifai Jan 27, 2022
5dc8475
Deserialize json-ld
omar-rifai Jan 31, 2022
8f384c5
Update record data model and serialization/deserialization
omar-rifai Feb 2, 2022
d32e06f
Clean up unused code
omar-rifai Feb 3, 2022
126ce0b
fix conflict in rebase
omar-rifai Feb 3, 2022
672c28c
Update prov jsonld representation
omar-rifai Feb 7, 2022
39a2064
manually lint code
omar-rifai Feb 8, 2022
46b9892
Start extending workflow to other pipelines
omar-rifai Feb 15, 2022
f2c5300
Connect provenance through nipype nodes
omar-rifai Feb 23, 2022
fdf8ed7
Add context to provenance
omar-rifai Mar 1, 2022
203d84d
specify error type in try catch
omar-rifai Mar 2, 2022
cb31dd1
Revert changes in clinica/utils/ after architecture change
omar-rifai Mar 2, 2022
4687211
Homogenize the funcs get_input_fields() get_output_fields()
omar-rifai Mar 2, 2022
ddc82c8
Remove unused imports
omar-rifai Mar 3, 2022
54b3ffe
Lint __init__ file in engine
omar-rifai Mar 3, 2022
07204cc
Update fields returned in pet-linear for prov compatibility
omar-rifai Mar 14, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions clinica/engine/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
from nipype import config

from .cmdparser import CmdParser

# NOTE(review): executed whenever this package is imported; it switches on
# nipype's debug mode globally — confirm this is intended outside development.
config.enable_debug_mode()
135 changes: 135 additions & 0 deletions clinica/engine/prov_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
from abc import ABC, abstractmethod
from typing import List

import attr
from attr import define, field

# Define PROV abstract concepts


@define
class ProvContext:
    """Context of a provenance record, held as a list of namespaces."""

    # Entries are read through their ``id`` and ``uri`` attributes when the
    # record is serialized.
    _namespaces: list


@define
class Namespace:
    """Pairing of a namespace prefix with its URI."""

    id: str  # prefix; becomes a key of the serialized "@context" mapping
    uri: str  # value stored under that key in the "@context" mapping


@define
class Identifier:
    """Identifier of a provenance element, carried as a single label."""

    # The label must be a string, or None.
    label: str = field(
        validator=attr.validators.optional(attr.validators.instance_of(str))
    )

    def __repr__(self):
        """Render the identifier as its bare label."""
        return str(self.label)


class ProvElement(ABC):
    """Abstract base class shared by the PROV element types.

    Concrete elements must provide ``uid``; ``attributes`` is optional.
    """

    @property
    @abstractmethod
    def uid(self):
        """id is required for ProvElements"""
        # Raise instead of returning the exception class, which would look
        # like a valid (truthy) value to the caller.
        raise NotImplementedError

    @property
    def attributes(self):
        """attributes are optional"""
        raise NotImplementedError

    @classmethod
    def get_type(cls):
        """Return the name of the element class."""
        # Inside a classmethod ``type(cls)`` is the metaclass (ABCMeta), so
        # the class name has to be read from ``cls`` itself.
        return cls.__name__


class ProvRelation(ABC):
    """Base class for a relation between two provenance elements.

    Only annotations are declared here; no values are assigned.
    """

    id: Identifier  # identifier of the relation
    src: ProvElement  # presumably the element the relation starts from
    dest: ProvElement  # presumably the element the relation points to


# Define PROV Types


@define
class ProvEntity(ProvElement):
    """Provenance Entity element"""

    # uid must be an Identifier; attributes start as a fresh empty dict.
    uid: Identifier = field(validator=[attr.validators.instance_of(Identifier)])
    attributes: dict = field(factory=dict)

    def unstruct(self):
        """Flatten the entity into one dict holding its id and attributes."""
        identity = {"id": str(self.uid)}
        # Attributes are merged last, so an "id" attribute would win.
        return {**identity, **self.attributes}


@define
class ProvActivity(ProvElement):
    """Provenance Activity element"""

    # uid must be an Identifier; attributes start as a fresh empty dict.
    uid: Identifier = field(validator=[attr.validators.instance_of(Identifier)])
    attributes: dict = field(factory=dict)

    def unstruct(self):
        """Flatten the activity into one dict holding its id and attributes."""
        identity = {"id": str(self.uid)}
        # Attributes are merged last, so an "id" attribute would win.
        return {**identity, **self.attributes}


@define
class ProvAgent(ProvElement):
    """Provenance Agent element"""

    # uid must be an Identifier; attributes start as a fresh empty dict.
    uid: Identifier = field(validator=[attr.validators.instance_of(Identifier)])
    attributes: dict = field(factory=dict)

    def unstruct(self):
        """Flatten the agent into one dict holding its id and attributes."""
        identity = {"id": str(self.uid)}
        # Attributes are merged last, so an "id" attribute would win.
        return {**identity, **self.attributes}


@define
class ProvEntry:
    """
    A prov entry in triple form
    """

    subject: ProvElement  # first member of the triple
    predicate: ProvRelation  # relation placed between subject and object
    object: ProvElement  # last member of the triple


@define
class ProvRecord:
    """
    A provenance document containing a PROV context and a list of elements
    """

    context: ProvContext = field()
    # A factory gives every record its own list; a literal ``[]`` default
    # would be one list object shared by all records built without elements.
    elements: List[ProvElement] = field(factory=list)

    def __getitem__(self, idx):
        """Return the first element whose ``uid`` equals ``idx``, else None."""
        return next((elem for elem in self.elements if elem.uid == idx), None)

    def json(self):
        """
        Return the record as a dict ready for json serialization.

        The dict holds the "@context" mapping (namespace id -> uri) followed
        by the unstructured agents, activities and entities of the record.
        """
        json_dict = {
            "@context": {ns.id: ns.uri for ns in self.context._namespaces}
        }

        # Key order is part of the serialized layout: agents, activities, entities.
        sections = (
            ("prov:Agent", ProvAgent),
            ("prov:Activity", ProvActivity),
            ("prov:Entity", ProvEntity),
        )
        for key, element_type in sections:
            json_dict[key] = [
                elem.unstruct()
                for elem in self.elements
                if isinstance(elem, element_type)
            ]
        return json_dict
238 changes: 238 additions & 0 deletions clinica/engine/prov_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
from pathlib import Path
from typing import List, Optional

from clinica.engine.prov_model import (
Identifier,
Namespace,
ProvActivity,
ProvAgent,
ProvContext,
ProvEntity,
ProvRecord,
)


def mint_agent() -> ProvAgent:
    """
    return
    ProvAgent describing the running version of the software
    """
    from clinica import __name__ as software_name
    from clinica import __version__ as software_version

    agent = ProvAgent(uid=generate_agent_id())
    # Same keys, in the same order: version first, then label.
    agent.attributes.update(version=software_version, label=software_name)
    return agent


def mint_activity(agent: ProvAgent, entities: List[ProvEntity]) -> ProvActivity:
    """
    Build the ProvActivity of the current run.

    params:
        agent: agent the activity is associated with; its ``uid`` is recorded,
            so a ProvAgent (not a bare Identifier) is expected
        entities: entities used by the activity

    return
        ProvActivity from related entities and associated agent
    """
    import sys

    # TODO: name and parameters are placeholders until the pipeline
    # information is passed in.
    activity_name = "testfullname"

    new_activity = ProvActivity(uid=generate_activity_id(activity_name))
    new_activity.attributes.update(
        {
            "parameters": "testparameters",
            "label": activity_name,
            # Command-line arguments without the program name.
            "command": sys.argv[1:],
            "used": [str(entity.uid) for entity in entities],
            "wasAssociatedWith": str(agent.uid),
        }
    )
    return new_activity


def mint_entity(path_curr: Path) -> ProvEntity:
    """
    return an Entity object describing the file in path_curr
    """
    entity = ProvEntity(uid=generate_entity_id(path_curr))
    entity.attributes.update(
        {
            "label": path_curr.name,
            "path": str(path_curr),
            # TODO: implement function to return the latest associated activity
            "wasGeneratedBy": get_last_activity(path_curr),
        }
    )
    return entity


def generate_entity_id(path_file: Path) -> Identifier:
    """Build an entity identifier from the file name minus its last suffix."""
    return Identifier(label=path_file.with_suffix("").name)


def generate_activity_id(pipeline_name: str) -> Identifier:
    """Build an activity identifier by prefixing pipeline_name with "clin:"."""
    return Identifier(label="clin:" + pipeline_name)


def generate_agent_id() -> Identifier:
    """Return the fixed identifier used for the software agent."""
    return Identifier(label="RRID:Clinica")


def get_last_activity(path_entity: Path) -> Optional[str]:
    """
    return the identifier (as a string) of the last activity found in the
    provenance record of path_entity, or None when there is no record or the
    record holds no activity
    """
    prov_record = read_prov_jsonld(get_path_prov(path_entity))
    if not prov_record:
        return None

    # TODO: filter activities by date
    activities = [x for x in prov_record.elements if isinstance(x, ProvActivity)]
    if not activities:
        # A record made only of agents/entities has no activity to report;
        # indexing the empty list would raise IndexError.
        return None
    return str(activities[-1].uid)


def get_path_prov(path_entity: Path) -> Optional[Path]:
    """
    return: Path of the provenance file associated with an entity, or None
    when path_entity is not an existing file
    """
    if not path_entity.is_file():
        return None

    # Strip every suffix (e.g. both ".nii" and ".gz") before adding ".jsonld".
    stem_path = path_entity
    while stem_path.suffix:
        stem_path = stem_path.with_suffix("")
    return stem_path.with_suffix(".jsonld")


def create_prov_file(prov_command, prov_path):
    """
    Write the json form of prov_command into the file prov_path

    prov_command must expose a ``json()`` method returning serializable data.
    """
    import json

    with open(prov_path, "w") as handle:
        json.dump(obj=prov_command.json(), fp=handle, indent=4)


def read_prov_jsonld(path_prov: Path) -> Optional[ProvRecord]:
    """
    return: ProvRecord deserialized from the jsonld file at path_prov, or None
    when path_prov is empty or does not exist
    """
    if not path_prov or not path_prov.exists():
        return None
    return deserialize_jsonld(path_prov)


def deserialize_jsonld(path_prov) -> ProvRecord:
    """
    Build a ProvRecord from a provenance file stored in json-ld format.

    params:
        path_prov: location of the json-ld file, handed to rdflib for parsing

    return ProvRecord object from jsonld dictionary data
    """
    import rdflib

    graph = rdflib.Graph(identifier="prov_graph_records")
    manager = graph.namespace_manager

    # Namespaces bound before parsing come from rdflib itself; only the ones
    # added by the file belong to the record context.
    built_in_namespaces = set(manager.namespaces())
    graph.parse(path_prov, format="json-ld")
    json_namespaces = set(manager.namespaces()) - built_in_namespaces

    # NOTE(review): ``n3()`` keeps the angle brackets around the URI — confirm
    # this is the form expected when the context is written back.
    context = ProvContext(
        [Namespace(prefix, uri.n3()) for prefix, uri in json_namespaces]
    )

    element_types = {
        "http://www.w3.org/ns/prov#Activity": ProvActivity,
        "http://www.w3.org/ns/prov#Agent": ProvAgent,
        "http://www.w3.org/ns/prov#Entity": ProvEntity,
    }

    # First pass: create one element per object of a prov type predicate.
    elements = {}
    for _, predicate, obj in graph:
        element_type = element_types.get(str(predicate))
        if element_type is not None:
            uid = Identifier(label=manager.qname(obj))
            elements[uid.label] = element_type(uid)

    # Second pass: attach the remaining triples as attributes of the element
    # named by their subject; blank-node subjects carry no element.
    for subject, predicate, obj in graph:
        if isinstance(subject, rdflib.term.BNode):
            continue
        # qname() omits the "prefix:" part for the empty prefix, in which case
        # the whole qname is the attribute name.
        qualified = manager.qname(predicate).split(":")
        attribute = qualified[1] if len(qualified) > 1 else qualified[0]
        elements[manager.qname(subject)].attributes[attribute] = str(obj)

    return ProvRecord(context=context, elements=list(elements.values()))


def clean_arguments(pipeline_args, file_func):
    """
    Drop from pipeline_args the keys that file_func cannot receive.

    params:
        pipeline_args: dict of candidate keyword arguments (modified in place)
        file_func: callable whose signature decides which keys are kept

    return: pipeline_args itself; left untouched when file_func accepts
    arbitrary keyword arguments (``**kwargs``)
    """
    import inspect

    # getargspec() was removed in Python 3.11; getfullargspec() replaces it
    # and also copes with annotations and keyword-only parameters.
    spec = inspect.getfullargspec(file_func)
    if spec.varkw:
        return pipeline_args

    accepted = set(spec.args) | set(spec.kwonlyargs)
    # Iterate over a snapshot of the keys since the dict shrinks in the loop.
    for key in list(pipeline_args):
        if key not in accepted:
            del pipeline_args[key]
    return pipeline_args


def validate_command(prov_history: ProvRecord, prov_current: ProvRecord) -> bool:
    """
    Check the command is valid on the data being run

    Placeholder: every pair of elements is visited but nothing is compared
    yet, so the result is always True.
    """
    is_compatible = True

    for _past_element in prov_history.elements:
        for _current_element in prov_current.elements:
            # TODO: check that the record entries are compatible with the current entry
            is_compatible = True
    return is_compatible


def is_valid(command: dict) -> bool:
    """Return True when command equals one of the hard-coded allowed mappings."""
    source = ("clin:clinica0.5.0", "clin:adni2Bids")
    target = ("clin:clinica0.5.0", "clin:t1-linear")
    valid_list = [{source: target}]
    return command in valid_list


def write_prov_file(
    list_prov_entries: ProvRecord, path_entity: Path, overwrite=False
) -> None:
    """
    Create provenance file with current pipeline information

    params:
        list_prov_entries: ProvRecord to serialize
        path_entity: path of the prov-associated element
        overwrite: currently unused; the provenance file is always rewritten

    raises:
        FileNotFoundError: when no provenance path can be derived because
            path_entity is not an existing file
    """
    prov_path = get_path_prov(path_entity)
    if prov_path is None:
        # Fail with an explicit message instead of passing None to open().
        raise FileNotFoundError(
            f"Cannot write provenance: {path_entity} is not an existing file"
        )

    create_prov_file(list_prov_entries, prov_path)
Loading