Skip to content

Commit

Permalink
pool: rename FilterOptions to ReqExitPolicy
Browse files Browse the repository at this point in the history
Ref #691

Signed-off-by: Yuki Kishimoto <[email protected]>
  • Loading branch information
yukibtc committed Dec 25, 2024
1 parent 6dde132 commit 014e852
Show file tree
Hide file tree
Showing 13 changed files with 113 additions and 99 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
* pool: decrease `MAX_RETRY_INTERVAL` to 60 secs ([Yuki Kishimoto])
* pool: rework retry interval calculation ([Yuki Kishimoto])
* pool: improve shutdown docs ([dluvian])
* pool: rename `FilterOptions` to `ReqExitPolicy` ([Yuki Kishimoto])
* sdk: refactor POW difficulty management ([Yuki Kishimoto])
* connect: require `fmt::Debug`, `Send` and `Sync` for `AuthUrlHandler` ([Yuki Kishimoto])
* zapper: bump `webln` to 0.4 ([Yuki Kishimoto])
Expand Down Expand Up @@ -114,6 +115,7 @@
### Deprecated

* pool: deprecated batch event methods ([Yuki Kishimoto])
* pool: deprecate `FilterOptions` ([Yuki Kishimoto])
* sdk: deprecate `timeout` option ([Yuki Kishimoto])
* sdk: deprecate `Options::difficulty` and `Client::update_difficulty` ([Yuki Kishimoto])

Expand Down
7 changes: 4 additions & 3 deletions bindings/nostr-sdk-ffi/src/relay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;

use nostr_sdk::{pool, FilterOptions, RelayUrl, SubscriptionId};
use nostr_sdk::{pool, RelayUrl, SubscriptionId};
use uniffi::{Object, Record};

pub mod filtering;
Expand All @@ -19,7 +19,7 @@ pub mod status;
pub use self::filtering::{RelayFiltering, RelayFilteringMode};
pub use self::limits::RelayLimits;
use self::options::SyncOptions;
pub use self::options::{ConnectionMode, RelayOptions, SubscribeOptions};
pub use self::options::{ConnectionMode, RelayOptions, ReqExitPolicy, SubscribeOptions};
pub use self::stats::RelayConnectionStats;
pub use self::status::RelayStatus;
use crate::database::events::Events;
Expand Down Expand Up @@ -299,14 +299,15 @@ impl Relay {
&self,
filters: Vec<Arc<Filter>>,
timeout: Duration,
policy: ReqExitPolicy,
) -> Result<Events> {
let filters = filters
.into_iter()
.map(|f| f.as_ref().deref().clone())
.collect();
Ok(self
.inner
.fetch_events(filters, timeout, FilterOptions::ExitOnEOSE)
.fetch_events(filters, timeout, policy.into())
.await?
.into())
}
Expand Down
26 changes: 13 additions & 13 deletions bindings/nostr-sdk-ffi/src/relay/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::ops::Deref;
use std::path::PathBuf;
use std::time::Duration;

use nostr_sdk::pool;
use nostr_sdk::{pool, prelude};
use uniffi::{Enum, Object};

use super::{RelayFilteringMode, RelayLimits};
Expand Down Expand Up @@ -159,23 +159,23 @@ impl RelayOptions {
}
}

