-
Notifications
You must be signed in to change notification settings - Fork 57
/
Copy pathrunner.py
429 lines (407 loc) · 14.2 KB
/
runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
from __future__ import annotations
import contextlib
import itertools
import json
import logging
import os
import sys
import tempfile
import time
import typing as t
from argparse import ArgumentParser
from datetime import datetime
from functools import partial
from importlib.metadata import EntryPoint
from pathlib import Path
import pebble.concurrent
from .exceptions import AocdError
from .models import _load_users
from .models import AOCD_CONFIG_DIR
from .models import NON_ANSWER
from .models import Puzzle
from .utils import _cli_guess
from .utils import AOC_TZ
from .utils import colored
from .utils import get_plugins
# from https://adventofcode.com/about
# every problem has a solution that completes in at most 15 seconds on ten-year-old hardware
# Default time limit for a single solver run, in seconds. Used as the default
# for every `timeout` parameter in this module and for the --timeout CLI option.
DEFAULT_TIMEOUT: float = 60.
# Module-level logger, named after this module.
log: logging.Logger = logging.getLogger(__name__)
def main() -> t.NoReturn:
    """
    Run user solver(s) against their inputs and render the results. Can use multiple
    tokens to validate your code against multiple input datas.

    Builds the command-line interface, parses the arguments, configures logging
    from the ``-v`` count and delegates the actual work to ``run_for``.

    The process always exits: with status 1 when there are no users or no plugins
    available, otherwise with the value returned by ``run_for``.
    """
    eps = get_plugins()
    plugins = {ep.name: ep for ep in eps}
    aoc_now = datetime.now(tz=AOC_TZ)
    # the current year only becomes selectable once December has started
    years = range(2015, aoc_now.year + int(aoc_now.month == 12))
    days = range(1, 26)
    users = _load_users()
    parser = ArgumentParser(description="AoC runner")
    parser.add_argument(
        "-p",
        "--plugins",
        nargs="+",
        choices=plugins,
        default=list(plugins),
        help=(
            "List of plugins (solvers) to evaluate. "
            "Runs against all available plugins by default."
        ),
    )
    parser.add_argument(
        "-y",
        "--years",
        metavar=f"({years[0]}-{years[-1]})",
        type=int,
        nargs="+",
        choices=years,
        default=years,
        help="AoC years to run. Runs all available by default.",
    )
    parser.add_argument(
        "-d",
        "--days",
        metavar=f"({days[0]}-{days[-1]})",
        type=int,
        nargs="+",
        choices=days,
        default=days,
        help="AoC days to run. Runs all 1-25 by default.",
    )
    # examples are identical for every user, so -e and -u cannot be combined
    group = parser.add_mutually_exclusive_group()
    group.add_argument(
        "-e",
        "--example",
        action="store_true",
        help=(
            "Run against examples, instead of against real user data. "
            "This option is mutually exclusive with -u, because the sample "
            "data is the same for all users."
        ),
    )
    group.add_argument(
        "-u",
        "--users",
        nargs="+",
        choices=users,
        type=partial(_cli_guess, choices=users),
        default=users,
        help=(
            "Users to run each plugin with (e.g. your google token, your reddit token, "
            "your twitter token, your github token)."
        ),
    )
    parser.add_argument(
        "-t",
        "--timeout",
        metavar="T",
        # the timeout is a float everywhere else in this module (DEFAULT_TIMEOUT,
        # run_for, run_one, run_with_timeout), so accept fractional seconds here
        # too; whole numbers still parse to an equal value
        type=float,
        default=DEFAULT_TIMEOUT,
        help=(
            "Kill a solver if it exceeded this timeout, in seconds "
            "(default: %(default)s). Can use value '0' to disable timeout."
        ),
    )
    parser.add_argument(
        "-s",
        "--no-submit",
        action="store_false",
        dest="autosubmit",
        help=(
            "Disable autosubmit. "
            "By default, the runner will submit answers if necessary."
        ),
    )
    parser.add_argument(
        "-r",
        "--reopen",
        action="store_true",
        help="Open browser on NEW solves. Off by default.",
    )
    parser.add_argument(
        "-q",
        "--quiet",
        action="store_true",
        help=(
            "Capture output from runner. "
            "Can be used to suppress unwanted terminal output from a plugin."
        ),
    )
    parser.add_argument(
        "-v",
        "--verbose",
        action="count",
        help=(
            "Increased logging (-v INFO, -vv DEBUG). "
            "Default level is logging.WARNING."
        ),
    )
    args = parser.parse_args()
    # these checks run after parsing, so argument errors and --help are handled
    # by argparse before we complain about a missing setup
    if not users:
        path = AOCD_CONFIG_DIR / "tokens.json"
        print(
            "There are no datasets available to use.\n"
            "Either export your AOC_SESSION or put some auth "
            f"tokens into {path}",
            file=sys.stderr,
        )
        sys.exit(1)
    if not plugins:
        print(
            "There are no plugins available. Install some package(s) "
            "with a registered 'adventofcode.user' entry-point.\n"
            "See https://github.com/wimglenn/advent-of-code-sample "
            "for an example plugin package structure.",
            file=sys.stderr,
        )
        sys.exit(1)
    # action="count" leaves the value as None when -v was never given
    if args.verbose is None:
        log_level = logging.WARNING
    elif args.verbose == 1:
        log_level = logging.INFO
    else:
        log_level = logging.DEBUG
    logging.basicConfig(level=log_level)
    rc = run_for(
        plugs=args.plugins,
        years=args.years,
        days=args.days,
        datasets={k: users[k] for k in args.users},
        example=args.example,
        timeout=args.timeout,
        autosubmit=args.autosubmit,
        reopen=args.reopen,
        capture=args.quiet,
    )
    # run_for returns the number of incorrect results, used as the exit status
    sys.exit(rc)
