Skip to content

Commit

Permalink
many compile errors still
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 14, 2023
1 parent 624b4ad commit 369b334
Show file tree
Hide file tree
Showing 48 changed files with 645 additions and 1,019 deletions.
4 changes: 2 additions & 2 deletions sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@

public static class Extensions
{
public static IDisposable WriteLine<T, U>(this Event<T, U> source)
public static IDisposable WriteLine<T>(this Event<T> source)
{
return source.Subscribe(x => Console.WriteLine(x), _ => Console.WriteLine("COMPLETED"));
return source.Subscribe(x => Console.WriteLine(x), x => Console.WriteLine(x));
}
}

Expand Down
23 changes: 11 additions & 12 deletions src/R3/Event.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

namespace R3;

public abstract class Event<TMessage, TComplete>
public abstract class Event<T>
{
[StackTraceHidden, DebuggerStepThrough]
public IDisposable Subscribe(Subscriber<TMessage, TComplete> subscriber)
public IDisposable Subscribe(Subscriber<T> subscriber)
{
try
{
Expand All @@ -28,14 +28,13 @@ public IDisposable Subscribe(Subscriber<TMessage, TComplete> subscriber)
}
}

protected abstract IDisposable SubscribeCore(Subscriber<TMessage, TComplete> subscriber);
protected abstract IDisposable SubscribeCore(Subscriber<T> subscriber);
}

// similar as IObserver<T>
public abstract class Subscriber<TMessage, TComplete> : IDisposable
public abstract class Subscriber<T> : IDisposable
{
#if DEBUG
[Obsolete("Only allow in CompletableEvent<TMessage>.")]
[Obsolete("Only allow in Event<T>.")]
#endif
internal SingleAssignmentDisposableCore SourceSubscription;

Expand All @@ -46,21 +45,21 @@ public abstract class Subscriber<TMessage, TComplete> : IDisposable
bool IsCalledCompleted => Volatile.Read(ref calledOnCompleted) != 0;

[StackTraceHidden, DebuggerStepThrough]
public void OnNext(TMessage message)
public void OnNext(T value)
{
if (IsDisposed || IsCalledCompleted) return;

try
{
OnNextCore(message);
OnNextCore(value);
}
catch (Exception ex)
{
OnErrorResume(ex);
}
}

protected abstract void OnNextCore(TMessage message);
protected abstract void OnNextCore(T value);

[StackTraceHidden, DebuggerStepThrough]
public void OnErrorResume(Exception error)
Expand All @@ -80,7 +79,7 @@ public void OnErrorResume(Exception error)
protected abstract void OnErrorResumeCore(Exception error);

[StackTraceHidden, DebuggerStepThrough]
public void OnCompleted(TComplete complete)
public void OnCompleted(Result result)
{
if (Interlocked.Exchange(ref calledOnCompleted, 1) != 0)
{
Expand All @@ -90,7 +89,7 @@ public void OnCompleted(TComplete complete)

try
{
OnCompletedCore(complete);
OnCompletedCore(result);
}
catch (Exception ex)
{
Expand All @@ -102,7 +101,7 @@ public void OnCompleted(TComplete complete)
}
}

protected abstract void OnCompletedCore(TComplete complete);
protected abstract void OnCompletedCore(Result result);

[StackTraceHidden, DebuggerStepThrough]
public void Dispose()
Expand Down
88 changes: 26 additions & 62 deletions src/R3/EventSubscribeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,53 +7,42 @@ public static class EventSubscribeExtensions
// TODO: with State

[DebuggerStepThrough]
public static IDisposable Subscribe<TMessage, TComplete>(this Event<TMessage, TComplete> source)
public static IDisposable Subscribe<T>(this Event<T> source)
{
return source.Subscribe(NopSubscriber<TMessage, TComplete>.Instance);
return source.Subscribe(NopSubscriber<T>.Instance);
}

[DebuggerStepThrough]
public static IDisposable Subscribe<TMessage, TComplete>(this Event<TMessage, Result<TComplete>> source)
{
return source.Subscribe(NopRSubscriber<TMessage, TComplete>.Instance);
}

[DebuggerStepThrough]
public static IDisposable Subscribe<TMessage, TComplete>(this Event<TMessage, TComplete> source, Action<TMessage> onNext)
public static IDisposable Subscribe<T>(this Event<T> source, Action<T> onNext)
{
return source.Subscribe(new AnonymousSubscriber<TMessage, TComplete>(onNext, EventSystem.GetUnhandledExceptionHandler(), Stubs<TComplete>.Nop));
return source.Subscribe(new AnonymousSubscriber<T>(onNext, EventSystem.GetUnhandledExceptionHandler(), Stubs.HandleError));
}

[DebuggerStepThrough]
public static IDisposable Subscribe<TMessage, TComplete>(this Event<TMessage, Result<TComplete>> source, Action<TMessage> onNext)
public static IDisposable Subscribe<T>(this Event<T> source, Action<T> onNext, Action<Result> onComplete)
{
return source.Subscribe(new AnonymousRSubscriber<TMessage, TComplete>(onNext, EventSystem.GetUnhandledExceptionHandler()));
return source.Subscribe(new AnonymousSubscriber<T>(onNext, EventSystem.GetUnhandledExceptionHandler(), onComplete));
}

[DebuggerStepThrough]
public static IDisposable Subscribe<TMessage, TComplete>(this Event<TMessage, TComplete> source, Action<TMessage> onNext, Action<TComplete> onComplete)
public static IDisposable Subscribe<T>(this Event<T> source, Action<T> onNext, Action<Exception> onErrorResume, Action<Result> onComplete)
{
return source.Subscribe(new AnonymousSubscriber<TMessage, TComplete>(onNext, EventSystem.GetUnhandledExceptionHandler(), onComplete));
}

[DebuggerStepThrough]
public static IDisposable Subscribe<TMessage, TComplete>(this Event<TMessage, TComplete> source, Action<TMessage> onNext, Action<Exception> onErrorResume, Action<TComplete> onComplete)
{
return source.Subscribe(new AnonymousSubscriber<TMessage, TComplete>(onNext, onErrorResume, onComplete));
return source.Subscribe(new AnonymousSubscriber<T>(onNext, onErrorResume, onComplete));
}
}

[DebuggerStepThrough]
internal sealed class NopSubscriber<TMessage, TComplete> : Subscriber<TMessage, TComplete>
internal sealed class NopSubscriber<T> : Subscriber<T>
{
public static readonly NopSubscriber<TMessage, TComplete> Instance = new();
public static readonly NopSubscriber<T> Instance = new();

private NopSubscriber()
{
}

[DebuggerStepThrough]
protected override void OnNextCore(TMessage message)
protected override void OnNextCore(T value)
{
}

Expand All @@ -64,46 +53,20 @@ protected override void OnErrorResumeCore(Exception error)
}

[DebuggerStepThrough]
protected override void OnCompletedCore(TComplete complete)
{
}
}

