diff --git a/std/src/std/sync.inko b/std/src/std/sync.inko index 9c363fb5..409d153d 100644 --- a/std/src/std/sync.inko +++ b/std/src/std/sync.inko @@ -176,7 +176,7 @@ type pub Future[T] { # cases as doing so is notoriously difficult. # # To avoid a deadlock, make sure to always write a value to a `Promise` - # _before_ discarding it. + # _before_ discarding it, or use `Future.get_until` to wait using a deadline. # # # Examples # @@ -367,23 +367,53 @@ type pub Promise[T] { # # This method never blocks the calling process. # + # # Disconnected writes + # + # If the corresponding `Future` is dropped, this method returns the value + # wrapped in an `Option.Some`, otherwise an `Option.None` is returned. This + # allows callers to detect a disconnected `Promise` and act accordingly, such + # as by storing the value elsewhere. + # # # Examples # + # Resolving a `Future` using a `Promise`: + # # ```inko # import std.sync (Future) # # match Future.new { # case (future, promise) -> { - # promise.set(42) - # future.get # => 42 + # promise.set(42) # => Option.None + # future.get # => 42 + # } + # } + # ``` + # + # Trying to resolve a dropped `Future`: + # + # ```inko + # import std.sync (Future) + # + # match Future.new { + # case (future, promise) -> { + # drop(future) + # promise.set(42) # => Option.Some(42) # } # } # ``` - fn pub move set(value: uni T) { + fn pub move set(value: uni T) -> Option[uni T] { + let val = Option.Some(value) let fut = lock + + if fut.connected.false? { + fut.unlock + _INKO.moved(fut) + return val + } + let waiter = fut.waiter := NO_WAITER as Pointer[UInt8] - fut.value = Option.Some(value) + fut.value = val fut.unlock # Ensure we don't drop the shared state. @@ -393,6 +423,8 @@ type pub Promise[T] { if waiter as Int != NO_WAITER { inko_process_reschedule_for_value(_INKO.state, _INKO.process, waiter) } + + Option.None } fn lock -> FutureState[uni T] { @@ -431,15 +463,51 @@ type async ChannelState[T] { } fn async mut send(value: uni T) { - match @promises.pop_front { - case Some(p) -> p.set(value) - case _ -> @values.push_back(value) + @values.push_back(value) + + # Now that we have at least one (but possibly multiple) values, we can start + # flushing them to the queued up Promises (if any). This ensures that both + # Promises and values are processed in FIFO order, and as soon as possible. + # + # An important detail here is the handling of disconnected Promises. + # Consider the following case: we have 3 Promises, and the first two are + # disconnected. When sending a value, we want to skip the first two and + # resolve the third Promise. + loop { + match (@promises.pop_front, @values.pop_front) { + case (Some(p), Some(v)) -> { + match p.set(v) { + case Some(v) -> { + # If the Promise is disconnected we'll try the value again with + # the next Promise. + @values.push_front(v) + next + } + case _ -> {} + } + } + case (Some(p), _) -> @promises.push_front(p) + case (_, Some(v)) -> @values.push_front(v) + case _ -> {} + } + + # At this point we either only have Promises or values, meaning there's + # nothing left to do. + break } } fn async mut receive(promise: uni Promise[uni T]) { match @values.pop_front { - case Some(v) -> promise.set(v) + case Some(v) -> { + # If the Promise is disconnected at this point we push the value back to + # the _start_ such that the next Promise will resolve to it, instead of + # the last Promise. + match promise.set(v) { + case Some(v) -> @values.push_front(v) + case _ -> {} + } + } case _ -> @promises.push_back(promise) } } diff --git a/std/test/std/test_sync.inko b/std/test/std/test_sync.inko index 842ac5d8..ec4c0c9a 100644 --- a/std/test/std/test_sync.inko +++ b/std/test/std/test_sync.inko @@ -77,11 +77,20 @@ fn pub tests(t: mut Tests) { } }) + t.test('Promise.set', fn (t) { + match int_future { + case (r, w) -> { + t.equal(w.set(42), Option.None) + t.equal(r.get, 42) + } + } + }) + t.test('Promise.set with a dropped Future', fn (t) { match int_future { case (r, w) -> { drop(r) - w.set(42) + t.equal(w.set(42), Option.Some(42)) } } }) @@ -117,6 +126,26 @@ fn pub tests(t: mut Tests) { t.equal(chan.receive_until(deadline), Option.None) }) + t.test('Channel.receive_until after a previous receive timed out', fn (t) { + let chan: Channel[Int] = Channel.new + + t.equal(chan.receive_until(Duration.from_millis(5)), Option.None) + chan.send(10) + chan.send(20) + t.equal(chan.receive_until(Duration.from_millis(5)), Option.Some(10)) + t.equal(chan.receive_until(Duration.from_millis(5)), Option.Some(20)) + }) + + t.test('Channel.receive after a previous receive timed out', fn (t) { + let chan: Channel[Int] = Channel.new + + t.equal(chan.receive_until(Duration.from_millis(5)), Option.None) + chan.send(10) + chan.send(20) + t.equal(chan.receive, 10) + t.equal(chan.receive, 20) + }) + t.test('Channel.clone', fn (t) { let chan1 = Channel.new let chan2 = chan1.clone