Skip to content

Commit

Permalink
ToAsyncEnumerable
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 27, 2023
1 parent a6bbd53 commit 9ae13e0
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 2 deletions.
61 changes: 61 additions & 0 deletions src/R3/Operators/ToAsyncEnumerable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
using System.Threading.Channels;

namespace R3;

public static partial class ObservableExtensions
{
public static IAsyncEnumerable<T> ToAsyncEnumerable<T>(this Observable<T> source, CancellationToken cancellationToken = default)
{
var channel = Channel.CreateUnbounded<T>(new UnboundedChannelOptions
{
SingleWriter = true,
SingleReader = true,
AllowSynchronousContinuations = true
});

var observer = new ToAsyncEnumerable<T>(channel.Writer);
var disposable = source.Subscribe(observer);

if (cancellationToken.CanBeCanceled)
{
observer.registration = cancellationToken.UnsafeRegister(state =>
{
((IDisposable)state!).Dispose(); // cancel IAsyncEnumerable<T> may call from ReadAllAsync so don't care in here.
}, disposable);
}

return channel.Reader.ReadAllAsync(cancellationToken);
}
}

sealed class ToAsyncEnumerable<T>(ChannelWriter<T> writer) : Observer<T>
{
public CancellationTokenRegistration registration;

protected override void OnNextCore(T value)
{
writer.TryWrite(value);
}

protected override void OnErrorResumeCore(Exception error)
{
writer.TryComplete(error);
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure)
{
writer.TryComplete(result.Exception);
}
else
{
writer.TryComplete();
}
}

protected override void DisposeCore()
{
registration.Dispose();
}
}
3 changes: 3 additions & 0 deletions src/R3/Operators/_Operators.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@

using System.Runtime.CompilerServices;
using System.Threading.Channels;

namespace R3;

public static partial class ObservableExtensions
Expand Down
4 changes: 2 additions & 2 deletions tests/R3.Tests/FactoryTests/ToObservableTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ public async Task AsyncEnumerableToObservable()

fakeTime.Advance(TimeSpan.FromSeconds(1));

await Task.Delay(1);
await Task.Delay(100);
list.AssertEqual([1, 2]);

fakeTime.Advance(TimeSpan.FromSeconds(1));

await Task.Delay(1);
await Task.Delay(100);
list.AssertEqual([1, 2, 3]);
}

Expand Down
58 changes: 58 additions & 0 deletions tests/R3.Tests/OperatorTests/ToAsyncEnumerableTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
namespace R3.Tests.OperatorTests;

public class ToAsyncEnumerableTest
{
[Fact]
async void Test()
{
var publisher = new Subject<int>();
var cts = new CancellationTokenSource();
var e = publisher.ToAsyncEnumerable(cts.Token);

publisher.OnNext(1);
publisher.OnNext(10);
publisher.OnNext(100);
publisher.OnCompleted();

var l = new List<int>();
await foreach (var item in e)
{
l.Add(item);
}

l.Should().Equal([1, 10, 100]);
}

[Fact]
async void Cancel()
{
var publisher = new Subject<int>();
var cts = new CancellationTokenSource();

var disposed = false;
var e = publisher.Do(onDispose: () => disposed = true).ToAsyncEnumerable(cts.Token);

publisher.OnNext(1);
publisher.OnNext(10);

publisher.OnNext(100);
// publisher.OnCompleted();

var l = new List<int>();

await Assert.ThrowsAsync<TaskCanceledException>(async () =>
{
await foreach (var item in e)
{
l.Add(item);
if (item == 10)
{
cts.Cancel();
}
}
});

l.Should().Equal([1, 10, 100]);
disposed.Should().BeTrue();
}
}

0 comments on commit 9ae13e0

Please sign in to comment.