Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pool: auto-close subscription after Duration of no new events #693

Merged
merged 1 commit into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
* database: impl PartialEq and Eq for `Events` ([Yuki Kishimoto])
* database: add `SaveEventStatus` enum ([Yuki Kishimoto])
* pool: add `ReceiverStream` ([Yuki Kishimoto])
* Add `SubscribeAutoCloseOptions::idle_timeout` ([Yuki Kishimoto])
* sdk: automatically resend event after NIP-42 authentication ([Yuki Kishimoto])
* sdk: add `Connection::embedded_tor_with_path` ([Yuki Kishimoto])
* connect: add `NostrConnect::status` ([Yuki Kishimoto])
Expand Down
9 changes: 8 additions & 1 deletion bindings/nostr-sdk-ffi/src/relay/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,19 @@ impl SubscribeAutoCloseOptions {
builder
}

/// Automatically close subscription after `Duration`
/// Automatically close subscription after duration.
pub fn timeout(&self, timeout: Option<Duration>) -> Self {
let mut builder = self.clone();
builder.inner = builder.inner.timeout(timeout);
builder
}

/// Automatically close subscription if no notifications/events are received within the duration.
pub fn idle_timeout(&self, timeout: Option<Duration>) -> Self {
let mut builder = self.clone();
builder.inner = builder.inner.idle_timeout(timeout);
builder
}
}

/// Subscribe options
Expand Down
6 changes: 6 additions & 0 deletions bindings/nostr-sdk-js/src/relay/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ impl JsSubscribeAutoCloseOptions {
pub fn timeout(self, timeout: Option<JsDuration>) -> Self {
self.inner.timeout(timeout.map(|t| *t)).into()
}

/// Automatically close subscription if no notifications/events are received within the duration.
#[wasm_bindgen(js_name = idleTimeout)]
pub fn idle_timeout(self, timeout: Option<JsDuration>) -> Self {
self.inner.idle_timeout(timeout.map(|t| *t)).into()
}
}

