diff --git a/src/app/Edelstein.Application.Server/Extensions/ServiceCollectionExtensions.cs b/src/app/Edelstein.Application.Server/Extensions/ServiceCollectionExtensions.cs index 8c3485162..f49279a09 100644 --- a/src/app/Edelstein.Application.Server/Extensions/ServiceCollectionExtensions.cs +++ b/src/app/Edelstein.Application.Server/Extensions/ServiceCollectionExtensions.cs @@ -3,6 +3,7 @@ using Edelstein.Protocol.Gameplay; using Edelstein.Protocol.Network.Transports; using Edelstein.Protocol.Plugin; +using Edelstein.Protocol.Services.Dispatch; using Edelstein.Protocol.Services.Server; using MapsterMapper; using Microsoft.Extensions.Configuration; @@ -51,6 +52,7 @@ IConfiguration config subProvider.GetRequiredService(), version, p.GetRequiredService>(), + p.GetRequiredService(), subProvider.GetRequiredService() ); }); diff --git a/src/app/Edelstein.Application.Server/Services/SystemHostService.cs b/src/app/Edelstein.Application.Server/Services/SystemHostService.cs index b6f84ea51..54b6800d1 100644 --- a/src/app/Edelstein.Application.Server/Services/SystemHostService.cs +++ b/src/app/Edelstein.Application.Server/Services/SystemHostService.cs @@ -1,4 +1,5 @@ -using System.IO; +using System; +using System.IO; using System.Threading; using System.Threading.Tasks; using Edelstein.Application.Server.Bindings; @@ -6,10 +7,14 @@ using Edelstein.Protocol.Gameplay; using Edelstein.Protocol.Network.Transports; using Edelstein.Protocol.Plugin; +using Edelstein.Protocol.Services.Dispatch; +using Edelstein.Protocol.Services.Dispatch.Contracts; using Edelstein.Protocol.Services.Server; +using Grpc.Core; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; +using R3; namespace Edelstein.Application.Server.Services; @@ -20,12 +25,14 @@ public class SystemHostService( IServerInfo info, TransportVersion version, IPluginManager plugins, + IDispatchService dispatch, TContext context ) : IHostedService where TStageSystem : IStageSystem where TStageSystemUser : class, IStageSystemUser { private ITransportContext? Context { get; set; } + private IDisposable? Subscription { get; set; } public async Task StartAsync(CancellationToken cancellationToken) { @@ -37,6 +44,22 @@ public async Task StartAsync(CancellationToken cancellationToken) system, system ).Accept(info.Host, info.Port); + + var request = new DispatchServiceSubscribeRequest + { + ServerID = info.ID + }; + + Subscription = dispatch + .Subscribe(request) + .ToObservable() + .Select(i => + { + // TODO + Console.WriteLine(i); + return Task.CompletedTask; + }) + .Subscribe(); logger.LogSystemHostServiceStarted( info.ID, @@ -51,6 +74,7 @@ public async Task StopAsync(CancellationToken cancellationToken) if (Context != null) await Context.Close(); + Subscription?.Dispose(); logger.LogSystemHostServiceStopped(info.ID); diff --git a/src/common/Edelstein.Common.Services.Dispatch/DispatchService.Send.cs b/src/common/Edelstein.Common.Services.Dispatch/DispatchService.Send.cs index a1f32f255..e82772a29 100644 --- a/src/common/Edelstein.Common.Services.Dispatch/DispatchService.Send.cs +++ b/src/common/Edelstein.Common.Services.Dispatch/DispatchService.Send.cs @@ -1,4 +1,5 @@ -using System.Linq; +using System.Collections.Generic; +using System.Linq; using System.Threading.Tasks; using Edelstein.Protocol.Services.Dispatch; using Edelstein.Protocol.Services.Dispatch.Contracts; @@ -11,68 +12,32 @@ public partial class DispatchService { public async Task Send(DispatchServiceSendRequest request, CallContext context = default) { - switch (request.Info.TargetType) + var info = request.Info; + var targets = (await _repository.RetrieveAll()).AsEnumerable(); + + if (info.TargetServerID != null) + targets = targets.Where(t => t.ServerID == info.TargetServerID); + + if (info.TargetWorldID.HasValue) + targets = targets.Where(t => t.WorldID == info.TargetWorldID); + + if (info.TargetChannelID.HasValue) + targets = targets.Where(t => t.ChannelID == info.TargetChannelID); + + if (info.TargetCharacterID.HasValue) { - case DispatchTarget.All: - await Task.WhenAll( - (await _serverIdIndex.RetrieveAll()) - .Select(async e - => await e.Channel.WriteAsync(request.Info, context.CancellationToken))); - break; - case DispatchTarget.World: + var session = await sessions.GetByActiveCharacter(new SessionServiceGetByActiveCharacterRequest { - var entry = await _worldIdIndex.Retrieve(request.Info.TargetID); + CharacterID = info.TargetCharacterID.Value + }); - if (entry == null) - return new DispatchServiceResponse - { - Result = DispatchServiceResult.FailedUnknown - }; - - await entry.Channel.WriteAsync(request.Info, context.CancellationToken); - break; - } - case DispatchTarget.Channel: - { - var entry = await _channelIdIndex.Retrieve(request.Info.TargetID); - - if (entry == null) - return new DispatchServiceResponse - { - Result = DispatchServiceResult.FailedUnknown - }; - - await entry.Channel.WriteAsync(request.Info, context.CancellationToken); - break; - } - case DispatchTarget.Character: - { - var session = await sessions.GetByActiveCharacter(new SessionServiceGetByActiveCharacterRequest - { - CharacterID = request.Info.TargetID - }); - - if (session.Info != null) - { - var entry = await _serverIdIndex.Retrieve(session.Info.ServerID); - - if (entry == null) - return new DispatchServiceResponse - { - Result = DispatchServiceResult.FailedUnknown - }; - - await entry.Channel.WriteAsync(request.Info, context.CancellationToken); - } - break; - } - default: - return new DispatchServiceResponse - { - Result = DispatchServiceResult.FailedUnknown - }; + if (session.Info != null) + targets = targets.Where(t => t.ServerID == session.Info.ServerID); } + await Task.WhenAll(targets + .Select(async t + => await t.Channel.WriteAsync(request.Info))); return new DispatchServiceResponse { Result = DispatchServiceResult.Success diff --git a/src/common/Edelstein.Common.Services.Dispatch/DispatchService.Subscribe.cs b/src/common/Edelstein.Common.Services.Dispatch/DispatchService.Subscribe.cs index 611e0ccda..23c5dccfa 100644 --- a/src/common/Edelstein.Common.Services.Dispatch/DispatchService.Subscribe.cs +++ b/src/common/Edelstein.Common.Services.Dispatch/DispatchService.Subscribe.cs @@ -1,4 +1,7 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; +using System.Runtime.CompilerServices; +using System.Threading; using System.Threading.Channels; using Edelstein.Protocol.Services.Dispatch.Contracts; using ProtoBuf.Grpc; @@ -7,21 +10,35 @@ namespace Edelstein.Common.Services.Dispatch; public partial class DispatchService { - public async IAsyncEnumerable Subscribe(DispatchServiceSubscribeRequest request, CallContext context = default) + public IAsyncEnumerable Subscribe(DispatchServiceSubscribeRequest request, CallContext context = default) + => Subscribe(request, context.CancellationToken); + + private async IAsyncEnumerable Subscribe(DispatchServiceSubscribeRequest request, [EnumeratorCancellation] CancellationToken token) { + if (await _repository.Retrieve(request.ServerID) != null) + yield break; + var channel = Channel.CreateUnbounded(); + var subscription = new DispatchSubscription + { + Channel = channel, + ServerID = request.ServerID, + WorldID = request.WorldID, + ChannelID = request.ChannelID + }; - await _serverIdIndex.Insert(new DispatchServiceEntry(request.ServerID, channel)); - if (request.WorldID.HasValue) await _worldIdIndex.Insert(new DispatchServiceEntry(request.WorldID.Value, channel)); - if (request.ChannelID.HasValue) await _channelIdIndex.Insert(new DispatchServiceEntry(request.ChannelID.Value, channel)); + await _repository.Insert(subscription); - while (!context.CancellationToken.IsCancellationRequested) - yield return await channel.Reader.ReadAsync(context.CancellationToken); - - await _serverIdIndex.Delete(request.ServerID); - if (request.WorldID.HasValue) await _worldIdIndex.Delete(request.WorldID.Value); - if (request.ChannelID.HasValue) await _channelIdIndex.Delete(request.ChannelID.Value); - - channel.Writer.Complete(); + try + { + await foreach (var info in channel.Reader.ReadAllAsync(token)) + yield return info; + } + finally + { + Console.WriteLine("CLEANED"); + await _repository.Delete(subscription); + channel.Writer.Complete(); + } } } diff --git a/src/common/Edelstein.Common.Services.Dispatch/DispatchService.cs b/src/common/Edelstein.Common.Services.Dispatch/DispatchService.cs index 239199dcb..805b034c3 100644 --- a/src/common/Edelstein.Common.Services.Dispatch/DispatchService.cs +++ b/src/common/Edelstein.Common.Services.Dispatch/DispatchService.cs @@ -1,4 +1,5 @@ -using Edelstein.Protocol.Services.Dispatch; +using Edelstein.Common.Utilities.Repositories; +using Edelstein.Protocol.Services.Dispatch; using Edelstein.Protocol.Services.Session; namespace Edelstein.Common.Services.Dispatch; @@ -7,7 +8,5 @@ public partial class DispatchService( ISessionService sessions ) : IDispatchService { - private readonly DispatchServiceEntryRepository _serverIdIndex = new(); - private readonly DispatchServiceEntryRepository _worldIdIndex = new(); - private readonly DispatchServiceEntryRepository _channelIdIndex = new(); + private readonly Repository _repository = new(); } diff --git a/src/common/Edelstein.Common.Services.Dispatch/DispatchServiceEntry.cs b/src/common/Edelstein.Common.Services.Dispatch/DispatchServiceEntry.cs deleted file mode 100644 index ae3a6e23e..000000000 --- a/src/common/Edelstein.Common.Services.Dispatch/DispatchServiceEntry.cs +++ /dev/null @@ -1,10 +0,0 @@ -using System.Threading.Channels; -using Edelstein.Protocol.Services.Dispatch.Contracts; -using Edelstein.Protocol.Utilities.Repositories; - -namespace Edelstein.Common.Services.Dispatch; - -public record DispatchServiceEntry( - TKey ID, - ChannelWriter Channel -) : IRepositoryEntry; diff --git a/src/common/Edelstein.Common.Services.Dispatch/DispatchServiceEntryRepository.cs b/src/common/Edelstein.Common.Services.Dispatch/DispatchServiceEntryRepository.cs deleted file mode 100644 index aa9103639..000000000 --- a/src/common/Edelstein.Common.Services.Dispatch/DispatchServiceEntryRepository.cs +++ /dev/null @@ -1,6 +0,0 @@ -using Edelstein.Common.Utilities.Repositories; - -namespace Edelstein.Common.Services.Dispatch; - -public class DispatchServiceEntryRepository : Repository> - where TKey : notnull; diff --git a/src/common/Edelstein.Common.Services.Dispatch/DispatchSubscription.cs b/src/common/Edelstein.Common.Services.Dispatch/DispatchSubscription.cs new file mode 100644 index 000000000..f48a2c16d --- /dev/null +++ b/src/common/Edelstein.Common.Services.Dispatch/DispatchSubscription.cs @@ -0,0 +1,11 @@ +using System.Threading.Channels; +using Edelstein.Protocol.Services.Dispatch.Contracts; +using Edelstein.Protocol.Utilities.Repositories; + +namespace Edelstein.Common.Services.Dispatch; + +public record DispatchSubscription : DispatchServiceSubscribeRequest, IRepositoryEntry +{ + public string ID => ServerID; + public required ChannelWriter Channel { get; init; } +} diff --git a/src/protocol/Edelstein.Protocol.Services.Dispatch/Contracts/DispatchInfo.cs b/src/protocol/Edelstein.Protocol.Services.Dispatch/Contracts/DispatchInfo.cs index 9d2755162..cf813d96f 100644 --- a/src/protocol/Edelstein.Protocol.Services.Dispatch/Contracts/DispatchInfo.cs +++ b/src/protocol/Edelstein.Protocol.Services.Dispatch/Contracts/DispatchInfo.cs @@ -5,12 +5,10 @@ namespace Edelstein.Protocol.Services.Dispatch.Contracts; [DataContract] public record DispatchInfo : IDispatchInfo { - [DataMember(Order = 1)] - public required DispatchTarget TargetType { get; init; } + [DataMember(Order = 1)] public string? TargetServerID { get; init; } + [DataMember(Order = 2)] public int? TargetWorldID { get; init; } + [DataMember(Order = 3)] public int? TargetChannelID { get; init; } + [DataMember(Order = 4)] public int? TargetCharacterID { get; init; } - [DataMember(Order = 2)] - public required int TargetID { get; init; } - - [DataMember(Order = 3)] - public required byte[] Payload { get; init; } + [DataMember(Order = 5)] public required byte[] Payload { get; init; } } diff --git a/src/protocol/Edelstein.Protocol.Services.Dispatch/DispatchTarget.cs b/src/protocol/Edelstein.Protocol.Services.Dispatch/DispatchTarget.cs deleted file mode 100644 index 8a55f5179..000000000 --- a/src/protocol/Edelstein.Protocol.Services.Dispatch/DispatchTarget.cs +++ /dev/null @@ -1,13 +0,0 @@ -namespace Edelstein.Protocol.Services.Dispatch; - -public enum DispatchTarget -{ - All, - - Server, - - World, - Channel, - - Character -} diff --git a/src/protocol/Edelstein.Protocol.Services.Dispatch/IDispatchInfo.cs b/src/protocol/Edelstein.Protocol.Services.Dispatch/IDispatchInfo.cs index 1c33c2cd9..cb0de257d 100644 --- a/src/protocol/Edelstein.Protocol.Services.Dispatch/IDispatchInfo.cs +++ b/src/protocol/Edelstein.Protocol.Services.Dispatch/IDispatchInfo.cs @@ -2,8 +2,10 @@ public interface IDispatchInfo { - DispatchTarget TargetType { get; } - int TargetID { get; } + public string? TargetServerID { get; init; } + public int? TargetWorldID { get; init; } + public int? TargetChannelID { get; init; } + public int? TargetCharacterID { get; init; } byte[] Payload { get; } }