From 5eb9e4d6fe51baf52e344e752d1bd10bea052d21 Mon Sep 17 00:00:00 2001 From: ishitas <ishitas@ucar.edu> Date: Wed, 18 Sep 2024 21:41:45 +0000 Subject: [PATCH] Added logging statements and modified tests accordingly --- metcalcpy/agg_eclv.py | 5 +- metcalcpy/agg_stat.py | 3 +- metcalcpy/agg_stat_bootstrap.py | 92 ++++++++- metcalcpy/agg_stat_eqz.py | 57 +++++- metcalcpy/agg_stat_event_equalize.py | 46 ++++- metcalcpy/bootstrap.py | 133 +++++++++---- metcalcpy/calc_difficulty_index.py | 57 +++++- metcalcpy/event_equalize.py | 23 ++- metcalcpy/event_equalize_against_values.py | 21 +- metcalcpy/logging_config.py | 2 +- metcalcpy/piecewise_linear.py | 7 +- metcalcpy/scorecard.py | 221 +++++++++++++-------- metcalcpy/sum_stat.py | 57 ++++-- metcalcpy/validate_mv_python.py | 21 ++ test/ecnt_agg_stat.yaml | 2 +- test/test_agg_eclv.py | 2 +- test/test_agg_ratio.py | 5 +- test/test_calc_difficulty_index.py | 6 +- test/test_event_equalize.py | 2 +- test/test_event_equalize_against_values.py | 2 +- test/test_scorecard.py | 4 +- test/val1l2_agg_stat.yaml | 2 +- test/vcnt_agg_stat.yaml | 2 +- test/vl1l2_agg_stat_met_v12.yaml | 2 +- 24 files changed, 581 insertions(+), 193 deletions(-) diff --git a/metcalcpy/agg_eclv.py b/metcalcpy/agg_eclv.py index 8c3e8cd3..27567a85 100644 --- a/metcalcpy/agg_eclv.py +++ b/metcalcpy/agg_eclv.py @@ -207,7 +207,8 @@ def _get_bootstrapped_stats(self, series_data, thresholds): ci_method=self.params['method'], save_data=False, block_length=block_length, - eclv=True + eclv=True, + logger=logger ) logger.info(f"Bootstrapped statistics calculated for threshold {thresh}.") except KeyError as err: @@ -342,7 +343,7 @@ def calculate_stats_and_ci(self): self.input_data = event_equalize(self.input_data, 'stat_name', self.params['series_val_1'], fix_vals_keys, - fix_vals_permuted_list, is_equalize_by_indep, False) + fix_vals_permuted_list, is_equalize_by_indep, False, logger) logger.debug("Event equalization completed.") # Process data to calculate statistics 
diff --git a/metcalcpy/agg_stat.py b/metcalcpy/agg_stat.py index 2d04f2e1..0df3199d 100644 --- a/metcalcpy/agg_stat.py +++ b/metcalcpy/agg_stat.py @@ -1166,7 +1166,8 @@ def _get_bootstrapped_stats(self, series_data, axis="1"): num_threads=self.params['num_threads'], ci_method=self.params['method'], save_data=has_derived_series, - block_length=block_length + block_length=block_length, + logger=logger ) logger.info("Bootstrapping and CI calculation completed.") diff --git a/metcalcpy/agg_stat_bootstrap.py b/metcalcpy/agg_stat_bootstrap.py index 10ffaeb6..ecec9782 100644 --- a/metcalcpy/agg_stat_bootstrap.py +++ b/metcalcpy/agg_stat_bootstrap.py @@ -49,7 +49,7 @@ from metcalcpy.util.mode_3d_volrat_statistics import * from metcalcpy.util.mode_3d_ratio_statistics import * from metcalcpy.util.utils import is_string_integer, parse_bool, sort_data, is_string_strictly_float - +from metcalcpy.logging_config import setup_logging class AggStatBootstrap: """A class that performs aggregation statistic logic fot MODE and MTD ratio statistics on input data frame. 
@@ -68,7 +68,9 @@ def __init__(self, in_params): Args: in_params - input parameters as a dictionary """ - + self.logger = setup_logging(in_params) + logger = self.logger + logger.debug("Initializing AggStatBootstrap with parameters.") self.statistic = None self.derived_name_to_values = {} self.params = in_params @@ -90,40 +92,54 @@ def _init_out_frame(self, series_fields, series): Returns: pandas data frame """ + logger = self.logger + logger.debug("Initializing output data frame.") result = pd.DataFrame() row_number = len(series) + logger.debug(f"Number of rows to initialize: {row_number}") # fill series variables and values for field_ind, field in enumerate(series_fields): result[field] = [row[field_ind] for row in series] - + logger.debug(f"Field '{field}' initialized with {len(result[field])} entries.") # fill the stats and CI values placeholders with None result['fcst_var'] = [None] * row_number result['stat_value'] = [None] * row_number result['stat_btcl'] = [None] * row_number result['stat_btcu'] = [None] * row_number result['nstats'] = [None] * row_number + + logger.debug("Stats and confidence interval placeholders added.") + logger.debug(f"DataFrame initialized with columns: {result.columns.tolist()}") + return result def _proceed_with_axis(self, axis="1"): + + logger = self.logger + logger.info(f"Proceeding with axis: {axis}") if not self.input_data.empty: # identify all possible points values by adding series values, indy values # and statistics and then permute them + logger.debug("Input data is not empty. 
Proceeding with calculations.") indy_vals = self.params['indy_vals'] series_val = self.params['series_val_' + axis] all_fields_values = series_val.copy() all_fields_values[self.params['indy_var']] = indy_vals all_fields_values['stat_name'] = self.params['list_stat_' + axis] all_points = list(itertools.product(*all_fields_values.values())) + logger.debug(f"All points generated: {len(all_points)} points created for axis {axis}.") fcst_var = None if len(self.params['fcst_var_val_' + axis]) > 0 and 'fcst_var' in self.input_data.columns: fcst_var = list(self.params['fcst_var_val_' + axis].keys())[0] - + logger.debug(f"Forecast variable identified: {fcst_var}") cases = [] out_frame = self._init_out_frame(all_fields_values.keys(), all_points) + logger.debug(f"Output DataFrame initialized with {len(out_frame)} rows.") point_to_distrib = {} # run the bootstrap flow for each independent variable value for indy_val in indy_vals: + logger.debug(f"Processing independent value: {indy_val}") # extract the records for the current indy value if is_string_integer(indy_val): filtered_by_indy_data = \ @@ -138,6 +154,7 @@ def _proceed_with_axis(self, axis="1"): all_fields_values = series_val.copy() all_points = list(itertools.product(*all_fields_values.values())) + logger.debug(f"Number of points for independent value '{indy_val}': {len(all_points)}.") for point in all_points: all_filters = [] @@ -164,6 +181,7 @@ def _proceed_with_axis(self, axis="1"): # use numpy to select the rows where any record evaluates to True mask = np.array(all_filters).all(axis=0) point_data = filtered_by_indy_data.loc[mask] + logger.debug(f"Point data filtered for point {point}. 
Number of records: {len(point_data)}") # build a list of cases to sample fcst_valid = point_data.loc[:, 'fcst_valid'].astype(str) @@ -174,6 +192,7 @@ def _proceed_with_axis(self, axis="1"): # calculate bootstrap for cases for stat_upper in self.params['list_stat_' + axis]: self.statistic = stat_upper.lower() + logger.debug(f"Calculating bootstrap for statistic: {self.statistic}") for point in all_points: all_filters = [] out_frame_filter = [] @@ -198,6 +217,7 @@ def _proceed_with_axis(self, axis="1"): mask_out_frame = np.array(out_frame_filter).all(axis=0) point_data = filtered_by_indy_data.loc[mask] bootstrap_results = self._get_bootstrapped_stats(point_data, cases) + logger.debug(f"Bootstrap results calculated for point {point}: {bootstrap_results.value}") # save bootstrap results point_to_distrib[point] = bootstrap_results n_stats = len(point_data) @@ -214,19 +234,32 @@ def _proceed_with_axis(self, axis="1"): out_frame.loc[index, 'stat_btcl'] = bootstrap_results.lower_bound out_frame.loc[index, 'stat_btcu'] = bootstrap_results.upper_bound out_frame.loc[index, 'nstats'] = n_stats + logger.debug(f"Results saved to output DataFrame at index {index} for point {point}.") else: out_frame = pd.DataFrame() + logger.warning("Input data is empty. Returning an empty DataFrame.") + + logger.info(f"Completed processing for axis: {axis}") return out_frame def _get_bootstrapped_stats(self, series_data, cases): + logger = self.logger + logger.info("Starting bootstrapping process.") + + logger.debug("Sorting series data.") self.series_data = sort_data(series_data) + logger.debug(f"Data sorted. Number of rows: {len(self.series_data)}") if self.params['num_iterations'] == 1: + logger.info("Only one iteration specified. 
Skipping bootstrapping.") stat_val = self._calc_stats(cases)[0] + logger.debug(f"Statistic calculated: {stat_val}") results = BootstrapResults(lower_bound=None, value=stat_val, upper_bound=None) + logger.info("Statistic calculated without bootstrapping.") else: - # need bootstrapping and CI calculation in addition to statistic + # need bootstrapping and CI calculation in addition to statistic + logger.info("Performing bootstrapping and confidence interval calculation.") try: results = bootstrap_and_value_mode( self.series_data, @@ -234,11 +267,15 @@ stat_func=self._calc_stats, num_iterations=self.params['num_iterations'], num_threads=self.params['num_threads'], - ci_method=self.params['method']) - + ci_method=self.params['method'], + logger=logger) + logger.debug("Bootstrapping completed successfully.") except KeyError as err: + logger.error(f"Error during bootstrapping: {err}", exc_info=True) results = BootstrapResults(None, None, None) + logger.info("Returning empty BootstrapResults due to error.") print(err) + logger.info("Bootstrapping process completed.") return results def _calc_stats(self, cases): @@ -253,23 +290,44 @@ an error """ + logger = self.logger func_name = f'calculate_{self.statistic}' + logger.info(f"Starting statistic calculation using function: {func_name}") if cases is not None and cases.ndim == 2: # The single value case + logger.debug("Processing single-value case.") # build a data frame with the sampled data data_cases = np.asarray(self.series_data['case']) flat_cases = cases.flatten() values = self.series_data[np.in1d(data_cases, flat_cases)].to_numpy() - stat_values = [globals()[func_name](values, self.column_names)] + logger.debug(f"Number of values selected for single case: {len(values)}") + # Calculate the statistic for the single-value case + try: + stat_value = globals()[func_name](values, self.column_names) + stat_values = [stat_value] + 
logger.info(f"Statistic calculated for bootstrap iteration: {stat_value}") + except Exception as e: + logger.error(f"Error calculating statistic for bootstrap iteration: {e}", exc_info=True) + raise + elif cases is not None and cases.ndim == 3: # bootstrapped case stat_values = [] for row in cases: values_ind = self.series_data['case'].isin(row.flatten()) values = self.series_data[values_ind] - stat_values.append([globals()[func_name](values, self.column_names)]) + logger.debug(f"Number of values selected for bootstrap iteration: {len(values)}") + # Calculate the statistic for each bootstrap iteration + try: + stat_value = globals()[func_name](values, self.column_names) + stat_values.append([stat_value]) + logger.info(f"Statistic calculated for bootstrap iteration: {stat_value}") + except Exception as e: + logger.error(f"Error calculating statistic for bootstrap iteration: {e}", exc_info=True) + raise else: + logger.error("Invalid input for cases. Cannot calculate statistic.") raise KeyError("can't calculate statistic") return stat_values @@ -277,34 +335,48 @@ def calculate_values(self): """ Performs EE if needed followed by aggregation statistic logic Writes output data to the file """ + logger = self.logger + logger.info("Starting calculation of values.") if not self.input_data.empty: + logger.debug("Input data is not empty. Proceeding with calculations.") if self.params['random_seed'] is not None and self.params['random_seed'] != 'None': + logger.debug(f"Random seed set to: {self.params['random_seed']}") np.random.seed(self.params['random_seed']) # perform EE if needed is_event_equal = parse_bool(self.params['event_equal']) if is_event_equal: + logger.info("Event equalization required. 
Performing event equalization.") self._perform_event_equalization() + logger.debug("Event equalization completed.") # build the case information for each record + logger.debug("Building case information for each record.") fcst_valid = self.input_data.loc[:, 'fcst_valid'].astype(str) indy_var = self.input_data.loc[:, self.params['indy_var']].astype(str) self.input_data['case'] = fcst_valid + '#' + indy_var + logger.debug("Case information added to the input data.") # get results for axis1 + logger.info("Calculating results for axis 1.") out_frame = self._proceed_with_axis("1") if self.params['series_val_2']: + logger.info("Series values for axis 2 detected. Calculating results for axis 2.") out_frame = pd.concat([out_frame, self._proceed_with_axis("2")]) + logger.debug("Results for axis 2 calculated and combined with axis 1.") else: + logger.warning("Input data is empty. Returning an empty DataFrame.") out_frame = pd.DataFrame() header = True mode = 'w' - + logger.info(f"Exporting results to {self.params['agg_stat_output']}") export_csv = out_frame.to_csv(self.params['agg_stat_output'], index=None, header=header, mode=mode, sep="\t", na_rep="NA") + logger.info("Results successfully exported to CSV.") + def _perform_event_equalization(self): """ Performs event equalisation on input data diff --git a/metcalcpy/agg_stat_eqz.py b/metcalcpy/agg_stat_eqz.py index 0102389e..fb3c38e2 100644 --- a/metcalcpy/agg_stat_eqz.py +++ b/metcalcpy/agg_stat_eqz.py @@ -41,7 +41,7 @@ from metcalcpy import GROUP_SEPARATOR from metcalcpy.event_equalize_against_values import event_equalize_against_values from metcalcpy.util.utils import parse_bool - +from metcalcpy.logging_config import setup_logging class AggStatEventEqz: """A class that performs event equalisation logic on input data @@ -58,6 +58,9 @@ class AggStatEventEqz: """ def __init__(self, in_params): + self.logger = setup_logging(in_params) + logger = self.logger + logger.debug("Initializing AggStatEventEqz with parameters.") 
self.params = in_params self.input_data = pd.read_csv( @@ -72,33 +75,50 @@ def __init__(self, in_params): def calculate_values(self): """Performs event equalisation if needed and saves equalized data to the file. """ + logger = self.logger is_event_equal = parse_bool(self.params['event_equal']) + logger.debug(f"Event equalization flag is set to: {is_event_equal}") # check if EE is needed if not self.input_data.empty and is_event_equal: - # read previously calculated cases - prev_cases = pd.read_csv( - self.params['agg_stat_input_ee'], - header=[0], - sep='\t' - ) + logger.info("Event equalization is required. Starting EE process.") + try: + logger.debug(f"Reading previous cases from: {self.params['agg_stat_input_ee']}") + prev_cases = pd.read_csv( + self.params['agg_stat_input_ee'], + header=[0], + sep='\t' + ) + logger.debug(f"Successfully read previous cases. Number of records: {len(prev_cases)}") + except FileNotFoundError as e: + logger.error(f"File not found: {self.params['agg_stat_input_ee']}", exc_info=True) + raise + except Exception as e: + logger.error(f"Error reading previous cases: {self.params['agg_stat_input_ee']}", exc_info=True) + raise # perform for axis 1 + logger.info("Performing event equalization for axis 1.") output_ee_data = self.perform_ee_on_axis(prev_cases, '1') + logger.debug("Event equalization for axis 1 completed.") # perform for axis 2 if self.params['series_val_2']: + logger.info("Series values for axis 2 detected. Performing event equalization for axis 2.") output_ee_data = pd.concat([output_ee_data, self.perform_ee_on_axis(prev_cases, '2')]) + logger.debug("Event equalization for axis 2 completed.") else: output_ee_data = self.input_data if self.input_data.empty: - logging.info( + logger.warning( 'Event equalisation was not performed because the input data is empty.' 
) output_ee_data.to_csv(self.params['agg_stat_output'], index=None, header=True, mode='w', sep="\t", na_rep="NA") + output_file = self.params['agg_stat_output'] + logger.info(f"Data successfully saved to {output_file}.") def perform_ee_on_axis(self, prev_cases, axis='1'): """Performs event equalisation against previously calculated cases for the selected axis @@ -106,17 +126,24 @@ def perform_ee_on_axis(self, prev_cases, axis='1'): A data frame that contains equalized records """ warnings.filterwarnings('error') + logger = self.logger + logger.debug(f"Performing event equalization for axis {axis}.") output_ee_data = pd.DataFrame() for fcst_var, fcst_var_stats in self.params['fcst_var_val_' + axis].items(): - for series_var, series_var_vals in self.params['series_val_' + axis].items(): + logger.debug(f"Processing forecast variable: {fcst_var}") + for series_var, series_var_vals in self.params['series_val_' + axis].items(): + logger.debug(f"Processing series variable: {series_var}") + # remove group separator from series values series_var_vals_no_group = [] for val in series_var_vals: split_val = val.split(GROUP_SEPARATOR) series_var_vals_no_group.extend(split_val) + logger.debug(f"Series variable values (no group): {series_var_vals_no_group}") # filter input data based on fcst_var, statistic and all series variables values + logger.debug("Filtering input data for event equalization.") series_data_for_ee = self.input_data[ (self.input_data['fcst_var'] == fcst_var) & (self.input_data[series_var].isin(series_var_vals_no_group)) @@ -127,20 +154,28 @@ def perform_ee_on_axis(self, prev_cases, axis='1'): (prev_cases['fcst_var'] == fcst_var) & (prev_cases[series_var].isin(series_var_vals_no_group)) ] + logger.debug(f"Number of records after filtering input data: {len(series_data_for_ee)}") # get unique cases from filtered previous cases - + logger.debug("Filtering previous cases for event equalization.") series_data_for_prev_cases_unique = 
series_data_for_prev_cases['equalize'].unique() + logger.debug(f"Unique previous cases for event equalization: {len(series_data_for_prev_cases_unique)}") # perform ee + logger.info("Performing event equalization against previous cases.") series_data_after_ee = event_equalize_against_values( series_data_for_ee, - series_data_for_prev_cases_unique) + series_data_for_prev_cases_unique, logger) + logger.debug(f"Number of records after event equalization: {len(series_data_after_ee)}") # append EE data to result if output_ee_data.empty: output_ee_data = series_data_after_ee + logger.debug("Initialized output data with first set of event equalized data.") else: output_ee_data = pd.concat([output_ee_data, series_data_after_ee]) + logger.debug("Appended event equalized data to the output.") + + logger.info(f"Event equalization for axis {axis} completed. Total records: {len(output_ee_data)}") return output_ee_data diff --git a/metcalcpy/agg_stat_event_equalize.py b/metcalcpy/agg_stat_event_equalize.py index 9a5d49fc..a5d8f54c 100644 --- a/metcalcpy/agg_stat_event_equalize.py +++ b/metcalcpy/agg_stat_event_equalize.py @@ -41,6 +41,7 @@ from metcalcpy import GROUP_SEPARATOR from metcalcpy.event_equalize import event_equalize +from metcalcpy.logging_config import setup_logging class AggStatEventEqualize: """A class that performs event equalisation logic on input data @@ -56,6 +57,9 @@ class AggStatEventEqualize: """ def __init__(self, in_params): + self.logger = setup_logging(in_params) + logger = self.logger + logger.debug("Initializing AggStatEventEqualize with parameters.") self.params = in_params self.input_data = pd.read_csv( @@ -70,84 +74,110 @@ def __init__(self, in_params): def calculate_values(self): """Performs event equalisation if needed and saves equalized data to the file. """ - if not self.input_data.empty: + logger = self.logger + logger.info("Event equalization is required. 
Starting EE process.") + if not self.input_data.empty: + logger.info("Input data is available. Proceeding with event equalization.") # list all fixed variables if 'fixed_vars_vals_input' in self.params: + logger.info("Fixed variables detected. Preparing permutations of fixed values.") fix_vals_permuted_list = [] for key in self.params['fixed_vars_vals_input']: vals_permuted = list(itertools.product(*self.params['fixed_vars_vals_input'][key].values())) fix_vals_permuted_list.append(vals_permuted) + logger.debug(f"Fixed values for {key}: {vals_permuted}") fix_vals_permuted = [item for sublist in fix_vals_permuted_list for item in sublist] - + logger.debug(f"All fixed value permutations: {fix_vals_permuted}") else: fix_vals_permuted = [] - + logger.info("No fixed variables provided for event equalization.") # perform EE for each forecast variable on y1 axis + logger.info("Running event equalization on axis 1.") output_ee_data = self.run_ee_on_axis(fix_vals_permuted, '1') + logger.debug(f"Event equalization for axis 1 completed. Number of records: {len(output_ee_data)}") # if the second Y axis is present - run event equalizer on Y1 # and then run event equalizer on Y1 and Y2 equalized data if self.params['series_val_2']: + logger.info("Series values for axis 2 detected. Running event equalization on axis 2.") output_ee_data_2 = self.run_ee_on_axis(fix_vals_permuted, '2') - + logger.debug(f"Event equalization for axis 2 completed. 
Number of records: {len(output_ee_data_2)}") output_ee_data = output_ee_data.drop('equalize', axis=1) output_ee_data_2 = output_ee_data_2.drop('equalize', axis=1) + logger.info("Concatenating event equalized data from axis 1 and axis 2.") warnings.simplefilter(action='error', category=FutureWarning) all_ee_records = pd.concat([output_ee_data, output_ee_data_2]).reindex() all_series_vars = {} for key in self.params['series_val_2']: all_series_vars[key] = np.unique(self.params['series_val_2'][key] + self.params['series_val_2'][key]) + logger.debug(f"Series variable values for {key}: {all_series_vars[key]}") + logger.info("Running event equalization on all data.") output_ee_data = event_equalize(all_ee_records, self.params['indy_var'], all_series_vars, list(self.params['fixed_vars_vals_input'].keys()), fix_vals_permuted, True, - False) + False, logger) + logger.debug(f"Final event equalization completed. Number of records: {len(output_ee_data)}") else: output_ee_data = pd.DataFrame() + logger.warning("Input data is empty. 
No event equalization will be performed.") + header = True mode = 'w' output_ee_data.to_csv(self.params['agg_stat_output'], index=None, header=header, mode=mode, sep="\t", na_rep="NA") + logger.info(f"Data successfully saved to {self.params['agg_stat_output']}.") def run_ee_on_axis(self, fix_vals_permuted, axis='1'): """Performs event equalisation against previously calculated cases for the selected axis Returns: A data frame that contains equalized records """ + logger = self.logger + logger.debug(f"Running event equalization for axis {axis}.") output_ee_data = pd.DataFrame() for series_var, series_var_vals in self.params['series_val_' + axis].items(): + logger.debug(f"Processing series variable: {series_var}") # ungroup series value series_var_vals_no_group = [] for val in series_var_vals: split_val = val.split(GROUP_SEPARATOR) series_var_vals_no_group.extend(split_val) - + logger.debug(f"Ungrouped series variable values: {series_var_vals_no_group}") # filter input data based on fcst_var, statistic and all series variables values + logger.debug("Filtering input data for event equalization.") series_data_for_ee = self.input_data[ self.input_data[series_var].isin(series_var_vals_no_group) ] + logger.debug(f"Number of records after filtering: {len(series_data_for_ee)}") + # perform EE on filtered data + logger.info(f"Performing event equalization on {series_var}.") series_data_after_ee = \ event_equalize(series_data_for_ee, self.params['indy_var'], self.params['series_val_' + axis], list(self.params['fixed_vars_vals_input'].keys()), - fix_vals_permuted, True, False) + fix_vals_permuted, True, False, logger) + logger.debug(f"Number of records after event equalization for {series_var}: {len(series_data_after_ee)}") # append EE data to result if output_ee_data.empty: output_ee_data = series_data_after_ee + logger.debug("Initialized output data with the first set of event equalized data.") else: warnings.simplefilter(action="error", category=FutureWarning) output_ee_data = 
pd.concat([output_ee_data, series_data_after_ee]) - + logger.debug("Appended event equalized data to the output.") + + logger.info(f"Event equalization for axis {axis} completed. Total records: {len(output_ee_data)}") return output_ee_data diff --git a/metcalcpy/bootstrap.py b/metcalcpy/bootstrap.py index f105165e..f3dbfa54 100644 --- a/metcalcpy/bootstrap.py +++ b/metcalcpy/bootstrap.py @@ -16,7 +16,7 @@ from collections.abc import Iterable import multiprocessing as _multiprocessing import scipy.sparse as _sparse - +from metcalcpy.logging_config import setup_logging __author__ = 'Tatiana Burek' __version__ = '0.1.0' @@ -102,7 +102,7 @@ def set_distributions(self, distributions): def bootstrap_and_value(values, stat_func, alpha=0.05, num_iterations=1000, iteration_batch_size=None, num_threads=1, ci_method='perc', - save_data=True, save_distributions=False, block_length: int = 1, eclv: bool = False): + save_data=True, save_distributions=False, block_length: int = 1, eclv: bool = False, logger=None): """Returns bootstrap estimate. 
Can do the independent and identically distributed (IID) or Circular Block Bootstrap (CBB) methods depending on the block_length Args: @@ -155,13 +155,13 @@ def do_division(distr): stat_func_lists, num_iterations, iteration_batch_size, - num_threads, block_length) + num_threads, block_length, logger) bootstrap_dist = do_division(*distributions) if eclv: - result = _get_confidence_interval_and_value_eclv(bootstrap_dist, stat_val, alpha, ci_method) + result = _get_confidence_interval_and_value_eclv(bootstrap_dist, stat_val, alpha, ci_method, logger) else: - result = _get_confidence_interval_and_value(bootstrap_dist, stat_val, alpha, ci_method) + result = _get_confidence_interval_and_value(bootstrap_dist, stat_val, alpha, ci_method, logger) if save_data: result.set_original_values(values) if save_distributions: @@ -170,7 +170,7 @@ def do_division(distr): def _bootstrap_distribution_cbb(values_lists, stat_func_lists, - num_iterations, iteration_batch_size, num_threads, block_length=1): + num_iterations, iteration_batch_size, num_threads, block_length=1, logger=None): '''Returns the simulated bootstrap distribution. The idea is to sample the same indexes in a bootstrap re-sample across all arrays passed into values_lists. @@ -205,6 +205,8 @@ the bootsrapped values. 
''' + logger.info("Starting circular block bootstrap distribution process.") + logger.debug("Validating input arrays.") _validate_arrays(values_lists) if iteration_batch_size is None: @@ -212,7 +214,7 @@ num_iterations = int(num_iterations) iteration_batch_size = int(iteration_batch_size) - + logger.debug(f"Iteration batch size set to {iteration_batch_size}.") num_threads = int(num_threads) if num_threads == -1: @@ -225,25 +227,27 @@ pool = _multiprocessing.Pool(num_threads) iter_per_job = _np.ceil(num_iterations * 1.0 / num_threads) + logger.debug(f"Iterations per thread: {iter_per_job}.") results = [] for seed in _np.random.randint(0, 2 ** 32 - 1, num_threads): + logger.debug(f"Starting thread with seed {seed}.") r = pool.apply_async(_bootstrap_sim_cbb, (values_lists, stat_func_lists, iter_per_job, iteration_batch_size, seed, block_length)) results.append(r) - + logger.debug("Collecting results from all threads.") results = _np.hstack([res.get() for res in results]) pool.close() - + return results def bootstrap_and_value_mode(values, cases, stat_func, alpha=0.05, num_iterations=1000, iteration_batch_size=None, num_threads=1, ci_method='perc', - save_data=True, save_distributions=False, block_length=1): + save_data=True, save_distributions=False, block_length=1, logger=None): """Returns bootstrap estimate. 
Args: values: numpy array of values to bootstrap @@ -288,26 +292,38 @@ def bootstrap_and_value_mode(values, cases, stat_func, alpha=0.05, def do_division(distr): return distr - data_cases = _np.asarray(values['case']) - flat_cases = cases.flatten() - values_current = values[_np.in1d(data_cases, flat_cases)].to_numpy() - stat_val = stat_func(values_current)[0] - distributions = _bootstrap_distribution_cbb(values_lists, - stat_func_lists, - num_iterations, - iteration_batch_size, - num_threads, block_length) - - bootstrap_dist = do_division(*distributions) - result = _get_confidence_interval_and_value(bootstrap_dist, stat_val, alpha, ci_method) - if save_data: - result.set_original_values(values) - if save_distributions: - result.set_distributions(bootstrap_dist.flatten('F')) + try: + data_cases = _np.asarray(values['case']) + flat_cases = cases.flatten() + values_current = values[_np.in1d(data_cases, flat_cases)].to_numpy() + logger.debug(f"Selected {len(values_current)} cases for calculation.") + stat_val = stat_func(values_current)[0] + logger.info(f"Calculated statistic value: {stat_val}") + distributions = _bootstrap_distribution_cbb(values_lists, + stat_func_lists, + num_iterations, + iteration_batch_size, + num_threads, block_length, logger) + logger.debug(f"Bootstrap distributions: {distributions}") + + bootstrap_dist = do_division(*distributions) + logger.debug(f"Result after division operation: {bootstrap_dist}") + result = _get_confidence_interval_and_value(bootstrap_dist, stat_val, alpha, ci_method, logger) + logger.info(f"Confidence intervals calculated: {result.lower_bound}, {result.upper_bound}") + + if save_data: + logger.debug("Saving original values to the result.") + result.set_original_values(values) + if save_distributions: + logger.debug("Saving bootstrap distributions to the result.") + result.set_distributions(bootstrap_dist.flatten('F')) + except Exception as e: + logger.error(f"An error occurred during the bootstrap and calculation process: 
{e}", exc_info=True) + raise return result -def _get_confidence_interval_and_value(bootstrap_dist, stat_val, alpha, ci_method): +def _get_confidence_interval_and_value(bootstrap_dist, stat_val, alpha, ci_method, logger): """Get the bootstrap confidence interval for a given distribution. Args: bootstrap_dist: numpy array of bootstrap results from @@ -323,16 +339,24 @@ def _get_confidence_interval_and_value(bootstrap_dist, stat_val, alpha, ci_metho # TODO Only percentile method for the confident intervals is implemented if stat_val is None: + logger.warning("Statistic value is None. Setting confidence interval method to 'None'.") ci_method = "None" if ci_method == 'pivotal': - low = 2 * stat_val - _np.percentile(bootstrap_dist, 100 * (1 - alpha / 2.)) + logger.info("Using pivotal method to calculate confidence intervals.") + try: + low = 2 * stat_val - _np.percentile(bootstrap_dist, 100 * (1 - alpha / 2.)) + high = 2 * stat_val - _np.percentile(bootstrap_dist, 100 * (alpha / 2.)) + except Exception as e: + logger.error(f"An error occurred during the calculation of confidence intervals: {e}", exc_info=True) + raise val = stat_val - high = 2 * stat_val - _np.percentile(bootstrap_dist, 100 * (alpha / 2.)) elif ci_method == 'perc': + logger.info("Using percentile method to calculate confidence intervals.") # check if All values of bootstrap_dist are equal and if YES - # display a warning and do not calculate CIs - like boot.ci in R if _all_the_same(bootstrap_dist): + logger.warning(f"All values of the bootstrap distribution are equal to {bootstrap_dist[0]}. Cannot calculate confidence intervals.") print(f'All values of t are equal to {bootstrap_dist[0]}. 
Cannot calculate confidence intervals') low = None high = None @@ -340,17 +364,20 @@ bd = bootstrap_dist[bootstrap_dist != _np.array([None])] low = _np.percentile(bd, 100 * (alpha / 2.), method='linear') high = _np.percentile(bd, 100 * (1 - alpha / 2.), method='linear') + logger.debug(f"Percentile method results: low={low}, stat_val={stat_val}, high={high}") val = stat_val else: + logger.warning("No valid confidence interval method selected.") low = None val = None high = None + logger.info(f"Finished confidence interval calculation: low={low}, value={val}, high={high}") return BootstrapResults(lower_bound=low, value=val, upper_bound=high) -def _get_confidence_interval_and_value_eclv(bootstrap_dist, stat_val, alpha, ci_method): +def _get_confidence_interval_and_value_eclv(bootstrap_dist, stat_val, alpha, ci_method, logger): """Get the bootstrap confidence interval for a given distribution for the Economic Cost Loss Relative Value Args: bootstrap_dist: numpy array of bootstrap results from @@ -364,13 +391,15 @@ """ # TODO Only percentile method for the confident intervals is implemented - + # logger is received as a function argument if stat_val is None: + logger.warning("Statistic value is None. 
Skipping confidence interval calculation.") val = None low = None high = None else: + logger.debug("Preparing bootstrap distribution for confidence interval calculation.") bd = bootstrap_dist[bootstrap_dist != _np.array([None])] all_values = [] for dist_member in bd: @@ -381,6 +410,8 @@ def _get_confidence_interval_and_value_eclv(bootstrap_dist, stat_val, alpha, ci_ none_in_values = len(stat_val['V']) != sum(x is not None for x in stat_val['V']) stat_btcl = [None] * steps_len stat_btcu = [None] * steps_len + logger.debug(f"Calculated steps length: {steps_len}.") + logger.debug(f"Checking if values contain None: {none_in_values}.") for ind in range(steps_len): low = None @@ -389,9 +420,12 @@ def _get_confidence_interval_and_value_eclv(bootstrap_dist, stat_val, alpha, ci_ if ci_method == 'pivotal': low = 2 * stat_val - _np.percentile(column, 100 * (1 - alpha / 2.)) high = 2 * stat_val - _np.percentile(column, 100 * (alpha / 2.)) + logger.debug(f"Pivotal method result for step {ind}: low={low}, high={high}.") elif ci_method == 'perc': + logger.info("Using percentile method for confidence interval calculation.") if _all_the_same(column): + logger.warning(f"All values of column at step {ind} are equal to {column[0]}. Skipping confidence interval calculation.") print(f'All values of t are equal to {column[0]}. Cannot calculate confidence intervals') low = None high = None @@ -406,6 +440,7 @@ def _get_confidence_interval_and_value_eclv(bootstrap_dist, stat_val, alpha, ci_ low = stat_btcl high = stat_btcu + logger.info(f"Finished confidence interval calculation: value={val}, lower_bounds={low}, upper_bounds={high}.") return BootstrapResults(lower_bound=low, value=val, upper_bound=high) @@ -421,7 +456,7 @@ def flatten(lis): def _bootstrap_sim_cbb(values_lists, stat_func_lists, num_iterations, - iteration_batch_size, seed, block_length=1): + iteration_batch_size, seed, block_length=1, logger=None): """Returns simulated bootstrap distribution. 
Can do the independent and identically distributed (IID) or Circular Block Bootstrap (CBB) methods depending on the block_length Args: @@ -448,25 +483,32 @@ def _bootstrap_sim_cbb(values_lists, stat_func_lists, num_iterations, """ if seed is not None: + logger.debug(f"Setting random seed: {seed}") _np.random.seed(seed) num_iterations = int(num_iterations) iteration_batch_size = int(iteration_batch_size) - + logger.info(f"Number of iterations: {num_iterations}, iteration batch size: {iteration_batch_size}, block length: {block_length}.") results = [[] for _ in values_lists] for rng in range(0, num_iterations, iteration_batch_size): max_rng = min(iteration_batch_size, num_iterations - rng) - - values_sims = _generate_distributions_cbb(values_lists, max_rng, block_length) - + logger.debug(f"Running bootstrap iteration batch from {rng} to {rng + max_rng}.") + try: + values_sims = _generate_distributions_cbb(values_lists, max_rng, block_length, logger) + logger.debug(f"Generated {max_rng} simulated distributions.") + except Exception as e: + logger.error(f"Error generating distributions in bootstrap: {e}", exc_info=True) + raise + for i, values_sim, stat_func in zip(range(len(values_sims)), values_sims, stat_func_lists): + logger.debug(f"Calculating statistic for distribution set {i}.") results[i].extend(stat_func(values_sim)) - + logger.info("Completed bootstrap simulation.") return _np.array(results) -def _generate_distributions_cbb(values_lists, num_iterations, block_length=1): +def _generate_distributions_cbb(values_lists, num_iterations, block_length=1, logger=None): values_shape = values_lists[0].shape[0] ids = _np.random.choice( values_shape, @@ -479,6 +521,7 @@ def apply_cbb(row): """ Applyes Circular Block Bootstrap (CBB) method to each row :param row: """ + counter = 0 init_val = row[0] for ind, val in enumerate(row): @@ -495,14 +538,16 @@ counter = counter + 1 if counter == block_length: # start a new block - counter = 0 + counter = 0 return row if 
block_length > 1: # uss CBB + logger.info(f"Applying Circular Block Bootstrap (CBB) with block length: {block_length}") ids = _np.apply_along_axis(apply_cbb, axis=1, arr=ids) results = [values[ids] for values in values_lists] + logger.info("CBB applied to all rows.") return results @@ -525,29 +570,37 @@ def _all_the_same(elements): return result -def _validate_arrays(values_lists): +def _validate_arrays(values_lists, logger): + # 'logger' is supplied as a function parameter t = values_lists[0] t_type = type(t) + logger.debug(f"Validating arrays. First array type: {t_type}, shape: {t.shape}") if not isinstance(t, _sparse.csr_matrix) and not isinstance(t, _np.ndarray): + logger.error("Arrays must either be of type scipy.sparse.csr_matrix or numpy.array.") raise ValueError(('The arrays must either be of type ' 'scipy.sparse.csr_matrix or numpy.array')) for _, values in enumerate(values_lists[1:]): if not isinstance(values, t_type): + logger.error("Array is not of the same type as the first array.") raise ValueError('The arrays must all be of the same type') if t.shape != values.shape: + logger.error(f"Array has a different shape: {values.shape}. 
Expected: {t.shape}.") raise ValueError('The arrays must all be of the same shape') if isinstance(t, _sparse.csr_matrix): if values.shape[0] > 1: + logger.error("Sparse matrix must have shape 1 row X N columns.") raise ValueError(('The sparse matrix must have shape 1 row X N' ' columns')) if isinstance(t, _sparse.csr_matrix): if _needs_sparse_unification(values_lists): + logger.error("Non-zero entries in the sparse arrays are not aligned.") raise ValueError(('The non-zero entries in the sparse arrays' ' must be aligned')) + logger.info("Array validation completed successfully.") def _needs_sparse_unification(values_lists): diff --git a/metcalcpy/calc_difficulty_index.py b/metcalcpy/calc_difficulty_index.py index 5e6de792..8c544a4b 100644 --- a/metcalcpy/calc_difficulty_index.py +++ b/metcalcpy/calc_difficulty_index.py @@ -31,7 +31,19 @@ # Only allow 2D fields for now FIELD_DIM = 2 -def _input_check(sigmaij, muij, threshold, fieldijn, sigma_over_mu_ref, under_factor): +def safe_log(logger, log_method, message): + """ + Safely logs a message using the provided logger and log method. + + Args: + logger (logging.Logger): The logger object. If None, the message will not be logged. + log_method (callable): The logging method to use (e.g., logger.info, logger.debug). + message (str): The message to log. + """ + if logger: + log_method(message) + +def _input_check(sigmaij, muij, threshold, fieldijn, sigma_over_mu_ref, under_factor, logger): """ Check for valid input to _difficulty_index. @@ -55,22 +67,33 @@ def _input_check(sigmaij, muij, threshold, fieldijn, sigma_over_mu_ref, under_fa None. 
""" + safe_log(logger, logger.debug,f"Checking input parameters: sigmaij shape: {np.shape(sigmaij)}, " + f"muij type: {type(muij)}, threshold: {threshold}, " + f"fieldijn shape: {np.shape(fieldijn)}, sigma_over_mu_ref: {sigma_over_mu_ref}, " + f"under_factor: {under_factor}") assert isinstance(threshold, (int, float, np.int32, np.float32)) + safe_log(logger, logger.debug, f"Threshold type is valid: {type(threshold)}") assert np.ndim(sigmaij) == FIELD_DIM + safe_log(logger, logger.debug, f"sigmaij is {FIELD_DIM}D as expected.") assert np.all(sigmaij) >= EPS fieldshape = np.shape(fieldijn) # muij is a scalar or 2D array if isinstance(muij, np.ndarray): # If muij is an array, it must have the same shape as sigmaij assert np.shape(muij) == np.shape(sigmaij) + safe_log(logger, logger.debug, "muij is a valid array and matches the shape of sigmaij.") + else: + safe_log(logger, logger.debug, "muij is a scalar.") assert sigma_over_mu_ref >= EPS + safe_log(logger, logger.debug, "sigma_over_mu_ref is valid.") assert np.shape(sigmaij) == tuple(np.squeeze(fieldshape[0:-1])) + safe_log(logger, logger.debug, "sigmaij and fieldijn shapes are compatible.") assert isinstance(under_factor, (int, float, np.int32, np.float32)) assert 0.0 <= under_factor <= 1.0 + safe_log(logger, logger.debug, "under_factor is valid and within range.") - -def _difficulty_index(sigmaij, muij, threshold, fieldijn, Aplin, sigma_over_mu_ref=EPS, under_factor=0.5): +def _difficulty_index(sigmaij, muij, threshold, fieldijn, Aplin, sigma_over_mu_ref=EPS, under_factor=0.5, logger): """ Calculates version 6.1 of forecast difficulty index. The threshold terms all penalize equal (or slightly unequal) spread. @@ -100,39 +123,45 @@ def _difficulty_index(sigmaij, muij, threshold, fieldijn, Aplin, sigma_over_mu_r means more difficult. 
""" + safe_log(logger, logger.debug, f"Checking input: sigmaij shape: {sigmaij.shape}, muij shape: {muij.shape if isinstance(muij, np.ndarray) else 'scalar'}, " + f"threshold: {threshold}, fieldijn shape: {fieldijn.shape}, sigma_over_mu_ref: {sigma_over_mu_ref}, under_factor: {under_factor}") # Check for valid input - _input_check(sigmaij, muij, threshold, fieldijn, sigma_over_mu_ref, under_factor) - + _input_check(sigmaij, muij, threshold, fieldijn, sigma_over_mu_ref, under_factor, safe_log(logger, logger) + safe_log(logger, logger.debug, "Input check passed successfully.") # Variance term in range 0 to 1 + safe_log(logger, logger.debug, "Calculating variance term.") sigma_over_mu = sigmaij / muij sigma_over_mu_max = np.nanmax(sigma_over_mu) + safe_log(logger, logger.debug, f"sigma_over_mu_max: {sigma_over_mu_max}") # Force reference value to be greater than current max of sigmaij / muij sigma_over_mu_ref = np.nanmax([sigma_over_mu_ref, sigma_over_mu_max]) variance_term = sigma_over_mu / sigma_over_mu_ref + safe_log(logger, logger.debug, f"variance_term calculated, max variance_term: {np.nanmax(variance_term)}") # Depends on under_factor. under_threshold_count =\ np.ma.masked_greater_equal(fieldijn, threshold).count(axis=-1) nmembers = np.shape(fieldijn)[-1] under_prob = under_threshold_count / nmembers - + safe_log(logger, logger.debug, f"under_threshold_count: {under_threshold_count}, under_prob: {under_prob}") # Threshold term in range 0 to 1 threshold_term = 1.0 - np.abs(1.0 - under_factor - under_prob) + safe_log(logger, logger.debug, f"threshold_term: {threshold_term}") # Linear taper factor taper_term = Aplin.values(muij) - + safe_log(logger, logger.debug, f"taper_term: {taper_term}") # Difficulty index is the average of the two terms # multiplied by a linear taper factor dij = 0.5 * taper_term * (variance_term + threshold_term) - + safe_log(logger, logger.info, f"Difficulty index calculation complete. 
Max dij: {np.nanmax(dij)}, Min dij: {np.nanmin(dij)}") return dij def forecast_difficulty(sigmaij, muij, threshold, fieldijn, - Aplin, sigma_over_mu_ref=EPS): + Aplin, sigma_over_mu_ref=EPS, logger=None): """ Calls private function _difficulty_index, to calculate version (v6.1) of forecast difficulty index. @@ -160,7 +189,10 @@ def forecast_difficulty(sigmaij, muij, threshold, fieldijn, Larger (> 0.5) means more difficult. """ + safe_log(logger, logger.debug, f"sigmaij shape: {sigmaij.shape}, muij: {'scalar' if isinstance(muij, float) else muij.shape}, " + f"threshold: {threshold}, fieldijn shape: {fieldijn.shape}, sigma_over_mu_ref: {sigma_over_mu_ref}") if Aplin is None: + safe_log(logger, logger.info, "No Aplin provided. Creating default Aplin object (version 6.1).") # Default to envelope version 6.1 xunits="feet" A6_1_name = "A6_1" @@ -171,9 +203,12 @@ def forecast_difficulty(sigmaij, muij, threshold, fieldijn, Aplin =\ plin(A6_1_xlist, A6_1_ylist, xunits=xunits, right=A6_1_right, left=A6_1_left, - name=A6_1_name) + name=A6_1_name, logger=logger) + safe_log(logger, logger.debug, "Default Aplin object created.") + safe_log(logger, logger.debug, "Calling _difficulty_index function.") dij = _difficulty_index(sigmaij, muij, threshold, fieldijn, - Aplin, sigma_over_mu_ref) + Aplin, sigma_over_mu_ref, logger=logger) + safe_log(logger, logger.info, "Forecast difficulty index calculation completed.") return dij if __name__ == "__main__": diff --git a/metcalcpy/event_equalize.py b/metcalcpy/event_equalize.py index dfb172b6..973e5e14 100644 --- a/metcalcpy/event_equalize.py +++ b/metcalcpy/event_equalize.py @@ -21,10 +21,22 @@ import pandas as pd from metcalcpy import GROUP_SEPARATOR, DATE_TIME_REGEX +from metcalcpy.logging_config import setup_logging +def safe_log(logger, log_method, message): + """ + Safely logs a message using the provided logger and log method. + + Args: + logger (logging.Logger): The logger object. If None, the message will not be logged. 
+ log_method (callable): The logging method to use (e.g., logger.info, logger.debug). + message (str): The message to log. + """ + if logger: + log_method(message) def event_equalize(series_data, indy_var, series_var_vals, fix_vars, - fix_vals_permuted, equalize_by_indep, multi): + fix_vals_permuted, equalize_by_indep, multi, logger): """Performs event equalisation. event_equalize assumes that the input series_data contains data indexed by fcst_valid_beg, @@ -49,6 +61,7 @@ def event_equalize(series_data, indy_var, series_var_vals, fix_vars, A data frame that contains equalized records """ pd.options.mode.chained_assignment = None + safe_log(logger, logger.info, "Starting event equalization.") column_names = list(series_data) exception_columns = ["", "fcst_valid_beg", 'fcst_lead', 'fcst_valid', 'fcst_init', 'fcst_init_beg', 'VALID', 'LEAD'] if isinstance(fix_vars, str): @@ -62,11 +75,14 @@ def event_equalize(series_data, indy_var, series_var_vals, fix_vars, series_data['equalize'] = series_data.loc[:, 'fcst_valid'].astype(str) + ' ' \ + series_data.loc[:, 'fcst_lead'].astype(str) + safe_log(logger, logger.debug, "Equalization column added to the data frame.") + # add independent variable if needed if equalize_by_indep and indy_var not in exception_columns: series_data['equalize'] = series_data.loc[:, 'equalize'].astype(str) + " " \ + series_data.loc[:, indy_var].astype(str) + safe_log(logger, logger.debug, "Equalization column added to the data frame.") vars_for_ee = dict() @@ -83,6 +99,7 @@ def event_equalize(series_data, indy_var, series_var_vals, fix_vars, actual_vals = series_val.split(GROUP_SEPARATOR) series_vals_no_groups.extend(actual_vals) vars_for_ee[series_var] = series_vals_no_groups + safe_log(logger, logger.debug, f"Series variables added for equalization: {vars_for_ee}") # add fixed variables if present if fix_vars: @@ -92,6 +109,7 @@ def event_equalize(series_data, indy_var, series_var_vals, fix_vars, if isinstance(vals, str): vals = [vals] 
vars_for_ee[fix_var] = vals + safe_log(logger, logger.debug, f"Fixed variables added for equalization: {vars_for_ee}") # create a list of permutations representing the all variables for vars_for_ee_permuted = list(itertools.product(*vars_for_ee.values())) @@ -116,6 +134,7 @@ def event_equalize(series_data, indy_var, series_var_vals, fix_vars, # if the list contains repetitive values, show a warning if multi is False and len(permutation_data['equalize']) \ != len(set(permutation_data['equalize'])): + safe_log(logger, logger.warning, f"Non-unique events detected for permutation {permutation}.") print( f"WARNING: eventEqualize() detected non-unique events for {permutation}" f" using [fcst_valid_beg,fcst_lead)]") @@ -143,6 +162,7 @@ def event_equalize(series_data, indy_var, series_var_vals, fix_vars, discarded_cases = pd.concat([discarded_cases, permutation_cases_not_in_common_cases]) # report the discarded records for discarded_case in discarded_cases: + safe_log(logger, logger.warning, f"Discarding series member with case {discarded_case} for {permutation}") print(f"WARNING: discarding series member with case {discarded_case}" f" for {permutation}") @@ -156,6 +176,7 @@ def event_equalize(series_data, indy_var, series_var_vals, fix_vars, series_data_ee = series_data[equalization_cases_ind] if len(series_data_ee) != len(series_data): + safe_log(logger, logger.warning, f"Event equalization removed {len(series_data) - len(series_data_ee)} rows.") print(f"WARNING: event equalization removed {len(series_data) - len(series_data_ee)} rows") return series_data_ee diff --git a/metcalcpy/event_equalize_against_values.py b/metcalcpy/event_equalize_against_values.py index 6681ffae..455d789d 100644 --- a/metcalcpy/event_equalize_against_values.py +++ b/metcalcpy/event_equalize_against_values.py @@ -17,8 +17,19 @@ __author__ = 'Tatiana Burek' __version__ = '0.1.0' - -def event_equalize_against_values(series_data, input_unique_cases): +def safe_log(logger, log_method, message): + 
""" + Safely logs a message using the provided logger and log method. + + Args: + logger (logging.Logger): The logger object. If None, the message will not be logged. + log_method (callable): The logging method to use (e.g., logger.info, logger.debug). + message (str): The message to log. + """ + if logger: + log_method(message) + +def event_equalize_against_values(series_data, input_unique_cases, logger): """Performs event equalisation. event_equalize_against_values assumes that the input series_data contains data @@ -38,6 +49,8 @@ def event_equalize_against_values(series_data, input_unique_cases): warning_remove = "WARNING: event equalization removed {} rows" + safe_log(logger, logger.info, "Starting event equalization.") + column_names = list(series_data) if 'fcst_valid' in column_names: @@ -48,6 +61,7 @@ def event_equalize_against_values(series_data, input_unique_cases): + ' ' + series_data['fcst_lead'].astype(str)) else: + safe_log(logger, logger.warning, "WARNING: eventEqualize() did not run due to lack of valid time field.") print("WARNING: eventEqualize() did not run due to lack of valid time field") return pd.DataFrame() @@ -55,13 +69,16 @@ def event_equalize_against_values(series_data, input_unique_cases): data_for_unique_cases = series_data[(series_data['equalize'].isin(input_unique_cases))] n_row_cases = len(data_for_unique_cases) if n_row_cases == 0: + safe_log(logger, logger.warning, "WARNING: discarding all members. 
No matching cases found.") print(" WARNING: discarding all members") return pd.DataFrame() n_row_ = len(series_data) if n_row_cases != n_row_: + safe_log(logger, logger.warning, warning_remove.format(n_row_ - n_row_cases)) print(warning_remove.format(n_row_ - n_row_cases)) # remove 'equalize' column data_for_unique_cases = data_for_unique_cases.drop(['equalize'], axis=1) + safe_log(logger, logger.info, "Event equalization completed successfully.") return data_for_unique_cases diff --git a/metcalcpy/logging_config.py b/metcalcpy/logging_config.py index dd0df94a..67390e2d 100644 --- a/metcalcpy/logging_config.py +++ b/metcalcpy/logging_config.py @@ -55,7 +55,7 @@ def setup_logging(config_params): # Set log level from YAML or use default; convert to appropriate logging level log_level = config_params.get('log_level') # No default here, expect it from YAML if not log_level: - log_level = 'INFO' # Set default only if not provided + log_level = 'WARNING' # Set default only if not provided log_level = log_level.upper() diff --git a/metcalcpy/piecewise_linear.py b/metcalcpy/piecewise_linear.py index 5fd31bee..f13ee9dd 100644 --- a/metcalcpy/piecewise_linear.py +++ b/metcalcpy/piecewise_linear.py @@ -36,15 +36,20 @@ class PiecewiseLinear(): """ def __init__(self, x_domain, y_range, xunits='feet', - left=np.nan, right=np.nan, name=""): + left=np.nan, right=np.nan, name="", logger=None): + + self.logger = logger len_x = len(x_domain) if len_x < 2: + if logger: logger.error('Length of x_domain must be at least 2.') raise IncompatibleLengths('Length of xdomain must be at least 2.') if np.any(np.diff(x_domain)) < 0: + if logger: logger.error(f"X_domain (in {xunits}) is {x_domain}") print("X_domain (in {}) is {}".format(xunits, x_domain)) raise UnsortedArray('Xdomain must be sorted in ascending order.') len_y = len(y_range) if len_x != len_y: + if self.logger: self.logger.error("X_domain and Y_range must have the same length.") raise IncompatibleLengths('X_domain and Y_range must have same ' + 'length.\n Use left and 
right to set ' + 'value for points outside the x_domain\n') diff --git a/metcalcpy/scorecard.py b/metcalcpy/scorecard.py index f128f552..c6c403a0 100644 --- a/metcalcpy/scorecard.py +++ b/metcalcpy/scorecard.py @@ -34,7 +34,6 @@ from typing import Union import pandas as pd import yaml -import logging import argparse import sys import itertools @@ -51,6 +50,7 @@ from metcalcpy.util.utils import intersection, get_derived_curve_name, \ is_derived_point, is_string_integer, OPERATION_TO_SIGN, calc_derived_curve_value, \ perfect_score_adjustment, sort_data, PRECISION, DerivedCurveComponent, is_string_strictly_float +from metcalcpy.logging_config import setup_logging COLUMNS_TO_REMOVE = ['equalize', 'stat_ncl', 'stat_ncu', 'stat_bcl', 'stat_bcu', 'fcst_valid_beg', 'fcst_init_beg'] @@ -78,6 +78,9 @@ def __init__(self, in_params): or doesn't have data """ + self.logger = setup_logging(in_params) + logger = self.logger + logger.debug("Initializing Scorecard with parameters.") self.params = in_params self.derived_name_to_values = {} # import pandas @@ -89,7 +92,7 @@ def __init__(self, in_params): ) if self.input_data.empty: - logging.warning('The input data is empty. The empty output file was created') + logger.warning('The input data is empty. The empty output file was created') export_csv = self.input_data.to_csv(self.params['sum_stat_output'], index=None, header=True, mode="w", sep="\t") @@ -99,7 +102,7 @@ def __init__(self, in_params): except pd.errors.EmptyDataError: raise except KeyError as ex: - logging.error('Parameter with key %s is missing', ex) + logger.error('Parameter with key %s is missing', ex) raise self.group_to_value = {} @@ -111,10 +114,14 @@ def calculate_scorecard_data(self): Saves the data into the file """ + logger = self.logger + logger.debug("Calculating Scorecard data.") # identify all possible points values by adding series values, indy values # and statistics and then permute them if self.input_data.empty: + logger.warning("Input data is empty. 
Returning an empty DataFrame.") return pd.DataFrame() + logger.info("Permuting all possible point values for series and independent variables.") series_val = self.params['series_val_1'] all_fields_values = series_val.copy() @@ -123,7 +130,10 @@ def calculate_scorecard_data(self): if indy_vals: all_fields_values[self.params['indy_var']] = indy_vals all_points = list(itertools.product(*all_fields_values.values())) + logger.debug(f"Generated {len(all_points)} points for calculation.") + if self.params['derived_series_1']: + logger.info("Adding derived points for calculation.") # identifies and add all possible derived points values all_points.extend(self._get_derived_points(series_val, indy_vals)) @@ -131,6 +141,7 @@ def calculate_scorecard_data(self): derived_frame = None # for each point for point_ind, point in enumerate(all_points): + logger.debug(f"Processing point {point_ind + 1} / {len(all_points)}: {point}") is_derived = is_derived_point(point) if not is_derived: # only get the data for each point - no calculations needed @@ -176,6 +187,7 @@ def calculate_scorecard_data(self): # print the result to file if derived_frame is not None: + logger.info(f"Writing results to {self.params['sum_stat_output']}.") header = True mode = 'w' if 'append_to_file' in self.params.keys() and self.params['append_to_file'] == 'True': @@ -184,6 +196,9 @@ def calculate_scorecard_data(self): export_csv = derived_frame.to_csv(self.params['sum_stat_output'], index=None, header=header, mode=mode, sep="\t", na_rep="NA", float_format='%.' 
+ str(PRECISION) + 'f') + logger.info("Results successfully written to file.") + else: + logger.warning("No results to write.") def _get_stats_for_derived(self, series, series_to_data) -> Union[DataFrame, None]: """ Calculates derived statistic value for input data @@ -198,17 +213,22 @@ def _get_stats_for_derived(self, series, series_to_data) -> Union[DataFrame, Non DataFrame containing 1 row with the resulting data or None """ - + logger = self.logger + logger.debug(f"Calculating derived statistic for series {series}.") # get derived name derived_name = '' for operation in OPERATION_TO_SIGN: for point_component in series: if point_component.startswith((operation + '(', operation + ' (')): derived_name = point_component + logger.debug(f"Derived name found: {derived_name}") break # find all components for the 1st and 2nd series derived_curve_component = self.derived_name_to_values[derived_name] + if not derived_curve_component: + logger.error(f"No derived curve component found for {derived_name}") + return None permute_for_first_series = derived_curve_component.first_component.copy() for series_comp in series[1:]: if series_comp not in permute_for_first_series: @@ -229,6 +249,9 @@ def _get_stats_for_derived(self, series, series_to_data) -> Union[DataFrame, Non if perm in self.group_to_value: permute_for_second_series[i] = self.group_to_value[perm] + logger.debug(f"Permuted components for first series: {permute_for_first_series}") + logger.debug(f"Permuted components for second series: {permute_for_second_series}") + ds_1 = None ds_2 = None @@ -243,6 +266,7 @@ def _get_stats_for_derived(self, series, series_to_data) -> Union[DataFrame, Non if ds_1.values is None or ds_2.values is None \ or ds_1.values.size == 0 or ds_2.values.size == 0: + logger.error("One or both series are missing data. 
Unable to calculate derived statistic.") return None # validate data @@ -266,9 +290,11 @@ def _get_stats_for_derived(self, series, series_to_data) -> Union[DataFrame, Non # find the number of unique combinations unique_date_size = len(set(map(tuple, date_lead_stat))) except TypeError as err: + logger.error(f"Error during validation: {err}") print(err) unique_date_size = [] if unique_date_size != len(ds_1.values): + logger.error("Derived curve can't be calculated due to multiple values for one valid date/fcst_lead.") raise NameError("Derived curve can't be calculated." " Multiple values for one valid date/fcst_lead") @@ -282,12 +308,14 @@ def _get_stats_for_derived(self, series, series_to_data) -> Union[DataFrame, Non # calculate derived statistic based on the operation and stat_flag derived_stat = None if derived_curve_component.derived_operation == 'DIFF_SIG': + logger.debug("Calculating DIFF_SIG.") if self.params['stat_flag'] == 'EMC': derived_stat = self._calculate_diff_sig_emc(stat_values_1, stat_values_2) else: derived_stat = self._calculate_diff_sig_ncar(stat_values_1, stat_values_2) elif derived_curve_component.derived_operation == 'DIFF': + logger.debug("Calculating DIFF.") # perform the derived operation derived_stat_list = calc_derived_curve_value( stat_values_1, @@ -296,6 +324,7 @@ def _get_stats_for_derived(self, series, series_to_data) -> Union[DataFrame, Non derived_stat = statistics.mean(derived_stat_list) elif derived_curve_component.derived_operation == 'SINGLE': + logger.debug("Calculating SINGLE.") derived_stat = statistics.mean(stat_values_1) # create dataframe from teh 1st row of the original data @@ -321,6 +350,7 @@ def _get_stats_for_derived(self, series, series_to_data) -> Union[DataFrame, Non # set model df.at[0, 'model'] = derived_name + logger.info(f"Derived statistic for {derived_name} calculated successfully.") return df def _calculate_diff_sig_ncar(self, ds_1_values, ds_2_values) -> float: @@ -332,17 +362,33 @@ def 
_calculate_diff_sig_ncar(self, ds_1_values, ds_2_values) -> float: Returns: the difference significance """ - # perform the derived operation - derived_stat_list = calc_derived_curve_value(ds_1_values, ds_2_values, 'DIFF_SIG') - avg = statistics.mean(derived_stat_list) - sdv = statistics.stdev(derived_stat_list) - total = len(derived_stat_list) - t = avg / (sdv / np.sqrt(total)) - p_val = 1 - 2 * pt(abs(t), total - 1, lower_tail=False) - derived_stat = perfect_score_adjustment(statistics.mean(ds_1_values), - statistics.mean(ds_2_values), - self.params['list_stat_1'][0], - p_val) + logger = self.logger + logger.debug("Calculating DIFF_SIG.") + + # Check if input arrays are not empty + if not ds_1_values or not ds_2_values: + logger.error("Input arrays for DIFF_SIG calculation are empty.") + raise ValueError("Input arrays for DIFF_SIG calculation must not be empty.") + + try: + logger.debug(f"Calculating derived statistics between ds_1: {ds_1_values} and ds_2: {ds_2_values}.") + # perform the derived operation + derived_stat_list = calc_derived_curve_value(ds_1_values, ds_2_values, 'DIFF_SIG') + avg = statistics.mean(derived_stat_list) + sdv = statistics.stdev(derived_stat_list) + total = len(derived_stat_list) + logger.debug(f"Derived statistics - Avg: {avg}, Stdev: {sdv}, Total: {total}.") + t = avg / (sdv / np.sqrt(total)) + p_val = 1 - 2 * pt(abs(t), total - 1, lower_tail=False) + logger.debug(f"T-statistic: {t}, p-value: {p_val}.") + derived_stat = perfect_score_adjustment(statistics.mean(ds_1_values), + statistics.mean(ds_2_values), + self.params['list_stat_1'][0], + p_val) + logger.info(f"DIFF_SIG calculation completed successfully. 
Result: {derived_stat}") + except Exception as err: + logger.error(f"Error during DIFF_SIG calculation: {err}") + raise err return derived_stat def _calculate_diff_sig_emc(self, ds_1_values, ds_2_values) -> float: @@ -354,67 +400,80 @@ def _calculate_diff_sig_emc(self, ds_1_values, ds_2_values) -> float: Returns: the difference significance """ - derived_stat = None - values_1 = np.array(ds_1_values) - values_2 = np.array(ds_2_values) - val2_minus_val1 = np.subtract(values_2, values_1) - acdm = sum(val2_minus_val1) / self.params['ndays'] - acdm_list = [acdm] * len(values_1) - std = math.sqrt(sum(np.subtract(val2_minus_val1, acdm_list) * np.subtract(val2_minus_val1, acdm_list)) / - self.params['ndays']) - nsz = len(ds_1_values) - intvl = round(1.960 * std / math.sqrt(nsz - 1), 6) - mean1 = round(statistics.mean(values_1), 6) - mean2 = round(statistics.mean(values_2), 6) - if self.params['list_stat_1'][0].startswith('ME') or self.params['list_stat_1'][0].startswith('BIAS'): - ds = (abs(mean2 - mean1)) / intvl - sss = abs(mean2) - abs(mean1) - if sss is not None and sss < 0: - ds = (-1) * ds - elif self.params['list_stat_1'][0].startswith('RMSE') \ - or self.params['list_stat_1'][0].startswith('RMSVE'): - ds = (mean2 - mean1) / intvl - else: - ds = (mean1 - mean2) / intvl - if ds is not None: - ds = round(ds, 3) - if self.params['ndays'] >= 80: - alpha1 = 1.960 # 95% confidence level - alpha2 = 2.576 # 99% confidence level - alpha3 = 3.291 # 99.9% confidence level - elif self.params['ndays'] >= 40 and self.params['ndays'] < 80: - alpha1 = 2.0 # 95% confidence level - alpha2 = 2.66 # 99% confidence level - alpha3 = 3.46 # 99.9% confidence level - elif self.params['ndays'] >= 20 and self.params['ndays'] < 40: - alpha1 = 2.042 # 95% confidence level - alpha2 = 2.75 # 99% confidence level - alpha3 = 3.646 # 99.9% confidence level - elif self.params['ndays'] < 20: - alpha1 = 2.228 # 95% confidence level - alpha2 = 3.169 # 99% confidence level - alpha3 = 4.587 # 99.9% 
confidence level - ds95 = ds - ds99 = ds * alpha1 / alpha2 - ds99 = round(ds99, 3) - ds999 = ds * alpha1 / alpha3; - ds999 = round(ds999, 3) - if ds999 >= 1: - derived_stat = 1 - elif ds999 < 1 and ds99 >= 1: - derived_stat = 0.99 - elif ds99 < 1 and ds95 >= 1: - derived_stat = 0.95 - elif ds95 > -1 and ds95 < 1: - derived_stat = 0 - elif ds95 <= -1 and ds99 > -1: - derived_stat = -0.95 - elif ds99 <= -1 and ds999 > -1: - derived_stat = -0.99 - elif ds999 <= -1 and ds999 > -100.0: - derived_stat = -1 - elif ds999 < -100.0: - derived_stat = -1 + logger = self.logger + logger.debug("Starting DIFF_SIG EMC calculation.") + try: + # perform the derived operation + derived_stat = None + values_1 = np.array(ds_1_values) + values_2 = np.array(ds_2_values) + val2_minus_val1 = np.subtract(values_2, values_1) + logger.debug(f"Values 1: {values_1}, Values 2: {values_2}, Difference: {val2_minus_val1}") + acdm = sum(val2_minus_val1) / self.params['ndays'] + acdm_list = [acdm] * len(values_1) + std = math.sqrt(sum(np.subtract(val2_minus_val1, acdm_list) * np.subtract(val2_minus_val1, acdm_list)) / + self.params['ndays']) + logger.debug(f"ACDM: {acdm}, Standard Deviation: {std}") + nsz = len(ds_1_values) + intvl = round(1.960 * std / math.sqrt(nsz - 1), 6) + mean1 = round(statistics.mean(values_1), 6) + mean2 = round(statistics.mean(values_2), 6) + logger.debug(f"Mean 1: {mean1}, Mean 2: {mean2}, Interval: {intvl}") + if self.params['list_stat_1'][0].startswith('ME') or self.params['list_stat_1'][0].startswith('BIAS'): + ds = (abs(mean2 - mean1)) / intvl + sss = abs(mean2) - abs(mean1) + if sss is not None and sss < 0: + ds = (-1) * ds + elif self.params['list_stat_1'][0].startswith('RMSE') \ + or self.params['list_stat_1'][0].startswith('RMSVE'): + ds = (mean2 - mean1) / intvl + else: + ds = (mean1 - mean2) / intvl + if ds is not None: + ds = round(ds, 3) + logger.debug(f"DS value before significance thresholding: {ds}") + if self.params['ndays'] >= 80: + alpha1 = 1.960 # 95% 
confidence level + alpha2 = 2.576 # 99% confidence level + alpha3 = 3.291 # 99.9% confidence level + elif self.params['ndays'] >= 40 and self.params['ndays'] < 80: + alpha1 = 2.0 # 95% confidence level + alpha2 = 2.66 # 99% confidence level + alpha3 = 3.46 # 99.9% confidence level + elif self.params['ndays'] >= 20 and self.params['ndays'] < 40: + alpha1 = 2.042 # 95% confidence level + alpha2 = 2.75 # 99% confidence level + alpha3 = 3.646 # 99.9% confidence level + elif self.params['ndays'] < 20: + alpha1 = 2.228 # 95% confidence level + alpha2 = 3.169 # 99% confidence level + alpha3 = 4.587 # 99.9% confidence level + ds95 = ds + ds99 = ds * alpha1 / alpha2 + ds99 = round(ds99, 3) + ds999 = ds * alpha1 / alpha3; + ds999 = round(ds999, 3) + logger.debug(f"DS95: {ds95}, DS99: {ds99}, DS999: {ds999}") + if ds999 >= 1: + derived_stat = 1 + elif ds999 < 1 and ds99 >= 1: + derived_stat = 0.99 + elif ds99 < 1 and ds95 >= 1: + derived_stat = 0.95 + elif ds95 > -1 and ds95 < 1: + derived_stat = 0 + elif ds95 <= -1 and ds99 > -1: + derived_stat = -0.95 + elif ds99 <= -1 and ds999 > -1: + derived_stat = -0.99 + elif ds999 <= -1 and ds999 > -100.0: + derived_stat = -1 + elif ds999 < -100.0: + derived_stat = -1 + except Exception as err: + logger.error(f"Error during EMC calculation: {err}") + raise err + logger.info(f"EMC DIFF_SIG calculation completed. 
Derived Statistic: {derived_stat}") return derived_stat def _get_derived_points(self, series_val, indy_vals): @@ -426,11 +485,13 @@ def _get_derived_points(self, series_val, indy_vals): Returns: a list of all possible values for each derived points """ - + logger = self.logger + logger.debug("Starting to calculate derived points.") # for each derived series result = [] for derived_serie in self.params['derived_series_1']: # series 1 components + logger.debug(f"Processing derived series: {derived_serie}") ds_1 = derived_serie[0].split(' ') # series 2 components @@ -440,6 +501,7 @@ def _get_derived_points(self, series_val, indy_vals): for ind, name in enumerate(ds_1): if name != ds_2[ind]: series_var_vals = (name, ds_2[ind]) + logger.debug(f"Identified series variable values: {series_var_vals}") break series_var = list(series_val.keys())[-1] @@ -447,6 +509,7 @@ def _get_derived_points(self, series_val, indy_vals): for var in series_val.keys(): if all(elem in series_val[var] for elem in series_var_vals): series_var = var + logger.debug(f"Matched series variable: {series_var}") break derived_val = series_val.copy() @@ -459,6 +522,7 @@ def _get_derived_points(self, series_val, indy_vals): derived_val[var] = intersection(derived_val[var], ds_1) derived_curve_name = get_derived_curve_name(derived_serie) + logger.debug(f"Derived curve name: {derived_curve_name}") derived_val[series_var] = [derived_curve_name] if len(indy_vals) > 0: derived_val[self.params['indy_var']] = indy_vals @@ -470,7 +534,8 @@ def _get_derived_points(self, series_val, indy_vals): else: derived_val['stat_name'] = [ds_1[-1] + "," + ds_2[-1]] result.append(list(itertools.product(*derived_val.values()))) - + + logger.debug(f"Total derived points calculated: {len(result)}") return [y for x in result for y in x] diff --git a/metcalcpy/sum_stat.py b/metcalcpy/sum_stat.py index 1b7030db..7081ebe0 100644 --- a/metcalcpy/sum_stat.py +++ b/metcalcpy/sum_stat.py @@ -56,7 +56,7 @@ from metcalcpy.util.utils import 
is_string_integer, parse_bool, \ aggregate_field_values, perform_event_equalization, is_string_strictly_float - +from metcalcpy.logging_config import setup_logging class SumStat: """A class that performs event equalisation if needed and statistics calculation @@ -80,7 +80,9 @@ def __init__(self, in_params): Raises: EmptyDataError or ValueError when the input DataFrame is empty or doesn't have data """ - + self.logger = setup_logging(in_params) + logger = self.logger + logger.debug("Initializing Scorecard with parameters.") self.params = in_params # import pandas try: @@ -91,7 +93,7 @@ def __init__(self, in_params): ) if self.input_data.empty: - logging.warning('The input data is empty. The empty output file was created') + logger.warning('The input data is empty. The empty output file was created') export_csv = self.input_data.to_csv(self.params['sum_stat_output'], index=None, header=True, mode="w", sep="\t") @@ -99,9 +101,10 @@ def __init__(self, in_params): self.column_names = self.input_data.columns.values except pd.errors.EmptyDataError: + logger.error('The input data is empty. The empty output file was created') raise except KeyError as ex: - logging.error('Parameter with key %s is missing', ex) + logger.error('Parameter with key %s is missing', ex) raise STATISTIC_TO_FIELDS = {'ctc': {"total", "fy_oy", "fy_on", "fn_oy", "fn_on"}, @@ -123,6 +126,8 @@ def calculate_stats(self): """ Calculates summary statistics for each series point Writes output data to the file """ + logger =self.logger + logger.debug("Calculating summary statistics.") fields = [] try: fields = self.STATISTIC_TO_FIELDS[self.params['line_type']] @@ -154,11 +159,11 @@ def calculate_stats(self): # self.process_row, num_of_processes=mp.cpu_count()) print("--- %s seconds ---" % (time.time() - start_time)) else: - logging.warning('Event equalisation removed all data. ' + logger.warning('Event equalisation removed all data. 
' 'The empty output file is created') except KeyError as ex: - logging.error('Parameter with key %s is missing. The empty output file is created', ex) + logger.error('Parameter with key %s is missing. The empty output file is created', ex) # remove the original fields to save the space for column in fields: @@ -185,28 +190,34 @@ def aggregate_special_fields(self, axis='1'): axis - y1 or y1 axis """ warnings.filterwarnings('error') + logger = self.logger + logger.debug("Aggregating special fields.") # check if indy_vals have a field that need to be aggregated - the field with ';' has_agg_indy_field = any(any(GROUP_SEPARATOR in i for i in item) for item in self.params['indy_vals']) - + logger.debug(f"Independent variable field with ';' detected: {has_agg_indy_field}") # look if there is a field that need to be aggregated first - the field with ';' series_var_val = self.params['series_val_' + axis] has_agg_series_field = any(any(GROUP_SEPARATOR in i for i in item) for item in series_var_val) - + logger.debug(f"Series variable field with ';' detected: {has_agg_series_field}") if series_var_val and (has_agg_indy_field or has_agg_series_field): # the special field was detected - + logger.info("Special fields detected for aggregation. Starting aggregation process.") all_points = list(itertools.product(*series_var_val.values())) aggregated_values = pd.DataFrame() series_vars = list(series_var_val.keys()) + logger.debug(f"All points for aggregation: {all_points}") + for indy_val in self.params['indy_vals']: # filter the input frame by each indy value if indy_val is None: filtered_by_indy = self.input_data + logger.debug(f"Using all input data for indy_val: {indy_val}") else: # filter by value or split the value and filter by multiple values filtered_by_indy = self.input_data[ self.input_data[self.params['indy_var']].isin(indy_val.split(';'))] + logger.debug(f"Filtered data for indy_val {indy_val}. 
Rows remaining: {len(filtered_by_indy)}") for point in all_points: point_data = filtered_by_indy @@ -217,6 +228,8 @@ def aggregate_special_fields(self, axis='1'): actual_series_vals = point[index].split(';') else: actual_series_vals = point[index].split(GROUP_SEPARATOR) + logger.debug(f"Actual series values for {series_var}: {actual_series_vals}") + for ind, val in enumerate(actual_series_vals): if is_string_integer(val): actual_series_vals[ind] = int(val) @@ -224,9 +237,12 @@ def aggregate_special_fields(self, axis='1'): actual_series_vals[ind] = float(val) point_data = \ point_data[point_data[series_vars[index]].isin(actual_series_vals)] + logger.debug(f"Filtered point data for series_var {series_var}. Rows remaining: {len(point_data)}") + # aggregate point data if any(';' in series_val for series_val in point): + logger.debug("Aggregating data based on series values.") point_data = aggregate_field_values(series_var_val, point_data, self.params['line_type']) @@ -241,6 +257,10 @@ def aggregate_special_fields(self, axis='1'): aggregated_values = pd.concat([aggregated_values, point_data]) self.input_data = aggregated_values self.input_data.reset_index(inplace=True, drop=True) + logger.info(f"Aggregation completed. 
Rows after aggregation: {len(self.input_data)}")
+        else:
+            logger.debug("No special fields detected for aggregation.")
+
 
     def process_rows(self):
         """For each row in the data frame finds the row statistic name,
@@ -287,12 +307,19 @@ def calculate_statistic(values, columns_names, stat_name, aggregation=False):
         Raises:
             an error
         """
-        func_name = f'calculate_{stat_name}'
-        num_parameters = len(signature(globals()[func_name]).parameters)
-        if num_parameters == 2:
-            stat = globals()[func_name](values, columns_names)
-        else:
-            stat = globals()[func_name](values, columns_names, aggregation)
+        logger = logging.getLogger(__name__)
+        logger.debug(f"Calculating statistic '{stat_name}' with aggregation: {aggregation}.")
+        try:
+            func_name = f'calculate_{stat_name}'
+            num_parameters = len(signature(globals()[func_name]).parameters)
+            if num_parameters == 2:
+                stat = globals()[func_name](values, columns_names)
+            else:
+                stat = globals()[func_name](values, columns_names, aggregation)
+            logger.info(f"Successfully calculated statistic '{stat_name}'.")
+        except Exception as e:
+            logger.error(f"An error occurred while calculating statistic '{stat_name}': {e}", exc_info=True)
+            raise
 
         return stat
diff --git a/metcalcpy/validate_mv_python.py b/metcalcpy/validate_mv_python.py
index 7e7eeaf9..e34d0f3f 100644
--- a/metcalcpy/validate_mv_python.py
+++ b/metcalcpy/validate_mv_python.py
@@ -40,7 +40,19 @@
 import yaml
 
 from metcalcpy.compare_images import CompareImages
+from metcalcpy.logging_config import setup_logging
 
+def safe_log(logger, log_method, message):
+    """
+    Safely logs a message using the provided logger and log method.
+
+    Args:
+        logger (logging.Logger): The logger object. If None, the message will not be logged.
+        log_method (callable): The logging method to use (e.g., logger.info, logger.debug).
+        message (str): The message to log.
+ """ + if logger: + log_method(message) def replace_name(old_name, postfix): """Adds postfix to the end of a file name but before the extension @@ -83,13 +95,17 @@ def main(params): """ # remove old output + logger = setup_logging(params) + logger.info("Cleaning old outputs.") clean(params) # find XML files test_xml = get_test_xml(params) + logger.info(f"Found {len(test_xml)} XML files to process.") # rerun each XML with Python for file in test_xml: + logger.info(f'Checking {file}') print(f'\nChecking {file}') doc = xml.dom.minidom.parse(file) @@ -116,6 +132,7 @@ def main(params): new_xml_file = params['output_xml_dir'] + os.path.basename(replace_name(file, 'py')) with open(new_xml_file, "w") as xml_file: doc.writexml(xml_file) + logger.info(f"New XML saved: {new_xml_file}") # run METviewer with the new XML and wait till it is done process = subprocess.Popen([params['mv_home'] + '/bin/mv_batch.sh', new_xml_file], @@ -131,6 +148,7 @@ def main(params): if not os.path.exists(original_plot_path) \ and not os.path.exists(new_image_path): # if both images don't exist - success + logger.info(f'SUCCESS: For {plot_name} both images do not exist.') print(f'SUCCESS: For {plot_name} both images don\'t exist') # remove new XML os.remove(new_xml_file) @@ -145,6 +163,7 @@ def main(params): compare = CompareImages(original_plot_path, new_image_path) ssim = compare.get_mssim() if ssim == 1.0: + logger.info(f'SUCCESS: For {plot_name} images are identical.') print(f'SUCCESS: For {plot_name} images are identical') # remove new image os.remove(new_image_path) @@ -156,12 +175,14 @@ def main(params): delete_similar_files(params['output_data_dir'], plot_name) else: + logger.error(f'ERROR: For {plot_name} images are different.') print(f'ERROR: For {plot_name} images are different') # add more diagnostic images compare.save_thresh_image(params['output_plots_dir'] + replace_name(plot_name, 'thresh')) except KeyError as err: + logger.error(f'ERROR: For {plot_name} : {err}', exc_info=True) 
print(f'ERROR: For {plot_name} : {err}')
diff --git a/test/ecnt_agg_stat.yaml b/test/ecnt_agg_stat.yaml
index a09538c5..91a097d9 100644
--- a/test/ecnt_agg_stat.yaml
+++ b/test/ecnt_agg_stat.yaml
@@ -29,4 +29,4 @@ series_val_1:
 series_val_2: {}
 log_dir: !ENV "${TEST_DIR}/logs"
 log_filename: log_agg_stat_ecnt.txt
-log_level: DEBUG
+log_level: WARNING
diff --git a/test/test_agg_eclv.py b/test/test_agg_eclv.py
index 2ae137df..52b54c9e 100644
--- a/test/test_agg_eclv.py
+++ b/test/test_agg_eclv.py
@@ -46,7 +46,7 @@ def settings():
         'cl_step': 0.05,
         'log_dir': f'{cwd}/logs/',
         'log_filename': 'log_agg_eclv.txt',
-        'log_level': 'INFO'
+        'log_level': 'WARNING'
     }
     agg_stat = AggEclv(params)
     settings_dict = dict()
diff --git a/test/test_agg_ratio.py b/test/test_agg_ratio.py
index 05254db2..612bfb5c 100644
--- a/test/test_agg_ratio.py
+++ b/test/test_agg_ratio.py
@@ -67,7 +67,10 @@ def settings():
                            'APCP_03': ['ECNT_RMSE','ECNT_SPREAD']
                            },
         'list_stat_1':['ECNT_RMSE', 'ECNT_SPREAD'],
-        'list_stat_2':[]
+        'list_stat_2':[],
+        'log_dir': f'{cwd}/logs/',
+        'log_filename': 'log_agg_stat.txt',
+        'log_level': 'WARNING'
     }
     agg_stat = AggStat(params)
     settings_dict = dict()
diff --git a/test/test_calc_difficulty_index.py b/test/test_calc_difficulty_index.py
index 5567204c..bfd8c95f 100644
--- a/test/test_calc_difficulty_index.py
+++ b/test/test_calc_difficulty_index.py
@@ -47,11 +47,11 @@ def test_forecast_difficulty():
     kwargs = {'thresh_eps': thresh_eps, 'threshold_type': 'proximity'}
     assert 0.9095608641027515 == forecast_difficulty(sigmaij, muij, threshold, fieldijn,
-                                            Aplin=None, sigma_over_mu_ref=EPS)[0][0]
+                                            Aplin=None, sigma_over_mu_ref=EPS, logger=None)[0][0]
     assert 0.8191620255148825 == forecast_difficulty(sigmaij, muij, threshold, fieldijn,
-                                            Aplin=None, sigma_over_mu_ref=EPS)[8][17]
+                                            Aplin=None, sigma_over_mu_ref=EPS, logger=None)[8][17]
     assert 1.227707670365556 == forecast_difficulty(sigmaij, muij, threshold, fieldijn,
-                                            Aplin=None, sigma_over_mu_ref=EPS)[4][9]
+                                            Aplin=None, 
sigma_over_mu_ref=EPS, logger=None)[4][9]
 
 
 if __name__ == "__main__":
