Skip to content

Commit

Permalink
Call data dims when entry name is present
Browse files Browse the repository at this point in the history
  • Loading branch information
domna committed Jul 22, 2024
1 parent 619095a commit a1756ad
Showing 1 changed file with 88 additions and 70 deletions.
158 changes: 88 additions & 70 deletions src/pynxtools/dataconverter/readers/multi/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
logger.setLevel(logging.INFO)


def fill_wildcard_data_indices(config_file_dict, callbacks):
def fill_wildcard_data_indices(config_file_dict, key, value, dims):
"""
Replaces the wildcard data indices (*) with the respective dimension entries.
"""
Expand All @@ -47,22 +47,17 @@ def get_vals_on_same_level(key):
if k.startswith(key):
yield v

dims = callbacks.dims(callbacks.entry_name)
dim_dict = {}
for dim in dims:
new_key = key.replace("*", dim)
new_val = value.replace("*", dim)

for key, value in list(config_file_dict.items()):
if "*" not in key:
continue
for dim in dims:
new_key = key.replace("*", dim)
new_val = value.replace("*", dim)
if new_key not in config_file_dict and new_val not in get_vals_on_same_level(
new_key
):
dim_dict[new_key] = new_val

if (
new_key not in config_file_dict
and new_val not in get_vals_on_same_level(new_key)
):
config_file_dict[new_key] = new_val

del config_file_dict[key]
return dim_dict


class ParseJsonCallbacks:
Expand All @@ -86,15 +81,15 @@ class ParseJsonCallbacks:

special_key_map: Dict[str, Callable[[str], Any]]
entry_name: str
dims: Callable[[], List[str]]
dims: Callable[[str], List[str]]

