Skip to content

Commit

Permalink
send queue: make use of dependent events to remember an intent to edi…
Browse files Browse the repository at this point in the history
…t/redact an event

This should get rid of most of the race conditions while
editing/redacting an event, and this paves the way for sending reactions
via the send queue.
  • Loading branch information
bnjbvr committed Jul 24, 2024
1 parent d973fef commit 0246863
Show file tree
Hide file tree
Showing 2 changed files with 443 additions and 24 deletions.
225 changes: 215 additions & 10 deletions crates/matrix-sdk/src/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,22 @@ use matrix_sdk_base::{
};
use matrix_sdk_common::executor::{spawn, JoinHandle};
use ruma::{
events::{AnyMessageLikeEventContent, EventContent as _},
events::{
room::message::RoomMessageEventContentWithoutRelation, AnyMessageLikeEventContent,
EventContent as _,
},
serde::Raw,
OwnedEventId, OwnedRoomId, OwnedTransactionId, TransactionId,
EventId, OwnedEventId, OwnedRoomId, OwnedTransactionId, TransactionId,
};
use tokio::sync::{broadcast, Notify, RwLock};
use tracing::{debug, error, info, instrument, trace, warn};

use crate::{
client::WeakClient, config::RequestConfig, error::RetryKind, room::WeakRoom, Client, Room,
client::WeakClient,
config::RequestConfig,
error::RetryKind,
room::{edit::EditedContent, WeakRoom},
Client, Room,
};

/// A client-wide send queue, for all the rooms known by a client.
Expand Down Expand Up @@ -412,6 +419,12 @@ impl RoomSendQueue {
break;
}

// Try to apply dependent events now; those applying to previously failed
// attempts (local echoes) would succeed now.
if let Err(err) = queue.apply_dependent_events().await {
warn!("errors when applying dependent events: {err}");
}

