diff --git a/crates/fuel-streams-core/src/stream/stream_impl.rs b/crates/fuel-streams-core/src/stream/stream_impl.rs index b31ac308..e83ec158 100644 --- a/crates/fuel-streams-core/src/stream/stream_impl.rs +++ b/crates/fuel-streams-core/src/stream/stream_impl.rs @@ -4,7 +4,7 @@ pub use async_nats::Subscriber as StreamLiveSubscriber; use fuel_message_broker::MessageBroker; use fuel_streams_macros::subject::IntoSubject; use fuel_streams_store::{ - db::{Db, DbItem}, + db::{Db, StoreItem}, record::Record, store::Store, }; @@ -17,8 +17,8 @@ use tokio::{sync::OnceCell, time::sleep}; use super::{config, StreamError}; use crate::server::DeliverPolicy; -pub type BoxedStreamItem = Result<(String, Vec), StreamError>; -pub type BoxedStream = Box + Send + Unpin>; +pub type BoxedStoreItem = Result<(String, Vec), StreamError>; +pub type BoxedStream = Box + Send + Unpin>; #[derive(Debug, Clone)] pub struct Stream { diff --git a/crates/fuel-streams-domains/src/blocks/db_item.rs b/crates/fuel-streams-domains/src/blocks/db_item.rs index e7dc8e4d..a642fa4a 100644 --- a/crates/fuel-streams-domains/src/blocks/db_item.rs +++ b/crates/fuel-streams-domains/src/blocks/db_item.rs @@ -1,5 +1,3 @@ -use std::cmp::Ordering; - use fuel_streams_store::{ db::{DbError, DbItem}, record::{DataEncoder, RecordEntity, RecordPacket, RecordPacketError}, @@ -36,18 +34,6 @@ impl DbItem for BlockDbItem { } } -impl PartialOrd for BlockDbItem { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for BlockDbItem { - fn cmp(&self, other: &Self) -> Ordering { - self.block_height.cmp(&other.block_height) - } -} - impl TryFrom<&RecordPacket> for BlockDbItem { type Error = RecordPacketError; fn try_from(packet: &RecordPacket) -> Result { diff --git a/crates/fuel-streams-domains/src/blocks/mod.rs b/crates/fuel-streams-domains/src/blocks/mod.rs index 8921f0aa..4ba911c6 100644 --- a/crates/fuel-streams-domains/src/blocks/mod.rs +++ b/crates/fuel-streams-domains/src/blocks/mod.rs @@ -1,9 +1,11 @@ mod db_item; mod packets; mod record_impl; +mod store_item; pub mod subjects; pub mod types; pub use db_item::*; +pub use store_item::*; pub use subjects::*; pub use types::*; diff --git a/crates/fuel-streams-domains/src/blocks/record_impl.rs b/crates/fuel-streams-domains/src/blocks/record_impl.rs index 74b01bf5..8a118fdc 100644 --- a/crates/fuel-streams-domains/src/blocks/record_impl.rs +++ b/crates/fuel-streams-domains/src/blocks/record_impl.rs @@ -5,7 +5,7 @@ use fuel_streams_store::{ }; use sqlx::PgExecutor; -use super::{Block, BlockDbItem}; +use super::{Block, BlockDbItem, BlockStoreItem}; impl DataEncoder for Block { type Err = DbError; @@ -14,6 +14,8 @@ impl DataEncoder for Block { #[async_trait] impl Record for Block { type DbItem = BlockDbItem; + type StoreItem = BlockStoreItem; + const ENTITY: RecordEntity = RecordEntity::Block; const ORDER_PROPS: &'static [&'static str] = &["block_height"]; diff --git a/crates/fuel-streams-domains/src/blocks/store_item.rs b/crates/fuel-streams-domains/src/blocks/store_item.rs new file mode 100644 index 00000000..1fcd684c --- /dev/null +++ b/crates/fuel-streams-domains/src/blocks/store_item.rs @@ -0,0 +1,46 @@ +use std::cmp::Ordering; + +use fuel_streams_store::{ + db::{DbError, StoreItem}, + record::{DataEncoder, RecordEntity}, +}; +use serde::{Deserialize, Serialize}; + +#[derive( + Debug, Clone, Serialize, Deserialize, PartialEq, Eq, sqlx::FromRow, +)] +pub struct BlockStoreItem { + pub subject: String, + pub value: Vec, + pub block_height: i64, // This is our order prop +} + +impl DataEncoder for BlockStoreItem { + type Err = DbError; +} + +impl StoreItem for BlockStoreItem { + fn entity(&self) -> &RecordEntity { + &RecordEntity::Block + } + + fn encoded_value(&self) -> &[u8] { + &self.value + } + + fn subject_str(&self) -> String { + self.subject.clone() + } +} + +impl PartialOrd for BlockStoreItem { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for BlockStoreItem { + fn cmp(&self, other: &Self) -> Ordering { + self.block_height.cmp(&other.block_height) + } +} diff --git a/crates/fuel-streams-domains/src/inputs/db_item.rs b/crates/fuel-streams-domains/src/inputs/db_item.rs index 52d0f09f..eb75316f 100644 --- a/crates/fuel-streams-domains/src/inputs/db_item.rs +++ b/crates/fuel-streams-domains/src/inputs/db_item.rs @@ -1,5 +1,3 @@ -use std::cmp::Ordering; - use fuel_streams_store::{ db::{DbError, DbItem}, record::{DataEncoder, RecordEntity, RecordPacket, RecordPacketError}, @@ -44,24 +42,6 @@ impl DbItem for InputDbItem { } } -impl PartialOrd for InputDbItem { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for InputDbItem { - fn cmp(&self, other: &Self) -> Ordering { - // Order by block height first - self.block_height - .cmp(&other.block_height) - // Then by transaction index within the block - .then(self.tx_index.cmp(&other.tx_index)) - // Finally by input index within the transaction - .then(self.input_index.cmp(&other.input_index)) - } -} - impl TryFrom<&RecordPacket> for InputDbItem { type Error = RecordPacketError; fn try_from(packet: &RecordPacket) -> Result { diff --git a/crates/fuel-streams-domains/src/inputs/mod.rs b/crates/fuel-streams-domains/src/inputs/mod.rs index 8921f0aa..4ba911c6 100644 --- a/crates/fuel-streams-domains/src/inputs/mod.rs +++ b/crates/fuel-streams-domains/src/inputs/mod.rs @@ -1,9 +1,11 @@ mod db_item; mod packets; mod record_impl; +mod store_item; pub mod subjects; pub mod types; pub use db_item::*; +pub use store_item::*; pub use subjects::*; pub use types::*; diff --git a/crates/fuel-streams-domains/src/inputs/record_impl.rs b/crates/fuel-streams-domains/src/inputs/record_impl.rs index 0e305836..c1b087de 100644 --- a/crates/fuel-streams-domains/src/inputs/record_impl.rs +++ b/crates/fuel-streams-domains/src/inputs/record_impl.rs @@ -5,7 +5,7 @@ use fuel_streams_store::{ }; use sqlx::PgExecutor; -use super::{Input, InputDbItem}; +use super::{Input, InputDbItem, InputStoreItem}; impl DataEncoder for Input { type Err = DbError; @@ -14,6 +14,7 @@ impl DataEncoder for Input { #[async_trait] impl Record for Input { type DbItem = InputDbItem; + type StoreItem = InputStoreItem; const ENTITY: RecordEntity = RecordEntity::Input; const ORDER_PROPS: &'static [&'static str] = diff --git a/crates/fuel-streams-domains/src/inputs/store_item.rs b/crates/fuel-streams-domains/src/inputs/store_item.rs new file mode 100644 index 00000000..1668b8e7 --- /dev/null +++ b/crates/fuel-streams-domains/src/inputs/store_item.rs @@ -0,0 +1,54 @@ +use std::cmp::Ordering; + +use fuel_streams_store::{ + db::{DbError, StoreItem}, + record::{DataEncoder, RecordEntity}, +}; +use serde::{Deserialize, Serialize}; + +#[derive( + Debug, Clone, Serialize, Deserialize, PartialEq, Eq, sqlx::FromRow, +)] +pub struct InputStoreItem { + pub subject: String, + pub value: Vec, + pub block_height: i64, + pub tx_index: i64, + pub input_index: i64, +} + +impl DataEncoder for InputStoreItem { + type Err = DbError; +} + +impl StoreItem for InputStoreItem { + fn entity(&self) -> &RecordEntity { + &RecordEntity::Input + } + + fn encoded_value(&self) -> &[u8] { + &self.value + } + + fn subject_str(&self) -> String { + self.subject.clone() + } +} + +impl PartialOrd for InputStoreItem { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for InputStoreItem { + fn cmp(&self, other: &Self) -> Ordering { + // Order by block height first + self.block_height + .cmp(&other.block_height) + // Then by transaction index within the block + .then(self.tx_index.cmp(&other.tx_index)) + // Finally by input index within the transaction + .then(self.input_index.cmp(&other.input_index)) + } +} diff --git a/crates/fuel-streams-domains/src/outputs/db_item.rs b/crates/fuel-streams-domains/src/outputs/db_item.rs index 079dd901..a2300c0a 100644 --- a/crates/fuel-streams-domains/src/outputs/db_item.rs +++ b/crates/fuel-streams-domains/src/outputs/db_item.rs @@ -1,5 +1,3 @@ -use std::cmp::Ordering; - use fuel_streams_store::{ db::{DbError, DbItem}, record::{DataEncoder, RecordEntity, RecordPacket, RecordPacketError}, @@ -42,24 +40,6 @@ impl DbItem for OutputDbItem { } } -impl PartialOrd for OutputDbItem { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for OutputDbItem { - fn cmp(&self, other: &Self) -> Ordering { - // Order by block height first - self.block_height - .cmp(&other.block_height) - // Then by transaction index within the block - .then(self.tx_index.cmp(&other.tx_index)) - // Finally by output index within the transaction - .then(self.output_index.cmp(&other.output_index)) - } -} - impl TryFrom<&RecordPacket> for OutputDbItem { type Error = RecordPacketError; fn try_from(packet: &RecordPacket) -> Result { diff --git a/crates/fuel-streams-domains/src/outputs/mod.rs b/crates/fuel-streams-domains/src/outputs/mod.rs index 8921f0aa..4ba911c6 100644 --- a/crates/fuel-streams-domains/src/outputs/mod.rs +++ b/crates/fuel-streams-domains/src/outputs/mod.rs @@ -1,9 +1,11 @@ mod db_item; mod packets; mod record_impl; +mod store_item; pub mod subjects; pub mod types; pub use db_item::*; +pub use store_item::*; pub use subjects::*; pub use types::*; diff --git a/crates/fuel-streams-domains/src/outputs/record_impl.rs b/crates/fuel-streams-domains/src/outputs/record_impl.rs index 1c88e57a..ad7307ae 100644 --- a/crates/fuel-streams-domains/src/outputs/record_impl.rs +++ b/crates/fuel-streams-domains/src/outputs/record_impl.rs @@ -5,7 +5,7 @@ use fuel_streams_store::{ }; use sqlx::PgExecutor; -use super::{Output, OutputDbItem}; +use super::{Output, OutputDbItem, OutputStoreItem}; impl DataEncoder for Output { type Err = DbError; @@ -14,6 +14,7 @@ impl DataEncoder for Output { #[async_trait] impl Record for Output { type DbItem = OutputDbItem; + type StoreItem = OutputStoreItem; const ENTITY: RecordEntity = RecordEntity::Output; const ORDER_PROPS: &'static [&'static str] = diff --git a/crates/fuel-streams-domains/src/outputs/store_item.rs b/crates/fuel-streams-domains/src/outputs/store_item.rs new file mode 100644 index 00000000..04ade799 --- /dev/null +++ b/crates/fuel-streams-domains/src/outputs/store_item.rs @@ -0,0 +1,54 @@ +use std::cmp::Ordering; + +use fuel_streams_store::{ + db::{DbError, StoreItem}, + record::{DataEncoder, RecordEntity}, +}; +use serde::{Deserialize, Serialize}; + +#[derive( + Debug, Clone, Serialize, Deserialize, PartialEq, Eq, sqlx::FromRow, +)] +pub struct OutputStoreItem { + pub subject: String, + pub value: Vec, + pub block_height: i64, + pub tx_index: i64, + pub output_index: i64, +} + +impl DataEncoder for OutputStoreItem { + type Err = DbError; +} + +impl StoreItem for OutputStoreItem { + fn entity(&self) -> &RecordEntity { + &RecordEntity::Output + } + + fn encoded_value(&self) -> &[u8] { + &self.value + } + + fn subject_str(&self) -> String { + self.subject.clone() + } +} + +impl PartialOrd for OutputStoreItem { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for OutputStoreItem { + fn cmp(&self, other: &Self) -> Ordering { + // Order by block height first + self.block_height + .cmp(&other.block_height) + // Then by transaction index within the block + .then(self.tx_index.cmp(&other.tx_index)) + // Finally by output index within the transaction + .then(self.output_index.cmp(&other.output_index)) + } +} diff --git a/crates/fuel-streams-domains/src/receipts/db_item.rs b/crates/fuel-streams-domains/src/receipts/db_item.rs index 80e15c06..e9e8c2c6 100644 --- a/crates/fuel-streams-domains/src/receipts/db_item.rs +++ b/crates/fuel-streams-domains/src/receipts/db_item.rs @@ -1,5 +1,3 @@ -use std::cmp::Ordering; - use fuel_streams_store::{ db::{DbError, DbItem}, record::{DataEncoder, RecordEntity, RecordPacket, RecordPacketError}, @@ -47,24 +45,6 @@ impl DbItem for ReceiptDbItem { } } -impl PartialOrd for ReceiptDbItem { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for ReceiptDbItem { - fn cmp(&self, other: &Self) -> Ordering { - // Order by block height first - self.block_height - .cmp(&other.block_height) - // Then by transaction index within the block - .then(self.tx_index.cmp(&other.tx_index)) - // Finally by receipt index within the transaction - .then(self.receipt_index.cmp(&other.receipt_index)) - } -} - impl TryFrom<&RecordPacket> for ReceiptDbItem { type Error = RecordPacketError; fn try_from(packet: &RecordPacket) -> Result { diff --git a/crates/fuel-streams-domains/src/receipts/mod.rs b/crates/fuel-streams-domains/src/receipts/mod.rs index 8921f0aa..4ba911c6 100644 --- a/crates/fuel-streams-domains/src/receipts/mod.rs +++ b/crates/fuel-streams-domains/src/receipts/mod.rs @@ -1,9 +1,11 @@ mod db_item; mod packets; mod record_impl; +mod store_item; pub mod subjects; pub mod types; pub use db_item::*; +pub use store_item::*; pub use subjects::*; pub use types::*; diff --git a/crates/fuel-streams-domains/src/receipts/record_impl.rs b/crates/fuel-streams-domains/src/receipts/record_impl.rs index e4792295..33d09364 100644 --- a/crates/fuel-streams-domains/src/receipts/record_impl.rs +++ b/crates/fuel-streams-domains/src/receipts/record_impl.rs @@ -5,7 +5,7 @@ use fuel_streams_store::{ }; use sqlx::PgExecutor; -use super::{Receipt, ReceiptDbItem}; +use super::{Receipt, ReceiptDbItem, ReceiptStoreItem}; impl DataEncoder for Receipt { type Err = DbError; @@ -14,6 +14,7 @@ impl DataEncoder for Receipt { #[async_trait] impl Record for Receipt { type DbItem = ReceiptDbItem; + type StoreItem = ReceiptStoreItem; const ENTITY: RecordEntity = RecordEntity::Receipt; const ORDER_PROPS: &'static [&'static str] = diff --git a/crates/fuel-streams-domains/src/receipts/store_item.rs b/crates/fuel-streams-domains/src/receipts/store_item.rs new file mode 100644 index 00000000..0123e69d --- /dev/null +++ b/crates/fuel-streams-domains/src/receipts/store_item.rs @@ -0,0 +1,54 @@ +use std::cmp::Ordering; + +use fuel_streams_store::{ + db::{DbError, StoreItem}, + record::{DataEncoder, RecordEntity}, +}; +use serde::{Deserialize, Serialize}; + +#[derive( + Debug, Clone, Serialize, Deserialize, PartialEq, Eq, sqlx::FromRow, +)] +pub struct ReceiptStoreItem { + pub subject: String, + pub value: Vec, + pub block_height: i64, + pub tx_index: i64, + pub receipt_index: i64, +} + +impl DataEncoder for ReceiptStoreItem { + type Err = DbError; +} + +impl StoreItem for ReceiptStoreItem { + fn entity(&self) -> &RecordEntity { + &RecordEntity::Receipt + } + + fn encoded_value(&self) -> &[u8] { + &self.value + } + + fn subject_str(&self) -> String { + self.subject.clone() + } +} + +impl PartialOrd for ReceiptStoreItem { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for ReceiptStoreItem { + fn cmp(&self, other: &Self) -> Ordering { + // Order by block height first + self.block_height + .cmp(&other.block_height) + // Then by transaction index within the block + .then(self.tx_index.cmp(&other.tx_index)) + // Finally by receipt index within the transaction + .then(self.receipt_index.cmp(&other.receipt_index)) + } +} diff --git a/crates/fuel-streams-domains/src/transactions/db_item.rs b/crates/fuel-streams-domains/src/transactions/db_item.rs index 962bedaa..f38a3810 100644 --- a/crates/fuel-streams-domains/src/transactions/db_item.rs +++ b/crates/fuel-streams-domains/src/transactions/db_item.rs @@ -1,5 +1,3 @@ -use std::cmp::Ordering; - use fuel_streams_store::{ db::{DbError, DbItem}, record::{DataEncoder, RecordEntity, RecordPacket, RecordPacketError}, @@ -39,22 +37,6 @@ impl DbItem for TransactionDbItem { } } -impl PartialOrd for TransactionDbItem { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for TransactionDbItem { - fn cmp(&self, other: &Self) -> Ordering { - // Order by block height first - self.block_height - .cmp(&other.block_height) - // Then by transaction index within the block - .then(self.tx_index.cmp(&other.tx_index)) - } -} - impl TryFrom<&RecordPacket> for TransactionDbItem { type Error = RecordPacketError; fn try_from(packet: &RecordPacket) -> Result { diff --git a/crates/fuel-streams-domains/src/transactions/mod.rs b/crates/fuel-streams-domains/src/transactions/mod.rs index 8921f0aa..4ba911c6 100644 --- a/crates/fuel-streams-domains/src/transactions/mod.rs +++ b/crates/fuel-streams-domains/src/transactions/mod.rs @@ -1,9 +1,11 @@ mod db_item; mod packets; mod record_impl; +mod store_item; pub mod subjects; pub mod types; pub use db_item::*; +pub use store_item::*; pub use subjects::*; pub use types::*; diff --git a/crates/fuel-streams-domains/src/transactions/record_impl.rs b/crates/fuel-streams-domains/src/transactions/record_impl.rs index 0d2881de..f18e5d50 100644 --- a/crates/fuel-streams-domains/src/transactions/record_impl.rs +++ b/crates/fuel-streams-domains/src/transactions/record_impl.rs @@ -5,7 +5,7 @@ use fuel_streams_store::{ }; use sqlx::PgExecutor; -use super::{Transaction, TransactionDbItem}; +use super::{Transaction, TransactionDbItem, TransactionStoreItem}; impl DataEncoder for Transaction { type Err = DbError; @@ -14,6 +14,7 @@ impl DataEncoder for Transaction { #[async_trait] impl Record for Transaction { type DbItem = TransactionDbItem; + type StoreItem = TransactionStoreItem; const ENTITY: RecordEntity = RecordEntity::Transaction; const ORDER_PROPS: &'static [&'static str] = &["block_height", "tx_index"]; diff --git a/crates/fuel-streams-domains/src/transactions/store_item.rs b/crates/fuel-streams-domains/src/transactions/store_item.rs new file mode 100644 index 00000000..c86a7118 --- /dev/null +++ b/crates/fuel-streams-domains/src/transactions/store_item.rs @@ -0,0 +1,51 @@ +use std::cmp::Ordering; + +use fuel_streams_store::{ + db::{DbError, StoreItem}, + record::{DataEncoder, RecordEntity}, +}; +use serde::{Deserialize, Serialize}; + +#[derive( + Debug, Clone, Serialize, Deserialize, PartialEq, Eq, sqlx::FromRow, +)] +pub struct TransactionStoreItem { + pub subject: String, + pub value: Vec, + pub block_height: i64, + pub tx_index: i64, +} + +impl DataEncoder for TransactionStoreItem { + type Err = DbError; +} + +impl StoreItem for TransactionStoreItem { + fn entity(&self) -> &RecordEntity { + &RecordEntity::Transaction + } + + fn encoded_value(&self) -> &[u8] { + &self.value + } + + fn subject_str(&self) -> String { + self.subject.clone() + } +} + +impl PartialOrd for TransactionStoreItem { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for TransactionStoreItem { + fn cmp(&self, other: &Self) -> Ordering { + // Order by block height first + self.block_height + .cmp(&other.block_height) + // Then by transaction index within the block + .then(self.tx_index.cmp(&other.tx_index)) + } +} diff --git a/crates/fuel-streams-domains/src/utxos/db_item.rs b/crates/fuel-streams-domains/src/utxos/db_item.rs index dab77d55..8fe0748c 100644 --- a/crates/fuel-streams-domains/src/utxos/db_item.rs +++ b/crates/fuel-streams-domains/src/utxos/db_item.rs @@ -1,5 +1,3 @@ -use std::cmp::Ordering; - use fuel_streams_store::{ db::{DbError, DbItem}, record::{DataEncoder, RecordEntity, RecordPacket, RecordPacketError}, @@ -40,24 +38,6 @@ impl DbItem for UtxoDbItem { } } -impl PartialOrd for UtxoDbItem { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for UtxoDbItem { - fn cmp(&self, other: &Self) -> Ordering { - // Order by block height first - self.block_height - .cmp(&other.block_height) - // Then by transaction index within the block - .then(self.tx_index.cmp(&other.tx_index)) - // Finally by input index - .then(self.input_index.cmp(&other.input_index)) - } -} - impl TryFrom<&RecordPacket> for UtxoDbItem { type Error = RecordPacketError; fn try_from(packet: &RecordPacket) -> Result { diff --git a/crates/fuel-streams-domains/src/utxos/mod.rs b/crates/fuel-streams-domains/src/utxos/mod.rs index 8921f0aa..4ba911c6 100644 --- a/crates/fuel-streams-domains/src/utxos/mod.rs +++ b/crates/fuel-streams-domains/src/utxos/mod.rs @@ -1,9 +1,11 @@ mod db_item; mod packets; mod record_impl; +mod store_item; pub mod subjects; pub mod types; pub use db_item::*; +pub use store_item::*; pub use subjects::*; pub use types::*; diff --git a/crates/fuel-streams-domains/src/utxos/record_impl.rs b/crates/fuel-streams-domains/src/utxos/record_impl.rs index b94465e6..eee1e1de 100644 --- a/crates/fuel-streams-domains/src/utxos/record_impl.rs +++ b/crates/fuel-streams-domains/src/utxos/record_impl.rs @@ -5,7 +5,7 @@ use fuel_streams_store::{ }; use sqlx::PgExecutor; -use super::{Utxo, UtxoDbItem}; +use super::{Utxo, UtxoDbItem, UtxoStoreItem}; impl DataEncoder for Utxo { type Err = DbError; @@ -14,6 +14,7 @@ impl DataEncoder for Utxo { #[async_trait] impl Record for Utxo { type DbItem = UtxoDbItem; + type StoreItem = UtxoStoreItem; const ENTITY: RecordEntity = RecordEntity::Utxo; const ORDER_PROPS: &'static [&'static str] = diff --git a/crates/fuel-streams-domains/src/utxos/store_item.rs b/crates/fuel-streams-domains/src/utxos/store_item.rs new file mode 100644 index 00000000..3ae37160 --- /dev/null +++ b/crates/fuel-streams-domains/src/utxos/store_item.rs @@ -0,0 +1,54 @@ +use std::cmp::Ordering; + +use fuel_streams_store::{ + db::{DbError, StoreItem}, + record::{DataEncoder, RecordEntity}, +}; +use serde::{Deserialize, Serialize}; + +#[derive( + Debug, Clone, Serialize, Deserialize, PartialEq, Eq, sqlx::FromRow, +)] +pub struct UtxoStoreItem { + pub subject: String, + pub value: Vec, + pub block_height: i64, + pub tx_index: i64, + pub input_index: i64, +} + +impl DataEncoder for UtxoStoreItem { + type Err = DbError; +} + +impl StoreItem for UtxoStoreItem { + fn entity(&self) -> &RecordEntity { + &RecordEntity::Utxo + } + + fn encoded_value(&self) -> &[u8] { + &self.value + } + + fn subject_str(&self) -> String { + self.subject.clone() + } +} + +impl PartialOrd for UtxoStoreItem { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for UtxoStoreItem { + fn cmp(&self, other: &Self) -> Ordering { + // Order by block height first + self.block_height + .cmp(&other.block_height) + // Then by transaction index within the block + .then(self.tx_index.cmp(&other.tx_index)) + // Finally by input index + .then(self.input_index.cmp(&other.input_index)) + } +} diff --git a/crates/fuel-streams-macros/src/lib.rs b/crates/fuel-streams-macros/src/lib.rs index 9c11bc98..8ec40feb 100644 --- a/crates/fuel-streams-macros/src/lib.rs +++ b/crates/fuel-streams-macros/src/lib.rs @@ -54,7 +54,8 @@ pub mod subject { fn id(&self) -> &'static str; fn parse(&self) -> String; fn wildcard(&self) -> &'static str; - fn to_sql_where(&self) -> String; + fn to_sql_where(&self) -> Option; + fn to_sql_select(&self) -> Option; fn schema(&self) -> Schema; } impl_downcast!(IntoSubject); diff --git a/crates/fuel-streams-macros/subject-derive/src/into_subject.rs b/crates/fuel-streams-macros/subject-derive/src/into_subject.rs index da5c59f8..becc86d4 100644 --- a/crates/fuel-streams-macros/subject-derive/src/into_subject.rs +++ b/crates/fuel-streams-macros/subject-derive/src/into_subject.rs @@ -50,13 +50,44 @@ pub fn to_sql_where_fn(fields: &[FieldInfo]) -> TokenStream { .collect(); quote! { - fn to_sql_where(&self) -> String { + fn to_sql_where(&self) -> Option { let mut conditions = Vec::new(); #(#conditions)* if conditions.is_empty() { - "TRUE".to_string() + None + } else { + Some(conditions.join(" AND ")) + } + } + } +} + +pub fn to_sql_select_fn(fields: &[FieldInfo]) -> TokenStream { + let column_names: Vec = fields + .iter() + .map(|field| { + let name = field.ident; + let column_name = match &field.attributes.sql_column { + Some(val) => val.clone(), + None => name.to_string(), + }; + + quote! { + if self.#name.is_some() { + columns.push(#column_name.to_string()); + } + } + }) + .collect(); + + quote! { + fn to_sql_select(&self) -> Option { + let mut columns = Vec::new(); + #(#column_names)* + if columns.is_empty() { + None } else { - conditions.join(" AND ") + Some(columns.join(", ")) } } } diff --git a/crates/fuel-streams-macros/subject-derive/src/lib.rs b/crates/fuel-streams-macros/subject-derive/src/lib.rs index 7ad15c76..8ad71acd 100644 --- a/crates/fuel-streams-macros/subject-derive/src/lib.rs +++ b/crates/fuel-streams-macros/subject-derive/src/lib.rs @@ -21,6 +21,7 @@ pub fn subject_derive(input: TokenStream) -> TokenStream { let wildcard_fn = into_subject::wildcard_fn(); let parse_fn = into_subject::parse_fn(&input, &field_names); let to_sql_where_fn = into_subject::to_sql_where_fn(&field_infos); + let to_sql_select_fn = into_subject::to_sql_select_fn(&field_infos); let from_json_fn = into_subject::from_json_fn(&field_names); let to_json_fn = into_subject::to_json_fn(); let schema_fn = into_subject::schema_fn(&input, &field_names, &field_types); @@ -43,6 +44,7 @@ pub fn subject_derive(input: TokenStream) -> TokenStream { #parse_fn #wildcard_fn #to_sql_where_fn + #to_sql_select_fn #schema_fn } diff --git a/crates/fuel-streams-macros/tests/subject-derive.rs b/crates/fuel-streams-macros/tests/subject-derive.rs index 05665d2f..9a66e3bd 100644 --- a/crates/fuel-streams-macros/tests/subject-derive.rs +++ b/crates/fuel-streams-macros/tests/subject-derive.rs @@ -66,7 +66,10 @@ fn subject_derive_sql_where_exact_match() { assert_eq!(subject.parse(), "test.foo.55.bar"); assert_eq!( subject.to_sql_where(), - "field_id1 = 'foo' AND field_id2 = '55' AND field_id3 = 'bar'" + Some( + "field_id1 = 'foo' AND field_id2 = '55' AND field_id3 = 'bar'" + .to_string() + ) ); } @@ -81,7 +84,7 @@ fn subject_derive_sql_where_wildcards() { assert_eq!(subject.parse(), "test.*.55.bar"); assert_eq!( subject.to_sql_where(), - "field_id2 = '55' AND field_id3 = 'bar'" + Some("field_id2 = '55' AND field_id3 = 'bar'".to_string()) ); } @@ -95,7 +98,7 @@ fn subject_derive_sql_where_greater_than() { assert_eq!( subject.to_sql_where(), - "field_id1 = 'foo' AND field_id3 = 'bar'" + Some("field_id1 = 'foo' AND field_id3 = 'bar'".to_string()) ); } @@ -108,15 +111,15 @@ fn subject_derive_sql_where_table_only() { }; assert_eq!(subject.parse(), "test.>"); - assert_eq!(subject.to_sql_where(), "TRUE"); + assert_eq!(subject.to_sql_where(), None); let subject2 = TestSubject::default(); assert_eq!(subject2.parse(), "test.>"); - assert_eq!(subject2.to_sql_where(), "TRUE"); + assert_eq!(subject2.to_sql_where(), None); let subject3 = TestSubject::new(); assert_eq!(subject3.parse(), "test.>"); - assert_eq!(subject3.to_sql_where(), "TRUE"); + assert_eq!(subject3.to_sql_where(), None); } #[test] @@ -252,3 +255,32 @@ fn subject_derive_schema() { assert_eq!(schema, expected_schema); } + +#[test] +fn subject_derive_sql_select() { + // Test with all fields + let subject = TestSubject { + field1: Some("foo".to_string()), + field2: Some(55), + field3: Some("bar".to_string()), + }; + assert_eq!( + subject.to_sql_select(), + Some("field_id1, field_id2, field_id3".to_string()) + ); + + // Test with partial fields + let subject = TestSubject { + field1: Some("foo".to_string()), + field2: None, + field3: Some("bar".to_string()), + }; + assert_eq!( + subject.to_sql_select(), + Some("field_id1, field_id3".to_string()) + ); + + // Test with no fields + let subject = TestSubject::default(); + assert_eq!(subject.to_sql_select(), None); +} diff --git a/crates/fuel-streams-store/src/db/db_item.rs b/crates/fuel-streams-store/src/db/db_item.rs index ac580006..7163c269 100644 --- a/crates/fuel-streams-store/src/db/db_item.rs +++ b/crates/fuel-streams-store/src/db/db_item.rs @@ -12,8 +12,28 @@ pub trait DbItem: + std::fmt::Debug + PartialEq + Eq + + Send + + Sync + + Sized + + serde::Serialize + + serde::de::DeserializeOwned + + for<'r> sqlx::FromRow<'r, PgRow> + + 'static +{ + fn entity(&self) -> &RecordEntity; + fn encoded_value(&self) -> &[u8]; + fn subject_str(&self) -> String; +} + +#[async_trait] +pub trait StoreItem: + DataEncoder + + Unpin + + std::fmt::Debug + + PartialEq + Ord + PartialOrd + + Eq + Send + Sync + Sized diff --git a/crates/fuel-streams-store/src/record/query_options.rs b/crates/fuel-streams-store/src/record/query_options.rs index 89182727..be676e23 100644 --- a/crates/fuel-streams-store/src/record/query_options.rs +++ b/crates/fuel-streams-store/src/record/query_options.rs @@ -3,6 +3,7 @@ pub struct QueryOptions { pub offset: i64, pub limit: i64, pub from_block: Option, + pub to_block: Option, pub namespace: Option, } impl Default for QueryOptions { @@ -11,6 +12,7 @@ impl Default for QueryOptions { offset: 0, limit: 100, from_block: None, + to_block: None, namespace: None, } } @@ -36,4 +38,8 @@ impl QueryOptions { pub fn increment_offset(&mut self) { self.offset += self.limit; } + pub fn with_to_block(mut self, to_block: Option) -> Self { + self.to_block = to_block; + self + } } diff --git a/crates/fuel-streams-store/src/record/record_impl.rs b/crates/fuel-streams-store/src/record/record_impl.rs index a95904c6..2e2978a8 100644 --- a/crates/fuel-streams-store/src/record/record_impl.rs +++ b/crates/fuel-streams-store/src/record/record_impl.rs @@ -6,7 +6,7 @@ use fuel_streams_macros::subject::IntoSubject; use sqlx::{PgConnection, PgExecutor, Postgres, QueryBuilder}; use super::{QueryOptions, RecordEntity, RecordPacket}; -use crate::db::{Db, DbError, DbItem, DbResult}; +use crate::db::{Db, DbError, DbItem, DbResult, StoreItem}; pub trait RecordEncoder: DataEncoder {} impl> RecordEncoder for T {} @@ -17,6 +17,7 @@ pub type DbConnection = PgConnection; #[async_trait] pub trait Record: RecordEncoder + 'static { type DbItem: DbItem; + type StoreItem: StoreItem; const ENTITY: RecordEntity; const ORDER_PROPS: &'static [&'static str]; @@ -43,8 +44,12 @@ pub trait Record: RecordEncoder + 'static { RecordPacket::new(subject.to_owned(), value) } - async fn from_db_item(record: &Self::DbItem) -> DbResult { - Self::decode(record.encoded_value()).await + fn from_db_item(record: &Self::DbItem) -> DbResult { + Self::decode_json(record.encoded_value()) + } + + fn from_store_item(item: &Self::StoreItem) -> DbResult { + Self::decode_json(item.encoded_value()) } fn build_find_many_query( @@ -52,23 +57,41 @@ pub trait Record: RecordEncoder + 'static { options: QueryOptions, ) -> QueryBuilder<'static, Postgres> { let mut query_builder: QueryBuilder = QueryBuilder::default(); - let select = format!("SELECT * FROM {}", Self::ENTITY.table_name()); - query_builder.push(select); - query_builder.push(" WHERE "); - query_builder.push(subject.to_sql_where()); + query_builder.push("SELECT "); + query_builder.push(Self::build_select_fields()); + query_builder.push(" FROM "); + query_builder.push(Self::ENTITY.table_name()); + + let mut conditions = Vec::new(); + + // Add subject conditions if any + if let Some(where_clause) = subject.to_sql_where() { + conditions.push(where_clause); + } + + // Add block conditions if let Some(block) = options.from_block { - query_builder.push(" AND block_height >= "); - query_builder.push_bind(block as i64); + conditions.push(format!("block_height >= {}", block)); } + if let Some(block) = options.to_block { + conditions.push(format!("block_height < {}", block)); + } + + // Add namespace condition for tests if cfg!(any(test, feature = "test-helpers")) { if let Some(ns) = options.namespace { - query_builder.push(" AND subject LIKE "); - query_builder.push_bind(format!("{}%", ns)); + conditions.push(format!("subject LIKE '{ns}%'")); } } + // Add WHERE clause if we have any conditions + if !conditions.is_empty() { + query_builder.push(" WHERE "); + query_builder.push(conditions.join(" AND ")); + } + query_builder.push(" ORDER BY "); query_builder.push(Self::ORDER_PROPS.join(", ")); query_builder.push(" ASC LIMIT "); @@ -78,6 +101,20 @@ pub trait Record: RecordEncoder + 'static { query_builder } + fn build_select_fields() -> String { + let mut select_fields = vec![ + "_id".to_string(), + "subject".to_string(), + "value".to_string(), + ]; + + // Add order fields first + let order_fields: Vec = + Self::ORDER_PROPS.iter().map(|&s| s.to_string()).collect(); + select_fields.extend(order_fields); + select_fields.join(", ") + } + async fn find_last_record( db: &Db, options: QueryOptions, diff --git a/crates/fuel-streams-store/src/store/store_impl.rs b/crates/fuel-streams-store/src/store/store_impl.rs index 21e2d14c..aaeb9b2d 100644 --- a/crates/fuel-streams-store/src/store/store_impl.rs +++ b/crates/fuel-streams-store/src/store/store_impl.rs @@ -60,10 +60,12 @@ impl Store { &self, subject: &Arc, mut options: QueryOptions, - ) -> StoreResult> { + ) -> StoreResult> { options = options.with_namespace(self.namespace.clone()); - R::build_find_many_query(subject.clone(), options.clone()) - .build_query_as::() + let mut query = + R::build_find_many_query(subject.clone(), options.clone()); + query + .build_query_as::() .fetch_all(&self.db.pool) .await .map_err(StoreError::from) @@ -73,7 +75,7 @@ impl Store { &self, subject: &Arc, from_block: Option, - ) -> BoxStream<'static, Result> { + ) -> BoxStream<'static, Result> { let db = Arc::clone(&self.db); let namespace = self.namespace.clone(); let subject = subject.clone(); @@ -84,7 +86,7 @@ impl Store { .with_limit(*config::STORE_PAGINATION_LIMIT); let mut query = R::build_find_many_query(subject, options.clone()); let mut stream = query - .build_query_as::() + .build_query_as::() .fetch(&db.pool); while let Some(result) = stream.try_next().await? { yield Ok(result); diff --git a/tests/tests/services/consumer.rs b/tests/tests/services/consumer.rs index 66c5182b..62a7ca37 100644 --- a/tests/tests/services/consumer.rs +++ b/tests/tests/services/consumer.rs @@ -11,7 +11,7 @@ use fuel_streams_core::{ TransactionsSubject, UtxosSubject, }, - types::BlockHeight, + types::{BlockHeight, Transaction}, FuelStreams, }; use fuel_streams_domains::{MockMsgPayload, MsgPayload}; @@ -64,8 +64,13 @@ async fn verify_transactions( .into_iter() .map(|id| id.to_string()) .collect(); - let actual_tx_ids: Vec = - transactions.iter().map(|tx| tx.tx_id.clone()).collect(); + let actual_tx_ids: Vec = transactions + .iter() + .map(|tx| { + let decoded = Transaction::decode_json(&tx.value).unwrap(); + decoded.id.to_string() + }) + .collect(); assert_eq!( actual_tx_ids.len(), diff --git a/tests/tests/store/blocks.rs b/tests/tests/store/blocks.rs index b6e56a0c..66365c07 100644 --- a/tests/tests/store/blocks.rs +++ b/tests/tests/store/blocks.rs @@ -39,7 +39,7 @@ async fn store_can_record_blocks() -> anyhow::Result<()> { let db_record: BlockDbItem = store.insert_record(&packet).await?; assert_eq!(db_record.subject, packet.subject_str()); - assert_eq!(Block::from_db_item(&db_record).await?, block); + assert_eq!(Block::from_db_item(&db_record)?, block); Ok(()) } diff --git a/tests/tests/store/mod.rs b/tests/tests/store/mod.rs index ca8d2285..ccdd6709 100644 --- a/tests/tests/store/mod.rs +++ b/tests/tests/store/mod.rs @@ -2,6 +2,7 @@ mod blocks; mod inputs; mod outputs; mod pattern_matching; +mod query_builder; mod receipts; mod record; mod transactions; diff --git a/tests/tests/store/query_builder.rs b/tests/tests/store/query_builder.rs new file mode 100644 index 00000000..574237d7 --- /dev/null +++ b/tests/tests/store/query_builder.rs @@ -0,0 +1,184 @@ +use std::sync::Arc; + +use fuel_streams_core::{ + inputs::{InputsCoinSubject, InputsContractSubject, InputsMessageSubject}, + subjects::*, + types::{Block, Input}, +}; +use fuel_streams_domains::blocks::subjects::BlocksSubject; +use fuel_streams_store::record::{QueryOptions, Record}; +use fuel_streams_types::{Address, AssetId, ContractId, TxId}; +use pretty_assertions::assert_eq; + +#[test] +fn test_query_builder() { + let subject = Arc::new( + BlocksSubject::new() + .with_height(Some(50.into())) + .with_producer(Some(Address::default())), + ); + + let options = QueryOptions { + offset: 0, + limit: 10, + from_block: Some(100), + to_block: Some(200), + namespace: Some("test_ns".to_string()), + }; + + let sql_statement = subject.to_sql_select(); + let sql_where = subject.to_sql_where(); + assert_eq!( + sql_statement, + Some("producer_address, block_height".to_string()) + ); + assert_eq!( + sql_where, + Some(format!( + "producer_address = '{}' AND block_height = '50'", + Address::default() + )) + ); + + let query = Block::build_find_many_query(subject, options); + let sql = query.sql(); + + assert_eq!( + sql, + format!( + r#"SELECT _id, subject, value, block_height FROM blocks WHERE producer_address = '{}' AND block_height = '50' AND block_height >= 100 AND block_height < 200 AND subject LIKE 'test_ns%' ORDER BY block_height ASC LIMIT $1 OFFSET $2"#, + Address::default() + ) + ); +} + +#[test] +fn test_query_builder_with_no_subject_fields() { + let subject = Arc::new(BlocksSubject::new()); + + let options = QueryOptions::default(); + let query = Block::build_find_many_query(subject, options); + let sql = query.sql(); + + assert_eq!( + sql, + r#"SELECT _id, subject, value, block_height FROM blocks ORDER BY block_height ASC LIMIT $1 OFFSET $2"# + ); +} + +#[test] +fn test_query_builder_coin_input() { + let tx_id = TxId::default(); + let subject = Arc::new(InputsCoinSubject { + block_height: Some(100.into()), + tx_id: Some(tx_id), + tx_index: Some(1), + input_index: Some(2), + owner: Some(Address::default()), + asset: Some(AssetId::default()), + }); + + let options = QueryOptions { + offset: 0, + limit: 20, + from_block: Some(50), + to_block: Some(150), + namespace: Some("test_ns".to_string()), + }; + + let query = Input::build_find_many_query(subject, options); + let sql = query.sql(); + + assert_eq!( + sql, + format!( + r#"SELECT _id, subject, value, block_height, tx_index, input_index FROM inputs WHERE block_height = '100' AND tx_id = '{}' AND tx_index = '1' AND input_index = '2' AND owner_id = '{}' AND asset_id = '{}' AND block_height >= 50 AND block_height < 150 AND subject LIKE 'test_ns%' ORDER BY block_height, tx_index, input_index ASC LIMIT $1 OFFSET $2"#, + TxId::default(), + Address::default(), + AssetId::default(), + ) + ); +} + +#[test] +fn test_query_builder_contract_input() { + let contract_id = ContractId::default(); + let subject = Arc::new(InputsContractSubject { + block_height: Some(100.into()), + tx_id: None, + tx_index: None, + input_index: None, + contract: Some(contract_id.clone()), + }); + + let options = QueryOptions::default(); + let query = Input::build_find_many_query(subject, options); + let sql = query.sql(); + + assert_eq!( + sql, + format!( + r#"SELECT _id, subject, value, block_height, tx_index, input_index FROM inputs WHERE block_height = '100' AND contract_id = '{}' ORDER BY block_height, tx_index, input_index ASC LIMIT $1 OFFSET $2"#, + contract_id, + ) + ); +} + +#[test] +fn test_query_builder_message_input() { + let sender = Address::default(); + let subject = Arc::new(InputsMessageSubject { + block_height: None, + tx_id: None, + tx_index: None, + input_index: None, + sender: Some(sender.clone()), + recipient: None, + }); + + let options = QueryOptions::default(); + let query = Input::build_find_many_query(subject, options); + let sql = query.sql(); + + assert_eq!( + sql, + format!( + r#"SELECT _id, subject, value, block_height, tx_index, input_index FROM inputs WHERE sender_address = '{}' ORDER BY block_height, tx_index, input_index ASC LIMIT $1 OFFSET $2"#, + sender, + ) + ); +} + +#[test] +fn test_query_builder_empty_subject() { + let subject = Arc::new(InputsCoinSubject::new()); + let options = QueryOptions::default(); + + let query = Input::build_find_many_query(subject, options); + let sql = query.sql(); + + assert_eq!( + sql, + r#"SELECT _id, subject, value, block_height, tx_index, input_index FROM inputs ORDER BY block_height, tx_index, input_index ASC LIMIT $1 OFFSET $2"# + ); +} + +#[test] +fn test_query_builder_only_block_range() { + let subject = Arc::new(InputsMessageSubject::new()); + let options = QueryOptions { + offset: 0, + limit: 50, + from_block: Some(100), + to_block: Some(200), + namespace: None, + }; + + let query = Input::build_find_many_query(subject, options); + let sql = query.sql(); + + assert_eq!( + sql, + r#"SELECT _id, subject, value, block_height, tx_index, input_index FROM inputs WHERE block_height >= 100 AND block_height < 200 ORDER BY block_height, tx_index, input_index ASC LIMIT $1 OFFSET $2"# + ); +} diff --git a/tests/tests/store/record.rs b/tests/tests/store/record.rs index 0fe68209..f032fb1d 100644 --- a/tests/tests/store/record.rs +++ b/tests/tests/store/record.rs @@ -24,8 +24,8 @@ async fn test_multiple_inserts() -> anyhow::Result<()> { let db_record2 = db_items.get(1).unwrap(); let block1 = &blocks.first().unwrap().1; let block2 = &blocks.get(1).unwrap().1; - assert_eq!(&Block::from_db_item(db_record1).await?, block1); - assert_eq!(&Block::from_db_item(db_record2).await?, block2); + assert_eq!(&Block::from_db_item(db_record1)?, block1); + assert_eq!(&Block::from_db_item(db_record2)?, block2); // Verify both records are found let subject = BlocksSubject::new().with_height(None).dyn_arc(); @@ -54,7 +54,7 @@ async fn test_find_many_by_subject() -> anyhow::Result<()> { .find_many_by_subject(&subject1, QueryOptions::default()) .await?; assert_eq!(records.len(), 1); - assert_eq!(&Block::from_db_item(&records[0]).await?, block1); + assert_eq!(&Block::from_store_item(&records[0])?, block1); // Test finding by subject2 let subject2 = BlocksSubject::build(None, Some(2.into())).dyn_arc(); @@ -62,7 +62,7 @@ async fn test_find_many_by_subject() -> anyhow::Result<()> { .find_many_by_subject(&subject2, QueryOptions::default()) .await?; assert_eq!(records.len(), 1); - assert_eq!(&Block::from_db_item(&records[0]).await?, block2); + assert_eq!(&Block::from_store_item(&records[0])?, block2); Ok(()) } @@ -81,7 +81,7 @@ async fn test_find_last_record() -> anyhow::Result<()> { // Test finding last record let last_record = store.find_last_record().await?; assert!(last_record.is_some()); - let last_block = Block::from_db_item(&last_record.unwrap()).await?; + let last_block = Block::from_db_item(&last_record.unwrap())?; assert_eq!(&last_block, block4); Ok(()) @@ -123,7 +123,7 @@ async fn test_insert_with_transaction() -> anyhow::Result<()> { // Verify the records match the original blocks for (record, item) in found_records.iter().zip(blocks.iter()) { let (_, block) = item; - assert_eq!(&Block::from_db_item(record).await?, block); + assert_eq!(&Block::from_store_item(record)?, block); } Ok(())