def __init__(
self,
attrs_callback: Optional[Callable[[str], Any]] = None,
data_callback: Optional[Callable[[str], Any]] = None,
link_callback: Optional[Callable[[str], Any]] = None,
eln_callback: Optional[Callable[[str], Any]] = None,
dims: Optional[Callable[[], List[str]]] = None,
dims: Optional[Callable[[str], List[str]]] = None,
entry_name: str = "entry",
):
self.special_key_map = {
Expand All @@ -104,7 +99,7 @@ def __init__(
"eln": eln_callback if eln_callback is not None else self.identity,
}

self.dims = dims if dims is not None else lambda: []
self.dims = dims if dims is not None else lambda _: []
self.entry_name = entry_name

def link_callback(self, value: str) -> Dict[str, Any]:
Expand All @@ -120,15 +115,13 @@ def apply_special_key(self, precursor, key, value):
return self.special_key_map.get(precursor, self.identity)(value)


def parse_json_config(
file_path: str,
entry_names: List[str],
callbacks: Optional[ParseJsonCallbacks] = None,
) -> dict:
"""
Parses a json file and returns the data as a dictionary.
"""

def resolve_special_keys(
new_entry_dict: Dict[str, Any],
key: str,
value: Any,
optional_groups_to_remove: List[str],
callbacks: ParseJsonCallbacks,
) -> None:
def try_convert(value: str) -> Union[str, float, int, bool]:
"""
Try to convert the value to float, int or bool.
Expand All @@ -144,12 +137,66 @@ def try_convert(value: str) -> Union[str, float, int, bool]:

return value

if not isinstance(value, str) or ":" not in value:
new_entry_dict[key] = value
return

prefix_part, value = value.split(":", 1)
prefixes = re.findall(r"@(\w+)", prefix_part)

if prefix_part.startswith("!"):
optional_groups_to_remove.append(key.rsplit("/", 1)[0])
optional_groups_to_remove.append(key)
prefix_part = prefix_part[1:]

for prefix in prefixes:
new_entry_dict[key] = callbacks.apply_special_key(prefix, key, value)

# We found a match. Stop resolving other prefixes.
if new_entry_dict[key] is not None:
break

last_value = prefix_part.rsplit(",", 1)[-1]
if new_entry_dict[key] is None and last_value[0] not in ("@", "!"):
new_entry_dict[key] = try_convert(last_value)

if prefix_part.startswith("!") and new_entry_dict[key] is None:
group_to_delete = key.rsplit("/", 1)[0]
logger.info(
f"Main element {key} not provided. "
f"Removing the parent group {group_to_delete}."
)
optional_groups_to_remove.append(group_to_delete)
return

if prefixes and new_entry_dict[key] is None:
del new_entry_dict[key]
logger.warning(
f"Could not find value for key {key} with value {value}.\n"
f"Tried prefixes: {prefixes}."
)

# after filling, resolve links again:
if isinstance(new_entry_dict.get(key), str) and new_entry_dict[key].startswith(
"@link:"
):
new_entry_dict[key] = {"link": new_entry_dict[key][6:]}


def parse_json_config(
file_path: str,
entry_names: List[str],
callbacks: Optional[ParseJsonCallbacks] = None,
) -> dict:
"""
Parses a json file and returns the data as a dictionary.
"""

if callbacks is None:
# Use default callbacks if none are explicitly provided
callbacks = ParseJsonCallbacks()

config_file_dict = parse_flatten_json(file_path)
fill_wildcard_data_indices(config_file_dict, callbacks)

optional_groups_to_remove: List[str] = []
new_entry_dict = {}
Expand All @@ -165,50 +212,21 @@ def try_convert(value: str) -> Union[str, float, int, bool]:
if key.rsplit("/", 1)[0] in optional_groups_to_remove:
continue

if not isinstance(value, str) or ":" not in value:
new_entry_dict[key] = value
continue

prefix_part, value = value.split(":", 1)
prefixes = re.findall(r"@(\w+)", prefix_part)

if prefix_part.startswith("!"):
optional_groups_to_remove.append(key.rsplit("/", 1)[0])
optional_groups_to_remove.append(key)
prefix_part = prefix_part[1:]

for prefix in prefixes:
new_entry_dict[key] = callbacks.apply_special_key(prefix, key, value)

# We found a match. Stop resolving other prefixes.
if new_entry_dict[key] is not None:
break

last_value = prefix_part.rsplit(",", 1)[-1]
if new_entry_dict[key] is None and last_value[0] not in ("@", "!"):
new_entry_dict[key] = try_convert(last_value)

if prefix_part.startswith("!") and new_entry_dict[key] is None:
group_to_delete = key.rsplit("/", 1)[0]
logger.info(
f"Main element {key} not provided. "
f"Removing the parent group {group_to_delete}."
if "*" in key:
dims = callbacks.dims(value)
dim_data = fill_wildcard_data_indices(
config_file_dict, key, value, dims
)
optional_groups_to_remove.append(group_to_delete)
for k, v in dim_data.items():
resolve_special_keys(
dim_data, k, v, optional_groups_to_remove, callbacks
)
new_entry_dict.update(dim_data)
continue

if prefixes and new_entry_dict[key] is None:
del new_entry_dict[key]
logger.warning(
f"Could not find value for key {key} with value {value}.\n"
f"Tried prefixes: {prefixes}."
)

# after filling, resolve links again:
if isinstance(new_entry_dict.get(key), str) and new_entry_dict[
key
].startswith("@link:"):
new_entry_dict[key] = {"link": new_entry_dict[key][6:]}
resolve_special_keys(
new_entry_dict, key, value, optional_groups_to_remove, callbacks
)

return new_entry_dict

Expand Down Expand Up @@ -270,7 +288,7 @@ def get_eln_data(self, path: str) -> Any:
"""
return {}

def get_data_dims(self) -> List[str]:
def get_data_dims(self, path: str) -> List[str]:
"""
Returns the dimensions of the data from the given path.
"""
Expand Down

0 comments on commit a1756ad

Please sign in to comment.