From f7b5c0d9e79851114b47b5e725a61724870fa562 Mon Sep 17 00:00:00 2001 From: Yuki Kishimoto Date: Tue, 28 Jan 2025 16:00:56 +0100 Subject: [PATCH] nostr: take a single filter per `REQ` Ref https://github.com/nostr-protocol/nips/pull/1645 Closes https://github.com/rust-nostr/nostr/issues/568 Signed-off-by: Yuki Kishimoto --- bindings/nostr-sdk-ffi/src/client/mod.rs | 86 ++-- bindings/nostr-sdk-ffi/src/database/mod.rs | 18 +- .../src/protocol/message/client.rs | 40 +- bindings/nostr-sdk-ffi/src/relay/mod.rs | 57 +-- bindings/nostr-sdk-js/src/client/mod.rs | 48 +-- bindings/nostr-sdk-js/src/database/mod.rs | 19 +- bindings/nostr-sdk-js/src/protocol/filter.rs | 9 - .../src/protocol/message/client.rs | 11 +- bindings/nostr-sdk-js/src/relay/mod.rs | 24 +- crates/nostr-cli/src/main.rs | 2 +- crates/nostr-connect/src/client.rs | 4 +- crates/nostr-connect/src/signer.rs | 2 +- .../nostr-database/src/collections/events.rs | 38 +- crates/nostr-database/src/events/helper.rs | 187 ++++----- crates/nostr-database/src/events/mod.rs | 22 +- crates/nostr-database/src/memory.rs | 8 +- crates/nostr-indexeddb/src/lib.rs | 8 +- crates/nostr-lmdb/src/lib.rs | 43 +- crates/nostr-lmdb/src/store/lmdb/mod.rs | 20 +- crates/nostr-lmdb/src/store/mod.rs | 15 +- crates/nostr-ndb/src/lib.rs | 18 +- crates/nostr-relay-builder/examples/policy.rs | 7 +- crates/nostr-relay-builder/src/builder.rs | 2 +- crates/nostr-relay-builder/src/local/inner.rs | 17 +- .../nostr-relay-builder/src/local/session.rs | 2 +- crates/nostr-relay-pool/src/pool/inner.rs | 12 +- crates/nostr-relay-pool/src/pool/mod.rs | 82 ++-- crates/nostr-relay-pool/src/relay/error.rs | 3 - crates/nostr-relay-pool/src/relay/inner.rs | 34 +- crates/nostr-relay-pool/src/relay/mod.rs | 105 ++--- crates/nostr-sdk/examples/blacklist.rs | 4 +- crates/nostr-sdk/examples/bot.rs | 2 +- crates/nostr-sdk/examples/comment.rs | 2 +- crates/nostr-sdk/examples/fetch-events.rs | 6 +- crates/nostr-sdk/examples/gossip.rs | 4 +- crates/nostr-sdk/examples/stream-events.rs | 2 +- crates/nostr-sdk/examples/subscriptions.rs | 6 +- crates/nostr-sdk/examples/switch-account.rs | 4 +- crates/nostr-sdk/examples/whitelist.rs | 4 +- crates/nostr-sdk/src/client/mod.rs | 245 ++++------- crates/nostr-sdk/src/gossip/graph.rs | 393 +++++++----------- crates/nostr/src/message/client.rs | 91 ++-- crates/nwc/src/lib.rs | 6 +- 43 files changed, 679 insertions(+), 1033 deletions(-) diff --git a/bindings/nostr-sdk-ffi/src/client/mod.rs b/bindings/nostr-sdk-ffi/src/client/mod.rs index 822f14015..f68fb8a91 100644 --- a/bindings/nostr-sdk-ffi/src/client/mod.rs +++ b/bindings/nostr-sdk-ffi/src/client/mod.rs @@ -216,25 +216,20 @@ impl Client { self.inner.disconnect().await } - pub async fn subscriptions(&self) -> HashMap>> { + pub async fn subscriptions(&self) -> HashMap> { self.inner .subscriptions() .await .into_iter() - .map(|(id, filters)| { - ( - id.to_string(), - filters.into_iter().map(|f| Arc::new(f.into())).collect(), - ) - }) + .map(|(id, f)| (id.to_string(), Arc::new(f.into()))) .collect() } - pub async fn subscription(&self, id: String) -> Option>> { + pub async fn subscription(&self, id: String) -> Option> { self.inner .subscription(&SubscriptionId::new(id)) .await - .map(|filters| filters.into_iter().map(|f| Arc::new(f.into())).collect()) + .map(|f| Arc::new(f.into())) } /// Subscribe to filters @@ -248,16 +243,12 @@ impl Client { #[uniffi::method(default(opts = None))] pub async fn subscribe( &self, - filters: Vec>, + filter: &Filter, opts: Option>, ) -> Result { - let filters = filters - 
.into_iter() - .map(|f| f.as_ref().deref().clone()) - .collect(); Ok(self .inner - .subscribe(filters, opts.map(|o| **o)) + .subscribe(filter.deref().clone(), opts.map(|o| **o)) .await? .into()) } @@ -274,16 +265,16 @@ impl Client { pub async fn subscribe_with_id( &self, id: String, - filters: Vec>, + filter: &Filter, opts: Option>, ) -> Result { - let filters = filters - .into_iter() - .map(|f| f.as_ref().deref().clone()) - .collect(); Ok(self .inner - .subscribe_with_id(SubscriptionId::new(id), filters, opts.map(|o| **o)) + .subscribe_with_id( + SubscriptionId::new(id), + filter.deref().clone(), + opts.map(|o| **o), + ) .await? .into()) } @@ -297,16 +288,12 @@ impl Client { pub async fn subscribe_to( &self, urls: Vec, - filters: Vec>, + filter: &Filter, opts: Option>, ) -> Result { - let filters = filters - .into_iter() - .map(|f| f.as_ref().deref().clone()) - .collect(); Ok(self .inner - .subscribe_to(urls, filters, opts.map(|o| **o)) + .subscribe_to(urls, filter.deref().clone(), opts.map(|o| **o)) .await? .into()) } @@ -321,16 +308,17 @@ impl Client { &self, urls: Vec, id: String, - filters: Vec>, + filter: &Filter, opts: Option>, ) -> Result { - let filters = filters - .into_iter() - .map(|f| f.as_ref().deref().clone()) - .collect(); Ok(self .inner - .subscribe_with_id_to(urls, SubscriptionId::new(id), filters, opts.map(|o| **o)) + .subscribe_with_id_to( + urls, + SubscriptionId::new(id), + filter.deref().clone(), + opts.map(|o| **o), + ) .await? .into()) } @@ -363,32 +351,24 @@ impl Client { /// /// If `gossip` is enabled (see `Options`) the events will be requested also to /// NIP65 relays (automatically discovered) of public keys included in filters (if any). - pub async fn fetch_events( - &self, - filters: Vec>, - timeout: Duration, - ) -> Result { - let filters = filters - .into_iter() - .map(|f| f.as_ref().deref().clone()) - .collect(); - Ok(self.inner.fetch_events(filters, timeout).await?.into()) + pub async fn fetch_events(&self, filter: &Filter, timeout: Duration) -> Result { + Ok(self + .inner + .fetch_events(filter.deref().clone(), timeout) + .await? + .into()) } /// Fetch events from specific relays pub async fn fetch_events_from( &self, urls: Vec, - filters: Vec>, + filter: &Filter, timeout: Duration, ) -> Result { - let filters = filters - .into_iter() - .map(|f| f.as_ref().deref().clone()) - .collect(); Ok(self .inner - .fetch_events_from(urls, filters, timeout) + .fetch_events_from(urls, filter.deref().clone(), timeout) .await? .into()) } @@ -407,16 +387,12 @@ impl Client { /// NIP65 relays (automatically discovered) of public keys included in filters (if any). pub async fn fetch_combined_events( &self, - filters: Vec>, + filter: &Filter, timeout: Duration, ) -> Result { - let filters = filters - .into_iter() - .map(|f| f.as_ref().deref().clone()) - .collect(); Ok(self .inner - .fetch_combined_events(filters, timeout) + .fetch_combined_events(filter.deref().clone(), timeout) .await? .into()) } diff --git a/bindings/nostr-sdk-ffi/src/database/mod.rs b/bindings/nostr-sdk-ffi/src/database/mod.rs index 46ce5a07e..f64d6aeec 100644 --- a/bindings/nostr-sdk-ffi/src/database/mod.rs +++ b/bindings/nostr-sdk-ffi/src/database/mod.rs @@ -136,20 +136,14 @@ impl NostrDatabase { .map(|e| Arc::new(e.into()))) } - pub async fn count(&self, filters: Vec>) -> Result { - let filters = filters - .into_iter() - .map(|f| f.as_ref().deref().clone()) - .collect(); - Ok(self.inner.count(filters).await? 
as u64) + pub async fn count(&self, filter: &Filter) -> Result { + Ok(self.inner.count(filter.deref().clone()).await? as u64) } - pub async fn query(&self, filters: Vec>) -> Result> { - let filters = filters - .into_iter() - .map(|f| f.as_ref().deref().clone()) - .collect(); - Ok(Arc::new(self.inner.query(filters).await?.into())) + pub async fn query(&self, filter: &Filter) -> Result> { + Ok(Arc::new( + self.inner.query(filter.deref().clone()).await?.into(), + )) } /// Delete all events that match the `Filter` diff --git a/bindings/nostr-sdk-ffi/src/protocol/message/client.rs b/bindings/nostr-sdk-ffi/src/protocol/message/client.rs index f300c4872..f24184f0d 100644 --- a/bindings/nostr-sdk-ffi/src/protocol/message/client.rs +++ b/bindings/nostr-sdk-ffi/src/protocol/message/client.rs @@ -20,11 +20,11 @@ pub enum ClientMessageEnum { }, Req { subscription_id: String, - filters: Vec>, + filter: Arc, }, Count { subscription_id: String, - filters: Vec>, + filter: Arc, }, Close { subscription_id: String, @@ -59,23 +59,17 @@ impl From for nostr::ClientMessage { } ClientMessageEnum::Req { subscription_id, - filters, + filter, } => Self::Req { subscription_id: SubscriptionId::new(subscription_id), - filters: filters - .into_iter() - .map(|f| f.as_ref().deref().clone()) - .collect(), + filter: Box::new(filter.as_ref().deref().clone()), }, ClientMessageEnum::Count { subscription_id, - filters, + filter, } => Self::Count { subscription_id: SubscriptionId::new(subscription_id), - filters: filters - .into_iter() - .map(|f| f.as_ref().deref().clone()) - .collect(), + filter: Box::new(filter.as_ref().deref().clone()), }, ClientMessageEnum::Close { subscription_id } => { Self::Close(SubscriptionId::new(subscription_id)) @@ -116,17 +110,17 @@ impl From for ClientMessageEnum { }, nostr::ClientMessage::Req { subscription_id, - filters, + filter, } => Self::Req { subscription_id: subscription_id.to_string(), - filters: filters.into_iter().map(|f| Arc::new(f.into())).collect(), + filter: Arc::new((*filter).into()), }, nostr::ClientMessage::Count { subscription_id, - filters, + filter, } => Self::Count { subscription_id: subscription_id.to_string(), - filters: filters.into_iter().map(|f| Arc::new(f.into())).collect(), + filter: Arc::new((*filter).into()), }, nostr::ClientMessage::Close(subscription_id) => Self::Close { subscription_id: subscription_id.to_string(), @@ -191,28 +185,22 @@ impl ClientMessage { /// Create new `REQ` message #[uniffi::constructor] - pub fn req(subscription_id: &str, filters: Vec>) -> Self { + pub fn req(subscription_id: &str, filter: &Filter) -> Self { Self { inner: nostr::ClientMessage::req( SubscriptionId::new(subscription_id), - filters - .into_iter() - .map(|f| f.as_ref().deref().clone()) - .collect(), + filter.deref().clone(), ), } } /// Create new `COUNT` message #[uniffi::constructor] - pub fn count(subscription_id: &str, filters: Vec>) -> Self { + pub fn count(subscription_id: &str, filter: &Filter) -> Self { Self { inner: nostr::ClientMessage::count( SubscriptionId::new(subscription_id), - filters - .into_iter() - .map(|f| f.as_ref().deref().clone()) - .collect(), + filter.deref().clone(), ), } } diff --git a/bindings/nostr-sdk-ffi/src/relay/mod.rs b/bindings/nostr-sdk-ffi/src/relay/mod.rs index 0fcf7ed78..5ed171a74 100644 --- a/bindings/nostr-sdk-ffi/src/relay/mod.rs +++ b/bindings/nostr-sdk-ffi/src/relay/mod.rs @@ -165,27 +165,22 @@ impl Relay { self.inner.document().await.into() } - pub async fn subscriptions(&self) -> HashMap>> { + pub async fn subscriptions(&self) -> 
HashMap> { self.inner .subscriptions() .await .into_iter() - .map(|(id, filters)| { - ( - id.to_string(), - filters.into_iter().map(|f| Arc::new(f.into())).collect(), - ) - }) + .map(|(id, f)| (id.to_string(), Arc::new(f.into()))) .collect() } /// Get filters by subscription ID - pub async fn subscription(&self, id: String) -> Option>> { + pub async fn subscription(&self, id: String) -> Option> { let id = SubscriptionId::new(id); self.inner .subscription(&id) .await - .map(|f| f.into_iter().map(|f| Arc::new(f.into())).collect()) + .map(|f| Arc::new(f.into())) } pub fn opts(&self) -> RelayOptions { @@ -254,20 +249,10 @@ impl Relay { /// It's possible to automatically close a subscription by configuring the `SubscribeOptions`. /// /// Note: auto-closing subscriptions aren't saved in subscriptions map! - pub async fn subscribe( - &self, - filters: Vec>, - opts: &SubscribeOptions, - ) -> Result { + pub async fn subscribe(&self, filter: &Filter, opts: &SubscribeOptions) -> Result { Ok(self .inner - .subscribe( - filters - .into_iter() - .map(|f| f.as_ref().deref().clone()) - .collect(), - **opts, - ) + .subscribe(filter.deref().clone(), **opts) .await? .to_string()) } @@ -282,19 +267,12 @@ impl Relay { pub async fn subscribe_with_id( &self, id: String, - filters: Vec>, + filter: &Filter, opts: &SubscribeOptions, ) -> Result<()> { Ok(self .inner - .subscribe_with_id( - SubscriptionId::new(id), - filters - .into_iter() - .map(|f| f.as_ref().deref().clone()) - .collect(), - **opts, - ) + .subscribe_with_id(SubscriptionId::new(id), filter.deref().clone(), **opts) .await?) } @@ -311,28 +289,23 @@ impl Relay { /// Fetch events pub async fn fetch_events( &self, - filters: Vec>, + filter: &Filter, timeout: Duration, policy: ReqExitPolicy, ) -> Result { - let filters = filters - .into_iter() - .map(|f| f.as_ref().deref().clone()) - .collect(); Ok(self .inner - .fetch_events(filters, timeout, policy.into()) + .fetch_events(filter.deref().clone(), timeout, policy.into()) .await? .into()) } /// Count events - pub async fn count_events(&self, filters: Vec>, timeout: Duration) -> Result { - let filters = filters - .into_iter() - .map(|f| f.as_ref().deref().clone()) - .collect(); - Ok(self.inner.count_events(filters, timeout).await? as u64) + pub async fn count_events(&self, filter: &Filter, timeout: Duration) -> Result { + Ok(self + .inner + .count_events(filter.deref().clone(), timeout) + .await? as u64) } /// Sync events with relays (negentropy reconciliation) diff --git a/bindings/nostr-sdk-js/src/client/mod.rs b/bindings/nostr-sdk-js/src/client/mod.rs index 2fba731e5..99ed4ec15 100644 --- a/bindings/nostr-sdk-js/src/client/mod.rs +++ b/bindings/nostr-sdk-js/src/client/mod.rs @@ -259,12 +259,11 @@ impl JsClient { /// It's possible to automatically close a subscription by configuring the `SubscribeAutoCloseOptions`. 
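To see the new message shape these binding changes mirror, here is how a `REQ` is built against the patched core crate. A minimal sketch, assuming `nostr::prelude` re-exports `ClientMessage`, `Filter`, `SubscriptionId`, and the `JsonUtil::as_json` helper as in current releases:

```rust
use nostr::prelude::*;

fn main() {
    // One filter per REQ: the message variant now wraps a single boxed `Filter`.
    let filter = Filter::new().kind(Kind::TextNote).limit(10);
    let msg = ClientMessage::req(SubscriptionId::new("sub1"), filter);

    // Serializes as `["REQ","sub1",{"kinds":[1],"limit":10}]`:
    // a single filter object rather than a variadic list of them.
    println!("{}", msg.as_json());
}
```

This matches the NIP-01 change referenced in the commit message (nostr-protocol/nips PR 1645): a `REQ` carries exactly one filter, so multi-filter queries become multiple subscriptions.
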
pub async fn subscribe( &self, - filters: Vec, + filter: &JsFilter, opts: Option, ) -> Result { - let filters: Vec = filters.into_iter().map(|f| f.into()).collect(); self.inner - .subscribe(filters, opts.map(|o| *o)) + .subscribe(filter.deref().clone(), opts.map(|o| *o)) .await .map_err(into_err) .map(|o| o.into()) @@ -282,12 +281,15 @@ impl JsClient { pub async fn subscribe_with_id( &self, id: &str, - filters: Vec, + filter: &JsFilter, opts: Option, ) -> Result { - let filters: Vec = filters.into_iter().map(|f| f.into()).collect(); self.inner - .subscribe_with_id(SubscriptionId::new(id), filters, opts.map(|o| *o)) + .subscribe_with_id( + SubscriptionId::new(id), + filter.deref().clone(), + opts.map(|o| *o), + ) .await .map_err(into_err) .map(|o| o.into()) @@ -302,12 +304,11 @@ impl JsClient { pub async fn subscribe_to( &self, urls: Vec, - filters: Vec, + filter: &JsFilter, opts: Option, ) -> Result { - let filters: Vec = filters.into_iter().map(|f| f.into()).collect(); self.inner - .subscribe_to(urls, filters, opts.map(|o| *o)) + .subscribe_to(urls, filter.deref().clone(), opts.map(|o| *o)) .await .map_err(into_err) .map(|o| o.into()) @@ -323,12 +324,16 @@ impl JsClient { &self, urls: Vec, id: &str, - filters: Vec, + filter: &JsFilter, opts: Option, ) -> Result { - let filters: Vec = filters.into_iter().map(|f| f.into()).collect(); self.inner - .subscribe_with_id_to(urls, SubscriptionId::new(id), filters, opts.map(|o| *o)) + .subscribe_with_id_to( + urls, + SubscriptionId::new(id), + filter.deref().clone(), + opts.map(|o| *o), + ) .await .map_err(into_err) .map(|o| o.into()) @@ -370,15 +375,10 @@ impl JsClient { /// If `gossip` is enabled (see `Options`) the events will be requested also to /// NIP65 relays (automatically discovered) of public keys included in filters (if any). 
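From the caller's side, the high-level SDK API changes the same way. A minimal sketch of subscribing with the patched `nostr-sdk`, assuming `Client::default()` and a placeholder relay URL:

```rust
use nostr_sdk::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let client = Client::default();
    client.add_relay("wss://relay.example.com").await?; // placeholder relay URL
    client.connect().await;

    // `subscribe` now takes one `Filter`; to watch several independent
    // shapes of events, open one subscription per filter instead of
    // passing a `Vec<Filter>`.
    let metadata = Filter::new().kind(Kind::Metadata).limit(5);
    let output = client.subscribe(metadata, None).await?;
    println!("subscribed with id: {}", output.val);

    Ok(())
}
```
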
#[wasm_bindgen(js_name = fetchEvents)] - pub async fn fetch_events( - &self, - filters: Vec, - timeout: &JsDuration, - ) -> Result { - let filters: Vec = filters.into_iter().map(|f| f.into()).collect(); + pub async fn fetch_events(&self, filter: &JsFilter, timeout: &JsDuration) -> Result { let events: Events = self .inner - .fetch_events(filters, **timeout) + .fetch_events(filter.deref().clone(), **timeout) .await .map_err(into_err)?; Ok(events.into()) @@ -389,13 +389,12 @@ impl JsClient { pub async fn fetch_events_from( &self, urls: Vec, - filters: Vec, + filter: &JsFilter, timeout: &JsDuration, ) -> Result { - let filters: Vec = filters.into_iter().map(|f| f.into()).collect(); let events: Events = self .inner - .fetch_events_from(urls, filters, **timeout) + .fetch_events_from(urls, filter.deref().clone(), **timeout) .await .map_err(into_err)?; Ok(events.into()) @@ -416,13 +415,12 @@ impl JsClient { #[wasm_bindgen(js_name = fetchCombinedEvents)] pub async fn fetch_combined_events( &self, - filters: Vec, + filter: &JsFilter, timeout: &JsDuration, ) -> Result { - let filters: Vec = filters.into_iter().map(|f| f.into()).collect(); let events: Events = self .inner - .fetch_combined_events(filters, **timeout) + .fetch_combined_events(filter.deref().clone(), **timeout) .await .map_err(into_err)?; Ok(events.into()) diff --git a/bindings/nostr-sdk-js/src/database/mod.rs b/bindings/nostr-sdk-js/src/database/mod.rs index 9f1e084b1..4c7724142 100644 --- a/bindings/nostr-sdk-js/src/database/mod.rs +++ b/bindings/nostr-sdk-js/src/database/mod.rs @@ -113,14 +113,21 @@ impl JsNostrDatabase { .map(|e| e.into())) } - pub async fn count(&self, filters: Vec) -> Result { - let filters = filters.into_iter().map(|f| f.into()).collect(); - Ok(self.inner.count(filters).await.map_err(into_err)? as u64) + pub async fn count(&self, filter: &JsFilter) -> Result { + Ok(self + .inner + .count(filter.deref().clone()) + .await + .map_err(into_err)? as u64) } - pub async fn query(&self, filters: Vec) -> Result { - let filters = filters.into_iter().map(|f| f.into()).collect(); - Ok(self.inner.query(filters).await.map_err(into_err)?.into()) + pub async fn query(&self, filter: &JsFilter) -> Result { + Ok(self + .inner + .query(filter.deref().clone()) + .await + .map_err(into_err)? 
+ .into()) } /// Wipe all data diff --git a/bindings/nostr-sdk-js/src/protocol/filter.rs b/bindings/nostr-sdk-js/src/protocol/filter.rs index c89e8c35e..b34b9cf73 100644 --- a/bindings/nostr-sdk-js/src/protocol/filter.rs +++ b/bindings/nostr-sdk-js/src/protocol/filter.rs @@ -182,15 +182,6 @@ impl JsFilter { } } - /// Clone object - #[inline] - #[allow(clippy::should_implement_trait)] - pub fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), - } - } - #[wasm_bindgen(js_name = fromJson)] pub fn from_json(json: &str) -> Result { Ok(Self { diff --git a/bindings/nostr-sdk-js/src/protocol/message/client.rs b/bindings/nostr-sdk-js/src/protocol/message/client.rs index 2105d51c4..af6c63219 100644 --- a/bindings/nostr-sdk-js/src/protocol/message/client.rs +++ b/bindings/nostr-sdk-js/src/protocol/message/client.rs @@ -40,21 +40,18 @@ impl JsClientMessage { } /// Create new `REQ` message - pub fn req(subscription_id: &str, filters: Vec) -> Self { + pub fn req(subscription_id: &str, filter: &JsFilter) -> Self { Self { - inner: ClientMessage::req( - SubscriptionId::new(subscription_id), - filters.into_iter().map(|f| f.into()).collect(), - ), + inner: ClientMessage::req(SubscriptionId::new(subscription_id), filter.deref().clone()), } } /// Create new `COUNT` message - pub fn count(subscription_id: &str, filters: Vec) -> Self { + pub fn count(subscription_id: &str, filter: &JsFilter) -> Self { Self { inner: ClientMessage::count( SubscriptionId::new(subscription_id), - filters.into_iter().map(|f| f.into()).collect(), + filter.deref().clone(), ), } } diff --git a/bindings/nostr-sdk-js/src/relay/mod.rs b/bindings/nostr-sdk-js/src/relay/mod.rs index c24340fbf..0095f0912 100644 --- a/bindings/nostr-sdk-js/src/relay/mod.rs +++ b/bindings/nostr-sdk-js/src/relay/mod.rs @@ -209,15 +209,10 @@ impl JsRelay { /// ### Auto-closing subscription /// /// It's possible to automatically close a subscription by configuring the `SubscribeOptions`. - pub async fn subscribe( - &self, - filters: Vec, - opts: &JsSubscribeOptions, - ) -> Result { - let filters: Vec = filters.into_iter().map(|f| f.into()).collect(); + pub async fn subscribe(&self, filter: &JsFilter, opts: &JsSubscribeOptions) -> Result { Ok(self .inner - .subscribe(filters, **opts) // TODO: allow to pass opts as reference + .subscribe(filter.deref().clone(), **opts) // TODO: allow to pass opts as reference .await .map_err(into_err)? .to_string()) @@ -232,12 +227,11 @@ impl JsRelay { pub async fn subscribe_with_id( &self, id: &str, - filters: Vec, + filter: &JsFilter, opts: &JsSubscribeOptions, ) -> Result<()> { - let filters: Vec = filters.into_iter().map(|f| f.into()).collect(); self.inner - .subscribe_with_id(SubscriptionId::new(id), filters, **opts) // TODO: allow to pass opts as reference + .subscribe_with_id(SubscriptionId::new(id), filter.deref().clone(), **opts) // TODO: allow to pass opts as reference .await .map_err(into_err) } @@ -260,14 +254,13 @@ impl JsRelay { #[wasm_bindgen(js_name = fetchEvents)] pub async fn fetch_events( &self, - filters: Vec, + filter: &JsFilter, timeout: &JsDuration, policy: &JsReqExitPolicy, ) -> Result { - let filters: Vec = filters.into_iter().map(|f| f.into()).collect(); Ok(self .inner - .fetch_events(filters, **timeout, **policy) + .fetch_events(filter.deref().clone(), **timeout, **policy) .await .map_err(into_err)? 
.into()) @@ -275,11 +268,10 @@ impl JsRelay { /// Count events #[wasm_bindgen(js_name = countEvents)] - pub async fn count_events(&self, filters: Vec, timeout: &JsDuration) -> Result { - let filters: Vec = filters.into_iter().map(|f| f.into()).collect(); + pub async fn count_events(&self, filter: &JsFilter, timeout: &JsDuration) -> Result { Ok(self .inner - .count_events(filters, **timeout) + .count_events(filter.deref().clone(), **timeout) .await .map_err(into_err)? as u64) } diff --git a/crates/nostr-cli/src/main.rs b/crates/nostr-cli/src/main.rs index dcebe8e4e..693771b2f 100644 --- a/crates/nostr-cli/src/main.rs +++ b/crates/nostr-cli/src/main.rs @@ -283,7 +283,7 @@ async fn handle_command(command: ShellCommand, client: &Client) -> Result<()> { } else if database { // Query database let now = Instant::now(); - let events = db.query(vec![filter]).await?; + let events = db.query(filter).await?; let duration = now.elapsed(); println!( diff --git a/crates/nostr-connect/src/client.rs b/crates/nostr-connect/src/client.rs index c46c76d47..a65e2c68a 100644 --- a/crates/nostr-connect/src/client.rs +++ b/crates/nostr-connect/src/client.rs @@ -95,7 +95,7 @@ impl NostrConnect { /// connect.auth_url_handler(MyAuthUrlHandler); /// /// // ... - /// + /// /// Ok(()) /// } /// ``` @@ -178,7 +178,7 @@ impl NostrConnect { // Subscribe self.pool - .subscribe(vec![filter], SubscribeOptions::default()) + .subscribe(filter, SubscribeOptions::default()) .await?; Ok(notifications) diff --git a/crates/nostr-connect/src/signer.rs b/crates/nostr-connect/src/signer.rs index 492a3c927..aaf196aa4 100644 --- a/crates/nostr-connect/src/signer.rs +++ b/crates/nostr-connect/src/signer.rs @@ -138,7 +138,7 @@ impl NostrConnectRemoteSigner { // Subscribe self.pool - .subscribe(vec![filter], SubscribeOptions::default()) + .subscribe(filter, SubscribeOptions::default()) .await?; // Mark as bootstrapped diff --git a/crates/nostr-database/src/collections/events.rs b/crates/nostr-database/src/collections/events.rs index 91bf51cc3..fcece8cee 100644 --- a/crates/nostr-database/src/collections/events.rs +++ b/crates/nostr-database/src/collections/events.rs @@ -32,18 +32,12 @@ impl Eq for Events {} impl Events { /// New collection #[inline] - pub fn new(filters: &[Filter]) -> Self { - // Check how many filters are passed and return the limit - let limit: Option = match (filters.len(), filters.first()) { - (1, Some(filter)) => filter.limit, - _ => None, - }; - + pub fn new(filter: &Filter) -> Self { let mut hasher = DefaultHasher::new(); - filters.hash(&mut hasher); + filter.hash(&mut hasher); let hash: u64 = hasher.finish(); - let set: BTreeCappedSet = match limit { + let set: BTreeCappedSet = match filter.limit { Some(limit) => BTreeCappedSet::bounded_with_policy(limit, POLICY), None => BTreeCappedSet::unbounded(), }; @@ -172,11 +166,11 @@ mod tests { // Match { let event1 = Event::from_json(r#"{"content":"Kind 10050 is for DMs, kind 10002 for the other stuff. But both have the same aim. 
So IMO both have to be under the `gossip` option.","created_at":1732738371,"id":"f2d71a515ce3576d238aaaeaa48fde97388162d08208f729b540a4c3f9723e6b","kind":1,"pubkey":"68d81165918100b7da43fc28f7d1fc12554466e1115886b9e7bb326f65ec4272","sig":"d88d3ac21036cfb541809288c12844747dbf1d20a246133dbd37374254b281808c5582bade27c880477759491b2b964d7235142c8b80d233dfb9ae8a50252119","tags":[["e","8262a50cf7832351ae3f21c429e111bb31be0cf754ec437e015534bf5cc2eee8","","root"],["e","0f4bcc83ef2af2febbc7eb9aea5d615a29084ed9e65c467ef2a9387ff79b57e8"],["e","94469431e367b2c16e6d224a4ac2c369c18718a1abdf42759ff591d9816b5ff3","","reply"],["p","68d81165918100b7da43fc28f7d1fc12554466e1115886b9e7bb326f65ec4272"],["p","1739d937dc8c0c7370aa27585938c119e25c41f6c441a5d34c6d38503e3136ef"],["p","03f9cfd948e95aeb04f780382344f7c1cfc0210d9af3f4006bb6d451c7b08692"],["p","126103bfddc8df256b6e0abfd7f3797c80dcc4ea88f7c2f87dd4104220b4d65f"],["p","13a665157257e79d9dcc960deeb367fd79383be2d0babb3d861679a5701d463b"],["p","ee0d20b47fb298e8a9ed3609108fe7f2296bd71e8b82fb4f9ff8f61f62bbc7a6"],["p","1c71312fb45273956b078e27981dcc15b178db8d55bffd7ad57a8cfaed6b5ab4"],["p","800e0fe3d8638ce3f75a56ed865df9d96fc9d9cd2f75550df0d7f5c1d8468b0b"]]}"#).unwrap(); - let mut events1 = Events::new(&[Filter::new().kind(Kind::TextNote).limit(1)]); + let mut events1 = Events::new(&Filter::new().kind(Kind::TextNote).limit(1)); events1.insert(event1); let event2 = Event::from_json(r#"{"content":"Kind 10050 is for DMs, kind 10002 for the other stuff. But both have the same aim. So IMO both have to be under the `gossip` option.","created_at":1732738371,"id":"f2d71a515ce3576d238aaaeaa48fde97388162d08208f729b540a4c3f9723e6b","kind":1,"pubkey":"68d81165918100b7da43fc28f7d1fc12554466e1115886b9e7bb326f65ec4272","sig":"d88d3ac21036cfb541809288c12844747dbf1d20a246133dbd37374254b281808c5582bade27c880477759491b2b964d7235142c8b80d233dfb9ae8a50252119","tags":[["e","8262a50cf7832351ae3f21c429e111bb31be0cf754ec437e015534bf5cc2eee8","","root"],["e","0f4bcc83ef2af2febbc7eb9aea5d615a29084ed9e65c467ef2a9387ff79b57e8"],["e","94469431e367b2c16e6d224a4ac2c369c18718a1abdf42759ff591d9816b5ff3","","reply"],["p","68d81165918100b7da43fc28f7d1fc12554466e1115886b9e7bb326f65ec4272"],["p","1739d937dc8c0c7370aa27585938c119e25c41f6c441a5d34c6d38503e3136ef"],["p","03f9cfd948e95aeb04f780382344f7c1cfc0210d9af3f4006bb6d451c7b08692"],["p","126103bfddc8df256b6e0abfd7f3797c80dcc4ea88f7c2f87dd4104220b4d65f"],["p","13a665157257e79d9dcc960deeb367fd79383be2d0babb3d861679a5701d463b"],["p","ee0d20b47fb298e8a9ed3609108fe7f2296bd71e8b82fb4f9ff8f61f62bbc7a6"],["p","1c71312fb45273956b078e27981dcc15b178db8d55bffd7ad57a8cfaed6b5ab4"],["p","800e0fe3d8638ce3f75a56ed865df9d96fc9d9cd2f75550df0d7f5c1d8468b0b"]]}"#).unwrap(); - let mut events2 = Events::new(&[Filter::new().kind(Kind::TextNote).limit(2)]); // Different filter from above + let mut events2 = Events::new(&Filter::new().kind(Kind::TextNote).limit(2)); // Different filter from above events2.insert(event2); assert_eq!(events1, events2); @@ -185,11 +179,11 @@ mod tests { // NOT match { let event1 = Event::from_json(r#"{"content":"Kind 10050 is for DMs, kind 10002 for the other stuff. But both have the same aim. 
So IMO both have to be under the `gossip` option.","created_at":1732738371,"id":"f2d71a515ce3576d238aaaeaa48fde97388162d08208f729b540a4c3f9723e6b","kind":1,"pubkey":"68d81165918100b7da43fc28f7d1fc12554466e1115886b9e7bb326f65ec4272","sig":"d88d3ac21036cfb541809288c12844747dbf1d20a246133dbd37374254b281808c5582bade27c880477759491b2b964d7235142c8b80d233dfb9ae8a50252119","tags":[["e","8262a50cf7832351ae3f21c429e111bb31be0cf754ec437e015534bf5cc2eee8","","root"],["e","0f4bcc83ef2af2febbc7eb9aea5d615a29084ed9e65c467ef2a9387ff79b57e8"],["e","94469431e367b2c16e6d224a4ac2c369c18718a1abdf42759ff591d9816b5ff3","","reply"],["p","68d81165918100b7da43fc28f7d1fc12554466e1115886b9e7bb326f65ec4272"],["p","1739d937dc8c0c7370aa27585938c119e25c41f6c441a5d34c6d38503e3136ef"],["p","03f9cfd948e95aeb04f780382344f7c1cfc0210d9af3f4006bb6d451c7b08692"],["p","126103bfddc8df256b6e0abfd7f3797c80dcc4ea88f7c2f87dd4104220b4d65f"],["p","13a665157257e79d9dcc960deeb367fd79383be2d0babb3d861679a5701d463b"],["p","ee0d20b47fb298e8a9ed3609108fe7f2296bd71e8b82fb4f9ff8f61f62bbc7a6"],["p","1c71312fb45273956b078e27981dcc15b178db8d55bffd7ad57a8cfaed6b5ab4"],["p","800e0fe3d8638ce3f75a56ed865df9d96fc9d9cd2f75550df0d7f5c1d8468b0b"]]}"#).unwrap(); - let mut events1 = Events::new(&[Filter::new().kind(Kind::TextNote).limit(1)]); + let mut events1 = Events::new(&Filter::new().kind(Kind::TextNote).limit(1)); events1.insert(event1); let event2 = Event::from_json(r#"{"content":"Thank you !","created_at":1732738224,"id":"035a18ba52a9b40137c0c60ed955eb1f1f93e12423082f6d8a83f62726462d21","kind":1,"pubkey":"1c71312fb45273956b078e27981dcc15b178db8d55bffd7ad57a8cfaed6b5ab4","sig":"54921c7a4f972428c67267a0d99df7d5094c7ca4d26fe9c08221de88ffafb0cab347939ff77129ecfdebad6b18cd2c4c229bf67ce8914fe778d24e19bc22be43","tags":[["p","68d81165918100b7da43fc28f7d1fc12554466e1115886b9e7bb326f65ec4272"],["p","1739d937dc8c0c7370aa27585938c119e25c41f6c441a5d34c6d38503e3136ef"],["p","03f9cfd948e95aeb04f780382344f7c1cfc0210d9af3f4006bb6d451c7b08692"],["p","126103bfddc8df256b6e0abfd7f3797c80dcc4ea88f7c2f87dd4104220b4d65f"],["p","13a665157257e79d9dcc960deeb367fd79383be2d0babb3d861679a5701d463b"],["p","ee0d20b47fb298e8a9ed3609108fe7f2296bd71e8b82fb4f9ff8f61f62bbc7a6"],["e","8262a50cf7832351ae3f21c429e111bb31be0cf754ec437e015534bf5cc2eee8","wss://nos.lol/","root"],["e","670303f9cbb24568c705b545c277be1f5172ad84795cc9e700aeea5bb248fd74","wss://n.ok0.org/","reply"]]}"#).unwrap(); - let mut events2 = Events::new(&[Filter::new().kind(Kind::TextNote).limit(2)]); // Different filter from above + let mut events2 = Events::new(&Filter::new().kind(Kind::TextNote).limit(2)); // Different filter from above events2.insert(event2); assert_ne!(events1, events2); @@ -199,9 +193,9 @@ mod tests { #[test] fn test_merge() { // Same filter - let filters = vec![Filter::new().kind(Kind::TextNote).limit(100)]; + let filter = Filter::new().kind(Kind::TextNote).limit(100); - let events1 = Events::new(&filters); + let events1 = Events::new(&filter); assert_eq!( events1.set.capacity(), Capacity::Bounded { @@ -210,7 +204,7 @@ mod tests { } ); - let events2 = Events::new(&filters); + let events2 = Events::new(&filter); assert_eq!( events2.set.capacity(), Capacity::Bounded { @@ -235,11 +229,11 @@ mod tests { ); // Different filters - let filters1 = vec![Filter::new().kind(Kind::TextNote).limit(100)]; - let filters2 = vec![Filter::new().kind(Kind::Metadata).limit(10)]; - let filters3 = vec![Filter::new().kind(Kind::ContactList).limit(1)]; + let filter1 = Filter::new().kind(Kind::TextNote).limit(100); + let 
filter2 = Filter::new().kind(Kind::Metadata).limit(10); + let filter3 = Filter::new().kind(Kind::ContactList).limit(1); - let events1 = Events::new(&filters1); + let events1 = Events::new(&filter1); assert_eq!( events1.set.capacity(), Capacity::Bounded { @@ -248,7 +242,7 @@ mod tests { } ); - let events2 = Events::new(&filters2); + let events2 = Events::new(&filter2); assert_eq!( events2.set.capacity(), Capacity::Bounded { @@ -257,7 +251,7 @@ mod tests { } ); - let events3 = Events::new(&filters3); + let events3 = Events::new(&filter3); assert_eq!( events3.set.capacity(), Capacity::Bounded { diff --git a/crates/nostr-database/src/events/helper.rs b/crates/nostr-database/src/events/helper.rs index 487d123dc..7b58f8b87 100644 --- a/crates/nostr-database/src/events/helper.rs +++ b/crates/nostr-database/src/events/helper.rs @@ -542,42 +542,36 @@ impl InternalDatabaseHelper { .filter(move |event| !self.deleted_ids.contains(&event.id) && filter.match_event(event)) } - fn internal_query(&self, filters: I) -> InternalQueryResult - where - I: IntoIterator, - { - let mut matching_ids: BTreeSet<&DatabaseEvent> = BTreeSet::new(); + fn internal_query(&self, filter: Filter) -> InternalQueryResult { + if filter.is_empty() { + return InternalQueryResult::All; + } - for filter in filters.into_iter() { - if filter.is_empty() { - return InternalQueryResult::All; + if let (Some(since), Some(until)) = (filter.since, filter.until) { + if since > until { + return InternalQueryResult::Set(BTreeSet::new()); } + } - if let (Some(since), Some(until)) = (filter.since, filter.until) { - if since > until { - continue; + let mut matching_ids: BTreeSet<&DatabaseEvent> = BTreeSet::new(); + let limit: Option = filter.limit; + + let evs: Box> = match QueryPattern::from(filter) { + QueryPattern::Author(params) => self.internal_query_by_author(params), + QueryPattern::KindAuthor(params) => self.internal_query_by_kind_and_author(params), + QueryPattern::ParamReplaceable(params) => { + match self.internal_query_param_replaceable(params) { + Some(ev) => Box::new(iter::once(ev)), + None => Box::new(iter::empty()), } } + QueryPattern::Generic(filter) => Box::new(self.internal_generic_query(*filter)), + }; - let limit: Option = filter.limit; - - let evs: Box> = match QueryPattern::from(filter) { - QueryPattern::Author(params) => self.internal_query_by_author(params), - QueryPattern::KindAuthor(params) => self.internal_query_by_kind_and_author(params), - QueryPattern::ParamReplaceable(params) => { - match self.internal_query_param_replaceable(params) { - Some(ev) => Box::new(iter::once(ev)), - None => Box::new(iter::empty()), - } - } - QueryPattern::Generic(filter) => Box::new(self.internal_generic_query(*filter)), - }; - - if let Some(limit) = limit { - matching_ids.extend(evs.take(limit)) - } else { - matching_ids.extend(evs) - } + if let Some(limit) = limit { + matching_ids.extend(evs.take(limit)) + } else { + matching_ids.extend(evs) } InternalQueryResult::Set(matching_ids) @@ -594,29 +588,23 @@ impl InternalDatabaseHelper { } /// Query - pub fn query<'a, I>(&'a self, filters: I) -> Box + 'a> - where - I: IntoIterator, - { - match self.internal_query(filters) { + pub fn query<'a>(&'a self, filter: Filter) -> Box + 'a> { + match self.internal_query(filter) { InternalQueryResult::All => Box::new(self.events.iter().map(|ev| ev.as_ref())), InternalQueryResult::Set(set) => Box::new(set.into_iter().map(|ev| ev.as_ref())), } } /// Count events - pub fn count(&self, filters: I) -> usize - where - I: IntoIterator, - { - match 
self.internal_query(filters) { + pub fn count(&self, filter: Filter) -> usize { + match self.internal_query(filter) { InternalQueryResult::All => self.events.len(), InternalQueryResult::Set(set) => set.len(), } } pub fn negentropy_items(&self, filter: Filter) -> Vec<(EventId, Timestamp)> { - match self.internal_query([filter]) { + match self.internal_query(filter) { InternalQueryResult::All => self .events .iter() @@ -647,7 +635,7 @@ impl InternalDatabaseHelper { } pub fn delete(&mut self, filter: Filter) -> Option> { - match self.internal_query([filter]) { + match self.internal_query(filter) { InternalQueryResult::All => { self.clear(); None @@ -741,32 +729,26 @@ impl DatabaseHelper { } /// Query - pub async fn query(&self, filters: Vec) -> Events { + pub async fn query(&self, filter: Filter) -> Events { let inner = self.inner.read().await; - let mut events = Events::new(&filters); - events.extend(inner.query(filters).cloned()); + let mut events = Events::new(&filter); + events.extend(inner.query(filter).cloned()); events } /// Query - pub fn fast_query<'a, I>( + pub fn fast_query<'a>( &self, txn: &'a QueryTransaction, - filters: I, - ) -> Box + 'a> - where - I: IntoIterator, - { - txn.guard.query(filters) + filter: Filter, + ) -> Box + 'a> { + txn.guard.query(filter) } /// Count events - pub async fn count(&self, filters: I) -> usize - where - I: IntoIterator, - { + pub async fn count(&self, filter: Filter) -> usize { let inner = self.inner.read().await; - inner.count(filters) + inner.count(filter) } /// Get negentropy items @@ -868,35 +850,38 @@ mod tests { Event::from_json(EVENTS[1]).unwrap(), Event::from_json(EVENTS[0]).unwrap(), ]; - assert_eq!( - indexes.query(vec![Filter::new()]).await.to_vec(), - expected_output - ); - assert_eq!(indexes.count([Filter::new()]).await, 8); + assert_eq!(indexes.query(Filter::new()).await.to_vec(), expected_output); + assert_eq!(indexes.count(Filter::new()).await, 8); // Test get previously deleted replaceable event (check if was deleted by indexes) assert!(indexes - .query(vec![Filter::new() - .kind(Kind::Metadata) - .author(keys_a.public_key())]) + .query( + Filter::new() + .kind(Kind::Metadata) + .author(keys_a.public_key()) + ) .await .is_empty()); // Test get previously deleted param. 
replaceable event (check if was deleted by indexes) assert!(indexes - .query(vec![Filter::new() - .kind(Kind::Custom(32122)) - .author(keys_a.public_key()) - .identifier("id-2")]) + .query( + Filter::new() + .kind(Kind::Custom(32122)) + .author(keys_a.public_key()) + .identifier("id-2") + ) .await .is_empty()); // Test get param replaceable events WITHOUT using indexes (identifier not passed) assert_eq!( indexes - .query(vec![Filter::new() - .kind(Kind::Custom(32122)) - .author(keys_b.public_key())]) + .query( + Filter::new() + .kind(Kind::Custom(32122)) + .author(keys_b.public_key()) + ) .await .to_vec(), vec![ @@ -908,10 +893,12 @@ mod tests { // Test get param replaceable events using indexes assert_eq!( indexes - .query(vec![Filter::new() - .kind(Kind::Custom(32122)) - .author(keys_b.public_key()) - .identifier("id-3")]) + .query( + Filter::new() + .kind(Kind::Custom(32122)) + .author(keys_b.public_key()) + .identifier("id-3") + ) .await .to_vec(), vec![Event::from_json(EVENTS[4]).unwrap()] @@ -919,7 +906,7 @@ mod tests { assert_eq!( indexes - .query(vec![Filter::new().author(keys_a.public_key())]) + .query(Filter::new().author(keys_a.public_key())) .await .to_vec(), vec![ @@ -933,9 +920,11 @@ mod tests { assert_eq!( indexes - .query(vec![Filter::new() - .author(keys_a.public_key()) - .kinds([Kind::TextNote, Kind::Custom(32121)])]) + .query( + Filter::new() + .author(keys_a.public_key()) + .kinds([Kind::TextNote, Kind::Custom(32121)]) + ) .await .to_vec(), vec![ @@ -946,9 +935,11 @@ mod tests { assert_eq!( indexes - .query(vec![Filter::new() - .authors([keys_a.public_key(), keys_b.public_key()]) - .kinds([Kind::TextNote, Kind::Custom(32121)])]) + .query( + Filter::new() + .authors([keys_a.public_key(), keys_b.public_key()]) + .kinds([Kind::TextNote, Kind::Custom(32121)]) + ) .await .to_vec(), vec![ @@ -960,7 +951,7 @@ mod tests { // Test get param replaceable events using identifier assert_eq!( indexes - .query(vec![Filter::new().identifier("id-1")]) + .query(Filter::new().identifier("id-1")) .await .to_vec(), vec![ @@ -973,7 +964,7 @@ mod tests { // Test get param replaceable events with multiple tags using identifier assert_eq!( indexes - .query(vec![Filter::new().identifier("multi-id")]) + .query(Filter::new().identifier("multi-id")) .await .to_vec(), vec![Event::from_json(EVENTS[13]).unwrap()] @@ -981,10 +972,12 @@ mod tests { // As above but by using kind and pubkey assert_eq!( indexes - .query(vec![Filter::new() - .pubkey(keys_a.public_key()) - .kind(Kind::Custom(30333)) - .limit(1)]) + .query( + Filter::new() + .pubkey(keys_a.public_key()) + .kind(Kind::Custom(30333)) + .limit(1) + ) .await .to_vec(), vec![Event::from_json(EVENTS[13]).unwrap()] @@ -997,9 +990,11 @@ mod tests { assert!(res.to_discard.is_empty()); assert_eq!( indexes - .query(vec![Filter::new() - .kind(Kind::Metadata) - .author(keys_a.public_key())]) + .query( + Filter::new() + .kind(Kind::Metadata) + .author(keys_a.public_key()) + ) .await .to_vec(), vec![first_ev_metadata.clone()] @@ -1012,9 +1007,11 @@ mod tests { assert!(res.to_discard.contains(&first_ev_metadata.id)); assert_eq!( indexes - .query(vec![Filter::new() - .kind(Kind::Metadata) - .author(keys_a.public_key())]) + .query( + Filter::new() + .kind(Kind::Metadata) + .author(keys_a.public_key()) + ) .await .to_vec(), vec![ev] diff --git a/crates/nostr-database/src/events/mod.rs b/crates/nostr-database/src/events/mod.rs index b92ac325e..5148a60f8 100644 --- a/crates/nostr-database/src/events/mod.rs +++ b/crates/nostr-database/src/events/mod.rs @@ -146,13 
+146,13 @@ pub trait NostrEventsDatabase: fmt::Debug + Send + Sync { event_id: &'a EventId, ) -> BoxedFuture<'a, Result, DatabaseError>>; - /// Count number of [`Event`] found by filters + /// Count the number of [`Event`] found by [`Filter`]. /// /// Use `Filter::new()` or `Filter::default()` to count all events. - fn count(&self, filters: Vec) -> BoxedFuture>; + fn count(&self, filter: Filter) -> BoxedFuture>; - /// Query store with filters - fn query(&self, filters: Vec) -> BoxedFuture>; + /// Query stored events with [`Filter`]. + fn query(&self, filter: Filter) -> BoxedFuture>; /// Get `negentropy` items fn negentropy_items( @@ -160,7 +160,7 @@ pub trait NostrEventsDatabase: fmt::Debug + Send + Sync { filter: Filter, ) -> BoxedFuture, DatabaseError>> { Box::pin(async move { - let events: Events = self.query(vec![filter]).await?; + let events: Events = self.query(filter).await?; Ok(events.into_iter().map(|e| (e.id, e.created_at)).collect()) }) } @@ -181,7 +181,7 @@ pub trait NostrEventsDatabaseExt: NostrEventsDatabase { .author(public_key) .kind(Kind::Metadata) .limit(1); - let events: Events = self.query(vec![filter]).await?; + let events: Events = self.query(filter).await?; match events.first_owned() { Some(event) => Ok(Some( Metadata::from_json(event.content).map_err(DatabaseError::backend)?, @@ -201,7 +201,7 @@ pub trait NostrEventsDatabaseExt: NostrEventsDatabase { .author(public_key) .kind(Kind::ContactList) .limit(1); - let events: Events = self.query(vec![filter]).await?; + let events: Events = self.query(filter).await?; match events.first_owned() { Some(event) => Ok(event.tags.public_keys().copied().collect()), None => Ok(HashSet::new()), @@ -219,7 +219,7 @@ pub trait NostrEventsDatabaseExt: NostrEventsDatabase { .author(public_key) .kind(Kind::ContactList) .limit(1); - let events: Events = self.query(vec![filter]).await?; + let events: Events = self.query(filter).await?; match events.first_owned() { Some(event) => { // Get contacts metadata @@ -227,7 +227,7 @@ pub trait NostrEventsDatabaseExt: NostrEventsDatabase { .authors(event.tags.public_keys().copied()) .kind(Kind::Metadata); let mut contacts: HashSet = self - .query(vec![filter]) + .query(filter) .await? 
.into_iter() .map(|e| { @@ -257,7 +257,7 @@ pub trait NostrEventsDatabaseExt: NostrEventsDatabase { .author(public_key) .kind(Kind::RelayList) .limit(1); - let events: Events = self.query(vec![filter]).await?; + let events: Events = self.query(filter).await?; // Extract relay list (NIP65) match events.first_owned() { @@ -280,7 +280,7 @@ pub trait NostrEventsDatabaseExt: NostrEventsDatabase { Box::pin(async move { // Query let filter: Filter = Filter::default().authors(public_keys).kind(Kind::RelayList); - let events: Events = self.query(vec![filter]).await?; + let events: Events = self.query(filter).await?; let mut map = HashMap::with_capacity(events.len()); diff --git a/crates/nostr-database/src/memory.rs b/crates/nostr-database/src/memory.rs index fcb73f702..1baf669ff 100644 --- a/crates/nostr-database/src/memory.rs +++ b/crates/nostr-database/src/memory.rs @@ -159,12 +159,12 @@ impl NostrEventsDatabase for MemoryDatabase { Box::pin(async move { Ok(self.helper.event_by_id(event_id).await) }) } - fn count(&self, filters: Vec) -> BoxedFuture> { - Box::pin(async move { Ok(self.helper.count(filters).await) }) + fn count(&self, filter: Filter) -> BoxedFuture> { + Box::pin(async move { Ok(self.helper.count(filter).await) }) } - fn query(&self, filters: Vec) -> BoxedFuture> { - Box::pin(async move { Ok(self.helper.query(filters).await) }) + fn query(&self, filter: Filter) -> BoxedFuture> { + Box::pin(async move { Ok(self.helper.query(filter).await) }) } fn negentropy_items( diff --git a/crates/nostr-indexeddb/src/lib.rs b/crates/nostr-indexeddb/src/lib.rs index 3e30178f1..77659c702 100644 --- a/crates/nostr-indexeddb/src/lib.rs +++ b/crates/nostr-indexeddb/src/lib.rs @@ -386,12 +386,12 @@ impl NostrEventsDatabase for WebDatabase { Box::pin(async move { Ok(self.helper.event_by_id(event_id).await) }) } - fn count(&self, filters: Vec) -> BoxedFuture> { - Box::pin(async move { Ok(self.helper.count(filters).await) }) + fn count(&self, filter: Filter) -> BoxedFuture> { + Box::pin(async move { Ok(self.helper.count(filter).await) }) } - fn query(&self, filters: Vec) -> BoxedFuture> { - Box::pin(async move { Ok(self.helper.query(filters).await) }) + fn query(&self, filter: Filter) -> BoxedFuture> { + Box::pin(async move { Ok(self.helper.query(filter).await) }) } fn negentropy_items( diff --git a/crates/nostr-lmdb/src/lib.rs b/crates/nostr-lmdb/src/lib.rs index 67cffa436..33d464e2f 100644 --- a/crates/nostr-lmdb/src/lib.rs +++ b/crates/nostr-lmdb/src/lib.rs @@ -129,12 +129,12 @@ impl NostrEventsDatabase for NostrLMDB { }) } - fn count(&self, filters: Vec) -> BoxedFuture> { - Box::pin(async move { self.db.count(filters).await.map_err(DatabaseError::backend) }) + fn count(&self, filter: Filter) -> BoxedFuture> { + Box::pin(async move { self.db.count(filter).await.map_err(DatabaseError::backend) }) } - fn query(&self, filters: Vec) -> BoxedFuture> { - Box::pin(async move { self.db.query(filters).await.map_err(DatabaseError::backend) }) + fn query(&self, filter: Filter) -> BoxedFuture> { + Box::pin(async move { self.db.query(filter).await.map_err(DatabaseError::backend) }) } fn negentropy_items( @@ -268,7 +268,7 @@ mod tests { } async fn count_all(&self) -> usize { - self.db.count(vec![Filter::new()]).await.unwrap() + self.db.count(Filter::new()).await.unwrap() } } @@ -310,9 +310,7 @@ mod tests { // Test filter query let events = db - .query(vec![Filter::new() - .author(keys.public_key) - .kind(Kind::Metadata)]) + .query(Filter::new().author(keys.public_key).kind(Kind::Metadata)) .await .unwrap(); 
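The database trait follows the same single-filter contract, and `Events::new(&Filter)` can now take its capacity directly from `filter.limit` instead of only when exactly one filter happened to be passed. A sketch against the in-memory backend, assuming `MemoryDatabase::new()` and the `nostr-database` prelude exports used below:

```rust
use nostr_database::prelude::*;

#[tokio::main]
async fn main() -> Result<(), DatabaseError> {
    // `MemoryDatabase::new()` is assumed here for illustration.
    let db = MemoryDatabase::new();

    // `query`/`count` take the `Filter` by value now, so clone when the
    // same filter is reused for both calls.
    let filter = Filter::new().kind(Kind::TextNote).limit(100);
    let events: Events = db.query(filter.clone()).await?;
    let total: usize = db.count(filter).await?;

    println!("{} events returned, {} counted", events.len(), total);
    Ok(())
}
```
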
assert_eq!(events.to_vec(), vec![expected_event.clone()]); @@ -342,9 +340,7 @@ mod tests { // Test filter query let events = db - .query(vec![Filter::new() - .author(keys.public_key) - .kind(Kind::Metadata)]) + .query(Filter::new().author(keys.public_key).kind(Kind::Metadata)) .await .unwrap(); assert_eq!(events.to_vec(), vec![new_expected_event]); @@ -375,7 +371,7 @@ mod tests { assert_eq!(event, expected_event); // Test filter query - let events = db.query(vec![coordinate.clone().into()]).await.unwrap(); + let events = db.query(coordinate.clone().into()).await.unwrap(); assert_eq!(events.to_vec(), vec![expected_event.clone()]); // Check if number of events in database match the expected @@ -404,7 +400,7 @@ mod tests { assert_eq!(event, new_expected_event); // Test filter query - let events = db.query(vec![coordinate.into()]).await.unwrap(); + let events = db.query(coordinate.into()).await.unwrap(); assert_eq!(events.to_vec(), vec![new_expected_event]); // Check if number of events in database match the expected @@ -428,28 +424,19 @@ mod tests { let _added_events: usize = db.add_random_events().await; - let events = db - .query(vec![Filter::new().search("Account A")]) - .await - .unwrap(); + let events = db.query(Filter::new().search("Account A")).await.unwrap(); assert_eq!(events.len(), 1); - let events = db - .query(vec![Filter::new().search("account a")]) - .await - .unwrap(); + let events = db.query(Filter::new().search("account a")).await.unwrap(); assert_eq!(events.len(), 1); - let events = db - .query(vec![Filter::new().search("text note")]) - .await - .unwrap(); + let events = db.query(Filter::new().search("text note")).await.unwrap(); assert_eq!(events.len(), 2); - let events = db.query(vec![Filter::new().search("notes")]).await.unwrap(); + let events = db.query(Filter::new().search("notes")).await.unwrap(); assert_eq!(events.len(), 0); - let events = db.query(vec![Filter::new().search("hola")]).await.unwrap(); + let events = db.query(Filter::new().search("hola")).await.unwrap(); assert_eq!(events.len(), 0); } @@ -480,7 +467,7 @@ mod tests { Event::from_json(EVENTS[0]).unwrap(), ]; assert_eq!( - db.query(vec![Filter::new()]).await.unwrap().to_vec(), + db.query(Filter::new()).await.unwrap().to_vec(), expected_output ); assert_eq!(db.count_all().await, 8); diff --git a/crates/nostr-lmdb/src/store/lmdb/mod.rs b/crates/nostr-lmdb/src/store/lmdb/mod.rs index 2365aeaa7..ec158055c 100644 --- a/crates/nostr-lmdb/src/store/lmdb/mod.rs +++ b/crates/nostr-lmdb/src/store/lmdb/mod.rs @@ -294,24 +294,8 @@ impl Lmdb { } } - pub fn query<'a, I>( - &self, - txn: &'a RoTxn, - filters: I, - ) -> Result>, Error> - where - I: IntoIterator, - { - let mut output: BTreeSet> = BTreeSet::new(); - for filter in filters.into_iter() { - let events = self.single_filter_query(txn, filter)?; - output.extend(events); - } - Ok(output) - } - pub fn delete(&self, read_txn: &RoTxn, txn: &mut RwTxn, filter: Filter) -> Result<(), Error> { - let events = self.single_filter_query(read_txn, filter)?; + let events = self.query(read_txn, filter)?; for event in events.into_iter() { self.remove(txn, &event)?; } @@ -319,7 +303,7 @@ impl Lmdb { } /// Find all events that match the filter - fn single_filter_query<'a>( + pub fn query<'a>( &self, txn: &'a RoTxn, filter: Filter, diff --git a/crates/nostr-lmdb/src/store/mod.rs b/crates/nostr-lmdb/src/store/mod.rs index 2b8defd6e..8caf18361 100644 --- a/crates/nostr-lmdb/src/store/mod.rs +++ b/crates/nostr-lmdb/src/store/mod.rs @@ -3,7 +3,6 @@ // Copyright (c) 2023-2024 Rust Nostr 
Developers // Distributed under the MIT software license -use std::collections::BTreeSet; use std::fs; use std::path::Path; use std::sync::{Arc, Mutex}; @@ -251,11 +250,11 @@ impl Store { .await? } - pub async fn count(&self, filters: Vec) -> Result { + pub async fn count(&self, filter: Filter) -> Result { self.interact(move |db| { let txn = db.read_txn()?; - let output = db.query(&txn, filters)?; - let len: usize = output.len(); + let output = db.query(&txn, filter)?; + let len: usize = output.count(); txn.commit()?; Ok(len) }) @@ -263,12 +262,12 @@ impl Store { } // Lookup ID: EVENT_ORD_IMPL - pub async fn query(&self, filters: Vec) -> Result { + pub async fn query(&self, filter: Filter) -> Result { self.interact(move |db| { - let mut events: Events = Events::new(&filters); + let mut events: Events = Events::new(&filter); let txn: RoTxn = db.read_txn()?; - let output: BTreeSet = db.query(&txn, filters)?; + let output = db.query(&txn, filter)?; events.extend(output.into_iter().map(|e| e.into_owned())); txn.commit()?; @@ -283,7 +282,7 @@ impl Store { ) -> Result, Error> { self.interact(move |db| { let txn = db.read_txn()?; - let events = db.query(&txn, vec![filter])?; + let events = db.query(&txn, filter)?; let items = events .into_iter() .map(|e| (EventId::from_byte_array(*e.id), e.created_at)) diff --git a/crates/nostr-ndb/src/lib.rs b/crates/nostr-ndb/src/lib.rs index 62f910867..79c4322c7 100644 --- a/crates/nostr-ndb/src/lib.rs +++ b/crates/nostr-ndb/src/lib.rs @@ -139,19 +139,19 @@ impl NostrEventsDatabase for NdbDatabase { }) } - fn count(&self, filters: Vec) -> BoxedFuture> { + fn count(&self, filter: Filter) -> BoxedFuture> { Box::pin(async move { let txn: Transaction = Transaction::new(&self.db).map_err(DatabaseError::backend)?; - let res: Vec = ndb_query(&self.db, &txn, filters)?; + let res: Vec = ndb_query(&self.db, &txn, filter)?; Ok(res.len()) }) } - fn query(&self, filters: Vec) -> BoxedFuture> { + fn query(&self, filter: Filter) -> BoxedFuture> { Box::pin(async move { let txn: Transaction = Transaction::new(&self.db).map_err(DatabaseError::backend)?; - let mut events: Events = Events::new(&filters); - let res: Vec = ndb_query(&self.db, &txn, filters)?; + let mut events: Events = Events::new(&filter); + let res: Vec = ndb_query(&self.db, &txn, filter)?; events.extend( res.into_iter() .filter_map(|r| ndb_note_to_event(r.note).ok()) @@ -167,7 +167,7 @@ impl NostrEventsDatabase for NdbDatabase { ) -> BoxedFuture, DatabaseError>> { Box::pin(async move { let txn: Transaction = Transaction::new(&self.db).map_err(DatabaseError::backend)?; - let res: Vec = ndb_query(&self.db, &txn, vec![filter])?; + let res: Vec = ndb_query(&self.db, &txn, filter)?; Ok(res .into_iter() .map(|r| ndb_note_to_neg_item(r.note)) @@ -190,10 +190,10 @@ impl NostrDatabaseWipe for NdbDatabase { fn ndb_query<'a>( db: &Ndb, txn: &'a Transaction, - filters: Vec, + filter: Filter, ) -> Result>, DatabaseError> { - let filters: Vec = filters.into_iter().map(ndb_filter_conversion).collect(); - db.query(txn, &filters, MAX_RESULTS) + let filter: nostrdb::Filter = ndb_filter_conversion(filter); + db.query(txn, &[filter], MAX_RESULTS) .map_err(DatabaseError::backend) } diff --git a/crates/nostr-relay-builder/examples/policy.rs b/crates/nostr-relay-builder/examples/policy.rs index e12d2123d..06eb63da6 100644 --- a/crates/nostr-relay-builder/examples/policy.rs +++ b/crates/nostr-relay-builder/examples/policy.rs @@ -38,14 +38,11 @@ struct RejectAuthorLimit { impl QueryPolicy for RejectAuthorLimit { fn admit_query<'a>( &'a 
self, - query: &'a [Filter], + query: &'a Filter, _addr: &'a SocketAddr, ) -> BoxedFuture<'a, PolicyResult> { Box::pin(async move { - if query - .iter() - .any(|f| f.authors.as_ref().map(|a| a.len()).unwrap_or(0) > self.limit) - { + if query.authors.as_ref().map(|a| a.len()).unwrap_or(0) > self.limit { PolicyResult::Reject("query too expensive".to_string()) } else { PolicyResult::Accept diff --git a/crates/nostr-relay-builder/src/builder.rs b/crates/nostr-relay-builder/src/builder.rs index b297a4b47..b8d152a76 100644 --- a/crates/nostr-relay-builder/src/builder.rs +++ b/crates/nostr-relay-builder/src/builder.rs @@ -117,7 +117,7 @@ pub trait QueryPolicy: fmt::Debug + Send + Sync { /// Check if the policy should accept a query fn admit_query<'a>( &'a self, - query: &'a [Filter], + query: &'a Filter, addr: &'a SocketAddr, ) -> BoxedFuture<'a, PolicyResult>; } diff --git a/crates/nostr-relay-builder/src/local/inner.rs b/crates/nostr-relay-builder/src/local/inner.rs index 935cd5647..f856bbe1f 100644 --- a/crates/nostr-relay-builder/src/local/inner.rs +++ b/crates/nostr-relay-builder/src/local/inner.rs @@ -265,8 +265,8 @@ impl InnerLocalRelay { event = new_event.recv() => { if let Ok(event) = event { // Iter subscriptions - for (id, filters) in session.subscriptions.iter() { - if filters.iter().any(|f| f.match_event(&event)) { + for (id, filter) in session.subscriptions.iter() { + if filter.match_event(&event) { self.send_msg(&mut tx, RelayMessage::event(id.to_owned(), event.clone())).await?; } } @@ -529,8 +529,9 @@ impl InnerLocalRelay { } ClientMessage::Req { subscription_id, - filters, + filter, } => { + // TODO: remove this limit // Check number of subscriptions if session.subscriptions.len() >= self.rate_limit.max_reqs && !session.subscriptions.contains_key(&subscription_id) @@ -582,7 +583,7 @@ impl InnerLocalRelay { // check query policy plugins for plugin in self.query_policy.iter() { - if let PolicyResult::Reject(msg) = plugin.admit_query(&filters, addr).await { + if let PolicyResult::Reject(msg) = plugin.admit_query(&filter, addr).await { return self .send_msg( ws_tx, @@ -598,10 +599,10 @@ impl InnerLocalRelay { // Update session subscriptions session .subscriptions - .insert(subscription_id.clone(), filters.clone()); + .insert(subscription_id.clone(), *filter.clone()); // Query database - let events = self.database.query(filters).await?; + let events = self.database.query(*filter).await?; tracing::debug!( "Found {} events for subscription '{subscription_id}'", @@ -622,9 +623,9 @@ impl InnerLocalRelay { } ClientMessage::Count { subscription_id, - filters, + filter, } => { - let count: usize = self.database.count(filters).await?; + let count: usize = self.database.count(*filter).await?; self.send_msg(ws_tx, RelayMessage::count(subscription_id, count)) .await } diff --git a/crates/nostr-relay-builder/src/local/session.rs b/crates/nostr-relay-builder/src/local/session.rs index a9a3a7f01..97b9013a1 100644 --- a/crates/nostr-relay-builder/src/local/session.rs +++ b/crates/nostr-relay-builder/src/local/session.rs @@ -73,7 +73,7 @@ impl Nip42Session { } pub(super) struct Session<'a> { - pub subscriptions: HashMap>, + pub subscriptions: HashMap, pub negentropy_subscription: HashMap>, pub nip42: Nip42Session, pub tokens: Tokens, diff --git a/crates/nostr-relay-pool/src/pool/inner.rs b/crates/nostr-relay-pool/src/pool/inner.rs index f617f8180..4697cb980 100644 --- a/crates/nostr-relay-pool/src/pool/inner.rs +++ b/crates/nostr-relay-pool/src/pool/inner.rs @@ -27,7 +27,7 @@ pub(super) type Relays = 
HashMap; #[derive(Debug)] pub(super) struct AtomicPrivateData { pub(super) relays: RwLock, - subscriptions: RwLock>>, + subscriptions: RwLock>, shutdown: AtomicBool, } @@ -83,19 +83,19 @@ impl InnerRelayPool { self.atomic.shutdown.store(true, Ordering::SeqCst); } - pub async fn subscriptions(&self) -> HashMap> { + pub async fn subscriptions(&self) -> HashMap { self.atomic.subscriptions.read().await.clone() } - pub async fn subscription(&self, id: &SubscriptionId) -> Option> { + pub async fn subscription(&self, id: &SubscriptionId) -> Option { let subscriptions = self.atomic.subscriptions.read().await; subscriptions.get(id).cloned() } - pub async fn save_subscription(&self, id: SubscriptionId, filters: Vec) { + pub async fn save_subscription(&self, id: SubscriptionId, filter: Filter) { let mut subscriptions = self.atomic.subscriptions.write().await; - let current: &mut Vec = subscriptions.entry(id).or_default(); - *current = filters; + let current: &mut Filter = subscriptions.entry(id).or_default(); + *current = filter; } pub(crate) async fn remove_subscription(&self, id: &SubscriptionId) { diff --git a/crates/nostr-relay-pool/src/pool/mod.rs b/crates/nostr-relay-pool/src/pool/mod.rs index 8ae226981..c5f334144 100644 --- a/crates/nostr-relay-pool/src/pool/mod.rs +++ b/crates/nostr-relay-pool/src/pool/mod.rs @@ -458,13 +458,13 @@ impl RelayPool { /// Get subscriptions #[inline] - pub async fn subscriptions(&self) -> HashMap> { + pub async fn subscriptions(&self) -> HashMap { self.inner.subscriptions().await } /// Get subscription #[inline] - pub async fn subscription(&self, id: &SubscriptionId) -> Option> { + pub async fn subscription(&self, id: &SubscriptionId) -> Option { self.inner.subscription(id).await } @@ -472,8 +472,8 @@ impl RelayPool { /// /// When a new relay will be added, saved subscriptions will be automatically used for it. #[inline] - pub async fn save_subscription(&self, id: SubscriptionId, filters: Vec) { - self.inner.save_subscription(id, filters).await + pub async fn save_subscription(&self, id: SubscriptionId, filter: Filter) { + self.inner.save_subscription(id, filter).await } /// Send a client message to specific relays @@ -637,11 +637,11 @@ impl RelayPool { /// Check [`RelayPool::subscribe_with_id_to`] docs to learn more. 
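With the pool's subscription map now keyed as `SubscriptionId -> Filter`, saving under an existing ID replaces the stored filter outright. A sketch of the patched `RelayPool` usage, assuming `RelayPool::default()` and a placeholder relay URL:

```rust
use nostr_relay_pool::prelude::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let pool = RelayPool::default();
    pool.add_relay("wss://relay.example.com", RelayOptions::default())
        .await?; // placeholder relay URL
    pool.connect().await;

    // One filter per subscription ID; re-saving the same ID overwrites
    // the previous filter instead of appending to a Vec.
    let id = SubscriptionId::new("inbox");
    let filter = Filter::new().kind(Kind::TextNote);
    pool.subscribe_with_id(id.clone(), filter, SubscribeOptions::default())
        .await?;

    assert!(pool.subscription(&id).await.is_some());
    Ok(())
}
```
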
pub async fn subscribe( &self, - filters: Vec, + filter: Filter, opts: SubscribeOptions, ) -> Result, Error> { let id: SubscriptionId = SubscriptionId::generate(); - let output: Output<()> = self.subscribe_with_id(id.clone(), filters, opts).await?; + let output: Output<()> = self.subscribe_with_id(id.clone(), filter, opts).await?; Ok(Output { val: id, success: output.success, @@ -655,20 +655,20 @@ impl RelayPool { pub async fn subscribe_with_id( &self, id: SubscriptionId, - filters: Vec, + filter: Filter, opts: SubscribeOptions, ) -> Result, Error> { // Check if isn't auto-closing subscription if !opts.is_auto_closing() { // Save subscription - self.save_subscription(id.clone(), filters.clone()).await; + self.save_subscription(id.clone(), filter.clone()).await; } // Get relay urls let urls: Vec = self.read_relay_urls().await; // Subscribe - self.subscribe_with_id_to(urls, id, filters, opts).await + self.subscribe_with_id_to(urls, id, filter, opts).await } /// Subscribe to filters to specific relays @@ -677,7 +677,7 @@ impl RelayPool { pub async fn subscribe_to( &self, urls: I, - filters: Vec, + filter: Filter, opts: SubscribeOptions, ) -> Result, Error> where @@ -687,7 +687,7 @@ impl RelayPool { { let id: SubscriptionId = SubscriptionId::generate(); let output: Output<()> = self - .subscribe_with_id_to(urls, id.clone(), filters, opts) + .subscribe_with_id_to(urls, id.clone(), filter, opts) .await?; Ok(Output { val: id, @@ -711,7 +711,7 @@ impl RelayPool { &self, urls: I, id: SubscriptionId, - filters: Vec, + filter: Filter, opts: SubscribeOptions, ) -> Result, Error> where @@ -719,7 +719,7 @@ impl RelayPool { U: TryIntoUrl, Error: From<::Err>, { - let targets = urls.into_iter().map(|u| (u, filters.clone())); + let targets = urls.into_iter().map(|u| (u, filter.clone())); self.subscribe_targeted(id, targets, opts).await } @@ -733,12 +733,12 @@ impl RelayPool { opts: SubscribeOptions, ) -> Result, Error> where - I: IntoIterator)>, + I: IntoIterator, U: TryIntoUrl, Error: From<::Err>, { // Collect targets - let targets: HashMap> = targets + let targets: HashMap = targets .into_iter() .map(|(u, f)| Ok((u.try_into_url()?, f))) .collect::>()?; @@ -766,11 +766,11 @@ impl RelayPool { let mut output: Output<()> = Output::default(); // Compose futures - for (url, filters) in targets.into_iter() { + for (url, filter) in targets.into_iter() { let relay: &Relay = self.internal_relay(&relays, &url)?; let id: SubscriptionId = id.clone(); urls.push(url); - futures.push(relay.subscribe_with_id(id, filters, opts)); + futures.push(relay.subscribe_with_id(id, filter, opts)); } // Join futures @@ -862,12 +862,10 @@ impl RelayPool { .negentropy_items(filter.clone()) .await?; - // Compose filters - let mut filters: HashMap> = HashMap::with_capacity(1); - filters.insert(filter, items); + let tup: (Filter, Vec<(EventId, Timestamp)>) = (filter, items); // Reconcile - let targets = urls.into_iter().map(|u| (u, filters.clone())); + let targets = urls.into_iter().map(|u| (u, tup.clone())); self.sync_targeted(targets, opts).await } @@ -878,12 +876,12 @@ impl RelayPool { opts: &SyncOptions, ) -> Result, Error> where - I: IntoIterator>)>, + I: IntoIterator))>, U: TryIntoUrl, Error: From<::Err>, { // Collect targets - let targets: HashMap>> = targets + let targets: HashMap)> = targets .into_iter() .map(|(u, v)| Ok((u.try_into_url()?, v))) .collect::>()?; @@ -913,10 +911,10 @@ impl RelayPool { let mut output: Output = Output::default(); // Compose futures - for (url, filters) in targets.into_iter() { + for (url, (filter, 
items)) in targets.into_iter() { let relay: &Relay = self.internal_relay(&relays, &url)?; urls.push(url); - futures.push(relay.sync_multi(filters, opts)); + futures.push(relay.sync_with_items(filter, items, opts)); } // Join futures @@ -947,19 +945,19 @@ impl RelayPool { /// Fetch events from relays with [`RelayServiceFlags::READ`] flag. pub async fn fetch_events( &self, - filters: Vec, + filter: Filter, timeout: Duration, policy: ReqExitPolicy, ) -> Result { let urls: Vec = self.read_relay_urls().await; - self.fetch_events_from(urls, filters, timeout, policy).await + self.fetch_events_from(urls, filter, timeout, policy).await } /// Fetch events from specific relays pub async fn fetch_events_from( &self, urls: I, - filters: Vec, + filter: Filter, timeout: Duration, policy: ReqExitPolicy, ) -> Result @@ -968,11 +966,11 @@ impl RelayPool { U: TryIntoUrl, Error: From<::Err>, { - let mut events: Events = Events::new(&filters); + let mut events: Events = Events::new(&filter); // Stream events let mut stream = self - .stream_events_from(urls, filters, timeout, policy) + .stream_events_from(urls, filter, timeout, policy) .await?; while let Some(event) = stream.next().await { events.insert(event); @@ -984,20 +982,19 @@ impl RelayPool { /// Stream events from relays with `READ` flag. pub async fn stream_events( &self, - filters: Vec, + filter: Filter, timeout: Duration, policy: ReqExitPolicy, ) -> Result, Error> { let urls: Vec = self.read_relay_urls().await; - self.stream_events_from(urls, filters, timeout, policy) - .await + self.stream_events_from(urls, filter, timeout, policy).await } /// Stream events from specific relays pub async fn stream_events_from( &self, urls: I, - filters: Vec, + filter: Filter, timeout: Duration, policy: ReqExitPolicy, ) -> Result, Error> @@ -1008,7 +1005,7 @@ impl RelayPool { { let targets = urls .into_iter() - .map(|u| Ok((u.try_into_url()?, filters.clone()))) + .map(|u| Ok((u.try_into_url()?, filter.clone()))) .collect::>()?; self.stream_events_targeted(targets, timeout, policy).await } @@ -1018,11 +1015,11 @@ impl RelayPool { /// Stream events from specific relays with specific filters pub async fn stream_events_targeted( &self, - targets: HashMap>, + targets: HashMap, timeout: Duration, policy: ReqExitPolicy, ) -> Result, Error> { - // Check if targets map is empty + // Check if `targets` map is empty if targets.is_empty() { return Err(Error::NoRelaysSpecified); } @@ -1036,17 +1033,16 @@ impl RelayPool { } // Construct new map with also `Relay` struct - let mut map: HashMap)> = - HashMap::with_capacity(targets.len()); + let mut map: HashMap = HashMap::with_capacity(targets.len()); // Populate the new map. // Return an error if the relay doesn't exists. 
- for (url, filters) in targets.into_iter() { + for (url, filter) in targets.into_iter() { // Get relay let relay: Relay = self.internal_relay(&relays, &url).cloned()?; // Insert into new map - map.insert(url, (relay, filters)); + map.insert(url, (relay, filter)); } // Drop relays read guard @@ -1065,10 +1061,10 @@ impl RelayPool { let mut futures = Vec::with_capacity(map.len()); // Populate `urls` and `futures` vectors - for (url, (relay, filters)) in map.into_iter() { + for (url, (relay, filter)) in map.into_iter() { urls.push(url); futures.push(relay.fetch_events_with_callback_owned( - filters, + filter, timeout, policy, |event| { diff --git a/crates/nostr-relay-pool/src/relay/error.rs b/crates/nostr-relay-pool/src/relay/error.rs index 72ca8d69f..efd100bea 100644 --- a/crates/nostr-relay-pool/src/relay/error.rs +++ b/crates/nostr-relay-pool/src/relay/error.rs @@ -74,8 +74,6 @@ pub enum Error { ReadDisabled, /// Write actions disabled WriteDisabled, - /// Filters empty - FiltersEmpty, /// Negentropy not supported NegentropyNotSupported, /// Unknown negentropy error @@ -157,7 +155,6 @@ impl fmt::Display for Error { Self::BatchMessagesEmpty => write!(f, "can't batch empty list of messages"), Self::ReadDisabled => write!(f, "read actions are disabled"), Self::WriteDisabled => write!(f, "write actions are disabled"), - Self::FiltersEmpty => write!(f, "filters empty"), Self::NegentropyNotSupported => write!(f, "negentropy not supported"), Self::UnknownNegentropyError => write!(f, "unknown negentropy error"), Self::RelayMessageTooLarge { size, max_size } => write!( diff --git a/crates/nostr-relay-pool/src/relay/inner.rs b/crates/nostr-relay-pool/src/relay/inner.rs index 2348f2559..d1ed42497 100644 --- a/crates/nostr-relay-pool/src/relay/inner.rs +++ b/crates/nostr-relay-pool/src/relay/inner.rs @@ -90,7 +90,7 @@ impl RelayChannels { #[derive(Debug, Clone)] struct SubscriptionData { - pub filters: Vec, + pub filter: Filter, pub subscribed_at: Timestamp, /// Subscription closed by relay pub closed: bool, @@ -99,7 +99,8 @@ struct SubscriptionData { impl Default for SubscriptionData { fn default() -> Self { Self { - filters: Vec::new(), + // TODO: use `Option`? 
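+            // An empty filter is only a placeholder: `resubscribe` skips
+            // subscriptions whose filter `is_empty()`, so a default entry is
+            // never re-sent to the relay.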
+ filter: Filter::new(), subscribed_at: Timestamp::zero(), closed: false, } @@ -270,28 +271,28 @@ impl InnerRelay { } } - pub async fn subscriptions(&self) -> HashMap> { + pub async fn subscriptions(&self) -> HashMap { let subscription = self.atomic.subscriptions.read().await; subscription .iter() - .map(|(k, v)| (k.clone(), v.filters.clone())) + .map(|(k, v)| (k.clone(), v.filter.clone())) .collect() } - pub async fn subscription(&self, id: &SubscriptionId) -> Option> { + pub async fn subscription(&self, id: &SubscriptionId) -> Option { let subscription = self.atomic.subscriptions.read().await; - subscription.get(id).map(|d| d.filters.clone()) + subscription.get(id).map(|d| d.filter.clone()) } pub(crate) async fn update_subscription( &self, id: SubscriptionId, - filters: Vec, + filter: Filter, update_subscribed_at: bool, ) { let mut subscriptions = self.atomic.subscriptions.write().await; let data: &mut SubscriptionData = subscriptions.entry(id).or_default(); - data.filters = filters; + data.filter = filter; if update_subscribed_at { data.subscribed_at = Timestamp::now(); @@ -1166,9 +1167,9 @@ impl InnerRelay { pub async fn resubscribe(&self) -> Result<(), Error> { let subscriptions = self.subscriptions().await; - for (id, filters) in subscriptions.into_iter() { - if !filters.is_empty() && self.should_resubscribe(&id).await { - self.send_msg(ClientMessage::req(id, filters))?; + for (id, filter) in subscriptions.into_iter() { + if !filter.is_empty() && self.should_resubscribe(&id).await { + self.send_msg(ClientMessage::req(id, filter))?; } else { tracing::debug!("Skip re-subscription of '{id}'"); } @@ -1180,13 +1181,13 @@ impl InnerRelay { pub(super) fn spawn_auto_closing_handler( &self, id: SubscriptionId, - filters: Vec, + filter: Filter, opts: SubscribeAutoCloseOptions, ) { let relay = self.clone(); // <-- FULL RELAY CLONE HERE task::spawn(async move { // Check if CLOSE needed - let to_close: bool = match relay.handle_auto_closing(&id, filters, opts).await { + let to_close: bool = match relay.handle_auto_closing(&id, filter, opts).await { Some((to_close, reason)) => { // Send subscription auto-closed notification if let Some(reason) = reason { @@ -1218,7 +1219,7 @@ impl InnerRelay { async fn handle_auto_closing( &self, id: &SubscriptionId, - filters: Vec, + filter: Filter, opts: SubscribeAutoCloseOptions, ) -> Option<(bool, Option)> { time::timeout(opts.timeout, async move { @@ -1307,8 +1308,7 @@ impl InnerRelay { // Resend REQ if require_resubscription { require_resubscription = false; - let msg: ClientMessage = - ClientMessage::req(id.clone(), filters.clone()); + let msg: ClientMessage = ClientMessage::req(id.clone(), filter.clone()); let _ = self.send_msg(msg); } } @@ -1518,7 +1518,7 @@ impl InnerRelay { } let filter = Filter::new().ids(ids); - self.send_msg(ClientMessage::req(down_sub_id.clone(), vec![filter]))?; + self.send_msg(ClientMessage::req(down_sub_id.clone(), filter))?; *in_flight_down = true; diff --git a/crates/nostr-relay-pool/src/relay/mod.rs b/crates/nostr-relay-pool/src/relay/mod.rs index f144afb88..f5e823259 100644 --- a/crates/nostr-relay-pool/src/relay/mod.rs +++ b/crates/nostr-relay-pool/src/relay/mod.rs @@ -224,13 +224,13 @@ impl Relay { /// Get subscriptions #[inline] - pub async fn subscriptions(&self) -> HashMap> { + pub async fn subscriptions(&self) -> HashMap { self.inner.subscriptions().await } /// Get filters by [SubscriptionId] #[inline] - pub async fn subscription(&self, id: &SubscriptionId) -> Option> { + pub async fn subscription(&self, id: 
&SubscriptionId) -> Option { self.inner.subscription(id).await } @@ -447,7 +447,7 @@ impl Relay { /// Subscribe to filters /// - /// Internally generate a new random [SubscriptionId]. Check `subscribe_with_id` method to use a custom [SubscriptionId]. + /// Internally generate a new random [`SubscriptionId`]. Check `subscribe_with_id` method to use a custom [SubscriptionId]. /// /// ### Auto-closing subscription /// @@ -456,7 +456,7 @@ impl Relay { /// Note: auto-closing subscriptions aren't saved in subscriptions map! pub async fn subscribe( &self, - filters: Vec, + filters: Filter, opts: SubscribeOptions, ) -> Result { let id: SubscriptionId = SubscriptionId::generate(); @@ -464,7 +464,7 @@ impl Relay { Ok(id) } - /// Subscribe with custom [SubscriptionId] + /// Subscribe with custom [`SubscriptionId`] /// /// ### Auto-closing subscription /// @@ -474,24 +474,19 @@ impl Relay { pub async fn subscribe_with_id( &self, id: SubscriptionId, - filters: Vec, + filter: Filter, opts: SubscribeOptions, ) -> Result<(), Error> { - // Check if filters are empty - if filters.is_empty() { - return Err(Error::FiltersEmpty); - } - // Compose and send REQ message - let msg: ClientMessage = ClientMessage::req(id.clone(), filters.clone()); + let msg: ClientMessage = ClientMessage::req(id.clone(), filter.clone()); self.send_msg(msg)?; // Check if auto-close condition is set match opts.auto_close { - Some(opts) => self.inner.spawn_auto_closing_handler(id, filters, opts), + Some(opts) => self.inner.spawn_auto_closing_handler(id, filter, opts), None => { - // No auto-close subscription: update subscription filters - self.inner.update_subscription(id, filters, true).await; + // No auto-close subscription: update subscription filter + self.inner.update_subscription(id, filter, true).await; } }; @@ -510,10 +505,10 @@ impl Relay { self.inner.unsubscribe_all().await } - /// Get events of filters with custom callback + /// Get events of filter with custom callback pub(crate) async fn fetch_events_with_callback( &self, - filters: Vec, + filter: Filter, timeout: Duration, policy: ReqExitPolicy, mut callback: impl FnMut(Event), @@ -532,7 +527,7 @@ impl Relay { let mut notifications = self.inner.internal_notification_sender.subscribe(); // Subscribe with auto-close - let id: SubscriptionId = self.subscribe(filters, subscribe_opts).await?; + let id: SubscriptionId = self.subscribe(filter, subscribe_opts).await?; time::timeout(Some(timeout), async { while let Ok(notification) = notifications.recv().await { @@ -582,24 +577,24 @@ impl Relay { #[inline] pub(crate) async fn fetch_events_with_callback_owned( self, - filters: Vec, + filter: Filter, timeout: Duration, policy: ReqExitPolicy, callback: impl Fn(Event), ) -> Result<(), Error> { - self.fetch_events_with_callback(filters, timeout, policy, callback) + self.fetch_events_with_callback(filter, timeout, policy, callback) .await } /// Fetch events pub async fn fetch_events( &self, - filters: Vec, + filter: Filter, timeout: Duration, policy: ReqExitPolicy, ) -> Result { - let mut events: Events = Events::new(&filters); - self.fetch_events_with_callback(filters, timeout, policy, |event| { + let mut events: Events = Events::new(&filter); + self.fetch_events_with_callback(filter, timeout, policy, |event| { events.insert(event); }) .await?; @@ -607,13 +602,9 @@ impl Relay { } /// Count events - pub async fn count_events( - &self, - filters: Vec, - timeout: Duration, - ) -> Result { + pub async fn count_events(&self, filter: Filter, timeout: Duration) -> Result { let id = 
SubscriptionId::generate(); - self.send_msg(ClientMessage::count(id.clone(), filters))?; + self.send_msg(ClientMessage::count(id.clone(), filter))?; let mut count = 0; @@ -661,20 +652,6 @@ impl Relay { filter: Filter, items: Vec<(EventId, Timestamp)>, opts: &SyncOptions, - ) -> Result { - // Compose map - let mut map = HashMap::with_capacity(1); - map.insert(filter, items); - - // Reconcile - self.sync_multi(map, opts).await - } - - /// Sync events with relays (negentropy reconciliation) - pub async fn sync_multi( - &self, - map: HashMap>, - opts: &SyncOptions, ) -> Result { // Perform health checks self.inner.health_check()?; @@ -686,23 +663,21 @@ impl Relay { let mut output: Reconciliation = Reconciliation::default(); - for (filter, items) in map.into_iter() { - match self - .inner - .sync_new(filter.clone(), items.clone(), opts, &mut output) - .await - { - Ok(..) => {} - Err(e) => match e { - Error::NegentropyNotSupported - | Error::Negentropy(negentropy::Error::UnsupportedProtocolVersion) => { - self.inner - .sync_deprecated(filter, items, opts, &mut output) - .await?; - } - e => return Err(e), - }, - } + match self + .inner + .sync_new(filter.clone(), items.clone(), opts, &mut output) + .await + { + Ok(..) => {} + Err(e) => match e { + Error::NegentropyNotSupported + | Error::Negentropy(negentropy::Error::UnsupportedProtocolVersion) => { + self.inner + .sync_deprecated(filter, items, opts, &mut output) + .await?; + } + e => return Err(e), + }, } Ok(output) @@ -1111,7 +1086,7 @@ mod tests { // Unauthenticated fetch (MUST return error) let err = relay .fetch_events( - vec![filter.clone()], + filter.clone(), Duration::from_secs(5), ReqExitPolicy::ExitOnEOSE, ) @@ -1133,7 +1108,7 @@ mod tests { // Unauthenticated fetch (MUST return error) let err = relay .fetch_events( - vec![filter.clone()], + filter.clone(), Duration::from_secs(5), ReqExitPolicy::ExitOnEOSE, ) @@ -1146,11 +1121,7 @@ mod tests { // Authenticated fetch let res = relay - .fetch_events( - vec![filter], - Duration::from_secs(5), - ReqExitPolicy::ExitOnEOSE, - ) + .fetch_events(filter, Duration::from_secs(5), ReqExitPolicy::ExitOnEOSE) .await; assert!(res.is_ok()); } diff --git a/crates/nostr-sdk/examples/blacklist.rs b/crates/nostr-sdk/examples/blacklist.rs index bcea344b7..eba537bca 100644 --- a/crates/nostr-sdk/examples/blacklist.rs +++ b/crates/nostr-sdk/examples/blacklist.rs @@ -27,9 +27,7 @@ async fn main() -> Result<()> { let filter = Filter::new() .authors([muted_public_key, public_key]) .kind(Kind::Metadata); - let events = client - .fetch_events(vec![filter], Duration::from_secs(10)) - .await?; + let events = client.fetch_events(filter, Duration::from_secs(10)).await?; println!("Received {} events.", events.len()); Ok(()) diff --git a/crates/nostr-sdk/examples/bot.rs b/crates/nostr-sdk/examples/bot.rs index 67f8ef9b0..b6083355b 100644 --- a/crates/nostr-sdk/examples/bot.rs +++ b/crates/nostr-sdk/examples/bot.rs @@ -36,7 +36,7 @@ async fn main() -> Result<()> { .kind(Kind::GiftWrap) .limit(0); // Limit set to 0 to get only new events! Timestamp::now() CAN'T be used for gift wrap since the timestamps are tweaked! 
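(An aside on the `Relay::sync` consolidation above: with `sync_multi` gone, one
reconciliation covers exactly one filter. A rough sketch, assuming an already
connected `relay` and a local `database` handle, both illustrative:

    let filter = Filter::new().author(public_key).kind(Kind::TextNote);
    let items: Vec<(EventId, Timestamp)> = database.negentropy_items(filter.clone()).await?;
    let report: Reconciliation = relay.sync(filter, items, &SyncOptions::default()).await?;
    println!("{report:?}");

As the error-matching in `sync` shows, the deprecated negentropy protocol is still
tried as a fallback when the relay doesn't support the new one.)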
- client.subscribe(vec![subscription], None).await?; + client.subscribe(subscription, None).await?; client .handle_notifications(|notification| async { diff --git a/crates/nostr-sdk/examples/comment.rs b/crates/nostr-sdk/examples/comment.rs index 056d30681..e13337036 100644 --- a/crates/nostr-sdk/examples/comment.rs +++ b/crates/nostr-sdk/examples/comment.rs @@ -21,7 +21,7 @@ async fn main() -> Result<()> { let event_id = EventId::from_bech32("note1hrrgx2309my3wgeecx2tt6fl2nl8hcwl0myr3xvkcqpnq24pxg2q06armr")?; let events = client - .fetch_events(vec![Filter::new().id(event_id)], Duration::from_secs(10)) + .fetch_events(Filter::new().id(event_id), Duration::from_secs(10)) .await?; let comment_to = events.first().unwrap(); diff --git a/crates/nostr-sdk/examples/fetch-events.rs b/crates/nostr-sdk/examples/fetch-events.rs index 6ebdad468..9e5bdfbb4 100644 --- a/crates/nostr-sdk/examples/fetch-events.rs +++ b/crates/nostr-sdk/examples/fetch-events.rs @@ -20,9 +20,7 @@ async fn main() -> Result<()> { client.connect().await; let filter = Filter::new().author(public_key).kind(Kind::Metadata); - let events = client - .fetch_events(vec![filter], Duration::from_secs(10)) - .await?; + let events = client.fetch_events(filter, Duration::from_secs(10)).await?; println!("{events:#?}"); let filter = Filter::new() @@ -32,7 +30,7 @@ async fn main() -> Result<()> { let events = client .fetch_events_from( ["wss://relay.damus.io", "wss://relay.rip"], - vec![filter], + filter, Duration::from_secs(10), ) .await?; diff --git a/crates/nostr-sdk/examples/gossip.rs b/crates/nostr-sdk/examples/gossip.rs index a74e6ae3f..39f5a25cb 100644 --- a/crates/nostr-sdk/examples/gossip.rs +++ b/crates/nostr-sdk/examples/gossip.rs @@ -46,9 +46,7 @@ async fn main() -> Result<()> { // Get events let filter = Filter::new().author(pubkey).kind(Kind::TextNote).limit(3); - let events = client - .fetch_events(vec![filter], Duration::from_secs(10)) - .await?; + let events = client.fetch_events(filter, Duration::from_secs(10)).await?; for event in events.into_iter() { println!("{}", event.as_json()); diff --git a/crates/nostr-sdk/examples/stream-events.rs b/crates/nostr-sdk/examples/stream-events.rs index c313e38a8..72b77f9e7 100644 --- a/crates/nostr-sdk/examples/stream-events.rs +++ b/crates/nostr-sdk/examples/stream-events.rs @@ -19,7 +19,7 @@ async fn main() -> Result<()> { // Stream events from all connected relays let filter = Filter::new().kind(Kind::TextNote).limit(100); let mut stream = client - .stream_events(vec![filter], Duration::from_secs(15)) + .stream_events(filter, Duration::from_secs(15)) .await?; while let Some(event) = stream.next().await { diff --git a/crates/nostr-sdk/examples/subscriptions.rs b/crates/nostr-sdk/examples/subscriptions.rs index 4a91a0a71..374f8dda6 100644 --- a/crates/nostr-sdk/examples/subscriptions.rs +++ b/crates/nostr-sdk/examples/subscriptions.rs @@ -25,7 +25,7 @@ async fn main() -> Result<()> { .since(Timestamp::now()); // Subscribe (auto generate subscription ID) - let Output { val: sub_id_1, .. } = client.subscribe(vec![subscription], None).await?; + let Output { val: sub_id_1, .. 
} = client.subscribe(subscription, None).await?; // Subscribe with custom ID let sub_id_2 = SubscriptionId::new("other-id"); @@ -34,7 +34,7 @@ async fn main() -> Result<()> { .kind(Kind::TextNote) .since(Timestamp::now()); client - .subscribe_with_id(sub_id_2.clone(), vec![filter], None) + .subscribe_with_id(sub_id_2.clone(), filter, None) .await?; // Overwrite previous subscription @@ -43,7 +43,7 @@ async fn main() -> Result<()> { .kind(Kind::EncryptedDirectMessage) .since(Timestamp::now()); client - .subscribe_with_id(sub_id_1.clone(), vec![filter], None) + .subscribe_with_id(sub_id_1.clone(), filter, None) .await?; // Handle subscription notifications with `handle_notifications` method diff --git a/crates/nostr-sdk/examples/switch-account.rs b/crates/nostr-sdk/examples/switch-account.rs index 441a72c55..d00747e2a 100644 --- a/crates/nostr-sdk/examples/switch-account.rs +++ b/crates/nostr-sdk/examples/switch-account.rs @@ -22,7 +22,7 @@ async fn main() -> Result<()> { .author(keys1.public_key) .kind(Kind::TextNote) .limit(10); - client.subscribe(vec![filter], None).await?; + client.subscribe(filter, None).await?; // Wait a little tokio::time::sleep(Duration::from_secs(20)).await; @@ -46,7 +46,7 @@ async fn main() -> Result<()> { .author(keys2.public_key) .kind(Kind::TextNote) .limit(5); - client.subscribe(vec![filter], None).await?; + client.subscribe(filter, None).await?; client .handle_notifications(|notification| async move { diff --git a/crates/nostr-sdk/examples/whitelist.rs b/crates/nostr-sdk/examples/whitelist.rs index 112677852..cc7e3079b 100644 --- a/crates/nostr-sdk/examples/whitelist.rs +++ b/crates/nostr-sdk/examples/whitelist.rs @@ -29,9 +29,7 @@ async fn main() -> Result<()> { let filter = Filter::new() .authors([allowed_public_key, not_in_whitelist_public_key]) .kind(Kind::Metadata); - let events = client - .fetch_events(vec![filter], Duration::from_secs(10)) - .await?; + let events = client.fetch_events(filter, Duration::from_secs(10)).await?; println!("Received {} events.", events.len()); for event in events.into_iter() { diff --git a/crates/nostr-sdk/src/client/mod.rs b/crates/nostr-sdk/src/client/mod.rs index ab280c3e7..2108dcd9f 100644 --- a/crates/nostr-sdk/src/client/mod.rs +++ b/crates/nostr-sdk/src/client/mod.rs @@ -4,7 +4,7 @@ //! 
Client -use std::collections::{BTreeSet, HashMap, HashSet}; +use std::collections::{HashMap, HashSet}; use std::future::Future; use std::iter; use std::sync::Arc; @@ -25,7 +25,7 @@ pub use self::error::Error; pub use self::options::Options; #[cfg(not(target_arch = "wasm32"))] pub use self::options::{Connection, ConnectionTarget}; -use crate::gossip::graph::GossipGraph; +use crate::gossip::graph::{BrokenDownFilters, GossipGraph}; /// Nostr client #[derive(Debug, Clone)] @@ -498,13 +498,13 @@ impl Client { /// Get pool subscriptions #[inline] - pub async fn subscriptions(&self) -> HashMap> { + pub async fn subscriptions(&self) -> HashMap { self.pool.subscriptions().await } /// Get pool subscription #[inline] - pub async fn subscription(&self, id: &SubscriptionId) -> Option> { + pub async fn subscription(&self, id: &SubscriptionId) -> Option { self.pool.subscription(id).await } @@ -536,25 +536,25 @@ impl Client { /// .since(Timestamp::now()); /// /// // Subscribe - /// let output = client.subscribe(vec![subscription], None).await?; + /// let output = client.subscribe(subscription, None).await?; /// println!("Subscription ID: {}", output.val); /// /// // Auto-closing subscription /// let id = SubscriptionId::generate(); /// let subscription = Filter::new().kind(Kind::TextNote).limit(10); /// let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE); - /// let output = client.subscribe(vec![subscription], Some(opts)).await?; + /// let output = client.subscribe(subscription, Some(opts)).await?; /// println!("Subscription ID: {} [auto-closing]", output.val); /// # Ok(()) /// # } /// ``` pub async fn subscribe( &self, - filters: Vec, + filter: Filter, opts: Option, ) -> Result, Error> { let id: SubscriptionId = SubscriptionId::generate(); - let output: Output<()> = self.subscribe_with_id(id.clone(), filters, opts).await?; + let output: Output<()> = self.subscribe_with_id(id.clone(), filter, opts).await?; Ok(Output { val: id, success: output.success, @@ -575,15 +575,15 @@ impl Client { pub async fn subscribe_with_id( &self, id: SubscriptionId, - filters: Vec, + filter: Filter, opts: Option, ) -> Result, Error> { let opts: SubscribeOptions = SubscribeOptions::default().close_on(opts); if self.opts.gossip { - self.gossip_subscribe(id, filters, opts).await + self.gossip_subscribe(id, filter, opts).await } else { - Ok(self.pool.subscribe_with_id(id, filters, opts).await?) + Ok(self.pool.subscribe_with_id(id, filter, opts).await?) } } @@ -599,7 +599,7 @@ impl Client { pub async fn subscribe_to( &self, urls: I, - filters: Vec, + filter: Filter, opts: Option, ) -> Result, Error> where @@ -608,10 +608,10 @@ impl Client { pool::Error: From<::Err>, { let opts: SubscribeOptions = SubscribeOptions::default().close_on(opts); - Ok(self.pool.subscribe_to(urls, filters, opts).await?) + Ok(self.pool.subscribe_to(urls, filter, opts).await?) } - /// Subscribe to filters with custom [SubscriptionId] to specific relays + /// Subscribe to filter with custom [SubscriptionId] to specific relays /// /// ### Auto-closing subscription /// @@ -621,7 +621,7 @@ impl Client { &self, urls: I, id: SubscriptionId, - filters: Vec, + filter: Filter, opts: Option, ) -> Result, Error> where @@ -632,7 +632,7 @@ impl Client { let opts: SubscribeOptions = SubscribeOptions::default().close_on(opts); Ok(self .pool - .subscribe_with_id_to(urls, id, filters, opts) + .subscribe_with_id_to(urls, id, filter, opts) .await?) 
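+        // Call shape (illustrative): pin one filter to chosen relays only, e.g.
+        //   client
+        //       .subscribe_with_id_to(["wss://relay.damus.io"], id, filter, None)
+        //       .await?;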
} @@ -647,7 +647,7 @@ impl Client { opts: SubscribeOptions, ) -> Result, Error> where - I: IntoIterator)>, + I: IntoIterator, U: TryIntoUrl, pool::Error: From<::Err>, { @@ -720,23 +720,19 @@ impl Client { /// .since(Timestamp::now()); /// /// let _events = client - /// .fetch_events(vec![subscription], Duration::from_secs(10)) + /// .fetch_events(subscription, Duration::from_secs(10)) /// .await /// .unwrap(); /// # } /// ``` - pub async fn fetch_events( - &self, - filters: Vec, - timeout: Duration, - ) -> Result { + pub async fn fetch_events(&self, filter: Filter, timeout: Duration) -> Result { if self.opts.gossip { - return self.gossip_fetch_events(filters, timeout).await; + return self.gossip_fetch_events(filter, timeout).await; } Ok(self .pool - .fetch_events(filters, timeout, ReqExitPolicy::default()) + .fetch_events(filter, timeout, ReqExitPolicy::default()) .await?) } @@ -745,7 +741,7 @@ impl Client { pub async fn fetch_events_from( &self, urls: I, - filters: Vec, + filter: Filter, timeout: Duration, ) -> Result where @@ -755,7 +751,7 @@ impl Client { { Ok(self .pool - .fetch_events_from(urls, filters, timeout, ReqExitPolicy::default()) + .fetch_events_from(urls, filter, timeout, ReqExitPolicy::default()) .await?) } @@ -778,14 +774,12 @@ impl Client { /// # #[tokio::main] /// # async fn main() -> Result<()> { /// # let client = Client::default(); - /// # let filters = vec![Filter::new().limit(1)]; + /// # let filter = Filter::new().limit(1); /// // Query database - /// let stored_events: Events = client.database().query(filters.clone()).await?; + /// let stored_events: Events = client.database().query(filter.clone()).await?; /// /// // Query relays - /// let fetched_events: Events = client - /// .fetch_events(filters, Duration::from_secs(10)) - /// .await?; + /// let fetched_events: Events = client.fetch_events(filter, Duration::from_secs(10)).await?; /// /// // Merge result /// let events: Events = stored_events.merge(fetched_events); @@ -799,14 +793,14 @@ impl Client { /// ``` pub async fn fetch_combined_events( &self, - filters: Vec, + filter: Filter, timeout: Duration, ) -> Result { // Query database - let stored_events: Events = self.database().query(filters.clone()).await?; + let stored_events: Events = self.database().query(filter.clone()).await?; // Query relays - let fetched_events: Events = self.fetch_events(filters, timeout).await?; + let fetched_events: Events = self.fetch_events(filter, timeout).await?; // Merge result Ok(stored_events.merge(fetched_events)) @@ -818,16 +812,16 @@ impl Client { /// NIP65 relays (automatically discovered) of public keys included in filters (if any). pub async fn stream_events( &self, - filters: Vec, + filter: Filter, timeout: Duration, ) -> Result, Error> { // Check if gossip is enabled if self.opts.gossip { - self.gossip_stream_events(filters, timeout).await + self.gossip_stream_events(filter, timeout).await } else { Ok(self .pool - .stream_events(filters, timeout, ReqExitPolicy::default()) + .stream_events(filter, timeout, ReqExitPolicy::default()) .await?) } } @@ -837,7 +831,7 @@ impl Client { pub async fn stream_events_from( &self, urls: I, - filters: Vec, + filter: Filter, timeout: Duration, ) -> Result, Error> where @@ -847,7 +841,7 @@ impl Client { { Ok(self .pool - .stream_events_from(urls, filters, timeout, ReqExitPolicy::default()) + .stream_events_from(urls, filter, timeout, ReqExitPolicy::default()) .await?) 
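+        // The targeted variants (`subscribe_targeted`, `stream_events_targeted`)
+        // now take `(url, Filter)` pairs instead of `(url, Vec<Filter>)`, e.g.
+        // (illustrative):
+        //   let targets = [(url_a, Filter::new().kind(Kind::Metadata)),
+        //                  (url_b, Filter::new().kind(Kind::TextNote).limit(10))];
+        //   client.subscribe_targeted(id, targets, opts).await?;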
}

@@ -856,7 +850,7 @@
     /// Stream events from specific relays with specific filters
     pub async fn stream_events_targeted(
         &self,
-        targets: HashMap<RelayUrl, Vec<Filter>>,
+        targets: HashMap<RelayUrl, Filter>,
         timeout: Duration,
     ) -> Result<ReceiverStream<Event>, Error> {
         Ok(self
@@ -865,7 +859,7 @@
             .await?)
     }
 
-    /// Send client message to a **specific relays**
+    /// Send the client message to **specific relays**
     #[inline]
     pub async fn send_msg_to<I, U>(&self, urls: I, msg: ClientMessage) -> Result<Output<()>, Error>
     where
@@ -969,7 +963,7 @@
             .author(public_key)
             .kind(Kind::Metadata)
             .limit(1);
-        let events: Events = self.fetch_events(vec![filter], timeout).await?;
+        let events: Events = self.fetch_events(filter, timeout).await?;
         match events.first() {
             Some(event) => Ok(Metadata::try_from(event)?),
             None => Err(Error::MetadataNotFound),
@@ -1005,14 +999,14 @@
         self.send_event_builder(builder).await
     }
 
-    async fn get_contact_list_filters(&self) -> Result<Vec<Filter>, Error> {
+    async fn get_contact_list_filter(&self) -> Result<Filter, Error> {
         let signer = self.signer().await?;
         let public_key = signer.get_public_key().await?;
         let filter: Filter = Filter::new()
             .author(public_key)
             .kind(Kind::ContactList)
             .limit(1);
-        Ok(vec![filter])
+        Ok(filter)
     }
 
     /// Get the contact list from relays.
@@ -1022,8 +1016,8 @@
     ///
     pub async fn get_contact_list(&self, timeout: Duration) -> Result<Vec<Contact>, Error> {
         let mut contact_list: Vec<Contact> = Vec::new();
-        let filters: Vec<Filter> = self.get_contact_list_filters().await?;
-        let events: Events = self.fetch_events(filters, timeout).await?;
+        let filter: Filter = self.get_contact_list_filter().await?;
+        let events: Events = self.fetch_events(filter, timeout).await?;
 
         // Get first event (result of `fetch_events` is sorted DESC by timestamp)
         if let Some(event) = events.first_owned() {
@@ -1053,8 +1047,8 @@
         timeout: Duration,
     ) -> Result<Vec<PublicKey>, Error> {
         let mut pubkeys: Vec<PublicKey> = Vec::new();
-        let filters: Vec<Filter> = self.get_contact_list_filters().await?;
-        let events: Events = self.fetch_events(filters, timeout).await?;
+        let filter: Filter = self.get_contact_list_filter().await?;
+        let events: Events = self.fetch_events(filter, timeout).await?;
 
         for event in events.into_iter() {
             pubkeys.extend(event.tags.public_keys());
@@ -1076,16 +1070,10 @@
         let chunk_size: usize = self.opts.req_filters_chunk_size as usize;
         for chunk in public_keys.chunks(chunk_size) {
-            let mut filters: Vec<Filter> = Vec::new();
-            for public_key in chunk.iter() {
-                filters.push(
-                    Filter::new()
-                        .author(*public_key)
-                        .kind(Kind::Metadata)
-                        .limit(1),
-                );
-            }
-            let events: Events = self.fetch_events(filters, timeout).await?;
+            let filter: Filter = Filter::new()
+                .authors(chunk.iter().copied())
+                .kind(Kind::Metadata);
+            let events: Events = self.fetch_events(filter, timeout).await?;
             for event in events.into_iter() {
                 let metadata = Metadata::from_json(&event.content)?;
                 if let Some(m) = contacts.get_mut(&event.pubkey) {
@@ -1256,8 +1244,7 @@
             .kinds([Kind::RelayList, Kind::InboxRelays]);
 
         // Query from database
-        let database = self.database();
-        let stored_events: Events = database.query(vec![filter.clone()]).await?;
+        let stored_events: Events = self.database().query(filter.clone()).await?;
 
         // Get DISCOVERY and READ relays
         // TODO: avoid clone of both url and relay
@@ -1272,7 +1259,7 @@
         // Get events from discovery and read relays
         let events: Events = self
-            .fetch_events_from(relays, vec![filter], Duration::from_secs(10))
+            .fetch_events_from(relays, filter, Duration::from_secs(10))
             .await?;
 
         // Update last check for
these public keys @@ -1291,12 +1278,9 @@ impl Client { } /// Break down filters for gossip and discovery relays - async fn break_down_filters( - &self, - filters: Vec, - ) -> Result>, Error> { + async fn break_down_filter(&self, filter: Filter) -> Result, Error> { // Extract all public keys from filters - let public_keys = filters.iter().flat_map(|f| f.extract_public_keys()); + let public_keys = filter.extract_public_keys(); // Check outdated ones let outdated_public_keys = self.gossip_graph.check_outdated(public_keys).await; @@ -1305,85 +1289,39 @@ impl Client { self.update_outdated_gossip_graph(outdated_public_keys) .await?; - // Broken down filters - let broken_down = self.gossip_graph.break_down_filters(filters).await; - - let mut filters: HashMap> = broken_down.filters; - - // Get read relays - let read_relays = self - .pool - .relays_with_flag(RelayServiceFlags::READ, FlagCheck::All) - .await; - - match (broken_down.orphans, broken_down.others) { - (Some(orphans), Some(others)) => { - for url in read_relays.into_keys() { - filters - .entry(url) - .and_modify(|f| { - f.extend(orphans.clone()); - f.extend(others.clone()); - }) - .or_insert_with(|| { - let mut new = BTreeSet::new(); - new.extend(orphans.clone()); - new.extend(others.clone()); - new - }); - } - } - (Some(orphans), None) => { - for url in read_relays.into_keys() { - filters - .entry(url) - .and_modify(|f| { - f.extend(orphans.clone()); - }) - .or_insert_with(|| { - let mut new = BTreeSet::new(); - new.extend(orphans.clone()); - new - }); - } - } - (None, Some(others)) => { - // Extend filters with read relays and "other" filters (the filters that aren't linked to public keys) - for url in read_relays.into_keys() { - filters - .entry(url) - .and_modify(|f| { - f.extend(others.clone()); - }) - .or_insert_with(|| { - let mut new = BTreeSet::new(); - new.extend(others.clone()); - new - }); + // Broken-down filters + let filters: HashMap = + match self.gossip_graph.break_down_filter(filter).await { + BrokenDownFilters::Filters(filters) => filters, + BrokenDownFilters::Orphan(filter) | BrokenDownFilters::Other(filter) => { + // Get read relays + let read_relays = self + .pool + .relays_with_flag(RelayServiceFlags::READ, FlagCheck::All) + .await; + + let mut map = HashMap::with_capacity(read_relays.len()); + for url in read_relays.into_keys() { + map.insert(url, filter.clone()); + } + map } - } - (None, None) => { - // Nothing to do - } - } + }; - // Add outbox and inbox relays - for url in broken_down.urls.into_iter() { - if self.add_gossip_relay(&url).await? { + // Add gossip (outbox and inbox) relays + for url in filters.keys() { + if self.add_gossip_relay(url).await? { self.connect_relay(url).await?; } } // Check if filters are empty + // TODO: this can't be empty, right? 
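+        // It can: `Orphan` and `Other` filters are fanned out to the READ relays,
+        // so the map ends up empty when no READ relays are configured.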
if filters.is_empty() { return Err(Error::GossipFiltersEmpty); } - // Convert btree filters to vec - Ok(filters - .into_iter() - .map(|(u, f)| (u, f.into_iter().collect())) - .collect()) + Ok(filters) } async fn gossip_send_event(&self, event: Event, nip17: bool) -> Result, Error> { @@ -1458,10 +1396,10 @@ impl Client { async fn gossip_stream_events( &self, - filters: Vec, + filter: Filter, timeout: Duration, ) -> Result, Error> { - let filters = self.break_down_filters(filters).await?; + let filters = self.break_down_filter(filter).await?; // Stream events let stream: ReceiverStream = self @@ -1474,13 +1412,13 @@ impl Client { async fn gossip_fetch_events( &self, - filters: Vec, + filter: Filter, timeout: Duration, ) -> Result { - let mut events: Events = Events::new(&filters); + let mut events: Events = Events::new(&filter); // Stream events - let mut stream: ReceiverStream = self.gossip_stream_events(filters, timeout).await?; + let mut stream: ReceiverStream = self.gossip_stream_events(filter, timeout).await?; while let Some(event) = stream.next().await { events.insert(event); @@ -1492,10 +1430,10 @@ impl Client { async fn gossip_subscribe( &self, id: SubscriptionId, - filters: Vec, + filter: Filter, opts: SubscribeOptions, ) -> Result, Error> { - let filters = self.break_down_filters(filters).await?; + let filters = self.break_down_filter(filter).await?; Ok(self.pool.subscribe_targeted(id, filters, opts).await?) } @@ -1505,26 +1443,19 @@ impl Client { opts: &SyncOptions, ) -> Result, Error> { // Break down filter - let temp_filters = self.break_down_filters(vec![filter]).await?; + let temp_filters = self.break_down_filter(filter).await?; let database = self.database(); - let mut filters = HashMap::with_capacity(temp_filters.len()); + let mut filters: HashMap)> = + HashMap::with_capacity(temp_filters.len()); // Iterate broken down filters and compose new filters for targeted reconciliation - for (url, value) in temp_filters.into_iter() { - let mut map = HashMap::with_capacity(value.len()); - - // Iterate per-url filters and get items - for filter in value.into_iter() { - // Get items - let items: Vec<(EventId, Timestamp)> = - database.negentropy_items(filter.clone()).await?; - - // Add filter and items to map - map.insert(filter, items); - } + for (url, filter) in temp_filters.into_iter() { + // Get items + let items: Vec<(EventId, Timestamp)> = + database.negentropy_items(filter.clone()).await?; - filters.insert(url, map); + filters.insert(url, (filter, items)); } // Reconciliation diff --git a/crates/nostr-sdk/src/gossip/graph.rs b/crates/nostr-sdk/src/gossip/graph.rs index e3d1b0b72..808d871d7 100644 --- a/crates/nostr-sdk/src/gossip/graph.rs +++ b/crates/nostr-sdk/src/gossip/graph.rs @@ -13,16 +13,13 @@ use super::constant::{CHECK_OUTDATED_INTERVAL, MAX_RELAYS_LIST, PUBKEY_METADATA_ const P_TAG: SingleLetterTag = SingleLetterTag::lowercase(Alphabet::P); #[derive(Debug)] -pub struct BrokenDownFilters { +pub enum BrokenDownFilters { /// Filters by url - pub filters: HashMap>, + Filters(HashMap), /// Filters that match a certain pattern but where no relays are available - pub orphans: Option>, + Orphan(Filter), /// Filters that can be sent to read relays (generic query, not related to public keys) - pub others: Option>, - /// All inbox and outbox relays - // TODO: remove? 
- pub urls: HashSet, + Other(Filter), } #[derive(Debug, Clone, Default)] @@ -353,137 +350,98 @@ impl GossipGraph { self.map_nip65_relays(txn, public_keys, RelayMetadata::Read) } - pub async fn break_down_filters(&self, filters: I) -> BrokenDownFilters - where - I: IntoIterator, - { - let mut map: HashMap> = HashMap::new(); - let mut orphans: BTreeSet = BTreeSet::new(); - let mut others: BTreeSet = BTreeSet::new(); - let mut urls: HashSet = HashSet::new(); - + pub async fn break_down_filter(&self, filter: Filter) -> BrokenDownFilters { let txn = self.public_keys.read().await; - for filter in filters.into_iter() { - // Extract `p` tag from generic tags and parse public key hex - let p_tag: Option> = filter.generic_tags.get(&P_TAG).map(|s| { - s.iter() - .filter_map(|p| PublicKey::from_hex(p).ok()) - .collect() - }); - - // Match pattern - match (&filter.authors, &p_tag) { - (Some(authors), None) => { - // Get map of outbox relays - let mut outbox = self.map_nip65_outbox_relays(&txn, authors); - - // Extend with NIP17 relays - outbox.extend(self.map_nip17_relays(&txn, authors)); - - // No relay available for the authors - if outbox.is_empty() { - orphans.insert(filter.clone()); - continue; - } + // Extract `p` tag from generic tags and parse public key hex + let p_tag: Option> = filter.generic_tags.get(&P_TAG).map(|s| { + s.iter() + .filter_map(|p| PublicKey::from_hex(p).ok()) + .collect() + }); + + // Match pattern + match (&filter.authors, &p_tag) { + (Some(authors), None) => { + // Get map of outbox relays + let mut outbox: HashMap> = + self.map_nip65_outbox_relays(&txn, authors); + + // Extend with NIP17 relays + outbox.extend(self.map_nip17_relays(&txn, authors)); + + // No relay available for the authors + if outbox.is_empty() { + return BrokenDownFilters::Orphan(filter); + } - // Construct new filters - for (relay, pk_set) in outbox.into_iter() { - urls.insert(relay.clone()); + let mut map: HashMap = HashMap::with_capacity(outbox.len()); - // Clone filter and change authors - let mut new_filter: Filter = filter.clone(); - new_filter.authors = Some(pk_set); + // Construct new filters + for (relay, pk_set) in outbox.into_iter() { + // Clone filter and change authors + let mut new_filter: Filter = filter.clone(); + new_filter.authors = Some(pk_set); - // Update map - map.entry(relay) - .and_modify(|f| { - f.insert(new_filter.clone()); - }) - .or_default() - .insert(new_filter); - } + // Update map + map.insert(relay, new_filter); } - (None, Some(p_public_keys)) => { - // Get map of inbox relays - let mut inbox = self.map_nip65_inbox_relays(&txn, p_public_keys); - // Extend with NIP17 relays - inbox.extend(self.map_nip17_relays(&txn, p_public_keys)); + BrokenDownFilters::Filters(map) + } + (None, Some(p_public_keys)) => { + // Get map of inbox relays + let mut inbox: HashMap> = + self.map_nip65_inbox_relays(&txn, p_public_keys); - // No relay available for the p tags - if inbox.is_empty() { - orphans.insert(filter.clone()); - continue; - } + // Extend with NIP17 relays + inbox.extend(self.map_nip17_relays(&txn, p_public_keys)); - // Construct new filters - for (relay, pk_set) in inbox.into_iter() { - urls.insert(relay.clone()); + // No relay available for the p tags + if inbox.is_empty() { + return BrokenDownFilters::Orphan(filter); + } - // Clone filter and change p tags - let mut new_filter: Filter = filter.clone(); - new_filter - .generic_tags - .insert(P_TAG, pk_set.into_iter().map(|p| p.to_string()).collect()); + let mut map: HashMap = HashMap::with_capacity(inbox.len()); - // Update 
map - map.entry(relay) - .and_modify(|f| { - f.insert(new_filter.clone()); - }) - .or_default() - .insert(new_filter); - } - } - (Some(authors), Some(p_public_keys)) => { - // Get map of outbox and inbox relays - let mut relays = - self.get_nip65_relays(&txn, authors.union(p_public_keys), None); + // Construct new filters + for (relay, pk_set) in inbox.into_iter() { + // Clone filter and change p tags + let mut new_filter: Filter = filter.clone(); + new_filter + .generic_tags + .insert(P_TAG, pk_set.into_iter().map(|p| p.to_string()).collect()); - // Extend with NIP17 relays - relays.extend(self.get_nip17_relays(&txn, authors.union(p_public_keys))); + // Update map + map.insert(relay, new_filter); + } - // No relay available for the authors and p tags - if relays.is_empty() { - orphans.insert(filter.clone()); - continue; - } + BrokenDownFilters::Filters(map) + } + (Some(authors), Some(p_public_keys)) => { + // Get map of outbox and inbox relays + let mut relays: HashSet = + self.get_nip65_relays(&txn, authors.union(p_public_keys), None); - for relay in relays.into_iter() { - urls.insert(relay.clone()); + // Extend with NIP17 relays + relays.extend(self.get_nip17_relays(&txn, authors.union(p_public_keys))); - // Update map - map.entry(relay) - .and_modify(|f| { - f.insert(filter.clone()); - }) - .or_default() - .insert(filter.clone()); - } + // No relay available for the authors and p tags + if relays.is_empty() { + return BrokenDownFilters::Orphan(filter); } - // Nothing to do, add to `other` list - (None, None) => { - others.insert(filter); + + let mut map: HashMap = HashMap::with_capacity(relays.len()); + + for relay in relays.into_iter() { + // Update map + map.insert(relay, filter.clone()); } - } - } - tracing::debug!(gossip = %map.len(), orphans = %orphans.len(), others = %others.len(), "Broken down filters:"); - - BrokenDownFilters { - filters: map, - orphans: if orphans.is_empty() { - None - } else { - Some(orphans) - }, - others: if others.is_empty() { - None - } else { - Some(others) - }, - urls, + BrokenDownFilters::Filters(map) + } + // Nothing to do, add to `other` list + (None, None) => BrokenDownFilters::Other(filter), } } } @@ -509,18 +467,6 @@ mod tests { ("wss://relay.snort.social", Some(RelayMetadata::Read)), ]; - macro_rules! btreeset { - ($( $x:expr ),* $(,)?) 
=> { - { - let mut set = BTreeSet::new(); - $( - set.insert($x); - )* - set - } - }; - } - fn build_relay_list_event( secret_key: &str, relays: Vec<(&str, Option)>, @@ -547,7 +493,7 @@ mod tests { } #[tokio::test] - async fn test_break_down_filters() { + async fn test_break_down_filter() { let keys_a = Keys::parse(SECRET_KEY_A).unwrap(); let keys_b = Keys::parse(SECRET_KEY_B).unwrap(); @@ -561,113 +507,94 @@ mod tests { let graph = setup_graph().await; - // Single filter, single author - let filters = btreeset![Filter::new().author(keys_a.public_key)]; - let broken_down = graph.break_down_filters(filters.clone()).await; - - assert_eq!(broken_down.filters.get(&damus_url).unwrap(), &filters); - assert_eq!(broken_down.filters.get(&nostr_bg_url).unwrap(), &filters); - assert_eq!(broken_down.filters.get(&nos_lol_url).unwrap(), &filters); - assert!(!broken_down.filters.contains_key(&nostr_mom_url)); - assert!(broken_down.orphans.is_none()); - assert!(broken_down.others.is_none()); + // Single author + let filter = Filter::new().author(keys_a.public_key); + match graph.break_down_filter(filter.clone()).await { + BrokenDownFilters::Filters(map) => { + assert_eq!(map.get(&damus_url).unwrap(), &filter); + assert_eq!(map.get(&nostr_bg_url).unwrap(), &filter); + assert_eq!(map.get(&nos_lol_url).unwrap(), &filter); + assert!(!map.contains_key(&nostr_mom_url)); + } + _ => panic!("Expected filters"), + } - // Multiple filters, multiple authors + // Multiple authors let authors_filter = Filter::new().authors([keys_a.public_key, keys_b.public_key]); + match graph.break_down_filter(authors_filter.clone()).await { + BrokenDownFilters::Filters(map) => { + assert_eq!(map.get(&damus_url).unwrap(), &authors_filter); + assert_eq!( + map.get(&nostr_bg_url).unwrap(), + &Filter::new().author(keys_a.public_key) + ); + assert_eq!( + map.get(&nos_lol_url).unwrap(), + &Filter::new().author(keys_a.public_key) + ); + assert!(!map.contains_key(&nostr_mom_url)); + assert_eq!( + map.get(&nostr_info_url).unwrap(), + &Filter::new().author(keys_b.public_key) + ); + assert_eq!( + map.get(&relay_rip_url).unwrap(), + &Filter::new().author(keys_b.public_key) + ); + assert!(!map.contains_key(&snort_url)); + } + _ => panic!("Expected filters"), + } + + // Other filter let search_filter = Filter::new().search("Test").limit(10); - let filters = btreeset![authors_filter.clone(), search_filter.clone()]; - let broken_down = graph.break_down_filters(filters.clone()).await; - - assert_eq!( - broken_down.filters.get(&damus_url).unwrap(), - &btreeset![authors_filter] - ); - assert_eq!( - broken_down.filters.get(&nostr_bg_url).unwrap(), - &btreeset![Filter::new().author(keys_a.public_key)] - ); - assert_eq!( - broken_down.filters.get(&nos_lol_url).unwrap(), - &btreeset![Filter::new().author(keys_a.public_key)] - ); - assert!(!broken_down.filters.contains_key(&nostr_mom_url)); - assert_eq!( - broken_down.filters.get(&nostr_info_url).unwrap(), - &btreeset![Filter::new().author(keys_b.public_key)] - ); - assert_eq!( - broken_down.filters.get(&relay_rip_url).unwrap(), - &btreeset![Filter::new().author(keys_b.public_key)] - ); - assert!(!broken_down.filters.contains_key(&snort_url)); - assert!(broken_down.orphans.is_none()); - assert_eq!(broken_down.others, Some(btreeset![search_filter])); - - // Multiple filters, multiple authors and single p tags - let authors_filter = Filter::new().authors([keys_a.public_key, keys_b.public_key]); + match graph.break_down_filter(search_filter.clone()).await { + BrokenDownFilters::Other(filter) => { + 
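+                // A search-only filter has neither authors nor `p` tags,
+                // so `break_down_filter` classifies it as `Other`.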
assert_eq!(filter, search_filter); + } + _ => panic!("Expected other"), + } + + // Single p tags let p_tag_filter = Filter::new().pubkey(keys_a.public_key); - let search_filter = Filter::new().search("Test").limit(10); - let filters = btreeset![ - authors_filter.clone(), - p_tag_filter.clone(), - search_filter.clone(), - ]; - let broken_down = graph.break_down_filters(filters.clone()).await; - - assert_eq!( - broken_down.filters.get(&damus_url).unwrap(), - &btreeset![p_tag_filter.clone(), authors_filter] - ); - assert_eq!( - broken_down.filters.get(&nostr_bg_url).unwrap(), - &btreeset![ - p_tag_filter.clone(), - Filter::new().author(keys_a.public_key), - ] - ); - assert_eq!( - broken_down.filters.get(&nos_lol_url).unwrap(), - &btreeset![Filter::new().author(keys_a.public_key)] - ); - assert_eq!( - broken_down.filters.get(&nostr_mom_url).unwrap(), - &btreeset![p_tag_filter] - ); - assert_eq!( - broken_down.filters.get(&nostr_info_url).unwrap(), - &btreeset![Filter::new().author(keys_b.public_key)] - ); - assert_eq!( - broken_down.filters.get(&relay_rip_url).unwrap(), - &btreeset![Filter::new().author(keys_b.public_key)] - ); - assert!(!broken_down.filters.contains_key(&snort_url)); - assert!(broken_down.orphans.is_none()); - assert_eq!(broken_down.others, Some(btreeset![search_filter])); - - // Single filter, both author and p tag - let filters = btreeset![Filter::new() + match graph.break_down_filter(p_tag_filter.clone()).await { + BrokenDownFilters::Filters(map) => { + assert_eq!(map.get(&damus_url).unwrap(), &p_tag_filter); + assert_eq!(map.get(&nostr_bg_url).unwrap(), &p_tag_filter); + assert_eq!(map.get(&nostr_mom_url).unwrap(), &p_tag_filter); + assert!(!map.contains_key(&nos_lol_url)); + assert!(!map.contains_key(&nostr_info_url)); + assert!(!map.contains_key(&relay_rip_url)); + assert!(!map.contains_key(&snort_url)); + } + _ => panic!("Expected filters"), + } + + // Both author and p tag + let filter = Filter::new() .author(keys_a.public_key) - .pubkey(keys_b.public_key)]; - let broken_down = graph.break_down_filters(filters.clone()).await; - - assert_eq!(broken_down.filters.get(&damus_url).unwrap(), &filters); - assert_eq!(broken_down.filters.get(&nostr_bg_url).unwrap(), &filters); - assert_eq!(broken_down.filters.get(&nos_lol_url).unwrap(), &filters); - assert_eq!(broken_down.filters.get(&nostr_mom_url).unwrap(), &filters); - assert_eq!(broken_down.filters.get(&nostr_info_url).unwrap(), &filters); - assert_eq!(broken_down.filters.get(&relay_rip_url).unwrap(), &filters); - assert_eq!(broken_down.filters.get(&snort_url).unwrap(), &filters); - assert!(broken_down.orphans.is_none()); - assert!(broken_down.others.is_none()); + .pubkey(keys_b.public_key); + match graph.break_down_filter(filter.clone()).await { + BrokenDownFilters::Filters(map) => { + assert_eq!(map.get(&damus_url).unwrap(), &filter); + assert_eq!(map.get(&nostr_bg_url).unwrap(), &filter); + assert_eq!(map.get(&nos_lol_url).unwrap(), &filter); + assert_eq!(map.get(&nostr_mom_url).unwrap(), &filter); + assert_eq!(map.get(&nostr_info_url).unwrap(), &filter); + assert_eq!(map.get(&relay_rip_url).unwrap(), &filter); + assert_eq!(map.get(&snort_url).unwrap(), &filter); + } + _ => panic!("Expected filters"), + } // test orphan filters let random_keys = Keys::generate(); - let filters = btreeset![Filter::new().author(random_keys.public_key)]; - let broken_down = graph.break_down_filters(filters.clone()).await; - - assert!(broken_down.filters.is_empty()); - assert_eq!(broken_down.orphans, Some(filters.clone())); - 
assert!(broken_down.others.is_none());
+        let filter = Filter::new().author(random_keys.public_key);
+        match graph.break_down_filter(filter.clone()).await {
+            BrokenDownFilters::Orphan(f) => {
+                assert_eq!(f, filter);
+            }
+            _ => panic!("Expected orphan"),
+        }
     }
 }
diff --git a/crates/nostr/src/message/client.rs b/crates/nostr/src/message/client.rs
index a02fb8454..8d218c50f 100644
--- a/crates/nostr/src/message/client.rs
+++ b/crates/nostr/src/message/client.rs
@@ -7,7 +7,6 @@
 
 use alloc::boxed::Box;
 use alloc::string::{String, ToString};
-use alloc::vec::Vec;
 
 use serde::{Deserialize, Deserializer, Serialize, Serializer};
 use serde_json::{json, Value};
@@ -24,8 +23,8 @@ pub enum ClientMessage {
     Req {
         /// Subscription ID
         subscription_id: SubscriptionId,
-        /// Filters
-        filters: Vec<Filter>,
+        /// Filter
+        filter: Box<Filter>,
     },
     /// Count
     ///
@@ -33,8 +32,8 @@
     Count {
         /// Subscription ID
         subscription_id: SubscriptionId,
-        /// Filters
-        filters: Vec<Filter>,
+        /// Filter
+        filter: Box<Filter>,
     },
     /// Close
     Close(SubscriptionId),
@@ -94,19 +93,19 @@ impl ClientMessage {
     /// Create `REQ` message
     #[inline]
-    pub fn req(subscription_id: SubscriptionId, filters: Vec<Filter>) -> Self {
+    pub fn req(subscription_id: SubscriptionId, filter: Filter) -> Self {
         Self::Req {
             subscription_id,
-            filters,
+            filter: Box::new(filter),
         }
     }
 
     /// Create `COUNT` message
     #[inline]
-    pub fn count(subscription_id: SubscriptionId, filters: Vec<Filter>) -> Self {
+    pub fn count(subscription_id: SubscriptionId, filter: Filter) -> Self {
         Self::Count {
             subscription_id,
-            filters,
+            filter: Box::new(filter),
         }
     }
 
@@ -166,33 +165,15 @@
             Self::Event(event) => json!(["EVENT", event]),
             Self::Req {
                 subscription_id,
-                filters,
+                filter,
             } => {
-                let mut json = json!(["REQ", subscription_id]);
-                let mut filters = json!(filters);
-
-                if let Some(json) = json.as_array_mut() {
-                    if let Some(filters) = filters.as_array_mut() {
-                        json.append(filters);
-                    }
-                }
-
-                json
+                json!(["REQ", subscription_id, filter])
             }
             Self::Count {
                 subscription_id,
-                filters,
+                filter,
             } => {
-                let mut json = json!(["COUNT", subscription_id]);
-                let mut filters = json!(filters);
-
-                if let Some(json) = json.as_array_mut() {
-                    if let Some(filters) = filters.as_array_mut() {
-                        json.append(filters);
-                    }
-                }
-
-                json
+                json!(["COUNT", subscription_id, filter])
             }
             Self::Close(subscription_id) => json!(["CLOSE", subscription_id]),
             Self::Auth(event) => json!(["AUTH", event]),
@@ -233,7 +214,6 @@
 
         let v_len: usize = v.len();
 
-        // Event
         // ["EVENT", <event>]
         if v[0] == "EVENT" {
             if v_len >= 2 {
...
             }
         }
 
-        // Req
-        // ["REQ", <subscription_id>, <filter1>, <filter2>, ...]
+        // ["REQ", <subscription_id>, <filter>]
         if v[0] == "REQ" {
-            if v_len == 2 {
-                let subscription_id: SubscriptionId = serde_json::from_value(v[1].clone())?;
-                return Ok(Self::req(subscription_id, Vec::new()));
-            } else if v_len >= 3 {
+            if v_len >= 3 {
                 let subscription_id: SubscriptionId = serde_json::from_value(v[1].clone())?;
-                let filters: Vec<Filter> = serde_json::from_value(Value::Array(v[2..].to_vec()))?;
-                return Ok(Self::req(subscription_id, filters));
+                let filter: Filter = serde_json::from_value(v[2].clone())?;
+                return Ok(Self::req(subscription_id, filter));
             } else {
                 return Err(MessageHandleError::InvalidMessageFormat);
             }
         }
 
-        // ["COUNT", <subscription_id>, <filter1>, <filter2>, ...]
+        // ["COUNT", <subscription_id>, <filter>]
         if v[0] == "COUNT" {
-            if v_len == 2 {
-                let subscription_id: SubscriptionId = serde_json::from_value(v[1].clone())?;
-                return Ok(Self::count(subscription_id, Vec::new()));
-            } else if v_len >= 3 {
+            if v_len >= 3 {
                 let subscription_id: SubscriptionId = serde_json::from_value(v[1].clone())?;
-                let filters: Vec<Filter> = serde_json::from_value(Value::Array(v[2..].to_vec()))?;
-                return Ok(Self::count(subscription_id, filters));
+                let filter: Filter = serde_json::from_value(v[2].clone())?;
+                return Ok(Self::count(subscription_id, filter));
             } else {
                 return Err(MessageHandleError::InvalidMessageFormat);
             }
@@ -366,7 +339,7 @@ impl JsonUtil for ClientMessage {
 
     /// Deserialize [`ClientMessage`] from JSON string
     ///
-    /// **This method NOT verify the event signature!**
+    /// **This method doesn't verify the event signature!**
     fn from_json<T>(json: T) -> Result<Self, Self::Error>
     where
         T: AsRef<[u8]>,
@@ -394,32 +367,20 @@ mod tests {
         let pk =
             PublicKey::from_str("379e863e8357163b5bce5d2688dc4f1dcc2d505222fb8d74db600f30535dfdfe")
                 .unwrap();
-        let filters = vec![
-            Filter::new().kind(Kind::EncryptedDirectMessage),
-            Filter::new().pubkey(pk),
-        ];
 
-        let client_req = ClientMessage::req(SubscriptionId::new("test"), filters);
+        let client_req = ClientMessage::req(SubscriptionId::new("test"), Filter::new().pubkey(pk));
         assert_eq!(
             client_req.as_json(),
-            r##"["REQ","test",{"kinds":[4]},{"#p":["379e863e8357163b5bce5d2688dc4f1dcc2d505222fb8d74db600f30535dfdfe"]}]"##
+            r##"["REQ","test",{"#p":["379e863e8357163b5bce5d2688dc4f1dcc2d505222fb8d74db600f30535dfdfe"]}]"##
         );
     }
 
     #[test]
     fn test_client_message_custom_kind() {
-        let pk =
-            PublicKey::from_str("379e863e8357163b5bce5d2688dc4f1dcc2d505222fb8d74db600f30535dfdfe")
-                .unwrap();
-        let filters = vec![
+        let client_req = ClientMessage::req(
+            SubscriptionId::new("test"),
             Filter::new().kind(Kind::Custom(22)),
-            Filter::new().pubkey(pk),
-        ];
-
-        let client_req = ClientMessage::req(SubscriptionId::new("test"), filters);
-        assert_eq!(
-            client_req.as_json(),
-            r##"["REQ","test",{"kinds":[22]},{"#p":["379e863e8357163b5bce5d2688dc4f1dcc2d505222fb8d74db600f30535dfdfe"]}]"##
         );
+        assert_eq!(client_req.as_json(), r##"["REQ","test",{"kinds":[22]}]"##);
     }
 }
diff --git a/crates/nwc/src/lib.rs b/crates/nwc/src/lib.rs
index 108c431b6..5e8b06f7d 100644
--- a/crates/nwc/src/lib.rs
+++ b/crates/nwc/src/lib.rs
@@ -80,11 +80,7 @@ impl NWC {
 
         // Subscribe
         self.relay
-            .subscribe_with_id(
-                SubscriptionId::new(ID),
-                vec![filter],
-                SubscribeOptions::default(),
-            )
+            .subscribe_with_id(SubscriptionId::new(ID), filter, SubscribeOptions::default())
             .await?;
 
         // Mark as bootstrapped