diff --git a/crates/matrix-sdk/src/send_queue.rs b/crates/matrix-sdk/src/send_queue.rs
index 3b1521c9e4a..5efb3e697de 100644
--- a/crates/matrix-sdk/src/send_queue.rs
+++ b/crates/matrix-sdk/src/send_queue.rs
@@ -59,15 +59,22 @@ use matrix_sdk_base::{
 };
 use matrix_sdk_common::executor::{spawn, JoinHandle};
 use ruma::{
-    events::{AnyMessageLikeEventContent, EventContent as _},
+    events::{
+        room::message::RoomMessageEventContentWithoutRelation, AnyMessageLikeEventContent,
+        EventContent as _,
+    },
     serde::Raw,
-    OwnedEventId, OwnedRoomId, OwnedTransactionId, TransactionId,
+    EventId, OwnedEventId, OwnedRoomId, OwnedTransactionId, TransactionId,
 };
 use tokio::sync::{broadcast, Notify, RwLock};
 use tracing::{debug, error, info, instrument, trace, warn};
 
 use crate::{
-    client::WeakClient, config::RequestConfig, error::RetryKind, room::WeakRoom, Client, Room,
+    client::WeakClient,
+    config::RequestConfig,
+    error::RetryKind,
+    room::{edit::EditedContent, WeakRoom},
+    Client, Room,
 };
 
 /// A client-wide send queue, for all the rooms known by a client.
@@ -412,6 +419,12 @@ impl RoomSendQueue {
                     break;
                 }
 
+                // Try to apply dependent events now; those that apply to previously failed
+                // attempts (local echoes) can now succeed.
+                if let Err(err) = queue.apply_dependent_events().await {
+                    warn!("errors when applying dependent events: {err}");
+                }
+
                 if !locally_enabled.load(Ordering::SeqCst) {
                     trace!("not enabled, sleeping");
                     // Wait for an explicit wakeup.
@@ -455,7 +468,7 @@
                 Ok(res) => {
                     trace!(txn_id = %queued_event.transaction_id, event_id = %res.event_id, "successfully sent");
 
-                    match queue.mark_as_sent(&queued_event.transaction_id).await {
+                    match queue.mark_as_sent(&queued_event.transaction_id, &res.event_id).await {
                         Ok(()) => {
                             let _ = updates.send(RoomSendQueueUpdate::SentEvent {
                                 transaction_id: queued_event.transaction_id,
@@ -675,13 +688,21 @@ impl QueueStorage {
     async fn mark_as_sent(
         &self,
         transaction_id: &TransactionId,
+        event_id: &EventId,
     ) -> Result<(), RoomSendQueueStorageError> {
         // Keep the lock until we're done touching the storage.
         let mut being_sent = self.being_sent.write().await;
         being_sent.remove(transaction_id);
 
-        let removed =
-            self.client()?.store().remove_send_queue_event(&self.room_id, transaction_id).await?;
+        let client = self.client()?;
+        let store = client.store();
+
+        // Update all dependent events.
+        store
+            .update_dependent_send_queue_event(&self.room_id, transaction_id, event_id.to_owned())
+            .await?;
+
+        let removed = store.remove_send_queue_event(&self.room_id, transaction_id).await?;
 
         if !removed {
             warn!(txn_id = %transaction_id, "event marked as sent was missing from storage");
@@ -704,7 +725,17 @@
         let being_sent = self.being_sent.read().await;
 
         if being_sent.contains(transaction_id) {
-            return Ok(false);
+            // Save the intent to redact the event.
+            self.client()?
+                .store()
+                .save_dependent_send_queue_event(
+                    &self.room_id,
+                    transaction_id,
+                    DependentQueuedEventKind::Redact,
+                )
+                .await?;
+
+            return Ok(true);
         }
 
         let removed =
@@ -729,7 +760,17 @@
         let being_sent = self.being_sent.read().await;
 
         if being_sent.contains(transaction_id) {
-            return Ok(false);
+            // Save the intent to edit the event.
+            self.client()?
+                .store()
+                .save_dependent_send_queue_event(
+                    &self.room_id,
+                    transaction_id,
+                    DependentQueuedEventKind::Edit { new_content: serializable },
+                )
+                .await?;
+
+            return Ok(true);
         }
 
         let edited = self
@@ -746,6 +787,170 @@
     async fn local_echoes(&self) -> Result<Vec<LocalEcho>, RoomSendQueueStorageError> {
         Ok(self.client()?.store().load_send_queue_events(&self.room_id).await?)
     }
+
+    /// Try to apply a single dependent event, whether it's local or remote.
+    ///
+    /// This swallows errors that would retrigger every time if we retried
+    /// applying the dependent event: invalid edit content, etc.
+    #[instrument(skip_all)]
+    async fn try_apply_single_dependent_event(
+        &self,
+        client: &Client,
+        de: DependentQueuedEvent,
+    ) -> Result<(), RoomSendQueueError> {
+        let store = client.store();
+
+        match de.kind {
+            DependentQueuedEventKind::Edit { new_content } => {
+                if let Some(event_id) = de.event_id {
+                    // The parent event has been sent, so send an edit event.
+                    let room = client
+                        .get_room(&self.room_id)
+                        .ok_or(RoomSendQueueError::RoomDisappeared)?;
+
+                    // Check the event is one we know how to edit with an edit event.
+
+                    // 1. It must be deserializable…
+                    let content = match new_content.deserialize() {
+                        Ok(c) => c,
+                        Err(err) => {
+                            warn!("unable to deserialize: {err}");
+                            return Ok(());
+                        }
+                    };
+
+                    // 2. …and a room message, at this point.
+                    let AnyMessageLikeEventContent::RoomMessage(room_message_content) = content
+                    else {
+                        warn!("trying to send an edit event for a non-room message: aborting");
+                        return Ok(());
+                    };
+
+                    // Assume no relation.
+                    let new_content: RoomMessageEventContentWithoutRelation =
+                        room_message_content.into();
+
+                    let edit_event = match room
+                        .make_edit_event(&event_id, EditedContent::RoomMessage(new_content))
+                        .await
+                    {
+                        Ok(e) => e,
+                        Err(err) => {
+                            warn!("couldn't create edited event: {err}");
+                            return Ok(());
+                        }
+                    };
+
+                    // Queue the edit event in the send queue 🧠.
+                    let serializable = SerializableEventContent::from_raw(
+                        Raw::new(&edit_event)
+                            .map_err(RoomSendQueueStorageError::JsonSerialization)?,
+                        edit_event.event_type().to_string(),
+                    );
+
+                    store
+                        .save_send_queue_event(&self.room_id, TransactionId::new(), serializable)
+                        .await
+                        .map_err(RoomSendQueueStorageError::StorageError)?;
+                } else {
+                    // The parent event is still local (sending must have failed); update the local
+                    // echo.
+                    let edited = store
+                        .update_send_queue_event(&self.room_id, &de.transaction_id, new_content)
+                        .await
+                        .map_err(RoomSendQueueStorageError::StorageError)?;
+
+                    if !edited {
+                        warn!("missing local echo upon dependent edit");
+                    }
+                }
+            }
+
+            DependentQueuedEventKind::Redact => {
+                if let Some(event_id) = de.event_id {
+                    // The parent event has been sent; send a redaction.
+                    let room = client
+                        .get_room(&self.room_id)
+                        .ok_or(RoomSendQueueError::RoomDisappeared)?;
+
+                    // Ideally we'd use the send queue to send the redaction, but the protocol has
+                    // changed the shape of a room.redaction after v11, so keep it simple and try
+                    // once here.
+
+                    // Note: no reason is provided because we materialize the intent of "cancel
+                    // sending the parent event".
+
+                    if let Err(err) = room.redact(&event_id, None, None).await {
+                        warn!("error when sending a redact for {event_id}: {err}");
+                    }
+                } else {
+                    // The parent event is still local (sending must have failed); redact the local
+                    // echo.
+                    let removed = store
+                        .remove_send_queue_event(&self.room_id, &de.transaction_id)
+                        .await
+                        .map_err(RoomSendQueueStorageError::StorageError)?;
+
+                    if !removed {
+                        warn!("missing local echo upon dependent redact");
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    #[instrument(skip(self))]
+    async fn apply_dependent_events(&self) -> Result<(), RoomSendQueueError> {
+        // Keep the lock until we're done touching the storage.
+        let _being_sent = self.being_sent.read().await;
+
+        let client = self.client()?;
+        let store = client.store();
+
+        let dependent_events = store
+            .list_dependent_send_queue_events(&self.room_id)
+            .await
+            .map_err(RoomSendQueueStorageError::StorageError)?;
+
+        let num_initial_dependent_events = dependent_events.len();
+
+        let canonicalized_dependent_events = canonicalize_dependent_events(&dependent_events);
+        let mut num_dependent_events = canonicalized_dependent_events.len();
+
+        debug!(
+            num_dependent_events,
+            num_initial_dependent_events, "starting handling of dependent events"
+        );
+
+        for dependent in canonicalized_dependent_events {
+            let dependent_id = dependent.id;
+
+            match self.try_apply_single_dependent_event(&client, dependent).await {
+                Ok(()) => {
+                    // The dependent event has been successfully applied, forget about it.
+                    store
+                        .remove_dependent_send_queue_event(&self.room_id, dependent_id)
+                        .await
+                        .map_err(RoomSendQueueStorageError::StorageError)?;
+
+                    num_dependent_events -= 1;
+                }
+
+                Err(err) => {
+                    warn!("error when applying single dependent event: {err}");
+                }
+            }
+        }
+
+        debug!(
+            leftover_dependent_events = num_dependent_events,
+            "stopped handling dependent events"
+        );
+
+        Ok(())
+    }
 }
 
 /// An event that has been locally queued for sending, but hasn't been sent yet.
@@ -875,7 +1080,7 @@
 
             Ok(true)
         } else {
-            debug!("event was being sent, can't abort");
+            debug!("local echo doesn't exist anymore, can't abort");
             Ok(false)
         }
     }
@@ -908,7 +1113,7 @@
 
             Ok(true)
        } else {
-            debug!("event was being sent, can't edit");
+            debug!("local echo doesn't exist anymore, can't edit");
             Ok(false)
         }
     }
diff --git a/crates/matrix-sdk/tests/integration/send_queue.rs b/crates/matrix-sdk/tests/integration/send_queue.rs
index 7a6717b7a29..a0b339316d8 100644
--- a/crates/matrix-sdk/tests/integration/send_queue.rs
+++ b/crates/matrix-sdk/tests/integration/send_queue.rs
@@ -11,7 +11,9 @@ use assert_matches2::{assert_let, assert_matches};
 use matrix_sdk::{
     config::{RequestConfig, StoreConfig},
     send_queue::{LocalEcho, RoomSendQueueError, RoomSendQueueUpdate},
-    test_utils::{logged_in_client, logged_in_client_with_server, set_client_session},
+    test_utils::{
+        events::EventFactory, logged_in_client, logged_in_client_with_server, set_client_session,
+    },
     Client, MemoryStore,
 };
 use matrix_sdk_test::{async_test, InvitedRoomBuilder, JoinedRoomBuilder, LeftRoomBuilder};
@@ -26,7 +28,10 @@ use ruma::{
     EventId, OwnedEventId,
 };
 use serde_json::json;
-use tokio::{sync::Mutex, time::timeout};
+use tokio::{
+    sync::Mutex,
+    time::{sleep, timeout},
+};
 use wiremock::{
     matchers::{header, method, path_regex},
     Mock, Request, ResponseTemplate,
@@ -429,7 +434,7 @@ async fn test_error_then_locally_reenabling() {
     let error = error.as_client_api_error().unwrap();
     assert_eq!(error.status_code, 500);
 
-    tokio::time::sleep(Duration::from_millis(50)).await;
+    sleep(Duration::from_millis(50)).await;
 
     assert!(watch.is_empty());
 
@@ -584,7 +589,7 @@ async fn test_reenabling_queue() {
     assert!(watch.is_empty());
 
     // Messages aren't sent immediately.
-    tokio::time::sleep(Duration::from_millis(500)).await;
+    sleep(Duration::from_millis(500)).await;
 
     assert!(watch.is_empty());
 
@@ -729,6 +734,15 @@ async fn test_cancellation() {
         .mount(&server)
         .await;
 
+    // The redact of txn1 will happen because we asked for it previously.
+    Mock::given(method("PUT"))
+        .and(path_regex(r"^/_matrix/client/r0/rooms/.*/redact/.*?/.*?"))
+        .and(header("authorization", "Bearer 1234"))
+        .respond_with(ResponseTemplate::new(200).set_body_json(json!({"event_id": "$1"})))
+        .expect(1)
+        .mount(&server)
+        .await;
+
     let handle1 = q.send(RoomMessageEventContent::text_plain("msg1").into()).await.unwrap();
     let handle2 = q.send(RoomMessageEventContent::text_plain("msg2").into()).await.unwrap();
     q.send(RoomMessageEventContent::text_plain("msg3").into()).await.unwrap();
@@ -746,8 +760,9 @@ async fn test_cancellation() {
     // Let the background task start now.
     tokio::task::yield_now().await;
 
-    // The first item is already being sent, so we can't abort it.
-    assert!(!handle1.abort().await.unwrap());
+    // While the first item is being sent, the system records the intent to redact it.
+    assert!(handle1.abort().await.unwrap());
+    assert_update!(watch => cancelled { txn = txn1 });
     assert!(watch.is_empty());
 
     // The second item is pending, so we can abort it, using the handle returned by
@@ -795,6 +810,10 @@ async fn test_edit() {
 
     let (client, server) = logged_in_client_with_server().await;
 
+    // TODO: (#3722) if the event cache isn't available, then making the edit event
+    // will fail.
+    client.event_cache().subscribe().unwrap();
+
     // Mark the room as joined.
     let room_id = room_id!("!a:b.c");
 
@@ -846,7 +865,27 @@
             "event_id": event_id,
         }))
     })
-    .expect(2)
+    .expect(3)
     .mount(&server)
     .await;
 
+    // The /event endpoint is used to retrieve the original event, during creation
+    // of the edit event.
+    Mock::given(method("GET"))
+        .and(path_regex(r"^/_matrix/client/r0/rooms/.*/event/"))
+        .and(header("authorization", "Bearer 1234"))
+        .respond_with(
+            ResponseTemplate::new(200).set_body_json(
+                EventFactory::new()
+                    .text_msg("msg1")
+                    .sender(client.user_id().unwrap())
+                    .room(room_id)
+                    .into_raw_timeline()
+                    .json(),
+            ),
+        )
+        .expect(1)
+        .named("get_event")
+        .mount(&server)
+        .await;
+
@@ -861,12 +900,13 @@
     // Let the background task start now.
     tokio::task::yield_now().await;
 
-    // The first item is already being sent, so we can't edit it.
-    assert!(!handle1
-        .edit(RoomMessageEventContent::text_plain("it's too late!").into())
+    // While the first item is being sent, the system remembers the intent to edit
+    // it, and will send it later.
+    assert!(handle1
+        .edit(RoomMessageEventContent::text_plain("it's never too late!").into())
        .await
         .unwrap());
-    assert!(watch.is_empty());
+    assert_update!(watch => edit { body = "it's never too late!", txn = txn1 });
 
     // The second item is pending, so we can edit it, using the handle returned by
     // `send()`.
@@ -883,7 +923,102 @@
 
     // Now the server will process the messages in order.
     assert_update!(watch => sent { txn = txn1, });
     assert_update!(watch => sent { txn = txn2, });
+
+    // The edit event for txn1 is then sent in turn; wait for its update too.
+    assert_update!(watch => sent {});
+
+    assert!(watch.is_empty());
+}
+
+#[async_test]
+async fn test_edit_while_being_sent_and_fails() {
+    let (client, server) = logged_in_client_with_server().await;
+
+    // TODO: (#3722) if the event cache isn't available, then making the edit event
+    // will fail.
+    client.event_cache().subscribe().unwrap();
+
+    // Mark the room as joined.
+    let room_id = room_id!("!a:b.c");
+
+    let room = mock_sync_with_new_room(
+        |builder| {
+            builder.add_joined_room(JoinedRoomBuilder::new(room_id));
+        },
+        &client,
+        &server,
+        room_id,
+    )
+    .await;
+
+    let q = room.send_queue();
+
+    let (local_echoes, mut watch) = q.subscribe().await.unwrap();
+
+    assert!(local_echoes.is_empty());
+    assert!(watch.is_empty());
+
+    let lock = Arc::new(Mutex::new(()));
+    let lock_guard = lock.lock().await;
+
+    let mock_lock = lock.clone();
+
+    mock_encryption_state(&server, false).await;
+
+    Mock::given(method("PUT"))
+        .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/.*"))
+        .and(header("authorization", "Bearer 1234"))
+        .respond_with(move |_req: &Request| {
+            // Wait for the signal from the main thread that we can process this query.
+            let mock_lock = mock_lock.clone();
+            std::thread::spawn(move || {
+                tokio::runtime::Runtime::new().unwrap().block_on(async {
+                    drop(mock_lock.lock().await);
+                });
+            })
+            .join()
+            .unwrap();
+
+            ResponseTemplate::new(500)
+        })
+        .expect(3) // reattempts, because of short_retry()
+        .mount(&server)
+        .await;
+
+    let handle = q.send(RoomMessageEventContent::text_plain("yo").into()).await.unwrap();
+
+    // Receiving updates for local echoes.
+    let (txn1, _) = assert_update!(watch => local echo { body = "yo" });
+    assert!(watch.is_empty());
+
+    // Let the background task start now.
+    tokio::task::yield_now().await;
+
+    // While the item is being sent, the system remembers the intent to edit it,
+    // and will apply it later.
+    assert!(handle
+        .edit(RoomMessageEventContent::text_plain("it's never too late!").into())
+        .await
+        .unwrap());
+    assert_update!(watch => edit { body = "it's never too late!", txn = txn1 });
+
+    // Let the server process the responses.
+    drop(lock_guard);
+
+    // The send of txn1 now fails with a recoverable error.
+    assert_update!(watch => error { recoverable = true, txn = txn1 });
+    assert!(watch.is_empty());
+
+    // Looking back at the local echoes will indicate a local echo for `it's never
+    // too late`.
+    let (local_echoes, _) = q.subscribe().await.unwrap();
+    assert_eq!(local_echoes.len(), 1);
+    assert_eq!(local_echoes[0].transaction_id, txn1);
+
+    let event = local_echoes[0].serialized_event.deserialize().unwrap();
+    assert_let!(AnyMessageLikeEventContent::RoomMessage(msg) = event);
+    assert_eq!(msg.body(), "it's never too late!");
 }
 
 #[async_test]
@@ -1068,6 +1203,85 @@ async fn test_abort_or_edit_after_send() {
     assert!(watch.is_empty());
 }
 
+#[async_test]
+async fn test_abort_while_being_sent_and_fails() {
+    let (client, server) = logged_in_client_with_server().await;
+
+    // Mark the room as joined.
+    let room_id = room_id!("!a:b.c");
+
+    let room = mock_sync_with_new_room(
+        |builder| {
+            builder.add_joined_room(JoinedRoomBuilder::new(room_id));
+        },
+        &client,
+        &server,
+        room_id,
+    )
+    .await;
+
+    let q = room.send_queue();
+
+    let (local_echoes, mut watch) = q.subscribe().await.unwrap();
+
+    assert!(local_echoes.is_empty());
+    assert!(watch.is_empty());
+
+    let lock = Arc::new(Mutex::new(()));
+    let lock_guard = lock.lock().await;
+
+    let mock_lock = lock.clone();
+
+    mock_encryption_state(&server, false).await;
+
+    Mock::given(method("PUT"))
+        .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/.*"))
+        .and(header("authorization", "Bearer 1234"))
+        .respond_with(move |_req: &Request| {
+            // Wait for the signal from the main thread that we can process this query.
+            let mock_lock = mock_lock.clone();
+            std::thread::spawn(move || {
+                tokio::runtime::Runtime::new().unwrap().block_on(async {
+                    drop(mock_lock.lock().await);
+                });
+            })
+            .join()
+            .unwrap();
+
+            ResponseTemplate::new(500)
+        })
+        .expect(3) // reattempts, because of short_retry()
+        .mount(&server)
+        .await;
+
+    let handle = q.send(RoomMessageEventContent::text_plain("yo").into()).await.unwrap();
+
+    // Receiving updates for local echoes.
+    let (txn1, _) = assert_update!(watch => local echo { body = "yo" });
+    assert!(watch.is_empty());
+
+    // Let the background task start now.
+    tokio::task::yield_now().await;
+
+    // While the item is being sent, the system remembers the intent to redact it
+    // later.
+    assert!(handle.abort().await.unwrap());
+    assert_update!(watch => cancelled { txn = txn1 });
+
+    // Let the server process the responses.
+    drop(lock_guard);
+
+    // The send of txn1 now fails with a recoverable error.
+    assert_update!(watch => error { recoverable = true, txn = txn1 });
+
+    assert!(watch.is_empty());
+
+    // Looking back at the local echoes shows none left: the aborted event has
+    // been removed from the send queue.
+    let (local_echoes, _) = q.subscribe().await.unwrap();
+    assert!(local_echoes.is_empty());
+}
+
 #[async_test]
 async fn test_unrecoverable_errors() {
     let (client, server) = logged_in_client_with_server().await;
@@ -1271,7 +1485,7 @@ async fn test_reloading_rooms_with_unsent_events() {
         .unwrap();
 
     // No errors, because the queue has been disabled.
-    tokio::time::sleep(Duration::from_millis(300)).await;
+    sleep(Duration::from_millis(300)).await;
     assert!(watch.is_empty());
 
     server.reset().await;
@@ -1281,7 +1495,7 @@
         drop(watch);
         drop(q);
         drop(client);
-        tokio::time::sleep(Duration::from_secs(1)).await;
+        sleep(Duration::from_secs(1)).await;
     }
 
     // Create a new client with the same memory backend. As the send queues are
@@ -1318,7 +1532,7 @@ async fn test_reloading_rooms_with_unsent_events() {
     client.send_queue().respawn_tasks_for_rooms_with_unsent_events().await;
 
     // Let the sending queues process events.
-    tokio::time::sleep(Duration::from_secs(1)).await;
+    sleep(Duration::from_secs(1)).await;
 
     // The real assertion is on the expect(2) on the above Mock.
     server.verify().await;
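
Note: the send-queue changes above call into store APIs that are introduced elsewhere and don't appear in this diff (`save_dependent_send_queue_event`, `update_dependent_send_queue_event`, `list_dependent_send_queue_events`, `remove_dependent_send_queue_event`), plus a free function `canonicalize_dependent_events`. The sketch below is a minimal reading of what the code above assumes about them. Only the fields actually used above (`id`, `kind`, `transaction_id`, `event_id`) are grounded in the diff; the concrete stand-in types and the merge policy (a redaction supersedes pending edits, and only the latest edit needs applying) are assumptions inferred from the abort/edit tests, not the real matrix-sdk-base definitions.

```rust
use std::collections::HashMap;

// Stand-ins so the sketch is self-contained; the real code uses the ruma
// identifier types and the matrix-sdk-base serialized content type.
type OwnedTransactionId = String;
type OwnedEventId = String;
type SerializableEventContent = String;

#[derive(Clone)]
pub enum DependentQueuedEventKind {
    // Replace the parent's content (or its local echo, if it was never sent).
    Edit { new_content: SerializableEventContent },
    // Cancel the parent: drop the local echo, or redact the remote event.
    Redact,
}

#[derive(Clone)]
pub struct DependentQueuedEvent {
    // Storage identifier, used by `remove_dependent_send_queue_event`.
    pub id: usize,
    pub kind: DependentQueuedEventKind,
    // Transaction id of the parent local echo this event depends on.
    pub transaction_id: OwnedTransactionId,
    // Filled in by `update_dependent_send_queue_event` in `mark_as_sent`,
    // once the parent has an actual remote event id.
    pub event_id: Option<OwnedEventId>,
}

// Assumed policy: per parent transaction, a redaction supersedes everything
// else; otherwise only the most recent edit has to be applied.
pub fn canonicalize_dependent_events(
    events: &[DependentQueuedEvent],
) -> Vec<DependentQueuedEvent> {
    let mut by_parent: HashMap<&str, Vec<&DependentQueuedEvent>> = HashMap::new();
    for event in events {
        by_parent.entry(event.transaction_id.as_str()).or_default().push(event);
    }

    let mut canonical = Vec::new();
    for group in by_parent.into_values() {
        if let Some(redact) =
            group.iter().find(|e| matches!(e.kind, DependentQueuedEventKind::Redact))
        {
            // Redacting the parent makes any pending edit for it moot.
            canonical.push((*redact).clone());
        } else if let Some(latest_edit) = group.last() {
            // Groups preserve insertion order, so the last entry is the most
            // recent edit.
            canonical.push((*latest_edit).clone());
        }
    }
    canonical
}
```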
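Under that assumed policy, a quick check of the behavior exercised by `test_cancellation`: several edits followed by an abort of the same in-flight event should collapse to a single redaction. This builds on the sketch above:

```rust
fn main() {
    let events = vec![
        DependentQueuedEvent {
            id: 0,
            kind: DependentQueuedEventKind::Edit { new_content: "first edit".into() },
            transaction_id: "txn1".into(),
            event_id: None,
        },
        DependentQueuedEvent {
            id: 1,
            kind: DependentQueuedEventKind::Edit { new_content: "second edit".into() },
            transaction_id: "txn1".into(),
            event_id: None,
        },
        DependentQueuedEvent {
            id: 2,
            kind: DependentQueuedEventKind::Redact,
            transaction_id: "txn1".into(),
            event_id: None,
        },
    ];

    // All three dependent events target txn1; the redaction wins, so a single
    // dependent event survives canonicalization.
    let canonical = canonicalize_dependent_events(&events);
    assert_eq!(canonical.len(), 1);
    assert!(matches!(canonical[0].kind, DependentQueuedEventKind::Redact));
}
```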