feat(replays): Add buffered consumer implementation #85356

Open · wants to merge 55 commits into base: master

Commits (55):
b8cb413  Add buffered consumer runtime implementation (cmanallen, Feb 18, 2025)
d82c589  Begin adding process logic (cmanallen, Feb 19, 2025)
e82ce0f  Add tracing to each component of recording processing (cmanallen, Feb 19, 2025)
55cc3af  Delete unused function (cmanallen, Feb 19, 2025)
a243bdb  Report size metrics (cmanallen, Feb 19, 2025)
3d728b5  Merge branch 'cmanallen/replays-improve-tracing' into cmanallen/repla… (cmanallen, Feb 19, 2025)
890f170  Separate IO from processing (cmanallen, Feb 19, 2025)
2dcfaec  Add explicit return (cmanallen, Feb 19, 2025)
9e4e227  Merge branch 'cmanallen/replays-consumer-separate-processing-from-io'… (cmanallen, Feb 19, 2025)
db5fa11  Add buffer managers (cmanallen, Feb 19, 2025)
7a5a98c  Write FilePart rows and adopt a new subscription model (cmanallen, Feb 20, 2025)
44c899b  Add unit tests (cmanallen, Feb 20, 2025)
de76ad0  Add contextual errors (cmanallen, Feb 20, 2025)
abf22cf  Misc test updates (cmanallen, Feb 21, 2025)
2e16a01  Fully separate compute and io within the recording consumer (cmanallen, Feb 24, 2025)
208360f  Configure max workers (cmanallen, Feb 25, 2025)
13ea6ac  Merge branch 'master' into cmanallen/replays-add-separated-compute-an… (cmanallen, Feb 25, 2025)
fc8df9c  Remove conditional branch as its moved further up the hierarchy (cmanallen, Feb 25, 2025)
b0f518c  Merge branch 'cmanallen/replays-add-separated-compute-and-io' into cm… (cmanallen, Feb 25, 2025)
536c250  Use context manager (cmanallen, Feb 25, 2025)
ea571fa  Simplify buffer flushing (for now) (cmanallen, Feb 25, 2025)
1826ccb  Merge branch 'master' into cmanallen/replays-optimize-consumer (cmanallen, Feb 27, 2025)
3733886  Update tracing logic (cmanallen, Feb 27, 2025)
88747c1  Soften flag requirements and minor fixes (cmanallen, Feb 27, 2025)
74e811f  Remove buffer managers module (cmanallen, Feb 27, 2025)
8f8e5ce  Test clean up (cmanallen, Feb 27, 2025)
fac3204  Fix unit test (cmanallen, Feb 27, 2025)
ae59eb9  Add explicit return (cmanallen, Feb 27, 2025)
a6a32e5  Fix typing (cmanallen, Feb 27, 2025)
f1026ca  Remove unused option (cmanallen, Feb 27, 2025)
b363596  Reset dom_index module to align with master (cmanallen, Feb 27, 2025)
c656420  Update buffering run-time coverage (cmanallen, Feb 27, 2025)
29bb826  Update test ordering (cmanallen, Feb 27, 2025)
b4c6477  Add offset committing test (cmanallen, Feb 27, 2025)
2572955  Add docs (cmanallen, Feb 27, 2025)
7ca7462  More docstring fixes (cmanallen, Feb 27, 2025)
fb5731e  Add typing to flags and factory module (cmanallen, Feb 27, 2025)
17eb011  Adopt buffered strategy in callsite (cmanallen, Feb 28, 2025)
0c79eac  Add script for mocking recordings (cmanallen, Feb 28, 2025)
30ed5d6  Add handling for appending offsets when the message is not buffered (cmanallen, Feb 28, 2025)
52d8616  Add commit coverage (cmanallen, Feb 28, 2025)
b334400  Assert messages are committed regardless of if they're appended to th… (cmanallen, Feb 28, 2025)
8618e29  Fix typing (cmanallen, Feb 28, 2025)
0346a3c  Docstrings (cmanallen, Feb 28, 2025)
4ab0ff2  More docstrings (cmanallen, Feb 28, 2025)
77c2971  Yet more docstrings (cmanallen, Feb 28, 2025)
b956ad9  Implement declarative effect management (cmanallen, Mar 3, 2025)
5e2ce4f  Merge branch 'master' into cmanallen/replays-optimize-consumer (cmanallen, Mar 3, 2025)
fa42004  Move offset management into the runtime (cmanallen, Mar 3, 2025)
2215007  Fix typing (cmanallen, Mar 3, 2025)
4cc15d7  Remove comments on offsets (cmanallen, Mar 3, 2025)
4063f5d  Remove none-type messages (cmanallen, Mar 3, 2025)
ea64a50  Move offsets out of runtime and into platform strategy. Add support f… (cmanallen, Mar 3, 2025)
62d7433  Fix typing (cmanallen, Mar 3, 2025)
895185b  Docstrings and renames (cmanallen, Mar 3, 2025)
76 changes: 76 additions & 0 deletions bin/mock-replay-recording
@@ -0,0 +1,76 @@
#!/usr/bin/env python
"""Produce mock replay recording messages for local testing.

Helpful commands:

- Run the consumer.
  - `sentry run consumer ingest-replay-recordings --consumer-group 0`
- Check if offsets are committed correctly.
  - `docker exec -it kafka-kafka-1 kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group 0`
"""
from sentry.runner import configure

configure()
import logging
import os
import time
import uuid

import click
from arroyo import Topic as ArroyoTopic
from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
from sentry_kafka_schemas.codecs import Codec
from sentry_kafka_schemas.schema_types.ingest_replay_recordings_v1 import ReplayRecording

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "sentry.conf.server")

import django

django.setup()

from sentry.conf.types.kafka_definition import Topic, get_topic_codec
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition

logger = logging.getLogger(__name__)


def get_producer() -> KafkaProducer:
    cluster_name = get_topic_definition(Topic.INGEST_REPLAYS_RECORDINGS)["cluster"]
    producer_config = get_kafka_producer_cluster_options(cluster_name)
    return KafkaProducer(build_kafka_configuration(default_config=producer_config))


RECORDING_CODEC: Codec[ReplayRecording] = get_topic_codec(Topic.INGEST_REPLAYS_RECORDINGS)


@click.command()
@click.option("--organization-id", type=int, required=True, help="Organization ID")
@click.option("--project-id", type=int, required=True, help="Project ID")
def main(organization_id: int, project_id: int) -> None:
    """Produce a mock replay recording message to the INGEST_REPLAYS_RECORDINGS topic."""
    message: ReplayRecording = {
        "key_id": None,
        "org_id": organization_id,
        "payload": b'{"segment_id"',
        "project_id": project_id,
        "received": int(time.time()),
        "replay_event": None,
        "replay_id": uuid.uuid4().hex,
        "replay_video": None,
        "retention_days": 30,
        "type": "replay_recording_not_chunked",
        "version": 1,
    }

    producer = get_producer()
    topic = get_topic_definition(Topic.INGEST_REPLAYS_RECORDINGS)["real_topic_name"]
    payload = KafkaPayload(None, RECORDING_CODEC.encode(message), [])

    producer.produce(ArroyoTopic(topic), payload)
    producer.close()

    logger.info("Successfully produced message to %s", topic)


if __name__ == "__main__":
    main()
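
To exercise the pipeline locally, run the consumer from the docstring above in one terminal and invoke this script in another, for example `python bin/mock-replay-recording --organization-id 1 --project-id 1`. The IDs here are placeholders; any existing organization/project pair works.
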
25 changes: 6 additions & 19 deletions src/sentry/consumers/__init__.py
@@ -84,24 +84,11 @@ def ingest_replay_recordings_options() -> list[click.Option]:

def ingest_replay_recordings_buffered_options() -> list[click.Option]:
    """Return a list of ingest-replay-recordings-buffered options."""
-    options = [
-        click.Option(
-            ["--max-buffer-message-count", "max_buffer_message_count"],
-            type=int,
-            default=100,
-        ),
-        click.Option(
-            ["--max-buffer-size-in-bytes", "max_buffer_size_in_bytes"],
-            type=int,
-            default=2_500_000,
-        ),
-        click.Option(
-            ["--max-buffer-time-in-seconds", "max_buffer_time_in_seconds"],
-            type=int,
-            default=1,
-        ),
+    return [
+        click.Option(["--max-buffer-length", "max_buffer_length"], type=int, default=8),
+        click.Option(["--max-buffer-wait", "max_buffer_wait"], type=int, default=1),
+        click.Option(["--max-workers", "max_workers"], type=int, default=8),
    ]
-    return options


def ingest_monitors_options() -> list[click.Option]:
@@ -269,8 +256,8 @@ def ingest_transactions_options() -> list[click.Option]:
    },
    "ingest-replay-recordings": {
        "topic": Topic.INGEST_REPLAYS_RECORDINGS,
-        "strategy_factory": "sentry.replays.consumers.recording.ProcessReplayRecordingStrategyFactory",
-        "click_options": ingest_replay_recordings_options(),
+        "strategy_factory": "sentry.replays.consumers.buffered.factory.PlatformStrategyFactory",
+        "click_options": ingest_replay_recordings_buffered_options(),
    },
    "ingest-replay-recordings-buffered": {
        "topic": Topic.INGEST_REPLAYS_RECORDINGS,
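With this registration change the default `ingest-replay-recordings` consumer boots the buffered implementation, and the old byte/count/time thresholds (`--max-buffer-message-count`, `--max-buffer-size-in-bytes`, `--max-buffer-time-in-seconds`) give way to `--max-buffer-length`, `--max-buffer-wait`, and `--max-workers`. Assuming Sentry's standard consumer CLI wiring, an invocation with the new flags (values shown are the defaults) looks like `sentry run consumer ingest-replay-recordings --consumer-group 0 --max-buffer-length 8 --max-buffer-wait 1 --max-workers 8`.
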
Empty file.
196 changes: 196 additions & 0 deletions src/sentry/replays/consumers/buffered/consumer.py
@@ -0,0 +1,196 @@
"""Session Replay recording consumer implementation.

The consumer implementation follows a batching flush strategy. We accept messages, process them,
buffer them, and when some threshold is reached we flush the buffer. The batch has finished work
after the buffer is flushed so we commit with a None value.
"""

import contextlib
import time
from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait
from dataclasses import dataclass
from typing import TypedDict

import sentry_sdk

from sentry.replays.consumers.buffered.platform import (
    Cmd,
    Commit,
    Effect,
    Join,
    Nothing,
    Poll,
    RunTime,
    Sub,
    Task,
)
from sentry.replays.usecases.ingest import (
    DropSilently,
    ProcessedRecordingMessage,
    commit_recording_message,
    parse_recording_message,
    process_recording_message,
    track_recording_metadata,
)

# Types.


class Flags(TypedDict):
    max_buffer_length: int
    max_buffer_wait: int
    max_workers: int


@dataclass
class Model:
    buffer: list[ProcessedRecordingMessage]
    last_flushed_at: float
    max_buffer_length: int
    max_buffer_wait: int
    max_workers: int


@dataclass(frozen=True)
class Append:
    """Append the item to the buffer."""

    item: ProcessedRecordingMessage


class Committed:
    """The platform committed offsets. Our buffer is now completely done."""


class Flush:
    """Our application hit the flush threshold and has been instructed to flush."""


@dataclass(frozen=True)
class Flushed:
    """Our application successfully flushed."""

    now: float


class Skip:
    """Skip the message."""


@dataclass(frozen=True)
class TryFlush:
    """Instruct the application to flush the buffer if it's time."""

    now: float


# A "Msg" is the union of all application messages our RunTime will accept.
Msg = Append | Committed | Flush | Flushed | Skip | TryFlush


# State machine functions.


def init(flags: Flags) -> tuple[Model, Cmd[Msg, None]]:
    return (
        Model(
            buffer=[],
            last_flushed_at=time.time(),
            max_buffer_wait=flags["max_buffer_wait"],
            max_workers=flags["max_workers"],
            max_buffer_length=flags["max_buffer_length"],
        ),
        Nothing(),
    )


@sentry_sdk.trace
def process(_: Model, message: bytes) -> Msg:
    try:
        item = process_recording_message(parse_recording_message(message))
        return Append(item=item)
    except Exception:
        # Any failure to parse or process skips the message; its offset is still
        # committed with the rest of the batch.
        return Skip()


def update(model: Model, msg: Msg) -> tuple[Model, Cmd[Msg, None]]:
    match msg:
        case Append(item=item):
            model.buffer.append(item)
            return (model, Effect(fun=time.time, msg=lambda now: TryFlush(now=now)))
        case Skip():
            return (model, Effect(fun=time.time, msg=lambda now: TryFlush(now=now)))
        case Committed():
            return (model, Nothing())
        case Flush():
            return (model, Effect(fun=FlushBuffer(model), msg=lambda now: Flushed(now=now)))
        case Flushed(now=now):
            model.buffer = []
            model.last_flushed_at = now
            return (model, Commit(msg=Committed(), value=None))
        case TryFlush(now=now):
            return (model, Task(msg=Flush())) if can_flush(model, now) else (model, Nothing())
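# Illustrative message flow for a single Kafka message, as the runtime would dispatch
# it (a sketch based on the handlers above, not code that executes here):
#
#   process(model, raw)          -> Append(item) on success, Skip() on failure
#   update(model, Append(item))  -> Effect(time.time, lambda now: TryFlush(now))
#   update(model, TryFlush(now)) -> Task(Flush()) once can_flush(model, now) is True
#   update(model, Flush())       -> Effect(FlushBuffer(model), lambda now: Flushed(now))
#   update(model, Flushed(now))  -> Commit(msg=Committed(), value=None)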


def subscription(model: Model) -> list[Sub[Msg]]:
    return [
        # Flush when the platform "joins" the strategy (rebalance or shutdown).
        Join(msg=Flush),
        # Re-check the flush deadline on every poll cycle.
        Poll(msg=lambda: TryFlush(now=time.time())),
    ]


# Helpers.


def can_flush(model: Model, now: float) -> bool:
    return (
        len(model.buffer) >= model.max_buffer_length
        or (now - model.max_buffer_wait) >= model.last_flushed_at
    )
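# Worked example of the second clause: with max_buffer_wait=1, "(now - 1) >=
# last_flushed_at" means at least one second has elapsed since the last flush, so a
# sparse message stream still flushes on schedule.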


@dataclass(frozen=True)
class FlushBuffer:
    model: Model

    def __call__(self) -> float:
        @sentry_sdk.trace
        def flush_message(message: ProcessedRecordingMessage) -> None:
            with contextlib.suppress(DropSilently):
                commit_recording_message(message)

        if len(self.model.buffer) == 0:
            return time.time()

        with ThreadPoolExecutor(max_workers=self.model.max_workers) as pool:
            futures = [pool.submit(flush_message, message) for message in self.model.buffer]

            # Tasks can fail. We check the done set for any failures. We will wait for all the
            # futures to complete before running this step or eagerly run this step if any task
            # errors.
            done, _ = wait(futures, return_when=FIRST_EXCEPTION)
            for future in done:
                exc = future.exception()
                # Raising preserves the existing behavior. We can address error handling in a
                # follow up.
                if exc is not None and not isinstance(exc, DropSilently):
                    raise exc

        # Recording metadata is not tracked in the threadpool. This is because this function will
        # log. Logging will acquire a lock and make our threading less useful due to the speed of
        # the I/O we do in this step.
        for message in self.model.buffer:
            track_recording_metadata(message)

        return time.time()


# Consumer.


recording_consumer = RunTime(
    init=init,
    process=process,
    subscription=subscription,
    update=update,
)
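
The `platform` module that `consumer.py` and `factory.py` import from is not included in the diffs shown here. The sketch below reconstructs the shape of its primitives purely from how they are used above; every definition in it is an assumption, and the PR's real module may differ.

from dataclasses import dataclass
from typing import Callable, Generic, TypeVar

M = TypeVar("M")  # application message type (hypothetical reconstruction)
V = TypeVar("V")  # commit value type (hypothetical reconstruction)


@dataclass(frozen=True)
class Nothing:
    """Command that does nothing."""


@dataclass(frozen=True)
class Task(Generic[M]):
    """Command that immediately re-dispatches `msg` to `update`."""

    msg: M


@dataclass(frozen=True)
class Effect(Generic[M]):
    """Command that runs `fun` and maps its return value to a message."""

    fun: Callable[[], object]
    msg: Callable[[object], M]


@dataclass(frozen=True)
class Commit(Generic[M, V]):
    """Command that commits Kafka offsets, then dispatches `msg`."""

    msg: M
    value: V


# Cmd is the union the runtime interprets after each call to update().
Cmd = Nothing | Task[M] | Effect[M] | Commit[M, V]


@dataclass(frozen=True)
class Join(Generic[M]):
    """Subscription fired when Arroyo joins the strategy (rebalance/shutdown)."""

    msg: Callable[[], M]


@dataclass(frozen=True)
class Poll(Generic[M]):
    """Subscription fired on every poll cycle."""

    msg: Callable[[], M]


Sub = Join[M] | Poll[M]

# RunTime(init, process, subscription, update) would drive the loop: feed raw Kafka
# bytes to process(), dispatch the resulting Msg to update(), interpret the returned
# Cmd, and service the subscription list on joins and polls.

The Cmd/Sub split mirrors the Elm-style "declarative effect management" named in commit b956ad9: update() stays pure and returns descriptions of effects, which the runtime interprets.
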
34 changes: 34 additions & 0 deletions src/sentry/replays/consumers/buffered/factory.py
@@ -0,0 +1,34 @@
"""Session Replay recording consumer strategy factory.

This module exists solely to abstract the bootstraping process of the application and runtime in
`sentry/consumers/__init__.py`.
"""

from collections.abc import Mapping

from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.processing.strategies import ProcessingStrategy, ProcessingStrategyFactory
from arroyo.processing.strategies.commit import CommitOffsets
from arroyo.types import Commit, Partition

from sentry.replays.consumers.buffered.consumer import Flags, recording_consumer
from sentry.replays.consumers.buffered.platform import PlatformStrategy


class PlatformStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):

    def __init__(self, max_buffer_length: int, max_buffer_wait: int, max_workers: int) -> None:
        self.flags: Flags = {
            "max_buffer_length": max_buffer_length,
            "max_buffer_wait": max_buffer_wait,
            "max_workers": max_workers,
        }

    def create_with_partitions(
        self,
        commit: Commit,
        partitions: Mapping[Partition, int],
    ) -> ProcessingStrategy[KafkaPayload]:
        return PlatformStrategy(
            next_step=CommitOffsets(commit), flags=self.flags, runtime=recording_consumer
        )
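
A hedged sketch of driving the factory directly, for example from a test; the mock commit callable and the partition/topic values are illustrative, not taken from the PR:

from unittest import mock

from arroyo.types import Partition, Topic

factory = PlatformStrategyFactory(max_buffer_length=8, max_buffer_wait=1, max_workers=8)
strategy = factory.create_with_partitions(
    commit=mock.Mock(),  # stands in for Arroyo's commit callable
    partitions={Partition(Topic("ingest-replay-recordings"), 0): 0},
)
# `strategy` now accepts KafkaPayload messages and commits offsets via CommitOffsets.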