Skip to content

Commit

Permalink
Implement simple type-safe direct-style actors for local concurrency (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw authored Mar 26, 2024
1 parent 072f61c commit 1d5ef36
Show file tree
Hide file tree
Showing 11 changed files with 366 additions and 85 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ select(c.sendClause(10), d.receiveClause)

More examples [in the docs!](https://ox.softwaremill.com).

## Other projects

The wider goal of direct-style Scala is enabling teams to deliver working software quickly and with confidence. Our
other projects, including [sttp client](https://sttp.softwaremill.com) and [tapir](https://tapir.softwaremill.com),
also include integrations directly tailored towards direct-style.

## Contributing

All suggestions welcome :)
Expand Down
66 changes: 66 additions & 0 deletions core/src/main/scala/ox/channels/actor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package ox.channels

import ox.*

import java.util.concurrent.CompletableFuture
import scala.util.control.NonFatal

object Actor:
/** Creates a new actor, that is a fork in the current concurrency scope, which protects a mutable resource (`logic`) and executes
* invocations on it serially, one after another. It is guaranteed that `logic` will be accessed by at most one thread at a time. The
* methods of `logic: T` define the actor's interface (the messages that can be "sent to the actor").
*
* Invocations can be scheduled using the returned `ActorRef`. When an invocation is an [[ActorRef.ask]], any non-fatal exceptions are
* propagated to the caller, and the actor continues. Fatal exceptions, or exceptions that occur during [[ActorRef.tell]] invocations,
* cause the actor's channel to be closed with an error, and are propagated to the enclosing scope.
*
* The actor's mailbox (incoming channel) will have a capacity as specified by the [[StageCapacity]] in scope.
*/
def create[T](logic: T, close: Option[T => Unit] = None)(using ox: Ox, sc: StageCapacity): ActorRef[T] =
val c = StageCapacity.newChannel[T => Unit]
val ref = ActorRef(c)
fork {
try
forever {
val m = c.receive()
try m(logic)
catch
case t: Throwable =>
c.error(t)
throw t
}
finally close.foreach(c => uninterruptible(c(logic)))
}
ref

class ActorRef[T](c: Sink[T => Unit]):
/** Send an invocation to the actor and await for the result.
*
* The `f` function should be an invocation of a method on `T` and should not directly or indirectly return the `T` value, as this might
* expose the actor's internal mutable state to other threads.
*
* Any non-fatal exceptions thrown by `f` will be propagated to the caller and the actor will continue processing other invocations.
* Fatal exceptions will be propagated to the actor's enclosing scope, and the actor will close.
*/
def ask[U](f: T => U): U =
val cf = new CompletableFuture[U]()
c.send { t =>
try
val _ = cf.complete(f(t))
catch
case NonFatal(e) =>
// since this is an ask, only propagating the exception to the caller, not to the scope
val _ = cf.completeExceptionally(e)
case t: Throwable =>
// fatal exceptions are propagated to the scope (e.g. InterruptedException)
val _ = cf.completeExceptionally(t)
throw t
}
unwrapExecutionException(cf.get())

/** Send an invocation to the actor that should be processed in the background (fire-and-forget). Might block until there's enough space
* in the actor's mailbox (incoming channel).
*
* Any exceptions thrown by `f` will be propagated to the actor's enclosing scope, and the actor will close.
*/
def tell(f: T => Unit): Unit = c.send(f)
88 changes: 88 additions & 0 deletions core/src/test/scala/ox/channels/ActorTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package ox.channels

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

import java.util.concurrent.atomic.AtomicBoolean

class ActorTest extends AnyFlatSpec with Matchers:

trait Test1 {
def f(x: Int): Long
}

it should "invoke methods on the actor" in supervised {
var state = 0L
val logic = new Test1 {
override def f(x: Int): Long =
state += x
state
}

val ref = Actor.create(logic)

ref.ask(_.f(10)) shouldBe 10
ref.ask(_.f(20)) shouldBe 30
}

it should "protect the internal state of the actor" in supervised {
var state = 0L
val logic = new Test1 {
override def f(x: Int): Long =
state += x
state
}

val ref = Actor.create(logic)

val outer = 1000
val inner = 1000

val forks = for (i <- 1 to outer) yield fork {
for (j <- 1 to inner) {
ref.ask(_.f(1))
}
}
forks.foreach(_.join())

ref.ask(_.f(0)) shouldBe outer.toLong * inner
}

it should "run the close callback before re-throwing the exception" in {
val isClosed = new AtomicBoolean(false)
val thrown = the[RuntimeException] thrownBy {
supervised {
var state = 0L
val logic = new Test1 {
override def f(x: Int): Long =
state += x
if state > 2 then throw new RuntimeException("too much")
state
}

val ref = Actor.create(logic, Some(_ => isClosed.set(true)))

ref.ask(_.f(5))
}
}

thrown.getMessage shouldBe "too much"
isClosed.get() shouldBe true
}

it should "end the scope when an exception is thrown when handling .tell" in {
val thrown = the[RuntimeException] thrownBy {
supervised {
val logic = new Test1 {
override def f(x: Int): Long = throw new RuntimeException("boom")
}

val ref = Actor.create(logic)
ref.tell(_.f(5))
Thread.sleep(1000)
}
}

thrown.getMessage shouldBe "boom"
}
53 changes: 53 additions & 0 deletions doc/adr/0006-actors.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# 5. Application errors

Date: 2024-03-26

## Context

Motivated by the Kafka integration, it's often useful to call methods on an object with guaranteed serialisation of
access, just as it happens in actors, which protect their mutable state.

## Decision

The current implementation of actors is very simple, and allows sending any thunk to be executed given the current
actor's state. This forces the internal state to be mutable. Such an approach was chosen because of its simplicity,
and how well it fits the motivating Kafka use-case, but it might need revisiting once more use-cases arise.

An alternative implementation would force each actor invocation to return the updated actor state, in addition to
the value that should be returned to the caller (if any). However, it's not clear then how to combine this with
the type-safe syntax of invoking actors (or "sending messages" to them). For each method `T.m(args ...): U` that is
accessible via `ActorRef[T]`, the actor itself would need to have a `TA.ma(args ...): S => (U, S)` method, where `S` is
the actor's state. The fact that the `T` and `TA` types "match" in this way could be probably verified using a macro,
but would be harder to implement by users and more complex.

While the idea is that the thunks passed to `ActorRef.ask` and `ActorRef.tell` should invoked a single method on the
actor's interface (similar to "sending a message"), this is not actually verified. As an improvement, these methods
could be changed to a macro that would verify the shape of the lambda passed to them:

```scala
def doAsk[T, U: Type](f: Expr[T => U], c: Expr[Sink[MethodInvocation]])(using Quotes): Expr[U] =
import quotes.reflect.*
'{
val cf = new CompletableFuture[U]()
val onResult = (v: Any) => { val _ = cf.complete(v.asInstanceOf[U]) }
val onException = (e: Throwable) => { val _ = cf.completeExceptionally(e) }
$c.send(${
f.asTerm match {
case Inlined(_, _, Block(List(DefDef(_, _, _, Some(Apply(Select(_, method), parameters)))), _)) =>
'{ MethodInvocation(${ Expr(method) }, ${ Expr.ofList(parameters.map(_.asExpr)) }, onResult, onException) }
case _ => report.errorAndAbort(s"Expected a method call in the form _.someMethod(param1, param2), but got: ${f.show}")
}
})
cf.get()
}
```

Another limitation of this implementation is that it's not possible to schedule messages to self, as using the actor's
`ActorRef` from within the actor's implementation can easily lead to a deadlock (always, if the invocation would be an
`ask`, and with some probability if it would be a `tell` - when the actor's channel would become full).

Finally, error handling might be implemented differently - so that each exception thrown by the actor's methods would
be propagated to the actor's enclosing scope, and would close the actor's channel. While this is the only possibility
in case of `.tell`, as otherwise the exception would go unnoticed, in case of `.ask` only fata exceptions are propagated
this way. Non-fatal ones are propagated to the caller, keeping with the original assumption that using an actor should
be as close as possible to calling the method directly (which would simply propagate the exception).
86 changes: 86 additions & 0 deletions doc/channels/actors.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Actors

Actors in ox enable invoking methods on an object serially, keeping the behavior as close as possible to a direct
invocation. That is, even though invocations may happen from multiple threads, they are guaranteed to happen one after
the other, not concurrently.

Actor invocations are fully type-safe, with minimal overhead. They use [channels](index.md) and
[scopes](../fork-join.md) behind the scenes.

One of the use-cases is integrating with external APIs, which are represented by an object containing mutable state.
Such integrations must be protected and cannot be accessed by multiple threads concurrently.

```eval_rst
.. note::
Note that actors as described below are a very basic implementation, covering only some use cases for local
concurrency. In general, actors are especially useful when working in distributedor clustered systems, or when
implementing patterns such as event sourcing. For these use-cases, see the `Pekko <https://pekko.apache.org>`_
project.
```

An actor can be created given any value (representing the actor's state) using `Actor.create`. This creates a fork in
the current concurrency scope, and a channel (using the `StageCapacity` in scope) for scheduling invocations on the
actor's logic.

The result is an `ActorRef`, using which invocations can be scheduled using either the `ask` or `tell` methods.

## Ask

`ask` sends an invocation to the actor and awaits for a result. For example:

```scala mdoc:compile-only
import ox.supervised
import ox.channels.*

class Stateful:
private var counter: Int = 0
def increment(delta: Int): Int =
counter += delta
counter

supervised {
val ref = Actor.create(new Stateful)

ref.ask(_.increment(5)) // blocks until the invocation completes
ref.ask(_.increment(4)) // returns 9
}
```

If a non-fatal exception is thrown by the invocation, it's propagated to the caller, and the actor continues processing
other invocations. Fatal exceptions (e.g. interruptions) are propagated to the enclosing actor's scope, and the actor
closes - trying to create another invocation will throw an exception.

In this approach, actor's internal state usually has to be mutable. For a more functional style, an actor's
implementation can contain a state machine with a single mutable field, containing the current state; each invocation of
an actor's method can then match on the current state, and calculate the next one.

## Tell

It's also possible to schedule an invocation to be processed in the background using `.tell`. This method only blocks
until the invocation can be sent to the actor's channel, but doesn't wait until it's processed.

Note that any exceptions that occur when handling invocations scheduled using `.tell` will be propagated to the actor's
enclosing scope, and will cause the actor to close.

## Close

When creating an actor, it's possible to specify a callback that will be called uninterruptedly before the actor closes.
Such a callback can be used to release any resources held by the actor's logic. It's called when the actor closes, which
includes closing of the enclosing scope:

```scala mdoc:compile-only
import ox.supervised
import ox.channels.*

class Stateful:
def work(howHard: Int): Unit = throw new RuntimeException("boom!")
def close(): Unit = println("Closing")

supervised {
val ref = Actor.create(new Stateful, Some(_.close()))

// fire-and-forget, exception causes the scope to close
ref.tell(_.work(5))
}
```
11 changes: 11 additions & 0 deletions doc/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ Development and maintenance of ox is sponsored by [SoftwareMill](https://softwar

[![](https://files.softwaremill.com/logo/logo.png "SoftwareMill")](https://softwaremill.com)

## Other projects

The wider goal of direct-style Scala is enabling teams to deliver working software quickly and with confidence. Our
other projects, including [sttp client](https://sttp.softwaremill.com) and [tapir](https://tapir.softwaremill.com),
also include integrations directly tailored towards direct-style.

## Commercial Support

We offer commercial support for ox and related technologies, as well as development services. [Contact us](https://softwaremill.com/contact/) to learn more about our offer!
Expand All @@ -48,6 +54,10 @@ We offer commercial support for ox and related technologies, as well as developm

* [Prototype Loom-based concurrency API for Scala](https://softwaremill.com/prototype-loom-based-concurrency-api-for-scala/)
* [Go-like channels using project Loom and Scala](https://softwaremill.com/go-like-channels-using-project-loom-and-scala/)
* [Two types of futures](https://softwaremill.com/two-types-of-futures/)
* [Supervision, Kafka and Java 21: what’s new in Ox](https://softwaremill.com/supervision-kafka-and-java-21-whats-new-in-ox/)
* [Designing a (yet another) retry API](https://softwaremill.com/designing-a-yet-another-retry-api/)
* [Handling errors in direct-style Scala](https://softwaremill.com/handling-errors-in-direct-style-scala/)

## Inspiration & building blocks

Expand Down Expand Up @@ -93,6 +103,7 @@ We offer commercial support for ox and related technologies, as well as developm
channels/select
channels/errors
channels/backpressure
channels/actors
.. toctree::
:maxdepth: 2
Expand Down
Loading

0 comments on commit 1d5ef36

Please sign in to comment.