[DebuggerStepThrough]
internal sealed class NopRSubscriber<TMessage, TComplete> : Subscriber<TMessage, Result<TComplete>>
{
public static readonly NopRSubscriber<TMessage, TComplete> Instance = new();

private NopRSubscriber()
{
}

[DebuggerStepThrough]
protected override void OnNextCore(TMessage message)
protected override void OnCompletedCore(Result result)
{
}

[DebuggerStepThrough]
protected override void OnErrorResumeCore(Exception error)
{
EventSystem.GetUnhandledExceptionHandler().Invoke(error);
}

[DebuggerStepThrough]
protected override void OnCompletedCore(Result<TComplete> complete)
{
if (complete.IsFailure)
if (result.IsFailure)
{
EventSystem.GetUnhandledExceptionHandler().Invoke(complete.Exception);
EventSystem.GetUnhandledExceptionHandler().Invoke(result.Exception);
}
}
}

[DebuggerStepThrough]
internal sealed class AnonymousSubscriber<TMessage, TComplete>(Action<TMessage> onNext, Action<Exception> onErrorResume, Action<TComplete> onComplete) : Subscriber<TMessage, TComplete>
internal sealed class AnonymousRSubscriber<T>(Action<T> onNext, Action<Exception> onErrorResume) : Subscriber<T>
{
[DebuggerStepThrough]
protected override void OnNextCore(TMessage message)
protected override void OnNextCore(T value)
{
onNext(message);
}
Expand All @@ -115,17 +78,21 @@ protected override void OnErrorResumeCore(Exception error)
}

[DebuggerStepThrough]
protected override void OnCompletedCore(TComplete complete)
protected override void OnCompletedCore(Result result)
{
onComplete(complete);
if (result.IsFailure)
{
EventSystem.GetUnhandledExceptionHandler().Invoke(result.Exception);
}
}
}


[DebuggerStepThrough]
internal sealed class AnonymousRSubscriber<TMessage, TComplete>(Action<TMessage> onNext, Action<Exception> onErrorResume) : Subscriber<TMessage, Result<TComplete>>
internal sealed class AnonymousSubscriber<T>(Action<T> onNext, Action<Exception> onErrorResume, Action<Result> onComplete) : Subscriber<T>
{
[DebuggerStepThrough]
protected override void OnNextCore(TMessage message)
protected override void OnNextCore(T value)
{
onNext(message);
}
Expand All @@ -137,11 +104,8 @@ protected override void OnErrorResumeCore(Exception error)
}

