-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathScheduler.py
192 lines (147 loc) · 5.73 KB
/
Scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future import standard_library
standard_library.install_aliases()
from builtins import *
from builtins import object
from past.utils import old_div
import logging
import os
import errno
from utils.sentry import sentry_wrapper
from taskrunner.DownloadStatusScanner import download_status_scanner
FORMAT = '%(asctime)-15s %(module)s:%(lineno)d %(message)s'
logging.basicConfig(format=FORMAT, datefmt='%Y/%m/%d %H:%M:%S')
logger = logging.getLogger()
isDebug = os.getenv('DEBUG', False)
if isDebug:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
import platform
if platform.system() == 'Linux':
from twisted.internet import epollreactor
epollreactor.install()
else:
from twisted.internet import selectreactor
selectreactor.install()
from twisted.internet import reactor, threads
from yaml import safe_load
from utils.VideoManager import video_manager
from twisted.internet.task import deferLater
from utils.DownloadManager import download_manager
from taskrunner.InfoScanner import info_scanner
from taskrunner.FeedScanner import FeedScanner
from taskrunner.DmhyScanner import DmhyScanner
from taskrunner.BangumiMoeScanner import BangumiMoeScanner
from taskrunner.AcgripScanner import AcgripScanner
from taskrunner.LibyksoScanner import LibyksoScanner
from taskrunner.NyaaScanner import NyaaScanner
from taskrunner.UniversalScanner import UniversalScanner
from taskrunner.DeleteScanner import DeleteScanner
from rpc.rpc_interface import setup_server
from web_hook.keep_alive_checker import keep_alive_checker
class Scheduler(object):
def __init__(self):
fr = open('./config/config.yml', 'r')
config = safe_load(fr)
self.interval = int(config['task']['interval']) * 60
self.base_path = config['download']['location']
self.feedparser = config['feedparser']
self.delete_delay = {'bangumi': 10, 'episode': 1}
if config['task'].get('delete_delay') is None:
logger.warning('delete delay section is not set, please update your config file!')
else:
self.delete_delay = config['task']['delete_delay']
if 'universal' in config:
self.universal = config['universal']
else:
self.universal = None
try:
if not os.path.exists(self.base_path):
os.makedirs(self.base_path)
logger.info('create base dir %s successfully', self.base_path)
except OSError as exception:
if exception.errno == errno.EACCES:
# permission denied
raise exception
else:
logger.error(exception)
def start(self):
self.start_scan_bangumi_moe()
deferLater(reactor, int(old_div(self.interval, 4)), self.start_scan_dmhy)
deferLater(reactor, int(old_div(self.interval, 2)), self.start_scan_acgrip)
# libyk scanner don't have chance conflict with other scanner, so we can start simultaneously
self.start_scan_libykso()
self.start_scan_nyaa() # usually this doesn't conflict
# consider using universal instead of old dmhy scanner
self.start_scan_universal()
def scheduleFail(self, failure):
logger.error(failure)
def scheduleDone(self, result):
logger.error(result)
def start_scan_dmhy(self):
logger.debug('start dmhy')
dmhy_scanner = DmhyScanner(self.base_path, self.interval)
dmhy_scanner.start()
def start_scan_acgrip(self):
logger.debug('start acgrip')
acgrip_scanner = AcgripScanner(self.base_path, self.interval)
acgrip_scanner.start()
def start_scan_libykso(self):
logger.debug('start libykso')
libyk_scanner = LibyksoScanner(self.base_path, self.interval)
libyk_scanner.start()
def start_scan_bangumi_moe(self):
logger.debug('start bangumi_moe')
bangumi_moe_scanner = BangumiMoeScanner(self.base_path, self.interval)
bangumi_moe_scanner.start()
def start_scan_nyaa(self):
logger.debug('start nyaa')
nyaa_scanner = NyaaScanner(self.base_path, self.interval)
nyaa_scanner.start()
def start_scan_feed(self):
feed_scanner = FeedScanner(self.base_path)
feed_scanner.start()
def start_scan_delete(self):
delete_scanner = DeleteScanner(self.base_path, self.delete_delay)
delete_scanner.start()
def start_scan_universal(self):
if self.universal is None:
return
logger.debug('start universal')
for mode in self.universal:
universal_scanner = UniversalScanner(self.base_path, self.interval, mode)
universal_scanner.start()
scheduler = Scheduler()
video_manager.set_base_path(scheduler.base_path)
sentry_wrapper.scheduler_sentry()
def on_connected(result):
logger.info(result)
# set disconnect callback
download_manager.set_disconnect_cb(on_disconnect)
scheduler.start()
info_scanner.start()
download_status_scanner.start()
keep_alive_checker.start()
scheduler.start_scan_feed()
scheduler.start_scan_delete()
def on_connect_fail(result):
logger.error(result)
sentry_wrapper.client.captureMessage(result)
reactor.stop()
def on_disconnect():
sentry_wrapper.client.captureMessage('deluge disconnect unexpectedly')
reactor.stop()
def stop_tasks():
logger.info('shutting down...')
logger.info('stopping InfoScanner...')
info_scanner.stop()
d = download_manager.connect()
d.addCallback(on_connected)
d.addErrback(on_connect_fail)
setup_server()
reactor.addSystemEventTrigger('before', 'shutdown', stop_tasks)
reactor.run()