Commit
cove_bods/process.py: Code formatting
Ed (ODSC) committed Jan 7, 2025
1 parent 05c484a commit 63cc263
Showing 1 changed file with 123 additions and 84 deletions.
207 changes: 123 additions & 84 deletions cove_bods/process.py
@@ -26,13 +26,8 @@
from libcoveweb2.utils import get_file_type_for_flatten_tool
from libcoveweb2.utils import group_data_list_by

from logging import getLogger

logger = getLogger(__name__)


def check_table_file_new(input_file):
#logger.info(f"{filename} {type(filename)}")
if get_file_type_for_flatten_tool(input_file) == "xlsx":
data = pandas.read_excel(input_file.upload_dir_and_filename())
if "statementID" in data.head():
@@ -42,15 +37,16 @@ def check_table_file_new(input_file):
else:
with open(input_file.upload_dir_and_filename()) as file:
head = file.readline()
if "statementID" in data.head():
if "statementID" in head:
return False
else:
return True


def create_error_file(directory: str, name: str, data: dict):
"""Create temporary error file"""
filename = os.path.join(directory, f"{name}-error.json")
return default_storage.save(filename, ContentFile(json.dumps(data).encode('utf-8')))
return default_storage.save(filename, ContentFile(json.dumps(data).encode("utf-8")))


def error_file_exists(directory: str, name: str) -> bool:
@@ -62,7 +58,7 @@ def error_file_exists(directory: str, name: str) -> bool:
def read_error_file(directory: str, name: str) -> dict:
"""Read data from error file"""
filename = os.path.join(directory, f"{name}-error.json")
return json.loads(default_storage.open(filename).read().decode('utf-8'))
return json.loads(default_storage.open(filename).read().decode("utf-8"))


def delete_error_file(directory: str, name: str):
@@ -86,9 +82,9 @@ def get_context(self):
class SetOrTestSuppliedDataFormat(ProcessDataTask):

map_file_type_to_format = {
'json': 'json',
'xlsx': 'spreadsheet',
'ods': 'spreadsheet'
"json": "json",
"xlsx": "spreadsheet",
"ods": "spreadsheet",
}

def is_processing_applicable(self) -> bool:
@@ -103,10 +99,14 @@ def process(self, process_data: dict) -> dict:
supplied_data_files = SuppliedDataFile.objects.filter(
supplied_data=self.supplied_data
)
all_file_types = [get_file_type_for_flatten_tool(i) for i in supplied_data_files]
all_file_types = [
get_file_type_for_flatten_tool(i) for i in supplied_data_files
]
file_types_reduced = list(set([i for i in all_file_types if i]))
if len(file_types_reduced) == 1:
self.supplied_data.format = self.map_file_type_to_format[file_types_reduced[0]]
self.supplied_data.format = self.map_file_type_to_format[
file_types_reduced[0]
]
self.supplied_data.save()

elif len(file_types_reduced) == 0:
Expand Down Expand Up @@ -206,12 +206,12 @@ def process(self, process_data: dict) -> dict:

if check_table_file_new(supplied_data_json_file):
statement_id_name = "statementID"
schema = config['schema_versions']['0.2']['schema_url']
schema = config["schema_versions"]["0.2"]["schema_url"]
else:
statement_id_name = "statementId"
schema = schema_registry(
config['schema_versions'][config['schema_latest_version']]['schema_url']
).contents("urn:statement")
config["schema_versions"][config["schema_latest_version"]]["schema_url"]
).contents("urn:statement")

unflatten_kwargs = {
"output_name": os.path.join(output_dir, "unflattened.json"),
Expand All @@ -223,8 +223,6 @@ def process(self, process_data: dict) -> dict:
"schema": schema,
}

logger.info(f"{input_filename} {unflatten_kwargs}")

flattentool.unflatten(input_filename, **unflatten_kwargs)

return process_data
@@ -254,9 +252,7 @@ def __init__(
self, supplied_data: SuppliedData, supplied_data_files: List[SuppliedDataFile]
):
super().__init__(supplied_data, supplied_data_files)
self.data_filename = os.path.join(
self.supplied_data.data_dir(), "schema.json"
)
self.data_filename = os.path.join(self.supplied_data.data_dir(), "schema.json")

def is_processing_applicable(self) -> bool:
return True
@@ -266,22 +262,22 @@ def is_processing_needed(self) -> bool:

