-
Notifications
You must be signed in to change notification settings - Fork 0
/
cacheTails.ts
67 lines (65 loc) · 2.01 KB
/
cacheTails.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import DIE from "phpdie";
import { equals } from "rambda";
import type { Awaitable } from "./Awaitable";
import { never } from "./never";
/**
 * Caches the tail of a "newest first" stream so that a later run can stop
 * reading upstream as soon as it reaches content that was already cached.
 *
 * Assumptions:
 * - Stream chunks are ordered plain JSON objects (class instances are not
 *   supported).
 * - New elements are always inserted at the head of the list.
 *
 * Behavior:
 * 1. If upstream closes without matching the cache, every chunk seen is
 *    stored under `key`.
 * 2. If an incoming chunk equals (rambda `equals`) the first cached element,
 *    the store is updated with the newly seen chunks followed by the cached
 *    list, the cached list is emitted downstream as the tail, the readable
 *    side is closed, and the writable side is errored with `Error("cached")`
 *    to stop upstream.
 *
 * If the store rejects, the error is forwarded to the readable side as well,
 * so downstream consumers fail instead of waiting forever.
 *
 * Expiry is not handled here: set a TTL in your store settings.
 * This step should be placed near the output end of the pipeline.
 *
 * @param store Key/value store holding the cached list. Only `get` and `set`
 *   are called by this function; `has` is accepted but not used.
 * @param _options Either the cache key itself, or an options object.
 * @returns A `{ writable, readable }` pair to pipe through.
 */
export function cacheTails<T>(
  store: {
    has?: (key: string) => Awaitable<boolean>;
    get: (key: string) => Awaitable<T[] | undefined>;
    set: (key: string, chunks: T[]) => Awaitable<any>;
  },
  _options?:
    | string
    | {
        /**
         * Cache key, e.g. the name of this pipeline step.
         * Defaults to the `new Error().stack` string captured inside this
         * call when omitted.
         */
        key?: string;
      }
) {
  // parse options
  const { key = new Error().stack ?? DIE("missing cache key") } =
    typeof _options === "string" ? { key: _options } : _options ?? {};

  // Chunks received from upstream during this run, in arrival order.
  const chunks: T[] = [];
  // Resolved by `start` with the stored list (or undefined when nothing is cached).
  const cachePromise = Promise.withResolvers<T[] | undefined>();
  // Pass-through stream: everything written via `w` shows up on `t.readable`.
  const t = new TransformStream();
  const w = t.writable.getWriter();

  // Errors the readable side with `error` (so downstream does not hang), then
  // rethrows so the outer writable is errored exactly as before.
  const failDownstream = async (error: unknown): Promise<never> => {
    try {
      await w.abort(error);
    } catch {
      // The inner stream is already unusable; the original error is what matters.
    }
    throw error;
  };

  const writable = new WritableStream({
    start: async () => {
      try {
        cachePromise.resolve(await store.get(key));
      } catch (error) {
        await failDownstream(error);
      }
    },
    write: async (chunk, ctrl) => {
      const cache = await cachePromise.promise;
      if (cache && equals(chunk, cache[0])) {
        try {
          // save cache: new head chunks followed by the previously cached tail
          await store.set(key, [...chunks, ...cache]);
          // emit whole cache as tail into downstream
          for (const item of cache) await w.write(item);
          await w.close();
        } catch (error) {
          await failDownstream(error);
        }
        // cancel upstream: error the writable side, then keep this write
        // pending on `never()` (original behavior, kept as-is).
        ctrl.error(new Error("cached"));
        return await never();
      }
      chunks.push(chunk);
      await w.write(chunk);
    },
    close: async () => {
      try {
        // Upstream ended without a cache hit: persist everything seen.
        await store.set(key, [...chunks]);
        await w.close();
      } catch (error) {
        await failDownstream(error);
      }
    },
    // Forward the abort reason so downstream sees the real cause.
    abort: (reason) => w.abort(reason),
  });
  return { writable, readable: t.readable };
}