Skip to content

Commit

Permalink
send queue: add QueueStorage::client() helper method
Browse files Browse the repository at this point in the history
  • Loading branch information
bnjbvr committed Jul 24, 2024
1 parent 54c6f05 commit d973fef
Showing 1 changed file with 14 additions and 37 deletions.
51 changes: 14 additions & 37 deletions crates/matrix-sdk/src/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,11 @@ impl QueueStorage {
Self { room_id: room, being_sent: Default::default(), client }
}

/// Small helper to get a strong Client from the weak one.
fn client(&self) -> Result<Client, RoomSendQueueStorageError> {
self.client.get().ok_or(RoomSendQueueStorageError::ClientShuttingDown)
}

/// Push a new event to be sent in the queue.
///
/// Returns the transaction id chosen to identify the request.
Expand All @@ -613,9 +618,7 @@ impl QueueStorage {
) -> Result<OwnedTransactionId, RoomSendQueueStorageError> {
let transaction_id = TransactionId::new();

self.client
.get()
.ok_or(RoomSendQueueStorageError::ClientShuttingDown)?
self.client()?
.store()
.save_send_queue_event(&self.room_id, transaction_id.clone(), serializable)
.await?;
Expand All @@ -631,13 +634,7 @@ impl QueueStorage {
// Keep the lock until we're done touching the storage.
let mut being_sent = self.being_sent.write().await;

let queued_events = self
.client
.get()
.ok_or(RoomSendQueueStorageError::ClientShuttingDown)?
.store()
.load_send_queue_events(&self.room_id)
.await?;
let queued_events = self.client()?.store().load_send_queue_events(&self.room_id).await?;

if let Some(event) = queued_events.iter().find(|queued| !queued.is_wedged) {
being_sent.insert(event.transaction_id.clone());
Expand Down Expand Up @@ -667,9 +664,7 @@ impl QueueStorage {
being_sent.remove(transaction_id);

Ok(self
.client
.get()
.ok_or(RoomSendQueueStorageError::ClientShuttingDown)?
.client()?
.store()
.update_send_queue_event_status(&self.room_id, transaction_id, true)
.await?)
Expand All @@ -685,13 +680,8 @@ impl QueueStorage {
let mut being_sent = self.being_sent.write().await;
being_sent.remove(transaction_id);

let removed = self
.client
.get()
.ok_or(RoomSendQueueStorageError::ClientShuttingDown)?
.store()
.remove_send_queue_event(&self.room_id, transaction_id)
.await?;
let removed =
self.client()?.store().remove_send_queue_event(&self.room_id, transaction_id).await?;

if !removed {
warn!(txn_id = %transaction_id, "event marked as sent was missing from storage");
Expand All @@ -717,13 +707,8 @@ impl QueueStorage {
return Ok(false);
}

let removed = self
.client
.get()
.ok_or(RoomSendQueueStorageError::ClientShuttingDown)?
.store()
.remove_send_queue_event(&self.room_id, transaction_id)
.await?;
let removed =
self.client()?.store().remove_send_queue_event(&self.room_id, transaction_id).await?;

Ok(removed)
}
Expand All @@ -748,9 +733,7 @@ impl QueueStorage {
}

let edited = self
.client
.get()
.ok_or(RoomSendQueueStorageError::ClientShuttingDown)?
.client()?
.store()
.update_send_queue_event(&self.room_id, transaction_id, serializable)
.await?;
Expand All @@ -761,13 +744,7 @@ impl QueueStorage {
/// Returns a list of the local echoes, that is, all the events that we're
/// about to send but that haven't been sent yet (or are being sent).
async fn local_echoes(&self) -> Result<Vec<QueuedEvent>, RoomSendQueueStorageError> {
Ok(self
.client
.get()
.ok_or(RoomSendQueueStorageError::ClientShuttingDown)?
.store()
.load_send_queue_events(&self.room_id)
.await?)
Ok(self.client()?.store().load_send_queue_events(&self.room_id).await?)
}
}

Expand Down

0 comments on commit d973fef

Please sign in to comment.