Skip to content

Commit

Permalink
Added logging statements and modified tests accordingly
Browse files Browse the repository at this point in the history
  • Loading branch information
ishita9 committed Sep 18, 2024
1 parent c6b4931 commit 5eb9e4d
Show file tree
Hide file tree
Showing 24 changed files with 581 additions and 193 deletions.
5 changes: 3 additions & 2 deletions metcalcpy/agg_eclv.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ def _get_bootstrapped_stats(self, series_data, thresholds):
ci_method=self.params['method'],
save_data=False,
block_length=block_length,
eclv=True
eclv=True,
logger=logger
)
logger.info(f"Bootstrapped statistics calculated for threshold {thresh}.")
except KeyError as err:
Expand Down Expand Up @@ -342,7 +343,7 @@ def calculate_stats_and_ci(self):
self.input_data = event_equalize(self.input_data, 'stat_name',
self.params['series_val_1'],
fix_vals_keys,
fix_vals_permuted_list, is_equalize_by_indep, False)
fix_vals_permuted_list, is_equalize_by_indep, False, logger)
logger.debug("Event equalization completed.")

# Process data to calculate statistics
Expand Down
3 changes: 2 additions & 1 deletion metcalcpy/agg_stat.py
Original file line number Diff line number Diff line change
Expand Up @@ -1166,7 +1166,8 @@ def _get_bootstrapped_stats(self, series_data, axis="1"):
num_threads=self.params['num_threads'],
ci_method=self.params['method'],
save_data=has_derived_series,
block_length=block_length
block_length=block_length,
logger=logger
)
logger.info("Bootstrapping and CI calculation completed.")

Expand Down
92 changes: 82 additions & 10 deletions metcalcpy/agg_stat_bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
from metcalcpy.util.mode_3d_volrat_statistics import *
from metcalcpy.util.mode_3d_ratio_statistics import *
from metcalcpy.util.utils import is_string_integer, parse_bool, sort_data, is_string_strictly_float

from metcalcpy.logging_config import setup_logging

class AggStatBootstrap:
"""A class that performs aggregation statistic logic fot MODE and MTD ratio statistics on input data frame.
Expand All @@ -68,7 +68,9 @@ def __init__(self, in_params):
Args:
in_params - input parameters as a dictionary
"""

