Skip to content

Commit

Permalink
Amb
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 27, 2023
1 parent 935fcb4 commit a6bbd53
Show file tree
Hide file tree
Showing 10 changed files with 454 additions and 109 deletions.
2 changes: 2 additions & 0 deletions sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
// Enumerable.Empty<int>().ElementAtOrDefault(

var range = System.Reactive.Linq.Observable.Range(1, 10);


// range.Catch(
// range.Append(

Expand Down
129 changes: 111 additions & 18 deletions src/R3/Factories/Amb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,128 @@

public static partial class Observable
{
public static IObservable<T> Amb<T>(params IObservable<T>[] sources)
public static Observable<T> Amb<T>(params Observable<T>[] sources)
{
throw new NotImplementedException();
return new Amb<T>(sources);
}

public static IObservable<T> Amb<T>(IEnumerable<IObservable<T>> sources)
public static Observable<T> Amb<T>(IEnumerable<Observable<T>> sources)
{
throw new NotImplementedException();
return new Amb<T>(sources);
}
}

internal sealed class Amb<T>(IEnumerable<IObservable<T>> sources) : Observable<T>
public static partial class ObservableExtensions
{
public static Observable<T> Amb<T>(this Observable<T> source, Observable<T> second)
{
return Observable.Amb(source, second);
}
}

internal sealed class Amb<T>(IEnumerable<Observable<T>> sources) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
//new CompositeDiposableBuilder
// Disposable.CreateBuilder();
throw new NotImplementedException();
//if (sources.TryGetNonEnumeratedCount(out var count))
//{


//}
//else
//{

//}
// throw new NotImplementedException();
if (!sources.TryGetNonEnumeratedCount(out var count))
{
count = 4;
}

var amb = new _Amb(observer, count);
var index = 0;
foreach (var item in sources)
{
var d = item.Subscribe(new _AmbObserver(amb, index++));
amb.disposables.Add(d);
}
return amb;
}

sealed class _Amb : IDisposable
{
public Observer<T> observer;
public ListDisposableCore disposables;

public _AmbObserver? winner;

public _Amb(Observer<T> observer, int initialCount)
{
this.observer = observer;
this.disposables = new ListDisposableCore(initialCount, this);
}

public void Dispose()
{
disposables.Dispose();
}
}

sealed class _AmbObserver(_Amb parent, int index) : Observer<T>
{
protected override void OnNextCore(T value)
{
var field = Interlocked.CompareExchange(ref parent.winner, this, null);
if (field == null)
{
// first, dispose others.
parent.disposables.RemoveAllExceptAt(index);
parent.observer.OnNext(value);
}
else if (field == this)
{
parent.observer.OnNext(value);
}
else
{
// dispose self.
Dispose();
}
}

protected override void OnErrorResumeCore(Exception error)
{
var field = Interlocked.CompareExchange(ref parent.winner, this, null);
if (field == null)
{
// first, dispose others.
parent.disposables.RemoveAllExceptAt(index);
parent.observer.OnErrorResume(error);
}
else if (field == this)
{
parent.observer.OnErrorResume(error);
}
else
{
// dispose self.
Dispose();
}
}

protected override void OnCompletedCore(Result result)
{
var field = Interlocked.CompareExchange(ref parent.winner, this, null);
if (field == null)
{
// first, dispose others.
parent.disposables.RemoveAllExceptAt(index);
parent.observer.OnCompleted(result);
}
else if (field == this)
{
parent.observer.OnCompleted(result);
}
else
{
// dispose self.
Dispose();
}
}

protected override void DisposeCore()
{
parent.disposables.RemoveAt(index);
}
}
}
103 changes: 103 additions & 0 deletions src/R3/Factories/Concat.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
namespace R3;

public static partial class Observable
{
public static Observable<T> Concat<T>(params Observable<T>[] sources)
{
return new Concat<T>(sources);
}

public static Observable<T> Concat<T>(IEnumerable<Observable<T>> sources)
{
return new Concat<T>(sources);
}
}

public static partial class ObservableExtensions
{
public static Observable<T> Concat<T>(this Observable<T> source, Observable<T> second)
{
return new Concat<T>(new[] { source, second });
}
}

internal sealed class Concat<T>(IEnumerable<Observable<T>> sources) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return new _Concat(observer, sources).Run();
}

sealed class _Concat : IDisposable
{
public Observer<T> observer;
public IEnumerator<Observable<T>> enumerator;
public SerialDisposableCore disposable;

public _Concat(Observer<T> observer, IEnumerable<Observable<T>> sources)
{
this.observer = observer;
this.enumerator = sources.GetEnumerator();
}

public IDisposable Run()
{
if (!enumerator.MoveNext())
{
observer.OnCompleted();
enumerator.Dispose();
return Disposable.Empty;
}
else
{
disposable.Disposable = enumerator.Current.Subscribe(new _ConcatObserver(this));
return this;
}
}

public void Dispose()
{
enumerator.Dispose();
disposable.Dispose();
}
}

sealed class _ConcatObserver(_Concat parent) : Observer<T>
{
protected override void OnNextCore(T value)
{
parent.observer.OnNext(value);
}

protected override void OnErrorResumeCore(Exception error)
{
parent.observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure)
{
try
{
parent.observer.OnCompleted(result);
}
finally
{
Dispose();
}
}
else
{
if (parent.enumerator.MoveNext())
{
parent.disposable.Disposable = parent.enumerator.Current.Subscribe(new _ConcatObserver(parent));
}
else
{
parent.observer.OnCompleted();
}
}
}
}
}
81 changes: 81 additions & 0 deletions src/R3/Internal/ListDisposableCore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
namespace R3.Internal;

internal struct ListDisposableCore : IDisposable
{
IDisposable?[] disposables;
int count;
object gate;

public ListDisposableCore(int initialCount, object gate)
{
this.disposables = new IDisposable?[initialCount];
this.gate = gate;
}

public void Add(IDisposable disposable)
{
lock (gate)
{
if (disposables.Length == count)
{
Array.Resize(ref disposables, count * 2);
}

disposables[count++] = disposable;
}
}

public void RemoveAt(int index)
{
lock (gate)
{
if (index < 0 || index >= count)
{
return;
}

ref var d = ref disposables[index];
if (d != null)
{
d.Dispose();
}
d = null;
}
}

public void RemoveAllExceptAt(int index)
{
lock (gate)
{
if (index < 0 || index >= count)
{
return;
}

for (int i = 0; i < count; i++)
{
if (i == index) continue;

ref var d = ref disposables[i];
if (d != null)
{
d.Dispose();
}
d = null;
}
}
}

public void Dispose()
{
lock (gate)
{
for (int i = 0; i < count; i++)
{
disposables[i]?.Dispose();
disposables[i] = null;
count = 0;
}
}
}
}
1 change: 1 addition & 0 deletions src/R3/Internal/PooledThreadPoolWorkItem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace R3.Internal;

// TODO: remove this(maybe no use).
internal sealed class PooledThreadPoolWorkItem<T> : IThreadPoolWorkItem
{
static ConcurrentQueue<PooledThreadPoolWorkItem<T>> pool = new();
Expand Down
Loading

0 comments on commit a6bbd53

Please sign in to comment.