Skip to content

Commit

Permalink
ElementAtAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 14, 2023
1 parent 6096017 commit 8a6ccf0
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 129 deletions.
52 changes: 11 additions & 41 deletions sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,52 +26,21 @@
EventSystem.DefaultFrameProvider = new ThreadSleepFrameProvider(60);


//var t = new Thread(() =>
//{
// while (true)
// {
// Console.WriteLine("loop"); Thread.Sleep(60);
// }
//});
//t.IsBackground = true;
//t.Start();
// Enumerable.Empty<int>().ElementAtOrDefault(

//var s = new NewThreadScheduler(_ => new Thread(() => { while (true) { Console.WriteLine("loop"); Thread.Sleep(60); } }));
var i = Enumerable.Range(4, 10).ElementAtOrDefault(^0);
Console.WriteLine(i);

//s.Schedule(() => Console.WriteLine("do once"));
//using var f = new ThreadSleepFrameProvider(60);

var source = Event.EveryUpdate(ct.Token);



source.DoOnDisposed(() => { Console.WriteLine("DISPOSED"); }).WriteLine();

SubscriptionTracker.ForEachActiveTask(x =>
{
Console.WriteLine(x);
});



Console.WriteLine("BeforeId:" + Thread.CurrentThread.ManagedThreadId);

await source.WaitAsync();
Console.WriteLine("Press Key to done.");


await Task.Yield();

Console.ReadLine();


SubscriptionTracker.ForEachActiveTask(x =>
IEnumerable<int> Range(int count)
{
Console.WriteLine(x);
});

Console.WriteLine("----------------");
Console.WriteLine("AfterId:" + Thread.CurrentThread.ManagedThreadId);
for (int i = 0; i < count; i++)
{
Console.WriteLine(i);
yield return i;
}
}


public static class Extensions
Expand All @@ -92,3 +61,4 @@ public void Dispose()
CalledCount += 1;
}
}

3 changes: 2 additions & 1 deletion src/R3/Factories/Range.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ protected override IDisposable SubscribeCore(Subscriber<int> subscriber)
{
if (cancellationToken.IsCancellationRequested)
{
subscriber.OnCompleted();
return Disposable.Empty;
}
subscriber.OnNext(start + i);
}
subscriber.OnCompleted(default);
subscriber.OnCompleted();
return Disposable.Empty;
}
}
1 change: 1 addition & 0 deletions src/R3/Factories/Repeat.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ protected override IDisposable SubscribeCore(Subscriber<T> subscriber)
{
if (cancellationToken.IsCancellationRequested)
{
subscriber.OnCompleted();
return Disposable.Empty;
}
subscriber.OnNext(value);
Expand Down
134 changes: 134 additions & 0 deletions src/R3/Operators/ElementAtAsync.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
namespace R3;

public static partial class EventExtensions
{
public static Task<T> ElementAtAsync<T>(this Event<T> source, int index, CancellationToken cancellationToken = default)
{
if (index < 0) throw new ArgumentOutOfRangeException("index");

var subscriber = new ElementAtAsync<T>(index, false, default, cancellationToken);
source.Subscribe(subscriber);
return subscriber.Task;
}

public static Task<T> ElementAtAsync<T>(this Event<T> source, Index index, CancellationToken cancellationToken = default)
{
if (index.IsFromEnd)
{
if (index.Value <= 0) throw new ArgumentOutOfRangeException("index");
var subscriber = new ElementAtFromEndAsync<T>(index.Value, false, default, cancellationToken);
source.Subscribe(subscriber);
return subscriber.Task;
}
else
{
return ElementAtAsync(source, index.Value, cancellationToken);
}
}

public static Task<T> ElementAtOrDefaultAsync<T>(this Event<T> source, int index, T? defaultValue = default, CancellationToken cancellationToken = default)
{
if (index < 0) throw new ArgumentOutOfRangeException("index");
var subscriber = new ElementAtAsync<T>(index, true, defaultValue, cancellationToken);
source.Subscribe(subscriber);
return subscriber.Task;
}

public static Task<T> ElementAtOrDefaultAsync<T>(this Event<T> source, Index index, T? defaultValue = default, CancellationToken cancellationToken = default)
{
if (index.IsFromEnd)
{
var subscriber = new ElementAtFromEndAsync<T>(index.Value, true, defaultValue, cancellationToken);
source.Subscribe(subscriber);
return subscriber.Task;
}
else
{
return ElementAtOrDefaultAsync(source, index.Value, defaultValue, cancellationToken);
}
}
}

internal sealed class ElementAtAsync<T>(int index, bool useDefaultValue, T? defaultValue, CancellationToken cancellationToken)
: TaskSubscriberBase<T, T>(cancellationToken)
{
int count = 0;

protected override void OnNextCore(T value)
{
if (count++ == index)
{
TrySetResult(value);
}
}

protected override void OnErrorResumeCore(Exception error)
{
TrySetException(error);
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure)
{
TrySetException(result.Exception);
}
else
{
if (useDefaultValue)
{
TrySetResult(defaultValue!);
}
else
{
TrySetException(new ArgumentOutOfRangeException("index"));
}
}
}
}

