diff --git a/bindings/nostr-sdk-ffi/src/client/mod.rs b/bindings/nostr-sdk-ffi/src/client/mod.rs index f68fb8a91..61fc16084 100644 --- a/bindings/nostr-sdk-ffi/src/client/mod.rs +++ b/bindings/nostr-sdk-ffi/src/client/mod.rs @@ -325,7 +325,7 @@ impl Client { pub async fn unsubscribe(&self, subscription_id: String) { self.inner - .unsubscribe(SubscriptionId::new(subscription_id)) + .unsubscribe(&SubscriptionId::new(subscription_id)) .await } diff --git a/bindings/nostr-sdk-ffi/src/protocol/message/client.rs b/bindings/nostr-sdk-ffi/src/protocol/message/client.rs index d698e2890..b632b5d48 100644 --- a/bindings/nostr-sdk-ffi/src/protocol/message/client.rs +++ b/bindings/nostr-sdk-ffi/src/protocol/message/client.rs @@ -2,6 +2,7 @@ // Copyright (c) 2023-2024 Rust Nostr Developers // Distributed under the MIT software license +use std::borrow::Cow; use std::ops::Deref; use std::sync::Arc; @@ -58,24 +59,22 @@ pub enum ClientMessageEnum { }, } -impl From for nostr::ClientMessage { +impl From for nostr::ClientMessage<'static> { fn from(value: ClientMessageEnum) -> Self { match value { - ClientMessageEnum::EventMsg { event } => { - Self::Event(Box::new(event.as_ref().deref().clone())) - } + ClientMessageEnum::EventMsg { event } => Self::event(event.as_ref().deref().clone()), ClientMessageEnum::Req { subscription_id, filter, - } => Self::Req { - subscription_id: SubscriptionId::new(subscription_id), - filter: Box::new(filter.as_ref().deref().clone()), - }, + } => Self::req( + SubscriptionId::new(subscription_id), + filter.as_ref().deref().clone(), + ), ClientMessageEnum::ReqMultiFilter { subscription_id, filters, } => Self::ReqMultiFilter { - subscription_id: SubscriptionId::new(subscription_id), + subscription_id: Cow::Owned(SubscriptionId::new(subscription_id)), filters: filters .into_iter() .map(|f| f.as_ref().deref().clone()) @@ -84,43 +83,41 @@ impl From for nostr::ClientMessage { ClientMessageEnum::Count { subscription_id, filter, - } => Self::Count { - subscription_id: SubscriptionId::new(subscription_id), - filter: Box::new(filter.as_ref().deref().clone()), - }, + } => Self::count( + SubscriptionId::new(subscription_id), + filter.as_ref().deref().clone(), + ), ClientMessageEnum::Close { subscription_id } => { - Self::Close(SubscriptionId::new(subscription_id)) - } - ClientMessageEnum::Auth { event } => { - Self::Auth(Box::new(event.as_ref().deref().clone())) + Self::close(SubscriptionId::new(subscription_id)) } + ClientMessageEnum::Auth { event } => Self::auth(event.as_ref().deref().clone()), ClientMessageEnum::NegOpen { subscription_id, filter, id_size, initial_message, } => Self::NegOpen { - subscription_id: SubscriptionId::new(subscription_id), - filter: Box::new(filter.as_ref().deref().clone()), + subscription_id: Cow::Owned(SubscriptionId::new(subscription_id)), + filter: Cow::Owned(filter.as_ref().deref().clone()), id_size, - initial_message, + initial_message: Cow::Owned(initial_message), }, ClientMessageEnum::NegMsg { subscription_id, message, } => Self::NegMsg { - subscription_id: SubscriptionId::new(subscription_id), - message, + subscription_id: Cow::Owned(SubscriptionId::new(subscription_id)), + message: Cow::Owned(message), }, ClientMessageEnum::NegClose { subscription_id } => Self::NegClose { - subscription_id: SubscriptionId::new(subscription_id), + subscription_id: Cow::Owned(SubscriptionId::new(subscription_id)), }, } } } -impl From for ClientMessageEnum { - fn from(value: nostr::ClientMessage) -> Self { +impl<'a> From> for ClientMessageEnum { + fn from(value: nostr::ClientMessage<'a>) -> Self { match value { nostr::ClientMessage::Event(event) => Self::EventMsg { event: Arc::new(event.as_ref().to_owned().into()), @@ -130,7 +127,7 @@ impl From for ClientMessageEnum { filter, } => Self::Req { subscription_id: subscription_id.to_string(), - filter: Arc::new((*filter).into()), + filter: Arc::new(filter.into_owned().into()), }, nostr::ClientMessage::ReqMultiFilter { subscription_id, @@ -144,7 +141,7 @@ impl From for ClientMessageEnum { filter, } => Self::Count { subscription_id: subscription_id.to_string(), - filter: Arc::new((*filter).into()), + filter: Arc::new(filter.into_owned().into()), }, nostr::ClientMessage::Close(subscription_id) => Self::Close { subscription_id: subscription_id.to_string(), @@ -161,14 +158,14 @@ impl From for ClientMessageEnum { subscription_id: subscription_id.to_string(), filter: Arc::new(filter.as_ref().to_owned().into()), id_size, - initial_message, + initial_message: initial_message.into_owned(), }, nostr::ClientMessage::NegMsg { subscription_id, message, } => Self::NegMsg { subscription_id: subscription_id.to_string(), - message, + message: message.into_owned(), }, nostr::ClientMessage::NegClose { subscription_id } => Self::NegClose { subscription_id: subscription_id.to_string(), @@ -180,19 +177,19 @@ impl From for ClientMessageEnum { #[derive(Debug, PartialEq, Eq, Object)] #[uniffi::export(Debug, Eq)] pub struct ClientMessage { - inner: nostr::ClientMessage, + inner: nostr::ClientMessage<'static>, } impl Deref for ClientMessage { - type Target = nostr::ClientMessage; + type Target = nostr::ClientMessage<'static>; fn deref(&self) -> &Self::Target { &self.inner } } -impl From for ClientMessage { - fn from(inner: nostr::ClientMessage) -> Self { +impl From> for ClientMessage { + fn from(inner: nostr::ClientMessage<'static>) -> Self { Self { inner } } } diff --git a/bindings/nostr-sdk-ffi/src/protocol/message/relay.rs b/bindings/nostr-sdk-ffi/src/protocol/message/relay.rs index 437160d7a..0a003476a 100644 --- a/bindings/nostr-sdk-ffi/src/protocol/message/relay.rs +++ b/bindings/nostr-sdk-ffi/src/protocol/message/relay.rs @@ -3,6 +3,7 @@ // Distributed under the MIT software license use core::ops::Deref; +use std::borrow::Cow; use std::sync::Arc; use nostr::{JsonUtil, SubscriptionId}; @@ -49,8 +50,8 @@ pub enum RelayMessageEnum { }, } -impl From for RelayMessageEnum { - fn from(value: nostr::RelayMessage) -> Self { +impl<'a> From> for RelayMessageEnum { + fn from(value: nostr::RelayMessage<'a>) -> Self { match value { nostr::RelayMessage::Event { subscription_id, @@ -64,9 +65,11 @@ impl From for RelayMessageEnum { message, } => Self::Closed { subscription_id: subscription_id.to_string(), - message, + message: message.into_owned(), + }, + nostr::RelayMessage::Notice(message) => Self::Notice { + message: message.into_owned(), }, - nostr::RelayMessage::Notice(message) => Self::Notice { message }, nostr::RelayMessage::EndOfStoredEvents(sub_id) => Self::EndOfStoredEvents { subscription_id: sub_id.to_string(), }, @@ -77,9 +80,11 @@ impl From for RelayMessageEnum { } => Self::Ok { event_id: Arc::new(event_id.into()), status, - message, + message: message.into_owned(), + }, + nostr::RelayMessage::Auth { challenge } => Self::Auth { + challenge: challenge.into_owned(), }, - nostr::RelayMessage::Auth { challenge } => Self::Auth { challenge }, nostr::RelayMessage::Count { subscription_id, count, @@ -92,37 +97,34 @@ impl From for RelayMessageEnum { message, } => Self::NegMsg { subscription_id: subscription_id.to_string(), - message, + message: message.into_owned(), }, nostr::RelayMessage::NegErr { subscription_id, message, } => Self::NegErr { subscription_id: subscription_id.to_string(), - message, + message: message.into_owned(), }, } } } -impl From for nostr::RelayMessage { +impl From for nostr::RelayMessage<'static> { fn from(value: RelayMessageEnum) -> Self { match value { RelayMessageEnum::EventMsg { subscription_id, event, - } => Self::Event { - subscription_id: SubscriptionId::new(subscription_id), - event: Box::new(event.as_ref().deref().clone()), - }, + } => Self::event( + SubscriptionId::new(subscription_id), + event.as_ref().deref().clone(), + ), RelayMessageEnum::Closed { subscription_id, message, - } => Self::Closed { - subscription_id: SubscriptionId::new(subscription_id), - message, - }, - RelayMessageEnum::Notice { message } => Self::Notice(message), + } => Self::closed(SubscriptionId::new(subscription_id), message), + RelayMessageEnum::Notice { message } => Self::notice(message), RelayMessageEnum::EndOfStoredEvents { subscription_id } => { Self::eose(SubscriptionId::new(subscription_id)) } @@ -130,32 +132,25 @@ impl From for nostr::RelayMessage { event_id, status, message, - } => Self::Ok { - event_id: **event_id, - status, - message, - }, - RelayMessageEnum::Auth { challenge } => Self::Auth { challenge }, + } => Self::ok(**event_id, status, message), + RelayMessageEnum::Auth { challenge } => Self::auth(challenge), RelayMessageEnum::Count { subscription_id, count, - } => Self::Count { - subscription_id: SubscriptionId::new(subscription_id), - count: count as usize, - }, + } => Self::count(SubscriptionId::new(subscription_id), count as usize), RelayMessageEnum::NegMsg { subscription_id, message, } => Self::NegMsg { - subscription_id: SubscriptionId::new(subscription_id), - message, + subscription_id: Cow::Owned(SubscriptionId::new(subscription_id)), + message: Cow::Owned(message), }, RelayMessageEnum::NegErr { subscription_id, message, } => Self::NegErr { - subscription_id: SubscriptionId::new(subscription_id), - message, + subscription_id: Cow::Owned(SubscriptionId::new(subscription_id)), + message: Cow::Owned(message), }, } } @@ -164,11 +159,11 @@ impl From for nostr::RelayMessage { #[derive(Debug, PartialEq, Eq, Hash, Object)] #[uniffi::export(Debug, Eq, Hash)] pub struct RelayMessage { - inner: nostr::RelayMessage, + inner: nostr::RelayMessage<'static>, } -impl From for RelayMessage { - fn from(inner: nostr::RelayMessage) -> Self { +impl From> for RelayMessage { + fn from(inner: nostr::RelayMessage<'static>) -> Self { Self { inner } } } diff --git a/bindings/nostr-sdk-ffi/src/relay/mod.rs b/bindings/nostr-sdk-ffi/src/relay/mod.rs index 5ed171a74..59b4a269a 100644 --- a/bindings/nostr-sdk-ffi/src/relay/mod.rs +++ b/bindings/nostr-sdk-ffi/src/relay/mod.rs @@ -278,7 +278,7 @@ impl Relay { /// Unsubscribe pub async fn unsubscribe(&self, id: String) -> Result<()> { - Ok(self.inner.unsubscribe(SubscriptionId::new(id)).await?) + Ok(self.inner.unsubscribe(&SubscriptionId::new(id)).await?) } /// Unsubscribe from all subscriptions diff --git a/bindings/nostr-sdk-js/src/client/mod.rs b/bindings/nostr-sdk-js/src/client/mod.rs index 99ed4ec15..3999bf258 100644 --- a/bindings/nostr-sdk-js/src/client/mod.rs +++ b/bindings/nostr-sdk-js/src/client/mod.rs @@ -342,7 +342,7 @@ impl JsClient { /// Unsubscribe pub async fn unsubscribe(&self, subscription_id: &str) { self.inner - .unsubscribe(SubscriptionId::new(subscription_id)) + .unsubscribe(&SubscriptionId::new(subscription_id)) .await; } diff --git a/bindings/nostr-sdk-js/src/protocol/message/client.rs b/bindings/nostr-sdk-js/src/protocol/message/client.rs index af6c63219..537cdc8bf 100644 --- a/bindings/nostr-sdk-js/src/protocol/message/client.rs +++ b/bindings/nostr-sdk-js/src/protocol/message/client.rs @@ -13,19 +13,19 @@ use crate::protocol::filter::JsFilter; #[wasm_bindgen(js_name = ClientMessage)] pub struct JsClientMessage { - inner: ClientMessage, + inner: ClientMessage<'static>, } impl Deref for JsClientMessage { - type Target = ClientMessage; + type Target = ClientMessage<'static>; fn deref(&self) -> &Self::Target { &self.inner } } -impl From for JsClientMessage { - fn from(inner: ClientMessage) -> Self { +impl From> for JsClientMessage { + fn from(inner: ClientMessage<'static>) -> Self { Self { inner } } } diff --git a/bindings/nostr-sdk-js/src/protocol/message/relay.rs b/bindings/nostr-sdk-js/src/protocol/message/relay.rs index 0e7141c39..632cac6c7 100644 --- a/bindings/nostr-sdk-js/src/protocol/message/relay.rs +++ b/bindings/nostr-sdk-js/src/protocol/message/relay.rs @@ -12,11 +12,11 @@ use crate::protocol::event::{JsEvent, JsEventId}; #[wasm_bindgen(js_name = RelayMessage)] pub struct JsRelayMessage { - inner: RelayMessage, + inner: RelayMessage<'static>, } -impl From for JsRelayMessage { - fn from(inner: RelayMessage) -> Self { +impl From> for JsRelayMessage { + fn from(inner: RelayMessage<'static>) -> Self { Self { inner } } } diff --git a/bindings/nostr-sdk-js/src/relay/mod.rs b/bindings/nostr-sdk-js/src/relay/mod.rs index 0095f0912..2b8120e24 100644 --- a/bindings/nostr-sdk-js/src/relay/mod.rs +++ b/bindings/nostr-sdk-js/src/relay/mod.rs @@ -239,7 +239,7 @@ impl JsRelay { /// Unsubscribe pub async fn unsubscribe(&self, id: String) -> Result<()> { self.inner - .unsubscribe(SubscriptionId::new(id)) + .unsubscribe(&SubscriptionId::new(id)) .await .map_err(into_err) } diff --git a/crates/nostr-relay-builder/src/local/inner.rs b/crates/nostr-relay-builder/src/local/inner.rs index 2b0aa5288..2d5951c23 100644 --- a/crates/nostr-relay-builder/src/local/inner.rs +++ b/crates/nostr-relay-builder/src/local/inner.rs @@ -2,6 +2,7 @@ // Copyright (c) 2023-2024 Rust Nostr Developers // Distributed under the MIT software license +use std::borrow::Cow; use std::collections::HashMap; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::Arc; @@ -191,7 +192,7 @@ impl InnerLocalRelay { } /// Pass bare [TcpStream] for handling - async fn handle_connection(&self, raw_stream: S, addr: SocketAddr) -> Result<()> + async fn handle_connection(self, raw_stream: S, addr: SocketAddr) -> Result<()> where S: AsyncRead + AsyncWrite + Unpin, { @@ -244,9 +245,9 @@ impl InnerLocalRelay { .await?; } Message::Binary(..) => { - let msg: RelayMessage = - RelayMessage::notice("binary messages are not processed by this relay"); - if let Err(e) = self.send_msg(&mut tx, msg).await { + let msg = + RelayMessage::Notice(Cow::Borrowed("binary messages are not processed by this relay")); + if let Err(e) = send_msg(&mut tx, msg).await { tracing::error!("Can't send msg to client: {e}"); } } @@ -265,9 +266,12 @@ impl InnerLocalRelay { event = new_event.recv() => { if let Ok(event) = event { // Iter subscriptions - for (id, filter) in session.subscriptions.iter() { + for (subscription_id, filter) in session.subscriptions.iter() { if filter.match_event(&event) { - self.send_msg(&mut tx, RelayMessage::event(id.to_owned(), event.clone())).await?; + send_msg(&mut tx, RelayMessage::Event{ + subscription_id: Cow::Borrowed(subscription_id), + event: Cow::Borrowed(&event) + }).await?; } } } @@ -288,7 +292,7 @@ impl InnerLocalRelay { &self, session: &mut Session<'_>, ws_tx: &mut WsTx, - msg: ClientMessage, + msg: ClientMessage<'_>, addr: &SocketAddr, ) -> Result<()> where @@ -300,16 +304,15 @@ impl InnerLocalRelay { if let RateLimiterResponse::Limited = session.check_rate_limit(self.rate_limit.notes_per_minute) { - return self - .send_msg( + return send_msg( ws_tx, RelayMessage::Ok { event_id: event.id, status: false, - message: format!( + message: Cow::Owned(format!( "{}: slow down", MachineReadablePrefix::RateLimited - ), + )), }, ) .await; @@ -318,16 +321,15 @@ impl InnerLocalRelay { // Check POW if let Some(difficulty) = self.min_pow { if !event.id.check_pow(difficulty) { - return self - .send_msg( + return send_msg( ws_tx, RelayMessage::Ok { event_id: event.id, status: false, - message: format!( + message: Cow::Owned(format!( "{}: required a difficulty >= {difficulty}", MachineReadablePrefix::Pow - ), + )), }, ) .await; @@ -341,25 +343,24 @@ impl InnerLocalRelay { // Check mode and if it's authenticated if nip42.mode.is_write() && !session.nip42.is_authenticated() { // Generate and send AUTH challenge - self.send_msg( + send_msg( ws_tx, RelayMessage::Auth { - challenge: session.nip42.generate_challenge(), + challenge: Cow::Owned(session.nip42.generate_challenge()), }, ) .await?; // Return error - return self - .send_msg( + return send_msg( ws_tx, RelayMessage::Ok { event_id: event.id, status: false, - message: format!( + message: Cow::Owned(format!( "{}: you must auth", MachineReadablePrefix::AuthRequired - ), + )), }, ) .await; @@ -370,13 +371,12 @@ impl InnerLocalRelay { for policy in self.write_policy.iter() { let event_id = event.id; if let PolicyResult::Reject(m) = policy.admit_event(&event, addr).await { - return self - .send_msg( + return send_msg( ws_tx, RelayMessage::Ok { event_id, status: false, - message: format!("{}: {}", MachineReadablePrefix::Blocked, m), + message: Cow::Owned(format!("{}: {}", MachineReadablePrefix::Blocked, m)), }, ) .await; @@ -387,31 +387,29 @@ impl InnerLocalRelay { let event_status = self.database.check_id(&event.id).await?; match event_status { DatabaseEventStatus::Saved => { - return self - .send_msg( + return send_msg( ws_tx, RelayMessage::Ok { event_id: event.id, status: true, - message: format!( + message: Cow::Owned(format!( "{}: already have this event", MachineReadablePrefix::Duplicate - ), + )), }, ) .await; } DatabaseEventStatus::Deleted => { - return self - .send_msg( + return send_msg( ws_tx, RelayMessage::Ok { event_id: event.id, status: false, - message: format!( + message: Cow::Owned(format!( "{}: this event is deleted", MachineReadablePrefix::Blocked - ), + )), }, ) .await; @@ -425,16 +423,15 @@ impl InnerLocalRelay { let tagged: bool = event.tags.public_keys().any(|p| p == &pk); if !authored && !tagged { - return self - .send_msg( + return send_msg( ws_tx, RelayMessage::Ok { event_id: event.id, status: false, - message: format!( + message: Cow::Owned(format!( "{}: event not related to owner of this relay", MachineReadablePrefix::Blocked - ), + )), }, ) .await; @@ -442,32 +439,30 @@ impl InnerLocalRelay { } if !event.verify_id() { - return self - .send_msg( + return send_msg( ws_tx, RelayMessage::Ok { event_id: event.id, status: false, - message: format!( + message: Cow::Owned(format!( "{}: invalid event ID", MachineReadablePrefix::Invalid - ), + )), }, ) .await; } if !event.verify_signature() { - return self - .send_msg( + return send_msg( ws_tx, RelayMessage::Ok { event_id: event.id, status: false, - message: format!( + message: Cow::Owned(format!( "{}: invalid event signature", MachineReadablePrefix::Invalid - ), + )), }, ) .await; @@ -477,16 +472,15 @@ impl InnerLocalRelay { let event_id = event.id; // Broadcast to channel - self.new_event.send(*event)?; + self.new_event.send(event.into_owned())?; // Send OK message - return self - .send_msg( + return send_msg( ws_tx, RelayMessage::Ok { event_id, status: true, - message: String::new(), + message: Cow::Owned(String::new()), }, ) .await; @@ -499,19 +493,19 @@ impl InnerLocalRelay { let event_id = event.id; // Broadcast to channel - self.new_event.send(*event)?; + self.new_event.send(event.into_owned())?; // Reply to client RelayMessage::Ok { event_id, status: true, - message: String::new(), + message: Cow::Owned(String::new()), } } else { RelayMessage::Ok { event_id: event.id, status: false, - message: format!("{}: unknown", MachineReadablePrefix::Error), + message: Cow::Owned(format!("{}: unknown", MachineReadablePrefix::Error)), } } } @@ -520,12 +514,12 @@ impl InnerLocalRelay { RelayMessage::Ok { event_id: event.id, status: false, - message: format!("{}: database error", MachineReadablePrefix::Error), + message: Cow::Owned(format!("{}: database error", MachineReadablePrefix::Error)), } } }; - self.send_msg(ws_tx, msg).await + send_msg(ws_tx, msg).await } ClientMessage::Req { subscription_id, @@ -535,15 +529,14 @@ impl InnerLocalRelay { if session.subscriptions.len() >= self.rate_limit.max_reqs && !session.subscriptions.contains_key(&subscription_id) { - return self - .send_msg( + return send_msg( ws_tx, RelayMessage::Closed { subscription_id, - message: format!( + message: Cow::Owned(format!( "{}: too many REQs", MachineReadablePrefix::RateLimited - ), + )), }, ) .await; @@ -556,24 +549,23 @@ impl InnerLocalRelay { // Check mode and if it's authenticated if nip42.mode.is_read() && !session.nip42.is_authenticated() { // Generate and send AUTH challenge - self.send_msg( + send_msg( ws_tx, RelayMessage::Auth { - challenge: session.nip42.generate_challenge(), + challenge: Cow::Owned(session.nip42.generate_challenge()), }, ) .await?; // Return error - return self - .send_msg( + return send_msg( ws_tx, RelayMessage::Closed { subscription_id, - message: format!( + message: Cow::Owned(format!( "{}: you must auth", MachineReadablePrefix::AuthRequired - ), + )), }, ) .await; @@ -583,49 +575,49 @@ impl InnerLocalRelay { // check query policy plugins for plugin in self.query_policy.iter() { if let PolicyResult::Reject(msg) = plugin.admit_query(&filter, addr).await { - return self - .send_msg( + return send_msg( ws_tx, RelayMessage::Closed { subscription_id, - message: format!("{}: {}", MachineReadablePrefix::Error, msg), + message: Cow::Owned(format!("{}: {}", MachineReadablePrefix::Error, msg)), }, ) .await; } } + let filter: Filter = filter.into_owned(); + // Update session subscriptions session .subscriptions - .insert(subscription_id.clone(), *filter.clone()); + .insert(subscription_id.clone().into_owned(), filter.clone()); // Query database - let events = self.database.query(*filter).await?; + let events = self.database.query(filter).await?; tracing::debug!( "Found {} events for subscription '{subscription_id}'", events.len() ); - let mut msgs: Vec = Vec::with_capacity(events.len() + 1); - msgs.extend( + let mut json_msgs: Vec = Vec::with_capacity(events.len() + 1); + json_msgs.extend( events .into_iter() - .map(|e| RelayMessage::event(subscription_id.clone(), e)), + .map(|event| RelayMessage::Event {subscription_id: Cow::Borrowed(subscription_id.as_ref()), event: Cow::Owned(event)}.as_json()), ); - msgs.push(RelayMessage::eose(subscription_id)); - - self.send_msgs(ws_tx, msgs).await?; + json_msgs.push(RelayMessage::EndOfStoredEvents(subscription_id).as_json()); - Ok(()) + // Send JSON messages + send_json_msgs(ws_tx, json_msgs).await } ClientMessage::ReqMultiFilter { subscription_id, .. } => { - self.send_msg( + send_msg( ws_tx, RelayMessage::Closed { subscription_id, - message: format!("{}: multi-filter REQs aren't supported (https://github.com/nostr-protocol/nips/pull/1645)", MachineReadablePrefix::Unsupported), + message: Cow::Owned(format!("{}: multi-filter REQs aren't supported (https://github.com/nostr-protocol/nips/pull/1645)", MachineReadablePrefix::Unsupported)), }, ).await } @@ -633,9 +625,8 @@ impl InnerLocalRelay { subscription_id, filter, } => { - let count: usize = self.database.count(*filter).await?; - self.send_msg(ws_tx, RelayMessage::count(subscription_id, count)) - .await + let count: usize = self.database.count(filter.into_owned()).await?; + send_msg(ws_tx, RelayMessage::Count { subscription_id, count }).await } ClientMessage::Close(subscription_id) => { session.subscriptions.remove(&subscription_id); @@ -643,23 +634,23 @@ impl InnerLocalRelay { } ClientMessage::Auth(event) => match session.nip42.check_challenge(&event) { Ok(()) => { - self.send_msg( + send_msg( ws_tx, RelayMessage::Ok { event_id: event.id, status: true, - message: String::new(), + message: Cow::Owned(String::new()), }, ) .await } Err(e) => { - self.send_msg( + send_msg( ws_tx, RelayMessage::Ok { event_id: event.id, status: false, - message: format!("{}: {e}", MachineReadablePrefix::AuthRequired), + message: Cow::Owned(format!("{}: {e}", MachineReadablePrefix::AuthRequired)), }, ) .await @@ -676,7 +667,7 @@ impl InnerLocalRelay { // TODO: check nip42? // Query database - let items = self.database.negentropy_items(*filter).await?; + let items = self.database.negentropy_items(filter.into_owned()).await?; tracing::debug!( id = %subscription_id, @@ -696,23 +687,24 @@ impl InnerLocalRelay { let mut negentropy = Negentropy::owned(storage, 60_000)?; // Reconcile - let bytes: Vec = hex::decode(initial_message)?; + let bytes: Vec = hex::decode(initial_message.as_ref())?; let message: Vec = negentropy.reconcile(&bytes)?; - // Update subscriptions - session - .negentropy_subscription - .insert(subscription_id.clone(), negentropy); - // Reply - self.send_msg( + send_msg( ws_tx, RelayMessage::NegMsg { - subscription_id, - message: hex::encode(message), + subscription_id: Cow::Borrowed(&subscription_id), + message: Cow::Owned(hex::encode(message)), }, ) - .await + .await?; + + // Update subscriptions + session + .negentropy_subscription + .insert(subscription_id.into_owned(), negentropy); + Ok(()) } ClientMessage::NegMsg { subscription_id, @@ -721,28 +713,28 @@ impl InnerLocalRelay { match session.negentropy_subscription.get_mut(&subscription_id) { Some(negentropy) => { // Reconcile - let bytes: Vec = hex::decode(message)?; + let bytes: Vec = hex::decode(message.as_ref())?; let message = negentropy.reconcile(&bytes)?; // Reply - self.send_msg( + send_msg( ws_tx, RelayMessage::NegMsg { subscription_id, - message: hex::encode(message), + message: Cow::Owned(hex::encode(message)), }, ) .await } None => { - self.send_msg( + send_msg( ws_tx, RelayMessage::NegErr { subscription_id, - message: format!( + message: Cow::Owned(format!( "{}: subscription not found", MachineReadablePrefix::Error - ), + )), }, ) .await @@ -755,25 +747,24 @@ impl InnerLocalRelay { } } } +} - #[inline] - async fn send_msg(&self, tx: &mut WsTx, msg: RelayMessage) -> Result<()> - where - S: AsyncRead + AsyncWrite + Unpin, - { - tx.send(Message::Text(msg.as_json().into())).await?; - Ok(()) - } +#[inline] +async fn send_msg(tx: &mut WsTx, msg: RelayMessage<'_>) -> Result<()> +where + S: AsyncRead + AsyncWrite + Unpin, +{ + tx.send(Message::Text(msg.as_json().into())).await?; + Ok(()) +} - #[inline] - async fn send_msgs(&self, tx: &mut WsTx, msgs: I) -> Result<()> - where - I: IntoIterator, - S: AsyncRead + AsyncWrite + Unpin, - { - let mut stream = - stream::iter(msgs.into_iter()).map(|msg| Ok(Message::Text(msg.as_json().into()))); - tx.send_all(&mut stream).await?; - Ok(()) - } +#[inline] +async fn send_json_msgs(tx: &mut WsTx, json_msgs: I) -> Result<()> +where + I: IntoIterator, + S: AsyncRead + AsyncWrite + Unpin, +{ + let mut stream = stream::iter(json_msgs.into_iter()).map(|msg| Ok(Message::Text(msg.into()))); + tx.send_all(&mut stream).await?; + Ok(()) } diff --git a/crates/nostr-relay-pool/src/pool/mod.rs b/crates/nostr-relay-pool/src/pool/mod.rs index c5f334144..d75c68cc7 100644 --- a/crates/nostr-relay-pool/src/pool/mod.rs +++ b/crates/nostr-relay-pool/src/pool/mod.rs @@ -62,7 +62,7 @@ pub enum RelayPoolNotification { /// The URL of the relay from which the message was received. relay_url: RelayUrl, /// The received relay message. - message: RelayMessage, + message: RelayMessage<'static>, }, /// Shutdown /// @@ -479,7 +479,11 @@ impl RelayPool { /// Send a client message to specific relays /// /// Note: **the relays must already be added!** - pub async fn send_msg_to(&self, urls: I, msg: ClientMessage) -> Result, Error> + pub async fn send_msg_to( + &self, + urls: I, + msg: ClientMessage<'_>, + ) -> Result, Error> where I: IntoIterator, U: TryIntoUrl, @@ -494,7 +498,7 @@ impl RelayPool { pub async fn batch_msg_to( &self, urls: I, - msgs: Vec, + msgs: Vec>, ) -> Result, Error> where I: IntoIterator, @@ -797,9 +801,9 @@ impl RelayPool { } /// Unsubscribe from subscription - pub async fn unsubscribe(&self, id: SubscriptionId) { + pub async fn unsubscribe(&self, id: &SubscriptionId) { // Remove subscription from pool - self.inner.remove_subscription(&id).await; + self.inner.remove_subscription(id).await; // Lock with read shared access let relays = self.inner.atomic.relays.read().await; @@ -808,7 +812,7 @@ impl RelayPool { // Remove subscription from relays for relay in relays.values() { - if let Err(e) = relay.unsubscribe(id.clone()).await { + if let Err(e) = relay.unsubscribe(id).await { tracing::error!("{e}"); } } diff --git a/crates/nostr-relay-pool/src/relay/inner.rs b/crates/nostr-relay-pool/src/relay/inner.rs index 01ae65c63..6e8fecc9e 100644 --- a/crates/nostr-relay-pool/src/relay/inner.rs +++ b/crates/nostr-relay-pool/src/relay/inner.rs @@ -2,6 +2,7 @@ // Copyright (c) 2023-2024 Rust Nostr Developers // Distributed under the MIT software license +use std::borrow::Cow; use std::cmp; use std::collections::{HashMap, HashSet}; #[cfg(feature = "nip11")] @@ -38,11 +39,13 @@ use crate::relay::status::AtomicRelayStatus; use crate::shared::SharedState; use crate::transport::websocket::{BoxSink, BoxStream}; +type ClientMessageJson = String; + #[derive(Debug)] struct RelayChannels { nostr: ( - Sender>, - Mutex>>, + Sender>, + Mutex>>, ), ping: Notify, terminate: Notify, @@ -60,6 +63,10 @@ impl RelayChannels { } pub fn send_client_msgs(&self, msgs: Vec) -> Result<(), Error> { + // Serialize messages to JSON + let msgs: Vec = msgs.into_iter().map(|msg| msg.as_json()).collect(); + + // Send self.nostr .0 .try_send(msgs) @@ -69,7 +76,7 @@ impl RelayChannels { } #[inline] - pub async fn rx_nostr(&self) -> MutexGuard<'_, Receiver>> { + pub async fn rx_nostr(&self) -> MutexGuard<'_, Receiver>> { self.nostr.1.lock().await } @@ -563,7 +570,7 @@ impl InnerRelay { async fn connect_and_run( &self, stream: Option<(BoxSink, BoxStream)>, - rx_nostr: &mut MutexGuard<'_, Receiver>>, + rx_nostr: &mut MutexGuard<'_, Receiver>>, last_ws_error: &mut Option, ) { match stream { @@ -606,7 +613,7 @@ impl InnerRelay { &self, mut ws_tx: BoxSink, ws_rx: BoxStream, - rx_nostr: &mut MutexGuard<'_, Receiver>>, + rx_nostr: &mut MutexGuard<'_, Receiver>>, ) { // Request information document #[cfg(feature = "nip11")] @@ -650,7 +657,7 @@ impl InnerRelay { async fn sender_message_handler( &self, ws_tx: &mut BoxSink, - rx_nostr: &mut MutexGuard<'_, Receiver>>, + rx_nostr: &mut MutexGuard<'_, Receiver>>, ping: &PingTracker, ) -> Result<(), Error> { #[cfg(target_arch = "wasm32")] @@ -660,10 +667,10 @@ impl InnerRelay { tokio::select! { // Nostr channel receiver Some(msgs) = rx_nostr.recv() => { - // Serialize messages to JSON and compose WebSocket text messages + // Compose WebSocket text messages let msgs: Vec = msgs .into_iter() - .map(|msg| Message::Text(msg.as_json())) + .map(Message::Text) .collect(); // Calculate messages size @@ -844,7 +851,7 @@ impl InnerRelay { // Check if NIP42 auto authentication is enabled if self.state.is_auto_authentication_enabled() { let relay = self.clone(); - let challenge: String = challenge.clone(); + let challenge: String = challenge.to_string(); task::spawn(async move { // Authenticate to relay match relay.auth(challenge).await { @@ -897,7 +904,10 @@ impl InnerRelay { } } - async fn handle_raw_relay_message(&self, msg: &str) -> Result, Error> { + async fn handle_raw_relay_message( + &self, + msg: &str, + ) -> Result>, Error> { let size: usize = msg.len(); tracing::trace!(url = %self.url, size = %size, msg = %msg, "Received new relay message."); @@ -927,7 +937,7 @@ impl InnerRelay { &self, subscription_id: String, event: RawEvent, - ) -> Result, Error> { + ) -> Result>, Error> { let kind: Kind = Kind::from(event.kind); // Check event size @@ -1018,7 +1028,6 @@ impl InnerRelay { } let subscription_id: SubscriptionId = SubscriptionId::new(subscription_id); - let event: Box = Box::new(event); // TODO: check if filter match @@ -1034,15 +1043,15 @@ impl InnerRelay { self.send_notification( RelayNotification::Event { subscription_id: subscription_id.clone(), - event: event.clone(), + event: Box::new(event.clone()), }, true, ); } Ok(Some(RelayMessage::Event { - subscription_id, - event, + subscription_id: Cow::Owned(subscription_id), + event: Cow::Owned(event), })) } @@ -1057,11 +1066,11 @@ impl InnerRelay { } #[inline] - pub fn send_msg(&self, msg: ClientMessage) -> Result<(), Error> { + pub fn send_msg(&self, msg: ClientMessage<'_>) -> Result<(), Error> { self.batch_msg(vec![msg]) } - pub fn batch_msg(&self, msgs: Vec) -> Result<(), Error> { + pub fn batch_msg(&self, msgs: Vec>) -> Result<(), Error> { // Perform health checks self.health_check()?; @@ -1080,20 +1089,20 @@ impl InnerRelay { return Err(Error::ReadDisabled); } - // Send message + // Send messages self.atomic.channels.send_client_msgs(msgs) } - fn send_neg_msg(&self, id: SubscriptionId, message: String) -> Result<(), Error> { + fn send_neg_msg(&self, id: &SubscriptionId, message: &str) -> Result<(), Error> { self.send_msg(ClientMessage::NegMsg { - subscription_id: id, - message, + subscription_id: Cow::Borrowed(id), + message: Cow::Borrowed(message), }) } - fn send_neg_close(&self, id: SubscriptionId) -> Result<(), Error> { + fn send_neg_close(&self, id: &SubscriptionId) -> Result<(), Error> { self.send_msg(ClientMessage::NegClose { - subscription_id: id, + subscription_id: Cow::Borrowed(id), }) } @@ -1109,14 +1118,13 @@ impl InnerRelay { // Subscribe to notifications let mut notifications = self.internal_notification_sender.subscribe(); - // Send AUTH message - let id: EventId = event.id; - self.send_msg(ClientMessage::auth(event))?; + // Send the AUTH message + self.send_msg(ClientMessage::Auth(Cow::Borrowed(&event)))?; // Wait for OK // The event ID is already checked in `wait_for_ok` method - let (_, status, message) = self - .wait_for_ok(&mut notifications, id, BATCH_EVENT_ITERATION_TIMEOUT) + let (status, message) = self + .wait_for_ok(&mut notifications, &event.id, BATCH_EVENT_ITERATION_TIMEOUT) .await?; // Check status @@ -1130,9 +1138,9 @@ impl InnerRelay { pub(super) async fn wait_for_ok( &self, notifications: &mut broadcast::Receiver, - id: EventId, + id: &EventId, timeout: Duration, - ) -> Result<(EventId, bool, String), Error> { + ) -> Result<(bool, String), Error> { time::timeout(Some(timeout), async { while let Ok(notification) = notifications.recv().await { match notification { @@ -1145,8 +1153,8 @@ impl InnerRelay { }, } => { // Check if it can return - if id == event_id { - return Ok((event_id, status, message)); + if id == &event_id { + return Ok((status, message.into_owned())); } } RelayNotification::RelayStatus { status } => { @@ -1166,10 +1174,14 @@ impl InnerRelay { } pub async fn resubscribe(&self) -> Result<(), Error> { + // TODO: avoid subscriptions clone let subscriptions = self.subscriptions().await; for (id, filter) in subscriptions.into_iter() { if !filter.is_empty() && self.should_resubscribe(&id).await { - self.send_msg(ClientMessage::req(id, filter))?; + self.send_msg(ClientMessage::Req { + subscription_id: Cow::Owned(id), + filter: Cow::Owned(filter), + })?; } else { tracing::debug!("Skip re-subscription of '{id}'"); } @@ -1189,7 +1201,7 @@ impl InnerRelay { task::spawn(async move { // Check if CLOSE needed let to_close: bool = match relay - .handle_auto_closing(&id, filter, opts, notifications) + .handle_auto_closing(&id, &filter, opts, notifications) .await { Some((to_close, reason)) => { @@ -1213,7 +1225,7 @@ impl InnerRelay { // Close subscription if to_close { tracing::debug!(id = %id, "Auto-closing subscription."); - relay.send_msg(ClientMessage::close(id))?; + relay.send_msg(ClientMessage::Close(Cow::Owned(id)))?; } Ok::<(), Error>(()) @@ -1223,7 +1235,7 @@ impl InnerRelay { async fn handle_auto_closing( &self, id: &SubscriptionId, - filter: Filter, + filter: &Filter, opts: SubscribeAutoCloseOptions, mut notifications: broadcast::Receiver, ) -> Option<(bool, Option)> { @@ -1251,7 +1263,7 @@ impl InnerRelay { RelayMessage::Event { subscription_id, .. } => { - if &subscription_id == id { + if subscription_id.as_ref() == id { // If no-events timeout is enabled, update instant of last event received if opts.idle_timeout.is_some() { last_event = Some(Instant::now()); @@ -1269,7 +1281,7 @@ impl InnerRelay { } } RelayMessage::EndOfStoredEvents(subscription_id) => { - if &subscription_id == id { + if subscription_id.as_ref() == id { received_eose = true; if let ReqExitPolicy::ExitOnEOSE | ReqExitPolicy::WaitDurationAfterEOSE(_) = opts.exit_policy @@ -1282,7 +1294,7 @@ impl InnerRelay { subscription_id, message, } => { - if &subscription_id == id { + if subscription_id.as_ref() == id { // Check if auth required match MachineReadablePrefix::parse(&message) { Some(MachineReadablePrefix::AuthRequired) => { @@ -1291,14 +1303,18 @@ impl InnerRelay { } else { return Some(( false, - Some(SubscriptionAutoClosedReason::Closed(message)), + Some(SubscriptionAutoClosedReason::Closed( + message.into_owned(), + )), )); // No need to send CLOSE msg } } _ => { return Some(( false, - Some(SubscriptionAutoClosedReason::Closed(message)), + Some(SubscriptionAutoClosedReason::Closed( + message.into_owned(), + )), )); // No need to send CLOSE msg } } @@ -1310,7 +1326,10 @@ impl InnerRelay { // Resend REQ if require_resubscription { require_resubscription = false; - let msg: ClientMessage = ClientMessage::req(id.clone(), filter.clone()); + let msg = ClientMessage::Req { + subscription_id: Cow::Borrowed(id), + filter: Cow::Borrowed(filter), + }; let _ = self.send_msg(msg); } } @@ -1358,18 +1377,18 @@ impl InnerRelay { .await? } - pub async fn unsubscribe(&self, id: SubscriptionId) -> Result<(), Error> { + pub async fn unsubscribe(&self, id: &SubscriptionId) -> Result<(), Error> { // Remove subscription - self.remove_subscription(&id).await; + self.remove_subscription(id).await; // Send CLOSE message - self.send_msg(ClientMessage::close(id)) + self.send_msg(ClientMessage::Close(Cow::Borrowed(id))) } pub async fn unsubscribe_all(&self) -> Result<(), Error> { - let subscriptions = self.subscriptions().await; + let subscriptions = self.atomic.subscriptions.read().await; - for id in subscriptions.into_keys() { + for id in subscriptions.keys() { self.unsubscribe(id).await?; } @@ -1379,7 +1398,7 @@ impl InnerRelay { #[inline(never)] fn handle_neg_msg( &self, - subscription_id: SubscriptionId, + subscription_id: &SubscriptionId, msg: Option>, curr_have_ids: I, curr_need_ids: I, @@ -1419,7 +1438,7 @@ impl InnerRelay { } match msg { - Some(query) => self.send_neg_msg(subscription_id, hex::encode(query)), + Some(query) => self.send_neg_msg(subscription_id, &hex::encode(query)), None => { // Mark sync as done *sync_done = true; @@ -1520,7 +1539,10 @@ impl InnerRelay { } let filter = Filter::new().ids(ids); - self.send_msg(ClientMessage::req(down_sub_id.clone(), filter))?; + self.send_msg(ClientMessage::Req { + subscription_id: Cow::Borrowed(down_sub_id), + filter: Cow::Owned(filter), + })?; *in_flight_down = true; @@ -1533,7 +1555,7 @@ impl InnerRelay { in_flight_up: &mut HashSet, event_id: EventId, status: bool, - message: String, + message: Cow<'_, str>, output: &mut Reconciliation, ) { if in_flight_up.remove(&event_id) { @@ -1551,10 +1573,10 @@ impl InnerRelay { .send_failures .entry(self.url.clone()) .and_modify(|map| { - map.insert(event_id, message.clone()); + map.insert(event_id, message.to_string()); }) .or_default() - .insert(event_id, message); + .insert(event_id, message.into_owned()); } } } @@ -1563,7 +1585,7 @@ impl InnerRelay { #[inline(never)] pub(super) async fn sync_new( &self, - filter: Filter, + filter: &Filter, items: Vec<(EventId, Timestamp)>, opts: &SyncOptions, output: &mut Reconciliation, @@ -1582,8 +1604,12 @@ impl InnerRelay { // Send initial negentropy message let sub_id: SubscriptionId = SubscriptionId::generate(); - let open_msg: ClientMessage = - ClientMessage::neg_open(sub_id.clone(), filter, hex::encode(initial_message)); + let open_msg: ClientMessage = ClientMessage::NegOpen { + subscription_id: Cow::Borrowed(&sub_id), + filter: Cow::Borrowed(filter), + id_size: None, + initial_message: Cow::Owned(hex::encode(initial_message)), + }; self.send_msg(open_msg)?; // Check if negentropy is supported @@ -1605,12 +1631,12 @@ impl InnerRelay { subscription_id, message, } => { - if subscription_id == sub_id { + if subscription_id.as_ref() == &sub_id { let mut curr_have_ids: Vec = Vec::new(); let mut curr_need_ids: Vec = Vec::new(); // Parse message - let query: Vec = hex::decode(message)?; + let query: Vec = hex::decode(message.as_ref())?; // Reconcile let msg: Option> = negentropy.reconcile_with_ids( @@ -1621,7 +1647,7 @@ impl InnerRelay { // Handle message self.handle_neg_msg( - subscription_id, + &subscription_id, msg, curr_have_ids.into_iter().map(neg_id_to_event_id), curr_need_ids.into_iter().map(neg_id_to_event_id), @@ -1637,8 +1663,8 @@ impl InnerRelay { subscription_id, message, } => { - if subscription_id == sub_id { - return Err(Error::RelayMessage(message)); + if subscription_id.as_ref() == &sub_id { + return Err(Error::RelayMessage(message.into_owned())); } } RelayMessage::Ok { @@ -1658,12 +1684,12 @@ impl InnerRelay { subscription_id, event, } => { - if subscription_id == down_sub_id { + if subscription_id.as_ref() == &down_sub_id { output.received.insert(event.id); } } RelayMessage::EndOfStoredEvents(id) => { - if id == down_sub_id { + if id.as_ref() == &down_sub_id { in_flight_down = false; } } @@ -1707,7 +1733,7 @@ impl InnerRelay { #[inline(never)] pub(super) async fn sync_deprecated( &self, - filter: Filter, + filter: &Filter, items: Vec<(EventId, Timestamp)>, opts: &SyncOptions, output: &mut Reconciliation, @@ -1729,11 +1755,11 @@ impl InnerRelay { // Send initial negentropy message let sub_id = SubscriptionId::generate(); - let open_msg = ClientMessage::NegOpen { - subscription_id: sub_id.clone(), - filter: Box::new(filter), + let open_msg: ClientMessage = ClientMessage::NegOpen { + subscription_id: Cow::Borrowed(&sub_id), + filter: Cow::Borrowed(filter), id_size: Some(32), - initial_message: hex::encode(initial_message), + initial_message: Cow::Owned(hex::encode(initial_message)), }; self.send_msg(open_msg)?; @@ -1756,12 +1782,13 @@ impl InnerRelay { subscription_id, message, } => { - if subscription_id == sub_id { + if subscription_id.as_ref() == &sub_id { let mut curr_have_ids: Vec = Vec::new(); let mut curr_need_ids: Vec = Vec::new(); // Parse message - let query: BytesDeprecated = BytesDeprecated::from_hex(message)?; + let query: BytesDeprecated = + BytesDeprecated::from_hex(message.as_ref())?; // Reconcile let msg: Option = negentropy.reconcile_with_ids( @@ -1772,7 +1799,7 @@ impl InnerRelay { // Handle message self.handle_neg_msg( - subscription_id, + &subscription_id, msg.map(|m| m.to_bytes()), curr_have_ids.into_iter().filter_map(neg_depr_to_event_id), curr_need_ids.into_iter().filter_map(neg_depr_to_event_id), @@ -1788,8 +1815,8 @@ impl InnerRelay { subscription_id, message, } => { - if subscription_id == sub_id { - return Err(Error::RelayMessage(message)); + if subscription_id.as_ref() == &sub_id { + return Err(Error::RelayMessage(message.into_owned())); } } RelayMessage::Ok { @@ -1809,12 +1836,12 @@ impl InnerRelay { subscription_id, event, } => { - if subscription_id == down_sub_id { + if subscription_id.as_ref() == &down_sub_id { output.received.insert(event.id); } } RelayMessage::EndOfStoredEvents(id) => { - if id == down_sub_id { + if id.as_ref() == &down_sub_id { in_flight_down = false; } } @@ -1914,7 +1941,7 @@ async fn check_negentropy_support( RelayMessage::NegMsg { subscription_id, .. } => { - if &subscription_id == sub_id { + if subscription_id.as_ref() == sub_id { break; } } @@ -1922,8 +1949,8 @@ async fn check_negentropy_support( subscription_id, message, } => { - if &subscription_id == sub_id { - return Err(Error::RelayMessage(message)); + if subscription_id.as_ref() == sub_id { + return Err(Error::RelayMessage(message.into_owned())); } } RelayMessage::Notice(message) => { diff --git a/crates/nostr-relay-pool/src/relay/mod.rs b/crates/nostr-relay-pool/src/relay/mod.rs index b38c3785d..5dc26d8fc 100644 --- a/crates/nostr-relay-pool/src/relay/mod.rs +++ b/crates/nostr-relay-pool/src/relay/mod.rs @@ -4,6 +4,7 @@ //! Relay +use std::borrow::Cow; use std::cmp; use std::collections::{HashMap, HashSet}; use std::sync::Arc; @@ -66,7 +67,7 @@ pub enum RelayNotification { /// Received a [`RelayMessage`]. Includes messages wrapping events that were sent by this client. Message { /// Relay Message - message: RelayMessage, + message: RelayMessage<'static>, }, /// Relay status changed RelayStatus { @@ -341,33 +342,32 @@ impl Relay { /// Send msg to relay #[inline] - pub fn send_msg(&self, msg: ClientMessage) -> Result<(), Error> { - self.batch_msg(vec![msg]) + pub fn send_msg(&self, msg: ClientMessage<'_>) -> Result<(), Error> { + self.inner.send_msg(msg) } /// Send multiple [`ClientMessage`] at once #[inline] - pub fn batch_msg(&self, msgs: Vec) -> Result<(), Error> { + pub fn batch_msg(&self, msgs: Vec>) -> Result<(), Error> { self.inner.batch_msg(msgs) } async fn _send_event( &self, notifications: &mut broadcast::Receiver, - event: Event, - ) -> Result<(EventId, bool, String), Error> { - let id: EventId = event.id; - + event: &Event, + ) -> Result<(bool, String), Error> { // Send message - // TODO: avoid clone - self.send_msg(ClientMessage::event(event))?; + self.inner + .send_msg(ClientMessage::Event(Cow::Borrowed(event)))?; // Wait for OK self.inner - .wait_for_ok(notifications, id, BATCH_EVENT_ITERATION_TIMEOUT) + .wait_for_ok(notifications, &event.id, BATCH_EVENT_ITERATION_TIMEOUT) .await } + // TODO: take reference here? /// Send event and wait for `OK` relay msg pub async fn send_event(&self, event: Event) -> Result { // Health, write permission and number of messages checks are executed in `batch_msg` method. @@ -376,12 +376,11 @@ impl Relay { let mut notifications = self.inner.internal_notification_sender.subscribe(); // Send event - let (event_id, status, message) = - self._send_event(&mut notifications, event.clone()).await?; + let (status, message) = self._send_event(&mut notifications, &event).await?; // Check status if status { - return Ok(event_id); + return Ok(event.id); } // If auth required, wait for authentication adn resend it @@ -394,12 +393,11 @@ impl Relay { .await?; // Try to resend event - let (event_id, status, message) = - self._send_event(&mut notifications, event).await?; + let (status, message) = self._send_event(&mut notifications, &event).await?; // Check status return if status { - Ok(event_id) + Ok(event.id) } else { Err(Error::RelayMessage(message)) }; @@ -478,7 +476,10 @@ impl Relay { opts: SubscribeOptions, ) -> Result<(), Error> { // Compose REQ message - let msg: ClientMessage = ClientMessage::req(id.clone(), filter.clone()); + let msg: ClientMessage = ClientMessage::Req { + subscription_id: Cow::Borrowed(&id), + filter: Cow::Borrowed(&filter), + }; // Check if auto-close condition is set match opts.auto_close { @@ -507,7 +508,7 @@ impl Relay { /// Unsubscribe #[inline] - pub async fn unsubscribe(&self, id: SubscriptionId) -> Result<(), Error> { + pub async fn unsubscribe(&self, id: &SubscriptionId) -> Result<(), Error> { self.inner.unsubscribe(id).await } @@ -552,8 +553,8 @@ impl Relay { }, .. } => { - if subscription_id == id { - callback(*event); + if subscription_id.as_ref() == &id { + callback(event.into_owned()); } } RelayNotification::SubscriptionAutoClosed { reason } => { @@ -616,7 +617,11 @@ impl Relay { /// Count events pub async fn count_events(&self, filter: Filter, timeout: Duration) -> Result { let id = SubscriptionId::generate(); - self.send_msg(ClientMessage::count(id.clone(), filter))?; + let msg = ClientMessage::Count { + subscription_id: Cow::Borrowed(&id), + filter: Cow::Owned(filter), + }; + self.inner.send_msg(msg)?; let mut count = 0; @@ -631,7 +636,7 @@ impl Relay { }, } = notification { - if subscription_id == id { + if subscription_id.as_ref() == &id { count = c; break; } @@ -642,7 +647,7 @@ impl Relay { .ok_or(Error::Timeout)?; // Unsubscribe - self.send_msg(ClientMessage::close(id))?; + self.inner.send_msg(ClientMessage::close(id))?; Ok(count) } @@ -677,7 +682,7 @@ impl Relay { match self .inner - .sync_new(filter.clone(), items.clone(), opts, &mut output) + .sync_new(&filter, items.clone(), opts, &mut output) .await { Ok(..) => {} @@ -685,7 +690,7 @@ impl Relay { Error::NegentropyNotSupported | Error::Negentropy(negentropy::Error::UnsupportedProtocolVersion) => { self.inner - .sync_deprecated(filter, items, opts, &mut output) + .sync_deprecated(&filter, items, opts, &mut output) .await?; } e => return Err(e), diff --git a/crates/nostr-sdk/src/client/mod.rs b/crates/nostr-sdk/src/client/mod.rs index bcec6e6e0..8148ab926 100644 --- a/crates/nostr-sdk/src/client/mod.rs +++ b/crates/nostr-sdk/src/client/mod.rs @@ -656,7 +656,7 @@ impl Client { /// Unsubscribe #[inline] - pub async fn unsubscribe(&self, id: SubscriptionId) { + pub async fn unsubscribe(&self, id: &SubscriptionId) { self.pool.unsubscribe(id).await; } @@ -861,7 +861,11 @@ impl Client { /// Send the client message to a **specific relays** #[inline] - pub async fn send_msg_to(&self, urls: I, msg: ClientMessage) -> Result, Error> + pub async fn send_msg_to( + &self, + urls: I, + msg: ClientMessage<'_>, + ) -> Result, Error> where I: IntoIterator, U: TryIntoUrl, @@ -875,7 +879,7 @@ impl Client { pub async fn batch_msg_to( &self, urls: I, - msgs: Vec, + msgs: Vec>, ) -> Result, Error> where I: IntoIterator, diff --git a/crates/nostr/src/message/client.rs b/crates/nostr/src/message/client.rs index d46340b5e..a76b66e22 100644 --- a/crates/nostr/src/message/client.rs +++ b/crates/nostr/src/message/client.rs @@ -5,7 +5,7 @@ //! Client messages -use alloc::boxed::Box; +use alloc::borrow::Cow; use alloc::string::{String, ToString}; use alloc::vec::Vec; @@ -16,23 +16,23 @@ use super::MessageHandleError; use crate::{Event, Filter, JsonUtil, SubscriptionId}; /// Messages sent by clients, received by relays -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum ClientMessage { +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum ClientMessage<'a> { /// Event - Event(Box), + Event(Cow<'a, Event>), /// Req Req { /// Subscription ID - subscription_id: SubscriptionId, + subscription_id: Cow<'a, SubscriptionId>, /// Filter - filter: Box, + filter: Cow<'a, Filter>, }, /// Multi-filter REQ (deprecated) /// /// ReqMultiFilter { /// Subscription ID - subscription_id: SubscriptionId, + subscription_id: Cow<'a, SubscriptionId>, /// Filters filters: Vec, }, @@ -41,72 +41,52 @@ pub enum ClientMessage { /// Count { /// Subscription ID - subscription_id: SubscriptionId, + subscription_id: Cow<'a, SubscriptionId>, /// Filter - filter: Box, + filter: Cow<'a, Filter>, }, /// Close - Close(SubscriptionId), + Close(Cow<'a, SubscriptionId>), /// Auth - Auth(Box), + Auth(Cow<'a, Event>), /// Negentropy Open NegOpen { /// Subscription ID - subscription_id: SubscriptionId, + subscription_id: Cow<'a, SubscriptionId>, /// Filter - filter: Box, + filter: Cow<'a, Filter>, /// ID size (deprecated) id_size: Option, /// Initial message (hex) - initial_message: String, + initial_message: Cow<'a, str>, }, /// Negentropy Message NegMsg { /// Subscription ID - subscription_id: SubscriptionId, + subscription_id: Cow<'a, SubscriptionId>, /// Message - message: String, + message: Cow<'a, str>, }, /// Negentropy Close NegClose { /// Subscription ID - subscription_id: SubscriptionId, + subscription_id: Cow<'a, SubscriptionId>, }, } -impl Serialize for ClientMessage { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - let json_value: Value = self.as_value(); - json_value.serialize(serializer) - } -} - -impl<'de> Deserialize<'de> for ClientMessage { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let json_value = Value::deserialize(deserializer)?; - ClientMessage::from_value(json_value).map_err(serde::de::Error::custom) - } -} - -impl ClientMessage { +impl ClientMessage<'_> { /// Create `EVENT` message #[inline] pub fn event(event: Event) -> Self { - Self::Event(Box::new(event)) + Self::Event(Cow::Owned(event)) } /// Create `REQ` message #[inline] pub fn req(subscription_id: SubscriptionId, filter: Filter) -> Self { Self::Req { - subscription_id, - filter: Box::new(filter), + subscription_id: Cow::Owned(subscription_id), + filter: Cow::Owned(filter), } } @@ -114,21 +94,21 @@ impl ClientMessage { #[inline] pub fn count(subscription_id: SubscriptionId, filter: Filter) -> Self { Self::Count { - subscription_id, - filter: Box::new(filter), + subscription_id: Cow::Owned(subscription_id), + filter: Cow::Owned(filter), } } /// Create new `CLOSE` message #[inline] pub fn close(subscription_id: SubscriptionId) -> Self { - Self::Close(subscription_id) + Self::Close(Cow::Owned(subscription_id)) } /// Create `AUTH` message #[inline] pub fn auth(event: Event) -> Self { - Self::Auth(Box::new(event)) + Self::Auth(Cow::Owned(event)) } /// Create new `NEG-OPEN` message @@ -138,10 +118,10 @@ impl ClientMessage { initial_message: String, ) -> Self { Self::NegOpen { - subscription_id, - filter: Box::new(filter), + subscription_id: Cow::Owned(subscription_id), + filter: Cow::Owned(filter), id_size: None, - initial_message, + initial_message: Cow::Owned(initial_message), } } @@ -261,7 +241,7 @@ impl ClientMessage { let subscription_id: SubscriptionId = serde_json::from_value(v[1].clone())?; let filters: Vec = serde_json::from_value(Value::Array(v[2..].to_vec()))?; return Ok(Self::ReqMultiFilter { - subscription_id, + subscription_id: Cow::Owned(subscription_id), filters, }); } else { @@ -311,12 +291,7 @@ impl ClientMessage { let subscription_id: SubscriptionId = serde_json::from_value(v[1].clone())?; let filter: Filter = Filter::from_json(v[2].to_string())?; let initial_message: String = serde_json::from_value(v[3].clone())?; - return Ok(Self::NegOpen { - subscription_id, - filter: Box::new(filter), - id_size: None, - initial_message, - }); + return Ok(Self::neg_open(subscription_id, filter, initial_message)); } // Old negentropy protocol message @@ -328,10 +303,10 @@ impl ClientMessage { .ok_or(MessageHandleError::InvalidMessageFormat)? as u8; let initial_message: String = serde_json::from_value(v[4].clone())?; return Ok(Self::NegOpen { - subscription_id, - filter: Box::new(filter), + subscription_id: Cow::Owned(subscription_id), + filter: Cow::Owned(filter), id_size: Some(id_size), - initial_message, + initial_message: Cow::Owned(initial_message), }); } @@ -345,8 +320,8 @@ impl ClientMessage { let subscription_id: SubscriptionId = serde_json::from_value(v[1].clone())?; let message: String = serde_json::from_value(v[2].clone())?; return Ok(Self::NegMsg { - subscription_id, - message, + subscription_id: Cow::Owned(subscription_id), + message: Cow::Owned(message), }); } else { return Err(MessageHandleError::InvalidMessageFormat); @@ -358,7 +333,9 @@ impl ClientMessage { if v[0] == "NEG-CLOSE" { if v_len >= 2 { let subscription_id: SubscriptionId = serde_json::from_value(v[1].clone())?; - return Ok(Self::NegClose { subscription_id }); + return Ok(Self::NegClose { + subscription_id: Cow::Owned(subscription_id), + }); } else { return Err(MessageHandleError::InvalidMessageFormat); } @@ -368,7 +345,27 @@ impl ClientMessage { } } -impl JsonUtil for ClientMessage { +impl Serialize for ClientMessage<'_> { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let json_value: Value = self.as_value(); + json_value.serialize(serializer) + } +} + +impl<'de> Deserialize<'de> for ClientMessage<'_> { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let json_value = Value::deserialize(deserializer)?; + ClientMessage::from_value(json_value).map_err(serde::de::Error::custom) + } +} + +impl JsonUtil for ClientMessage<'_> { type Err = MessageHandleError; /// Deserialize [`ClientMessage`] from JSON string diff --git a/crates/nostr/src/message/relay/mod.rs b/crates/nostr/src/message/relay/mod.rs index 1aab4200d..55ec0196c 100644 --- a/crates/nostr/src/message/relay/mod.rs +++ b/crates/nostr/src/message/relay/mod.rs @@ -5,7 +5,7 @@ //! Relay messages -use alloc::boxed::Box; +use alloc::borrow::Cow; use alloc::string::String; use core::fmt; @@ -95,7 +95,7 @@ impl MachineReadablePrefix { /// Messages sent by relays, received by clients #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub enum RelayMessage { +pub enum RelayMessage<'a> { /// Event /// /// Used to send events requested by clients. @@ -105,9 +105,9 @@ pub enum RelayMessage { /// Event { /// Subscription ID - subscription_id: SubscriptionId, + subscription_id: Cow<'a, SubscriptionId>, /// Event - event: Box, + event: Cow<'a, Event>, }, /// Ok /// @@ -122,7 +122,7 @@ pub enum RelayMessage { /// Status status: bool, /// Message - message: String, + message: Cow<'a, str>, }, /// End of stored events /// @@ -131,7 +131,7 @@ pub enum RelayMessage { /// JSON: `["EOSE", ]`. /// /// - EndOfStoredEvents(SubscriptionId), + EndOfStoredEvents(Cow<'a, SubscriptionId>), /// Notice /// /// Used to send human-readable error messages or other things to clients. @@ -139,7 +139,7 @@ pub enum RelayMessage { /// JSON: `["NOTICE", ]`. /// /// - Notice(String), + Notice(Cow<'a, str>), /// Closed /// /// Used to indicate that a subscription was ended on the server side. @@ -149,9 +149,9 @@ pub enum RelayMessage { /// Closed { /// Subscription ID - subscription_id: SubscriptionId, + subscription_id: Cow<'a, SubscriptionId>, /// Message - message: String, + message: Cow<'a, str>, }, /// Auth /// @@ -160,7 +160,7 @@ pub enum RelayMessage { /// Auth { /// Challenge - challenge: String, + challenge: Cow<'a, str>, }, /// Count /// @@ -169,53 +169,33 @@ pub enum RelayMessage { /// Count { /// Subscription ID - subscription_id: SubscriptionId, + subscription_id: Cow<'a, SubscriptionId>, /// Events count count: usize, }, /// Negentropy Message NegMsg { /// Subscription ID - subscription_id: SubscriptionId, + subscription_id: Cow<'a, SubscriptionId>, /// Message - message: String, + message: Cow<'a, str>, }, /// Negentropy Error NegErr { /// Subscription ID - subscription_id: SubscriptionId, + subscription_id: Cow<'a, SubscriptionId>, /// Error message - message: String, + message: Cow<'a, str>, }, } -impl Serialize for RelayMessage { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - let json_value: Value = self.as_value(); - json_value.serialize(serializer) - } -} - -impl<'de> Deserialize<'de> for RelayMessage { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let json_value = Value::deserialize(deserializer)?; - RelayMessage::from_value(json_value).map_err(serde::de::Error::custom) - } -} - -impl RelayMessage { +impl RelayMessage<'_> { /// Create `EVENT` message #[inline] pub fn event(subscription_id: SubscriptionId, event: Event) -> Self { Self::Event { - subscription_id, - event: Box::new(event), + subscription_id: Cow::Owned(subscription_id), + event: Cow::Owned(event), } } @@ -225,7 +205,7 @@ impl RelayMessage { where S: Into, { - Self::Notice(message.into()) + Self::Notice(Cow::Owned(message.into())) } /// Create `CLOSED` message @@ -235,15 +215,15 @@ impl RelayMessage { S: Into, { Self::Closed { - subscription_id, - message: message.into(), + subscription_id: Cow::Owned(subscription_id), + message: Cow::Owned(message.into()), } } /// Create `EOSE` message #[inline] pub fn eose(subscription_id: SubscriptionId) -> Self { - Self::EndOfStoredEvents(subscription_id) + Self::EndOfStoredEvents(Cow::Owned(subscription_id)) } /// Create `OK` message @@ -255,7 +235,7 @@ impl RelayMessage { Self::Ok { event_id, status, - message: message.into(), + message: Cow::Owned(message.into()), } } @@ -266,7 +246,7 @@ impl RelayMessage { S: Into, { Self::Auth { - challenge: challenge.into(), + challenge: Cow::Owned(challenge.into()), } } @@ -274,11 +254,18 @@ impl RelayMessage { #[inline] pub fn count(subscription_id: SubscriptionId, count: usize) -> Self { Self::Count { - subscription_id, + subscription_id: Cow::Owned(subscription_id), count, } } + /// Deserialize from [`Value`] + #[inline] + pub fn from_value(msg: Value) -> Result { + let raw = RawRelayMessage::from_value(msg)?; + RelayMessage::try_from(raw) + } + fn as_value(&self) -> Value { match self { Self::Event { @@ -313,16 +300,29 @@ impl RelayMessage { } => json!(["NEG-ERR", subscription_id, message]), } } +} - /// Deserialize from [`Value`] - #[inline] - pub fn from_value(msg: Value) -> Result { - let raw = RawRelayMessage::from_value(msg)?; - RelayMessage::try_from(raw) +impl Serialize for RelayMessage<'_> { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let json_value: Value = self.as_value(); + json_value.serialize(serializer) + } +} + +impl<'de> Deserialize<'de> for RelayMessage<'_> { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let json_value = Value::deserialize(deserializer)?; + RelayMessage::from_value(json_value).map_err(serde::de::Error::custom) } } -impl JsonUtil for RelayMessage { +impl JsonUtil for RelayMessage<'_> { type Err = MessageHandleError; /// Deserialize [`RelayMessage`] from JSON string @@ -343,7 +343,7 @@ impl JsonUtil for RelayMessage { } } -impl TryFrom for RelayMessage { +impl TryFrom for RelayMessage<'_> { type Error = MessageHandleError; fn try_from(raw: RawRelayMessage) -> Result { @@ -351,51 +351,41 @@ impl TryFrom for RelayMessage { RawRelayMessage::Event { subscription_id, event, - } => Ok(Self::Event { - subscription_id: SubscriptionId::new(subscription_id), - event: Box::new(event.try_into()?), - }), + } => Ok(Self::event( + SubscriptionId::new(subscription_id), + event.try_into()?, + )), RawRelayMessage::Ok { event_id, status, message, - } => Ok(Self::Ok { - event_id: EventId::from_hex(&event_id)?, - status, - message, - }), - RawRelayMessage::EndOfStoredEvents(subscription_id) => Ok(Self::EndOfStoredEvents( - SubscriptionId::new(subscription_id), - )), - RawRelayMessage::Notice(message) => Ok(Self::Notice(message)), + } => Ok(Self::ok(EventId::from_hex(&event_id)?, status, message)), + RawRelayMessage::EndOfStoredEvents(subscription_id) => { + Ok(Self::eose(SubscriptionId::new(subscription_id))) + } + RawRelayMessage::Notice(message) => Ok(Self::notice(message)), RawRelayMessage::Closed { subscription_id, message, - } => Ok(Self::Closed { - subscription_id: SubscriptionId::new(subscription_id), - message, - }), - RawRelayMessage::Auth { challenge } => Ok(Self::Auth { challenge }), + } => Ok(Self::closed(SubscriptionId::new(subscription_id), message)), + RawRelayMessage::Auth { challenge } => Ok(Self::auth(challenge)), RawRelayMessage::Count { subscription_id, count, - } => Ok(Self::Count { - subscription_id: SubscriptionId::new(subscription_id), - count, - }), + } => Ok(Self::count(SubscriptionId::new(subscription_id), count)), RawRelayMessage::NegMsg { subscription_id, message, } => Ok(Self::NegMsg { - subscription_id: SubscriptionId::new(subscription_id), - message, + subscription_id: Cow::Owned(SubscriptionId::new(subscription_id)), + message: Cow::Owned(message), }), RawRelayMessage::NegErr { subscription_id, message, } => Ok(Self::NegErr { - subscription_id: SubscriptionId::new(subscription_id), - message, + subscription_id: Cow::Owned(SubscriptionId::new(subscription_id)), + message: Cow::Owned(message), }), } }