Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/scalar with window #2021

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 29 additions & 10 deletions darts/models/forecasting/forecasting_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

from darts import metrics
from darts.dataprocessing.encoders import SequentialEncoder
from darts.dataprocessing.transformers import Scaler
from darts.logging import get_logger, raise_if, raise_if_not, raise_log
from darts.timeseries import TimeSeries
from darts.utils import _build_tqdm_iterator, _parallel_apply, _with_sanity_checks
Expand Down Expand Up @@ -603,6 +604,7 @@ def historical_forecasts(
show_warnings: bool = True,
predict_likelihood_parameters: bool = False,
enable_optimization: bool = True,
scaler: Scaler = None,
madtoinou marked this conversation as resolved.
Show resolved Hide resolved
JanFidor marked this conversation as resolved.
Show resolved Hide resolved
) -> Union[
TimeSeries, List[TimeSeries], Sequence[TimeSeries], Sequence[List[TimeSeries]]
]:
Expand Down Expand Up @@ -709,6 +711,8 @@ def historical_forecasts(
Default: ``False``
enable_optimization
Whether to use the optimized version of historical_forecasts when supported and available.
scaler
Optionally, a ``Scaler`` that is fitted on the data available for each historical forecast and applied to that forecast. Default: ``None`` (no scaling).
JanFidor marked this conversation as resolved.
Show resolved Hide resolved

Returns
-------
Expand Down Expand Up @@ -846,6 +850,7 @@ def retrain_func(
verbose=verbose,
show_warnings=show_warnings,
predict_likelihood_parameters=predict_likelihood_parameters,
scaler=scaler,
)

if len(series) == 1:
Expand Down Expand Up @@ -959,24 +964,34 @@ def retrain_func(
if train_length_ and len(train_series) > train_length_:
train_series = train_series[-train_length_:]

# testing `retrain` to exclude `False` and `0`
if (
forecast_time_index_correct = (
madtoinou marked this conversation as resolved.
Show resolved Hide resolved
retrain
and historical_forecasts_time_index_train is not None
and historical_forecasts_time_index_train[0]
<= pred_time
<= historical_forecasts_time_index_train[-1]
):
)
is_retrain_func = retrain_func(
counter=_counter_train,
pred_time=pred_time,
train_series=train_series,
past_covariates=past_covariates_,
future_covariates=future_covariates_,
)
is_scalar_used = (
scaler is not None
and forecast_time_index_correct
and is_retrain_func
)
# testing `retrain` to exclude `False` and `0`
if forecast_time_index_correct:
# retrain_func processes the series that would be used for training
if retrain_func(
counter=_counter_train,
pred_time=pred_time,
train_series=train_series,
past_covariates=past_covariates_,
future_covariates=future_covariates_,
):
if is_retrain_func:
# avoid fitting the same model multiple times
model = model.untrained_model()
if is_scalar_used:
madtoinou marked this conversation as resolved.
Show resolved Hide resolved
train_series = scaler.fit_transform(train_series)

model._fit_wrapper(
series=train_series,
past_covariates=past_covariates_,
Expand Down Expand Up @@ -1032,6 +1047,9 @@ def retrain_func(
verbose=verbose,
predict_likelihood_parameters=predict_likelihood_parameters,
)
if is_scalar_used:
madtoinou marked this conversation as resolved.
Show resolved Hide resolved
forecast = scaler.inverse_transform(forecast)

if forecast_components is None:
forecast_components = forecast.columns

Expand Down Expand Up @@ -1977,6 +1995,7 @@ def _optimized_historical_forecasts(
verbose: bool = False,
show_warnings: bool = True,
predict_likelihood_parameters: bool = False,
scaler=None,
JanFidor marked this conversation as resolved.
Show resolved Hide resolved
) -> Union[
TimeSeries, List[TimeSeries], Sequence[TimeSeries], Sequence[List[TimeSeries]]
]:
Expand Down
2 changes: 2 additions & 0 deletions darts/models/forecasting/regression_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -1117,6 +1117,7 @@ def _optimized_historical_forecasts(
verbose: bool = False,
show_warnings: bool = True,
predict_likelihood_parameters: bool = False,
scaler=None,
JanFidor marked this conversation as resolved.
Show resolved Hide resolved
) -> Union[
TimeSeries, List[TimeSeries], Sequence[TimeSeries], Sequence[List[TimeSeries]]
]:
Expand Down Expand Up @@ -1168,6 +1169,7 @@ def _optimized_historical_forecasts(
overlap_end=overlap_end,
show_warnings=show_warnings,
predict_likelihood_parameters=predict_likelihood_parameters,
scaler=scaler,
JanFidor marked this conversation as resolved.
Show resolved Hide resolved
)
else:
return _optimized_historical_forecasts_regression_all_points(
Expand Down
4 changes: 4 additions & 0 deletions darts/tests/models/forecasting/test_regression_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
FutureCyclicEncoder,
PastDatetimeAttributeEncoder,
)
from darts.dataprocessing.transformers import Scaler
from darts.logging import get_logger
from darts.metrics import mae, rmse
from darts.models import (
Expand Down Expand Up @@ -1224,6 +1225,7 @@ def test_historical_forecast(self, mode):
overlap_end=False,
last_points_only=True,
verbose=False,
scaler=Scaler(),
)
assert len(result) == 21

Expand All @@ -1238,6 +1240,7 @@ def test_historical_forecast(self, mode):
overlap_end=False,
last_points_only=True,
verbose=False,
scaler=Scaler(),
)
assert len(result) == 21

Expand All @@ -1254,6 +1257,7 @@ def test_historical_forecast(self, mode):
overlap_end=False,
last_points_only=True,
verbose=False,
scaler=Scaler(),
)
assert len(result) == 21

Expand Down
65 changes: 40 additions & 25 deletions darts/utils/historical_forecasts/optimized_historical_forecasts.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import pandas as pd
from numpy.lib.stride_tricks import sliding_window_view

from darts.dataprocessing.transformers import Scaler
from darts.logging import get_logger
from darts.timeseries import TimeSeries
from darts.utils.data.tabularization import create_lagged_prediction_data
Expand All @@ -31,6 +32,7 @@ def _optimized_historical_forecasts_regression_last_points_only(
overlap_end: bool = False,
show_warnings: bool = True,
predict_likelihood_parameters: bool = False,
scaler: Scaler = None,
) -> Union[
TimeSeries, List[TimeSeries], Sequence[TimeSeries], Sequence[List[TimeSeries]]
]:
Expand All @@ -40,6 +42,7 @@ def _optimized_historical_forecasts_regression_last_points_only(
Rely on _check_optimizable_historical_forecasts() to check that the assumptions are verified.
"""
forecasts_list = []

for idx, series_ in enumerate(series):
past_covariates_ = past_covariates[idx] if past_covariates is not None else None
future_covariates_ = (
Expand Down Expand Up @@ -113,7 +116,7 @@ def _optimized_historical_forecasts_regression_last_points_only(
)

# stride can be applied directly (same for input and historical forecasts)
X = X[0][::stride, :, 0]
X = X[0][::stride, :, 0] # shape ()

# repeat rows for probabilistic forecast
forecast = model._predict_and_sample(
Expand All @@ -122,6 +125,9 @@ def _optimized_historical_forecasts_regression_last_points_only(
predict_likelihood_parameters=predict_likelihood_parameters,
)
# forecast has shape ((forecastable_index_length-1)*num_samples, k, n_component)

# transpose to
# (k, (forecastable_index_length-1)*num_samples, n_component, 1)
# where k = output_chunk length if multi_models, 1 otherwise

# reshape into (forecasted indexes, n_components, n_samples), components are interleaved
Expand All @@ -137,22 +143,26 @@ def _optimized_historical_forecasts_regression_last_points_only(
:,
]

forecasts_list.append(
TimeSeries.from_times_and_values(
times=times[0]
if stride == 1 and model.output_chunk_length == 1
else generate_index(
start=hist_fct_start + (forecast_horizon - 1) * freq,
length=forecast.shape[0],
freq=freq * stride,
name=series_.time_index.name,
),
values=forecast,
columns=forecast_components,
static_covariates=series_.static_covariates,
hierarchy=series_.hierarchy,
)
forecast_value = TimeSeries.from_times_and_values(
times=times[0]
if stride == 1 and model.output_chunk_length == 1
madtoinou marked this conversation as resolved.
Show resolved Hide resolved
else generate_index(
start=hist_fct_start + (forecast_horizon - 1) * freq,
length=forecast.shape[0],
freq=freq * stride,
name=series_.time_index.name,
),
values=forecast,
columns=forecast_components,
static_covariates=series_.static_covariates,
hierarchy=series_.hierarchy,
)
is_scaler_used = len(model.lags.get("target", [])) != 0 and scaler is not None
if is_scaler_used:
scaling_values = series_[:hist_fct_tgt_end]
forecast_value = scaler.fit(scaling_values).transform(forecast_value)
forecasts_list.append(forecast_value)

return forecasts_list if len(series) > 1 else forecasts_list[0]


Expand All @@ -169,6 +179,7 @@ def _optimized_historical_forecasts_regression_all_points(
overlap_end: bool = False,
show_warnings: bool = True,
predict_likelihood_parameters: bool = False,
scaler: Scaler = None,
) -> Union[
TimeSeries, List[TimeSeries], Sequence[TimeSeries], Sequence[List[TimeSeries]]
]:
Expand Down Expand Up @@ -326,15 +337,19 @@ def _optimized_historical_forecasts_regression_all_points(
for idx_ftc, step_fct in enumerate(
range(0, forecast.shape[0] * stride, stride)
):
forecasts_.append(
TimeSeries.from_times_and_values(
times=new_times[step_fct : step_fct + forecast_horizon],
values=forecast[idx_ftc],
columns=forecast_components,
static_covariates=series_.static_covariates,
hierarchy=series_.hierarchy,
)
forecast_value = TimeSeries.from_times_and_values(
times=new_times[step_fct : step_fct + forecast_horizon],
values=forecast[idx_ftc],
columns=forecast_components,
static_covariates=series_.static_covariates,
hierarchy=series_.hierarchy,
)

is_scaler_used = (
len(model.lags.get("target", [])) != 0 and scaler is not None
)
if is_scaler_used:
scaling_values = series_[:hist_fct_tgt_end]
forecast_value = scaler.fit(scaling_values).transform(forecast_value)
forecasts_.append(forecast_value)
forecasts_list.append(forecasts_)
return forecasts_list if len(series) > 1 else forecasts_list[0]
33 changes: 33 additions & 0 deletions darts/utils/historical_forecasts/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import math
from copy import copy
from types import SimpleNamespace
from typing import Any, Callable, Optional, Tuple, Union

Expand All @@ -9,6 +11,7 @@
import numpy as np
import pandas as pd

from darts.dataprocessing.transformers import Scaler
from darts.logging import get_logger, raise_if_not, raise_log
from darts.timeseries import TimeSeries
from darts.utils.timeseries_generation import generate_index
Expand Down Expand Up @@ -661,3 +664,33 @@ def _get_historical_forecast_boundaries(
hist_fct_fc_start,
hist_fct_fc_end,
)


def window_fit_transform_array(scaler: Scaler, X: np.ndarray, stride: int):
    """Fit-transform expanding prefixes of ``X`` with independent scaler copies.

    For every prefix length ``i`` in ``range(1, ceil(X.shape[0] / stride))``,
    a shallow copy of ``scaler`` is fit-transformed on a ``TimeSeries`` built
    from ``X[:i]``. The resulting data arrays are stacked with ``np.array``
    and reshaped to ``X.shape``.

    Parameters
    ----------
    scaler
        Scaler used as a template; it is shallow-copied for every prefix and
        the copy (not ``scaler`` itself) has ``fit_transform`` called on it.
    X
        Array whose first axis is sliced into prefixes ``X[:i]``.
    stride
        Divides ``X.shape[0]`` to obtain the (exclusive) upper bound on the
        prefix length.

    Returns
    -------
    np.ndarray
        The stacked scaled prefixes, reshaped to ``X.shape``.

    Notes
    -----
    NOTE(review): the prefixes ``X[:1]``, ``X[:2]``, ... have different
    lengths, so it is unclear whether the stacked result can always be
    reshaped to ``X.shape`` -- confirm the intended output with the caller.
    """
    prefix_stop = math.ceil(X.shape[0] / stride)

    scaled_prefixes = []
    for prefix_len in range(1, prefix_stop):
        # a fresh copy per window so that no fit leaks into the next one
        window_scaler = copy(scaler)
        window_series = TimeSeries.from_values(X[:prefix_len])
        scaled_prefixes.append(
            window_scaler.fit_transform(window_series).data_array()
        )

    stacked = np.array(scaled_prefixes)
    return stacked.reshape(X.shape)


def window_scaled_forecasts(scaler: Scaler, X: np.ndarray, y: np.ndarray):
    """Transform each forecast in ``y`` with a scaler fitted on the inputs seen so far.

    The first ``y.shape[0]`` entries of ``X`` are each expanded with a new
    axis at position 1 and wrapped in a ``TimeSeries``. For forecast ``i``, a
    shallow copy of ``scaler`` is fitted on the wrapped inputs ``0..i``
    (inclusive) and then used to transform ``y[i]`` (also expanded on axis 1
    and wrapped in a ``TimeSeries``). The transformed data arrays are stacked
    and reshaped to ``y.shape``.

    Parameters
    ----------
    scaler
        Scaler used as a template; a shallow copy is fitted for every
        forecast, ``scaler`` itself is not fitted here.
    X
        Array indexed along its first axis to build the fitting inputs; it
        must provide at least ``y.shape[0]`` entries.
    y
        Array of forecasts; its first axis enumerates the forecasts.

    Returns
    -------
    np.ndarray
        The transformed forecasts, with the same shape as ``y``.
    """
    target_shape = y.shape
    forecast_count = target_shape[0]

    # wrap every input window up front; forecast i is fitted on windows 0..i
    wrapped_inputs = []
    for row in range(forecast_count):
        wrapped_inputs.append(TimeSeries.from_values(np.expand_dims(X[row], axis=1)))

    transformed = []
    for row in range(forecast_count):
        fitted_scaler = copy(scaler).fit(wrapped_inputs[: row + 1])
        forecast_series = TimeSeries.from_values(np.expand_dims(y[row], axis=1))
        transformed.append(fitted_scaler.transform(forecast_series).data_array())

    return np.array(transformed).reshape(target_shape)