
Pipeline setup step 2: Fill forecast.py #93

Merged
merged 12 commits on Jan 15, 2025
6 changes: 3 additions & 3 deletions iup/eval.py
@@ -30,9 +30,9 @@ def check_date_match(data: IncidentUptakeData, pred: PointForecast):
(data["time_end"] == pred["time_end"]).all()

# 2. There should not be any duplicated date in either data or prediction.
assert not (any(data["time_end"].is_duplicated())), (
"Duplicated dates are found in data and prediction."
)
assert not (
any(data["time_end"].is_duplicated())
), "Duplicated dates are found in data and prediction."


def score(
14 changes: 8 additions & 6 deletions scripts/eval.py
@@ -1,6 +1,8 @@
for score_fun in score_funs:
score = eval.score(
incident_test_data, incident_projections, score_fun
)
print(f"{model=} {forecast_date=} {score_fun=} {score=}")
# save these scores somewhere
import iup.eval


def function_name(incident_test_data, incident_projections, score_funs):
for score_fun in score_funs:
score = iup.eval.score(incident_test_data, incident_projections, score_fun)
# save these scores somewhere
print(score)
75 changes: 51 additions & 24 deletions scripts/forecast.py
@@ -3,7 +3,10 @@
import polars as pl
import yaml

def run_all_forecasts() -> pl.DataFrame:
import iup.models


def run_all_forecasts(clean_data, config) -> pl.DataFrame:
"""Run all forecasts

Returns:
@@ -18,53 +21,77 @@ def run_all_forecasts():
models = [getattr(iup.models, model_name) for model_name in config["models"]]
assert all(issubclass(model, iup.models.UptakeModel) for model in models)

all_forecast = pl.DataFrame()

for model in models:
for forecast_date in forecast_dates:
# Get data available as of the forecast date
for forecast_date in forecast_dates:
# Get data available as of the forecast date
forecast = run_forecast(
model,
clean_data,
grouping_factors=config["groups"],
forecast_start=forecast_date,
forecast_end=config["timeframe"]["end"],
)

forecast = forecast.with_columns(
forecast_start=forecast_date,
forecast_end=config["timeframe"]["end"],
model=pl.lit(model.__name__),
)

all_forecast = pl.concat([all_forecast, forecast])

return all_forecast


def run_forecast(
model,
observed_data,
grouping_factors,
forecast_start,
forecast_end,
) -> pl.DataFrame:
"""Run a single model for a single forecast date"""

# preprocess.py returns cumulative data, need to convert to incidence for LinearIncidentUptakeModel #
incident_data = iup.CumulativeUptakeData(observed_data).to_incident(
grouping_factors
)

def run_forecast() -> pl.DataFrame:
"""Run a single model for a single forecast date"""
incident_train_data = iup.IncidentUptakeData(
iup.IncidentUptakeData.split_train_test(
incident_data, config["timeframe"]["start"], "train"
)
iup.IncidentUptakeData.split_train_test(incident_data, forecast_start, "train")
)

# Fit models using the training data and make projections
fit_model = model().fit(incident_train_data, grouping_factors)

cumulative_projections = fit_model.predict(
config["timeframe"]["start"],
config["timeframe"]["end"],
forecast_start,
forecast_end,
config["timeframe"]["interval"],
grouping_factors,
)
# save these projections somewhere

incident_projections = cumulative_projections.to_incident(
grouping_factors
)
# save these projections somewhere

# Evaluation / Post-processing --------------------------------------------
incident_projections = cumulative_projections.to_incident(grouping_factors)

incident_test_data = iup.IncidentUptakeData(
iup.IncidentUptakeData.split_train_test(
incident_data, config["timeframe"]["start"], "test"
)
).filter(pl.col("date") <= config["timeframe"]["end"])
# Note that here returns incident projections only, for evaluation
return incident_projections
Review thread on the "return incident_projections" line:

@eschrom (Collaborator), Jan 15, 2025:

I think we should return cumulative projections here. The reason is that incident data for a partial season loses information. Suppose you are making forecasts from Jan 1 - May 31. Cumulative forecasts contain all the incident information, but incident forecasts "forget" what the total cumulative uptake had been up to Jan 1. So if you convert to incident here, it will be harder to convert back to cumulative later (e.g. for calculating end-of-season absolute error) - you'll have to look up the observed data on Jan 1 again. But if you keep the cumulative version here, it is easy to convert to incident later (e.g. for calculating mspe).

Collaborator:

I agree. After all our discussions, my thinking now is that the inputs to and outputs from the models should be in cumulative space (although many of the models might internally convert back and forth with incident).

Collaborator:

I just looked back at your eval.py code, and I realized that end-of-season absolute error is, more specifically, the error in cumulative uptake since the forecast date. There is no error before the forecast date - that data is known. So your approach makes more sense to me now.

I still propose we follow the convention of only passing cumulative data around and only converting to incident at the points where it's required, but I wanted you to know that I resolved some of my own confusion!

Contributor (Author):

Agree that cumulative projections are more informative and should be passed along. Just fixed this in c2cbdd0. As #92 and #98 are merged, I think we can merge this one if there are no other issues.



if __name__ == "__main__":
p = argparse.ArgumentParser()
p.add_argument("--config", help="config file", default="scripts/config.yaml")
p.add_argument("--input", help="input data")
p.add_argument("--output", help="output parquet file")
args = p.parse_args()

with open(args.config, "r") as f:
config = yaml.safe_load(f)

input_data = pl.scan_parquet(args.input)
input_data = pl.scan_parquet(args.input).collect()

input_data = iup.CumulativeUptakeData(input_data)

run_all_forecasts(config=config, cache=args.cache)
all_forecast = run_all_forecasts(config=config, clean_data=input_data)
all_forecast.write_parquet(args.output)