Skip to content

Commit

Permalink
FromEvent supports CancellationToken
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Jan 6, 2024
1 parent 409b5f4 commit edf8ee6
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 18 deletions.
70 changes: 52 additions & 18 deletions src/R3/Factories/FromEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,111 +2,145 @@

public static partial class Observable
{
public static Observable<(object? sender, EventArgs e)> FromEventHandler(Action<EventHandler> addHandler, Action<EventHandler> removeHandler)
public static Observable<(object? sender, EventArgs e)> FromEventHandler(Action<EventHandler> addHandler, Action<EventHandler> removeHandler, CancellationToken cancellationToken = default)
{
return new FromEvent<EventHandler, (object? sender, EventArgs e)>(h => (sender, e) => h((sender, e)), addHandler, removeHandler);
return new FromEvent<EventHandler, (object? sender, EventArgs e)>(h => (sender, e) => h((sender, e)), addHandler, removeHandler, cancellationToken);
}

public static Observable<(object? sender, TEventArgs e)> FromEventHandler<TEventArgs>(Action<EventHandler<TEventArgs>> addHandler, Action<EventHandler<TEventArgs>> removeHandler)
public static Observable<(object? sender, TEventArgs e)> FromEventHandler<TEventArgs>(Action<EventHandler<TEventArgs>> addHandler, Action<EventHandler<TEventArgs>> removeHandler, CancellationToken cancellationToken = default)
{
return new FromEvent<EventHandler<TEventArgs>, (object? sender, TEventArgs e)>(h => (sender, e) => h((sender, e)), addHandler, removeHandler);
return new FromEvent<EventHandler<TEventArgs>, (object? sender, TEventArgs e)>(h => (sender, e) => h((sender, e)), addHandler, removeHandler, cancellationToken);
}

public static Observable<Unit> FromEvent(Action<Action> addHandler, Action<Action> removeHandler)
public static Observable<Unit> FromEvent(Action<Action> addHandler, Action<Action> removeHandler, CancellationToken cancellationToken = default)
{
return new FromEvent<Action>(static h => h, addHandler, removeHandler);
return new FromEvent<Action>(static h => h, addHandler, removeHandler, cancellationToken);
}

public static Observable<T> FromEvent<T>(Action<Action<T>> addHandler, Action<Action<T>> removeHandler)
public static Observable<T> FromEvent<T>(Action<Action<T>> addHandler, Action<Action<T>> removeHandler, CancellationToken cancellationToken = default)
{
return new FromEvent<Action<T>, T>(static h => h, addHandler, removeHandler);
return new FromEvent<Action<T>, T>(static h => h, addHandler, removeHandler, cancellationToken);
}

public static Observable<Unit> FromEvent<TDelegate>(Func<Action, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler)
public static Observable<Unit> FromEvent<TDelegate>(Func<Action, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler, CancellationToken cancellationToken = default)
{
return new FromEvent<TDelegate>(conversion, addHandler, removeHandler);
return new FromEvent<TDelegate>(conversion, addHandler, removeHandler, cancellationToken);
}

public static Observable<T> FromEvent<TDelegate, T>(Func<Action<T>, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler)
public static Observable<T> FromEvent<TDelegate, T>(Func<Action<T>, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler, CancellationToken cancellationToken = default)
{
return new FromEvent<TDelegate, T>(conversion, addHandler, removeHandler);
return new FromEvent<TDelegate, T>(conversion, addHandler, removeHandler, cancellationToken);
}
}

internal sealed class FromEvent<TDelegate>(Func<Action, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler)
internal sealed class FromEvent<TDelegate>(Func<Action, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler, CancellationToken cancellationToken)
: Observable<Unit>
{
protected override IDisposable SubscribeCore(Observer<Unit> observer)
{
return new _FromEventPattern(conversion, addHandler, removeHandler, observer);
return new _FromEventPattern(conversion, addHandler, removeHandler, observer, cancellationToken);
}

sealed class _FromEventPattern : IDisposable
{
Observer<Unit>? observer;
Action<TDelegate>? removeHandler;
TDelegate registeredHandler;
CancellationTokenRegistration cancellationTokenRegistration;

public _FromEventPattern(Func<Action, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler, Observer<Unit> observer)
public _FromEventPattern(Func<Action, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler, Observer<Unit> observer, CancellationToken cancellationToken)
{
this.observer = observer;
this.removeHandler = removeHandler;
this.registeredHandler = conversion(OnNext);
addHandler(this.registeredHandler);

if (cancellationToken.CanBeCanceled)
{
this.cancellationTokenRegistration = cancellationToken.UnsafeRegister(static state =>
{
var s = (_FromEventPattern)state!;
s.CompleteDispose();
}, this);
}
}

void OnNext()
{
observer?.OnNext(default);
}

void CompleteDispose()
{
observer?.OnCompleted();
Dispose();
}

public void Dispose()
{
var handler = Interlocked.Exchange(ref removeHandler, null);
if (handler != null)
{
observer = null;
removeHandler = null;
cancellationTokenRegistration.Dispose();
handler(this.registeredHandler);
}
}
}
}

