From 624b4ada9236d19035a1d951e2248fbd8f3e451d Mon Sep 17 00:00:00 2001 From: neuecc Date: Wed, 13 Dec 2023 19:17:57 +0900 Subject: [PATCH] do --- sandbox/ConsoleApp1/Program.cs | 172 ++++------------------------- src/R3/EventSystem.cs | 18 ++- src/R3/Factories/_EventFactory.cs | 129 ++++++++++------------ src/R3/FrameProvider.cs | 75 ------------- src/R3/ThreadSleepFrameProvider.cs | 76 +++++++++++++ 5 files changed, 172 insertions(+), 298 deletions(-) create mode 100644 src/R3/ThreadSleepFrameProvider.cs diff --git a/sandbox/ConsoleApp1/Program.cs b/sandbox/ConsoleApp1/Program.cs index bb1ed462..102356a4 100644 --- a/sandbox/ConsoleApp1/Program.cs +++ b/sandbox/ConsoleApp1/Program.cs @@ -7,36 +7,6 @@ using System.Reactive.Subjects; using System.Threading.Channels; using ZLogger; -//using System.Reactive.Disposables; -//using System.Reactive.Subjects; -//using System.Threading.Channels; - - - - - - - - -var disposables = Enumerable.Range(1, 100).Select(x => new TestDisposable()).ToArray(); -var composite = new System.Reactive.Disposables.CompositeDisposable(disposables); - -foreach (var item in disposables) -{ - composite.Remove(item); -} - - - - - - - - - - - - SubscriptionTracker.EnableTracking = true; SubscriptionTracker.EnableStackTrace = true; @@ -51,145 +21,43 @@ -var publisher = new Publisher(); - -var d = publisher - .Where(x => true) - .Select(x => - { - //if (x == 2) throw new Exception("e"); - return x; - }) - .Take(5) - .OnErrorAsComplete() - .Subscribe(x => - { - if (x == 2) throw new Exception("e"); - logger.ZLogInformation($"OnNext: {x}"); - }, e => - { - logger.ZLogInformation($"failure resume"); - }, - x => - { - //logger.ZLogInformation($"end:{x}"); - x.TryThrow(); - }); - -SubscriptionTracker.ForEachActiveTask(x => -{ - // logger.ZLogInformation($"{x.TrackingId,3}: {Environment.NewLine}{x.StackTrace.Replace("R2.", "").Replace("C:\\MyGit\\R2\\sandbox\\ConsoleApp1\\", "").Replace("C:\\MyGit\\R2\\src\\R2\\", "")}"); - - - // logger.ZLogInformation($"{x.TrackingId,3}: {x.FormattedType}"); -}); - -//publisher.PublishOnNext(1); -//publisher.PublishOnNext(2); -//publisher.PublishOnNext(3); - -//publisher.PublishOnErrorResume(new Exception("ERROR")); - -//publisher.PublishOnNext(4); -//publisher.PublishOnNext(5); -//publisher.PublishOnNext(6); -//publisher.PublishOnNext(7); - - -//d.Dispose(); - +var ct = new CancellationTokenSource(1000); +EventSystem.DefaultFrameProvider = new ThreadSleepFrameProvider(60); -var iiii = Enumerable.Range(1, 10).ElementAt(^12); -Console.WriteLine(iiii); - -// System.Reactive.Linq.Observable.Empty( - -var s = new System.Reactive.Subjects.Subject(); - -// Console.WriteLine($"Average: {Enumerable.Empty().Average()}"); - -// s.ToListObservable(); - -// Observable.Throw( -// s.Where( - -// new Result( - -// s.ObserveOn( -// Observable.FromEventPattern( - - - -//foreach (var item in typeof(System.Reactive.Linq.Observable).GetMethods().Select(x => x.Name).Distinct().OrderBy(x => x)) +//var t = new Thread(() => //{ -// if (item == "ToString" || item == "Equals" || item == "GetHashCode" || item == "GetType") +// while (true) // { -// continue; +// Console.WriteLine("loop"); Thread.Sleep(60); // } -// Console.WriteLine("- [ ] " + item); -//} - - - - - - - -var subject = new Subject(); - -subject.OnCompleted(); - -Console.WriteLine("Subscribe"); -subject.Subscribe(x => Console.WriteLine(x)); -subject.OnNext(99); - - - - -// subject.ForEachAsync( - - - - - - -//p.PublishOnNext(4); -//p.PublishOnNext(5); - -//Console.WriteLine("-------------------------"); - - -//SubscriptionTracker.ForEachActiveTask(x => -//{ -// Console.WriteLine($"{x.TrackingId,3}: {x.FirstLine}"); //}); +//t.IsBackground = true; +//t.Start(); +//var s = new NewThreadScheduler(_ => new Thread(() => { while (true) { Console.WriteLine("loop"); Thread.Sleep(60); } })); +//s.Schedule(() => Console.WriteLine("do once")); +//using var f = new ThreadSleepFrameProvider(60); -//Event.Return(10, TimeProvider.System) -// .WriteLine(); - -//Console.ReadLine(); - -//var a = new ReactiveProperty(100); -//var b = new ReactiveProperty(999); +var source = Event.EveryUpdate(ct.Token); -//a.CombineLatest(b, (x, y) => (x, y)).WriteLine(); +source.DoOnDisposed(() => {/*Console.WriteLine("DISPOSED")*/}).WriteLine(); -//a.Value = 3; -//a.Value = 4; -//b.Value = 99999; -//b.Value = 1111; +// TODO: if WaitAsync is using, not disposed??? +await source.WaitAsync(); +Console.WriteLine("Press Key to done."); -//Observable.event - - +Console.ReadLine(); +SubscriptionTracker.ForEachActiveTask(x => +{ + Console.WriteLine(x); +}); public static class Extensions { diff --git a/src/R3/EventSystem.cs b/src/R3/EventSystem.cs index ac2972ab..948d8348 100644 --- a/src/R3/EventSystem.cs +++ b/src/R3/EventSystem.cs @@ -8,10 +8,13 @@ public class EventSystem { public static ILogger Logger { get; set; } = NullLogger.Instance; + public static TimeProvider DefaultTimeProvider { get; set; } = TimeProvider.System; + public static FrameProvider DefaultFrameProvider { get; set; } = new NotSupportedFrameProvider(); + static Action unhandledException = WriteLog; // Prevent +=, use Set and Get method. - public static void SetUnhandledExceptionHandler(Action unhandledExceptionHandler) + public static void RegisterUnhandledExceptionHandler(Action unhandledExceptionHandler) { unhandledException = unhandledExceptionHandler; } @@ -34,6 +37,19 @@ static void WriteLog(Exception exception) } } +internal sealed class NotSupportedFrameProvider : FrameProvider +{ + public override long GetFrameCount() + { + throw new NotSupportedException("EventSystem.DefaultFrameProvider is not set. Please set EventSystem.DefaultFrameProvider to a valid FrameProvider(ThreadSleepFrameProvider, etc...)."); + } + + public override void Register(IFrameRunnerWorkItem callback) + { + throw new NotSupportedException("EventSystem.DefaultFrameProvider is not set. Please set EventSystem.DefaultFrameProvider to a valid FrameProvider(ThreadSleepFrameProvider, etc...)."); + } +} + internal static partial class SystemLoggerExtensions { [LoggerMessage(Trace, "Add subscription tracking TrackingId: {TrackingId}.")] diff --git a/src/R3/Factories/_EventFactory.cs b/src/R3/Factories/_EventFactory.cs index 52b882d0..32eaa6f7 100644 --- a/src/R3/Factories/_EventFactory.cs +++ b/src/R3/Factories/_EventFactory.cs @@ -26,76 +26,65 @@ public static partial class Event // AsUnitComplete // AsNeverComplete + // TODO: use SystemDefault + + public static Event EveryUpdate() + { + return new EveryUpdate(EventSystem.DefaultFrameProvider, CancellationToken.None); + } + + public static Event EveryUpdate(CancellationToken cancellationToken) + { + return new EveryUpdate(EventSystem.DefaultFrameProvider, cancellationToken); + } + + public static Event EveryUpdate(FrameProvider frameProvider) + { + return new EveryUpdate(frameProvider, CancellationToken.None); + } + + public static Event EveryUpdate(FrameProvider frameProvider, CancellationToken cancellationToken) + { + return new EveryUpdate(frameProvider, cancellationToken); + } +} - //public static Event EveryUpdate(FrameProvider frameProvider) - //{ - // return new R3.Factories.EveryUpdate(frameProvider); - //} - //public static CompletableEvent EveryUpdate(FrameProvider frameProvider, CancellationToken cancellationToken) - //{ - // return new R3.Factories.EveryUpdate(frameProvider); - //} -} -//internal sealed class EveryUpdate(FrameProvider frameProvider, CancellationToken cancellationToken) : Event -//{ -// protected override IDisposable SubscribeCore(Subscriber subscriber) -// { -// var runner = new EveryUpdateRunnerWorkItem(subscriber, cancellationToken); -// frameProvider.Register(runner); -// return runner; -// } - -// class EveryUpdateRunnerWorkItem(Subscriber subscriber, CancellationToken cancellationToken) : IFrameRunnerWorkItem, IDisposable -// { -// bool isDisposed; - -// public bool MoveNext(long frameCount) -// { -// if (isDisposed || cancellationToken.IsCancellationRequested) -// { -// return false; -// } - -// subscriber.OnNext(default); -// return true; -// } - -// public void Dispose() -// { -// isDisposed = true; -// } -// } -//} - -//internal sealed class EveryUpdate(FrameProvider frameProvider, CancellationToken cancellationToken) : Event -//{ -// protected override IDisposable SubscribeCore(Subscriber subscriber) -// { -// var runner = new EveryUpdateRunnerWorkItem(subscriber, cancellationToken); -// frameProvider.Register(runner); -// return runner; -// } - -// class EveryUpdateRunnerWorkItem(Subscriber subscriber, CancellationToken cancellationToken) : IFrameRunnerWorkItem, IDisposable -// { -// bool isDisposed; - -// public bool MoveNext(long frameCount) -// { -// if (isDisposed || cancellationToken.IsCancellationRequested) -// { -// return false; -// } - -// subscriber.OnNext(default); -// return true; -// } - -// public void Dispose() -// { -// isDisposed = true; -// } -// } -//} +internal sealed class EveryUpdate(FrameProvider frameProvider, CancellationToken cancellationToken) : Event +{ + protected override IDisposable SubscribeCore(Subscriber subscriber) + { + var runner = new EveryUpdateRunnerWorkItem(subscriber, cancellationToken); + frameProvider.Register(runner); + return runner; + } + + class EveryUpdateRunnerWorkItem(Subscriber subscriber, CancellationToken cancellationToken) : IFrameRunnerWorkItem, IDisposable + { + bool isDisposed; + + public bool MoveNext(long frameCount) + { + if (isDisposed) + { + return false; + } + + if (cancellationToken.IsCancellationRequested) + { + subscriber.OnCompleted(); + Dispose(); + return false; + } + + subscriber.OnNext(default); + return true; + } + + public void Dispose() + { + isDisposed = true; + } + } +} diff --git a/src/R3/FrameProvider.cs b/src/R3/FrameProvider.cs index d99e833e..11a3f7d2 100644 --- a/src/R3/FrameProvider.cs +++ b/src/R3/FrameProvider.cs @@ -17,78 +17,3 @@ public interface IFrameRunnerWorkItem // true, continue bool MoveNext(long frameCount); } - -public sealed class ThreadSleepFrameProvider : FrameProvider, IDisposable -{ - readonly int sleepMilliseconds; - bool disposed; - - long frameCount; - FreeListCore list; - Thread thread; - - public ThreadSleepFrameProvider() - : this(1) - { - } - - public ThreadSleepFrameProvider(int sleepMilliseconds) - { - this.sleepMilliseconds = sleepMilliseconds; - this.list = new FreeListCore(this); - this.thread = new Thread(Run); - this.thread.Start(); - } - - public override long GetFrameCount() - { - ObjectDisposedException.ThrowIf(disposed, typeof(ThreadSleepFrameProvider)); - return frameCount; - } - - public override void Register(IFrameRunnerWorkItem callback) - { - ObjectDisposedException.ThrowIf(disposed, typeof(ThreadSleepFrameProvider)); - list.Add(callback); - } - - public void Dispose() - { - disposed = true; - } - - void Run() - { - while (!disposed) - { - var span = list.AsSpan(); - for (int i = 0; i < span.Length; i++) - { - ref readonly var item = ref span[i]; - if (item != null) - { - try - { - if (!item.MoveNext(frameCount)) - { - list.Remove(i); - } - } - catch (Exception ex) - { - list.Remove(i); - try - { - OnUnhandledException(ex); - } - catch { } - } - } - } - - Thread.Sleep(sleepMilliseconds); - frameCount++; - } - list.Dispose(); - } -} diff --git a/src/R3/ThreadSleepFrameProvider.cs b/src/R3/ThreadSleepFrameProvider.cs new file mode 100644 index 00000000..a99a74d0 --- /dev/null +++ b/src/R3/ThreadSleepFrameProvider.cs @@ -0,0 +1,76 @@ +namespace R3; + +public sealed class ThreadSleepFrameProvider : FrameProvider, IDisposable +{ + readonly int sleepMilliseconds; + bool disposed; + + long frameCount; + FreeListCore list; + Thread thread; + + public ThreadSleepFrameProvider() + : this(1) + { + } + + public ThreadSleepFrameProvider(int sleepMilliseconds) + { + this.sleepMilliseconds = sleepMilliseconds; + this.list = new FreeListCore(this); + this.thread = new Thread(Run) { IsBackground = true }; // IsBackground = true, when main thread is terminated, this thread is also terminated. + this.thread.Start(); + } + + public override long GetFrameCount() + { + ObjectDisposedException.ThrowIf(disposed, typeof(ThreadSleepFrameProvider)); + return frameCount; + } + + public override void Register(IFrameRunnerWorkItem callback) + { + ObjectDisposedException.ThrowIf(disposed, typeof(ThreadSleepFrameProvider)); + list.Add(callback); + } + + public void Dispose() + { + disposed = true; + } + + void Run() + { + while (!disposed) + { + var span = list.AsSpan(); + for (int i = 0; i < span.Length; i++) + { + ref readonly var item = ref span[i]; + if (item != null) + { + try + { + if (!item.MoveNext(frameCount)) + { + list.Remove(i); + } + } + catch (Exception ex) + { + list.Remove(i); + try + { + OnUnhandledException(ex); + } + catch { } + } + } + } + + Thread.Sleep(sleepMilliseconds); + frameCount++; + } + list.Dispose(); + } +}