Skip to content

Commit

Permalink
fix(stream-text): ensure SSE handling conforms to the spec (#33)
Browse files Browse the repository at this point in the history
* fix(stream-text): ensure SSE handling conforms to the spec

* fix(stream-text): currently, we don't supports line break other '\n'

* Update packages/stream-text/src/index.ts

Co-authored-by: 藍+85CD <[email protected]>

* Update packages/stream-text/src/index.ts

Co-authored-by: 藍+85CD <[email protected]>

* fix(stream-text): keep comments

---------

Co-authored-by: 藍+85CD <[email protected]>
  • Loading branch information
iseki0 and kwaa authored Jan 19, 2025
1 parent 5c73eb3 commit 2843900
Showing 1 changed file with 50 additions and 34 deletions.
84 changes: 50 additions & 34 deletions packages/stream-text/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ export interface StreamTextResult {
usage?: Usage
}

const chunkHeaderPrefix = 'data: '
const chunkErrorPrefix = `{"error":`
const chunkHeaderPrefix = 'data:'

/**
* @experimental WIP, does not support function calling (tools).
Expand All @@ -58,47 +57,64 @@ export const streamText = async (options: StreamTextOptions): Promise<StreamText

let finishReason: string | undefined
let usage: undefined | Usage

/**
* Process a single line of SSE data
* @param line The line to process
* @param controller The transform stream controller
* @returns true if processing should stop (e.g. [DONE] or error)
*/
const processLine = async (line: string, controller: TransformStreamDefaultController<ChunkResult>) => {
// Skip empty lines
if (!line || !line.startsWith(chunkHeaderPrefix))
return

// Extract content after "data:" prefix
const content = line.slice(chunkHeaderPrefix.length)
// Remove leading single space if present
const data = content.startsWith(' ') ? content.slice(1) : content

// Handle special cases
if (data === '[DONE]') {
controller.terminate()
return true
}

// TODO: should we use JSON.parse?
if (data.startsWith('{') && data.includes('"error":')) {
controller.error(new Error(`Error from server: ${data}`))
return true
}

// Process normal chunk
const chunk: ChunkResult = JSON.parse(data)
controller.enqueue(chunk)

if (options.onChunk)
await options.onChunk(chunk)

if (chunk.choices[0].finish_reason) {
finishReason = chunk.choices[0].finish_reason
}
if (chunk.usage) {
usage = chunk.usage
}
}

let buffer = ''

// null body handled by import('@xsai/shared-chat').chat()
const rawChunkStream = res.body!.pipeThrough(new TransformStream({
transform: async (chunk, controller) => {
buffer += decoder.decode(chunk, { stream: true })
const lines = buffer.split('\n\n')
const text = decoder.decode(chunk, { stream: true })
buffer += text
const lines = buffer.split('\n')
buffer = lines.pop() || ''

// Process complete lines
for (const line of lines) {
// Some cases:
// - Empty chunk
// - :ROUTER PROCESSING from OpenRouter
if (!line || !line.startsWith(chunkHeaderPrefix)) {
continue
}

const lineWithoutPrefix = line.slice(chunkHeaderPrefix.length)
if (lineWithoutPrefix.startsWith(chunkErrorPrefix)) {
// About controller error: https://developer.mozilla.org/en-US/docs/Web/API/TransformStreamDefaultController/error
controller.error(new Error(`Error from server: ${lineWithoutPrefix}`))
break
}

if (lineWithoutPrefix === '[DONE]') {
controller.terminate()
if (await processLine(line, controller))
break
}

const chunk: ChunkResult = JSON.parse(lineWithoutPrefix)
controller.enqueue(chunk)

if (options.onChunk)
await options.onChunk(chunk)

if (chunk.choices[0].finish_reason) {
finishReason = chunk.choices[0].finish_reason
}
if (chunk.usage) {
usage = chunk.usage
}
}
},
}))
Expand Down

0 comments on commit 2843900

Please sign in to comment.