def _timeout_wrapper(f, capture=False, timeout=DEFAULT_TIMEOUT, **kwargs):
    """
    Start `f(**kwargs)` in a separate process and return the object produced by
    the pebble-decorated call.
    """
    # the solve runs in a subprocess rather than a thread, because only a process
    # can be reliably killed once it has exceeded its time limit
    in_subprocess = pebble.concurrent.process(daemon=False, timeout=timeout)
    guarded = in_subprocess(_process_wrapper)
    return guarded(f, capture, **kwargs)
def _process_wrapper(f, capture=False, **kwargs):
    """
    Call `f(**kwargs)` and return its result. When `capture` is true, stderr and
    stdout are redirected to the null device for the duration of the call
    (this is what silences a plugin when aoc was invoked with --quiet).
    """
    with contextlib.ExitStack() as stack:
        if not capture:
            return f(**kwargs)
        sink = stack.enter_context(open(os.devnull, "w"))
        for redirect in (contextlib.redirect_stderr, contextlib.redirect_stdout):
            stack.enter_context(redirect(sink))
        return f(**kwargs)
def run_with_timeout(
    entry_point: EntryPoint,
    timeout: float,
    progress: str | None,
    dt: float = 0.1,
    capture: bool = False,
    **kwargs: t.Any,
) -> tuple[str, str, float, str]:
    """
    Execute a user solve, and display a progress spinner as it's running. Kill it if
    the runtime exceeds `timeout` seconds.

    The entry point is loaded and called with `**kwargs` via `_timeout_wrapper`.
    While it is running, the status line is redrawn on stderr every `dt` seconds,
    unless `progress` is None, in which case nothing is written.

    Returns a 4-tuple ``(a, b, walltime, error)``: both answers converted to `str`
    and truncated to 60 characters, the elapsed time in seconds, and an error
    string. If getting the result raised, both answers are empty strings and
    `error` is the repr of the exception truncated to 100 characters; otherwise
    `error` is an empty string.
    """
    spinner = itertools.cycle(r"\|/-")
    line = elapsed = format_time(0)
    # use a monotonic clock for measuring the duration: time.time() follows the
    # system clock, which may be adjusted (NTP, DST, manual change) mid-solve
    t0 = time.monotonic()
    func = entry_point.load()
    future = _timeout_wrapper(func, capture=capture, timeout=timeout, **kwargs)
    while not future.done():
        if progress is not None:
            line = "\r" + elapsed + " " + progress + " " + next(spinner)
            sys.stderr.write(line)
            sys.stderr.flush()
        time.sleep(dt)
        elapsed = format_time(time.monotonic() - t0, timeout)
    walltime = time.monotonic() - t0
    try:
        a, b = future.result()
    except Exception as err:
        # any failure (solver exception, timeout, wrong return shape) is
        # reported to the caller as a string rather than propagated
        a = b = ""
        error = repr(err)[:100]
    else:
        error = ""
        # longest correct answer seen so far has been 57 chars
        # that was the first example data from 2019/12/9 (i.e. the quine)
        a = str(a)[:60]
        b = str(b)[:60]
    if progress is not None:
        # blank out the last status line so the caller can print over it
        sys.stderr.write("\r" + " " * len(line) + "\r")
        sys.stderr.flush()
    return a, b, walltime, error
