Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Multithreading for Enhanced Performance in Custom Check Processing #284

Merged
merged 6 commits into from
May 6, 2024
82 changes: 55 additions & 27 deletions pyQuARC/code/checker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json

from xmltodict import parse
from concurrent.futures import ThreadPoolExecutor

from .custom_checker import CustomChecker
from .schema_validator import SchemaValidator
Expand Down Expand Up @@ -154,43 +155,70 @@ def _check_dependencies_validity(self, dependencies, field_dict):
return False
return True

def _process_field(
    self,
    func,
    check,
    rule_id,
    metadata_content,
    field_dict,
    result_dict,
    rule_mapping,
):
    """
    Apply one rule to one field specification and record the outcome.

    Args:
        func: The check function, forwarded to `self.custom_checker.run`.
        check: The check definition, forwarded to the scheduler when
            resolving dependencies.
        rule_id: Identifier of the rule being applied.
        metadata_content: The metadata being validated.
        field_dict (dict): Field specification; `field_dict["fields"][0]`
            is the main field, and an optional "data" key overrides the
            rule-level external data.
        result_dict (dict): Accumulator updated in place, keyed by main
            field and then by rule id.
        rule_mapping (dict): Rule configuration; its optional "data" and
            "relation" entries are read here.
    """
    rule_level_data = rule_mapping.get("data", [])
    rule_relation = rule_mapping.get("relation")
    required_rules = self.scheduler.get_all_dependencies(
        rule_mapping, check, field_dict
    )
    primary_field = field_dict["fields"][0]
    # A field-level "data" entry takes precedence over the rule-level one
    data_for_check = field_dict.get("data", rule_level_data)

    # The main field gets an entry even when the rule ends up not applying
    result_dict.setdefault(primary_field, {})

    if not self._check_dependencies_validity(required_rules, field_dict):
        return

    outcome = self.custom_checker.run(
        func, metadata_content, field_dict, data_for_check, rule_relation
    )
    verdict = outcome["valid"]
    self.tracker.update_data(rule_id, primary_field, verdict)

    # A None verdict means the rule was not applied; keep it out of the results
    if verdict is None:
        return

    result_dict[primary_field][rule_id] = outcome

    text = self.build_message(outcome, rule_id)
    if not text:
        return
    outcome["message"] = text
    outcome["remediation"] = self.message(rule_id, "remediation")

def _run_func(self, func, check, rule_id, metadata_content, result_dict):
    """
    Run the check function for `rule_id` and update `result_dict`

    Each field specification configured for the current metadata format is
    handed to `self._process_field` on a worker thread, exactly once.

    Args:
        func: The check function to run.
        check: The check definition, forwarded to `_process_field`.
        rule_id: Identifier of the rule being applied.
        metadata_content: The metadata being validated.
        result_dict (dict): Accumulator updated in place, keyed by main
            field and then by rule id.

    Raises:
        Any exception raised while processing a field. Failures are
        re-raised in configuration order once the pool has drained, instead
        of being silently dropped with the unobserved future.
    """
    rule_mapping = self.rules_override.get(rule_id) or self.rule_mapping.get(
        rule_id
    )
    list_of_fields_to_apply = rule_mapping.get("fields_to_apply").get(
        self.metadata_format, {}
    )

    with ThreadPoolExecutor() as executor:
        submitted = []
        for field_dict in list_of_fields_to_apply:
            # Each task writes into its own dict so that worker threads never
            # mutate the shared result_dict concurrently.
            field_results = {}
            future = executor.submit(
                self._process_field,
                func,
                check,
                rule_id,
                metadata_content,
                field_dict,
                field_results,
                rule_mapping,
            )
            submitted.append((field_results, future))

        # Merge in submission order: the outcome is deterministic and matches
        # what a sequential loop over the fields would have produced.
        for field_results, future in submitted:
            future.result()  # surfaces any exception raised in the worker
            for main_field, rule_results in field_results.items():
                result_dict.setdefault(main_field, {}).update(rule_results)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't running this in parallel lead to missing values? Eg:
result_dict = {}

Running this method in parallel passes the same result_dict to every parallel call. When those calls update it concurrently, wouldn't result_dict end up missing some elements?

I'm not sure whether pass-by-value takes care of the issue. Proper testing (both unit and manual) is required.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran manual unit and integration tests with a curated list of concept IDs; the results obtained from pyQuARC with and without multithreading are exactly the same.


def perform_custom_checks(self, metadata_content):
"""
Expand Down
79 changes: 61 additions & 18 deletions pyQuARC/code/custom_checker.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor


class CustomChecker:
Expand Down Expand Up @@ -103,6 +104,45 @@ def _get_path_value(content_to_validate, path_string):
)
return container

@staticmethod
def _process_argument(
    arg, func, relation, external_data, external_relation, invalid_values, validity
):
    """
    Run `func` on one argument tuple and fold its verdict into the running state.

    Args:
        arg: Iterable of positional values passed to `func` first.
        func: The check function; must return a dict with a "valid" key
            (True, False or None) and optionally a "value" key.
        relation: Extra positional argument, passed only when truthy.
        external_data: Iterable of extra positional arguments; falsy
            entries are dropped.
        external_relation: Extra positional argument, passed only when truthy.
        invalid_values: List that collects the "value" of failed checks;
            appended to in place.
        validity: The validity flag accumulated so far (True, False or None).

    Returns:
        A tuple of (invalid_values, validity) after taking this argument
        into account.
    """
    # Optional extras are appended after the argument values, skipping falsy ones
    extras = filter(None, (relation, *external_data, external_relation))
    outcome = func(*arg, *extras)

    verdict = outcome["valid"]  # can be True, False or None
    if verdict is None:
        # The check did not apply: the running flag is left untouched
        return invalid_values, validity

    if verdict:
        # A pass turns an undecided flag into True but never overrides a failure
        return invalid_values, validity or (validity is None)

    if "value" in outcome:
        invalid_values.append(outcome["value"])
    return invalid_values, False

def run(
self, func, content_to_validate, field_dict, external_data, external_relation
):
Expand Down Expand Up @@ -137,24 +177,27 @@ def run(

invalid_values = []
validity = None
for arg in args:
function_args = [*arg]
function_args.extend(
[
extra_arg
for extra_arg in [relation, *external_data, external_relation]
if extra_arg
]
)
func_return = func(*function_args)
valid = func_return["valid"] # can be True, False or None
if valid is not None:
if valid:
validity = validity or (validity is None)
else:
if "value" in func_return:
invalid_values.append(func_return["value"])
validity = False

# Process arguments using multithreading
with ThreadPoolExecutor() as executor:
future_results = []
for arg in args:
future = executor.submit(
self._process_argument,
arg,
func,
relation,
external_data,
external_relation,
invalid_values,
validity,
)
future_results.append(future)

# Retrieve results from futures
for future in future_results:
invalid_values, validity = future.result()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't nested threading (starting a new thread pool from inside a worker thread of another pool) be an issue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested this manually with a curated list of concept IDs and found no issues at all.


result["valid"] = validity
result["value"] = invalid_values
return result
Loading