diff --git a/test/test_event_equalize.py b/test/test_event_equalize.py
index ba452fa5..1f8efd8b 100755
--- a/test/test_event_equalize.py
+++ b/test/test_event_equalize.py
@@ -90,7 +90,7 @@ def perform_event_equalize(fcst_var_val, fixed_vars_vals_input, indy_var, input_
     start = time.time()
     series_data = \
         event_equalize(series_data, indy_var, series_val, fix_vars,
-                       fix_vals_permuted, True, bool_multi)
+                       fix_vals_permuted, True, bool_multi, None)
 
     end = time.time()
     print("one EE:" + str(end - start))
diff --git a/test/test_event_equalize_against_values.py b/test/test_event_equalize_against_values.py
index 393993e9..0e4b2fa6 100644
--- a/test/test_event_equalize_against_values.py
+++ b/test/test_event_equalize_against_values.py
@@ -39,7 +39,7 @@ def test_event_equalize_against_values():
                            & (stats_data[series_var].isin(series_var_vals_no_group))
                            ]
     ee_stats_equalize_unique = (list(set(ee_stats_equalize['equalize'])))
-    f_plot = event_equalize_against_values(f_plot, ee_stats_equalize_unique)
+    f_plot = event_equalize_against_values(f_plot, ee_stats_equalize_unique, None)
 
     # append EE data to result
     if output_data.empty:
