Documentation/Examples for streaming requests and responses #5344
Don't have time to write the docs myself, at least right now — but here are a few things to know about readable streams and multipart/form-data:
Whoever produces sample code for this will need to make sure it doesn't fall into the most common pitfall of stream handling: assuming that chunk boundaries will be nice. A chunk boundary that splits a UTF-8 sequence or a multipart/form-data boundary marker will break naive code, and the examples should probably show how to do it right. |
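To make the UTF-8 half of that pitfall concrete, here is a minimal sketch. The helper names `decodeChunks` and `decodeChunksNaively` are made up for illustration; the only API relied on is the standard `TextDecoder` with its `stream` option.

```javascript
// Decode a sequence of byte chunks without corrupting characters
// that happen to be split across two chunks.
function decodeChunks(chunks) {
	const decoder = new TextDecoder('utf-8');
	let text = '';
	for (const chunk of chunks) {
		// { stream: true } keeps an incomplete trailing sequence buffered
		// inside the decoder until the next call completes it.
		text += decoder.decode(chunk, { stream: true });
	}
	// A final call without arguments flushes anything still buffered.
	text += decoder.decode();
	return text;
}

// Naive version for comparison: every chunk is decoded in isolation.
function decodeChunksNaively(chunks) {
	return chunks.map((chunk) => new TextDecoder('utf-8').decode(chunk)).join('');
}

// "é" is two bytes in UTF-8 (0xC3 0xA9); split it across two chunks.
const bytes = new TextEncoder().encode('café');
const chunks = [bytes.slice(0, 4), bytes.slice(4)];

console.log(decodeChunks(chunks)); // café
console.log(decodeChunksNaively(chunks)); // "caf" followed by two U+FFFD replacement characters
```

`TextDecoderStream` does the same buffering when used with `pipeThrough`. The same carry-over idea applies to a multipart boundary marker: a parser would need to hold back the last few bytes of each chunk (up to the marker length minus one) before deciding that no marker is present.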
Having said that, here's a general guideline to handling large file uploads in a form with other data:
|
I'd love to use an existing library for this rather than rolling our own multipart form data parser, even if we abstract it somehow. Unfortunately the ones that I know of (busboy, formidable) are wedded to Node AFAICT. Anyone know of something more modern, that can take a |
An example of streaming a large dataset by piping it to the response would be handy. A large file being read by an endpoint, converted to a readable stream, and then fetched by a page is a common use case, and it avoids filling up memory. I tried some of the usual code that works in Express or Hyper-Express for piping a readable stream, but I get a "dest.on is not a function" type error.
Something like the piping examples that Express or Hyper-Express provide would be great. They make it easy to pipe one stream to another, and so on.
This seems like a common use case. |
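A minimal sketch of that use case, assuming a Node runtime. The helper name `fileResponse` and the file path in the comment are hypothetical. Instead of calling `.pipe()` on a destination (which, as far as I can tell, is where "dest.on is not a function" comes from when the destination is not a Node writable), the Node stream is converted to a web `ReadableStream` with `Readable.toWeb` and handed to `Response`:

```javascript
import { createReadStream } from 'node:fs';
import { stat } from 'node:fs/promises';
import { Readable } from 'node:stream';

// Hypothetical helper: builds a streaming Response for a file on disk.
async function fileResponse(path) {
	const { size } = await stat(path);
	// Readable.toWeb wraps the Node stream in a web ReadableStream,
	// which is the kind of body that Response accepts.
	const body = Readable.toWeb(createReadStream(path));
	return new Response(body, {
		headers: {
			'content-type': 'application/octet-stream',
			'content-length': String(size)
		}
	});
}

// In a +server.js file this could be used roughly like:
// export const GET = () => fileResponse('data/large-file.bin');
```

The file is read in chunks as the client consumes the response, so the whole file is never held in memory at once.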
I hope this is relevant here. From the conversations I see that this might be related here - how would I with the new feature enable the package to use ReadableStream? |
I've been testing Server Sent Events (SSEs) with Kit recently. It's almost working properly, but the non-streamable In a comment on a related issue, Rich said SSEs are a special case of Ideally, we would be able to have access to all the Thank you @Rich-Harris and the rest of the team for all the work you've done on this project. |
@JuliaBonita
+server.js
import { createSSE } from './sse';
import { bus } from './bus';
/** @type {import('./$types').RequestHandler} */
export async function GET({ request }) {
// does not have to be a number, this is just an example
const last_event_id = Number(request.headers.get('last-event-id')) || 0;
const { readable, subscribe } = createSSE(last_event_id);
subscribe(bus, 'time');
subscribe(bus, 'date');
return new Response(readable, {
headers: {
'cache-control': 'no-cache',
'content-type': 'text/event-stream',
}
});
}
sse.js
export function createSSE(last_id = 0, retry = 0) {
let id = last_id;
const { readable, writable } = new TransformStream({
start(controller) {
controller.enqueue(': hello\n\n');
if (retry > 0) controller.enqueue(`retry: ${retry}\n\n`);
},
transform({ event, data }, controller) {
let msg = `id: ${++id}\n`;
if (event) msg += `event: ${event}\n`;
if (typeof data === 'string') {
msg += 'data: ' + data.trim().replace(/\n+/gm, '\ndata: ') + '\n';
} else {
msg += `data: ${JSON.stringify(data)}\n`;
}
controller.enqueue(msg + '\n');
}
});
const writer = writable.getWriter();
return {
readable,
/**
* @param {import('node:events').EventEmitter} eventEmitter
* @param {string} event
*/
async subscribe(eventEmitter, event) {
function listener(/** @type {any} */ data) {
writer.write({ event, data });
}
eventEmitter.on(event, listener);
await writer.closed.catch(() => { });
eventEmitter.off(event, listener);
}
};
}
bus.js
import { EventEmitter } from 'node:events';
export const bus = new EventEmitter();
setInterval(() => {
bus.emit('time', new Date().toLocaleTimeString());
}, 2e3);
setInterval(() => {
bus.emit('date', new Date().toLocaleDateString());
}, 5e3); |
@repsac-by Thank you very much for your help. I appreciate this and will spend more time learning the Streams API if necessary, but it's a totally different API and much more complex than the simple SSE API. The simplicity of the SSE API for simple server-push applications is the most significant feature that differentiates it from WebSockets. If we have to use the more complicated Streams API in Kit, then it seems we might as well use WebSockets and assume our apps will use bi-directional channels even if we only need data pushed from the server to the client. My assumption (maybe incorrect) was that Kit would include a way to use the actual SSE API with the same simple functionality that we have with vanilla Node/Express. Your example is excellent for the Streams API, but it still requires a significant learning curve to achieve the equivalent functionality and reliability that is trivially easy to achieve with SSEs, which is sufficient for the vast majority of text-based server-push applications. So before I go down the more complicated Streams API path, can you please confirm that there is no efficient way to implement SSE in Kit to keep connections open and manage the connections like we can easily do in Node/Express? |
I just looked at the source code for the Streams API "Hello World" example. It requires 80 lines of code (including markup) just to display "Hello World" on the page. Swimming through all the Streams API docs is a deep rabbit hole. Clearly the Streams API is not ergonomic, which is why very few people actually understand or use it. It's also overkill for virtually all SSE text-based applications. I bet over 95% of the Svelte community will never take the time to learn the Streams API well enough to use it effectively, which makes Kit much less likely to be used in large-scale apps. So I hope there's a way to use actual SSEs in Kit (including proper event handling on the server like with Express). |
@JuliaBonita
export async function GET() {
const readable = new ReadableStream({
start(ctr) {
this.interval = setInterval(() => ctr.enqueue('data: ping\n\n'), 3000);
},
cancel() {
clearInterval(this.interval);
}
});
return new Response(readable, {
headers: {
'content-type': 'text/event-stream',
}
});
} |
@repsac-by Thank you. The second example helps to show that the Streams API is basically a transport mechanism to stream any data flow, including higher level protocols like SSEs. This should be enough for me to figure out how to apply it to my app. In the future, I suspect Svelte devs will ask for a more simplified interface for streams and SSEs like Express provides, but for now, at least I can see a path forward. Thank you. |
@repsac-by Thnx. Your "SSE in a few lines" comment helped me to push ticker data from server to client. |
@voscausa you forgot something
/** @type {import('./$types').RequestHandler} */
export function GET({ request }) {
+ const ac = new AbortController();
console.log("GET api: yahoo-finance-ticker")
const stream = new ReadableStream({
start(controller) {
tickerListener.on("ticker", (ticker) => {
console.log(ticker.price);
controller.enqueue(String(ticker.price));
- })
+ }, { signal: ac.signal });
+ },
+ cancel() {
+ ac.abort();
},
})
return new Response(stream, {
headers: {
'content-type': 'text/event-stream',
}
});
} |
OK. Thanks for the update. It solved the exception when changing page route. |
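The diff above passes `{ signal }` as a third argument to `.on()`. Whether that does anything depends on what `tickerListener` is; a plain Node `EventEmitter`'s `on(event, listener)` takes no options argument. Under the assumption that the source is an ordinary `EventEmitter`, a more explicit sketch removes the listener in `cancel()` (the helper name `streamFromEmitter` is made up for illustration):

```javascript
import { EventEmitter } from 'node:events';

// Hypothetical helper: exposes one event of `emitter` as a web ReadableStream.
function streamFromEmitter(emitter, eventName) {
	let listener;
	return new ReadableStream({
		start(controller) {
			listener = (value) => controller.enqueue(String(value));
			emitter.on(eventName, listener);
		},
		cancel() {
			// Runs when the consumer cancels the stream, so the listener
			// no longer enqueues into a stream nobody is reading.
			emitter.off(eventName, listener);
		}
	});
}

// Usage with a stand-in emitter:
const emitter = new EventEmitter();
const stream = streamFromEmitter(emitter, 'ticker');
```

Without the `cancel()` hook, the listener would stay registered after the reader goes away, and the next `enqueue` on the cancelled stream would throw.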
@repsac-by Now it's working fine. I also included abort in the +page.svelte route.
routes/yahoo/+page.svelte
<script>
import { onDestroy } from "svelte";
let result = "";
const ac = new AbortController();
const signal = ac.signal;
async function getStream() {
const pairs = ["BTC-USD", "EURUSD=X"].join(",");
const response = await fetch(`/api/yahoo-finance-ticker?pairs=${pairs}&logging=true`, {
signal,
});
const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
while (true) {
const { value, done } = await reader.read();
console.log("resp", done, value);
if (done) break;
result += `${value}<br>`;
}
}
getStream();
onDestroy(() => {
ac.abort();
console.log("ticker fetch aborted");
});
</script>
routes/api/yahoo-finance-ticker/+server.js
import YahooFinanceTicker from "yahoo-finance-ticker";
/** @type {import('./$types').RequestHandler} */
export function GET({ url }) {
const ac = new AbortController();
const ticker = new YahooFinanceTicker();
// Query parameters are strings, and Boolean('false') is true, so compare explicitly
ticker.setLogging(url.searchParams.get('logging') === 'true');
const pairs = (url.searchParams.get('pairs') ?? '').split(",");
console.log("GET api: yahoo-finance-ticker")
const stream = new ReadableStream({
start(controller) {
(async () => {
const tickerListener = await ticker.subscribe(pairs);
tickerListener.on("ticker", (ticker) => {
console.log(ticker.price);
controller.enqueue(String(ticker.price));
}, { signal: ac.signal }); // ?? { signal: ac.signal } @repsac-by
})().catch(err => console.error(err));
},
cancel() {
console.log("cancel and abort");
ticker.unsubscribe();
ac.abort();
},
})
return new Response(stream, {
headers: {
'content-type': 'text/event-stream',
}
});
} |
Just found out about |
unstable_parseMultipartFormData looks like what was needed. I'll fiddle around and see if I can come up with something that works |
+1 for this feature |
Would be really great to have a small consensual example here while waiting for an official doc! |
Here is how I get SSE working for me
function delay(ms: number): Promise<void> {
return new Promise((res) => setTimeout(res, ms));
}
export function GET() {
const encoder = new TextEncoder();
const readable = new ReadableStream({
async start(controller) {
for (let i = 0; i < 20; i++) {
controller.enqueue(encoder.encode('hello'));
await delay(1000)
}
controller.close()
}
});
return new Response(readable, {
headers: {
'content-type': 'text/event-stream',
}
});
}
<script lang="ts">
import { onMount } from 'svelte';
let result: string[] = [];
async function subscribe() {
const response = await fetch('/sse');
const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
while (true) {
const { value, done } = await reader.read();
if (done) break;
result.push(value)
result = result
}
}
onMount(subscribe);
</script>
{#each result as str}
<p>{str}</p>
{/each} |
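One caveat when reading an event stream with `fetch` like this: a chunk returned by `reader.read()` is not guaranteed to line up with one message. A rough sketch of buffering until a complete event has arrived, assuming the server frames messages as `data: ...` followed by a blank line (the example above sends bare `hello` strings, so it would need that framing first). The names `createEventSplitter` and `readEvents` are hypothetical, and only `data:` fields with `\n` line endings are handled:

```javascript
// Hypothetical helper: turns decoded text chunks into complete SSE data payloads.
function createEventSplitter() {
	let buffer = '';
	return new TransformStream({
		transform(text, controller) {
			buffer += text;
			const parts = buffer.split('\n\n');
			// The last part is either empty or an incomplete event; keep it for later.
			buffer = parts.pop();
			for (const part of parts) {
				const data = part
					.split('\n')
					.filter((line) => line.startsWith('data:'))
					// Drop the field name and the single optional space after the colon.
					.map((line) => line.slice(5).replace(/^ /, ''))
					.join('\n');
				if (data) controller.enqueue(data);
			}
		}
	});
}

// Reads `body` (a ReadableStream of bytes) and calls `onEvent` once per event.
async function readEvents(body, onEvent) {
	const reader = body
		.pipeThrough(new TextDecoderStream())
		.pipeThrough(createEventSplitter())
		.getReader();
	while (true) {
		const { value, done } = await reader.read();
		if (done) break;
		onEvent(value);
	}
}
```

In the component above, `readEvents(response.body, (msg) => { result = [...result, msg]; })` would replace the manual read loop. `EventSource` does this parsing for you, which is one reason to prefer it when custom request headers are not needed.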
Example using EventSource in the client.
<!-- src/routes/sse/+page.svelte -->
<script lang="ts">
import { onMount } from 'svelte';
let messages: string[] = [];
onMount(() => {
const eventSource = new EventSource('/sse');
eventSource.addEventListener('message', (event) => {
if (!event.data) return;
messages.push(event.data);
messages = messages;
});
eventSource.addEventListener('close', () => eventSource.close());
// Close the connection when the component is destroyed
return () => eventSource.close();
});
</script>
<ol>
{#each messages as message (message)}
<li>{message}</li>
{/each}
</ol>
When the server responds with a custom event 'close', the client closes the EventSource and does not reconnect.
// src/routes/sse/+server.ts
const duration = 5 * 1000; // 5 seconds
export const GET = () => {
const encoder = new TextEncoder();
const readable = new ReadableStream({
start: async (controller) => {
const startedAt = new Date();
while (true) {
const message = new Date().toISOString();
controller.enqueue(encoder.encode(`data: ${message}\n\n`));
const elapsedFor = Date.now() - startedAt.valueOf();
if (elapsedFor > duration) break;
await new Promise((resolve) => setTimeout(resolve, 1000));
}
controller.enqueue(encoder.encode('event: close\ndata:\n\n'));
controller.close();
}
});
return new Response(readable, { headers: { 'content-type': 'text/event-stream' } });
};
The server does not check for client disconnection in this example. Reference the following issue: #11751
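For comparison, a sketch of a variant that stops producing when the consumer goes away, using the `cancel()` hook of `ReadableStream`. This assumes the runtime or adapter actually cancels the response body when the client disconnects, which is the open question in the referenced issue. The helper name `createClockResponse` is made up for illustration:

```javascript
// Hypothetical helper: an SSE response that emits a timestamp on an interval
// and clears its timer when the body stream is cancelled.
function createClockResponse(intervalMs = 1000) {
	const encoder = new TextEncoder();
	let timer;
	const readable = new ReadableStream({
		start(controller) {
			timer = setInterval(() => {
				controller.enqueue(encoder.encode(`data: ${new Date().toISOString()}\n\n`));
			}, intervalMs);
		},
		cancel() {
			// Called when the consumer cancels the stream.
			clearInterval(timer);
		}
	});
	return new Response(readable, { headers: { 'content-type': 'text/event-stream' } });
}

// In src/routes/sse/+server.ts this could be exposed as:
// export const GET = () => createClockResponse();
```

Keeping the timer in a closure variable and clearing it in `cancel()` means no work continues for a connection that is gone, provided the cancellation signal reaches the stream.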
|
Since the OP asked for a
<script>
import { enhance } from '$app/forms';
</script>
<form method="post" use:enhance enctype="multipart/form-data">
<input type="file" name="image" accept="image/avif, image/jpeg, image/png, image/webp" required />
<button>Submit</button>
</form>
import { error, fail } from '@sveltejs/kit';
import { randomUUID } from 'node:crypto';
import { createWriteStream } from 'node:fs';
import { Readable } from 'node:stream';
import type { ReadableStream } from 'node:stream/web';
import { file, parse, picklist } from 'valibot'; // Optional
export const actions = {
default: async ({ request }) => {
if (!request.headers.get('content-type')?.startsWith('multipart/form-data')) error(415);
const formData = await request.formData();
// File
const image = parse(file(), formData.get('image'));
if (image.size > 10 * 1000 * 1024) return fail(413);
// "image/avif" | "image/jpeg" | "image/png" | "image/webp"
const type = parse(
picklist(['image/avif', 'image/jpeg', 'image/png', 'image/webp']),
image.type
);
const filename = `${randomUUID()}.${type.substring(6)}`;
// Same name, but completely different types:
// Argument of type 'ReadableStream<Uint8Array>'
// is not assignable to parameter of type 'ReadableStream<any>'.
const readable = Readable.fromWeb(
// globalThis.ReadableStream<Uint8Array>
// node:stream/web.ReadableStream
image.stream() as ReadableStream
);
const writeStream = createWriteStream(filename);
readable.pipe(writeStream);
try {
await new Promise<void>((resolve, reject) => {
let readEnded = false;
readable.on('error', reject);
readable.on('end', () => {
readEnded = true;
if (writeEnded) resolve();
});
let writeEnded = false;
writeStream.on('error', reject);
writeStream.on('finish', () => {
writeEnded = true;
if (readEnded) resolve();
});
});
} catch {
// TODO Handle upload error
} finally {
readable.removeAllListeners();
readable.destroy();
writeStream.end();
}
}
};
Would this work? @Rich-Harris |
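A possibly simpler way to do the write step, sketched under the assumption of a Node runtime: `pipeline` from `node:stream/promises` waits for the write to finish, rejects if either side errors, and destroys both streams on failure, so the manual `end`/`finish` bookkeeping is not needed. The helper name `saveWebStream` is hypothetical:

```javascript
import { createWriteStream } from 'node:fs';
import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Hypothetical helper: writes a web ReadableStream (for example the result of
// file.stream()) to `path` and resolves once everything has been flushed.
async function saveWebStream(webStream, path) {
	await pipeline(Readable.fromWeb(webStream), createWriteStream(path));
}

// In the action above, the try/finally block could then shrink to roughly:
// await saveWebStream(image.stream(), filename);
```

Note that, as far as I know, `request.formData()` reads the entire request body before it returns, so this streams from an in-memory `File` to disk rather than from the network to disk. It would not by itself address the very large uploads the original post asks about.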
Describe the problem
As of #5291 SvelteKit now supports using ReadableStream to read the request body in endpoints and supports returning it as a response body. However, there are currently no examples in the docs on how to do common things, for example reading files submitted in a multipart/form-data request as a stream and writing them to disk, reading files from disk and returning them as a stream, etc.
Describe the proposed solution
Adding examples for common use-cases of streaming in the docs
Alternatives considered
No response
Importance
nice to have
Additional Information
I would help with this myself, but I honestly couldn't figure it out, and I wouldn't trust the code I write to be up to the standard of the docs. I specifically wanted to handle super-large (over 1 GB) file uploads that would be in a multipart/form-data request with other fields.
I also feel like I should mention this: in #3419, Rich mentions having utility functions for handling multipart/octet streams, so if the maintainers are planning on adding them, it would make sense to hold off on documentation until they are added, as they would simplify any code written without them.