-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfp-streams.py
58 lines (44 loc) · 1.21 KB
/
fp-streams.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import event_loop
def noop():
pass
def later(delay):
def func(f):
timer = loop.add_timer(delay, f)
return lambda: loop.cancel_timer(timer)
return func
def streamof(seq, delay=1e-4):
def func(cb):
it, stream, unsubcribe = iter(seq), later(delay), noop
def doit():
nonlocal unsubcribe
try:
current = next(it)
unsubcribe = stream(lambda: doit())
cb(current)
except StopIteration:
pass
unsubcribe = stream(lambda: doit())
return lambda: unsubcribe()
return func
def map_stream(f, stream):
def func(cb):
return stream(lambda x: cb(f(x)))
return func
def filter_stream(f, stream):
def func(cb):
def predicate(x):
if f(x):
cb(x)
return stream(predicate)
return func
def infinite_seq():
counter = 0
while True:
yield counter
counter += 1
loop = event_loop.SelectLoop()
nums = streamof(infinite_seq(), delay=0.5)
even_nums = filter_stream(lambda x: x % 2 == 0, nums)
unsubscribe = map_stream(lambda x: [x, x ** 2], even_nums)(print)
loop.add_timer(8.1, unsubscribe)
loop.run()