Skip to content

Commit

Permalink
working ObserveOn
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 25, 2023
1 parent 12bfd55 commit 4891702
Show file tree
Hide file tree
Showing 4 changed files with 529 additions and 3 deletions.
41 changes: 41 additions & 0 deletions src/R3/Internal/PooledThreadPoolWorkItem.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System.Collections.Concurrent;

namespace R3.Internal;

internal sealed class PooledThreadPoolWorkItem<T> : IThreadPoolWorkItem
{
static ConcurrentQueue<PooledThreadPoolWorkItem<T>> pool = new();

T state = default!;
Action<T> action = default!;

PooledThreadPoolWorkItem()
{
}

public static IThreadPoolWorkItem Create(Action<T> action, T state)
{
if (!pool.TryDequeue(out var item))
{
item = new PooledThreadPoolWorkItem<T>();
}

item.state = state;
item.action = action;
return item;
}

public void Execute()
{
try
{
action(state);
}
finally
{
state = default!;
action = default!;
pool.Enqueue(this);
}
}
}
60 changes: 60 additions & 0 deletions src/R3/Notification.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
namespace R3;

public enum NotificationKind
{
OnNext,
OnErrorResume,
OnCompleted
}

public readonly struct Notification<T>
{
readonly NotificationKind kind;
readonly T? value;
readonly Exception? error;
readonly Result? result;

public NotificationKind Kind => kind;
public T? Value => value;
public Exception? Error => error;
public Result? Result => result;

public Notification(T value)
{
this.kind = NotificationKind.OnNext;
this.value = value;
this.error = null;
this.result = default;
}

public Notification(Exception error)
{
this.kind = NotificationKind.OnErrorResume;
this.value = default;
this.error = error;
this.result = default;
}

public Notification(Result result)
{
this.kind = NotificationKind.OnCompleted;
this.value = default;
this.error = default;
this.result = result;
}

public override string? ToString()
{
switch (kind)
{
case NotificationKind.OnNext:
return value!.ToString();
case NotificationKind.OnErrorResume:
return error!.ToString();
case NotificationKind.OnCompleted:
return result!.Value.ToString();
default:
return "";
}
}
}
4 changes: 2 additions & 2 deletions src/R3/Operators/Do.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ protected override void OnErrorResumeCore(Exception error)
protected override void OnCompletedCore(Result result)
{
onCompleted?.Invoke(result);
observer.OnCompleted();
observer.OnCompleted(result);
}

protected override void DisposeCore()
Expand Down Expand Up @@ -78,7 +78,7 @@ protected override void OnErrorResumeCore(Exception error)
protected override void OnCompletedCore(Result result)
{
onCompleted?.Invoke(result, state);
observer.OnCompleted();
observer.OnCompleted(result);
}

protected override void DisposeCore()
Expand Down
Loading

0 comments on commit 4891702

Please sign in to comment.