-
Notifications
You must be signed in to change notification settings - Fork 93
/
Copy pathholidays_this_week.py
executable file
·65 lines (51 loc) · 2.56 KB
/
holidays_this_week.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
"""Returns the number of US holidays in the week that contains a given date"""
from h2oaicore.transformer_utils import CustomTransformer, convert_to_datetime
import datatable as dt
import numpy as np
import pandas as pd
import holidays
class HolidaysThisWeek(CustomTransformer):
    """Count the US holidays that fall in the same ISO week as each date.

    ``fit`` builds a lookup table keyed by (ISO year, ISO week) holding the
    number of holidays in that week; ``transform`` joins each input date to
    that table and returns the count (0 when the week has no holiday or the
    date is missing).
    """

    # Recipe metadata read by the hosting framework.
    # NOTE(review): exact semantics are defined by CustomTransformer, which is
    # not visible here.
    _unsupervised = True
    _modules_needed_by_name = ['holidays']
    _display_name = 'HolidaysThisWeek'

    @staticmethod
    def get_default_properties():
        """Declare that this transformer consumes exactly one date column."""
        return dict(col_type="date", min_cols=1, max_cols=1, relative_importance=1)

    def __init__(self, **kwargs):
        """Remember the single input column and the country to look up."""
        super().__init__(**kwargs)
        self.time_column = self.input_feature_names[0]
        self.country = "US"

    def _to_datetime(self, X):
        """Extract the time column from ``X`` and parse it to datetimes."""
        frame = X[:, self.time_column].to_pandas()
        frame[self.time_column] = convert_to_datetime(
            frame[self.time_column], self.datetime_formats[self.time_column]
        )
        return frame[self.time_column]

    @staticmethod
    def _year_week(dates):
        """Return a frame with the ISO ``year`` and ``week`` of each date.

        The ISO year (not the calendar year) is paired with the ISO week so
        that a week straddling New Year gets a single, consistent key.
        Values are cast to float64 so missing dates become NaN and the key
        dtypes are identical on both sides of the merge.
        """
        iso = dates.dt.isocalendar()
        return pd.DataFrame({
            'year': iso['year'].astype('float64'),
            'week': iso['week'].astype('float64'),
        })

    def fit(self, X, y=None, **fit_params):
        """Build the (year, week) -> holiday-count table.

        Holidays are generated for every year seen in ``X`` and for as many
        years again beyond the latest one, so that later data can still be
        matched. Returns ``self``.
        """
        self._output_feature_names = ["HolidaysThisWeek:%s" % self.time_column]
        self._feature_desc = ["Amount of holidays in current week for %s" % self.time_column]

        dates = self._to_datetime(X)
        mn_year = dates.dt.year.min()
        mx_year = dates.dt.year.max()
        if np.isnan(mn_year) or np.isnan(mx_year):
            # No parseable dates at all: fit an empty table.
            years = []
        else:
            years = list(range(int(mn_year), int(mx_year + mx_year - mn_year + 2)))

        # Prefer the current public factory; fall back to the older name on
        # holidays releases that predate it.
        make_calendar = getattr(holidays, 'country_holidays', None) or holidays.CountryHoliday
        calendar = make_calendar(self.country, years=years, observed=True)

        holiday_dates = pd.Series(pd.to_datetime(sorted(calendar.keys())))
        self.memo = (
            self._year_week(holiday_dates)
            .groupby(['year', 'week'])
            .size()
            .reset_index(name=self._output_feature_names[0])
        )
        return self

    def transform(self, X, **kwargs):
        """Return a one-column integer frame with the holiday count per row."""
        keys = self._year_week(self._to_datetime(X))
        # Left join keeps one output row per input row, in input order; weeks
        # absent from the table (and missing dates) become 0.
        merged = keys.merge(self.memo, how='left', on=['year', 'week']).fillna(0)
        return merged[[self._output_feature_names[0]]].astype(int)

    def fit_transform(self, X, y=None, **fit_params):
        """Fit on ``X`` and return its transformed holiday counts."""
        return self.fit(X, y, **fit_params).transform(X)