Skip to content

Commit

Permalink
Add SingleAssignmentSubject
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Aug 20, 2024
1 parent d15789b commit 7ebf09c
Show file tree
Hide file tree
Showing 2 changed files with 288 additions and 0 deletions.
176 changes: 176 additions & 0 deletions src/R3/SingleAssignmentSubject.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
namespace R3;

public sealed class SingleAssignmentSubject<T> : Observable<T>, ISubject<T>, IDisposable
{
Observer<T>? singleObserver;
Result completed;

public bool IsDisposed => singleObserver == DisposedObserver.Instance;

public void OnNext(T value)
{
var observer = singleObserver;
if (observer == CompletedObserver.Instance || observer == null)
{
// do nothing
}
else if (observer == DisposedObserver.Instance)
{
ThrowAlreadyDisposed();
}
else
{
observer.OnNext(value);
}
}

public void OnErrorResume(Exception error)
{
var observer = singleObserver;
if (observer == CompletedObserver.Instance || observer == null)
{
// do nothing
}
else if (observer == DisposedObserver.Instance)
{
ThrowAlreadyDisposed();
}
else
{
observer.OnErrorResume(error);
}
}

public void OnCompleted(Result complete)
{
while (true)
{
var observer = Volatile.Read(ref singleObserver);
if (observer == CompletedObserver.Instance)
{
// do nothing
return;
}
else if (observer == DisposedObserver.Instance)
{
ThrowAlreadyDisposed();
return;
}
else
{
this.completed = complete;
if (Interlocked.CompareExchange(ref singleObserver, CompletedObserver.Instance, observer) == observer)
{
observer?.OnCompleted(complete);
return;
}
}
}
}

protected override IDisposable SubscribeCore(Observer<T> observer)
{
var field = Interlocked.CompareExchange(ref singleObserver, observer, null);
if (field == null)
{
// ok to set.
return new Subscription(this);
}

if (field == DisposedObserver.Instance)
{
ThrowAlreadyDisposed();
}
else if (field == CompletedObserver.Instance)
{
observer.OnCompleted(completed);
}
else
{
ThrowAlreadyAssignment();
}
return Disposable.Empty;
}

public void Dispose()
{
Dispose(true);
}

public void Dispose(bool callOnCompleted)
{
var observer = Interlocked.Exchange(ref singleObserver, DisposedObserver.Instance);
if (observer != DisposedObserver.Instance && observer != null && callOnCompleted)
{
observer.OnCompleted();
}
}

static void ThrowAlreadyAssignment()
{
throw new InvalidOperationException("Observer is already assigned.");
}

void ThrowAlreadyDisposed()
{
throw new ObjectDisposedException("");
}

class Subscription(SingleAssignmentSubject<T> parent) : IDisposable
{
public void Dispose()
{
while (true)
{
var observer = Volatile.Read(ref parent.singleObserver);
if (observer == CompletedObserver.Instance || observer == DisposedObserver.Instance || observer == null)
{
// do nothing
return;
}
else
{
// reset to null(allow multiple assignment after first subscription is disposed)
if (Interlocked.CompareExchange(ref parent.singleObserver, null, observer) == observer)
{
return;
}
}
}
}
}

sealed class CompletedObserver : Observer<T>
{
public static readonly CompletedObserver Instance = new();

protected override void OnCompletedCore(Result result)
{
}

protected override void OnErrorResumeCore(Exception error)
{
}

protected override void OnNextCore(T value)
{
}
}

sealed class DisposedObserver : Observer<T>
{
public static readonly DisposedObserver Instance = new();

protected override void OnCompletedCore(Result result)
{
}

protected override void OnErrorResumeCore(Exception error)
{
}

protected override void OnNextCore(T value)
{
}
}
}
112 changes: 112 additions & 0 deletions tests/R3.Tests/SubjectTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,116 @@ public void SubscribeAfterCompleted()
l.Result.Exception!.Message.Should().Be("foo");
}
}

[Fact]
public void SingleAssignment()
{
// normal
{
var s = new SingleAssignmentSubject<int>();
using var l = s.ToLiveList();
s.OnNext(1);
s.OnNext(2);
s.OnNext(3);
l.Should().Equal(1, 2, 3);

s.OnCompleted();
l.AssertIsCompleted();
}
// subscribe twice
{
var s = new SingleAssignmentSubject<int>();
using var l = s.ToLiveList();
Assert.Throws<InvalidOperationException>(() => s.Subscribe());
}

// subject test copy

// Dispose(not yet completed)
{
var s = new SingleAssignmentSubject<int>();
using var l = s.ToLiveList();
s.OnNext(1);
s.OnNext(2);
s.OnNext(3);
s.Dispose();

l.AssertEqual([1, 2, 3]);
l.AssertIsCompleted();
s.IsDisposed.Should().BeTrue();
}

// already OnCompleted(Success), Dispose
{
var s = new SingleAssignmentSubject<int>();
using var l = s.ToLiveList();
s.OnNext(1);
s.OnNext(2);
s.OnNext(3);
s.OnCompleted();
s.Dispose();

l.AssertEqual([1, 2, 3]);
l.AssertIsCompleted();
s.IsDisposed.Should().BeTrue();
}

// already OnCompleted(Failure), Dispose
{
var s = new SingleAssignmentSubject<int>();
using var l = s.ToLiveList();
s.OnNext(1);
s.OnNext(2);
s.OnNext(3);
s.OnCompleted(new Exception("foo"));
s.Dispose();

l.AssertEqual([1, 2, 3]);
l.AssertIsCompleted();
s.IsDisposed.Should().BeTrue();
}


// already Disposed, call OnNext
{
var s = new SingleAssignmentSubject<int>();
s.Dispose();
Assert.Throws<ObjectDisposedException>(() => s.OnNext(1));
}
// already Disposed, call OnError
{
var s = new SingleAssignmentSubject<int>();
s.Dispose();
Assert.Throws<ObjectDisposedException>(() => s.OnErrorResume(new Exception()));
}
// already Disposed, call OnCompleted
{
var s = new SingleAssignmentSubject<int>();
s.Dispose();
Assert.Throws<ObjectDisposedException>(() => s.OnCompleted());
}


{
// after Success
var s = new SingleAssignmentSubject<int>();
s.OnCompleted();

using var l = s.ToLiveList();

l.AssertIsCompleted();
l.Result.IsSuccess.Should().BeTrue();
}
{
// after Failure
var s = new SingleAssignmentSubject<int>();
s.OnCompleted(new Exception("foo"));

using var l = s.ToLiveList();

l.AssertIsCompleted();
l.Result.IsFailure.Should().BeTrue();
l.Result.Exception!.Message.Should().Be("foo");
}
}
}

0 comments on commit 7ebf09c

Please sign in to comment.