// Index.IsFromEnd
internal sealed class ElementAtFromEndAsync<T>(int fromEndIndex, bool useDefaultValue, T? defaultValue, CancellationToken cancellationToken)
: TaskSubscriberBase<T, T>(cancellationToken)
{
Queue<T> queue = new Queue<T>(fromEndIndex);

protected override void OnNextCore(T value)
{
if (queue.Count == fromEndIndex && queue.Count != 0)
{
queue.Dequeue();
}

queue.Enqueue(value);
}

protected override void OnErrorResumeCore(Exception error)
{
TrySetException(error);
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure)
{
TrySetException(result.Exception);
return;
}

if (queue.Count == fromEndIndex)
{
var value = queue.Dequeue();
TrySetResult(value);
return;
}

if (useDefaultValue)
{
TrySetResult(defaultValue!);
return;
}

TrySetException(new ArgumentOutOfRangeException("index"));
}
}
87 changes: 0 additions & 87 deletions src/R3/Operators/_Operators.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,90 +22,3 @@ public static partial class EventExtensions
// return tasks:
// All, Any, Contains, SequenceEqual, ElementAt, ElementAtOrDefault, IsEmpty, MaxBy, MinBy, ToDictionary, ToLookup, ForEachAsync
}

// TODO: now working



internal sealed class ElementAtAsync<T>(int index, bool useDefaultValue, T? defaultValue, CancellationToken cancellationToken)
: TaskSubscriberBase<T, T>(cancellationToken)
{
int count = 0;
bool hasValue;

protected override void OnNextCore(T value)
{
hasValue = true;
if (count++ == index)
{
TrySetResult(value);
}
}

protected override void OnErrorResumeCore(Exception error)
{
TrySetException(error);
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure)
{
// TODO:...
}
else
{
// TODO:...
}
}
}

// Index.IsFromEnd
internal sealed class ElementAtFromEndAsync<T>(int fromEndIndex, bool useDefaultValue, T? defaultValue, CancellationToken cancellationToken)
: TaskSubscriberBase<T, T>(cancellationToken)
{
bool hasValue;

Queue<T> queue = new Queue<T>(fromEndIndex);

protected override void OnNextCore(T value)
{
hasValue = true;
if (queue.Count == fromEndIndex)
{
queue.Dequeue();
}

queue.Enqueue(value);
}

protected override void OnErrorResumeCore(Exception error)
{
TrySetException(error);
}

protected override void OnCompletedCore(Result result)
{
if (queue.Count == fromEndIndex)
{
var value = queue.Dequeue();
TrySetResult(value);
return;
}

if (useDefaultValue)
{
TrySetResult(defaultValue!);
return;
}

if (!hasValue)
{
TrySetException(new InvalidOperationException("Sequence contains no elements"));
}
else
{
TrySetException(new ArgumentOutOfRangeException("index"));
}
}
}
Loading

0 comments on commit 8a6ccf0

Please sign in to comment.