diff --git a/std/src/std/sync.inko b/std/src/std/sync.inko index 409d153d..55b675f8 100644 --- a/std/src/std/sync.inko +++ b/std/src/std/sync.inko @@ -58,6 +58,12 @@ let NO_WAITER = 0 let UNLOCKED = 0 let LOCKED = 1 +type copy enum Status { + case Connected + case NoFuture + case NoPromise +} + # The state shared between a `Future` and a `Promise`. type FutureState[T] { # A spinlock used to restrict access to the state to a single thread/process @@ -76,11 +82,8 @@ type FutureState[T] { # them to begin with for this particular workload. let @locked: UInt8 - # A flag indicating if both the `Future` and `Promise` still exist. - # - # When either the `Future` or `Promise` is dropped, it sets this flag to - # `false` and the other half is responsible for cleaning up the shared state. - let @connected: Bool + # The status of the `Future` and `Promise`. + let @status: Status # The process waiting for a value to be written to the future. # @@ -146,7 +149,7 @@ type pub Future[T] { let fut: FutureState[uni T] = FutureState( waiter: NO_WAITER as Pointer[UInt8], locked: UNLOCKED as UInt8, - connected: true, + status: Status.Connected, value: Option.None, ) @@ -210,8 +213,8 @@ type pub Future[T] { inko_process_wait_for_value( _INKO.process, mut fut.locked, - 1 as UInt8, - 0 as UInt8, + LOCKED as UInt8, + UNLOCKED as UInt8, ) # Ensure the shared state isn't dropped. @@ -224,9 +227,23 @@ type pub Future[T] { # Returns the value of the future, blocking the calling process until a value # is available or the given deadline is exceeded. # - # If a value is resolved within the deadline, a `Result.Ok` containing the - # value is returned. If the timeout expired, a `Result.Error` is returned - # containing a new `Future` to use for resolving the value. + # If a value is resolved within the deadline, an `Option.Some` containing the + # value is returned. If the timeout expired, an `Option.None` is returned. + # + # In both cases `self` is consumed. This is because trying to wait for a + # result is inherently racy, and may result in unexpected results. For + # example, if a value were to be written using `Promise.set` _just_ after we + # return from this method, we wouldn't observe it unless the operation is + # retried. If we don't do so, the value would be dropped. + # + # However, it's more often than not clear how often the operation should be + # retried, as the time waited might not necessarily be the same or longer as + # the time it takes before `Promise.set` is called. + # + # Always consuming `self` instead forces the caller to create a new `Promise` + # and `Future` pair _if_ a retry is desired, and ensures that _if_ + # `Promise.set` is called _after_ returning from this method the value passed + # to `Promise.set` is returned to its caller. # # # Deadlocks # @@ -244,13 +261,11 @@ type pub Future[T] { # match Future.new { # case (future, promise) -> { # promise.set(42) - # future.get_until(Duration.from_secs(1)) # => Result.Ok(42) + # future.get_until(Duration.from_secs(1)) # => Option.Some(42) # } # } # ``` - fn pub move get_until[D: ToInstant]( - deadline: ref D, - ) -> Result[uni T, Future[T]] { + fn pub move get_until[D: ToInstant](deadline: ref D) -> Option[uni T] { let nanos = deadline.to_instant.to_int as UInt64 loop { @@ -262,7 +277,7 @@ type pub Future[T] { # Ensure the shared state isn't dropped. _INKO.moved(fut) - return Result.Ok(val) + return Option.Some(val) } case _ -> { fut.waiter = _INKO.process @@ -273,18 +288,42 @@ type pub Future[T] { _INKO.state, _INKO.process, mut fut.locked, - 1 as UInt8, - 0 as UInt8, + LOCKED as UInt8, + UNLOCKED as UInt8, nanos, ) # Ensure the shared state isn't dropped. _INKO.moved(fut) - if timed_out { return Result.Error(self) } + if timed_out { break } } } } + + # It's possible for a write to happen _just_ after we time out. We don't + # want to silently discard the value in that case. In addition, it's + # possible for a value to be written after returning from this method, which + # would result in the value also being lost. + # + # To prevent this from happening we disconnect the future immediately and + # perform a final check to see if a value is present. This ensures that + # beyond this point any values written using `Promise.set` are returned to + # the caller, instead of just being dropped. + let fut = lock + + match fut.status { + case Connected -> fut.status = Status.NoFuture + case _ -> {} + } + + let val = fut.value := Option.None + + fut.unlock + + # Ensure the shared state isn't dropped. + _INKO.moved(fut) + val } # Returns the value of the future if one is present, without blocking the @@ -337,18 +376,27 @@ impl Drop for Future { fn mut drop { let fut = lock - if fut.connected { - # The `Promise` is still present, so it will be tasked with cleaning up - # the shared state. - fut.connected = false - fut.unlock - - # Ensure the shared state isn't dropped. - _INKO.moved(fut) - } else { - # The `Promise` is already dropped, so it's our job to clean up the shared - # state. - drop_value(fut) + match fut.status { + case Connected -> { + fut.status = Status.NoFuture + fut.unlock + + # Ensure the shared state isn't dropped. + _INKO.moved(fut) + } + case NoPromise -> { + # The `Promise` is already dropped, so it's our job to clean up the + # shared state. + drop_value(fut) + } + case _ -> { + # We can encounter this branch if Future.get_until times out because it + # sets the status to NoFuture. + fut.unlock + + # Ensure the shared state isn't dropped. + _INKO.moved(fut) + } } } } @@ -405,10 +453,15 @@ type pub Promise[T] { let val = Option.Some(value) let fut = lock - if fut.connected.false? { - fut.unlock - _INKO.moved(fut) - return val + match fut.status { + case NoFuture -> { + fut.unlock + + # Ensure the shared state isn't dropped. + _INKO.moved(fut) + return val + } + case _ -> {} } let waiter = fut.waiter := NO_WAITER as Pointer[UInt8] @@ -416,7 +469,7 @@ type pub Promise[T] { fut.value = val fut.unlock - # Ensure we don't drop the shared state. + # Ensure the shared state isn't dropped. _INKO.moved(fut) # If the waiter is waiting for a value, we have to reschedule it. @@ -439,16 +492,25 @@ impl Drop for Promise { fn mut drop { let fut = lock - if fut.connected { - # The `Future` is still present, so it will be tasked with cleaning up the - # shared state. - fut.connected = false - fut.unlock - _INKO.moved(fut) - } else { - # The `Future` is already dropped, so it's our job to clean up the shared - # state. - drop_value(fut) + match fut.status { + case Connected -> { + fut.status = Status.NoPromise + fut.unlock + + # Ensure the shared state isn't dropped. + _INKO.moved(fut) + return + } + case NoFuture -> { + # The `Future` is already dropped, so it's our job to clean up the + # shared state. + drop_value(fut) + } + case _ -> { + # This ensures `fut` is moved in all branches, such that we don't try to + # double drop it outside the match. + _INKO.moved(fut) + } } } } @@ -622,7 +684,7 @@ type pub inline Channel[T] { match Future.new { case (future, promise) -> { @state.receive(promise) - future.get_until(deadline).ok + future.get_until(deadline) } } } diff --git a/std/test/std/test_sync.inko b/std/test/std/test_sync.inko index ec4c0c9a..75db97e2 100644 --- a/std/test/std/test_sync.inko +++ b/std/test/std/test_sync.inko @@ -27,20 +27,17 @@ fn pub tests(t: mut Tests) { } }) - t.ok('Future.get_until', fn (t) { + t.test('Future.get_until', fn (t) { match int_future { - case (r, w) -> { - let r = match r.get_until(Duration.from_millis(1)) { - case Ok(_) -> throw 'expected an Error' - case Error(r) -> r - } + case (r, _w) -> t.equal(r.get_until(Duration.from_millis(1)), Option.None) + } + match int_future { + case (r, w) -> { w.set(42) - t.equal(r.get_until(Duration.from_millis(1)).ok, Option.Some(42)) + t.equal(r.get_until(Duration.from_millis(1)), Option.Some(42)) } } - - Result.Ok(nil) }) t.ok('Future.try_get', fn (t) { @@ -63,7 +60,7 @@ fn pub tests(t: mut Tests) { match int_future { case (r, w) -> { drop(w) - t.true(r.get_until(Duration.from_millis(1)).error?) + t.true(r.get_until(Duration.from_millis(1)).none?) } } })