For basic uses of the Trellis, no event loop is needed. Rules simply re-run whenever their input values change. However, for programs that require timeouts or delays, or want to do multi-tasking (either using threads or co-routine generators), a little more sophistication is required.
The peak.events.activity
module provides services that can wrap just about
any sort of event loop (e.g. Twisted, wx, etc.), and allow you to implement
your own as well. It also provides various "time" services for managing
timeouts and delays, that integrate with the event loop services to fire rules
at appropriate times. Last, but not least, it provides a co-operative
multitasking facility that lets you create and run as many pseudo-threads as
you want, using Python generator functions.
>>> from peak.events import trellis, activity
Table of Contents
Real-life applications often need to work with intervals of physical or "real" time, not just logical "Trellis time". In addition, they often need to manage sequential or simultaneous activities. For example, a desktop application may have background tasks that perform synchronization, download mail, etc. A server application may have logical tasks handling requests, and so on. These activities may need to start or stop at various times, manage timeouts, display or log progress, etc.
So, the peak.events.activity
module includes support for time tracking as
well as controlling activities and monitoring their progress.
The Trellis measures time using "timers". A timer represents a moment in time, but you can't tell directly what moment it represents. All you can do is measure the interval between two timers, or tell whether the moment defined by a timer has been reached.
The "zero" timer is activity.EPOCH
, representing an arbitrary starting
point in relative time:
>>> t = activity.EPOCH >>> t <...activity._Timer object at ...>
As you can see, timer objects aren't very informative by themselves. However, you can use subscripting to create new timers relative to an existing timer, and subtract timers from each other to produce an interval in seconds, e.g.:
>>> t10 = t[10] >>> t10 - t 10 >>> t10[-10] - t 0 >>> t10[3] - t 13
Timers compare equal to one another, if and only if they represent the same moment:
>>> t==t True >>> t!=t False >>> t10[-10] == t True >>> t10[-10] != t False
And the other comparison operators work on timers according to their relative positions in time, e.g.:
>>> t[-1] < t <= t[1] True >>> t[-1] > t[-2] True >>> t[-2] > t[-1] False >>> t[-1] >= t[-1] True >>> t<=t True >>> t<=t[1] True >>> t<=t[-1] False
Of course, if arithmetic were all you could do with timers, they wouldn't be very useful. Their real value comes when you perform dynamic time calculations, to answer questions like "How long has it been since X happened?", or "Has Y seconds elapsed since X happened?" And of course, we want any rules that ask these questions to be recalculated if the answers change!
This is where the activity.Time
service comes into play. The Time
class is a context.Service
(see the Contextual docs for more details)
that tracks the current time, and takes care of letting the Trellis know when a
rule needs to be recalculated because of a change in the current time.
By default, the Time
service uses time.time()
to track the current
time, whenever a trellis value is changed. But to get consistent timings
while testing, we'll turn this automatic updating off:
>>> from peak.events.activity import Time >>> Time.auto_update = False
With auto-update off, the time will only advance if we explicitly call
Time.tick()
or Time.advance()
. tick()
updates the current time
to match time.time()
, while Time.advance()
moves the time ahead by a
specified amount (so you can run tests in "simulated time" with perfect
repeatability).
So now let's do some dynamic time calculations. In most programs, what you need to know in a rule is whether a certain amount of time has elapsed since something has happened, or whether a certain future time has arrived.
To do that, you can simply create a timer for the desired moment, and check its boolean (truth) value:
>>> twenty = Time[20] # go off 20 secs. from now >>> bool(twenty) # but we haven't gone off yet False >>> Time.advance(5) >>> bool(twenty) # not time yet... False >>> Time.advance(15) # bingo! >>> bool(twenty) True >>> Time.advance(7) >>> bool(twenty) # remains true even after the exact moment has passed True
And of course, you can use this boolean test in a rule, to trigger some action:
>>> class AlarmClock(trellis.Component): ... timeout = trellis.attr(None) ... trellis.maintain() ... def alert(self): ... if self.timeout: ... print "timed out!" >>> clock = AlarmClock(timeout=Time[20]) >>> Time.advance(15) >>> Time.advance(15) timed out!
Notice, by the way, that the Time
service can be subscripted with a value
in seconds, to get a timer representing that many seconds from the current
time. (However, Time
is not really a timer object, so don't try to use it
as one!)
This alarm implementation works by getting a future timer (timeout
), and
then "goes off" when that future moment is reached. However, we can also
create an "elapsed" timer, and trigger when a certain amount of time has
passed:
>>> class Elapsed(trellis.Component): ... duration = trellis.attr(20) ... has_run_for = trellis.maintain(lambda self: Time[0]) ... trellis.maintain() ... def alarm(self): ... if self.has_run_for[self.duration]: ... print "timed out!" >>> t = Elapsed() # Capture a start time >>> Time.advance(15) # duration is 20, so no alarm yet >>> t.duration = 10 # duration changed, and already reached timed out! >>> t.duration = 15 # duration changed, but still reached timed out! >>> t.duration = 20 # not reached yet... >>> Time.advance(5) timed out!
As you can see, the has_run_for
attribute is a timer that records the
moment when the Elapsed
instance is created. The alarm
rule is then
recalculated whenever the duration
changes -- or elapses.
Of course, in complex programs, one usually needs to be able to measure the amount of time that some condition has been true (or false). For example, how long a process has been idle (or busy):
>>> class IdleTimer(trellis.Component): ... trellis.attrs( ... idle_timeout = 20, ... busy = False, ... ) ... idle_for = trellis.maintain( ... lambda self: self.idle_for.begins_with(not self.busy), ... initially = activity.NOT_YET ... ) ... trellis.maintain() ... def alarm(self): ... if self.idle_for[self.idle_timeout]: ... print "timed out!"
The way this code works, is that initially the idle_for
timer is equal to
the special NOT_YET
value, representing a moment that will never be
reached.
The begins_with()
method of timer objects takes a boolean value. If the
value is false, NOT_YET
is returned. If the value is true, the lesser of
the existing timer value or Time[0]
(the present moment) is returned.
Thus, a statement like:
a_timer = a_timer.begins_with(condition)
ensures that a_timer
equals the most recent moment at which condition
was observed to become true. (Or NOT_YET
, in the case where condition
is false.)
So, the IdleTimer.alarm
rule effectively checks whether busy
has been
false for more than idle_timeout
seconds. If busy
is currently true,
then self.idle_for
will be NOT_YET
, and subscripting NOT_YET
always returns NOT_YET
. Since NOT_YET
is a moment that can never be
reached, the boolean value of the expression is always false while busy
is true.
Let's look at the IdleTimer
in action:
>>> it = IdleTimer() >>> it.busy = True >>> Time.advance(30) # busy for 30 seconds >>> it.busy = False >>> Time.advance(10) # idle for 10 seconds, no timeout yet >>> Time.advance(10) # ...20 seconds! timed out! >>> Time.advance(15) # idle 35 seconds, no new timeout >>> it.busy = True # busy again >>> Time.advance(5) # for 5 seconds... >>> it.busy = False >>> Time.advance(30) # idle 30 seconds, timeout! timed out! >>> it.idle_timeout = 15 # already at 30, fires again timed out! >>> print Time.next_event_time() None
In our examples, we've been manually updating the time. But if auto_update
is true, then the time automatically advances whenever a trellis value is
changed:
>>> Time.auto_update = True >>> c = trellis.Cell() >>> c.value = 42 >>> now = Time[0] >>> from time import sleep >>> sleep(0.1) >>> now == Time[0] # time hasn't actually moved forward yet... True >>> c.value = 24 >>> now == Time[0] # but now it has, since a recalculation occurred False
This ensures that any rules that use a current time value, or that are waiting for a timeout, will see the correct time.
Note, however, that if your application doesn't change any trellis values for a
long time, then any pending timeouts may not fire for an excessive period of
time. You can, however, force an update to occur by using the Time.tick()
method:
>>> now = Time[0] >>> sleep(0.1) >>> now == Time[0] # time hasn't actually moved forward yet... True >>> Time.tick() >>> now == Time[0] # but now it has! False
So, an application's main loop can call Time.tick()
repeatedly in order to
ensure that any pending timeouts are being fired.
You can reduce the number of tick()
calls significantly, however, if you
make use of the next_event_time()
method. If there are no scheduled events
pending, it returns None
:
>>> print Time.next_event_time() None
But if anything is waiting, like say, our IdleTimeout
object from the
previous section, it returns the relative or absolute time of the next time
tick()
will need to be called:
>>> Time.auto_update = False >>> it = IdleTimer(idle_timeout=30) >>> Time.next_event_time(relative=True) 30.0 >>> when = activity.EPOCH[Time.next_event_time(relative=False)] >>> when - Time[0] 30.0 >>> Time.advance(30) timed out!
(We can't show the absolute time in this example, because it would change every
time this document was run. But we can offset it from the EPOCH
, and then
subtract it from the current time, to prove that it's equal to an absolute time
30 seconds after the current time.)
Armed with this method, you can now write code for your application's event
loop that calls tick()
at the appropriate interval. You will simply need
to define a Trellis rule somewhere that monitors the next_event_time()
and
schedules a call to Time.tick()
if the next event time is not None. You
can use whatever scheduling mechanism your application already includes, such
as a wx.Timer
or Twisted's reactor.callLater
, etc.
When the scheduled call to tick()
occurs, your monitoring rule will be
run again (because next_event_time()
depends on the current time), thus
repeating the cycle as often as necessary.
Note, however, that your rule may be run again before the scheduled
tick()
occurs, and so may end up scheduling extra calls to tick()
.
This should be harmless, however, but if you want to avoid the repeats you can
always write your rule so that it updates the existing scheduled call time, if
one is pending. (E.g. by updating the wx.Timer
or changing the Twisted
"appointment".)
We'll talk more about the interaction between Time
and event loops in the
section on Creating A Custom Event Loop.
The activity.EventLoop
service allows you to write components that will
run under any event-driven framework that has an EventLoop
implementation.
It provides a framework-independent way to request that a function be called
at an "idle" moment. (This mainly used to support Co-operative Multitasking,
as will be described in a later section.)
EventLoop
service instances have the following methods:
- call(func, *args, **kw)
- Call the given function at the next idle moment. What "idle" means depends on the specific event loop implementation, but it should generally mean "as soon as possible after all currently pending events/callbacks are processed". Callbacks are invoked in the exact order they are registered in.
- run()
- Run the event loop until
stop()
is called, or the event loop exits for some other reason. (Raises an error if the event loop is already running.) - stop()
- Request that the event loop stop as soon as possible. Whether any
outstanding
call()
callbacks are processed first is implementation- defined. - poll()
- Try to run one outstanding
call()
callback, if one is pending. - flush(count=None)
- Run up to count outstanding
call()
callbacks, if any are pending. If count isNone
, run all pending callbacks.
The poll()
and flush()
are mainly intended for your convenience when
writing tests of code that would ordinarily be run inside an event loop. In
other words, you'll generally use them in place of run()
calls when
testing your components. We'll see plenty of examples of this when we get
to the section on Co-operative Multitasking.
Let's take a look at an example of using the default EventLoop
implementation:
>>> def hello(*args, **kw): ... print "called with", args, kw >>> from peak.events.activity import EventLoop >>> EventLoop.call(hello, 1, a='b') >>> EventLoop.call(hello, 2) >>> EventLoop.call(hello, this=3) >>> EventLoop.call(EventLoop.stop) >>> EventLoop.run() called with (1,) {'a': 'b'} called with (2,) {} called with () {'this': 3}
As you can see, the hello()
function was called back three times with the
various arguments we requested, then a call to the stop()
method caused
the run()
to exit. But you can't stop an already stopped loop:
>>> EventLoop.stop() Traceback (most recent call last): ... AssertionError: EventLoop isn't running
Or run()
one that's already running:
>>> EventLoop.call(EventLoop.run) >>> EventLoop.run() Traceback (most recent call last): ... AssertionError: EventLoop is already running
You can check an event loop's status using its running
and
stop_requested
attributes, which are both normally false:
>>> @trellis.Performer ... def LoopWatch(): ... print "Running =", EventLoop.running ... print "Stopping =", EventLoop.stop_requested ... print "----------------" Running = False Stopping = False ---------------- >>> EventLoop.call(EventLoop.stop) >>> EventLoop.run() Running = True Stopping = False ---------------- Running = True Stopping = True ---------------- Running = False Stopping = False ---------------- >>> del LoopWatch # don't print this stuff out any more
As you can see, the running
attribute turns true once the event loop starts
running. When stop()
is called, the stop_requested
attribute becomes
true, and then both running
and stop_requested
return to their normal
values.
Unless you have a relatively simple program or are writing tests, you probably
won't use the default EventLoop
implementation. More likely, you'll use
something like the Twisted or wxPython event loops:
>>> from peak.events.activity import TwistedEventLoop, WXEventLoop
You'll need to install the appropriate event loop service before your program makes any use of it (or else create a new service context; see the Contextual docs for more details). In the simplest cases this can be accomplished by adding a line like this near the beginning of your program:
activity.EventLoop <<= TwistedEventLoop
This configures the EventLoop
service to create a TwistedEventLoop
in place of the default implementation. EventLoop
API calls will then be
routed to the Twisted reactor, as appropriate.
Note that if you use the TwistedEventLoop
, you must first configure your
desired reactor implementation before you use any EventLoop
APIs.
Similarly, if you use the WXEventLoop
, you must create your wx.App
first.
(If you are using both Twisted and wxPython in the same application, we suggest
using Twisted's wxreactor
with the TwistedEventLoop
.)
If you need to use an event-driven framework other than Twisted or wxPython,
and someone else hasn't already implemented an EventLoop
service for it,
you'll need to see the section on Creating A Custom Event Loop to find out
how to roll your own.
The Trellis allows for a limited form of co-operative multitasking, using
generator functions. By declaring a generator function as a @task
method,
you can get it to run across multiple trellis recalculations, retaining its
state along the way. For example:
>>> class TaskExample(trellis.Component): ... trellis.attrs( ... start = False, ... stop = False ... ) ... @activity.task ... def demo(self): ... print "waiting to start" ... while not self.start: ... yield activity.Pause ... print "starting" ... while not self.stop: ... print "waiting to stop" ... yield activity.Pause ... print "stopped" >>> t = TaskExample() >>> EventLoop.flush() # this wouldn't be needed if we were *in* the loop! waiting to start >>> t.start = True >>> EventLoop.flush() # this wouldn't be needed if we were *in* the loop! starting waiting to stop >>> t.stop = True >>> EventLoop.flush() # this wouldn't be needed if we were *in* the loop! stopped
The @activity.task
decorator is used to turn a generator function into a
co-routine that will run as a semi-independent task. An activity.TaskCell
will be created for the corresponding attribute when an instance of the
enclosing class is created (unless you also use @trellis.optional
to mark
it as an optional attribute).
When the TaskCell
is created, an EventLoop.call()
is used to request
that the generator be iterated when possible. Each iteration of the generator
is run as if it were a @modifier
; that is, the effects of changes made
during one iteration of the generator will not be seen until a subsequent
iteration. So the generator can yield a special activity.Pause
in order to
suspend itself until a cell it depends on has changed.
In the above example, the task initially depends on the value of the start
cell, so it is not resumed until start
is set to True
. Then it prints
"starting", and waits for self.stop
to become true.
We then set stop
to true, which causes the loop to exit. The task is now
finished, and any further changes will not re-invoke it.
(By the way, notice that we had to call EventLoop.flush()
a few times in
order to advance the generator. That's because the generator is advanced via
an EventLoop.call()
that's registered whenever the conditions the generator
depends on have changed. If this were a real application, running under
EventLoop.run()
, we wouldn't need to do this since the callbacks would be
flushed regularly. But when you test code like this example, you may wish
to explicitly flush callbacks instead of trying to run an entire event loop
before checking your results.)
Tasks can invoke or "call" other generators by yielding them. For example, we can rewrite our example like this, for more modularity:
>>> class TaskExample(trellis.Component): ... trellis.attrs( ... start = False, ... stop = False ... ) ... ... def wait_for_start(self): ... print "waiting to start" ... while not self.start: ... yield activity.Pause ... ... def wait_for_stop(self): ... while not self.stop: ... print "waiting to stop" ... yield activity.Pause ... ... @activity.task ... def demo(self): ... yield self.wait_for_start() ... print "starting" ... yield self.wait_for_stop() ... print "stopped" >>> t = TaskExample() >>> EventLoop.flush() waiting to start >>> t.start = True >>> EventLoop.flush() starting waiting to stop >>> t.stop = True >>> EventLoop.flush() stopped
Yielding a generator from a @task
causes that generator to temporarily
replace the main generator -- until the child generator returns, yields a
non-Pause
value, or raises an exception. At that point, control returns to
the "parent" generator. Subtasks may be nested to any depth.
If you are targeting Python 2.5 or higher, you don't need to do anything special to receive values yielded by subtasks, or to ensure that subtask exceptions are propagated. You can receive return values using expressions like:
result = yield someGenerator(someArgs)
However, in earlier versions of Python, this syntax doesn't exist, so you must
use the special activity.resume()
function instead, e.g.:
yield someGenerator(someArgs); result = activity.resume()
If you are writing code intended to run on Python 2.3 or 2.4 (as well as 2.5),
you should always call activity.resume()
immediately after a subtask
invocation (preferably on the same line, as shown), even if you don't need to
get the result. E.g.:
yield someGenerator(someArgs); activity.resume()
The reason you should do this is that Python versions before 2.5 do not allow
you to pass exceptions into a generator, so the Trellis can't cause the
yield
statement to propagate an error from someGenerator()
. If the
subtask raised an exception, it will silently vanish unless the resume()
function is called.
The reason to put it on the same line as the yield is so that you can see the
subtask call in the error's traceback, instead of just a line saying
activity.resume()
! (Note, by the way, that it's perfectly valid to use
activity.resume()
in code that will also run under Python 2.5; it's just
redundant unless the code will be used with older Python versions as well.)
The ability to receive values from a subtask lets you create utility functions that wait for events to occur in some non-Trellis system. For example, you could create a function like this, to let you wait for a Twisted "deferred" to fire:
def wait_for(deferred): result = trellis.Cell(None, activity.Pause) deferred.addBoth(result.set_value) # firing will set the result cell while result.value is activity.Pause: yield activity.Pause if isinstance(result.value, failure.Failure): try: result.value.raiseException() finally: del result # get rid of the traceback reference cycle yield activity.Return(result.value)
You would then use it like this (Python 2.5+ only):
result = yield wait_for(someTwistedFuncReturningADeferred(...))
Or like this (compatible with earlier Python versions):
yield wait_for(someTwistedFunc(...)); result = activity.resume()
This example wait_for()
function creates a cell and adds its set_value()
method as a callback to the deferred, to receive either a value or an error.
It then waits until the callback occurs, by yielding Pause
objects. If the
result is a Twisted Failure
, it raises the exception represented by the
failure. Otherwise, it wraps the result in a activity.Return()
and yields
it to its calling task, where it will be received as the result of the
yield
expression (in Python 2.5+) or of the activity.resume()
call
(versions <2.5).
Note, by the way, that when we say the generator above will "wait" until the
callback occurs, we actually mean no such thing! What really happens is that
this generator yields Pause
, recalculation finishes normally, and control
is returned to whatever non-Trellis code caused a recalculation to occur in
the first place. Then, later, when the deferred fires and a callback occurs to
set the result
cell's value, this triggers a recalculation sweep, in
which an implementation rule uses EventLoop.call()
to set up the generator
to be resumed. The recalculation then finishes and control is returned to the
code that caused the deferred to fire.
Finally, when the event loop flushes its callbacks, the generator will actually be resumed. It then yields the result or raises an exception, which in either case is propagated back to whatever generator "called" it, which may then go on to do other things with the value or exception before it pauses or returns in its own turn.
Thus, "time" in the Trellis (and especially for tasks) moves forward only when something changes. It's the setting of cell values that triggers recalculation sweeps, and tasks only resume after sweeps where one of their dependencies have changed.
A task is considered to depend on any cells whose value it has read since the
last time it (or a subtask) yielded a Pause
. Each time a task is resumed,
its old dependencies are thrown out, and a new set are accumulated.
A task must also Pause
in order to see the effects of any changes it makes
to trellis-managed data structures. For example:
>>> c = trellis.List([1,2]) >>> c [1, 2] >>> def demo_task(): ... c.append(3) ... print c ... yield activity.Pause ... print c >>> activity.TaskCell(demo_task).value >>> EventLoop.flush() [1, 2] >>> EventLoop.flush() [1, 2, 3]
As you can see, modifying the list inside a task is like changing it inside a
@modifier
-- the change doesn't take effect until a new recalculation
occurs, and the current recalculation can't finish until the task yields a
Pause
or returns (i.e., exits entirely).
In this example, the task is resumed immediately after the pause because the
task depended on c
(by printing it), and its value changed in the
subsequent sweep (because the task set it). So the task was resumed
immediately, and scheduled to be run as soon as the event loop is flushed
again.
But what if a task doesn't have any dependencies? If it doesn't depend on anything, how does it get resumed after a pause? Let's see what happens:
>>> def demo_task(): ... print 1 ... yield activity.Pause ... print 2 >>> activity.TaskCell(demo_task).value >>> EventLoop.flush() 1 >>> EventLoop.flush() 2
As you can see, a task with no dependencies, (i.e., one that hasn't looked at
any cells since its last Pause
), is automatically resumed. The Trellis
effectively pretends that the task both set and depended on an imaginary cell,
forcing the task to be scheduled for execution again. This prevents tasks from
accidently suspending themselves indefinitely.
There are quite a few event-driven application frameworks used with Python,
including those of various GUI toolkits, co-operative multitasking frameworks,
etc. If you need to integrate the trellis with one, it's fairly
straightforward to wrap its API in an EventLoop
implementation. Here are
the attributes and methods you'll need to implement:
- _ticker
- This should be a
@trellis.perform
rule that only executes when the event loop is running, and handles scheduling forTime.tick()
calls. Whenself._next_time
is notNone
, it is a value indicating the number of seconds until the next scheduled event, and you should arrange to callTime.tick()
at that time, if possible. Also, ifself.stop_requested
is true, the observer should request that the real event loop exit, if it's currently running and hasn't already been asked to exit. - _loop()
- This method should run the real event loop.
- _arrange_callback(func)
- Arrange for func to be called back with zero arguments when the real event loop is idle.
- _setup()
- Do any one-time setup that might be required before callbacks are arranged or the loop is run. Typically, this routine will import the targeted event loop API and do any configuration, or initialize any private attributes needed to implement the other methods.
Let's take a look at an example implementation:
>>> from peak import context >>> class MyEventLoop(activity.EventLoop): ... context.replaces(activity.EventLoop) # <-- must have this line! ... ... @trellis.perform ... def _ticker(self): ... if self.running: ... if Time.auto_update: ... if self._next_time is not None: ... print "tick() needed after", self._next_time, "secs" ... if self.stop_requested: ... print "ask the event loop to exit now" ... ... def _loop(self): ... print "actually run the event loop here" ... ... def _arrange_callback(self, func): ... print "arrange to call back", func ... ... def _setup(self): ... print "do any setup here" >>> m = MyEventLoop() >>> m.call(hello, 'test 1') do any setup here arrange to call back <bound method MyEventLoop._callback ...> >>> m.call(hello, 'test 2') >>> m.run() actually run the event loop here ask the event loop to exit now
This event loop implementation is actually a bit broken, because it doesn't
really arrange for a callback in the _arrange_callback()
method. If we
had actually arranged for the callback to be called back by some external
event loop API, our call()
would have worked:
>>> m._callback() called with ('test 1',) {} arrange to call back <bound method MyEventLoop._callback ...>
For more examples, check out the source code of the TwistedEventLoop
and
WXEventLoop
classes in peak.events.activity
: they're both quite short.