-
Notifications
You must be signed in to change notification settings - Fork 29
/
Copy pathtime.ts
121 lines (109 loc) · 3.42 KB
/
time.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import { Time, State } from "./common";
import { cons } from "./datastructures";
import { Stream } from "./stream";
import { Behavior, fromFunction } from "./behavior";
import { sample, Now, perform } from "./now";
/*
 * Time-related behaviors and functions.
 */
/**
 * A stream that forwards each occurrence it receives to its children
 * after a fixed delay of `ms` milliseconds.
 */
export class DelayStream<A> extends Stream<A> {
  /**
   * @param parent Stream registered as this stream's single parent.
   * @param ms Delay, in milliseconds, handed to `setTimeout` for every push.
   */
  constructor(parent: Stream<A>, readonly ms: number) {
    super();
    this.parents = cons(parent);
  }

  /**
   * Schedules the occurrence `a` to be pushed to the children once `ms`
   * milliseconds have passed. The `t` received here is forwarded unchanged.
   */
  pushS(t: number, a: A): void {
    const forwardLater = (): void => {
      this.pushSToChildren(t, a);
    };
    setTimeout(forwardLater, this.ms);
  }
}
/**
 * Builds, via `perform`, a `Now` computation that constructs a
 * `DelayStream` forwarding the occurrences of `stream` after `ms`
 * milliseconds.
 *
 * @param ms Delay in milliseconds.
 * @param stream The stream to delay.
 * @returns A `Now` computation yielding the delayed stream.
 */
export function delay<A>(ms: number, stream: Stream<A>): Now<Stream<A>> {
  const createDelayed = () => new DelayStream(stream, ms);
  return perform(createDelayed);
}
/**
 * A stream that forwards an occurrence and then ignores every further
 * occurrence until `ms` milliseconds have passed.
 */
class ThrottleStream<A> extends Stream<A> {
  /** `true` while occurrences are being dropped. */
  private isSilenced = false;

  /**
   * @param parent Stream registered as this stream's single parent.
   * @param ms Length of the silent window in milliseconds.
   */
  constructor(parent: Stream<A>, readonly ms: number) {
    super();
    this.parents = cons(parent);
  }

  /**
   * Forwards `a` to the children unless the stream is currently silenced.
   * After forwarding, the stream is silenced for `ms` milliseconds.
   */
  pushS(t: number, a: A): void {
    if (this.isSilenced) {
      return;
    }
    this.pushSToChildren(t, a);
    this.isSilenced = true;
    const reopen = (): void => {
      this.isSilenced = false;
    };
    setTimeout(reopen, this.ms);
  }
}
/**
 * Builds, via `perform`, a `Now` computation that constructs a
 * `ThrottleStream` over `stream` with a silent window of `ms` milliseconds.
 *
 * @param ms Length of the silent window in milliseconds.
 * @param stream The stream to throttle.
 * @returns A `Now` computation yielding the throttled stream.
 */
export function throttle<A>(ms: number, stream: Stream<A>): Now<Stream<A>> {
  const createThrottled = () => new ThrottleStream<A>(stream, ms);
  return perform(createThrottled);
}
/**
 * A stream that forwards an occurrence only after `ms` milliseconds have
 * passed without a newer occurrence arriving.
 */
class DebounceStream<A> extends Stream<A> {
  /** Handle of the pending timer, or `undefined` before the first push. */
  private timer: NodeJS.Timeout | undefined = undefined;

  /**
   * @param parent Stream registered as this stream's single parent.
   * @param ms Quiet period, in milliseconds, required before forwarding.
   */
  constructor(parent: Stream<A>, readonly ms: number) {
    super();
    this.parents = cons(parent);
  }

  /**
   * Cancels the previously scheduled forward and schedules `a` to be
   * pushed to the children after `ms` milliseconds.
   */
  pushS(t: number, a: A): void {
    clearTimeout(this.timer);
    const emit = (): void => {
      this.pushSToChildren(t, a);
    };
    this.timer = setTimeout(emit, this.ms);
  }
}
/**
 * Builds, via `perform`, a `Now` computation that constructs a
 * `DebounceStream` over `stream` with a quiet period of `ms` milliseconds.
 *
 * @param ms Quiet period in milliseconds.
 * @param stream The stream to debounce.
 * @returns A `Now` computation yielding the debounced stream.
 */
export function debounce<A>(ms: number, stream: Stream<A>): Now<Stream<A>> {
  const createDebounced = () => new DebounceStream<A>(stream, ms);
  return perform(createDebounced);
}
/**
 * A behavior whose value is the number of milliseconds elapsed since
 * the UNIX epoch, i.e. its current value equals the result of calling
 * `Date.now()`. The argument passed by `fromFunction` is ignored.
 */
export const time: Behavior<Time> = fromFunction((_t) => Date.now());
/**
 * A behavior giving access to continuous time. Sampling the outer
 * behavior captures the current value of `time` and yields an inner
 * behavior whose value is the difference, in milliseconds, between the
 * current value of `time` and that captured starting point.
 */
export const measureTimeFrom = time.map((start) =>
  time.map((now) => now - start)
);

/**
 * The result of passing `measureTimeFrom` to `sample`.
 */
export const measureTime = sample(measureTimeFrom);
/**
 * A behavior that accumulates the integral of `parent` with respect to
 * `time`. It starts at 0 and is kept in the `Pull` state.
 */
class IntegrateBehavior extends Behavior<number> {
  /** Value of `time` at the previous accumulation step. */
  private lastPullTime: Time;

  /**
   * @param parent The behavior being integrated; its value is used as a
   * rate of change per millisecond.
   * @param t The moment at which integration starts.
   */
  constructor(private parent: Behavior<number>, t: number) {
    super();
    // The order of these assignments is kept as-is; they initialize
    // state inherited from `Behavior`.
    this.lastPullTime = time.at(t);
    this.state = State.Pull;
    this.last = 0;
    this.pulledAt = t;
    this.changedAt = t;
    this.parents = cons(parent, cons(time));
  }

  /**
   * Adds to the previous value the product of the time elapsed since the
   * last step and the parent's latest value, and records the new step time.
   */
  update(_t: Time): number {
    const now = time.last;
    const elapsedMs = now - this.lastPullTime;
    this.lastPullTime = now;
    return this.last + elapsedMs * this.parent.last;
  }

  changeStateDown(_: State): void {
    // Intentionally empty: an `IntegrateBehavior` is always in `Pull` state.
  }
}
/**
 * Returns a `Now` computation of a behavior of the integral of the
 * given behavior, obtained by sampling `integrateFrom(behavior)`.
 *
 * @param behavior The behavior to integrate with respect to time.
 */
export function integrate(behavior: Behavior<number>): Now<Behavior<number>> {
  const integralFrom = integrateFrom(behavior);
  return sample(integralFrom);
}
/**
 * Integrate a behavior with respect to time.
 *
 * The value of the behavior is treated as a rate of change per
 * millisecond. Evaluating the returned outer behavior at a time `t`
 * creates a new integral behavior that starts at 0 from `t`.
 *
 * @param behavior The behavior to integrate.
 */
export function integrateFrom(
  behavior: Behavior<number>
): Behavior<Behavior<number>> {
  const startIntegralAt = (t: Time) => new IntegrateBehavior(behavior, t);
  return fromFunction(startIntegralAt);
}