/// Subscribe options
Expand Down
117 changes: 71 additions & 46 deletions crates/nostr-relay-pool/src/relay/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1361,40 +1361,7 @@ impl InnerRelay {

// Check if auto-close condition is set
match opts.auto_close {
Some(opts) => {
let relay = self.clone();
task::spawn(async move {
let res: Option<(bool, Option<SubscriptionAutoClosedReason>)> =
relay.handle_auto_closing(&id, filters, opts).await;

// Check if CLOSE needed
let to_close: bool = match res {
Some((to_close, reason)) => {
// Send subscription auto closed notification
if let Some(reason) = reason {
relay.send_notification(
RelayNotification::SubscriptionAutoClosed { reason },
false,
);
}

to_close
}
None => {
tracing::warn!(id = %id, "Timeout reached for subscription, auto-closing.");
true
}
};

// Close subscription
if to_close {
tracing::debug!(id = %id, "Auto-closing subscription.");
relay.send_msg(ClientMessage::close(id))?;
}

Ok::<(), Error>(())
});
}
Some(opts) => self.spawn_auto_closing_handler(id, filters, opts),
None => {
// No auto-close subscription: update subscription filters
self.update_subscription(id, filters, true).await;
Expand All @@ -1404,25 +1371,83 @@ impl InnerRelay {
Ok(())
}

fn spawn_auto_closing_handler(
&self,
id: SubscriptionId,
filters: Vec<Filter>,
opts: SubscribeAutoCloseOptions,
) {
let relay = self.clone();
task::spawn(async move {
// Check if CLOSE needed
let to_close: bool = match relay.handle_auto_closing(&id, filters, opts).await {
Some((to_close, reason)) => {
// Send subscription auto-closed notification
if let Some(reason) = reason {
relay.send_notification(
RelayNotification::SubscriptionAutoClosed { reason },
false,
);
}

to_close
}
// Timeout
None => {
tracing::warn!(id = %id, "Timeout reached for subscription, auto-closing.");
true
}
};

// Close subscription
if to_close {
tracing::debug!(id = %id, "Auto-closing subscription.");
relay.send_msg(ClientMessage::close(id))?;
}

Ok::<(), Error>(())
});
}

async fn handle_auto_closing(
&self,
id: &SubscriptionId,
filters: Vec<Filter>,
opts: SubscribeAutoCloseOptions,
) -> Option<(bool, Option<SubscriptionAutoClosedReason>)> {
time::timeout(opts.timeout, async move {
let mut counter = 0;
let mut counter: u16 = 0;
let mut received_eose: bool = false;
let mut require_resubscription: bool = false;
let mut last_event: Option<Instant> = None;

// Subscribe to notifications
let mut notifications = self.internal_notification_sender.subscribe();
while let Ok(notification) = notifications.recv().await {

// Listen to notifications with timeout
// If no notification is received within no-events timeout, `None` is returned.
while let Ok(notification) =
time::timeout(opts.idle_timeout, notifications.recv()).await?
{
// Check if no-events timeout is reached
if let (Some(idle_timeout), Some(last_event)) = (opts.idle_timeout, last_event) {
if last_event.elapsed() > idle_timeout {
// Close the subscription
return Some((true, None)); // TODO: use SubscriptionAutoClosedReason::Timeout?
}
}

match notification {
RelayNotification::Message { message, .. } => match message {
RelayMessage::Event {
subscription_id, ..
} => {
if &subscription_id == id {
// If no-events timeout is enabled, update instant of last event received
if opts.idle_timeout.is_some() {
last_event = Some(Instant::now());
}

if let ReqExitPolicy::WaitForEventsAfterEOSE(num) = opts.exit_policy
{
if received_eose {
Expand Down Expand Up @@ -1455,17 +1480,17 @@ impl InnerRelay {
if self.state.is_auto_authentication_enabled() {
require_resubscription = true;
} else {
return (
return Some((
false,
Some(SubscriptionAutoClosedReason::Closed(message)),
); // No need to send CLOSE msg
)); // No need to send CLOSE msg
}
}
_ => {
return (
return Some((
false,
Some(SubscriptionAutoClosedReason::Closed(message)),
); // No need to send CLOSE msg
)); // No need to send CLOSE msg
}
}
}
Expand All @@ -1482,18 +1507,18 @@ impl InnerRelay {
}
}
RelayNotification::AuthenticationFailed => {
return (
return Some((
false,
Some(SubscriptionAutoClosedReason::AuthenticationFailed),
); // No need to send CLOSE msg
)); // No need to send CLOSE msg
}
RelayNotification::RelayStatus { status } => {
if status.is_disconnected() {
return (false, None); // No need to send CLOSE msg
return Some((false, None)); // No need to send CLOSE msg
}
}
RelayNotification::Shutdown => {
return (false, None); // No need to send CLOSE msg
return Some((false, None)); // No need to send CLOSE msg
}
_ => (),
}
Expand All @@ -1520,9 +1545,9 @@ impl InnerRelay {
.await;
}

(true, Some(SubscriptionAutoClosedReason::Completed)) // Need to send CLOSE msg
Some((true, Some(SubscriptionAutoClosedReason::Completed))) // Need to send CLOSE msg
})
.await
.await?
}

pub async fn unsubscribe(&self, id: SubscriptionId) -> Result<(), Error> {
Expand Down
9 changes: 8 additions & 1 deletion crates/nostr-relay-pool/src/relay/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ impl RelayOptions {
pub struct SubscribeAutoCloseOptions {
pub(super) exit_policy: ReqExitPolicy,
pub(super) timeout: Option<Duration>,
pub(super) idle_timeout: Option<Duration>,
}

impl SubscribeAutoCloseOptions {
Expand All @@ -172,11 +173,17 @@ impl SubscribeAutoCloseOptions {
self
}

/// Automatically close subscription after [Duration]
/// Automatically close subscription after [`Duration`].
pub fn timeout(mut self, timeout: Option<Duration>) -> Self {
self.timeout = timeout;
self
}

/// Automatically close subscription if no notifications/events are received within the [`Duration`].
pub fn idle_timeout(mut self, timeout: Option<Duration>) -> Self {
self.idle_timeout = timeout;
self
}
}

/// Subscribe options
Expand Down
Loading