def process(self, process_data: dict) -> dict:
# Make things and set info for later in processing
process_data['data_reader'] = libcovebods.data_reader.DataReader(
process_data["json_data_filename"], sample_mode=process_data['sample_mode']
process_data["data_reader"] = libcovebods.data_reader.DataReader(
process_data["json_data_filename"], sample_mode=process_data["sample_mode"]
)
process_data['config'] = LibCoveBODSConfig()
process_data["config"] = LibCoveBODSConfig()
try:
process_data['schema'] = SchemaBODS(process_data['data_reader'], process_data['config'])
process_data["schema"] = SchemaBODS(
process_data["data_reader"], process_data["config"]
)
except json.decoder.JSONDecodeError:
raise ValueError("JSON: Data parsing error")
logger.info("Schema version:", process_data['schema'].schema_version)
# Save some to disk for templates
if not os.path.exists(self.data_filename):
save_data = {
"schema_version_used": process_data['schema'].schema_version
}
if (packaging_version.parse(process_data['schema'].schema_version)
< packaging_version.parse("0.4")):
save_data = {"schema_version_used": process_data["schema"].schema_version}
if packaging_version.parse(
process_data["schema"].schema_version
) < packaging_version.parse("0.4"):
save_data["record_schema_used"] = False
else:
save_data["record_schema_used"] = True
@@ -327,7 +323,9 @@ def is_processing_applicable(self) -> bool:
def is_processing_needed(self) -> bool:
if os.path.exists(self.xlsx_filename):
return False
if error_file_exists(self.supplied_data.storage_dir(), "ConvertJSONIntoSpreadsheets"):
if error_file_exists(
self.supplied_data.storage_dir(), "ConvertJSONIntoSpreadsheets"
):
return False
return True

@@ -339,13 +337,16 @@ def process(self, process_data: dict) -> dict:

os.makedirs(self.output_dir, exist_ok=True)

if os.path.isdir(process_data['schema'].pkg_schema_url):
schema = schema_registry(process_data['schema'].pkg_schema_url).contents("urn:statement")
if os.path.isdir(process_data["schema"].pkg_schema_url):
schema = schema_registry(process_data["schema"].pkg_schema_url).contents(
"urn:statement"
)
else:
schema = process_data['schema'].pkg_schema_url
schema = process_data["schema"].pkg_schema_url

if (packaging_version.parse(process_data['schema'].schema_version)
< packaging_version.parse("0.4")):
if packaging_version.parse(
process_data["schema"].schema_version
) < packaging_version.parse("0.4"):
statement_id_name = "statementID"
else:
statement_id_name = "statementId"
@@ -363,9 +364,14 @@ def process(self, process_data: dict) -> dict:
flattentool.flatten(process_data["json_data_filename"], **flatten_kwargs)
except Exception as err:
capture_exception(err)
create_error_file(self.supplied_data.storage_dir(), "ConvertJSONIntoSpreadsheets",
{"type": type(err).__name__,
"filename": process_data["json_data_filename"].split('/')[-1]})
create_error_file(
self.supplied_data.storage_dir(),
"ConvertJSONIntoSpreadsheets",
{
"type": type(err).__name__,
"filename": process_data["json_data_filename"].split("/")[-1],
},
)

return process_data

@@ -382,10 +388,15 @@ def get_context(self):
context["download_xlsx_size"] = os.stat(self.xlsx_filename).st_size
else:
context["can_download_xlsx"] = False
if error_file_exists(self.supplied_data.storage_dir(), "ConvertJSONIntoSpreadsheets"):
context["xlsx_error"] = read_error_file(self.supplied_data.storage_dir(),
"ConvertJSONIntoSpreadsheets")
delete_error_file(self.supplied_data.storage_dir(), "ConvertJSONIntoSpreadsheets")
if error_file_exists(
self.supplied_data.storage_dir(), "ConvertJSONIntoSpreadsheets"
):
context["xlsx_error"] = read_error_file(
self.supplied_data.storage_dir(), "ConvertJSONIntoSpreadsheets"
)
delete_error_file(
self.supplied_data.storage_dir(), "ConvertJSONIntoSpreadsheets"
)
else:
context["xlsx_error"] = False
# done!
@@ -398,58 +409,85 @@ class PythonValidateTask(TaskWithState):

def process_get_state(self, process_data: dict) -> dict:
context = libcovebods.run_tasks.process_additional_checks(
process_data['data_reader'],
process_data['config'],
process_data['schema'],
task_classes=libcovebods.run_tasks.TASK_CLASSES_IN_SAMPLE_MODE if
process_data["sample_mode"] else libcovebods.run_tasks.TASK_CLASSES
process_data["data_reader"],
process_data["config"],
process_data["schema"],
task_classes=libcovebods.run_tasks.TASK_CLASSES_IN_SAMPLE_MODE
if process_data["sample_mode"]
else libcovebods.run_tasks.TASK_CLASSES,
)

# counts
context["additional_checks_count"] = len(context["additional_checks"])

# We need to calculate some stats for showing in the view
total_ownership_or_control_interest_statements = 0
for key, count in \
context['statistics']['count_ownership_or_control_statement_interest_statement_types'].items():
for key, count in context["statistics"][
"count_ownership_or_control_statement_interest_statement_types"
].items():
total_ownership_or_control_interest_statements += count
context['statistics'][
'count_ownership_or_control_interest_statement'] = total_ownership_or_control_interest_statements # noqa
context["statistics"][
"count_ownership_or_control_interest_statement"
] = total_ownership_or_control_interest_statements # noqa

# The use of r_e_type is to stop flake8 complaining about line length
r_e_type = 'registeredEntity'
context['statistics']['count_entities_registeredEntity_legalEntity_with_any_identifier'] = (
context['statistics']['count_entity_statements_types_with_any_identifier'][r_e_type] +
context['statistics']['count_entity_statements_types_with_any_identifier']['legalEntity'])
context['statistics']['count_entities_registeredEntity_legalEntity_with_any_identifier_with_id_and_scheme'] = (
context['statistics']['count_entity_statements_types_with_any_identifier_with_id_and_scheme'][
r_e_type] +
context['statistics']['count_entity_statements_types_with_any_identifier_with_id_and_scheme'][
'legalEntity'])
context['statistics']['count_entities_registeredEntity_legalEntity'] = (
context['statistics']['count_entity_statements_types'][r_e_type] +
context['statistics']['count_entity_statements_types']['legalEntity'])
unknown_schema_version_used = \
[i for i in context['additional_checks'] if i['type'] == 'unknown_schema_version_used']
context['unknown_schema_version_used'] = unknown_schema_version_used[0] \
if unknown_schema_version_used else None
context['inconsistent_schema_version_used_count'] = \
len([i for i in context['additional_checks'] if i['type'] == 'inconsistent_schema_version_used'])

context['checks_not_run_in_sample_mode'] = []
r_e_type = "registeredEntity"
context["statistics"][
"count_entities_registeredEntity_legalEntity_with_any_identifier"
] = (
context["statistics"]["count_entity_statements_types_with_any_identifier"][
r_e_type
]
+ context["statistics"][
"count_entity_statements_types_with_any_identifier"
]["legalEntity"]
)
context["statistics"][
"count_entities_registeredEntity_legalEntity_with_any_identifier_with_id_and_scheme"
] = (
context["statistics"][
"count_entity_statements_types_with_any_identifier_with_id_and_scheme"
][r_e_type]
+ context["statistics"][
"count_entity_statements_types_with_any_identifier_with_id_and_scheme"
]["legalEntity"]
)
context["statistics"]["count_entities_registeredEntity_legalEntity"] = (
context["statistics"]["count_entity_statements_types"][r_e_type]
+ context["statistics"]["count_entity_statements_types"]["legalEntity"]
)
unknown_schema_version_used = [
i
for i in context["additional_checks"]
if i["type"] == "unknown_schema_version_used"
]
context["unknown_schema_version_used"] = (
unknown_schema_version_used[0] if unknown_schema_version_used else None
)
context["inconsistent_schema_version_used_count"] = len(
[
i
for i in context["additional_checks"]
if i["type"] == "inconsistent_schema_version_used"
]
)

context["checks_not_run_in_sample_mode"] = []
if process_data["sample_mode"]:
classes_not_run_in_sample_mode = [
x for x in libcovebods.run_tasks.TASK_CLASSES
x
for x in libcovebods.run_tasks.TASK_CLASSES
if x not in libcovebods.run_tasks.TASK_CLASSES_IN_SAMPLE_MODE
]
for class_not_run_in_sample_mode in classes_not_run_in_sample_mode:
context['checks_not_run_in_sample_mode'].extend(
context["checks_not_run_in_sample_mode"].extend(
class_not_run_in_sample_mode.get_additional_check_types_possible(
process_data['config'],
process_data['schema']
process_data["config"], process_data["schema"]
)
)
context['checks_not_run_in_sample_mode'] = list(set(context['checks_not_run_in_sample_mode']))
context["checks_not_run_in_sample_mode"] = list(
set(context["checks_not_run_in_sample_mode"])
)

return context, process_data

@@ -459,18 +497,19 @@ class JsonSchemaValidateTask(TaskWithState):
state_filename: str = "jsonschema_validate.json"

def process_get_state(self, process_data: dict) -> dict:
worker = JSONSchemaValidator(process_data['schema'])
worker = JSONSchemaValidator(process_data["schema"])

# Get list of validation errors
validation_errors = worker.validate(process_data['data_reader'])
validation_errors = worker.validate(process_data["data_reader"])
validation_errors = [i.json() for i in validation_errors]

# Context
context = {
"validation_errors_count": len(validation_errors),
"validation_errors": group_data_list_by(
validation_errors, lambda i: i["validator"] + str(i['path_ending']) + i["message"]
)
validation_errors,
lambda i: i["validator"] + str(i["path_ending"]) + i["message"],
),
}

return context, process_data
@@ -481,9 +520,9 @@ class AdditionalFieldsChecksTask(TaskWithState):
state_filename: str = "additional_fields.json"

def process_get_state(self, process_data: dict) -> dict:
worker = AdditionalFields(process_data['schema'])
worker = AdditionalFields(process_data["schema"])

output = worker.process(process_data['data_reader'])
output = worker.process(process_data["data_reader"])
context = {"additional_fields": output}
context["any_additional_fields_exist"] = len(output) > 0
