Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(event cache): automatically shrink a room's linked chunk when all subscribers are gone #4703

Merged
merged 1 commit into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/matrix-sdk-ui/src/timeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ impl TimelineBuilder {
match origin {
EventsOrigin::Sync => RemoteEventOrigin::Sync,
EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
EventsOrigin::Cache => RemoteEventOrigin::Cache,
},
)
.await;
Expand Down
96 changes: 94 additions & 2 deletions crates/matrix-sdk/src/event_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use ruma::{
};
use tokio::sync::{
broadcast::{error::RecvError, Receiver},
Mutex, RwLock,
mpsc, Mutex, RwLock,
};
use tracing::{error, info, info_span, instrument, trace, warn, Instrument as _, Span};

Expand Down Expand Up @@ -129,6 +129,9 @@ pub struct EventCacheDropHandles {

/// Task that listens to updates to the user's ignored list.
ignore_user_list_update_task: JoinHandle<()>,

/// The task used to automatically shrink the linked chunks.
auto_shrink_linked_chunk_task: JoinHandle<()>,
}

impl Debug for EventCacheDropHandles {
Expand All @@ -141,6 +144,7 @@ impl Drop for EventCacheDropHandles {
fn drop(&mut self) {
self.listen_updates_task.abort();
self.ignore_user_list_update_task.abort();
self.auto_shrink_linked_chunk_task.abort();
}
}

Expand Down Expand Up @@ -172,6 +176,7 @@ impl EventCache {
by_room: Default::default(),
drop_handles: Default::default(),
all_events: Default::default(),
auto_shrink_sender: Default::default(),
}),
}
}
Expand Down Expand Up @@ -213,7 +218,19 @@ impl EventCache {
client.subscribe_to_ignore_user_list_changes(),
));

Arc::new(EventCacheDropHandles { listen_updates_task, ignore_user_list_update_task })
let (tx, rx) = mpsc::channel(32);

// Force-initialize the sender in the [`RoomEventCacheInner`].
self.inner.auto_shrink_sender.get_or_init(|| tx);

let auto_shrink_linked_chunk_tasks =
spawn(Self::auto_shrink_linked_chunk_task(self.inner.clone(), rx));

Arc::new(EventCacheDropHandles {
listen_updates_task,
ignore_user_list_update_task,
auto_shrink_linked_chunk_task: auto_shrink_linked_chunk_tasks,
})
});

Ok(())
Expand Down Expand Up @@ -309,6 +326,60 @@ impl EventCache {
}
}

/// Spawns the task that will listen to auto-shrink notifications.
///
/// The auto-shrink mechanism works this way:
///
/// - Each time there's a new subscriber to a [`RoomEventCache`], it will
/// increment the active number of listeners to that room, aka
/// [`RoomEventCacheState::listener_count`].
/// - When that subscriber is dropped, it will decrement that count; and
/// notify the task below if it reached 0.
/// - The task spawned here, owned by the [`EventCacheInner`], will listen
/// to such notifications that a room may be shrunk. It will attempt an
/// auto-shrink, by letting the inner state decide whether this is a good
/// time to do so (new listeners might have spawned in the meanwhile).
#[instrument(skip_all)]
async fn auto_shrink_linked_chunk_task(
inner: Arc<EventCacheInner>,
mut rx: mpsc::Receiver<AutoShrinkChannelPayload>,
) {
while let Some(room_id) = rx.recv().await {
let room = match inner.for_room(&room_id).await {
Ok(room) => room,
Err(err) => {
warn!(for_room = %room_id, "error when getting a RoomEventCache: {err}");
continue;
}
};

let mut state = room.inner.state.write().await;

match state.auto_shrink_if_no_listeners().await {
Ok(diffs) => {
if let Some(diffs) = diffs {
// Hey, fun stuff: we shrunk the linked chunk, so there shouldn't be any
// listeners, right? RIGHT? Especially because the state is guarded behind
// a lock.
//
// However, better safe than sorry, and it's cheap to send an update here,
// so let's do it!
let _ =
room.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
diffs,
origin: EventsOrigin::Cache,
});
}
}

Err(err) => {
// There's not much we can do here, unfortunately.
warn!(for_room = %room_id, "error when attempting to shrink linked chunk: {err}");
}
}
}
}

/// Return a room-specific view over the [`EventCache`].
pub(crate) async fn for_room(
&self,
Expand Down Expand Up @@ -534,8 +605,18 @@ struct EventCacheInner {

/// Handles to keep alive the task listening to updates.
drop_handles: OnceLock<Arc<EventCacheDropHandles>>,

/// A sender for notifications that a room *may* need to be auto-shrunk.
///
/// Needs to live here, so it may be passed to each [`RoomEventCache`]
/// instance.
///
/// See doc comment of [`EventCache::auto_shrink_linked_chunk_task`].
auto_shrink_sender: OnceLock<mpsc::Sender<AutoShrinkChannelPayload>>,
}

type AutoShrinkChannelPayload = OwnedRoomId;

impl EventCacheInner {
fn client(&self) -> Result<Client> {
self.client.get().ok_or(EventCacheError::ClientDropped)
Expand Down Expand Up @@ -644,12 +725,20 @@ impl EventCacheInner {
RoomVersionId::V1
});

// SAFETY: we must have subscribed before reaching this coed, otherwise
// something is very wrong.
let auto_shrink_sender =
self.auto_shrink_sender.get().cloned().expect(
"we must have called `EventCache::subscribe()` before calling here.",
);

let room_event_cache = RoomEventCache::new(
self.client.clone(),
room_state,
room_id.to_owned(),
room_version,
self.all_events.clone(),
auto_shrink_sender,
);

by_room_guard.insert(room_id.to_owned(), room_event_cache.clone());
Expand Down Expand Up @@ -718,6 +807,9 @@ pub enum EventsOrigin {

/// Events are coming from pagination.
Pagination,

/// The cause of the change is purely internal to the cache.
Cache,
}

#[cfg(test)]
Expand Down
Loading
Loading