self.logger = setup_logging(in_params)
logger = self.logger
logger.debug("Initializing AggStatBootstrap with parameters.")
self.statistic = None
self.derived_name_to_values = {}
self.params = in_params
Expand All @@ -90,40 +92,54 @@ def _init_out_frame(self, series_fields, series):
Returns:
pandas data frame
"""
logger = self.logger
logger.debug("Initializing output data frame.")
result = pd.DataFrame()
row_number = len(series)
logger.debug(f"Number of rows to initialize: {row_number}")
# fill series variables and values
for field_ind, field in enumerate(series_fields):
result[field] = [row[field_ind] for row in series]

logger.debug(f"Field '{field}' initialized with {len(result[field])} entries.")
# fill the stats and CI values placeholders with None
result['fcst_var'] = [None] * row_number
result['stat_value'] = [None] * row_number
result['stat_btcl'] = [None] * row_number
result['stat_btcu'] = [None] * row_number
result['nstats'] = [None] * row_number

logger.debug("Stats and confidence interval placeholders added.")
logger.debug(f"DataFrame initialized with columns: {result.columns.tolist()}")

return result

def _proceed_with_axis(self, axis="1"):

logger = self.logger
logger.info(f"Proceeding with axis: {axis}")
if not self.input_data.empty:
# identify all possible points values by adding series values, indy values
# and statistics and then permute them
logger.debug("Input data is not empty. Proceeding with calculations.")
indy_vals = self.params['indy_vals']
series_val = self.params['series_val_' + axis]
all_fields_values = series_val.copy()
all_fields_values[self.params['indy_var']] = indy_vals
all_fields_values['stat_name'] = self.params['list_stat_' + axis]
all_points = list(itertools.product(*all_fields_values.values()))
logger.debug(f"All points generated: {len(all_points)} points created for axis {axis}.")
fcst_var = None
if len(self.params['fcst_var_val_' + axis]) > 0 and 'fcst_var' in self.input_data.columns:
fcst_var = list(self.params['fcst_var_val_' + axis].keys())[0]

logger.debug(f"Forecast variable identified: {fcst_var}")
cases = []
out_frame = self._init_out_frame(all_fields_values.keys(), all_points)
logger.debug(f"Output DataFrame initialized with {len(out_frame)} rows.")
point_to_distrib = {}

# run the bootstrap flow for each independent variable value
for indy_val in indy_vals:
logger.debug(f"Processing independent value: {indy_val}")
# extract the records for the current indy value
if is_string_integer(indy_val):
filtered_by_indy_data = \
Expand All @@ -138,6 +154,7 @@ def _proceed_with_axis(self, axis="1"):
all_fields_values = series_val.copy()

all_points = list(itertools.product(*all_fields_values.values()))
logger.debug(f"Number of points for independent value '{indy_val}': {len(all_points)}.")

for point in all_points:
all_filters = []
Expand All @@ -164,6 +181,7 @@ def _proceed_with_axis(self, axis="1"):
# use numpy to select the rows where any record evaluates to True
mask = np.array(all_filters).all(axis=0)
point_data = filtered_by_indy_data.loc[mask]
logger.debug(f"Point data filtered for point {point}. Number of records: {len(point_data)}")

# build a list of cases to sample
fcst_valid = point_data.loc[:, 'fcst_valid'].astype(str)
Expand All @@ -174,6 +192,7 @@ def _proceed_with_axis(self, axis="1"):
# calculate bootstrap for cases
for stat_upper in self.params['list_stat_' + axis]:
self.statistic = stat_upper.lower()
logger.debug(f"Calculating bootstrap for statistic: {self.statistic}")
for point in all_points:
all_filters = []
out_frame_filter = []
Expand All @@ -198,6 +217,7 @@ def _proceed_with_axis(self, axis="1"):
mask_out_frame = np.array(out_frame_filter).all(axis=0)
point_data = filtered_by_indy_data.loc[mask]
bootstrap_results = self._get_bootstrapped_stats(point_data, cases)
logger.debug(f"Bootstrap results calculated for point {point}: {bootstrap_results.value}")
# save bootstrap results
point_to_distrib[point] = bootstrap_results
n_stats = len(point_data)
Expand All @@ -214,31 +234,48 @@ def _proceed_with_axis(self, axis="1"):
out_frame.loc[index, 'stat_btcl'] = bootstrap_results.lower_bound
out_frame.loc[index, 'stat_btcu'] = bootstrap_results.upper_bound
out_frame.loc[index, 'nstats'] = n_stats
logger.debug(f"Results saved to output DataFrame at index {index} for point {point}.")
else:
out_frame = pd.DataFrame()
logger.warning("Input data is empty. Returning an empty DataFrame.")

logger.info(f"Completed processing for axis: {axis}")
return out_frame

def _get_bootstrapped_stats(self, series_data, cases):
    """Calculate the statistic and its bootstrap confidence interval for one point.

    Args:
        series_data: pandas DataFrame with the records for the current point
        cases: numpy array of case identifiers to sample from

    Returns:
        BootstrapResults with value, lower_bound and upper_bound.
        Bounds are None when only one iteration is requested; all three
        fields are None when the bootstrap raises a KeyError.
    """
    logger = self.logger
    logger.info("Starting bootstrapping process.")

    logger.debug("Sorting series data.")
    self.series_data = sort_data(series_data)
    logger.debug(f"Data sorted. Number of rows: {len(self.series_data)}")

    if self.params['num_iterations'] == 1:
        # A single iteration means no resampling: compute the statistic
        # once and return it without confidence bounds.
        logger.info("Only one iteration specified. Skipping bootstrapping.")
        stat_val = self._calc_stats(cases)[0]
        logger.debug(f"Statistic calculated: {stat_val}")
        results = BootstrapResults(lower_bound=None,
                                   value=stat_val,
                                   upper_bound=None)
        logger.info("Statistic calculated without bootstrapping.")
    else:
        # Need bootstrapping and CI calculation in addition to the statistic.
        logger.info("Performing bootstrapping and confidence interval calculation.")
        try:
            results = bootstrap_and_value_mode(
                self.series_data,
                cases,
                stat_func=self._calc_stats,
                num_iterations=self.params['num_iterations'],
                num_threads=self.params['num_threads'],
                ci_method=self.params['method'],
                logger=logger)
            logger.debug("Bootstrapping completed successfully.")
        except KeyError as err:
            # logger.error already records the error with a traceback
            # (exc_info=True), so no additional bare print is needed.
            logger.error(f"Error during bootstrapping: {err}", exc_info=True)
            results = BootstrapResults(None, None, None)
            logger.info("Returning empty BootstrapResults due to error.")
    logger.info("Bootstrapping process completed.")
    return results

def _calc_stats(self, cases):
Expand All @@ -253,58 +290,93 @@ def _calc_stats(self, cases):
an error
"""
logger = self.logger
func_name = f'calculate_{self.statistic}'
logger.info(f"Starting statistic calculation using function: {func_name}")
if cases is not None and cases.ndim == 2:
# The single value case
logger.debug("Processing single-value case.")

