Skip to content

Commit

Permalink
Make cancel tests more robust
Browse files Browse the repository at this point in the history
Similar to what we did for the cancel_all tests.
  • Loading branch information
Thomasdezeeuw committed Jun 8, 2024
1 parent a64b9fc commit 2043d42
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 23 deletions.
16 changes: 6 additions & 10 deletions tests/async_fd/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ use a10::{Extract, Ring};

use crate::async_fd::io::{BadBuf, BadBufSlice, BadReadBuf, BadReadBufSlice};
use crate::util::{
bind_and_listen_ipv4, bind_ipv4, block_on, expect_io_errno, expect_io_error_kind, init,
is_send, is_sync, new_socket, next, require_kernel, start_op, syscall, tcp_ipv4_socket,
test_queue, udp_ipv4_socket, Waker,
bind_and_listen_ipv4, bind_ipv4, block_on, cancel, expect_io_errno, expect_io_error_kind, init,
is_send, is_sync, new_socket, next, require_kernel, start_mulitshot_op, start_op, syscall,
tcp_ipv4_socket, test_queue, udp_ipv4_socket, Waker,
};

const DATA1: &[u8] = b"Hello, World!";
Expand Down Expand Up @@ -121,12 +121,10 @@ fn cancel_accept() {
let listener = waker.block_on(tcp_ipv4_socket(sq));
bind_and_listen_ipv4(&listener);

let accept = listener.accept::<NoAddress>();
let mut accept = std::pin::pin!(accept);
start_op(&mut accept);
let mut accept = listener.accept::<NoAddress>();

// Then cancel the accept multishot call.
waker.block_on(accept.as_mut().cancel()).unwrap();
cancel(&waker, &mut accept, start_op);

expect_io_errno(waker.block_on(accept), libc::ECANCELED);
}
Expand Down Expand Up @@ -252,9 +250,7 @@ fn cancel_multishot_accept() {
let c_addr1 = peer_addr(client1.as_fd()).expect("failed to get address");

// Then cancel the accept multishot call.
waker
.block_on(accept_stream.cancel())
.expect("failed to cancel");
cancel(&waker, &mut accept_stream, start_mulitshot_op);

// We should still be able to accept the second connection.
let client2 = waker
Expand Down
26 changes: 13 additions & 13 deletions tests/ring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use std::task::{self, Poll, Wake};
use std::thread;
use std::time::{Duration, Instant};

use a10::cancel::Cancel;
use a10::fs::{Open, OpenOptions};
use a10::io::ReadBufPool;
use a10::msg::{msg_listener, send_msg, try_send_msg, MsgListener, MsgToken, SendMsg};
Expand All @@ -24,7 +23,7 @@ use a10::{fd, mem, process, Config, Ring, SubmissionQueue};

mod util;
use util::{
defer, expect_io_errno, init, is_send, is_sync, next, poll_nop, require_kernel,
cancel, defer, expect_io_errno, init, is_send, is_sync, next, poll_nop, require_kernel,
start_mulitshot_op, start_op, test_queue, Waker,
};

Expand Down Expand Up @@ -340,10 +339,9 @@ fn cancel_oneshot_poll() {

let (receiver, sender) = pipe2().unwrap();

let mut receiver_read = pin!(oneshot_poll(&sq, receiver.as_fd(), libc::POLLIN as _));
start_op(&mut receiver_read);
let mut receiver_read = oneshot_poll(&sq, receiver.as_fd(), libc::POLLIN as _);

waker.block_on(receiver_read.cancel()).unwrap();
cancel(&waker, &mut receiver_read, start_op);
expect_io_errno(waker.block_on(receiver_read), libc::ECANCELED);
drop(sender);
}
Expand Down Expand Up @@ -384,10 +382,9 @@ fn cancel_multishot_poll() {

let (receiver, sender) = pipe2().unwrap();

let mut receiver_read = pin!(multishot_poll(&sq, receiver.as_fd(), libc::POLLIN as _));
start_mulitshot_op(&mut receiver_read);
let mut receiver_read = multishot_poll(&sq, receiver.as_fd(), libc::POLLIN as _);

waker.block_on(receiver_read.cancel()).unwrap();
cancel(&waker, &mut receiver_read, start_mulitshot_op);
assert!(waker.block_on(next(receiver_read)).is_none());
drop(sender);
}
Expand Down Expand Up @@ -478,12 +475,15 @@ fn process_wait_on_cancel() {
let mut process = Command::new("sleep").arg("1000").spawn().unwrap();

let mut future = process::wait_on(sq, &process, libc::WEXITED);
let result = poll_nop(Pin::new(&mut future));
if !result.is_pending() {
panic!("unexpected result, expected it to return Poll::Pending");
}

waker.block_on(future.cancel()).unwrap();
cancel(&waker, &mut future, |future| {
// NOTE: can't use `start_op` as `siginfo_t` doesn't implemented
// `fmt::Debug`.
let result = poll_nop(Pin::new(future));
if !result.is_pending() {
panic!("unexpected result, expected it to return Poll::Pending");
}
});

process.kill().unwrap();
process.wait().unwrap();
Expand Down
29 changes: 29 additions & 0 deletions tests/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::task::{self, Poll};
use std::thread::{self, Thread};
use std::{fmt, mem, panic, process, ptr, str};

use a10::cancel::Cancel;
use a10::fd::Descriptor;
use a10::net::socket;
use a10::{AsyncFd, Ring, SubmissionQueue};
Expand Down Expand Up @@ -204,6 +205,34 @@ impl Waker {
}
}

/// Cancel `operation`.
///
/// `Future`s are inert and we can't determine if an operation has actually been
/// started or the submission queue was full. This means that we can't ensure
/// that the operation has been queued with the Kernel. This means that in some
/// cases we won't actually cancel the operation simply because it hasn't
/// started. This introduces flakyness in the tests.
///
/// To work around this we have this cancel function. If we fail to cancel the
/// operation we try to start the operation using `start_op`, before canceling t
/// again. Looping until the operation is canceled.
#[track_caller]
pub(crate) fn cancel<O, F>(waker: &Arc<Waker>, operation: &mut O, start_op: F)
where
O: Cancel,
F: Fn(&mut O),
{
for _ in 0..100 {
start_op(operation);
match waker.block_on(operation.cancel()) {
Ok(()) => return,
Err(ref err) if err.kind() == io::ErrorKind::NotFound => continue,
Err(err) => panic!("unexpected error canceling operation: {err}"),
}
}
panic!("couldn't cancel operation");
}

/// Cancel all operations of `fd`.
///
/// `Future`s are inert and we can't determine if an operation has actually been
Expand Down

0 comments on commit 2043d42

Please sign in to comment.