Skip to content

Commit

Permalink
Update agg_stat_bootstrap.py
Browse files Browse the repository at this point in the history
replace safe.logger with safe_log.logger, to match import
  • Loading branch information
bikegeek authored Jan 2, 2025
1 parent e084bc5 commit be32b14
Showing 1 changed file with 52 additions and 52 deletions.
104 changes: 52 additions & 52 deletions metcalcpy/agg_stat_bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def __init__(self, in_params):
"""
self.logger = setup_logging(in_params)
logger = self.logger
safe.logger(logger, "debug", "Initializing AggStatBootstrap with parameters.")
safe_log.logger(logger, "debug", "Initializing AggStatBootstrap with parameters.")
self.statistic = None
self.derived_name_to_values = {}
self.params = in_params
Expand All @@ -94,53 +94,53 @@ def _init_out_frame(self, series_fields, series):
pandas data frame
"""
logger = self.logger
safe.logger(logger, "debug", "Initializing output data frame.")
safe_log.logger(logger, "debug", "Initializing output data frame.")
result = pd.DataFrame()
row_number = len(series)
safe.logger(logger, "debug", f"Number of rows to initialize: {row_number}")
safe_log.logger(logger, "debug", f"Number of rows to initialize: {row_number}")
# fill series variables and values
for field_ind, field in enumerate(series_fields):
result[field] = [row[field_ind] for row in series]
safe.logger(logger, "debug", f"Field '{field}' initialized with {len(result[field])} entries.")
safe_log.logger(logger, "debug", f"Field '{field}' initialized with {len(result[field])} entries.")
# fill the stats and CI values placeholders with None
result['fcst_var'] = [None] * row_number
result['stat_value'] = [None] * row_number
result['stat_btcl'] = [None] * row_number
result['stat_btcu'] = [None] * row_number
result['nstats'] = [None] * row_number

safe.logger(logger, "debug", "Stats and confidence interval placeholders added.")
safe.logger(logger, "debug", f"DataFrame initialized with columns: {result.columns.tolist()}")
safe_log.logger(logger, "debug", "Stats and confidence interval placeholders added.")
safe_log.logger(logger, "debug", f"DataFrame initialized with columns: {result.columns.tolist()}")

return result

def _proceed_with_axis(self, axis="1"):

logger = self.logger
safe.logger(logger, "info", f"Proceeding with axis: {axis}")
safe_log.logger(logger, "info", f"Proceeding with axis: {axis}")
if not self.input_data.empty:
# identify all possible points values by adding series values, indy values
# and statistics and then permute them
safe.logger(logger, "debug", "Input data is not empty. Proceeding with calculations.")
safe_log.logger(logger, "debug", "Input data is not empty. Proceeding with calculations.")
indy_vals = self.params['indy_vals']
series_val = self.params['series_val_' + axis]
all_fields_values = series_val.copy()
all_fields_values[self.params['indy_var']] = indy_vals
all_fields_values['stat_name'] = self.params['list_stat_' + axis]
all_points = list(itertools.product(*all_fields_values.values()))
safe.logger(logger, "debug", f"All points generated: {len(all_points)} points created for axis {axis}.")
safe_log.logger(logger, "debug", f"All points generated: {len(all_points)} points created for axis {axis}.")
fcst_var = None
if len(self.params['fcst_var_val_' + axis]) > 0 and 'fcst_var' in self.input_data.columns:
fcst_var = list(self.params['fcst_var_val_' + axis].keys())[0]
safe.logger(logger, "debug", f"Forecast variable identified: {fcst_var}")
safe_log.logger(logger, "debug", f"Forecast variable identified: {fcst_var}")
cases = []
out_frame = self._init_out_frame(all_fields_values.keys(), all_points)
safe.logger(logger, "debug", f"Output DataFrame initialized with {len(out_frame)} rows.")
safe_log.logger(logger, "debug", f"Output DataFrame initialized with {len(out_frame)} rows.")
point_to_distrib = {}

