Skip to content

Commit

Permalink
fix(webserver): Never panic when sending wrong data format (#380)
Browse files Browse the repository at this point in the history
* feat(macros): add entity for subjects

* feat(macros): add schema() for subjects

* feat(repo): add subjects schema crate

* fix(webserver): dont throw when wrong subjects

* feat(webserver): change response payload
  • Loading branch information
pedronauck authored Jan 18, 2025
1 parent b2cf435 commit 2f0ff1d
Show file tree
Hide file tree
Showing 48 changed files with 448 additions and 569 deletions.
5 changes: 5 additions & 0 deletions .github/actions/setup-rust/action.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ runs:
target: ${{ inputs.target }}
components: clippy, rustfmt

- name: Install Rustfmt
uses: taiki-e/cache-cargo-install-action@v2
with:
tool: [email protected]

- uses: taiki-e/cache-cargo-install-action@v2
with:
tool: [email protected]
Expand Down
31 changes: 12 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
[workspace]
default-members = ["crates/fuel-streams"]
resolver = "2"
members = ["crates/*", "crates/fuel-streams-macros/subject-derive", "examples", "tests"]
members = [
"crates/*",
"crates/fuel-streams-macros/subject-derive",
"examples",
"scripts/subjects-schema",
"scripts/subjects-schema",
"tests",
]

[workspace.package]
authors = ["Fuel Labs <[email protected]>"]
Expand Down
11 changes: 11 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -347,3 +347,14 @@ cluster-down: pre-cluster
CLUSTER_MODE=$(MODE) tilt --file ./Tiltfile down

cluster-reset: cluster-down cluster-up

# ------------------------------------------------------------
# Subjects Schema
# ------------------------------------------------------------

subjects-schema:
@echo "Generating subjects schema..."
@cd scripts/subjects-schema && cargo run
@cat scripts/subjects-schema/schema.json | pbcopy
@echo "Subjects schema copied to clipboard"
@rm -rf scripts/subjects-schema/schema.json
7 changes: 5 additions & 2 deletions crates/fuel-message-broker/src/msg_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,11 @@ pub type MessageBlockStream = Box<
+ Unpin,
>;

pub type MessageStream =
Box<dyn Stream<Item = Result<Vec<u8>, MessageBrokerError>> + Send + Unpin>;
pub type MessageStream = Box<
dyn Stream<Item = Result<(String, Vec<u8>), MessageBrokerError>>
+ Send
+ Unpin,
>;

#[async_trait]
pub trait MessageBroker: std::fmt::Debug + Send + Sync + 'static {
Expand Down
5 changes: 3 additions & 2 deletions crates/fuel-message-broker/src/nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ impl MessageBroker for NatsMessageBroker {
.subscribe(subject)
.await
.map_err(|e| MessageBrokerError::Subscription(e.to_string()))?
.map(|msg| Ok(msg.payload.to_vec()));
.map(|msg| Ok((msg.subject.to_string(), msg.payload.to_vec())));
Ok(Box::new(stream))
}

Expand Down Expand Up @@ -309,7 +309,8 @@ mod tests {
.publish_event("test.topic", vec![4, 5, 6].into())
.await?;
let result = receiver.await.expect("receiver task panicked")?;
assert_eq!(result, vec![4, 5, 6]);
let topic = format!("{}.{}", broker.namespace(), "test.topic");
assert_eq!(result, (topic, vec![4, 5, 6]));
Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions crates/fuel-streams-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub mod types {
utxos::types::*,
};
pub use fuel_streams_types::*;

pub use crate::server::DeliverPolicy;
}

pub mod subjects {
Expand Down
4 changes: 2 additions & 2 deletions crates/fuel-streams-core/src/server/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ pub enum ServerMessage {

#[derive(Debug, Serialize, Deserialize)]
pub struct ResponseMessage {
pub subject: String,
pub payload: serde_json::Value,
pub key: String,
pub data: serde_json::Value,
}

#[cfg(test)]
Expand Down
15 changes: 10 additions & 5 deletions crates/fuel-streams-core/src/stream/stream_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use tokio::{sync::OnceCell, time::sleep};
use super::{config, StreamError};
use crate::server::DeliverPolicy;

pub type BoxedStreamItem = Result<Vec<u8>, StreamError>;
pub type BoxedStreamItem = Result<(String, Vec<u8>), StreamError>;
pub type BoxedStream = Box<dyn FStream<Item = BoxedStreamItem> + Send + Unpin>;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -74,7 +74,7 @@ impl<R: Record> Stream<R> {
&self,
subject: Arc<dyn IntoSubject>,
deliver_policy: DeliverPolicy,
) -> BoxStream<'static, Result<Vec<u8>, StreamError>> {
) -> BoxStream<'static, Result<(String, Vec<u8>), StreamError>> {
let store = self.store.clone();
let broker = self.broker.clone();
let subject_clone = subject.clone();
Expand All @@ -84,14 +84,19 @@ impl<R: Record> Stream<R> {
let mut historical = store.stream_by_subject(&subject_clone, height);
while let Some(result) = historical.next().await {
let item = result.map_err(StreamError::Store)?;
yield item.encoded_value().to_vec();
let subject = item.subject_str();
let value = item.encoded_value().to_vec();
yield (subject, value);
let throttle_time = *config::STREAM_THROTTLE_HISTORICAL;
sleep(Duration::from_millis(throttle_time as u64)).await;
}
}
let mut live = broker.subscribe_to_events(&subject.parse()).await?;
while let Some(msg) = live.next().await {
yield msg?;
let msg = msg?;
let subject = msg.0;
let value = msg.1;
yield (subject, value);
let throttle_time = *config::STREAM_THROTTLE_LIVE;
sleep(Duration::from_millis(throttle_time as u64)).await;
}
Expand All @@ -103,7 +108,7 @@ impl<R: Record> Stream<R> {
&self,
subject: S,
deliver_policy: DeliverPolicy,
) -> BoxStream<'static, Result<Vec<u8>, StreamError>> {
) -> BoxStream<'static, Result<(String, Vec<u8>), StreamError>> {
let subject = Arc::new(subject);
self.subscribe_dynamic(subject, deliver_policy).await
}
Expand Down
1 change: 1 addition & 0 deletions crates/fuel-streams-domains/src/blocks/subjects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use super::types::*;

