-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathindex.ts
75 lines (67 loc) · 2.41 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import { Transform } from 'stream'
import { IncomingMessage, OutgoingHttpHeaders } from "http"
function dataString(data: string|object): string {
if (typeof data === 'object') return dataString(JSON.stringify(data))
return data.split(/\r\n|\r|\n/).map(line => `data: ${line}\n`).join('')
}
interface Message {
data: string|object
comment?: string,
event?: string,
id?: string,
retry?: number,
}
interface WriteHeaders {
writeHead?(statusCode: number, headers?: OutgoingHttpHeaders): WriteHeaders
flushHeaders?(): void
}
export type HeaderStream = NodeJS.WritableStream & WriteHeaders
/**
* Transforms "messages" to W3C event stream content.
* See https://html.spec.whatwg.org/multipage/server-sent-events.html
* A message is an object with one or more of the following properties:
* - data (String or object, which gets turned into JSON)
* - event
* - id
* - retry
* - comment
*
* If constructed with a HTTP Request, it will optimise the socket for streaming.
* If this stream is piped to an HTTP Response, it will set appropriate headers.
*/
export default class SseStream extends Transform {
constructor(req?: IncomingMessage) {
super({ objectMode: true })
if (req) {
req.socket.setKeepAlive(true)
req.socket.setNoDelay(true)
req.socket.setTimeout(0)
}
}
pipe<T extends HeaderStream>(destination: T, options?: { end?: boolean; }): T {
if (destination.writeHead) {
destination.writeHead(200, {
'Content-Type': 'text/event-stream; charset=utf-8',
'Transfer-Encoding': 'identity',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
})
destination.flushHeaders()
}
// Some clients (Safari) don't trigger onopen until the first frame is received.
destination.write(':ok\n\n')
return super.pipe(destination, options)
}
_transform(message: Message, encoding: string, callback: (error?: (Error | null), data?: any) => void) {
if (message.comment) this.push(`: ${message.comment}\n`)
if (message.event) this.push(`event: ${message.event}\n`)
if (message.id) this.push(`id: ${message.id}\n`)
if (message.retry) this.push(`retry: ${message.retry}\n`)
if (message.data) this.push(dataString(message.data))
this.push('\n')
callback()
}
writeMessage(message: Message, encoding?: string, cb?: (error: Error | null | undefined) => void): boolean {
return this.write(message, encoding, cb)
}
}