refactor(crypto): Keep a long-lived DecryptionRetryTask in TimelineController
andybalaam committed Feb 28, 2025
1 parent dbaa36e commit 8cd7085
Showing 3 changed files with 59 additions and 50 deletions.
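
The shape of the change, read from the diffs below: `TimelineController` now builds one `DecryptionRetryTask` in its constructor and keeps it for the lifetime of the controller, instead of spawning a fresh task on every `retry_event_decryption_inner` call, and `TimelineSettings` is sent along with each retry request rather than being captured when the task is created. What follows is a minimal, hedged sketch of that channel-driven pattern, not the crate's actual code; the types are simplified stand-ins and the field names are illustrative.

```rust
use std::time::Duration;

use tokio::sync::mpsc::{self, Receiver, Sender};

/// Simplified stand-in for `DecryptionRetryRequest`; real fields elided.
struct RetryRequest {
    session_ids: Option<Vec<String>>,
}

/// Long-lived handle, analogous to `DecryptionRetryTask`: dropping it drops
/// the sender, the channel closes, and the spawned loop below winds down.
struct RetryTask {
    sender: Sender<RetryRequest>,
}

impl RetryTask {
    fn new() -> Self {
        // One channel and one worker, created once and kept for the lifetime
        // of the owner (the controller, in the real code).
        let (sender, receiver) = mpsc::channel(100);
        tokio::spawn(retry_loop(receiver));
        Self { sender }
    }

    async fn request_retry(&self, session_ids: Option<Vec<String>>) {
        // A send error only means the worker has already stopped; the real
        // code logs it rather than panicking.
        let _ = self.sender.send(RetryRequest { session_ids }).await;
    }
}

async fn retry_loop(mut receiver: Receiver<RetryRequest>) {
    // `recv` returns `None` once every sender is gone, so the task exits
    // soon after the handle is dropped.
    while let Some(request) = receiver.recv().await {
        println!("would retry decryption for {:?}", request.session_ids);
    }
}

#[tokio::main]
async fn main() {
    let task = RetryTask::new();
    task.request_retry(Some(vec!["session-id".to_owned()])).await;
    // Give the worker a moment to run before the handle is dropped.
    tokio::time::sleep(Duration::from_millis(50)).await;
}
```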
@@ -38,6 +38,7 @@ use crate::timeline::{
/// The underlying async task will stop soon after the [`DecryptionRetryTask`]
/// is dropped, because it waits for the channel to close, which happens when we
/// drop the sending side.
#[derive(Clone, Debug)]
pub struct DecryptionRetryTask<D: Decryptor> {
/// The sending side of the channel that we have open to the long-running
/// async task. Every time we want to retry decrypting some events, we
@@ -60,29 +61,30 @@ const CHANNEL_BUFFER_SIZE: usize = 100;
impl<D: Decryptor> DecryptionRetryTask<D> {
pub(crate) fn new<P: RoomDataProvider>(
state: Arc<RwLock<TimelineState>>,
settings: TimelineSettings,
room_data_provider: P,
) -> Self {
// We will send decryption requests down this channel to the long-running task
let (sender, receiver) = mpsc::channel(CHANNEL_BUFFER_SIZE);

// Spawn the long-running task, providing the receiver so we can listen for
// decryption requests
let handle = matrix_sdk::executor::spawn(decryption_task(
state.clone(),
settings,
room_data_provider,
receiver,
));
let handle =
matrix_sdk::executor::spawn(decryption_task(state, room_data_provider, receiver));

// Keep hold of the sender so we can send off decryption requests to the task.
Self { sender, _task_handle: Arc::new(handle) }
}

/// Use the supplied decryptor to attempt redecryption of the events
/// associated with the supplied session IDs.
pub(crate) async fn decrypt(&self, decryptor: D, session_ids: Option<BTreeSet<String>>) {
let res = self.sender.send(DecryptionRetryRequest { decryptor, session_ids }).await;
pub(crate) async fn decrypt(
&self,
decryptor: D,
session_ids: Option<BTreeSet<String>>,
settings: TimelineSettings,
) {
let res =
self.sender.send(DecryptionRetryRequest { decryptor, session_ids, settings }).await;

if let Err(error) = res {
error!("Failed to send decryption retry request: {}", error);
@@ -95,14 +97,14 @@ impl<D: Decryptor> DecryptionRetryTask<D> {
struct DecryptionRetryRequest<D: Decryptor> {
decryptor: D,
session_ids: Option<BTreeSet<String>>,
settings: TimelineSettings,
}

/// Long-running task that waits for decryption requests to come through the
/// supplied channel `receiver` and acts on them. Stops when the channel is
/// closed, i.e. when the sender side is dropped.
async fn decryption_task<D: Decryptor>(
state: Arc<RwLock<TimelineState>>,
settings: TimelineSettings,
room_data_provider: impl RoomDataProvider,
mut receiver: Receiver<DecryptionRetryRequest<D>>,
) {
@@ -122,7 +124,7 @@ async fn decryption_task<D: Decryptor>(
debug!("Retrying decryption");
decrypt_by_index(
state.clone(),
settings.clone(),
&request.settings,
room_data_provider.clone(),
request.decryptor,
should_retry,
@@ -163,7 +165,7 @@ async fn item_indices_to_retry(
/// supplied decryption `request`.
async fn decrypt_by_index<D: Decryptor>(
state: Arc<RwLock<TimelineState>>,
settings: TimelineSettings,
settings: &TimelineSettings,
room_data_provider: impl RoomDataProvider,
decryptor: D,
should_retry: impl Fn(&str) -> bool,
@@ -242,7 +244,7 @@ async fn decrypt_by_index<D: Decryptor>(
retry_indices,
push_rules_context,
&room_data_provider,
&settings,
settings,
)
.await;
}
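
One thread running through the file above: `TimelineSettings` is no longer captured when the task is spawned. Each `DecryptionRetryRequest` now carries its own copy, and `decrypt_by_index` merely borrows it, so the worker acts on the settings that were current when the retry was requested. A small sketch of that ownership shape follows; the `Settings` field here is invented for illustration.

```rust
/// Stand-in for `TimelineSettings`; the field below is made up.
#[derive(Clone, Debug)]
struct Settings {
    track_read_receipts: bool,
}

/// Per-request payload, mirroring `DecryptionRetryRequest`: the settings
/// ride along with every request instead of living inside the worker.
struct RetryRequest {
    session_ids: Option<Vec<String>>,
    settings: Settings,
}

/// The worker borrows the settings for a single retry, echoing the
/// `decrypt_by_index(state, &request.settings, ...)` call in the diff.
fn retry_once(settings: &Settings, session_ids: Option<&[String]>) {
    println!("retrying {session_ids:?} with {settings:?}");
}

fn main() {
    let request = RetryRequest {
        session_ids: Some(vec!["session-a".to_owned()]),
        settings: Settings { track_read_receipts: true },
    };
    retry_once(&request.settings, request.session_ids.as_deref());
}
```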
74 changes: 40 additions & 34 deletions crates/matrix-sdk-ui/src/timeline/controller/mod.rs
@@ -49,7 +49,7 @@ use ruma::{
TransactionId, UserId,
};
#[cfg(test)]
use ruma::{events::receipt::ReceiptEventContent, RoomId};
use ruma::{events::receipt::ReceiptEventContent, OwnedRoomId, RoomId};
use tokio::sync::{RwLock, RwLockWriteGuard};
use tracing::{debug, error, field::debug, info, instrument, trace, warn};

@@ -117,7 +117,7 @@ enum TimelineFocusData<P: RoomDataProvider> {
}

#[derive(Clone, Debug)]
pub(super) struct TimelineController<P: RoomDataProvider = Room> {
pub(super) struct TimelineController<P: RoomDataProvider = Room, D: Decryptor = Room> {
/// Inner mutable state.
state: Arc<RwLock<TimelineState>>,

@@ -131,6 +131,10 @@ pub(super) struct TimelineController<P: RoomDataProvider = Room> {

/// Settings applied to this timeline.
pub(super) settings: TimelineSettings,

/// Long-running task used to retry decryption of timeline items without
/// blocking main processing.
decryption_retry_task: DecryptionRetryTask<D>,
}

#[derive(Clone)]
@@ -256,7 +260,7 @@ pub fn default_event_filter(event: &AnySyncTimelineEvent, room_version: &RoomVer
}
}

impl<P: RoomDataProvider> TimelineController<P> {
impl<P: RoomDataProvider, D: Decryptor> TimelineController<P, D> {
pub(super) fn new(
room_data_provider: P,
focus: TimelineFocus,
@@ -287,20 +291,26 @@ impl<P: RoomDataProvider> TimelineController<P> {
),
};

let state = TimelineState::new(
let state = Arc::new(RwLock::new(TimelineState::new(
focus_kind,
room_data_provider.own_user_id().to_owned(),
room_data_provider.room_version(),
internal_id_prefix,
unable_to_decrypt_hook,
is_room_encrypted,
);
)));

let settings = TimelineSettings::default();

let decryption_retry_task =
DecryptionRetryTask::new(state.clone(), room_data_provider.clone());

Self {
state: Arc::new(RwLock::new(state)),
state,
focus: Arc::new(RwLock::new(focus_data)),
room_data_provider,
settings: Default::default(),
settings,
decryption_retry_task,
}
}

@@ -1038,37 +1048,12 @@ impl<P: RoomDataProvider> TimelineController<P> {
true
}

#[instrument(skip(self, room), fields(room_id = ?room.room_id()))]
pub(super) async fn retry_event_decryption(
&self,
room: &Room,
session_ids: Option<BTreeSet<String>>,
) {
self.retry_event_decryption_inner(room.to_owned(), session_ids).await
}

#[cfg(test)]
pub(super) async fn retry_event_decryption_test(
&self,
room_id: &RoomId,
olm_machine: OlmMachine,
session_ids: Option<BTreeSet<String>>,
) {
self.retry_event_decryption_inner((olm_machine, room_id.to_owned()), session_ids).await
}

async fn retry_event_decryption_inner(
&self,
decryptor: impl Decryptor,
decryptor: D,
session_ids: Option<BTreeSet<String>>,
) {
let decryption_retry_task = DecryptionRetryTask::new(
self.state.clone(),
self.settings.clone(),
self.room_data_provider.clone(),
);

decryption_retry_task.decrypt(decryptor, session_ids).await;
self.decryption_retry_task.decrypt(decryptor, session_ids, self.settings.clone()).await;
}

pub(super) async fn set_sender_profiles_pending(&self) {
@@ -1489,6 +1474,27 @@ impl TimelineController {
let state = self.state.read().await;
state.items.all_remote_events().last().map(|event_meta| &event_meta.event_id).cloned()
}

#[instrument(skip(self, room), fields(room_id = ?room.room_id()))]
pub(super) async fn retry_event_decryption(
&self,
room: &Room,
session_ids: Option<BTreeSet<String>>,
) {
self.retry_event_decryption_inner(room.to_owned(), session_ids).await
}
}

#[cfg(test)]
impl<P: RoomDataProvider> TimelineController<P, (OlmMachine, OwnedRoomId)> {
pub(super) async fn retry_event_decryption_test(
&self,
room_id: &RoomId,
olm_machine: OlmMachine,
session_ids: Option<BTreeSet<String>>,
) {
self.retry_event_decryption_inner((olm_machine, room_id.to_owned()), session_ids).await
}
}

async fn fetch_replied_to_event(
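Boiled down, the `mod.rs` changes hang off one new type parameter: `TimelineController` is now generic over the decryptor as well as the data provider, with `Room` as the production default, which is what lets the test-only impl block near the end use `(OlmMachine, OwnedRoomId)` while ordinary call sites stay unchanged. A hedged sketch of that default-type-parameter pattern, with invented stand-in types:

```rust
/// Stand-in for the crate's `Decryptor` trait.
trait Decryptor {
    fn describe(&self) -> String;
}

/// Production stand-in (the real default for both parameters is `Room`).
struct Room;
impl Decryptor for Room {
    fn describe(&self) -> String {
        "the room itself".to_owned()
    }
}

/// Test stand-in, shaped like the `(OlmMachine, OwnedRoomId)` pair in the
/// diff but reduced to plain strings here.
struct TestDecryptor(String, String);
impl Decryptor for TestDecryptor {
    fn describe(&self) -> String {
        format!("{} for {}", self.0, self.1)
    }
}

/// Generic over the decryptor with a production default, so existing code
/// keeps writing the unparameterised name while tests pick their own type
/// and the long-lived retry task is typed to match at construction.
struct Controller<D: Decryptor = Room> {
    decryptor: D,
}

impl<D: Decryptor> Controller<D> {
    fn retry(&self) {
        println!("retrying decryption with {}", self.decryptor.describe());
    }
}

fn main() {
    Controller { decryptor: Room }.retry();
    Controller {
        decryptor: TestDecryptor("olm-machine".into(), "!room:example.org".into()),
    }
    .retry();
}
```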
7 changes: 4 additions & 3 deletions crates/matrix-sdk-ui/src/timeline/tests/mod.rs
@@ -28,6 +28,7 @@ use imbl::vector;
use indexmap::IndexMap;
use matrix_sdk::{
config::RequestConfig,
crypto::OlmMachine,
deserialized_responses::TimelineEvent,
event_cache::paginator::{PaginableRoom, PaginatorError},
room::{EventWithContextResponse, Messages, MessagesOptions},
@@ -50,8 +51,8 @@ use ruma::{
push::{PushConditionPowerLevelsCtx, PushConditionRoomCtx, Ruleset},
room_id,
serde::Raw,
uint, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId,
RoomVersionId, TransactionId, UInt, UserId,
uint, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId,
OwnedUserId, RoomVersionId, TransactionId, UInt, UserId,
};
use tokio::sync::RwLock;

@@ -136,7 +137,7 @@ impl TestTimelineBuilder {
}

struct TestTimeline {
controller: TimelineController<TestRoomDataProvider>,
controller: TimelineController<TestRoomDataProvider, (OlmMachine, OwnedRoomId)>,

/// An [`EventFactory`] that can be used for creating events in this
/// timeline.