feat(event cache): auto-shrink a room event cache's chunk after all listeners are left
bnjbvr committed Feb 24, 2025
1 parent f3f37a3 commit c78fc0f
Showing 3 changed files with 336 additions and 8 deletions.
1 change: 1 addition & 0 deletions crates/matrix-sdk-ui/src/timeline/builder.rs
@@ -277,6 +277,7 @@ impl TimelineBuilder {
match origin {
EventsOrigin::Sync => RemoteEventOrigin::Sync,
EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
EventsOrigin::Cache => RemoteEventOrigin::Cache,
},
)
.await;
96 changes: 94 additions & 2 deletions crates/matrix-sdk/src/event_cache/mod.rs
@@ -57,7 +57,7 @@ use ruma::{
};
use tokio::sync::{
broadcast::{error::RecvError, Receiver},
Mutex, RwLock,
mpsc, Mutex, RwLock,
};
use tracing::{error, info, info_span, instrument, trace, warn, Instrument as _, Span};

Expand Down Expand Up @@ -129,6 +129,9 @@ pub struct EventCacheDropHandles {

/// Task that listens to updates to the user's ignored list.
ignore_user_list_update_task: JoinHandle<()>,

/// The task used to automatically shrink the linked chunks.
auto_shrink_linked_chunk_task: JoinHandle<()>,
}

impl Debug for EventCacheDropHandles {
@@ -141,6 +144,7 @@ impl Drop for EventCacheDropHandles {
fn drop(&mut self) {
self.listen_updates_task.abort();
self.ignore_user_list_update_task.abort();
self.auto_shrink_linked_chunk_task.abort();
}
}

Expand Down Expand Up @@ -172,6 +176,7 @@ impl EventCache {
by_room: Default::default(),
drop_handles: Default::default(),
all_events: Default::default(),
auto_shrink_sender: Default::default(),
}),
}
}
Expand Down Expand Up @@ -213,7 +218,19 @@ impl EventCache {
client.subscribe_to_ignore_user_list_changes(),
));

Arc::new(EventCacheDropHandles { listen_updates_task, ignore_user_list_update_task })
let (tx, rx) = mpsc::channel(32);

// Force-initialize the sender in the [`EventCacheInner`].
self.inner.auto_shrink_sender.get_or_init(|| tx);

let auto_shrink_linked_chunk_task =
spawn(Self::auto_shrink_linked_chunk_task(self.inner.clone(), rx));

Arc::new(EventCacheDropHandles {
listen_updates_task,
ignore_user_list_update_task,
auto_shrink_linked_chunk_task,
})
});

Ok(())
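
One detail worth calling out in the hunk above: `OnceLock::get_or_init` makes `subscribe()` effectively idempotent, since only the first call installs the auto-shrink sender and later calls get the same one back; `for_room` further down relies on it being set via an `expect`. A minimal standalone sketch of that semantics (illustrative names, not taken from the crate):

use std::sync::OnceLock;

use tokio::sync::mpsc;

fn main() {
    let cell: OnceLock<mpsc::Sender<u8>> = OnceLock::new();

    let (tx1, _rx1) = mpsc::channel::<u8>(32);
    let (tx2, _rx2) = mpsc::channel::<u8>(32);

    // The first call installs tx1; the second ignores its closure and
    // hands back the already-installed sender rather than tx2.
    let first = cell.get_or_init(|| tx1.clone());
    let second = cell.get_or_init(|| tx2.clone());
    assert!(first.same_channel(second));
}
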
Expand Down Expand Up @@ -309,6 +326,60 @@ impl EventCache {
}
}

/// Spawns the task that will listen to auto-shrink notifications.
///
/// The auto-shrink mechanism works this way:
///
/// - Each time there's a new subscriber to a [`RoomEventCache`], it will
///   increment the number of active listeners for that room, aka
///   [`RoomEventCacheState::listener_count`].
/// - When that subscriber is dropped, it will decrement that count, and
///   notify the task below if the count reaches 0 (a sketch of that
///   subscriber side follows the task's implementation below).
/// - The task spawned here, owned by the [`EventCacheInner`], listens for
///   such notifications that a room may be shrunk. It then attempts an
///   auto-shrink, letting the inner state decide whether this is a good
///   time to do so (new listeners might have subscribed in the meantime).
#[instrument(skip_all)]
async fn auto_shrink_linked_chunk_task(
inner: Arc<EventCacheInner>,
mut rx: mpsc::Receiver<AutoShrinkChannelPayload>,
) {
while let Some(room_id) = rx.recv().await {
let room = match inner.for_room(&room_id).await {
Ok(room) => room,
Err(err) => {
warn!(for_room = %room_id, "error when getting a RoomEventCache: {err}");
continue;
}
};

let mut state = room.inner.state.write().await;

match state.auto_shrink_if_no_listeners().await {
Ok(diffs) => {
if let Some(diffs) = diffs {
// Hey, fun stuff: we shrunk the linked chunk, so there shouldn't be any
// listeners, right? RIGHT? Especially because the state is guarded behind
// a lock.
//
// However, better safe than sorry, and it's cheap to send an update here,
// so let's do it!
let _ =
room.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
diffs,
origin: EventsOrigin::Cache,
});
}
}

Err(err) => {
// There's not much we can do here, unfortunately.
warn!(for_room = %room_id, "error when attempting to shrink linked chunk: {err}");
}
}
}
}
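
The sending half of this protocol lives with the per-room subscription handle, in a part of the changeset not shown above. As a rough sketch of what that drop side could look like (note: `ListenerGuard` and all of its fields are hypothetical stand-ins for illustration, not the crate's actual types):

use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

use ruma::OwnedRoomId;
use tokio::sync::mpsc;

/// Hypothetical guard handed to each subscriber of a room's updates.
struct ListenerGuard {
    room_id: OwnedRoomId,
    listener_count: Arc<AtomicUsize>,
    auto_shrink_sender: mpsc::Sender<OwnedRoomId>,
}

impl Drop for ListenerGuard {
    fn drop(&mut self) {
        // `fetch_sub` returns the previous value: 1 means we were the
        // last listener, so nudge the auto-shrink task. `try_send` is
        // best-effort; a full or closed channel only means a missed
        // shrink opportunity, never an error for the subscriber.
        if self.listener_count.fetch_sub(1, Ordering::SeqCst) == 1 {
            let _ = self.auto_shrink_sender.try_send(self.room_id.clone());
        }
    }
}
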

/// Return a room-specific view over the [`EventCache`].
pub(crate) async fn for_room(
&self,
Expand Down Expand Up @@ -534,8 +605,18 @@ struct EventCacheInner {

/// Handles to keep alive the task listening to updates.
drop_handles: OnceLock<Arc<EventCacheDropHandles>>,

/// A sender for notifications that a room *may* need to be auto-shrunk.
///
/// Needs to live here, so it may be passed to each [`RoomEventCache`]
/// instance.
///
/// See doc comment of [`EventCache::auto_shrink_linked_chunk_task`].
auto_shrink_sender: OnceLock<mpsc::Sender<AutoShrinkChannelPayload>>,
}

type AutoShrinkChannelPayload = OwnedRoomId;

impl EventCacheInner {
fn client(&self) -> Result<Client> {
self.client.get().ok_or(EventCacheError::ClientDropped)
Expand Down Expand Up @@ -644,12 +725,20 @@ impl EventCacheInner {
RoomVersionId::V1
});

// SAFETY: we must have subscribed before reaching this code, otherwise
// something is very wrong.
let auto_shrink_sender =
self.auto_shrink_sender.get().cloned().expect(
"we must have called `EventCache::subscribe()` before calling here.",
);

let room_event_cache = RoomEventCache::new(
self.client.clone(),
room_state,
room_id.to_owned(),
room_version,
self.all_events.clone(),
auto_shrink_sender,
);

by_room_guard.insert(room_id.to_owned(), room_event_cache.clone());
Expand Down Expand Up @@ -718,6 +807,9 @@ pub enum EventsOrigin {

/// Events are coming from pagination.
Pagination,

/// The cause of the change is purely internal to the cache.
Cache,
}

#[cfg(test)]
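
A closing note on the new `Cache` variant: consumers match on `EventsOrigin` exhaustively, so adding a variant forces every such match to handle it at compile time, which is exactly what the one-line `timeline/builder.rs` change at the top does. A small illustration of a consumer (the import path is an assumption about where the enum is exposed):

use matrix_sdk::event_cache::EventsOrigin;

// Hypothetical consumer distinguishing where a timeline update came from.
fn describe(origin: &EventsOrigin) -> &'static str {
    match origin {
        // Events received from /sync.
        EventsOrigin::Sync => "live sync events",
        // Events fetched by back-pagination.
        EventsOrigin::Pagination => "paginated events",
        // New in this commit: changes purely internal to the cache,
        // such as the auto-shrink rebuilding a room's linked chunk.
        EventsOrigin::Cache => "internal cache reorganization",
    }
}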