Skip to content

Commit

Permalink
pool: add atomic-destructor
Browse files Browse the repository at this point in the history
Wrap `InternalRelay` and `InternalRelayPool` in `AtomicDestructor`
Remove `util` module
  • Loading branch information
yukibtc committed Mar 6, 2024
1 parent b9f31fc commit 7e197ba
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 229 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/nostr-relay-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ nip11 = ["nostr/nip11"]
[dependencies]
async-utility.workspace = true
async-wsocket = "0.3"
atomic-destructor = { version = "0.1", default-features = false, features = ["tracing"] }
nostr = { workspace = true, features = ["std"] }
nostr-database.workspace = true
thiserror.workspace = true
Expand Down
1 change: 0 additions & 1 deletion crates/nostr-relay-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
pub mod pool;
pub mod prelude;
pub mod relay;
mod util;

pub use self::pool::options::RelayPoolOptions;
pub use self::pool::{RelayPool, RelayPoolNotification};
Expand Down
22 changes: 20 additions & 2 deletions crates/nostr-relay-pool/src/pool/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ use std::sync::Arc;
use std::time::Duration;

use async_utility::{thread, time};
use atomic_destructor::AtomicDestroyer;
use nostr::message::MessageHandleError;
use nostr::{ClientMessage, Event, EventId, Filter, Timestamp, TryIntoUrl, Url};
use nostr_database::{DatabaseError, DynNostrDatabase, IntoNostrDatabase, Order};
use thiserror::Error;
use tokio::sync::{broadcast, Mutex, RwLock};

use super::RelayPoolNotification;
use super::options::RelayPoolOptions;
use super::RelayPoolNotification;
use crate::relay::limits::Limits;
use crate::relay::options::{FilterOptions, NegentropyOptions, RelayOptions, RelaySendOptions};
use crate::relay::{Error as RelayError, InternalSubscriptionId, Relay};
Expand Down Expand Up @@ -69,6 +70,21 @@ pub struct InternalRelayPool {
// opts: RelayPoolOptions,
}

impl AtomicDestroyer for InternalRelayPool {
fn name(&self) -> Option<String> {
Some(String::from("Relay Pool"))
}

fn on_destroy(&self) {
let pool = self.clone();
let _ = thread::spawn(async move {
if let Err(e) = pool.shutdown().await {
tracing::error!("Impossible to shutdown Relay Pool: {e}");
}
});
}
}

impl InternalRelayPool {
pub fn with_database<D>(opts: RelayPoolOptions, database: D) -> Self
where
Expand Down Expand Up @@ -112,6 +128,8 @@ impl InternalRelayPool {
})
.await;

tracing::info!("Relay pool shutdown");

Ok(())
}

Expand Down Expand Up @@ -550,4 +568,4 @@ impl InternalRelayPool {

Ok(())
}
}
}
104 changes: 27 additions & 77 deletions crates/nostr-relay-pool/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,22 @@
//! Relay Pool
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

use async_utility::thread;
use atomic_destructor::AtomicDestructor;
use nostr::{ClientMessage, Event, EventId, Filter, RelayMessage, Timestamp, TryIntoUrl, Url};
use nostr_database::{DynNostrDatabase, IntoNostrDatabase, MemoryDatabase};
use tokio::sync::broadcast;

pub mod options;
mod internal;
pub mod options;

pub use self::internal::Error;
use self::internal::InternalRelayPool;
pub use self::options::RelayPoolOptions;
use crate::relay::options::{FilterOptions, NegentropyOptions, RelayOptions, RelaySendOptions};
use crate::relay::{Relay, RelayStatus};
use crate::util::SaturatingUsize;

/// Relay Pool Notification
#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -55,11 +53,9 @@ pub enum RelayPoolNotification {
}

/// Relay Pool
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct RelayPool {
internal: InternalRelayPool,
shutdown: Arc<AtomicBool>,
ref_counter: Arc<AtomicUsize>,
inner: AtomicDestructor<InternalRelayPool>,
}

impl Default for RelayPool {
Expand All @@ -68,52 +64,6 @@ impl Default for RelayPool {
}
}