def format_time(t: float, timeout: float = DEFAULT_TIMEOUT) -> str:
    """
    Render a solve time (in seconds) as a colored string, where the color depends
    on how `t` compares to `timeout`:
    - green: less than a quarter of the timeout (15s by default)
    - yellow: at least a quarter, but less than half (30s by default)
    - red: anything else, i.e. half the timeout or more
    """
    limits = (("green", timeout / 4), ("yellow", timeout / 2))
    shade = next((name for name, limit in limits if t < limit), "red")
    return colored(f"{t: 7.2f}s", shade)
def run_one(
    year: int,
    day: int,
    data: str,
    entry_point: EntryPoint,
    timeout: float = DEFAULT_TIMEOUT,
    progress: str | None = None,
    capture: bool = False,
) -> tuple[str, str, float, str]:
    """
    Run one solver against one input from inside a fresh temporary directory.

    The process changes directory into a newly created temp dir, and the previous
    working directory is restored afterwards. The puzzle input is written there as
    "input.txt" for user code that prefers to read a file; the same input is also
    handed to the `entry_point` directly as a string, along with year and day.

    Returns the 4-tuple produced by `run_with_timeout`:
        part a answer (computed by the user code)
        part b answer (computed by the user code)
        runtime of the solver (walltime)
        error message (str) if the user code raised, empty string otherwise
    """
    original_cwd = os.getcwd()
    workdir = tempfile.mkdtemp(prefix=f"{year}-{day:02d}-")
    os.chdir(workdir)
    puzzle_file = Path("input.txt")
    assert not puzzle_file.exists()
    # arguments forwarded to the user's solver
    solver_kwargs = {"year": year, "day": day, "data": data}
    try:
        puzzle_file.write_text(data, encoding="utf-8")
        part_a, part_b, elapsed, failure = run_with_timeout(
            entry_point=entry_point,
            timeout=timeout,
            progress=progress,
            capture=capture,
            **solver_kwargs,
        )
    finally:
        puzzle_file.unlink(missing_ok=True)
        os.chdir(original_cwd)
        # rmdir only succeeds on an empty directory; failing to clean up is
        # logged instead of raised
        try:
            os.rmdir(workdir)
        except Exception as err:
            log.warning("failed to remove scratch %s (%s: %s)", workdir, type(err), err)
    return part_a, part_b, elapsed, failure