# run the bootstrap flow for each independent variable value
for indy_val in indy_vals:
safe.logger(logger, "debug", f"Processing independent value: {indy_val}")
safe_log.logger(logger, "debug", f"Processing independent value: {indy_val}")
# extract the records for the current indy value
if is_string_integer(indy_val):
filtered_by_indy_data = \
Expand All @@ -155,7 +155,7 @@ def _proceed_with_axis(self, axis="1"):
all_fields_values = series_val.copy()

all_points = list(itertools.product(*all_fields_values.values()))
safe.logger(logger, "debug", f"Number of points for independent value '{indy_val}': {len(all_points)}.")
safe_log.logger(logger, "debug", f"Number of points for independent value '{indy_val}': {len(all_points)}.")

for point in all_points:
all_filters = []
Expand All @@ -182,7 +182,7 @@ def _proceed_with_axis(self, axis="1"):
# use numpy to select the rows where any record evaluates to True
mask = np.array(all_filters).all(axis=0)
point_data = filtered_by_indy_data.loc[mask]
safe.logger(logger, "debug", f"Point data filtered for point {point}. Number of records: {len(point_data)}")
safe_log.logger(logger, "debug", f"Point data filtered for point {point}. Number of records: {len(point_data)}")

# build a list of cases to sample
fcst_valid = point_data.loc[:, 'fcst_valid'].astype(str)
Expand All @@ -193,7 +193,7 @@ def _proceed_with_axis(self, axis="1"):
# calculate bootstrap for cases
for stat_upper in self.params['list_stat_' + axis]:
self.statistic = stat_upper.lower()
safe.logger(logger, "debug", f"Calculating bootstrap for statistic: {self.statistic}")
safe_log.logger(logger, "debug", f"Calculating bootstrap for statistic: {self.statistic}")
for point in all_points:
all_filters = []
out_frame_filter = []
Expand All @@ -218,7 +218,7 @@ def _proceed_with_axis(self, axis="1"):
mask_out_frame = np.array(out_frame_filter).all(axis=0)
point_data = filtered_by_indy_data.loc[mask]
bootstrap_results = self._get_bootstrapped_stats(point_data, cases)
safe.logger(logger, "debug", f"Bootstrap results calculated for point {point}: {bootstrap_results.value}")
safe_log.logger(logger, "debug", f"Bootstrap results calculated for point {point}: {bootstrap_results.value}")
# save bootstrap results
point_to_distrib[point] = bootstrap_results
n_stats = len(point_data)
Expand All @@ -235,32 +235,32 @@ def _proceed_with_axis(self, axis="1"):
out_frame.loc[index, 'stat_btcl'] = bootstrap_results.lower_bound
out_frame.loc[index, 'stat_btcu'] = bootstrap_results.upper_bound
out_frame.loc[index, 'nstats'] = n_stats
safe.logger(logger, "debug", f"Results saved to output DataFrame at index {index} for point {point}.")
safe_log.logger(logger, "debug", f"Results saved to output DataFrame at index {index} for point {point}.")
else:
out_frame = pd.DataFrame()
safe.logger(logger, "warning", "Input data is empty. Returning an empty DataFrame.")
safe_log.logger(logger, "warning", "Input data is empty. Returning an empty DataFrame.")

safe.logger(logger, "info", f"Completed processing for axis: {axis}")
safe_log.logger(logger, "info", f"Completed processing for axis: {axis}")
return out_frame

def _get_bootstrapped_stats(self, series_data, cases):
logger = self.logger
safe.logger(logger, "info", "Starting bootstrapping process.")
safe_log.logger(logger, "info", "Starting bootstrapping process.")