if !locally_enabled.load(Ordering::SeqCst) {
trace!("not enabled, sleeping");
// Wait for an explicit wakeup.
Expand Down Expand Up @@ -455,7 +468,7 @@ impl RoomSendQueue {
Ok(res) => {
trace!(txn_id = %queued_event.transaction_id, event_id = %res.event_id, "successfully sent");

match queue.mark_as_sent(&queued_event.transaction_id).await {
match queue.mark_as_sent(&queued_event.transaction_id, &res.event_id).await {
Ok(()) => {
let _ = updates.send(RoomSendQueueUpdate::SentEvent {
transaction_id: queued_event.transaction_id,
Expand Down Expand Up @@ -675,13 +688,21 @@ impl QueueStorage {
async fn mark_as_sent(
&self,
transaction_id: &TransactionId,
event_id: &EventId,
) -> Result<(), RoomSendQueueStorageError> {
// Keep the lock until we're done touching the storage.
let mut being_sent = self.being_sent.write().await;
being_sent.remove(transaction_id);

let removed =
self.client()?.store().remove_send_queue_event(&self.room_id, transaction_id).await?;
let client = self.client()?;
let store = client.store();

// Update all dependent events.
store
.update_dependent_send_queue_event(&self.room_id, transaction_id, event_id.to_owned())
.await?;

let removed = store.remove_send_queue_event(&self.room_id, transaction_id).await?;

if !removed {
warn!(txn_id = %transaction_id, "event marked as sent was missing from storage");
Expand All @@ -704,7 +725,17 @@ impl QueueStorage {
let being_sent = self.being_sent.read().await;

if being_sent.contains(transaction_id) {
return Ok(false);
// Save the intent to redact the event.
self.client()?
.store()
.save_dependent_send_queue_event(
&self.room_id,
transaction_id,
DependentQueuedEventKind::Redact,
)
.await?;

return Ok(true);
}

let removed =
Expand All @@ -729,7 +760,17 @@ impl QueueStorage {
let being_sent = self.being_sent.read().await;

if being_sent.contains(transaction_id) {
return Ok(false);
// Save the intent to redact the event.
self.client()?
.store()
.save_dependent_send_queue_event(
&self.room_id,
transaction_id,
DependentQueuedEventKind::Edit { new_content: serializable },
)
.await?;

return Ok(true);
}

let edited = self
Expand All @@ -746,6 +787,170 @@ impl QueueStorage {
async fn local_echoes(&self) -> Result<Vec<QueuedEvent>, RoomSendQueueStorageError> {
Ok(self.client()?.store().load_send_queue_events(&self.room_id).await?)
}

/// Try to apply a single dependent event, whether it's local or remote.
///
/// This swallows errors that would retrigger every time if we retried
/// applying the dependent event: invalid edit content, etc.
#[instrument(skip_all)]
async fn try_apply_single_dependent_event(
&self,
client: &Client,
de: DependentQueuedEvent,
) -> Result<(), RoomSendQueueError> {
let store = client.store();

match de.kind {
DependentQueuedEventKind::Edit { new_content } => {
if let Some(event_id) = de.event_id {
// The parent event has been sent, so send an edit event.
let room = client
.get_room(&self.room_id)
.ok_or(RoomSendQueueError::RoomDisappeared)?;

// Check the event is one we know how to edit with an edit event.

// 1. It must be deserializable…
let content = match new_content.deserialize() {
Ok(c) => c,
Err(err) => {
warn!("unable to deserialize: {err}");
return Ok(());
}
};

// 2. …and a room message, at this point.
let AnyMessageLikeEventContent::RoomMessage(room_message_content) = content
else {
warn!("trying to send an edit event for a non-room message: aborting");
return Ok(());
};

// Assume no relation.
let new_content: RoomMessageEventContentWithoutRelation =
room_message_content.into();

let edit_event = match room
.make_edit_event(&event_id, EditedContent::RoomMessage(new_content))
.await
{
Ok(e) => e,
Err(err) => {
warn!("couldn't create edited event: {err}");
return Ok(());
}
};

// Queue the edit event in the send queue 🧠.
let serializable = SerializableEventContent::from_raw(
Raw::new(&edit_event)
.map_err(RoomSendQueueStorageError::JsonSerialization)?,
edit_event.event_type().to_string(),
);

store
.save_send_queue_event(&self.room_id, TransactionId::new(), serializable)
.await
.map_err(RoomSendQueueStorageError::StorageError)?;
} else {
// The parent event is still local (sending must have failed); update the local
// echo.
let edited = store
.update_send_queue_event(&self.room_id, &de.transaction_id, new_content)
.await
.map_err(RoomSendQueueStorageError::StorageError)?;

if !edited {
warn!("missing local echo upon dependent edit");
}
}
}

DependentQueuedEventKind::Redact => {
if let Some(event_id) = de.event_id {
// The parent event has been sent; send a redaction.
let room = client
.get_room(&self.room_id)
.ok_or(RoomSendQueueError::RoomDisappeared)?;

// Ideally we'd use the send queue to send the redaction, but the protocol has
// changed the shape of a room.redaction after v11, so keep it simple and try
// once here.

// Note: no reason is provided because we materialize the intent of "cancel
// sending the parent event".

if let Err(err) = room.redact(&event_id, None, None).await {
warn!("error when sending a redact for {event_id}: {err}");
}
} else {
// The parent event is still local (sending must have failed); redact the local
// echo.
let removed = store
.remove_send_queue_event(&self.room_id, &de.transaction_id)
.await
.map_err(RoomSendQueueStorageError::StorageError)?;

if !removed {
warn!("missing local echo upon dependent redact??");
}
}
}
}

Ok(())
}

#[instrument(skip(self))]
async fn apply_dependent_events(&self) -> Result<(), RoomSendQueueError> {
// Keep the lock until we're done touching the storage.
let _being_sent = self.being_sent.read().await;

let client = self.client()?;
let store = client.store();

let dependent_events = store
.list_dependent_send_queue_events(&self.room_id)
.await
.map_err(RoomSendQueueStorageError::StorageError)?;

let num_initial_dependent_events = dependent_events.len();

let canonicalized_dependent_events = canonicalize_dependent_events(&dependent_events);
let mut num_dependent_events = canonicalized_dependent_events.len();

debug!(
num_dependent_events,
num_initial_dependent_events, "starting handling of dependent events"
);

for dependent in canonicalized_dependent_events {
let dependent_id = dependent.id;

match self.try_apply_single_dependent_event(&client, dependent).await {
Ok(()) => {
// The dependent event has been successfully applied, forget about it.
store
.remove_dependent_send_queue_event(&self.room_id, dependent_id)
.await
.map_err(RoomSendQueueStorageError::StorageError)?;

num_dependent_events -= 1;
}

Err(err) => {
warn!("error when applying single dependent event: {err}");
}
}
}

debug!(
leftovert_dependent_events = num_dependent_events,
"stopped handling dependent events"
);

Ok(())
}
}

/// An event that has been locally queued for sending, but hasn't been sent yet.
Expand Down Expand Up @@ -875,7 +1080,7 @@ impl SendHandle {

Ok(true)
} else {
debug!("event was being sent, can't abort");
debug!("local echo didn't exist anymore, can't abort");
Ok(false)
}
}
Expand Down Expand Up @@ -908,7 +1113,7 @@ impl SendHandle {

Ok(true)
} else {
debug!("event was being sent, can't edit");
debug!("local echo doesn't exist anymore, can't edit");
Ok(false)
}
}
Expand Down
Loading

0 comments on commit 0246863

Please sign in to comment.