feat(replays): Add buffered consumer implementation #85356
base: master
Conversation
Codecov Report

Attention: Patch coverage is …

✅ All tests successful. No failed tests found.

Additional details and impacted files:

@@            Coverage Diff            @@
##           master   #85356    +/-   ##
========================================
  Coverage   87.88%   87.88%
========================================
  Files        9714     9720        +6
  Lines      550639   551075      +436
  Branches    21449    21449
========================================
+ Hits       483917   484334      +417
- Misses      66342    66361       +19
  Partials      380      380
I would advise against building a consumer with this abstraction; I would make it work on top of Arroyo instead.
- Arroyo provides an abstraction level that is similar to the one provided by your PlatformStrategy. The two, though, go in fundamentally different directions. Building one on top of the other seems to create quite a complex architecture. It is quite hard to intuitively tell whether the system guarantees at-least-once delivery.
- We will not be able to move parts of the processing onto a multi-process pool, should that be needed for scale, as that would require using the RunTask abstraction.
- As we discussed in the past, I believe the dataflow model (even the small subset provided by Arroyo) makes this kind of application simpler to understand due to the sequential nature of the pipeline.
```python
def create_with_partitions(
    self,
    commit: ArroyoCommit,
    partitions: Mapping[Partition, int],
) -> ProcessingStrategy[KafkaPayload]:
    return PlatformStrategy(commit=commit, flags=self.flags, runtime=recording_runtime)
```
Wouldn't it be considerably simpler to model this consumer as a sequence of these Arroyo operators:
- Batch the messages with the Reduce primitive (https://getsentry.github.io/arroyo/strategies/reduce.html#reduce-fold)
- Run Task in either multi-threading or multi-processing (https://getsentry.github.io/arroyo/strategies/reduce.html#reduce-fold) to perform the parsing and the commit_recording_message?
Modeling the system this way (a rough sketch follows below) would:
- allow parallelism via either processes or threads without application logic changes
- guarantee a pipelined approach that lets the batching step keep batching new messages while the worker thread performs its work
- hide offset management entirely from the application code
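For reference, here is a rough sketch of what that operator chain could look like, assuming the Arroyo Reduce, RunTask, and CommitOffsets strategies. SketchedStrategyFactory, process_batch, and the batch size/time values are invented placeholders for illustration, not code from this PR:

```python
from typing import Any, Mapping, MutableSequence

from arroyo.backends.kafka import KafkaPayload
from arroyo.processing.strategies import CommitOffsets, Reduce, RunTask
from arroyo.processing.strategies.abstract import (
    ProcessingStrategy,
    ProcessingStrategyFactory,
)
from arroyo.types import BaseValue, Commit, Message, Partition


def accumulator(
    batch: MutableSequence[KafkaPayload], value: BaseValue[KafkaPayload]
) -> MutableSequence[KafkaPayload]:
    # Reduce invokes this for every message; here we simply collect payloads.
    batch.append(value.payload)
    return batch


def process_batch(message: Message[MutableSequence[KafkaPayload]]) -> Any:
    # Placeholder for parsing the batch and committing recording rows/blobs.
    ...


class SketchedStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
    def create_with_partitions(
        self,
        commit: Commit,
        partitions: Mapping[Partition, int],
    ) -> ProcessingStrategy[KafkaPayload]:
        # CommitOffsets sits after RunTask, so offsets are only committed once
        # the batch has been processed (at-least-once delivery).
        return Reduce(
            max_batch_size=100,  # illustrative tuning values
            max_batch_time=1.0,
            accumulator=accumulator,
            initial_value=list,
            next_step=RunTask(process_batch, CommitOffsets(commit)),
        )
```

The key property of this shape is that CommitOffsets only ever sees a message after RunTask has finished with the batch, so the application code never touches offsets at all.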
```python
def can_flush(self, model: Model[ProcessedRecordingMessage]) -> bool:
    # TODO: time.time is stateful and hard to test. We should enable the RunTime to perform
    # managed effects so we can properly test this behavior.
    return (
        len(model.buffer) >= self.__max_buffer_length
        or (time.time() - self.__max_buffer_wait) >= self.__last_flushed_at
    )
```
Arroyo primitives manage these kinds of concerns for you (when to flush a batch, for example). Are you sure about the idea of pushing them into the product code instead?
@fpacifici Thanks for the feedback! Let me preface by saying that when I opened this PR I did so for three reasons:

So there are practical concerns intermixed with educational and exploratory concerns. I've since dropped the educational component and simplified what I'm doing in the PR with this morning's commits (it mostly doesn't impact your review, but it did simplify the consumer implementation, so it might be worth a second look). Hopefully that explains why I made some of the choices I did.

Concerns Around Committing

I agree completely. They were a pain to manage and it was never necessary to manage them in application code (for my use case at least). I moved the offset handling into the RunTime, where offsets are now managed at the platform level.

Why Not Use Arroyo

It's a research project, so we're trying something new. But, critically, this is Arroyo. It's not a full implementation, but it is an Arroyo strategy. That means I can prefix the step with any number of streaming primitives and I could suffix the step with any number of streaming primitives ("could" being the operative word, because I'm currently hard-coding the commit step as the next step; that's an easy fix, so I'm ignoring this oversight). This RunTime strategy is a generalization of all Arroyo strategies, so it's not surprising that specialized strategies exist that can solve components of this pipeline. Research aside, I do want this to go to production, and I'll mention why at the bottom.

So the three concerns I'm keying in on are:

Three is gone; I've removed it.

One is partially solved, I think. We can prefix the RunTime step with a multi-threading/processing step. I'm not sure if the RunTime can be embedded into those steps, so that may be a shortcoming. That's an area I could look into if this were ever an important component of Arroyo.

Two is not solved as far as I'm aware. I wrote the …

I want this to go to production; why?

tl;dr I can now unit-test my consumer end-to-end.

Testing is a huge concern for me. I've refactored this consumer in the past and it's led to production outages (using Arroyo streaming primitives, as it happens, which I don't blame). There are minor things that Arroyo does that can make testing more difficult. For example, in the Reduce strategy we call …

One of the benefits of managing the state machine in the way I have is that I can intercept the commands being issued by my application and rewrite them. You can see in the MockRunTime class that I'm using coroutines to rewrite commands in my test suite. This gives me a lot of insight into what the application is doing and the ability to redirect behavior in a way that does not require monkey patching. I can deterministically simulate all possible states and assert the outcome very cheaply. My implementation isn't perfect (I haven't abstracted all state yet), but there's already been a significant uplift in what I'm capable of asserting about my software.

There are other reasons, but this has already gotten too long, so I'll leave it there. Let me know if I addressed your concerns well enough, and thanks again for taking the time to review this!
TODO description...
Partially addresses the DACI: https://www.notion.so/sentry/DACI-Session-Replay-Recording-Consumer-Stability-and-Performance-Improvements-19e8b10e4b5d80a192a1ecd46f13eebb
How it works:
This closely mirrors current production behavior, except that processing is no longer done in the thread pool.
This PR also introduces a new RunTime abstraction for managing state changes in the consumer, which I will document in a Notion doc.