diff --git a/rt/src/process.rs b/rt/src/process.rs index fcd5a308..f3581750 100644 --- a/rt/src/process.rs +++ b/rt/src/process.rs @@ -1,7 +1,6 @@ -use crate::arc_without_weak::ArcWithoutWeak; use crate::mem::{allocate, header_of, Header, TypePointer}; use crate::scheduler::process::Thread; -use crate::scheduler::timeouts::Timeout; +use crate::scheduler::timeouts::TimeoutProcess; use crate::stack::Stack; use crate::state::State; use std::alloc::dealloc; @@ -223,7 +222,7 @@ pub(crate) struct ProcessState { /// /// If missing and the process is suspended, it means the process is /// suspended indefinitely. - timeout: Option>, + timeout: Option, } impl ProcessState { @@ -235,17 +234,7 @@ impl ProcessState { } } - pub(crate) fn has_same_timeout( - &self, - timeout: &ArcWithoutWeak, - ) -> bool { - self.timeout - .as_ref() - .map(|t| t.as_ptr() == timeout.as_ptr()) - .unwrap_or(false) - } - - pub(crate) fn suspend(&mut self, timeout: ArcWithoutWeak) { + pub(crate) fn suspend(&mut self, timeout: TimeoutProcess) { self.timeout = Some(timeout); self.status.set_sleeping(true); } @@ -274,16 +263,13 @@ impl ProcessState { pub(crate) fn waiting_for_value( &mut self, - timeout: Option>, + timeout: Option, ) { self.timeout = timeout; self.status.set_waiting_for_value(true); } - pub(crate) fn waiting_for_io( - &mut self, - timeout: Option>, - ) { + pub(crate) fn waiting_for_io(&mut self, timeout: Option) { self.timeout = timeout; self.status.set_waiting_for_io(true); } @@ -719,10 +705,9 @@ impl DerefMut for ProcessPointer { #[cfg(test)] mod tests { use super::*; - use crate::test::{empty_process_type, setup, OwnedProcess}; + use crate::test::{empty_process_type, OwnedProcess}; use rustix::param::page_size; use std::mem::size_of; - use std::time::Duration; macro_rules! offset_of { ($value: expr, $field: ident) => {{ @@ -860,22 +845,8 @@ mod tests { assert!(RescheduleRights::AcquiredWithTimeout.are_acquired()); } - #[test] - fn test_process_state_has_same_timeout() { - let state = setup(); - let mut proc_state = ProcessState::new(); - let timeout = Timeout::duration(&state, Duration::from_secs(0)); - - assert!(!proc_state.has_same_timeout(&timeout)); - - proc_state.timeout = Some(timeout.clone()); - - assert!(proc_state.has_same_timeout(&timeout)); - } - #[test] fn test_process_state_try_reschedule_after_timeout() { - let state = setup(); let mut proc_state = ProcessState::new(); assert_eq!( @@ -893,7 +864,8 @@ mod tests { assert!(!proc_state.status.is_waiting_for_value()); assert!(!proc_state.status.is_waiting()); - let timeout = Timeout::duration(&state, Duration::from_secs(0)); + let timeout = + TimeoutProcess::new(unsafe { ProcessPointer::new(0x4 as _) }); proc_state.waiting_for_value(Some(timeout)); @@ -908,9 +880,9 @@ mod tests { #[test] fn test_process_state_waiting_for_value() { - let state = setup(); let mut proc_state = ProcessState::new(); - let timeout = Timeout::duration(&state, Duration::from_secs(0)); + let timeout = + TimeoutProcess::new(unsafe { ProcessPointer::new(0x4 as _) }); proc_state.waiting_for_value(None); @@ -943,7 +915,6 @@ mod tests { #[test] fn test_process_state_try_reschedule_for_value() { - let state = setup(); let mut proc_state = ProcessState::new(); assert_eq!( @@ -960,7 +931,7 @@ mod tests { proc_state.status.set_waiting_for_value(true); proc_state.timeout = - Some(Timeout::duration(&state, Duration::from_secs(0))); + Some(TimeoutProcess::new(unsafe { ProcessPointer::new(0x4 as _) })); assert_eq!( proc_state.try_reschedule_for_value(), @@ -1004,11 +975,10 @@ mod tests { #[test] fn test_process_state_suspend() { - let state = setup(); let typ = empty_process_type("A"); let stack = Stack::new(32, page_size()); let process = OwnedProcess::new(Process::alloc(*typ, stack)); - let timeout = Timeout::duration(&state, Duration::from_secs(0)); + let timeout = TimeoutProcess::new(*process); process.state().suspend(timeout); @@ -1018,11 +988,10 @@ mod tests { #[test] fn test_process_timeout_expired() { - let state = setup(); let typ = empty_process_type("A"); let stack = Stack::new(32, page_size()); let process = OwnedProcess::new(Process::alloc(*typ, stack)); - let timeout = Timeout::duration(&state, Duration::from_secs(0)); + let timeout = TimeoutProcess::new(*process); assert!(!process.timeout_expired()); diff --git a/rt/src/runtime/process.rs b/rt/src/runtime/process.rs index fc3e5be3..c4338046 100644 --- a/rt/src/runtime/process.rs +++ b/rt/src/runtime/process.rs @@ -132,10 +132,7 @@ pub unsafe extern "system" fn inko_process_suspend( let timeout = Timeout::duration(state, Duration::from_nanos(nanos as _)); { - let mut proc_state = process.state(); - - proc_state.suspend(timeout.clone()); - state.timeout_worker.suspend(process, timeout); + process.state().suspend(state.timeout_worker.suspend(process, timeout)); } // Safety: the current thread is holding on to the run lock @@ -245,9 +242,6 @@ pub unsafe extern "system" fn inko_process_wait_for_value_until( let state = &*state; let deadline = Timeout::until(nanos); let mut proc_state = process.state(); - - proc_state.waiting_for_value(Some(deadline.clone())); - let _ = (*lock).compare_exchange( current, new, @@ -255,10 +249,12 @@ pub unsafe extern "system" fn inko_process_wait_for_value_until( Ordering::Acquire, ); + proc_state.waiting_for_value(Some( + state.timeout_worker.suspend(process, deadline), + )); drop(proc_state); // Safety: the current thread is holding on to the run lock - state.timeout_worker.suspend(process, deadline); context::switch(process); process.timeout_expired() } diff --git a/rt/src/runtime/socket.rs b/rt/src/runtime/socket.rs index f2f9fad1..ab6665be 100644 --- a/rt/src/runtime/socket.rs +++ b/rt/src/runtime/socket.rs @@ -28,8 +28,9 @@ pub(crate) unsafe extern "system" fn inko_socket_poll( if deadline >= 0 { let time = Timeout::until(deadline as u64); - proc_state.waiting_for_io(Some(time.clone())); - state.timeout_worker.suspend(process, time); + proc_state.waiting_for_io(Some( + state.timeout_worker.suspend(process, time), + )); } else { proc_state.waiting_for_io(None); } diff --git a/rt/src/scheduler/timeout_worker.rs b/rt/src/scheduler/timeout_worker.rs index ed29b4ee..e9b0a22d 100644 --- a/rt/src/scheduler/timeout_worker.rs +++ b/rt/src/scheduler/timeout_worker.rs @@ -1,8 +1,7 @@ //! Rescheduling of processes with expired timeouts. -use crate::arc_without_weak::ArcWithoutWeak; use crate::process::ProcessPointer; use crate::scheduler::process::Scheduler; -use crate::scheduler::timeouts::{Timeout, Timeouts}; +use crate::scheduler::timeouts::{Timeout, TimeoutProcess, Timeouts}; use crate::state::State; use std::cell::UnsafeCell; use std::collections::VecDeque; @@ -23,8 +22,9 @@ const QUEUE_START_CAPACITY: usize = 1024 / size_of::(); const FRAGMENTATION_THRESHOLD: f64 = 0.1; struct Message { - process: ProcessPointer, - timeout: ArcWithoutWeak, + id: usize, + process: TimeoutProcess, + timeout: Timeout, } /// The inner part of a worker, only accessible by the owning thread. @@ -100,12 +100,16 @@ impl TimeoutWorker { pub(crate) fn suspend( &self, process: ProcessPointer, - timeout: ArcWithoutWeak, - ) { + timeout: Timeout, + ) -> TimeoutProcess { + let id = process.identifier(); + let ours = TimeoutProcess::new(process); + let theirs = ours.clone(); let mut queue = self.queue.lock().unwrap(); - queue.push_back(Message { process, timeout }); + queue.push_back(Message { id, process: ours, timeout }); self.cvar.notify_one(); + theirs } fn run_iteration(&self, state: &State) -> Option { @@ -153,7 +157,7 @@ impl TimeoutWorker { fn handle_pending_messages(&self) { while let Some(msg) = self.inner_mut().queue.pop_front() { - self.inner_mut().timeouts.insert(msg.process, msg.timeout); + self.inner_mut().timeouts.insert(msg.id, msg.process, msg.timeout); } } @@ -234,8 +238,9 @@ mod tests { for time in &[10_u64, 5_u64] { let timeout = Timeout::duration(&state, Duration::from_secs(*time)); - process.state().waiting_for_value(Some(timeout.clone())); - worker.suspend(process, timeout); + process + .state() + .waiting_for_value(Some(worker.suspend(process, timeout))); } worker.increase_expired_timeouts(); @@ -258,8 +263,9 @@ mod tests { let worker = TimeoutWorker::new(); let timeout = Timeout::duration(&state, Duration::from_secs(10)); - process.state().waiting_for_value(Some(timeout.clone())); - worker.suspend(process, timeout); + process + .state() + .waiting_for_value(Some(worker.suspend(process, timeout))); worker.run_iteration(&state); assert_eq!(worker.inner().timeouts.len(), 1); @@ -273,8 +279,9 @@ mod tests { let worker = TimeoutWorker::new(); let timeout = Timeout::duration(&state, Duration::from_secs(0)); - process.state().waiting_for_value(Some(timeout.clone())); - worker.suspend(process, timeout); + process + .state() + .waiting_for_value(Some(worker.suspend(process, timeout))); worker.run_iteration(&state); assert_eq!(worker.inner().timeouts.len(), 0); @@ -288,8 +295,9 @@ mod tests { let worker = TimeoutWorker::new(); let timeout = Timeout::duration(&state, Duration::from_secs(1)); - process.state().waiting_for_value(Some(timeout.clone())); - worker.suspend(process, timeout); + process + .state() + .waiting_for_value(Some(worker.suspend(process, timeout))); worker.move_messages(); worker.handle_pending_messages(); worker.defragment_heap(); @@ -308,8 +316,9 @@ mod tests { for time in &[1_u64, 1_u64] { let timeout = Timeout::duration(&state, Duration::from_secs(*time)); - process.state().waiting_for_value(Some(timeout.clone())); - worker.suspend(process, timeout); + process + .state() + .waiting_for_value(Some(worker.suspend(process, timeout))); } worker.increase_expired_timeouts(); diff --git a/rt/src/scheduler/timeouts.rs b/rt/src/scheduler/timeouts.rs index af7d3b7e..5e494557 100644 --- a/rt/src/scheduler/timeouts.rs +++ b/rt/src/scheduler/timeouts.rs @@ -5,9 +5,11 @@ use crate::state::State; use std::cmp; use std::collections::BinaryHeap; use std::ops::Drop; +use std::sync::{Mutex, MutexGuard}; use std::time::{Duration, Instant}; /// An process that should be resumed after a certain point in time. +#[derive(Eq, PartialEq)] pub(crate) struct Timeout { /// The time after which to resume, in nanoseconds since the runtime epoch. /// @@ -20,14 +22,11 @@ pub(crate) struct Timeout { } impl Timeout { - pub(crate) fn until(nanos: u64) -> ArcWithoutWeak { - ArcWithoutWeak::new(Timeout { resume_after: nanos }) + pub(crate) fn until(nanos: u64) -> Self { + Timeout { resume_after: nanos } } - pub(crate) fn duration( - state: &State, - duration: Duration, - ) -> ArcWithoutWeak { + pub(crate) fn duration(state: &State, duration: Duration) -> Self { let deadline = (Instant::now() - state.start_time + duration).as_nanos() as u64; @@ -40,6 +39,44 @@ impl Timeout { } } +/// A process that's suspended for a timeout. +/// +/// Both the timeouts heap and the process have access to this data. When a +/// process is rescheduled by another process, it swaps the internal value with +/// a None and the timeouts logic will discard the entry. This ensures that if a +/// process is dropped while one or more (stale) entries exist, we don't try to +/// use the now invalidated process pointer. +#[derive(Clone)] +pub(crate) struct TimeoutProcess { + inner: ArcWithoutWeak>>, +} + +impl TimeoutProcess { + pub(crate) fn new(process: ProcessPointer) -> TimeoutProcess { + TimeoutProcess { inner: ArcWithoutWeak::new(Mutex::new(Some(process))) } + } + + pub(crate) fn is_valid(&self) -> bool { + self.inner.lock().unwrap().is_some() + } + + pub(crate) fn get(&self) -> MutexGuard> { + self.inner.lock().unwrap() + } + + pub(crate) fn take(&self) -> Option { + self.get().take() + } +} + +impl Drop for TimeoutProcess { + fn drop(&mut self) { + // This ensures that if some process P₁ expires the timeout for process + // P₂, we stop using the pointer to P₂. + self.take(); + } +} + /// A Timeout and a Process to store in the timeout heap. /// /// Since the Timeout is also stored in an process we can't also store an @@ -47,16 +84,18 @@ impl Timeout { /// around this, we store the two values in this separate TimeoutEntry /// structure. struct TimeoutEntry { - timeout: ArcWithoutWeak, - process: ProcessPointer, + timeout: Timeout, + id: usize, + process: TimeoutProcess, } impl TimeoutEntry { pub(crate) fn new( - process: ProcessPointer, - timeout: ArcWithoutWeak, + id: usize, + process: TimeoutProcess, + timeout: Timeout, ) -> Self { - TimeoutEntry { timeout, process } + TimeoutEntry { timeout, id, process } } } @@ -78,7 +117,7 @@ impl Ord for TimeoutEntry { impl PartialEq for TimeoutEntry { fn eq(&self, other: &Self) -> bool { self.timeout.resume_after == other.timeout.resume_after - && self.process.identifier() == other.process.identifier() + && self.id == other.id } } @@ -105,10 +144,11 @@ impl Timeouts { pub(crate) fn insert( &mut self, - process: ProcessPointer, - timeout: ArcWithoutWeak, + id: usize, + process: TimeoutProcess, + timeout: Timeout, ) { - self.timeouts.push(TimeoutEntry::new(process, timeout)); + self.timeouts.push(TimeoutEntry::new(id, process, timeout)); } pub(crate) fn len(&self) -> usize { @@ -121,7 +161,7 @@ impl Timeouts { .timeouts .drain() .filter(|entry| { - if entry.process.state().has_same_timeout(&entry.timeout) { + if entry.process.is_valid() { true } else { removed += 1; @@ -143,16 +183,16 @@ impl Timeouts { let mut time_until_expiration = None; while let Some(entry) = self.timeouts.pop() { - let mut proc_state = entry.process.state(); - - if !proc_state.has_same_timeout(&entry.timeout) { - continue; - } + // TODO: this deadlocks as another process might acquire the locks + // in the opposite order. + let entry_lock = entry.process.get(); + let Some(proc) = *entry_lock else { continue }; + let mut proc_state = proc.state(); if let Some(duration) = entry.timeout.remaining_time(state) { drop(proc_state); + drop(entry_lock); self.timeouts.push(entry); - time_until_expiration = Some(duration); // If this timeout didn't expire yet, any following timeouts @@ -160,9 +200,16 @@ impl Timeouts { break; } + // We need to drop this lock first otherwise we might deadlock in + // the call below. + drop(entry_lock); + if proc_state.try_reschedule_after_timeout().are_acquired() { drop(proc_state); - reschedule.push(entry.process); + + // Safety: we are holding on to the process run lock, so the + // process pointer won't be invalidated. + reschedule.push(proc); } } @@ -173,11 +220,11 @@ impl Timeouts { impl Drop for Timeouts { fn drop(&mut self) { for entry in &self.timeouts { - if entry.process.state().has_same_timeout(&entry.timeout) { - // We may encounter outdated timeouts. In this case the process - // may have been rescheduled and/or already dropped. - Process::drop_and_deallocate(entry.process); - } + let Some(proc) = entry.process.take() else { continue }; + + // We may encounter outdated timeouts. In this case the process + // may have been rescheduled and/or already dropped. + Process::drop_and_deallocate(proc); } } } @@ -230,12 +277,14 @@ mod tests { let typ = empty_process_type("A"); let process = new_process(*typ); let entry1 = TimeoutEntry::new( - *process, + 0, + TimeoutProcess::new(*process), Timeout::duration(&state, Duration::from_secs(1)), ); let entry2 = TimeoutEntry::new( - *process, + 1, + TimeoutProcess::new(*process), Timeout::duration(&state, Duration::from_secs(5)), ); @@ -253,12 +302,14 @@ mod tests { let typ = empty_process_type("A"); let process = new_process(*typ); let entry1 = TimeoutEntry::new( - *process, + 0, + TimeoutProcess::new(*process), Timeout::duration(&state, Duration::from_secs(1)), ); let entry2 = TimeoutEntry::new( - *process, + 1, + TimeoutProcess::new(*process), Timeout::duration(&state, Duration::from_secs(5)), ); @@ -272,12 +323,14 @@ mod tests { let typ = empty_process_type("A"); let process = new_process(*typ); let entry1 = TimeoutEntry::new( - *process, + 0, + TimeoutProcess::new(*process), Timeout::duration(&state, Duration::from_secs(1)), ); let entry2 = TimeoutEntry::new( - *process, + 1, + TimeoutProcess::new(*process), Timeout::duration(&state, Duration::from_secs(5)), ); @@ -297,7 +350,11 @@ mod tests { let mut timeouts = Timeouts::new(); let timeout = Timeout::duration(&state, Duration::from_secs(10)); - timeouts.insert(*process, timeout); + timeouts.insert( + 0, + TimeoutProcess::new(process.take_and_forget()), + timeout, + ); assert_eq!(timeouts.timeouts.len(), 1); } @@ -310,7 +367,11 @@ mod tests { let mut timeouts = Timeouts::new(); let timeout = Timeout::duration(&state, Duration::from_secs(10)); - timeouts.insert(*process, timeout); + timeouts.insert( + 0, + TimeoutProcess::new(process.take_and_forget()), + timeout, + ); assert_eq!(timeouts.len(), 1); } @@ -321,10 +382,11 @@ mod tests { let typ = empty_process_type("A"); let process = Process::alloc(*typ, Stack::new(1024, page_size())); let mut timeouts = Timeouts::new(); + let tproc = TimeoutProcess::new(process); let timeout = Timeout::duration(&state, Duration::from_secs(10)); - process.state().waiting_for_value(Some(timeout.clone())); - timeouts.insert(process, timeout); + process.state().waiting_for_value(Some(tproc.clone())); + timeouts.insert(0, tproc, timeout); assert_eq!(timeouts.remove_invalid_entries(), 0); assert_eq!(timeouts.len(), 1); @@ -337,8 +399,10 @@ mod tests { let process = new_process(*typ); let mut timeouts = Timeouts::new(); let timeout = Timeout::duration(&state, Duration::from_secs(10)); + let tproc = TimeoutProcess::new(*process); - timeouts.insert(*process, timeout); + tproc.take(); + timeouts.insert(0, tproc, timeout); assert_eq!(timeouts.remove_invalid_entries(), 1); assert_eq!(timeouts.len(), 0); @@ -351,8 +415,10 @@ mod tests { let process = new_process(*typ); let mut timeouts = Timeouts::new(); let timeout = Timeout::duration(&state, Duration::from_secs(10)); + let tproc = TimeoutProcess::new(*process); - timeouts.insert(*process, timeout); + tproc.take(); + timeouts.insert(0, tproc, timeout); let (reschedule, expiration) = timeouts.processes_to_reschedule(&state); @@ -368,9 +434,10 @@ mod tests { let process = Process::alloc(*typ, Stack::new(1024, page_size())); let mut timeouts = Timeouts::new(); let timeout = Timeout::duration(&state, Duration::from_secs(10)); + let tproc = TimeoutProcess::new(process); - process.state().waiting_for_value(Some(timeout.clone())); - timeouts.insert(process, timeout); + process.state().waiting_for_value(Some(tproc.clone())); + timeouts.insert(0, tproc, timeout); let (reschedule, expiration) = timeouts.processes_to_reschedule(&state); @@ -387,9 +454,10 @@ mod tests { let process = new_process(*typ); let mut timeouts = Timeouts::new(); let timeout = Timeout::duration(&state, Duration::from_secs(0)); + let tproc = TimeoutProcess::new(*process); - process.state().waiting_for_value(Some(timeout.clone())); - timeouts.insert(*process, timeout); + process.state().waiting_for_value(Some(tproc.clone())); + timeouts.insert(0, tproc, timeout); let (reschedule, expiration) = timeouts.processes_to_reschedule(&state);