Skip to content

Commit

Permalink
State for Subject
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 29, 2023
1 parent bcd70e8 commit b7ef16f
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 129 deletions.
129 changes: 0 additions & 129 deletions src/R3/Internal/CompleteState.cs

This file was deleted.

2 changes: 2 additions & 0 deletions src/R3/ReactiveProperty.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ public abstract class ReadOnlyReactiveProperty<T> : Observable<T>
public abstract T CurrentValue { get; }
}

// almostly same code as Subject<T>.

// allow inherit
public class ReactiveProperty<T> : ReadOnlyReactiveProperty<T>, ISubject<T>, IDisposable
{
Expand Down
133 changes: 133 additions & 0 deletions src/R3/Subject.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,138 @@
namespace R3;

// thread-safety state for Subject.
internal struct CompleteState
{
internal enum ResultStatus
{
Done,
AlreadySuccess,
AlreadyFailed
}

const int NotCompleted = 0;
const int CompletedSuccess = 1;
const int CompletedFailure = 2;
const int Disposed = 3;

int completeState;
Exception? error;

public ResultStatus TrySetResult(Result result)
{
int field;
if (result.IsSuccess)
{
field = Interlocked.CompareExchange(ref completeState, CompletedSuccess, NotCompleted); // try set success
}
else
{
field = Interlocked.CompareExchange(ref completeState, CompletedFailure, NotCompleted); // try set failure
Volatile.Write(ref error, result.Exception); // set failure immmediately(but not locked).
}

switch (field)
{
case NotCompleted:
return ResultStatus.Done;
case CompletedSuccess:
return ResultStatus.AlreadySuccess;
case CompletedFailure:
return ResultStatus.AlreadyFailed;
case Disposed:
ThrowObjectDiposedException();
break;
}

return ResultStatus.Done; // not here.
}

public bool TrySetDisposed(out bool alreadyCompleted)
{
var field = Interlocked.Exchange(ref completeState, Disposed);
switch (field)
{
case NotCompleted:
alreadyCompleted = false;
return true;
case CompletedSuccess:
case CompletedFailure:
alreadyCompleted = true;
return true;
case Disposed:
break;
}

alreadyCompleted = false;
return false;
}

public bool IsCompleted
{
get
{
var currentState = Volatile.Read(ref completeState);
switch (currentState)
{
case NotCompleted:
return false;
case CompletedSuccess:
return true;
case CompletedFailure:
return true;
case Disposed:
ThrowObjectDiposedException();
break;
}

return false; // not here.
}
}

public bool IsDisposed => Volatile.Read(ref completeState) == Disposed;

public Result? TryGetResult()
{
var currentState = Volatile.Read(ref completeState);

switch (currentState)
{
case NotCompleted:
return null;
case CompletedSuccess:
return Result.Success;
case CompletedFailure:
return Result.Failure(GetException());
case Disposed:
ThrowObjectDiposedException();
break;
}

return null; // not here.
}

// be careful to use, this method need to call after ResultStatus.AlreadyFailed.
Exception GetException()
{
Exception? error = Volatile.Read(ref this.error);
if (error != null) return error;

var spinner = new SpinWait();
do
{
spinner.SpinOnce();
error = Volatile.Read(ref this.error);
} while (error == null);

return error;
}

static void ThrowObjectDiposedException()
{
throw new ObjectDisposedException("");
}
}

public sealed class Subject<T> : Observable<T>, ISubject<T>, IDisposable
{
FreeListCore<Subscription> list; // struct(array, int)
Expand Down

0 comments on commit b7ef16f

Please sign in to comment.