diff --git a/test/test_scorecard.py b/test/test_scorecard.py
index e94ae094..5ae16702 100644
--- a/test/test_scorecard.py
+++ b/test/test_scorecard.py
@@ -85,7 +85,9 @@ def settings():
         'stat_flag': 'NCAR',
         'sum_stat_input': f'{cwd}/data/scorecard.data',
-        'sum_stat_output': f'{cwd}/data/scorecard_output.data'
-
+        'sum_stat_output': f'{cwd}/data/scorecard_output.data',
+        'log_dir': f'{cwd}/logs/',
+        'log_filename': 'log_scorecard.txt',
+        'log_level': 'WARNING'
     }
     scorecard = Scorecard(params)
     settings_dict = dict()
diff --git a/test/val1l2_agg_stat.yaml b/test/val1l2_agg_stat.yaml
index 503c30d1..309bce36 100644
--- a/test/val1l2_agg_stat.yaml
+++ b/test/val1l2_agg_stat.yaml
@@ -43,4 +43,4 @@ series_val_1:
 series_val_2: {}
 log_dir: !ENV "${TEST_DIR}/logs"
 log_filename: log_agg_stat_val1l2.txt
-log_level: DEBUG
+log_level: WARNING
diff --git 
a/test/vcnt_agg_stat.yaml b/test/vcnt_agg_stat.yaml index 2d55ddb7..78573d7a 100644 --- a/test/vcnt_agg_stat.yaml +++ b/test/vcnt_agg_stat.yaml @@ -45,4 +45,4 @@ series_val_1: series_val_2: {} log_dir: !ENV "${TEST_DIR}/logs" log_filename: log_agg_stat_vcnt.txt -log_level: DEBUG +log_level: WARNING diff --git a/test/vl1l2_agg_stat_met_v12.yaml b/test/vl1l2_agg_stat_met_v12.yaml index dacb3616..3db4df7a 100644 --- a/test/vl1l2_agg_stat_met_v12.yaml +++ b/test/vl1l2_agg_stat_met_v12.yaml @@ -43,4 +43,4 @@ series_val_1: series_val_2: {} log_dir: !ENV "${TEST_DIR}/logs" log_filename: log_agg_stat_vl1l2.txt -log_level: DEBUG +log_level: WARNING