Skip to content

Commit

Permalink
Subscribe with state
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 14, 2023
1 parent 46319a2 commit 14abb79
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 15 deletions.
4 changes: 2 additions & 2 deletions sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
// range.TakeLast(


//var publisher = new Publisher<int>();
var publisher = new Publisher<int>();
//publisher.PublishOnNext(1);


// publisher.Subscribe(new object(), (x,y) => y

//var xs = await publisher.Take(TimeSpan.FromSeconds(5));

Expand Down
48 changes: 43 additions & 5 deletions src/R3/EventSubscribeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,16 @@ namespace R3;

public static class EventSubscribeExtensions
{
// TODO: with State

[DebuggerStepThrough]
public static IDisposable Subscribe<T>(this Event<T> source)
{
return source.Subscribe(NopSubscriber<T>.Instance);
}


[DebuggerStepThrough]
public static IDisposable Subscribe<T>(this Event<T> source, Action<T> onNext)
{
return source.Subscribe(new AnonymousSubscriber<T>(onNext, EventSystem.GetUnhandledExceptionHandler(), Stubs.HandleError));
return source.Subscribe(new AnonymousSubscriber<T>(onNext, EventSystem.GetUnhandledExceptionHandler(), Stubs.HandleResult));
}

[DebuggerStepThrough]
Expand All @@ -30,6 +27,26 @@ public static IDisposable Subscribe<T>(this Event<T> source, Action<T> onNext, A
{
return source.Subscribe(new AnonymousSubscriber<T>(onNext, onErrorResume, onComplete));
}

// with state

[DebuggerStepThrough]
public static IDisposable Subscribe<T, TState>(this Event<T> source, TState state, Action<T, TState> onNext)
{
return source.Subscribe(new AnonymousSubscriber<T, TState>(onNext, Stubs<TState>.HandleException, Stubs<TState>.HandleResult, state));
}

[DebuggerStepThrough]
public static IDisposable Subscribe<T, TState>(this Event<T> source, TState state, Action<T, TState> onNext, Action<Result, TState> onComplete)
{
return source.Subscribe(new AnonymousSubscriber<T, TState>(onNext, Stubs<TState>.HandleException, onComplete, state));
}

[DebuggerStepThrough]
public static IDisposable Subscribe<T, TState>(this Event<T> source, TState state, Action<T, TState> onNext, Action<Exception, TState> onErrorResume, Action<Result, TState> onComplete)
{
return source.Subscribe(new AnonymousSubscriber<T, TState>(onNext, onErrorResume, onComplete, state));
}
}

[DebuggerStepThrough]
Expand Down Expand Up @@ -87,7 +104,6 @@ protected override void OnCompletedCore(Result result)
}
}


[DebuggerStepThrough]
internal sealed class AnonymousSubscriber<T>(Action<T> onNext, Action<Exception> onErrorResume, Action<Result> onComplete) : Subscriber<T>
{
Expand All @@ -109,3 +125,25 @@ protected override void OnCompletedCore(Result complete)
onComplete(complete);
}
}

[DebuggerStepThrough]
internal sealed class AnonymousSubscriber<T, TState>(Action<T, TState> onNext, Action<Exception, TState> onErrorResume, Action<Result, TState> onComplete, TState state) : Subscriber<T>
{
[DebuggerStepThrough]
protected override void OnNextCore(T value)
{
onNext(value, state);
}

[DebuggerStepThrough]
protected override void OnErrorResumeCore(Exception error)
{
onErrorResume(error, state);
}

[DebuggerStepThrough]
protected override void OnCompletedCore(Result complete)
{
onComplete(complete, state);
}
}
2 changes: 2 additions & 0 deletions src/R3/Factories/Timer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

public static partial class Event
{
// TODO: No Provider overload?

public static Event<Unit> Timer(TimeSpan dueTime, TimeProvider timeProvider)
{
return new Timer(dueTime, timeProvider);
Expand Down
2 changes: 1 addition & 1 deletion src/R3/Factories/ToEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

public static partial class Event
{
public static Event<T> ToCompletableEvent<T>(this Task<T> task)
public static Event<T> ToEvent<T>(this Task<T> task)
{
return new TaskToEvent<T>(task);
}
Expand Down
19 changes: 18 additions & 1 deletion src/R3/Internal/Stubs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

internal static class Stubs
{
internal static readonly Action<Result> HandleError = static x =>
internal static readonly Action<Result> HandleResult = static x =>
{
if (x.IsFailure)
{
Expand All @@ -14,4 +14,21 @@ internal static class Stubs
internal static class Stubs<T>
{
internal static readonly Func<T, T> ReturnSelf = static x => x;


// TState

internal static readonly Action<Exception, T> HandleException = static (x, _) =>
{
EventSystem.GetUnhandledExceptionHandler().Invoke(x);
};


internal static readonly Action<Result, T> HandleResult = static (x, _) =>
{
if (x.IsFailure)
{
EventSystem.GetUnhandledExceptionHandler().Invoke(x.Exception);
}
};
}
2 changes: 1 addition & 1 deletion src/R3/LiveList.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void ForEach(Action<T> action)
}
}

public void ForEach<TState>(Action<T, TState> action, TState state)
public void ForEach<TState>(TState state, Action<T, TState> action)
{
lock (list)
{
Expand Down
2 changes: 1 addition & 1 deletion src/R3/Operators/DoOnDisposed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public static Event<T> DoOnDisposed<T>(this Event<T> source, Action action)
return new DoOnDisposed<T>(source, action);
}

public static Event<T> DoOnDisposed<T, TState>(this Event<T> source, Action<TState> action, TState state)
public static Event<T> DoOnDisposed<T, TState>(this Event<T> source, TState state, Action<TState> action)
{
return new DoOnDisposed<T, TState>(source, action, state);
}
Expand Down
36 changes: 32 additions & 4 deletions src/R3/Operators/Select.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@
public static partial class EventExtensions
{
// TODO: Optimize Where.Select
// TODO: CompletableEvent.Select
// TODO: Element index overload

public static Event<TResult> Select<T, TResult>(
this Event<T> source,
Func<T, TResult> selector)
public static Event<TResult> Select<T, TResult>(this Event<T> source, Func<T, TResult> selector)
{
return new Select<T, TResult>(source, selector);
}

public static Event<TResult> Select<T, TResult, TState>(this Event<T> source, TState state, Func<T, TState, TResult> selector)
{
return new Select<T, TResult, TState>(source, selector, state);
}
}

internal sealed class Select<T, TResult>(Event<T> source, Func<T, TResult> selector) : Event<TResult>
Expand Down Expand Up @@ -39,3 +41,29 @@ protected override void OnCompletedCore(Result result)
}
}
}

internal sealed class Select<T, TResult, TState>(Event<T> source, Func<T, TState, TResult> selector, TState state) : Event<TResult>
{
protected override IDisposable SubscribeCore(Subscriber<TResult> subscriber)
{
return source.Subscribe(new _Select(subscriber, selector, state));
}

class _Select(Subscriber<TResult> subscriber, Func<T, TState, TResult> selector, TState state) : Subscriber<T>
{
protected override void OnNextCore(T value)
{
subscriber.OnNext(selector(value, state));
}

protected override void OnErrorResumeCore(Exception error)
{
subscriber.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
subscriber.OnCompleted(result);
}
}
}
2 changes: 2 additions & 0 deletions src/R3/Operators/Where.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

public static partial class EventExtensions
{
// TODO: TState

public static Event<T> Where<T>(this Event<T> source, Func<T, bool> predicate)
{
if (source is Where<T> where)
Expand Down

0 comments on commit 14abb79

Please sign in to comment.