-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathstatsmodels.py
188 lines (159 loc) · 6.78 KB
/
statsmodels.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
import pandas as pd
import numpy as np
import statsmodels
import statsmodels.tsa.holtwinters
import statsmodels.tsa.exponential_smoothing.ets
from typing import Any, Dict, Literal, Tuple
import time
from .base import Baseline
from ..base import BaseTask
class ExponentialSmoothingForecaster(Baseline):
    """Baseline producing sample forecasts from a Holt-Winters exponential smoothing model."""

    __version__ = "0.0.3"  # Modification will trigger re-caching

    def __init__(
        self,
        trend: Literal["add", "mul", None] = "add",
        seasonal: Literal["add", "mul", None] = "add",
    ):
        """
        Get predictions from an Exponential Smoothing model.

        Parameters:
        -----------
        trend: ["add", "mul", or None]
            Whether to add a trend component to the forecast.
            If "add", the component is additive, and if "mul", it is multiplicative.
        seasonal: ["add", "mul", or None]
            Whether to add a seasonal component to the forecast.
            If "add", the component is additive, and if "mul", it is multiplicative.

        Notes:
        ------
        This model requires a seasonal periodicity. `__call__` reads it from
        `task_instance.seasonal_period`; `forecast` takes it as an explicit argument.
        """
        super().__init__()
        self.trend = trend
        self.seasonal = seasonal

    def __call__(
        self, task_instance: BaseTask, n_samples: int
    ) -> Tuple[np.ndarray, Dict[str, Any]]:
        """
        Forecast the future of a task instance and time the computation.

        Parameters:
        -----------
        task_instance: BaseTask
            Task providing `past_time`, `future_time` and `seasonal_period`.
        n_samples: int
            Number of sample trajectories to simulate.

        Returns:
        --------
        samples: np.ndarray
            Simulated trajectories, as returned by `forecast`.
        extra_info: dict
            Contains "total_time", the wall-clock duration of the forecast in seconds.
        """
        starting_time = time.time()
        samples = self.forecast(
            past_time=task_instance.past_time,
            future_time=task_instance.future_time,
            seasonal_periods=task_instance.seasonal_period,
            n_samples=n_samples,
        )
        extra_info = {
            "total_time": time.time() - starting_time,
        }
        return samples, extra_info

    def forecast(
        self,
        past_time: pd.DataFrame,
        future_time: pd.DataFrame,
        seasonal_periods: int,
        n_samples: int,
    ) -> np.ndarray:
        """
        This method allows a forecast to be done without requiring a complete BaseTask instance.
        This is primarily meant to be called inside a BaseTask constructor when doing rejection
        sampling or similar approaches.

        Only the last column of `past_time` is modelled, and only the number of rows
        of `future_time` is used (as the forecast horizon).

        Note: The seasonal component is skipped if seasonal_periods is <= 1, or if
        past_time contains fewer than two full periods.

        Returns:
        --------
        np.ndarray of shape (n_samples, len(future_time), 1).
        """
        history_length = len(past_time)

        # With the trend, we will have 4 parameters to be fitted, so disable it if we don't
        # have enough points to fit with.
        # Note: this still requires an absolute minimum of 3 values in past_time
        use_trend = history_length >= 5

        # statsmodels rejects seasonal_periods <= 1, and we also want at least two full
        # periods of history before fitting a seasonal component.
        use_seasonal = (
            seasonal_periods > 1 and history_length >= 2 * seasonal_periods
        )

        model = statsmodels.tsa.holtwinters.ExponentialSmoothing(
            endog=past_time[past_time.columns[-1]],
            trend=self.trend if use_trend else None,
            seasonal=self.seasonal if use_seasonal else None,
            # Ignored by statsmodels when there is no seasonal component
            seasonal_periods=seasonal_periods if use_seasonal else None,
        )
        result = model.fit()
        simulations = result.simulate(
            nsimulations=future_time.shape[0], repetitions=n_samples
        )

        values = np.asarray(simulations)
        if values.ndim == 1:
            # A single repetition comes back 1-dimensional: restore the repetition axis
            # so that the output always has a leading n_samples dimension.
            values = values[:, None]
        # (time, samples) -> (samples, time, 1)
        return values.transpose()[..., None]

    @property
    def cache_name(self) -> str:
        """Cache identifier built from the class name and the model configuration."""
        args_to_include = ["trend", "seasonal"]
        return f"{self.__class__.__name__}_" + "_".join(
            [f"{k}={getattr(self, k)}" for k in args_to_include]
        )
