Skip to content

Commit

Permalink
re:compilable
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 14, 2023
1 parent 369b334 commit a635b3c
Show file tree
Hide file tree
Showing 30 changed files with 197 additions and 270 deletions.
9 changes: 8 additions & 1 deletion R3.sln
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,14 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ConsoleApp1", "sandbox\Cons
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{0544806B-3BB4-43CF-8277-BC612F32208D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "R3.Tests", "tests\R3.Tests\R3.Tests.csproj", "{42F7C4F7-3BB3-4DC8-8285-9DFF7E93BC4B}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "R3.Tests", "tests\R3.Tests\R3.Tests.csproj", "{42F7C4F7-3BB3-4DC8-8285-9DFF7E93BC4B}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docs", "docs", "{C7327A31-448F-4A7D-9AC6-C06F5AA03D02}"
ProjectSection(SolutionItems) = preProject
.gitignore = .gitignore
Directory.Build.props = Directory.Build.props
README.md = README.md
EndProjectSection
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down
28 changes: 5 additions & 23 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,15 @@
Third Generation of Reactive Extensions.

```csharp
public abstract class Event<TMessage>
public abstract class Event<T>
{
public IDisposable Subscribe(Subscriber<TMessage> subscriber);
public IDisposable Subscribe(Subscriber<T> subscriber);
}

public abstract class Subscriber<TMessage> : IDisposable
public abstract class Subscriber<T> : IDisposable
{
public void OnNext(TMessage message);
public void OnNext(T value);
public void OnErrorResume(Exception error);
}

// Completable
public abstract class CompletableEvent<TMessage, TComplete>
{
public IDisposable Subscribe(Subscriber<TMessage, TComplete> subscriber)
}

public abstract class Subscriber<TMessage, TComplete> : IDisposable
{
public void OnNext(TMessage message);
public void OnErrorResume(Exception error);
public void OnCompleted(TComplete complete);
public void OnCompleted(Result result);
}
```

```csharp
// similar as IObserver<T>
CompletableEvent<TMessage, Result<TComplete>>
```

4 changes: 2 additions & 2 deletions src/R3/EventSubscribeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ internal sealed class AnonymousRSubscriber<T>(Action<T> onNext, Action<Exception
[DebuggerStepThrough]
protected override void OnNextCore(T value)
{
onNext(message);
onNext(value);
}

[DebuggerStepThrough]
Expand All @@ -94,7 +94,7 @@ internal sealed class AnonymousSubscriber<T>(Action<T> onNext, Action<Exception>
[DebuggerStepThrough]
protected override void OnNextCore(T value)
{
onNext(message);
onNext(value);
}

[DebuggerStepThrough]
Expand Down
10 changes: 2 additions & 8 deletions src/R3/Factories/Return.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,8 @@ sealed class _Return(T value, Subscriber<T> subscriber) : IDisposable
static void NextTick(object? state)
{
var self = (_Return)state!;
if (self.subscriber.OnNext(self.value))
{
self.subscriber.OnCompleted();
}
else
{
self.subscriber.OnCompleted(Result.Failure);
}
self.subscriber.OnNext(self.value);
self.subscriber.OnCompleted();
}

public void Dispose()
Expand Down
36 changes: 0 additions & 36 deletions src/R3/Factories/ToCompletableEvent.cs

This file was deleted.

49 changes: 43 additions & 6 deletions src/R3/Factories/ToEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,58 @@

public static partial class Event
{
public static Event<TMessage> ToEvent<T>(this IEnumerable<T> source)
public static Event<T> ToCompletableEvent<T>(this Task<T> task)
{
return new ToEvent<T>(source);
return new TaskToEvent<T>(task);
}

public static Event<T> ToEvent<T>(this IEnumerable<T> source)
{
return new EnumerableToEvent<T>(source);
}
}

internal sealed class TaskToEvent<T>(Task<T> task) : Event<T>
{
protected override IDisposable SubscribeCore(Subscriber<T> subscriber)
{
var subscription = new CancellationDisposable();
SubscribeTask(subscriber, subscription.Token);
return subscription;
}

async void SubscribeTask(Subscriber<T> subscriber, CancellationToken cancellationToken)
{
T? result;
try
{
result = await task.WaitAsync(cancellationToken);
}
catch (Exception ex)
{
if (ex is OperationCanceledException oce && oce.CancellationToken == cancellationToken) // disposed.
{
return;
}

subscriber.OnCompleted(ex);
return;
}

subscriber.OnNext(result);
subscriber.OnCompleted();
}
}

internal class ToEvent<T>(IEnumerable<T> source) : Event<TMessage>
internal class EnumerableToEvent<T>(IEnumerable<T> source) : Event<T>
{
protected override IDisposable SubscribeCore(Subscriber<TMessage> subscriber)
protected override IDisposable SubscribeCore(Subscriber<T> subscriber)
{
foreach (var message in source)
{
subscriber.OnNext(value);
subscriber.OnNext(message);
}
subscriber.OnCompleted(default);
subscriber.OnCompleted();
return Disposable.Empty;
}
}
16 changes: 12 additions & 4 deletions src/R3/LiveList.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Collections;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.InteropServices;

namespace R3;
Expand All @@ -24,12 +23,21 @@ public sealed class LiveList<T> : IReadOnlyList<T>, IDisposable
readonly int bufferSize;

bool isCompleted;
Result? completedValue;
Result completedValue;

[MemberNotNullWhen(true, nameof(CompletedValue))]
public bool IsCompleted => isCompleted;

public Result? CompletedValue => completedValue;
public Result CompletedValue
{
get
{
lock (list)
{
if (!isCompleted) throw new InvalidOperationException("LiveList is not completed, you should check IsCompleted.");
return completedValue;
}
}
}

public LiveList(Event<T> source)
{
Expand Down
9 changes: 9 additions & 0 deletions src/R3/Operators/DoOnCompleted.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@ public static partial class EventExtensions
{
// TODO: more accurate impl
// TODO: with state


// TODO: other file.
public static Event<T> CancelOnCompleted<T>(this Event<T> source, CancellationTokenSource cancellationTokenSource)
{
return new DoOnCompleted<T>(source, _ => cancellationTokenSource.Cancel());
}


public static Event<T> DoOnCompleted<T>(this Event<T> source, Action<Result> action)
{
return new DoOnCompleted<T>(source, action);
Expand Down
35 changes: 0 additions & 35 deletions src/R3/Operators/OnErrorAsComplete.cs

This file was deleted.

35 changes: 35 additions & 0 deletions src/R3/Operators/OnErrorResumeAsFailure.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
namespace R3;

public static partial class EventExtensions
{
public static Event<T> OnErrorResumeAsFailure<T>(this Event<T> source)
{
return new OnErrorResumeAsFailure<T>(source);
}
}

internal class OnErrorResumeAsFailure<T>(Event<T> source) : Event<T>
{
protected override IDisposable SubscribeCore(Subscriber<T> subscriber)
{
return source.Subscribe(new _OnErrorAsComplete(subscriber));
}

sealed class _OnErrorAsComplete(Subscriber<T> subscriber) : Subscriber<T>
{
protected override void OnNextCore(T value)
{
subscriber.OnNext(value);
}

protected override void OnErrorResumeCore(Exception error)
{
subscriber.OnCompleted(error);
}

protected override void OnCompletedCore(Result complete)
{
subscriber.OnCompleted(complete);
}
}
}
35 changes: 10 additions & 25 deletions src/R3/Operators/Select.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,51 +6,36 @@ public static partial class EventExtensions
// TODO: CompletableEvent.Select
// TODO: Element index overload

// TODO: Select for Result


public static Event<TMessageResult> Select<TMessage, TMessageResult>(
this Event<T> source,
Func<TMessage, TMessageResult> messageSelector)
{
return new Select<TMessage, TMessageResult>(source, messageSelector, Stubs<Result>.ReturnSelf);
}

public static Event<TMessageResultResult> Select<TMessage, TMessageResultResult>(
public static Event<TResult> Select<T, TResult>(
this Event<T> source,
Func<TMessage, TMessageResult> messageSelector,
Func<ResultResult> completeSelector)
Func<T, TResult> selector)
{
return new Select<TMessage, TMessageResultResult>(source, messageSelector, completeSelector);
return new Select<T, TResult>(source, selector);
}
}

internal sealed class Select<TMessage, TMessageResultResult>(
Event<T> source,
Func<TMessage, TMessageResult> messageSelector,
Func<ResultResult> completeSelector
) : Event<TMessageResultResult>
internal sealed class Select<T, TResult>(Event<T> source, Func<T, TResult> selector) : Event<TResult>
{
protected override IDisposable SubscribeCore(Subscriber<TMessageResultResult> subscriber)
protected override IDisposable SubscribeCore(Subscriber<TResult> subscriber)
{
return source.Subscribe(new _Select(subscriber, messageSelector, completeSelector));
return source.Subscribe(new _Select(subscriber, selector));
}

class _Select(Subscriber<TMessageResultResult> subscriber, Func<TMessage, TMessageResult> messageSelector, Func<ResultResult> completeSelector) : Subscriber<T>
class _Select(Subscriber<TResult> subscriber, Func<T, TResult> selector) : Subscriber<T>
{
protected override void OnNextCore(T value)
{
subscriber.OnNext(messageSelector(message));
subscriber.OnNext(selector(value));
}

protected override void OnErrorResumeCore(Exception error)
{
subscriber.OnErrorResume(error);
}

protected override void OnCompletedCore(Result complete)
protected override void OnCompletedCore(Result result)
{
subscriber.OnCompleted(completeSelector(complete));
subscriber.OnCompleted(result);
}
}
}
Loading

0 comments on commit a635b3c

Please sign in to comment.