
add Consumer::receive_all #18

Merged

merged 1 commit into main from onelson/recv-all on Oct 11, 2023
Conversation

@svix-onelson (Contributor) commented Oct 10, 2023

Support for batched reads is not uniform among the various backends we have.
In some cases, partial batches will wait longer or shorter than the specified deadline before being returned.

The odd one out right now is RabbitMQ, which supports configuring a channel with a pre-fetch limit, but you still have to consume messages one by one (the batching all happens under the hood). Additionally, there doesn't seem to be a way to specify a timeout or deadline; apparently this is left up to individual clients to decide what makes the most sense.

As such, RabbitMQ is under-served compared to the rest. The implementation for batch reads is entirely done in the client code (i.e. reading messages one-by-one, buffering them, then returning).

One possible solution is to add a max prefetch config option and use it when initializing a consumer channel, but this would also make the max_messages argument meaningless for RabbitMQ, and calling receive_all would be largely pointless; callers should probably just use receive instead.

For now, I've left some comments in the rabbit impl and will leave this as future work.
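
For context, one-by-one consumption over lapin looks roughly like the sketch below; the batch reads described above just buffer these reads in client code. The function name and the ack-as-you-go policy are illustrative assumptions (only `Consumer`, `next`, and `Delivery::ack` are real lapin API):

```rust
use futures::StreamExt;
use lapin::{options::BasicAckOptions, Consumer};

// Illustrative sketch: lapin hands us deliveries one at a time off a
// stream, so a "batch" is just a client-side buffer. Note there is no
// built-in deadline here; that has to be layered on by the caller.
async fn drain_up_to(consumer: &mut Consumer, max: usize) -> lapin::Result<Vec<Vec<u8>>> {
    let mut batch = Vec::with_capacity(max);
    while batch.len() < max {
        match consumer.next().await {
            Some(delivery) => {
                let delivery = delivery?;
                delivery.ack(BasicAckOptions::default()).await?;
                batch.push(delivery.data);
            }
            // Stream ended (channel closed).
            None => break,
        }
    }
    Ok(batch)
}
```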



@svix-onelson marked this pull request as ready for review October 10, 2023 02:07
@svix-onelson requested a review from a team October 10, 2023 02:07
omniqueue/src/backends/sqs.rs (review thread resolved, outdated)
omniqueue/src/backends/rabbitmq.rs (review thread resolved, outdated)
omniqueue/src/backends/memory_queue.rs (review thread resolved)
@svix-onelson force-pushed the onelson/recv-all branch 4 times, most recently from 3b5c9c0 to a7c3e72 on October 11, 2023 19:21
Support for batched reads is not uniform among the various backends we
have.

The in-memory and rabbitmq impls differ from the rest as these consumers
are channel/stream based, with no explicit option for consuming messages
in batches.

Using `tokio::time::timeout` for the first item, then looping to
opportunistically try to fill up the batch with any remaining
_already ready_ messages, we can emulate behavior similar to the
other backends.

In all cases, receive_all should:
- not block longer (+/-) than the specified deadline, even if no items
  are available.
- not linger waiting for _more items_ once it has received _any_.
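
A minimal sketch of that timeout-then-drain approach, using a tokio
`mpsc` receiver as a stand-in for the channel/stream-based consumers
(the function and its shape are illustrative, not the actual
`receive_all` implementation):

```rust
use std::time::Duration;

use futures::FutureExt; // for `now_or_never`
use tokio::sync::mpsc;

async fn receive_all_sketch<T>(
    rx: &mut mpsc::Receiver<T>,
    max_messages: usize,
    deadline: Duration,
) -> Vec<T> {
    let mut batch = Vec::with_capacity(max_messages);

    // Wait up to `deadline` for the first item only; if nothing arrives
    // in time (or the channel closes), return the empty batch.
    match tokio::time::timeout(deadline, rx.recv()).await {
        Ok(Some(item)) => batch.push(item),
        _ => return batch,
    }

    // Once we have anything, don't linger: poll for items that are
    // *already ready* and stop as soon as one isn't.
    while batch.len() < max_messages {
        match rx.recv().now_or_never() {
            Some(Some(item)) => batch.push(item),
            // `None` = nothing ready yet; `Some(None)` = channel closed.
            _ => break,
        }
    }

    batch
}
```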

Additionally, a new bit of config has been exposed for rabbitmq so callers
can specify a prefetch count. This didn't seem to have any impact on the
tests, but seems good to have.
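
The prefetch count maps onto AMQP's basic.qos. A sketch of how it could
be applied when setting up a consumer channel with lapin (the helper is
hypothetical; `Channel::basic_qos` is the real lapin call):

```rust
use lapin::{options::BasicQosOptions, Channel};

// Hypothetical helper: cap how many unacknowledged messages the broker
// will push to this channel at once.
async fn apply_prefetch(channel: &Channel, prefetch_count: u16) -> lapin::Result<()> {
    channel
        .basic_qos(prefetch_count, BasicQosOptions::default())
        .await
}
```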
@svix-gabriel (Contributor) left a comment:
`now_or_never` is pretty cool

@svix-onelson merged commit 6ed3b4b into main on Oct 11, 2023
2 checks passed
@svix-onelson deleted the onelson/recv-all branch October 11, 2023 19:52