def run_for(
    plugs: t.Collection[str],
    years: t.Iterable[int],
    days: t.Iterable[int],
    datasets: t.Mapping[str, str],
    example: bool = False,
    timeout: float = DEFAULT_TIMEOUT,
    autosubmit: bool = True,
    reopen: bool = False,
    capture: bool = False,
) -> int:
    """
    Run with multiple users, multiple datasets, multiple years/days, and render the results.

    Every combination of year, day and plugin is executed once per dataset (or,
    when `example` is true, once per example of that puzzle), and one result line
    is printed for each run. A `timeout` of 0 disables the time limit.

    `datasets` maps a dataset name to the token exported as AOC_SESSION for that
    run. In example mode autosubmit is always turned off.

    Returns the number of results counted as incorrect: runs that errored or
    returned only non-answers, plus each part whose answer differs from a known
    expected value. Parts whose correct answer is unknown are not counted.
    """
    # a timeout of zero means "no limit"
    if timeout == 0:
        timeout = float("inf")
    aoc_now = datetime.now(tz=AOC_TZ)
    # only keep the installed entry points that were asked for
    eps = {ep.name: ep for ep in get_plugins() if ep.name in plugs}
    matrix = itertools.product(years, days, plugs)
    n_incorrect = 0
    # padding values for alignment
    wp = len(max(eps, key=len)) if eps else 3
    wd = len(max(datasets, key=len)) if datasets else 8
    for year, day, plugin in matrix:
        # skip days of the current year which are later than today's day number
        if year == aoc_now.year and day > aoc_now.day:
            continue
        entry_point = eps[plugin]
        puzzle = Puzzle(year, day)
        if example:
            # never submit answers that were computed from example data
            autosubmit = False
            examples = Puzzle(year, day).examples
            # in example mode a "dataset" is an index into the examples
            datas = range(len(examples))
        else:
            datas = datasets
        for dataset in datas:
            if example:
                data = examples[dataset].input_data
                extra = examples[dataset].extra
                if extra:
                    # expose the example's extra info to the solver as JSON
                    os.environ[f"AOCD_EXTRA"] = json.dumps(extra)
            else:
                token = datasets[dataset]
                # presumably Puzzle picks the token up from AOC_SESSION - TODO confirm
                os.environ["AOC_SESSION"] = token
                puzzle = Puzzle(year, day)
                data = puzzle.input_data
            title = puzzle.title
            descr = f"example-{dataset + 1}" if example else dataset
            progress = f"{year}/{day:<2d} - {title:<40} {plugin:>{wp}}/{descr:<{wd}}"
            a, b, walltime, error = run_one(
                year=year,
                day=day,
                data=data,
                entry_point=entry_point,
                timeout=timeout,
                progress=progress,
                capture=capture,
            )
            # do not let the extra info leak into the next run
            os.environ.pop(f"AOCD_EXTRA", None)
            runtime = format_time(walltime, timeout)
            line = " ".join([runtime, progress])
            # a solver which returned no answer for either part is reported as an error
            if a in NON_ANSWER and b in NON_ANSWER and not error:
                a = b = ""
                error = f"Skipping {year}/{day:<2d} (entry-point returned non-answers)"
            if error:
                assert a == b == ""
                icon = colored("✖", "red")
                n_incorrect += 1
                line += f" {icon} {error}"
            else:
                for answer, part in zip((a, b), "ab"):
                    if day == 25 and part == "b":
                        # there's no part b on Christmas day, skip
                        continue
                    expected = None
                    try:
                        if example:
                            expected = getattr(examples[dataset], "answer_" + part)
                        else:
                            expected = getattr(puzzle, "answer_" + part)
                    except AttributeError:
                        # the expected answer is not available: part a may always be
                        # posted, part b only once part a has been answered
                        post = part == "a" or (part == "b" and puzzle.answered_a)
                        if autosubmit and post:
                            try:
                                puzzle._submit(
                                    value=answer,
                                    part=part,
                                    reopen=reopen,
                                    quiet=True,
                                    precheck=False,
                                )
                            except AocdError as err:
                                log.warning("error submitting - %s", err)
                            # Correct submission will have created the answer file
                            answer_path = getattr(puzzle, f"answer_{part}_path")
                            if answer_path.is_file():
                                expected = getattr(puzzle, "answer_" + part)
                    correct = expected is not None and str(expected) == answer
                    icon = colored("✔", "green") if correct else colored("✖", "red")
                    correction = ""
                    if not correct:
                        if expected is None:
                            # unknown answers are flagged, but not counted as incorrect
                            icon = colored("?", "magenta")
                            correction = "(correct answer unknown)"
                        else:
                            correction = f"(expected: {expected})"
                            n_incorrect += 1
                    answer = f"{answer} {correction}"
                    if part == "a":
                        # pad part a so that the part b column lines up
                        answer = answer.ljust(32)
                    if expected is None and example:
                        # this example has no expected answer for the part: leave it blank
                        result = " " * 45
                    else:
                        result = f" {icon} part {part}: {answer}"
                    line += result
            print(line)
    return n_incorrect