-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtestreactor.py
executable file
·287 lines (162 loc) · 6.93 KB
/
testreactor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
"""Support for writing Twisted unit tests"""
from twisted.internet import reactor, selectreactor, base, task
from twisted.internet.selectreactor import _select as realselect
from twisted.python.runtime import seconds as realtime
from twisted.python import failure
from time import sleep as realsleep
import unittest
__all__ = [
"install", "uninstall", "ReactorTestCase", "TimeoutError", "EarlyExit"
]
class TimeoutError(AssertionError):
"""A requested timeout expired
This is a subclass of AssertionError so that unittest will treat tests
raising it as "Failed" rather than as being in "Error". So, you don't
need to trap and reraise this error; just let it pass through.
"""
class EarlyExit(AssertionError):
"""The run loop was exited sooner than expected (e.g. before a result)
This is a subclass of AssertionError so that unittest will treat tests
raising it as "Failed" rather than as being in "Error". So, you don't
need to trap and reraise this error; just let it pass through.
"""
# Note - the class below includes 'object' in its bases so that __mro__
# will be sane for super() use when mixed with other TestCase derivatives
#
class ReactorTestCase(unittest.TestCase, object):
"""Test case mixin that ensures the test reactor is ready for use
Note that if you include other TestCase-derived base classes in your
subclass' bases, you should include them *after* this class, to ensure
that this class' setUp/tearDown methods get called. (It will then call
the remaining classes' setUp and tearDown methods using super().)
"""
def setUp(self):
super(ReactorTestCase,self).setUp()
install()
def tearDown(self):
uninstall()
super(ReactorTestCase,self).tearDown()
class TestReactor(selectreactor.SelectReactor):
"""Add test-specific features to the Twisted default reactor"""
def start(self):
"""Reset the reactor completely
You don't need to use this if you use ReactorTestCase, which wraps your
tests in fresh restarts.
"""
if self.waker is not None:
# make sure that subsequent shutdowns won't disconnect an
# active waker, due to wakers assuming they're singletons :(
self.removeReader(self.waker)
self.__dict__.clear()
self.__init__()
self._simulated_time = realtime()
self._use_realtime = None
def waitFor(self, seconds, early_ok=True):
"""Run the event loop for `seconds` of simulated time
If `early_ok` is set to False, this raises EarlyExit if the runloop
exits before the time expires.
"""
finish = self.callLater(seconds, self.crash)
self.running = 1
self.mainLoop()
if finish.active():
finish.cancel()
if not early_ok:
raise EarlyExit("Reactor exited before %s seconds" % seconds)
def waitUntil(self, deferred, timeout):
"""Return a result or error from `deferred`
`timeout` is a timeout value in seconds of simulated time. If the
timeout elapses without a result or error from the deferred,
TimeoutError is raised. This can also raise EarlyExit if the runloop
is exited without the deferred having fired.
"""
finish = self.callLater(timeout, self.crash)
result = []
def callback(value):
if finish.active():
# Only stop the reactor if the original call
# is still active
finish.cancel()
self.crash()
result.append(value)
if isinstance(value,failure.Failure):
value = None # don't pass on an exception
# always return the value, so that any chained callbacks work
return value
deferred.addCallbacks(callback, callback) # call it either way
if not result: # don't infinite loop if the deferred has already fired
self.running = 1
self.mainLoop()
if result:
result, = result
if isinstance(result,failure.Failure):
result.raiseException()
return result
if finish.active():
finish.cancel()
raise EarlyExit("Reactor exited before a result was produced")
else:
raise TimeoutError("%s simulated seconds elapsed" % timeout)
def getTime(self):
"""Get the current simulated (or real) time"""
if self._use_realtime:
return realtime()
self._use_realtime = False # once we've read it, disallow changing it
return self._simulated_time
def setTime(self,seconds):
"""Set the current simulated time"""
if self._use_realtime:
raise AssertionError("Can't set time when using real time")
self._simulated_time = seconds
def sleep(self,seconds):
"""Advance the simulated clock, or sleep for the number of seconds"""
if self._use_realtime:
realsleep(seconds)
else:
self.setTime(self.getTime()+seconds)
def select(self,r,w,e,timeout=None):
"""Pretend the select() time elapses, then select for 0 seconds"""
if not self._use_realtime:
if timeout is not None:
self.sleep(timeout)
timeout = 0
return realselect(r,w,e,timeout)
def _setMode(self,mode):
if self._use_realtime is not None and self._use_realtime<>mode:
raise AssertionError(
"Cannot change clock mode without restarting reactor"
)
self._use_realtime = mode
def useRealTime(self):
"""Use wall-clock time for the current test"""
self._setMode(True)
def useSimulatedTime(self):
"""Use simulated time for the current test (default)"""
self._setMode(False)
def install():
"""Install and reset the test reactor"""
if not isinstance(reactor,TestReactor):
assert reactor.__class__ is selectreactor.SelectReactor, (
"testreactor can only be installed over the default reactor"
)
reactor.__class__ = TestReactor
reactor.start()
# If Twisted defined these as methods on reactors, we wouldn't
# need to monkeypatch their globals like this. :(
base.seconds = reactor.getTime
task.seconds = reactor.getTime
selectreactor.sleep = reactor.sleep
selectreactor._select = reactor.select
def uninstall():
"""Shut down and uninstall the test reactor"""
try:
if reactor.running:
reactor.stop()
finally:
if isinstance(reactor,TestReactor):
reactor.__class__ = selectreactor.SelectReactor
# Use real time
base.seconds = realtime
task.seconds = realtime
selectreactor.sleep = realsleep
selectreactor._select = realselect