safe.logger(logger, "debug", "Sorting series data.")
safe_log.logger(logger, "debug", "Sorting series data.")
self.series_data = sort_data(series_data)
safe.logger(logger, "debug", f"Data sorted. Number of rows: {len(self.series_data)}")
safe_log.logger(logger, "debug", f"Data sorted. Number of rows: {len(self.series_data)}")
if self.params['num_iterations'] == 1:
safe.logger(logger, "info", "Only one iteration specified. Skipping bootstrapping.")
safe_log.logger(logger, "info", "Only one iteration specified. Skipping bootstrapping.")
stat_val = self._calc_stats(cases)[0]
safe.logger(logger, "debug", f"Statistic calculated: {stat_val}")
safe_log.logger(logger, "debug", f"Statistic calculated: {stat_val}")
results = BootstrapResults(lower_bound=None,
value=stat_val,
upper_bound=None)
safe.logger(logger, "info", "Statistic calculated without bootstrapping.")
safe_log.logger(logger, "info", "Statistic calculated without bootstrapping.")
else:
# need bootstrapping and CI calculation in addition to
safe.logger(logger, "info", "Performing bootstrapping and confidence interval calculation.")
safe_log.logger(logger, "info", "Performing bootstrapping and confidence interval calculation.")
try:
results = bootstrap_and_value_mode(
self.series_data,
Expand All @@ -271,13 +271,13 @@ def _get_bootstrapped_stats(self, series_data, cases):
ci_method=self.params['method'],
logger=logger
)
safe.logger(logger, "debug", "Bootstrapping completed successfully.")
safe_log.logger(logger, "debug", "Bootstrapping completed successfully.")
except KeyError as err:
safe.logger(logger, "error", f"Error during bootstrapping: {err}")
safe_log.logger(logger, "error", f"Error during bootstrapping: {err}")
results = BootstrapResults(None, None, None)
safe.logger(logger, "info", "Returning empty BootstrapResults due to error.")
safe_log.logger(logger, "info", "Returning empty BootstrapResults due to error.")
print(err)
safe.logger(logger, "info", "Bootstrapping process completed.")
safe_log.logger(logger, "info", "Bootstrapping process completed.")
return results

def _calc_stats(self, cases):
Expand All @@ -294,23 +294,23 @@ def _calc_stats(self, cases):
"""
logger = self.logger
func_name = f'calculate_{self.statistic}'
safe.logger(logger, "info", f"Starting statistic calculation using function: {func_name}")
safe_log.logger(logger, "info", f"Starting statistic calculation using function: {func_name}")
if cases is not None and cases.ndim == 2:
# The single value case
safe.logger(logger, "debug", "Processing single-value case.")
safe_log.logger(logger, "debug", "Processing single-value case.")

# build a data frame with the sampled data
data_cases = np.asarray(self.series_data['case'])
flat_cases = cases.flatten()
values = self.series_data[np.in1d(data_cases, flat_cases)].to_numpy()
safe.logger(logger, "debug", f"Number of values selected for single case: {len(values)}")
safe_log.logger(logger, "debug", f"Number of values selected for single case: {len(values)}")
# Calculate the statistic for each bootstrap iteration
try:
stat_value = globals()[func_name](values, self.column_names, logger=logger)
stat_values.append([stat_value])
safe.logger(logger, "info", f"Statistic calculated for bootstrap iteration: {stat_value}")
safe_log.logger(logger, "info", f"Statistic calculated for bootstrap iteration: {stat_value}")
except Exception as e:
safe.logger(logger, "error", f"Error calculating statistic for bootstrap iteration: {e}")
safe_log.logger(logger, "error", f"Error calculating statistic for bootstrap iteration: {e}")
raise

elif cases is not None and cases.ndim == 3:
Expand All @@ -319,17 +319,17 @@ def _calc_stats(self, cases):
for row in cases:
values_ind = self.series_data['case'].isin(row.flatten())
values = self.series_data[values_ind]
safe.logger(logger, "debug", f"Number of values selected for bootstrap iteration: {len(values)}")
safe_log.logger(logger, "debug", f"Number of values selected for bootstrap iteration: {len(values)}")
# Calculate the statistic for each bootstrap iteration
try:
stat_value = globals()[func_name](values, self.column_names, logger=logger)
stat_values.append([stat_value])
safe.logger(logger, "info", f"Statistic calculated for bootstrap iteration: {stat_value}")
safe_log.logger(logger, "info", f"Statistic calculated for bootstrap iteration: {stat_value}")
except Exception as e:
safe.logger(logger, "error", f"Error calculating statistic for bootstrap iteration: {e}")
safe_log.logger(logger, "error", f"Error calculating statistic for bootstrap iteration: {e}")
raise
else:
safe.logger(logger, "error", "Invalid input for cases. Cannot calculate statistic.")
safe_log.logger(logger, "error", "Invalid input for cases. Cannot calculate statistic.")
raise KeyError("can't calculate statistic")
return stat_values

Expand All @@ -338,46 +338,46 @@ def calculate_values(self):
Writes output data to the file
"""
logger = self.logger
safe.logger(logger, "info", "Starting calculation of values.")
safe_log.logger(logger, "info", "Starting calculation of values.")
if not self.input_data.empty:
safe.logger(logger, "debug", "Input data is not empty. Proceeding with calculations.")
safe_log.logger(logger, "debug", "Input data is not empty. Proceeding with calculations.")
if self.params['random_seed'] is not None and self.params['random_seed'] != 'None':
safe.logger(logger, "debug", f"Random seed set to: {self.params['random_seed']}")
safe_log.logger(logger, "debug", f"Random seed set to: {self.params['random_seed']}")
np.random.seed(self.params['random_seed'])