# build a data frame with the sampled data
data_cases = np.asarray(self.series_data['case'])
flat_cases = cases.flatten()
values = self.series_data[np.in1d(data_cases, flat_cases)].to_numpy()
stat_values = [globals()[func_name](values, self.column_names)]
logger.debug(f"Number of values selected for single case: {len(values)}")
# Calculate the statistic for each bootstrap iteration
try:
stat_value = globals()[func_name](values, self.column_names)
stat_values.append([stat_value])
logger.info(f"Statistic calculated for bootstrap iteration: {stat_value}")
except Exception as e:
logger.error(f"Error calculating statistic for bootstrap iteration: {e}", exc_info=True)
raise

elif cases is not None and cases.ndim == 3:
# bootstrapped case
stat_values = []
for row in cases:
values_ind = self.series_data['case'].isin(row.flatten())
values = self.series_data[values_ind]
stat_values.append([globals()[func_name](values, self.column_names)])
logger.debug(f"Number of values selected for bootstrap iteration: {len(values)}")
# Calculate the statistic for each bootstrap iteration
try:
stat_value = globals()[func_name](values, self.column_names)
stat_values.append([stat_value])
logger.info(f"Statistic calculated for bootstrap iteration: {stat_value}")
except Exception as e:
logger.error(f"Error calculating statistic for bootstrap iteration: {e}", exc_info=True)
raise
else:
logger.error("Invalid input for cases. Cannot calculate statistic.")
raise KeyError("can't calculate statistic")
return stat_values

def calculate_values(self):
    """Perform event equalization (if requested), run the aggregation
    statistic logic for each configured axis and write the results out.

    Reads self.input_data and self.params; writes a tab-separated CSV
    (NA for missing values) to self.params['agg_stat_output'].
    """
    logger = self.logger
    logger.info("Starting calculation of values.")
    if not self.input_data.empty:
        logger.debug("Input data is not empty. Proceeding with calculations.")
        # Seed only when a real seed was configured ('None' may arrive as a
        # string from the YAML/config layer).
        if self.params['random_seed'] is not None and self.params['random_seed'] != 'None':
            logger.debug(f"Random seed set to: {self.params['random_seed']}")
            np.random.seed(self.params['random_seed'])

        # perform EE if needed
        is_event_equal = parse_bool(self.params['event_equal'])
        if is_event_equal:
            logger.info("Event equalization required. Performing event equalization.")
            self._perform_event_equalization()
            logger.debug("Event equalization completed.")

        # build the case information for each record: "<fcst_valid>#<indy_value>"
        logger.debug("Building case information for each record.")
        fcst_valid = self.input_data.loc[:, 'fcst_valid'].astype(str)
        indy_var = self.input_data.loc[:, self.params['indy_var']].astype(str)
        self.input_data['case'] = fcst_valid + '#' + indy_var
        logger.debug("Case information added to the input data.")

        # get results for axis1
        logger.info("Calculating results for axis 1.")
        out_frame = self._proceed_with_axis("1")
        if self.params['series_val_2']:
            logger.info("Series values for axis 2 detected. Calculating results for axis 2.")
            out_frame = pd.concat([out_frame, self._proceed_with_axis("2")])
            logger.debug("Results for axis 2 calculated and combined with axis 1.")
    else:
        logger.warning("Input data is empty. Returning an empty DataFrame.")
        out_frame = pd.DataFrame()

    logger.info(f"Exporting results to {self.params['agg_stat_output']}")
    # to_csv returns None when given a path, so there is nothing to keep
    # from the call; the previous 'export_csv' assignment was dead code.
    out_frame.to_csv(self.params['agg_stat_output'],
                     index=None, header=True, mode='w',
                     sep="\t", na_rep="NA")
    logger.info("Results successfully exported to CSV.")


def _perform_event_equalization(self):
""" Performs event equalisation on input data
Expand Down
Loading

0 comments on commit 5eb9e4d

Please sign in to comment.