[DebuggerStepThrough]
protected override void OnCompletedCore(Result<TComplete> complete)
protected override void OnCompletedCore(Result complete)
{
if (complete.IsFailure)
{
EventSystem.GetUnhandledExceptionHandler().Invoke(complete.Exception);
}
onComplete(complete);
}
}
20 changes: 10 additions & 10 deletions src/R3/Factories/Empty.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,30 @@

public static partial class Event
{
public static Event<TMessage, Unit> Empty<TMessage>()
public static Event<T> Empty<T>()
{
return R3.Empty<TMessage>.Instance;
return R3.Empty<T>.Instance;
}

public static Event<TMessage, Unit> Empty<TMessage>(TimeProvider timeProvider)
public static Event<T> Empty<T>(TimeProvider timeProvider)
{
return ReturnOnCompleted<TMessage, Unit>(default, timeProvider);
return ReturnOnCompleted<T>(Result.Success, timeProvider);
}

public static Event<TMessage, Unit> Empty<TMessage>(TimeSpan dueTime, TimeProvider timeProvider)
public static Event<T> Empty<T>(TimeSpan dueTime, TimeProvider timeProvider)
{
return ReturnOnCompleted<TMessage, Unit>(default, dueTime, timeProvider);
return ReturnOnCompleted<T>(Result.Success, dueTime, timeProvider);
}
}

internal sealed class Empty<TMessage> : Event<TMessage, Unit>
internal sealed class Empty<T> : Event<T>
{
// singleton
public static readonly Empty<TMessage> Instance = new Empty<TMessage>();
public static readonly Empty<T> Instance = new Empty<T>();

protected override IDisposable SubscribeCore(Subscriber<TMessage, Unit> subscriber)
protected override IDisposable SubscribeCore(Subscriber<T> subscriber)
{
subscriber.OnCompleted(default);
subscriber.OnCompleted();
return Disposable.Empty;
}

Expand Down
10 changes: 5 additions & 5 deletions src/R3/Factories/Never.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,23 @@
public static partial class Event
{
// Never
public static Event<TMessage, TComplete> Never<TMessage, TComplete>()
public static Event<T> Never<T>()
{
return R3.Never<TMessage, TComplete>.Instance;
return R3.Never<T>.Instance;
}
}

internal sealed class Never<TMessage, TComplete> : Event<TMessage, TComplete>
internal sealed class Never<T> : Event<T>
{
// singleton
public static readonly Never<TMessage, TComplete> Instance = new Never<TMessage, TComplete>();
public static readonly Never<T> Instance = new Never<T>();

Never()
{

}

protected override IDisposable SubscribeCore(Subscriber<TMessage, TComplete> subscriber)
protected override IDisposable SubscribeCore(Subscriber<T> subscriber)
{
return Disposable.Empty;
}
Expand Down
12 changes: 6 additions & 6 deletions src/R3/Factories/Range.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ public static partial class Event
{
// no scheduler(TimeProvider) overload

public static Event<int, Unit> Range(int start, int count)
public static Event<int> Range(int start, int count)
{
long max = ((long)start) + count - 1;
if (count < 0 || max > int.MaxValue)
Expand All @@ -20,7 +20,7 @@ public static Event<int, Unit> Range(int start, int count)
return new Range(start, count);
}

public static Event<int, Unit> Range(int start, int count, CancellationToken cancellationToken)
public static Event<int> Range(int start, int count, CancellationToken cancellationToken)
{
long max = ((long)start) + count - 1;
if (count < 0 || max > int.MaxValue)
Expand All @@ -37,9 +37,9 @@ public static Event<int, Unit> Range(int start, int count, CancellationToken can
}
}

internal sealed class Range(int start, int count) : Event<int, Unit>
internal sealed class Range(int start, int count) : Event<int>
{
protected override IDisposable SubscribeCore(Subscriber<int, Unit> subscriber)
protected override IDisposable SubscribeCore(Subscriber<int> subscriber)
{
for (int i = 0; i < count; i++)
{
Expand All @@ -50,9 +50,9 @@ protected override IDisposable SubscribeCore(Subscriber<int, Unit> subscriber)
}
}

internal sealed class RangeC(int start, int count, CancellationToken cancellationToken) : Event<int, Unit>
internal sealed class RangeC(int start, int count, CancellationToken cancellationToken) : Event<int>
{
protected override IDisposable SubscribeCore(Subscriber<int, Unit> subscriber)
protected override IDisposable SubscribeCore(Subscriber<int> subscriber)
{
for (int i = 0; i < count; i++)
{
Expand Down
Loading

0 comments on commit 369b334

Please sign in to comment.