-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathjobs.py
73 lines (48 loc) · 1.5 KB
/
jobs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#!/usr/bin/env python
"""Cron-like scheduler."""
import os
import sys
import click
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from loguru import logger
# Defaults handed to the scheduler for every job registered below.
# (APScheduler: `coalesce` controls merging of piled-up runs, `max_instances`
# caps how many instances of one job may run at the same time.)
job_defaults = dict(
    coalesce=False,
    max_instances=4,
)
scheduler = BlockingScheduler(job_defaults=job_defaults)

# Drop the handler with id 0 (presumably loguru's pre-installed default sink)
# and log to stdout with a compact "time level message" layout instead.
logger.remove(0)
logger.add(sys.stdout, format="{time} {level} {message}")
@click.group()
def cli():
    """Cron-like job scheduler: list the registered jobs or run them."""
@cli.command()
def list():  # noqa: A001
    """Print a listing of the registered jobs."""
    # The function intentionally keeps the name `list` (shadowing the builtin
    # in this module) because click derives the command name from it.
    scheduler.print_jobs()
@cli.command()
def run():
    """Start the scheduler and keep running the registered jobs."""
    logger.info("Starting job scheduler")
    # `scheduler` is a BlockingScheduler, so this call does not return while
    # the scheduler is running.
    scheduler.start()
def system(cmd: str) -> None:
    """Run *cmd* through the shell, logging it before and after execution.

    Args:
        cmd: Command line passed verbatim to ``os.system``.

    The value logged as "status" is whatever ``os.system`` returns; on Unix
    that is the wait()-encoded status rather than the plain exit code.
    A failing command is only logged, never raised.
    """
    # Always pass cmd as a format *argument*: loguru runs str.format() on the
    # message template, so braces inside cmd must never end up in the template
    # itself (they would be parsed as placeholders).
    logger.info("{}", cmd)
    status = os.system(cmd)
    logger.info("{} exited with status {}", cmd, status)
@scheduler.scheduled_job(
    CronTrigger(hour=1, minute=0, second=0),
)
def daily_reindex():
    """Run the flask ``reindex`` command; triggered daily at 01:00:00."""
    command = "./bin/flask reindex"
    system(command)
@scheduler.scheduled_job(
    CronTrigger(hour=6, minute=0, second=0),
)
def daily_sync():
    """Run the flask sync commands in sequence; triggered daily at 06:00:00.

    The commands run one after another in the order listed; the exit status
    of one command is not checked before starting the next.
    """
    commands = (
        "./bin/flask ldap-sync",
        "./bin/flask syncbi",
        "./bin/flask update-retard",
    )
    for command in commands:
        system(command)
@scheduler.scheduled_job(
    CronTrigger(hour=0, minute=0, second=0),
)
def daily_recap():
    """Run ``send-notifications daily``; triggered daily at 00:00:00."""
    command = "./bin/flask send-notifications daily"
    system(command)
@scheduler.scheduled_job(
    CronTrigger(day_of_week="thu", hour=0, minute=0, second=0),
)
def weekly_notif():
    """Run ``send-notifications weekly``; triggered Thursdays at 00:00:00."""
    command = "./bin/flask send-notifications weekly"
    system(command)
@scheduler.scheduled_job(
    CronTrigger(day_of_week="tue", hour=0, minute=0, second=0),
)
def weekly_recap():
    """Run the flask ``send-recap`` command; triggered Tuesdays at 00:00:00."""
    command = "./bin/flask send-recap"
    system(command)
# Script entry point: hand control to the click command group.
if __name__ == "__main__":
    cli()