diff --git a/stream_reader.go b/stream_reader.go index 4210a194..ecfa2680 100644 --- a/stream_reader.go +++ b/stream_reader.go @@ -32,17 +32,28 @@ type streamReader[T streamable] struct { } func (stream *streamReader[T]) Recv() (response T, err error) { - if stream.isFinished { - err = io.EOF + rawLine, err := stream.RecvRaw() + if err != nil { return } - response, err = stream.processLines() - return + err = stream.unmarshaler.Unmarshal(rawLine, &response) + if err != nil { + return + } + return response, nil +} + +func (stream *streamReader[T]) RecvRaw() ([]byte, error) { + if stream.isFinished { + return nil, io.EOF + } + + return stream.processLines() } //nolint:gocognit -func (stream *streamReader[T]) processLines() (T, error) { +func (stream *streamReader[T]) processLines() ([]byte, error) { var ( emptyMessagesCount uint hasErrorPrefix bool @@ -53,9 +64,9 @@ func (stream *streamReader[T]) processLines() (T, error) { if readErr != nil || hasErrorPrefix { respErr := stream.unmarshalError() if respErr != nil { - return *new(T), fmt.Errorf("error, %w", respErr.Error) + return nil, fmt.Errorf("error, %w", respErr.Error) } - return *new(T), readErr + return nil, readErr } noSpaceLine := bytes.TrimSpace(rawLine) @@ -68,11 +79,11 @@ func (stream *streamReader[T]) processLines() (T, error) { } writeErr := stream.errAccumulator.Write(noSpaceLine) if writeErr != nil { - return *new(T), writeErr + return nil, writeErr } emptyMessagesCount++ if emptyMessagesCount > stream.emptyMessagesLimit { - return *new(T), ErrTooManyEmptyStreamMessages + return nil, ErrTooManyEmptyStreamMessages } continue @@ -81,16 +92,10 @@ func (stream *streamReader[T]) processLines() (T, error) { noPrefixLine := bytes.TrimPrefix(noSpaceLine, headerData) if string(noPrefixLine) == "[DONE]" { stream.isFinished = true - return *new(T), io.EOF - } - - var response T - unmarshalErr := stream.unmarshaler.Unmarshal(noPrefixLine, &response) - if unmarshalErr != nil { - return *new(T), unmarshalErr + return nil, io.EOF } - return response, nil + return noPrefixLine, nil } } diff --git a/stream_reader_test.go b/stream_reader_test.go index cd6e46ef..449a14b4 100644 --- a/stream_reader_test.go +++ b/stream_reader_test.go @@ -63,3 +63,16 @@ func TestStreamReaderReturnsErrTestErrorAccumulatorWriteFailed(t *testing.T) { _, err := stream.Recv() checks.ErrorIs(t, err, test.ErrTestErrorAccumulatorWriteFailed, "Did not return error when write failed", err.Error()) } + +func TestStreamReaderRecvRaw(t *testing.T) { + stream := &streamReader[ChatCompletionStreamResponse]{ + reader: bufio.NewReader(bytes.NewReader([]byte("data: {\"key\": \"value\"}\n"))), + } + rawLine, err := stream.RecvRaw() + if err != nil { + t.Fatalf("Did not return raw line: %v", err) + } + if !bytes.Equal(rawLine, []byte("{\"key\": \"value\"}")) { + t.Fatalf("Did not return raw line: %v", string(rawLine)) + } +}