internal sealed class FromEvent<TDelegate, T>(Func<Action<T>, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler)
internal sealed class FromEvent<TDelegate, T>(Func<Action<T>, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler, CancellationToken cancellationToken)
: Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return new _FromEventPattern(conversion, addHandler, removeHandler, observer);
return new _FromEventPattern(conversion, addHandler, removeHandler, observer, cancellationToken);
}

sealed class _FromEventPattern : IDisposable
{
Observer<T>? observer;
Action<TDelegate>? removeHandler;
TDelegate registeredHandler;
CancellationTokenRegistration cancellationTokenRegistration;

public _FromEventPattern(Func<Action<T>, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler, Observer<T> observer)
public _FromEventPattern(Func<Action<T>, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler, Observer<T> observer, CancellationToken cancellationToken)
{
this.observer = observer;
this.removeHandler = removeHandler;
this.registeredHandler = conversion(OnNext);
addHandler(this.registeredHandler);

if (cancellationToken.CanBeCanceled)
{
this.cancellationTokenRegistration = cancellationToken.UnsafeRegister(static state =>
{
var s = (_FromEventPattern)state!;
s.CompleteDispose();
}, this);
}
}

void OnNext(T value)
{
observer?.OnNext(value);
}

void CompleteDispose()
{
observer?.OnCompleted();
Dispose();
}

public void Dispose()
{
var handler = Interlocked.Exchange(ref removeHandler, null);
if (handler != null)
{
observer = null;
removeHandler = null;
cancellationTokenRegistration.Dispose();
handler(this.registeredHandler);
}
}
Expand Down
34 changes: 34 additions & 0 deletions tests/R3.Tests/FactoryTests/FromEventTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,40 @@ public void Event()
ev.InvocationListCount().Should().Be((0, 0, 0, 0, 0, 0, 0));
}

[Fact]
public void Cancel()
{
var cts = new CancellationTokenSource();

var ev = new EventPattern();

var l1 = Observable.FromEventHandler(h => ev.E1 += h, h => ev.E1 -= h, cts.Token).ToLiveList();
var l2 = Observable.FromEventHandler<int>(h => ev.E2 += h, h => ev.E2 -= h, cts.Token).ToLiveList();
var l3 = Observable.FromEvent(h => ev.A1 += h, h => ev.A1 -= h, cts.Token).ToLiveList();
var l4 = Observable.FromEvent<int>(h => ev.A2 += h, h => ev.A2 -= h, cts.Token).ToLiveList();
var l5 = Observable.FromEvent<MyDelegate1>(h => new MyDelegate1(h), h => ev.M1 += h, h => ev.M1 -= h, cts.Token).ToLiveList();
var l6 = Observable.FromEvent<MyDelegate2, int>(h => new MyDelegate2(h), h => ev.M2 += h, h => ev.M2 -= h, cts.Token).ToLiveList();
var l7 = Observable.FromEvent<MyDelegate3, (int x, int y)>(h => (x, y) => h((x, y)), h => ev.M3 += h, h => ev.M3 -= h, cts.Token).ToLiveList();

ev.Raise(10, 20);
ev.Raise(100, 200);

l1.Should().HaveCount(2);
l3.Should().HaveCount(2);
l5.Should().HaveCount(2);

l2.Select(x => x.e).Should().Equal([10, 100]);
l4.AssertEqual([10, 100]);
l6.AssertEqual([10, 100]);
l7.AssertEqual([(10, 20), (100, 200)]);

ev.InvocationListCount().Should().Be((1, 1, 1, 1, 1, 1, 1));

cts.Cancel();

ev.InvocationListCount().Should().Be((0, 0, 0, 0, 0, 0, 0));
}


class EventPattern
{
Expand Down

0 comments on commit edf8ee6

Please sign in to comment.