diff --git a/lib/spear/reading/stream.ex b/lib/spear/reading/stream.ex index 3bd4eb2..541325b 100644 --- a/lib/spear/reading/stream.ex +++ b/lib/spear/reading/stream.ex @@ -11,7 +11,9 @@ defmodule Spear.Reading.Stream do :resolve_links?, :timeout, :buffer, - :credentials + :credentials, + # The number of messages read from a single chunk. + read_count: 0 ] @type t :: %__MODULE__{} @@ -90,17 +92,24 @@ defmodule Spear.Reading.Stream do ) end - # in this case the buffer has run dry and we need to request more events - # (a new buffer) with a new ReadReq @spec unfold_continuous(t()) :: {emitted_element :: tuple(), t()} | nil + defp unfold_continuous(%__MODULE__{buffer: <<>>, read_count: read_count, max_count: max_count}) + when read_count < max_count do + # If we are done reading this chunk and the amount that we read is smaller than + # what we requested, the stream must be finished. + nil + end + defp unfold_continuous(%__MODULE__{buffer: <<>>, from: from} = state) do + # in this case the buffer has run dry and we need to request more events + # (a new buffer) with a new ReadReq response = request!(%__MODULE__{state | max_count: state.max_count + 1}) # discard the first message since it is `from` with {^from, <<_head, _::binary>> = rest} <- unfold_chunk(response), # look ahead in `rest` to ensure it's an event read response {event(), _} <- unfold_chunk(rest) do - unfold_continuous(%__MODULE__{state | buffer: rest}) + unfold_continuous(%__MODULE__{state | buffer: rest, read_count: 0}) else # discard trailing stream position message _ -> @@ -111,7 +120,13 @@ defmodule Spear.Reading.Stream do defp unfold_continuous(%__MODULE__{buffer: buffer} = state) do case unfold_chunk(buffer) do {event() = message, remaining_buffer} -> - {message, %__MODULE__{state | buffer: remaining_buffer, from: message}} + {message, + %__MODULE__{ + state + | buffer: remaining_buffer, + from: message, + read_count: state.read_count + 1 + }} # skip non-event read responses # coveralls-ignore-start