impl Clone for RelayPool {
fn clone(&self) -> Self {
// Increase counter
let new_ref_counter: usize = self.ref_counter.saturating_increment(Ordering::SeqCst);
tracing::debug!("Relay Pool cloned: ref counter increased to {new_ref_counter}");

// Clone
Self {
internal: self.internal.clone(),
shutdown: self.shutdown.clone(),
ref_counter: self.ref_counter.clone(),
}
}
}

impl Drop for RelayPool {
fn drop(&mut self) {
// Check if already shutdown
if self.shutdown.load(Ordering::SeqCst) {
tracing::debug!("Relay Pool already shutdown");
} else {
// Decrease counter
let new_ref_counter: usize = self.ref_counter.saturating_decrement(Ordering::SeqCst);
tracing::debug!("Relay Pool dropped: ref counter decreased to {new_ref_counter}");

// Check if it's time for shutdown
if new_ref_counter == 0 {
tracing::debug!("Shutting down Relay Pool...");

// Mark as shutdown
self.shutdown.store(true, Ordering::SeqCst);

// Clone internal pool and shutdown
let pool: InternalRelayPool = self.internal.clone();
let _ = thread::spawn(async move {
if let Err(e) = pool.shutdown().await {
tracing::error!("Impossible to shutdown Relay Pool: {e}");
}
});

tracing::info!("Relay Pool shutdown.");
}
}
}
}

