Skip to content

Commit

Permalink
TaskSubscriberBase
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 12, 2023
1 parent bd2a01c commit 0f73153
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 106 deletions.
71 changes: 71 additions & 0 deletions src/R3/Internal/TaskSubscriberBase.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@

namespace R3.Internal;

// for return Task(tcs.TrySet***)
// include proper Cancel registration

internal abstract class TaskSubscriberBase<TMessage, TTask> : Subscriber<TMessage>
{
protected TaskCompletionSource<TTask> tcs; // use this field.

CancellationToken cancellationToken;
CancellationTokenRegistration tokenRegistration;

public Task<TTask> Task => tcs.Task;

public TaskSubscriberBase(CancellationToken cancellationToken)
{
this.tcs = new TaskCompletionSource<TTask>();
this.cancellationToken = cancellationToken;

if (cancellationToken.CanBeCanceled)
{
this.tokenRegistration = cancellationToken.UnsafeRegister(static state =>
{
var s = (TaskSubscriberBase<TMessage, TTask>)state!;

s.Dispose(); // subscriber is subscription, dispose
s.tcs.TrySetCanceled(s.cancellationToken);
}, this);
}
}

// if override, should call base.DisposeCore(), be careful.
protected override void DisposeCore()
{
tokenRegistration.Dispose();
}
}

internal abstract class TaskSubscriberBase<TMessage, TComplete, TTask> : Subscriber<TMessage, TComplete>
{
protected TaskCompletionSource<TTask> tcs; // use this field.

CancellationToken cancellationToken;
CancellationTokenRegistration tokenRegistration;

public Task<TTask> Task => tcs.Task;

public TaskSubscriberBase(CancellationToken cancellationToken)
{
this.tcs = new TaskCompletionSource<TTask>();
this.cancellationToken = cancellationToken;

if (cancellationToken.CanBeCanceled)
{
this.tokenRegistration = cancellationToken.UnsafeRegister(static state =>
{
var s = (TaskSubscriberBase<TMessage, TComplete, TTask>)state!;

s.Dispose(); // subscriber is subscription, dispose
s.tcs.TrySetCanceled(s.cancellationToken);
}, this);
}
}

// if override, should call base.DisposeCore(), be careful.
protected override void DisposeCore()
{
tokenRegistration.Dispose();
}
}
72 changes: 12 additions & 60 deletions src/R3/Operators/AggregateAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,9 @@ public static Task<TResult> AggregateAsync<TMessage, TComplete, TAccumulate, TRe
Func<TAccumulate, TComplete, TResult> resultSelector,
CancellationToken cancellationToken = default)
{
var tcs = new TaskCompletionSource<TResult>();

var subscriber = new AggregateAsync<TMessage, TComplete, TAccumulate, TResult>(tcs, seed, func, resultSelector, cancellationToken);

// before Subscribe, register and set CancellationTokenRegistration
subscriber.tokenRegistration = cancellationToken.UnsafeRegister(static state =>
{
var s = (AggregateAsync<TMessage, TComplete, TAccumulate, TResult>)state!;

s.Dispose(); // subscriber is subscription, dispose
s.tcs.TrySetCanceled(s.cancellationToken);
}, subscriber);

source.Subscribe(subscriber); // return subscriber self so ignore subscription

return tcs.Task;
var subscriber = new AggregateAsync<TMessage, TComplete, TAccumulate, TResult>(seed, func, resultSelector, cancellationToken);
source.Subscribe(subscriber);
return subscriber.Task;
}

public static Task<TResult> AggregateAsync<TMessage, TComplete, TAccumulate, TResult>
Expand All @@ -34,41 +21,22 @@ public static Task<TResult> AggregateAsync<TMessage, TComplete, TAccumulate, TRe
Func<TAccumulate, Result<TComplete>, TResult> resultSelector,
CancellationToken cancellationToken = default)
{
var tcs = new TaskCompletionSource<TResult>();

var subscriber = new AggregateAsyncR<TMessage, TComplete, TAccumulate, TResult>(tcs, seed, func, resultSelector, cancellationToken);

// before Subscribe, register and set CancellationTokenRegistration
subscriber.tokenRegistration = cancellationToken.UnsafeRegister(static state =>
{
var s = (AggregateAsyncR<TMessage, TComplete, TAccumulate, TResult>)state!;

s.Dispose(); // subscriber is subscription, dispose
s.tcs.TrySetCanceled(s.cancellationToken);
}, subscriber);

source.Subscribe(subscriber); // return subscriber self so ignore subscription

return tcs.Task;
var subscriber = new AggregateAsyncR<TMessage, TComplete, TAccumulate, TResult>(seed, func, resultSelector, cancellationToken);
source.Subscribe(subscriber);
return subscriber.Task;
}
}
}

namespace R3.Operators
{

internal sealed class AggregateAsync<TMessage, TComplete, TAccumulate, TResult>(
TaskCompletionSource<TResult> tcs,
TAccumulate seed,
Func<TAccumulate, TMessage, TAccumulate> func,
Func<TAccumulate, TComplete, TResult> resultSelector,
CancellationToken cancellationToken) : Subscriber<TMessage, TComplete>
TAccumulate seed,
Func<TAccumulate, TMessage, TAccumulate> func,
Func<TAccumulate, TComplete, TResult> resultSelector,
CancellationToken cancellationToken)
: TaskSubscriberBase<TMessage, TComplete, TResult>(cancellationToken)
{
// hold state for CancellationToken.Register
internal TaskCompletionSource<TResult> tcs = tcs;
internal CancellationToken cancellationToken = cancellationToken;
internal CancellationTokenRegistration tokenRegistration;

TAccumulate value = seed;

protected override void OnNextCore(TMessage message)
Expand All @@ -93,25 +61,14 @@ protected override void OnCompletedCore(TComplete complete)
tcs.TrySetException(ex);
}
}

