
feat(replays): Add buffered consumer implementation #85356

Open
wants to merge 50 commits into master

Conversation

@cmanallen (Member) commented Feb 18, 2025

TODO description...

Partially addresses the DACI: https://www.notion.so/sentry/DACI-Session-Replay-Recording-Consumer-Stability-and-Performance-Improvements-19e8b10e4b5d80a192a1ecd46f13eebb

How it works:

  • As messages come in, they are processed and their processed results are stored on a queue.
  • When the queue fills up, the processed messages are flushed.
    • Flushing involves committing data to GCS, ClickHouse, BigQuery, and DataDog.
    • Anything I/O related.
  • Flushing happens in a thread-pool.

This closely mirrors current production behavior, except that processing is no longer done in the thread-pool.
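Roughly, this is the shape of the flow (an illustrative sketch only, not the real implementation; process_message and flush_to_sinks are hypothetical placeholders):

from concurrent.futures import ThreadPoolExecutor, wait

def process_message(raw: bytes) -> dict:
    ...  # hypothetical placeholder: parse and transform the message on the main thread

def flush_to_sinks(processed: list[dict]) -> None:
    ...  # hypothetical placeholder: write to GCS, ClickHouse, BigQuery, DataDog

class BufferedConsumer:
    def __init__(self, max_buffer_length: int) -> None:
        self.buffer: list[dict] = []
        self.max_buffer_length = max_buffer_length
        self.pool = ThreadPoolExecutor(max_workers=8)

    def submit(self, raw: bytes) -> None:
        # Processing happens inline; only the processed result is buffered.
        self.buffer.append(process_message(raw))
        if len(self.buffer) >= self.max_buffer_length:
            self.flush()

    def flush(self) -> None:
        # I/O is fanned out to the thread-pool and awaited before offsets
        # would be committed.
        future = self.pool.submit(flush_to_sinks, self.buffer)
        wait([future])
        self.buffer = []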

This PR also introduces a new RunTime abstraction for managing state changes in the consumer, which I will document in a Notion doc.
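The RunTime is not shown in this thread, so the following is only a rough guess at the shape implied above: a model, messages fed through an update function, and commands returned for the platform to execute. Every name here is hypothetical rather than the PR's actual API.

from dataclasses import dataclass, field
from typing import Generic, TypeVar

T = TypeVar("T")

@dataclass
class Model(Generic[T]):
    buffer: list[T] = field(default_factory=list)
    last_flushed_at: float = 0.0

# Hypothetical commands the application returns instead of performing I/O itself.
@dataclass
class Flush:
    items: list

@dataclass
class Nothing:
    pass

def update(model: Model[T], msg: T, max_buffer_length: int = 10) -> tuple[Model[T], Flush | Nothing]:
    # Pure state transition: buffer the message and decide whether to flush.
    # The RunTime interprets the returned command, performs the I/O, and
    # commits offsets; the application never touches Kafka directly.
    model.buffer.append(msg)
    if len(model.buffer) >= max_buffer_length:
        items, model.buffer = model.buffer, []
        return model, Flush(items=items)
    return model, Nothing()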

@github-actions bot added the Scope: Backend label (automatically applied to PRs that change backend components) Feb 18, 2025
codecov bot commented Feb 20, 2025

Codecov Report

Attention: Patch coverage is 97.70115% with 10 lines in your changes missing coverage. Please review.

✅ All tests successful. No failed tests found.

Files with missing lines                                Patch %   Lines
src/sentry/replays/consumers/buffered/platform.py       95.72%    5 Missing ⚠️
...ests/sentry/replays/unit/consumers/test_helpers.py   88.00%    3 Missing ⚠️
src/sentry/replays/consumers/buffered/consumer.py       98.79%    1 Missing ⚠️
...ests/sentry/replays/unit/consumers/test_runtime.py   98.00%    1 Missing ⚠️
Additional details and impacted files
@@           Coverage Diff            @@
##           master   #85356    +/-   ##
========================================
  Coverage   87.88%   87.88%            
========================================
  Files        9714     9720     +6     
  Lines      550639   551075   +436     
  Branches    21449    21449            
========================================
+ Hits       483917   484334   +417     
- Misses      66342    66361    +19     
  Partials      380      380            

@cmanallen cmanallen marked this pull request as ready for review February 28, 2025 16:58
@cmanallen cmanallen requested review from a team as code owners February 28, 2025 16:58
@fpacifici (Contributor) left a comment

I would advise against building a consumer with this abstraction and making it work on top of Arroyo.

  • Arroyo provides an abstraction level that is similar to the one provided by your PlatformStrategy. The two, though, go in fundamentally different directions. Building one on top of the other seems to create a quite complex architecture. It is quite hard to intuitively tell whether the system guarantees at-least-once delivery.
  • We will not be able to move parts of the processing onto a multi-process pool if it is needed for scale, as that would require using the RunTask abstraction.
  • As we discussed in the past, I believe the dataflow model (even the small subset provided by Arroyo) makes this kind of application simpler to understand due to the sequential nature of the pipeline.

Comment on lines 27 to 32
def create_with_partitions(
    self,
    commit: ArroyoCommit,
    partitions: Mapping[Partition, int],
) -> ProcessingStrategy[KafkaPayload]:
    return PlatformStrategy(commit=commit, flags=self.flags, runtime=recording_runtime)

Wouldn't it be considerably simpler to model this consumer as a sequence of these Arroyo operators:

Modeling the system this way would:

  • allow parallelism via either processes or threads without application-logic changes
  • guarantee a pipeline approach that allows the batching step to keep batching new messages while the worker thread performs its work.
  • hide offset management entirely from the application code.
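The specific operators originally listed in this comment are not preserved above; as a rough, hypothetical sketch of the kind of composition being described (constructor signatures may differ between Arroyo versions, and parse_message, append_to_batch, and flush_batch are placeholders):

from arroyo.processing.strategies import CommitOffsets, Reduce, RunTask, RunTaskInThreads

def parse_message(message):
    # Placeholder: CPU-bound processing of the raw Kafka message.
    return message.payload

def append_to_batch(batch, message):
    # Placeholder accumulator used by the Reduce (batching) step.
    batch.append(message.payload)
    return batch

def flush_batch(message):
    # Placeholder: I/O-bound flush performed on a worker thread.
    return message

def create_with_partitions(commit, partitions):
    # process -> batch -> flush in worker threads -> commit offsets
    return RunTask(
        parse_message,
        Reduce(
            max_batch_size=1000,
            max_batch_time=1.0,
            accumulator=append_to_batch,
            initial_value=list,
            next_step=RunTaskInThreads(
                flush_batch,
                concurrency=8,
                max_pending_futures=16,
                next_step=CommitOffsets(commit),
            ),
        ),
    )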

Comment on lines 57 to 63
def can_flush(self, model: Model[ProcessedRecordingMessage]) -> bool:
    # TODO: time.time is stateful and hard to test. We should enable the RunTime to perform
    # managed effects so we can properly test this behavior.
    return (
        len(model.buffer) >= self.__max_buffer_length
        or (time.time() - self.__max_buffer_wait) >= self.__last_flushed_at
    )

Arroyo primitives manage these kinds of concerns for you (when to flush a batch, for example). Are you sure about the idea of pushing them into the product code instead?

@cmanallen (Member, Author)

@fpacifici Thanks for the feedback! Let me preface by saying when I opened this PR I did so for three reasons:

  1. To demonstrate the behaviors I outlined in my streaming platform proposal so you would have something tangible to reference.
  2. To explore new ideas as part of a research project. The goal is exploration so using Arroyo primitives would have gone against my goals.
  3. I'm trying to solve a real problem from a pre-determined starting point with constraints on what can change and how fast.

So there are practical concerns intermixed with educational and exploratory concerns. I've since dropped the educational component and simplified what I'm doing in the PR with this morning's commits (it mostly doesn't impact your review -- but it did simplify the consumer implementation so it might be worth a second look). Hopefully that explains why I made some of the choices I did.

Concerns Around Committing

I agree completely. Offsets were a pain to manage, and it was never necessary to manage them in application code (for my use case, at least). I moved the offset handling into the RunTime, where they're now managed at the platform level.

Why Not Use Arroyo

It's a research project, so we're trying something new. But, critically, this is Arroyo. It's not a full implementation, but it is an Arroyo strategy. That means I can prefix the step with any number of streaming primitives and I could suffix the step with any number of streaming primitives (could being the operative word, because I'm currently hard-coding the commit step as the next step -- that's an easy fix, so I'm ignoring this oversight).

This RunTime strategy is a generalization of all Arroyo strategies, so it's not surprising that specialized strategies exist that can solve components of this pipeline. Research aside, I do want this to go to production, and I'll mention why at the bottom.


So the three concerns I'm keying in on are:

  1. allow parallelism via either processes or threads without application-logic changes
  2. guarantee a pipeline approach that allows the batching step to keep batching new messages while the worker thread performs its work.
  3. hide offset management entirely from the application code.

Three is gone; I've removed it. One is partially solved, I think. We can prefix the RunTime step with a multi-threading/processing step. I'm not sure if the RunTime can be embedded into those steps, so that may be a shortcoming. That's an area I could look into if this were ever an important component of Arroyo.

Two is not solved, as far as I'm aware. I wrote the Buffer strategy in Arroyo, and the Reduce strategy implements the Buffer strategy. I'm not aware of those strategies flushing their buffers in a worker thread; as far as I know, they block the main thread. But if that's not the case and there is some platform magic happening, then I don't see why the RunTime strategy couldn't also have the benefit of flushing off the main thread.


I want this to go to production; why? tl;dr: I can now unit-test my consumer end-to-end.

Testing is a huge concern for me. I've refactored this consumer in the past and it's led to production outages (using Arroyo streaming primitives, as it happens -- which I don't blame).

There are minor things that Arroyo does that can make testing more difficult. For example, in the Reduce strategy we call time.time(), which can make unit-testing difficult: you have to mock it or simply not test certain behaviors, which is not ideal. However, there's a larger problem I'm trying to address, and that's the difficulty of testing how state is threaded through a consumer. Arroyo does not provide any facilities for this, and my PR did not have any until this morning.

One of the benefits of managing the state machine in the way I have is that I can intercept the commands being issued by my application and rewrite them. You can see in the MockRunTime class that I'm using coroutines to rewrite commands in my test suite. This gives me a lot of insight into what the application is doing and the ability to redirect behavior in a way that does not require monkey-patching. I can deterministically simulate all possible states and assert the outcome very cheaply.
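The MockRunTime class is not reproduced in this thread. Purely as an illustration of the coroutine-interception idea (every name below is hypothetical), a generator-based driver can yield each command the application issues so the test can inspect it and send back a substitute result:

def drive(runtime, messages):
    # Hypothetical test driver: run the application's update loop, but yield
    # every command to the test instead of executing it.
    model = runtime.initial_model()
    for message in messages:
        model, cmd = runtime.update(model, message)
        result = yield cmd  # the test inspects cmd and replies via gen.send(...)
        model = runtime.handle_result(model, result)

# In a test (sketch):
#   gen = drive(mock_runtime, [msg1, msg2])
#   cmd = next(gen)                   # first command issued by the application
#   assert isinstance(cmd, Flush)     # assert on intent, not on side effects
#   cmd = gen.send(FlushSucceeded())  # deterministically rewrite the outcome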

My implementation isn't perfect (I haven't abstracted all state yet) but there's already been a significant uplift in what I'm capable of asserting about my software.

There are other reasons but this has already gotten too long so I'll leave it there. Let me know if I addressed your concerns well enough and thanks again for taking the time to review this!