impl RelayPool {
/// Create new `RelayPool`
pub fn new(opts: RelayPoolOptions) -> Self {
Expand All @@ -126,37 +76,35 @@ impl RelayPool {
D: IntoNostrDatabase,
{
Self {
internal: InternalRelayPool::with_database(opts, database),
shutdown: Arc::new(AtomicBool::new(false)),
ref_counter: Arc::new(AtomicUsize::new(1)),
inner: AtomicDestructor::new(InternalRelayPool::with_database(opts, database)),
}
}

/// Stop
///
/// Call `connect` to re-start relays connections
pub async fn stop(&self) -> Result<(), Error> {
self.internal.stop().await
self.inner.stop().await
}

/// Completely shutdown pool
pub async fn shutdown(self) -> Result<(), Error> {
self.internal.shutdown().await
self.inner.shutdown().await
}

/// Get new **pool** notification listener
pub fn notifications(&self) -> broadcast::Receiver<RelayPoolNotification> {
self.internal.notifications()
self.inner.notifications()
}

/// Get database
pub fn database(&self) -> Arc<DynNostrDatabase> {
self.internal.database()
self.inner.database()
}

/// Get relays
pub async fn relays(&self) -> HashMap<Url, Relay> {
self.internal.relays().await
self.inner.relays().await
}

/// Get [`Relay`]
Expand All @@ -165,12 +113,12 @@ impl RelayPool {
U: TryIntoUrl,
Error: From<<U as TryIntoUrl>::Err>,
{
self.internal.relay(url).await
self.inner.relay(url).await
}

/// Get subscription filters
pub async fn subscription_filters(&self) -> Vec<Filter> {
self.internal.subscription_filters().await
self.inner.subscription_filters().await
}

/// Add new relay
Expand All @@ -179,7 +127,7 @@ impl RelayPool {
U: TryIntoUrl,
Error: From<<U as TryIntoUrl>::Err>,
{
self.internal.add_relay(url, opts).await
self.inner.add_relay(url, opts).await
}

/// Disconnect and remove relay
Expand All @@ -188,12 +136,12 @@ impl RelayPool {
U: TryIntoUrl,
Error: From<<U as TryIntoUrl>::Err>,
{
self.internal.remove_relay(url).await
self.inner.remove_relay(url).await
}

/// Disconnect and remove all relays
pub async fn remove_all_relays(&self) -> Result<(), Error> {
self.internal.remove_all_relays().await
self.inner.remove_all_relays().await
}

/// Send client message
Expand Down Expand Up @@ -243,7 +191,7 @@ impl RelayPool {
U: TryIntoUrl,
Error: From<<U as TryIntoUrl>::Err>,
{
self.internal.batch_msg_to(urls, msgs, opts).await
self.inner.batch_msg_to(urls, msgs, opts).await
}

/// Send event and wait for `OK` relay msg
Expand Down Expand Up @@ -291,21 +239,21 @@ impl RelayPool {
U: TryIntoUrl,
Error: From<<U as TryIntoUrl>::Err>,
{
self.internal.batch_event_to(urls, events, opts).await
self.inner.batch_event_to(urls, events, opts).await
}

/// Subscribe to filters
///
/// Internal Subscription ID set to `InternalSubscriptionId::Pool`
pub async fn subscribe(&self, filters: Vec<Filter>, opts: RelaySendOptions) {
self.internal.subscribe(filters, opts).await
self.inner.subscribe(filters, opts).await
}

/// Unsubscribe from filters
///
/// Internal Subscription ID set to `InternalSubscriptionId::Pool`
pub async fn unsubscribe(&self, opts: RelaySendOptions) {
self.internal.unsubscribe(opts).await
self.inner.unsubscribe(opts).await
}

/// Get events of filters
Expand Down Expand Up @@ -339,7 +287,9 @@ impl RelayPool {
U: TryIntoUrl,
Error: From<<U as TryIntoUrl>::Err>,
{
self.internal.get_events_from(urls, filters, timeout, opts).await
self.inner
.get_events_from(urls, filters, timeout, opts)
.await
}

/// Request events of filter.
Expand Down Expand Up @@ -387,24 +337,24 @@ impl RelayPool {

/// Connect to all added relays and keep connection alive
pub async fn connect(&self, connection_timeout: Option<Duration>) {
self.internal.connect(connection_timeout).await
self.inner.connect(connection_timeout).await
}

/// Disconnect from all relays
pub async fn disconnect(&self) -> Result<(), Error> {
self.internal.disconnect().await
self.inner.disconnect().await
}

/// Connect to relay
///
/// Internal Subscription ID set to `InternalSubscriptionId::Pool`
pub async fn connect_relay(&self, relay: &Relay, connection_timeout: Option<Duration>) {
self.internal.connect_relay(relay, connection_timeout).await
self.inner.connect_relay(relay, connection_timeout).await
}

/// Negentropy reconciliation
pub async fn reconcile(&self, filter: Filter, opts: NegentropyOptions) -> Result<(), Error> {
self.internal.reconcile(filter, opts).await
self.inner.reconcile(filter, opts).await
}

/// Negentropy reconciliation with custom items
Expand All @@ -414,6 +364,6 @@ impl RelayPool {
items: Vec<(EventId, Timestamp)>,
opts: NegentropyOptions,
) -> Result<(), Error> {
self.internal.reconcile_with_items(filter, items, opts).await
self.inner.reconcile_with_items(filter, items, opts).await
}
}
16 changes: 16 additions & 0 deletions crates/nostr-relay-pool/src/relay/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use async_utility::futures_util::stream::AbortHandle;
use async_utility::{futures_util, thread, time};
use async_wsocket::futures_util::{Future, SinkExt, StreamExt};
use async_wsocket::WsMessage;
use atomic_destructor::AtomicDestroyer;
use nostr::message::relay::NegentropyErrorCode;
use nostr::message::MessageHandleError;
use nostr::negentropy::{self, Bytes, Negentropy};
Expand Down Expand Up @@ -201,6 +202,21 @@ pub(super) struct InternalRelay {
limits: Limits,
}

impl AtomicDestroyer for InternalRelay {
fn name(&self) -> Option<String> {
Some(format!("Relay {}", self.url))
}

fn on_destroy(&self) {
let relay = self.clone();
let _ = thread::spawn(async move {
if let Err(e) = relay.terminate().await {
tracing::error!("Impossible to shutdown {} relay: {e}", relay.url);
}
});
}
}

impl InternalRelay {
/// Create new `Relay`
pub fn new(
Expand Down
Loading

0 comments on commit 7e197ba

Please sign in to comment.