Skip to content

Commit

Permalink
full rename
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 15, 2023
1 parent cedf915 commit 2e50606
Show file tree
Hide file tree
Showing 53 changed files with 720 additions and 676 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
Third Generation of Reactive Extensions.

```csharp
public abstract class Event<T>
public abstract class Observable<T>
{
public IDisposable Subscribe(Subscriber<T> subscriber);
public IDisposable Subscribe(Observer<T> observer);
}

public abstract class Subscriber<T> : IDisposable
public abstract class Observer<T> : IDisposable
{
public void OnNext(T value);
public void OnErrorResume(Exception error);
Expand Down
6 changes: 3 additions & 3 deletions sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
x.SetMinimumLevel(LogLevel.Trace);
x.AddZLoggerConsole();
});
EventSystem.Logger = factory.CreateLogger<EventSystem>();
ObservableSystem.Logger = factory.CreateLogger<ObservableSystem>();
var logger = factory.CreateLogger<Program>();



var ct = new CancellationTokenSource(1000);
EventSystem.DefaultFrameProvider = new ThreadSleepFrameProvider(60);
ObservableSystem.DefaultFrameProvider = new ThreadSleepFrameProvider(60);


// Enumerable.Empty<int>().ElementAtOrDefault(
Expand All @@ -33,7 +33,7 @@
// range.TakeLast(


var publisher = new Publisher<int>();
var publisher = new R3.Subject<int>();
//publisher.PublishOnNext(1);

// publisher.Subscribe(new object(), (x,y) => y
Expand Down
4 changes: 2 additions & 2 deletions src/R3/Factories/Empty.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ internal sealed class Empty<T> : Observable<T>
// singleton
public static readonly Empty<T> Instance = new Empty<T>();

protected override IDisposable SubscribeCore(Observer<T> subscriber)
protected override IDisposable SubscribeCore(Observer<T> observer)
{
subscriber.OnCompleted();
observer.OnCompleted();
return Disposable.Empty;
}

Expand Down
20 changes: 10 additions & 10 deletions src/R3/Factories/EveryUpdate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ public static partial class Observable
{
public static Observable<Unit> EveryUpdate()
{
return new EveryUpdate(EventSystem.DefaultFrameProvider, CancellationToken.None, cancelImmediately: false);
return new EveryUpdate(ObservableSystem.DefaultFrameProvider, CancellationToken.None, cancelImmediately: false);
}

public static Observable<Unit> EveryUpdate(CancellationToken cancellationToken)
{
return new EveryUpdate(EventSystem.DefaultFrameProvider, cancellationToken, cancelImmediately: false);
return new EveryUpdate(ObservableSystem.DefaultFrameProvider, cancellationToken, cancelImmediately: false);
}

public static Observable<Unit> EveryUpdate(FrameProvider frameProvider)
Expand All @@ -31,31 +31,31 @@ public static Observable<Unit> EveryUpdate(FrameProvider frameProvider, Cancella

internal sealed class EveryUpdate(FrameProvider frameProvider, CancellationToken cancellationToken, bool cancelImmediately) : Observable<Unit>
{
protected override IDisposable SubscribeCore(Observer<Unit> subscriber)
protected override IDisposable SubscribeCore(Observer<Unit> observer)
{
var runner = new EveryUpdateRunnerWorkItem(subscriber, cancellationToken, cancelImmediately);
var runner = new EveryUpdateRunnerWorkItem(observer, cancellationToken, cancelImmediately);
frameProvider.Register(runner);
return runner;
}

class EveryUpdateRunnerWorkItem : IFrameRunnerWorkItem, IDisposable
{
Observer<Unit> subscriber;
Observer<Unit> observer;
CancellationToken cancellationToken;
CancellationTokenRegistration cancellationTokenRegistration;
bool isDisposed;

public EveryUpdateRunnerWorkItem(Observer<Unit> subscriber, CancellationToken cancellationToken, bool cancelImmediately)
public EveryUpdateRunnerWorkItem(Observer<Unit> observer, CancellationToken cancellationToken, bool cancelImmediately)
{
this.subscriber = subscriber;
this.observer = observer;
this.cancellationToken = cancellationToken;

if (cancelImmediately && cancellationToken.CanBeCanceled)
{
cancellationTokenRegistration = cancellationToken.UnsafeRegister(static state =>
{
var s = (EveryUpdateRunnerWorkItem)state!;
s.subscriber.OnCompleted();
s.observer.OnCompleted();
s.Dispose();
}, this);
}
Expand All @@ -70,12 +70,12 @@ public bool MoveNext(long frameCount)

if (cancellationToken.IsCancellationRequested)
{
subscriber.OnCompleted();
observer.OnCompleted();
Dispose();
return false;
}

subscriber.OnNext(default);
observer.OnNext(default);
return true;
}

Expand Down
2 changes: 1 addition & 1 deletion src/R3/Factories/Never.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ internal sealed class Never<T> : Observable<T>

}

protected override IDisposable SubscribeCore(Observer<T> subscriber)
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return Disposable.Empty;
}
Expand Down
14 changes: 7 additions & 7 deletions src/R3/Factories/Range.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,31 +39,31 @@ public static Observable<int> Range(int start, int count, CancellationToken canc

internal sealed class Range(int start, int count) : Observable<int>
{
protected override IDisposable SubscribeCore(Observer<int> subscriber)
protected override IDisposable SubscribeCore(Observer<int> observer)
{
for (int i = 0; i < count; i++)
{
subscriber.OnNext(start + i);
observer.OnNext(start + i);
}
subscriber.OnCompleted(default);
observer.OnCompleted(default);
return Disposable.Empty;
}
}

internal sealed class RangeC(int start, int count, CancellationToken cancellationToken) : Observable<int>
{
protected override IDisposable SubscribeCore(Observer<int> subscriber)
protected override IDisposable SubscribeCore(Observer<int> observer)
{
for (int i = 0; i < count; i++)
{
if (cancellationToken.IsCancellationRequested)
{
subscriber.OnCompleted();
observer.OnCompleted();
return Disposable.Empty;
}
subscriber.OnNext(start + i);
observer.OnNext(start + i);
}
subscriber.OnCompleted();
observer.OnCompleted();
return Disposable.Empty;
}
}
14 changes: 7 additions & 7 deletions src/R3/Factories/Repeat.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,31 +38,31 @@ public static Observable<T> Repeat<T>(T value, int count, CancellationToken canc

internal sealed class Repeat<T>(T value, int count) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> subscriber)
protected override IDisposable SubscribeCore(Observer<T> observer)
{
for (int i = 0; i < count; i++)
{
subscriber.OnNext(value);
observer.OnNext(value);
}
subscriber.OnCompleted(default);
observer.OnCompleted(default);
return Disposable.Empty;
}
}

internal sealed class RepeatC<T>(T value, int count, CancellationToken cancellationToken) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> subscriber)
protected override IDisposable SubscribeCore(Observer<T> observer)
{
for (int i = 0; i < count; i++)
{
if (cancellationToken.IsCancellationRequested)
{
subscriber.OnCompleted();
observer.OnCompleted();
return Disposable.Empty;
}
subscriber.OnNext(value);
observer.OnNext(value);
}
subscriber.OnCompleted(default);
observer.OnCompleted(default);
return Disposable.Empty;
}
}
28 changes: 14 additions & 14 deletions src/R3/Factories/Return.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,28 @@ public static Observable<T> Return<T>(T value, TimeSpan dueTime, TimeProvider ti

internal class Return<T>(T value, TimeSpan dueTime, TimeProvider timeProvider) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> subscriber)
protected override IDisposable SubscribeCore(Observer<T> observer)
{
var method = new _Return(value, subscriber);
var method = new _Return(value, observer);
method.Timer = timeProvider.CreateStoppedTimer(_Return.timerCallback, method);
method.Timer.InvokeOnce(dueTime);
return method;
}

sealed class _Return(T value, Observer<T> subscriber) : IDisposable
sealed class _Return(T value, Observer<T> observer) : IDisposable
{
public static readonly TimerCallback timerCallback = NextTick;

readonly T value = value;
readonly Observer<T> subscriber = subscriber;
readonly Observer<T> observer = observer;

public ITimer? Timer { get; set; }

static void NextTick(object? state)
{
var self = (_Return)state!;
self.subscriber.OnNext(self.value);
self.subscriber.OnCompleted();
self.observer.OnNext(self.value);
self.observer.OnCompleted();
}

public void Dispose()
Expand All @@ -62,33 +62,33 @@ public void Dispose()

internal class ImmediateScheduleReturn<T>(T value) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> subscriber)
protected override IDisposable SubscribeCore(Observer<T> observer)
{
subscriber.OnNext(value);
subscriber.OnCompleted();
observer.OnNext(value);
observer.OnCompleted();
return Disposable.Empty;
}
}

internal class ThreadPoolScheduleReturn<T>(T value) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> subscriber)
protected override IDisposable SubscribeCore(Observer<T> observer)
{
var method = new _Return(value, subscriber);
var method = new _Return(value, observer);
ThreadPool.UnsafeQueueUserWorkItem(method, preferLocal: false);
return method;
}

sealed class _Return(T value, Observer<T> subscriber) : IDisposable, IThreadPoolWorkItem
sealed class _Return(T value, Observer<T> observer) : IDisposable, IThreadPoolWorkItem
{
bool stop;

public void Execute()
{
if (stop) return;

subscriber.OnNext(value);
subscriber.OnCompleted();
observer.OnNext(value);
observer.OnCompleted();
}

public void Dispose()
Expand Down
22 changes: 11 additions & 11 deletions src/R3/Factories/ReturnOnCompleted.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,29 @@ public static Observable<T> ReturnOnCompleted<T>(Result result, TimeSpan dueTime

internal class ImmediateScheduleReturnOnCompleted<T>(Result result) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> subscriber)
protected override IDisposable SubscribeCore(Observer<T> observer)
{
subscriber.OnCompleted(result);
observer.OnCompleted(result);
return Disposable.Empty;
}
}

internal class ReturnOnCompleted<T>(Result complete, TimeSpan dueTime, TimeProvider timeProvider) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> subscriber)
protected override IDisposable SubscribeCore(Observer<T> observer)
{
var method = new _ReturnOnCompleted(complete, subscriber);
var method = new _ReturnOnCompleted(complete, observer);
method.Timer = timeProvider.CreateStoppedTimer(_ReturnOnCompleted.timerCallback, method);
method.Timer.InvokeOnce(dueTime);
return method;
}

sealed class _ReturnOnCompleted(Result result, Observer<T> subscriber) : IDisposable
sealed class _ReturnOnCompleted(Result result, Observer<T> observer) : IDisposable
{
public static readonly TimerCallback timerCallback = NextTick;

readonly Result result = result;
readonly Observer<T> subscriber = subscriber;
readonly Observer<T> observer = observer;

public ITimer? Timer { get; set; }

Expand All @@ -59,7 +59,7 @@ static void NextTick(object? state)
var self = (_ReturnOnCompleted)state!;
try
{
self.subscriber.OnCompleted(self.result);
self.observer.OnCompleted(self.result);
}
finally
{
Expand All @@ -77,22 +77,22 @@ public void Dispose()

internal class ThreadPoolScheduleReturnOnCompleted<T>(Result result) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> subscriber)
protected override IDisposable SubscribeCore(Observer<T> observer)
{
var method = new _ReturnOnCompleted(result, subscriber);
var method = new _ReturnOnCompleted(result, observer);
ThreadPool.UnsafeQueueUserWorkItem(method, preferLocal: false);
return method;
}

sealed class _ReturnOnCompleted(Result result, Observer<T> subscriber) : IDisposable, IThreadPoolWorkItem
sealed class _ReturnOnCompleted(Result result, Observer<T> observer) : IDisposable, IThreadPoolWorkItem
{
bool stop;

public void Execute()
{
if (stop) return;

subscriber.OnCompleted(result);
observer.OnCompleted(result);
}

public void Dispose()
Expand Down
12 changes: 6 additions & 6 deletions src/R3/Factories/Timer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@ public Timer(TimeSpan dueTime, TimeProvider timeProvider)
this.timeProvider = timeProvider;
}

protected override IDisposable SubscribeCore(Observer<Unit> subscriber)
protected override IDisposable SubscribeCore(Observer<Unit> observer)
{
var method = new _Timer(subscriber);
var method = new _Timer(observer);
method.Timer = timeProvider.CreateStoppedTimer(_Timer.timerCallback, method);
method.Timer.InvokeOnce(dueTime);
return method;
}

sealed class _Timer(Observer<Unit> subscriber) : IDisposable
sealed class _Timer(Observer<Unit> observer) : IDisposable
{
public static readonly TimerCallback timerCallback = NextTick;

Observer<Unit> subscriber = subscriber;
Observer<Unit> observer = observer;

public ITimer? Timer { get; set; }

Expand All @@ -42,8 +42,8 @@ static void NextTick(object? state)
var self = (_Timer)state!;
try
{
self.subscriber.OnNext(default);
self.subscriber.OnCompleted();
self.observer.OnNext(default);
self.observer.OnCompleted();
}
finally
{
Expand Down
Loading

0 comments on commit 2e50606

Please sign in to comment.