-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathalgorithm.py
216 lines (161 loc) · 8.29 KB
/
algorithm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
from cvxpy import *
import numpy as np
import cvxpy as cp
from time import perf_counter
# Defining weights
U_3_1 = 400
U_3_2 = 50
U_3_3 = 700
U_3_4 = 50
U_3_5 = 100
# Weight fxn used in term 3.5 (consistent weekly hours)
lambda_func = lambda x: np.exp(-0.2 * x)
# Mapping between rating and displeasure used in term 3.4 (minimize displeasure)
RATE_TO_DISPLEASURE_MAPPING = {1: 0, 2: 2, 3: 8, 4: 16, 5: 1e8}
def var_to_np(decision_var):
"""Converts decision variable (np array of cp.Variable objects) into np array of integers
Args:
decision_var (np.ndarray): np array of cp.Variable objects
Returns:
np.ndarray of decision variable results
"""
get_value = lambda var: var.value
vfunc = np.vectorize(get_value)
return vfunc(decision_var)
def run_algorithm(inputs):
input_oh_demand = inputs[0] # (# of future weeks, 5, 12)
input_previous_weeks_assignments = inputs[1] # (# of day one staff, # of past weeks, 5, 12)
input_staff_availabilities = inputs[2] # (# of all staff, 5, 12)
input_max_contig = inputs[3] # (# of all staff, )
input_target_total_future_hours = inputs[4] # (# of all staff, )
input_target_weekly_hours = inputs[5] # (# of all staff, )
input_preferred_contiguous_hours = inputs[6] # (# of all staff, )
input_changed_hours_weightings = inputs[7] # (# of day one staff, )
input_non_day_one_indices = inputs[8] # (# of non day one staff, )
print(input_previous_weeks_assignments.shape)
m = input_max_contig.shape[0]
m_non_day_ones = input_non_day_one_indices.shape[0] # WARNING: assumes non-day one staff are
# added to the end of each prev_assignments array
m_day_ones = m - m_non_day_ones
n = input_oh_demand.shape[0]
try:
p = input_previous_weeks_assignments.shape[1] # TODO: change later, but we probably don't want to perform look-behind for all prev weeks, fixed look behind sliding window keeps computational complexity down with minimal resulting tradeoff
except IndexError as e:
p = None
print("No previous weeks. Removing past consistency constraint.")
print("Setting up algorithm...")
# Define the decision variable
A = np.empty(shape = (m, n, 5, 12), dtype = object)
for i in range(m):
for j in range(n):
for k in range(5):
for l in range(12):
A[i, j, k, l] = cp.Variable(boolean=True)
# ---------------- Hard Constraints (CP constraints) ----------------
constraints = []
# 2.1: Staff time slot existence (At least 1 staff member per NONZERO time slot)
X_2_1 = np.sum(A, axis = 0)
for week_i in range(n):
for day_i in range(5):
for hour_i in range(12):
if input_oh_demand[week_i, day_i, hour_i] != 0:
constraints.append(X_2_1[week_i, day_i, hour_i] >= 1)
# 2.2: Maximum Contiguous Hours (TODO: OFF FOR NOW)
# for staff_i in range(m):
# for week_i in range(n):
# for day_i in range(5):
# window_size = (input_max_contig[staff_i] + 1)
# for start in range(12 - window_size + 1):
# total = 0
# for i in range(window_size):
# total += A[staff_i, week_i, day_i, start + i]
# constraints.append(total <= input_max_contig[staff_i])
# 2.3 (TESTING) no timeslot should have > 3 number of absences
X_2_3 = np.sum(A, axis=0)
for week_i in range(n):
for day_i in range(5):
for hour_i in range(12):
constraints.append(input_oh_demand[week_i, day_i, hour_i] - X_2_3[week_i, day_i, hour_i] <= 3)
# 2.4 (TEMP/TESTING) no one should be doing >3+ their target weekly hours
X_2_4 = A.sum((2, 3))
T = input_target_weekly_hours[:, None].repeat(n, axis=1)
for staff_i in range(m):
for week_i in range(n):
constraints.append(X_2_4[staff_i, week_i] - T[staff_i, week_i] <= 1)
# 2.5 (TEMP/TESTING) no assignments during times of 0 demand
X_2_5 = A.sum(0)
for week_i in range(n):
for day_i in range(5):
for hour_i in range(12):
if input_oh_demand[week_i, day_i, hour_i] == 0:
constraints.append(X_2_5[week_i, day_i, hour_i] == 0)
# 2.6 (TEMP/TESTING) ensure no one gets assigned to a slot they are unavailable for TODO
# ---------------- Soft Constraints (CP objective) ----------------
# 3.1: Minimize Maximum-Weekly-Hour
X = A.sum((2, 3)) # shape: (# of staff, # of remaining weeks)
T = input_target_weekly_hours[:, None].repeat(n, axis=1)
X_minus_T = X - T
T_minus_X = T - X
# Apply maximum on each variable with 0
term_3_1 = 0
for staff_i in range(m):
for week_i in range(n):
term_3_1 += cp.maximum(X_minus_T[staff_i, week_i], 0)
term_3_1 += cp.maximum(T_minus_X[staff_i, week_i], 0)
# 3.2 (w/o QC): Minimize Total Future Hour Violations Per Staff
X = A.sum((1, 2, 3)) # shape: (num staff, )
term_3_2 = 0
for staff_i in range(m):
term_3_2 += cp.maximum(X[staff_i] - input_target_total_future_hours[staff_i], 0)
# 3.3 (w/o QC): Minimize Total # of violations where we've assigned too few people in a slot
X = A.sum(0)
term_3_3 = 0
for week_i in range(n):
for day_i in range(5):
for hour_i in range(12):
term_3_3 += cp.maximum(input_oh_demand[week_i, day_i, hour_i] - X[week_i, day_i, hour_i], 0)
term_3_3 += cp.maximum(X[week_i, day_i, hour_i] - input_oh_demand[week_i, day_i, hour_i], 0)
# 3.4: Scheduling Assignment Displeasure
staff_availabilities_extended = input_staff_availabilities[:, None, :, :].repeat(n, axis=1)
apply_mapping = np.vectorize(lambda val: RATE_TO_DISPLEASURE_MAPPING[val])
mapped_staff_availabilities_extended = apply_mapping(staff_availabilities_extended)
multiplied = A * mapped_staff_availabilities_extended
term_3_4 = np.sum(multiplied)
# 3.5: Consistent Weekly Hours (w/o MIQP)
current_week = A[:, 0, :, :] # shape: (# of staff, 5, 12)
# Assuming input_previous_weeks_assignments[0] is the first week the OH scheduler ran
term_3_5 = 0
# Match current week with the past
if p:
prev_weeks_weights = list(reversed(list(map(lambda_func, np.arange(1, p + 1)))))
for prev_i in range(p):
for staff_i in range(m_day_ones):
# if a staff member doesn't work for a week, then skip them for that week
if input_previous_weeks_assignments[staff_i, prev_i].sum() == 0:
continue
for day_i in range(5):
for hour_i in range(12):
term_3_5 += cp.maximum(input_previous_weeks_assignments[staff_i, prev_i, day_i, hour_i] - \
current_week[staff_i, day_i, hour_i], 0) * \
prev_weeks_weights[prev_i] * \
(1 - input_changed_hours_weightings[staff_i])
# Match current week with future weeks
future_weeks_weights = list(map(lambda_func, np.arange(1, n + 1)))
for future_i in range(1, n):
for staff_i in range(m_day_ones):
for day_i in range(5):
for hour_i in range(12):
term_3_5 += cp.maximum(current_week[staff_i, day_i, hour_i] - A[staff_i, future_i, day_i, hour_i], 0) * future_weeks_weights[future_i]
obj = cp.Minimize(U_3_1 * term_3_1 + U_3_2 * term_3_2 + U_3_3 * term_3_3 + U_3_4 * term_3_4 + U_3_5 * term_3_5)
# Optimization Problem
print(f"Number of variables: {m * n * 5 * 12}")
print(f"Number of constraints: {len(constraints)}")
print("Running algorithm...")
start = perf_counter()
prob = Problem(obj, constraints)
prob.solve(verbose=False)
print(f"Algorithm status: {prob.status}. Objective value: {prob.value}")
print(f"Time elapsed: {perf_counter() - start}")
all_assignments = var_to_np(A)
np.save("assignments.npy", all_assignments)
return all_assignments[:, 0, :, :]