Skip to content

Commit

Permalink
send queue: add an own transaction id for dependent events
Browse files Browse the repository at this point in the history
The previously named `transaction_id` is also renamed to
`parent_transaction_id` to make it clearer.
  • Loading branch information
bnjbvr committed Jul 24, 2024
1 parent 0246863 commit 2eb6930
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 85 deletions.
36 changes: 26 additions & 10 deletions crates/matrix-sdk-base/src/store/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1498,14 +1498,21 @@ impl StateStoreIntegrationTests for DynStateStore {
assert!(self.list_dependent_send_queue_events(room_id).await.unwrap().is_empty());

// Save a redaction for that event.
self.save_dependent_send_queue_event(room_id, &txn0, DependentQueuedEventKind::Redact)
.await
.unwrap();
let child_txn = TransactionId::new();
self.save_dependent_send_queue_event(
room_id,
&txn0,
child_txn.clone(),
DependentQueuedEventKind::Redact,
)
.await
.unwrap();

// It worked.
let dependents = self.list_dependent_send_queue_events(room_id).await.unwrap();
assert_eq!(dependents.len(), 1);
assert_eq!(dependents[0].transaction_id, txn0);
assert_eq!(dependents[0].parent_transaction_id, txn0);
assert_eq!(dependents[0].own_transaction_id, child_txn);
assert!(dependents[0].event_id.is_none());
assert_matches!(dependents[0].kind, DependentQueuedEventKind::Redact);

Expand All @@ -1518,13 +1525,16 @@ impl StateStoreIntegrationTests for DynStateStore {
// It worked.
let dependents = self.list_dependent_send_queue_events(room_id).await.unwrap();
assert_eq!(dependents.len(), 1);
assert_eq!(dependents[0].transaction_id, txn0);
assert_eq!(dependents[0].parent_transaction_id, txn0);
assert_eq!(dependents[0].own_transaction_id, child_txn);
assert_eq!(dependents[0].event_id.as_ref(), Some(&event_id));
assert_matches!(dependents[0].kind, DependentQueuedEventKind::Redact);

// Now remove it.
let removed =
self.remove_dependent_send_queue_event(room_id, dependents[0].id).await.unwrap();
let removed = self
.remove_dependent_send_queue_event(room_id, &dependents[0].own_transaction_id)
.await
.unwrap();
assert!(removed);

// It worked.
Expand All @@ -1538,14 +1548,20 @@ impl StateStoreIntegrationTests for DynStateStore {
.unwrap();
self.save_send_queue_event(room_id, txn1.clone(), event1).await.unwrap();

self.save_dependent_send_queue_event(room_id, &txn0, DependentQueuedEventKind::Redact)
.await
.unwrap();
self.save_dependent_send_queue_event(
room_id,
&txn0,
TransactionId::new(),
DependentQueuedEventKind::Redact,
)
.await
.unwrap();
assert_eq!(self.list_dependent_send_queue_events(room_id).await.unwrap().len(), 1);

self.save_dependent_send_queue_event(
room_id,
&txn1,
TransactionId::new(),
DependentQueuedEventKind::Edit {
new_content: SerializableEventContent::new(
&RoomMessageEventContent::text_plain("edit").into(),
Expand Down
28 changes: 9 additions & 19 deletions crates/matrix-sdk-base/src/store/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::{
collections::{BTreeMap, BTreeSet, HashMap},
num::NonZeroUsize,
sync::{Mutex, RwLock as StdRwLock},
sync::RwLock as StdRwLock,
};

use async_trait::async_trait;
Expand Down Expand Up @@ -92,7 +92,6 @@ pub struct MemoryStore {
custom: StdRwLock<HashMap<Vec<u8>, Vec<u8>>>,
send_queue_events: StdRwLock<BTreeMap<OwnedRoomId, Vec<QueuedEvent>>>,
dependent_send_queue_events: StdRwLock<BTreeMap<OwnedRoomId, Vec<DependentQueuedEvent>>>,
dependent_send_queue_event_next_id: Mutex<usize>,
}

// SAFETY: `new_unchecked` is safe because 20 is not zero.
Expand Down Expand Up @@ -124,7 +123,6 @@ impl Default for MemoryStore {
custom: Default::default(),
send_queue_events: Default::default(),
dependent_send_queue_events: Default::default(),
dependent_send_queue_event_next_id: Mutex::new(0),
}
}
}
Expand Down Expand Up @@ -1005,39 +1003,31 @@ impl StateStore for MemoryStore {
async fn save_dependent_send_queue_event(
&self,
room: &RoomId,
transaction_id: &TransactionId,
parent_transaction_id: &TransactionId,
own_transaction_id: OwnedTransactionId,
content: DependentQueuedEventKind,
) -> Result<(), Self::Error> {
let id = {
let mut next_id = self.dependent_send_queue_event_next_id.lock().unwrap();
// Don't tell anyone, but sometimes I miss C++'s `x++` operator.
let id = *next_id;
*next_id += 1;
id
};

self.dependent_send_queue_events.write().unwrap().entry(room.to_owned()).or_default().push(
DependentQueuedEvent {
id,
kind: content,
transaction_id: transaction_id.to_owned(),
parent_transaction_id: parent_transaction_id.to_owned(),
own_transaction_id,
event_id: None,
},
);

Ok(())
}

async fn update_dependent_send_queue_event(
&self,
room: &RoomId,
transaction_id: &TransactionId,
parent_txn_id: &TransactionId,
event_id: OwnedEventId,
) -> Result<usize, Self::Error> {
let mut dependent_send_queue_events = self.dependent_send_queue_events.write().unwrap();
let dependents = dependent_send_queue_events.entry(room.to_owned()).or_default();
let mut num_updated = 0;
for d in dependents.iter_mut().filter(|item| item.transaction_id == transaction_id) {
for d in dependents.iter_mut().filter(|item| item.parent_transaction_id == parent_txn_id) {
d.event_id = Some(event_id.clone());
num_updated += 1;
}
Expand All @@ -1047,11 +1037,11 @@ impl StateStore for MemoryStore {
async fn remove_dependent_send_queue_event(
&self,
room: &RoomId,
id: usize,
txn_id: &TransactionId,
) -> Result<bool, Self::Error> {
let mut dependent_send_queue_events = self.dependent_send_queue_events.write().unwrap();
let dependents = dependent_send_queue_events.entry(room.to_owned()).or_default();
if let Some(pos) = dependents.iter().position(|item| item.id == id) {
if let Some(pos) = dependents.iter().position(|item| item.own_transaction_id == txn_id) {
dependents.remove(pos);
Ok(true)
} else {
Expand Down
24 changes: 13 additions & 11 deletions crates/matrix-sdk-base/src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,8 @@ pub trait StateStore: AsyncTraitDeps {
async fn save_dependent_send_queue_event(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
parent_txn_id: &TransactionId,
own_txn_id: OwnedTransactionId,
content: DependentQueuedEventKind,
) -> Result<(), Self::Error>;

Expand All @@ -470,7 +471,7 @@ pub trait StateStore: AsyncTraitDeps {
async fn update_dependent_send_queue_event(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
parent_txn_id: &TransactionId,
event_id: OwnedEventId,
) -> Result<usize, Self::Error>;

Expand All @@ -480,7 +481,7 @@ pub trait StateStore: AsyncTraitDeps {
async fn remove_dependent_send_queue_event(
&self,
room: &RoomId,
id: usize,
own_txn_id: &TransactionId,
) -> Result<bool, Self::Error>;

/// List all the dependent send queue events.
Expand Down Expand Up @@ -767,33 +768,34 @@ impl<T: StateStore> StateStore for EraseStateStoreError<T> {
async fn save_dependent_send_queue_event(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
parent_txn_id: &TransactionId,
own_txn_id: OwnedTransactionId,
content: DependentQueuedEventKind,
) -> Result<(), Self::Error> {
self.0
.save_dependent_send_queue_event(room_id, transaction_id, content)
.save_dependent_send_queue_event(room_id, parent_txn_id, own_txn_id, content)
.await
.map_err(Into::into)
}

async fn update_dependent_send_queue_event(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
parent_txn_id: &TransactionId,
event_id: OwnedEventId,
) -> Result<usize, Self::Error> {
self.0
.update_dependent_send_queue_event(room_id, transaction_id, event_id)
.update_dependent_send_queue_event(room_id, parent_txn_id, event_id)
.await
.map_err(Into::into)
}

async fn remove_dependent_send_queue_event(
&self,
room_id: &RoomId,
id: usize,
own_txn_id: &TransactionId,
) -> Result<bool, Self::Error> {
self.0.remove_dependent_send_queue_event(room_id, id).await.map_err(Into::into)
self.0.remove_dependent_send_queue_event(room_id, own_txn_id).await.map_err(Into::into)
}

async fn list_dependent_send_queue_events(
Expand Down Expand Up @@ -1262,7 +1264,7 @@ pub struct DependentQueuedEvent {
/// Unique identifier for this dependent queued event.
///
/// Useful for deletion.
pub id: usize,
pub own_transaction_id: OwnedTransactionId,

/// The kind of user intent.
pub kind: DependentQueuedEventKind,
Expand All @@ -1272,7 +1274,7 @@ pub struct DependentQueuedEvent {
/// Note: this is the transaction id used for the depended-on event, i.e.
/// the one that was originally sent and that's being modified with this
/// dependent event.
pub transaction_id: OwnedTransactionId,
pub parent_transaction_id: OwnedTransactionId,

/// If the parent event has been sent, the parent's event identifier
/// returned by the server once the local echo has been sent out.
Expand Down
22 changes: 12 additions & 10 deletions crates/matrix-sdk-indexeddb/src/state_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1575,7 +1575,8 @@ impl_state_store!({
async fn save_dependent_send_queue_event(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
parent_txn_id: &TransactionId,
own_txn_id: OwnedTransactionId,
content: DependentQueuedEventKind,
) -> Result<()> {
let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
Expand All @@ -1596,14 +1597,11 @@ impl_state_store!({
|val| self.deserialize_value::<Vec<DependentQueuedEvent>>(&val),
)?;

// Find the next id by taking the biggest ID we had before, and add 1.
let next_id = prev.iter().fold(0, |max, item| item.id.max(max)) + 1;

// Push the new event.
prev.push(DependentQueuedEvent {
id: next_id,
kind: content,
transaction_id: transaction_id.to_owned(),
parent_transaction_id: parent_txn_id.to_owned(),
own_transaction_id: own_txn_id,
event_id: None,
});

Expand All @@ -1618,7 +1616,7 @@ impl_state_store!({
async fn update_dependent_send_queue_event(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
parent_txn_id: &TransactionId,
event_id: OwnedEventId,
) -> Result<usize> {
let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
Expand All @@ -1641,7 +1639,7 @@ impl_state_store!({

// Modify all events that match.
let mut num_updated = 0;
for entry in prev.iter_mut().filter(|entry| entry.transaction_id == transaction_id) {
for entry in prev.iter_mut().filter(|entry| entry.parent_transaction_id == parent_txn_id) {
entry.event_id = Some(event_id.clone());
num_updated += 1;
}
Expand All @@ -1654,7 +1652,11 @@ impl_state_store!({
Ok(num_updated)
}

async fn remove_dependent_send_queue_event(&self, room_id: &RoomId, id: usize) -> Result<bool> {
async fn remove_dependent_send_queue_event(
&self,
room_id: &RoomId,
txn_id: &TransactionId,
) -> Result<bool> {
let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);

let tx = self.inner.transaction_on_one_with_mode(
Expand All @@ -1668,7 +1670,7 @@ impl_state_store!({
// Reload the previous vector for this room.
if let Some(val) = obj.get(&encoded_key)?.await? {
let mut prev = self.deserialize_value::<Vec<DependentQueuedEvent>>(&val)?;
if let Some(pos) = prev.iter().position(|item| item.id == id) {
if let Some(pos) = prev.iter().position(|item| item.own_transaction_id == txn_id) {
prev.remove(pos);

if prev.is_empty() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ CREATE TABLE "dependent_send_queue_events" (
"room_id" BLOB NOT NULL,

-- This is used as both a key and a value, thus neither encrypted/decrypted/hashed.
"transaction_id" BLOB NOT NULL,
-- This is the transaction id for the *parent* transaction, not our own.
"parent_transaction_id" BLOB NOT NULL,

-- This is used as both a key and a value, thus neither encrypted/decrypted/hashed.
-- This is a transaction id used for the dependent event itself, not the parent.
"own_transaction_id" BLOB NOT NULL,

-- Used as a value (thus encrypted/decrypted), can be null.
"event_id" BLOB NULL,
Expand Down
Loading

0 comments on commit 2eb6930

Please sign in to comment.