From 2843900e72729c084efed93c1a98715bee7dfcf1 Mon Sep 17 00:00:00 2001 From: iseki Date: Sun, 19 Jan 2025 13:29:56 +0800 Subject: [PATCH] fix(stream-text): ensure SSE handling conforms to the spec (#33) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(stream-text): ensure SSE handling conforms to the spec * fix(stream-text): currently, we don't supports line break other '\n' * Update packages/stream-text/src/index.ts Co-authored-by: 藍+85CD <50108258+kwaa@users.noreply.github.com> * Update packages/stream-text/src/index.ts Co-authored-by: 藍+85CD <50108258+kwaa@users.noreply.github.com> * fix(stream-text): keep comments --------- Co-authored-by: 藍+85CD <50108258+kwaa@users.noreply.github.com> --- packages/stream-text/src/index.ts | 84 ++++++++++++++++++------------- 1 file changed, 50 insertions(+), 34 deletions(-) diff --git a/packages/stream-text/src/index.ts b/packages/stream-text/src/index.ts index 6871565..700eb18 100644 --- a/packages/stream-text/src/index.ts +++ b/packages/stream-text/src/index.ts @@ -44,8 +44,7 @@ export interface StreamTextResult { usage?: Usage } -const chunkHeaderPrefix = 'data: ' -const chunkErrorPrefix = `{"error":` +const chunkHeaderPrefix = 'data:' /** * @experimental WIP, does not support function calling (tools). @@ -58,47 +57,64 @@ export const streamText = async (options: StreamTextOptions): Promise) => { + // Skip empty lines + if (!line || !line.startsWith(chunkHeaderPrefix)) + return + + // Extract content after "data:" prefix + const content = line.slice(chunkHeaderPrefix.length) + // Remove leading single space if present + const data = content.startsWith(' ') ? content.slice(1) : content + + // Handle special cases + if (data === '[DONE]') { + controller.terminate() + return true + } + + // TODO: should we use JSON.parse? + if (data.startsWith('{') && data.includes('"error":')) { + controller.error(new Error(`Error from server: ${data}`)) + return true + } + + // Process normal chunk + const chunk: ChunkResult = JSON.parse(data) + controller.enqueue(chunk) + + if (options.onChunk) + await options.onChunk(chunk) + + if (chunk.choices[0].finish_reason) { + finishReason = chunk.choices[0].finish_reason + } + if (chunk.usage) { + usage = chunk.usage + } + } + let buffer = '' // null body handled by import('@xsai/shared-chat').chat() const rawChunkStream = res.body!.pipeThrough(new TransformStream({ transform: async (chunk, controller) => { - buffer += decoder.decode(chunk, { stream: true }) - const lines = buffer.split('\n\n') + const text = decoder.decode(chunk, { stream: true }) + buffer += text + const lines = buffer.split('\n') buffer = lines.pop() || '' + // Process complete lines for (const line of lines) { - // Some cases: - // - Empty chunk - // - :ROUTER PROCESSING from OpenRouter - if (!line || !line.startsWith(chunkHeaderPrefix)) { - continue - } - - const lineWithoutPrefix = line.slice(chunkHeaderPrefix.length) - if (lineWithoutPrefix.startsWith(chunkErrorPrefix)) { - // About controller error: https://developer.mozilla.org/en-US/docs/Web/API/TransformStreamDefaultController/error - controller.error(new Error(`Error from server: ${lineWithoutPrefix}`)) - break - } - - if (lineWithoutPrefix === '[DONE]') { - controller.terminate() + if (await processLine(line, controller)) break - } - - const chunk: ChunkResult = JSON.parse(lineWithoutPrefix) - controller.enqueue(chunk) - - if (options.onChunk) - await options.onChunk(chunk) - - if (chunk.choices[0].finish_reason) { - finishReason = chunk.choices[0].finish_reason - } - if (chunk.usage) { - usage = chunk.usage - } } }, }))