class ETSModelForecaster(Baseline):
    """Baseline producing sample forecasts from an ETS (Error-Trend-Seasonality) model."""

    __version__ = "0.0.2"  # Modification will trigger re-caching

    def __init__(
        self,
        trend: Literal["add", "mul", None] = "add",
        seasonal: Literal["add", "mul", None] = "add",
        error: Literal["add", "mul"] = "add",
    ):
        """
        Get predictions from an ETS (Error-Trend-Seasonality) model.

        Parameters:
        -----------
        trend: ["add", "mul", or None]
            Whether to add a trend component to the forecast.
            If "add", the component is additive, and if "mul", it is multiplicative.
        seasonal: ["add", "mul", or None]
            Whether to add a seasonal component to the forecast.
            If "add", the component is additive, and if "mul", it is multiplicative.
        error: ["add", "mul"]
            Configuration for the error component to the forecast.
            If "add", the component is additive, and if "mul", it is multiplicative.

        Notes:
        ------
        This model requires a seasonal periodicity. `__call__` reads it from
        `task_instance.seasonal_period`; `forecast` takes it as an explicit argument.
        """
        super().__init__()
        self.trend = trend
        self.seasonal = seasonal
        self.error = error

    def __call__(
        self, task_instance: BaseTask, n_samples: int
    ) -> Tuple[np.ndarray, Dict[str, Any]]:
        """
        Forecast the future of a task instance and time the computation.

        Parameters:
        -----------
        task_instance: BaseTask
            Task providing `past_time`, `future_time` and `seasonal_period`.
        n_samples: int
            Number of sample trajectories to simulate.

        Returns:
        --------
        samples: np.ndarray
            Simulated trajectories, as returned by `forecast`.
        extra_info: dict
            Contains "total_time", the wall-clock duration of the forecast in seconds.
        """
        starting_time = time.time()
        samples = self.forecast(
            past_time=task_instance.past_time,
            future_time=task_instance.future_time,
            seasonal_periods=task_instance.seasonal_period,
            n_samples=n_samples,
        )
        extra_info = {
            "total_time": time.time() - starting_time,
        }
        return samples, extra_info

    def forecast(
        self,
        past_time: pd.DataFrame,
        future_time: pd.DataFrame,
        seasonal_periods: int,
        n_samples: int,
    ) -> np.ndarray:
        """
        This method allows a forecast to be done without requiring a complete BaseTask instance.
        This is primarily meant to be called inside a BaseTask constructor when doing rejection
        sampling or similar approaches.

        Only the last column of `past_time` is modelled, and only the number of rows
        of `future_time` is used (as the forecast horizon).

        Note: The seasonal component is skipped if seasonal_periods is <= 1, or if
        past_time contains fewer than two full periods.

        Returns:
        --------
        np.ndarray of shape (n_samples, len(future_time), 1).
        """
        # statsmodels rejects seasonal_periods <= 1, and we also want at least two full
        # periods of history before fitting a seasonal component.
        use_seasonal = (
            seasonal_periods > 1 and len(past_time) >= 2 * seasonal_periods
        )

        model = statsmodels.tsa.exponential_smoothing.ets.ETSModel(
            endog=past_time[past_time.columns[-1]],
            trend=self.trend,
            seasonal=self.seasonal if use_seasonal else None,
            error=self.error,
            # Ignored by statsmodels when there is no seasonal component
            seasonal_periods=seasonal_periods if use_seasonal else None,
        )
        # Avoid L-BFGS-B output spam
        result = model.fit(disp=False)
        simulations = result.simulate(
            nsimulations=future_time.shape[0], repetitions=n_samples
        )

        values = np.asarray(simulations)
        if values.ndim == 1:
            # A single repetition comes back 1-dimensional: restore the repetition axis
            # so that the output always has a leading n_samples dimension.
            values = values[:, None]
        # (time, samples) -> (samples, time, 1)
        return values.transpose()[..., None]

    @property
    def cache_name(self) -> str:
        """Cache identifier built from the class name and the model configuration."""
        args_to_include = ["trend", "seasonal", "error"]
        return f"{self.__class__.__name__}_" + "_".join(
            [f"{k}={getattr(self, k)}" for k in args_to_include]
        )