From efe560527f06b33ecd9023d5783390f365d3ef36 Mon Sep 17 00:00:00 2001
From: Fuhan Yang
Date: Fri, 10 Jan 2025 12:17:32 -0500
Subject: [PATCH 1/7] update makefile

---
 Makefile | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/Makefile b/Makefile
index 1c0c2a3..695ada6 100644
--- a/Makefile
+++ b/Makefile
@@ -10,8 +10,8 @@ SCORES = data/scores.parquet
 
 all: $(SCORES)
 
-$(SCORE): scripts/eval.py $(FORECASTS)
-	python $< --input=$(FORECASTS) --output=$@
+$(SCORES): scripts/eval.py $(FORECASTS)
+	python $< --pred=$(FORECASTS) --obs=$(RAW_DATA) --output=$@
 
 $(FORECASTS): scripts/forecast.py $(RAW_DATA)
 	python $< --input=$(RAW_DATA) --output=$@

From f8be0c4e04a4ef6d308b54390f3c87cbfc371f2a Mon Sep 17 00:00:00 2001
From: Fuhan Yang
Date: Fri, 10 Jan 2025 12:17:54 -0500
Subject: [PATCH 2/7] add eval

---
 scripts/eval.py | 75 +++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 69 insertions(+), 6 deletions(-)

diff --git a/scripts/eval.py b/scripts/eval.py
index 7d8af84..1cc131f 100644
--- a/scripts/eval.py
+++ b/scripts/eval.py
@@ -1,6 +1,69 @@
-        for score_fun in score_funs:
-            score = eval.score(
-                incident_test_data, incident_projections, score_fun
-            )
-            print(f"{model=} {forecast_date=} {score_fun=} {score=}")
-            # save these scores somewhere
+import argparse
+
+import polars as pl
+import yaml
+
+import iup
+from iup import eval
+
+
+def eval_all_forecasts(test, pred, config):
+    """Evaluate the forecasts for all models, all forecast ends, and all scores"""
+    score_names = config["score_funs"]
+    model_names = pred["model"].unique()
+    forecast_ends = pred["forecast_end"].unique()
+
+    # only 'incident' type is evaluated #
+    incident_pred = pred.filter(pl.col("estimate_type") == "incident").with_columns(
+        quantile=0.5
+    )
+    # This step is arbitrary, but it is necessary to pass PointForecast validation #
+
+    all_scores = pl.DataFrame()
+
+    for score_name in score_names:
+        score_fun = getattr(eval, score_name)
+
+        for model in model_names:
+            for forecast_end in forecast_ends:
+                pred_data = incident_pred.filter(
+                    pl.col("model") == model, pl.col("forecast_end") == forecast_end
+                )
+
+                assert (pred_data["forecast_start"] == test["time_end"].min()).all()
+
+                test = iup.IncidentUptakeData(test)
+                pred_data = iup.PointForecast(pred_data)
+
+                score = eval.score(test, pred_data, score_fun)
+                score = score.with_columns(score_fun=score_name, model=model)
+
+                all_scores = pl.concat([all_scores, score])
+
+    return all_scores
+
+
+if __name__ == "__main__":
+    p = argparse.ArgumentParser()
+    p.add_argument("--config", help="config file", default="scripts/config.yaml")
+    p.add_argument("--pred", help="forecast data")
+    p.add_argument("--obs", help="observed data")
+    p.add_argument("--output", help="output parquet file")
+    args = p.parse_args()
+
+    with open(args.config, "r") as f:
+        config = yaml.safe_load(f)
+
+    pred_data = pl.scan_parquet(args.pred).collect()
+    obs_data = pl.scan_parquet(args.obs).collect()
+    obs_data = obs_data.filter(pl.col("estimate_type") == "incident")
+
+    # ensure the same test data is used for all models
+    test_data = iup.IncidentUptakeData.split_train_test(
+        obs_data, config["timeframe"]["start"], "test"
+    ).filter(pl.col("time_end") <= config["timeframe"]["end"])
+
+    test_data = iup.IncidentUptakeData(test_data)
+
+    all_scores = eval_all_forecasts(test_data, pred_data, config)
+    all_scores.write_parquet(args.output)
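The scoring loop above delegates to `iup.eval.score`, which this series never shows. As a rough, hypothetical sketch of the contract it appears to satisfy — align observed and predicted incident uptake on `time_end`, apply one metric, and return a one-row frame — assuming both frames carry `estimate`, `time_end`, and `forecast_start` columns (the real implementation lives in `iup/eval.py` and may differ):

```python
# Hypothetical stand-in for iup.eval.score, not the package's actual code.
import polars as pl


def mspe(data: pl.DataFrame, pred: pl.DataFrame) -> float:
    """Mean squared prediction error over the dates common to both frames."""
    joined = data.join(pred, on="time_end", how="inner", suffix="_pred")
    return joined.select(
        ((pl.col("estimate") - pl.col("estimate_pred")) ** 2).mean()
    ).item()


def score(data: pl.DataFrame, pred: pl.DataFrame, score_fun) -> pl.DataFrame:
    """Return a one-row frame holding the metric, keyed by forecast start."""
    return pl.DataFrame(
        {
            "forecast_start": [pred["forecast_start"].min()],
            "score": [score_fun(data, pred)],
        }
    )
```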
From 01e2396582cf60ee5f3e180f9855a98c390299f7 Mon Sep 17 00:00:00 2001
From: Fuhan Yang
Date: Mon, 13 Jan 2025 17:20:32 -0500
Subject: [PATCH 3/7] change forecast_end to forecast_start

---
 scripts/eval.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/scripts/eval.py b/scripts/eval.py
index 1cc131f..e79452d 100644
--- a/scripts/eval.py
+++ b/scripts/eval.py
@@ -11,7 +11,7 @@ def eval_all_forecasts(test, pred, config):
     """Evaluate the forecasts for all models, all forecast ends, and all scores"""
     score_names = config["score_funs"]
     model_names = pred["model"].unique()
-    forecast_ends = pred["forecast_end"].unique()
+    forecast_starts = pred["forecast_start"].unique()
 
     # only 'incident' type is evaluated #
     incident_pred = pred.filter(pl.col("estimate_type") == "incident").with_columns(
@@ -25,9 +25,9 @@
         score_fun = getattr(eval, score_name)
 
         for model in model_names:
-            for forecast_end in forecast_ends:
+            for forecast_start in forecast_starts:
                 pred_data = incident_pred.filter(
-                    pl.col("model") == model, pl.col("forecast_end") == forecast_end
+                    pl.col("model") == model, pl.col("forecast_start") == forecast_start
                 )
 
                 assert (pred_data["forecast_start"] == test["time_end"].min()).all()
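The rename brings the loop keys in line with the `forecast_start` column that the assert already relied on. Incidentally, the nested `model` × `forecast_start` filters could be collapsed into a single pass with polars' `partition_by`; a sketch, assuming `pred` carries both columns:

```python
import polars as pl


def iter_prediction_groups(pred: pl.DataFrame):
    """Yield ((model, forecast_start), frame) pairs, one per combination."""
    for frame in pred.partition_by(["model", "forecast_start"], maintain_order=True):
        yield (frame["model"][0], frame["forecast_start"][0]), frame
```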
From 5589b8e4217117f47cdb2ee2517f64bc84c253ca Mon Sep 17 00:00:00 2001
From: Fuhan Yang
Date: Mon, 13 Jan 2025 17:28:05 -0500
Subject: [PATCH 4/7] add pre-commit compliance

---
 iup/eval.py         |  6 +++---
 scripts/forecast.py | 53 +++++++++++++++++++++++++---------------------
 2 files changed, 33 insertions(+), 26 deletions(-)

diff --git a/iup/eval.py b/iup/eval.py
index ee4c27b..2170d6d 100644
--- a/iup/eval.py
+++ b/iup/eval.py
@@ -30,9 +30,9 @@ def check_date_match(data: IncidentUptakeData, pred: PointForecast):
     (data["time_end"] == pred["time_end"]).all()
 
     # 2. There should not be any duplicated date in either data or prediction.
-    assert not (any(data["time_end"].is_duplicated())), (
-        "Duplicated dates are found in data and prediction."
-    )
+    assert not (
+        any(data["time_end"].is_duplicated())
+    ), "Duplicated dates are found in data and prediction."
 
 
 def score(

diff --git a/scripts/forecast.py b/scripts/forecast.py
index 6ec0f76..f722035 100644
--- a/scripts/forecast.py
+++ b/scripts/forecast.py
@@ -3,12 +3,18 @@
 import polars as pl
 import yaml
 
+import iup
+
+
 def run_all_forecasts() -> pl.DataFrame:
     """Run all forecasts
 
     Returns:
         pl.DataFrame: data frame of forecasts, organized by model and forecast date
    """
+    raise NotImplementedError
+    models = None
+
     forecast_dates = pl.date_range(
         config["timeframe"]["start"],
         config["timeframe"]["end"],
@@ -19,41 +25,42 @@
     assert all(issubclass(model, iup.models.UptakeModel) for model in models)
 
     for model in models:
-        for forecast_date in forecast_dates:
-            # Get data available as of the forecast date
+        for forecast_date in forecast_dates:
+            # Get data available as of the forecast date
+            pass
 
 
 def run_forecast() -> pl.DataFrame:
     """Run a single model for a single forecast date"""
-    incident_train_data = iup.IncidentUptakeData(
-        iup.IncidentUptakeData.split_train_test(
-            incident_data, config["timeframe"]["start"], "train"
-        )
-    )
+    raise NotImplementedError
 
-    # Fit models using the training data and make projections
-    fit_model = model().fit(incident_train_data, grouping_factors)
+    # incident_train_data = iup.IncidentUptakeData(
+    #     iup.IncidentUptakeData.split_train_test(
+    #         incident_data, config["timeframe"]["start"], "train"
+    #     )
+    # )
 
-    cumulative_projections = fit_model.predict(
-        config["timeframe"]["start"],
-        config["timeframe"]["end"],
-        config["timeframe"]["interval"],
-        grouping_factors,
-    )
+    # Fit models using the training data and make projections
+    # fit_model = model().fit(incident_train_data, grouping_factors)
+
+    # cumulative_projections = fit_model.predict(
+    #     config["timeframe"]["start"],
+    #     config["timeframe"]["end"],
+    #     config["timeframe"]["interval"],
+    #     grouping_factors,
+    # )
     # save these projections somewhere
 
-    incident_projections = cumulative_projections.to_incident(
-        grouping_factors
-    )
+    # incident_projections = cumulative_projections.to_incident(grouping_factors)
     # save these projections somewhere
 
     # Evaluation / Post-processing --------------------------------------------
 
-    incident_test_data = iup.IncidentUptakeData(
-        iup.IncidentUptakeData.split_train_test(
-            incident_data, config["timeframe"]["start"], "test"
-        )
-    ).filter(pl.col("date") <= config["timeframe"]["end"])
+    # incident_test_data = iup.IncidentUptakeData(
+    #     iup.IncidentUptakeData.split_train_test(
+    #         incident_data, config["timeframe"]["start"], "test"
+    #     )
+    # ).filter(pl.col("date") <= config["timeframe"]["end"])
 
 
 if __name__ == "__main__":
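The `pl.date_range(..., eager=True)` call in `run_all_forecasts` is what turns the config's timeframe into the grid of forecast dates the stub iterates over. A runnable illustration with placeholder values — the real start, end, and interval come from `scripts/config.yaml`, which is not part of this series:

```python
from datetime import date

import polars as pl

# Placeholder timeframe standing in for config["timeframe"]; not the repo's values.
forecast_dates = pl.date_range(
    date(2024, 9, 1),   # start
    date(2024, 12, 1),  # end
    "1mo",              # interval
    eager=True,         # materialize a Series instead of building an expression
)
print(forecast_dates.to_list())
# [datetime.date(2024, 9, 1), datetime.date(2024, 10, 1), ...]
```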
From 83d53413aab36b785544033b34fdc191cb44b09a Mon Sep 17 00:00:00 2001
From: Fuhan Yang
Date: Mon, 13 Jan 2025 17:55:44 -0500
Subject: [PATCH 5/7] fix bug to separate test data

---
 scripts/eval.py | 20 +++++++++----------
 1 file changed, 9 insertions(+), 11 deletions(-)

diff --git a/scripts/eval.py b/scripts/eval.py
index e79452d..07e9af9 100644
--- a/scripts/eval.py
+++ b/scripts/eval.py
@@ -7,16 +7,14 @@
 from iup import eval
 
 
-def eval_all_forecasts(test, pred, config):
+def eval_all_forecasts(data, pred, config):
     """Evaluate the forecasts for all models, all forecast ends, and all scores"""
     score_names = config["score_funs"]
     model_names = pred["model"].unique()
     forecast_starts = pred["forecast_start"].unique()
 
     # only 'incident' type is evaluated #
-    incident_pred = pred.filter(pl.col("estimate_type") == "incident").with_columns(
-        quantile=0.5
-    )
+    incident_pred = pred.with_columns(quantile=0.5)
     # This step is arbitrary, but it is necessary to pass PointForecast validation #
 
     all_scores = pl.DataFrame()
@@ -30,6 +28,11 @@
                     pl.col("model") == model, pl.col("forecast_start") == forecast_start
                 )
 
+                test = data.filter(
+                    pl.col("time_end") >= forecast_start,
+                    pl.col("time_end") < config["timeframe"]["end"],
+                )
+
                 assert (pred_data["forecast_start"] == test["time_end"].min()).all()
 
                 test = iup.IncidentUptakeData(test)
@@ -56,14 +59,9 @@ def eval_all_forecasts(test, pred, config):
     pred_data = pl.scan_parquet(args.pred).collect()
     obs_data = pl.scan_parquet(args.obs).collect()
-    obs_data = obs_data.filter(pl.col("estimate_type") == "incident")
 
     # ensure the same test data is used for all models
-    test_data = iup.IncidentUptakeData.split_train_test(
-        obs_data, config["timeframe"]["start"], "test"
-    ).filter(pl.col("time_end") <= config["timeframe"]["end"])
-
-    test_data = iup.IncidentUptakeData(test_data)
+    obs_data = iup.CumulativeUptakeData.to_incident(config["groups"])
 
-    all_scores = eval_all_forecasts(test_data, pred_data, config)
+    all_scores = eval_all_forecasts(obs_data, pred_data, config)
     all_scores.write_parquet(args.output)

From f9f1526f73328afdd3cc5181de3df4966b377f00 Mon Sep 17 00:00:00 2001
From: Fuhan Yang
Date: Wed, 15 Jan 2025 14:53:51 -0500
Subject: [PATCH 6/7] update per returned cumulative predictions

---
 scripts/eval.py | 21 ++++++++++++---------
 1 file changed, 12 insertions(+), 9 deletions(-)

diff --git a/scripts/eval.py b/scripts/eval.py
index 07e9af9..234e112 100644
--- a/scripts/eval.py
+++ b/scripts/eval.py
@@ -13,10 +13,6 @@ def eval_all_forecasts(data, pred, config):
     model_names = pred["model"].unique()
     forecast_starts = pred["forecast_start"].unique()
 
-    # only 'incident' type is evaluated #
-    incident_pred = pred.with_columns(quantile=0.5)
-    # This step is arbitrary, but it is necessary to pass PointForecast validation #
-
     all_scores = pl.DataFrame()
 
     for score_name in score_names:
@@ -24,21 +20,28 @@
         for model in model_names:
             for forecast_start in forecast_starts:
-                pred_data = incident_pred.filter(
+                this_pred = pred.filter(
                     pl.col("model") == model, pl.col("forecast_start") == forecast_start
                 )
 
+                # only 'incident' type is evaluated #
+                incident_pred = iup.CumulativeUptakeData(this_pred).to_incident(
+                    config["data"]["groups"]
+                )
+                incident_pred = incident_pred.with_columns(quantile=0.5)
+                incident_pred = iup.PointForecast(incident_pred)
+                # This step is arbitrary, but it is necessary to pass PointForecast validation #
+
                 test = data.filter(
                     pl.col("time_end") >= forecast_start,
                     pl.col("time_end") < config["timeframe"]["end"],
                 )
 
-                assert (pred_data["forecast_start"] == test["time_end"].min()).all()
+                assert (incident_pred["forecast_start"] == test["time_end"].min()).all()
 
                 test = iup.IncidentUptakeData(test)
-                pred_data = iup.PointForecast(pred_data)
 
-                score = eval.score(test, pred_data, score_fun)
+                score = eval.score(test, incident_pred, score_fun)
                 score = score.with_columns(score_fun=score_name, model=model)
 
                 all_scores = pl.concat([all_scores, score])
@@ -60,7 +63,7 @@
     pred_data = pl.scan_parquet(args.pred).collect()
     obs_data = pl.scan_parquet(args.obs).collect()
 
-    # ensure the same test data is used for all models
+    # ensure the same incident test data is used for all models
     obs_data = iup.CumulativeUptakeData.to_incident(config["groups"])
 
     all_scores = eval_all_forecasts(obs_data, pred_data, config)
     all_scores.write_parquet(args.output)
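One wrinkle survives PATCH 5/7 and 6/7: at the bottom of `scripts/eval.py`, `to_incident` is called on the `CumulativeUptakeData` class itself, so the collected `obs_data` is never actually converted. Inside `eval_all_forecasts`, the same conversion is done on an instance (`iup.CumulativeUptakeData(this_pred).to_incident(...)`). If the instance form is the intended one, the script-level line would presumably read:

```python
# Presumed intent, mirroring the instance usage inside eval_all_forecasts;
# whether the grouping factors live at config["groups"] or
# config["data"]["groups"] is not resolved within this series.
obs_data = iup.CumulativeUptakeData(obs_data).to_incident(config["groups"])
```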
From 8e2c8b196daac80140e38bb4b85e8b516725af11 Mon Sep 17 00:00:00 2001
From: Fuhan Yang
Date: Wed, 15 Jan 2025 17:11:09 -0500
Subject: [PATCH 7/7] fix ruff

---
 iup/eval.py         | 6 +++---
 scripts/eval.py     | 3 ---
 scripts/forecast.py | 6 +++---
 3 files changed, 6 insertions(+), 9 deletions(-)

diff --git a/iup/eval.py b/iup/eval.py
index 2170d6d..ee4c27b 100644
--- a/iup/eval.py
+++ b/iup/eval.py
@@ -30,9 +30,9 @@ def check_date_match(data: IncidentUptakeData, pred: PointForecast):
     (data["time_end"] == pred["time_end"]).all()
 
     # 2. There should not be any duplicated date in either data or prediction.
-    assert not (
-        any(data["time_end"].is_duplicated())
-    ), "Duplicated dates are found in data and prediction."
+    assert not (any(data["time_end"].is_duplicated())), (
+        "Duplicated dates are found in data and prediction."
+    )
 
 
 def score(

diff --git a/scripts/eval.py b/scripts/eval.py
index 478c404..86a30a4 100644
--- a/scripts/eval.py
+++ b/scripts/eval.py
@@ -1,4 +1,3 @@
-
 import argparse
 
 import polars as pl
 import yaml
@@ -33,7 +32,6 @@ def eval_all_forecasts(data, pred, config):
                 incident_pred = incident_pred.with_columns(quantile=0.5)
                 incident_pred = iup.PointForecast(incident_pred)
-
 
                 test = data.filter(
                     pl.col("time_end") >= forecast_start,
                     pl.col("time_end") < config["timeframe"]["end"],
@@ -70,4 +68,3 @@
 
     all_scores = eval_all_forecasts(obs_data, pred_data, config)
     all_scores.write_parquet(args.output)
-

diff --git a/scripts/forecast.py b/scripts/forecast.py
index 17feebc..eb26c3e 100644
--- a/scripts/forecast.py
+++ b/scripts/forecast.py
@@ -7,7 +7,6 @@
 def run_all_forecasts(clean_data, config) -> pl.DataFrame:
-
     """Run all forecasts
 
     Returns:
         pl.DataFrame: data frame of forecasts, organized by model and forecast date
     """
     forecast_dates = pl.date_range(
         config["timeframe"]["start"],
         config["timeframe"]["end"],
         config["timeframe"]["interval"],
         eager=True,
     )
-    
+
     models = [getattr(iup.models, model_name) for model_name in config["models"]]
     assert all(issubclass(model, iup.models.UptakeModel) for model in models)
 
     incident_train_data = iup.IncidentUptakeData(
         iup.IncidentUptakeData.split_train_test(incident_data, forecast_start, "train")
     )
-    
+
     # Fit models using the training data and make projections
     fit_model = model().fit(incident_train_data, grouping_factors)
 
     cumulative_projections = fit_model.predict(
         config["timeframe"]["start"],
         config["timeframe"]["end"],
         config["timeframe"]["interval"],
         grouping_factors,
     )
     # save these projections somewhere
 
     return cumulative_projections
 
+
 if __name__ == "__main__":
     p = argparse.ArgumentParser()
     p.add_argument("--config", help="config file", default="scripts/config.yaml")
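Taken together, the series leaves a two-stage pipeline: `forecast.py` fits each model over the grid of forecast dates and writes cumulative projections, and `eval.py` converts them to incident uptake and scores them against observations. A minimal sketch of the equivalent manual invocation — the parquet paths are placeholders for the Makefile's `RAW_DATA`, `FORECASTS`, and `SCORES` variables, only the last of which is visible in PATCH 1/7, and the CLI flags are the ones used in that Makefile's recipes:

```python
import subprocess

# Stage 1: fit models and write cumulative forecasts (placeholder paths).
subprocess.run(
    ["python", "scripts/forecast.py",
     "--input=data/raw.parquet", "--output=data/forecasts.parquet"],
    check=True,
)

# Stage 2: score the forecasts against observed uptake, as the Makefile wires it.
subprocess.run(
    ["python", "scripts/eval.py",
     "--pred=data/forecasts.parquet", "--obs=data/raw.parquet",
     "--output=data/scores.parquet"],
    check=True,
)
```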