protected override void DisposeCore()
{
tokenRegistration.Dispose();
}
}

internal sealed class AggregateAsyncR<TMessage, TComplete, TAccumulate, TResult>(
TaskCompletionSource<TResult> tcs,
TAccumulate seed,
Func<TAccumulate, TMessage, TAccumulate> func,
Func<TAccumulate, Result<TComplete>, TResult> resultSelector,
CancellationToken cancellationToken) : Subscriber<TMessage, Result<TComplete>>
CancellationToken cancellationToken) : TaskSubscriberBase<TMessage, Result<TComplete>, TResult>(cancellationToken)
{
// hold state for CancellationToken.Register
internal TaskCompletionSource<TResult> tcs = tcs;
internal CancellationToken cancellationToken = cancellationToken;
internal CancellationTokenRegistration tokenRegistration;

TAccumulate value = seed;

protected override void OnNextCore(TMessage message)
Expand Down Expand Up @@ -143,10 +100,5 @@ protected override void OnCompletedCore(Result<TComplete> complete)
tcs.TrySetException(ex);
}
}

protected override void DisposeCore()
{
tokenRegistration.Dispose();
}
}
}
56 changes: 10 additions & 46 deletions src/R3/Operators/FirstLastSingle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,56 +25,25 @@ public static partial class EventExtensions

public static Task<TMessage> FirstAsync<TMessage>(this Event<TMessage> source, Func<TMessage, bool> predicate, CancellationToken cancellationToken = default)
{
var tcs = new TaskCompletionSource<TMessage>();

var subscriber = new First<TMessage>(tcs, predicate, cancellationToken);

// before Subscribe, register and set CancellationTokenRegistration
subscriber.tokenRegistration = cancellationToken.UnsafeRegister(static state =>
{
var s = (First<TMessage>)state!;

s.Dispose(); // subscriber is subscription, dispose
s.tcs.TrySetCanceled(s.cancellationToken);
}, subscriber);

source.Subscribe(subscriber); // return subscriber self so ignore subscription

return tcs.Task;
var subscriber = new First<TMessage>(predicate, cancellationToken);
source.Subscribe(subscriber);
return subscriber.Task;
}

static Task<TMessage> FirstLastSingleAsync<TMessage, TComplete>(this CompletableEvent<TMessage, TComplete> source, FirstLastSingleOperation operation, bool useDefaultIfEmpty, TMessage? defaultValue, Func<TMessage, bool> predicate, CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<TMessage>();

var subscriber = new FirstLastSingle<TMessage, TComplete>(tcs, operation, useDefaultIfEmpty, defaultValue, predicate, cancellationToken);

// before Subscribe, register and set CancellationTokenRegistration
subscriber.tokenRegistration = cancellationToken.UnsafeRegister(static state =>
{
var s = (FirstLastSingle<TMessage, TComplete>)state!;

s.Dispose(); // subscriber is subscription, dispose
s.tcs.TrySetCanceled(s.cancellationToken);
}, subscriber);

source.Subscribe(subscriber); // return subscriber self so ignore subscription

return tcs.Task;
var subscriber = new FirstLastSingle<TMessage, TComplete>(operation, useDefaultIfEmpty, defaultValue, predicate, cancellationToken);
source.Subscribe(subscriber);
return subscriber.Task;
}
}
}

namespace R3.Operators
{
internal sealed class First<TMessage>(TaskCompletionSource<TMessage> tcs, Func<TMessage, bool> predicate, CancellationToken cancellationToken)
: Subscriber<TMessage>
internal sealed class First<TMessage>(Func<TMessage, bool> predicate, CancellationToken cancellationToken)
: TaskSubscriberBase<TMessage, TMessage>(cancellationToken)
{
// hold state for CancellationToken.Register
internal TaskCompletionSource<TMessage> tcs = tcs;
internal CancellationToken cancellationToken = cancellationToken;
internal CancellationTokenRegistration tokenRegistration;

protected override void OnNextCore(TMessage message)
{
if (!predicate(message)) return;
Expand All @@ -89,14 +58,9 @@ protected override void OnErrorResumeCore(Exception error)
}
}

internal sealed class FirstLastSingle<TMessage, TComplete>(TaskCompletionSource<TMessage> tcs, FirstLastSingleOperation operation, bool useDefaultIfEmpty, TMessage? defaultValue, Func<TMessage, bool> predicate, CancellationToken cancellationToken)
: Subscriber<TMessage, TComplete>
internal sealed class FirstLastSingle<TMessage, TComplete>(FirstLastSingleOperation operation, bool useDefaultIfEmpty, TMessage? defaultValue, Func<TMessage, bool> predicate, CancellationToken cancellationToken)
: TaskSubscriberBase<TMessage, TComplete, TMessage>(cancellationToken)
{
// hold state for CancellationToken.Register
internal TaskCompletionSource<TMessage> tcs = tcs;
internal CancellationToken cancellationToken = cancellationToken;
internal CancellationTokenRegistration tokenRegistration;

bool hasValue;
TMessage? latestValue = defaultValue;

Expand Down

0 comments on commit 0f73153

Please sign in to comment.