-
Notifications
You must be signed in to change notification settings - Fork 28
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
9 changed files
with
272 additions
and
45 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
# 5. Application errors | ||
|
||
Date: 2024-03-26 | ||
|
||
## Context | ||
|
||
Motivated by the Kafka integration, it's often useful to call methods on an object with guaranteed serialisation of | ||
access, just as it happens in actors, which protect their mutable state. | ||
|
||
## Decision | ||
|
||
The current implementation of actors is very simple, and allows sending any thunk to be executed given the current | ||
actor's state. This forces the internal state to be mutable. Such an approach was chosen because of its simplicity, | ||
and how well it fits the motivating Kafka use-case, but it might need revisiting once more use-cases arise. | ||
|
||
An alternative implementation would force each actor invocation to return the updated actor state, in addition to | ||
the value that should be returned to the caller (if any). However, it's not clear then how to combine this with | ||
the type-safe syntax of invoking actors (or "sending messages" to them). For each method `T.m(args ...): U` that is | ||
accessible via `ActorRef[T]`, the actor itself would need to have a `TA.ma(args ...): S => (U, S)` method, where `S` is | ||
the actor's state. The fact that the `T` and `TA` types "match" in this way could be probably verified using a macro, | ||
but would be harder to implement by users and more complex. | ||
|
||
While the idea is that the thunks passed to `ActorRef.ask` and `ActorRef.tell` should invoked a single method on the | ||
actor's interface (similar to "sending a message"), this is not actually verified. As an improvement, these methods | ||
could be changed to a macro that would verify the shape of the lambda passed to them: | ||
|
||
```scala | ||
def doAsk[T, U: Type](f: Expr[T => U], c: Expr[Sink[MethodInvocation]])(using Quotes): Expr[U] = | ||
import quotes.reflect.* | ||
'{ | ||
val cf = new CompletableFuture[U]() | ||
val onResult = (v: Any) => { val _ = cf.complete(v.asInstanceOf[U]) } | ||
val onException = (e: Throwable) => { val _ = cf.completeExceptionally(e) } | ||
$c.send(${ | ||
f.asTerm match { | ||
case Inlined(_, _, Block(List(DefDef(_, _, _, Some(Apply(Select(_, method), parameters)))), _)) => | ||
'{ MethodInvocation(${ Expr(method) }, ${ Expr.ofList(parameters.map(_.asExpr)) }, onResult, onException) } | ||
case _ => report.errorAndAbort(s"Expected a method call in the form _.someMethod(param1, param2), but got: ${f.show}") | ||
} | ||
}) | ||
cf.get() | ||
} | ||
``` | ||
|
||
Another limitation of this implementation is that it's not possible to schedule messages to self, as using the actor's | ||
`ActorRef` from within the actor's implementation can easily lead to a deadlock (always, if the invocation would be an | ||
`ask`, and with some probability if it would be a `tell` - when the actor's channel would become full). | ||
|
||
Finally, error handling might be implemented differently - so that each exception thrown by the actor's methods would | ||
be propagated to the actor's enclosing scope, and would close the actor's channel. While this is the only possibility | ||
in case of `.tell`, as otherwise the exception would go unnoticed, in case of `.ask` only fata exceptions are propagated | ||
this way. Non-fatal ones are propagated to the caller, keeping with the original assumption that using an actor should | ||
be as close as possible to calling the method directly (which would simply propagate the exception). |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
# Actors | ||
|
||
Actors in ox enable invoking methods on an object serially, keeping the behavior as close as possible to a direct | ||
invocation. That is, even though invocations may happen from multiple threads, they are guaranteed to happen one after | ||
the other, not concurrently. | ||
|
||
Actor invocations are fully type-safe, with minimal overhead. They use [channels](index.md) and | ||
[scopes](../fork-join.md) behind the scenes. | ||
|
||
One of the use-cases is integrating with external APIs, which are represented by an object containing mutable state. | ||
Such integrations must be protected and cannot be accessed by multiple threads concurrently. | ||
|
||
```eval_rst | ||
.. note:: | ||
Note that actors as described below are a very basic implementation, covering only some use cases for local | ||
concurrency. In general, actors are especially useful when working in distributedor clustered systems, or when | ||
implementing patterns such as event sourcing. For these use-cases, see the `Pekko <https://pekko.apache.org>`_ | ||
project. | ||
``` | ||
|
||
An actor can be created given any value (representing the actor's state) using `Actor.create`. This creates a fork in | ||
the current concurrency scope, and a channel (using the `StageCapacity` in scope) for scheduling invocations on the | ||
actor's logic. | ||
|
||
The result is an `ActorRef`, using which invocations can be scheduled using either the `ask` or `tell` methods. | ||
|
||
## Ask | ||
|
||
`ask` sends an invocation to the actor and awaits for a result. For example: | ||
|
||
```scala | ||
import ox.supervised | ||
import ox.channels.* | ||
|
||
class Stateful: | ||
private var counter: Int = 0 | ||
def increment(delta: Int): Int = | ||
counter += delta | ||
counter | ||
|
||
supervised { | ||
val ref = Actor.create(new Stateful) | ||
|
||
ref.ask(_.increment(5)) // blocks until the invocation completes | ||
ref.ask(_.increment(4)) // returns 9 | ||
} | ||
``` | ||
|
||
If a non-fatal exception is thrown by the invocation, it's propagated to the caller, and the actor continues processing | ||
other invocations. Fatal exceptions (e.g. interruptions) are propagated to the enclosing actor's scope, and the actor | ||
closes - trying to create another invocation will throw an exception. | ||
|
||
In this approach, actor's internal state usually has to be mutable. For a more functional style, an actor's | ||
implementation can contain a state machine with a single mutable field, containing the current state; each invocation of | ||
an actor's method can then match on the current state, and calculate the next one. | ||
|
||
## Tell | ||
|
||
It's also possible to schedule an invocation to be processed in the background using `.tell`. This method only blocks | ||
until the invocation can be sent to the actor's channel, but doesn't wait until it's processed. | ||
|
||
Note that any exceptions that occur when handling invocations scheduled using `.tell` will be propagated to the actor's | ||
enclosing scope, and will cause the actor to close. | ||
|
||
## Close | ||
|
||
When creating an actor, it's possible to specify a callback that will be called uninterruptedly before the actor closes. | ||
Such a callback can be used to release any resources held by the actor's logic. It's called when the actor closes, which | ||
includes closing of the enclosing scope: | ||
|
||
```scala | ||
import ox.supervised | ||
import ox.channels.* | ||
|
||
class Stateful: | ||
def work(howHard: Int): Unit = throw new RuntimeException("boom!") | ||
def close(): Unit = println("Closing") | ||
|
||
supervised { | ||
val ref = Actor.create(new Stateful, Some(_.close())) | ||
|
||
// fire-and-forget, exception causes the scope to close | ||
ref.tell(_.work(5)) | ||
} | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,11 @@ | ||
# Control flow methods | ||
|
||
There are some helper methods which might be useful when writing forked code: | ||
There are some helper methods which might be useful when writing code using ox's concurrency operators: | ||
|
||
* `forever { ... }` repeatedly evaluates the given code block forever | ||
* `repeatWhile { ... }` repeatedly evaluates the given code block, as long as it returns `true` | ||
* `repeatUntil { ... }` repeatedly evaluates the given code block, until it returns `true` | ||
* `uninterruptible { ... }` evaluates the given code block making sure it can't be interrupted | ||
* `never` blocks the current thread indefinitely, until it is interrupted | ||
|
||
All of these are `inline` methods. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.