Concurrency
One aspect of using immutable data-types like Map, Seq, HashSet, etc. is that in most applications, at some point, you're likely to have some shared reference to one and need to mutate that shared reference. This often requires using synchronisation primitives like lock (which are not composable and are prone to error).
With a nod to the atom type in Clojure, language-ext has two types:
- Atom<A>
- Atom<M, A>
These types both wrap a value of type A and provide several variants of the Swap method (and the Prelude swap functions) for atomically mutating the wrapped value without locking.
var atom = Atom(Set("A", "B", "C"));
atom.Swap(old => old.Add("D"));
atom.Swap(old => old.Add("E"));
atom.Swap(old => old.Add("F"));
Debug.Assert(atom == Set("A", "B", "C", "D", "E", "F"));
One thing that must be noted is that if another thread mutates the atom whilst you're running Swap then your update will be rolled back. Swap will then re-run the provided lambda function with the newly updated value (from the other thread) so that it can get a valid updated value to apply. This means that you must be careful not to have side-effects in the Swap function, or at the very least it needs to be reliably repeatable.
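The retry behaviour described above can be illustrated with a compare-and-swap loop. The following is a minimal sketch only: MiniAtom is a hypothetical stand-in written for this example and is not language-ext's actual implementation. It assumes the wrapped value is an immutable reference type.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for an atom; NOT language-ext's real Atom<A>.
public sealed class MiniAtom<A> where A : class
{
    A value;

    public MiniAtom(A initial) => value = initial;

    public A Value => Volatile.Read(ref value);

    // Re-runs f until no other thread has changed the value in between.
    public A Swap(Func<A, A> f)
    {
        while (true)
        {
            var current = Volatile.Read(ref value);
            var next = f(current);

            // Only store 'next' if the field still holds 'current'.
            if (ReferenceEquals(Interlocked.CompareExchange(ref value, next, current), current))
                return next;
        }
    }
}

public static class MiniAtomDemo
{
    public static (int Total, int Attempts) Run()
    {
        var atom = new MiniAtom<Tuple<int>>(Tuple.Create(0));
        var attempts = 0;

        // 8 workers, 1000 increments each.
        Parallel.For(0, 8, _ =>
        {
            for (var i = 0; i < 1000; i++)
            {
                atom.Swap(old =>
                {
                    // Side-effect inside the lambda: counts every run, including retries.
                    Interlocked.Increment(ref attempts);
                    return Tuple.Create(old.Item1 + 1);
                });
            }
        });

        return (atom.Value.Item1, attempts);
    }
}
```

The final total is always 8000, but the attempts counter can be higher because the lambda is re-run whenever a swap loses a race. That is why side-effects inside a Swap function need to be avoided or made repeatable.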
The Atom constructor can take a Func<A, bool> validation lambda. This is run against the initial value and all subsequent values before being swapped. If the validation lambda returns false for the proposed value then false is returned from Swap and no update takes place.
The Atom types both have a Change event:
public event AtomChangedEvent<A> Change;
It is fired after each successful atomic update to the wrapped value. If you're using the LanguageExt.Rx extensions then you can also consume the atom.OnChange() observable.
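As a small sketch of subscribing to the Change event: the example below assumes that AtomChangedEvent<A> is a delegate receiving the new value as its single argument, and that handlers are invoked synchronously on the thread that performed the swap. Neither detail is stated above, so check them against the version of language-ext you are using.

```csharp
using System.Collections.Generic;
using LanguageExt;
using static LanguageExt.Prelude;

public static class ChangeEventDemo
{
    public static List<int> Run()
    {
        var seen = new List<int>();
        var atom = Atom(0);

        // Assumption: the handler receives the newly swapped-in value.
        atom.Change += value => seen.Add(value);

        atom.Swap(x => x + 1);
        atom.Swap(x => x + 1);

        // One entry per successful update, if the assumptions above hold.
        return seen;
    }
}
```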
The Atom<M, A> type, with its M generic argument, takes an additional meta-data argument on construction, which can be used to pass through an environment or some sort of context for the Swap functions:
var env = new Env();
var atom = Atom(env, Set("A", "B", "C"));
atom.Swap((nenv, old) => old.Add("D"));
There are also other variants of Swap that can take up to two additional arguments and pass them through to the lambda:
var atom = Atom(Set(1, 2, 3));
atom.Swap(4, 5, (x, y, old) => old.Add(x).Add(y));
Debug.Assert(atom == Set(1, 2, 3, 4, 5));
The wrapped value can be accessed by calling atom.Value or using the implicit operator conversion to A.
Atom is all well and good until you need to update multiple values atomically. This is an entirely different class of problem and can't rely on the built-in Interlocked atomic operations. And so language-ext provides another type called Ref<A>. Ref wraps a value of type A just like Atom<A>. But a Ref can only be updated within a transaction.
Internally language-ext uses a Software Transactional Memory (STM) with Multi-Version Concurrency Control (MVCC).
Let's start with the classic example of a bank-account where we need to atomically debit one account and credit another. Either both operations should succeed or neither should.
First the definition of the Account:
public class Account
{
    // readonly, so that the value wrapped by the Ref is immutable
    public readonly int Balance;

    Account(int balance) =>
        Balance = balance;

    public Account SetBalance(int value) =>
        new Account(value);

    public static Ref<Account> New(int balance) =>
        Ref(new Account(balance), Account.Validate);

    public static bool Validate(Account a) =>
        a.Balance >= 0;
}
Note how I've made the constructor private and then made a static method called New which forces the result to be a Ref. It also provides an optional validation function to the Ref. This will be run before any transaction is committed to ensure we have a valid state.
It isn't necessary to wrap up construction like this, but for something like a bank-account type it's clear we'd always want updates to be atomic, and so this is a really nice way of enforcing the rules of the type.
Now we can write a static class to do our balance transfer:
public static class Transfer
{
    public static Unit Do(Ref<Account> from, Ref<Account> to, int amount) =>
        dosync(() =>
        {
            from.Swap(a => a.SetBalance(a.Balance - amount));
            to.Swap(a => a.SetBalance(a.Balance + amount));
        });
}
NOTE: Swap here is a convenience function for updating the value in the Ref. We could do it manually by setting the Value property, but it's not quite as pretty:
from.Value = from.Value.SetBalance(from.Value.Balance - amount);
We can also use the swap prelude function:
swap(from, a => a.SetBalance(a.Balance - amount));
The key here is the dosync function. That wraps the transactional operation. When it starts it will take a snapshot of the Ref world. Then all operations are performed in isolation. On completion of the lambda the transaction system will commit the changes. However, if another transaction has run and modified the same values in the meantime then the whole process starts again and the lambda is re-run with the new state of the Ref world.
dosync covers the ACI of ACID:
- Atomic - means that every change to Refs made within a transaction occurs or none do
- Consistent - means that each new value can be checked with a validator function before allowing the transaction to commit
- Isolated - means that no transaction sees the effects of any other transaction while it is running.
There are two isolation modes, the default being Isolation.Snapshot:
- Isolation.Snapshot - This is the slightly weaker isolation mode. A snapshot transaction will only fail if there are write conflicts between two transactions. This is usually good enough, but can lead to the problem of:
  - Transaction 1: read ref A to get the value to write in ref B
  - Transaction 2: read ref B to get the value to write in ref A
  - These two will never have a write conflict, but the reads conflict with the writes
- Isolation.Serialisable - This is the strictest isolation mode and solves the problem mentioned above. However, you should only use this if you have the problem above, as it's more likely to encounter conflicts and have to re-run the transaction.
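To make the read/write pattern above concrete, here is a small sketch of two transactions of that shape run under the stricter mode. It assumes that dosync accepts the isolation mode as an optional second argument and that Ref can wrap an int; neither is stated above, so treat the exact signature as an assumption. The two transactions run one after the other here, so no conflict actually occurs; the example only shows how the mode might be selected.

```csharp
using LanguageExt;
using static LanguageExt.Prelude;

public static class IsolationDemo
{
    public static (int A, int B) Run()
    {
        var refA = Ref(1);
        var refB = Ref(2);

        // Transaction 1: read ref A to get the value to write in ref B.
        // Assumption: the isolation mode is passed as the second argument.
        dosync(() => { refB.Value = refA.Value + 10; return unit; }, Isolation.Serialisable);

        // Transaction 2: read ref B to get the value to write in ref A.
        dosync(() => { refA.Value = refB.Value + 10; return unit; }, Isolation.Serialisable);

        // Consistent read of both refs in one (default, snapshot) transaction.
        return dosync(() => (refA.Value, refB.Value));
    }
}
```

If these two transactions ran concurrently under Isolation.Snapshot, both could commit even though each read a value the other was overwriting. Under Isolation.Serialisable one of them would be re-run instead.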
This is a very good video on Multi-Version Concurrency Control and the two isolation modes listed above
Sometimes you're not looking to modify a value, but you want a consistent snapshot of multiple items. One thing to note is:
- Ref reads never block other readers or writers
- Ref writes never block other readers or writers
And so we can very quickly get a snapshot of a number of Ref values. If we have a couple of Ref<Account> values:
Ref<Account> accountA = Account.New(100);
Ref<Account> accountB = Account.New(400);
var (balanceA, balanceB) = dosync(() => (accountA.Value, accountB.Value));
This will always give you a consistent read. You can read from the Value property outside of a dosync; it will just return the latest version. But reading the Value properties from multiple Ref values outside of a dosync could lead to an inconsistent view.
It is also possible to set the Value property with yourRef.Value = ..., but it must be done within a dosync, otherwise an exception will be thrown.
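Putting the pieces together, the following sketch performs a transfer and then takes a consistent snapshot of both accounts. It repeats the Account type from earlier so that it compiles on its own; TransferDemo and its Run method are names made up for this example, and each transaction lambda returns unit explicitly so that it does not depend on which dosync overloads are available.

```csharp
using LanguageExt;
using static LanguageExt.Prelude;

public class Account
{
    public readonly int Balance;

    Account(int balance) =>
        Balance = balance;

    public Account SetBalance(int value) =>
        new Account(value);

    public static Ref<Account> New(int balance) =>
        Ref(new Account(balance), Account.Validate);

    public static bool Validate(Account a) =>
        a.Balance >= 0;
}

public static class TransferDemo
{
    public static (int A, int B) Run()
    {
        Ref<Account> accountA = Account.New(100);
        Ref<Account> accountB = Account.New(400);

        // Debit and credit inside one transaction: both apply or neither does.
        dosync(() =>
        {
            accountA.Swap(acc => acc.SetBalance(acc.Balance - 50));
            accountB.Swap(acc => acc.SetBalance(acc.Balance + 50));
            return unit;
        });

        // Read both refs in one transaction for a consistent view.
        var (snapA, snapB) = dosync(() => (accountA.Value, accountB.Value));
        return (snapA.Balance, snapB.Balance);
    }
}
```

Because both reads happen inside a single dosync, the two balances always come from the same snapshot, so their sum stays at 500 even if other transfers are running at the same time.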
BIG BIG NOTE: The types that Ref wraps should be immutable. Mutable objects can be modified outside of the STM system, breaking this whole concept. If you do this, you're on your own!
I hope the Atom and Ref types find some use. I know I've bumped up against this issue many times in the past and have either ended up manually building synchronisation primitives or fallen back to using the ugly ConcurrentDictionary or similar.