Skip to content

Commit

Permalink
pool: refactor Relay::send_event to take event ref
Browse files Browse the repository at this point in the history
Signed-off-by: Yuki Kishimoto <[email protected]>
  • Loading branch information
yukibtc committed Jan 29, 2025
1 parent dd94ad5 commit b318ca8
Show file tree
Hide file tree
Showing 11 changed files with 44 additions and 40 deletions.
8 changes: 2 additions & 6 deletions bindings/nostr-sdk-ffi/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,16 +410,12 @@ impl Client {
/// Send event to all relays with `WRITE` flag.
/// If `gossip` is enabled (see `Options`) the event will be sent also to NIP65 relays (automatically discovered).
pub async fn send_event(&self, event: &Event) -> Result<SendEventOutput> {
Ok(self.inner.send_event(event.deref().clone()).await?.into())
Ok(self.inner.send_event(event.deref()).await?.into())
}

/// Send event to specific relays.
pub async fn send_event_to(&self, urls: Vec<String>, event: &Event) -> Result<SendEventOutput> {
Ok(self
.inner
.send_event_to(urls, event.deref().clone())
.await?
.into())
Ok(self.inner.send_event_to(urls, event.deref()).await?.into())
}

/// Signs the `EventBuilder` into an `Event` using the `NostrSigner`
Expand Down
4 changes: 1 addition & 3 deletions bindings/nostr-sdk-ffi/src/relay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,7 @@ impl Relay {

/// Send event and wait for `OK` relay msg
pub async fn send_event(&self, event: &Event) -> Result<Arc<EventId>> {
Ok(Arc::new(
self.inner.send_event(event.deref().clone()).await?.into(),
))
Ok(Arc::new(self.inner.send_event(event.deref()).await?.into()))
}

/// Subscribe to filters
Expand Down
4 changes: 2 additions & 2 deletions bindings/nostr-sdk-js/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ impl JsClient {
#[wasm_bindgen(js_name = sendEvent)]
pub async fn send_event(&self, event: &JsEvent) -> Result<JsSendEventOutput> {
self.inner
.send_event(event.deref().clone())
.send_event(event.deref())
.await
.map_err(into_err)
.map(|id| id.into())
Expand All @@ -457,7 +457,7 @@ impl JsClient {
event: &JsEvent,
) -> Result<JsSendEventOutput> {
self.inner
.send_event_to(urls, event.deref().clone())
.send_event_to(urls, event.deref())
.await
.map_err(into_err)
.map(|id| id.into())
Expand Down
2 changes: 1 addition & 1 deletion bindings/nostr-sdk-js/src/relay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ impl JsRelay {
pub async fn send_event(&self, event: &JsEvent) -> Result<JsEventId> {
Ok(self
.inner
.send_event(event.deref().clone())
.send_event(event.deref())
.await
.map_err(into_err)?
.into())
Expand Down
2 changes: 1 addition & 1 deletion crates/nostr-connect/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ impl NostrConnect {
let mut notifications = self.pool.notifications();

// Send request
self.pool.send_event(event).await?;
self.pool.send_event(&event).await?;

time::timeout(Some(self.timeout), async {
while let Ok(notification) = notifications.recv().await {
Expand Down
4 changes: 2 additions & 2 deletions crates/nostr-connect/src/signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl NostrConnectRemoteSigner {
});
let event = EventBuilder::nostr_connect(&self.keys.signer, public_key, msg)?
.sign_with_keys(&self.keys.signer)?;
self.pool.send_event(event).await?;
self.pool.send_event(&event).await?;
Ok(())
}

Expand Down Expand Up @@ -296,7 +296,7 @@ impl NostrConnectRemoteSigner {
msg,
)?
.sign_with_keys(&self.keys.signer)?;
self.pool.send_event(event).await?;
self.pool.send_event(&event).await?;
}
}
Err(e) => {
Expand Down
2 changes: 1 addition & 1 deletion crates/nostr-relay-pool/examples/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async fn main() -> Result<()> {
pool.connect().await;

let event = Event::from_json(r#"{"content":"","created_at":1698412975,"id":"f55c30722f056e330d8a7a6a9ba1522f7522c0f1ced1c93d78ea833c78a3d6ec","kind":3,"pubkey":"f831caf722214748c72db4829986bd0cbb2bb8b3aeade1c959624a52a9629046","sig":"5092a9ffaecdae7d7794706f085ff5852befdf79df424cc3419bb797bf515ae05d4f19404cb8324b8b4380a4bd497763ac7b0f3b1b63ef4d3baa17e5f5901808","tags":[["p","4ddeb9109a8cd29ba279a637f5ec344f2479ee07df1f4043f3fe26d8948cfef9","",""],["p","bb6fd06e156929649a73e6b278af5e648214a69d88943702f1fb627c02179b95","",""],["p","b8b8210f33888fdbf5cedee9edf13c3e9638612698fe6408aff8609059053420","",""],["p","9dcee4fabcd690dc1da9abdba94afebf82e1e7614f4ea92d61d52ef9cd74e083","",""],["p","3eea9e831fefdaa8df35187a204d82edb589a36b170955ac5ca6b88340befaa0","",""],["p","885238ab4568f271b572bf48b9d6f99fa07644731f288259bd395998ee24754e","",""],["p","568a25c71fba591e39bebe309794d5c15d27dbfa7114cacb9f3586ea1314d126","",""]]}"#).unwrap();
pool.send_event(event).await?;
pool.send_event(&event).await?;
}
// Pool dropped

Expand Down
11 changes: 7 additions & 4 deletions crates/nostr-relay-pool/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,13 +559,17 @@ impl RelayPool {
}

/// Send event to all relays with `WRITE` flag (check [`RelayServiceFlags`] for more details).
pub async fn send_event(&self, event: Event) -> Result<Output<EventId>, Error> {
pub async fn send_event(&self, event: &Event) -> Result<Output<EventId>, Error> {
let urls: Vec<RelayUrl> = self.write_relay_urls().await;
self.send_event_to(urls, event).await
}

/// Send event to specific relays
pub async fn send_event_to<I, U>(&self, urls: I, event: Event) -> Result<Output<EventId>, Error>
pub async fn send_event_to<I, U>(
&self,
urls: I,
event: &Event,
) -> Result<Output<EventId>, Error>
where
I: IntoIterator<Item = U>,
U: TryIntoUrl,
Expand Down Expand Up @@ -595,7 +599,7 @@ impl RelayPool {
}

// Save event into database
self.inner.state.database().save_event(&event).await?;
self.inner.state.database().save_event(event).await?;

let mut urls: Vec<RelayUrl> = Vec::with_capacity(set.len());
let mut futures = Vec::with_capacity(set.len());
Expand All @@ -608,7 +612,6 @@ impl RelayPool {
// Compose futures
for url in set.into_iter() {
let relay: &Relay = self.internal_relay(&relays, &url)?;
let event: Event = event.clone();
urls.push(url);
futures.push(relay.send_event(event));
}
Expand Down
17 changes: 8 additions & 9 deletions crates/nostr-relay-pool/src/relay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ impl Relay {
notifications: &mut broadcast::Receiver<RelayNotification>,
event: &Event,
) -> Result<(bool, String), Error> {
// Send message
// Send the EVENT message
self.inner
.send_msg(ClientMessage::Event(Cow::Borrowed(event)))?;

Expand All @@ -367,16 +367,15 @@ impl Relay {
.await
}

// TODO: take reference here?
/// Send event and wait for `OK` relay msg
pub async fn send_event(&self, event: Event) -> Result<EventId, Error> {
pub async fn send_event(&self, event: &Event) -> Result<EventId, Error> {
// Health, write permission and number of messages checks are executed in `batch_msg` method.

// Subscribe to notifications
let mut notifications = self.inner.internal_notification_sender.subscribe();

// Send event
let (status, message) = self._send_event(&mut notifications, &event).await?;
let (status, message) = self._send_event(&mut notifications, event).await?;

// Check status
if status {
Expand All @@ -393,7 +392,7 @@ impl Relay {
.await?;

// Try to resend event
let (status, message) = self._send_event(&mut notifications, &event).await?;
let (status, message) = self._send_event(&mut notifications, event).await?;

// Check status
return if status {
Expand Down Expand Up @@ -741,7 +740,7 @@ mod tests {
let event = EventBuilder::text_note("Test")
.sign_with_keys(&keys)
.unwrap();
relay.send_event(event).await.unwrap();
relay.send_event(&event).await.unwrap();
}

#[tokio::test]
Expand Down Expand Up @@ -1052,7 +1051,7 @@ mod tests {
let event = EventBuilder::text_note("Test")
.sign_with_keys(&keys)
.unwrap();
let err = relay.send_event(event).await.unwrap_err();
let err = relay.send_event(&event).await.unwrap_err();
if let Error::RelayMessage(msg) = err {
assert_eq!(
MachineReadablePrefix::parse(&msg).unwrap(),
Expand All @@ -1069,7 +1068,7 @@ mod tests {
let event = EventBuilder::text_note("Test")
.sign_with_keys(&keys)
.unwrap();
assert!(relay.send_event(event).await.is_ok());
assert!(relay.send_event(&event).await.is_ok());
}

#[tokio::test]
Expand All @@ -1093,7 +1092,7 @@ mod tests {
let event = EventBuilder::text_note("Test")
.sign_with_keys(&keys)
.unwrap();
relay.send_event(event).await.unwrap();
relay.send_event(&event).await.unwrap();

let filter = Filter::new().kind(Kind::TextNote).limit(3);

Expand Down
28 changes: 18 additions & 10 deletions crates/nostr-sdk/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -894,7 +894,7 @@ impl Client {
/// Send [`Event`] to all relays with [`RelayServiceFlags::WRITE`] flag.
/// If `gossip` is enabled (see [`Options::gossip`]) the event will be sent also to NIP65 relays (automatically discovered).
#[inline]
pub async fn send_event(&self, event: Event) -> Result<Output<EventId>, Error> {
pub async fn send_event(&self, event: &Event) -> Result<Output<EventId>, Error> {
// NOT gossip, send event to all relays
if !self.opts.gossip {
return Ok(self.pool.send_event(event).await?);
Expand All @@ -905,7 +905,11 @@ impl Client {

/// Send event to specific relays.
#[inline]
pub async fn send_event_to<I, U>(&self, urls: I, event: Event) -> Result<Output<EventId>, Error>
pub async fn send_event_to<I, U>(
&self,
urls: I,
event: &Event,
) -> Result<Output<EventId>, Error>
where
I: IntoIterator<Item = U>,
U: TryIntoUrl,
Expand All @@ -931,7 +935,7 @@ impl Client {
builder: EventBuilder,
) -> Result<Output<EventId>, Error> {
let event: Event = self.sign_event_builder(builder).await?;
self.send_event(event).await
self.send_event(&event).await
}

/// Take an [`EventBuilder`], sign it by using the [`NostrSigner`] and broadcast to specific relays.
Expand All @@ -949,7 +953,7 @@ impl Client {
pool::Error: From<<U as TryIntoUrl>::Err>,
{
let event: Event = self.sign_event_builder(builder).await?;
self.send_event_to(urls, event).await
self.send_event_to(urls, &event).await
}

/// Fetch the newest public key metadata from relays.
Expand Down Expand Up @@ -1110,10 +1114,10 @@ impl Client {

// NOT gossip, send to all relays
if !self.opts.gossip {
return self.send_event(event).await;
return self.send_event(&event).await;
}

self.gossip_send_event(event, true).await
self.gossip_send_event(&event, true).await
}

/// Send a private direct message to specific relays
Expand All @@ -1140,7 +1144,7 @@ impl Client {
let signer = self.signer().await?;
let event: Event =
EventBuilder::private_msg(&signer, receiver, message, rumor_extra_tags).await?;
self.send_event_to(urls, event).await
self.send_event_to(urls, &event).await
}

/// Construct Gift Wrap and send to relays
Expand Down Expand Up @@ -1169,7 +1173,7 @@ impl Client {
EventBuilder::gift_wrap(&signer, receiver, rumor, extra_tags).await?;

// Send
self.send_event(gift_wrap).await
self.send_event(&gift_wrap).await
}

/// Construct Gift Wrap and send to specific relays
Expand Down Expand Up @@ -1200,7 +1204,7 @@ impl Client {
EventBuilder::gift_wrap(&signer, receiver, rumor, extra_tags).await?;

// Send
self.send_event_to(urls, gift_wrap).await
self.send_event_to(urls, &gift_wrap).await
}

/// Unwrap Gift Wrap event
Expand Down Expand Up @@ -1323,7 +1327,11 @@ impl Client {
Ok(filters)
}

async fn gossip_send_event(&self, event: Event, nip17: bool) -> Result<Output<EventId>, Error> {
async fn gossip_send_event(
&self,
event: &Event,
nip17: bool,
) -> Result<Output<EventId>, Error> {
// Get all public keys involved in the event
let public_keys = event
.tags
Expand Down
2 changes: 1 addition & 1 deletion crates/nwc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl NWC {
let mut notifications = self.relay.notifications();

// Send request
let id: EventId = self.relay.send_event(event).await?;
let id: EventId = self.relay.send_event(&event).await?;

time::timeout(Some(self.opts.timeout), async {
while let Ok(notification) = notifications.recv().await {
Expand Down

0 comments on commit b318ca8

Please sign in to comment.