From c78fc0f45e625ef7d941c4f8f12aaab37beefe3d Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Thu, 20 Feb 2025 17:07:11 +0100 Subject: [PATCH] feat(event cache): auto-shrink a room event cache's chunk after all listeners are left --- crates/matrix-sdk-ui/src/timeline/builder.rs | 1 + crates/matrix-sdk/src/event_cache/mod.rs | 96 ++++++- crates/matrix-sdk/src/event_cache/room/mod.rs | 247 +++++++++++++++++- 3 files changed, 336 insertions(+), 8 deletions(-) diff --git a/crates/matrix-sdk-ui/src/timeline/builder.rs b/crates/matrix-sdk-ui/src/timeline/builder.rs index 03d3fbf17b3..4626e1ec351 100644 --- a/crates/matrix-sdk-ui/src/timeline/builder.rs +++ b/crates/matrix-sdk-ui/src/timeline/builder.rs @@ -277,6 +277,7 @@ impl TimelineBuilder { match origin { EventsOrigin::Sync => RemoteEventOrigin::Sync, EventsOrigin::Pagination => RemoteEventOrigin::Pagination, + EventsOrigin::Cache => RemoteEventOrigin::Cache, }, ) .await; diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index b3b2d94e8c6..ecbe07fcc1a 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -57,7 +57,7 @@ use ruma::{ }; use tokio::sync::{ broadcast::{error::RecvError, Receiver}, - Mutex, RwLock, + mpsc, Mutex, RwLock, }; use tracing::{error, info, info_span, instrument, trace, warn, Instrument as _, Span}; @@ -129,6 +129,9 @@ pub struct EventCacheDropHandles { /// Task that listens to updates to the user's ignored list. ignore_user_list_update_task: JoinHandle<()>, + + /// The task used to automatically shrink the linked chunks. 
+ auto_shrink_linked_chunk_task: JoinHandle<()>, } impl Debug for EventCacheDropHandles { @@ -141,6 +144,7 @@ impl Drop for EventCacheDropHandles { fn drop(&mut self) { self.listen_updates_task.abort(); self.ignore_user_list_update_task.abort(); + self.auto_shrink_linked_chunk_task.abort(); } } @@ -172,6 +176,7 @@ impl EventCache { by_room: Default::default(), drop_handles: Default::default(), all_events: Default::default(), + auto_shrink_sender: Default::default(), }), } } @@ -213,7 +218,19 @@ impl EventCache { client.subscribe_to_ignore_user_list_changes(), )); - Arc::new(EventCacheDropHandles { listen_updates_task, ignore_user_list_update_task }) + let (tx, rx) = mpsc::channel(32); + + // Force-initialize the sender in the [`EventCacheInner`]. + self.inner.auto_shrink_sender.get_or_init(|| tx); + + let auto_shrink_linked_chunk_tasks = + spawn(Self::auto_shrink_linked_chunk_task(self.inner.clone(), rx)); + + Arc::new(EventCacheDropHandles { + listen_updates_task, + ignore_user_list_update_task, + auto_shrink_linked_chunk_task: auto_shrink_linked_chunk_tasks, + }) }); Ok(()) @@ -309,6 +326,60 @@ impl EventCache { } } + /// Spawns the task that will listen to auto-shrink notifications. + /// + /// The auto-shrink mechanism works this way: + /// + /// - Each time there's a new subscriber to a [`RoomEventCache`], it will + /// increment the active number of listeners to that room, aka + /// [`RoomEventCacheState::listener_count`]. + /// - When that subscriber is dropped, it will decrement that count; and + /// notify the task below if it reached 0. + /// - The task spawned here, owned by the [`EventCacheInner`], will listen + /// to such notifications that a room may be shrunk. It will attempt an + /// auto-shrink, by letting the inner state decide whether this is a good + /// time to do so (new listeners might have spawned in the meanwhile). 
+ #[instrument(skip_all)] + async fn auto_shrink_linked_chunk_task( + inner: Arc, + mut rx: mpsc::Receiver, + ) { + while let Some(room_id) = rx.recv().await { + let room = match inner.for_room(&room_id).await { + Ok(room) => room, + Err(err) => { + warn!(for_room = %room_id, "error when getting a RoomEventCache: {err}"); + continue; + } + }; + + let mut state = room.inner.state.write().await; + + match state.auto_shrink_if_no_listeners().await { + Ok(diffs) => { + if let Some(diffs) = diffs { + // Hey, fun stuff: we shrunk the linked chunk, so there shouldn't be any + // listeners, right? RIGHT? Especially because the state is guarded behind + // a lock. + // + // However, better safe than sorry, and it's cheap to send an update here, + // so let's do it! + let _ = + room.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents { + diffs, + origin: EventsOrigin::Cache, + }); + } + } + + Err(err) => { + // There's not much we can do here, unfortunately. + warn!(for_room = %room_id, "error when attempting to shrink linked chunk: {err}"); + } + } + } + } + /// Return a room-specific view over the [`EventCache`]. pub(crate) async fn for_room( &self, @@ -534,8 +605,18 @@ struct EventCacheInner { /// Handles to keep alive the task listening to updates. drop_handles: OnceLock>, + + /// A sender for notifications that a room *may* need to be auto-shrunk. + /// + /// Needs to live here, so it may be passed to each [`RoomEventCache`] + /// instance. + /// + /// See doc comment of [`EventCache::auto_shrink_linked_chunk_task`]. + auto_shrink_sender: OnceLock>, } +type AutoShrinkChannelPayload = OwnedRoomId; + impl EventCacheInner { fn client(&self) -> Result { self.client.get().ok_or(EventCacheError::ClientDropped) @@ -644,12 +725,20 @@ impl EventCacheInner { RoomVersionId::V1 }); + // SAFETY: we must have subscribed before reaching this code, otherwise + // something is very wrong. 
+ let auto_shrink_sender = + self.auto_shrink_sender.get().cloned().expect( + "we must have called `EventCache::subscribe()` before calling here.", + ); + let room_event_cache = RoomEventCache::new( self.client.clone(), room_state, room_id.to_owned(), room_version, self.all_events.clone(), + auto_shrink_sender, ); by_room_guard.insert(room_id.to_owned(), room_event_cache.clone()); @@ -718,6 +807,9 @@ pub enum EventsOrigin { /// Events are coming from pagination. Pagination, + + /// The cause of the change is purely internal to the cache. + Cache, } #[cfg(test)] diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 1674423d1c5..6d0b6756e42 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -14,7 +14,15 @@ //! All event cache types for a single room. -use std::{collections::BTreeMap, fmt, sync::Arc}; +use std::{ + collections::BTreeMap, + fmt, + ops::{Deref, DerefMut}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, +}; use events::Gap; use eyeball_im::VectorDiff; @@ -29,13 +37,14 @@ use ruma::{ }; use tokio::sync::{ broadcast::{Receiver, Sender}, - Notify, RwLock, + mpsc, Notify, RwLock, }; use tracing::{trace, warn}; use super::{ paginator::{Paginator, PaginatorState}, - AllEventsCache, EventsOrigin, Result, RoomEventCacheUpdate, RoomPagination, + AllEventsCache, AutoShrinkChannelPayload, EventsOrigin, Result, RoomEventCacheUpdate, + RoomPagination, }; use crate::{client::WeakClient, room::WeakRoom}; @@ -55,6 +64,74 @@ impl fmt::Debug for RoomEventCache { } } +/// Thin wrapper for a room event cache listener, so as to trigger side-effects +/// when all listeners are gone. +#[allow(missing_debug_implementations)] +pub struct RoomEventCacheListener { + /// Underlying receiver of the room event cache's updates. + recv: Receiver, + + /// To which room are we listening? + room_id: OwnedRoomId, + + /// Sender to the auto-shrink channel. 
+ auto_shrink_sender: mpsc::Sender, + + /// Shared count of active listeners for this room. + listener_count: Arc, +} + +impl Drop for RoomEventCacheListener { + fn drop(&mut self) { + let previous_listener_count = self.listener_count.fetch_sub(1, Ordering::SeqCst); + if previous_listener_count == 1 { + // We were the last instance of the listener; let the auto-shrinker know by + // notifying it of our room id. + + let mut room_id = self.room_id.clone(); + + // Try to send without waiting for channel capacity, and restart in a spin-loop + // if it failed (until a maximum number of attempts is reached, or + // the send was successful). The channel shouldn't be super busy in + // general, so this should resolve quickly enough. + + let mut num_attempts = 0; + + while let Err(err) = self.auto_shrink_sender.try_send(room_id) { + num_attempts += 1; + + if num_attempts > 1024 { + // If we've tried too many times, just give up with a warning; after all, this + // is only an optimization. + warn!("couldn't send notification to the auto-shrink channel after 1024 attempts; giving up"); + return; + } + + match err { + mpsc::error::TrySendError::Full(stolen_room_id) => { + room_id = stolen_room_id; + } + mpsc::error::TrySendError::Closed(_) => return, + } + } + } + } +} + +impl Deref for RoomEventCacheListener { + type Target = Receiver; + + fn deref(&self) -> &Self::Target { + &self.recv + } +} + +impl DerefMut for RoomEventCacheListener { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.recv + } +} + impl RoomEventCache { /// Create a new [`RoomEventCache`] using the given room and store. 
pub(super) fn new( @@ -63,6 +140,7 @@ impl RoomEventCache { room_id: OwnedRoomId, room_version: RoomVersionId, all_events_cache: Arc>, + auto_shrink_sender: mpsc::Sender, ) -> Self { Self { inner: Arc::new(RoomEventCacheInner::new( @@ -71,17 +149,28 @@ impl RoomEventCache { room_id, room_version, all_events_cache, + auto_shrink_sender, )), } } /// Subscribe to this room updates, after getting the initial list of /// events. - pub async fn subscribe(&self) -> (Vec, Receiver) { + pub async fn subscribe(&self) -> (Vec, RoomEventCacheListener) { let state = self.inner.state.read().await; let events = state.events().events().map(|(_position, item)| item.clone()).collect(); - (events, self.inner.sender.subscribe()) + state.listener_count.fetch_add(1, Ordering::SeqCst); + + let recv = self.inner.sender.subscribe(); + let listener = RoomEventCacheListener { + recv, + room_id: self.inner.room_id.clone(), + auto_shrink_sender: self.inner.auto_shrink_sender.clone(), + listener_count: state.listener_count.clone(), + }; + + (events, listener) } /// Return a [`RoomPagination`] API object useful for running @@ -224,6 +313,12 @@ pub(super) struct RoomEventCacheInner { /// paginator is only used for queries that interact with the actual event /// cache. pub paginator: Paginator, + + /// Sender to the auto-shrink channel. + /// + /// See doc comment around [`EventCache::auto_shrink_linked_chunk_task`] for + /// more details. 
+ auto_shrink_sender: mpsc::Sender, } impl RoomEventCacheInner { @@ -235,6 +330,7 @@ impl RoomEventCacheInner { room_id: OwnedRoomId, room_version: RoomVersionId, all_events_cache: Arc>, + auto_shrink_sender: mpsc::Sender, ) -> Self { let sender = Sender::new(32); let weak_room = WeakRoom::new(client, room_id); @@ -246,6 +342,7 @@ impl RoomEventCacheInner { sender, pagination_batch_token_notifier: Default::default(), paginator: Paginator::new(weak_room), + auto_shrink_sender, } } @@ -543,7 +640,7 @@ pub(super) enum LoadMoreEventsBackwardsOutcome { // Use a private module to hide `events` to this parent module. mod private { - use std::sync::Arc; + use std::sync::{atomic::AtomicUsize, Arc}; use eyeball_im::VectorDiff; use matrix_sdk_base::{ @@ -584,6 +681,10 @@ mod private { /// the first time we try to run backward pagination. We reset /// that upon clearing the timeline events. pub waited_for_initial_prev_token: bool, + + /// An atomic count of the current number of listeners of the + /// [`super::RoomEventCache`]. + pub(super) listener_count: Arc, } impl RoomEventCacheState { @@ -640,6 +741,7 @@ mod private { events, deduplicator, waited_for_initial_prev_token: false, + listener_count: Default::default(), }) } @@ -821,6 +923,21 @@ mod private { Ok(Some(self.events.updates_as_vector_diffs())) } + /// Automatically shrink the room if there are no listeners, as + /// indicated by the atomic number of active listeners. + #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"] + pub(crate) async fn auto_shrink_if_no_listeners( + &mut self, + ) -> Result>>, EventCacheError> { + if self.listener_count.load(std::sync::atomic::Ordering::SeqCst) == 0 { + // No listener is subscribed anymore, so we can shrink the + // events data structure to its last chunk. + self.shrink_to_last_chunk().await + } else { + Ok(None) + } + } + /// Removes the bundled relations from an event, if they were present. 
/// /// Only replaces the present if it contained bundled relations. @@ -1986,4 +2103,122 @@ mod tests { assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1)); assert!(outcome.reached_start); } + + #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support. + #[async_test] + async fn test_auto_shrink_after_all_subscribers_are_gone() { + use eyeball_im::VectorDiff; + use tokio::task::yield_now; + + use crate::{assert_let_timeout, event_cache::RoomEventCacheUpdate}; + + let room_id = room_id!("!galette:saucisse.bzh"); + + let client = MockClientBuilder::new("http://localhost".to_owned()).build().await; + + let f = EventFactory::new().room(room_id); + + let evid1 = event_id!("$1"); + let evid2 = event_id!("$2"); + + let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event(); + let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event(); + + // Fill the event cache store with an initial linked chunk with 2 events chunks. 
+ { + let store = client.event_cache_store(); + let store = store.lock().await.unwrap(); + store + .handle_linked_chunk_updates( + room_id, + vec![ + Update::NewItemsChunk { + previous: None, + new: ChunkIdentifier::new(0), + next: None, + }, + Update::PushItems { + at: Position::new(ChunkIdentifier::new(0), 0), + items: vec![ev1], + }, + Update::NewItemsChunk { + previous: Some(ChunkIdentifier::new(0)), + new: ChunkIdentifier::new(1), + next: None, + }, + Update::PushItems { + at: Position::new(ChunkIdentifier::new(1), 0), + items: vec![ev2], + }, + ], + ) + .await + .unwrap(); + } + + let event_cache = client.event_cache(); + event_cache.subscribe().unwrap(); + event_cache.enable_storage().unwrap(); + + client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); + let room = client.get_room(room_id).unwrap(); + let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); + + // Sanity check: lazily loaded, so only includes one item at start. + let (events1, mut stream1) = room_event_cache.subscribe().await; + assert_eq!(events1.len(), 1); + assert_eq!(events1[0].event_id().as_deref(), Some(evid2)); + assert!(stream1.is_empty()); + + // Force loading the full linked chunk by back-paginating. + let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap(); + assert_eq!(outcome.events.len(), 1); + assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1)); + assert!(outcome.reached_start); + + // We also get an update about the loading from the store. Ignore it, for this + // test's sake. + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream1.recv() + ); + assert_eq!(diffs.len(), 1); + assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => { + assert_eq!(value.event_id().as_deref(), Some(evid1)); + }); + + assert!(stream1.is_empty()); + + // Have another listener subscribe to the event cache. 
+ // Since it's not the first one, and the previous one loaded some more events, + // the second listener sees them all. + let (events2, stream2) = room_event_cache.subscribe().await; + assert_eq!(events2.len(), 2); + assert_eq!(events2[0].event_id().as_deref(), Some(evid1)); + assert_eq!(events2[1].event_id().as_deref(), Some(evid2)); + assert!(stream2.is_empty()); + + // Drop the first stream, and wait a bit. + drop(stream1); + yield_now().await; + + // The second stream remains undisturbed. + assert!(stream2.is_empty()); + + // Now drop the second stream, and wait a bit. + drop(stream2); + yield_now().await; + + // The linked chunk must have auto-shrunk by now. + + { + // Check the inner state: there are no more active listeners. + let state = room_event_cache.inner.state.read().await; + assert_eq!(state.listener_count.load(std::sync::atomic::Ordering::SeqCst), 0); + } + + // Getting the events will only give us the latest chunk. + let (events3, _stream2) = room_event_cache.subscribe().await; + assert_eq!(events3.len(), 1); + assert_eq!(events3[0].event_id().as_deref(), Some(evid2)); + } }