Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Multithreading for Enhanced Performance in Custom Check Processing #284

Merged
merged 6 commits into from
May 6, 2024
93 changes: 66 additions & 27 deletions pyQuARC/code/checker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json

from xmltodict import parse
from concurrent.futures import ThreadPoolExecutor, as_completed

from .custom_checker import CustomChecker
from .schema_validator import SchemaValidator
Expand Down Expand Up @@ -154,43 +155,81 @@ def _check_dependencies_validity(self, dependencies, field_dict):
return False
return True

def _process_field(
self,
func,
check,
rule_id,
metadata_content,
field_dict,
result_dict,
rule_mapping,
):
"""
Process a single field according to the given rule and update result_dict
"""
external_data = rule_mapping.get("data", [])
relation = rule_mapping.get("relation")
dependencies = self.scheduler.get_all_dependencies(
rule_mapping, check, field_dict
)
main_field = field_dict["fields"][0]
external_data = field_dict.get("data", external_data)
result_dict.setdefault(main_field, {})

if not self._check_dependencies_validity(dependencies, field_dict):
return

result = self.custom_checker.run(
func, metadata_content, field_dict, external_data, relation
)

self.tracker.update_data(rule_id, main_field, result["valid"])

# Avoid adding null valid results for rules that are not applied
if result["valid"] is None:
return

result_dict[main_field][rule_id] = result

message = self.build_message(result, rule_id)
if message:
result["message"] = message
result["remediation"] = self.message(rule_id, "remediation")

def _run_func(self, func, check, rule_id, metadata_content, result_dict):
"""
Run the check function for `rule_id` and update `result_dict`
"""
rule_mapping = self.rules_override.get(rule_id) or self.rule_mapping.get(
rule_id
)
external_data = rule_mapping.get("data", [])
relation = rule_mapping.get("relation")
list_of_fields_to_apply = rule_mapping.get("fields_to_apply").get(
self.metadata_format, {}
)

for field_dict in list_of_fields_to_apply:
dependencies = self.scheduler.get_all_dependencies(
rule_mapping, check, field_dict
)
main_field = field_dict["fields"][0]
external_data = field_dict.get("data", external_data)
result_dict.setdefault(main_field, {})
if not self._check_dependencies_validity(dependencies, field_dict):
continue
result = self.custom_checker.run(
func, metadata_content, field_dict, external_data, relation
)

self.tracker.update_data(rule_id, main_field, result["valid"])

# this is to avoid "valid" = null in the result, for rules that are not applied
if result["valid"] is None:
continue
result_dict[main_field][rule_id] = result

message = self.build_message(result, rule_id)
if message:
result["message"] = message
result["remediation"] = self.message(rule_id, "remediation")
with ThreadPoolExecutor(max_workers=5) as executor:
futures = []
for field_dict in list_of_fields_to_apply:
future = executor.submit(
self._process_field,
func,
check,
rule_id,
metadata_content,
field_dict,
result_dict,
rule_mapping,
)
futures.append(future)

# Wait for all futures to complete
for future in as_completed(futures):
# Retrieve the result or raise an exception if an error occurred
try:
future.result()
except Exception as e:
# Handle the exception from the thread
raise e

def perform_custom_checks(self, metadata_content):
"""
Expand Down
75 changes: 57 additions & 18 deletions pyQuARC/code/custom_checker.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed


class CustomChecker:
Expand Down Expand Up @@ -103,6 +104,33 @@ def _get_path_value(content_to_validate, path_string):
)
return container

@staticmethod
def _process_argument(arg, func, relation, external_data, external_relation):
"""
Process the argument by calling the provided function with the given arguments.

Args:
arg: The argument to be processed.
func: The function to be called.
relation: The relation argument.
external_data: The external data argument.
external_relation: The external relation argument.

Returns:
A dict containing the updated invalid_values list and the updated validity flag.
"""

function_args = [*arg]
function_args.extend(
[
extra_arg
for extra_arg in [relation, *external_data, external_relation]
if extra_arg
]
)
func_return = func(*function_args)
return func_return

def run(
self, func, content_to_validate, field_dict, external_data, external_relation
):
Expand Down Expand Up @@ -137,24 +165,35 @@ def run(

invalid_values = []
validity = None
for arg in args:
function_args = [*arg]
function_args.extend(
[
extra_arg
for extra_arg in [relation, *external_data, external_relation]
if extra_arg
]
)
func_return = func(*function_args)
valid = func_return["valid"] # can be True, False or None
if valid is not None:
if valid:
validity = validity or (validity is None)
else:
if "value" in func_return:
invalid_values.append(func_return["value"])
validity = False

# Process arguments using multithreading
with ThreadPoolExecutor() as executor:
future_results = []
for arg in args:
future = executor.submit(
self._process_argument,
arg,
func,
relation,
external_data,
external_relation,
)
future_results.append(future)

# Retrieve results from futures
for future in as_completed(future_results):
try:
func_return = future.result()
valid = func_return["valid"] # can be True, False or None
if valid is not None:
if valid:
validity = validity or (validity is None)
else:
if "value" in func_return:
invalid_values.append(func_return["value"])
validity = False
except Exception as e:
raise e
result["valid"] = validity
result["value"] = invalid_values
return result
Loading