/// Filter options
/// Request (REQ) exit policy
#[derive(Enum)]
pub enum FilterOptions {
pub enum ReqExitPolicy {
/// Exit on EOSE
ExitOnEOSE,
/// After EOSE is received, keep listening for N more events that match the filter, then return
/// After EOSE is received, keep listening for N more events that match the filter.
WaitForEventsAfterEOSE { num: u16 },
/// After EOSE is received, keep listening for matching events for `Duration` more time, then return
/// After EOSE is received, keep listening for matching events for `Duration` more time.
WaitDurationAfterEOSE { duration: Duration },
}

impl From<FilterOptions> for nostr_sdk::FilterOptions {
fn from(value: FilterOptions) -> Self {
impl From<ReqExitPolicy> for prelude::ReqExitPolicy {
fn from(value: ReqExitPolicy) -> Self {
match value {
FilterOptions::ExitOnEOSE => Self::ExitOnEOSE,
FilterOptions::WaitForEventsAfterEOSE { num } => Self::WaitForEventsAfterEOSE(num),
FilterOptions::WaitDurationAfterEOSE { duration } => {
ReqExitPolicy::ExitOnEOSE => Self::ExitOnEOSE,
ReqExitPolicy::WaitForEventsAfterEOSE { num } => Self::WaitForEventsAfterEOSE(num),
ReqExitPolicy::WaitDurationAfterEOSE { duration } => {
Self::WaitDurationAfterEOSE(duration)
}
}
Expand Down Expand Up @@ -205,10 +205,10 @@ impl SubscribeAutoCloseOptions {
}
}

/// Close subscription when `FilterOptions` is satisfied
pub fn filter(&self, filter: FilterOptions) -> Self {
/// Close subscription when the policy is satisfied
pub fn exit_policy(&self, policy: ReqExitPolicy) -> Self {
let mut builder = self.clone();
builder.inner = builder.inner.filter(filter.into());
builder.inner = builder.inner.exit_policy(policy.into());
builder
}

Expand Down
6 changes: 3 additions & 3 deletions bindings/nostr-sdk-js/src/relay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub mod options;

use self::filtering::JsRelayFiltering;
use self::flags::JsAtomicRelayServiceFlags;
use self::options::{JsFilterOptions, JsRelayOptions, JsSubscribeOptions, JsSyncOptions};
use self::options::{JsRelayOptions, JsReqExitPolicy, JsSubscribeOptions, JsSyncOptions};
use crate::database::JsEvents;
use crate::duration::JsDuration;
use crate::error::{into_err, Result};
Expand Down Expand Up @@ -250,12 +250,12 @@ impl JsRelay {
&self,
filters: Vec<JsFilter>,
timeout: &JsDuration,
opts: &JsFilterOptions,
policy: &JsReqExitPolicy,
) -> Result<JsEvents> {
let filters: Vec<Filter> = filters.into_iter().map(|f| f.into()).collect();
Ok(self
.inner
.fetch_events(filters, **timeout, **opts)
.fetch_events(filters, **timeout, **policy)
.await
.map_err(into_err)?
.into())
Expand Down
33 changes: 17 additions & 16 deletions bindings/nostr-sdk-js/src/relay/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,43 +91,43 @@ impl JsRelayOptions {
}
}

/// Filter options
#[wasm_bindgen(js_name = FilterOptions)]
pub struct JsFilterOptions {
inner: FilterOptions,
/// Request (REQ) exit policy
#[wasm_bindgen(js_name = ReqExitPolicy)]
pub struct JsReqExitPolicy {
inner: ReqExitPolicy,
}

impl Deref for JsFilterOptions {
type Target = FilterOptions;
impl Deref for JsReqExitPolicy {
type Target = ReqExitPolicy;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

#[wasm_bindgen(js_class = FilterOptions)]
impl JsFilterOptions {
#[wasm_bindgen(js_class = ReqExitPolicy)]
impl JsReqExitPolicy {
/// Exit on EOSE
#[wasm_bindgen(js_name = exitOnEose)]
pub fn exit_on_eose() -> Self {
Self {
inner: FilterOptions::ExitOnEOSE,
inner: ReqExitPolicy::ExitOnEOSE,
}
}

/// After EOSE is received, keep listening for N more events that match the filter, then return
/// After EOSE is received, keep listening for N more events that match the filter
#[wasm_bindgen(js_name = waitForEventsAfterEOSE)]
pub fn wait_for_events_after_eose(num: u16) -> Self {
Self {
inner: FilterOptions::WaitForEventsAfterEOSE(num),
inner: ReqExitPolicy::WaitForEventsAfterEOSE(num),
}
}

/// After EOSE is received, keep listening for matching events for `Duration` more time, then return
/// After EOSE is received, keep listening for matching events for `Duration` more time
#[wasm_bindgen(js_name = waitDurationAfterEOSE)]
pub fn wait_duration_after_eose(duration: &JsDuration) -> Self {
Self {
inner: FilterOptions::WaitDurationAfterEOSE(**duration),
inner: ReqExitPolicy::WaitDurationAfterEOSE(**duration),
}
}
}
Expand Down Expand Up @@ -161,9 +161,10 @@ impl JsSubscribeAutoCloseOptions {
}
}

/// Close subscription when `FilterOptions` is satisfied
pub fn filter(self, filter: JsFilterOptions) -> Self {
self.inner.filter(filter.inner).into()
/// Close subscription when the policy is satisfied
#[wasm_bindgen(js_name = exitPolicy)]
pub fn exit_policy(self, policy: JsReqExitPolicy) -> Self {
self.inner.exit_policy(policy.inner).into()
}

/// Automatically close subscription after `Duration`
Expand Down
3 changes: 1 addition & 2 deletions crates/nostr-relay-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ pub use self::pool::{Output, RelayPool, RelayPoolNotification};
pub use self::relay::flags::{AtomicRelayServiceFlags, RelayServiceFlags};
pub use self::relay::limits::RelayLimits;
pub use self::relay::options::{
FilterOptions, RelayOptions, SubscribeAutoCloseOptions, SubscribeOptions, SyncDirection,
SyncOptions,
RelayOptions, SubscribeAutoCloseOptions, SubscribeOptions, SyncDirection, SyncOptions,
};
pub use self::relay::stats::RelayConnectionStats;
pub use self::relay::{
Expand Down
23 changes: 12 additions & 11 deletions crates/nostr-relay-pool/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tokio::sync::{broadcast, mpsc, Mutex, RwLock, RwLockReadGuard};
use super::constants::MAX_CONNECTING_CHUNK;
use super::options::RelayPoolOptions;
use super::{Error, Output, RelayPoolNotification};
use crate::relay::options::{FilterOptions, RelayOptions, SyncOptions};
use crate::relay::options::{RelayOptions, ReqExitPolicy, SyncOptions};
use crate::relay::{FlagCheck, Reconciliation, Relay};
use crate::shared::SharedState;
use crate::stream::ReceiverStream;
Expand Down Expand Up @@ -762,18 +762,18 @@ impl InnerRelayPool {
&self,
filters: Vec<Filter>,
timeout: Duration,
opts: FilterOptions,
policy: ReqExitPolicy,
) -> Result<Events, Error> {
let urls: Vec<RelayUrl> = self.read_relay_urls().await;
self.fetch_events_from(urls, filters, timeout, opts).await
self.fetch_events_from(urls, filters, timeout, policy).await
}

pub async fn fetch_events_from<I, U>(
&self,
urls: I,
filters: Vec<Filter>,
timeout: Duration,
opts: FilterOptions,
policy: ReqExitPolicy,
) -> Result<Events, Error>
where
I: IntoIterator<Item = U>,
Expand All @@ -784,7 +784,7 @@ impl InnerRelayPool {

// Stream events
let mut stream = self
.stream_events_from(urls, filters, timeout, opts)
.stream_events_from(urls, filters, timeout, policy)
.await?;
while let Some(event) = stream.next().await {
events.insert(event);
Expand All @@ -798,10 +798,11 @@ impl InnerRelayPool {
&self,
filters: Vec<Filter>,
timeout: Duration,
opts: FilterOptions,
policy: ReqExitPolicy,
) -> Result<ReceiverStream<Event>, Error> {
let urls: Vec<RelayUrl> = self.read_relay_urls().await;
self.stream_events_from(urls, filters, timeout, opts).await
self.stream_events_from(urls, filters, timeout, policy)
.await
}

#[inline]
Expand All @@ -810,23 +811,23 @@ impl InnerRelayPool {
urls: I,
filters: Vec<Filter>,
timeout: Duration,
opts: FilterOptions,
policy: ReqExitPolicy,
) -> Result<ReceiverStream<Event>, Error>
where
I: IntoIterator<Item = U>,
U: TryIntoUrl,
Error: From<<U as TryIntoUrl>::Err>,
{
let targets = urls.into_iter().map(|u| (u, filters.clone()));
self.stream_events_targeted(targets, timeout, opts).await
self.stream_events_targeted(targets, timeout, policy).await
}

// TODO: change target type to `HashMap<Url, Vec<Filter>>`?
pub async fn stream_events_targeted<I, U>(
&self,
targets: I,
timeout: Duration,
opts: FilterOptions,
policy: ReqExitPolicy,
) -> Result<ReceiverStream<Event>, Error>
where
I: IntoIterator<Item = (U, Vec<Filter>)>,
Expand Down Expand Up @@ -882,7 +883,7 @@ impl InnerRelayPool {
futures.push(relay.fetch_events_with_callback(
filters,
timeout,
opts,
policy,
|event| async {
let mut ids = ids.lock().await;
if ids.insert(event.id) {
Expand Down
22 changes: 11 additions & 11 deletions crates/nostr-relay-pool/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use self::inner::InnerRelayPool;
pub use self::options::RelayPoolOptions;
pub use self::output::Output;
use crate::relay::flags::FlagCheck;
use crate::relay::options::{FilterOptions, RelayOptions, SyncOptions};
use crate::relay::options::{RelayOptions, ReqExitPolicy, SyncOptions};
use crate::relay::{Relay, RelayFiltering, RelayStatus};
use crate::shared::SharedState;
use crate::stream::ReceiverStream;
Expand Down Expand Up @@ -524,9 +524,9 @@ impl RelayPool {
&self,
filters: Vec<Filter>,
timeout: Duration,
opts: FilterOptions,
policy: ReqExitPolicy,
) -> Result<Events, Error> {
self.inner.fetch_events(filters, timeout, opts).await
self.inner.fetch_events(filters, timeout, policy).await
}

/// Fetch events from specific relays
Expand All @@ -536,15 +536,15 @@ impl RelayPool {
urls: I,
filters: Vec<Filter>,
timeout: Duration,
opts: FilterOptions,
policy: ReqExitPolicy,
) -> Result<Events, Error>
where
I: IntoIterator<Item = U>,
U: TryIntoUrl,
Error: From<<U as TryIntoUrl>::Err>,
{
self.inner
.fetch_events_from(urls, filters, timeout, opts)
.fetch_events_from(urls, filters, timeout, policy)
.await
}

Expand All @@ -554,9 +554,9 @@ impl RelayPool {
&self,
filters: Vec<Filter>,
timeout: Duration,
opts: FilterOptions,
policy: ReqExitPolicy,
) -> Result<ReceiverStream<Event>, Error> {
self.inner.stream_events(filters, timeout, opts).await
self.inner.stream_events(filters, timeout, policy).await
}

/// Stream events from specific relays
Expand All @@ -566,15 +566,15 @@ impl RelayPool {
urls: I,
filters: Vec<Filter>,
timeout: Duration,
opts: FilterOptions,
policy: ReqExitPolicy,
) -> Result<ReceiverStream<Event>, Error>
where
I: IntoIterator<Item = U>,
U: TryIntoUrl,
Error: From<<U as TryIntoUrl>::Err>,
{
self.inner
.stream_events_from(urls, filters, timeout, opts)
.stream_events_from(urls, filters, timeout, policy)
.await
}

Expand All @@ -586,15 +586,15 @@ impl RelayPool {
&self,
source: I,
timeout: Duration,
opts: FilterOptions,
policy: ReqExitPolicy,
) -> Result<ReceiverStream<Event>, Error>
where
I: IntoIterator<Item = (U, Vec<Filter>)>,
U: TryIntoUrl,
Error: From<<U as TryIntoUrl>::Err>,
{
self.inner
.stream_events_targeted(source, timeout, opts)
.stream_events_targeted(source, timeout, policy)
.await
}

Expand Down
Loading

0 comments on commit 014e852

Please sign in to comment.