Skip to content

Commit

Permalink
Refactor execution handling for commands
Browse files Browse the repository at this point in the history
- Added support for being notified when IsExecuting has been changed
  • Loading branch information
michaelstonis committed Dec 6, 2024
1 parent 26f62f1 commit 389a768
Showing 1 changed file with 81 additions and 8 deletions.
89 changes: 81 additions & 8 deletions src/R3/ReactiveCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public class ReactiveCommand<T> : Observable<T>, ICommand, IDisposable
IDisposable subscription; // from canExecuteSource (and onNext).
bool canExecute; // set from observable sequence
int executionCount;
readonly object gate = new();

public event EventHandler? CanExecuteChanged;

Expand Down Expand Up @@ -96,14 +97,23 @@ private void HandleExecution<T>(T value, Action<T> execute)
{
try
{
IsExecuting.Value = Interlocked.Increment(ref executionCount) > 0;
lock (gate)
{
executionCount++;
IsExecuting.Value = executionCount > 0;
}

execute(value);
}
finally
{
if (Interlocked.Decrement(ref executionCount) == 0)
lock (gate)
{
IsExecuting.Value = false;
executionCount--;
if (executionCount == 0)
{
IsExecuting.Value = false;
}
}
}
}
Expand All @@ -112,14 +122,23 @@ private async ValueTask HandleAsyncExecution<T>(T value, Func<T, CancellationTok
{
try
{
IsExecuting.Value = Interlocked.Increment(ref executionCount) > 0;
lock (gate)
{
executionCount++;
IsExecuting.Value = executionCount > 0;
}

await executeAsync(value, cancellationToken);
}
finally
{
if (Interlocked.Decrement(ref executionCount) == 0)
lock (gate)
{
IsExecuting.Value = false;
executionCount--;
if (executionCount == 0)
{
IsExecuting.Value = false;
}
}
}
}
Expand Down Expand Up @@ -202,12 +221,16 @@ public class ReactiveCommand<TInput, TOutput> : Observable<TOutput>, ICommand, I
CompleteState completeState; // struct(int, IntPtr)
bool canExecute; // set from observable sequence
IDisposable subscription;
int executionCount;
readonly object gate = new();

readonly Func<TInput, TOutput>? convert; // for sync
SingleAssignmentSubject<TInput>? asyncInput; // for async

public event EventHandler? CanExecuteChanged;

public ReactiveProperty<bool> IsExecuting { get; } = new();

public ReactiveCommand(Func<TInput, TOutput> convert)
{
this.list = new FreeListCore<Subscription>(this);
Expand All @@ -221,7 +244,7 @@ public ReactiveCommand(Func<TInput, CancellationToken, ValueTask<TOutput>> conve
this.list = new FreeListCore<Subscription>(this);
this.canExecute = true;
this.asyncInput = new SingleAssignmentSubject<TInput>();
this.subscription = asyncInput.SelectAwait(convertAsync, awaitOperation, configureAwait, cancelOnCompleted, maxSequential).Subscribe(this, static (x, state) =>
this.subscription = asyncInput.SelectAwait((Command: this, ConvertAsync: convertAsync), static (value, state, cancellationToken) => state.Command.HandleAsyncExecution(value, state.ConvertAsync, cancellationToken), awaitOperation, configureAwait, cancelOnCompleted, maxSequential).Subscribe(this, static (x, state) =>
{
if (state.completeState.IsCompleted) return;

Expand Down Expand Up @@ -316,6 +339,56 @@ public void Execute(TInput parameter)
}
}

private void HandleExecution(TInput value, Action<TInput> execute)
{
try
{
lock (gate)
{
executionCount++;
IsExecuting.Value = executionCount > 0;
}

execute(value);
}
finally
{
lock (gate)
{
executionCount--;
if (executionCount == 0)
{
IsExecuting.Value = false;
}
}
}
}

private async ValueTask<TOutput> HandleAsyncExecution(TInput value, Func<TInput, CancellationToken, ValueTask<TOutput>> executeAsync, CancellationToken cancellationToken)
{
try
{
lock (gate)
{
executionCount++;
IsExecuting.Value = executionCount > 0;
}

return await executeAsync(value, cancellationToken);
}
finally
{
lock (gate)
{
executionCount--;
if (executionCount == 0)
{
IsExecuting.Value = false;
}
}
}
}

protected override IDisposable SubscribeCore(Observer<TOutput> observer)
{
var result = completeState.TryGetResult();
Expand Down Expand Up @@ -456,7 +529,7 @@ public static ReactiveCommand<T> ToReactiveCommand<T>(
{
var command = new ReactiveCommand<T>(canExecuteSource, initialCanExecute);

var subscription = command.SubscribeAwait(async (x, ct) => await executeAsync(x, ct), awaitOperation, configureAwait, cancelOnCompleted, maxSequential);
var subscription = command.SubscribeAwait(executeAsync, async (x, func, ct) => await func(x, ct), awaitOperation, configureAwait, cancelOnCompleted, maxSequential);
command.CombineSubscription(subscription);

return command;
Expand Down

0 comments on commit 389a768

Please sign in to comment.