#[derive(Subject, Debug, Clone, Default, Serialize, Deserialize)]
#[subject(id = "blocks")]
#[subject(entity = "Block")]
#[subject(wildcard = "blocks.>")]
#[subject(format = "blocks.{producer}.{height}")]
pub struct BlocksSubject {
Expand Down
9 changes: 6 additions & 3 deletions crates/fuel-streams-domains/src/inputs/subjects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use crate::blocks::types::*;

#[derive(Subject, Debug, Clone, Default, Serialize, Deserialize)]
#[subject(id = "inputs_coin")]
#[subject(wildcard = "inputs.>")]
#[subject(entity = "Input")]
#[subject(wildcard = "inputs.coin.>")]
#[subject(
format = "inputs.coin.{block_height}.{tx_id}.{tx_index}.{input_index}.{owner}.{asset}"
)]
Expand All @@ -23,7 +24,8 @@ pub struct InputsCoinSubject {

#[derive(Subject, Debug, Clone, Default, Serialize, Deserialize)]
#[subject(id = "inputs_contract")]
#[subject(wildcard = "inputs.>")]
#[subject(entity = "Input")]
#[subject(wildcard = "inputs.contract.>")]
#[subject(
format = "inputs.contract.{block_height}.{tx_id}.{tx_index}.{input_index}.{contract}"
)]
Expand All @@ -38,7 +40,8 @@ pub struct InputsContractSubject {

#[derive(Subject, Debug, Clone, Default, Serialize, Deserialize)]
#[subject(id = "inputs_message")]
#[subject(wildcard = "inputs.>")]
#[subject(entity = "Input")]
#[subject(wildcard = "inputs.message.>")]
#[subject(
format = "inputs.message.{block_height}.{tx_id}.{tx_index}.{input_index}.{sender}.{recipient}"
)]
Expand Down
15 changes: 10 additions & 5 deletions crates/fuel-streams-domains/src/outputs/subjects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use crate::blocks::types::*;

#[derive(Subject, Debug, Clone, Default, Serialize, Deserialize)]
#[subject(id = "outputs_coin")]
#[subject(wildcard = "outputs.>")]
#[subject(entity = "Output")]
#[subject(wildcard = "outputs.coin.>")]
#[subject(
format = "outputs.coin.{block_height}.{tx_id}.{tx_index}.{output_index}.{to}.{asset}"
)]
Expand All @@ -23,7 +24,8 @@ pub struct OutputsCoinSubject {

#[derive(Subject, Debug, Clone, Default, Serialize, Deserialize)]
#[subject(id = "outputs_contract")]
#[subject(wildcard = "outputs.>")]
#[subject(entity = "Output")]
#[subject(wildcard = "outputs.contract.>")]
#[subject(
format = "outputs.contract.{block_height}.{tx_id}.{tx_index}.{output_index}.{contract}"
)]
Expand All @@ -38,7 +40,8 @@ pub struct OutputsContractSubject {

#[derive(Subject, Debug, Clone, Default, Serialize, Deserialize)]
#[subject(id = "outputs_change")]
#[subject(wildcard = "outputs.>")]
#[subject(entity = "Output")]
#[subject(wildcard = "outputs.change.>")]
#[subject(
format = "outputs.change.{block_height}.{tx_id}.{tx_index}.{output_index}.{to}.{asset}"
)]
Expand All @@ -55,7 +58,8 @@ pub struct OutputsChangeSubject {

#[derive(Subject, Debug, Clone, Default, Serialize, Deserialize)]
#[subject(id = "outputs_variable")]
#[subject(wildcard = "outputs.>")]
#[subject(entity = "Output")]
#[subject(wildcard = "outputs.variable.>")]
#[subject(
format = "outputs.variable.{block_height}.{tx_id}.{tx_index}.{output_index}.{to}.{asset}"
)]
Expand All @@ -72,7 +76,8 @@ pub struct OutputsVariableSubject {

#[derive(Subject, Debug, Clone, Default, Serialize, Deserialize)]
#[subject(id = "outputs_contract_created")]
#[subject(wildcard = "outputs.>")]
#[subject(entity = "Output")]
#[subject(wildcard = "outputs.contract_created.>")]
#[subject(
format = "outputs.contract_created.{block_height}.{tx_id}.{tx_index}.{output_index}.{contract}"
)]
Expand Down
Loading

0 comments on commit 2f0ff1d

Please sign in to comment.