Skip to content

Commit

Permalink
API improvement
Browse files Browse the repository at this point in the history
- `shutdown` and `panic` commands are no longer sendable by the user,
  they are now part of the communication handle that they consume
  • Loading branch information
alfred-hodler committed May 20, 2024
1 parent dc4d56b commit 0b2340e
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 36 deletions.
104 changes: 77 additions & 27 deletions src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ pub enum Command<M: Message, T: IntoTarget> {
Disconnect(PeerId),
/// Send a message to a peer.
Message(PeerId, M),
}

/// System commands. Not for external use.
#[derive(Debug)]
enum SystemCommand<M: Message, T: IntoTarget> {
/// Various P2P commands.
P2P(Command<M, T>),
/// Close all connections and shut down the reactor.
Shutdown,
/// Causes the event loop to panic. Only available in debug mode for integration testing.
Expand Down Expand Up @@ -113,7 +120,7 @@ where
poll: Poll,
config: Config,
sender: EventSender<M, T>,
receiver: Receiver<Command<M, T>>,
receiver: Receiver<SystemCommand<M, T>>,
connector: C,
waker: Arc<Waker>,
connect_tx: crossbeam_channel::Sender<ConnectResult<T>>,
Expand Down Expand Up @@ -201,7 +208,7 @@ where
/// Provides bidirectional communication with a reactor. If this is dropped the reactor stops.
pub struct Handle<M: Message, T: IntoTarget> {
waker: Arc<Waker>,
sender: Sender<Command<M, T>>,
sender: Sender<SystemCommand<M, T>>,
receiver: Receiver<Event<M, T>>,
}

Expand All @@ -211,9 +218,9 @@ impl<M: Message, T: IntoTarget> Handle<M, T> {
/// it is appropriate for use in async contexts.
pub fn send(&self, command: Command<M, T>) -> io::Result<()> {
#[cfg(not(feature = "async"))]
let result = self.sender.send(command);
let result = self.sender.send(SystemCommand::P2P(command));
#[cfg(feature = "async")]
let result = self.sender.try_send(command);
let result = self.sender.try_send(SystemCommand::P2P(command));

match result {
Ok(()) => self.waker.wake(),
Expand Down Expand Up @@ -242,6 +249,39 @@ impl<M: Message, T: IntoTarget> Handle<M, T> {
pub fn receiver(&self) -> &Receiver<Event<M, T>> {
&self.receiver
}

/// Shuts down the reactor and consumes the handle. No further commands can be sent afterward.
pub fn shutdown(self) -> io::Result<()> {
#[cfg(not(feature = "async"))]
let result = self.sender.send(SystemCommand::Shutdown);
#[cfg(feature = "async")]
let result = self.sender.try_send(SystemCommand::Shutdown);

match result {
Ok(()) => self.waker.wake(),
Err(_) => Err(io::Error::new(
io::ErrorKind::BrokenPipe,
"channel disconnected",
)),
}
}

/// Causes the reactor to panic. For testing only. No further commands can be sent afterward.
#[cfg(debug_assertions)]
pub fn panic(self) -> io::Result<()> {
#[cfg(not(feature = "async"))]
let result = self.sender.send(SystemCommand::Panic);
#[cfg(feature = "async")]
let result = self.sender.try_send(SystemCommand::Panic);

match result {
Ok(()) => self.waker.wake(),
Err(_) => Err(io::Error::new(
io::ErrorKind::BrokenPipe,
"channel disconnected",
)),
}
}
}

/// The direction of a peer connection.
Expand Down Expand Up @@ -357,11 +397,11 @@ where
log::trace!("command: {:?}", cmd);

match cmd {
Command::Connect(target) => {
SystemCommand::P2P(Command::Connect(target)) => {
initiate_connect(&connector, target, &waker, &connect_tx);
}

Command::Disconnect(peer) => {
SystemCommand::P2P(Command::Disconnect(peer)) => {
if token_map.contains_key(&peer.0) {
let mut connection = remove_stream(
poll.registry(),
Expand All @@ -385,32 +425,42 @@ where
}
}

Command::Message(peer, message) => match token_map.get(&peer.0) {
Some(token) => {
let connection =
connections.get_mut(token.0).expect("must exist here");

if connection.stream.queue_message(&message) {
poll.registry().reregister(
connection.stream.as_source(),
*token,
Interest::READABLE | Interest::WRITABLE,
)?;
} else {
sender.send(Event::SendBufferFull { peer, message });
log::debug!("message: send buffer for peer {peer} is full");
SystemCommand::P2P(Command::Message(peer, message)) => {
match token_map.get(&peer.0) {
Some(token) => {
let connection =
connections.get_mut(token.0).expect("must exist here");

if connection.stream.queue_message(&message) {
poll.registry().reregister(
connection.stream.as_source(),
*token,
Interest::READABLE | Interest::WRITABLE,
)?;
} else {
sender.send(Event::SendBufferFull { peer, message });
log::debug!(
"message: send buffer for peer {peer} is full"
);
}
}
}

None => {
sender.send(Event::NoPeer(peer));
log::warn!("message: peer {peer} not found");
None => {
sender.send(Event::NoPeer(peer));
log::warn!("message: peer {peer} not found");
}
}
},
}

Command::Shutdown => {
SystemCommand::Shutdown => {
for (id, mut connection) in connections {
let _ = connection.stream.write(now);
if connection
.stream
.is_write_stale(now + Duration::from_secs(3600))
{
log::debug!("shutdown: connection had unsent data");
}
let r = connection.stream.shutdown();

log::debug!("shutdown: stream {}: {:?}", id, r);
Expand All @@ -420,7 +470,7 @@ where
}

#[cfg(debug_assertions)]
Command::Panic => panic!("panic command received"),
SystemCommand::Panic => panic!("panic command received"),
}
}
}
Expand Down
29 changes: 20 additions & 9 deletions tests/disconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,43 @@ use common::Message;
// orderly, disorderly, and abrupt. In each instance, the remaining party must notice the
// disconnect immediately and without fail.

enum LeaveType {
Disconnect(PeerId),
Shutdown,
Panic,
}

#[test]
fn client_orderly_disconnect() {
shutdown_test(8000, Command::Disconnect(PeerId(0)), true);
shutdown_test(8000, LeaveType::Disconnect(PeerId(0)), true);
}

#[test]
fn client_shutdown_leave() {
shutdown_test(8001, Command::Shutdown, true);
shutdown_test(8001, LeaveType::Shutdown, true);
}

#[test]
fn client_abrupt_leave() {
shutdown_test(8002, Command::Panic, true);
shutdown_test(8002, LeaveType::Panic, true);
}

#[test]
fn server_orderly_disconnect() {
shutdown_test(8003, Command::Disconnect(PeerId(0)), false);
shutdown_test(8003, LeaveType::Disconnect(PeerId(0)), false);
}

#[test]
fn server_shutdown_leave() {
shutdown_test(8004, Command::Shutdown, false);
shutdown_test(8004, LeaveType::Shutdown, false);
}

#[test]
fn server_abrupt_leave() {
shutdown_test(8005, Command::Panic, false);
shutdown_test(8005, LeaveType::Panic, false);
}

fn shutdown_test(port: u16, shutdown_command: Command<Message, String>, client_is_leaving: bool) {
fn shutdown_test(port: u16, shutdown_command: LeaveType, client_is_leaving: bool) {
let _ = env_logger::builder().is_test(true).try_init();

let server_addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into();
Expand All @@ -50,7 +56,7 @@ fn shutdown_test(port: u16, shutdown_command: Command<Message, String>, client_i
..Default::default()
};

let (server_reactor, server_handle) = Reactor::new(config).unwrap();
let (server_reactor, server_handle) = Reactor::<_, Message, _>::new(config).unwrap();
let (client_reactor, client_handle) = Reactor::new(Config::default()).unwrap();

let _ = server_reactor.run();
Expand Down Expand Up @@ -82,7 +88,12 @@ fn shutdown_test(port: u16, shutdown_command: Command<Message, String>, client_i
(server_handle, client_handle)
};

leaving.send(shutdown_command).unwrap();
match shutdown_command {
LeaveType::Disconnect(peer_id) => leaving.send(Command::Disconnect(peer_id)),
LeaveType::Shutdown => leaving.shutdown(),
LeaveType::Panic => leaving.panic(),
}
.unwrap();

assert!(matches!(
remaining.receive_blocking().unwrap(),
Expand Down

0 comments on commit 0b2340e

Please sign in to comment.