# perform EE if needed
is_event_equal = parse_bool(self.params['event_equal'])
if is_event_equal:
safe.logger(logger, "info", "Event equalization required. Performing event equalization.")
safe_log.logger(logger, "info", "Event equalization required. Performing event equalization.")
self._perform_event_equalization()
safe.logger(logger, "debug", "Event equalization completed.")
safe_log.logger(logger, "debug", "Event equalization completed.")

# build the case information for each record
safe.logger(logger, "debug", "Building case information for each record.")
safe_log.logger(logger, "debug", "Building case information for each record.")
fcst_valid = self.input_data.loc[:, 'fcst_valid'].astype(str)
indy_var = self.input_data.loc[:, self.params['indy_var']].astype(str)
self.input_data['case'] = fcst_valid + '#' + indy_var
safe.logger(logger, "debug", "Case information added to the input data.")
safe_log.logger(logger, "debug", "Case information added to the input data.")

# get results for axis1
safe.logger(logger, "info", "Calculating results for axis 1.")
safe_log.logger(logger, "info", "Calculating results for axis 1.")
out_frame = self._proceed_with_axis("1")
if self.params['series_val_2']:
safe.logger(logger, "info", "Series values for axis 2 detected. Calculating results for axis 2.")
safe_log.logger(logger, "info", "Series values for axis 2 detected. Calculating results for axis 2.")
out_frame = pd.concat([out_frame, self._proceed_with_axis("2")])
safe.logger(logger, "debug", "Results for axis 2 calculated and combined with axis 1.")
safe_log.logger(logger, "debug", "Results for axis 2 calculated and combined with axis 1.")

else:
safe.logger(logger, "warning", "Input data is empty. Returning an empty DataFrame.")
safe_log.logger(logger, "warning", "Input data is empty. Returning an empty DataFrame.")
out_frame = pd.DataFrame()

header = True
mode = 'w'
safe.logger(logger, "info", f"Exporting results to {self.params['agg_stat_output']}")
safe_log.logger(logger, "info", f"Exporting results to {self.params['agg_stat_output']}")
export_csv = out_frame.to_csv(self.params['agg_stat_output'],
index=None, header=header, mode=mode,
sep="\t", na_rep="NA")
safe.logger(logger, "info", "Results successfully exported to CSV.")
safe_log.logger(logger, "info", "Results successfully exported to CSV.")


def _perform_event_equalization(self):
Expand Down

0 comments on commit be32b14

Please sign in to comment.