Skip to content

Commit

Permalink
SelectMany
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 26, 2023
1 parent a05145a commit edc5702
Show file tree
Hide file tree
Showing 4 changed files with 402 additions and 3 deletions.
2 changes: 1 addition & 1 deletion sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

var range = System.Reactive.Linq.Observable.Range(1, 10);


// range.SelectMany(

// range.TakeLast(

Expand Down
256 changes: 256 additions & 0 deletions src/R3/Operators/SelectMany.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
namespace R3;

public static partial class ObservableExtensions
{
public static Observable<TResult> SelectMany<TSource, TResult>(this Observable<TSource> source, Func<TSource, Observable<TResult>> selector)
{
return SelectMany(source, selector, static (sourceValue, collectionValue) => collectionValue);
}


public static Observable<TResult> SelectMany<TSource, TCollection, TResult>(this Observable<TSource> source, Func<TSource, Observable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
{
return new SelectMany<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
}

// with index

public static Observable<TResult> SelectMany<TSource, TResult>(this Observable<TSource> source, Func<TSource, int, Observable<TResult>> selector)
{
return SelectMany(source, selector, static (sourceValue, sourceIndex, collectionValue, collectionIndex) => collectionValue);
}

public static Observable<TResult> SelectMany<TSource, TCollection, TResult>(this Observable<TSource> source, Func<TSource, int, Observable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
{
return new SelectManyIndexed<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
}
}

internal sealed class SelectMany<TSource, TCollection, TResult>(Observable<TSource> source, Func<TSource, Observable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
: Observable<TResult>
{
protected override IDisposable SubscribeCore(Observer<TResult> observer)
{
return source.Subscribe(new _SelectMany(observer, collectionSelector, resultSelector));
}

sealed class _SelectMany(Observer<TResult> observer, Func<TSource, Observable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector) : Observer<TSource>
{
readonly Observer<TResult> observer = observer;
readonly Func<TSource, Observable<TCollection>> collectionSelector = collectionSelector;
readonly Func<TSource, TCollection, TResult> resultSelector = resultSelector;
readonly CompositeDisposable compositeDisposable = new();
readonly object gate = new object();
bool isStopped;

protected override bool AutoDisposeOnCompleted => false;

protected override void OnNextCore(TSource value)
{
var nextSource = collectionSelector(value);
nextSource.Subscribe(new _SelectManyCollectionObserver(value, this))
.AddTo(compositeDisposable);
}

protected override void OnErrorResumeCore(Exception error)
{
lock (gate)
{
observer.OnErrorResume(error);
}
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure)
{
PublishCompleted(result);
}
else
{
lock (gate)
{
isStopped = true;
if (compositeDisposable.Count == 0)
{
PublishCompleted(result);
}
}
}
}

protected override void DisposeCore()
{
compositeDisposable.Dispose();
}

void PublishCompleted(Result result)
{
try
{
lock (gate)
{
observer.OnCompleted(result);
}
}
finally
{
Dispose();
}
}

sealed class _SelectManyCollectionObserver(TSource sourceValue, _SelectMany parent) : Observer<TCollection>
{
protected override void OnNextCore(TCollection value)
{
var result = parent.resultSelector(sourceValue, value);
parent.observer.OnNext(result);
}

protected override void OnErrorResumeCore(Exception error)
{
parent.observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure)
{
parent.OnCompleted(result);
}
else
{
lock (parent.gate)
{
if (parent.isStopped && parent.compositeDisposable.Count == 1) // only self
{
parent.PublishCompleted(result);
}
}
}
}

protected override void DisposeCore()
{
parent.compositeDisposable.Remove(this);
}
}
}
}


internal sealed class SelectManyIndexed<TSource, TCollection, TResult>(Observable<TSource> source, Func<TSource, int, Observable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
: Observable<TResult>
{
protected override IDisposable SubscribeCore(Observer<TResult> observer)
{
return source.Subscribe(new _SelectMany(observer, collectionSelector, resultSelector));
}

sealed class _SelectMany(Observer<TResult> observer, Func<TSource, int, Observable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector) : Observer<TSource>
{
readonly Observer<TResult> observer = observer;
readonly Func<TSource, int, Observable<TCollection>> collectionSelector = collectionSelector;
readonly Func<TSource, int, TCollection, int, TResult> resultSelector = resultSelector;
readonly CompositeDisposable compositeDisposable = new();
readonly object gate = new object();
bool isStopped;
int index = 0;

protected override bool AutoDisposeOnCompleted => false;

protected override void OnNextCore(TSource value)
{
var i = index++;
var nextSource = collectionSelector(value, i);
nextSource.Subscribe(new _SelectManyCollectionObserver(value, this, i))
.AddTo(compositeDisposable);
}

protected override void OnErrorResumeCore(Exception error)
{
lock (gate)
{
observer.OnErrorResume(error);
}
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure)
{
PublishCompleted(result);
}
else
{
lock (gate)
{
isStopped = true;
if (compositeDisposable.Count == 0)
{
PublishCompleted(result);
}
}
}
}

protected override void DisposeCore()
{
compositeDisposable.Dispose();
}

void PublishCompleted(Result result)
{
try
{
lock (gate)
{
observer.OnCompleted(result);
}
}
finally
{
Dispose();
}
}

sealed class _SelectManyCollectionObserver(TSource sourceValue, _SelectMany parent, int sourceIndex) : Observer<TCollection>
{
int index = 0;

protected override void OnNextCore(TCollection value)
{
var result = parent.resultSelector(sourceValue, sourceIndex, value, index++);
parent.observer.OnNext(result);
}

protected override void OnErrorResumeCore(Exception error)
{
parent.observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure)
{
parent.OnCompleted(result);
}
else
{
lock (parent.gate)
{
if (parent.isStopped && parent.compositeDisposable.Count == 1) // only self
{
parent.PublishCompleted(result);
}
}
}
}

protected override void DisposeCore()
{
parent.compositeDisposable.Remove(this);
}
}
}
}
3 changes: 1 addition & 2 deletions src/R3/Operators/_Operators.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ public static partial class ObservableExtensions
//CombineLatest, Merge, Zip, WithLatestFrom, ZipLatest, Switch, MostRecent

// Standard Query:
// Concat, Append, Prepend, Distinct, DistinctUntilChanged, Scan, Select, SelectMany
// Concat, Append, Prepend, Distinct, DistinctUntilChanged, Scan

// SkipTake:
// Skip, SkipLast, SkipUntil, SkipWhile

// return tasks:
// All, Any, Contains, SequenceEqual, IsEmpty, MaxBy, MinBy, ToDictionary, ToLookup,

}
Loading

0 comments on commit edc5702

Please sign in to comment.