Skip to content

Commit

Permalink
refactor(store): Improve database performance dynamic queries (#381)
Browse files Browse the repository at this point in the history
* feat(macros): add to_sql_select for subjects

* refactor(store): Improve database performance on queries

* fix(store): pagination logic
  • Loading branch information
pedronauck authored Jan 18, 2025
1 parent 3a81ed6 commit 7f32333
Show file tree
Hide file tree
Showing 38 changed files with 698 additions and 157 deletions.
6 changes: 3 additions & 3 deletions crates/fuel-streams-core/src/stream/stream_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub use async_nats::Subscriber as StreamLiveSubscriber;
use fuel_message_broker::MessageBroker;
use fuel_streams_macros::subject::IntoSubject;
use fuel_streams_store::{
db::{Db, DbItem},
db::{Db, StoreItem},
record::Record,
store::Store,
};
Expand All @@ -17,8 +17,8 @@ use tokio::{sync::OnceCell, time::sleep};
use super::{config, StreamError};
use crate::server::DeliverPolicy;

pub type BoxedStreamItem = Result<(String, Vec<u8>), StreamError>;
pub type BoxedStream = Box<dyn FStream<Item = BoxedStreamItem> + Send + Unpin>;
pub type BoxedStoreItem = Result<(String, Vec<u8>), StreamError>;
pub type BoxedStream = Box<dyn FStream<Item = BoxedStoreItem> + Send + Unpin>;

#[derive(Debug, Clone)]
pub struct Stream<S: Record> {
Expand Down
14 changes: 0 additions & 14 deletions crates/fuel-streams-domains/src/blocks/db_item.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::cmp::Ordering;

use fuel_streams_store::{
db::{DbError, DbItem},
record::{DataEncoder, RecordEntity, RecordPacket, RecordPacketError},
Expand Down Expand Up @@ -36,18 +34,6 @@ impl DbItem for BlockDbItem {
}
}

impl PartialOrd for BlockDbItem {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for BlockDbItem {
fn cmp(&self, other: &Self) -> Ordering {
self.block_height.cmp(&other.block_height)
}
}

impl TryFrom<&RecordPacket> for BlockDbItem {
type Error = RecordPacketError;
fn try_from(packet: &RecordPacket) -> Result<Self, Self::Error> {
Expand Down
2 changes: 2 additions & 0 deletions crates/fuel-streams-domains/src/blocks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
mod db_item;
mod packets;
mod record_impl;
mod store_item;
pub mod subjects;
pub mod types;

pub use db_item::*;
pub use store_item::*;
pub use subjects::*;
pub use types::*;
4 changes: 3 additions & 1 deletion crates/fuel-streams-domains/src/blocks/record_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use fuel_streams_store::{
};
use sqlx::PgExecutor;

use super::{Block, BlockDbItem};
use super::{Block, BlockDbItem, BlockStoreItem};

impl DataEncoder for Block {
type Err = DbError;
Expand All @@ -14,6 +14,8 @@ impl DataEncoder for Block {
#[async_trait]
impl Record for Block {
type DbItem = BlockDbItem;
type StoreItem = BlockStoreItem;

const ENTITY: RecordEntity = RecordEntity::Block;
const ORDER_PROPS: &'static [&'static str] = &["block_height"];

Expand Down
46 changes: 46 additions & 0 deletions crates/fuel-streams-domains/src/blocks/store_item.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use std::cmp::Ordering;

use fuel_streams_store::{
db::{DbError, StoreItem},
record::{DataEncoder, RecordEntity},
};
use serde::{Deserialize, Serialize};

#[derive(
Debug, Clone, Serialize, Deserialize, PartialEq, Eq, sqlx::FromRow,
)]
pub struct BlockStoreItem {
pub subject: String,
pub value: Vec<u8>,
pub block_height: i64, // This is our order prop
}

impl DataEncoder for BlockStoreItem {
type Err = DbError;
}

impl StoreItem for BlockStoreItem {
fn entity(&self) -> &RecordEntity {
&RecordEntity::Block
}

fn encoded_value(&self) -> &[u8] {
&self.value
}

fn subject_str(&self) -> String {
self.subject.clone()
}
}

impl PartialOrd for BlockStoreItem {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for BlockStoreItem {
fn cmp(&self, other: &Self) -> Ordering {
self.block_height.cmp(&other.block_height)
}
}
20 changes: 0 additions & 20 deletions crates/fuel-streams-domains/src/inputs/db_item.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::cmp::Ordering;

use fuel_streams_store::{
db::{DbError, DbItem},
record::{DataEncoder, RecordEntity, RecordPacket, RecordPacketError},
Expand Down Expand Up @@ -44,24 +42,6 @@ impl DbItem for InputDbItem {
}
}

impl PartialOrd for InputDbItem {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for InputDbItem {
fn cmp(&self, other: &Self) -> Ordering {
// Order by block height first
self.block_height
.cmp(&other.block_height)
// Then by transaction index within the block
.then(self.tx_index.cmp(&other.tx_index))
// Finally by input index within the transaction
.then(self.input_index.cmp(&other.input_index))
}
}

impl TryFrom<&RecordPacket> for InputDbItem {
type Error = RecordPacketError;
fn try_from(packet: &RecordPacket) -> Result<Self, Self::Error> {
Expand Down
2 changes: 2 additions & 0 deletions crates/fuel-streams-domains/src/inputs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
mod db_item;
mod packets;
mod record_impl;
mod store_item;
pub mod subjects;
pub mod types;

pub use db_item::*;
pub use store_item::*;
pub use subjects::*;
pub use types::*;
3 changes: 2 additions & 1 deletion crates/fuel-streams-domains/src/inputs/record_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use fuel_streams_store::{
};
use sqlx::PgExecutor;

use super::{Input, InputDbItem};
use super::{Input, InputDbItem, InputStoreItem};

impl DataEncoder for Input {
type Err = DbError;
Expand All @@ -14,6 +14,7 @@ impl DataEncoder for Input {
#[async_trait]
impl Record for Input {
type DbItem = InputDbItem;
type StoreItem = InputStoreItem;

const ENTITY: RecordEntity = RecordEntity::Input;
const ORDER_PROPS: &'static [&'static str] =
Expand Down
54 changes: 54 additions & 0 deletions crates/fuel-streams-domains/src/inputs/store_item.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::cmp::Ordering;

use fuel_streams_store::{
db::{DbError, StoreItem},
record::{DataEncoder, RecordEntity},
};
use serde::{Deserialize, Serialize};

#[derive(
Debug, Clone, Serialize, Deserialize, PartialEq, Eq, sqlx::FromRow,
)]
pub struct InputStoreItem {
pub subject: String,
pub value: Vec<u8>,
pub block_height: i64,
pub tx_index: i64,
pub input_index: i64,
}

impl DataEncoder for InputStoreItem {
type Err = DbError;
}

impl StoreItem for InputStoreItem {
fn entity(&self) -> &RecordEntity {
&RecordEntity::Input
}

fn encoded_value(&self) -> &[u8] {
&self.value
}

fn subject_str(&self) -> String {
self.subject.clone()
}
}

impl PartialOrd for InputStoreItem {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for InputStoreItem {
fn cmp(&self, other: &Self) -> Ordering {
// Order by block height first
self.block_height
.cmp(&other.block_height)
// Then by transaction index within the block
.then(self.tx_index.cmp(&other.tx_index))
// Finally by input index within the transaction
.then(self.input_index.cmp(&other.input_index))
}
}
20 changes: 0 additions & 20 deletions crates/fuel-streams-domains/src/outputs/db_item.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::cmp::Ordering;

use fuel_streams_store::{
db::{DbError, DbItem},
record::{DataEncoder, RecordEntity, RecordPacket, RecordPacketError},
Expand Down Expand Up @@ -42,24 +40,6 @@ impl DbItem for OutputDbItem {
}
}

impl PartialOrd for OutputDbItem {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for OutputDbItem {
fn cmp(&self, other: &Self) -> Ordering {
// Order by block height first
self.block_height
.cmp(&other.block_height)
// Then by transaction index within the block
.then(self.tx_index.cmp(&other.tx_index))
// Finally by output index within the transaction
.then(self.output_index.cmp(&other.output_index))
}
}

impl TryFrom<&RecordPacket> for OutputDbItem {
type Error = RecordPacketError;
fn try_from(packet: &RecordPacket) -> Result<Self, Self::Error> {
Expand Down
2 changes: 2 additions & 0 deletions crates/fuel-streams-domains/src/outputs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
mod db_item;
mod packets;
mod record_impl;
mod store_item;
pub mod subjects;
pub mod types;

pub use db_item::*;
pub use store_item::*;
pub use subjects::*;
pub use types::*;
3 changes: 2 additions & 1 deletion crates/fuel-streams-domains/src/outputs/record_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use fuel_streams_store::{
};
use sqlx::PgExecutor;

use super::{Output, OutputDbItem};
use super::{Output, OutputDbItem, OutputStoreItem};

impl DataEncoder for Output {
type Err = DbError;
Expand All @@ -14,6 +14,7 @@ impl DataEncoder for Output {
#[async_trait]
impl Record for Output {
type DbItem = OutputDbItem;
type StoreItem = OutputStoreItem;

const ENTITY: RecordEntity = RecordEntity::Output;
const ORDER_PROPS: &'static [&'static str] =
Expand Down
54 changes: 54 additions & 0 deletions crates/fuel-streams-domains/src/outputs/store_item.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::cmp::Ordering;

use fuel_streams_store::{
db::{DbError, StoreItem},
record::{DataEncoder, RecordEntity},
};
use serde::{Deserialize, Serialize};

#[derive(
Debug, Clone, Serialize, Deserialize, PartialEq, Eq, sqlx::FromRow,
)]
pub struct OutputStoreItem {
pub subject: String,
pub value: Vec<u8>,
pub block_height: i64,
pub tx_index: i64,
pub output_index: i64,
}

impl DataEncoder for OutputStoreItem {
type Err = DbError;
}

impl StoreItem for OutputStoreItem {
fn entity(&self) -> &RecordEntity {
&RecordEntity::Output
}

fn encoded_value(&self) -> &[u8] {
&self.value
}

fn subject_str(&self) -> String {
self.subject.clone()
}
}

impl PartialOrd for OutputStoreItem {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for OutputStoreItem {
fn cmp(&self, other: &Self) -> Ordering {
// Order by block height first
self.block_height
.cmp(&other.block_height)
// Then by transaction index within the block
.then(self.tx_index.cmp(&other.tx_index))
// Finally by output index within the transaction
.then(self.output_index.cmp(&other.output_index))
}
}
20 changes: 0 additions & 20 deletions crates/fuel-streams-domains/src/receipts/db_item.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::cmp::Ordering;

use fuel_streams_store::{
db::{DbError, DbItem},
record::{DataEncoder, RecordEntity, RecordPacket, RecordPacketError},
Expand Down Expand Up @@ -47,24 +45,6 @@ impl DbItem for ReceiptDbItem {
}
}

impl PartialOrd for ReceiptDbItem {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for ReceiptDbItem {
fn cmp(&self, other: &Self) -> Ordering {
// Order by block height first
self.block_height
.cmp(&other.block_height)
// Then by transaction index within the block
.then(self.tx_index.cmp(&other.tx_index))
// Finally by receipt index within the transaction
.then(self.receipt_index.cmp(&other.receipt_index))
}
}

impl TryFrom<&RecordPacket> for ReceiptDbItem {
type Error = RecordPacketError;
fn try_from(packet: &RecordPacket) -> Result<Self, Self::Error> {
Expand Down
2 changes: 2 additions & 0 deletions crates/fuel-streams-domains/src/receipts/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
mod db_item;
mod packets;
mod record_impl;
mod store_item;
pub mod subjects;
pub mod types;

pub use db_item::*;
pub use store_item::*;
pub use subjects::*;
pub use types::*;
Loading

0 comments on commit 7f32333

Please sign in to comment.